In [27]:
#reward based learning on testing data

In [28]:
import requests
import re
import json
import zipfile
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import csv
import unicodedata
import itertools
import logging
from os import listdir
from os.path import isfile, join
from typing import KeysView
from pandas.core.frame import DataFrame
from sklearn.metrics import confusion_matrix
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import precision_recall_fscore_support as score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import CountVectorizer,TfidfVectorizer
from sklearn.metrics import balanced_accuracy_score
import os
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, Trainer, TrainingArguments
import torch

In [None]:
def get_nvd_data():
    """Download the numbered NVD CVE 1.1 JSON feed archives into ``zipFile/``.

    The NVD data-feeds page is fetched and scanned for archive names of the
    form ``nvdcve-1.1-<digits>.json.zip``; each one is then streamed from the
    feed directory to ``zipFile/<name>``. Every file name is printed before
    it is downloaded.

    Raises:
        requests.HTTPError: if the index page or any archive request returns
            an error status. Without this check an HTML error page would be
            written to disk under a ``.zip`` name.
    """
    index = requests.get('https://nvd.nist.gov/vuln/data-feeds#JSON_FEED', timeout=60)
    index.raise_for_status()

    # Create the target directory once instead of re-checking per file.
    filePath = "zipFile"
    os.makedirs(filePath, exist_ok=True)

    # Raw string with escaped dots: "\." in a plain string literal is an
    # invalid escape sequence, and a bare "." would match any character.
    pattern = r"nvdcve-1\.1-[0-9]+\.json\.zip"
    # dict.fromkeys drops repeated links while keeping the page order.
    filenames = list(dict.fromkeys(re.findall(pattern, index.text)))

    for filename in filenames:
        print(filename)
        url = "https://nvd.nist.gov/feeds/json/cve/1.1/" + filename
        with requests.get(url, stream=True, timeout=60) as r_file:
            r_file.raise_for_status()
            with open(join(filePath, filename), 'wb') as f:
                # Explicit chunk size; plain iteration over a response
                # yields very small chunks.
                for chunk in r_file.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
# Run the download now: fetches the feed archives over the network into zipFile/.
get_nvd_data()

In [None]:
def unzip_data():
    """Extract every zip archive found in ``zipFile/`` into ``jsonFile/``.

    Regular files in ``zipFile/`` are processed in sorted name order. Files
    that are not valid zip archives (for example stray OS metadata files)
    are reported and skipped instead of aborting the run with
    ``zipfile.BadZipFile``. The output directory is created if missing.
    """
    source_dir = "zipFile/"
    filePath = "jsonFile"
    # Create the output directory once, up front.
    os.makedirs(filePath, exist_ok=True)

    files = sorted(f for f in listdir(source_dir) if isfile(join(source_dir, f)))
    for file in files:
        archive_path = join(source_dir, file)
        if not zipfile.is_zipfile(archive_path):
            print("Skipping (not a zip archive): " + file)
            continue
        print("Opening: " + file)
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(archive_path, 'r') as archive:
            archive.extractall(filePath)
# Extract the archives found in zipFile/ into jsonFile/.
unzip_data()

In [31]:
def create_nvd_dict(year):
    """Load one extracted NVD feed and return its parsed JSON content.

    Args:
        year: value substituted into the feed name
            ``jsonFile/nvdcve-1.1-<year>.json``.

    Returns:
        The object produced by ``json.load`` for that file.
    """
    feed_path = f"jsonFile/nvdcve-1.1-{year}.json"
    with open(feed_path, encoding="utf8") as feed:
        return json.load(feed)

