In [1]:
import os
# Print the full path of every file found beneath /kaggle.
for dirpath, _subdirs, filenames in os.walk("/kaggle"):
    for filename in filenames:
        print(os.path.join(dirpath, filename))


In [2]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from sentence_transformers import SentenceTransformer
from torch import nn
from torch.optim import AdamW
from tqdm import tqdm
from sklearn.metrics import precision_recall_fscore_support
import os
import json
import re
import random

# Run-wide settings: encoder choice, optimisation hyper-parameters and
# the data-split fractions used by the cells below.
config = dict(
    # Sentence encoder (upgraded to a more powerful model) and MPNet's embedding size
    model_name="sentence-transformers/all-mpnet-base-v2",
    embedding_size=768,
    # Optimisation
    batch_size=32,
    learning_rate=2e-5,
    epochs=50,
    patience=4,
    # Sigmoid cut-off used when turning logits into predictions
    threshold=0.5,
    # Split fractions
    train_size=0.70,
    val_size=0.15,
    test_size=0.15,
    # Sampling and reproducibility
    sample_size=150000,
    random_seed=42,
    warmup_steps=1000,
    weight_decay=0.01,
)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def process_techniques_with_descriptions(technique_str, techniques_df):
    """Parse a ';'-separated technique string into IDs and their descriptions.

    Each entry is split once on ``" - "`` and only the part before it is kept
    as the technique ID. IDs made of four digits with an optional ``.<digits>``
    suffix (e.g. ``1059`` or ``1059.001``) are given a ``T`` prefix.

    Args:
        technique_str: Raw value to parse. Anything that is not a ``str``
            (NaN, ``None``, lists, ...) yields two empty lists.
        techniques_df: Either a DataFrame with ``ID`` and ``description``
            columns, or an already-built ``{ID: description}`` mapping. Passing
            a mapping lets callers that parse many rows build the lookup once
            instead of once per call.

    Returns:
        tuple[list, list]: ``(technique_ids, descriptions)`` as parallel lists.
        IDs missing from the lookup get the placeholder
        ``"Description not found for ID: <id>"``.
    """
    # A str check alone covers NaN/None and avoids pd.isna() returning an
    # array (ambiguous truth value) for list-like input.
    if not isinstance(technique_str, str):
        return [], []

    if isinstance(techniques_df, pd.DataFrame):
        technique_lookup = techniques_df.set_index('ID')['description'].to_dict()
    else:
        technique_lookup = techniques_df

    cleaned_techniques = []
    technique_descriptions = []

    for tech in technique_str.split(';'):
        # str.split always returns at least one element, so only emptiness matters.
        tech_id = tech.strip().split(' - ', maxsplit=1)[0].strip()
        if not tech_id:
            continue

        if re.match(r'^\d{4}(\.\d+)?$', tech_id):
            tech_id = f"T{tech_id}"

        cleaned_techniques.append(tech_id)
        technique_descriptions.append(
            technique_lookup.get(tech_id, f"Description not found for ID: {tech_id}")
        )

    return cleaned_techniques, technique_descriptions

