# Scrape

In [None]:
import requests
from bs4 import BeautifulSoup

# List of CWE IDs
# Each ID is substituted into the MITRE definition URL built by scrape_cwe()
# and the IDs are scraped in the order listed here.
cwe_ids = [
    14, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 134, 135, 
    170, 188, 191, 192, 194, 195, 196, 197, 242, 243, 244, 362, 364, 366, 374, 375,
    401, 415, 416, 457, 460, 462, 463, 464, 466, 467, 468, 469, 474, 476, 478, 479,
    480, 481, 482, 483, 484, 495, 496, 558, 560, 562, 587, 676, 685, 688, 689, 690, 
    704, 733, 762, 781, 782, 783, 785, 787, 789, 805, 806, 839, 843, 910, 911, 1325, 
    1335, 1341
]

# Function to scrape data for a specific CWE ID
def scrape_cwe(cwe_id, timeout=30):
    """Fetch one CWE definition page from cwe.mitre.org and extract its text.

    Args:
        cwe_id: CWE identifier inserted into the definition URL.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``(title, description, extended_description)`` tuple. When the page
        cannot be retrieved (request error or non-200 status) the tuple is
        ``(None, None, None)``. When the page loads but a part is missing, the
        corresponding element is a "... not found" placeholder string.
    """
    url = f"https://cwe.mitre.org/data/definitions/{cwe_id}.html"
    try:
        # Without a timeout a stalled connection would block the whole scrape.
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Failed to retrieve CWE-{cwe_id}: {exc}")
        return None, None, None
    if response.status_code != 200:
        print(f"Failed to retrieve CWE-{cwe_id}")
        return None, None, None

    soup = BeautifulSoup(response.content, 'html.parser')

    # Extract the title; only clean it when the page actually has one.
    raw_title = soup.title.string if soup.title is not None else None
    if raw_title:
        # Collapse whitespace, drop the "CWE - " prefix and the trailing
        # version number in parentheses.
        title = " ".join(raw_title.split()).replace("CWE - ", "").split("(")[0].strip()
    else:
        title = f"CWE-{cwe_id}: Title not found"

    def section_text(section_id, missing):
        # The text lives in an inner <div class="indent">; either div may be
        # absent, in which case the placeholder is returned.
        section = soup.find('div', id=section_id)
        body = section.find('div', class_="indent") if section is not None else None
        return body.get_text(strip=True) if body is not None else missing

    description = section_text("Description", "Description not found")
    extended_description = section_text("Extended_Description", "Extended Description not found")

    return title, description, extended_description


# File to store the results
output_file = "cwe_descriptions.txt"

# Scrape and save data.
# Each entry is written as three single-line fields followed by a dashed
# separator; parse_cwe_file() reads the file back line by line, so any
# whitespace (including newlines) inside a field is collapsed to one space.
skipped_ids = []
with open(output_file, "w", encoding="utf-8") as f:
    for cwe_id in cwe_ids:
        print(f"Processing CWE-{cwe_id}")
        title, description, extended_description = scrape_cwe(cwe_id)
        if description and extended_description:
            f.write(f"Title: {' '.join(title.split())}\n")
            f.write(f"Description: {' '.join(description.split())}\n")
            f.write(f"Extended Description: {' '.join(extended_description.split())}\n")
            f.write("\n" + "-"*80 + "\n\n")
        else:
            skipped_ids.append(cwe_id)

if skipped_ids:
    print(f"Skipped CWE IDs (no data retrieved): {skipped_ids}")
print(f"Scraping completed. Descriptions saved to {output_file}")

# Generate

In [None]:
import os
import re
from openai import OpenAI
from difflib import SequenceMatcher

# Initialize OpenAI client.
# The API key is read from the OPENAI_API_KEY environment variable so that no
# credential lives in the source file.
# SECURITY: a literal secret key used to be embedded on this line. It must be
# treated as compromised and revoked, and it should be purged from history.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def create_directories():
    """Create the output folders for generated code if they are missing.

    Makes ``vuln_data`` and ``patched_data`` relative to the current working
    directory; folders that already exist are left as they are.
    """
    for folder in ("vuln_data", "patched_data"):
        os.makedirs(folder, exist_ok=True)

def parse_cwe_file(file_path):
    """Parse the scraped CWE description file into structured tuples.

    The file is expected to hold entries separated by a line of 80 dashes
    surrounded by blank lines. Within an entry the first three lines are the
    ``Title: ``, ``Description: `` and ``Extended Description: `` fields.

    Args:
        file_path: Path of the UTF-8 text file to read.

    Returns:
        A list of ``(title, description, extended_description)`` string
        tuples, one per non-empty entry. Fields missing from a short entry
        are returned as empty strings; an empty file yields an empty list.
    """
    separator = "\n\n" + "-" * 80 + "\n\n"

    def strip_label(line, label):
        # Remove the label only when it is the line's prefix, so the same
        # text occurring later inside the field is preserved.
        if line.startswith(label):
            line = line[len(label):]
        return line.strip()

    with open(file_path, 'r', encoding='utf-8') as file:
        entries = file.read().strip().split(separator)

    cwe_data = []
    for entry in entries:
        if not entry.strip():
            # Empty file or stray separator: nothing to parse.
            continue
        lines = entry.split('\n')
        title = strip_label(lines[0], "Title: ")
        description = strip_label(lines[1], "Description: ") if len(lines) > 1 else ""
        extended_description = strip_label(lines[2], "Extended Description: ") if len(lines) > 2 else ""
        cwe_data.append((title, description, extended_description))
    return cwe_data

def query_chatgpt(prompt):
    """Send ``prompt`` to the chat completions API and return the reply as code.

    Args:
        prompt: User message describing the code to generate.

    Returns:
        The text of the first choice with markdown code fences removed and
        surrounding whitespace stripped. An empty string is returned when the
        reply carries no text content.
    """
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are an expert in identifying and fixing vulnerabilities in code. Generate code snippets as per the instructions."},
            {"role": "user", "content": prompt}
        ]
    )
    # The message content can be null in the API response; fall back to an
    # empty string so the regex below does not raise TypeError.
    code = response.choices[0].message.content or ""
    code = re.sub(r"```[a-zA-Z]*\n|```", "", code)  # Clean markdown code formatting
    return code.strip()

def generate_vulnerable_code(title, description, extended_description):
    """Request a C snippet that exhibits the described weakness.

    The three CWE fields are embedded in a fixed prompt which is sent through
    ``query_chatgpt``; whatever that call returns is passed back unchanged.
    """
    prompt_parts = [
        f"Create a snippet of C code based on the following details (it must be written in C):\n",
        f"Title: {title}\n",
        f"Description: {description}\n",
        f"Extended Description: {extended_description}\n",
        "Ensure that you only print the code and nothing else as it will be going into a .txt file.\n",
        "Do not include comments and do not include markdown.\n",
    ]
    prompt = "".join(prompt_parts)
    return query_chatgpt(prompt)

def generate_patched_code(vulnerable_code, title, description, extended_description):
    """Request a fixed version of ``vulnerable_code`` from the model.

    Only ``vulnerable_code`` is placed in the prompt; ``title``,
    ``description`` and ``extended_description`` are accepted but not used.
    The result of ``query_chatgpt`` is returned as is.
    """
    prompt_parts = [
        f"Given the following vulnerable code, fix the vulnerability:\n\n",
        f"{vulnerable_code}\n\n",
        "Make sure the patched code adheres to secure coding practices. ",
        "Only output the fixed code without comments or markdown.",
    ]
    return query_chatgpt("".join(prompt_parts))

def similarity(a, b):
    """Return the ``difflib.SequenceMatcher`` ratio of ``a`` and ``b``.

    No junk heuristic is supplied (``isjunk`` is ``None``).
    """
    matcher = SequenceMatcher(isjunk=None, a=a, b=b)
    return matcher.ratio()

def is_unique(code, existing_codes, threshold=0.8):
    """Tell whether ``code`` is sufficiently different from all earlier snippets.

    Returns ``True`` when the similarity to every item of ``existing_codes``
    is strictly below ``threshold`` (trivially so for an empty collection),
    otherwise ``False``.
    """
    for previous in existing_codes:
        if not similarity(code, previous) < threshold:
            return False
    return True

