This Python notebook shows the fine-tuning process of a Large Language Model (LLM) using Quantized Low Rank Adaptation (QLoRA) for AI-powered automatic completion of Solidity code. It is also shown how to obtain good hyperparameters with the Ray Tune hyperparameter optimization library and how the fine-tuned model is evaluated against the base model with the evaluation metrics Perplexity, BLEU and METEOR.

In [None]:
%pip install transformers[sentencepiece]

In [None]:
%pip install requests

In [None]:
%pip install tensorrt

In [None]:
!pip install datasets
!pip install transformers[torch]
!pip install peft
!pip install -U bitsandbytes
!pip install flash-attn --no-build-isolation
!pip install ray
!pip install ray[tune]
!pip install optuna
!pip install wandb
!pip install evaluate
!pip install trl==0.14.0
!pip install nltk

In [None]:
!pip install numba

In [None]:
!wandb login

In [None]:
import ray

ray.shutdown()
ray.init(log_to_driver=False, ignore_reinit_error=True, local_mode=True)

In [None]:
import torch
import pandas as pd
import os
import random

#Loading Solidity Dataset

In [None]:
from datasets import load_dataset

train_list = [f"/.../.../Train/solidity_code_{i}.sol" for i in range(1, 18119)]
valid_list = [f"/.../.../Valid/solidity_code_{i}.sol" for i in range(1, 2001)]
test_list = [f"/.../.../Test/solidity_code_{i}.sol" for i in range(1, 2001)]

sol_dataset = load_dataset('text', data_files={'train': train_list, 'validation': valid_list, 'test': test_list}, num_proc=32)   # num_proc allows for multiprocessing, which speeds up processing by parallelizing processes on the CPU. This drastically speeds up the generation of the splits

In [None]:
sol_dataset

DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 18118
    })
    validation: Dataset({
        features: ['text'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['text'],
        num_rows: 2000
    })
})

In [None]:
# Some additional changes in the dataset

# def add_newline(example):
#     end_tokens = ['}[END_INT]', '}[END_CON]', '}[END_LIB]', '}[END_VUL_INT]', '}[END_VUL_CON]', '}[END_VUL_LIB]']
#     if example['text'] not in end_tokens:
#         updated_example = example['text'] + '\n'
#         return {"text": updated_example}
#     else:
#         return {"text": example['text']}

# updated_dataset = sol_dataset.map(add_newline)

def replace_tokens(example):
    updated_example = example['text'].replace("\\n", "\n").replace("<|vulnerable_function|>\n", '')
    updated_example = updated_example.replace("\\t", "\t").replace("<|vulnerable_constructor|>\n", '')
    updated_example = updated_example.replace("<|secure_function|>\n\t", "<|secure_function|>\t")
    updated_example = updated_example.replace("<|secure_constructor|>\n\t", "<|secure_constructor|>\t")
    updated_example = updated_example.replace("<|secure_function|>\n", "<|secure_function|>")
    updated_example = updated_example.replace("<|secure_constructor|>\n", "<|secure_constructor|>")
    updated_example = updated_example + '<｜end▁of▁sentence｜>'
    updated_example = updated_example.replace("\n\t\t<｜end▁of▁sentence｜>", "<｜end▁of▁sentence｜>")
    return {"text": updated_example}

updated_dataset = sol_dataset.map(replace_tokens)

In [None]:
print(updated_dataset['train'][343]['text'])

<|fim_begin|>	function removeAllFee() private {
		if (_taxFee == 0 && _teamFee == 0) return;
<|fim_hole|>
		// reentrancy-benign vulnerability
		_teamFee = 0;
	}<|fim_end|>		// reentrancy-benign vulnerability
		_taxFee = 0;<｜end▁of▁sentence｜>


In [None]:
from datasets import ClassLabel
from IPython.display import display, HTML

# Randomly picks num_examples from the dataset and displays them
def show_random_elements(dataset, num_examples=10):
    assert num_examples <= len(dataset), "Can't pick more elements than there are in the dataset!"
    picks = []
    for _ in range(num_examples):
        pick = random.randint(0, len(dataset)-1)
        while pick in picks:
            pick = random.randint(0, len(dataset)-1)
        picks.append(pick)

    df = pd.DataFrame(dataset[picks])
    for column, typ in dataset.features.items():
        if isinstance(typ, ClassLabel):
            df[column] = df[column].transform(lambda i: typ.names[i])
    display(HTML(df.to_html()))

In [None]:
show_random_elements(updated_dataset['test'])

#Dataset Tokenization

In [None]:
from transformers import AutoTokenizer

# model_checkpoint = "distilgpt2"
# model_checkpoint = 'codeparrot/codeparrot-small'
# model_checkpoint = 'Salesforce/codegen-350M-mono'
# model_checkpoint = 'huggingface/CodeBERTa-small-v1'
# model_checkpoint = 'Salesforce/codet5-small'
# model_checkpoint = 'bigcode/starcoder'
# model_checkpoint = 'bigcode/starcoderbase-1b'
# model_checkpoint = 'bigcode/starcoder2-3b'
# model_checkpoint = 'codellama/CodeLlama-7b-hf'
model_checkpoint = 'deepseek-ai/deepseek-coder-1.3b-base'

# !huggingface-cli login

tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, is_fast=True)

In [None]:
# if tokenizer.model_max_length > 100000:
tokenizer.model_max_length = 256

special_tokens = {
    "additional_special_tokens": ['<|secure_function|>',
                                  '<|secure_constructor|>',
                                  '<|fim_begin|>',
                                  '<|fim_end|>',
                                  '<|fim_hole|>'
                                  ]
}

