In [1]:
#import thingsss
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import accuracy_score, recall_score, f1_score

In [2]:
#methods
#tokenize the code and convert to tensors
#tokenizer will return input_id and attention_mask
#input_id is based on the pretrained model that i used
#attention mask is the whether the code will be ignored by the model later on
#whether the code is context or important os fucntion call
def tokenize_data(code, tokenizer):

    encoding = tokenizer.batch_encode_plus(
        code.tolist(),  # Convert the DataFrame/Series to a list
        add_special_tokens=True,  # Adds the special tokens like [CLS] and [SEP]
        padding=True,  # Pad sequences to the same length
        truncation=True,  # Truncate sequences that are too long
        max_length=512,  # Adjust according to your model's max input length
        return_tensors="pt"  # Return PyTorch tensors
    )
    return encoding["input_ids"], encoding["attention_mask"]


In [3]:
#init the things
python_dataset_name = "/content/drive/MyDrive/websec/dataset/python_vuln_CyberNative.csv"
php_dataset_name = "/content/drive/MyDrive/websec/dataset/php_vuln_CyberNative.csv"
#load CodeBERT tokenizer and model
MODEL_NAME = "microsoft/codebert-base"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

#convert csv into df
python_df = pd.read_csv(python_dataset_name)
print(type(python_df))
php_df = pd.read_csv(php_dataset_name)
#remove row with null for chosen and rejected columns from php_df
php_df = php_df.dropna(subset=['chosen', 'rejected'])

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

<class 'pandas.core.frame.DataFrame'>


In [4]:
#see the dataset
#print(python_df.head())

#break the dataset with labels; chosen column = target 0 ;rejected = target 1
# Reshape the dataset
df_chosen = pd.DataFrame({"code": python_df["chosen"], "label": 0})
df_rejected = pd.DataFrame({"code": python_df["rejected"], "label": 1})

# Merge both into a single dataset
df_join = pd.concat([df_chosen, df_rejected], ignore_index=True)
print(type(df_join))
print(df_join.head())
print(df_join.tail())


<class 'pandas.core.frame.DataFrame'>
                                                code  label
