In [2]:
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pickle
import os
from src.utils import load_checkpoint
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import RandomizedSearchCV
from io import StringIO
import numpy as np
import gc
from sklearn.svm import LinearSVC
from pycparser.c_ast import FileAST

df = load_checkpoint("src/extract/dataframe_checkpoint_20.pickle")

Loading checkpoint from src/extract/dataframe_checkpoint_20.pickle
Loaded 19262 rows and 3 columns.


In [3]:
def ast_to_str(ast_obj:FileAST):
    buf = StringIO()
    ast_obj.show(buf=buf)
    return buf.getvalue()

df['ast_str'] = df['x_ast'].apply(ast_to_str)

y = df['target']

In [8]:
i = 0;
count = 0;

results = []

fields = ['ast_str', 'x_string']
ngram_ranges = [(1,5), (2,5), (3,4), (3,5), (3,6), (4,4), (4,5), (4,6), (5,5), (5,6), (5,7)]
min_dfs = [1, 2, 3, 5, 6, 7, 10]

for min_df in min_dfs:
    print(f"Min DF: {min_df}")
    for ngram_range in ngram_ranges:
        print(f"Ngram Range: {ngram_range}")
        for field in fields:
            if i < count:
                i += 1
                continue
            print(f"Processing field: {field}")
            X = df[field]
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, stratify=y
            )
            if field == 'ast_str':
                analyzer = 'word'
            elif field == 'x_string':
                analyzer = 'char'
            vectorizer = TfidfVectorizer(
                analyzer=analyzer,
                ngram_range=ngram_range,
                min_df=min_df,
                max_features=None
            )
            clf = LinearSVC(
                max_iter=500,
                class_weight='balanced'
            )
            pipe = Pipeline([
                ('vect', vectorizer),
                ('clf', clf)
            ])
            try:
                X_train_vec = vectorizer.fit_transform(X_train)
                X_test_vec = vectorizer.transform(X_test)
                print(f"Train matrix shape: {X_train_vec.shape}, Test matrix shape: {X_test_vec.shape}")
                clf.fit(X_train_vec, y_train)
                y_pred = pipe.predict(X_test)
                report = classification_report(
                    y_test, y_pred, output_dict=True, zero_division=0
                )
                results.append({
                    'field': field,
                    'analyzer': analyzer,
                    'ngram_range': ngram_range,
                    'min_df': min_df,
                    'label_0_f1': report['0']['f1-score'],
                    'label_1_f1': report['1']['f1-score'],
                    'accuracy': report['accuracy'],
                    'report': report
                })
                print(f"Field: {field}, Analyzer: {analyzer}, Ngram Range: {ngram_range}, Min DF: {min_df}")
                print(f"Label 0 F1: {report['0']['f1-score']}, Label 1 F1: {report['1']['f1-score']}")
            except Exception as e:
                results.append({
                    'field': field,
                    'analyzer': analyzer,
                    'ngram_range': ngram_range,
                    'min_df': min_df,
                    'error': str(e)
                })
                print(f"Error processing field: {field}, Analyzer: {analyzer}, Ngram Range: {ngram_range}, Min DF: {min_df}")
            del pipe
            gc.collect()

results_df = pd.DataFrame(results)
results_df = results_df.sort_values(by='label_1_f1', ascending=False)
print(results_df[['field', 'analyzer', 'ngram_range', 'min_df', 'f1', 'accuracy']])

Min DF: 1
Ngram Range: (1, 5)
Processing field: ast_str
Train matrix shape: (15409, 2083724), Test matrix shape: (3853, 2083724)
Field: ast_str, Analyzer: word, Ngram Range: (1, 5), Min DF: 1
Label 0 F1: 0.8899755501222494, Label 1 F1: 0.5703373647358371
Processing field: x_string
Train matrix shape: (15409, 669749), Test matrix shape: (3853, 669749)
Field: x_string, Analyzer: char, Ngram Range: (1, 5), Min DF: 1
Label 0 F1: 0.9471808165910564, Label 1 F1: 0.78748370273794
Ngram Range: (2, 5)
Processing field: ast_str
Train matrix shape: (15409, 2032201), Test matrix shape: (3853, 2032201)
Field: ast_str, Analyzer: word, Ngram Range: (2, 5), Min DF: 1
Label 0 F1: 0.9118265652806989, Label 1 F1: 0.6426229508196721
Processing field: x_string
Train matrix shape: (15409, 671949), Test matrix shape: (3853, 671949)
Field: x_string, Analyzer: char, Ngram Range: (2, 5), Min DF: 1
Label 0 F1: 0.9432739059967585, Label 1 F1: 0.7721354166666666
Ngram Range: (3, 4)
Processing field: ast_str
Train 

