In [None]:
!pip install -U -q datasets transformers torch peft wandb bitsandbytes accelerate

In [None]:
import os

import torch
from datasets import load_dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    Trainer,
    TrainingArguments,
)

In [None]:
# Load the "train" split of the labeled code-vulnerability dataset.
dataset = load_dataset(
    "lemon42-ai/Code_Vulnerability_Labeled_Dataset",
    split="train",
)

def filter_cpp(sample):
    """Tell whether a sample's code text opens with a ```c++ fence.

    Args:
        sample: Mapping with a "code" entry holding the snippet as a string.

    Returns:
        True if the snippet starts with the literal "```c++", else False.
    """
    snippet = sample["code"]
    is_cpp = snippet.startswith("```c++")
    return is_cpp

# Keep only the C++ samples, then drop the "Unnamed: 0" column.
dataset = dataset.filter(filter_cpp).remove_columns(["Unnamed: 0"])

In [None]:
def format_instruction(sample):
    """Build the instruction-style record for one dataset row.

    Args:
        sample: Mapping with "label" and "code" entries.

    Returns:
        A dict with a fixed "instruction" text, the row's label under
        "output", and the row's code passed through under "code".
    """
    record = {"instruction": "Analyze the following C++ code and classify its vulnerability."}
    record["output"] = sample["label"]
    record["code"] = sample["code"]
    return record

# Add the instruction/output fields, then drop "label" (now duplicated by "output").
dataset = dataset.map(format_instruction).remove_columns(["label"])

In [None]:
model_name = "Qwen/Qwen2.5-Coder-1.5B-Instruct"

# Load the base model with 8-bit weights. The quantization settings are passed
# through `quantization_config`; the bare `load_in_8bit=True` keyword on
# `from_pretrained` is deprecated in transformers.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="auto",
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token for padding.
tokenizer.pad_token = tokenizer.eos_token

def tokenize(sample, max_length=1024):
    """Convert one instruction sample into fixed-length causal-LM features.

    The text layout is the same as before (instruction, code, then the
    "Response:" marker followed by a space and the label), but the features
    are built so that the loss only covers the answer:

    * prompt and padding positions get the label -100, which the loss ignores,
      so the model is no longer trained to reproduce padding or the prompt;
    * the end-of-sequence token is appended to the label so the model also
      learns where the answer stops;
    * when a sample is too long, tokens are removed from the end of the code
      (keeping the trailing response marker) instead of cutting off the label.

    Args:
        sample: Mapping with "instruction", "code" and "output" entries.
        max_length: Length every returned sequence is truncated/padded to.

    Returns:
        Dict with "input_ids", "attention_mask" and "labels", each a list of
        exactly ``max_length`` ints.
    """
    marker = "\n\nResponse:"
    prompt = "Instruction: " + sample["instruction"] + "\n\nCode:\n" + sample["code"] + marker
    # The separating space is tokenized together with the label; BPE tokenizers
    # usually attach a leading space to the following word, so this should keep
    # the token sequence in line with tokenizing the whole text at once.
    response = " " + str(sample["output"]) + (tokenizer.eos_token or "")

    prompt_ids = list(tokenizer(prompt)["input_ids"])
    response_ids = list(tokenizer(response, add_special_tokens=False)["input_ids"])[:max_length]

    # Give the answer priority: whatever room is left belongs to the prompt.
    budget = max_length - len(response_ids)
    if len(prompt_ids) > budget:
        marker_len = len(tokenizer(marker, add_special_tokens=False)["input_ids"])
        tail_len = min(marker_len, budget)
        # Keep the head of the prompt plus its last tokens (the response marker).
        prompt_ids = prompt_ids[: budget - tail_len] + prompt_ids[len(prompt_ids) - tail_len:]

    input_ids = prompt_ids + response_ids
    pad_count = max_length - len(input_ids)
    return {
        "input_ids": input_ids + [tokenizer.pad_token_id] * pad_count,
        "attention_mask": [1] * len(input_ids) + [0] * pad_count,
        "labels": [-100] * len(prompt_ids) + response_ids + [-100] * pad_count,
    }

