# **Research Internship:** Vulnerability Scoring using AI
###  *by Andres Tito*

### Initializing project

In [2]:
from dotenv import load_dotenv
import datetime
import numpy as np
import os
import pandas as pd
import re

In [3]:
load_dotenv()

tokens = ['GPT_TOKEN', 'FREE_TOKEN', 'GITHUB_TOKEN_LLM']

# Dictionary to store the results
missing_tokens = []

# Iterate through the tokens and check if they exist and are not empty
for token in tokens:
    value = os.getenv(token)
    if not value:
        missing_tokens.append(token)

# Report the results
if missing_tokens:
    raise Exception(f"The following tokens are missing or empty: {', '.join(missing_tokens)}")
else:
    print("All tokens are present and non-empty.")

All tokens are present and non-empty.


### Miscellaneous  Functions

In [4]:
from scripts.get_all_GHSA import main as get_ghsa
from scripts.get_all_NVD import main as get_nvd
from scripts.get_RedHat import main as get_redhat

def check_and_run(json_file, main_function):
    if not os.path.exists(json_file):
        print(f"{json_file} not found. Running {main_function.__name__}...")
        try:
            main_function()
            print(f"Successfully ran {main_function.__name__}")
        except Exception as e:
            print(f"Error running {main_function.__name__}: {e}")
    else:
        print(f"{json_file} exists. Skipping retrieving of data.")

In [5]:
def clean_descriptions(df, column_name):
    # Replace both \n and \r with a space in the specified column
    df[column_name] = df[column_name].str.replace(r'[\n\r]', ' ', regex=True)
    return df

In [6]:
def load_existing_csv(csv_file):
    """Load DataFrame from CSV if it exists and is not empty."""
    if os.path.exists(csv_file) and os.path.getsize(csv_file) > 0:
        try:
            results_df = pd.read_csv(csv_file)
            if not results_df.empty:
                return results_df
            else:
                print("CSV file is empty.")
                return pd.DataFrame()
        except pd.errors.EmptyDataError:
            print("CSV file exists but is empty.")
            return pd.DataFrame()
    else:
        print("CSV file not found or is empty.")
        return pd.DataFrame()

In [7]:

def clean_cvss_vector(df, column_name):
    def clean_and_correct_scope(cvss_vector):
        if not isinstance(cvss_vector, str):
            return cvss_vector  # Return as is if not a string (e.g., NaN or None)

        # Step 1: Replace 'SC:' with 'S:' (Scope: Changed)
        cleaned_vector = re.sub(r'/SC:', '/S:', cvss_vector, flags=re.IGNORECASE)

        # Step 2: Remove multiple occurrences of '/S:' and insert '/S:U' (Scope: Unchanged)
        scope_matches = re.findall(r'/S:[CU]', cleaned_vector)
        
        # If two or more /S: exist, replace them with just /S:U
        if len(scope_matches) > 1:
            cleaned_vector = re.sub(r'/S:[CU]', '', cleaned_vector)  # Remove all existing /S:[C or U]
            cleaned_vector = re.sub(r'(/C:)', '/S:U\\1', cleaned_vector)  # Insert '/S:U' before the first '/C:'

        # Step 3: Remove any extra spaces from the vector
        cleaned_vector = re.sub(r'\s+', '', cleaned_vector)

        return cleaned_vector

    # Apply the cleaning function to the specified column
    df[column_name] = df[column_name].apply(clean_and_correct_scope)
    return df

# **Chapter 1: Data**

## **1.1 Retrieving Data**

In [8]:
# Check for each JSON file and call the corresponding main function if necessary
check_and_run('./cve_data/GHSA_cves.json', get_ghsa)
check_and_run('./cve_data/NVD_cves.json', get_nvd)
check_and_run('./cve_data/RedHat_cves.json', get_redhat)

./cve_data/GHSA_cves.json exists. Skipping retrieving of data.
./cve_data/NVD_cves.json exists. Skipping retrieving of data.
./cve_data/RedHat_cves.json exists. Skipping retrieving of data.


## **1.2 Cleaning Data**

### **1.2.1 Red Hat Cleaning**

