In [1]:
from transformers import RobertaTokenizer, T5ForConditionalGeneration
from transformers import AutoTokenizer

from codebleu import calc_codebleu
import os
import re
from datasets import Dataset
from tokenizers import Tokenizer



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
model_name = "Salesforce/codet5p-770m"
tokenizer = RobertaTokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)

In [3]:
def load_data_from_folders(root_dir):
    '''Uploads data from local directory, splits it into whether it is the
    vulnerability (source), code repair (target), or additional vulnerability
    information such as the vulnerability name (context). Takes in the 
    name of the root_dir, the directory where the data is stored.'''
    fine_tuning_data = []
    for repo_folder in os.listdir(root_dir):
        repo_path = os.path.join(root_dir, repo_folder)
        #print(repo_path)
        file_groups = {}
        for filename in os.listdir(repo_path):
            match = re.match(r"(.+)_(\d+)\.txt$", filename)
            #print(match)
            if match:
                prefix, number = match.groups()
                number = int(number)
                #print(number)
                if number not in file_groups:
                    file_groups[number] = {}
                # Identifies the type of file (source, target, context)
                if "source" in prefix:
                    file_groups[number]["source"] = os.path.join(repo_path, filename)
                elif "target" in prefix:
                    file_groups[number]["target"] = os.path.join(repo_path, filename)
                elif "context" in prefix:
                    file_groups[number]["context"] = os.path.join(repo_path, filename)
                #print(file_groups)
        
        # Processes each group of files
        for number, files in sorted(file_groups.items()):
            source_file = files.get("source")
            target_file = files.get("target")
            context_file = files.get("context")

            if source_file and target_file and context_file:
                with open(source_file, "r", encoding="utf-8") as src, \
                    open(target_file, "r", encoding="utf-8") as tgt, \
                    open(context_file, "r", encoding="utf-8") as ctx:

                    sources = src.readlines()
                    targets = tgt.readlines()
                    contexts = ctx.readlines()                

                    for s, t, c in zip(sources, targets, contexts):
                        combined_input = f"{c.strip()} \n Code: {s.strip()}"
                        fine_tuning_data.append({"source": combined_input, "target": t.strip()})
    
    return fine_tuning_data

# Loads dataset and setting up training data
dataset_path = "./dataset"
train_data = load_data_from_folders("/media/lauren/Extreme SSD/Data Collection/Gold_Standard_for_tuning")
print(train_data)


