# Prompt Engineering using the CodeLlama API


In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, CodeLlamaTokenizer, BitsAndBytesConfig
from accelerate import init_empty_weights, load_checkpoint_and_dispatch
import transformers
import torch

# Pick the compute device: CUDA when torch reports it as available, CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# device = 'cpu'
print(f'Available device: {device}')


# Checkpoint selection. The commented-out entries are alternative CodeLlama
# checkpoints; the active one is a local directory.
# base_model = "codellama/CodeLlama-7b-Instruct-hf"
# base_model = "codellama/CodeLlama-7b-hf"
# base_model = 'QuantFactory/CodeLlama-7b-hf-GGUF'
# model_id = '/Users/guru/research/LLMs/CodeLlama-70-Instruct-hf'
base_model = 'models/CodeLLama-7b-quantized-4bit'
# NOTE(review): output_dir is reassigned in a later cell before it is used
# by any code in this notebook - confirm this value is still needed.
output_dir = 'models/patch-code-llama/checkpoint-400'

# Load the tokenizer that belongs to the selected checkpoint.
# tokenizer = AutoTokenizer.from_pretrained(base_model, use_fast=True)
tokenizer = CodeLlamaTokenizer.from_pretrained(base_model)

In [None]:

# Choose between 4-bit or 8-bit quantization
use_4bit = True  # Set to False for 8-bit quantization

# Configure quantization. Exactly one of the two flags is enabled, so that
# use_4bit=False really selects 8-bit loading instead of no quantization.
quantization_config = BitsAndBytesConfig(
    load_in_4bit=use_4bit,
    load_in_8bit=not use_4bit,
)

# Load the checkpoint with its weights. `device_map='auto'` lets accelerate
# place the weights on the available devices. The call is deliberately NOT
# wrapped in init_empty_weights(): that context manager is meant for building
# a weightless skeleton from a config, not for loading a real checkpoint.
model = AutoModelForCausalLM.from_pretrained(
    base_model,
    quantization_config=quantization_config,
    device_map='auto',
)

In [None]:
# Persist the loaded model and its tokenizer side by side, in a directory
# named after the quantization width that is in use.
_save_dir = 'models/Model-7b-quantized-4bit' if use_4bit else 'models/Model-7b-quantized-8bit'
for _artifact in (model, tokenizer):
    _artifact.save_pretrained(_save_dir)

In [None]:
# instruct_model= AutoModelForCausalLM.from_pretrained(
#     'models/CodeLLama-Debug', 
#     quantization_config=quantization_config, 
#     device_map='auto'
#     )
# model_quantized

In [None]:
from peft import PeftModel, PeftConfig

# Directory holding the fine-tuned adapter. It has its own name so that the
# path string and the loaded model are never bound to the same variable.
instruct_model_path = 'models/CodeLLama-Debug'

# Attach the adapter stored at `instruct_model_path` to the loaded base model.
# NOTE(review): PeftModel presumably wraps `model` rather than copying it -
# confirm that `model` still behaves as the un-adapted baseline when the two
# are compared later.
instruct_model = PeftModel.from_pretrained(
    model=model,
    model_id=instruct_model_path,
    device_map='auto',
)

# Load the dataset

In [None]:
from datasets import load_dataset

# Load the "ir1xor1" configuration of the RepairLLaMA datasets repository.
dataset = load_dataset("ASSERT-KTH/repairllama-datasets", "ir1xor1")
# Alternative configuration "irXxorY" (disabled).
# NOTE(review): the repository name below lacks the trailing "s" used above - confirm before enabling.
# dataset = load_dataset("ASSERT-KTH/repairllama-dataset", "irXxorY")

def add_question(example):
    """Make sure a record carries a 'question' entry and hand it back.

    A record that already has a 'question' key is returned untouched.
    Otherwise the fixed default question text is stored under that key;
    the record is modified in place and the same object is returned.
    """
    if 'question' in example:
        return example
    example['question'] = 'What is the fix version of the code for the following vulnerability?'
    return example

def prepare_examples(dataset):
    """Normalize the dataset layout used by the rest of the notebook.

    Maps `add_question` over the dataset, then renames the 'input' column
    to 'vulnerable' and the 'output' column to 'fix'. Returns the dataset
    produced by the last rename.
    """
    prepared = dataset.map(add_question)
    # Applied in this order: (old column name, new column name).
    column_renames = (('input', 'vulnerable'), ('output', 'fix'))
    for old_name, new_name in column_renames:
        prepared = prepared.rename_column(old_name, new_name)
    return prepared