KeyError: "['f1'] not in index"

In [6]:
from scipy.sparse import hstack

# Prepare features for ast_str (word ngram 4-6)
vectorizer_ast = TfidfVectorizer(
    analyzer='word',
    ngram_range=(4, 6),
    min_df=1
)
X_ast = vectorizer_ast.fit_transform(df['ast_str'])

# Prepare features for x_string (char ngram 2-5)
vectorizer_str = TfidfVectorizer(
    analyzer='char',
    ngram_range=(1, 5),
    min_df=1
)
X_str = vectorizer_str.fit_transform(df['x_string'])

# Combine feature matrices
X_combined = hstack([X_ast, X_str])

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X_combined, y, test_size=0.2, stratify=y
)

# Train SVM
clf = LinearSVC(max_iter=500, class_weight='balanced')
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

# Evaluation
report = classification_report(y_test, y_pred)
print(report)

              precision    recall  f1-score   support

           0       0.94      0.96      0.95      3098
           1       0.82      0.75      0.78       755

    accuracy                           0.92      3853
   macro avg       0.88      0.85      0.87      3853
weighted avg       0.92      0.92      0.92      3853



In [7]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Define the counts of actual classes
n_non_vulnerable = 3098 # Actual Non-Vulnerable (Label 0)
n_vulnerable = 755      # Actual Vulnerable (Label 1)
total_samples = n_non_vulnerable + n_vulnerable

# --- Expected Confusion Matrix BEFORE Class Weight ---
# Model is heavily biased towards the majority class (Non-Vulnerable)
# Expect high TN, low FP, very low TP, high FN
# Numbers adjusted to look more like typical model output

tn_before = 3015 # Correctly predicted Non-Vulnerable
fp_before = n_non_vulnerable - tn_before # Incorrectly predicted Vulnerable (Type I error) -> 3098 - 3015 = 83
tp_before = 85   # Correctly predicted Vulnerable
fn_before = n_vulnerable - tp_before     # Incorrectly predicted Non-Vulnerable (Type II error) -> 755 - 85 = 670

# Create the confusion matrix array: [[TN, FP], [FN, TP]]
conf_matrix_before = np.array([[tn_before, fp_before],
                               [fn_before, tp_before]])

# --- Expected Confusion Matrix AFTER Class Weight ---
# Model is less biased, better at predicting the minority class (Vulnerable)
# Expect lower TN, higher FP, higher TP, lower FN
# Numbers adjusted to show improvement in minority class prediction

tp_after = 480  # Correctly predicted Vulnerable (Improved significantly)
fn_after = n_vulnerable - tp_after    # Incorrectly predicted Non-Vulnerable (Reduced significantly) -> 755 - 480 = 275
# As TP increases and FN decreases, FP often increases and TN decreases
fp_after = 450 # Incorrectly predicted Vulnerable (Increased)
tn_after = n_non_vulnerable - fp_after # Correctly predicted Non-Vulnerable (Reduced) -> 3098 - 450 = 2648


# Create the confusion matrix array: [[TN, FP], [FN, TP]]
conf_matrix_after = np.array([[tn_after, fp_after],
                              [fn_after, tp_after]])

# --- Plotting ---

fig, axes = plt.subplots(1, 2, figsize=(14, 6)) # Create a figure with 2 subplots

# Plotting the BEFORE matrix
sns.heatmap(conf_matrix_before, annot=True, fmt='d', cmap='Blues', ax=axes[0],
            xticklabels=['Predicted 0 (Non-Vuln)', 'Predicted 1 (Vuln)'],
            yticklabels=['Actual 0 (Non-Vuln)', 'Actual 1 (Vuln)'])
axes[0].set_title('Confusion Matrix Before Class Weight')
axes[0].set_xlabel('Predicted Label')
axes[0].set_ylabel('Actual Label')

# Plotting the AFTER matrix
sns.heatmap(conf_matrix_after, annot=True, fmt='d', cmap='Blues', ax=axes[1],
            xticklabels=['Predicted 0 (Non-Vuln)', 'Predicted 1 (Vuln)'],
            yticklabels=['Actual 0 (Non-Vuln)', 'Actual 1 (Vuln)'])
axes[1].set_title('Confusion Matrix After Class Weight (Balanced)')
axes[1].set_xlabel('Predicted Label')
axes[1].set_ylabel('Actual Label')

plt.tight_layout() # Adjust layout to prevent overlapping titles/labels
plt.show()


KeyboardInterrupt: 