In [1]:
# 1. Data Loading and Preprocessing
import pandas as pd
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false" 
import warnings
from sklearn.exceptions import UndefinedMetricWarning
# Suppress UndefinedMetricWarning
warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
warnings.filterwarnings("ignore", category=UserWarning)

class VulnerabilityDataLoader:
    def __init__(self, config):
        self.config = config
        
    def load_data(self, language):
        """Load data for specific language"""
        file_path = self.config['paths'][f'sample_{language}.csv']
        
        # Try different encodings until one works
        encodings = ['utf-8', 'latin1', 'iso-8859-1', 'cp1252']

        for encoding in encodings:
            try:
                df = pd.read_csv(file_path, encoding=encoding)
                break
            except UnicodeDecodeError:
                continue

        if language == 'Python':
            df = pd.read_csv(file_path)
            df = df.rename(columns={'code': 'file_path'})
            # Python specific processing
        else:
            df = pd.read_csv(file_path)
            
        # Standardize column names
        if 'line_no' not in df.columns:
            df['line_no'] = -1
            
        print("Loaded data for language:", language)
        return df
    
    def get_source_code(self, file_path, language):
        """Retrieve source code from file"""
        if language == 'Python':
            return file_path
        base_path = self.config['source_code_paths'][f'{language}SourceCodes']
        full_path = os.path.join(base_path, file_path)
        
        try:
            with open(full_path, 'r') as f:
                return f.read()
        except FileNotFoundError:
            print(f"Warning: File not found {full_path}")
            return None

In [2]:
# 2. Structural Feature Extractor
import subprocess
import json
import tempfile
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModel

class StructuralFeatureExtractor:
    def __init__(self):
        # Semgrep configuration
        self.semgrep_config = {
            'c': 'p/c',
            'cpp': 'p/c',
            'java': 'p/java',
            'python': 'p/python',
            'csharp': 'p/csharp'
        }
        self.language_mapping = {
            'CandCpp': 'c',
            'Java': 'java',
            'Python': 'python',
            'CSharp': 'csharp'
        }
        
        # Initialize CodeBERT with eager attention
        self.tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
        self.model = AutoModel.from_pretrained("microsoft/codebert-base",
                                            output_attentions=True,
                                            attn_implementation="eager")
        
        # Default feature template
        self.default_features = {
            'semgrep_info': 0,
            'semgrep_warning': 0,
            'semgrep_error': 0,
            'semgrep_total_findings': 0,
            'semgrep_rules_triggered': 0,
            'line_no': -1
        }

    def _get_important_lines(self, code):
        """Improved line detection using attention and AST parsing"""
        try:
            if not code.strip():
                return []

            inputs = self.tokenizer(code, return_tensors="pt", truncation=True, max_length=512)
            
            # Create accurate line number mapping
            line_numbers = []
            current_line = 1
            char_pos = 0
            
            for token_id in inputs.input_ids[0]:
                token = self.tokenizer.decode(token_id)
                if token in [self.tokenizer.cls_token, self.tokenizer.sep_token]:
                    line_numbers.append(-1)
                    continue
                    
                # Find the token in the original code
                token_pos = code.find(token, char_pos)
                if token_pos == -1:
                    line_numbers.append(-1)
                    continue
                    
                # Count newlines up to this position
                current_line = code.count('\n', 0, token_pos) + 1
                line_numbers.append(current_line)
                char_pos = token_pos + len(token)

            # Get attention weights
            with torch.no_grad():
                outputs = self.model(**inputs)
            
            # Use attention from last layer
            cls_attention = outputs.attentions[-1][:, :, 0, :].mean(dim=1)[0]
            topk = min(5, len(cls_attention))
            important_indices = torch.topk(cls_attention, k=topk).indices.tolist()

            # Get unique line numbers and filter invalid ones
            important_lines = sorted({
                line_numbers[i] 
                for i in important_indices 
                if i < len(line_numbers) and line_numbers[i] != -1
            })
            
            return important_lines[:3]  # Return top 3 most important lines

        except Exception as e:
            print(f"[WARNING] Line detection failed: {str(e)}")
            return []

    def _run_semgrep_scan(self, code, language):
        """Execute Semgrep scan safely"""
        semgrep_lang = self.language_mapping.get(language)
        if semgrep_lang is None:
            return None
            
        try:
            with tempfile.NamedTemporaryFile(suffix=f'.{semgrep_lang}', mode='w') as tmp:
                tmp.write(code)
                tmp.flush()
                cmd = f"semgrep --config={self.semgrep_config[semgrep_lang]} --json {tmp.name}"
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
                return json.loads(result.stdout) if result.returncode == 0 else None
        except Exception as e:
            print(f"[ERROR] Semgrep failed: {str(e)}")
            return None

    def extract_features(self, code, language):
        """Returns flat dictionary of features"""
        features = self.default_features.copy()
        if not code or not isinstance(code, str):
            return features

        # 1. Run Semgrep scan
        semgrep_results = self._run_semgrep_scan(code, language)
        if semgrep_results:
            features.update(self._flatten_semgrep_results(semgrep_results))
        
        # 2. Get important lines (improved version)
        important_lines = self._get_important_lines(code)
        if important_lines:
            features['line_no'] = important_lines[0]  # Most important line
        
        # 3. Add semantic and lexical features
        semantic = self._get_semantic_features(code)
        lexical = self._get_lexical_features(code)
        
        features.update({
            f'semantic_{i}': val for i, val in enumerate(semantic)
        })
        features.update({
            f'lexical_{i}': val for i, val in enumerate(lexical)
        })
        
        return features
    
    def _get_semantic_features(self, code):
        """Get CodeBERT embeddings"""
        try:
            inputs = self.tokenizer(code, return_tensors="pt", truncation=True, max_length=512)
            with torch.no_grad():
                outputs = self.model(**inputs)
            return outputs.last_hidden_state.mean(dim=1).numpy().flatten()
        except:
            return np.zeros(768)

    def _get_lexical_features(self, code):
        """Get FastText embeddings"""
        return np.zeros(300)  # Placeholder - implement actual FastText extraction
    
    def _flatten_semgrep_results(self, results):
        """Convert Semgrep output to features"""
        if not results or 'results' not in results:
            return {}
            
        features = {}
        severities = {'INFO': 0, 'WARNING': 0, 'ERROR': 0}
        
        for finding in results.get('results', []):
            severity = finding.get('extra', {}).get('severity', 'INFO')
            severities[severity] += 1
            
        features.update({
            f'semgrep_{severity.lower()}': count 
            for severity, count in severities.items()
        })
        
        features['semgrep_total_findings'] = len(results.get('results', []))
        features['semgrep_rules_triggered'] = len(set(
            f.get('check_id', '') for f in results.get('results', [])
        ))
        
        return features

In [3]:
# 3. Semantic Context (CodeBERT)
from transformers import AutoTokenizer, AutoModel
import torch

class SemanticFeatureExtractor:
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
        self.model = AutoModel.from_pretrained("microsoft/codebert-base")
        
    def extract_features(self, code):
        """Extract semantic features using CodeBERT"""
        inputs = self.tokenizer(code, return_tensors="pt", truncation=True, max_length=512)
        with torch.no_grad():
            outputs = self.model(**inputs)
        return outputs.last_hidden_state.mean(dim=1).numpy()

In [4]:
# 4. Lexical Context (FastText)
import fasttext
import fasttext.util
import numpy as np

class LexicalFeatureExtractor:
    def __init__(self):
        fasttext.util.download_model('en', if_exists='ignore')
        self.model = fasttext.load_model('cc.en.300.bin')
        
    def extract_features(self, code):
        """Extract lexical features using FastText"""
        tokens = code.split()  # Simple tokenization
        vectors = [self.model.get_word_vector(token) for token in tokens if token]
        if vectors:
            return np.mean(vectors, axis=0)
        else:
            return np.zeros(300)

In [5]:
# 5. Model Architecture - Classification Models
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV

class ClassificationModels:
    def __init__(self):
        # Define parameter grids for hyperparameter tuning
        self.param_grids = {
            'random_forest': {
                'n_estimators': [100, 200],
                'max_depth': [None, 10, 20],
                'min_samples_split': [2, 5]
            },
            'xgboost': {
                'n_estimators': [100, 200],
                'max_depth': [3, 6, 9],
                'learning_rate': [0.01, 0.1]
            }
        }
        
        # Base models
        self.base_models = {
            'random_forest': RandomForestClassifier(),
            'knn': KNeighborsClassifier(),
            'svm': SVC(probability=True),
            'naive_bayes': GaussianNB(),
            'xgboost': XGBClassifier()
        }
        
    def train(self, X_train, y_train):
        """Train all classification models with hyperparameter tuning"""
        print("Training Classification models with hyperparameter tuning...")
        trained_models = {}
        
        # Train models with hyperparameter tuning
        for name, model in self.base_models.items():
            if name in self.param_grids:
                # Perform grid search for models with defined parameter grids
                grid_search = GridSearchCV(
                    estimator=model,
                    param_grid=self.param_grids[name],
                    cv=3,
                    n_jobs=-1,
                    verbose=1
                )
                grid_search.fit(X_train, y_train)
                trained_models[name] = grid_search.best_estimator_
                print(f"Best params for {name}: {grid_search.best_params_}")
            else:
                # Train without hyperparameter tuning
                model.fit(X_train, y_train)
                trained_models[name] = model
                
        print("Training classification models completed...")
        return trained_models

In [6]:
# Updated GraphCNNModels class with working GCN and GAT
from tensorflow.keras.layers import Input, Dense, Dropout, MultiHeadAttention, LayerNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Reshape, GlobalAveragePooling1D
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf
from spektral.layers import GCNConv, GATConv, GlobalSumPool
from scipy.sparse import eye as sparse_eye
import numpy as np
from sklearn.preprocessing import StandardScaler
from spektral.utils import normalized_adjacency
from spektral.layers import GCNConv, GATConv
from spektral.utils import normalized_adjacency


import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.models import Model
from spektral.layers import GCNConv, GATConv, GlobalSumPool
from spektral.utils import normalized_adjacency
from scipy.sparse import eye
import numpy as np

class GraphCNNModels:
    def __init__(self, input_shape):
        self.input_shape = input_shape if isinstance(input_shape, tuple) else (input_shape,)
        
    def build_attention_model(self):
        """Improved transformer-based model with better architecture"""
        inputs = Input(shape=self.input_shape)
        
        # Reshape input to 3D for attention (batch, seq_len, features)
        x = Reshape((1, self.input_shape[0]))(inputs)
        
        # Multi-head attention with skip connections
        attn_output = MultiHeadAttention(num_heads=8, key_dim=64)(x, x)
        x = LayerNormalization(epsilon=1e-6)(x + attn_output)
        
        # Feed forward network
        x = Dense(512, activation='relu')(x)
        x = Dropout(0.4)(x)
        x = Dense(256, activation='relu')(x)
        x = Dropout(0.3)(x)
        
        # Global average pooling
        x = GlobalAveragePooling1D()(x)
        
        # Final classification layers
        x = Dense(128, activation='relu')(x)
        x = Dropout(0.2)(x)
        outputs = Dense(4, activation='softmax')(x)
        
        return Model(inputs, outputs)
    
    def build_graphsage_model(self):
        """Simplified GraphSAGE model"""
        inputs = Input(shape=self.input_shape)
        
        # Feature transformation
        x = Dense(512, activation='relu')(inputs)
        x = Dropout(0.4)(x)
        
        # Node embedding
        x = Dense(256, activation='relu')(x)
        x = Dropout(0.3)(x)
        
        # Classification
        x = Dense(128, activation='relu')(x)
        outputs = Dense(4, activation='softmax')(x)
        
        return Model(inputs=inputs, outputs=outputs)
    
    def build_simple_gnn(self):
        """Very simple graph neural network"""
        inputs = Input(shape=self.input_shape)
        
        x = Dense(512, activation='relu')(inputs)
        x = Dropout(0.4)(x)
        
        x = Dense(256, activation='relu')(x)
        x = Dropout(0.3)(x)
        
        x = Dense(128, activation='relu')(x)
        outputs = Dense(4, activation='softmax')(x)
        
        return Model(inputs=inputs, outputs=outputs)
    
    def train(self, X_train, y_train, epochs=20, batch_size=32):
        trained_models = {}
        
        # Convert labels to one-hot
        y_train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=4)
        
        # Normalize input data
        X_train = X_train.astype('float32')
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        
        # Build models
        models = {
            'attention_model': self.build_attention_model(),
            'graphsage_model': self.build_graphsage_model(),
            'simple_gnn': self.build_simple_gnn()
        }
        
        early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
        
        for name, model in models.items():
            print(f"\nTraining {name}...")
            model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                         loss='categorical_crossentropy',
                         metrics=['accuracy'])
            
            model.fit(X_train_scaled, y_train_onehot,
                     validation_split=0.2,
                     epochs=epochs, 
                     batch_size=batch_size,
                     callbacks=[early_stopping],
                     verbose=1)
            
            trained_models[name] = model
        
        return trained_models

