In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/eatvul/preserved_pool_attack.csv
/kaggle/input/eatvul/cwe189_train.csv
/kaggle/input/eatvul/cwe119_train.csv
/kaggle/input/eatvul/cwe416_test.csv
/kaggle/input/eatvul/cwe399_ast_test.json
/kaggle/input/eatvul/asterisk_ast_test_ADV.json
/kaggle/input/eatvul/predict_codet5_cwe119.txt
/kaggle/input/eatvul/cwe399-huggingface.csv
/kaggle/input/eatvul/cwe399_ast_test_ADV.json
/kaggle/input/eatvul/cwe399_test.csv
/kaggle/input/eatvul/openssl_ast_test_ADV.json
/kaggle/input/eatvul/cwe119_test.csv
/kaggle/input/eatvul/openssl_ast_test.json
/kaggle/input/eatvul/cwe119-huggingface.csv
/kaggle/input/eatvul/cwe20_train.csv
/kaggle/input/eatvul/cwe399_ast_train.json
/kaggle/input/eatvul/predict_codet5_cwe20.txt
/kaggle/input/eatvul/cwe-399-v2.csv
/kaggle/input/eatvul/predict_codebert_cwe189.txt
/kaggle/input/eatvul/predict_codebert_cwe399.txt
/kaggle/input/eatvul/openssl_ast_train.json
/kaggle/input/eatvul/predict_codet5_cwe416.txt
/kaggle/input/eatvul/cwe119_ast_test.json
/kaggle/inpu

In [2]:
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torch.nn import CrossEntropyLoss
from transformers import RobertaTokenizerFast, RobertaModel, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report, confusion_matrix
import matplotlib.pyplot as plt
try:
    import seaborn as sns
except ImportError:
    print("Warning: Seaborn not installed. Some visualizations may not work.")
    sns = None
from tqdm import tqdm
import os
import re
import gc
import json
import zipfile
from datetime import datetime
from torch.optim import AdamW

# Set device (GPU if available, else CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


class CodePreprocessor:
    """Preprocess code for CodeBERT model"""
    
    def __init__(self):
        self.tokenizer = RobertaTokenizerFast.from_pretrained("microsoft/codebert-base")
        self.max_length = 512  # Maximum sequence length for CodeBERT
    
    def preprocess_code(self, code_text):
        """Basic preprocessing of code text"""
        # Remove extra whitespace
        code_text = re.sub(r'\s+', ' ', code_text)
        code_text = code_text.strip()
        return code_text
    
    def tokenize(self, code_text, truncation=True, padding='max_length', return_tensors=None):
        """Tokenize code text using RobertaTokenizerFast"""
        processed_code = self.preprocess_code(code_text)
        return self.tokenizer(processed_code, 
                             truncation=truncation, 
                             max_length=self.max_length,
                             padding=padding,
                             return_tensors=return_tensors)

class CodeDataset(Dataset):
    """Dataset for code vulnerability detection using CodeBERT"""
    
    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]
        
        encoding = self.tokenizer(text, 
                                 truncation=True,
                                 max_length=self.max_length,
                                 padding='max_length',
                                 return_tensors='pt')
        
        # Remove batch dimension added by tokenizer when return_tensors='pt'
        input_ids = encoding['input_ids'].squeeze()
        attention_mask = encoding['attention_mask'].squeeze()
        
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label': torch.tensor(label, dtype=torch.long)
        }

class CodeBERTClassifier(nn.Module):
    """CodeBERT model for code vulnerability detection"""
    
    def __init__(self, freeze_bert=False, dropout_rate=0.1):
        super(CodeBERTClassifier, self).__init__()
        
        # Load pre-trained CodeBERT model
        self.codebert = RobertaModel.from_pretrained("microsoft/codebert-base")
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(self.codebert.config.hidden_size, 2)  # Binary classification
        
        # Freeze CodeBERT layers if specified
        if freeze_bert:
            for param in self.codebert.parameters():
                param.requires_grad = False
    
    def forward(self, input_ids, attention_mask):
        # Get CodeBERT outputs
        outputs = self.codebert(input_ids=input_ids, attention_mask=attention_mask)
        
        # Use the [CLS] token representation for classification
        pooled_output = outputs.pooler_output
        
        # Apply dropout and classify
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        
        return logits