[{'source': 'CWE-502: Deserialization of Untrusted Data \n Code: model_checkpoint = torch.load(args.modelpath,', 'target': 'print("=> loading model params \'{}\'".format(args.modelpath))'}, {'source': 'CWE-939: Improper Authorization in Handler for Custom URL Scheme \n Code: if fmt == "url":', 'target': 'import urllib.parse'}, {'source': "CWE-78: Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection') \n Code: output0 = subprocess.run(cmd0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,", 'target': 'output0 = subprocess.run(cmd0, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,'}, {'source': 'CWE-78: Improper Neutralization of Special Elements used in an OS Command \n Code: output1 = subprocess.run(cmd1, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,', 'target': 'output1 = subprocess.run(cmd1, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,'}, {'source': "CWE-78: Improper Neutralization of Spe

In [4]:
# Converts data list to a Hugging Face Dataset
dataset = Dataset.from_list(train_data)

# Performs an 80-20 train-test split on sata
split_dataset = dataset.train_test_split(test_size=0.2)

# Sets train and test datasets as variables
train_dataset = split_dataset["train"]
test_dataset = split_dataset["test"]

In [5]:

def preprocess_function(examples):
    '''Tokenizes the model inputs as the source (the vulnerable code) 
    and the labels as the target (the repaired code)'''
    model_inputs = tokenizer(
        examples["source"], padding="max_length", truncation=True, max_length=256
    )    
    labels = tokenizer(
        examples["target"], padding="max_length", truncation=True, max_length=256
    )

   
    # Flattens label tokens
    model_inputs["labels"] = labels["input_ids"]
    
    #print("Fixed Labels: ", model_inputs["labels"][:2])  #
    return model_inputs

In [6]:
# Converts lists to Hugging Face Datasets
train_dataset = Dataset.from_list(train_dataset)
test_dataset = Dataset.from_list(test_dataset)


# Tokenizes testing and training datasets
tokenized_train = train_dataset.map(preprocess_function, batched=False)
tokenized_test = test_dataset.map(preprocess_function, batched=False)


Map: 100%|██████████| 72/72 [00:00<00:00, 919.40 examples/s]
Map: 100%|██████████| 18/18 [00:00<00:00, 746.48 examples/s]


In [7]:
# Views and counts tokenized data 
print(tokenized_test[0])
print(len(tokenized_test[0]['input_ids']), len(tokenized_test[0]['labels']))
tokenized_test = test_dataset.map(preprocess_function, batched=False)


{'source': "CWE-95: Improper Neutralization of Directives in Dynamically Evaluated Code ('Eval Injection') \n Code: def load_function(path, name=None):", 'target': 'import ast', 'input_ids': [1, 39, 6950, 17, 8778, 30, 2221, 22754, 7455, 14566, 287, 1588, 434, 9908, 3606, 316, 12208, 1230, 10271, 690, 3356, 7707, 13904, 15492, 6134, 7010, 3356, 30, 1652, 1262, 67, 915, 12, 803, 16, 508, 33, 7036, 4672, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

Map: 100%|██████████| 18/18 [00:00<00:00, 739.77 examples/s]


In [8]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir=".Salesforce/codet5-base-finetuned",
    eval_strategy="epoch",
    save_strategy="epoch",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=10,
    save_total_limit=2,
    logging_dir="./logs",
    report_to="none",  
)


In [11]:
import evaluate 
from sacrebleu import corpus_bleu
import numpy as np
from pygments.lexers import guess_lexer
from pygments.util import ClassNotFound


def detect_language(code_snippet):
    """Detects the programming language and maps it to a valid CodeBLEU language."""
    from pygments.lexers import guess_lexer
    from pygments.util import ClassNotFound

    AVAILABLE_LANGS = {'java', 'javascript', 'c_sharp', 'php', 'c', 'cpp', 'python', 'go', 'ruby', 'rust'}
    
    try:
        lexer = guess_lexer(code_snippet)
        lang = lexer.name.lower()
    except ClassNotFound:
        lang = "unknown"

    # Maps detected language to CodeBLEU supported languages
    lang_map = {
        "c++": "cpp",
        "c#": "c_sharp",
        "javascript": "javascript",
        "java": "java",
        "python": "python",
        "php": "php",
        "go": "go",
        "ruby": "ruby",
        "rust": "rust",
        "c": "c"
    }
    
    mapped_lang = lang_map.get(lang, "python")  
    return mapped_lang if mapped_lang in AVAILABLE_LANGS else "python"

# Loads metrics
bleu_metric = evaluate.load("sacrebleu")
rouge_metric = evaluate.load("rouge")
accuracy_metric = evaluate.load("accuracy")

def compute_metrics(eval_preds):
    predictions, labels = eval_preds

    # Converts predictions from logits -> token IDs
    if isinstance(predictions, tuple):
        predictions = predictions[0]  
    
    predictions = np.argmax(predictions, axis=-1)  

    # Decodes outputs & labels
    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Detects programming language
    lang = detect_language(decoded_preds[0])  
    print(f"Detected Language: {lang}")  
    if lang == "unknown":
        lang = "python" 

    # Computes BLEU score
    bleu_score = corpus_bleu(decoded_preds, [decoded_labels]).score

    # Computes Accuracy
    exact_matches = sum([pred == label for pred, label in zip(decoded_preds, decoded_labels)])
    accuracy = exact_matches / len(decoded_preds)
    #print("Exact Match Accuracy:", accuracy * 100)
   
    # Computes ROUGE-L scores
    rouge_scores = rouge_metric.compute(predictions=decoded_preds, references=decoded_labels)
    rouge_l = rouge_scores["rougeL"]  



    # Computes CodeBLEU
    codebleu_result = calc_codebleu(decoded_labels, decoded_preds, lang)

    # Ensures `codebleu_result` is a dictionary and extracts the final BLEU score
    if isinstance(codebleu_result, dict):
        codebleu_score = float(codebleu_result.get("codebleu", 0.0))  
    else:
        codebleu_score = float(codebleu_result) 


    return {
        "bleu": float(bleu_score),
        "rougeL": float(rouge_l) * 100,  
        "accuracy": float(accuracy) * 100,  
        "codebleu": float(codebleu_score) * 100  
    }

In [None]:
from transformers import DataCollatorForSeq2Seq
from transformers import Trainer, TrainingArguments
from transformers import Seq2SeqTrainer
from sacrebleu import corpus_bleu
# Ensures ignored tokens are properly handled
data_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer,
    label_pad_token_id=-100  
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_test,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)
trainer.train()



  trainer = Trainer(


Epoch,Training Loss,Validation Loss


In [11]:
results = trainer.evaluate()
print(results)

Detected Language: python
{'eval_loss': 3.8785650730133057, 'eval_bleu': 72.75341962523076, 'eval_rougeL': 72.9918719624602, 'eval_accuracy': 38.88888888888889, 'eval_codebleu': 51.07597774866396, 'eval_runtime': 1.0257, 'eval_samples_per_second': 17.55, 'eval_steps_per_second': 4.875, 'epoch': 10.0}


In [None]:
import torch

def generate_translation(text):
    '''Translates the vulnerable code to repaired code. Takes in a snippet of vulnerable
    code text as an input, and outputs the repaired code.'''
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(model.device)
    with torch.no_grad():
        output_tokens = model.generate(**inputs, max_length=60)
    generated_text = tokenizer.decode(output_tokens[0], skip_special_tokens=True)
    return generated_text

# Example input
test_sentence = ""
generated_output = generate_translation("CWE-78: Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection') \nCode:     output0 = subprocess.run(cmd0, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)")
print("Generated Translation:", generated_output)


Generated Translation:      output0 = subprocess.run(cmd0, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
