In [None]:
model_name = 'unsloth/Meta-Llama-3.1-8B-Instruct-bnb-4bit'
model_alias = 'llama3.1-8b'

In [None]:
from unsloth import FastLanguageModel
from datasets import load_dataset
import pandas as pd
from tqdm import tqdm
import csv
import re
from pathlib import Path

In [None]:
BASE_DIR = Path('/vol/bitbucket/kza23/finetuning')
WORK_DIR = BASE_DIR / model_alias

In [None]:
max_seq_length = 20 # Choose any! We auto support RoPE Scaling internally!
dtype = None # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
load_in_4bit = True # Use 4bit quantization to reduce memory usage. Can be False.

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = model_name,
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)

FastLanguageModel.for_inference(model)

In [None]:
results = pd.read_csv(WORK_DIR/f"{model_alias}-outputs.csv")
results.loc[results["real"].isnull(), 'real'] = "There is no vulnearbility"
results

In [None]:
with open(BASE_DIR/'prompts/evaluation.txt', mode='r') as f:
    eval_prompt = f.read()

split_token = eval_prompt.split('\n')[-2]

criteria = []
for idx in range(1, 4):
    with open(BASE_DIR/f'prompts/criterion{idx}.txt', mode='r') as f:
        criteria.append(f.read())


In [None]:
print(eval_prompt)
print(split_token)
for cirterion in criteria:
    print(cirterion)

In [None]:
def run_criteria(query):
    ret_reasoning = ''
    verdicts = []
    for idx, criterion in enumerate(criteria):
        format_input = query.format(criterion).replace('{{', '{').replace('}}', '}')
        inputs = tokenizer(format_input, return_tensors="pt", truncation=True).to("cuda")
        output_tokens = model.generate(
            **inputs,
            max_new_tokens=512,
            pad_token_id=tokenizer.pad_token_id,
            use_cache = True
        )

        decoded_output = tokenizer.decode(
            output_tokens[0],
            skip_special_tokens=True,
            pad_token_id=tokenizer.pad_token_id,
        )

        result = decoded_output.split(f"{split_token}\n")[1].strip()
        parts = result.split('Verdict: ')
        if len(parts) == 2:
            reasoning, verdict = parts
        else:
            verdict = "N/A"
            reasoning = parts[0]

        ret_reasoning += f'Criterion {idx+1}:\n{reasoning.strip()}\n'
        verdicts.append(verdict.strip())
    return verdicts, ret_reasoning

def run_query(data, query, show_tqdm=True):
    queries = data.apply(lambda row: query.format(
        "{}",
        row['real'].replace('\\n', '\n').replace('{', '{{').replace('}', '}}'),
        row['output'].replace('\\n', '\n').replace('{', '{{').replace('}', '}}'),
    ), axis=1)

    if show_tqdm:
        iterator = tqdm(enumerate(queries), total=len(queries))
    else:
        iterator = enumerate(queries)

    for idx, query in iterator:
        real_contains_vuln = data.iloc[idx]["real"] != "There is no vulnearbility"
        output_contains_vuln = data.iloc[idx]["output"] != "There is no vulnearbility"

        if real_contains_vuln != output_contains_vuln:
            yield ["FAIL", "FAIL", "FAIL", "FAIL", "Criterion 0: FAIL\\n One of the descriptions does not contain a vulnerability."]
            continue

        if not real_contains_vuln and not output_contains_vuln:
            yield ["PASS", "PASS", "PASS", "PASS", "No vulnerabilities to compare."]
            continue


        verdicts, reasoning = run_criteria(query)
        verdicts = ["PASS"] + verdicts

        yield verdicts + [reasoning.replace("\n", "\\n")]

In [None]:
with open(WORK_DIR/f"{model_alias}-outputs.csv", mode='w') as f:
    writer = csv.writer(f)
    writer.writerow(["cr0", "cr1", "cr2", "cr3", "Reasoning"])
    for row in run_query(results, eval_prompt):
        print(row)
        writer.writerow(row)