In [4]:
# Dataset of CVE descriptions with multi-label MITRE technique targets, restricted to techniques that occur in the CVE data
class CVEDataset(Dataset):
    """CVE descriptions paired with multi-label technique targets.

    Only techniques that both occur in ``cve_data['MITRE_Technique']`` and have
    a string description in ``techniques_df`` become labels. Their description
    embeddings are computed once at construction time.

    Args:
        cve_data: DataFrame with ``CVE_Description`` and ``MITRE_Technique``
            columns.
        techniques_df: DataFrame with ``ID`` and ``description`` columns. It is
            copied; the caller's frame is left untouched.
        sentence_transformer: Object exposing
            ``encode(text, convert_to_tensor=True)``.
        num_negative_samples: Upper bound on negatives drawn per item.
        max_techniques: Number of technique slots (positives + negatives +
            zero padding) returned per item.
    """

    def __init__(self, cve_data, techniques_df, sentence_transformer, num_negative_samples=3, max_techniques=10):
        self.cve_data = cve_data
        self.sentence_transformer = sentence_transformer
        self.num_negative_samples = num_negative_samples
        self.max_techniques = max_techniques

        # Work on a private copy: casting ID to str must not change the
        # dtype of the frame the caller handed in.
        self.techniques_df = techniques_df.copy()
        self.techniques_df['ID'] = self.techniques_df['ID'].astype(str)

        # ID -> description; on duplicate IDs the last row wins.
        descriptions = dict(zip(self.techniques_df['ID'], self.techniques_df['description']))

        # Parse every row once; non-string cells parse to an empty list.
        parsed_rows = [
            process_techniques_with_descriptions(raw_value, self.techniques_df)[0]
            for raw_value in cve_data['MITRE_Technique']
        ]

        # Keep only techniques that have a usable (string) description, sorted
        # for deterministic label indexing.
        self.technique_list = sorted({
            tech_id
            for row_ids in parsed_rows
            for tech_id in row_ids
            if isinstance(descriptions.get(tech_id), str)
        })
        self.technique_set = set(self.technique_list)
        # Constant-time label positions instead of list.index() per lookup.
        self.technique_index = {tech_id: pos for pos, tech_id in enumerate(self.technique_list)}

        # Valid technique IDs per row, aligned with positional row order, so
        # __getitem__ does not have to re-parse the technique string.
        self._row_technique_ids = [
            [tech_id for tech_id in row_ids if tech_id in self.technique_set]
            for row_ids in parsed_rows
        ]

        print(f"Number of unique techniques in CVE dataset: {len(self.technique_list)}")

        # Pre-compute embeddings only for techniques that appear in the CVE dataset
        print("Pre-computing technique embeddings...")
        self.technique_embeddings = {
            tech_id: self.sentence_transformer.encode(
                descriptions[tech_id],
                convert_to_tensor=True
            )
            for tech_id in self.technique_list
        }

    def __len__(self):
        """Return the number of CVE rows."""
        return len(self.cve_data)

    def get_negative_samples(self, positive_technique_ids):
        """Draw random technique IDs that are not among the positives.

        The count is capped by ``num_negative_samples``, by the slots left
        after the positives (``max_techniques - len(positives)``) and by the
        number of candidates. Returns an empty list when that cap is <= 0.
        """
        positive_set = set(positive_technique_ids)
        # Filter the sorted list rather than taking a set difference: string
        # set order varies between interpreter runs, which would make seeded
        # sampling irreproducible.
        available_techniques = [
            tech_id for tech_id in self.technique_list if tech_id not in positive_set
        ]
        max_negatives = min(
            self.num_negative_samples,
            self.max_techniques - len(positive_technique_ids),
            len(available_techniques)
        )
        return random.sample(available_techniques, max_negatives) if max_negatives > 0 else []

    def pad_embeddings(self, embeddings, targets, max_size):
        """Pad (or truncate) embeddings and targets to exactly ``max_size`` slots.

        Padding slots are zero tensors shaped like the first embedding and
        carry target ``0.0``. With no embeddings at all, the shape is taken
        from any cached technique embedding, falling back to
        ``config["embedding_size"]``. The input lists are not modified.
        """
        embeddings = list(embeddings)
        targets = list(targets)

        if not embeddings:
            if self.technique_embeddings:
                template = next(iter(self.technique_embeddings.values()))
            else:
                template = torch.zeros(config["embedding_size"])
            embeddings = [torch.zeros_like(template)]
            targets = [0.0]

        missing = max_size - len(embeddings)
        if missing > 0:
            embeddings.extend(torch.zeros_like(embeddings[0]) for _ in range(missing))
            targets.extend([0.0] * missing)

        return embeddings[:max_size], targets[:max_size]

    def __getitem__(self, idx):
        """Build one training item.

        Returns a dict with:
            ``embeddings``: encoded CVE description.
            ``labels``: float vector over ``technique_list`` (1.0 = positive).
            ``technique_embeddings``: ``max_techniques`` stacked embeddings.
            ``similarity_targets``: 1.0 for positives, -1.0 for sampled
                negatives, 0.0 for padding.
        """
        cve_entry = self.cve_data.iloc[idx]
        cve_embedding = self.sentence_transformer.encode(cve_entry['CVE_Description'], convert_to_tensor=True)

        # Already filtered to valid techniques; cap at the slot budget.
        # NOTE: labels below are built from this capped list as well.
        positive_technique_ids = self._row_technique_ids[idx][:self.max_techniques]
        negative_technique_ids = self.get_negative_samples(positive_technique_ids)

        labels = torch.zeros(len(self.technique_list), dtype=torch.float32)
        for tech_id in positive_technique_ids:
            labels[self.technique_index[tech_id]] = 1.0

        technique_embeddings = []
        similarity_targets = []
        for tech_ids, target in ((positive_technique_ids, 1.0), (negative_technique_ids, -1.0)):
            for tech_id in tech_ids:
                if tech_id in self.technique_embeddings:
                    technique_embeddings.append(self.technique_embeddings[tech_id])
                    similarity_targets.append(target)

        technique_embeddings, similarity_targets = self.pad_embeddings(
            technique_embeddings,
            similarity_targets,
            self.max_techniques
        )

        return {
            "embeddings": cve_embedding,
            "labels": labels,
            "technique_embeddings": torch.stack(technique_embeddings),
            "similarity_targets": torch.tensor(similarity_targets, dtype=torch.float32)
        }



