# Data Cleanup Before Training

In [None]:
import pandas as pd
import os
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import time
from functools import partial

# Directory holding the raw source CSVs and directory receiving the cleaned output.
input_source = "../artifacts/dataset"
output_source = "../artifacts/dataset_cleanup"

# Raw CSVs to merge, all located under `input_source`.
# NOTE(review): "phisthank" looks like a misspelling of "phishtank", but it must
# match the filename on disk — confirm before renaming.
input_files = [
    f"{input_source}/{file_name}"
    for file_name in (
        "data.csv",
        "phisthank_verified_online.csv",
        "url_haus.csv",
    )
]
output_path = f"{output_source}/data_cleaned.csv"

# Expected schema: `url` is mandatory, the rest are optional.
# NOTE(review): these lists are not referenced by the visible cleanup functions;
# presumably they document the schema only — confirm before relying on them.
cols_req = ["url"]
cols_additional = ["type", "isMalicious"]

### Decide whether a URL is safe or malicious

**References**

- [Cybercrime Info Center – records repository](https://www.cybercrimeinfocenter.org/records-repository)
- [Threat hunting: suspicious TLDs](https://detect.fyi/threat-hunting-suspicious-tlds-a742c2adbf58)
- [Malicious domain extensions](https://www.gomyitguy.com/blog-news-updates/malicious-domain-extensions)
- [URLhaus API (CSV export)](https://urlhaus.abuse.ch/api/#csv)
- [PhishTank](https://phishtank.org/)

In [None]:
# Keywords that mark a row's `type` label as malicious. is_malicious_row checks
# whether any of them occurs as a substring of the lower-cased `type` text.
MALICIOUS_CONTAIN = ["defacement", "phishing", "malware"]
# Suffixes that mark a URL as malicious.
# NOTE(review): despite the name, these are file extensions, not top-level
# domains. They are matched with str.endswith against the whole lower-cased URL,
# so "http://host/file.exe" matches but "http://host/file.exe?x=1" does not.
# '.hint' and '.flac' look out of place in this list — confirm against the
# references listed above.
MALICIOUS_TLD = [
    '.exe', '.bat', '.cmd', '.scr', '.pif', '.vbs', '.js', '.dll', '.sys', '.drv',
    '.hint', '.tmp', '.dat', '.bin', '.msi', '.torrent', '.lnk', '.reg', '.sh', '.flac'
]

# Alternative column spellings found in source CSVs, mapped to the canonical
# names (`url`, `type`, `isMalicious`) used throughout the pipeline.
COLUMN_MAPPING = {
    'URL': 'url',
    'Url': 'url',
    'Type': 'type',
    'category': 'type',
    'Category': 'type',
    'label': 'type',
    'Label': 'type',
    'malicious': 'isMalicious',
    'Malicious': 'isMalicious',
    'is_malicious': 'isMalicious'
}

### Dataset Cleanup Functions

**Required column**
- `url`

**Additional columns**
- `type`: category label of the URL (e.g. `phishing`, `malware`, `benign`)
- `isMalicious`: boolean flag marking whether the URL is malicious

In [None]:
def load_single_file(file_path):
    """Read one CSV file and wrap the outcome in a status dictionary.

    Args:
        file_path: path of the CSV to read.

    Returns:
        On success: {'success': True, 'data': DataFrame, 'file': path,
        'rows': row count, 'columns': list of column names}.
        On failure: {'success': False, 'error': message, 'file': path,
        'data': None}. The message is 'File not found' when the path does
        not exist, otherwise the text of the exception raised while reading.
    """
    if not os.path.exists(file_path):
        return {
            'success': False,
            'error': 'File not found',
            'file': file_path,
            'data': None
        }

    try:
        frame = pd.read_csv(file_path)
    except Exception as exc:
        # Any read/parse failure is reported to the caller rather than raised,
        # so that one bad file does not abort loading of the others.
        return {
            'success': False,
            'error': str(exc),
            'file': file_path,
            'data': None
        }

    return {
        'success': True,
        'data': frame,
        'file': file_path,
        'rows': len(frame),
        'columns': list(frame.columns)
    }

In [None]:
def load_and_merge_files(input_files, max_workers=4):
    """Load every CSV in *input_files* on a thread pool and concatenate them.

    Files that fail to load are reported and skipped.

    Args:
        input_files: iterable of CSV paths.
        max_workers: size of the thread pool used for reading.

    Returns:
        One DataFrame with a fresh RangeIndex. Rows are grouped by source file
        in the same order as *input_files*. Returns None if no file loaded.
    """
    print("Loading files using thread workers...")
    start_time = time.time()

    all_dataframes = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in submission order, unlike as_completed.
        # A stable concat order matters: clean_urls later keeps only the *first*
        # occurrence of each duplicate URL, so merging in completion order made
        # the surviving row (and its label) vary from run to run.
        for result in executor.map(load_single_file, input_files):
            if result['success']:
                print(f"✓ Loaded {result['file']}: {result['rows']} rows, columns: {result['columns']}")
                all_dataframes.append(result['data'])
            else:
                print(f"✗ Error loading {result['file']}: {result['error']}")

    if all_dataframes:
        merged_data = pd.concat(all_dataframes, ignore_index=True)
        elapsed_time = time.time() - start_time
        print(f"Merged data: {len(merged_data)} total rows in {elapsed_time:.2f}s")
        return merged_data

    print("No files were loaded successfully")
    return None

In [None]:
def standardize_columns(df, column_mapping=None):
    """Rename source columns to the canonical schema and fill in defaults.

    Each column named in *column_mapping* (default: COLUMN_MAPPING) is renamed to
    its canonical name. If the frame already has the canonical column, the two
    are coalesced instead of producing duplicate column labels. This happens,
    for example, after concatenating sources that spelled the column differently.

    A missing `type` becomes 'unknown' and a missing `isMalicious` becomes False.
    This applies whether the whole column is absent or only some rows are NaN;
    pd.concat produces NaN rows for sources that lack the column.

    The frame is modified in place and also returned.

    Args:
        df: merged input DataFrame.
        column_mapping: optional {source_name: canonical_name} override.

    Returns:
        *df* with canonical columns, or None if no `url` column exists.
    """
    mapping = COLUMN_MAPPING if column_mapping is None else column_mapping

    for old_name, new_name in mapping.items():
        if old_name not in df.columns or old_name == new_name:
            continue
        if new_name in df.columns:
            # A plain rename here would create two columns with the same label,
            # after which df[new_name] is a DataFrame rather than a Series.
            # Keep existing values and fill their gaps from the alternate spelling.
            df[new_name] = df[new_name].combine_first(df[old_name])
            df.drop(columns=[old_name], inplace=True)
        else:
            df.rename(columns={old_name: new_name}, inplace=True)

    if 'url' not in df.columns:
        print("Error: No 'url' column found in the data")
        return None

    if 'type' not in df.columns:
        df['type'] = 'unknown'
    else:
        # Rows from sources without a type column arrive as NaN after concat.
        df['type'] = df['type'].fillna('unknown')

    if 'isMalicious' not in df.columns:
        df['isMalicious'] = False
    else:
        missing_flag = df['isMalicious'].isna()
        if missing_flag.any():
            df.loc[missing_flag, 'isMalicious'] = False

    return df

In [None]:
def clean_urls(df):
    """Strip whitespace from URLs and drop empty and duplicate ones.

    Steps:
        1. drop rows whose `url` is missing, coerce the rest to str and strip them;
        2. drop rows whose stripped `url` is empty;
        3. drop duplicate URLs, keeping the first occurrence.

    The caller's frame is not modified. A new DataFrame is returned, and the
    original index labels are kept.

    Args:
        df: DataFrame with a `url` column.

    Returns:
        The cleaned DataFrame.
    """
    initial_count = len(df)
    # .copy() gives an independent frame, so the column assignment below neither
    # writes into nor warns about a view of the caller's data.
    df = df.dropna(subset=['url']).copy()
    # Coerce to str before stripping. .str.strip() turns non-string cells (e.g. a
    # numeric value parsed from CSV) into NaN, and those used to slip past the
    # empty check below.
    df['url'] = df['url'].astype(str).str.strip()
    df = df[df['url'] != '']

    print(f"Removed {initial_count - len(df)} rows with empty URLs")

    initial_count = len(df)
    df = df.drop_duplicates(subset=['url'], keep='first')
    print(f"Removed {initial_count - len(df)} duplicate URLs")

    return df

In [None]:
def extract_tld(url):
    """Return the last label of *url*'s hostname as '.tld', or '' if there is none.

    Inputs without an http(s) scheme get ``http://`` prepended so that urlparse
    recognises the host part.

    Examples:
        'https://Sub.Example.ORG/x' -> '.org'
        'example.com:8080/path'     -> '.com'   (the port is not part of the TLD)
        'localhost'                 -> ''

    Non-string input and unparsable URLs yield ''.
    """
    if not isinstance(url, str):
        return ''
    # Case-insensitive scheme test: 'HTTP://x.com' must not become 'http://HTTP://x.com'.
    if not url.lower().startswith(('http://', 'https://')):
        url = 'http://' + url
    try:
        # .hostname (unlike .netloc) excludes userinfo and port and is lower-cased.
        hostname = urlparse(url).hostname or ''
    except ValueError:
        # urlparse raises ValueError for malformed input such as an unclosed IPv6 bracket.
        return ''
    # A fully-qualified name may end with the root dot ('example.com.').
    hostname = hostname.rstrip('.')
    if '.' in hostname:
        return '.' + hostname.rsplit('.', 1)[1]
    return ''

In [None]:
def is_malicious_row(row):
    """Decide whether one DataFrame row should be flagged as malicious.

    The row is malicious when its `type` text contains any keyword from
    MALICIOUS_CONTAIN, or when its `url` ends with any suffix in MALICIOUS_TLD.
    Both checks are case-insensitive. Values are passed through str() first,
    so a missing (NaN) value is compared as 'nan'.

    Returns:
        bool
    """
    lowered_url = str(row['url']).lower()
    lowered_type = str(row['type']).lower()

    if any(keyword in lowered_type for keyword in MALICIOUS_CONTAIN):
        return True

    # str.endswith accepts a tuple and is True if any one suffix matches.
    bad_suffixes = tuple(suffix.lower() for suffix in MALICIOUS_TLD)
    return lowered_url.endswith(bad_suffixes)

In [None]:
def classify_single_batch(batch_data, batch_idx, total_batches, lock, processed_count):
    """Label one batch of URLs in place and report progress.

    Sets `isMalicious` on every row using is_malicious_row. It then gives a
    `type` to unlabeled rows: 'suspicious' when flagged malicious, 'benign'
    otherwise. A row is unlabeled when its `type` is the placeholder 'unknown'
    or missing (NaN). Rows with any other `type` keep it.

    Args:
        batch_data: DataFrame with at least `url` and `type` columns. It is
            modified in place and also returned.
        batch_idx: zero-based index of this batch (progress output only).
        total_batches: total number of batches (progress output only).
        lock: lock guarding *processed_count* and the progress print.
        processed_count: one-element list used as a counter shared across workers.

    Returns:
        *batch_data* with `isMalicious` and `type` updated.
    """
    batch_data['isMalicious'] = batch_data.apply(is_malicious_row, axis=1)

    # Rows merged from sources without a `type` column carry NaN rather than
    # 'unknown'. Previously they were skipped here and never received a label.
    unlabeled = batch_data['type'].isna() | (batch_data['type'] == 'unknown')
    inferred = batch_data['isMalicious'].map({True: 'suspicious', False: 'benign'})
    # Series.where keeps values where the condition is True and takes the
    # inferred label elsewhere. Unlike a .loc write, it also works when pandas
    # stored an all-NaN `type` as a float column.
    batch_data['type'] = batch_data['type'].where(~unlabeled, inferred)

    with lock:
        processed_count[0] += len(batch_data)
        if batch_idx % 10 == 0 or batch_idx == total_batches - 1:
            print(f"Processed batch {batch_idx + 1}/{total_batches} "
                  f"({processed_count[0]} URLs classified)")

    return batch_data

In [None]:
def classify_urls_threaded(df, max_workers=4, batch_size=1000):
    """Classify all URLs in *df* batch by batch on a thread pool.

    The frame is split into copies of ``batch_size`` consecutive rows. Each
    batch is labeled by classify_single_batch, and the results are concatenated
    back in their original order.

    NOTE(review): the per-row work is a pure-Python pandas ``apply``, so under
    the GIL these threads probably give little real speedup. Profile before
    tuning ``max_workers``.

    Args:
        df: DataFrame with at least `url` and `type` columns; not modified.
        max_workers: thread-pool size.
        batch_size: rows per batch; must be >= 1.

    Returns:
        A new DataFrame with a fresh RangeIndex and an `isMalicious` column.
        Empty input yields an empty frame instead of raising.

    Raises:
        ValueError: if batch_size < 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    print(f"Classifying URLs using {max_workers} thread workers...")
    start_time = time.time()

    total_rows = len(df)
    batches = [df.iloc[i:i + batch_size].copy() for i in range(0, total_rows, batch_size)]
    total_batches = len(batches)
    print(f"Processing {total_rows} URLs in {total_batches} batches of {batch_size}")

    if batches:
        processed_count = [0]  # one-element list: a mutable counter shared by workers
        lock = Lock()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map yields results in submission order, so no index
            # bookkeeping is needed to reassemble the batches.
            processed_batches = list(executor.map(
                classify_single_batch,
                batches,
                range(total_batches),
                [total_batches] * total_batches,
                [lock] * total_batches,
                [processed_count] * total_batches,
            ))
        result_df = pd.concat(processed_batches, ignore_index=True)
    else:
        # pd.concat([]) raises "No objects to concatenate"; return an empty
        # frame carrying the same output column instead.
        result_df = df.copy().reset_index(drop=True)
        result_df['isMalicious'] = pd.Series([], index=result_df.index, dtype=bool)

    elapsed_time = time.time() - start_time
    print(f"URL classification completed in {elapsed_time:.2f}s")

    return result_df

In [None]:
def analyze_batch_for_stats(batch_data, tld_list):
    """Summarize one batch for the statistics report.

    Args:
        batch_data: DataFrame with `url`, `type` and `isMalicious` columns.
        tld_list: suffixes to look for at the end of each URL (case-insensitive).

    Returns:
        dict with:
            'total_count'     - number of rows,
            'malicious_count' - sum of `isMalicious`,
            'type_counts'     - {type value: row count},
            'tld_counts'      - {suffix: rows ending with it}, only for suffixes
                                with at least one hit, in tld_list order.
    """
    row_total = len(batch_data)
    flagged_total = batch_data['isMalicious'].sum()
    per_type = batch_data['type'].value_counts().to_dict()

    suffix_hits = (
        (suffix, batch_data['url'].str.lower().str.endswith(suffix.lower()).sum())
        for suffix in tld_list
    )
    nonzero_hits = {suffix: hits for suffix, hits in suffix_hits if hits > 0}

    return {
        'total_count': row_total,
        'malicious_count': flagged_total,
        'type_counts': per_type,
        'tld_counts': nonzero_hits,
    }

In [None]:
def generate_statistics_threaded(df, max_workers=4):
    print("Generating statistics using thread workers...")
    start_time = time.time()
    
    batch_size = 5000
    total_rows = len(df)
    batches = []
    for i in range(0, total_rows, batch_size):
        batch = df.iloc[i:i + batch_size]
        batches.append(batch)
    
    all_stats = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        analyze_func = partial(analyze_batch_for_stats, tld_list=MALICIOUS_TLD)
        futures = [executor.submit(analyze_func, batch) for batch in batches]
        
        for future in as_completed(futures):
            all_stats.append(future.result())
    
    total_urls = sum(stat['total_count'] for stat in all_stats)
    malicious_count = sum(stat['malicious_count'] for stat in all_stats)
    benign_count = total_urls - malicious_count
    
    combined_type_counts = {}
    for stat in all_stats:
        for url_type, count in stat['type_counts'].items():
            combined_type_counts[url_type] = combined_type_counts.get(url_type, 0) + count
    
    combined_tld_counts = {}
    for stat in all_stats:
        for tld, count in stat['tld_counts'].items():
            combined_tld_counts[tld] = combined_tld_counts.get(tld, 0) + count
    
    elapsed_time = time.time() - start_time
    
    print(f"\n=== CLEANUP STATISTICS (Generated in {elapsed_time:.2f}s) ===")
    print(f"Total URLs: {total_urls}")
    print(f"Malicious URLs: {malicious_count} ({malicious_count/total_urls*100:.2f}%)")
    print(f"Benign URLs: {benign_count} ({benign_count/total_urls*100:.2f}%)")
    
    print("\n=== TYPE DISTRIBUTION ===")
    sorted_types = sorted(combined_type_counts.items(), key=lambda x: x[1], reverse=True)
    for url_type, count in sorted_types:
        print(f"{url_type}: {count} ({count/total_urls*100:.2f}%)")
    
    print("\n=== MALICIOUS TLD DETECTION ===")
    if combined_tld_counts:
        sorted_tlds = sorted(combined_tld_counts.items(), key=lambda x: x[1], reverse=True)
        for tld, count in sorted_tlds:
            print(f"URLs ending with {tld}: {count}")
    else:
        print("No URLs found with malicious TLDs")

In [None]:
def save_cleaned_data(df, output_path):
    """Write the `url`, `type` and `isMalicious` columns of *df* to a CSV file.

    Any missing parent directories of *output_path* are created. A preview of
    the first 10 rows is printed.

    Args:
        df: DataFrame containing at least `url`, `type` and `isMalicious`.
        output_path: destination CSV path; may be a bare filename.

    Returns:
        A copy of *df* restricted to the three output columns.
    """
    output_dir = os.path.dirname(output_path)
    if output_dir:
        # For a bare filename dirname() is '', and os.makedirs('') raises
        # FileNotFoundError, so only create directories when there is one.
        os.makedirs(output_dir, exist_ok=True)

    final_columns = ["url", "type", "isMalicious"]
    cleaned_data = df[final_columns].copy()

    cleaned_data.to_csv(output_path, index=False)
    print(f"\nCleaned data saved to: {output_path}")

    print("\n=== SAMPLE CLEANED DATA ===")
    print(cleaned_data.head(10))

    return cleaned_data

### Run the Pipeline

In [None]:
def run_cleanup_pipeline(input_files, output_path, max_workers=4):
    """Run the cleanup stages end to end.

    The stages are: load and merge, standardize columns, clean URLs, classify,
    print statistics, and save.

    Args:
        input_files: CSV paths handed to load_and_merge_files.
        output_path: destination CSV handed to save_cleaned_data.
        max_workers: thread-pool size shared by the loading, classification and
            statistics stages.

    Returns:
        The DataFrame returned by save_cleaned_data. Returns None when loading
        produced nothing or standardize_columns found no `url` column.
    """
    print(f"=== STARTING URL DATA CLEANUP PIPELINE (Using {max_workers} workers) ===\n")
    began_at = time.time()

    frame = load_and_merge_files(input_files, max_workers)
    # Both of the first two stages signal failure by returning None; only
    # standardize when loading succeeded, then bail out if either step failed.
    if frame is not None:
        frame = standardize_columns(frame)
    if frame is None:
        return None

    frame = clean_urls(frame)
    frame = classify_urls_threaded(frame, max_workers)
    generate_statistics_threaded(frame, max_workers)
    saved = save_cleaned_data(frame, output_path)

    elapsed = time.time() - began_at
    print(f"\n=== CLEANUP PIPELINE COMPLETED in {elapsed:.2f}s ===")
    return saved

In [None]:
import traceback

# Notebook entry point: run the whole pipeline and print a short summary.
try:
    cleaned_df = run_cleanup_pipeline(input_files, output_path, max_workers=4)

    if cleaned_df is not None:
        # astype(bool) guards against an object-dtype column. On such a column
        # `~` would be applied elementwise to Python bools (~True == -2) instead
        # of acting as logical negation.
        malicious_flags = cleaned_df['isMalicious'].astype(bool)
        print(f"\nFinal cleaned dataset shape: {cleaned_df.shape}")
        print("Columns:", list(cleaned_df.columns))
        print(f"Total records: {len(cleaned_df)}")
        print(f"Malicious URLs: {malicious_flags.sum()}")
        print(f"Benign URLs: {(~malicious_flags).sum()}")
    else:
        print("Pipeline failed to complete successfully")

except Exception as e:
    # Top-level boundary for this cell. Report the failure and print the full
    # traceback, because str(e) alone does not show which stage failed.
    print(f"Error during cleanup: {str(e)}")
    traceback.print_exc()