class CodeBERTTrainer:
    """Trainer for CodeBERT model"""
    
    def __init__(self, data_path=None, batch_size=8, epochs=4, learning_rate=2e-5):
        self.preprocessor = CodePreprocessor()
        self.batch_size = batch_size
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.data = None
        self.model = None
        self.best_model_state = None
        self.best_val_accuracy = 0.0
        self.best_val_f1 = 0.0  # Add this for F1-based model selection
        self.history = {
            'train_loss': [],
            'val_loss': [],
            'val_accuracy': [],
            'val_precision': [],
            'val_recall': [],
            'val_f1': []
        }
        self.output_dir = os.path.join(os.getcwd(), 'codebert_outputs')
        os.makedirs(self.output_dir, exist_ok=True)
        
        if data_path:
            self.load_data(data_path)
    
    def load_data(self, data_path):
        """
        Load data from file or DataFrame
        
        Args:
            data_path: Path to a data file (CSV, Excel, JSON) or a pandas DataFrame
        """
        print(f"DEBUG: Type of data_path in load_data: {type(data_path)}")
        
        # If data_path is already a DataFrame, use it directly
        if isinstance(data_path, pd.DataFrame):
            self.data = data_path
            print(f"Using provided DataFrame with {len(self.data)} samples.")
            
        # If it's a string, try to load from file
        elif isinstance(data_path, str):
            print(f"DEBUG: Trying to load from file path: '{data_path}'")
            
            # Check if the file exists
            if not os.path.exists(data_path):
                raise FileNotFoundError(f"File not found: '{data_path}'")
                
            file_ext = os.path.splitext(data_path.lower())[1]
            print(f"DEBUG: File extension detected: '{file_ext}'")
            
            if file_ext == '.csv':
                self.data = pd.read_csv(data_path)
            elif file_ext in ['.xls', '.xlsx']:
                self.data = pd.read_excel(data_path)
            elif file_ext == '.json':
                self.data = pd.read_json(data_path)
            elif file_ext == '.pkl' or file_ext == '.pickle':
                self.data = pd.read_pickle(data_path)
            elif file_ext == '':
                # Try to infer the format if no extension is given
                try:
                    # First try CSV as it's most common
                    self.data = pd.read_csv(data_path)
                    print(f"Inferred file format as CSV for: {data_path}")
                except:
                    try:
                        # Then try JSON
                        self.data = pd.read_json(data_path)
                        print(f"Inferred file format as JSON for: {data_path}")
                    except:
                        raise ValueError(f"Could not determine file format for: '{data_path}'. Please specify a file with extension or provide a DataFrame.")
            else:
                raise ValueError(f"Unsupported file format: '{file_ext}'. Supported formats: CSV, Excel, JSON, Pickle")
        else:
            raise TypeError(f"data_path must be either a string file path or a pandas DataFrame, got {type(data_path).__name__}")
        
        # Check if required columns exist
        if 'functionSource' not in self.data.columns or 'label' not in self.data.columns:
            raise ValueError("Data must contain 'functionSource' and 'label' columns.")
        
        print(f"Loaded data with {len(self.data)} samples.")
        print(f"Label distribution: {self.data['label'].value_counts().to_dict()}")
    
    def set_data(self, dataframe):
        """Set data directly from a pandas DataFrame"""
        if not isinstance(dataframe, pd.DataFrame):
            raise ValueError("Input must be a pandas DataFrame.")
        
        # Check if required columns exist
        if 'functionSource' not in dataframe.columns or 'label' not in dataframe.columns:
            raise ValueError("Data must contain 'functionSource' and 'label' columns.")
        
        self.data = dataframe
        print(f"Set data with {len(self.data)} samples.")
        print(f"Label distribution: {self.data['label'].value_counts().to_dict()}")
    
    def prepare_data(self, train_data, test_data):
        """Prepare data for model training using pre-split train and test data"""
        if self.data is None and (train_data is None or test_data is None):
            raise ValueError("No data provided. Provide train_data and test_data or call load_data/set_data first.")
        
        # Use provided train and test data
        train_texts = train_data['functionSource'].values
        train_labels = train_data['label'].values
        test_texts = test_data['functionSource'].values
        test_labels = test_data['label'].values
        
        # Create datasets
        train_dataset = CodeDataset(
            train_texts, 
            train_labels, 
            self.preprocessor.tokenizer, 
            self.preprocessor.max_length
        )
        
        test_dataset = CodeDataset(
            test_texts, 
            test_labels, 
            self.preprocessor.tokenizer, 
            self.preprocessor.max_length
        )
        
        # Create data loaders
        train_loader = DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            shuffle=True
        )
        
        test_loader = DataLoader(
            test_dataset,
            batch_size=self.batch_size,
            shuffle=False
        )
        
        print(f"Train samples: {len(train_dataset)}")
        print(f"Test samples: {len(test_dataset)}")
        
        return {
            'train_loader': train_loader,
            'val_loader': test_loader,  # Use test loader for validation
            'test_loader': test_loader,
            'test_texts': test_texts,
            'test_labels': test_labels
        }
    
    def run_all(self, data_source=None, train_data=None, test_data=None, freeze_bert=False, dataset_name="test"):
        """Run all steps: data preparation, training, evaluation, and saving"""
        # Load data if provided as a single source
        if data_source is not None:
            if isinstance(data_source, str):
                self.load_data(data_source)
            elif isinstance(data_source, pd.DataFrame):
                self.set_data(data_source)
        
        # If train_data and test_data are provided, use them; otherwise, ensure data is loaded
        if train_data is not None and test_data is not None:
            if not isinstance(train_data, pd.DataFrame) or not isinstance(test_data, pd.DataFrame):
                raise ValueError("train_data and test_data must be pandas DataFrames.")
            if 'functionSource' not in train_data.columns or 'label' not in train_data.columns:
                raise ValueError("train_data must contain 'functionSource' and 'label' columns.")
            if 'functionSource' not in test_data.columns or 'label' not in test_data.columns:
                raise ValueError("test_data must contain 'functionSource' and 'label' columns.")
            print(f"Using provided train_data with {len(train_data)} samples.")
            print(f"Using provided test_data with {len(test_data)} samples.")
        elif self.data is None:
            raise ValueError("No data loaded. Provide data_source or train_data/test_data.")
        
        # Prepare data using provided train/test split or loaded data
        if train_data is not None and test_data is not None:
            data_loaders = self.prepare_data(train_data, test_data)
        else:
            data_loaders = self.prepare_data(self.data, self.data)  # Fallback (though not used in your case)
        
        # Train model
        self.train_model(data_loaders, freeze_bert=freeze_bert)
        
        # Plot training history
        self.plot_training_history()
        
        # Evaluate model with dataset name for proper file naming
        results = self.evaluate_model(data_loaders['test_loader'], dataset_name=dataset_name)
        
        # Save model
        model_dir = self.save_model()
        
        # Save evaluation results
        with open(os.path.join(model_dir, 'evaluation_results.json'), 'w') as f:
            # Convert numpy values to Python types for JSON serialization
            serializable_results = {
                k: v if not isinstance(v, np.ndarray) else v.tolist()
                for k, v in results.items()
            }
            json.dump(serializable_results, f)
        
        print("\n=== Training and Evaluation Complete ===")
        print(f"All outputs saved to: {model_dir}")
        
        # Free up GPU memory
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        
        return results
    
    def train_model(self, data_loaders, freeze_bert=False):
        """Train the CodeBERT model"""
        # Initialize model
        self.model = CodeBERTClassifier(freeze_bert=freeze_bert)
        self.model.to(device)
        
        # Define optimizer and scheduler
        optimizer = AdamW(self.model.parameters(), lr=self.learning_rate)
        
        # Calculate total training steps for learning rate scheduler
        total_steps = len(data_loaders['train_loader']) * self.epochs
        
        # Create learning rate scheduler
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=0,
            num_training_steps=total_steps
        )
        
        # Define loss function
        criterion = CrossEntropyLoss()
        
        # Training loop
        print("\n=== Training CodeBERT Model ===")
        
        for epoch in range(self.epochs):
            print(f"\nEpoch {epoch+1}/{self.epochs}")
            
            # Training phase
            self.model.train()
            train_loss = 0.0
            
            progress_bar = tqdm(data_loaders['train_loader'], desc="Training")
            for batch in progress_bar:
                # Move batch to device
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                
                # Zero gradients
                optimizer.zero_grad()
                
                # Forward pass
                outputs = self.model(input_ids, attention_mask)
                
                # Calculate loss
                loss = criterion(outputs, labels)
                
                # Backward pass
                loss.backward()
                
                # Update parameters
                optimizer.step()
                scheduler.step()
                
                # Update training loss
                train_loss += loss.item()
                progress_bar.set_postfix({'loss': loss.item()})
            
            # Calculate average training loss
            avg_train_loss = train_loss / len(data_loaders['train_loader'])
            self.history['train_loss'].append(avg_train_loss)
            
            # Validation phase
            self.model.eval()
            val_loss = 0.0
            val_predictions = []
            val_true_labels = []
            
            with torch.no_grad():
                for batch in tqdm(data_loaders['val_loader'], desc="Validation"):
                    # Move batch to device
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['label'].to(device)
                    
                    # Forward pass
                    outputs = self.model(input_ids, attention_mask)
                    
                    # Calculate loss
                    loss = criterion(outputs, labels)
                    
                    # Update validation loss
                    val_loss += loss.item()
                    
                    # Get predictions
                    _, preds = torch.max(outputs, dim=1)
                    
                    # Store predictions and true labels
                    val_predictions.extend(preds.cpu().tolist())
                    val_true_labels.extend(labels.cpu().tolist())
            
            # Calculate average validation loss
            avg_val_loss = val_loss / len(data_loaders['val_loader'])
            self.history['val_loss'].append(avg_val_loss)
            
            # Calculate validation metrics
            val_accuracy = accuracy_score(val_true_labels, val_predictions)
            val_precision, val_recall, val_f1, _ = precision_recall_fscore_support(
                val_true_labels, val_predictions, average='binary'
            )
            
            self.history['val_accuracy'].append(val_accuracy)
            self.history['val_precision'].append(val_precision)
            self.history['val_recall'].append(val_recall)
            self.history['val_f1'].append(val_f1)
            
            print(f"Training Loss: {avg_train_loss:.4f}")
            print(f"Validation Loss: {avg_val_loss:.4f}")
            print(f"Validation Accuracy: {val_accuracy:.4f}")
            print(f"Validation Precision: {val_precision:.4f}")
            print(f"Validation Recall: {val_recall:.4f}")
            print(f"Validation F1: {val_f1:.4f}")
            
            # Save best model based on F1 score instead of accuracy
            if val_f1 > self.best_val_f1:
                self.best_val_f1 = val_f1
                self.best_val_accuracy = val_accuracy  # Keep accuracy for reference
                self.best_model_state = self.model.state_dict().copy()
                print(f"New best model with validation F1: {val_f1:.4f} (accuracy: {val_accuracy:.4f})")
        
        # Load best model for testing
        if self.best_model_state is not None:
            self.model.load_state_dict(self.best_model_state)
            print(f"Loaded best model with validation F1: {self.best_val_f1:.4f} (accuracy: {self.best_val_accuracy:.4f})")
        
        return self.model
    
    def evaluate_model(self, test_loader, dataset_name="test", export_predictions=True):
        """Evaluate the model on test data"""
        if self.model is None:
            raise ValueError("No model trained. Call train_model first.")
        
        print("\n=== Evaluating Model on Test Set ===")
        
        # Explicitly load the best model state for evaluation
        if self.best_model_state is not None:
            print(f"Loading best model with validation F1: {self.best_val_f1:.4f} (accuracy: {self.best_val_accuracy:.4f})")
            self.model.load_state_dict(self.best_model_state)
            self.model.eval()
        else:
            print("Warning: No best model state found, using current model state")
            self.model.eval()
        
        test_predictions = []
        test_true_labels = []
        
        with torch.no_grad():
            for batch in tqdm(test_loader, desc="Testing"):
                # Move batch to device
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                
                # Forward pass
                outputs = self.model(input_ids, attention_mask)
                
                # Get predictions
                _, preds = torch.max(outputs, dim=1)
                
                # Store predictions and true labels
                test_predictions.extend(preds.cpu().tolist())
                test_true_labels.extend(labels.cpu().tolist())
        
        # Calculate test metrics
        test_accuracy = accuracy_score(test_true_labels, test_predictions)
        test_precision, test_recall, test_f1, _ = precision_recall_fscore_support(
            test_true_labels, test_predictions, average='binary'
        )
        
        # Generate classification report
        class_report = classification_report(test_true_labels, test_predictions)
        
        # Generate confusion matrix
        conf_matrix = confusion_matrix(test_true_labels, test_predictions)
        
        print(f"\n=== BEST MODEL EVALUATION RESULTS ===")
        if self.best_model_state is not None:
            print(f"Using best model from training (Validation F1: {self.best_val_f1:.4f}, Accuracy: {self.best_val_accuracy:.4f})")
        print(f"Test Accuracy: {test_accuracy:.4f}")
        print(f"Test Precision: {test_precision:.4f}")
        print(f"Test Recall: {test_recall:.4f}")
        print(f"Test F1: {test_f1:.4f}")
        print("\n=== DETAILED CLASSIFICATION REPORT (BEST MODEL) ===")
        print(class_report)
        print("="*60)
        
        # Plot confusion matrix
        plt.figure(figsize=(8, 6))
        if sns is not None:
            sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                       xticklabels=['Not Vulnerable', 'Vulnerable'],
                       yticklabels=['Not Vulnerable', 'Vulnerable'])
        else:
            # Fallback to matplotlib if seaborn is not available
            plt.imshow(conf_matrix, interpolation='nearest', cmap='Blues')
            plt.colorbar()
            # Add text annotations
            for i in range(conf_matrix.shape[0]):
                for j in range(conf_matrix.shape[1]):
                    plt.text(j, i, str(conf_matrix[i, j]), 
                            ha='center', va='center', color='black')
            plt.xticks([0, 1], ['Not Vulnerable', 'Vulnerable'])
            plt.yticks([0, 1], ['Not Vulnerable', 'Vulnerable'])
        
        plt.title(f'Confusion Matrix - Best Model (Val F1: {self.best_val_f1:.4f})')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, 'confusion_matrix.png'))
        plt.close()
        
        results = {
            'accuracy': test_accuracy,
            'precision': test_precision,
            'recall': test_recall,
            'f1': test_f1,
            'classification_report': class_report,
            'confusion_matrix': conf_matrix.tolist(),
            'predictions': test_predictions,
            'true_labels': test_true_labels,
            'best_val_accuracy': self.best_val_accuracy,
            'best_val_f1': self.best_val_f1
        }
        
        # Export predictions if requested
        if export_predictions:
            export_path = self.export_predictions(
                test_predictions, 
                test_true_labels, 
                dataset_name
            )
            results['export_path'] = export_path
        
        return results
    
    def plot_training_history(self):
        """Plot training history"""
        if not self.history['train_loss']:
            print("No training history to plot.")
            return
        
        # Plot loss
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 2, 1)
        plt.plot(self.history['train_loss'], label='Training')
        plt.plot(self.history['val_loss'], label='Validation')
        plt.title('Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        
        # Plot metrics
        plt.subplot(1, 2, 2)
        plt.plot(self.history['val_accuracy'], label='Accuracy')
        plt.plot(self.history['val_precision'], label='Precision')
        plt.plot(self.history['val_recall'], label='Recall')
        plt.plot(self.history['val_f1'], label='F1')
        plt.title('Validation Metrics')
        plt.xlabel('Epoch')
        plt.ylabel('Score')
        plt.legend()
        
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_dir, 'training_history.png'))
        plt.close()
    
    def save_model(self):
        """Save trained model and tokenizer, and create a zip archive"""
        if self.model is None:
            print("No model to save.")
            return
        
        # Create timestamp for unique folder
        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        model_dir = os.path.join(self.output_dir, f'model')
        os.makedirs(model_dir, exist_ok=True)
        
        # Save model
        if self.best_model_state is not None:
            torch.save(self.best_model_state, os.path.join(model_dir, 'best_model.pt'))
        else:
            torch.save(self.model.state_dict(), os.path.join(model_dir, 'model.pt'))
        
        # Save model configuration
        model_config = {
            'hidden_size': self.model.codebert.config.hidden_size,
            'vocab_size': self.model.codebert.config.vocab_size,
            'num_labels': 2,
            'max_length': self.preprocessor.max_length
        }
        
        with open(os.path.join(model_dir, 'model_config.json'), 'w') as f:
            json.dump(model_config, f)
        
        # Save tokenizer
        self.preprocessor.tokenizer.save_pretrained(model_dir)
        
        # Save training history
        with open(os.path.join(model_dir, 'training_history.json'), 'w') as f:
            json.dump(self.history, f)
        
        print(f"Model saved to {model_dir}")
        
        # Create zip archive of the model directory
        zip_path = f"{model_dir}_{timestamp}.zip"
        try:
            print(f"Creating zip archive: {zip_path}")
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                # Walk through the model directory and add all files
                for root, dirs, files in os.walk(model_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Create archive path relative to the model directory
                        arcname = os.path.relpath(file_path, os.path.dirname(model_dir))
                        zipf.write(file_path, arcname)
            
            # Get zip file size for user feedback
            zip_size = os.path.getsize(zip_path)
            zip_size_mb = zip_size / (1024 * 1024)
            print(f"✅ Model archive created successfully: {zip_path}")
            print(f"📦 Archive size: {zip_size_mb:.2f} MB")
            
        except Exception as e:
            print(f"⚠️ Warning: Could not create zip archive: {str(e)}")
            print(f"Model files are still available in: {model_dir}")
        
        return model_dir
    
    def load_model(self, model_dir):
        """
        Load a previously saved CodeBERT model from the specified directory
        
        Args:
            model_dir: Path to the directory containing the saved model
            
        Returns:
            The loaded CodeBERTClassifier model
        """
        if not os.path.exists(model_dir):
            raise ValueError(f"Model directory {model_dir} does not exist")
            
        print(f"Loading model from {model_dir}")
        
        # Check for model config file
        config_path = os.path.join(model_dir, 'model_config.json')
        if not os.path.exists(config_path):
            raise ValueError(f"Model config file not found in {model_dir}")
            
        # Load model configuration
        with open(config_path, 'r') as f:
            model_config = json.load(f)
            
        # Initialize model
        self.model = CodeBERTClassifier()
        self.model.to(device)
        
        # Check for model state file (either best_model.pt or model.pt)
        best_model_path = os.path.join(model_dir, 'best_model.pt')
        model_path = os.path.join(model_dir, 'model.pt')
        
        if os.path.exists(best_model_path):
            state_dict = torch.load(best_model_path, map_location=device)
            print("Loading best model checkpoint")
        elif os.path.exists(model_path):
            state_dict = torch.load(model_path, map_location=device)
            print("Loading regular model checkpoint")
        else:
            raise ValueError(f"No model checkpoint found in {model_dir}")
            
        # Load model state
        self.model.load_state_dict(state_dict)
        self.best_model_state = state_dict
        
        # Load tokenizer if available
        tokenizer_path = os.path.join(model_dir, 'special_tokens_map.json')
        if os.path.exists(tokenizer_path):
            self.preprocessor.tokenizer = RobertaTokenizerFast.from_pretrained(model_dir)
            print("Loaded tokenizer from saved model")
            
        # Load training history if available
        history_path = os.path.join(model_dir, 'training_history.json')
        if os.path.exists(history_path):
            with open(history_path, 'r') as f:
                self.history = json.load(f)
            
            # Set best accuracy and F1 from history if available
            if self.history.get('val_accuracy'):
                self.best_val_accuracy = max(self.history['val_accuracy'])
            if self.history.get('val_f1'):
                self.best_val_f1 = max(self.history['val_f1'])
                print(f"Loaded training history. Best validation F1: {self.best_val_f1:.4f}, Best validation accuracy: {self.best_val_accuracy:.4f}")
            elif self.history.get('val_accuracy'):
                print(f"Loaded training history. Best validation accuracy: {self.best_val_accuracy:.4f}")
        
        # Set model to evaluation mode
        self.model.eval()
        print("Model loaded successfully and set to evaluation mode")
        
        return self.model
    
    def export_predictions(self, predictions, true_labels=None, dataset_name="test"):
        """
        Export model predictions to a txt file with index and prediction format
        
        Args:
            predictions: List or array of predictions (0 or 1)
            true_labels: Optional list of true labels for comparison
            dataset_name: Name to include in the filename (e.g., "test", "cwe119")
            
        Returns:
            Path to the exported file
        """
        # Create timestamp for unique filename
        timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        
        # Create filename
        if "cwe" in dataset_name.lower():
            filename = f"predict_codebert_{dataset_name}_{timestamp}.txt"
        else:
            filename = f"predict_codebert_cwe_{timestamp}.txt"
        
        # Full path for the output file
        output_path = os.path.join(self.output_dir, filename)
        
        # Write predictions to file
        with open(output_path, 'w') as f:
            for idx, pred in enumerate(predictions):
                f.write(f"{idx}\t{pred}\n")
        
        print(f"Predictions exported to: {output_path}")
        print(f"Total predictions exported: {len(predictions)}")
        
        # If true labels are provided, also create a comparison file
        if true_labels is not None:
            comparison_filename = f"prediction_comparison_{dataset_name}_{timestamp}.txt"
            comparison_path = os.path.join(self.output_dir, comparison_filename)
            
            with open(comparison_path, 'w') as f:
                f.write("Index\tPrediction\tTrue_Label\tCorrect\n")
                correct_count = 0
                for idx, (pred, true) in enumerate(zip(predictions, true_labels)):
                    is_correct = pred == true
                    if is_correct:
                        correct_count += 1
                    f.write(f"{idx}\t{pred}\t{true}\t{is_correct}\n")
            
            accuracy = correct_count / len(predictions) if len(predictions) > 0 else 0
            print(f"Prediction comparison exported to: {comparison_path}")
            print(f"Accuracy: {accuracy:.4f} ({correct_count}/{len(predictions)})")
        
        return output_path
    
    def evaluate_saved_model(self, model_dir, test_data, dataset_name="test", export_predictions=True):
        """
        Load a saved model and evaluate it on test data
        
        Args:
            model_dir: Path to the directory containing the saved model
            test_data: DataFrame with 'functionSource' and 'label' columns
            dataset_name: Name to include in export filename
            export_predictions: Whether to export predictions to txt file
            
        Returns:
            Dictionary with evaluation results
        """
        # Load the saved model
        print(f"\n=== Loading Saved Model for Evaluation ===")
        self.load_model(model_dir)
        
        # Prepare test data
        test_texts = test_data['functionSource'].tolist()
        test_labels = test_data['label'].tolist()
        
        test_dataset = CodeDataset(test_texts, test_labels, self.preprocessor.tokenizer)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
        
        # Evaluate the loaded model
        results = self.evaluate_model(test_loader, dataset_name=dataset_name, export_predictions=export_predictions)
        
        return results
    
    def predict_dataset(self, test_data, dataset_name="test", export_predictions=True):
        """
        Make predictions on an entire dataset and optionally export them
        
        Args:
            test_data: DataFrame with 'functionSource' and optionally 'label' columns
            dataset_name: Name to include in export filename
            export_predictions: Whether to export predictions to txt file
            
        Returns:
            Dictionary with predictions, true labels (if available), and metrics
        """
        if self.model is None:
            raise ValueError("No model loaded. Call train_model or load_model first.")
        
        print(f"\n=== Making Predictions on {dataset_name} Dataset ===")
        print(f"Dataset size: {len(test_data)} samples")
        
        # Check if labels are available
        has_labels = 'label' in test_data.columns
        
        # Prepare data loader
        test_texts = test_data['functionSource'].tolist()
        test_labels = test_data['label'].tolist() if has_labels else [0] * len(test_texts)
        
        test_dataset = CodeDataset(test_texts, test_labels, self.preprocessor.tokenizer)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
        
        # Make predictions
        self.model.eval()
        predictions = []
        true_labels = []
        
        with torch.no_grad():
            for batch in tqdm(test_loader, desc="Making predictions"):
                # Move batch to device
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)
                
                # Forward pass
                outputs = self.model(input_ids, attention_mask)
                
                # Get predictions
                _, preds = torch.max(outputs, dim=1)
                
                # Store predictions and true labels
                predictions.extend(preds.cpu().tolist())
                if has_labels:
                    true_labels.extend(labels.cpu().tolist())
        
        # Calculate metrics if labels are available
        results = {'predictions': predictions}
        
        if has_labels:
            results['true_labels'] = true_labels
            test_accuracy = accuracy_score(true_labels, predictions)
            test_precision, test_recall, test_f1, _ = precision_recall_fscore_support(
                true_labels, predictions, average='binary'
            )
            
            results.update({
                'accuracy': test_accuracy,
                'precision': test_precision,
                'recall': test_recall,
                'f1': test_f1
            })
            
            print(f"Accuracy: {test_accuracy:.4f}")
            print(f"Precision: {test_precision:.4f}")
            print(f"Recall: {test_recall:.4f}")
            print(f"F1: {test_f1:.4f}")
        
        # Export predictions if requested
        if export_predictions:
            export_path = self.export_predictions(
                predictions, 
                true_labels if has_labels else None, 
                dataset_name
            )
            results['export_path'] = export_path
        
        return results
    
    def predict(self, code_text):
        """
        Make a prediction on a single code sample
        
        Args:
            code_text: String containing the code to analyze
            
        Returns:
            Dictionary with prediction results
        """
        if self.model is None:
            raise ValueError("No model loaded. Call train_model or load_model first.")
        
        # Preprocess and tokenize the code
        encoding = self.preprocessor.tokenize(
            code_text, 
            truncation=True,
            padding='max_length',
            return_tensors='pt'
        )
        
        # Move tensors to device
        input_ids = encoding['input_ids'].to(device)
        attention_mask = encoding['attention_mask'].to(device)
        
        # Set model to evaluation mode
        self.model.eval()
        
        # Make prediction
        with torch.no_grad():
            outputs = self.model(input_ids, attention_mask)
            probabilities = torch.softmax(outputs, dim=1)
            confidence, prediction = torch.max(probabilities, dim=1)
        
        result = {
            'prediction': prediction.item(),  # 0: not vulnerable, 1: vulnerable
            'confidence': confidence.item(),
            'probabilities': probabilities[0].cpu().numpy().tolist(),
            'label_names': ['Not Vulnerable', 'Vulnerable']
        }
        
        return result