def save_code_to_file(directory, title, example_num, code):
    """Write ``code`` to ``<directory>/<sanitised title>_<example_num>.c``.

    The title is sanitised by dropping every character that is neither a word
    character nor whitespace and then turning spaces into underscores. The
    resulting path is printed after the file has been written.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', title)
    safe_title = "_".join(without_punctuation.split(' '))
    filename = f"{directory}/{safe_title}_{example_num}.c"
    with open(filename, 'w', encoding='utf-8') as handle:
        handle.write(code)
    print(f"Saved: {filename}")

def main():
    """Generate vulnerable/patched C example pairs for every parsed CWE entry.

    For each entry of ``cwe_descriptions.txt`` up to ``examples_per_cwe``
    vulnerable snippets are requested. A snippet is accepted only if it is
    non-empty and passes ``is_unique`` against the snippets already accepted
    for that CWE. After ``max_attempts`` rejected candidates the example is
    skipped, which bounds the number of API calls. Each accepted snippet is
    patched and both versions are written under ``vuln_data`` and
    ``patched_data`` with the same example number.
    """
    examples_per_cwe = 10
    max_attempts = 5

    create_directories()
    cwe_data = parse_cwe_file('cwe_descriptions.txt')

    for title, description, extended_description in cwe_data:
        print(f"Processing CWE: {title}")

        vulnerable_examples = []

        for example_num in range(examples_per_cwe):
            # Try a bounded number of times to obtain a unique vulnerable example
            vulnerable_code = None
            for _ in range(max_attempts):
                candidate = generate_vulnerable_code(title, description, extended_description)
                if candidate and is_unique(candidate, vulnerable_examples):
                    vulnerable_code = candidate
                    break
            if vulnerable_code is None:
                print(f"Skipping example {example_num} for {title}: "
                      f"no unique snippet after {max_attempts} attempts")
                continue
            vulnerable_examples.append(vulnerable_code)

            # Generate the corresponding patched example
            patched_code = generate_patched_code(vulnerable_code, title, description, extended_description)

            # Save each example to separate files
            save_code_to_file("vuln_data", title, example_num, vulnerable_code)
            save_code_to_file("patched_data", title, example_num, patched_code)

if __name__ == "__main__":
    main()

# Tokenizer.py

In [None]:
import os
import re
import json
import networkx as nx
from clang.cindex import Index, Config

# Configure libclang path
# NOTE(review): the path is relative and names a Python 3.12 virtualenv
# layout; presumably it must be adjusted per machine — confirm.
Config.set_library_file('.venv/lib/python3.12/site-packages/clang/native/libclang.so')

# Define token mappings
# Integer codes emitted by tokenize_code() in its default ("basic") mode,
# one per token category.
TOKEN_TYPE_MAP = {
    "KEYWORD": 1,
    "SYMBOL": 2,
    "NUMBER": 3,
    "FUNCTION": 4,
    "IDENTIFIER": 5,
    "UNK": 6
}

# Lookup sets used to classify a token by exact string membership.
KEYWORDS = {'int', 'float', 'return', 'if', 'else', 'for', 'while', 'void', 'size_t', 'char', 'const', 'volatile', 'unsigned'}
SYMBOLS = {'(', ')', '{', '}', '[', ']', ';', ',', '->', '++', '--', '=', '==', '!=', '<', '>', '<=', '>=', '+', '-', '*', '/', '%', '&', '|', '^', '~', '!', '<<', '>>'}
FUNCTIONS = {'strcpy', 'memcpy', 'malloc', 'free', 'printf', 'fgets', 'strlen', 'strncpy', 'explicit_bzero'}
# Code used for any token that matches none of the categories.
UNK = TOKEN_TYPE_MAP["UNK"]

# Regular expressions applied with re.fullmatch() to a single token.
NUMBER_PATTERN = r'\b\d+\b'
IDENTIFIER_PATTERN = r'\b[A-Za-z_][A-Za-z0-9_]*\b'

def tokenize_ast(code):
    """Return the libclang cursor-kind names of ``code`` in pre-order.

    The source is handed to libclang in memory (``unsaved_files``) under the
    virtual name ``temp.c``, so no temporary file is written to the working
    directory. The AST is walked with an explicit stack, which visits nodes in
    the same parent-before-children, left-to-right order as a recursive walk
    without depending on Python's recursion limit.

    Args:
        code (str): C source text.

    Returns:
        list: Cursor kind names. If parsing or traversal fails, the error is
        printed and ``"UNK"`` is appended to whatever was collected so far.
    """
    tokens = []
    try:
        index = Index.create()
        tu = index.parse("temp.c", unsaved_files=[("temp.c", code)])
        pending = [tu.cursor]
        while pending:
            node = pending.pop()
            tokens.append(node.kind.name)
            # Reverse so the first child is popped (visited) first.
            pending.extend(reversed(list(node.get_children())))
    except Exception as e:  # best effort: one bad file must not stop the batch
        print(f"Error parsing AST: {e}")
        tokens.append("UNK")
    return tokens

def syntax_aware_tokenize(code):
    """Classify each lexical token of ``code`` into a category name.

    The text is split into runs of word characters, entries of ``SYMBOLS``
    (longest first, so that e.g. ``==`` is one token rather than two ``=``),
    and any other single non-whitespace character. Each piece is then mapped
    to the first matching category: ``KEYWORD``, ``SYMBOL``, ``NUMBER``,
    ``FUNCTION``, ``IDENTIFIER``, otherwise ``UNK``.

    Args:
        code (str): C source text.

    Returns:
        list: One category name per token, in source order.
    """
    # Longest alternatives first: regex alternation takes the first match,
    # and multi-character operators must win over their one-character prefix.
    symbol_patterns = [re.escape(symbol) for symbol in sorted(SYMBOLS, key=len, reverse=True)]
    splitter = "|".join([r'\w+', *symbol_patterns, r'\S'])

    tokens = []
    for word in re.findall(splitter, code):
        if word in KEYWORDS:
            tokens.append("KEYWORD")
        elif word in SYMBOLS:
            tokens.append("SYMBOL")
        elif re.fullmatch(NUMBER_PATTERN, word):
            tokens.append("NUMBER")
        elif word in FUNCTIONS:
            tokens.append("FUNCTION")
        elif re.fullmatch(IDENTIFIER_PATTERN, word):
            tokens.append("IDENTIFIER")
        else:
            tokens.append("UNK")
    return tokens

def generate_cfg(code):
    """
    Build a parent-to-child graph of the libclang AST for ``code``.

    Despite the name, the edges produced here follow AST nesting (cursor to
    child cursor), not control flow. Nodes are keyed by the cursor's spelling,
    falling back to its display name and then its kind name, so cursors that
    share a label are merged into one graph node.

    The source is passed to libclang in memory under the virtual name
    ``temp.c``; nothing is written to disk. Traversal uses an explicit stack
    and visits nodes in pre-order, first child first.

    Args:
        code (str): C code to analyze.
    Returns:
        list: List of ``(parent_label, child_label)`` edges. On any error the
        message is printed and ``[("UNK", "UNK")]`` is returned.
    """
    G = nx.DiGraph()
    try:
        # Parse the code with libclang
        index = Index.create()
        tu = index.parse("temp.c", unsaved_files=[("temp.c", code)])

        # Each stack item pairs a cursor with the label of its parent node.
        pending = [(tu.cursor, None)]
        while pending:
            node, parent = pending.pop()
            # The kind name is never empty, so a label is always available.
            node_id = node.spelling or node.displayname or node.kind.name
            G.add_node(node_id, kind=node.kind.name)

            if parent is not None:
                G.add_edge(parent, node_id)

            # Reverse so the first child is popped (visited) first.
            pending.extend((child, node_id) for child in reversed(list(node.get_children())))

        # Return edges as a list for storage
        return list(G.edges)
    except Exception as e:  # best effort: one bad file must not stop the batch
        print(f"Error generating CFG: {e}")
        return [("UNK", "UNK")]


def tokenize_code(code, method="basic"):
    """Tokenize ``code`` with the requested strategy.

    Args:
        code (str): C source text.
        method (str): ``"ast"`` for cursor-kind names, ``"cfg"`` for graph
            edges, ``"syntax"`` for category names. Any other value
            (including the default ``"basic"``) yields the integer codes from
            ``TOKEN_TYPE_MAP``.

    Returns:
        list: Output of the selected tokenizer.
    """
    if method == "ast":
        return tokenize_ast(code)
    if method == "cfg":
        return generate_cfg(code)

    # "basic" and "syntax" share one classifier; "basic" only differs in
    # reporting each category by its integer code instead of its name.
    categories = syntax_aware_tokenize(code)
    if method == "syntax":
        return categories
    return [TOKEN_TYPE_MAP[category] for category in categories]

def save_as_json(output_file, tokens, label):
    """Serialise one sample to ``output_file`` as JSON.

    The written object has the keys ``tokens`` and ``label`` (in that order).
    The destination path is printed once the file has been written.
    """
    record = dict(tokens=tokens, label=label)
    with open(output_file, 'w', encoding='utf-8') as sink:
        json.dump(record, sink)
    print(f"Saved JSON: {output_file}")

def tokenize_file(input_file, output_file_base, label, method="basic"):
    """Tokenize one source file and store the result as JSON.

    Reads ``input_file`` as UTF-8, tokenizes it with ``method`` and writes the
    tokens together with ``label`` to ``<output_file_base>_<method>.json``.
    """
    with open(input_file, 'r', encoding='utf-8') as source:
        source_text = source.read()
    token_list = tokenize_code(source_text, method=method)
    destination = f"{output_file_base}_{method}.json"
    save_as_json(destination, token_list, label)

def process_directory(input_dir, output_dir, label, method="basic"):
    """Tokenize every ``.c`` file below ``input_dir`` into ``output_dir``.

    Args:
        input_dir: Directory tree searched recursively for ``.c`` files.
        output_dir: Directory receiving the JSON files (created if missing).
        label: Label stored with every sample from this directory.
        method: Tokenization method passed on to ``tokenize_file``.
    """
    os.makedirs(output_dir, exist_ok=True)
    for root, _, files in os.walk(input_dir):
        for file in files:
            if not file.endswith('.c'):
                continue
            input_path = os.path.join(root, file)
            # Drop only the trailing extension; a ".c" occurring earlier in
            # the name (e.g. "foo.cache.c") must be kept.
            base_filename = file[:-len('.c')]
            output_file_base = os.path.join(output_dir, base_filename)
            tokenize_file(input_path, output_file_base, label, method=method)

def process_test_directory(test_dir, vuln_output_dir, patched_output_dir, method="basic"):
    """Tokenize the ``.c`` test files, routing them by file name.

    Files whose upper-cased name contains ``CLEAN`` are labelled 0 and written
    to ``patched_output_dir``; all other ``.c`` files are labelled 1 and
    written to ``vuln_output_dir``. Both output directories are created if
    they are missing.

    Args:
        test_dir: Directory tree searched recursively for ``.c`` files.
        vuln_output_dir: Destination for samples labelled 1.
        patched_output_dir: Destination for samples labelled 0.
        method: Tokenization method passed on to ``tokenize_file``.
    """
    os.makedirs(vuln_output_dir, exist_ok=True)
    os.makedirs(patched_output_dir, exist_ok=True)
    for root, _, files in os.walk(test_dir):
        for file in files:
            if not file.endswith('.c'):
                continue
            input_path = os.path.join(root, file)
            # Drop only the trailing extension, not every ".c" in the name.
            base_filename = file[:-len('.c')]
            if "CLEAN" in file.upper():
                output_file_base = os.path.join(patched_output_dir, base_filename)
                label = 0
            else:
                output_file_base = os.path.join(vuln_output_dir, base_filename)
                label = 1
            tokenize_file(input_path, output_file_base, label, method=method)

def main():
    """Tokenize the training and test corpora with every supported method.

    For each of ``basic``, ``ast``, ``syntax`` and ``cfg`` the vulnerable
    training set (label 1), the patched training set (label 0) and the mixed
    test directory are processed into method-specific output directories.
    """
    for method in ["basic", "ast", "syntax", "cfg"]:
        print(f"Tokenizing with method: {method}...")
        process_directory("vuln_data", f"vuln_train_{method}", label=1, method=method)
        process_directory("patched_data", f"patched_train_{method}", label=0, method=method)
        process_test_directory("TestCode", f"vuln_test_{method}", f"patched_test_{method}", method=method)

if __name__ == "__main__":
    main()

# train.py

In [None]:
import gym
import numpy as np
from stable_baselines3 import DQN
from sklearn.metrics import accuracy_score

# Function to create the custom environment for DQN
def create_env(X_train_encoded, y_train_full):
    """
    Create a custom environment for training DQN.

    One episode walks the dataset once, in order: each step presents one
    sample as the observation, the action is the predicted class, and the
    reward is +1 for a correct prediction and -1 otherwise.

    Args:
        X_train_encoded: 2-D array-like of encoded training sequences
            (one row per sample).
        y_train_full: The labels for the training data.

    Returns:
        env: A gym environment for DQN.
    """
    class CustomEnv(gym.Env):
        def __init__(self, X_train_encoded, y_train_full):
            super().__init__()
            # Observations are served as float32 so they match the dtype
            # declared by the observation space.
            self.X = np.asarray(X_train_encoded, dtype=np.float32)
            self.y = y_train_full
            self.current_idx = 0
            self.action_space = gym.spaces.Discrete(2)  # Assume binary classification (patched or vulnerable)
            # Bounds cover at least [0, 1] and are widened to the actual data
            # range, since encoded token IDs are not limited to that interval.
            low = min(0.0, float(self.X.min())) if self.X.size else 0.0
            high = max(1.0, float(self.X.max())) if self.X.size else 1.0
            self.observation_space = gym.spaces.Box(low=low, high=high, shape=(self.X.shape[1],), dtype=np.float32)

        def reset(self):
            self.current_idx = 0
            return self.X[self.current_idx]

        def step(self, action):
            reward = 1 if action == self.y[self.current_idx] else -1
            self.current_idx += 1
            done = self.current_idx >= len(self.X)
            # After the last sample an all-zero observation is returned.
            next_state = self.X[self.current_idx] if not done else np.zeros_like(self.X[0])
            return next_state, reward, done, {}

        def render(self, mode='human'):
            pass

    env = CustomEnv(X_train_encoded, y_train_full)
    return env


# Function to train and evaluate the DQN model
def train_and_eval_dqn(model, X_train_encoded, y_train_full, X_test_encoded, y_test, total_timesteps=100000):
    """
    Train the DQN model and evaluate its performance.

    Args:
        model: The DQN model; it must provide ``learn`` and ``predict``.
        X_train_encoded: The tokenized and encoded training sequences.
        y_train_full: The labels for the training data.
        X_test_encoded: The encoded test data.
        y_test: The labels for the test data.
        total_timesteps: Number of environment steps passed to
            ``model.learn``.

    Returns:
        tuple: ``(train_accuracy, test_accuracy)`` as fractions in [0, 1].
        Both values are also printed as percentages. The model is not saved
        by this function.
    """
    # Train the model
    model.learn(total_timesteps=total_timesteps)

    # Predict on the training set; predict() returns a tuple whose first
    # element holds the actions.
    train_preds = model.predict(X_train_encoded, deterministic=True)[0]
    train_accuracy = accuracy_score(y_train_full, train_preds)

    # Predict on the test set
    test_preds = model.predict(X_test_encoded, deterministic=True)[0]
    test_accuracy = accuracy_score(y_test, test_preds)

    # Print evaluation results
    print(f"Training Accuracy: {train_accuracy * 100:.2f}%")
    print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

    return train_accuracy, test_accuracy


# Start of CNN jupyter notebook

### Loading Preprocessed Data

import os
import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from keras.api.models import Sequential, Model
from keras.api.layers import Embedding, Conv1D, GlobalMaxPooling1D, Dense, Dropout, Input, Concatenate, LSTM, Bidirectional, Dot, Flatten, Layer, Activation, MultiHeadAttention, LayerNormalization
from keras.api.optimizers import Adam, SGD
from keras.api.optimizers.schedules import ExponentialDecay, CosineDecay
from keras.api.callbacks import LearningRateScheduler, Callback, ReduceLROnPlateau, EarlyStopping
from sklearn.metrics import classification_report, accuracy_score, precision_recall_curve, average_precision_score, roc_curve, auc
import matplotlib.pyplot as plt
import math

In [None]:
# Load data from directories
def load_data_from_directory(directory):
    """Load every ``.json`` sample found below ``directory``.

    Directories and files are visited in sorted order so the returned list is
    the same on every run and filesystem; this keeps a later seeded
    train/validation split reproducible.

    Args:
        directory: Root of the tree to search. A missing directory yields an
            empty list.

    Returns:
        list: ``(tokens, label)`` tuples read from the ``tokens`` and
        ``label`` keys of each file.
    """
    data = []
    for root, dirs, files in os.walk(directory):
        dirs.sort()  # in-place sort steers os.walk's traversal order
        for file in sorted(files):
            if file.endswith(".json"):
                with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
                    entry = json.load(f)
                data.append((entry["tokens"], entry["label"]))
    return data

# Helper function to load data for a given tokenization method
def load_data_for_method(method):
    """Load the train and test samples produced by one tokenization method.

    Vulnerable samples come first, followed by patched samples, within both
    the training and the test portion.

    Returns:
        tuple: ``(X_train_full, y_train_full, X_test, y_test)`` as plain lists.
    """
    train_data = load_data_from_directory(f"vuln_train_{method}")
    train_data = train_data + load_data_from_directory(f"patched_train_{method}")
    test_data = load_data_from_directory(f"vuln_test_{method}")
    test_data = test_data + load_data_from_directory(f"patched_test_{method}")

    X_train_full, y_train_full = [], []
    for tokens, label in train_data:
        X_train_full.append(tokens)
        y_train_full.append(label)

    X_test, y_test = [], []
    for tokens, label in test_data:
        X_test.append(tokens)
        y_test.append(label)

    return X_train_full, y_train_full, X_test, y_test

# Set tokenization methods
# Each name selects the "<split>_<method>" data directories to load.
# NOTE(review): "normalize" is not among the methods the tokenizer's main()
# in this file produces ("basic", "ast", "syntax", "cfg") — confirm that the
# *_normalize directories are generated elsewhere.
tokenization_methods = ["basic", "ast", "normalize", "syntax", "cfg"]

# Build vocabulary from training data
def build_vocab(X_train_full):
    """
    Builds a vocabulary from the training data.

    IDs are assigned in order of first appearance, starting at 2; 0 and 1 are
    reserved for ``<PAD>`` and ``<UNK>``. A sequence whose first element is a
    list is treated as a list of token lists and flattened. Empty sequences
    contribute nothing.

    Args:
        X_train_full (list of lists): Tokenized training data.

    Returns:
        dict: Vocabulary mapping tokens to unique IDs.
    """
    vocab = {"<PAD>": 0, "<UNK>": 1}

    for sequence in X_train_full:
        if len(sequence) == 0:
            # Nothing to add; also avoids indexing sequence[0] below.
            continue
        if isinstance(sequence[0], list):
            tokens = (token for sublist in sequence for token in sublist)
        else:
            tokens = sequence
        for token in tokens:
            if token not in vocab:
                # IDs are contiguous, so the next free ID is the current size.
                vocab[token] = len(vocab)
    return vocab

# Normalize nested sequences during token loading
def normalize_sequences(sequences):
    """Bring every sequence into a flat, non-empty list of tokens.

    A sequence whose first element is a list is flattened by one level, a
    non-empty flat sequence is passed through unchanged (same object), and an
    empty sequence is replaced by ``["<UNK>"]``.
    """
    normalized = []
    for sequence in sequences:
        if sequence and isinstance(sequence[0], list):
            flat = []
            for sublist in sequence:
                flat.extend(sublist)
            normalized.append(flat)
        elif sequence:
            normalized.append(sequence)
        else:
            normalized.append(["<UNK>"])
    return normalized

# Encode sequences using vocabulary
def encode_sequences(sequences, vocab, max_length):
    """Map tokens to vocabulary IDs and force every row to ``max_length``.

    Tokens missing from ``vocab`` become the ``<UNK>`` ID. Rows longer than
    ``max_length`` are cut at the end; shorter rows are right-padded with the
    ``<PAD>`` ID. The rows are returned as a NumPy array.
    """
    fallback_id = vocab["<UNK>"]
    rows = []
    for sequence in sequences:
        ids = [vocab.get(token, fallback_id) for token in sequence]
        shortfall = max_length - len(ids)
        if shortfall < 0:
            ids = ids[:max_length]
        else:
            ids.extend([vocab["<PAD>"]] * shortfall)
        rows.append(ids)
    return np.array(rows)


### Defining CNN Model

In [None]:
# Positional Encoding Layer
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds a fixed sinusoidal position signal to a sequence of embeddings.

    The signal is the concatenation of the sine terms followed by the cosine
    terms (not interleaved) and is trimmed to ``embedding_dim`` columns so it
    also fits when ``embedding_dim`` is odd.

    Args:
        embedding_dim: Size of the last axis of the inputs.
        **kwargs: Standard Keras layer arguments (e.g. ``name``).
    """

    def __init__(self, embedding_dim, **kwargs):
        super().__init__(**kwargs)
        self.embedding_dim = embedding_dim

    def call(self, inputs):
        # inputs are indexed as (batch, position, feature): axis 1 supplies
        # the sequence length.
        seq_length = tf.shape(inputs)[1]
        position = tf.range(seq_length, dtype=tf.float32)[:, tf.newaxis]
        div_term = tf.exp(tf.range(0, self.embedding_dim, 2, dtype=tf.float32) * -(np.log(10000.0) / self.embedding_dim))
        angles = position * div_term
        positional_encoding = tf.concat([tf.sin(angles), tf.cos(angles)], axis=1)
        # Add a leading batch axis and drop the surplus column that appears
        # when embedding_dim is odd.
        positional_encoding = positional_encoding[tf.newaxis, :, :self.embedding_dim]
        return inputs + positional_encoding

    def get_config(self):
        # Lets Keras re-create the layer when a model is saved and reloaded.
        config = super().get_config()
        config.update({"embedding_dim": self.embedding_dim})
        return config