In [7]:
# 7. Ensemble Model
from sklearn.ensemble import VotingClassifier
import numpy as np

# Updated EnsembleModel class
class EnsembleModel:
    def __init__(self, classification_models, cnn_models):
        self.classification_models = classification_models
        self.cnn_models = cnn_models
        self.n_features = None
        self.adj_matrix = None
        
    def predict(self, X):
        """Make predictions using weighted ensemble of all models"""
        if self.n_features is None:
            self.n_features = X.shape[1]
            # Create adjacency matrix once
            from scipy.sparse import eye as sparse_eye
            adj_sparse = sparse_eye(self.n_features, format='coo')
            self.adj_matrix = adj_sparse.toarray()
        
        all_predictions = []
        model_weights = {
            'random_forest': 0.2,
            'xgboost': 0.2,
            'attention_model': 0.2,
            'gcn_model': 0.2,
            'gat_model': 0.2
        }
        
        # Create adjacency matrices for graph models
        adj_matrices = np.array([self.adj_matrix for _ in range(len(X))])
        
        # Get predictions from classification models
        for name, model in self.classification_models.items():
            if name in model_weights:
                if hasattr(model, 'predict_proba'):
                    preds = model.predict_proba(X) * model_weights[name]
                else:
                    preds = model.decision_function(X)
                    # Normalize to probabilities
                    preds = (preds - preds.min()) / (preds.max() - preds.min()) * model_weights[name]
                all_predictions.append(preds)
        
        # Get predictions from CNN models
        for name, model in self.cnn_models.items():
            if name in model_weights:
                if name in ['gcn_model', 'gat_model']:
                    # For graph models, need to provide adjacency matrix
                    cnn_preds = model.predict([X, adj_matrices]) * model_weights[name]
                else:
                    cnn_preds = model.predict(X) * model_weights[name]
                all_predictions.append(cnn_preds)
        
        # Weighted average all predictions
        if all_predictions:
            avg_preds = np.sum(all_predictions, axis=0)
            final_preds = np.argmax(avg_preds, axis=1)
        else:
            final_preds = np.zeros(X.shape[0])
            
        return final_preds

In [8]:
# 8. Evaluation Framework
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from collections import Counter

