In [None]:
# Mount Google Drive in this Colab runtime; the CSV files loaded below are read
# from paths under /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')


Mounted at /content/drive/


# Step 1: Install and Import Libraries


In [None]:
!pip install transformers

import numpy as np
import pandas as pd
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
from transformers import AdamW




# **Step 2: Load SQL Injection and Other Vulnerabilities Datasets**

In [None]:
import pandas as pd
from sklearn.utils import shuffle

# Load the SQL Injection dataset
sql_data = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Train/updated/sql_injection2.0 (3).csv')
# Load the Other Vulnerabilities dataset (for XSS, CSRF, RCE)
other_vuln_data = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Train/updated/Others_vuln2.0 (1).csv')

# Shuffled copies are used only for the previews below. A fixed seed keeps the
# preview identical across "Restart & Run All"; the original frames stay in
# file order for tokenization and label extraction.
sql_data_shuffled = shuffle(sql_data, random_state=42)
other_vuln_data_shuffled = shuffle(other_vuln_data, random_state=42)

# Verify the datasets
print("SQL Injection Dataset:")
print(sql_data_shuffled.head())

print("\nOther Vulnerabilities Dataset:")
print(other_vuln_data_shuffled.head())

# Print dataset columns
print("\nSQL Injection Dataset Columns:", sql_data_shuffled.columns)
print("Other Vulnerabilities Dataset Columns:", other_vuln_data_shuffled.columns)


SQL Injection Dataset:
      ID                                          SQL Query Injection Type  \
325  326  DELETE FROM orders WHERE order_id = ' UNION SE...  Boolean-based   
520  521             SELECT * FROM users WHERE id = 'apple'            NaN   
559  560              DELETE FROM orders WHERE order_id = 2            NaN   
528  529     DELETE FROM orders WHERE order_id = 100 OR 1=1    Error-based   
519  520          SELECT * FROM products WHERE name = '100'            NaN   

    Vulnerability Status  
325                  Yes  
520                   No  
559                   No  
528                  Yes  
519                   No  

Other Vulnerabilities Dataset:
      ID Attack Type                        Attack Vector  \
526  711         XSS              '><svg/onload=alert(1)>   
458  615         RCE                              ?cmd=ls   
238  327         RCE                              ?cmd=ls   
535  722         XSS              '><svg/onload=alert(1)>   
501  677 

# Step 3: Tokenize Both Datasets Using CodeBERT

In [None]:
from transformers import RobertaTokenizer
# Tokenizer for the 'microsoft/codebert-base' checkpoint, the same checkpoint
# the classification models in this notebook are initialised from.
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')

# Function to tokenize data from multiple columns and combine input_ids
def tokenize_data(dataframe, text_columns, max_length=512, tokenizer_fn=None):
    """Join the chosen columns row by row and tokenize the resulting strings.

    Args:
        dataframe: pandas DataFrame holding the raw text.
        text_columns: Column name, or list of column names, whose values are
            joined with a single space (missing values are skipped).
        max_length: Length every sequence is padded/truncated to.
        tokenizer_fn: Optional callable used instead of the notebook-level
            ``tokenizer``; it receives the list of strings plus the same
            keyword arguments.

    Returns:
        Whatever the tokenizer returns for the batch (PyTorch tensors are
        requested via ``return_tensors='pt'``).
    """
    if isinstance(text_columns, str):
        text_columns = [text_columns]

    encode = tokenizer if tokenizer_fn is None else tokenizer_fn

    # str() keeps numeric or other non-string cells from breaking ' '.join.
    concatenated_texts = [
        ' '.join(str(value) for value in row if pd.notna(value))
        for row in dataframe[text_columns].itertuples(index=False, name=None)
    ]
    return encode(
        concatenated_texts,
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors='pt',
    )

# Columns to tokenize. Only the feature text goes into the model input:
# 'Vulnerability Status' is the target that the labels are derived from, so
# concatenating it into the text would leak the answer to the classifier.
sql_columns = ['SQL Query']
other_vuln_columns = ['Attack Vector']

# Tokenize data
sql_encoded = tokenize_data(sql_data, sql_columns)
other_vuln_encoded = tokenize_data(other_vuln_data, other_vuln_columns)

# Printing to verify the outputs
print("SQL Encoded Data:")
print(f"Input IDs: {sql_encoded['input_ids'].shape}")
print(f"Attention Masks: {sql_encoded['attention_mask'].shape}")