tokenizer.add_special_tokens({'pad_token': '[PAD]'})
tokenizer.add_special_tokens(special_tokens)

tokenizer.padding_side = "right"

def tokenize_function(examples):
    result = tokenizer(examples["text"], truncation=True, padding=True)
    return result

# batched allows for batch processing; standard batch size if not explicitly specified is 1000
sol_dataset_tokenized = updated_dataset.map(tokenize_function, batched=True, num_proc=12, remove_columns=['text'])

In [None]:
sol_dataset_tokenized

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask'],
        num_rows: 18118
    })
    validation: Dataset({
        features: ['input_ids', 'attention_mask'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask'],
        num_rows: 2000
    })
})

In [None]:
print(tokenizer.decode(sol_dataset_tokenized['train'][5465]['input_ids']))

<｜begin▁of▁sentence｜><|secure_function|><|fim_begin|>	function ceil(uint256 a, uint256 m) internal pure returns (uint256) {
		uint256 c = add(a, m);
<|fim_hole|>
		return mul(div(d, m), m);
	}<|fim_end|>		uint256 d = sub(c, 1);<｜end▁of▁sentence｜>


In [None]:
print(tokenizer.decode(sol_dataset_tokenized['train'][156]['input_ids']))

<｜begin▁of▁sentence｜><|secure_function|><|fim_begin|>	function _transfer(
		address sender,
		address recipient,
		uint256 amount
	) internal virtual {
		require(sender != address(0), "ERC20: transfer from the zero address");
		require(recipient != address(0), "ERC20: transfer to the zero address");

		if (!isTrade) {
			require(
				sender == owner() || sender == _receiveAddress,
				"ERC20: Cannot trade"
			);
		}
		require(amount == 1 * 10 ** _decimals, "ERC20: Incorrect amount");
		require(balanceOf(recipient) == 0, "ERC20: The user already has");

		_beforeTokenTransfer(sender, recipient, amount);

		_balances[sender] = _balances[sender].sub(
			amount,
<|fim_hole|>
	}<|fim_end|>
			"ERC20: transfer amount exceeds balance"
		);
		_balances[


In [None]:
# Creates a copy of input_ids for labels for each example (ground truth is a direct copy of input_ids)
def create_labels_per_line(examples):
    result = {
        "input_ids": examples["input_ids"],
        "labels": examples["input_ids"].copy()
    }
    return result

lm_dataset = sol_dataset_tokenized.map(
    create_labels_per_line,
    batched=True,
    num_proc=12
)

In [None]:
lm_dataset

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 18118
    })
    validation: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 2000
    })
})

In [None]:
tokenizer.decode(lm_dataset["validation"][343]["labels"])

In [None]:
tokenizer.decode(lm_dataset["train"][156]["labels"])

'<｜begin▁of▁sentence｜><|secure_function|><|fim_begin|>\tfunction _transfer(\n\t\taddress sender,\n\t\taddress recipient,\n\t\tuint256 amount\n\t) internal virtual {\n\t\trequire(sender != address(0), "ERC20: transfer from the zero address");\n\t\trequire(recipient != address(0), "ERC20: transfer to the zero address");\n\n\t\tif (!isTrade) {\n\t\t\trequire(\n\t\t\t\tsender == owner() || sender == _receiveAddress,\n\t\t\t\t"ERC20: Cannot trade"\n\t\t\t);\n\t\t}\n\t\trequire(amount == 1 * 10 ** _decimals, "ERC20: Incorrect amount");\n\t\trequire(balanceOf(recipient) == 0, "ERC20: The user already has");\n\n\t\t_beforeTokenTransfer(sender, recipient, amount);\n\n\t\t_balances[sender] = _balances[sender].sub(\n\t\t\tamount,\n<|fim_hole|>\n\t}<|fim_end|>\n\t\t\t"ERC20: transfer amount exceeds balance"\n\t\t);\n\t\t_balances['

In [None]:
print("Special Tokens:", tokenizer.special_tokens_map)

In [None]:
from transformers import DataCollatorForLanguageModeling

data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

In [None]:
from transformers import DataCollatorForLanguageModeling

# A special data collator which is required for FIM fine-tuning with packing.
class FIMDataCollator(DataCollatorForLanguageModeling):
    def __call__(self, examples):
        batch = super().__call__(examples)
        labels = batch["labels"]
        input_ids = batch["input_ids"]

        split_word = tokenizer.convert_tokens_to_ids("<｜end▁of▁sentence｜>")

        for i in range(len(input_ids)):
            try:
                fim_end_token_id = tokenizer.convert_tokens_to_ids("<|fim_end|>")
                input_list = input_ids[i].tolist()
                label_list = labels[i].tolist()

                # Finds all positions of the split_word
                split_positions = [index for index, token in enumerate(input_list) if token == split_word]

                start = 0
                for pos in split_positions + [len(input_list)]:  # gives the last section after the last split_word
                    try:
                        sub_input = input_list[start:pos+1]      # extracts the input ids
                        sub_labels = label_list[start:pos+1]     # extracts the labels

                        if fim_end_token_id in sub_input:
                            fim_middle_pos = sub_input.index(fim_end_token_id)
                            sub_labels[:fim_middle_pos+1] = [-100] * (fim_middle_pos+1)

                        # Writes the labels back to the original torch.tensor labels, hence the sub_labels list has to be converted into a torch.tensor
                        labels[i][start:pos+1] = torch.tensor(sub_labels, dtype=labels.dtype, device=labels.device)
                        start = pos + 1  # the next segments begins after the split_word
                    except ValueError:
                        continue  # if no fim_end_token_id was found

            except ValueError:
                continue

        return batch