# Tokenize the samples one at a time (no batching).
tokenized_dataset = dataset.map(tokenize, batched=False)

# 80/10/10 split: hold out 20% first, then cut that hold-out in half to get
# the validation and test sets. Both shuffles use seed 42.
train_validation_test_split = tokenized_dataset.train_test_split(
    test_size=0.2,
    shuffle=True,
    seed=42,
)
train_dataset, validation_test_dataset = (
    train_validation_test_split["train"],
    train_validation_test_split["test"],
)
validation_test_split = validation_test_dataset.train_test_split(
    test_size=0.5,
    shuffle=True,
    seed=42,
)
validation_dataset, test_dataset = (
    validation_test_split["train"],
    validation_test_split["test"],
)

# LoRA adapter settings: rank 16, scaling alpha 32, 10% adapter dropout,
# no bias terms trained.
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.1,
    bias="none",
    task_type="CAUSAL_LM",
)
# The base model is loaded in 8-bit, so run PEFT's preparation step for
# quantized training before attaching the adapters (see the PEFT docs for the
# exact adjustments it makes, e.g. around normalization layers and
# gradient checkpointing).
model = prepare_model_for_kbit_training(model)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

In [None]:
# Training configuration: one epoch, evaluating and checkpointing every 100 steps.
args = TrainingArguments(
    output_dir="qwen25_coder_1_5b_instruct",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    learning_rate=2e-4,
    num_train_epochs=1,
    save_strategy="steps",
    save_steps=100,
    eval_strategy="steps",
    eval_steps=100,
    load_best_model_at_end=True,
    # No compute_metrics is configured, so validation loss is the only metric
    # available for picking the best checkpoint - and lower loss is better.
    # (Previously greater_is_better=True made the *highest*-loss checkpoint
    # count as "best".)
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    report_to="wandb",
    # Mixed precision only when a CUDA device is present.
    fp16=torch.cuda.is_available(),
)

# Set the Weights & Biases entity for this process before the trainer is built.
os.environ.update({"WANDB_ENTITY": "VulnRL"})

# Wire the model, arguments and the train/validation splits into the Trainer.
trainer = Trainer(
    args=args,
    model=model,
    tokenizer=tokenizer,
    eval_dataset=validation_dataset,
    train_dataset=train_dataset,
)

In [None]:
# Run the fine-tuning loop.
trainer.train()

# Save the model and the tokenizer into the same directory, model first.
for _artifact in (model, tokenizer):
    _artifact.save_pretrained("qwen25_coder_1_5b_instruct")

In [None]:
def get_prediction(prompt, max_new_tokens=1024):
    """Generate the model's vulnerability label for one C++ snippet.

    The prompt is laid out exactly like the text built in ``tokenize`` for
    fine-tuning (instruction, code, "Response:" marker) rather than with the
    chat template, so the model is queried in the format it was trained on.
    Decoding is greedy and the model is put in eval mode first, so adapter
    dropout is off and repeated calls give the same answer.

    Args:
        prompt: The C++ code to classify, as a string.
        max_new_tokens: Upper bound on generated tokens (default unchanged
            from the previous hard-coded 1024).

    Returns:
        The decoded completion with special tokens removed and surrounding
        whitespace stripped.
    """
    # No trailing space after "Response:" - in the training text the space
    # precedes the label, and BPE tokenizers usually fold it into the label's
    # first token, so the model is expected to produce it itself.
    text = (
        "Instruction: Analyze the following C++ code and classify its vulnerability."
        "\n\nCode:\n" + prompt + "\n\nResponse:"
    )

    # Switch off dropout; the model may still be in training mode after Trainer.train().
    model.eval()
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=max_new_tokens,
        do_sample=False,  # greedy decoding, independent of the checkpoint's generation defaults
    )
    # generate() returns prompt + completion; keep only the newly generated part.
    completions = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    return tokenizer.batch_decode(completions, skip_special_tokens=True)[0].strip()