# Transformer Encoder Layer
def transformer_encoder(inputs, num_heads, key_dim, ff_dim, dropout_rate):
    """Apply one transformer encoder block to ``inputs``.

    The block is self-attention followed by a two-layer feed-forward network.
    Each half applies dropout, adds its input back (residual) and then layer
    normalisation. The output keeps the last-axis width of ``inputs``.
    """
    # Self-attention half: query and value are both the block input.
    attended = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)(inputs, inputs)
    attended = Dropout(dropout_rate)(attended)
    attended_normed = LayerNormalization(epsilon=1e-6)(inputs + attended)

    # Feed-forward half: expand to ff_dim, then project back to input width.
    expanded = Dense(ff_dim, activation="relu")(attended_normed)
    projected = Dense(inputs.shape[-1])(expanded)
    projected = Dropout(dropout_rate)(projected)
    return LayerNormalization(epsilon=1e-6)(attended_normed + projected)

# Build the CNN-BiLSTM Model
def build_cnn_model(vocab_size, embedding_dim=256, num_filters=256, kernel_sizes=(3, 5, 7),
                    lstm_units=256, dense_units=256, dropout_rate=0.5, l2_lambda=0.01, num_heads=4):
    """Build and compile the binary classifier used for vulnerability detection.

    Architecture: embedding -> positional encoding -> one transformer encoder
    block, whose output feeds two parallel branches:
      * one Conv1D + global max pooling per entry of ``kernel_sizes``;
      * a bidirectional LSTM pooled by a learned softmax attention over time.
    Both branches are concatenated and passed through a dense layer, dropout
    and a single sigmoid unit.

    Args:
        vocab_size: Number of rows of the embedding table.
        embedding_dim: Embedding width (also used as attention ``key_dim``).
        num_filters: Filters per convolution.
        kernel_sizes: Iterable of convolution kernel sizes, one branch each.
        lstm_units: Units per LSTM direction.
        dense_units: Units of the dense layer before the output.
        dropout_rate: Dropout applied after the dense layer.
        l2_lambda: L2 regularisation factor for conv, LSTM and dense kernels.
        num_heads: Attention heads of the transformer block.

    Returns:
        A compiled Keras ``Model`` (Adam, learning rate 1e-4, binary
        cross-entropy; metrics: accuracy and AUC).
    """
    input_layer = Input(shape=(None,), dtype="int32")
    embedding = Embedding(input_dim=vocab_size, output_dim=embedding_dim, mask_zero=True)(input_layer)

    positional_encoding = PositionalEncoding(embedding_dim=embedding_dim)(embedding)
    transformer = transformer_encoder(positional_encoding, num_heads=num_heads, key_dim=embedding_dim, ff_dim=512, dropout_rate=0.1)

    # Convolutional branch: one pooled feature vector per kernel size.
    conv_layers = []
    for kernel_size in kernel_sizes:
        conv = Conv1D(filters=num_filters, kernel_size=kernel_size, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(l2_lambda))(transformer)
        pooled = GlobalMaxPooling1D()(conv)
        conv_layers.append(pooled)

    conv_features = Concatenate()(conv_layers)

    # Recurrent branch: per-step LSTM outputs weighted by a softmax score.
    lstm = Bidirectional(LSTM(lstm_units, return_sequences=True, kernel_regularizer=tf.keras.regularizers.l2(l2_lambda)))(transformer)

    attention = Dense(1, activation="tanh")(lstm)
    attention = Flatten()(attention)
    attention_weights = tf.keras.layers.Activation("softmax")(attention)
    attention_output = tf.keras.layers.Dot(axes=1)([attention_weights, lstm])

    combined = Concatenate()([conv_features, attention_output])
    dense = Dense(dense_units, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(l2_lambda))(combined)
    dropout = Dropout(dropout_rate)(dense)
    output = Dense(1, activation="sigmoid")(dropout)

    model = Model(inputs=input_layer, outputs=output)
    model.compile(optimizer=Adam(learning_rate=1e-4), loss="binary_crossentropy", metrics=["accuracy", tf.keras.metrics.AUC(name="auc")])
    return model