In [9]:
RedHat_cve = pd.read_json('./cve_data/RedHat_cves.json')
RedHat_cve = RedHat_cve.rename(columns={ 'bugzilla_description' : 'Description',
                                        'cvss3_score': 'Score',
                                        'cvss3_scoring_vector': 'Vector',
                                        'resource_url': 'url'
                                          })
RedHat_cve = clean_descriptions(RedHat_cve, 'Description')

In [10]:
RedHat_cve = RedHat_cve[['CVE', 'CWE', 'severity', 'Description', 'Score', 'Vector','url']]
#Rows without CWE, CVE or Description are not useful
print(f"Number of rows before dropna: {RedHat_cve.shape[0]}")
cleanData_RedHat = RedHat_cve.dropna(subset=['CWE', 'CVE', 'Description','Score'])
print(f"Number of rows after dropna: {cleanData_RedHat.shape[0]}")

Number of rows before dropna: 34440
Number of rows after dropna: 16262


In [11]:
cleanData_RedHat.to_csv("./datasets/RedHat_df.csv", index=False)

### **1.2.2 NVD CVES Cleaning**

In [12]:
NVD_cves = pd.read_json('./cve_data/NVD_cves.json')
NVD_cves = NVD_cves.rename(columns={  'id' : 'CVE',
                                          'description': 'Description', 
                                          'weakness_description':'CWE',
                                          'primary_baseScore':'Score',
                                          'primary_vectorString':'Vector',
                                          'primary_source':'source',
                                          'secondary_baseScore':'Score2',
                                          'secondary_vectorString':'Vector2',
                                          'secondary_source':'source2',
                                          'patch_url' : 'url'
                                          })
NVD_cves = NVD_cves.drop(['sourceIdentifier'],axis=1)
NVD_cves = clean_descriptions(NVD_cves, 'Description')

In [13]:
#scoreIsNull_NVD = NVD_cves[NVD_cves['Score'].isnull()]
cleanData_NVD = NVD_cves.dropna(subset=['CWE', 'CVE', 'Description'])
cleanData_NVD = cleanData_NVD.query('CWE != "NVD-CWE-Other" and CWE != "NVD-CWE-noinfo"')

#Get Dataframe of CVEs with either primary score or secondary score, if both are NaN drop.
cleanData_NVD_full = cleanData_NVD.dropna(subset=['Score', 'Score2'], how='all')

#Separate Primary and Secondary Score in different tables
cleanData_NVD1 = cleanData_NVD.dropna(subset=['Score'])
cleanData_NVD1 = cleanData_NVD1.drop(['Score2', 'Vector2','source2'],axis=1)


cleanData_NVD2 = cleanData_NVD.dropna(subset=['Score2'])
cleanData_NVD2 = cleanData_NVD2.drop(['Score', 'Vector','source'],axis=1)
cleanData_NVD2 = cleanData_NVD2.rename(columns={  'Score2' : 'Score',
                                          'Vector2': 'Vector', 
                                          'source2':'source',
                                          })

In [14]:
cleanData_NVD1.to_csv("./datasets/NVD1_df.csv", index=False)
cleanData_NVD2.to_csv("./datasets/NVD2_df.csv", index=False)

In [15]:
cleanData_NVD_full_new= cleanData_NVD_full
cleanData_NVD_full_new['CVE_Year'] = cleanData_NVD_full_new['CVE'].str.extract(r'CVE-(\d{4})')
cleanData_NVD_new = cleanData_NVD_full_new[cleanData_NVD_full_new['CVE_Year'].astype(int) >= 2019]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  cleanData_NVD_full_new['CVE_Year'] = cleanData_NVD_full_new['CVE'].str.extract(r'CVE-(\d{4})')


### **1.2.3 GHSA CVES Cleaning**

In [16]:
GHSA_cves = pd.read_json('./cve_data/GHSA_cves.json')

GHSA_cves = GHSA_cves.rename(columns={  'cve_id' : 'CVE',
                                          'description': 'Description', 
                                          'cvss_score': 'Score', 
                                          'cvss_vector': 'Vector',
                                          'cwes':'CWE',
                                         'package_name':'package'
                                          
                                          })
GHSA_cves = GHSA_cves.drop(['severity'],axis=1)
GHSA_cves = clean_descriptions(GHSA_cves, 'Description')

