In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import my_tokenizer
from transformers import AutoModelForCausalLM, AutoTokenizer

In [2]:
sample = pd.read_csv("Sample.csv")
sample = sample[["code", "language", "safety"]]

sample.head()

Unnamed: 0,code,language,safety
0,int _gnutls_ciphertext2compressed(gnutls_sessi...,C/C++,vulnerable
1,static char *make_filename_safe(const char *fi...,C/C++,vulnerable
2,"unpack_Z_stream(int fd_in, int fd_out)\n{\n\tI...",C/C++,vulnerable
3,"static void cirrus_do_copy(CirrusVGAState *s, ...",C/C++,vulnerable
4,"glue(cirrus_bitblt_rop_fwd_, ROP_NAME)(CirrusV...",C/C++,vulnerable


In [3]:
dataset = sample[:1001]
dataset.head()

Unnamed: 0,code,language,safety
0,int _gnutls_ciphertext2compressed(gnutls_sessi...,C/C++,vulnerable
1,static char *make_filename_safe(const char *fi...,C/C++,vulnerable
2,"unpack_Z_stream(int fd_in, int fd_out)\n{\n\tI...",C/C++,vulnerable
3,"static void cirrus_do_copy(CirrusVGAState *s, ...",C/C++,vulnerable
4,"glue(cirrus_bitblt_rop_fwd_, ROP_NAME)(CirrusV...",C/C++,vulnerable


In [4]:
train_data, temp_data = train_test_split(dataset, test_size=0.2, stratify=dataset['language'], random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.5, stratify=temp_data['language'], random_state=42)

In [5]:
import torch
def create_prompt(code, language):
      prompt = f'''
Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
Is this {language} code safe or vulnerable to software vulnerabilities:
{code}

### Response:
'''
      return prompt
max_input_length = 1500
class TextDataset(torch.utils.data.Dataset):
    def __init__(self, data, tokenizer, max_length):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        code = self.data['code'].iloc[idx]  # Assuming text is in the first column
        language = self.data['language'].iloc[idx]
        label = self.data['safety'].replace({'safe': 0, 'vulnerable': 1}).iloc[idx]
        prompts = create_prompt(code, language)
        return {'prompt': prompts, 'label': label, 'index': idx}

In [6]:
device = 'cuda'
model_name = "tiiuae/falcon-rw-1b"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token # setting tokenizer pad token
tokenizer.padding_side = 'left' # setting padding to left as decoder models run from left to right

In [7]:
def collate_batch(batch):
    indices = [item['index'] for item in batch]
    prompts = [item['prompt'] for item in batch]
    labels = [item['label'] for item in batch]
    input_ids = tokenizer(prompts, padding=True, truncation=True, return_tensors="pt", max_length=max_input_length).to(device)
    labels = torch.tensor(labels).to(device)
    return {"indices": indices, "input_ids": input_ids['input_ids'], 'attention_mask': input_ids['attention_mask'], "labels": labels}



In [8]:
sample = pd.read_csv("Sample.csv")
sample = sample[:1000]
sample = sample[['code', 'language', 'safety', 'dataset']]
dataset = TextDataset(sample, None, None)
data_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_batch)

In [9]:
from tqdm.auto import tqdm
model.eval()
with torch.no_grad():
  counter = 0
  for batch in tqdm(data_loader):
    indices = batch['indices']
    input_ids = batch['input_ids']
    attention_mask = batch['attention_mask']
    labels = batch['labels']
    outputs = model.generate( # function to generate output completion
        input_ids, # passing input prompt
        attention_mask=attention_mask, # including attention mask
        max_length=3000,  # specifying maximum length of model generation
        pad_token_id=tokenizer.pad_token_id,
        temperature=0.8, # hyper-parameter
        top_p=0.92, # hyper-parameter
        top_k=50, # hyper-parameter
        no_repeat_ngram_size=2,  # Prevent repeating the same n-grams so will prevent the same token to be repeated n times
        eos_token_id=tokenizer.eos_token_id
    ) # generating outputs from inputs
    generated_texts = [tokenizer.decode(output_id, skip_special_tokens=True) for output_id in outputs]
    responses = [text.split("### Response:\n")[1].strip() if "### Response:\n" in text else text for text in generated_texts]
    for idx, response in zip(indices, responses):
      sample.at[idx, 'completion'] = response
    

  0%|          | 0/250 [00:00<?, ?it/s]

  label = self.data['safety'].replace({'safe': 0, 'vulnerable': 1}).iloc[idx]
  label = self.data['safety'].replace({'safe': 0, 'vulnerable': 1}).iloc[idx]
