## Import Dependency

In [None]:
!pip uninstall tensorflow sentence-transformers

^C


In [1]:

!pip install sentence-transformers torch pandas scikit-learn joblib requests lightgbm

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [4]:
import pandas as pd
import json
import gzip
import os
import requests
import joblib
import numpy as np
from pathlib import Path
from datetime import datetime
import csv
import urllib.request

# Text Embedding and ML Models
from sentence_transformers import SentenceTransformer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

# The classifiers we will compare
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import lightgbm as lgb
from lightgbm.callback import early_stopping

# Import SMOTE for handling class imbalance
# You may need to install this: pip install imbalanced-learn
from imblearn.over_sampling import SMOTE


# --- Configuration for file paths ---
# Base directories
DATA_DIR = Path("data")
MODELS_DIR = Path("models")

# Subdirectories for data
NVD_DATA_DIR = DATA_DIR / "nvd_data"
GARAK_DATA_DIR = DATA_DIR / "garak"

# Specific file paths
PARSED_DATA_PATH = NVD_DATA_DIR / "all_nvd_cves.pkl"
GARAK_REPORT_JSONL = GARAK_DATA_DIR / "garak.report.jsonl"
GARAK_REPORT_CSV = GARAK_DATA_DIR / "garak_report_flat.csv"

# Model file paths
MODEL_PATH = MODELS_DIR / "best_cvss_classifier_historic.pkl"
LABEL_ENCODER_PATH = MODELS_DIR / "cvss_label_encoder_historic.pkl"

## Download Sample garak report for gpt35-0906.report.jsonl

In [9]:
# ----------------------------------------
# STEP 1A: Process Garak Report
# ----------------------------------------
def process_garak_report():
    """
    Downloads a sample Garak report if not present, and converts it
    from .jsonl format to a flattened .csv file.
    """
    # Create parent directories if they don't exist
    GARAK_DATA_DIR.mkdir(parents=True, exist_ok=True)

    # Download sample Garak report if not present
    url = "https://gist.githubusercontent.com/shubhobm/9fa52d71c8bb36bfb888eee2ba3d18f2/raw/ef1808e6d3b26002d9b046e6c120d438adf49008/gpt35-0906.report.jsonl"
    if not GARAK_REPORT_JSONL.exists():
        print("Downloading sample Garak report...")
        urllib.request.urlretrieve(url, GARAK_REPORT_JSONL)
        print(f"✅ Downloaded: {GARAK_REPORT_JSONL}")

    # Status decoding helper
    def parse_status(status_code):
        if status_code == 1:
            return "Pass"
        elif status_code == 2:
            return "Fail"
        else:
            return "Not Evaluated"

    # Turn-based or prompt-based format helper
    def extract_input_output(record):
        turns = record.get("notes", {}).get("turns", [])
        if turns:  # Multi-turn conversation
            attacker, bot = [], []
            for role, msg in turns:
                msg = msg.strip().replace("\n", " ")
                if role == "probe":
                    attacker.append(msg)
                elif role == "model":
                    bot.append(msg)
            return " | ".join(attacker), " | ".join(bot)

        # Fallback to single-turn prompt + outputs
        prompt = record.get("prompt", "").strip().replace("\n", " ")
        outputs = record.get("outputs", [])
        output_texts = [o.strip().replace("\n", " ") for o in outputs]
        return prompt, " | ".join(output_texts)

    # Main conversion loop
    with open(GARAK_REPORT_JSONL, "r", encoding="utf-8") as infile, \
         open(GARAK_REPORT_CSV, "w", newline='', encoding="utf-8") as outfile:

        writer = csv.DictWriter(outfile, fieldnames=[
            "uuid", "probe_classname", "attacker_input", "target_bot_response",
            "status", "goal", "trigger"
        ])
        writer.writeheader()

        for line in infile:
            record = json.loads(line)
            if record.get("entry_type") != "attempt":
                continue

            writer.writerow({
                "uuid": record.get("uuid", ""),
                "probe_classname": record.get("probe_classname", ""),
                "attacker_input": extract_input_output(record)[0],
                "target_bot_response": extract_input_output(record)[1],
                "status": parse_status(record.get("status")),
                "goal": record.get("goal", ""),
                "trigger": record.get("notes", {}).get("trigger", "")
            })

    print(f"✅ Garak report successfully converted to: {GARAK_REPORT_CSV}")
process_garak_report()


Downloading sample Garak report...
✅ Downloaded: data/garak/garak.report.jsonl
✅ Garak report successfully converted to: data/garak/garak_report_flat.csv


## Download Latest cves data from NVD to train own CVSS seviarity score kind of model