print("\nOther Vulnerabilities Encoded Data:")
print(f"Input IDs: {other_vuln_encoded['input_ids'].shape}")
print(f"Attention Masks: {other_vuln_encoded['attention_mask'].shape}")

# Inspect the first few tokenized examples of a two-column concatenation.
# This sample is only printed; it is not used for training.
sample_data = tokenize_data(other_vuln_data, ['Attack Vector', 'Description'])
print(sample_data['input_ids'][:5])
print(sample_data['attention_mask'][:5])


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]



SQL Encoded Data:
Input IDs: torch.Size([1000, 512])
Attention Masks: torch.Size([1000, 512])

Other Vulnerabilities Encoded Data:
Input IDs: torch.Size([738, 512])
Attention Masks: torch.Size([738, 512])
tensor([[    0, 47060,  2069,  ...,     1,     1,     1],
        [    0, 47060,  2069,  ...,     1,     1,     1],
        [    0, 24522, 16204,  ...,     1,     1,     1],
        [    0,  2709,  4462,  ...,     1,     1,     1],
        [    0,   116, 48211,  ...,     1,     1,     1]])
tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])


# Step 5: Create a PyTorch Dataset Class and Split Both Datasets for Training and Validation
This class converts the tokenized data into a format compatible with PyTorch’s Dataset.

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch

class CustomDataset(Dataset):
    """Pairs pre-tokenized encodings with integer class labels.

    ``encodings`` is a mapping from field name to an indexable batch of
    tensors; ``labels`` is an indexable sequence with one entry per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The label sequence defines how many examples exist.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field, values in self.encodings.items():
            # Hand out an independent copy so callers cannot alter the stored batch.
            sample[field] = values[idx].clone().detach()
        sample['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# Assuming sql_encoded and other_vuln_encoded are already prepared using your tokenization function
class CodeDataset_other(Dataset):
    """Pairs the other-vulnerabilities encodings with integer class labels.

    Labels are stored as Python ints and returned as ``torch.long`` class
    indices. The classifiers in this notebook are built with ``num_labels=2``,
    which needs integer class indices; the previous float labels do not fit
    that setup.

    Args:
        encodings: Mapping from field name to an indexable batch (tensors or
            nested lists).
        labels: Iterable of 0/1 labels (Python numbers or 0-dim tensors).
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        # int() accepts plain numbers and 0-dim tensors alike.
        self.labels = [int(label) for label in labels]

    def __getitem__(self, idx):
        # as_tensor + clone copies the row without the "copy construct from a
        # tensor" warning that torch.tensor(tensor) triggers.
        item = {
            key: torch.as_tensor(val[idx]).clone().detach()
            for key, val in self.encodings.items()
        }
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

    def __len__(self):
        return len(self.labels)

# Binary targets derived from the 'Vulnerability Status' column:
# 'Yes' maps to 1, every other value (including missing) maps to 0.
sql_labels = sql_data['Vulnerability Status'].eq('Yes').astype(int).tolist()
other_labels = torch.tensor(
    other_vuln_data['Vulnerability Status'].eq('Yes').astype(int).tolist()
)

# Wrap the encodings (sql_encoded / other_vuln_encoded from the tokenization
# step) together with their labels.
sql_dataset = CustomDataset(sql_encoded, sql_labels)
other_vuln_dataset = CodeDataset_other(other_vuln_encoded, other_labels)

# Batch iterators used for a quick sanity check of the dataset output.
loader_batch_size = 16
train_loader = DataLoader(sql_dataset, batch_size=loader_batch_size, shuffle=True)
val_loader = DataLoader(other_vuln_dataset, batch_size=loader_batch_size, shuffle=False)

# Show a single batch and stop.
for batch in train_loader:
    print(batch)
    break


{'input_ids': tensor([[    0, 10089,  3850,  ...,     1,     1,     1],
        [    0, 49179,  1009,  ...,     1,     1,     1],
        [    0, 49179,  1009,  ...,     1,     1,     1],
        ...,
        [    0, 49179,  1047,  ...,     1,     1,     1],
        [    0, 49179,  1047,  ...,     1,     1,     1],
        [    0, 34543,  2349,  ...,     1,     1,     1]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]]), 'labels': tensor([0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0])}