0  import ast\n\nclass RestrictedPython(ast.NodeT...      0
1  from flask import Flask, request, jsonify\nimp...      0
2  import subprocess\n\ndef run_command(user_inpu...      0
3  def safe_function():\n    user_input = input("...      0
4  import subprocess\n\ndef run_command(user_inpu...      0
                                                  code  label
843  import sqlite3\n\ndef login(username, password...      1
844  def get_user_input():\n    user_input = input(...      1
845  import subprocess\n\ndef execute_command(user_...      1
846  import os\ndef run_command(user_input):\n    c...      1
847  import os\n\ndef vulnerable_function(user_inpu...      1


In [5]:
#spliiting of the dataset
#split into train (70%), val (20%), test (10%)
train_code, temp_code, train_labels, temp_labels = train_test_split(
    df_join["code"], df_join["label"], test_size=0.3, random_state=42
)
print(type(train_code))
print(type(train_labels))

# 33% of temp(30%) is 10 % validation
val_code, test_code, val_labels, test_labels = train_test_split(
    temp_code, temp_labels, test_size=0.33, random_state=42
)

<class 'pandas.core.series.Series'>
<class 'pandas.core.series.Series'>


In [6]:
#tokenize the code to get the input_id and attention_mask
#these are all tensor type so later i can put into tensor dataset then subse
#dataloader for training
print(train_code.head())
print(type(train_code))
train_code_ids, train_attention_mask = tokenize_data(train_code, tokenizer)
val_code_ids, val_attention_mask = tokenize_data(val_code, tokenizer)
test_code_ids, test_attention_mask = tokenize_data(test_code, tokenizer)
#label also need convert to tensor
train_labels_tensor = torch.tensor(train_labels.values)
val_labels_tensor = torch.tensor(val_labels.values)
test_labels_tensor = torch.tensor(test_labels.values)


482    import os\nfrom flask import Flask, request\n\...
344    class BankAccount:\n    def __init__(self, bal...
140    import pickle\nimport os\nimport subprocess\n\...
173    import ast\n\nclass RestrictedPython(ast.NodeV...
6      import os\nimport subprocess\n\ndef execute_co...
Name: code, dtype: object
<class 'pandas.core.series.Series'>


In [7]:
#merge train_code_ids and train_attention_mask into one tensor as the input tensor
train_input_tensor = torch.cat((train_code_ids, train_attention_mask), dim=1)
val_input_tensor = torch.cat((val_code_ids, val_attention_mask), dim=1)
test_input_tensor = torch.cat((test_code_ids, test_attention_mask), dim=1)

#create tensor dataset
#split according to X tensor and y tensor , X tensor is input with input id and attention mask
#they are the same size size of inputid == size of attention mask
#lastly is label tensor
train_dataset = TensorDataset(train_input_tensor ,train_labels_tensor)
val_dataset = TensorDataset(val_input_tensor,val_labels_tensor)
test_dataset = TensorDataset(test_input_tensor, test_labels_tensor)

#create train val and test dataloader with batch size 8
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8)
test_dataloader = DataLoader(test_dataset, batch_size=8)


In [8]:
#just some pretty viewing of the dataloader before model training
for batch in train_dataloader:
    # Assuming the batch is a tuple: (input_tensor, labels)
    input_tensor, labels = batch

    # Split the input tensor back into input_ids and attention_mask
    input_ids = input_tensor[:, :input_tensor.shape[1] // 2]  # First half: input_ids
    attention_mask = input_tensor[:, input_tensor.shape[1] // 2:]  # Second half: attention_mask

    # Convert the first sequence in the batch back to code (text)
    decoded_code = tokenizer.decode(input_ids[0], skip_special_tokens=True)

    print(f"Decoded Code: \n{decoded_code}\n")
    print(f"Attention Mask: {attention_mask[0]}")  # Print attention mask for the first example
    print(f"Label: {labels[0]}")  # Print the label for the first example

    break  # Just print one batch

Decoded Code: 
import pickle
import os

class RCE:
    def __reduce__(self):
        return (os.system, ('echo "Remote Code Execution"',))

pickled = pickle.dumps(RCE())
print(pickled)

deserialized = pickle.loads(pickled)

Attention Mask: tensor([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [9]:

from transformers import RobertaForSequenceClassification, AdamW
from tqdm import tqdm

# Load the RobertaForSequenceClassification model
model_name = "microsoft/codebert-base"
model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=2)  # Binary classification


# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Optimizer
optimizer = AdamW(model.parameters(), lr=2e-5)

# Function for training the model
def train(model, train_dataloader, val_dataloader, optimizer, epochs=3):
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        all_labels = []
        all_predictions = []

        # Training loop
        for batch in tqdm(train_dataloader):
            input_tensor, labels = batch
            input_ids = input_tensor[:, :input_tensor.shape[1] // 2]
            attention_mask = input_tensor[:, input_tensor.shape[1] // 2:]

            # Move data to GPU if available
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            logits = outputs.logits

            # Backpropagation
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Collect predictions and labels for metrics
            predictions = torch.argmax(logits, dim=-1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predictions.cpu().numpy())

        # Calculate training metrics
        accuracy = accuracy_score(all_labels, all_predictions)
        recall = recall_score(all_labels, all_predictions, average='weighted')
        f1 = f1_score(all_labels, all_predictions, average='weighted')

        print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss / len(train_dataloader):.4f} - "
              f"Accuracy: {accuracy:.4f} - Recall: {recall:.4f} - F1 Score: {f1:.4f}")

        # Validation after each epoch
        validate(model, val_dataloader)

# Function to evaluate on the validation set
def validate(model, val_dataloader):
    model.eval()
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for batch in val_dataloader:
            input_tensor, labels = batch
            input_ids = input_tensor[:, :input_tensor.shape[1] // 2]
            attention_mask = input_tensor[:, input_tensor.shape[1] // 2:]

            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits

            predictions = torch.argmax(logits, dim=-1)
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predictions.cpu().numpy())

    # Calculate validation metrics
    accuracy = accuracy_score(all_labels, all_predictions)
    recall = recall_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    print(f"Validation Accuracy: {accuracy:.4f} - Validation Recall: {recall:.4f} - Validation F1 Score: {f1:.4f}")
    model.train()

# Training the model
train(model, train_dataloader, val_dataloader, optimizer)



pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]


  0%|          | 0/75 [00:00<?, ?it/s][A
  1%|▏         | 1/75 [00:02<03:12,  2.60s/it][A
  3%|▎         | 2/75 [00:03<01:45,  1.45s/it][A
  4%|▍         | 3/75 [00:03<01:17,  1.08s/it][A
  5%|▌         | 4/75 [00:04<01:04,  1.10it/s][A
  7%|▋         | 5/75 [00:05<00:57,  1.23it/s][A
  8%|▊         | 6/75 [00:05<00:52,  1.32it/s][A
  9%|▉         | 7/75 [00:06<00:48,  1.39it/s][A
 11%|█         | 8/75 [00:07<00:46,  1.44it/s][A
 12%|█▏        | 9/75 [00:07<00:45,  1.47it/s][A
 13%|█▎        | 10/75 [00:08<00:43,  1.49it/s][A
 15%|█▍        | 11/75 [00:09<00:42,  1.51it/s][A
 16%|█▌        | 12/75 [00:09<00:41,  1.52it/s][A
 17%|█▋        | 13/75 [00:10<00:40,  1.53it/s][A
 19%|█▊        | 14/75 [00:10<00:39,  1.53it/s][A
 20%|██        | 15/75 [00:11<00:39,  1.53it/s][A
 21%|██▏       | 16/75 [00:12<00:38,  1.53it/s][A
 23%|██▎       | 17/75 [00:12<00:37,  1.54it/s][A
 24%|██▍       | 18/75 [00:13<00:37,  1.54it/s][A
 25%|██▌       | 19/75 [00:14<00:36,  1.54it/s]

Epoch 1/3 - Loss: 0.5654 - Accuracy: 0.6762 - Recall: 0.6762 - F1 Score: 0.6754
Validation Accuracy: 0.9000 - Validation Recall: 0.9000 - Validation F1 Score: 0.8993


100%|██████████| 75/75 [00:51<00:00,  1.45it/s]


Epoch 2/3 - Loss: 0.2789 - Accuracy: 0.8853 - Recall: 0.8853 - F1 Score: 0.8853
Validation Accuracy: 0.9059 - Validation Recall: 0.9059 - Validation F1 Score: 0.9058


100%|██████████| 75/75 [00:54<00:00,  1.39it/s]


Epoch 3/3 - Loss: 0.1918 - Accuracy: 0.9325 - Recall: 0.9325 - F1 Score: 0.9325
Validation Accuracy: 0.9000 - Validation Recall: 0.9000 - Validation F1 Score: 0.8993


In [10]:
for batch in val_dataloader:
            input_tensor, labels = batch
            input_ids = input_tensor[:, :input_tensor.shape[1] // 2]
            attention_mask = input_tensor[:, input_tensor.shape[1] // 2:]

            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            print("X train " , input_ids.shape)
            print(" Y train " , labels.shape)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits

            predictions = torch.argmax(logits, dim=-1)
            print(predictions.shape)

            break

X train  torch.Size([8, 493])
 Y train  torch.Size([8])
torch.Size([8])


In [12]:

# Function to evaluate the model with unseen test set
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def test(model, test_dataloader,device):
    model.eval()  # Set the model to evaluation mode
    all_labels = []
    all_predictions = []

    with torch.no_grad():  # Disable gradient calculation
        for batch in test_dataloader:
            input_tensor, labels = batch
            input_ids = input_tensor[:, :input_tensor.shape[1] // 2]
            attention_mask = input_tensor[:, input_tensor.shape[1] // 2:]

            # Move data to the appropriate device (GPU/CPU)
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            # Model inference (forward pass)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            logits = outputs.logits

            # Get predictions by taking the class with the highest logit value
            predictions = torch.argmax(logits, dim=-1)

            # Store predictions and actual labels
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predictions.cpu().numpy())

    # Calculate test metrics
    print("all_labels", len(all_labels))
    print("all_predictions", len(all_predictions))
    print("all labels,", all_labels)
    print("all predictions,", all_predictions)

    # Overall accuracy
    accuracy = accuracy_score(all_labels, all_predictions)

    # Precision, Recall, and F1-score for each class
    precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_predictions, average=None)

    # Print the test results in a structured format
    print(f"{'':<10}{'Precision':<10}{'Recall':<10}{'F1-Score':<10}")
    print(f"{'Not Vuln':<10}{precision[0]:<10.2f}{recall[0]:<10.2f}{f1[0]:<10.2f}")
    print(f"{'Vuln':<10}{precision[1]:<10.2f}{recall[1]:<10.2f}{f1[1]:<10.2f}")
    print(f"\n{'Accuracy':<10}{accuracy:.4f}")


# Run the model on test dataloader
test(model, test_dataloader, device)

all_labels 85
all_predictions 85
all labels, [1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1]
all predictions, [1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1]
          Precision Recall    F1-Score  
Not Vuln  0.97      0.77      0.86      
Vuln      0.80      0.98      0.88      

Accuracy  0.8706


#save on google drive model and tokenizer
model.save_pretrained("/content/drive/MyDrive/websec/model/php_xformer/model")
tokenizer.save_pretrained("/content/drive/MyDrive/websec/model/php_xformer/token")
#zip
import shutil
# Zip the directory into a file
shutil.make_archive("/content/drive/MyDrive/websec/model/php_xformer", 'zip', '/content/drive/MyDrive/websec/model', 'php_xformer')


In [None]:

# save model as pth state file
#torch.save(model.state_dict(), '/content/drive/MyDrive/websec/model/xformer_php_model.pth')

#to use the model
#from transformers import RobertaForSequenceClassification
# Initialize the model
#model = RobertaForSequenceClassification.from_pretrained('roberta-base')
# Load the saved model state_dict
#model.load_state_dict(torch.load('/content/drive/MyDrive/websec/model/xformer_python_model.pth'))
# Set the model to evaluation mode if you're not training
#model.eval()
# Verify the model is loaded properly
#print(model)