In [1]:
import re
import unicodedata
from urllib.parse import urlparse

import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from transformers import AutoTokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [8]:
def hasMisleadingChars(url):
    """Report whether ``url`` contains a non-ASCII, non-combining letter.

    A character counts when it is neither ASCII nor whitespace, its Unicode
    general category starts with ``"L"`` (a letter), and its canonical
    combining class is 0.

    Args:
        url: The string to scan.

    Returns:
        True if at least one such character is present, otherwise False.
    """
    def _is_suspicious(ch):
        # ASCII characters and any kind of whitespace are never flagged.
        if ch.isascii() or ch.isspace():
            return False
        is_letter = unicodedata.category(ch)[:1] == "L"
        return is_letter and unicodedata.combining(ch) == 0

    return any(_is_suspicious(ch) for ch in url)
    

            
            
    

In [4]:
# Sample URL; it is not referenced by any other cell visible in this notebook.
example_url = "https://ajax.googleapis.com/ajax/libs/jquery/1.5.1/jquery.min.js"

In [5]:
import pandas as pd
import matplotlib.pyplot as plt 
import seaborn as sns 

# Load the dataset from train_data.csv (path is relative to the working
# directory). A later cell reads its 'url' column.
data = pd.read_csv("train_data.csv")

In [9]:
# Build one row of features per URL.
# NOTE(review): this passes two arguments to preprocess_url and wraps the
# result in a pd.Series, but the preprocess_url defined further down takes a
# single argument and returns a string. Presumably this cell ran against an
# earlier definition that returned a feature mapping - confirm, because it
# will raise TypeError against the current definition.
feature_data = data['url'].apply(lambda x: pd.Series(preprocess_url(x,False)))

In [10]:
# Summary statistics for the extracted feature columns.
feature_data.describe()

Unnamed: 0,domain_length,subdomains,num_dots,num_equals,protocol
count,319558.0,319558.0,319558.0,319558.0,319558.0
mean,17.808977,1.693746,2.323531,0.725959,0.034388
std,10.663161,1.053417,1.447766,1.667462,0.182224
min,1.0,0.0,0.0,0.0,0.0
25%,13.0,1.0,1.0,0.0,0.0
50%,16.0,2.0,2.0,0.0,0.0
75%,20.0,2.0,3.0,0.0,0.0
max,279.0,27.0,42.0,51.0,1.0


In [11]:
# Preview the first rows of the extracted features.
feature_data.head()

Unnamed: 0,domain_length,subdomains,num_dots,num_equals,protocol,missing_chars
0,11,1,1,1,0,False
1,26,2,3,0,0,False
2,9,1,1,0,0,False
3,20,2,4,0,0,False
4,12,2,3,0,0,False


In [15]:
import torch.nn as nn
from torch.utils.data import TensorDataset,DataLoader
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoModel
from transformers import get_linear_schedule_with_warmup
from torch.optim import AdamW


In [17]:
def preprocess_url(url):
    """Convert a URL into a space-separated string for the tokenizer.

    Every ``http://`` / ``https://`` occurrence is removed. The text before
    the first ``/`` is kept unchanged as the domain part, and each remaining
    ``/`` in the rest of the URL is replaced by a space.

    Args:
        url: The raw URL string.

    Returns:
        ``"<domain> <path with slashes as spaces>"``. The separating space is
        always present, even when there is no path.
    """
    without_scheme = re.sub(r'https?://', '', url)
    # partition yields an empty path when the URL contains no slash.
    domain, _, path = without_scheme.partition('/')
    return " ".join((domain, path.replace('/', ' ')))