In [None]:
from transformers import DataCollatorForLanguageModeling

# A special data collator which is required for FIM fine-tuning
class FIMDataCollator(DataCollatorForLanguageModeling):
    def __call__(self, examples):
        batch = super().__call__(examples)
        labels = batch["labels"]
        input_ids = batch["input_ids"]

        for i in range(len(input_ids)):
            # Only masks labels if FIM tokens are present
            try:
                fim_middle_pos = input_ids[i].tolist().index(tokenizer.convert_tokens_to_ids("<|fim_end|>"))
                labels[i][:fim_middle_pos+1] = -100
            except ValueError:
                continue

        return batch

# Parameter-efficient fine-tuning (PEFT): Quantized Low Rank Adaptation (QLoRA)

In [None]:
from transformers import BitsAndBytesConfig, AutoModelForCausalLM
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training

def model_init():
    try:
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,

            # Normal float 4. A special datatype invented by the QLoRA Team.
            bnb_4bit_quant_type="nf4",

            # Double quantization quantizes also the quantization constants
            bnb_4bit_use_double_quant=True,

            # Compute datatype in qlora is bfloat16
            bnb_4bit_compute_dtype=torch.bfloat16,
        )

        device_map = {"": torch.cuda.current_device()} if torch.cuda.is_available() else None

        quantized_base_model = AutoModelForCausalLM.from_pretrained(
            model_checkpoint,
            torch_dtype=torch.bfloat16,
            attn_implementation="flash_attention_2", # Flash Attention drastically speeds up model computations (not all GPUs support it)
            use_cache=False,                         # set to False as gradient checkpointing is used
            device_map=device_map,
            quantization_config=quantization_config,
        )

        quantized_base_model.resize_token_embeddings(len(tokenizer))

        lora_config = LoraConfig(
            task_type=TaskType.CAUSAL_LM,
            inference_mode=False,

            # LoRA decomposes the weight update matrix into two smaller matrices. The size of these low-rank matrices is determined by its rank.
            # Higher rank means the model has more parameters to train, but it also means the model has more learning capacity.
            r=64,

            # When the weight changes are added back into the original model weights, they are multiplied by a
            # Scaling factor for the weight parameters. The weight matrix is scaled by lora_alpha/lora_rank. A higher alpha assigns more weight to the LoRA activations.
            lora_alpha=64,

            # Probability that a trainable parameter will be artificially set to zero for given batch of training.
            # Used to prevent overfitting (as normal dropout). In the QLoRA paper this value is set to 0.1 for fine-tuning 7B and 13B models and reduced to 0.05 for 33B and 65B models.
            lora_dropout=0.0934665,

            # With the bias parameter one can choose whether none, all or only the LoRA bias parameters should be trained.
            bias="none",

            # Determines where the smaller matrices are inserted (e.g. could be the query and value matrices of the attention blocks)
            # all-linear means that LoRA is applied on all linear transformer block layers. This is recommended to match full finetuning performance.
            target_modules=["q_proj", "o_proj", "k_proj", "v_proj"]
        )

        # required for the training of peft_model
        model = prepare_model_for_kbit_training(quantized_base_model)

        lora_model = get_peft_model(model, lora_config)

        return lora_model

    except Exception as e:
        print(f"Error during model initialization: {e}")
        return e

In [None]:
model = model_init()

In [None]:
model.generation_config

GenerationConfig {
  "bos_token_id": 32013,
  "eos_token_id": 32014
}

In [None]:
# returns the number of parameters for a given model
num_parameters = sum(p.numel() for p in model.parameters())
print(f"Number of parameters: {num_parameters}")

In [None]:
model.print_trainable_parameters()

# Hyperparameter Optimization with Ray Tune

In [None]:
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training
from transformers import BitsAndBytesConfig, AutoModelForCausalLM
from trl import SFTTrainer, SFTConfig
import torch
import ray
import math
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.bayesopt import BayesOptSearch

# Defines the search space of hyperparameters which have to be optimized
search_space = {
    "lr": tune.loguniform(1e-5, 5e-4),
    "batch_size": tune.choice([2, 4, 8]),
    "warmup_steps": tune.choice([50, 100, 150, 300]),
    "weight_decay": tune.uniform(0.01, 0.1),
    "gradient_accumulation_steps": tune.choice([2, 4, 8]),
    "lora_r": tune.choice([8, 16, 32, 64]),
    "lora_alpha": tune.choice([8, 16, 32, 64, 128]),
    "lora_dropout": tune.uniform(0.01, 0.1)
}

# For a more efficient training process a scheduler is used (Asynchronous Successive Halving). A non-promising trial is early stopped with it.
scheduler = ASHAScheduler(
    metric="eval_loss",            # the metric to track
    mode="min",                    # the direction to which to optimize (here minimize)
    max_t=1425,                    # the maximum iterations or training steps
    grace_period=50,               # the minimum steps before early stopping
    reduction_factor=2             # halves the number of trials at each checkpoint
)

# Defines the search algorithm (here optuna)
search_alg = OptunaSearch(
    metric="eval_loss",
    mode="min"
)

