In [None]:
!pip install -q accelerate==0.26.0 peft==0.4.0 bitsandbytes>=0.41.3 trl==0.4.7

In [None]:
import torch
import pandas as pd
from datasets import Dataset, load_dataset

from transformers import(
AutoTokenizer,
AutoModelForCausalLM,
TrainingArguments,
Trainer
)

from peft import LoraConfig, get_peft_model, TaskType
import bitsandbytes as bnb

In [None]:
vulnerability_dataset = load_dataset("CyberNative/Code_Vulnerability_Security_DPO")
dataset = vulnerability_dataset['train']
shuffled_dataset = dataset.shuffle(seed = 42)
split_dataset = shuffled_dataset.train_test_split(test_size=0.2, seed = 42)


In [None]:
train_dataset = split_dataset["train"]
test_dataset = split_dataset["test"]

# Verify the sizes of the splits
print("Train size:", len(train_dataset))
print("Test size:", len(test_dataset))

Train size: 3724
Test size: 932


In [None]:
!pip install huggingface_hub




In [None]:
import os
from huggingface_hub import login

# Initialize OpenAI API key

huggingface_token = "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"
login(token = huggingface_token)

In [None]:
from transformers import AutoTokenizer
from transformers import AutoModelForCausalLM, BitsAndBytesConfig

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
)

# Load the model with 4-bit quantization
model = AutoModelForCausalLM.from_pretrained(
    "mistralai/Mistral-7B-v0.1",
    quantization_config=bnb_config,
    device_map="auto",
)

# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1", use_fast = False)
if not tokenizer.pad_token:
    tokenizer.pad_token = tokenizer.eos_token  # Use eos_token as pad_token

# Alternatively, add a custom padding token
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    model.resize_token_embeddings(len(tokenizer))  # Resize embeddings if needed

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
def format_example(example):
    """
    Formats the input example into the desired structure for fine-tuning.
    """
    language = example.get('lang', 'Unknown')
    vulnerability = example.get('vulnerability', '')
    scenario = example.get('question', '')
    input_code = example.get('rejected', '')
    corrected_code = example.get('chosen', '')

    formatted_string = f"""
    ### Language:
    {language}

    ### Scenario:
    {scenario}

    ###This is my code:
    ```{language}
    {input_code}

    ### Task:
    1. Identify and describe the vulnerability in the code. Begin your answer with 'Vulnerability:'.
    2. Rewrite the program to fix the vulnerability. Begin your corrected program with 'Corrected Code:'.

    Vulnerability: {vulnerability}
    Corrected Code: {corrected_code}
    """

    return formatted_string

In [None]:
def tokenize_function(examples):
    """
    Tokenize the formatted examples from the dataset.
    """
    # Create formatted strings for the batch
    formatted_examples = [
        format_example({
            'lang': lang,
            'vulnerability': vulnerability,
            'question': question,
            'chosen': chosen,
            'rejected': rejected,
        })
        for lang, vulnerability, question, chosen, rejected in zip(
            examples['lang'],
            examples['vulnerability'],
            examples['question'],
            examples['chosen'],
            examples['rejected'],
        )
    ]

    # Tokenize the formatted examples
    tokenized = tokenizer(
        formatted_examples,
        padding="max_length",
        truncation=True,
        max_length=1024,
    )

    # Set the labels
    tokenized['labels'] = tokenized['input_ids'].copy()
    return tokenized


In [None]:
one_shot_prompt = """

### Language:
Python

### Scenario:
I am using Python to implement a program that calculates the sum of a list of numbers provided by the user. The user inputs a string of comma-separated numbers which is converted into a list and summed.

This is my code:
```python
def calculate_sum(user_input):
    numbers = [int(x) for x in user_input.split(",")]
    total = sum(numbers)
    print(f"Total sum: {total}")

user_input = input("Enter numbers separated by commas: ")
calculate_sum(user_input)

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the code to fix the vulnerability. Begin your corrected program with "Corrected Code:".
"""

In [None]:
final_prompt = one_shot_prompt
input_ids = tokenizer(final_prompt, return_tensors="pt").input_ids
output = model.generate( input_ids, max_length=1024, temperature=0.7, top_p=0.95, repetition_penalty=1.2, num_return_sequences=1, )

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


In [None]:
response = tokenizer.decode(output[0], skip_special_tokens=True)
print(response)



### Language:
Python

### Scenario:
I am using Python to implement a program that calculates the sum of a list of numbers provided by the user. The user inputs a string of comma-separated numbers which is converted into a list and summed.

