In [49]:
import pandas as pd
import numpy as np
from urllib.parse import urlparse
import math
import re
import joblib
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from lightgbm import LGBMClassifier
from sklearn.model_selection import train_test_split
from collections import defaultdict

# ======================
# 1. Feature Extraction
# ======================
def extract_url_features(url):
    """Extracts phishing detection features from any URL"""
    try:
        parsed = urlparse(url)
        domain = parsed.netloc.split(':')[0]
        
        features = {
            'url_length': len(url),
            'domain_has_ip': int(bool(re.match(r'^\d+\.\d+\.\d+\.\d+$', domain))),
            'num_special_chars': sum(1 for c in url if c in '/:?&%=.-_~@'),
            'num_digits': sum(c.isdigit() for c in url),
            'domain_length': len(domain),
            'subdomain_length': len(domain.split('.')[0]),
            'num_subdomains': len(domain.split('.')) - 1,
            'is_common_tld': int(domain.endswith(('.com', '.org', '.net', '.gov', '.edu', '.io'))),
            'typosquatting': int(any(t in domain.lower() for t in ['paypa1', 'g00gle', 'amaz0n', 'faceb00k', 'y0utube'])),
            'has_banking_kw': int(any(kw in url.lower() for kw in ['login', 'bank', 'account', 'secure', 'verify'])),
            'has_hex': int(bool(re.search(r'%[0-9a-fA-F]{2}', url))),
            'has_at_symbol': int('@' in url),
            'uses_https': int(parsed.scheme == 'https'),
            'path_depth': parsed.path.count('/'),
            'entropy': -sum((url.count(c)/len(url)) * math.log2(url.count(c)/len(url)) 
                       for c in set(url) if url.count(c) > 0),
            'vowel_ratio': sum(1 for c in domain if c.lower() in 'aeiou') / len(domain) if domain else 0,
            'consecutive_chars': int(bool(re.search(r'([a-zA-Z])\1{2}', domain))),
        }
        return features
    except Exception as e:
        print(f"Error extracting features: {e}")
        return None

