In [2]:
import pandas as pd
import numpy as np
import requests
from bs4 import BeautifulSoup
import joblib
from urllib.parse import urlparse
from datetime import datetime
import whois
from functools import lru_cache

class URLPhishingDetector:
    """
    A class to detect phishing URLs using Layer 2 and Layer 3 features.
    Includes caching for model artifacts to improve performance.
    """
    
    def __init__(self, model_path='l2_l3_xgboost_model.pkl', 
                 scaler_path='scaler.joblib',
                 tld_map_path='l2_l3_tld_map.pkl'):
        """
        Initialize the detector with paths to saved artifacts.
        
        Args:
            model_path: Path to the trained model pickle file
            scaler_path: Path to the fitted scaler pickle file
            tld_map_path: Path to the TLD frequency map pickle file
        """
        self.model_path = model_path
        self.scaler_path = scaler_path
        self.tld_map_path = tld_map_path
        
        # Lazy loading - artifacts will be loaded on first use
        self._model = None
        self._scaler = None
        self._tld_map = None
    
    @property
    def model(self):
        """Lazy load and cache the model."""
        if self._model is None:
            self._model = joblib.load(self.model_path)
        return self._model
    
    @property
    def scaler(self):
        """Lazy load and cache the scaler."""
        if self._scaler is None:
            self._scaler = joblib.load(self.scaler_path)
        return self._scaler
    
    @property
    def tld_map(self):
        """Lazy load and cache the TLD frequency map."""
        if self._tld_map is None:
            self._tld_map = joblib.load(self.tld_map_path)
        return self._tld_map
    
    @staticmethod
    def get_tld(url):
        """
        Extract the top-level domain from a URL.
        
        Args:
            url: The URL to parse
            
        Returns:
            str: The TLD (e.g., 'com', 'org', 'edu')
        """
        try:
            domain = urlparse(url).netloc
            if not domain:
                return 'none'
            parts = domain.split('.')
            if len(parts) > 1:
                # Handle country-code TLDs like co.uk
                return '.'.join(parts[-2:]) if len(parts[-1]) == 2 and len(parts[-2]) <= 3 else parts[-1]
            return 'none'
        except:
            return 'error'
    
    def analyze_redirects(self, start_url):
        """
        Follow redirect chain and extract Layer 2 features.
        
        Args:
            start_url: The URL to analyze
            
        Returns:
            tuple: (final_url, l2_features_dict)
        """
        l2_features = {
            'hop_count': -1,
            'uses_url_shortener': 0,
            'final_url_tld': 'none'
        }
        
        redirect_chain = []
        current_url = str(start_url)
        max_hops = 10
        
        # Check for URL shorteners
        try:
            if urlparse(current_url).netloc in ['bit.ly', 'tinyurl.com', 't.co', 'goo.gl', 'buff.ly']:
                l2_features['uses_url_shortener'] = 1
        except:
            pass
        
        # Follow redirects
        try:
            for i in range(max_hops):
                response = requests.head(
                    current_url, 
                    allow_redirects=False, 
                    timeout=5,
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                    }
                )
                redirect_chain.append(current_url)
                
                if 300 <= response.status_code < 400 and 'Location' in response.headers:
                    next_url = response.headers['Location']
                    if not urlparse(next_url).netloc:
                        next_url = requests.compat.urljoin(current_url, next_url)
                    current_url = next_url
                else:
                    break
            
            l2_features['hop_count'] = len(redirect_chain) - 1
            
        except requests.exceptions.RequestException as e:
            # Connection error - return None to indicate failure
            return None, l2_features
        
        # Get final URL TLD
        final_url = current_url
        l2_features['final_url_tld'] = self.get_tld(final_url)
        
        return final_url, l2_features
    
    def analyze_html(self, url):
        """
        Fetch and analyze HTML content to extract Layer 3 features.
        
        Args:
            url: The URL to analyze
            
        Returns:
            dict: Layer 3 features
        """
        l3_features = {
            'num_script_tags': -1,
            'has_password_field': -1,
            'num_external_links': -1,
            'dom_depth': -1
        }
        
        if not url:
            return l3_features
        
        try:
            response = requests.get(
                url, 
                timeout=10,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                }
            )
            response.raise_for_status()
            
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # Feature 1: Number of script tags
            l3_features['num_script_tags'] = len(soup.find_all('script'))
            
            # Feature 2: Has password field
            l3_features['has_password_field'] = 1 if soup.find('input', {'type': 'password'}) else 0
            
            # Feature 3: Number of external links
            external_links = 0
            current_domain = urlparse(url).netloc
            for link in soup.find_all('a', href=True):
                link_domain = urlparse(link['href']).netloc
                if link_domain and link_domain != current_domain:
                    external_links += 1
            l3_features['num_external_links'] = external_links
            
            # Feature 4: DOM depth
            def get_depth(element, depth):
                if not hasattr(element, 'children'):
                    return depth
                max_child_depth = depth
                for child in element.children:
                    if child.name:
                        child_depth = get_depth(child, depth + 1)
                        if child_depth > max_child_depth:
                            max_child_depth = child_depth
                return max_child_depth
            
            l3_features['dom_depth'] = get_depth(soup, 0)
            
        except requests.exceptions.RequestException:
            pass  # Return default -1 values
        except Exception:
            pass  # Return default -1 values
        
        return l3_features
    
    def preprocess_url(self, url):
        """
        Complete preprocessing pipeline for a single URL.
        
        Args:
            url: The URL to analyze
            
        Returns:
            tuple: (features_dict, scaled_features_array) or raises exception
            
        Raises:
            ConnectionError: If unable to connect to the URL
        """
        # Layer 2: Analyze redirects
        final_url, l2_features = self.analyze_redirects(url)
        
        # Check if connection failed (hop_count == -1)
        if l2_features['hop_count'] == -1:
            raise ConnectionError(f"Unable to connect to URL: {url}")
        
        # Layer 3: Analyze HTML (only if L2 succeeded)
        if final_url:
            l3_features = self.analyze_html(final_url)
        else:
            l3_features = {
                'num_script_tags': -1,
                'has_password_field': -1,
                'num_external_links': -1,
                'dom_depth': -1
            }
        
        # Combine features
        combined_features = {
            'hop_count': l2_features['hop_count'],
            'uses_url_shortener': l2_features['uses_url_shortener'],
            'num_script_tags': l3_features['num_script_tags'],
            'has_password_field': l3_features['has_password_field'],
            'num_external_links': l3_features['num_external_links'],
            'dom_depth': l3_features['dom_depth']
        }
        
        # Map TLD to frequency (1 if not in training data)
        tld = l2_features['final_url_tld']
        combined_features['final_url_tld_freq'] = self.tld_map.get(tld, 1)
        
        # Convert to DataFrame with correct column order
        feature_columns = [
            'hop_count', 'uses_url_shortener', 'num_script_tags',
            'has_password_field', 'num_external_links', 'dom_depth',
            'final_url_tld_freq'
        ]
        feature_values = [
            combined_features['hop_count'],
            combined_features['uses_url_shortener'],
            combined_features['num_script_tags'],
            combined_features['has_password_field'],
            combined_features['num_external_links'],
            combined_features['dom_depth'],
            combined_features['final_url_tld_freq']
        ]

        
        features_df = pd.DataFrame(combined_features, columns=feature_columns, index = [0])
        
        # Scale features
        #scaled_features = self.scaler.transform(features_df)
        #removing scaling because retrained model without scaling, trying to fix an issue, won't affect performacne for tree models
        scaled_features = features_df
        return combined_features, scaled_features
    
    def predict(self, url, return_probability=False):
        """
        Predict whether a URL is phishing or benign.
        
        Args:
            url: The URL to analyze
            return_probability: If True, return probability scores instead of binary prediction
            
        Returns:
            dict: Prediction result with the following keys:
                - 'url': The analyzed URL
                - 'prediction': 'phishing' or 'benign' (or probability if return_probability=True)
                - 'confidence': Probability of the predicted class
                - 'features': Dictionary of extracted features
                
        Raises:
            ConnectionError: If unable to connect to the URL
        """
        # Preprocess the URL
        features_dict, scaled_features = self.preprocess_url(url)
        
        # Make prediction
        if return_probability:
            # Return probability scores [benign_prob, phishing_prob]
            probabilities = self.model.predict_proba(scaled_features)[0]
            prediction = {
                'benign': float(probabilities[0]),
                'phishing': float(probabilities[1])
            }
            confidence = float(max(probabilities))
        else:
            # Return binary prediction (0 = benign, 1 = phishing)
            pred = self.model.predict(scaled_features)[0]
            probabilities = self.model.predict_proba(scaled_features)[0]
            prediction = 'phishing' if pred == 1 else 'benign'
            confidence = float(probabilities[pred])
        
        return {
            'url': url,
            'prediction': prediction,
            'confidence': confidence,
            'features': features_dict
        }