This is my code:
```python
def calculate_sum(user_input):
    numbers = [int(x) for x in user_input.split(",")]
    total = sum(numbers)
    print(f"Total sum: {total}")

user_input = input("Enter numbers separated by commas: ")
calculate_sum(user_input)

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the code to fix the vulnerability. Begin your corrected program with "Corrected Code:".
3. Explain how you fixed it, beginning your explanation with "Explanation:".
4. Provide an example of what would happen if this vulnerability were exploited. Begin your example with "Example:".
5. Describe any other security measures or best practices that could be implemented to further secure 

In [None]:
few_shot_prompt = """
### Language:
C++

### Scenario:
I am using C++ to implement a program that copies user input into a buffer. Write a function that takes user input and copies it into a character array.

This is my code:
```cpp
#include <iostream>
#include <cstring>

void copyInput(const char* userInput) {
    char buffer[10];
    strcpy(buffer, userInput);
    std::cout << "Input copied: " << buffer << std::endl;
}

int main() {
    char input[100];
    std::cout << "Enter your input: ";
    std::cin >> input;

    copyInput(input);
    return 0;
}

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the program to fix the vulnerability. Begin your corrected program with "Corrected Code:".

Vulnerability: Improper memory allocation and lack of boundary checks can lead to buffer overflow vulnerabilities in C++ programs.

Corrected code:

#include <iostream>
#include <cstring>

void copyInput(const char* userInput) {
    char buffer[10];
    strncpy(buffer, userInput, sizeof(buffer) - 1);
    buffer[sizeof(buffer) - 1] = '\0';
    std::cout << "Input copied: " << buffer << std::endl;
}

int main() {
    char input[100];
    std::cout << "Enter your input: ";
    std::cin >> input;

    copyInput(input);
    return 0;
}"""

new_question = """
Language:
JavaScript

Scenario:
I am using JavaScript to build a web server that dynamically executes code based on user input. Write a function that evaluates user-provided JavaScript code using eval().


This is my code:
const express = require('express');
const app = express();

app.use(express.json());

app.post('/execute', (req, res) => {
    const userCode = req.body.code;
    const result = eval(userCode);

    res.send(`Result: ${result}`);
});