def _cvssv3_row(item):
    """Return the CSV row for one CVE item, or None when the item is skipped.

    An item is skipped when its first description starts with ``**REJECT**``
    or when it carries no ``baseMetricV3`` entry under ``impact``.
    """
    cve_id = item['cve']['CVE_data_meta']['ID']
    report = item['cve']['description']['description_data'][0]['value']
    publish = item['publishedDate']
    modify = item['lastModifiedDate']

    # The previous `not report.find(...)` was true only when find() returned
    # 0, i.e. a prefix match; startswith states that directly.
    # NOTE(review): NVD reject notices may be spelled "** REJECT **" (with
    # spaces) - confirm the marker against the feed data.
    if report.startswith("**REJECT**"):
        return None
    if 'baseMetricV3' not in item['impact']:
        return None

    cvss = item['impact']['baseMetricV3']['cvssV3']
    metric_keys = ('baseScore', 'attackVector', 'attackComplexity',
                   'privilegesRequired', 'userInteraction', 'scope',
                   'confidentialityImpact', 'integrityImpact', 'availabilityImpact')
    return [cve_id, publish, modify, report] + [cvss[key] for key in metric_keys]


def generate_CVSSV3csv_for_training():
    """Write one CVSS v3 training CSV per feed year (2020 through 2024).

    For each year the feed is loaded with ``create_nvd_dict`` and written to
    ``trainCVSSV3/NVD_<year>_CVSSV3_train.csv``: a header row followed by one
    row per CVE item that is not skipped. The number of entries in
    ``jsonFile/`` is printed first. The output directory is created if it
    does not exist.
    """
    json_files = listdir("jsonFile/")
    print(len(json_files))

    # Without this a fresh run fails with FileNotFoundError on the first open().
    os.makedirs('trainCVSSV3', exist_ok=True)

    for year in range(2020,2025):
        cve_dict = create_nvd_dict(year)
        fileName = 'NVD_'+ str(year) + '_CVSSV3_train.csv'
        # Explicit UTF-8: the feeds are read as UTF-8, so the reports must be
        # written the same way regardless of the platform default encoding.
        with open('trainCVSSV3/' + fileName, 'w', newline='', encoding='utf8') as f_output:
            csv_output = csv.writer(f_output)
            csv_output.writerow(['CVE_ID', 'PublishTime','ModifyTime','Report','CVSSV3','AttackVector','AttackComplexity','PrivilegesRequired',
                             'UserInteraction','Scope','ConfidentialityImpact','IntegrityImpact','AvailabilityImpact'])
            for item in cve_dict['CVE_Items']:
                row = _cvssv3_row(item)
                if row is not None:
                    csv_output.writerow(row)

In [None]:
# Write the per-year CVSS v3 training CSVs into trainCVSSV3/.
generate_CVSSV3csv_for_training()

In [33]:
# Generate the training dataset from the NVD reports for 2020 through 2024 (range(2020, 2025) excludes 2025).
def generate_CombinedFile():
    """Concatenate the per-year training CSVs into a single DataFrame.

    Reads ``trainCVSSV3/NVD_<year>_CVSSV3_train.csv`` for each year in
    ``range(2020, 2025)`` and stacks the frames in year order.

    Returns:
        pandas.DataFrame: all rows from the yearly files, re-indexed from 0.
    """
    # Local names no longer shadow the builtins `list` and `dict`, and the
    # unused directory listing / placeholder assignments are gone.
    yearly_frames = [
        pd.read_csv("trainCVSSV3/" + 'NVD_'+ str(year) + '_CVSSV3_train.csv')
        for year in range(2020,2025)
    ]
    return pd.concat(yearly_frames, ignore_index=True)

In [34]:
#data frame generation for items in 'AttackVector'
# Combined frame with incomplete rows removed.
df = generate_CombinedFile().dropna()

# 70% train; the remaining 30% is halved into validation and test.
# The fixed random_state keeps both splits reproducible.
train_df, val_test_df = train_test_split(df, random_state=42, test_size=0.3)
val_df, test_df = train_test_split(val_test_df, random_state=42, test_size=0.5)

# Pretrained tokenizer used to encode the report texts.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Tokenize a list of descriptions
#def tokenize_descriptions(text_list, tokenizer, max_length=512, padding=True):
#    encodings = []
#    for text in text_list:
#        # Encode text to token IDs with padding and truncation
#        encoded = tokenizer.encode_plus(text,max_length=max_length,padding='max_length' if padding else None,truncation=True,return_tensors='pt') # Change to your needed format (e.g., 'pt' for PyTorch tensors, 'tf' for TensorFlow, etc.)
#    encodings.append(encoded['input_ids'].tolist()) # Extract input IDs
#   return encodings

