<a href="https://colab.research.google.com/github/FatimaZahraBoujrad/phishing_classification/blob/main/Features_Extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Script pour extraire les données depuis les fichiers.

### Pour ajouter des scripts d'extraction :
1. Ajoutez votre fonction dans la partie « fonctions pour extraction ».
2. Appelez votre fonction dans extract_all_features et mettez les autres en commentaire.
3. Ne supprimez rien.
4. Lancez les deux cellules « main » pour le benign et le phishing.

Le code dans csv utilities vous permet de merger vos features avec les features déjà existantes dans le dataset.

In [None]:
# Mount Google Drive at /content/drive; the archive path and output folders used
# by later cells live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ZIP_PATH: benign dataset archive on the mounted Drive.
# DEST_PATH: local directory the archive is unpacked into.
ZIP_PATH = "/content/drive/MyDrive/Projet_Phishing/BenignDataset.zip"
DEST_PATH = "/content/BenignDataset"

# Shell escape: unpack quietly (-q) into DEST_PATH (-d); $NAME expands the Python variable.
!unzip -q "$ZIP_PATH" -d "$DEST_PATH"


In [None]:
import gzip
import ipaddress
import json
import os
from typing import Any, Dict, List, Optional
from urllib.parse import urlsplit

import pandas as pd



```
# Ce texte est au format code
```

# Fonctions pour lecture des fichiers

In [None]:
def read_json_file(file_path: str) -> Optional[Any]:
    """Parse one JSON document from a plain or gzip-compressed file.

    Paths ending in '.gz' are opened through gzip in text mode; everything
    else is opened as a regular UTF-8 text file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded JSON value, or None when anything goes wrong (missing
        file, invalid JSON, bad gzip stream, ...). The failure is printed
        instead of raised so that one bad file does not stop a batch run.
    """
    try:
        is_gzipped = file_path.endswith('.gz')
        opener = gzip.open if is_gzipped else open
        mode = 'rt' if is_gzipped else 'r'
        with opener(file_path, mode, encoding='utf-8') as handle:
            parsed = json.load(handle)
    except Exception as e:
        print(f"‚ö†Ô∏è Error reading {file_path}: {e}")
        return None
    return parsed

def first(lst, default=None):
    """Return the first item of a non-empty list, otherwise `default`.

    Anything that is not a `list` instance (None, tuple, dict, str, ...) is
    treated like an empty list.
    """
    if not isinstance(lst, list):
        return default
    if not lst:
        return default
    return lst[0]

# Fonctions pour extraire les features

In [None]:




def extract_host_features(record: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten the `host_info` part of a record into a one-level feature dict.

    Reads the DNS A/AAAA/MX sections, the SSL section and the first entry of
    the `maxmind` list. Every value is looked up with `.get`, so a missing
    section or field yields None for the corresponding feature.

    Args:
        record: One decoded record; expected to be a dict.

    Returns:
        Dict with `url`, `is_https`, `dns_*`, `ssl_*` and `maxmind_1_*` keys,
        always in the same order.
    """
    host_info = record.get("host_info", {}) or {}

    def section(name, fallback):
        # A missing key and a falsy value (None, empty container) both become `fallback`.
        return host_info.get(name, fallback) or fallback

    dns_a = section("a", {})
    dns_aaaa = section("aaaa", {})
    dns_mx = section("mx", {})
    certificate = section("ssl", {})

    # Only the first geolocation entry is kept.
    geo_first = first(section("maxmind", []), {}) or {}
    geo = geo_first.get("answers", {}) or {}

    features = {
        "url": record.get("url"),
        "is_https": host_info.get("is_https"),
    }

    features["dns_a_status"] = dns_a.get("status")
    # Only the first A answer is kept.
    features["dns_a_answer_1"] = first(dns_a.get("answers", []) or [])
    features["dns_aaaa_status"] = dns_aaaa.get("status")
    features["dns_mx_status"] = dns_mx.get("status")

    for field in ("issuer", "valid_from", "valid_until", "is_valid_cert"):
        features[f"ssl_{field}"] = certificate.get(field)

    for field in ("ip", "asn_code", "asn_org", "cc_code"):
        features[f"maxmind_1_{field}"] = geo.get(field)

    return features



In [None]:
# Free static-hosting domains; a host matches when it equals one of these or is a subdomain of it.
_FREE_HOSTING_SUFFIXES = (
    "surge.sh", "vercel.app", "netlify.app", "github.io",
    "glitch.me", "glitch.global", "firebaseapp.com", "pages.dev",
)

# Response headers whose presence counts as "has a security header".
_SECURITY_HEADERS = frozenset({
    "content-security-policy",
    "strict-transport-security",
    "x-frame-options",
    "x-content-type-options",
})


def _har_hostname(url):
    """Return the lower-cased host of `url` without port/userinfo, or None if it has none."""
    try:
        host = urlsplit(url).hostname
    except ValueError:  # raised for malformed bracketed IPv6 netlocs
        return None
    return host.rstrip(".") if host else None


def _is_ip_literal(host):
    """Tell whether `host` is an IPv4 or IPv6 address literal."""
    try:
        ipaddress.ip_address(host)
    except ValueError:
        return False
    return True


def _is_free_hosting(host):
    """Tell whether `host` is a free-hosting domain or a subdomain of one."""
    return any(host == suffix or host.endswith("." + suffix)
               for suffix in _FREE_HOSTING_SUFFIXES)


def aggregate_har_features(har_list):
    """Aggregate phishing-related features from HAR entries.

    Each entry is expected to be a dict with optional `request` and `response`
    dicts. Response headers are read as `{"key": ..., "value": ...}` items; the
    standard HAR spelling `{"name": ..., "value": ...}` is accepted as well.

    Malformed parts (non-dict entry, missing/None headers, non-string header
    values) are skipped individually instead of aborting the whole entry, so
    an entry is never half-counted.

    Args:
        har_list: List of HAR entries; None or an empty list yields the
            "no requests" result.

    Returns:
        Dict with these keys, in this order:
        har_total_requests, har_unique_content_types, har_unique_servers,
        har_unique_domains, har_avg_status (None when no integer status was
        seen), har_http_ratio, har_ip_request_ratio, har_post_request_ratio,
        har_free_hosting_ratio, har_external_domain_count,
        har_missing_security_headers_ratio. All ratios are 0 when there are
        no entries.
    """
    entries = list(har_list) if har_list else []
    # Every entry counts towards the denominator, including ones skipped below.
    total_requests = len(entries)

    status_codes = []
    content_types = set()
    servers = set()
    domains = set()

    http_requests = 0
    ip_based_requests = 0
    post_requests = 0
    free_hosting_requests = 0
    missing_security_headers = 0

    for entry in entries:
        if not isinstance(entry, dict):
            continue

        resp = entry.get("response")
        if not isinstance(resp, dict):
            resp = {}
        req = entry.get("request")
        if not isinstance(req, dict):
            req = {}

        # ---- URL ----
        url = req.get("url") or resp.get("url") or ""
        if not isinstance(url, str):
            continue

        if url.startswith("http://"):
            http_requests += 1

        host = _har_hostname(url)
        if host:
            # Port and userinfo are stripped, so "a.com" and "a.com:443" are one domain.
            domains.add(host)

            if _is_ip_literal(host):
                ip_based_requests += 1

            if _is_free_hosting(host):
                free_hosting_requests += 1

        # ---- Method ----
        if req.get("method") == "POST":
            post_requests += 1

        # ---- Status ----
        status = resp.get("status")
        if isinstance(status, int):
            status_codes.append(status)

        # ---- Headers ----
        headers = resp.get("headers")
        if not isinstance(headers, list):
            headers = []
        has_security_header = False

        for header in headers:
            if not isinstance(header, dict):
                continue
            key = header.get("key") or header.get("name")
            if not isinstance(key, str):
                continue
            key = key.lower()

            if key in _SECURITY_HEADERS:
                has_security_header = True

            value = header.get("value")
            if not isinstance(value, str):
                continue

            if key == "content-type":
                # Drop parameters such as "; charset=utf-8".
                content_types.add(value.split(";")[0].strip())
            elif key == "server":
                servers.add(value.strip())

        # Only entries that actually carry a response can be "missing" headers.
        if resp and not has_security_header:
            missing_security_headers += 1

    def ratio(count):
        return count / total_requests if total_requests else 0

    return {
        "har_total_requests": total_requests,
        "har_unique_content_types": len(content_types),
        "har_unique_servers": len(servers),
        "har_unique_domains": len(domains),
        "har_avg_status": (sum(status_codes) / len(status_codes)) if status_codes else None,

        "har_http_ratio": ratio(http_requests),
        "har_ip_request_ratio": ratio(ip_based_requests),
        "har_post_request_ratio": ratio(post_requests),
        "har_free_hosting_ratio": ratio(free_hosting_requests),
        # NOTE(review): no first-party host is available here, so every host is
        # counted and this always equals har_unique_domains.
        "har_external_domain_count": len(domains),
        "har_missing_security_headers_ratio": ratio(missing_security_headers),
    }


In [None]:
# Free static-hosting domains. Same entries as the host list used by
# aggregate_har_features; no visible cell reads this constant.
SUSPICIOUS_HOSTS = [
    "surge.sh",
    "vercel.app",
    "netlify.app",
    "github.io",
    "glitch.me",
    "glitch.global",
    "firebaseapp.com",
    "pages.dev",
]



In [None]:
def extract_content_info(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Extract URL info, response-resource aggregates, HAR features and top-level fields.

    Args:
        rec: One decoded record. A list is accepted too (its first element is
            used); anything that is not a dict is treated as an empty record.

    Returns:
        Dict with url/destination/title, per-type resource counts derived from
        the `responses` list (taken from the record or from `content_info`),
        md5-based duplicate indicators and the top-level fields `tech_info`,
        `has_path`, `has_subdomain`, `subdomain`. When `content_info.har` is a
        non-empty list, the keys returned by `aggregate_har_features` are
        added; otherwise no `har_*` keys are present.
    """
    # Some inputs wrap the record in a list: use its first element.
    if isinstance(rec, list):
        rec = rec[0] if rec else {}
    if not isinstance(rec, dict):
        rec = {}

    content_info = rec.get('content_info') or {}
    if not isinstance(content_info, dict):
        content_info = {}

    url = rec.get('url', '') or ''
    destination = content_info.get('destination', '') or ''
    title = content_info.get('title', '') or ''

    tech_info = rec.get('tech_info')
    tech_info_str = ','.join([str(t) for t in tech_info]) if isinstance(tech_info, list) else None

    # None (rather than False) marks "field absent from the record".
    has_path = bool(rec.get('has_path')) if 'has_path' in rec else None
    has_subdomain = bool(rec.get('has_subdomain')) if 'has_subdomain' in rec else None
    subdomain = str(rec.get('subdomain')) if rec.get('subdomain') else None

    # --- Responses ---
    responses = rec.get('responses') or content_info.get('responses') or []
    if not isinstance(responses, list):
        responses = []

    # Counts every list item, including ones without a usable file type.
    total_resources = len(responses)
    file_types, md5s = [], []

    for r in responses:
        if not isinstance(r, dict):
            continue
        # An explicit null (or any non-string) must not crash the whole record.
        ft = r.get('file_type')
        if isinstance(ft, str) and ft:
            file_types.append(ft.lower())
        md5 = r.get('md5')
        if isinstance(md5, str) and md5:
            md5s.append(md5)

    css_file_count = sum('css' in ft for ft in file_types)
    # 'js' is also a substring of 'json'; JSON payloads are not scripts.
    js_file_count = sum(
        ('javascript' in ft or 'js' in ft) and 'json' not in ft
        for ft in file_types
    )
    image_count = sum('image' in ft or 'png' in ft or 'jpg' in ft or 'jpeg' in ft for ft in file_types)
    video_count = sum('video' in ft for ft in file_types)
    font_count = sum('font' in ft for ft in file_types)

    unique_file_types = len(set(file_types))
    unique_md5_hashes = len(set(md5s))
    has_duplicate_resources = unique_md5_hashes < len(md5s)
    has_video = video_count > 0
    has_audio = any('audio' in ft for ft in file_types)
    file_type_diversity_score = unique_file_types / total_resources if total_resources > 0 else 0

    features = {
        'url': url,
        'destination_url': destination,
        'title': title,
        'total_resources': total_resources,
        'css_file_count': css_file_count,
        'js_file_count': js_file_count,
        'image_count': image_count,
        'video_count': video_count,
        'font_count': font_count,
        'unique_file_types': unique_file_types,
        'has_video': has_video,
        'has_audio': has_audio,
        'file_type_diversity_score': round(file_type_diversity_score, 3),
        'unique_md5_hashes': unique_md5_hashes,
        'has_duplicate_resources': has_duplicate_resources,
        # top-level fields
        'tech_info': tech_info_str,
        'has_path': has_path,
        'has_subdomain': has_subdomain,
        'subdomain': subdomain
    }

    # --- HAR data extraction ---
    har_data = content_info.get("har")
    if isinstance(har_data, list) and len(har_data) > 0:
        features.update(aggregate_har_features(har_data))

    return features


In [None]:


def extract_additional_features(record: Dict[str, Any]) -> Dict[str, Any]:
    """Extract features from the 'additional' section (root domain vs subdomain).

    Args:
        record: One decoded record; `additional.rd` describes the root domain
            and `additional.sd` the subdomain.

    Returns:
        Dict with:
          url               record url ('' when absent)
          rd_has_ip         1 if the root domain has at least one A answer
          sd_has_ip         1 if the subdomain has at least one A answer
          attack_pattern_ip 1 if the root has no IP but the subdomain has one
          has_history       1 if the subdomain's wayback_info has a first_ts
          subdomain_len     length of the subdomain name once the trailing
                            ".<root domain>" is removed; 0 when either name is
                            missing or both are equal
          tld               last label of the root domain, lower-cased, or
                            "unknown" when there is no root domain
          is_common_tld     1 if tld is in a short list of common TLDs
    """
    def as_dict(value):
        # Null or wrongly-typed sections behave like missing ones.
        return value if isinstance(value, dict) else {}

    def as_text(value):
        # A null record name must become "", not the string "None".
        return str(value) if value else ""

    additional_data = as_dict(record.get('additional'))

    # --- 1. Infrastructure pattern (RD vs SD) ---
    url = record.get('url', '') or ''
    rd_info = as_dict(additional_data.get('rd'))
    rd_a = as_dict(as_dict(rd_info.get('host_info')).get('a'))
    rd_ips = rd_a.get('answers', []) or []

    sd_info = as_dict(additional_data.get('sd'))
    sd_a = as_dict(as_dict(sd_info.get('host_info')).get('a'))
    sd_ips = sd_a.get('answers', []) or []

    # Integer flags (0 or 1).
    rd_has_ip = 1 if len(rd_ips) > 0 else 0
    sd_has_ip = 1 if len(sd_ips) > 0 else 0

    # Suspicious pattern: empty root but active subdomain.
    attack_pattern_ip = 1 if (rd_has_ip == 0 and sd_has_ip == 1) else 0

    # --- 2. History (Wayback Machine) ---
    wayback = as_dict(sd_info.get('wayback_info'))
    has_history = 1 if wayback.get('first_ts') is not None else 0

    # --- 3. Lexical analysis (URL & TLD) ---
    subdomain_str = as_text(sd_info.get('record'))
    root_domain_str = as_text(rd_info.get('record'))

    subdomain_len = 0
    if subdomain_str and root_domain_str and subdomain_str != root_domain_str:
        root_suffix = f".{root_domain_str}"
        # Strip the root only where it is the suffix; an occurrence in the
        # middle of the name is part of the subdomain.
        if subdomain_str.endswith(root_suffix):
            subdomain_part = subdomain_str[:-len(root_suffix)]
        else:
            subdomain_part = subdomain_str
        subdomain_len = len(subdomain_part)

    # Extension (TLD)
    tld = root_domain_str.rstrip('.').rsplit('.', 1)[-1].lower() or "unknown"

    common_tlds = ('com', 'org', 'net', 'edu', 'gov', 'fr', 'us', 'uk', 'de')
    is_common_tld = 1 if tld in common_tlds else 0

    return {
        "url": url,
        # Infrastructure
        "rd_has_ip": rd_has_ip,
        "sd_has_ip": sd_has_ip,
        "attack_pattern_ip": attack_pattern_ip,

        # History
        "has_history": has_history,

        # Lexical
        "subdomain_len": subdomain_len,
        "tld": tld,
        "is_common_tld": is_common_tld
    }

In [None]:
from typing import Dict, Any

def extract_brand_fqdn_raw(record: Dict[str, Any]) -> Dict[str, Any]:
    """Copy the raw `trg` (brand) and `fqdn` values out of the record's metadata.

    No matching between brand and domain happens here; that is left to a
    later step. A missing or null `metadata` section gives None for both.
    """
    meta = record.get("metadata", {}) or {}
    return {field: meta.get(field) for field in ("trg", "fqdn")}


# Fonction wrapper pour les fonctions d'extractions

In [None]:
def extract_all_features(record: Dict[str, Any]) -> Dict[str, Any]:
    """Run every feature extractor on one record and merge the results.

    Extractors run in the order listed; when two of them return the same key
    (several return 'url'), the value from the later one wins.
    Add new feature extraction functions to the tuple below.
    """
    extractors = (
        extract_host_features,
        extract_content_info,
        extract_additional_features,
        extract_brand_fqdn_raw,
    )

    combined = {}
    for extractor in extractors:
        combined.update(extractor(record))
    return combined

In [None]:


# ============================================================
# FILE PROCESSING
# ============================================================

def get_json_files(folder_path: str, limit: Optional[int] = None) -> List[str]:
    """List the JSON files directly inside a folder, sorted by path.

    Only regular files whose name ends in '.json' or '.json.gz' are returned;
    sub-directories are not descended into.

    Args:
        folder_path: Directory to scan.
        limit: Maximum number of paths to return. None means no limit;
            0 or a negative value returns an empty list.

    Returns:
        Sorted list of file paths, truncated to `limit` entries.
    """
    # The context manager closes the directory handle promptly.
    with os.scandir(folder_path) as entries:
        paths = sorted(
            entry.path for entry in entries
            if entry.is_file() and entry.name.endswith((".json", ".json.gz"))
        )

    # `is not None` so that limit=0 means "no files" rather than "all files".
    if limit is not None:
        paths = paths[:max(limit, 0)]

    return paths

def process_file(file_path: str) -> List[Dict[str, Any]]:
    """Turn one JSON file into a list of feature rows.

    The file may hold a single record (dict) or a list of records; list items
    that are not dicts are ignored. An unreadable file, or any other JSON
    value at the top level, gives an empty list.
    """
    payload = read_json_file(file_path)
    if payload is None:
        return []

    if isinstance(payload, dict):
        records = [payload]
    elif isinstance(payload, list):
        records = [item for item in payload if isinstance(item, dict)]
    else:
        records = []

    return [extract_all_features(item) for item in records]



# Fonction pour retourner le dataframe
Créer une nouvelle fonction selon votre cas

In [None]:
def process_all_files(folder_path: str, limit: Optional[int] = None,
                      progress_every: int = 100) -> pd.DataFrame:
    """
    Process all JSON files in folder and return DataFrame with extracted features.
    The only hard requirement is that 'url' is the first column; all other features follow automatically.

    A file whose processing raises is reported and skipped; the remaining
    files are still processed.

    Args:
        folder_path: Path to folder containing JSON files
        limit: Maximum number of files to process (None = all)
        progress_every: Print progress every N files; 0 or None disables
            progress output

    Returns:
        DataFrame with extracted features, one row per extracted record
    """
    file_paths = get_json_files(folder_path, limit)

    print(f"üìÅ Found {len(file_paths)} files to process")
    print(f"üöÄ Starting feature extraction...")

    all_rows = []

    for i, file_path in enumerate(file_paths, start=1):
        try:
            all_rows.extend(process_file(file_path))
        except Exception as e:
            print(f"‚ö†Ô∏è Error processing {file_path}: {e}")
            continue

        # Kept outside the try block and guarded against 0: otherwise
        # `i % 0` raises ZeroDivisionError, which would be reported as a
        # processing error for every single file.
        if progress_every and i % progress_every == 0:
            print(f"    ‚úì Processed {i}/{len(file_paths)} files | {len(all_rows)} records extracted")

    print(f"‚úÖ Extraction complete: {len(all_rows)} total records from {len(file_paths)} files")

    # Create DataFrame
    df = pd.DataFrame(all_rows)

    # Move 'url' to the front, leaving the other columns in their original order.
    if 'url' in df.columns:
        other_cols = [c for c in df.columns if c != 'url']
        df = df[['url'] + other_cols]

    return df


# Merger les colonnes extraites avec les colonnes du dataset déjà existant

In [None]:


def load_or_create_dataframe(output_path: str) -> pd.DataFrame:
    """Read the CSV at `output_path`, or start from an empty DataFrame.

    Returns an empty DataFrame when the path does not exist or when the file
    exists but has no content (pandas raises EmptyDataError). Other read
    errors are not caught.
    """
    if not os.path.exists(output_path):
        print(f"üìÑ No existing file found, will create new one")
        return pd.DataFrame()

    try:
        loaded = pd.read_csv(output_path)
    except pd.errors.EmptyDataError:
        print(f"üìÑ File exists but is empty, creating new DataFrame")
        return pd.DataFrame()

    print(f"üìÇ Loaded {len(loaded)} existing records from: {output_path}")
    return loaded


def merge_features(existing_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merge new features with existing DataFrame using 'url' as key.
    Only NEW columns are added. Existing columns are kept as-is.

    The merge is a left join on 'url', so every existing row is kept and rows
    of `new_df` whose url is not in `existing_df` are dropped. If `new_df`
    contains the same url more than once, only its first occurrence is used;
    otherwise the join would duplicate the matching existing rows.

    Args:
        existing_df: Previously saved dataset; returned unchanged when
            `new_df` brings no new column.
        new_df: Freshly extracted features, with a 'url' column.

    Returns:
        `new_df` itself when `existing_df` is empty, `existing_df` itself when
        there is nothing to add, otherwise a new merged DataFrame.
    """
    if existing_df.empty:
        print("   Creating new dataset")
        return new_df

    print(f"   Merging {len(new_df)} records with existing {len(existing_df)} records")

    existing_cols = set(existing_df.columns)
    new_cols = [col for col in new_df.columns if col not in existing_cols and col != 'url']

    if not new_cols:
        print("   ‚ö†Ô∏è No new columns to add, all columns already exist")
        return existing_df

    print(f"   Adding {len(new_cols)} new columns: {new_cols}")

    # Merge only the new columns (plus url for merging)
    new_features = new_df[['url'] + new_cols]

    # One row per url on the right-hand side keeps the left join from
    # multiplying existing rows.
    duplicate_urls = int(new_features.duplicated(subset='url').sum())
    if duplicate_urls:
        print(f"   Ignoring {duplicate_urls} repeated url rows in new features (first occurrence kept)")
        new_features = new_features.drop_duplicates(subset='url', keep='first')

    merged_df = pd.merge(
        existing_df,
        new_features,
        on='url',
        how='left'  # Keep all existing records
    )

    print(f"   ‚úì Merged dataset now has {len(merged_df)} records and {len(merged_df.columns)} columns")
    return merged_df

# Code main d'execution

## benign

In [None]:
# Benign run: extract features from the JSON files under FOLDER_PATH, merge them
# into the CSV at output_path if one exists, preview the result and save it.
# The names assigned here (merged_df in particular) are read by later cells.
if __name__ == "__main__":
    # Configuration
    FOLDER_PATH = '/content/BenignDataset/Benign_Data_BDA'
    OUTPUT_FOLDER = '/content/drive/MyDrive/Projet_Phishing/Dataset'
    LIMIT = 30000  # Set to None to process all files
    OUTPUT_FILE = "benign30k.csv"

    # Create output folder if it doesn't exist
    os.makedirs(OUTPUT_FOLDER, exist_ok=True)
    output_path = os.path.join(OUTPUT_FOLDER, OUTPUT_FILE)

    # Load existing data if available
    existing_df = load_or_create_dataframe(output_path)

    # Process files and extract new features
    print(f"\n Extracting features from {FOLDER_PATH}")
    new_df = process_all_files(FOLDER_PATH, limit=LIMIT, progress_every=10)
    # NOTE(review): DataFrame.info() prints its report itself and returns None,
    # so wrapping it in print() also emits a trailing "None".
    print(new_df.info())

    # Merge with existing data
    print(f"\n Merging features...")
    merged_df = merge_features(existing_df, new_df)

    # Display results
    print(f"\n Final dataset shape: {merged_df.shape}")
    print(f" Columns: {list(merged_df.columns)}")
    # display() is not imported in the visible cells; presumably the IPython/Colab builtin.
    display(merged_df.head())

    # Save to CSV
    merged_df.to_csv(output_path, index=False)
    print(f"\n Saved: {output_path}")

üìÑ No existing file found, will create new one

üöÄ Extracting features from /content/BenignDataset/Benign_Data_BDA
üìÅ Found 30000 files to process
üöÄ Starting feature extraction...
    ‚úì Processed 10/30000 files | 10 records extracted
    ‚úì Processed 20/30000 files | 20 records extracted
    ‚úì Processed 30/30000 files | 30 records extracted
    ‚úì Processed 40/30000 files | 40 records extracted
    ‚úì Processed 50/30000 files | 50 records extracted
    ‚úì Processed 60/30000 files | 60 records extracted
    ‚úì Processed 70/30000 files | 70 records extracted
    ‚úì Processed 80/30000 files | 80 records extracted
    ‚úì Processed 90/30000 files | 90 records extracted
    ‚úì Processed 100/30000 files | 100 records extracted
    ‚úì Processed 110/30000 files | 110 records extracted
    ‚úì Processed 120/30000 files | 120 records extracted
    ‚úì Processed 130/30000 files | 130 records extracted
    ‚úì Processed 140/30000 files | 140 records extracted
    ‚úì Processed

Unnamed: 0,url,is_https,dns_a_status,dns_a_answer_1,dns_aaaa_status,dns_mx_status,ssl_issuer,ssl_valid_from,ssl_valid_until,ssl_is_valid_cert,...,har_missing_security_headers_ratio,rd_has_ip,sd_has_ip,attack_pattern_ip,has_history,subdomain_len,tld,is_common_tld,trg,fqdn
0,https://gq-magazine.co.uk/profile/molly-lambert,True,NOERROR,54.72.207.111,NOERROR,NOERROR,Amazon,2024-06-09,2025-07-08,True,...,0.59375,1,0,0,0,4,uk,1,,gq-magazine.co.uk
1,https://53kf.com/login/guide?url=http://www.53...,True,NOERROR,101.66.249.208,NOERROR,NOERROR,"DigiCert, Inc.",2024-05-21,2025-06-14,True,...,0.971429,1,0,0,0,4,com,1,,53kf.com
2,https://gamerxyt.com,True,NOERROR,188.114.96.3,NOERROR,NOERROR,Google Trust Services,2025-02-05,2025-05-06,True,...,0.637306,1,0,0,0,4,com,1,,gamerxyt.com
3,https://www.underarmour.com/en-us/track-order/,True,NOERROR,151.101.1.91,NOERROR,NOERROR,DigiCert Inc,2024-05-21,2025-06-21,True,...,0.14717,1,1,0,0,3,com,1,,www.underarmour.com
4,https://jpg5.su/lib/Peafowl/peafowl.min.js?8d1...,True,NOERROR,190.115.31.64,NOERROR,NOERROR,Let's Encrypt,2025-02-10,2025-05-11,True,...,0.0,1,0,0,0,4,su,0,,jpg5.su



üíæ Saved: /content/drive/MyDrive/Projet_Phishing/Dataset/benign30k.csv


In [None]:
# Column dtypes and non-null counts of merged_df as left by the run cell above.
merged_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 29999 entries, 0 to 29998
Data columns (total 52 columns):
 #   Column                              Non-Null Count  Dtype  
---  ------                              --------------  -----  
 0   url                                 29999 non-null  object 
 1   is_https                            29999 non-null  bool   
 2   dns_a_status                        29999 non-null  object 
 3   dns_a_answer_1                      29957 non-null  object 
 4   dns_aaaa_status                     29999 non-null  object 
 5   dns_mx_status                       29999 non-null  object 
 6   ssl_issuer                          29624 non-null  object 
 7   ssl_valid_from                      29624 non-null  object 
 8   ssl_valid_until                     29624 non-null  object 
 9   ssl_is_valid_cert                   29999 non-null  bool   
 10  maxmind_1_ip                        29957 non-null  object 
 11  maxmind_1_asn_code                  29863

In [None]:
# Count rows that repeat an earlier row on every column.
print(f"Number of duplicate rows in merged_df: {merged_df.duplicated().sum()}")

Number of duplicate rows in merged_df: 302


In [None]:
# Rows per target brand (`trg`). The original cell body `merged_df[]` is a
# SyntaxError; this expression is reconstructed from the saved output
# (columns `trg` and `counts`) — NOTE(review): confirm it is the intended query.
# groupby drops rows whose `trg` is missing, so an all-null column gives an empty frame.
merged_df.groupby("trg").size().reset_index(name="counts")

Empty DataFrame
Columns: [trg, counts]
Index: []


## Phishing

In [None]:
# Phishing run: same steps as the benign cell, on the phishing JSON files.
# It rebinds FOLDER_PATH, output_path, existing_df, new_df and merged_df.
# NOTE(review): the input and output paths here ('MyDrive/projet/...',
# 'MyDrive/Dataset/') do not follow the 'MyDrive/Projet_Phishing/...' layout used
# by the benign cell — confirm this is intended.
if __name__ == "__main__":
    # Configuration
    FOLDER_PATH = '/content/drive/MyDrive/projet/phisingDataset/'
    OUTPUT_FOLDER = '/content/drive/MyDrive/Dataset/'
    LIMIT = 20000  # Set to None to process all files
    OUTPUT_FILE = "phishing_data.csv"

    # Create output folder if it doesn't exist
    os.makedirs(OUTPUT_FOLDER, exist_ok=True)
    output_path = os.path.join(OUTPUT_FOLDER, OUTPUT_FILE)

    # Load existing data if available
    existing_df = load_or_create_dataframe(output_path)

    # Process files and extract new features
    print(f"\n Extracting features from {FOLDER_PATH}")
    new_df = process_all_files(FOLDER_PATH, limit=LIMIT, progress_every=100)

    # Merge with existing data
    print(f"\n Merging features...")
    merged_df = merge_features(existing_df, new_df)

    # Display results
    print(f"\n Final dataset shape: {merged_df.shape}")
    print(f" Columns: {list(merged_df.columns)}")
    # display() is not imported in the visible cells; presumably the IPython/Colab builtin.
    display(merged_df.head())

    # Save to CSV
    merged_df.to_csv(output_path, index=False)
    print(f"\n Saved: {output_path}")

üìÑ No existing file found, will create new one

üöÄ Extracting features from /content/drive/MyDrive/projet/phisingDataset/
üìÅ Found 20000 files to process
üöÄ Starting feature extraction...
    ‚úì Processed 100/20000 files | 100 records extracted
    ‚úì Processed 200/20000 files | 200 records extracted