In [None]:
# Hold out 2% of each dataset for validation, using the same seed for both.
# NOTE(review): 2% of these datasets is only a handful of examples - confirm
# that such a small validation set is intended.
split_options = {'test_size': 0.02, 'random_state': 42}
train_sql, val_sql = train_test_split(sql_dataset, **split_options)
train_other, val_other = train_test_split(other_vuln_dataset, **split_options)

In [None]:

import pandas as pd
# NOTE(review): these two reads rebind sql_data / other_vuln_data to CSVs under
# Train/, whereas the frames loaded in Step 2 came from Train/updated/. Any
# later cell that reads these names sees the files loaded here - confirm this
# is intended.
sql_data = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Train/sql_injection2.0.csv')
other_vuln_data = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Train/Others_vuln2.0.csv')
# Number of rows per attack type (CSRF / XSS / RCE) in the other-vulnerabilities frame.
vulnerability_counts = other_vuln_data['Attack Type'].value_counts()
print("Vulnerability Type Counts:\n", vulnerability_counts)

Vulnerability Type Counts:
 Attack Type
CSRF    273
XSS     233
RCE     232
Name: count, dtype: int64


# Step 6: Initialize CodeBERT Models for Both Datasets

In [None]:
# One CodeBERT encoder with a fresh 2-class classification head per task.
sql_model = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', num_labels=2)
other_vuln_model = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', num_labels=2)

# torch.optim.AdamW replaces the deprecated transformers.AdamW. eps and
# weight_decay are pinned to the values the transformers class defaulted to,
# so the optimizer behaves as before.
# NOTE(review): the Trainer instances created later in this notebook are not
# given this optimizer - confirm whether it is still needed.
optimizer = torch.optim.AdamW(
    other_vuln_model.parameters(), lr=5e-5, eps=1e-6, weight_decay=0.0
)

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Train

In [None]:
!pip install seqeval