In [10]:
# ----------------------------------------
# STEP 1B: Download and Parse All Historical NVD Data
# ----------------------------------------
def download_and_parse_all_nvd_data():
    """
    Downloads all NVD CVE data, parses them, removes duplicates, and saves
    the result to a pickle file inside the nvd_data directory.
    """
    # Create parent directories if they don't exist
    NVD_DATA_DIR.mkdir(parents=True, exist_ok=True)
    BASE_URL = "https://nvd.nist.gov/feeds/json/cve/1.1/"
    START_YEAR = 2002
    CURRENT_YEAR = datetime.now().year

    print("--- Starting NVD Data Download ---")
    for year in range(START_YEAR, CURRENT_YEAR + 1):
        filename = f"nvdcve-1.1-{year}.json.gz"
        download_path = NVD_DATA_DIR / filename
        url = f"{BASE_URL}{filename}"

        if download_path.exists():
            print(f"Skipping {filename}, already downloaded.")
            continue

        print(f"Downloading: {url}")
        try:
            response = requests.get(url, stream=True, timeout=30)
            if response.status_code == 200:
                with open(download_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                print(f" -> Successfully saved to {download_path}")
            else:
                print(f" -> Failed to download {filename}: HTTP {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f" -> An error occurred while downloading {filename}: {e}")
    print("--- Download Process Complete ---")

    print("\n--- Starting NVD Data Parsing ---")
    parsed_cve_list = []
    for file_path in sorted(NVD_DATA_DIR.glob('*.json.gz')):
        print(f"Parsing: {file_path.name}")
        try:
            with gzip.open(file_path, 'rt', encoding='utf-8') as f:
                cve_data = json.load(f)
            for item in cve_data.get("CVE_Items", []):
                description = next((d["value"] for d in item.get("cve", {}).get("description", {}).get("description_data", []) if d.get("lang") == "en"), "")
                impact = item.get("impact", {})
                severity = impact.get('baseMetricV3', {}).get('cvssV3', {}).get('baseSeverity') or impact.get('baseMetricV2', {}).get('severity')

                if description and severity:
                    parsed_cve_list.append({"description": description.strip(), "severity": severity.strip().capitalize()})
        except Exception as e:
            print(f" -> An error occurred while parsing {file_path.name}: {e}")

    df = pd.DataFrame(parsed_cve_list)

    print("\n--- Removing Duplicates ---")
    print(f"Number of entries before duplicate removal: {len(df)}")
    df.drop_duplicates(subset=['description'], keep='last', inplace=True)
    print(f"Number of entries after duplicate removal: {len(df)}")

    df.to_pickle(PARSED_DATA_PATH)
    print(f"\n--- Parsing Complete. Saved {len(df)} unique entries to {PARSED_DATA_PATH} ---")


download_and_parse_all_nvd_data()

--- Starting NVD Data Download ---
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2002.json.gz
 -> Successfully saved to data/nvd_data/nvdcve-1.1-2002.json.gz
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2003.json.gz
 -> Successfully saved to data/nvd_data/nvdcve-1.1-2003.json.gz
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2004.json.gz
 -> Successfully saved to data/nvd_data/nvdcve-1.1-2004.json.gz
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2005.json.gz
 -> Successfully saved to data/nvd_data/nvdcve-1.1-2005.json.gz
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2006.json.gz
 -> Successfully saved to data/nvd_data/nvdcve-1.1-2006.json.gz
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2007.json.gz
 -> Successfully saved to data/nvd_data/nvdcve-1.1-2007.json.gz
Downloading: https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2008.json.gz
 -> Successfully saved to data/nvd_data/nv

In [None]:
# ----------------------------------------
# STEP 2: Find the Best Classifier and Train It
# ----------------------------------------
def train_and_evaluate_models():
    if not PARSED_DATA_PATH.exists():
        print(f"Error: Parsed data not found at {PARSED_DATA_PATH}. Please run 'download' first.")
        return

    print(f"Loading parsed data from {PARSED_DATA_PATH}...")
    df = pd.read_pickle(PARSED_DATA_PATH)
    df.dropna(subset=['description', 'severity'], inplace=True)
    df = df[df['description'] != '']
    print(f"\nTraining on {len(df)} valid NVD entries after cleaning.")

    le = LabelEncoder()
    y = le.fit_transform(df['severity'])

    print("Loading embedding model: 'all-mpnet-base-v2'...")
    embed_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
    print("\n!!! WARNING: Encoding all descriptions will take a very long time and consume significant memory. Please be patient. !!!")
    X = embed_model.encode(df['description'].tolist(), show_progress_bar=True)

    X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.25, random_state=42)

    # --- NEW: Apply SMOTE to handle class imbalance ---
    print("\nApplying SMOTE to balance the training data...")
    smote = SMOTE(random_state=42)
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)
    print("SMOTE balancing complete. Training set size is now:", X_train_resampled.shape)

    classifiers = {
        "Logistic Regression": LogisticRegression(max_iter=2000, random_state=42), # No longer need class_weight='balanced'
        "Random Forest": RandomForestClassifier(n_jobs=-1, random_state=42), # No longer need class_weight='balanced'
        "LightGBM (Tuned)": lgb.LGBMClassifier(
            n_estimators=1000,
            learning_rate=0.05,
            num_leaves=31,
            random_state=42
        )
    }

    best_f1, best_model_name, best_classifier_obj = -1, "", None
    for name, clf in classifiers.items():
        print(f"\n--- Training {name} ---")

        # Use early stopping for LightGBM to find the best number of trees
        if "LightGBM" in name:
            clf.fit(X_train_resampled, y_train_resampled,
                    eval_set=[(X_test, y_test)],
                    eval_metric='multi_logloss',
                    callbacks=[early_stopping(10, verbose=False)])
        else:
            # Train other models on the resampled data
            clf.fit(X_train_resampled, y_train_resampled)

        y_pred = clf.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
        f1_score = report["weighted avg"]["f1-score"]
        if f1_score > best_f1:
            best_f1, best_model_name, best_classifier_obj = f1_score, name, clf
        print(classification_report(y_test, y_pred, target_names=le.classes_, zero_division=0))

    print(f"\n🏆 Best performing model is: {best_model_name}")
    print(f"\nRetraining {best_model_name} on the full resampled dataset for final model...")

    # Retrain the final model on all data, resampled
    X_resampled_full, y_resampled_full = smote.fit_resample(X, y)
    best_classifier_obj.fit(X_resampled_full, y_resampled_full)

    MODELS_DIR.mkdir(parents=True, exist_ok=True)
    joblib.dump(best_classifier_obj, MODEL_PATH)
    joblib.dump(le, LABEL_ENCODER_PATH)
    print(f"✅ Best model saved to {MODEL_PATH}")