def extract_url_features(url):
    """Compute the hand-engineered feature vector for one URL.

    The URL is stripped and lower-cased first. The returned values are, in
    order:

    0. ``domain_length`` - length of the text before the first ``/`` once any
       ``http://`` / ``https://`` has been removed.
    1. ``subdomains``    - number of dots in that domain text.
    2. ``num_dots``      - number of dots in the scheme-less URL.
    3. ``num_equals``    - number of ``=`` characters in the scheme-less URL.
    4. ``protocol``      - 1 if the URL scheme is ``https``, otherwise 0.
    5. ``missing_chars`` - result of ``hasMisleadingChars`` on the
       scheme-less URL.

    Args:
        url: The raw URL string.

    Returns:
        A 1-D numpy array holding the six values above.
    """
    url = url.strip().lower()

    # Only the scheme is needed here. urlparse raises ValueError for some
    # malformed inputs (for example an unbalanced "[" in the host), and a
    # single bad URL must not abort feature extraction for a whole dataset,
    # so fall back to the text before the first ":".
    try:
        scheme = urlparse(url).scheme
    except ValueError:
        scheme = url.partition(":")[0]
    protocol = 1 if scheme == 'https' else 0

    url = re.sub(r"https?://", "", url)
    domain = url.split("/", 1)[0]

    features = {
        "domain_length": len(domain),
        "subdomains": domain.count('.'),
        'num_dots': url.count('.'),
        'num_equals': url.count('='),
        'protocol': protocol,
        "missing_chars": hasMisleadingChars(url),
    }

    # Dict insertion order fixes the column order of the feature vector.
    return np.array(list(features.values()))


In [18]:
def finetune_deberta(urls, labels, model_name="microsoft/deberta-base", epochs=3, batch_size=16):
    """Fine-tune a sequence-classification model on URL text.

    URLs are converted with ``preprocess_url`` and split 85/15 (stratified,
    ``random_state=1``) into training and validation sets. Both are tokenized
    up front with truncation at 128 tokens, and the model is trained with
    AdamW (lr 2e-5) under a linear schedule whose warmup is 5% of all steps.

    Side effects: prints per-epoch loss and validation accuracy, writes
    ``deberta_training_progress.png`` and saves model and tokenizer under
    ``./finetuned-deberta-url-classifier``.

    Args:
        urls: Iterable of raw URL strings.
        labels: Class labels aligned with ``urls`` (the model is built with
            ``num_labels=2``).
        model_name: Name or path passed to ``from_pretrained``.
        epochs: Number of passes over the training split.
        batch_size: Training batch size; validation uses twice this value.

    Returns:
        ``(model, tokenizer, model_path)`` where ``model_path`` is the
        directory the model and tokenizer were saved to.
    """
    processed_urls = [preprocess_url(url) for url in urls]

    train_texts, val_texts, train_labels, val_labels = train_test_split(
        processed_urls, labels, test_size=0.15, stratify=labels, random_state=1
    )

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_name, num_labels=2
    )

    train_encodings = tokenizer(
        train_texts,
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors="pt"
    )

    val_encodings = tokenizer(
        val_texts,
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors="pt"
    )

    train_labels = torch.tensor(train_labels)
    val_labels = torch.tensor(val_labels)

    train_dataset = TensorDataset(
        train_encodings['input_ids'],
        train_encodings['attention_mask'],
        train_labels
    )

    val_dataset = TensorDataset(
        val_encodings['input_ids'],
        val_encodings['attention_mask'],
        val_labels
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size*2)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    optimizer = AdamW(model.parameters(), lr=2e-5)

    total_steps = len(train_loader) * epochs
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        # The scheduler documents an integer step count, so truncate the 5%.
        num_warmup_steps=int(0.05 * total_steps),
        num_training_steps=total_steps
    )

    train_losses = []
    val_accuracies = []

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0

        for batch in train_loader:
            optimizer.zero_grad()

            input_ids = batch[0].to(device)
            attention_mask = batch[1].to(device)
            # Separate name so the ``labels`` argument is not overwritten.
            batch_labels = batch[2].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
            loss = outputs.loss

            loss.backward()
            optimizer.step()
            scheduler.step()

            epoch_loss += loss.item()

        avg_train_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        model.eval()
        # Counts correct predictions first; divided by the set size below.
        val_accuracy = 0

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch[0].to(device)
                attention_mask = batch[1].to(device)
                batch_labels = batch[2].to(device)

                outputs = model(input_ids, attention_mask=attention_mask)
                predictions = torch.argmax(outputs.logits, dim=-1)
                val_accuracy += (predictions == batch_labels).sum().item()

        val_accuracy /= len(val_dataset)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1}/{epochs} - Loss: {avg_train_loss:.4f} - Val Accuracy: {val_accuracy:.4f}")

    # Training curves: loss on the left, validation accuracy on the right.
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(train_losses)
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')

    plt.subplot(1, 2, 2)
    plt.plot(val_accuracies)
    plt.title('Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')

    plt.tight_layout()
    plt.savefig('deberta_training_progress.png')

    model_path = "./finetuned-deberta-url-classifier"
    model.save_pretrained(model_path)
    tokenizer.save_pretrained(model_path)

    return model, tokenizer, model_path