### Testing Learning Rates

In [None]:
# # Define the exponential decay learning rate schedule
# def exponential_decay_schedule():
#     return ExponentialDecay(
#         initial_learning_rate=1e-3,
#         decay_steps=1000,
#         decay_rate=0.96,
#         staircase=True
#     )

# # Define the warmup with cosine decay learning rate schedule
# def warmup_cosine_decay_schedule(epoch):
#     if epoch < 5:  # Warmup phase
#         return float(1e-5 + (epoch / 5) * (1e-3 - 1e-5))  # Python float
#     else:  # Cosine decay phase
#         decay = CosineDecay(initial_learning_rate=1e-3, decay_steps=15)
#         return float(decay(epoch - 5))  # Python float

# # Cyclical Learning Rate
# def cyclical_learning_rate_schedule(base_lr=1e-5, max_lr=1e-3, step_size=2000):
#     def lr_schedule(batch):
#         cycle = math.floor(1 + batch / (2 * step_size))
#         x = abs(batch / step_size - 2 * cycle + 1)
#         lr = base_lr + (max_lr - base_lr) * max(0, (1 - x))
#         return float(lr)  # Ensure the return value is a Python float
#     return lr_schedule

# # Train with ReduceLROnPlateau
# def train_with_reduce_lr():
#     reduce_lr = ReduceLROnPlateau(
#         monitor='val_loss',
#         factor=0.5,
#         patience=3,
#         min_lr=1e-6
#     )
#     return None, [reduce_lr]