# The trainable for optimization
def train_with_tune(search_space):

    def model_init():
        try:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=True,
                bnb_4bit_compute_dtype=torch.bfloat16,
            )

            device_map = {"": torch.cuda.current_device()} if torch.cuda.is_available() else None

            quantized_base_model = AutoModelForCausalLM.from_pretrained(
                model_checkpoint,
                torch_dtype=torch.bfloat16,
                attn_implementation="flash_attention_2",
                use_cache=False,
                device_map=device_map,
                quantization_config=quantization_config,
            )

            quantized_base_model.resize_token_embeddings(len(tokenizer))

            lora_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                inference_mode=False,
                r=search_space["lora_r"],
                lora_alpha=search_space["lora_alpha"],
                lora_dropout=search_space["lora_dropout"],
                bias="none",
                target_modules=["q_proj", "o_proj", "k_proj", "v_proj"],
            )

            model = prepare_model_for_kbit_training(quantized_base_model)

            lora_model = get_peft_model(model, lora_config)

            return lora_model

        except Exception as e:
            print(f"Error during model initialization: {e}")
            return e

    model = model_init()

    batch_size = search_space["batch_size"]
    acc_steps = search_space["gradient_accumulation_steps"]
    max_steps = int(len(lm_dataset['train']) / (batch_size*acc_steps))
    eval_steps = int(max_steps/4)

    sft_config = SFTConfig(
        "/.../.../4_Try",
        # overwrite_output_dir=True,
        save_strategy="no",
        do_eval=True,
        eval_strategy='steps',
        eval_steps=eval_steps,
        learning_rate=search_space["lr"],
        weight_decay=search_space["weight_decay"],
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        optim="paged_adamw_8bit",
        logging_strategy="steps",
        logging_steps=100,
        bf16=True,
        gradient_accumulation_steps=acc_steps,
        gradient_checkpointing=True,
        warmup_steps=search_space["warmup_steps"],
        num_train_epochs=1,
        max_seq_length=tokenizer.model_max_length,
        packing=True,
    )

    trainer = SFTTrainer(
        model=model,
        args=sft_config,
        train_dataset=lm_dataset["train"],
        eval_dataset=lm_dataset["validation"],
        data_collator=FIMDataCollator(
            tokenizer=tokenizer,
            mlm=False
        ),
        tokenizer=tokenizer
    )

    trainer.train()

    metrics = trainer.evaluate()
    ray.train.report({"eval_loss": metrics["eval_loss"], "perplexity": math.exp(metrics["eval_loss"])})

In [None]:
train_tune_with_resources = tune.with_resources(train_with_tune, resources={"cpu": 1, "gpu": 1})

tuner = tune.Tuner(
    train_tune_with_resources,
    param_space=search_space,
    tune_config=tune.TuneConfig(
        scheduler=scheduler,
        search_alg=search_alg,
        num_samples=10    # Number of hyperparameter configurations to try
    ),
    run_config=ray.train.RunConfig(
        storage_path="/.../.../6_Try"
    )
)

In [None]:
results = tuner.fit()

In [None]:
print("Best Config:", results.get_best_result(metric="eval_loss", mode="min").config)

Best Config: {'lr': 4.912780695509994e-05, 'batch_size': 8, 'warmup_steps': 50, 'weight_decay': 0.010865341218750567, 'gradient_accumulation_steps': 4, 'lora_r': 32, 'lora_alpha': 64, 'lora_dropout': 0.05199484805837334}


# Training with SFTTrainer API

In [None]:
from transformers import TrainingArguments
from trl import SFTConfig

batch_size = 8

sft_config = SFTConfig(
    # "/.../Finetuned_CodeBERTa_Models/",
    # "/.../Finetuned_CodeT5+_Models/",
    # "/.../Finetuned_codegen-350M-mono/",
    # "/.../Finetuned_Starcoder_Models/",
    # "/.../Finetuned_CodeLlama_Models/1_Try",
    # "/.../Finetuned_Starcoder_Models/2_Try",
    # "/.../Finetuned_Starcoder_2_Models/8_Try",
    "/.../Finetuned_Deepseek-coder_Models/8_Try",
    overwrite_output_dir=True,

    do_eval=True,
    eval_strategy='steps',
    eval_steps=200,

    learning_rate=0.00016,

    # Regularization technique to prevent overfitting
    weight_decay=0.0534,

    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,

    # A memory-efficient variant of the AdamW optimizer
    optim="paged_adamw_8bit",

    logging_strategy="steps",
    logging_steps=100,

    # brain float 16, a special datatype for deep learning. (Is not supported by every GPU)
    bf16=True,

    # Accumulates gradients over several batches and the optimizer is only active after a certain number of batches have been performed.
    gradient_accumulation_steps=2,

    # Recomputes the intermediate values of a deep net (which would ordinarily be stored at forward time) at backward time. (saves memory during training)
    gradient_checkpointing=True,

    # During warmup the learning rate is set to a very small value and increases linearly over the warmup steps until it reaches the base learning rate.
    warmup_steps=100,

    # The maximal training steps
    # max_steps=1132,
    num_train_epochs=2,

    max_seq_length=tokenizer.model_max_length,

    # This will pack multiple short examples in the same input sequence. (inreases training efficiency, but has no impact in this case, as padding was set to true)
    # Unfortunately, packing negatively impacts the generation results when used with padding set to false.
    packing=True,
)

In [None]:
from trl import SFTTrainer
# from transformers import Trainer
import torch

trainer = SFTTrainer(
    model=model,
    args=sft_config,
    train_dataset=lm_dataset["train"],
    eval_dataset=lm_dataset["validation"],
    data_collator=FIMDataCollator(
        tokenizer=tokenizer,
        mlm=False
    ),
    tokenizer=tokenizer
)