# Usage examples:
if __name__ == "__main__":
    # Initialize the detector (artifacts will be loaded on first use)
    detector = URLPhishingDetector(
        model_path='l2_l3_xgboost_model.pkl',
        scaler_path='scaler.joblib',
        tld_map_path='l2_l3_tld_map.pkl'
    )
    
    # Example 1: Predict a single URL
    try:
        result = detector.predict("https://google.com")
        print("Prediction:", result['prediction'])
        print("Confidence:", f"{result['confidence']:.2%}")
        print("Features:", result['features'])
    except ConnectionError as e:
        print(f"Error: {e}")
    
    # Example 2: Get probability scores
    try:
        result = detector.predict("https://example.com", return_probability=True)
        print("\nProbability Distribution:")
        print(f"  Benign: {result['prediction']['benign']:.2%}")
        print(f"  Phishing: {result['prediction']['phishing']:.2%}")
    except ConnectionError as e:
        print(f"Error: {e}")
    
    # Example 3: Batch processing (efficient with caching)
    urls = [
        "https://www.google.com",
        "https://github.com",
        "https://stackoverflow.com"
    ]
    
    print("\nBatch Predictions:")
    for url in urls:
        try:
            result = detector.predict(url)
            print(f"{url}: {result['prediction']} ({result['confidence']:.2%})")
        except ConnectionError:
            print(f"{url}: Unable to connect")