# Add the question field and rename the columns, then print the resulting dataset.
dataset = prepare_examples(dataset)
print(dataset)

In [None]:
def check_model_precision(model):
    """Print the distinct dtypes found among a model's parameters and buffers.

    `model` must provide `parameters()` and `buffers()` iterables whose items
    expose a `dtype` attribute. Nothing is returned; the two sets of dtypes
    are written to standard output.
    """
    # Collect each group in one pass; a set keeps only the distinct dtypes.
    param_dtypes = {tensor.dtype for tensor in model.parameters()}
    buffer_dtypes = {tensor.dtype for tensor in model.buffers()}

    print("Parameter precisions:", param_dtypes)
    print("Buffer precisions:", buffer_dtypes)

def is_model_quantized(model):
    """Report whether the model appears to hold quantized weights.

    The check has two parts:

    1. Models loaded through transformers with bitsandbytes are expected to
       carry an `is_loaded_in_4bit` / `is_loaded_in_8bit` flag; either flag
       being truthy counts as quantized.
    2. Otherwise the parameters are scanned for integer storage dtypes.
       `torch.qint8` is PyTorch's own quantized dtype. `torch.int8` and
       `torch.uint8` are included because bitsandbytes is expected to store
       its 8-bit weights and packed 4-bit weights in them, so a check for
       `torch.qint8` alone never matches such models.

    Args:
        model: Object exposing `parameters()`, an iterable of tensors.

    Returns:
        True if a quantization flag is set or any parameter uses one of the
        integer storage dtypes, False otherwise (including for a model that
        has no parameters).
    """
    if getattr(model, "is_loaded_in_4bit", False) or getattr(model, "is_loaded_in_8bit", False):
        return True
    integer_storage_dtypes = (torch.qint8, torch.int8, torch.uint8)
    return any(param.dtype in integer_storage_dtypes for param in model.parameters())


# NOTE: PyTorch has no native 4-bit floating-point or integer dtype, so
# libraries such as bitsandbytes handle 4-bit quantization internally.
# The dtypes reported below are therefore storage dtypes.

# Print the dtypes present in the loaded model ...
check_model_precision(model)
# ... and evaluate the quantization check. The result is not stored; as the
# last expression of the cell it is only shown by the notebook.
is_model_quantized(model)

# Evaluation code 

In [None]:
from transformers import GenerationConfig
import pandas as pd
import evaluate
from transformers import GenerationConfig
from codebleu import calc_codebleu
from tabulate import tabulate
from logging import getLogger
from configparser import ConfigParser

# Separator made of 100 dashes, written to the log around the results.
dash_line = "-" * 100


# Module-level logger named after this module.
log = getLogger(__name__)


def generate_text(model, tokenizer, prompt):
    """Sample a completion for `prompt` and return the decoded text.

    Args:
        model: Model exposing `generate(...)`.
        tokenizer: Tokenizer used both to encode the prompt and to decode
            the generated ids; its `eos_token_id` is used as the pad id.
        prompt: Text to complete.

    Returns:
        The first generated sequence decoded with special tokens skipped.
        The whole sequence returned by `generate` is decoded, so for a
        causal LM the text presumably starts with the prompt itself.
    """
    # Keep the inputs on the same device as the model. With
    # device_map='auto' this can differ from the notebook-level `device`,
    # which is used only as a fallback.
    target_device = getattr(model, "device", device)
    encoded = tokenizer(prompt, return_tensors="pt").to(target_device)

    generation_config = GenerationConfig(
        max_new_tokens=512,
        do_sample=True,  # sampling instead of greedy decoding
        temperature=0.7,
        top_p=0.9,
        pad_token_id=tokenizer.eos_token_id,
    )

    # The attention mask is passed explicitly: because the pad id equals the
    # EOS id, generate() cannot reliably infer the mask from the input ids.
    model_output = model.generate(
        input_ids=encoded["input_ids"],
        attention_mask=encoded["attention_mask"],
        generation_config=generation_config,
    )

    return tokenizer.decode(model_output[0], skip_special_tokens=True)