# trainer = Trainer(
#     model_init=model_init,
#     args=training_args,
#     train_dataset=lm_dataset["train"],
#     eval_dataset=lm_dataset["validation"],
#     data_collator=FIMDataCollator(
#         tokenizer=tokenizer,
#         mlm=False
#     ),
#     tokenizer=tokenizer
# )

In [None]:
trainer.train()

In [None]:
trainer.save_model("/.../Finetuned_Deepseek-coder_Models/8_Try/Model")

# Evaluation with Perplexity, BLEU, and METEOR

In [None]:
import math

eval_results = trainer.evaluate()
print(f"Perplexity: {math.exp(eval_results['eval_loss']):.2f}", f"Validation Accuracy: {eval_results.get('eval_accuracy')}")

Perplexity: 2.12 Validation Accuracy: None


In [None]:
import torch

# Clears GPU cache
torch.cuda.empty_cache()

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model_checkpoint = 'deepseek-ai/deepseek-coder-1.3b-base'

checkpoint = "/.../Finetuned_Deepseek-coder_Models/8_Try/Model"

tokenizer = AutoTokenizer.from_pretrained(checkpoint)

old_model = AutoModelForCausalLM.from_pretrained(model_checkpoint)
old_model.resize_token_embeddings(len(tokenizer))

finetuned_model = PeftModel.from_pretrained(old_model, checkpoint).to(device)

In [None]:
finetuned_model.eval()

In [33]:
text = '''<|secure_function|>\tfunction add'''
model_inputs = tokenizer(text, return_tensors="pt").to(device)

input_ids = model_inputs["input_ids"]
attention_mask = model_inputs["attention_mask"]

# eos_token = "<|end▁of▁sentence|>"
# eos_token_id = tokenizer.convert_tokens_to_ids(eos_token)

generated_ids = finetuned_model.generate(input_ids,
                                         do_sample=True,
                                         max_length=256,
                                         num_beams=4,
                                         temperature=0.3,
                                         pad_token_id=tokenizer.eos_token_id,
                                         attention_mask=attention_mask)


# tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print(tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0])

	function addLiquidityETH(
		address token,
		uint256 amountTokenDesired,
		uint256 amountTokenMin,
		uint256 amountETHMin,
		address to,
		uint256 deadline
	) external payable returns (uint256 amountToken, uint256 amountETH, uint256 liquidity);


In [34]:
def generate_fim(prefix, suffix, model, tokenizer, max_length=256):
    input_text = f"<|fim_begin|>{prefix}<|fim_hole|>{suffix}<|fim_end|>"
    inputs = tokenizer.encode(input_text, return_tensors="pt").to(model.device)
    outputs = model.generate(
        inputs,
        max_length=max_length,
        num_beams=8,
        temperature=0.3,
        num_return_sequences=1,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id
    )
    middle = tokenizer.decode(outputs[0][len(inputs[0]):], skip_special_tokens=True)
    return prefix + middle + suffix

In [37]:
prefix = '''pragma solidity ^0.8.0;\n\n'''

suffix = '''\n\ncontract FOO is Context, IERC20, Ownable {'''

print(generate_fim(prefix, suffix, finetuned_model, tokenizer))

pragma solidity ^0.8.0;

import "@openzeppelin/contracts/utils/Context.sol" as Context;
import "@openzeppelin/contracts/interfaces/IERC20.sol" as IERC20;
import "@openzeppelin/contracts/access/Ownable.sol" as Ownable;