In [5]:
class CustomClassifier(nn.Module):
    """LSTM encoder feeding a shared trunk with two heads.

    The classification head emits ``num_labels`` logits; the similarity head
    emits a vector of size ``embedding_size``.
    """

    def __init__(self, num_labels, embedding_size=768, lstm_hidden_size=256, lstm_layers=1, bidirectional=True):
        super().__init__()
        trunk_width, head_width = 512, 256

        self.lstm = nn.LSTM(
            input_size=embedding_size,
            hidden_size=lstm_hidden_size,
            num_layers=lstm_layers,
            batch_first=True,
            bidirectional=bidirectional
        )
        # A bidirectional LSTM concatenates both directions' hidden states.
        direction_count = 2 if bidirectional else 1

        # Shared trunk consumed by both heads.
        self.embedding_network = nn.Sequential(
            nn.Linear(lstm_hidden_size * direction_count, trunk_width),
            nn.LayerNorm(trunk_width),
            nn.ReLU(),
            nn.Dropout(0.2)
        )

        # Head 1: one logit per label.
        self.classifier = nn.Sequential(
            nn.Linear(trunk_width, head_width),
            nn.LayerNorm(head_width),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(head_width, num_labels)
        )
        # Head 2: projection back to embedding space for similarity comparison.
        self.similarity_head = nn.Sequential(
            nn.Linear(trunk_width, head_width),
            nn.LayerNorm(head_width),
            nn.ReLU(),
            nn.Linear(head_width, embedding_size)
        )

        self._init_weights()

    def _init_weights(self):
        """Xavier-uniform weights and zero biases for every Linear layer."""
        linear_layers = (layer for layer in self.modules() if isinstance(layer, nn.Linear))
        for layer in linear_layers:
            nn.init.xavier_uniform_(layer.weight)
            if layer.bias is None:
                continue
            nn.init.constant_(layer.bias, 0)

    def forward(self, embeddings):
        """Return ``(classification_logits, similarity_vector)``.

        A 2-D input is treated as a batch of length-1 sequences; any other
        rank is passed to the LSTM unchanged.
        """
        if len(embeddings.shape) == 2:
            embeddings = embeddings.unsqueeze(1)

        sequence_out, _ = self.lstm(embeddings)
        # Only the final time step feeds the trunk.
        final_step = sequence_out[:, -1, :]

        shared_features = self.embedding_network(final_step)
        logits = self.classifier(shared_features)
        projection = self.similarity_head(shared_features)
        return logits, projection