def generate_fixes(
    original_model,
    instruct_model,
    tokenizer,
    dataset,
    result_csv,
    num_samples=4,
):
    """Generate fixes with two models and save them next to the human fixes.

    For the first `num_samples` entries of `dataset['test']`, a prompt is
    built from the 'vulnerable' column and completed once by each model.
    The outputs are collected in a DataFrame together with the reference
    fixes from the 'fix' column, written to `result_csv` and logged.

    Args:
        original_model: Baseline model passed to `generate_text`.
        instruct_model: Fine-tuned model passed to `generate_text`.
        tokenizer: Tokenizer shared by both models.
        dataset: Mapping with a 'test' split exposing 'vulnerable' and
            'fix' columns.
        result_csv: Path of the CSV file to write.
        num_samples: Number of leading test samples to process (default 4).

    Returns:
        The pandas DataFrame that was written to `result_csv`.
    """
    test_split = dataset['test']
    human_baseline_fixes = test_split['fix'][0:num_samples]
    vulnerable_snippets = test_split['vulnerable'][0:num_samples]
    # Every sample is labelled 'Java'.
    programming_languages = len(human_baseline_fixes) * ['Java']
    original_model_fixes = []
    instruct_model_fixes = []

    for vulnerable in vulnerable_snippets:
        prompt = f"""
                    Generation the fix for the following vulnerable code:
                    Vulnerable:

                    {vulnerable}

                    fix: \n"""

        original_model_fixes.append(
            generate_text(original_model, tokenizer, prompt)
        )
        # The instruct column must come from `instruct_model`; querying the
        # original model here would make both columns describe one model.
        instruct_model_fixes.append(
            generate_text(instruct_model, tokenizer, prompt)
        )

    df = pd.DataFrame(
        zip(
            human_baseline_fixes,
            original_model_fixes,
            instruct_model_fixes,
            programming_languages,
        ),
        columns=[
            "human_baseline_fixes",
            "original_model_fixes",
            "instruct_model_fixes",
            "programming_language",
        ],
    )
    df.to_csv(result_csv, index=False)
    log.info(dash_line)
    log.info("Results of vul-fix-training saved to %s", result_csv)
    log.info(dash_line)
    log.info("Sample of the results:")
    log.info(df.head())
    log.info(dash_line)
    return df

# generate_fixes(
#     model,
#     model,
#     tokenizer,
#     dataset,
#     'results.csv',
# )

In [None]:
# Manual smoke test: ask the model to repair a small Java program that divides
# by zero. The prompt names the language as Java so that it matches the
# snippet it contains.
prompt = """
            Generation the fix for the following vulnerable Java code, the vulnerable code is a division by zero error:
            Vulnerable:

            public class Test {
                public static void main(String[] args) {
                    int a = 10;
                    int b = 0;
                    int c = a / b;
                    System.out.println(c);
                }
            }

            fix: \n
        """

print(generate_text(model, tokenizer, prompt))

In [None]:
from peft import PeftModel
from transformers import AutoModelForCausalLM, BitsAndBytesConfig, AutoTokenizer
import gc
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    DataCollatorForSeq2Seq,
)
from peft import (
    LoraConfig,
    get_peft_model,
    get_peft_model_state_dict,
    # prepare_model_for_int8_training,
    prepare_model_for_kbit_training,
    set_peft_model_state_dict,
)
import sys
import os
from datetime import datetime
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
import transformers
import torch

# Directory the adapter is loaded from below.
# NOTE(review): this is the same path the save cell writes the quantized base
# model to when use_4bit is True - confirm it really contains adapter files.
output_dir = "models/Model-7b-quantized-4bit"
# Disabled alternative: load the full-precision base model and tokenizer from the hub.
# base_model = "codellama/CodeLlama-7b-Instruct-hf"
# model = AutoModelForCausalLM.from_pretrained(
#     base_model,
#     # load_in_8bit=True,
#     torch_dtype=torch.float32,
#     device_map="auto",
# )
# tokenizer = AutoTokenizer.from_pretrained(base_model)

# To load a fine-tuned LoRA/QLoRA adapter use PeftModel.from_pretrained.
# output_dir should be a directory containing an adapter_config.json and adapter_model.bin.
# NOTE(review): `model` was already passed to PeftModel.from_pretrained in an
# earlier cell - confirm that wrapping it a second time is intended.
model = PeftModel.from_pretrained(model, output_dir)


# # 8. Evaluate the model
# eval_prompt = generate_eval_prompt(dataset["test"][0])
# model_input = tokenizer(eval_prompt, return_tensors="pt").to("cuda")

# model.eval()
# with torch.no_grad():
#     print(tokenizer.decode(model.generate(**model_input,
#           max_new_tokens=100)[0], skip_special_tokens=True))

# Tokenization