In [17]:
cleanData_GHSA = GHSA_cves.dropna(subset=['CVE', 'CWE','Description'])
cleanData_GHSA = cleanData_GHSA.dropna(subset=['Score'])


In [18]:
# GHSA returns CWE as a list instead of a string, causing problems in our code.
# List to string separated by a ','
cleanData_GHSA['CWE'] = cleanData_GHSA['CWE'].apply(lambda x: ', '.join(x) if isinstance(x, list) else x)

In [19]:
print(f"Number of rows before dropna: {cleanData_GHSA.shape[0]}")
clean_GHSA = cleanData_GHSA.drop_duplicates()
# Step 2: Remove rows where 'score' is equal to 0.0
clean_GHSA = clean_GHSA[clean_GHSA['Score'] != 0.0]
# Step 3: Remove rows where 'cwe' is ' '
clean_GHSA = clean_GHSA[clean_GHSA['CWE'] != '']
clean_GHSA = clean_GHSA.dropna(subset=['Vector','CWE'])
print(f"Number of rows after dropna: {clean_GHSA.shape[0]}")

Number of rows before dropna: 30870
Number of rows after dropna: 18048


In [20]:
clean_GHSA.to_csv("./datasets/GitHub_df.csv", index=False)

# **Chapter 2: Handling LLM scores**

## **2.1 Budget Approximation**

In [21]:
import json
from scripts.ai_bugdet_calculator import create_payload_and_calculate_tokens, \
    calculate_total_cost_per_model

total_input = 0.0 
total_output = 0.0 
database= cleanData_NVD_full

model_info = {
    'GPT-4o-mini': {'input_price': 0.00014, 'output_price': 0.0006},
    'GPT-4o Global Deployment': {'input_price': 0.0045, 'output_price': 0.0135},
    'gpt-4o-2024-08-06 Global Deployment': {'input_price': 0.0023, 'output_price': 0.0090}
}

# Function to save results to JSON
def save_results_to_json(cve_results, overall_total_cost, filename='payload_tokens.json'):
    result = {
        "cve_results": cve_results,
        "overall_total_cost": overall_total_cost
    }
    with open(filename, 'w') as json_file:
        json.dump(result, json_file, indent=4)

# Function to save results to CSV
def save_results_to_csv(cve_results, filename='payload_tokens.csv'):
    # Convert the dictionary to a DataFrame
    df = pd.DataFrame.from_dict(cve_results, orient='index')
    df.index.name = 'CVE ID'
    df.reset_index(inplace=True)
    df.to_csv(filename, index=False)

print(f'Amount of CVES {database.shape}')

for index,row in database.iterrows():
    cve = row['CVE']
    description = row['Description']
    cwe = row['CWE']

    cve_results, input_tokens, output_tokens = create_payload_and_calculate_tokens(cve, description, cwe)

    # Update overall total cost
    total_input += input_tokens
    total_output += output_tokens


calculate_total_cost_per_model(total_input,total_output,model_info)

Amount of CVES (103715, 11)
Total cost for GPT-4o-mini: €4.283091
Total cost for GPT-4o Global Deployment: €116.068432
Total cost for gpt-4o-2024-08-06 Global Deployment: €67.164719


## **2.2 Resume from where it left off**
*Continue from last CVE saved in CSV File*


In [22]:
def get_remaining_data(csv_file, df_to_evaluate):
    """Return the remaining unprocessed data from the DataFrame."""
    results_df = load_existing_csv(csv_file)

    if not results_df.empty:
        # Get the last processed row index
        last_index = results_df.index[-1]
        print(f"Resuming from index: {last_index + 1}")
        df_remaining = df_to_evaluate.iloc[last_index + 1:]
    else:
        print("Starting from the first row.")
        df_remaining = df_to_evaluate

    return df_remaining, results_df

## **2.3 CVSS3.1 CALCULATOR**

In [23]:
from cvss import CVSS3

def cvss_calculator(vector):
    try:
        cvss = CVSS3(vector)
        return cvss.scores()[0]
    except:
        return None


## **2.4 Prompt Request**

In [24]:
from scripts.chat import ask_bot
from tqdm import tqdm