In [19]:
class DebertaFeatureExtractor:
    """Produce fixed-size URL embeddings from a saved transformer checkpoint."""

    def __init__(self, model_path):
        """Load tokenizer and encoder from ``model_path`` and set up the device.

        The encoder is loaded with ``AutoModel`` (the base model rather than
        the classifier), put into eval mode and moved to CUDA when available,
        otherwise CPU.

        Args:
            model_path: Directory or model name accepted by ``from_pretrained``.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.model = AutoModel.from_pretrained(model_path)  # Use base model, not classifier
        self.model.eval()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def get_embeddings(self, urls, batch_size=32, max_length=128):
        """Embed URLs using the hidden state of the first token.

        Args:
            urls: Sequence of raw URL strings.
            batch_size: Number of URLs encoded per forward pass.
            max_length: Truncation length passed to the tokenizer.

        Returns:
            A numpy array with one row per URL. For empty input the result
            has shape ``(0, hidden_size)``.
        """
        # np.vstack rejects an empty list, so return a correctly shaped
        # empty array instead of crashing when there is nothing to embed.
        if len(urls) == 0:
            return np.empty((0, self.model.config.hidden_size), dtype=np.float32)

        processed_urls = [preprocess_url(url) for url in urls]

        all_embeddings = []

        # Process in batches
        for i in range(0, len(processed_urls), batch_size):
            batch_texts = processed_urls[i:i+batch_size]

            # Tokenize
            encodings = self.tokenizer(
                batch_texts,
                truncation=True,
                padding=True,
                max_length=max_length,
                return_tensors="pt"
            )

            # Move to device
            input_ids = encodings['input_ids'].to(self.device)
            attention_mask = encodings['attention_mask'].to(self.device)

            # Get embeddings
            with torch.no_grad():
                outputs = self.model(input_ids, attention_mask=attention_mask)
                # Use CLS token embedding as the URL embedding
                embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()

            all_embeddings.append(embeddings)

        return np.vstack(all_embeddings)

In [20]:
class HybridURLClassifier(nn.Module):
    """Feed-forward classifier over concatenated embeddings and features.

    The network is three linear layers with ReLU and dropout after the first
    two, followed by a sigmoid, so ``forward`` returns one value in (0, 1)
    per row. A ``StandardScaler`` is held as a plain attribute for the
    engineered features; ``forward`` itself does not apply it.
    """

    def __init__(self, embedding_dim=768, feature_dim=14, hidden_dim=256, dropout_rate=0.3):
        """Build the layers.

        Args:
            embedding_dim: Width of the embedding input.
            feature_dim: Width of the engineered-feature input.
            hidden_dim: Width of the first hidden layer; the second is half.
            dropout_rate: Dropout probability after each hidden layer.
        """
        super().__init__()

        input_dim = embedding_dim + feature_dim
        half_hidden = hidden_dim // 2

        self.feature_scaler = StandardScaler()

        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(hidden_dim, half_hidden)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.fc3 = nn.Linear(half_hidden, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, embeddings, features):
        """Return sigmoid outputs for a batch.

        Args:
            embeddings: Tensor concatenated with ``features`` along dim 1.
            features: Tensor of engineered features for the same rows.

        Returns:
            Tensor with one sigmoid output per row (last dimension of size 1).
        """
        joint = torch.cat([embeddings, features], dim=1)

        hidden = self.dropout1(torch.relu(self.fc1(joint)))
        hidden = self.dropout2(torch.relu(self.fc2(hidden)))

        return self.sigmoid(self.fc3(hidden))

    def fit_scaler(self, features):
        """Fit the held StandardScaler on the engineered features."""
        self.feature_scaler.fit(features)


In [None]:
def train_hybrid_model(urls, labels, deberta_extractor, epochs=5, batch_size=32):
    """Train and evaluate the hybrid embedding + feature classifier.

    The data is split 80/20 (stratified, ``random_state=42``). Embeddings come
    from ``deberta_extractor.get_embeddings`` and engineered features from
    ``extract_url_features``; the features are standardised with a scaler
    fitted on the training split only. The model is trained with Adam
    (lr 0.001) and binary cross-entropy.

    Side effects: prints progress and test metrics, writes
    ``confusion_matrix.png`` and saves the model ``state_dict`` to
    ``hybrid_url_classifier.pth``. The fitted scaler is an attribute of the
    returned model, not an entry in that ``state_dict``.

    Args:
        urls: Sequence of raw URL strings.
        labels: Binary labels aligned with ``urls``.
        deberta_extractor: Object exposing ``get_embeddings(urls)``.
        epochs: Number of training epochs.
        batch_size: Training batch size.

    Returns:
        The trained ``HybridURLClassifier``.
    """
    # Split data
    train_urls, test_urls, train_labels, test_labels = train_test_split(
        urls, labels, test_size=0.2, stratify=labels, random_state=42
    )

    print("Extracting DeBERTa embeddings...")
    train_embeddings = deberta_extractor.get_embeddings(train_urls)
    test_embeddings = deberta_extractor.get_embeddings(test_urls)

    print("Extracting engineered features...")
    train_features = np.array([extract_url_features(url) for url in train_urls])
    test_features = np.array([extract_url_features(url) for url in test_urls])

    embedding_dim = train_embeddings.shape[1]
    feature_dim = train_features.shape[1]

    model = HybridURLClassifier(
        embedding_dim=embedding_dim,
        feature_dim=feature_dim,
        hidden_dim=256,
        dropout_rate=0.3
    )

    # Fit scaler to training features
    model.fit_scaler(train_features)

    # Scale features
    train_features_scaled = model.feature_scaler.transform(train_features)
    test_features_scaled = model.feature_scaler.transform(test_features)

    # Convert to PyTorch tensors
    train_embeddings_tensor = torch.tensor(train_embeddings, dtype=torch.float32)
    train_features_tensor = torch.tensor(train_features_scaled, dtype=torch.float32)
    train_labels_tensor = torch.tensor(train_labels, dtype=torch.float32).view(-1, 1)

    test_embeddings_tensor = torch.tensor(test_embeddings, dtype=torch.float32)
    test_features_tensor = torch.tensor(test_features_scaled, dtype=torch.float32)

    # Create dataset and dataloader
    train_dataset = TensorDataset(
        train_embeddings_tensor, train_features_tensor, train_labels_tensor
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Loss function and optimizer
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    train_losses = []

    for epoch in range(epochs):
        model.train()
        epoch_loss = 0

        for batch in train_loader:
            # Separate name so the ``labels`` argument is not overwritten.
            embeddings, features, batch_labels = [b.to(device) for b in batch]

            # Forward pass
            optimizer.zero_grad()
            outputs = model(embeddings, features)
            loss = criterion(outputs, batch_labels)

            # Backward pass
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(train_loader)
        train_losses.append(avg_loss)

        print(f"Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f}")

    # Evaluate model
    model.eval()
    with torch.no_grad():
        test_outputs = model(
            test_embeddings_tensor.to(device), test_features_tensor.to(device)
        )

    # Flatten once so every metric sees 1-D arrays of equal length.
    test_probabilities = test_outputs.cpu().numpy().ravel()
    test_predictions = (test_probabilities >= 0.5).astype(np.float32)
    test_labels_array = np.asarray(test_labels)

    # Calculate metrics
    accuracy = (test_predictions == test_labels_array).mean()
    auc = roc_auc_score(test_labels_array, test_probabilities)

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test AUC: {auc:.4f}")

    # Classification report
    print("\nClassification Report:")
    print(classification_report(test_labels_array, test_predictions))

    # Confusion matrix
    cm = confusion_matrix(test_labels_array, test_predictions)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.savefig('confusion_matrix.png')

    # Save model
    torch.save(model.state_dict(), 'hybrid_url_classifier.pth')

    return model

In [21]:
def detect_malicious_urls(urls_data, labels_data, deberta_model_name="microsoft/deberta-base"):
    """Complete pipeline for malicious URL detection.

    Fine-tunes the DeBERTa classifier, wraps the saved checkpoint in a
    ``DebertaFeatureExtractor`` and trains the hybrid classifier on top.

    Args:
        urls_data: Sequence of raw URL strings.
        labels_data: Binary labels aligned with ``urls_data``.
        deberta_model_name: Model name or path used for fine-tuning.

    Returns:
        ``(deberta_extractor, hybrid_model)``.
    """
    # train_hybrid_model evaluates on an internal hold-out created with
    # test_size=0.2, stratify=labels, random_state=42. Reproduce that exact
    # split here and fine-tune only on its training part; otherwise the
    # encoder has already seen the labels of the hybrid model's test URLs
    # and the reported test metrics are inflated. Keep these arguments in
    # sync with the split inside train_hybrid_model.
    finetune_urls, _, finetune_labels, _ = train_test_split(
        urls_data, labels_data, test_size=0.2, stratify=labels_data, random_state=42
    )

    print("Step 1: Fine-tuning DeBERTa model...")
    _, _, model_path = finetune_deberta(
        finetune_urls, finetune_labels, model_name=deberta_model_name, epochs=3
    )

    print("\nStep 2: Creating DeBERTa feature extractor...")
    deberta_extractor = DebertaFeatureExtractor(model_path)

    print("\nStep 3: Training hybrid neural network...")
    hybrid_model = train_hybrid_model(
        urls_data, labels_data, deberta_extractor, epochs=5
    )

    print("\nTraining complete! Models saved to:")
    print(f"- DeBERTa model: {model_path}")
    print("- Hybrid model: hybrid_url_classifier.pth")
    return deberta_extractor, hybrid_model

In [None]:
# Example usage
#
# Load your dataset
# df = pd.read_csv('url_dataset.csv')
# urls = df['url'].tolist()
# labels = df['is_malicious'].tolist()
#
# Run the full pipeline
# deberta_extractor, hybrid_model = detect_malicious_urls(urls, labels)


# For inference on new URLs
def predict_url(url, deberta_extractor, hybrid_model):
    """Classify a single URL with the trained hybrid model.

    Defined at top level rather than inside an ``if __name__ == "__main__":``
    guard so it is also available when this code is imported.

    Args:
        url: The raw URL string.
        deberta_extractor: Object exposing ``get_embeddings(urls)``.
        hybrid_model: Trained ``HybridURLClassifier`` whose
            ``feature_scaler`` has already been fitted.

    Returns:
        ``(prediction, probability)`` where ``probability`` is the model
        output as a float and ``prediction`` is 1 when it is at least 0.5,
        otherwise 0.
    """
    # Extract DeBERTa embedding
    embedding = deberta_extractor.get_embeddings([url])
    embedding_tensor = torch.tensor(embedding, dtype=torch.float32)

    # Extract and scale engineered features
    features = extract_url_features(url).reshape(1, -1)
    features_scaled = hybrid_model.feature_scaler.transform(features)
    features_tensor = torch.tensor(features_scaled, dtype=torch.float32)

    # Run on whichever device the model's parameters live on.
    device = next(hybrid_model.parameters()).device
    embedding_tensor = embedding_tensor.to(device)
    features_tensor = features_tensor.to(device)

    # Make prediction
    hybrid_model.eval()
    with torch.no_grad():
        output = hybrid_model(embedding_tensor, features_tensor)

    probability = output.item()
    prediction = 1 if probability >= 0.5 else 0

    return prediction, probability