In [None]:
# Tokenizer settings applied before the training examples are built.
tokenizer.add_eos_token = True  # presumably appends EOS to every encoded sequence - confirm
tokenizer.pad_token_id = 0  # id 0 is used for padding
tokenizer.padding_side = "left"  # pad on the left side of the sequence

def tokenize(prompt):
    """Encode `prompt` with the module-level tokenizer and attach labels.

    The text is truncated to at most 512 tokens, is not padded, and no
    tensor conversion is requested. The returned encoding gets an extra
    "labels" entry holding a copy of its "input_ids".
    """
    encoding = tokenizer(
        prompt,
        max_length=512,
        truncation=True,
        return_tensors=None,
        padding=False,
    )
    # The training targets are the inputs themselves.
    encoding["labels"] = encoding["input_ids"].copy()
    return encoding


def generate_and_tokenize_prompt(data_point):
    """Render one sample into the training prompt and tokenize it.

    Args:
        data_point: Mapping with a "question" entry plus the vulnerable code
            and its fix. The code is read from "context" and the fix from
            "answer" when those keys exist; otherwise from "vulnerable" and
            "fix", the column names produced by `prepare_examples`.

    Returns:
        The result of `tokenize` for the rendered prompt.

    Raises:
        KeyError: If "question" or both alternatives of a field are missing.
    """
    # The dataset prepared in this file has no "context"/"answer" columns, so
    # fall back to the renamed columns instead of failing with a KeyError.
    context = data_point["context"] if "context" in data_point else data_point["vulnerable"]
    answer = data_point["answer"] if "answer" in data_point else data_point["fix"]
    full_prompt =f"""You are a powerful code-fixing model. Your job is to analyze and fix vulnerabilities in code. You are given a snippet of vulnerable code and its context.

You must output the fixed version of the code snippet.

### Input:
{data_point["question"]}

### Context:
{context}

### Response:
{answer}
"""
    return tokenize(full_prompt)

Reformat to prompt and tokenize each sample:

In [None]:
# Render and tokenize every sample. The results are bound to names so that
# they are not discarded and can be handed to the Trainer later on.
tokenized_train_dataset = dataset['train'].map(generate_and_tokenize_prompt)
# NOTE(review): the 'test' split doubles as the validation set here - confirm.
tokenized_val_dataset = dataset['test'].map(generate_and_tokenize_prompt)
# Last expression of the cell: lets the notebook display the mapped test split.
tokenized_val_dataset

# Setup Lora

In [None]:
from datetime import datetime
import os
import sys

import torch
from peft import (
    LoraConfig,
    get_peft_model,
    get_peft_model_state_dict,
    # prepare_model_for_int8_training,
    prepare_model_for_kbit_training,
    set_peft_model_state_dict,
)
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq


In [None]:
# Switch the model to training mode and prepare it for k-bit training.
# NOTE(review): `model` has already been passed to PeftModel.from_pretrained
# in earlier cells, and an identical LoRA setup cell appears again further
# down - confirm that stacking these wrappers is intended.
model.train() # put model back into training mode
# model = prepare_model_for_int8_training(model)
model = prepare_model_for_kbit_training(model)

# LoRA settings: r=16 and lora_alpha=16 on the four attention projection
# modules listed below, 5% LoRA dropout, no bias handling, causal-LM task type.
config = LoraConfig(
    r=16,
    lora_alpha=16,
    target_modules=[
    "q_proj",
    "k_proj",
    "v_proj",
    "o_proj",
],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
)
# Wrap the model according to the LoRA configuration.
model = get_peft_model(model, config)


In [None]:
def generate_chat_response(model, tokenizer, device, chat, max_new_tokens=200):
    """Run a chat through the model and return the decoded first sequence.

    The chat is turned into input ids with the tokenizer's chat template,
    moved to `device` and handed to `model.generate` with the given
    `max_new_tokens`. The first returned sequence is moved to `device` and
    decoded with `tokenizer.decode` using its default arguments.
    """
    prompt_ids = tokenizer.apply_chat_template(chat, return_tensors="pt").to(device)
    generated = model.generate(input_ids=prompt_ids, max_new_tokens=max_new_tokens)
    first_sequence = generated[0].to(device)
    return tokenizer.decode(first_sequence)