app.listen(3000, () => {
    console.log('Server running on port 3000');
});

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the program to fix the vulnerability. Begin your corrected program with "Corrected Code:".
"""

In [None]:
final_prompt = few_shot_prompt + new_question
input_ids = tokenizer(final_prompt, return_tensors="pt").input_ids
output = model.generate( input_ids, max_length=1024, temperature=0.7, top_p=0.95, repetition_penalty=1.2, num_return_sequences=1, )

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


In [None]:
response = tokenizer.decode(output[0], skip_special_tokens=True)
print(response)


### Language:
C++

### Scenario:
I am using C++ to implement a program that copies user input into a buffer. Write a function that takes user input and copies it into a character array.

This is my code:
```cpp
#include <iostream>
#include <cstring>

void copyInput(const char* userInput) {
    char buffer[10];
    strcpy(buffer, userInput);
    std::cout << "Input copied: " << buffer << std::endl;
}

int main() {
    char input[100];
    std::cout << "Enter your input: ";
    std::cin >> input;

    copyInput(input);
    return 0;
}

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the program to fix the vulnerability. Begin your corrected program with "Corrected Code:".

Vulnerability: Improper memory allocation and lack of boundary checks can lead to buffer overflow vulnerabilities in C++ programs.

Corrected code:

#include <iostream>
#include <cstring>

void copyInput(const char* userInput) {
    char buffer[10];

In [None]:
# Specify columns to retain
columns_to_keep = ['lang', 'vulnerability', 'question', 'chosen', 'rejected']

# Remove unnecessary columns dynamically
columns_to_remove_train = [col for col in train_dataset.column_names if col not in columns_to_keep]
columns_to_remove_test = [col for col in test_dataset.column_names if col not in columns_to_keep]

# Tokenize train dataset
train_dataset = train_dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=columns_to_remove_train,
)

# Tokenize test dataset
test_dataset = test_dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=columns_to_remove_test,
)

# Verify tokenized datasets
print("Tokenized Train Dataset Sample:", train_dataset[0])
print("Tokenized Test Dataset Sample:", test_dataset[0])



Map:   0%|          | 0/3724 [00:00<?, ? examples/s]

Map:   0%|          | 0/932 [00:00<?, ? examples/s]

Tokenized Train Dataset Sample: {'lang': 'php', 'vulnerability': "In PHP, it's possible for an unsanitized user input to lead to SQL injection attacks.", 'question': "Write a php code that connects to a MySQL database named 'test' on localhost. The code should prepare a SQL statement to select all rows from the 'users' table where the 'username' and 'password' match those provided in the URL parameters. Then, the code should execute the prepared statement and print out each row.", 'chosen': '```php\n<?php\n$db = new PDO(\'mysql:host=localhost;dbname=test\', $user, $pass);\n\n$stmt = $db->prepare("SELECT * FROM users WHERE username = :username AND password = :password");\n\n$username = filter_input(INPUT_GET, \'username\', FILTER_SANITIZE_STRING);\n$password = filter_input(INPUT_GET, \'password\', FILTER_SANITIZE_STRING);\n\n$stmt->bindParam(\':username\', $username);\n$stmt->bindParam(\':password\', $password);\n\n$stmt->execute();\n\nwhile ($row = $stmt->fetch()) {\n    print_r($row);

In [None]:
from peft import LoraConfig, get_peft_model

lora_config = LoraConfig(
    r=32,  # Rank of the update matrices
    lora_alpha=32,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],  # Target specific modules
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
)

model = get_peft_model(model, lora_config)


In [None]:
model.print_trainable_parameters()


trainable params: 27,262,976 || all params: 3,779,334,144 || trainable%: 0.7213698223345028


In [None]:
from google.colab import drive
import os

# Mount Google Drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from transformers import TrainingArguments
from transformers import Trainer
from transformers import default_data_collator

training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3",
    per_device_train_batch_size=1,
    gradient_accumulation_steps=32,  # Increase if memory allows
    num_train_epochs=3,
    learning_rate=2e-4,
    fp16=True,
    logging_steps=10,
    evaluation_strategy="steps",
    eval_steps=50,
    save_strategy="steps",
    save_steps=50,
    save_total_limit=2,
    load_best_model_at_end=False,
    report_to="wandb",  # Disable reporting to external tools like WandB
)


data_collator = default_data_collator



trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,
)

trainer.train()


Step,Training Loss,Validation Loss
50,0.1398,0.135776
100,0.1176,0.112901
150,0.094,0.106839
200,0.0893,0.102571
250,0.0736,0.101541
300,0.0727,0.10097


TrainOutput(global_step=348, training_loss=0.15355779213466864, metrics={'train_runtime': 4565.048, 'train_samples_per_second': 2.447, 'train_steps_per_second': 0.076, 'total_flos': 4.8837372721024205e+17, 'train_loss': 0.15355779213466864, 'epoch': 2.9903329752953813})

In [None]:
# Save the LoRA adapters
model.save_pretrained("/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3")

# Save the tokenizer
tokenizer.save_pretrained("/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3")

('/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3/tokenizer_config.json',
 '/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3/special_tokens_map.json',
 '/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3/tokenizer.model',
 '/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3/added_tokens.json')

In [None]:
from transformers import AutoModelForCausalLM
from peft import PeftModel

# Load the base model with 4-bit quantization
# Load the model with 4-bit quantization
model = AutoModelForCausalLM.from_pretrained(
    "mistralai/Mistral-7B-v0.1",
    quantization_config=bnb_config,
    device_map="auto",
)

# Load the LoRA adapters
model = PeftModel.from_pretrained(model, "/content/drive/MyDrive/Models_LLM_Project/mistral-7b-lora-3")

# Set to evaluation mode
model.eval()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

PeftModelForCausalLM(
  (base_model): LoraModel(
    (model): MistralForCausalLM(
      (model): MistralModel(
        (embed_tokens): Embedding(32000, 4096)
        (layers): ModuleList(
          (0-31): 32 x MistralDecoderLayer(
            (self_attn): MistralSdpaAttention(
              (q_proj): Linear4bit(
                in_features=4096, out_features=4096, bias=False
                (lora_dropout): ModuleDict(
                  (default): Dropout(p=0.05, inplace=False)
                )
                (lora_A): ModuleDict(
                  (default): Linear(in_features=4096, out_features=32, bias=False)
                )
                (lora_B): ModuleDict(
                  (default): Linear(in_features=32, out_features=4096, bias=False)
                )
                (lora_embedding_A): ParameterDict()
                (lora_embedding_B): ParameterDict()
              )
              (k_proj): Linear4bit(
                in_features=4096, out_features=1024, bias=False
 

In [None]:
def generate_response(prompt):
    """
    Generates a response from the model for a given prompt.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to('cuda')

    outputs = model.generate(
        **inputs,
        max_new_tokens=512,
        temperature=0.2,
        top_p=0.9,
        do_sample=True,
        repetition_penalty=1.2,
        eos_token_id=tokenizer.eos_token_id,
    )
    generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    response = generated_text[len(prompt):].strip()
    return response

In [None]:
# Example prompt
prompt = """

### Language:
Python

### Scenario:
I am using Python to implement a program that calculates the sum of a list of numbers provided by the user. The user inputs a string of comma-separated numbers which is converted into a list and summed.

This is my code:
```python
def calculate_sum(user_input):
    numbers = [int(x) for x in user_input.split(",")]
    total = sum(numbers)
    print(f"Total sum: {total}")

user_input = input("Enter numbers separated by commas: ")
calculate_sum(user_input)

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the code to fix the vulnerability. Begin your corrected program with "Corrected Code:".
"""