def ai_scoring(model, df, csv_file):
    # Get the remaining data to process based on the CSV file's progress
    df_to_evaluate, results_df = get_remaining_data(csv_file, df)

    # Set up the progress bar with tqdm
    with tqdm(total=len(df_to_evaluate), desc="Processing CVEs", unit="CVE") as pbar:
        for index, row in df_to_evaluate.iterrows():
            cve = row['CVE']
            description = row['Description']
            cwe = row['CWE']

            # Call the function from chat.py
            score, vector, error = ask_bot(model, cve, description, cwe)
            
            # Handle critical errors (like 404 or connection issues) and save progress
            if error == 404 or error == "CONNECTION_ERROR" or error == 401:
                tqdm.write(f"Critical error {error} for CVE: {cve}. Saving progress and exiting.")
                results_df.to_csv(csv_file, index=False)
                return results_df # Stop processing further CVEs

            # If no errors, append results
            if score is not None and vector is not None:
                
                #Score based on given vector
                calculated_score = cvss_calculator(vector)
                
                results_df = pd.concat([results_df, pd.DataFrame([{
                    'CVE': cve,
                    'Description': description,
                    'CWE': cwe,
                    'Score_LLM': score,
                    'Calculated_Score_LLM' : calculated_score,
                    'Vector_LLM': vector,

                }])], ignore_index=True)

                # Save progress after each CVE
                results_df.to_csv(csv_file, index=False)
            
            # Print error message below the progress bar
            if error:
                tqdm.write(f"Failed to retrieve CVSS score and vector for CVE: {cve} (Error: {error})")
            
            # Update the progress bar
            pbar.update(1)
    return results_df

## **2.5 Merge Score**

In [25]:
def merge_and_score(model,dataframe,suffix, csv_file, score=True):

    # csv_file: model score file name. Format displays 'Llama_GHSA_merged_NVD' or 'Llama_GHSA_merged_GHSA' depending on comparison.
    file_return = os.path.join('integrated_scores', f'{csv_file}_merged_{suffix}.csv')
    model_file = os.path.join('model_scores', f'{model}_{suffix}.csv')

    merged_df = load_existing_csv(file_return)

    if score:
        results_df = ai_scoring(model,dataframe, model_file)
    else:
        if not merged_df.empty:
            return merged_df
        results_df = load_existing_csv(model_file) #Load model score   
    
    results_df = results_df.merge(dataframe[['CVE', 'Score', 'Vector']], on='CVE', how='inner')
    results_df = results_df.drop_duplicates(subset=['CVE'], keep='first')
    #Save merge df in a new csv file to not alter saved LLM response
    results_df.to_csv(file_return, index=False)
    return results_df


# **Chapter 3: Evaluation**

## **3.1 Define CVSS Category**

In [26]:
# Define function to categorize CVSS scores based on ranges
def get_cvss_category(score):
    if score == 0.0:
        return 'None'
    elif 0.1 <= score <= 3.9:
        return 'Low'
    elif 4.0 <= score <= 6.9:
        return 'Medium'
    elif 7.0 <= score <= 8.9:
        return 'High'
    elif 9.0 <= score <= 10.0:
        return 'Critical'
    else:
        return 'Invalid'

## **3.2 Compare vector strings**

In [27]:
# Function to compare two columns with CVSS strings in a DataFrame
def compare_vector_columns(df, col1, col2):
    # Function to compare two CVSS strings
    def compare_vector_strings(vector1, vector2):
        # Ensure that both vectors are strings (in case they are floats or NaN)
        vector1 = str(vector1) if pd.notnull(vector1) else ''
        vector2 = str(vector2) if pd.notnull(vector2) else ''
        
        # Split the CVSS strings into components by '/'
        components1 = vector1.split('/')
        components2 = vector2.split('/')

        # Initialize an empty list to store the differences
        differences = []

        # Compare corresponding components from both CVSS strings
        for comp1, comp2 in zip(components1, components2):
            if comp1 != comp2:
                differences.append(f"{comp1} != {comp2}")

        # Join the differences into a single string and return it
        return ','.join(differences) if differences else '0 diff'

    # Apply the compare function row-wise and create a new column 'differences'
    df['vector_diff'] = df.apply(lambda row: compare_vector_strings(row[col1], row[col2]), axis=1)

    return df


## **3.3 Log Results**