def tokenize_descriptions(text_list, tokenizer, max_length=512, padding=True):
    """Encode a batch of texts with ``tokenizer`` and return its result.

    Args:
        text_list: the texts to encode, passed to the tokenizer in one call.
        tokenizer: callable invoked with the texts and encoding options.
        max_length: forwarded as ``max_length``; truncation is always on.
        padding: when truthy, pad to ``max_length``; otherwise no padding.

    Returns:
        Whatever ``tokenizer`` returns; PyTorch tensors are requested via
        ``return_tensors='pt'``.
    """
    if padding:
        padding_mode = 'max_length'
    else:
        padding_mode = False
    encoded_batch = tokenizer(
        text_list,
        max_length=max_length,
        padding=padding_mode,
        truncation=True,
        return_tensors='pt',
    )
    return encoded_batch

# Encode each split from its own frame. Validation and test were previously
# both encoded from val_test_df, which made the encodings twice as long as the
# corresponding label lists and paired reports with the wrong labels.
train_encodings = tokenize_descriptions(train_df['Report'].tolist(), tokenizer)
val_encodings = tokenize_descriptions(val_df['Report'].tolist(), tokenizer)
test_encodings = tokenize_descriptions(test_df['Report'].tolist(), tokenizer)

In [35]:
#this section is for Attack Vector item
# Integer class id for each AttackVector value.
label_map = {'NETWORK': 0, 'ADJACENT_NETWORK': 1, 'LOCAL': 2, 'PHYSICAL': 3}

# Translate the AttackVector column of every split into class ids; a value
# missing from label_map raises KeyError.
train_labels, val_labels, test_labels = (
    [label_map[vector] for vector in split_frame['AttackVector']]
    for split_frame in (train_df, val_df, test_df)
)


class VulnerabilityDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with integer labels.

    Args:
        encodings: mapping whose values are indexable per sample (tensors or
            nested lists), iterated via ``.items()``.
        labels: sequence of labels; its length defines the dataset length.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """Return a dict of tensors for sample ``idx``, including ``'labels'``."""
        item = {}
        for key, values in self.encodings.items():
            sample = values[idx]
            if torch.is_tensor(sample):
                # torch.tensor(existing_tensor) emits a copy-construct
                # UserWarning; clone().detach() is the recommended copy.
                item[key] = sample.clone().detach()
            else:
                item[key] = torch.tensor(sample)
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """Return the number of samples, taken from the label sequence."""
        return len(self.labels)

# One dataset per split, each pairing that split's encodings with its labels.
train_dataset = VulnerabilityDataset(encodings=train_encodings, labels=train_labels)
val_dataset = VulnerabilityDataset(encodings=val_encodings, labels=val_labels)
test_dataset = VulnerabilityDataset(encodings=test_encodings, labels=test_labels)



In [None]:
# Fine-tune a pretrained model with Hugging Face.
model_name = "bert-base-uncased"

# num_labels=4 covers the 4 labels in AttackVector.
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=4)

# Training configuration.
training_args = TrainingArguments(
    # Output and logging locations.
    output_dir='./results',
    logging_dir='./logs',
    logging_steps=10,
    # Too few epochs underfit, too many overfit; tune against the evaluation
    # metrics. Computationally intensive: set to 10 when given more computing power.
    num_train_epochs=3,
    # Batch sizes depend on GPU memory. No GPU is available yet, so they stay
    # small; raise to 8 (train) and 16 (eval) during the deployment phase.
    per_device_train_batch_size=1,
    per_device_eval_batch_size=2,
    # Optimisation schedule.
    warmup_steps=500,
    weight_decay=0.01,
    # Evaluate and save at the end of every epoch.
    evaluation_strategy="epoch",
    save_strategy="epoch",
)

In [None]:
# Wire the model, the training configuration and the data splits into a Trainer.
trainer = Trainer(
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    model=model,
    args=training_args,
)

# Run fine-tuning.
trainer.train()

# Evaluate on the test split; as the last expression its result is the cell output.
trainer.evaluate(eval_dataset=test_dataset)