# ===========================
# 2. Phishing Detector Class
# ===========================
class PhishingDetector:
    def __init__(self, model_path=None):
        self.model = Pipeline([
            ('scaler', StandardScaler()),
            ('classifier', LGBMClassifier(
                n_estimators=200,
                max_depth=7,
                class_weight='balanced',
                random_state=42
            ))
        ])
        # Pre-trusted domains (can be expanded)
        self.trusted_domains = {
            'google.com', 'youtube.com', 'facebook.com', 'github.com',
            'amazon.com', 'wikipedia.org', 'microsoft.com', 'apple.com',
            'twitter.com', 'linkedin.com', 'netflix.com', 'spotify.com'
        }
        self.domain_reputation = defaultdict(int)
        
        if model_path:
            try:
                saved_data = joblib.load(model_path)
                self.model = saved_data['model']
                self.trusted_domains.update(saved_data.get('trusted_domains', []))
                print("Loaded pre-trained model with dynamic domain knowledge")
            except Exception as e:
                print(f"Could not load model: {e}")

    def _extract_base_domain(self, url):
        """Extracts base domain (e.g. 'google.com' from 'mail.google.com')"""
        try:
            domain = urlparse(url).netloc.lower()
            if domain.startswith('www.'):
                domain = domain[4:]
            parts = domain.split('.')
            if len(parts) > 2:
                return f"{parts[-2]}.{parts[-1]}"
            return domain
        except:
            return ""

    def is_trusted_domain(self, url):
        """Check if domain is in trusted list"""
        domain = self._extract_base_domain(url)
        return domain in self.trusted_domains

    def train(self, X, y):
        """Train model and learn trusted domains"""
        # Extract domains from training data
        domains = [self._extract_base_domain(url) for url in X.index]
        legit_domains = [d for d, label in zip(domains, y) if label == 0]
        
        # Calculate domain frequencies
        domain_counts = pd.Series(legit_domains).value_counts()
        
        # Automatically identify trusted domains (top 200 legitimate domains)
        new_trusted = set(domain_counts.head(200).index)
        self.trusted_domains.update(new_trusted)
        print(f"Learned {len(new_trusted)} new trusted domains from training data")
        
        # Train the model
        self.model.fit(X, y)

    def save_model(self, path):
        """Save model with learned domain knowledge"""
        joblib.dump({
            'model': self.model,
            'trusted_domains': list(self.trusted_domains)
        }, path)

    def predict(self, url):
        """Predict with dynamic domain analysis"""
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url
            
        domain = self._extract_base_domain(url)
        
        # Dynamic trusted domain check
        if self.is_trusted_domain(url):
            return {
                'url': url,
                'is_phishing': False,
                'confidence': 'high',
                'probability': max(0.01, 0.1 - (self.domain_reputation.get(domain, 0) * 0.001)),
                'domain': domain,
                'indicators': [f'Trusted domain: {domain}']
            }

        features = extract_url_features(url)
        if not features:
            return {
                'url': url,
                'is_phishing': None,
                'error': 'Could not extract features',
                'confidence': 'low'
            }

        try:
            features_df = pd.DataFrame([features])
            proba = self.model.predict_proba(features_df)[0][1]
            
            # Dynamic probability adjustments
            if features['uses_https']:
                proba *= 0.7
            if features['is_common_tld']:
                proba *= 0.6
            if domain.endswith(('.gov', '.edu', '.mil')):
                proba *= 0.3
            
            # Final verdict (threshold = 0.75)
            is_phishing = proba > 0.75
            
            indicators = []
            if features['domain_has_ip']:
                indicators.append("Uses IP address (risky)")
            if features['typosquatting']:
                indicators.append("Typosquatting detected")
            if features['entropy'] > 4.0 and not self.is_trusted_domain(url):
                indicators.append(f"Suspicious randomness (entropy: {features['entropy']:.2f})")
            if features['has_banking_kw'] and not self.is_trusted_domain(url):
                indicators.append("Contains sensitive keywords")
            
            return {
                'url': url,
                'is_phishing': bool(is_phishing),
                'probability': float(proba),
                'confidence': 'high' if (proba > 0.9 or proba < 0.2) else 'medium',
                'domain': domain,
                'indicators': indicators if indicators else ['No strong indicators']
            }
        except Exception as e:
            return {
                'url': url,
                'is_phishing': None,
                'error': str(e),
                'confidence': 'low'
            }