# # Train with Exponential Decay
# def train_with_exponential_decay():
#     lr_schedule = exponential_decay_schedule()
#     optimizer = SGD(learning_rate=lr_schedule, momentum=0.9)
#     return optimizer, []

# # Train with Warmup + Cosine Decay
# def train_with_warmup_cosine_decay():
#     lr_scheduler = LearningRateScheduler(warmup_cosine_decay_schedule)
#     return None, [lr_scheduler]

# # Train with Cyclical Learning Rate
# def train_with_cyclical_lr(base_lr=1e-5, max_lr=1e-3, step_size=2000):
#     lr_schedule = cyclical_learning_rate_schedule(base_lr, max_lr, step_size)
#     lr_scheduler = LearningRateScheduler(lr_schedule)
#     return None, [lr_scheduler]

# # Function to train the model with a specific strategy
# def train_with_strategy(strategy_name, optimizer=None, callbacks=[]):
#     print(f"Training with {strategy_name}...")

#     # Use default SGD optimizer if not provided
#     if not optimizer:
#         optimizer = SGD(learning_rate=0.01, momentum=0.9)

#     # Build and compile the model
#     model = build_cnn_model(vocab_size=vocab_size, embedding_dim=128)
#     model.compile(optimizer=optimizer, loss="binary_crossentropy", metrics=["accuracy"])

#     # Train the model
#     history = model.fit(
#         X_train, y_train,
#         validation_data=(X_val, y_val),
#         epochs=10,
#         batch_size=32,
#         callbacks=callbacks,
#         verbose=1
#     )
#     return history

# # Execute strategies
# strategies = {
#     "Cyclical Learning Rate": train_with_cyclical_lr(),
#     "Exponential Decay": train_with_exponential_decay(),
#     "ReduceLROnPlateau": train_with_reduce_lr(),
#     "Warmup + Cosine Decay": train_with_warmup_cosine_decay()
# }

# history_records = {}

# for strategy_name, (optimizer, callbacks) in strategies.items():
#     history = train_with_strategy(strategy_name, optimizer=optimizer, callbacks=callbacks)
#     history_records[strategy_name] = history

# # Plot validation accuracy
# plt.figure(figsize=(10, 6))
# for strategy_name, history in history_records.items():
#     plt.plot(history.history['val_accuracy'], label=strategy_name)
# plt.title("Validation Accuracy by Strategy")
# plt.xlabel("Epochs")
# plt.ylabel("Validation Accuracy")
# plt.legend()
# plt.show()


### Training CNN Using Best Method (so far)

In [None]:
# Train and evaluate one model per tokenization method, then plot its ROC and
# precision-recall curves on the test set.
for method in tokenization_methods:
    print(f"Processing tokenization method: {method}")

    # Load data
    X_train_full, y_train_full, X_test, y_test = load_data_for_method(method)

    # A method without tokenized data on disk would otherwise crash in
    # train_test_split; skip it and carry on with the remaining methods.
    if not X_train_full or not X_test:
        print(f"No data found for tokenization method '{method}', skipping.")
        continue

    X_train_full = normalize_sequences(X_train_full)
    X_test = normalize_sequences(X_test)

    # Build vocabulary
    vocab = build_vocab(X_train_full)
    vocab_size = len(vocab)
    max_length = 512  # Set a fixed maximum sequence length

    # Encode sequences
    X_train_full_encoded = encode_sequences(X_train_full, vocab, max_length)
    X_test_encoded = encode_sequences(X_test, vocab, max_length)

    X_train, X_val, y_train, y_val = train_test_split(
        X_train_full_encoded,
        np.array(y_train_full),
        test_size=0.2,
        random_state=42
    )
    y_test = np.array(y_test)

    # Build model
    cnn_model = build_cnn_model(vocab_size=vocab_size)

    # Callbacks
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    reduce_lr_callback = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6)

    # Train model
    history = cnn_model.fit(
        X_train,
        y_train,
        validation_data=(X_val, y_val),
        epochs=2,  # Use more epochs for better results
        batch_size=32,
        callbacks=[early_stopping, reduce_lr_callback],
        verbose=1
    )

    # Evaluate model
    y_pred_prob = cnn_model.predict(X_test_encoded, verbose=1).ravel()
    test_results = cnn_model.evaluate(X_test_encoded, y_test, verbose=1)
    # evaluate() returns the loss followed by the compiled metrics.
    print(f"{method.upper()} test results (loss, accuracy, auc): {test_results}")

    # Compute ROC and PRC
    fpr, tpr, _ = roc_curve(y_test, y_pred_prob)
    roc_auc = auc(fpr, tpr)
    precision, recall, _ = precision_recall_curve(y_test, y_pred_prob)
    prc_auc = average_precision_score(y_test, y_pred_prob)

    # Plot ROC Curve
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(fpr, tpr, color='b', lw=2, label=f"AUC = {roc_auc:.4f}")
    plt.plot([0, 1], [0, 1], color='r', linestyle='--', label="Random Baseline")
    plt.title(f"{method.upper()} - ROC Curve")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend()

    # Plot PRC Curve
    # A random classifier's precision equals the share of positive samples,
    # which is 0.5 only for a perfectly balanced test set.
    positive_rate = float(np.mean(y_test))
    plt.subplot(1, 2, 2)
    plt.plot(recall, precision, color='b', lw=2, label=f"AP = {prc_auc:.4f}")
    plt.axhline(y=positive_rate, color='r', linestyle='--', label="Random Baseline")
    plt.title(f"{method.upper()} - Precision-Recall Curve")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.legend()

    plt.tight_layout()
    plt.show()


# 704dqn.py

In [None]:
import os
import time
import json
import random
from copy import deepcopy
from collections import deque
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import (
    classification_report,
    roc_curve,
    auc,
    precision_recall_curve,
    average_precision_score,
)
import matplotlib.pyplot as plt
from torch.optim import AdamW

# Wall-clock timestamp taken when the script starts running.
# NOTE(review): presumably used later to report total runtime — confirm.
start = time.time()

# Directory for saving metrics and charts
output_dir = "dqn_metrics_output3"
# Created up front; an already existing directory is not an error.
os.makedirs(output_dir, exist_ok=True)

# Load data from directories
def load_data_from_directory(directory):
    """Collect the ``(tokens, label)`` pair of every ``.json`` file in a tree.

    The tree is walked in sorted order (directories and files), so the result
    does not depend on the order in which the filesystem lists entries. A
    directory that does not exist produces an empty list.
    """
    data = []
    for root, dirs, files in os.walk(directory):
        dirs.sort()  # in-place sort steers os.walk's traversal order
        for file in sorted(files):
            if not file.endswith(".json"):
                continue
            with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
                entry = json.load(f)
            data.append((entry["tokens"], entry["label"]))
    return data

# Helper function to load data for a given tokenization method
def load_data_for_method(method):
    """Return ``(X_train_full, y_train_full, X_test, y_test)`` for ``method``.

    Both portions list the vulnerable samples before the patched ones; the
    four results are plain Python lists.
    """
    vuln_train = load_data_from_directory(f"vuln_train_{method}")
    train_data = vuln_train + load_data_from_directory(f"patched_train_{method}")
    vuln_test = load_data_from_directory(f"vuln_test_{method}")
    test_data = vuln_test + load_data_from_directory(f"patched_test_{method}")
    X_train_full = [tokens for tokens, _ in train_data]
    y_train_full = [label for _, label in train_data]
    X_test = [tokens for tokens, _ in test_data]
    y_test = [label for _, label in test_data]
    return X_train_full, y_train_full, X_test, y_test

