In [None]:
!pip install -q datasets transformers pytorch-lightning wandb

In [None]:
!wget https://samate.nist.gov/SARD/downloads/test-suites/2022-05-12-php-test-suite-sqli-v1-0-0.zip
!wget https://samate.nist.gov/SARD/downloads/test-suites/2015-10-27-php-vulnerability-test-suite.zip

In [None]:
!mkdir dataset
!unzip -o -q 2022-05-12-php-test-suite-sqli-v1-0-0.zip -d dataset
!unzip -o -q 2015-10-27-php-vulnerability-test-suite.zip -d dataset

In [None]:
import os
import json
import re
import random

# Define the path to the dataset directory
dataset_dir = "dataset"
# Upper bound on how many test-case directories are sampled from the dataset
max_directories = 25000
# Fixed seed so the same subset is sampled on every run
sample_seed = 42

# Column-oriented store for the extracted information: one entry per SARIF
# result, with the three lists kept index-aligned.
results = {
    "directory_name": [],
    "code_snippet": [],
    "cwe_id": []
}

# os.listdir returns entries in arbitrary order, so sort first; together with
# the seeded generator this makes the shuffled sample reproducible.
directories = sorted(os.listdir(dataset_dir))
random.Random(sample_seed).shuffle(directories)

for directory in directories[:max_directories]:
    directory_path = os.path.join(dataset_dir, directory)
    if not os.path.isdir(directory_path):
        continue

    # Only directories that ship a manifest.sarif file are used
    manifest_path = os.path.join(directory_path, "manifest.sarif")
    if not os.path.exists(manifest_path):
        continue

    # Read the contents of the manifest file
    with open(manifest_path, "r") as f:
        manifest_data = json.load(f)

    # Each result of the first run names a source file (relative to the
    # test-case directory) and a rule id, which is stored as the CWE label.
    for result in manifest_data["runs"][0]["results"]:
        code_location = result["locations"][0]["physicalLocation"]["artifactLocation"]["uri"]
        # Context manager so the file handle is closed right away instead of
        # being leaked once per result.
        with open(os.path.join(directory_path, code_location), "r") as source_file:
            code_snippet = source_file.read()
        cwe_id = result["ruleId"]

        # Drop <!-- ... --> blocks (possibly multi-line) and surrounding newlines.
        # NOTE(review): presumably these comments describe the flaw and would
        # leak the label into the input -- confirm against the test-suite files.
        code_snippet = re.sub(r'<!--(.*?)-->', '', code_snippet, flags=re.DOTALL).strip('\n')

        results["directory_name"].append(directory)
        results["code_snippet"].append(code_snippet)
        results["cwe_id"].append(cwe_id)

In [None]:
# Distinct CWE ids; the position of an id in this list is its class index.
# Sorted because the iteration order of a set of strings is not stable across
# interpreter sessions, which would silently remap class indices on every
# kernel restart and make saved models impossible to interpret.
labels = sorted(set(results["cwe_id"]))

In [None]:
from datasets import Dataset,DatasetDict,load_from_disk
# Build a Dataset from the column lists collected in `results`
# (columns: directory_name, code_snippet, cwe_id).
dataset = Dataset.from_dict(results)

In [None]:
# Fixed seed so the train/validation/test membership is reproducible
split_seed = 42

# Hold out 20% of the examples; the remaining 80% is the training set
train_testvalid = dataset.train_test_split(test_size=0.2, seed=split_seed)
# Split the held-out 20% in half: 10% test, 10% validation
test_valid = train_testvalid['test'].train_test_split(test_size=0.5, seed=split_seed)
# gather everyone if you want to have a single DatasetDict
# NOTE: this rebinds `dataset` from a Dataset to a DatasetDict, so this cell
# cannot be re-run without first re-running the cell that creates `dataset`.
dataset = DatasetDict({
    'train': train_testvalid['train'],
    'test': test_valid['test'],
    'validation': test_valid['train']})

In [None]:
# Show the repr of `dataset` as the cell output (sanity check of the splits).
dataset

In [None]:
# Persist the split dataset to the local 'datasets' directory; a later cell
# reloads it from there with load_from_disk.
dataset.save_to_disk('datasets')

In [None]:
# Archive the saved 'datasets' directory recursively into datasets.zip.
!zip -r datasets.zip datasets

In [None]:
from transformers import RobertaTokenizer

tokenizer = RobertaTokenizer.from_pretrained("Salesforce/codet5-small")