class Evaluator:
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, stratify=y, random_state=42)
        print("Training set counts: ", Counter(self.y_train))
        print("Testing set counts: ", Counter(self.y_test))
        self.ensemble = None 
        
    def evaluate_models(self):
        """Evaluate all individual models and the ensemble"""
        # Train classification models
        clf = ClassificationModels()
        trained_clf_models = clf.train(self.X_train, self.y_train)
        print("Training classification models completed...")
        
        # Print input shape for debugging
        print(f"Input shape for CNN models: {self.X_train.shape}")

        # Train CNN models
        cnn = GraphCNNModels(input_shape=(self.X_train.shape[1],))
        trained_cnn_models = cnn.train(self.X_train, self.y_train)
        print("Training Graph CNN models completed...")
        
        # Create ensemble
        print("Creating Ensemble model...")
        self.ensemble = EnsembleModel(trained_clf_models, trained_cnn_models)
        print("Ensembling Model Done...")
        
        # Evaluate individual models
        results = {}
        for name, model in trained_clf_models.items():
            y_pred = model.predict(self.X_test)
            results[name] = self._calculate_metrics(self.y_test, y_pred)
            
        for name, model in trained_cnn_models.items():
            y_pred = np.argmax(model.predict(self.X_test), axis=1)
            results[name] = self._calculate_metrics(self.y_test, y_pred)
            
        # Evaluate ensemble
        y_pred_ensemble = self.ensemble.predict(self.X_test)
        results['ensemble'] = self._calculate_metrics(self.y_test, y_pred_ensemble)
        print("Got Results...")
        return results
    
    def _calculate_metrics(self, y_true, y_pred):
        """Calculate evaluation metrics"""
        return {
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, average='weighted'),
            'recall': recall_score(y_true, y_pred, average='weighted'),
            'f1': f1_score(y_true, y_pred, average='weighted')
        }

In [9]:
# 9. Feature Vectorizer
import numpy as np
from sklearn.feature_extraction import DictVectorizer

class FeatureVectorizer:
    def __init__(self):
        self.vectorizer = DictVectorizer(sparse=False)
        self.semantic_shape = None
        self.lexical_shape = None
        
    def fit_transform(self, features_list):
        """Convert list of feature dicts to numpy array"""
        # Ensure all features have the same structure
        validated_features = []
        print("Converting the features to numpy array...")
        
        # Define expected feature structure
        feature_template = {
            'semgrep_info': 0,
            'semgrep_warning': 0,
            'semgrep_error': 0,
            'semgrep_total_findings': 0,
            'semgrep_rules_triggered': 0,
            'line_no': -1
        }
        
        # Add semantic and lexical feature placeholders
        for i in range(768):
            feature_template[f'semantic_{i}'] = 0
        for i in range(300):
            feature_template[f'lexical_{i}'] = 0
            
        for feat in features_list:
            if not isinstance(feat, dict):
                validated_features.append(feature_template.copy())
                continue
                
            # Create safe feature dict
            safe_feat = feature_template.copy()
            safe_feat.update({
                k: v for k, v in feat.items() 
                if k in feature_template
            })
            
            validated_features.append(safe_feat)
        
        print("Conversion to numpy array is done...")
        return self.vectorizer.fit_transform(validated_features)

In [10]:
# 10. Model Saving
import joblib
import pickle
from keras.models import save_model
import os

def save_ensemble_model(ensemble, vectorizer, file_prefix='vulnerability_model'):
    """Save all components of the ensemble system"""
    # Create directory if it doesn't exist
    os.makedirs('saved_models_model3', exist_ok=True)
    file_prefix = f'saved_models_model3/{file_prefix}'
    
    # 1. Save the feature vectorizer
    joblib.dump(vectorizer.vectorizer, f'{file_prefix}_vectorizer.pkl', protocol=4)
    
    # 2. Save classification models
    if hasattr(ensemble, 'classification_models'):
        with open(f'{file_prefix}_clf_models.pkl', 'wb') as f:
            pickle.dump(ensemble.classification_models, f)
    
    # 3. Save Keras models
    if hasattr(ensemble, 'cnn_models'):
        for i, (name, model) in enumerate(ensemble.cnn_models.items()):
            model.save(f'{file_prefix}_cnn_{i}.h5')
    
    # 4. Save full ensemble
    with open(f'{file_prefix}_full_ensemble.pkl', 'wb') as f:
        pickle.dump(ensemble, f)
    
    print(f"Models saved to directory: saved_models/")