In [6]:
class CombinedLoss(nn.Module):
    """Focal classification loss plus a weighted cosine-similarity loss.

    ``forward`` returns ``(total, classification_loss, similarity_loss)`` where
    ``total = classification_loss + similarity_weight * similarity_loss``.
    """

    def __init__(self, alpha=1.0, gamma=2.0, similarity_weight=0.3):
        super().__init__()
        self.focal_loss = FocalLoss(alpha=alpha, gamma=gamma)
        self.cosine_similarity = nn.CosineSimilarity(dim=-1)
        self.similarity_weight = similarity_weight

    def forward(self, classification_outputs, similarity_outputs, technique_embeddings, labels, similarity_targets):
        # If the label width differs from the logit width, keep only the
        # leading label columns.
        num_outputs = classification_outputs.shape[1]
        if labels.shape[1] != num_outputs:
            labels = labels[:, :num_outputs]

        classification_loss = self.focal_loss(classification_outputs, labels)

        # Repeat each projected vector once per technique slot so it can be
        # compared with every technique embedding of the same sample.
        num_slots = technique_embeddings.size(1)
        projected = similarity_outputs.unsqueeze(1).expand(-1, num_slots, -1)
        similarities = self.cosine_similarity(projected, technique_embeddings)

        # Slots whose target is 0 are excluded from the loss.
        slot_mask = (similarity_targets != 0).float()

        # Targets are +1 / -1, so the per-slot loss is 1 - cos for positives
        # and 1 + cos for negatives.
        per_slot_loss = (1 - similarities * similarity_targets) * slot_mask

        # Mean over the unmasked slots; the epsilon guards an all-masked batch.
        similarity_loss = per_slot_loss.sum() / (slot_mask.sum() + 1e-6)

        total_loss = classification_loss + self.similarity_weight * similarity_loss
        return total_loss, classification_loss, similarity_loss

class FocalLoss(nn.Module):
    """Focal variant of binary cross-entropy on logits.

    Each element's BCE is scaled by ``alpha * (1 - p_t) ** gamma`` with
    ``p_t = exp(-BCE)``, and the scaled values are averaged.
    """

    def __init__(self, alpha=1, gamma=2):
        super().__init__()
        self.alpha, self.gamma = alpha, gamma

    def forward(self, inputs, targets):
        # Element-wise BCE on raw logits, no reduction.
        per_element = nn.functional.binary_cross_entropy_with_logits(
            inputs, targets, reduction='none'
        )

        # exp(-BCE) recovers the probability assigned to the target.
        target_prob = torch.exp(-per_element)

        # Scaling factor shrinks as the target probability approaches 1.
        modulation = self.alpha * (1 - target_prob) ** self.gamma

        weighted = modulation * per_element
        return weighted.mean()

class EarlyStopping:
    """Signal when a score has stopped improving.

    Args:
        patience: Number of consecutive non-improving calls after which
            ``__call__`` returns ``True``.
        delta: Minimum increase over the best score that counts as an
            improvement.

    Attributes set by ``__call__``:
        best_score: Highest score seen so far (``None`` before the first call).
        best_state: Independent snapshot of the state passed with that score.
        counter: Consecutive calls without improvement.
    """

    def __init__(self, patience, delta=0.001):
        self.patience = patience
        self.counter = 0
        self.best_score = None
        self.delta = delta
        self.best_state = None

    def __call__(self, current_score, model_state):
        """Record ``current_score``; return ``True`` when training should stop."""
        import copy

        if self.best_score is None or current_score > self.best_score + self.delta:
            self.best_score = current_score
            self.counter = 0
            # Deep copy: a shallow dict copy would keep references to the live
            # parameter tensors, so the "best" snapshot would silently follow
            # every later optimizer step.
            self.best_state = copy.deepcopy(model_state)
            return False
        self.counter += 1
        return self.counter >= self.patience