# =================
# 3. Main Execution
# =================
if __name__ == "__main__":
    detector = PhishingDetector()
    
    try:
        print("Loading dataset...")
        data = pd.read_csv('C:/Users/msi/OneDrive/Bureau/CyberIA/StealthPhisher2025.csv')
        
        # Auto-detect columns
        url_col = next((col for col in data.columns if 'url' in col.lower()), 'URL')
        label_col = next((col for col in data.columns if col.lower() in ['label', 'phishing']), 'Label')
        
        print(f"Using columns: URLs='{url_col}', Labels='{label_col}'")
        
        # Convert labels
        label_mapping = {
            'legitimate': 0, 'phishing': 1, 'safe': 0, 'malicious': 1,
            '0': 0, '1': 1, 'false': 0, 'true': 1
        }
        y = data[label_col].astype(str).str.lower().map(label_mapping)
        
        if y.isna().any():
            invalid_labels = data[label_col][y.isna()].unique()
            raise ValueError(f"Found unmapped labels: {invalid_labels}")
        
        # Extract features
        print("Extracting features...")
        features = []
        valid_indices = []
        url_index = []
        
        for i, url in enumerate(data[url_col]):
            feat = extract_url_features(str(url))
            if feat:
                features.append(feat)
                valid_indices.append(i)
                url_index.append(url)
        
        X = pd.DataFrame(features, index=url_index)
        y = y.iloc[valid_indices].astype(int)
        
        print(f"Processed {len(X)} URLs")
        print("Label distribution:\n", y.value_counts())
        
        # Train model
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        print("\nTraining model...")
        detector.train(X_train, y_train)
        detector.save_model('phishing_model.pkl')
        print("Model trained and saved")
        
        # Show some learned domains
        print("\nSample trusted domains:", list(detector.trusted_domains)[:10])
        
        # Interactive testing
        print("\n" + "="*50)
        print("PHISHING DETECTOR READY")
        print("="*50)
        
        while True:
            url = input("\nEnter URL to check (or 'quit'): ").strip()
            if url.lower() == 'quit':
                break
                
            result = detector.predict(url)
            print("\n=== Analysis Result ===")
            print(f"URL: {result['url']}")
            print(f"Domain: {result.get('domain', 'N/A')}")
            print("Status:", "PHISHING" if result.get('is_phishing') else "SAFE")
            print(f"Confidence: {result.get('confidence', 'unknown')}")
            if 'probability' in result:
                print(f"Probability: {result['probability']:.2%}")
            print("Key Indicators:")
            for indicator in result.get('indicators', ['No strong indicators']):
                print(f"- {indicator}")
                
    except Exception as e:
        print(f"\nERROR: {str(e)}")
        print("\nTroubleshooting Guide:")
        print("1. Verify CSV file exists and is accessible")
        print("2. Check column headers match expected format")
        print("3. Ensure URLs are properly formatted")

Loading dataset...
Using columns: URLs='URL', Labels='Label'
Extracting features...
Processed 336749 URLs
Label distribution:
 Label
1    175806
0    160943
Name: count, dtype: int64

Training model...
Learned 200 new trusted domains from training data
[LightGBM] [Info] Number of positive: 140537, number of negative: 128862
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.007865 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 977
[LightGBM] [Info] Number of data points in the train set: 269399, number of used features: 16
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.500000 -> initscore=0.000000
[LightGBM] [Info] Start training from score 0.000000
Model trained and saved

Sample trusted domains: ['gv.at', 'il.us', 'or.jp', 'gov.pk', 'com.my', 'utoronto.ca', 'uk.com', 'utexas.edu', 'edu.cn', 'facebook.com']

PHISHING DETECTOR READY



Enter URL to check (or 'quit'):  https://www.facebook.com/



=== Analysis Result ===
URL: https://www.facebook.com/
Domain: facebook.com
Status: SAFE
Confidence: high
Probability: 10.00%
Key Indicators:
- Trusted domain: facebook.com



Enter URL to check (or 'quit'):  https://www.msn.com/?ocid=wispr&pc=u477&AR=1



=== Analysis Result ===
URL: https://www.msn.com/?ocid=wispr&pc=u477&AR=1
Domain: msn.com
Status: SAFE
Confidence: medium
Probability: 42.00%
Key Indicators:
- Suspicious randomness (entropy: 4.42)



Enter URL to check (or 'quit'):  http://localhost:8888/notebooks/OneDrive/Bureau/CyberIA/detector_fixed.ipynb?



=== Analysis Result ===
URL: http://localhost:8888/notebooks/OneDrive/Bureau/CyberIA/detector_fixed.ipynb?
Domain: localhost:8888
Status: PHISHING
Confidence: high
Probability: 100.00%
Key Indicators:
- Suspicious randomness (entropy: 4.64)



Enter URL to check (or 'quit'):  quit


In [55]:
import shutil
shutil.make_archive('phishing_model', 'zip', '.', 'phishing_model.pkl')
print("Download ready: phishing_model.zip")

Download ready: phishing_model.zip


In [57]:
# Change this line in the main execution section:
detector.save_model('phishing_model.pkl')  # Old version
# To this:
detector.save_model('PH.pkl')  # New version