# Normalize nested sequences and build vocabulary
def normalize_sequences(sequences):
    """Flatten nested token lists and replace empty sequences.

    Per sequence: if its first element is a list, the sub-lists are joined
    into one flat list; a non-empty flat sequence is returned as the same
    object; an empty one becomes ``["<UNK>"]``.
    """
    def normalize_one(sequence):
        if not sequence:
            return ["<UNK>"]
        if isinstance(sequence[0], list):
            return [token for sublist in sequence for token in sublist]
        return sequence

    return list(map(normalize_one, sequences))

def build_vocab(X_train_full):
    """Build a token-to-ID mapping from the training sequences.

    ``<PAD>`` and ``<UNK>`` are fixed at 0 and 1; every other token receives
    the next free ID the first time it is seen. Sequences whose first element
    is a list are flattened by one level. Empty sequences are skipped instead
    of raising ``IndexError``.
    """
    vocab = {"<PAD>": 0, "<UNK>": 1}
    current_id = 2
    for sequence in X_train_full:
        if len(sequence) == 0:
            continue
        if isinstance(sequence[0], list):
            tokens = [token for sublist in sequence for token in sublist]
        else:
            tokens = sequence
        for token in tokens:
            if token not in vocab:
                vocab[token] = current_id
                current_id += 1
    return vocab

def encode_sequences(sequences, vocab, max_length):
    """Turn token sequences into a NumPy array of IDs with fixed row length.

    Unknown tokens map to the ``<UNK>`` ID; rows are truncated at the end or
    right-padded with the ``<PAD>`` ID so each has ``max_length`` entries.
    """
    def encode_one(sequence):
        ids = [vocab.get(token, unknown_id) for token in sequence]
        if len(ids) > max_length:
            return ids[:max_length]
        ids += [vocab["<PAD>"]] * (max_length - len(ids))
        return ids

    unknown_id = vocab["<UNK>"]
    return np.array([encode_one(sequence) for sequence in sequences])

# Double DQN
class QFunction(nn.Module):
    """Fully connected network mapping an observation to one value per action.

    Layer widths are ``obs_dim``, then each entry of ``hidden_sizes`` (a
    list), then ``act_dim``. ReLU follows every layer except the last.
    """

    def __init__(self, obs_dim, act_dim, hidden_sizes):
        super().__init__()
        widths = [obs_dim] + hidden_sizes + [act_dim]
        linears = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            linears.append(nn.Linear(fan_in, fan_out))
        self.layers = nn.ModuleList(linears)

    def forward(self, obs):
        *hidden_layers, output_layer = self.layers
        activations = obs
        for hidden in hidden_layers:
            activations = F.relu(hidden(activations))
        # The output layer is linear: no activation is applied to the result.
        return output_layer(activations)