In [7]:
def evaluate_model(model, loader, device, threshold=None):
    """Evaluate ``model`` on ``loader`` and return losses and P/R/F1 metrics.

    Args:
        model: Module returning ``(classification_logits, similarity_output)``.
        loader: Iterable of batches with keys ``embeddings``, ``labels``,
            ``technique_embeddings`` and ``similarity_targets``.
        device: Device the batch tensors are moved to.
        threshold: Sigmoid cut-off for a positive prediction. Defaults to
            ``config["threshold"]`` when ``None``.

    Returns:
        dict with ``micro`` and ``macro`` sub-dicts (``precision``, ``recall``,
        ``f1``) and per-batch averages ``avg_loss``,
        ``avg_classification_loss`` and ``avg_similarity_loss``.

    Raises:
        ValueError: If ``loader`` yields no batches.

    Note:
        The model is left in eval mode.
    """
    if threshold is None:
        threshold = config["threshold"]

    model.eval()
    criterion = CombinedLoss()
    label_batches = []
    pred_batches = []
    total_loss = 0.0
    total_classification_loss = 0.0
    total_similarity_loss = 0.0

    with torch.no_grad():
        for batch in tqdm(loader, desc="Evaluating"):
            embeddings = batch["embeddings"].to(device)
            labels = batch["labels"].to(device)
            technique_embeddings = batch["technique_embeddings"].to(device)
            similarity_targets = batch["similarity_targets"].to(device)

            classification_outputs, similarity_outputs = model(embeddings)
            loss, cls_loss, sim_loss = criterion(
                classification_outputs,
                similarity_outputs,
                technique_embeddings,
                labels,
                similarity_targets
            )

            total_loss += loss.item()
            total_classification_loss += cls_loss.item()
            total_similarity_loss += sim_loss.item()

            preds = torch.sigmoid(classification_outputs) > threshold
            label_batches.append(labels.cpu())
            pred_batches.append(preds.cpu())

    # Fail with a clear message instead of an opaque torch.cat error.
    if not label_batches:
        raise ValueError("evaluate_model received an empty loader; nothing to evaluate")
    num_batches = len(label_batches)

    # Hand scikit-learn plain integer indicator arrays of matching dtype.
    all_labels = torch.cat(label_batches).numpy().astype(int)
    all_preds = torch.cat(pred_batches).numpy().astype(int)

    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average="micro", zero_division=0
    )
    precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(
        all_labels, all_preds, average="macro", zero_division=0
    )

    return {
        "micro": {"precision": precision, "recall": recall, "f1": f1},
        "macro": {"precision": precision_macro, "recall": recall_macro, "f1": f1_macro},
        "avg_loss": total_loss / num_batches,
        "avg_classification_loss": total_classification_loss / num_batches,
        "avg_similarity_loss": total_similarity_loss / num_batches
    }

In [None]:
# End-to-end pipeline: load data, build the dataset, train with early stopping, and evaluate on the test split
def _to_builtin_floats(metrics):
    """Recursively convert a (nested) dict of numeric metrics to built-in floats."""
    if isinstance(metrics, dict):
        return {key: _to_builtin_floats(value) for key, value in metrics.items()}
    return float(metrics)