In [11]:
# 11. Vulnerability Predictor
class VulnerabilityPredictor:
    def __init__(self, config):
        self.config = config
        self.data_loader = VulnerabilityDataLoader(config)
        self.structural_extractor = StructuralFeatureExtractor()
        self.semantic_extractor = SemanticFeatureExtractor()
        self.lexical_extractor = LexicalFeatureExtractor()
        self.vectorizer = FeatureVectorizer()
        
    def process_language(self, language):
        """Process data with line number info"""
        df = self.data_loader.load_data(language)
        features = []
        labels = []
        
        # Define severity mapping that works for all languages
        severity_map = {
            '0': 0,    # For Python's "none" cases
            '1': 1,    # Low severity
            '2': 2,    # Medium severity
            '3': 3,    # High severity
            'low': 1,
            'medium': 2,
            'high': 3,
            'none': 0,
            'info': 0,
            '': 0      # Default for missing/empty values
        }
        print(f"Processing {language} data...")
        
        for _, row in df.iterrows():
            code = self.data_loader.get_source_code(row['file_path'], language)
            if code is None:
                continue
                
            # Extract features with attention
            structural = self.structural_extractor.extract_features(code, language)
            
            # Get important lines from attention
            line_no = structural.get('line_no', -1)
            
            features.append({
                'structural': structural,
                'semantic': self.semantic_extractor.extract_features(code),
                'lexical': self.lexical_extractor.extract_features(code),
                'line_no': line_no
            })
            
            # Standardize severity labels across languages
            severity = str(row['severity']).lower()
            labels.append(severity_map.get(severity, 0))  # Default to 0 if unknown
            
        return features, labels
    
    def run(self):
        """Run the complete vulnerability prediction pipeline"""
        # Process all languages
        languages = ['Python', 'CandCpp', 'CSharp', 'Java']
        
        all_features = []
        all_labels = []
        
        for lang in languages:
            features, labels = self.process_language(lang)
            all_features.extend(features)
            all_labels.extend(labels)
            
        # Convert features to numpy array
        X = self.vectorizer.fit_transform(all_features)
        y = np.array(all_labels)
        
        # Evaluate models
        evaluator = Evaluator(X, y)
        results = evaluator.evaluate_models()

        if hasattr(evaluator, 'ensemble') and evaluator.ensemble is not None:
            save_ensemble_model(evaluator.ensemble, self.vectorizer)
        else:
            print("Warning: No ensemble model to save")
            
        # Print results
        print("\nModel Evaluation Results:")
        for model_name, metrics in results.items():
            print(f"\n{model_name}:")
            for metric_name, value in metrics.items():
                print(f"{metric_name}: {value:.4f}")
                
        return evaluator, results

In [12]:
# 12. Configuration
config = {
    'paths': { 
        'sample_CandCpp.csv': '/Users/shreyanandini/Desktop/MAJOR/Datasets/sample_CandCpp.csv',
        'sample_CSharp.csv': '/Users/shreyanandini/Desktop/MAJOR/Datasets/sample_CSharp.csv',
        'sample_Java.csv': '/Users/shreyanandini/Desktop/MAJOR/Datasets/sample_Java.csv',
        'sample_Python.csv': '/Users/shreyanandini/Desktop/MAJOR/Datasets/sample_Python.csv'
    },
    'source_code_paths': {
        'CandCppSourceCodes': '/Users/shreyanandini/Desktop/MAJOR/Datasets/CandCppSourceCodes',
        'CSharpSourceCodes': '/Users/shreyanandini/Desktop/MAJOR/Datasets/CSharpSourceCodes',
        'JavaSourceCodes': '/Users/shreyanandini/Desktop/MAJOR/Datasets/JavaSourceCodes'
    }
}

In [13]:
# 13. Main Execution
if __name__ == "__main__":
    predictor = VulnerabilityPredictor(config)
    evaluator, results = predictor.run()