# Demo conversation: a system message fixing the answer language and one user request.
chat = [
   {"role": "system", "content": "You are a helpful and honest code assistant expert in JavaScript. Please, provide all answers to programming questions in JavaScript"},
   {"role": "user", "content": "Write a function that computes the set of sums of all contiguous sublists of a given list."},
]
# Uses the default max_new_tokens of generate_chat_response.
# NOTE(review): an earlier cell calls model.train() and none switches back to
# eval mode before this generation - confirm that is intended.
response = generate_chat_response(model, tokenizer, device, chat)
print(response)


# RepairLLama

In [None]:
# Take the first test sample: the vulnerable code and its reference fix.
vul = dataset['test'][0]['vulnerable']
patch = dataset['test'][0]['fix']

# NOTE(review): the reference patch is included in the prompt itself -
# confirm that the model is meant to see the answer.
prompt = f"Fix the vulnerability in the following code:\n{vul}\n\nPatch:\n{patch}"


# NOTE(review): the system message names Python as the expertise and C as the
# answer language - confirm both against the language of the dataset samples.
chat = [
   {"role": "system", "content": "You are a helpful and honest code assistant expert in Python. Please, provide all answers to programming questions in C"},
   {"role": "user", "content": prompt}]

# Generate at most 200 new tokens; the response is stored but not printed.
response = generate_chat_response(model, tokenizer, device, chat, 200)
# print(response)

# Fine-tuning CodeLLama model

In [None]:
# Tokenizer settings for fine-tuning; these repeat the values set in the
# earlier tokenization cell.
tokenizer.add_eos_token = True  # presumably appends EOS to every encoded sequence - confirm
tokenizer.pad_token_id = 0  # id 0 is used for padding
tokenizer.padding_side = "left"  # pad on the left side of the sequence

In [None]:
def tokenize(prompt):
    """Tokenize `prompt` for causal-LM training and return the encoding.

    Uses the module-level tokenizer with truncation to 512 tokens, without
    padding and without tensor conversion. A "labels" entry holding a copy
    of "input_ids" is added to the encoding before it is returned.
    """
    tokenizer_options = {
        "truncation": True,
        "max_length": 512,
        "padding": False,
        "return_tensors": None,
    }
    encoded = tokenizer(prompt, **tokenizer_options)
    # The labels are the inputs themselves.
    encoded["labels"] = encoded["input_ids"].copy()
    return encoded


In [None]:
# Show the 'vulnerable' field of the first training sample (notebook cell output).
dataset['train'][0]['vulnerable']

In [None]:
# Print the full (non-abbreviated) CUDA memory summary; device=None leaves the device choice to torch.
print(torch.cuda.memory_summary(device=None, abbreviated=False))

In [None]:
import torch
import gc


# Switch the model to training mode and prepare it for k-bit training.
# NOTE(review): this repeats the earlier "Setup Lora" cell; running both
# passes `model` through get_peft_model twice - confirm that only one of the
# two cells is meant to be executed.
model.train() # put model back into training mode
model = prepare_model_for_kbit_training(model)

# LoRA settings: r=16 and lora_alpha=16 on the four attention projection
# modules listed below, 5% LoRA dropout, no bias handling, causal-LM task type.
config = LoraConfig(
    r=16,
    lora_alpha=16,
    target_modules=[
    "q_proj",
    "k_proj",
    "v_proj",
    "o_proj",
],
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
)
# Wrap the model according to the LoRA configuration.
model = get_peft_model(model, config)

# Preparing dataset for CodeLLama fine-tuning

In [None]:
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=num_train_epochs,
        per_device_train_batch_size=per_device_train_batch_size,
        gradient_accumulation_steps=gradient_accumulation_steps,
        optim="paged_adamw_32bit",
        save_steps=100,
        logging_steps=100,
        learning_rate=float(config['fine_tuning']['learning_rate']),
        evaluation_strategy="steps",
        eval_steps=100,
        fp16=True,
        bf16=False,
        group_by_length=True,
        logging_strategy="steps",
        save_strategy="no",
        gradient_checkpointing=False,
    )

    trainer = Trainer(
        model=model,
        train_dataset=tokenized_train_dataset,
        eval_dataset=tokenized_val_dataset,
        args=training_args,
        data_collator=default_data_collator,
    )

    old_state_dict = model.state_dict
    model.state_dict = (lambda self, *_, **__: get_peft_model_state_dict(self, old_state_dict())).__get__(
        model, type(model)
    )

    # Train and save the model
    trainer.train()

    trainer.model.save_pretrained(output_dir)
    trainer.save_model(output_dir)
    log.info("Fine-Tuning Completed!")
    log.info("Model saved to:", output_dir)
    log.info("=" * 50)
    return trainer, model, tokenizer