train_and_evaluate_models()

Loading parsed data from data/nvd_data/all_nvd_cves.pkl...

Training on 146631 valid NVD entries after cleaning.
Loading embedding model: 'all-mpnet-base-v2'...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.





Batches:   0%|          | 0/4583 [00:00<?, ?it/s]


Applying SMOTE to balance the training data...
SMOTE balancing complete. Training set size is now: (192628, 768)

--- Training Logistic Regression ---
              precision    recall  f1-score   support

    Critical       0.44      0.69      0.54      5673
        High       0.64      0.48      0.55     14218
         Low       0.08      0.65      0.15       714
      Medium       0.79      0.56      0.66     16053

    accuracy                           0.55     36658
   macro avg       0.49      0.60      0.47     36658
weighted avg       0.66      0.55      0.59     36658


--- Training Random Forest ---
              precision    recall  f1-score   support

    Critical       0.56      0.54      0.55      5673
        High       0.62      0.69      0.65     14218
         Low       0.41      0.26      0.32       714
      Medium       0.76      0.70      0.73     16053

    accuracy                           0.66     36658
   macro avg       0.59      0.55      0.56     36658
w



[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 3.834028 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 195840
[LightGBM] [Info] Number of data points in the train set: 192628, number of used features: 768
[LightGBM] [Info] Start training from score -1.386294
[LightGBM] [Info] Start training from score -1.386294
[LightGBM] [Info] Start training from score -1.386294
[LightGBM] [Info] Start training from score -1.386294


In [None]:
# ----------------------------------------
# STEP 3: Predict using a saved model
# ----------------------------------------
def predict_on_garak():
    if not MODEL_PATH.exists() or not GARAK_REPORT_CSV.exists():
        print(f"Error: Model or Garak CSV not found. Please run 'train' and 'process_garak' first.")
        return

    print(f"Loading saved model from {MODEL_PATH}...")
    clf, le = joblib.load(MODEL_PATH), joblib.load(LABEL_ENCODER_PATH)

    print("Loading embedding model for prediction...")
    model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")

    df = pd.read_csv(GARAK_REPORT_CSV)
    df["full_text"] = df["target_bot_response"].fillna('')

    print("\nEmbedding Garak report for prediction...")
    embeddings = model.encode(df["full_text"].tolist(), show_progress_bar=True)

    predicted_probabilities = clf.predict_proba(embeddings)
    df["predicted_severity"] = le.inverse_transform(np.argmax(predicted_probabilities, axis=1))
    df["confidence_score"] = np.round(np.max(predicted_probabilities, axis=1), 4)

    for i, class_name in enumerate(le.classes_):
        df[f'prob_{class_name.lower()}'] = np.round(predicted_probabilities[:, i], 4)

    output_path = GARAK_DATA_DIR / "garak_with_severity_historic.csv"
    df.to_csv(output_path, index=False)
    print(f"✅ Predictions saved to {output_path}")

    print("\n--- Generating Final Vulnerability Score for the Report ---")
    severity_to_score = {'Critical': 10, 'High': 7, 'Medium': 4, 'Low': 1}
    severity_counts = df['predicted_severity'].value_counts()
    print("Severity Distribution:\n", severity_counts)

    total_score = sum(count * severity_to_score.get(s.capitalize(), 0) for s, count in severity_counts.items())
    max_possible_score = len(df) * 10
    normalized_score = (total_score / max_possible_score) * 100 if max_possible_score > 0 else 0

    print(f"\nTotal Raw Risk Score: {total_score}")
    print(f"Normalized Report Vulnerability Score (0-100): {normalized_score:.2f}")
    print("----------------------------------------------------------")

predict_on_garak()