def free_gpu_memory():
    """Free up GPU memory"""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

2025-06-03 20:04:12.083222: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1748981052.251128      19 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1748981052.300471      19 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Using device: cuda


In [3]:
train_data = pd.read_csv('/kaggle/input/eatvul/cwe399_train.csv')  # Must have 'functionSource' and 'label' columns
test_data = pd.read_csv('/kaggle/input/eatvul/cwe399_test.csv')    # Must have 'functionSource' and 'label' columns
trainer = CodeBERTTrainer(batch_size=16, epochs=10, learning_rate=2e-5)
# Train and evaluate the model
results = trainer.run_all(
    train_data=train_data,
    test_data=test_data,
    freeze_bert=False,      # Set to True to freeze CodeBERT layers
    dataset_name="codebert-cwe399"  # Used for naming output files
)

print(f"Final Test Accuracy: {results['accuracy']:.4f}")
print(f"Final Test F1 Score: {results['f1']:.4f}")

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]

Using provided train_data with 2928 samples.
Using provided test_data with 732 samples.
Train samples: 2928
Test samples: 732


pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]


=== Training CodeBERT Model ===

Epoch 1/10


Training:   0%|          | 0/183 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Training: 100%|██████████| 183/183 [03:09<00:00,  1.03s/it, loss=0.435]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.69it/s]


