<a href="https://colab.research.google.com/github/alenabd24/LLM_Fault_Tolerance/blob/main/BO_Flipped_Bits__Classification_Head_GraphCodeBERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Buffer Overflow CWE Classification with Adversarial Flip-bit Injection**

## Purpose of the script:
1. Train a GraphCodeBERT-based model to classify code snippets into different CWE types (specifically those related to buffer overflows).

2. Introduce bit-flip noise into the tokenized data to simulate data corruption or adversarial attacks.

3. Evaluate how this noise affects the model's accuracy and robustness.

---

Initial setup: installing ML and NLP libraries, mainly from Hugging Face.

In [None]:
!pip install datasets
!pip install transformers
!pip install accelerate -U
!pip install transformers[torch]
!pip install wandb



**The following libraries are installed:**

1. datasets – A library from Hugging Face for easily accessing and processing large datasets, especially for machine learning and NLP tasks.

2. transformers – The main Hugging Face library for loading and using pre-trained LLMs like GPT, BERT, and T5.

3. accelerate – A library that helps optimize and speed up training large models on multiple GPUs or TPUs.

4. transformers[torch] – Installs the transformers library with PyTorch dependencies (ensuring PyTorch is installed).

5. wandb – Weights & Biases, a popular tool for experiment tracking, hyperparameter tuning, and logging during ML model training.
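Because library APIs can change between releases, it can help to record which versions were installed for a given run. The following is a minimal sketch using only the standard library; the helper name `installed_version` is hypothetical and not part of the original notebook.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package: str):
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Packages installed by the cell above (torch comes in via transformers[torch]).
for name in ["datasets", "transformers", "accelerate", "torch", "wandb"]:
    print(f"{name}: {installed_version(name) or 'not installed'}")
```

Saving this output next to the training logs makes it easier to reproduce a run later.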

In [None]:
from tqdm import tqdm, trange
import multiprocessing

from transformers import (WEIGHTS_NAME, AdamW, get_linear_schedule_with_warmup,
                          RobertaConfig, RobertaForSequenceClassification, RobertaTokenizer)
from datasets import Dataset
from transformers import RobertaTokenizer, RobertaForMaskedLM, pipeline
from transformers import DataCollatorWithPadding
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

import torch

!pip install evaluate
import evaluate
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split



# More imported libraries and modules

* tqdm & trange  
Progress bars for tracking training loops.  

* multiprocessing  
Enables parallel processing for efficiency.  

## Transformers & Hugging Face Libraries  
- **RobertaConfig** → Configuration settings for RoBERTa models.  
- **RobertaForSequenceClassification** → RoBERTa model for classification tasks.  
- **RobertaTokenizer** → Tokenizer for RoBERTa (converts text into tokenized inputs).  
- **RobertaForMaskedLM** → RoBERTa for Masked Language Modeling (predicting masked words).  
- **pipeline** → High-level API for using pre-trained models easily.  
- **DataCollatorWithPadding** → Ensures tokenized inputs are correctly padded for training.  
- **AutoModelForSequenceClassification** → Generic method for loading classification models.  
- **TrainingArguments & Trainer** → Utilities for managing model training.  

## Torch & Optimizers  
- **torch** → PyTorch framework for training deep learning models.  
- **AdamW** → Adam optimizer with decoupled weight decay, commonly used to fine-tune transformers (imported here from `transformers`).  
- **get_linear_schedule_with_warmup** → Learning rate scheduler.  

## Additional Libraries  
- **evaluate** → A package for computing accuracy, F1-score, etc.; it replaces the older `datasets.load_metric`.  
- **numpy & pandas** → For handling datasets and numerical operations.  
- **sklearn.model_selection.train_test_split** → Splits data into training and test sets.  


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


This command mounts my Google Drive in the Colab environment at the path `/content/drive`.
After it runs, the notebook can read and write the files in my Drive.

Reading the CSV

In [None]:
import pandas as pd
df=pd.read_csv('/content/drive/MyDrive/Colab_Notebooks/MSc_Fault_Tolerance/processed_data_diversevul_2.csv')

The cell above reads the CSV from the MSc_Fault_Tolerance project folder on Google Drive.

In [None]:
import pandas as pd

# List of buffer overflow related CWEs
buffer_overflow_cwes = [
    'CWE119', 'CWE120', 'CWE121', 'CWE122', 'CWE123', 'CWE124',
    'CWE125', 'CWE787', 'CWE805', 'CWE680', 'CWE131', 'CWE170',
    'CWE369', 'CWE415'
]

# Keep only the rows whose CWE-Type value is one of the buffer overflow CWEs listed above
filtered_df = df[df['CWE-Type'].isin(buffer_overflow_cwes)]

# Print the distinct CWE types that remain after filtering, to see which ones occur in the data
unique_cwes = filtered_df['CWE-Type'].unique()
print("Unique CWEs in the filtered dataset:", unique_cwes)

# Save the filtered dataset to a new CSV file
filtered_df.to_csv('filtered_dataset.csv', index=False)

print("Dataset has been filtered and saved as 'filtered_dataset.csv'")


Unique CWEs in the filtered dataset: ['CWE787' 'CWE119' 'CWE120' 'CWE415' 'CWE125' 'CWE369' 'CWE131' 'CWE121'
 'CWE122' 'CWE680' 'CWE805']
Dataset has been filtered and saved as 'filtered_dataset.csv'


CWEs (Common Weakness Enumeration entries) classify types of software security weaknesses; this notebook focuses on those related to buffer overflows. Of the 14 CWEs in the list, 11 occur in the dataset. The list also includes CWE369 (divide by zero) and CWE415 (double free), which are not buffer overflows in the strict sense.

`filtered_df = df[df['CWE-Type'].isin(buffer_overflow_cwes)]`:
* Keeps only the rows whose CWE-Type value is in `buffer_overflow_cwes`.
* Returns a new DataFrame; `df` itself is unchanged.

In [None]:
filtered_df.to_csv('/content/drive/MyDrive/Colab_Notebooks/MSc_Fault_Tolerance/bo_filtered_dataset.csv', index=False)

In [None]:
len(filtered_df)

107874

In [None]:

# Define a list of CWEs to filter
target_cwes = ['CWE0', 'CWE787', 'CWE119', 'CWE120', 'CWE415' ,'CWE125', 'CWE369', 'CWE131',
 'CWE121', 'CWE122', 'CWE680', 'CWE805' ]

# Include CWEs in the range 780-790
#for i in range(780, 791):
#    target_cwes.append('CWE' + str(i))

# Keep rows whose CWE-Type exactly matches one of the target CWEs
# (isin avoids the accidental substring matches that a regex search could produce)
filtered_df = df[df['CWE-Type'].isin(target_cwes)]

# Output the filtered dataframe
print(filtered_df)

                                                     code CWE-Type
0       int _gnutls_ciphertext2compressed(gnutls_sessi...     CWE0
2       unpack_Z_stream(int fd_in, int fd_out)\n{\n\tI...     CWE0
3       static void cirrus_do_copy(CirrusVGAState *s, ...   CWE787
4       glue(cirrus_bitblt_rop_fwd_, ROP_NAME)(CirrusV...   CWE787
5       static int cirrus_bitblt_videotovideo_copy(Cir...   CWE787
...                                                   ...      ...
409114  CpuDefinitionInfoList *qmp_query_cpu_definitio...     CWE0
409115  static bool loongarch_cpu_exec_interrupt(CPUSt...     CWE0
409116  static bool loongarch_cpu_has_work(CPUState *c...     CWE0
409117  static void loongarch_cpu_add_definition(gpoin...     CWE0
409118  static void loongarch_cpu_synchronize_from_tb(...     CWE0

[156645 rows x 2 columns]
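When selecting rows by label, exact membership (`isin`) and regular-expression matching (`str.contains`) can give different results: by default `str.contains` treats its argument as a regular expression and matches anywhere in the string, so a pattern such as `CWE119` also matches a longer label that merely contains it. The sketch below illustrates the difference with Python's `re` module. The candidate labels are hypothetical values chosen for illustration; in this dataset only the 12 target labels occur, so both approaches select the same rows.

```python
import re

# A shortened version of the target list, for illustration only.
target_cwes = ["CWE0", "CWE119", "CWE120", "CWE787"]
pattern = re.compile("|".join(target_cwes))

# Hypothetical label values, chosen to show where the two tests disagree.
candidates = ["CWE119", "CWE1190", "CWE020", "CWE787", "CWE78"]

# Regex search: a hit if any target occurs anywhere inside the label.
substring_hits = [c for c in candidates if pattern.search(c)]
# Exact membership: a hit only if the whole label equals a target.
exact_hits = [c for c in candidates if c in set(target_cwes)]

print(substring_hits)
print(exact_hits)
```

Exact matching is the safer default when each row holds a single, complete label.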


In [None]:
# Count the distinct CWE types
unique_cwes = filtered_df['CWE-Type'].nunique()

# Print the count
print("Unique CWE types:", unique_cwes)

Unique CWE types: 12


The filtered DataFrame `filtered_df` contains 12 distinct CWE types.

In [None]:
df=filtered_df

In [None]:
df = df.astype(str)

`astype(str)` converts every value in the DataFrame `df` to a string.

In [None]:
# Build two dictionaries that map between CWE types and integer labels
id2label = dict() # Maps an integer index to a CWE type (e.g. 0: 'CWE0')
label2id = dict() # Maps a CWE type to an integer index (e.g. 'CWE0': 0)
ind = 0
for i in df['CWE-Type'].unique():
    id2label[ind] = i
    label2id[i] = ind
    ind+=1

Printing the two created dictionaries:

In [None]:
print('id2label dictionary: ')
print(id2label)
print('label2id dictionary: ')
print(label2id)

id2label dictionary: 
{0: 'CWE0', 1: 'CWE787', 2: 'CWE119', 3: 'CWE120', 4: 'CWE415', 5: 'CWE125', 6: 'CWE369', 7: 'CWE131', 8: 'CWE121', 9: 'CWE122', 10: 'CWE680', 11: 'CWE805'}
label2id dictionary: 
{'CWE0': 0, 'CWE787': 1, 'CWE119': 2, 'CWE120': 3, 'CWE415': 4, 'CWE125': 5, 'CWE369': 6, 'CWE131': 7, 'CWE121': 8, 'CWE122': 9, 'CWE680': 10, 'CWE805': 11}
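The same pair of mappings can be built more compactly with `enumerate`. This is a minimal sketch on a small made-up list of labels (a subset of the CWE types above), not a replacement for the cell that produced the output; it also checks that the two dictionaries are inverses of each other.

```python
# Example labels in first-seen order (a subset of the notebook's CWE types).
labels_in_order = ["CWE0", "CWE787", "CWE119", "CWE787", "CWE0"]

# dict.fromkeys keeps the first occurrence of each label, preserving order.
unique_labels = list(dict.fromkeys(labels_in_order))

id2label = dict(enumerate(unique_labels))
label2id = {label: idx for idx, label in id2label.items()}

# Round trip: every label maps to an id and back to itself.
assert all(id2label[label2id[label]] == label for label in unique_labels)
print(id2label)
print(label2id)
```

Because the ids follow the order in which labels first appear, the mapping depends on row order and should be saved together with the model.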


In [None]:
df['label']=df['CWE-Type'].map(label2id)
df.head()

Unnamed: 0,code,CWE-Type,label
0,int _gnutls_ciphertext2compressed(gnutls_sessi...,CWE0,0
2,"unpack_Z_stream(int fd_in, int fd_out)\n{\n\tI...",CWE0,0
3,"static void cirrus_do_copy(CirrusVGAState *s, ...",CWE787,1
4,"glue(cirrus_bitblt_rop_fwd_, ROP_NAME)(CirrusV...",CWE787,1
5,static int cirrus_bitblt_videotovideo_copy(Cir...,CWE787,1


In [None]:
# Split the dataset into training (80%) and test (20%) sets
df_train, df_test = train_test_split(df, test_size=0.2, random_state=42)

In [None]:
# Build a dictionary with the code snippets as features and the integer labels as targets
dataset = {}
dataset['text'] = list(df_train['code'])    # input feature: the source code of each function
dataset['label'] = list(df_train['label'])  # target: the integer CWE label the model should predict
# Convert the dictionary into a Hugging Face Dataset object, which supports batched mapping (used later for tokenization)
ds = Dataset.from_dict(dataset)
ds = ds.train_test_split(test_size=0.1) # train/validation split (10% validation, stored under the 'test' key)

The code cell above performs the **second** data split.

### 1st Split:
* Creates the initial training and test datasets (80% / 20%).
* The test dataset is kept entirely separate from the training process.
### 2nd Split:
* Splits the training dataset into training (90%) and validation (10%) subsets.
* The validation set is used for hyperparameter tuning and intermediate evaluation during training, before the final test.
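The sizes of the three resulting subsets follow from the two split fractions. The sketch below works through the arithmetic for the 156,645 filtered rows. It assumes that both libraries round the held-out size up to the next integer; the helper `split_sizes` is hypothetical. The results agree with the counts that appear elsewhere in this notebook (112784 and 12532 in the `Map` progress bars, 31329 as the total support in the classification report).

```python
import math
from fractions import Fraction


def split_sizes(n_rows: int, test_fraction: Fraction):
    """Return (n_train, n_test), rounding the test size up (assumed rounding rule)."""
    n_test = math.ceil(n_rows * test_fraction)
    return n_rows - n_test, n_test


n_total = 156645  # rows in the filtered DataFrame
n_train_full, n_test = split_sizes(n_total, Fraction(1, 5))   # 1st split: 80/20
n_train, n_val = split_sizes(n_train_full, Fraction(1, 10))   # 2nd split: 90/10

print(n_train, n_val, n_test)
print(f"{n_train / n_total:.0%} train, {n_val / n_total:.0%} validation, {n_test / n_total:.0%} test")
```

Overall, roughly 72% of the rows are used for training, 8% for validation, and 20% for the final test.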

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("microsoft/graphcodebert-base") # Loading the tokenizer
model = AutoModelForSequenceClassification.from_pretrained("microsoft/graphcodebert-base", num_labels=12,id2label=id2label, label2id=label2id)  # Adjust num_labels according to your classification needs
# Above, a model is loaded for sequence classification, with 12 possible output labels, defined by num_labels=12. Corresponding to 12 types of CWEs

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Sequence classification is a common NLP task. The transformer (GraphCodeBERT in our case) uses stacked self-attention layers to learn how the tokens in a sequence relate to each other, and it produces an output representation for each token. A classification head on top of the encoder then maps the representation of the first token to one score (logit) per class.
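The newly initialized weights named in the load message (`classifier.dense` and `classifier.out_proj`) belong to the classification head. As a rough sketch of what such a head computes, the pure-Python example below takes the first token's hidden state, applies a dense layer with `tanh`, and projects the result to one logit per class. The sizes and weights are toy values chosen for illustration, dropout is omitted, and the function names are hypothetical; this is not the library's implementation.

```python
import math


def matvec(weights, vector, bias):
    """Compute weights @ vector + bias for plain Python lists."""
    return [sum(w * x for w, x in zip(row, vector)) + b for row, b in zip(weights, bias)]


def classification_head(token_states, dense_w, dense_b, out_w, out_b):
    """Map per-token hidden states to one logit per class."""
    first_token = token_states[0]  # representation of the first token
    hidden = [math.tanh(v) for v in matvec(dense_w, first_token, dense_b)]
    return matvec(out_w, hidden, out_b)  # one logit per class


# Toy sizes (assumptions for illustration): 3 tokens, hidden size 2, 3 classes.
token_states = [[0.5, -1.0], [0.1, 0.2], [0.3, 0.4]]
dense_w, dense_b = [[1.0, 0.0], [0.0, 1.0]], [0.0, 0.0]
out_w, out_b = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]], [0.0, 0.0, 0.0]

logits = classification_head(token_states, dense_w, dense_b, out_w, out_b)
predicted_class = max(range(len(logits)), key=logits.__getitem__)
print(logits, predicted_class)
```

In the real model the hidden size is much larger and the head outputs 12 logits, one per CWE label.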

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix # model performance evaluation metrics
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = np.argmax(predictions, axis=1)
    return accuracy.compute(predictions=predictions, references=labels)

# compute_metrics calculates accuracy during evaluation by comparing the predicted labels (argmax over the logits) with the true labels.
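To make the metric concrete, here is a small pure-Python sketch of the same computation: take the index of the largest logit in each row as the prediction, then count how often it equals the true label. The logits and labels are made up for illustration, and the helper names are hypothetical.

```python
def argmax(row):
    """Index of the largest value in a list of scores."""
    return max(range(len(row)), key=row.__getitem__)


def accuracy_from_logits(logits, labels):
    """Fraction of rows whose highest-scoring class equals the true label."""
    predictions = [argmax(row) for row in logits]
    correct = sum(p == y for p, y in zip(predictions, labels))
    return correct / len(labels)


# Made-up logits for 4 examples and 3 classes.
logits = [[2.0, 0.1, -1.0], [0.2, 0.3, 0.1], [-0.5, 0.0, 1.5], [1.0, 0.9, 0.8]]
labels = [0, 1, 2, 1]

print(accuracy_from_logits(logits, labels))
```

Three of the four predictions match their labels, so the accuracy is 0.75.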

---

# Tokenization and random bit-flip injection

In [None]:
import datasets  # Import datasets
import numpy as np
from transformers import AutoTokenizer
import torch

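# Load the tokenizer. NOTE: this replaces the GraphCodeBERT tokenizer loaded earlier with the
# bert-base-uncased tokenizer, whose vocabulary differs from the model's. The outputs below were
# produced with this tokenizer; use "microsoft/graphcodebert-base" here to match the model.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")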

def random_bit_flip(value, n_bits=1):
    """Flip n_bits randomly chosen bits (among the lowest 8) and clamp to the valid token ID range."""
    token_max_value = tokenizer.vocab_size
    for _ in range(n_bits):
        bit_pos = np.random.randint(0, 8)  # bit position 0-7, so only the lowest 8 bits can change
        value ^= (1 << bit_pos)

    # Clamp the value to the valid range [0, token_max_value)
    value = min(value, token_max_value - 1)  # Ensure it's less than token_max_value
    value = max(value, 0)  # Ensure it's not negative
    return value

def preprocess_function(examples):
    tokenized = tokenizer(examples["text"], truncation=True)

    # Corrupt every token ID in every sequence, special tokens included
    for i in range(len(tokenized['input_ids'])):
        sequence = np.array(tokenized['input_ids'][i])
        flipped_sequence = np.array([random_bit_flip(token_id) for token_id in sequence])
        tokenized['input_ids'][i] = flipped_sequence.tolist()

    return tokenized

# Determine the device
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

tokenized_dataset = ds.map(preprocess_function, batched=True)

print("Tokenized Dataset Structure:")
print(f"  Type of tokenized_dataset: {type(tokenized_dataset)}")

try:
    train_dataset = tokenized_dataset["train"]
    print(f"  Features of the 'train' split: {train_dataset.features}")

    sample = train_dataset[0]  # Access the first element of the 'train' split
    print("\nSample from the tokenized dataset (first example of train):")
    print(f"  Type of sample: {type(sample)}")
    print(f"  Keys in sample: {sample.keys()}")
    print(f"  Input IDs: {sample['input_ids'][:20]}...")  # Print the first 20 token IDs
    print(f"  Attention Mask: {sample['attention_mask'][:20]}...")
    print(f"  Length of Input IDs: {len(sample['input_ids'])}")

    input_ids = np.array(sample['input_ids'])
    print(f"Max value in sample's input_ids: {np.max(input_ids)}")
    print(f"Min value in sample's input_ids: {np.min(input_ids)}")

except KeyError:
    print("  Error: No 'train' split found. Check your dataset.")
except Exception as e:
    print(f"  An error occurred: {e}")


Using device: cuda


Map:   0%|          | 0/112784 [00:00<?, ? examples/s]

Map:   0%|          | 0/12532 [00:00<?, ? examples/s]

Tokenized Dataset Structure:
  Type of tokenized_dataset: <class 'datasets.dataset_dict.DatasetDict'>
  Features of the 'train' split: {'text': Value(dtype='string', id=None), 'label': Value(dtype='int64', id=None), 'input_ids': Sequence(feature=Value(dtype='int32', id=None), length=-1, id=None), 'token_type_ids': Sequence(feature=Value(dtype='int8', id=None), length=-1, id=None), 'attention_mask': Sequence(feature=Value(dtype='int8', id=None), length=-1, id=None)}

Sample from the tokenized dataset (first example of train):
  Type of sample: <class 'dict'>
  Keys in sample: dict_keys(['text', 'label', 'input_ids', 'token_type_ids', 'attention_mask'])
  Input IDs: [229, 19982, 10043, 9267, 1027, 5797, 2288, 1033, 2453, 1002, 2354, 6821, 6465, 6139, 976, 14935, 2594, 978, 2359, 6788]...
  Attention Mask: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]...
  Length of Input IDs: 48
Max value in sample's input_ids: 19982
Min value in sample's input_ids: 100


1. **random_bit_flip function**:
* Takes two arguments: a token ID and the number of bits to flip (default 1)
* In each loop iteration, a random bit position among the lowest 8 bits is selected and the bit at that position is flipped with an XOR operation
* After flipping, the value is clamped to the valid token ID range (0 to tokenizer.vocab_size - 1 = 30,521 for this tokenizer, whose vocabulary size is 30,522)
* Returns the modified token ID

2. **preprocess_function**:
* Tokenizes the input text and applies random_bit_flip to every token ID in each sequence, special tokens included
* Returns the modified tokenized data
* Because `ds.map` runs it on both splits, the training and validation inputs are both corrupted

Tokenization is a critical preprocessing step for any transformer model. The text is split into tokens, and each token is mapped to an integer ID that the model can process.
After tokenization, the dataset is ready for model training or evaluation.
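A worked example may help to show what a single bit flip does to a token ID. In the sample output above the first input ID is 229; in `bert-base-uncased` the `[CLS]` token has ID 101, and 101 with bit 7 flipped is 229, which is consistent with the special tokens being corrupted as well. The sketch below uses Python's `random` module with a fixed seed instead of NumPy, and the function names are hypothetical.

```python
import random


def flip_bit(token_id: int, bit_pos: int) -> int:
    """Flip one bit of token_id using XOR with a single-bit mask."""
    return token_id ^ (1 << bit_pos)


def random_low_bit_flip(token_id: int, vocab_size: int, rng: random.Random) -> int:
    """Flip one random bit among the lowest 8, then clamp to [0, vocab_size - 1]."""
    flipped = flip_bit(token_id, rng.randrange(8))
    return max(0, min(flipped, vocab_size - 1))


# 101 is 0b01100101; flipping bit 7 (value 128) gives 229.
print(flip_bit(101, 7))
# Flipping the same bit again restores the original value.
print(flip_bit(flip_bit(101, 7), 7))

rng = random.Random(0)   # fixed seed so the run is repeatable
vocab_size = 30522       # vocabulary size of bert-base-uncased
clean = [101, 2054, 102]
noisy = [random_low_bit_flip(t, vocab_size, rng) for t in clean]
# A single flip in the low 8 bits changes an ID by a power of two, at most 128.
changes = [abs(a - b) for a, b in zip(noisy, clean)]
print(changes)
```

Since only the lowest 8 bits are touched, each corrupted ID stays within 128 of the original, and the clamp only matters for IDs near the top of the vocabulary.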

---

Now I'm checking the first entry in the training set dictionary:

In [None]:
print(tokenized_dataset["train"][0])  # Prints the first entry in the training set


{'text': 'int imap_msg_close(struct Context *ctx, struct Message *msg)\n{\n  return mutt_file_fclose(&msg->fp);\n}', 'label': 2, 'input_ids': [229, 19982, 10043, 9267, 1027, 5797, 2288, 1033, 2453, 1002, 2354, 6821, 6465, 6139, 976, 14935, 2594, 978, 2359, 6788, 6592, 4407, 1016, 5764, 2288, 1023, 1061, 2717, 14147, 4783, 1051, 5363, 1027, 4445, 10491, 2062, 878, 996, 5812, 2226, 947, 1030, 1074, 2489, 1023, 1029, 1081, 100], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}


In [None]:
from transformers import Trainer, TrainingArguments

'''
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["test"],
)

'''
training_args = TrainingArguments(
    output_dir="/content/drive/MyDrive/Colab Notebooks/THESIS_PROJECT/MODEL_WEIGHTS/NEW_MODEL_WEIGHTS/graphcodebert_bo",
    learning_rate=2e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    report_to="wandb",
    fp16 = True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()


Epoch,Training Loss,Validation Loss,Accuracy
1,1.3777,1.349408,0.49226
2,1.2243,1.248309,0.528966
3,1.1039,1.218684,0.543249


TrainOutput(global_step=21147, training_loss=1.2797158549241987, metrics={'train_runtime': 5563.8609, 'train_samples_per_second': 60.812, 'train_steps_per_second': 3.801, 'total_flos': 8.886561356254464e+16, 'train_loss': 1.2797158549241987, 'epoch': 3.0})

* Defines the training arguments: specifies how the model is trained (learning rate, batch size, number of epochs, evaluation and checkpoint strategy, mixed precision, etc.).
* Sets up the Trainer: passes the model, datasets, tokenizer, data collator, and metric function to the `Trainer`.
* Starts training: `trainer.train()` trains the model on the training split and evaluates it on the validation split at the end of each epoch.
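The `global_step=21147` reported in the training output can be reproduced from the size of the training split and the batch size. The short calculation below assumes a single device and no gradient accumulation, which matches the arguments shown above as far as they are specified.

```python
import math

n_train = 112784   # training examples (from the Map progress bar)
batch_size = 16    # per_device_train_batch_size
epochs = 3         # num_train_epochs

# Assumes a single device and no gradient accumulation.
steps_per_epoch = math.ceil(n_train / batch_size)
total_steps = steps_per_epoch * epochs

print(steps_per_epoch, total_steps)
```

Each epoch therefore takes 7049 optimizer steps, and three epochs give the 21147 steps shown in `TrainOutput`.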

In [None]:
trainer.evaluate()

{'eval_loss': 1.2186839580535889,
 'eval_accuracy': 0.5432492818384934,
 'eval_runtime': 49.1049,
 'eval_samples_per_second': 255.209,
 'eval_steps_per_second': 15.966,
 'epoch': 3.0}

The cell above evaluates the trained model on the validation split; the loss and accuracy match the epoch 3 row of the training table.

## Inference stage: predictions are generated below on unseen data from the df_test dataset

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu' # checking gpu availability

model.eval() # make sure dropout is disabled during inference
preds = [] # empty predictions list
for i in df_test['code'].values: # iterate over test dataset 'code' column values
    with torch.no_grad():        # disabling gradient calculation (faster inference, saves memory)
        inputs = tokenizer(i, return_tensors="pt",  truncation=True).to(device) # tokenize and move input to the device
        logits = model(**inputs).logits
        predicted_class_id = logits.argmax().item()
        preds.append(predicted_class_id)

The code above takes the test set (`df_test`), tokenizes each code snippet, feeds it through the model, and takes the class with the highest logit as the prediction. Unlike the training and validation inputs, these test inputs are not passed through `random_bit_flip`.
The predictions are stored in `preds`, which is later compared with the true labels to compute performance metrics such as accuracy.

In [None]:
y_true = [id2label[i] for i in df_test['label'].values]
y_pred = [id2label[i] for i in preds]


The code above prepares the true labels (`y_true`) and the predicted labels (`y_pred`) for evaluation by converting the numeric labels back to their CWE-type names.

In [None]:
from sklearn.metrics import classification_report
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

        CWE0       0.28      0.24      0.26      9765
      CWE119       0.15      0.20      0.17      4984
      CWE120       0.00      0.00      0.00      1127
      CWE121       0.00      0.00      0.00        23
      CWE122       0.00      0.00      0.00       327
      CWE125       0.19      0.33      0.24      5657
      CWE131       0.00      0.00      0.00        27
      CWE369       0.04      0.02      0.03       651
      CWE415       0.07      0.00      0.00       895
      CWE680       0.00      0.00      0.00        18
      CWE787       0.24      0.18      0.20      7855

    accuracy                           0.21     31329
   macro avg       0.09      0.09      0.08     31329
weighted avg       0.21      0.21      0.20     31329





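The cell above prints per-class precision, recall, and F1-score on the test set, along with the overall accuracy and the macro and weighted averages. Test accuracy is 0.21, compared with 0.54 on the validation split, and several classes with little support (for example CWE121, CWE131, and CWE680) are never predicted correctly.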
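The averages in the report can be recomputed from the per-class rows. The sketch below copies precision, recall, and support from the report above and derives the macro average (every class counts equally), the weighted average (classes weighted by support), and the F1-score of one class. Because the inputs are already rounded to two decimals, the results only agree with the report after rounding.

```python
# (precision, recall, support) per class, copied from the report above.
report = {
    "CWE0":   (0.28, 0.24, 9765),
    "CWE119": (0.15, 0.20, 4984),
    "CWE120": (0.00, 0.00, 1127),
    "CWE121": (0.00, 0.00, 23),
    "CWE122": (0.00, 0.00, 327),
    "CWE125": (0.19, 0.33, 5657),
    "CWE131": (0.00, 0.00, 27),
    "CWE369": (0.04, 0.02, 651),
    "CWE415": (0.07, 0.00, 895),
    "CWE680": (0.00, 0.00, 18),
    "CWE787": (0.24, 0.18, 7855),
}


def f1(precision: float, recall: float) -> float:
    """Harmonic mean of precision and recall (0 when both are 0)."""
    total = precision + recall
    return 2 * precision * recall / total if total else 0.0


total_support = sum(s for _, _, s in report.values())
macro_precision = sum(p for p, _, _ in report.values()) / len(report)
weighted_precision = sum(p * s for p, _, s in report.values()) / total_support

print(total_support)
print(round(macro_precision, 2), round(weighted_precision, 2))
print(round(f1(*report["CWE125"][:2]), 2))
```

The gap between the macro average (0.09) and the weighted average (0.21) reflects the class imbalance: the few large classes dominate the weighted figure, while the many small classes with zero scores pull the macro figure down.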

In [None]:
%%capture output_log
!nvidia-smi  # Example command (Replace with your training script)
print("Training started...")


In [None]:
with open("run_logs.txt", "w") as f:
    f.write(output_log.stdout)


In [None]:
from google.colab import files
files.download("run_logs.txt")



# Setting up connection with git for version control

In [None]:
!git config --global user.name "alenabd24"
!git config --global user.email "alenabd24@outlook.com"



In [None]:
!git add .


fatal: not a git repository (or any of the parent directories): .git