Collecting seqeval
  Downloading seqeval-1.2.2.tar.gz (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: seqeval
  Building wheel for seqeval (setup.py) ... [?25l[?25hdone
  Created wheel for seqeval: filename=seqeval-1.2.2-py3-none-any.whl size=16161 sha256=f6ace7ad4ce9089f37e338c56f5c01a9ad2d0f8527b89b9760b3a8c5b2fab59d
  Stored in directory: /root/.cache/pip/wheels/1a/67/4a/ad4082dd7dfc30f2abfe4d80a2ed5926a506eb8a972b4767fa
Successfully built seqeval
Installing collected packages: seqeval
Successfully installed seqeval-1.2.2


# Compute_metrics Function

In [None]:
def compute_metrics(p):
    """Compute binary-classification metrics for a ``Trainer`` evaluation pass.

    Args:
        p: Object that unpacks into ``(logits, labels)``. ``logits`` is a 2-D
            array or tensor of per-class scores; ``labels`` holds the true
            class per example.

    Returns:
        dict with ``accuracy``, ``precision``, ``recall``, ``f1`` and
        ``auc_roc``. ``auc_roc`` is 0 when the labels contain a single class,
        because ROC AUC is undefined in that case.
    """
    logits, labels = p

    # Accept either tensors or array-likes for the logits.
    if not isinstance(logits, torch.Tensor):
        logits = torch.tensor(logits)

    # Softmax over the class axis, then pick the most probable class.
    probabilities = torch.softmax(logits, dim=1).detach().cpu().numpy()
    predictions = np.argmax(probabilities, axis=1)

    if isinstance(labels, torch.Tensor):
        ground_truths = labels.detach().cpu().numpy()
    else:
        ground_truths = labels

    # zero_division=0 returns the same 0.0 as the default when a metric is
    # undefined (e.g. no positive predictions), but without emitting a warning.
    precision, recall, f1, _ = precision_recall_fscore_support(
        ground_truths, predictions, average='binary', zero_division=0
    )

    if len(np.unique(ground_truths)) > 1:
        # Score with the positive-class column when it exists; otherwise fall
        # back to the only column available.
        positive_column = 1 if probabilities.shape[1] > 1 else 0
        auc_roc = roc_auc_score(ground_truths, probabilities[:, positive_column])
    else:
        auc_roc = 0  # Only one class present: AUC is undefined

    return {
        "accuracy": accuracy_score(ground_truths, predictions),
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "auc_roc": auc_roc
    }

In [None]:
from transformers import Trainer, TrainingArguments
from transformers import RobertaForSequenceClassification

# Start each task from the pretrained CodeBERT encoder with a new 2-class head.
sql_model = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', num_labels=2)
other_vuln_model = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', num_labels=2)

# Setup training arguments
# The warm-up is a fraction of the run instead of a fixed 500 steps. With the
# 1000- and 738-row datasets shown above, a 2% hold-out, batch size 8 and
# 3 epochs, a run is roughly 270-370 optimizer steps (single device, no
# gradient accumulation). A 500-step warm-up would never finish, so the
# learning rate would never reach its configured peak.
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_ratio=0.1,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
)

# Trainer for the SQL-injection model
trainer_sql = Trainer(
    model=sql_model,
    args=training_args,
    train_dataset=train_sql,
    eval_dataset=val_sql,
    compute_metrics=compute_metrics
)
# Trainer for the other-vulnerabilities (XSS, CSRF, RCE) model
trainer_other = Trainer(
    model=other_vuln_model,
    args=training_args,
    train_dataset=train_other,
    eval_dataset=val_other,
    compute_metrics=compute_metrics
)

# Assuming 'labels' is the column name for your target variable
def check_labels(dataset):
    """Print the label, its shape and its Python type for the first item only.

    Nothing is printed when ``dataset`` yields no items.
    """
    first_item = next(iter(dataset), None)
    if first_item is not None:
        label = first_item['labels']
        print(label, label.shape, type(label))

# Check the labels in your datasets
# Prints the first label of each other-vulnerabilities split so its dtype and
# shape can be inspected before training.
check_labels(train_other)
check_labels(val_other)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


tensor(1.) torch.Size([]) <class 'torch.Tensor'>
tensor(1.) torch.Size([]) <class 'torch.Tensor'>


In [None]:
from transformers import Trainer, TrainingArguments
from transformers import RobertaForSequenceClassification

# Start each task from the pretrained CodeBERT encoder with a new 2-class head.
sql_model = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', num_labels=2)
other_vuln_model = RobertaForSequenceClassification.from_pretrained('microsoft/codebert-base', num_labels=2)

# Setup training arguments
# The warm-up is a fraction of the run instead of a fixed 500 steps. With the
# 1000- and 738-row datasets shown above, a 2% hold-out, batch size 8 and
# 3 epochs, a run is roughly 270-370 optimizer steps (single device, no
# gradient accumulation). A 500-step warm-up would never finish, so the
# learning rate would never reach its configured peak.
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_ratio=0.1,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
)

# Trainer for the SQL-injection model
trainer_sql = Trainer(
    model=sql_model,
    args=training_args,
    train_dataset=train_sql,
    eval_dataset=val_sql,
    compute_metrics=compute_metrics
)
# Trainer for the other-vulnerabilities (XSS, CSRF, RCE) model
trainer_other = Trainer(
    model=other_vuln_model,
    args=training_args,
    train_dataset=train_other,
    eval_dataset=val_other,
    compute_metrics=compute_metrics
)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Train the SQL-injection model only; the other-vulnerabilities model is
# trained by a separate trainer_other.train() call.
trainer_sql.train()


Epoch,Training Loss,Validation Loss


NameError: name 'Tensor' is not defined

In [None]:
# Train the other-vulnerabilities (XSS, CSRF, RCE) model.
trainer_other.train()

In [None]:
# Class balance of the 'Vulnerability Status' column in the frame currently
# bound to other_vuln_data.
print(other_vuln_data['Vulnerability Status'].value_counts())


RESULT of Train

RESULT of Train

In [None]:
# Persist each trained model in its own directory, relative to the working directory.
model_path_sql, model_path_other = './model_sql', './model_other'
for fitted_trainer, target_dir in (
    (trainer_sql, model_path_sql),
    (trainer_other, model_path_other),
):
    fitted_trainer.model.save_pretrained(target_dir)

# Build a Scraper

Predicting the vulnerability

In [None]:
!pip install transformers # Ensure transformers is installed.
from transformers import RobertaTokenizer, RobertaForSequenceClassification # Import the RobertaTokenizer and RobertaForSequenceClassification classes
import requests
from bs4 import BeautifulSoup
import re
import torch # Import torch

# Load the tokenizer and the two trained classifiers.
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')
sql_model = RobertaForSequenceClassification.from_pretrained('/content/model_sql')
# The other-vulnerabilities model was saved to its own directory
# ('./model_other'); loading '/content/model_sql' here would make both
# predictions come from the SQL classifier.
# NOTE(review): assumes the save step ran with /content as working directory.
other_vuln_model = RobertaForSequenceClassification.from_pretrained('/content/model_other')


def scrape_webpage(url):
    """Fetch a page and extract the pieces handed to the vulnerability models.

    Args:
        url: Address of the page to download.

    Returns:
        Tuple ``(html_content, script_content, sql_candidates)``:
        the parsed document serialised back to a string, the text of every
        non-empty ``<script>`` element, and attribute values that mention
        "sql" (hidden-input values, plus string values of attributes whose
        name contains "sql" on tags carrying ``data-*`` attributes).

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    # A timeout keeps the notebook from hanging on an unresponsive host.
    response = requests.get(url, timeout=10)
    soup = BeautifulSoup(response.text, 'html.parser')

    html_content = str(soup)
    script_content = [script.text for script in soup.find_all('script') if script.text]
    hidden_inputs = soup.find_all('input', {'type': 'hidden'})

    # A non-dict ``attrs`` value is treated by BeautifulSoup as a CSS-class
    # filter, so a regex there never matches attribute *names*. Select tags
    # that actually carry a data-* attribute with a function filter instead.
    data_attributes = soup.find_all(
        lambda tag: any(name.startswith('data-') for name in tag.attrs)
    )
    sql_candidates = []

    for input_tag in hidden_inputs:
        value = input_tag.get('value', '')
        if 'sql' in value.lower():
            sql_candidates.append(value)

    for data_tag in data_attributes:
        for attribute, value in data_tag.attrs.items():
            if 'sql' in attribute.lower() and isinstance(value, str):
                sql_candidates.append(value)

    return html_content, script_content, sql_candidates


def suggest_fixes(sql_prediction, other_vuln_prediction, script_content):
  """Turn the two classifier labels into remediation advice.

  A prediction equal to 1 adds the matching advice text; when neither model
  reports 1 the result is a single "no vulnerabilities" message.
  ``script_content`` is accepted but not used.
  """
  sql_advice = (
      "**SQL Injection Detected:**\n"
      "**Suggestion:** Sanitize user inputs before using them in SQL queries.\n"
      "**Example:** Use parameterized queries or escape special characters."
  )
  other_advice = (
      "**Other Vulnerability (XSS, CSRF, RCE) Detected:**\n"
      "**Suggestion:** Investigate the script_content for potential vulnerable code.\n"
      "**Possible fixes:**\n"
      " - **XSS:** Encode user input before displaying it.\n"
      " - **CSRF:** Use anti-CSRF tokens.\n"
      " - **RCE:** Validate user input to prevent code execution."
  )

  # SQL advice first, then the other-vulnerability advice.
  findings = [
      advice
      for predicted_label, advice in (
          (sql_prediction, sql_advice),
          (other_vuln_prediction, other_advice),
      )
      if predicted_label == 1
  ]
  return findings or ["No vulnerabilities detected."]


def predict_and_suggest(url):
    """Predicts vulnerabilities and suggests fixes for a given URL.

    The page's script blocks are scored by both classifiers; a vulnerability
    is reported when at least one script is predicted as class 1.

    Args:
        url: Address of the page to analyse.

    Returns:
        List of suggestion strings produced by ``suggest_fixes``.
    """
    # NOTE(review): sql_candidates is collected by scrape_webpage but unused;
    # both models score the script text - confirm that is intended.
    html_content, script_content, sql_candidates = scrape_webpage(url)

    # A page without script text gives the tokenizer nothing to encode.
    if not script_content:
        return suggest_fixes(0, 0, script_content)

    encoded_input = tokenizer(script_content, return_tensors="pt", truncation=True, padding=True)

    # Inference only: no gradients needed.
    with torch.no_grad():
        sql_outputs = sql_model(**encoded_input)
        other_vuln_outputs = other_vuln_model(**encoded_input)

    sql_predictions = torch.argmax(sql_outputs.logits, dim=-1)
    other_vuln_predictions = torch.argmax(other_vuln_outputs.logits, dim=-1)

    # There is one prediction per script, so .item() on the whole batch fails
    # as soon as a page has more than one script. Reduce to a page-level flag.
    sql_flag = int((sql_predictions == 1).any().item())
    other_vuln_flag = int((other_vuln_predictions == 1).any().item())

    suggestions = suggest_fixes(sql_flag, other_vuln_flag, script_content)

    return suggestions


# Example Usage
# Fetches the page, runs both classifiers and prints one suggestion per line.
# NOTE(review): only point this at sites you are authorised to analyse.
url = "https://www.facebook.com/"  # Replace with the target URL
suggestions = predict_and_suggest(url)

for suggestion in suggestions:
  print(suggestion)

In [None]:
!pip install transformers # Ensure transformers is installed.

In [None]:
!pip install transformers # Ensure transformers is installed.
from transformers import RobertaTokenizer, RobertaForSequenceClassification # Import the RobertaTokenizer and RobertaForSequenceClassification classes

# Load the tokenizer and the two trained classifiers.
tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')
sql_model = RobertaForSequenceClassification.from_pretrained('/content/model_sql')
# The other-vulnerabilities model was saved to its own directory
# ('./model_other'); loading '/content/model_sql' here would make both
# predictions come from the SQL classifier.
# NOTE(review): assumes the save step ran with /content as working directory.
other_vuln_model = RobertaForSequenceClassification.from_pretrained('/content/model_other')


def suggest_fixes(sql_prediction, other_vuln_prediction, script_content):
  """Turn the two classifier labels into remediation advice.

  A prediction equal to 1 adds the matching advice text; when neither model
  reports 1 the result is a single "no vulnerabilities" message.
  ``script_content`` is accepted but not used.
  """
  sql_advice = (
      "**SQL Injection Detected:**\n"
      "**Suggestion:** Sanitize user inputs before using them in SQL queries.\n"
      "**Example:** Use parameterized queries or escape special characters."
  )
  other_advice = (
      "**Other Vulnerability (XSS, CSRF, RCE) Detected:**\n"
      "**Suggestion:** Investigate the script_content for potential vulnerable code.\n"
      "**Possible fixes:**\n"
      " - **XSS:** Encode user input before displaying it.\n"
      " - **CSRF:** Use anti-CSRF tokens.\n"
      " - **RCE:** Validate user input to prevent code execution."
  )

  # SQL advice first, then the other-vulnerability advice.
  findings = [
      advice
      for predicted_label, advice in (
          (sql_prediction, sql_advice),
          (other_vuln_prediction, other_advice),
      )
      if predicted_label == 1
  ]
  return findings or ["No vulnerabilities detected."]


def predict_and_suggest(url):
    """Predicts vulnerabilities and suggests fixes for a given URL.

    The page's script blocks are scored by both classifiers; a vulnerability
    is reported when at least one script is predicted as class 1.

    Args:
        url: Address of the page to analyse.

    Returns:
        List of suggestion strings produced by ``suggest_fixes``.
    """
    # NOTE(review): sql_candidates is collected by scrape_webpage but unused;
    # both models score the script text - confirm that is intended.
    html_content, script_content, sql_candidates = scrape_webpage(url)

    # A page without script text gives the tokenizer nothing to encode.
    if not script_content:
        return suggest_fixes(0, 0, script_content)

    encoded_input = tokenizer(script_content, return_tensors="pt", truncation=True, padding=True)

    # Inference only: no gradients needed.
    with torch.no_grad():
        sql_outputs = sql_model(**encoded_input)
        other_vuln_outputs = other_vuln_model(**encoded_input)

    sql_predictions = torch.argmax(sql_outputs.logits, dim=-1)
    other_vuln_predictions = torch.argmax(other_vuln_outputs.logits, dim=-1)

    # There is one prediction per script, so .item() on the whole batch fails
    # as soon as a page has more than one script. Reduce to a page-level flag.
    sql_flag = int((sql_predictions == 1).any().item())
    other_vuln_flag = int((other_vuln_predictions == 1).any().item())

    suggestions = suggest_fixes(sql_flag, other_vuln_flag, script_content)

    return suggestions


# Example Usage
# Fetches the page, runs both classifiers and prints one suggestion per line.
# NOTE(review): only point this at sites you are authorised to analyse.
url = "https://www.facebook.com/"  # Replace with the target URL
suggestions = predict_and_suggest(url)

for suggestion in suggestions:
  print(suggestion)