def main():
    """Train, validate and test the technique classifier end to end.

    Reads ``cve_cwe_mitre_mapped.xlsx`` and ``techniques.xlsx`` from the
    working directory, trains with early stopping on validation micro-F1,
    writes the best checkpoint to ``checkpoints/best_model.pt`` and the final
    summary to ``results.json``.

    Raises:
        RuntimeError: If an epoch completes no training batch, or if no
            checkpoint was written during this run.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Negative sampling in the dataset uses `random`, so seed every RNG in
    # play, not only torch.
    random.seed(config["random_seed"])
    np.random.seed(config["random_seed"])
    torch.manual_seed(config["random_seed"])

    print(f"Loading data (device: {device})...")
    cve_df = pd.read_excel("cve_cwe_mitre_mapped.xlsx")
    techniques_df = pd.read_excel("techniques.xlsx")

    sampled_df = cve_df.sample(n=min(config["sample_size"], len(cve_df)), random_state=config["random_seed"])

    sentence_transformer = SentenceTransformer(config["model_name"])
    dataset = CVEDataset(sampled_df, techniques_df, sentence_transformer)

    # The label space is whatever the dataset found in the sampled CVE rows.
    num_techniques = len(dataset.technique_list)
    print(f"Creating model with {num_techniques} unique techniques")

    train_size = int(config["train_size"] * len(dataset))
    val_size = int(config["val_size"] * len(dataset))
    # The test split takes the remainder so the three sizes always sum to len(dataset).
    test_size = len(dataset) - train_size - val_size

    train_dataset, val_dataset, test_dataset = random_split(
        dataset,
        [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(config["random_seed"])
    )

    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=config["batch_size"])
    test_loader = DataLoader(test_dataset, batch_size=config["batch_size"])

    model = CustomClassifier(num_techniques, config["embedding_size"]).to(device)
    optimizer = AdamW(model.parameters(), lr=config["learning_rate"], weight_decay=config["weight_decay"])
    criterion = CombinedLoss()
    early_stopping = EarlyStopping(patience=config["patience"])

    os.makedirs("checkpoints", exist_ok=True)
    best_model_path = "checkpoints/best_model.pt"
    # Start below any reachable F1 so the first epoch always writes a
    # checkpoint; starting at 0 with a strict '>' would never save when F1
    # stays at 0 and the later load would fail or pick up a stale file.
    best_val_f1 = float("-inf")
    checkpoint_saved = False

    print("Starting training...")
    for epoch in range(config["epochs"]):
        model.train()
        total_loss = 0
        total_cls_loss = 0
        total_sim_loss = 0
        batches_done = 0
        batches_skipped = 0

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch + 1}")
        for batch in progress_bar:
            optimizer.zero_grad()

            embeddings = batch["embeddings"].to(device)
            labels = batch["labels"].to(device)
            technique_embeddings = batch["technique_embeddings"].to(device)
            similarity_targets = batch["similarity_targets"].to(device)

            try:
                classification_outputs, similarity_outputs = model(embeddings)
                loss, cls_loss, sim_loss = criterion(
                    classification_outputs,
                    similarity_outputs,
                    technique_embeddings,
                    labels,
                    similarity_targets
                )

                loss.backward()
                optimizer.step()

                total_loss += loss.item()
                total_cls_loss += cls_loss.item()
                total_sim_loss += sim_loss.item()
                batches_done += 1

                # Average over batches that actually contributed; tqdm's own
                # counter is refreshed lazily and also counts skipped batches.
                progress_bar.set_postfix({
                    'loss': total_loss / batches_done,
                    'cls_loss': total_cls_loss / batches_done,
                    'sim_loss': total_sim_loss / batches_done
                })
            except RuntimeError as e:
                # Best effort: report and move on, but keep count so a run
                # that skips everything does not pass for a trained model.
                batches_skipped += 1
                print(f"Error in batch: {e}")
                continue

        if batches_skipped:
            print(f"Skipped {batches_skipped} of {batches_done + batches_skipped} batches in epoch {epoch + 1}")
        if batches_done == 0:
            raise RuntimeError(f"No training batch completed in epoch {epoch + 1}")

        # Built-in floats keep the checkpoint free of pickled NumPy scalars.
        val_metrics = _to_builtin_floats(evaluate_model(model, val_loader, device))
        print(f"\nEpoch {epoch + 1} Validation Metrics:")
        print(f"Micro F1: {val_metrics['micro']['f1']:.4f}")
        print(f"Macro F1: {val_metrics['macro']['f1']:.4f}")
        print(f"Average Loss: {val_metrics['avg_loss']:.4f}")

        if val_metrics['micro']['f1'] > best_val_f1:
            best_val_f1 = val_metrics['micro']['f1']
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_metrics': val_metrics,
                'config': config,
                'num_techniques': num_techniques,  # Needed to rebuild the model when loading
                'technique_list': dataset.technique_list  # Label order for inference
            }, best_model_path)
            checkpoint_saved = True

        if early_stopping(val_metrics['micro']['f1'], model.state_dict()):
            print("Early stopping triggered")
            break

    if not checkpoint_saved:
        raise RuntimeError("No checkpoint was saved during this run; nothing to test")

    print("\nTraining completed. Loading best model for testing...")
    # map_location keeps loading working when the device differs from the
    # one the checkpoint was saved on.
    checkpoint = torch.load(best_model_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])

    test_metrics = _to_builtin_floats(evaluate_model(model, test_loader, device))
    print("\nTest Set Metrics:")
    print(f"Micro Precision: {test_metrics['micro']['precision']:.4f}")
    print(f"Micro Recall: {test_metrics['micro']['recall']:.4f}")
    print(f"Micro F1: {test_metrics['micro']['f1']:.4f}")
    print(f"Macro Precision: {test_metrics['macro']['precision']:.4f}")
    print(f"Macro Recall: {test_metrics['macro']['recall']:.4f}")
    print(f"Macro F1: {test_metrics['macro']['f1']:.4f}")

    results = {
        'config': config,
        'test_metrics': test_metrics,
        'best_val_f1': best_val_f1,
        'final_epoch': checkpoint['epoch'],
        'num_techniques': num_techniques,
        'technique_ids': dataset.technique_list
    }

    with open('results.json', 'w') as f:
        json.dump(results, f, indent=4)

# Run the pipeline only when this code executes as the top-level module
# (__name__ == "__main__"), not when it is imported under another name.
if __name__ == "__main__":
    main()