contract FOO is Context, IERC20, Ownable {


In [None]:
# Picks num_examples random FIM transformed constructs from the dataset and returns it
def return_random_FIMs(dataset, num_examples=10):

    picks = []
    fim_set = []
    test_set = []

    for i in range(len(dataset)):
        if '<|fim_end|>' in dataset[i]:
            fim_set.append(dataset[i])

    for _ in range(num_examples):
        pick = random.randint(0, len(fim_set)-1)
        while pick in picks:
            pick = random.randint(0, len(fim_set)-1)
        picks.append(pick)

    for pick in picks:
        prefix_index = fim_set[pick].index('<|fim_hole|>')
        suffix_index = fim_set[pick].index('<|fim_end|>')

        prefix = fim_set[pick][0:prefix_index]
        ground_truth = fim_set[pick][suffix_index + len('<|fim_end|>'):len(fim_set[pick])-len('<|end_of_sentence|>')]
        suffix = fim_set[pick][prefix_index + len('<|fim_hole|>'):suffix_index]

        test_set.append([prefix, suffix, ground_truth])

    return test_set

In [None]:
test = return_random_FIMs(import_fim_dataset, 1)

prompt = f"{test[0][0]}<|fim_hole|>{test[0][1]}<|fim_end|>"
reference = f"{test[0][2]}"

generator = pipeline("text-generation", model=finetuned_model, tokenizer=tokenizer, device=0)
print(generator(prompt, max_length=256, do_sample=True, num_beams=8, temperature=0.7, num_return_sequences=2, pad_token_id=tokenizer.eos_token_id)[0]["generated_text"])
print(reference, 1)

In [None]:
import re

import_fim_dataset = []
function_fim_dataset = []
vul_function_fim_dataset = []
constructor_fim_dataset = []
vul_constructor_fim_dataset = []
modifier_fim_dataset = []

vul_pattern = r".*\/\/ .*"

for i in range(len(updated_dataset['test'])):
    if 'pragma solidity ^0.8.0;' in updated_dataset['test'][i]['text']:
        import_fim_dataset.append(updated_dataset['test'][i]['text'])

for i in range(len(updated_dataset['test'])):
    if '\tfunction' in updated_dataset['test'][i]['text'] and not re.search(vul_pattern, updated_dataset['test'][i]['text']):
        function_fim_dataset.append(updated_dataset['test'][i]['text'])

for i in range(len(updated_dataset['test'])):
    if '\tfunction' in updated_dataset['test'][i]['text'] and re.search(vul_pattern, updated_dataset['test'][i]['text']):
        vul_function_fim_dataset.append(updated_dataset['test'][i]['text'])

for i in range(len(updated_dataset['test'])):
    if '<|secure_constructor|>' in updated_dataset['test'][i]['text'] and not re.search(vul_pattern, updated_dataset['test'][i]['text']):
        constructor_fim_dataset.append(updated_dataset['test'][i]['text'])

for i in range(len(updated_dataset['test'])):
    if 'constructor' in updated_dataset['test'][i]['text'] and re.search(vul_pattern, updated_dataset['test'][i]['text']):
        vul_constructor_fim_dataset.append(updated_dataset['test'][i]['text'])

for i in range(len(updated_dataset['test'])):
    if '\tmodifier' in updated_dataset['test'][i]['text']:
        modifier_fim_dataset.append(updated_dataset['test'][i]['text'])

print(modifier_fim_dataset)

In [None]:
from pygments.lexers import SolidityLexer
from pygments.token import Token
from nltk.translate.meteor_score import meteor_score
import nltk

nltk.download('wordnet')

# Tokenizes the solidity code example
def tokenize_code(code):
    lexer = SolidityLexer()
    tokens = list(lexer.get_tokens(code))
    token_strings = []
    for token_type, token_value in tokens:
        if token_type not in (Token.Text, Token.Comment):
            token_strings.append(token_value)
    return token_strings

# Stemming is not applied
class CodeStemmer:
    def stem(self, token):
        return token  # No stemming

# Computes the METEOR score
def compute_meteor(generated_code, reference_code):
    score = 0
    for (gen_code, ref_code) in zip(generated_code, reference_code):
        gen_tokens = tokenize_code(gen_code)
        ref_tokens = tokenize_code(ref_code)
        score += meteor_score([ref_tokens], gen_tokens, preprocess=lambda x: x, stemmer=CodeStemmer())

    return score / len(generated_code)

In [None]:
import evaluate
import re

vul_pattern = r".*\/\/ .*"

# Loads the BLEU metric
bleu = evaluate.load("bleu")

eos_token = "<|end▁of▁sentence|>"
eos_token_id = tokenizer.convert_tokens_to_ids(eos_token)

# Generates predictions
def generate_code(model, tokenizer, prompts):
    inputs = []
    for prompt in prompts:
        inputs.append(tokenizer.encode(prompt, return_tensors="pt").to(model.device))

    outputs = []
    for input in inputs:
        outputs.append(model.generate(input, max_length=256, num_beams=4, temperature=0.3, do_sample=True, pad_token_id=tokenizer.eos_token_id))

    return [tokenizer.decode(output[0][len(input[0]):], skip_special_tokens=True) for (output, input) in zip(outputs, inputs)]

# The prompts that should be completed
prompts = []
references = []
for prompt in return_random_FIMs(constructor_fim_dataset, 10):
    prompts.append(f"{prompt[0]}<|fim_hole|>{prompt[1]}<|fim_end|>")
    references.append(prompt[2])

pretrained_predictions = generate_code(model, tokenizer, prompts)
finetuned_predictions = generate_code(finetuned_model, tokenizer, prompts)

# Computes the BLEU score by comparing the predictions with the references
bleu_score_pretrained = 0
bleu_score_finetuned = 0
vulnerable_hits = 0
for (finetuned_prediction, pretrained_prediction, reference) in zip(finetuned_predictions, pretrained_predictions, references):
    if re.search(vul_pattern, finetuned_prediction):
        vulnerable_hits +=1                            # hit if vul_pattern is found in code fragment
    bleu_score_pretrained += bleu.compute(predictions=[pretrained_prediction], references=[reference])['bleu']
    bleu_score_finetuned += bleu.compute(predictions=[finetuned_prediction], references=[reference])['bleu']

# The average of the scores is calculated
bleu_score_pretrained = bleu_score_pretrained / len(references)
bleu_score_finetuned = bleu_score_finetuned / len(references)

# Computes the METEOR score by comparing the predictions with the references
meteor_score_pretrained = compute_meteor(pretrained_predictions, references)
meteor_score_finetuned = compute_meteor(finetuned_predictions, references)

print(f"Pretrained Model BLEU: {bleu_score_pretrained:.2f}")
print(f"Fine-Tuned Model BLEU: {bleu_score_finetuned:.2f}")
print(f"Pretrained Model METEOR: {meteor_score_pretrained:.2f}")
print(f"Fine-Tuned Model METEOR: {meteor_score_finetuned:.2f}")
print(f"Non-generated security comments: {vulnerable_hits}")

Pretrained Model BLEU: 0.01
Fine-Tuned Model BLEU: 0.46
Pretrained Model METEOR: 0.13
Fine-Tuned Model METEOR: 0.73
Non-generated security comments: 0


In [None]:
import evaluate

bleu = evaluate.load("bleu")

reference_code = ['\n\t\t_;']
generated_code = ['\n\t\tdf;']

score = compute_meteor(generated_code, reference_code)
score_bleu = bleu.compute(predictions=generated_code, references=reference_code)
print(f"METEOR Score: {score}")
print(f"BLEU Score: {score_bleu['bleu']}")

METEOR Score: 0.9814814814814815
BLEU Score: 0.0


In [None]:
from transformers import AutoModelForCausalLM

device = "cuda"

# Base model
model = AutoModelForCausalLM.from_pretrained(model_checkpoint).to(device)

model.resize_token_embeddings(len(tokenizer))

Embedding(32028, 2048)

In [None]:
from trl import SFTConfig, SFTTrainer
import math

sft_config = SFTConfig(
    output_dir="./results",
    save_strategy="no",
    per_device_eval_batch_size=8,
    logging_dir="./logs",
    report_to="none",
    packing=True,
    max_seq_length=tokenizer.model_max_length,
)

trainer = SFTTrainer(
    model=model,
    args=sft_config,
    eval_dataset=lm_dataset["test"],
    tokenizer=tokenizer,
    data_collator=FIMDataCollator(
        tokenizer=tokenizer,
        mlm=False
    )
)

eval_results = trainer.evaluate()
perplexity = math.exp(eval_results["eval_loss"])

print(f"Perplexity: {perplexity:.2f}")

In [None]:
from trl import SFTConfig, SFTTrainer
import math

sft_config = SFTConfig(
    output_dir="./results",
    save_strategy="no",
    per_device_eval_batch_size=8,
    logging_dir="./logs",
    report_to="none",
    packing=True,
    max_seq_length=tokenizer.model_max_length,
)

trainer = SFTTrainer(
    model=finetuned_model,
    args=sft_config,
    eval_dataset=lm_dataset["test"],
    tokenizer=tokenizer,
    data_collator=FIMDataCollator(
        tokenizer=tokenizer,
        mlm=False
    )
)

eval_results = trainer.evaluate()
perplexity = math.exp(eval_results["eval_loss"])

print(f"Perplexity: {perplexity:.2f}")

In [None]:
from transformers import pipeline
import evaluate
import re

vul_pattern = r".*\/\/ .*"

# Loads the BLEU metric
bleu = evaluate.load("bleu")

eos_token = "<|end▁of▁sentence|>"
eos_token_id = tokenizer.convert_tokens_to_ids(eos_token)

# Generates predictions
def generate_code(model, tokenizer, prompts):
    generator = pipeline("text-generation", model=model, tokenizer=tokenizer, device=0)
    return [generator(prompt, max_length=256, do_sample=True, num_beams=4, temperature=0.3, pad_token_id=tokenizer.eos_token_id)[0]["generated_text"] for prompt in prompts]

# The prompts that should be completed
prompts = ['<|secure_function|>\tfunction _transfer(\n\t\taddress sender,\n\t\taddress recipient,\n\t\tuint256 amount\n\t) internal virtual {',
           '<|secure_function|>\tfunction _approve(\n\t\taddress owner,\n\t\taddress spender,\n\t\tuint256 amount\n\t) internal virtual {',
					 '<|secure_function|>\tfunction approve(\n\t\taddress spender,\n\t\tuint256 amount\n\t) public returns (bool success) {',
           '<|secure_function|>\tfunction transfer(\n\t\taddress from,\n\t\taddress to,\n\t\tuint256 amount\n\t) public virtual override returns (bool) {',
					 '<|secure_function|>\tfunction withdraw(',
					 '<|secure_function|>\tfunction add(uint256 a, uint256 b) internal pure returns (uint256) {',
					 '<|secure_function|>\tfunction sub(uint256 a, uint256 b) internal pure returns (uint256) {',
					 '<|secure_function|>\tfunction div(uint256 a, uint256 b) internal pure returns (uint256) {',
           '<|secure_function|>\tfunction mult(uint256 a, uint256 b) internal pure returns (uint256) {',
           '<|secure_function|>\tfunction sendValue(address payable recipient, uint256 amount) internal {',
           '<|secure_function|>\tfunction ownerOf(\n\t\tuint256 tokenId\n\t) public view virtual override returns (address owner) {',
           '<|secure_function|>\tfunction symbol',
           '<|secure_function|>\tfunction name',
           '\tmodifer onlyOwner() {',
           '\tevent Approval',
           '\tevent Transfer',
           '\tusing SafeMath',
           '\tusing Address',
           '<|secure_function|>\tfunction burn',
           '\tstruct CurrentRateInfo {'
]

pretrained_predictions = generate_code(model, tokenizer, prompts)
finetuned_predictions = generate_code(finetuned_model, tokenizer, prompts)

# The references that serve as ground truth
references = ['\n\t\trequire(sender != address(0), "ERC20: transfer from the zero address");\n\n\t\trequire(recipient != address(0), "ERC20: transfer to the zero address");\n\n\t\tuint256 senderBalance = _balances[sender];\n\n\t\trequire(\n\t\t\tsenderBalance >= amount,\n\t\t\t"ERC20: transfer amount exceeds balance"\n\t\t);\n\n\t\tunchecked {\n\t\t\t_balances[sender] = senderBalance - amount;\n\n\t\t\t_balances[recipient] += amount;\n\t\t}\n\n\t\temit Transfer(sender, recipient, amount);\n\t}',
              '\n\t\trequire(owner != address(0), "ERC20: approve from the zero address");\n\n\t\trequire(spender != address(0), "ERC20: approve to the zero address");\n\n\t\t_allowances[owner][spender] = amount;\n\n\t\temit Approval(owner, spender, amount);\n\t}',
              '\n\t\tallowances[msg.sender][spender] = amount;\n\t\temit Approval(msg.sender, spender, amount);\n\t\treturn true;\n\t}',
              '\n\t\trequire(from != address(0), "ERC20: transfer from the zero address");\n\n\t\trequire(to != address(0), "ERC20: transfer to the zero address");\n\n\t\tuint256 fromBalance = _balances[from];\n\n\t\trequire(\n\t\t\tfromBalance >= amount,\n\t\t\t"ERC20: transfer amount exceeds balance"\n\t\t);\n\n\t\tunchecked {\n\t\t\t_balances[from] = fromBalance - amount;\n\n\t\t\t_balances[to] += amount;\n\t\t}\n\n\t\temit Transfer(from, to, amount);\n\n\t\treturn true;\n\t}',
              'uint256 amount) external onlyOwner {\n\t\tpayable(msg.sender).transfer(amount);\n\t}',
              '\n\t\tunchecked {\n\t\t\tuint256 c = a + b;\n\n\t\t\trequire(c >= a, "SafeMath: addition overflow");\n\n\t\t\treturn c;\n\t\t}\n\t}',
              '\n\t\treturn sub(a, b, "SafeMath: subtraction overflow");\n\t}',
              '\n\t\treturn div(a, b, "SafeMath: division by zero");\n\t}',
              '\n\t\treturn a * b;\n\t}',
              '\n\t\t(bool success, ) = recipient.call{value: amount}("");\n\t\trequire(success, "Address: unable to send value, recipient may have reverted");\n\t}',
              '\n\t\trequire(_exists(tokenId), "ERC721: owner of nonexistent token");\n\n\t\treturn _owners[tokenId];\n\t}',
              '() public view virtual override returns (string memory) {\n\t\treturn _symbol;\n\t}',
              '() public view virtual override returns (string memory) {\n\t\treturn _name;\n\t}',
              '\n\t\t_transferOwnership(address(0));\n\t}',
              '(\n\t\taddress indexed owner,\n\t\taddress indexed spender,\n\t\tuint256 value\n\t);',
              '(\n\t\taddress indexed from,\n\t\taddress indexed to,\n\t\tuint256 indexed id\n\t);',
              ' for uint256;',
              ' for address;',
              '(uint256 amount) external onlyOwner {\n\t\t_burn(msg.sender, amount);\n\t}',
              '\n\t\tuint64 lastTimestamp;\n\t\tuint64 ratePerSec;\n\t\tuint64 fullUtilizationRate;\n\t}'
]

bleu_score_pretrained = 0
bleu_score_finetuned = 0
vulnerable_hits = 0
for (finetuned_prediction, pretrained_prediction, reference) in zip(finetuned_predictions, pretrained_predictions, references):
    if re.search(vul_pattern, finetuned_prediction):
        vulnerable_hits +=1
    bleu_score_pretrained += bleu.compute(predictions=[pretrained_prediction], references=[reference])['bleu']
    bleu_score_finetuned += bleu.compute(predictions=[finetuned_prediction], references=[reference])['bleu']

bleu_score_pretrained = bleu_score_pretrained / len(references)
bleu_score_finetuned = bleu_score_finetuned / len(references)

meteor_score_pretrained = compute_meteor(pretrained_predictions, references)
meteor_score_finetuned = compute_meteor(finetuned_predictions, references)

print(f"Pretrained Model BLEU: {bleu_score_pretrained:.2f}")
print(f"Fine-Tuned Model BLEU: {bleu_score_finetuned:.2f}")
print(f"Pretrained Model METEOR: {meteor_score_pretrained:.2f}")
print(f"Fine-Tuned Model METEOR: {meteor_score_finetuned:.2f}")
print(f"Vulnerable Hits: {vulnerable_hits}")

Device set to use cuda:0
Device set to use cuda:0
The model 'PeftModelForCausalLM' is not supported for text-generation. Supported models are ['AriaTextForCausalLM', 'BambaForCausalLM', 'BartForCausalLM', 'BertLMHeadModel', 'BertGenerationDecoder', 'BigBirdForCausalLM', 'BigBirdPegasusForCausalLM', 'BioGptForCausalLM', 'BlenderbotForCausalLM', 'BlenderbotSmallForCausalLM', 'BloomForCausalLM', 'CamembertForCausalLM', 'LlamaForCausalLM', 'CodeGenForCausalLM', 'CohereForCausalLM', 'Cohere2ForCausalLM', 'CpmAntForCausalLM', 'CTRLLMHeadModel', 'Data2VecTextForCausalLM', 'DbrxForCausalLM', 'DiffLlamaForCausalLM', 'ElectraForCausalLM', 'Emu3ForCausalLM', 'ErnieForCausalLM', 'FalconForCausalLM', 'FalconMambaForCausalLM', 'FuyuForCausalLM', 'GemmaForCausalLM', 'Gemma2ForCausalLM', 'GitForCausalLM', 'GlmForCausalLM', 'GPT2LMHeadModel', 'GPT2LMHeadModel', 'GPTBigCodeForCausalLM', 'GPTNeoForCausalLM', 'GPTNeoXForCausalLM', 'GPTNeoXJapaneseForCausalLM', 'GPTJForCausalLM', 'GraniteForCausalLM', 'Gra

Pretrained Model BLEU: 0.11
Fine-Tuned Model BLEU: 0.55
Pretrained Model METEOR: 0.30
Fine-Tuned Model METEOR: 0.90
Vulnerable Hits: 0