In [28]:
def log_results(df, log, mean_deviation, mean_percentage_deviation, accuracy, log_file='log.txt'):
    
    # Format the log entry
    log_entry = (
        f"\nInnerJoin Rows: {df.shape[0]}\n"
        f"Mean Deviation: {mean_deviation}\n"
        f"Mean Percentage Deviation: {mean_percentage_deviation:.2f}%\n"
        f"Accuracy within same Severity Level: {accuracy:.2f}%\n\n"
    )

    # Print the log entry to the console
    print(log_entry)
    log += log_entry
    # Append the log entry to the log file
    with open(log_file, 'a') as file:
        file.write(log)

## **3.5 Evaluate Score accuracy**

In [29]:
def evaluate_score_accuracy(log, df, original_score_column, llm_score_column, file_name):
    # Convert the original and LLM score columns to float
    df[original_score_column] = pd.to_numeric(df[original_score_column], errors='coerce')
    df[llm_score_column] = pd.to_numeric(df[llm_score_column], errors='coerce')

    # Calculate absolute deviation
    df['Deviation'] = abs(df[original_score_column] - df[llm_score_column])

    # Calculate percentage deviation
    df['Percentage Deviation'] = (df['Deviation'] / df[original_score_column]) * 100
    df.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Calculate mean deviation and mean percentage deviation
    mean_deviation = df['Deviation'].mean()
    mean_percentage_deviation = df['Percentage Deviation'].mean()

    # Apply the get_cvss_category function to both original and LLM scores
    df['Original_Category'] = df[original_score_column].apply(get_cvss_category)
    df['LLM_Category'] = df[llm_score_column].apply(get_cvss_category)

    # Check if both original and LLM scores fall into the same category
    df['Accurate'] = df['Original_Category'] == df['LLM_Category']

    # Calculate accuracy as the percentage of scores that match within the same category
    accuracy = (df['Accurate'].sum() / len(df)) * 100

    # Log and Print results

    log_results(df, log, mean_deviation, mean_percentage_deviation,accuracy)

    # Save the results to a CSV file
    df.to_csv(file_name, index=False)

    return df

## **3.6 Call Evaluation Functions**

In [30]:
def get_evaluation(model, dataframe, suffix, csv_file, notes, score_and_evaluate=True):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log = f"Log Time: {current_time}\nModel: {model}\nDataframe: {suffix}\nDataFrame Rows: {dataframe.shape[0]}\nNotes: {notes}"

    results = merge_and_score(model, dataframe, suffix, csv_file,score_and_evaluate)
    
    results = compare_vector_columns(results,'Vector_LLM','Vector')
    evaluation = evaluate_score_accuracy(log, results, f'Score', 'Calculated_Score_LLM', f'./evaluation/Eval_{suffix}_{model}.csv')
    return evaluation

# **Chapter 4: Experimentation**

In [None]:
#In the case you want to perform evaluation here in jupyternotebook
#nvd_against_meta = get_evaluation("meta-llama-3.1-8b-instruct",cleanData_NVD1, "NVD_llama", "results", "NVD against llama",False)

In [None]:
# model = "llama-3.1-8b-instruct"
# dataframe= clean_GHSA
# suffix= "GHSA"
# model_file = os.path.join('model_scores', f'{model}_{suffix}.csv')

# ghsa_evaluation = ai_scoring(model,dataframe, model_file)

In [45]:
model = "llama-3.1-8b-instruct"
dataframe= cleanData_NVD1
suffix= "NVD"
model_file = os.path.join('model_scores', f'{model}_{suffix}.csv')

nvd_against_meta = ai_scoring(model,dataframe, model_file)

Resuming from index: 41881


Processing CVEs:   0%|          | 0/51911 [00:01<?, ?CVE/s]


OSError: [Errno 28] No space left on device

In [46]:
model = "gpt-4o-mini"
dataframe= cleanData_NVD_full
suffix= "NVD"
model_file = os.path.join('model_scores', f'{model}_{suffix}.csv')

nvd_against_meta = ai_scoring(model,dataframe, model_file)

Resuming from index: 20349


Processing CVEs:   0%|          | 4/83366 [00:15<89:23:42,  3.86s/CVE]


KeyboardInterrupt: 

# **Chapter 5: DISPLAY RESULTS**

## RUN Streamlit

In [96]:
!streamlit run script/fronted.py