# Task prefix prepended to every code snippet before tokenization
prefix = "Find the CWE ID: "
# Inputs are padded / truncated to exactly this many tokens
max_input_length = 512
num_labels = len(labels)  # Number of classes
# CWE id -> class index, built once so each example is a constant-time lookup
# instead of a linear `labels.index(...)` scan.
label_to_index = {cwe_id: index for index, cwe_id in enumerate(labels)}

def preprocess_examples(examples):
    """Tokenize a batch of code snippets and attach integer class labels.

    Args:
        examples: a batch with a 'code_snippet' list and a parallel 'cwe_id' list.

    Returns:
        The tokenizer output for the prefixed snippets (padded and truncated to
        `max_input_length`) with an added "labels" entry holding, per example,
        the index of its CWE id in `labels`.

    Raises:
        KeyError: if a CWE id is not present in `labels`.
    """
    codes = examples['code_snippet']
    cwe_ids = examples['cwe_id']

    inputs = [prefix + code for code in codes]
    model_inputs = tokenizer(inputs, max_length=max_input_length, padding="max_length", truncation=True)

    # Convert CWE IDs to class indices for classification
    model_inputs["labels"] = [label_to_index[cwe_id] for cwe_id in cwe_ids]

    return model_inputs

In [None]:
# Apply preprocess_examples to every split in batched mode; the result (with
# the tokenizer outputs and "labels" added) replaces `dataset`.
dataset = dataset.map(preprocess_examples, batched=True)

In [None]:
from torch.utils.data import DataLoader

# Expose only the model inputs and labels, as torch tensors.
dataset.set_format(type="torch", columns=['input_ids', 'attention_mask', 'labels'])
# Only the training loader shuffles; validation and test keep dataset order.
train_dataloader = DataLoader(dataset['train'], shuffle=True, batch_size=8)
valid_dataloader = DataLoader(dataset['validation'], batch_size=4)
test_dataloader = DataLoader(dataset['test'], batch_size=4)

In [None]:
# Pull one batch from the training loader and list its keys as a sanity check.
batch = next(iter(train_dataloader))
print(batch.keys())

In [None]:
# Decode the first example of the batch back to text to eyeball the model input.
tokenizer.decode(batch['input_ids'][0])

In [None]:
# Map the first example's class index back to its CWE id via `labels`.
label = batch['labels'][0]
labels[label]

In [None]:
# AdamW comes from torch: the copy in `transformers` is deprecated and absent
# from recent releases.
from torch.optim import AdamW
from transformers import T5ForConditionalGeneration, get_linear_schedule_with_warmup
import torch.nn as nn
import pytorch_lightning as pl

class CodeT5(pl.LightningModule):
    """CodeT5-small with a linear head for CWE-id classification.

    The pretrained seq2seq checkpoint is kept in `self.model` (so it can still
    be saved with `save_pretrained`), but only its encoder is used for
    classification: the hidden state of the first input token is fed to
    `self.classification_head`.

    Args:
        num_labels: number of output classes.
        lr: peak learning rate for AdamW.
        num_train_epochs: number of epochs the LR schedule is sized for.
        warmup_steps: number of linear warm-up steps of the LR schedule.
    """

    def __init__(self, num_labels, lr=5e-5, num_train_epochs=15, warmup_steps=1000):
        super().__init__()
        self.model = T5ForConditionalGeneration.from_pretrained("Salesforce/codet5-small")
        self.classification_head = nn.Linear(self.model.config.hidden_size, num_labels)
        # One shared loss module instead of building a new one on every step
        self.criterion = nn.CrossEntropyLoss()
        self.save_hyperparameters()

    def forward(self, input_ids, attention_mask):
        """Return classification logits, one row per input sequence.

        Only the encoder is run. Calling the full seq2seq model without
        decoder inputs or labels raises an error, and its output object has no
        `last_hidden_state` attribute to pool from.
        """
        encoder_outputs = self.model.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Pool by taking the hidden state at the first token position
        pooled = encoder_outputs.last_hidden_state[:, 0, :]

        return self.classification_head(pooled)

    def common_step(self, batch, batch_idx):
        """Run the model on a batch and return `(logits, labels)`."""
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        labels = batch["labels"]
        logits = self(input_ids, attention_mask)

        return logits, labels

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy training loss for one batch."""
        logits, labels = self.common_step(batch, batch_idx)
        loss = self.criterion(logits, labels)

        # Log the training loss
        self.log("training_loss", loss)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss as "val_loss" (the key callbacks monitor)."""
        logits, labels = self.common_step(batch, batch_idx)
        loss = self.criterion(logits, labels)
        self.log("val_loss", loss, prog_bar=True)

    def test_step(self, batch, batch_idx):
        """Log test loss and accuracy for one batch and return the loss.

        Previously this returned the raw `(logits, labels)` tuple as "loss".
        """
        logits, labels = self.common_step(batch, batch_idx)
        loss = self.criterion(logits, labels)
        accuracy = (logits.argmax(dim=-1) == labels).float().mean()
        self.log("test_loss", loss)
        self.log("test_accuracy", accuracy)

        return loss

    def configure_optimizers(self):
        """Create AdamW plus a per-step linear warm-up / decay LR schedule."""
        # weight_decay and eps are set explicitly to match the defaults of the
        # AdamW implementation formerly imported from `transformers`.
        optimizer = AdamW(self.parameters(), lr=self.hparams.lr, weight_decay=0.0, eps=1e-6)
        # The bare name `train_dataloader` is the module-level DataLoader (the
        # class body is not an enclosing scope for names used inside methods).
        num_train_optimization_steps = self.hparams.num_train_epochs * len(train_dataloader)
        lr_scheduler = {'scheduler': get_linear_schedule_with_warmup(optimizer,
                                                    num_warmup_steps=self.hparams.warmup_steps,
                                                    num_training_steps=num_train_optimization_steps),
                        'name': 'learning_rate',
                        'interval':'step',
                        'frequency': 1}

        return {"optimizer": optimizer, "lr_scheduler": lr_scheduler}

    def train_dataloader(self):
        """Return the module-level training DataLoader."""
        return train_dataloader

    def val_dataloader(self):
        """Return the module-level validation DataLoader."""
        return valid_dataloader

    def test_dataloader(self):
        """Return the module-level test DataLoader."""
        return test_dataloader