class DQN:
    """Double-DQN agent with a replay buffer and a periodically synced target net.

    ``options`` must provide the keys ``hidden_sizes``, ``lr``, ``gamma``,
    ``memory_size``, ``batch_size`` and ``target_update_freq``.
    """

    def __init__(self, obs_dim, act_dim, options):
        # Online network (trained) and a copy used to evaluate bootstrap targets.
        self.model = QFunction(obs_dim, act_dim, options['hidden_sizes'])
        self.target_model = deepcopy(self.model)
        self.optimizer = AdamW(self.model.parameters(), lr=options['lr'])
        self.loss_fn = nn.SmoothL1Loss()
        # Bounded replay buffer: the oldest transitions are dropped first.
        self.memory = deque(maxlen=options['memory_size'])
        self.gamma = options['gamma']
        self.batch_size = options['batch_size']
        self.target_update_freq = options['target_update_freq']
        self.n_steps = 0

    def memorize(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def train_step(self):
        """Run one optimisation step on a random minibatch from the buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. The target network is synced with the online network
        whenever ``n_steps`` is a multiple of ``target_update_freq``
        (checked before ``n_steps`` is incremented).
        """
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Stack through NumPy first: creating a tensor directly from a
        # sequence of ndarrays is very slow.
        states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.long)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)
        next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
        dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32)

        # squeeze(1) rather than squeeze(): keeps the batch axis when
        # batch_size == 1 so prediction and target shapes always match.
        q_values = self.model(states).gather(1, actions.view(-1, 1)).squeeze(1)
        with torch.no_grad():
            # Double DQN: the online network picks the next action, the
            # target network supplies its value.
            best_next_actions = self.model(next_states).argmax(dim=1, keepdim=True)
            next_q_values = self.target_model(next_states).gather(1, best_next_actions).squeeze(1)
            # (1 - dones) removes the bootstrap term for terminal transitions.
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = self.loss_fn(q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.n_steps % self.target_update_freq == 0:
            self.target_model.load_state_dict(self.model.state_dict())
        self.n_steps += 1

    def evaluate(self, X_test, y_test, vocab, max_length):
        """Score the test set and write a report plus ROC / PR curve images.

        The softmax over the Q-values is used as class scores; column 1 is
        the score passed to the ROC and precision-recall computations.
        Output files (``classification_report.txt``, ``roc_curve.png``,
        ``prc_curve.png``) are written under the module-level ``output_dir``,
        which is defined outside this class.

        Args:
            X_test: Raw (possibly nested) token sequences.
            y_test: Ground-truth labels aligned with ``X_test``.
            vocab: Token -> id mapping used for encoding.
            max_length: Encoded sequence length.
        """
        X_test_encoded = encode_sequences(normalize_sequences(X_test), vocab, max_length)

        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                # One batched forward pass instead of one pass per sample.
                q_values = self.model(torch.as_tensor(X_test_encoded, dtype=torch.float32))
                probs = torch.softmax(q_values, dim=-1)
        finally:
            # Leave the model in whatever mode the caller had it in.
            self.model.train(was_training)
        y_probs = probs[:, 1].cpu().numpy()
        y_pred = probs.argmax(dim=-1).cpu().numpy()

        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Save metrics and charts
        report = classification_report(y_test, y_pred, target_names=["Non-vulnerable", "Vulnerable"])
        with open(os.path.join(output_dir, "classification_report.txt"), "w") as f:
            f.write(report)

        # ROC curve
        fpr, tpr, _ = roc_curve(y_test, y_probs)
        roc_auc = auc(fpr, tpr)
        plt.figure()
        plt.plot(fpr, tpr, label=f"ROC curve (area = {roc_auc:.2f})")
        plt.plot([0, 1], [0, 1], color="navy", linestyle="--")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("Receiver Operating Characteristic")
        plt.legend(loc="lower right")
        plt.savefig(os.path.join(output_dir, "roc_curve.png"))
        plt.close()

        # PRC curve
        precision, recall, _ = precision_recall_curve(y_test, y_probs)
        avg_precision = average_precision_score(y_test, y_probs)
        plt.figure()
        plt.step(recall, precision, where="post", label=f"Avg Precision = {avg_precision:.2f}")
        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title("Precision-Recall Curve")
        plt.legend(loc="lower left")
        plt.savefig(os.path.join(output_dir, "prc_curve.png"))
        plt.close()

# Parameters and training loop
options = {'hidden_sizes': [128, 64], 'lr': 0.001, 'gamma': 0.99, 'memory_size': 5000, 'batch_size': 32, 'target_update_freq': 100}
method = "basic"
X_train, y_train, X_test, y_test = load_data_for_method(method)
# Normalise once and reuse the result for both the vocabulary and the encoding.
X_train_normalized = normalize_sequences(X_train)
vocab = build_vocab(X_train_normalized)
max_length = 100
X_train_encoded = encode_sequences(X_train_normalized, vocab, max_length)
X_test_encoded = encode_sequences(normalize_sequences(X_test), vocab, max_length)
obs_dim = X_train_encoded.shape[1]
act_dim = 2
dqn = DQN(obs_dim, act_dim, options)

for epoch in range(10):
    for state, label in zip(X_train_encoded, y_train):
        # Classification framed as a one-step episode: there is no transition.
        next_state = state
        done = True
        # Store the outcome of EVERY action, not only the correct one.
        # If the stored action always equals the label, the reward depends
        # only on the label and the Q-value of the wrong action never gets a
        # learning signal, so argmax over Q-values cannot separate the classes.
        for action in range(act_dim):
            reward = 1 if action == label else -1
            dqn.memorize(state, action, reward, next_state, done)
        dqn.train_step()

dqn.evaluate(X_test, y_test, vocab, max_length)

end = time.time()

print(f"Time: {end-start}")

# Original
# 10 Epochs = 19.26 (dqn_metrics_output)
# 100 Epochs = 240.5 (dqn_metrics_output2)

# Modified w/ DDQN
# 10 Epochs = 20.2 (dqn_metrics_output3)
# 100 Epochs = 814 (dqn_metrics_output4)


704dqnorg.py

In [None]:
import json
import random
from copy import deepcopy
from collections import deque

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.optim import AdamW
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

# Load preprocessed data
# Explicit UTF-8: without it, open() uses the platform's locale encoding,
# which can fail on non-ASCII tokens.
with open('processed_data.json', 'r', encoding='utf-8') as infile:
    data = json.load(infile)

X = np.array(data['sequences'])  # Tokenized and padded sequences
y = np.array(data['labels'])     # Corresponding labels (1 for vulnerable)

# Split data into training and validation sets (80/20, fixed seed for repeatability)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)


class QFunction(nn.Module):
    """Fully connected network mapping an observation to one value per action.

    Layer widths are ``obs_dim``, then each entry of ``hidden_sizes``, then
    ``act_dim``. ReLU follows every layer except the last.
    """

    def __init__(self, obs_dim, act_dim, hidden_sizes):
        super().__init__()
        sizes = [obs_dim] + hidden_sizes + [act_dim]
        self.layers = nn.ModuleList(
            nn.Linear(fan_in, fan_out) for fan_in, fan_out in zip(sizes, sizes[1:])
        )

    def forward(self, obs):
        """Return the Q-values for ``obs``.

        The trailing axis is squeezed when it has size 1, so with
        ``act_dim == 1`` the action axis is dropped from the result.
        """
        # The observation is used directly; wrapping it in torch.cat([obs])
        # only produced an identical copy on every forward pass.
        x = obs
        for layer in self.layers[:-1]:
            x = F.relu(layer(x))
        return self.layers[-1](x).squeeze(dim=-1)


class DQN:
    """DQN agent that treats binary classification as one-step episodes.

    ``options`` must provide the keys ``layers``, ``alpha``, ``gamma``,
    ``replay_memory_size``, ``batch_size``,
    ``update_target_estimator_every`` and ``epochs``.
    """

    def __init__(self, obs_dim, act_dim, options):
        self.model = QFunction(obs_dim, act_dim, options['layers'])
        self.target_model = deepcopy(self.model)
        self.optimizer = AdamW(self.model.parameters(), lr=options['alpha'], amsgrad=True)
        self.loss_fn = nn.SmoothL1Loss()

        # Freeze target network parameters
        for p in self.target_model.parameters():
            p.requires_grad = False

        self.replay_memory = deque(maxlen=options['replay_memory_size'])
        self.options = options
        # Number of actions; train() stores one transition per action.
        self.act_dim = act_dim
        self.n_steps = 0

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def memorize(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay buffer."""
        self.replay_memory.append((state, action, reward, next_state, done))

    def compute_target_values(self, next_states, rewards, dones):
        """Return ``reward + gamma * max_a Q_target(next_state, a) * (1 - done)``."""
        return rewards + self.options['gamma'] * torch.max(self.target_model(next_states), dim=-1)[0] * (1 - dones)

    def replay(self):
        """Run one optimisation step on a random minibatch.

        Does nothing until the buffer holds more than ``batch_size``
        transitions.
        """
        batch_size = self.options['batch_size']
        if len(self.replay_memory) <= batch_size:
            return
        minibatch = random.sample(self.replay_memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Stack through NumPy first: creating a tensor directly from a
        # sequence of ndarrays is very slow.
        states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.long)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)
        next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
        dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32)

        current_q = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(-1)
        with torch.no_grad():
            target_q = self.compute_target_values(next_states, rewards, dones)

        loss = self.loss_fn(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(self.model.parameters(), 100)
        self.optimizer.step()

    def train(self, X_train, y_train):
        """Train for ``options['epochs']`` passes over the labelled data.

        For every sample, one transition per action is stored: reward +1
        when the action equals the label, -1 otherwise. Storing only the
        transition whose action equals the label would make the reward a
        function of the label alone and leave the Q-value of the wrong
        action without any learning signal.
        """
        for epoch in range(self.options['epochs']):
            for state, label in zip(X_train, y_train):
                next_state = state  # No transition in supervised setup
                done = True

                for action in range(self.act_dim):
                    reward = 1 if action == label else -1
                    self.memorize(state, action, reward, next_state, done)
                self.replay()

                if self.n_steps % self.options['update_target_estimator_every'] == 0:
                    self.update_target_model()

                self.n_steps += 1

            print(f"Epoch {epoch + 1} completed.")

    def evaluate(self, X_val, y_val):
        """Print the validation accuracy of the greedy (argmax-Q) prediction."""
        self.model.eval()
        with torch.no_grad():
            # One batched forward pass instead of one pass per sample.
            states = torch.as_tensor(np.asarray(X_val), dtype=torch.float32)
            predictions = self.model(states).argmax(dim=-1).tolist()

        accuracy = accuracy_score(y_val, predictions)
        print(f"Validation Accuracy: {accuracy:.4f}")
        self.model.train()

    def __str__(self):
        return "DQN"


# Hyperparameters read by DQN: network widths, optimiser step size, discount,
# replay buffer and batch sizes, target sync period, and number of epochs.
options = dict(
    layers=[128, 64],
    alpha=0.001,
    gamma=0.99,
    replay_memory_size=10000,
    batch_size=32,
    update_target_estimator_every=100,
    epochs=10,
)

# Build the agent: one input per sequence position, one output per class.
obs_dim = X_train.shape[1]
act_dim = 2  # Binary classification (0 or 1)
dqn = DQN(obs_dim, act_dim, options)

# Fit on the training split, then report accuracy on the validation split.
dqn.train(X_train, y_train)
dqn.evaluate(X_val, y_val)

# new704dqn.py - Best one

In [None]:
import os
import json
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve, auc
from stable_baselines3 import DQN
from train import train_and_eval_dqn, create_env
from sklearn.model_selection import train_test_split

# Load data from directories
def load_data_from_directory(directory):
    """Collect ``(tokens, label)`` pairs from every ``.json`` file under a directory.

    The tree is walked recursively in sorted order so the returned sample
    order is the same on every run and every filesystem.

    Args:
        directory: Root directory to scan. A missing directory yields ``[]``.

    Returns:
        List of ``(entry["tokens"], entry["label"])`` tuples, one per file.

    Raises:
        KeyError: If a JSON file lacks the ``"tokens"`` or ``"label"`` key.
    """
    data = []
    for root, dirs, files in os.walk(directory):
        # Sorting in place fixes the order in which os.walk descends.
        dirs.sort()
        for file_name in sorted(files):
            if not file_name.endswith(".json"):
                continue
            with open(os.path.join(root, file_name), 'r', encoding="utf-8") as f:
                entry = json.load(f)
            data.append((entry["tokens"], entry["label"]))
    return data

# Helper function to load data for a given tokenization method
def load_data_for_method(method):
    """Load the train and test splits produced by one tokenization method.

    Vulnerable samples come first in each split, followed by patched ones.

    Args:
        method: Suffix of the ``vuln_*`` / ``patched_*`` directory names.

    Returns:
        Tuple ``(X_train_full, y_train_full, X_test, y_test)`` of lists.
    """
    train_pairs = [
        *load_data_from_directory(f"vuln_train_{method}"),
        *load_data_from_directory(f"patched_train_{method}"),
    ]
    test_pairs = [
        *load_data_from_directory(f"vuln_test_{method}"),
        *load_data_from_directory(f"patched_test_{method}"),
    ]

    # Separate each (tokens, label) pair into parallel feature/label lists.
    X_train_full = [tokens for tokens, _ in train_pairs]
    y_train_full = [label for _, label in train_pairs]
    X_test = [tokens for tokens, _ in test_pairs]
    y_test = [label for _, label in test_pairs]

    return X_train_full, y_train_full, X_test, y_test

# Build vocabulary from training data
def build_vocab(X_train_full):
    """Build a token -> integer id mapping from the training sequences.

    Ids 0 and 1 are reserved for ``"<PAD>"`` and ``"<UNK>"``; every other
    token receives the next free id in order of first appearance.

    Args:
        X_train_full: Iterable of token sequences. A sequence whose first
            element is a list is flattened one level before being scanned.
            Empty sequences are skipped.

    Returns:
        dict mapping each token to its integer id.
    """
    vocab = {"<PAD>": 0, "<UNK>": 1}
    for sequence in X_train_full:
        if not sequence:
            # Nothing to add, and sequence[0] below would raise IndexError.
            continue
        if isinstance(sequence[0], list):
            tokens = (token for sublist in sequence for token in sublist)
        else:
            tokens = sequence
        for token in tokens:
            # Ids are contiguous, so the current size is the next free id.
            vocab.setdefault(token, len(vocab))
    return vocab

# Normalize nested sequences during token loading
def normalize_sequences(sequences):
    """Flatten one level of nesting and make every sequence non-empty.

    A sequence whose first element is a list is treated as a list of token
    lists and is flattened into a single token list. Any sequence that is
    empty -- or that becomes empty after flattening, e.g. ``[[]]`` -- is
    replaced by the placeholder ``["<UNK>"]`` so callers never receive an
    empty token list.

    Args:
        sequences: Iterable of token sequences, flat or nested one level deep.

    Returns:
        A list holding one flat, non-empty token list per input sequence.
        Flat, non-empty inputs are returned as the same list objects.
    """
    normalized = []
    for sequence in sequences:
        if sequence and isinstance(sequence[0], list):
            sequence = [token for sublist in sequence for token in sublist]
        # Applied after flattening so that all-empty sublists also get the
        # placeholder instead of slipping through as [].
        normalized.append(sequence or ["<UNK>"])
    return normalized

# Encode sequences using vocabulary
def encode_sequences(sequences, vocab, max_length):
    """Map token sequences to fixed-length rows of vocabulary ids.

    Tokens missing from ``vocab`` are encoded with the ``"<UNK>"`` id. Rows
    longer than ``max_length`` are cut at the end; shorter rows are padded
    on the right with the ``"<PAD>"`` id.

    Args:
        sequences: Iterable of flat token sequences.
        vocab: Token -> id mapping containing ``"<UNK>"`` and ``"<PAD>"``.
        max_length: Length of every encoded row.

    Returns:
        ``np.array`` built from the list of encoded rows.
    """
    fallback_id = vocab["<UNK>"]
    rows = []
    for tokens in sequences:
        ids = [vocab.get(tok, fallback_id) for tok in tokens]
        shortfall = max_length - len(ids)
        if shortfall < 0:
            ids = ids[:max_length]
        else:
            ids.extend([vocab["<PAD>"]] * shortfall)
        rows.append(ids)
    return np.array(rows)

# Step 1: Load and preprocess the data
# Tokenization method to load; one of ["basic", "ast", "normalize", "syntax", "cfg"]
X_train_full, y_train_full, X_test, y_test = load_data_for_method("basic")
X_train_full = normalize_sequences(X_train_full)
X_test = normalize_sequences(X_test)

# The vocabulary is built from the training split only; test-only tokens map to <UNK>.
vocab = build_vocab(X_train_full)

max_length = 512 # For sequences
X_train_encoded, X_test_encoded = (
    encode_sequences(split, vocab, max_length) for split in (X_train_full, X_test)
)

# Step 2: Create DQN environment
# Assuming `create_env` returns a gym environment for DQN
env = create_env(X_train_encoded, y_train_full)

# Step 3: Initialize the DQN model with hyperparameters
dqn_hyperparams = {
    "buffer_size": 50000,
    "learning_starts": 1000,
    "batch_size": 64,
    "gamma": 0.97,
    "tau": 0.1,
    "train_freq": 4,
    "target_update_interval": 100,
}
model = DQN("MlpPolicy", env, verbose=1, **dqn_hyperparams)

# Training Accuracy: 96.46%
# Test Accuracy: 52.44%
# ROC AUC: 0.52
# PRC AUC: 0.65
# model = DQN(
#     "MlpPolicy", 
#     env, 
#     verbose=1, 
#     buffer_size=100000,  # Larger buffer to store more experiences
#     learning_starts=5000,  # Delay updates for better initial policy exploration
#     batch_size=128,  # Larger batch size for more stable gradients
#     gamma=0.995,  # Higher discount factor for longer-term rewards
#     tau=0.005,  # Lower tau for smoother target updates
#     train_freq=4, 
#     target_update_interval=100, 
#     exploration_fraction=0.2,  # Prolonged exploration phase
#     exploration_final_eps=0.01,  # Lower final epsilon for better exploitation
#     learning_rate=5e-4,  # Slightly increased learning rate
#     gradient_steps=-1  # Full training per step for stability
# )

# Start of notes for many runs of this code
# Original results before modification - these are all for the basic data
# As shown in directory this report is saved to new_dqn_metrics_output1
# Training Accuracy: 91.52%
# Test Accuracy: 48.78%

# Decrease tau from 0.1 to 0.005 and increase from 100k to 200k time steps got these results:
# Training Accuracy: 95.85%
# Test Accuracy: 46.34%

# Moving tau back from 0.005 and keeping 200k time steps
# This is shown in new_dqn_metrics_output2
# Training Accuracy: 91.28%
# Test Accuracy: 52.44%

# Moving from 200k timesteps to 400k timesteps 
# This is shown in new_dqn_metrics_output3
# Training Accuracy: 95.18%
# Test Accuracy: 56.10%
# ROC AUC: 0.56
# PRC AUC: 0.67

# Moving from 400k timesteps to 800k timesteps - Overtrained
# This is shown in new_dqn_metrics_output4
# Training Accuracy: 95.61%
# Test Accuracy: 50%
# ROC AUC: 0.50
# PRC AUC: 0.62

# Moving from 800k to 500k timesteps - ROC/PRC curves not saved if file folder not mentioned
# Training Accuracy: 94.09%
# Test Accuracy: 48.78%

# Increase gamma from 0.99 to 0.995
# Training Accuracy: 88.96%
# Test Accuracy: 47.56%

# 0.995 with 400k timesteps
# Training Accuracy: 94.70%
# Test Accuracy: 51.22%
# Run 2:
# Training Accuracy: 93.11%
# Test Accuracy: 48.78%

# Change back to 0.99 with 400k timesteps
# Training Accuracy: 94.70%
# Test Accuracy: 54.88%

# Change to 0.98 gamma - BEST so far
# Training Accuracy: 95.55%
# Test Accuracy: 58.54%

# At 0.97 gamma -> Test Accuracy: 46.34%

# Up until now every run used only the basic data. Next: the same settings as the "BEST so far" basic run, applied to *ast*
# (nothing was changed from the configuration that got 58.54% on the basic data)
# ROC AUC: 0.48
# PRC AUC: 0.69 - Terrible performance, worse than 50/50 coin toss

# Now for *normalize*
# Test Accuracy: 39.02%
# ROC AUC: 0.39
# PRC AUC: 0.57

# Now for *syntax*
# Test Accuracy: 52.44%
# ROC AUC: 0.52
# PRC AUC: 0.65 - Slightly better but not great

# Now for *cfg*
# Currently index out of bounds error

# Step 4: Train the DQN model using the training data
model.learn(total_timesteps=400000)

# Step 5: Save the trained model
model.save("trained_dqn_model")

# Step 6: Evaluate the model
train_and_eval_dqn(model, X_train_encoded, y_train_full, X_test_encoded, y_test)

# Step 7: Evaluate metrics: ROC curve, PRC curve
# model.predict() returns the chosen actions (hard 0/1 decisions), not scores.
# Feeding hard decisions to roc_curve / precision_recall_curve yields a single
# operating point, so the "AUC" just mirrors accuracy. Use the Q-network's
# outputs instead so the curves are computed from a continuous ranking.
# NOTE(review): assumes the env has exactly two actions and that action 1
# corresponds to label 1 -- confirm against `create_env`.
model.policy.set_training_mode(False)
obs_tensor, _ = model.policy.obs_to_tensor(X_test_encoded)
q_values = model.q_net(obs_tensor).detach().cpu().numpy()
# Numerically stable softmax over the two Q-values; column 1 is the score.
q_shifted = np.exp(q_values - q_values.max(axis=1, keepdims=True))
y_pred_prob = q_shifted[:, 1] / q_shifted.sum(axis=1)

fpr, tpr, _ = roc_curve(y_test, y_pred_prob)
roc_auc = auc(fpr, tpr)

precision, recall, _ = precision_recall_curve(y_test, y_pred_prob)
prc_auc = auc(recall, precision)

# Save ROC Curve to file
roc_curve_file = "roc_curve.png"
plt.figure(figsize=(10, 6))
plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], color='gray', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.savefig(roc_curve_file)
plt.close()

# Save Precision-Recall Curve to file
prc_curve_file = "prc_curve.png"
plt.figure(figsize=(10, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PRC curve (area = {prc_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc="lower left")
plt.savefig(prc_curve_file)
plt.close()

# Save metrics report to file
metrics_report_file = "metrics_report.txt"
with open(metrics_report_file, 'w', encoding='utf-8') as f:
    f.write(f"ROC AUC: {roc_auc:.2f}\n")
    f.write(f"PRC AUC: {prc_auc:.2f}\n")
    f.write(f"\nROC Curve saved to: {roc_curve_file}\n")
    f.write(f"Precision-Recall Curve saved to: {prc_curve_file}\n")

print(f"Metrics report saved to {metrics_report_file}")