# Generate response
response = generate_response(prompt)

# Print the response
print("Generated Response:\n", response)


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Generated Response:
 3. Test the corrected program. Begin your test script with "Test Script:".
4. Describe the impact of the vulnerability. Begin your description with "Impact Description:".

Vulnerability: A buffer overflow vulnerability can occur when data from an untrusted source is improperly handled, leading to potential memory corruption or execution of malicious code.
Corrected Code: ```python
import re

def validate_input(user_input):
    # Regular expression to check if the input contains only digits and no special characters
    pattern = r'^[0-9]+$'
    return bool(re.match(pattern, user_input))

def calculate_sum(user_input):
    if not validate_input(user_input):
        raise ValueError('Invalid input')
    
    numbers = [int(x) for x in user_input.split(",")]
    total = sum(numbers)
    print(f"Total sum: {total}")

try:
    user_input = input("Enter numbers separated by commas: ")
    calculate_sum(user_input)
except Exception as e:
    print(f"An error occurred: {e}

In [None]:
# Example prompt
prompt = """

### Language:
Java

### Scenario:
You are creating a program in Java that reads a filename from user input and deletes the file specified by the user. The filename is passed directly to the File class for deletion without any validation.

This is my code:
```Java
import java.io.File;
import java.util.Scanner;

public class DeleteFileProgram {
    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        System.out.print("Enter the name of the file to delete: ");
        String filename = scanner.nextLine();

        File file = new File(filename);
        if (file.delete()) {
            System.out.println("File deleted successfully.");
        } else {
            System.out.println("File not found or could not be deleted.");
        }
    }
}

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the code to fix the vulnerability. Begin your corrected program with "Corrected Code:".
"""

# Generate response
response = generate_response(prompt)

# Print the response
print("Generated Response:\n", response)


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Generated Response:
 3. Describe the changes you made to the original code. Begin your description with "Changes Made:".
4. Run the corrected program and verify that it works as expected. Begin your verification with "Verification:".

Vulnerability: Improper Input Validation can lead to Remote Code Execution through Path Traversal attacks.
Corrected Code: ```Java
import java.io.File;
import java.nio.file.*;
import java.util.Scanner;

public class DeleteFileProgram {
    private static final Pattern INVALID_CHARACTERS = Pattern.compile("[^a-zA-Z0-9\\._]");

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        System.out.print("Enter the name of the file to delete: ");
        String filename = scanner.nextLine();
        
        // Validate the filename
        if (!isValidFilename(filename)) {
            System.err.println("Invalid filename!");
            return;Bs
        }

        try {
            Files.deleteIfExists(Paths.get(f

In [None]:
# Example prompt
prompt = """

### Language:
Python

### Scenario:
You are creating a Python application that allows users to log in by providing their username and password. The application uses SQLite to query the database directly based on user input. The SQL query is constructed using string concatenation.

This is the code:
```python
import sqlite3

# Connect to the database
connection = sqlite3.connect("users.db")
cursor = connection.cursor()

# Create a users table if it doesn't exist
cursor.execute(`
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT,
    password TEXT
)
`)
connection.commit()

# Simulate user login
def login(username, password):
    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    print("Executing query:", query)  # For debugging purposes
    cursor.execute(query)
    result = cursor.fetchone()
    if result:
        print("Login successful!")
    else:
        print("Invalid username or password.")

# Input from the user
user_input_username = input("Enter your username: ")
user_input_password = input("Enter your password: ")
login(user_input_username, user_input_password)

connection.close()

### Task:
1. Identify and describe the vulnerability in the code. Begin your answer with "Vulnerability:".
2. Rewrite the code to fix the vulnerability. Begin your corrected program with "Corrected Code:".
"""

# Generate response
response = generate_response(prompt)

# Print the response
print("Generated Response:\n", response)


Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Generated Response:
 2. After you have completed both tasks, run the optimized version of the program.

## Vulnerability: Inadequate use of data sanitization can lead to SQL injection attacks when dealing with raw SQL queries.
## Corrected Code: ```python
import sqlite3
from sqlite3 import Error

# Connect to the database
def connect():
    conn = None;
    try:
        conn = sqlite3.connect('users.db')
        print(sqlite3.version)
    except Error as e:
        print(e)

    return conn

# Close the database connection
def close(conn):
    if conn is not None:
        conn.close()

# Create a users table if it doesn't exist
def create_table(conn):
    cursor = conn.cursor()
    cursor.execute(`
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    username TEXT,
    password TEXT
)
`)
    conn.commit()

# Simulate user login
def login(conn, username, password):
    cursor = conn.cursor()
    
    # Use parameter substitution instead of direct string forma