In [None]:
import wandb

# Authenticate with Weights & Biases before the WandbLogger is created.
wandb.login()

In [None]:
# Instantiate the classifier with one output class per distinct CWE id.
model = CodeT5(len(labels))

In [None]:
from pytorch_lightning import Trainer
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor

wandb_logger = WandbLogger(name='codet5-finetune-code-vulnerabilty-25k', project='FYP')
# for early stopping, see https://pytorch-lightning.readthedocs.io/en/1.0.0/early_stopping.html?highlight=early%20stopping
# The monitored key must match what validation_step logs ("val_loss"). With
# strict=False a wrong key is ignored silently and early stopping never fires.
early_stop_callback = EarlyStopping(
    monitor='val_loss',
    patience=3,
    strict=False,
    verbose=False,
    mode='min'
)
lr_monitor = LearningRateMonitor(logging_interval='step')

# max_epochs is tied to the epoch count the LR schedule was sized for;
# without it the Trainer falls back to its own, much larger default.
# NOTE(review): default_root_dir looks like a Colab Google Drive path --
# confirm Drive is mounted before running.
trainer = Trainer(default_root_dir="/content/drive/MyDrive/CodeT5/Notebooks/Checkpoints",
                  max_epochs=model.hparams.num_train_epochs,
                  logger=wandb_logger,
                  callbacks=[early_stop_callback, lr_monitor])
trainer.fit(model)

In [None]:
save_directory = "model" # save in the current working directory, you can change this of course
# Saves the T5 backbone only -- the classification head is not part of it
model.model.save_pretrained(save_directory)
# Also save a full Lightning checkpoint (backbone + classification head) and
# the label order, without which the saved weights cannot be used to classify.
trainer.save_checkpoint(os.path.join(save_directory, "codet5_classifier.ckpt"))
with open(os.path.join(save_directory, "labels.json"), "w") as labels_file:
    json.dump(labels, labels_file)

In [None]:
# Reload the dataset saved earlier with save_to_disk; this replaces the
# tokenized, torch-formatted `dataset` with the stored copy.
dataset = load_from_disk('datasets')

In [None]:
# Pick one example from the test split and show it, then its code on its own.
test_example = dataset['test'][2]
print(test_example)
print("Code:", test_example['code_snippet'])

In [None]:
import torch

# prepare for the model exactly as in training: same task prefix, same
# truncation length
encoding = tokenizer(prefix + test_example['code_snippet'],
                     max_length=max_input_length,
                     truncation=True,
                     return_tensors='pt')
# classify: the model was trained through its classification head, so the
# prediction is the argmax over class logits, not text from generate()
model.eval()
with torch.no_grad():
    logits = model(encoding.input_ids.to(model.device),
                   encoding.attention_mask.to(model.device))
predicted_index = logits.argmax(dim=-1).item()
print("Predicted CWE ID:", labels[predicted_index])

In [None]:
# Print the example's stored CWE id for comparison with the model output.
print("Ground truth:", test_example['cwe_id'])