Training Loss: 0.4208
Validation Loss: 0.3235
Validation Accuracy: 0.8347
Validation Precision: 0.7928
Validation Recall: 0.8619
Validation F1: 0.8259
New best model with validation F1: 0.8259 (accuracy: 0.8347)

Epoch 2/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.182]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.71it/s]


Training Loss: 0.2539
Validation Loss: 0.2540
Validation Accuracy: 0.8675
Validation Precision: 0.8371
Validation Recall: 0.8799
Validation F1: 0.8580
New best model with validation F1: 0.8580 (accuracy: 0.8675)

Epoch 3/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.116]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.70it/s]


Training Loss: 0.1998
Validation Loss: 0.2641
Validation Accuracy: 0.8579
Validation Precision: 0.7959
Validation Recall: 0.9249
Validation F1: 0.8556

Epoch 4/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.132]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.71it/s]


Training Loss: 0.1664
Validation Loss: 0.2600
Validation Accuracy: 0.9016
Validation Precision: 0.9065
Validation Recall: 0.8739
Validation F1: 0.8899
New best model with validation F1: 0.8899 (accuracy: 0.9016)

Epoch 5/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.0618]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.71it/s]


Training Loss: 0.1159
Validation Loss: 0.2804
Validation Accuracy: 0.8866
Validation Precision: 0.8655
Validation Recall: 0.8889
Validation F1: 0.8770