Prediction: phishing
Confidence: 76.59%
Features: {'hop_count': 1, 'uses_url_shortener': 0, 'num_script_tags': 6, 'has_password_field': 0, 'num_external_links': 3, 'dom_depth': 11, 'final_url_tld_freq': 715}

Probability Distribution:
  Benign: 29.89%
  Phishing: 70.11%

Batch Predictions:
https://www.google.com: phishing (78.16%)
https://github.com: benign (52.09%)
https://stackoverflow.com: benign (99.78%)


In [16]:
model = joblib.load('l2_l3_xgboost_model.pkl')
scaler = joblib.load('scaler.joblib')

# Check what the scaler expects
print("Scaler expects this many features:", scaler.n_features_in_)
print("\nScaler feature names (if available):")
if hasattr(scaler, 'feature_names_in_'):
    print(scaler.feature_names_in_)
else:
    print("Feature names not stored in scaler")

# Check model
print("\nModel expects this many features:", model.n_features_in_)
if hasattr(model, 'feature_names_in_'):
    print("Model feature names:")
    print(model.feature_names_in_)

Scaler expects this many features: 7

Scaler feature names (if available):
['hop_count' 'uses_url_shortener' 'num_script_tags' 'has_password_field'
 'num_external_links' 'dom_depth' 'final_url_tld_freq']

Model expects this many features: 7


In [13]:
detector = URLPhishingDetector(...)
features, scaled = detector.preprocess_url("https://google.com")
print("Features for google.com:", features)
print("Scaled features:", scaled)

Features for google.com: {'hop_count': 1, 'uses_url_shortener': 0, 'num_script_tags': 6, 'has_password_field': 0, 'num_external_links': 3, 'dom_depth': 11, 'final_url_tld_freq': 715}
Scaled features: [[0.    0.    0.    0.    0.125 0.    0.   ]]


In [14]:
print("Expected feature order:", scaler.feature_names_in_)


Expected feature order: ['hop_count' 'uses_url_shortener' 'num_script_tags' 'has_password_field'
 'num_external_links' 'dom_depth' 'final_url_tld_freq']


In [17]:
print("Combined features dict:", combined_features)
print("DataFrame values:", features_df.values)
print("DataFrame dtypes:", features_df.dtypes)

NameError: name 'combined_features' is not defined