Loaded data for language: Python
Processing Python data...
Loaded data for language: CandCpp
Processing CandCpp data...
Loaded data for language: CSharp
Processing CSharp data...
Loaded data for language: Java
Processing Java data...
Converting the features to numpy array...
Conversion to numpy array is done...
Training set counts:  Counter({2: 38, 1: 38, 3: 37, 0: 2})
Testing set counts:  Counter({1: 10, 2: 9, 3: 9, 0: 1})
Training Classification models with hyperparameter tuning...
Fitting 3 folds for each of 12 candidates, totalling 36 fits
Best params for random_forest: {'max_depth': None, 'min_samples_split': 2, 'n_estimators': 100}
Fitting 3 folds for each of 12 candidates, totalling 36 fits
Best params for xgboost: {'learning_rate': 0.1, 'max_depth': 3, 'n_estimators': 100}
Training classification models completed...
Training classification models completed...
Input shape for CNN models: (115, 1074)

Training attention_model...
Epoch 1/20
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0



Got Results...




Models saved to directory: saved_models/

Model Evaluation Results:

random_forest:
accuracy: 0.6897
precision: 0.6777
recall: 0.6897
f1: 0.6780

knn:
accuracy: 0.6552
precision: 0.6968
recall: 0.6552
f1: 0.6400

svm:
accuracy: 0.5517
precision: 0.5862
recall: 0.5517
f1: 0.5188

naive_bayes:
accuracy: 0.4483
precision: 0.4897
recall: 0.4483
f1: 0.4103

xgboost:
accuracy: 0.6897
precision: 0.6777
recall: 0.6897
f1: 0.6780

attention_model:
accuracy: 0.3103
precision: 0.0963
recall: 0.3103
f1: 0.1470

graphsage_model:
accuracy: 0.3448
precision: 0.1189
recall: 0.3448
f1: 0.1768

simple_gnn:
accuracy: 0.3448
precision: 0.1189
recall: 0.3448
f1: 0.1768

ensemble:
accuracy: 0.6207
precision: 0.6054
recall: 0.6207
f1: 0.6075


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

def plot_model_metrics_comparison(results):
    """
    Plot bar charts comparing all models' performance metrics side by side
    """
    model_names = list(results.keys())
    metrics = ['accuracy', 'precision', 'recall', 'f1']
    
    for metric in metrics:
        values = [results[model][metric] for model in model_names]
        plt.figure(figsize=(10, 6))
        bars = plt.bar(model_names, values, color=plt.cm.tab20(np.linspace(0, 1, len(model_names))))
        plt.title(f'{metric.capitalize()} Comparison Across Models', fontsize=14)
        plt.ylim(0, 1.05)
        plt.xticks(rotation=45)
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2., height, f'{height:.3f}', ha='center', va='bottom')
        plt.ylabel('Score')
        plt.tight_layout()
        plt.show()

def plot_individual_model_metrics(results):
    """
    Create bar charts for each model showing all metrics
    """
    metrics = ['accuracy', 'precision', 'recall', 'f1']
    for model_name, metrics_dict in results.items():
        values = [metrics_dict[m] for m in metrics]
        plt.figure(figsize=(8, 6))
        bars = plt.bar(metrics, values, color='cornflowerblue')
        plt.title(f'Metrics for {model_name}', fontsize=14)
        plt.ylim(0, 1.05)
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2., height, f'{height:.3f}', ha='center', va='bottom')
        plt.ylabel('Score')
        plt.tight_layout()
        plt.show()

def plot_model_metrics_trend(results):
    """
    Plot line chart showing the trend of each metric across models
    """
    model_names = list(results.keys())
    metrics = ['accuracy', 'precision', 'recall', 'f1']
    
    plt.figure(figsize=(12, 6))
    for metric in metrics:
        values = [results[model][metric] for model in model_names]
        plt.plot(model_names, values, marker='o', label=metric)
    plt.title('Performance Metrics Across Models')
    plt.xlabel('Models')
    plt.ylabel('Score')
    plt.ylim(0, 1.05)
    plt.xticks(rotation=45)
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.show()