Epoch 6/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.0927]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.70it/s]


Training Loss: 0.0668
Validation Loss: 0.3891
Validation Accuracy: 0.8811
Validation Precision: 0.9212
Validation Recall: 0.8078
Validation F1: 0.8608

Epoch 7/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.197]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.71it/s]


Training Loss: 0.0514
Validation Loss: 0.4730
Validation Accuracy: 0.8880
Validation Precision: 0.9597
Validation Recall: 0.7868
Validation F1: 0.8647

Epoch 8/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.00951]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.71it/s]


Training Loss: 0.0335
Validation Loss: 0.4256
Validation Accuracy: 0.8989
Validation Precision: 0.9085
Validation Recall: 0.8649
Validation F1: 0.8862

Epoch 9/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.00902]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.70it/s]


Training Loss: 0.0187
Validation Loss: 0.4852
Validation Accuracy: 0.8893
Validation Precision: 0.9468
Validation Recall: 0.8018
Validation F1: 0.8683

Epoch 10/10


Training: 100%|██████████| 183/183 [03:09<00:00,  1.04s/it, loss=0.000941]
Validation: 100%|██████████| 46/46 [00:12<00:00,  3.70it/s]


Training Loss: 0.0166
Validation Loss: 0.4654
Validation Accuracy: 0.8934
Validation Precision: 0.9293
Validation Recall: 0.8288
Validation F1: 0.8762
Loaded best model with validation F1: 0.8899 (accuracy: 0.9016)