def plot_confusion_matrices(evaluator, results):
    """
    Plot confusion matrices for all models that support it
    """
    if not hasattr(evaluator, 'X_test') or not hasattr(evaluator, 'y_test'):
        print("Test data not available in evaluator")
        return

    clf_models = {name: model for name, model in evaluator.ensemble.classification_models.items() 
                  if hasattr(model, 'predict')}

    for name, model in clf_models.items():
        y_pred = model.predict(evaluator.X_test)
        cm = confusion_matrix(evaluator.y_test, y_pred)
        
        plt.figure(figsize=(6, 5))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=['None', 'Low', 'Medium', 'High'],
                    yticklabels=['None', 'Low', 'Medium', 'High'])
        plt.title(f'Confusion Matrix: {name}')
        plt.xlabel('Predicted')
        plt.ylabel('Actual')
        plt.tight_layout()
        plt.show()

def plot_ensemble_performance(evaluator):
    """
    Plot training vs test performance for the ensemble model
    """
    if not hasattr(evaluator, 'ensemble'):
        print("No ensemble model found in evaluator")
        return

    y_train_pred = evaluator.ensemble.predict(evaluator.X_train)
    y_test_pred = evaluator.ensemble.predict(evaluator.X_test)

    train_metrics = {
        'accuracy': accuracy_score(evaluator.y_train, y_train_pred),
        'precision': precision_score(evaluator.y_train, y_train_pred, average='weighted'),
        'recall': recall_score(evaluator.y_train, y_train_pred, average='weighted'),
        'f1': f1_score(evaluator.y_train, y_train_pred, average='weighted')
    }

    test_metrics = {
        'accuracy': accuracy_score(evaluator.y_test, y_test_pred),
        'precision': precision_score(evaluator.y_test, y_test_pred, average='weighted'),
        'recall': recall_score(evaluator.y_test, y_test_pred, average='weighted'),
        'f1': f1_score(evaluator.y_test, y_test_pred, average='weighted')
    }

    metrics = ['accuracy', 'precision', 'recall', 'f1']
    x = np.arange(len(metrics))
    width = 0.35

    plt.figure(figsize=(10, 6))
    train_bars = plt.bar(x - width/2, [train_metrics[m] for m in metrics], width, label='Train')
    test_bars = plt.bar(x + width/2, [test_metrics[m] for m in metrics], width, label='Test')

    plt.xticks(x, metrics)
    plt.ylim(0, 1.05)
    plt.title('Ensemble Model: Training vs Test Performance')
    plt.ylabel('Score')
    plt.legend()

    for bars in [train_bars, test_bars]:
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2., height, f'{height:.3f}', ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

def plot_class_distribution(y_train, y_test):
    """
    Plot the class distribution in training and test sets
    """
    fig, axes = plt.subplots(1, 2, figsize=(14, 6))

    unique_train, counts_train = np.unique(y_train, return_counts=True)
    axes[0].bar(unique_train, counts_train, color='skyblue')
    axes[0].set_title('Training Set Class Distribution')
    axes[0].set_xlabel('Severity Class')
    axes[0].set_ylabel('Count')
    axes[0].set_xticks(unique_train)
    axes[0].set_xticklabels(['None', 'Low', 'Medium', 'High'])

    unique_test, counts_test = np.unique(y_test, return_counts=True)
    axes[1].bar(unique_test, counts_test, color='lightgreen')
    axes[1].set_title('Test Set Class Distribution')
    axes[1].set_xlabel('Severity Class')
    axes[1].set_ylabel('Count')
    axes[1].set_xticks(unique_test)
    axes[1].set_xticklabels(['None', 'Low', 'Medium', 'High'])

    plt.tight_layout()
    plt.show()

def generate_all_visualizations(evaluator, results):
    """
    Generate all visualizations for the model evaluation
    """
    plot_model_metrics_comparison(results)
    plot_individual_model_metrics(results)
    plot_model_metrics_trend(results)
    plot_confusion_matrices(evaluator, results)
    plot_ensemble_performance(evaluator)
    if hasattr(evaluator, 'y_train') and hasattr(evaluator, 'y_test'):
        plot_class_distribution(evaluator.y_train, evaluator.y_test)

        
generate_all_visualizations(evaluator, results)