=== Evaluating Model on Test Set ===
Loading best model with validation F1: 0.8899 (accuracy: 0.9016)


Testing: 100%|██████████| 46/46 [00:12<00:00,  3.70it/s]



=== BEST MODEL EVALUATION RESULTS ===
Using best model from training (Validation F1: 0.8899, Accuracy: 0.9016)
Test Accuracy: 0.8934
Test Precision: 0.9293
Test Recall: 0.8288
Test F1: 0.8762

=== DETAILED CLASSIFICATION REPORT (BEST MODEL) ===
              precision    recall  f1-score   support

           0       0.87      0.95      0.91       399
           1       0.93      0.83      0.88       333

    accuracy                           0.89       732
   macro avg       0.90      0.89      0.89       732
weighted avg       0.90      0.89      0.89       732

Predictions exported to: /kaggle/working/codebert_outputs/predict_codebert_codebert-cwe399_2025-06-03_20-38-22.txt
Total predictions exported: 732
Prediction comparison exported to: /kaggle/working/codebert_outputs/prediction_comparison_codebert-cwe399_2025-06-03_20-38-22.txt
Accuracy: 0.8934 (654/732)
Model saved to /kaggle/working/codebert_outputs/model
Creating zip archive: /kaggle/working/codebert_outputs/model_2025-06-