In [None]:
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv()) # read local .env file

from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
from fuzzywuzzy import fuzz
import pandas as pd
import warnings
warnings.filterwarnings('ignore')


In [None]:
# Benchmark configuration: the chat model under test and its sampling temperature.
# Both values are also embedded in the name of the results CSV.
llm_model = "gpt-4-0613"
temperature = 0

# Captured once, when this cell runs; its string form becomes part of the
# results file name, so re-running this cell starts a new results file.
benchmark_datetime = datetime.now()

# Chat model client shared by every test case.
llm = ChatOpenAI(temperature=temperature, model=llm_model)

In [None]:
# Description of each argument the model must fill in. Every argument is a
# string, and every argument is required; insertion order is the schema order.
_argument_descriptions = {
    "vulnerability found": " 'Yes' if there is a security vulnerability in code or 'No' if the code doesn't have security vulnerability",
    "vulnerability": "The type of vulnerability found in the code or 'None' ",
    "vulnerable code": "The code that is vulnerable to the security issue or 'None' ",
    "code fix": "Code fix for the vulnerable code or 'None' ",
    "comment": "Comment that describes the issue and fix or 'No issues found' ",
}

# Function schema handed to the chat model: a single function whose arguments
# carry the analysis verdict, the offending code, a fix and a comment.
function_descriptions = [
    {
        "name": "find_security_issues_and_generate_fix",
        "description": "Scan the code and find any security vulnerabilities and generate code fix",
        "parameters": {
            "type": "object",
            "properties": {
                argument: {"type": "string", "description": text}
                for argument, text in _argument_descriptions.items()
            },
            "required": list(_argument_descriptions),
        },
    }
]

In [None]:
def static_analysis_tool(code):
    """Send `code` to the chat model and return its function-call arguments.

    Args:
        code: Source text to analyse; it is sent as the only human message.

    Returns:
        The 'arguments' value of the reply's function call (the caller
        decodes it with json.loads).

    Raises:
        KeyError: If the reply carries no 'function_call' entry.
    """
    # Force the model to answer through the declared function. Without this the
    # model may reply in plain text, the reply then has no 'function_call'
    # entry, and the lookup below fails.
    forced_call = {"name": function_descriptions[0]["name"]}
    first_response = llm.predict_messages(
        [HumanMessage(content=code)],
        functions=function_descriptions,
        function_call=forced_call,
    )
    return first_response.additional_kwargs['function_call']['arguments']

In [None]:
# Function to fetch web content
def fetch_webpage_content(url, timeout=30):
    """Download `url` and return the response body as text.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within `timeout`.
    """
    # A timeout keeps one stalled request from hanging the whole benchmark run.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on e.g. a 404 instead of returning the error page as content.
    response.raise_for_status()
    return response.text

In [None]:
# Function to fetch and parse xml
def fetch_and_parse_xml(url, timeout=30):
    """Download `url` and parse the response body with BeautifulSoup.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A BeautifulSoup object built from the response text with the 'lxml'
        parser.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within `timeout`.
    """
    # A timeout keeps one stalled request from hanging the whole benchmark run.
    response = requests.get(url, timeout=timeout)
    # Fail loudly on e.g. a 404 instead of parsing the error page as metadata.
    response.raise_for_status()
    return BeautifulSoup(response.text, 'lxml')

In [None]:
def compare_results(analysis_result, metadata):
    """Compare the model's verdict with the expected result for one test case.

    Args:
        analysis_result: Dict decoded from the model's function-call arguments.
            The keys 'vulnerability found' and 'vulnerability' are read.
        metadata: Parsed metadata document; `metadata.category.string` and
            `metadata.vulnerability.string` are read.

    Returns:
        A dict holding 'vulnerability_found', 'vulnerability_type_matches',
        'metadata_vulnerability_exists' and 'expected_vuln_type', followed by
        every key/value pair of `analysis_result`.
    """
    expected_vuln_type = metadata.category.string

    # Did the model report a vulnerability? Surrounding whitespace is stripped
    # so an answer such as " Yes" is still recognised.
    vuln_found = analysis_result['vulnerability found'].strip().lower() == 'yes'

    # Fuzzy-match the expected category against the type the model reported;
    # a partial ratio above 80 counts as a match.
    vuln_matches = fuzz.partial_ratio(
        expected_vuln_type.lower(),
        analysis_result['vulnerability'].lower(),
    ) > 80

    # Does the metadata say this test case is really vulnerable?
    metadata_vuln_exists = metadata.vulnerability.string.strip().lower() == 'true'

    combined_result = {
        'vulnerability_found': vuln_found,
        'vulnerability_type_matches': vuln_matches,
        'metadata_vulnerability_exists': metadata_vuln_exists,
        'expected_vuln_type': expected_vuln_type
    }

    # Keep the model's raw answer alongside the computed fields.
    combined_result.update(analysis_result)

    return combined_result


In [None]:
def construct_url(base_url, test_case_number, file_extension):
    """Build the URL of one test-case file.

    The result is `base_url` immediately followed by `test_case_number`, a dot,
    and `file_extension`.
    """
    return "{}{}.{}".format(base_url, test_case_number, file_extension)

In [None]:
def run_test_case(test_case_number):
    """Run one benchmark test case end to end.

    Downloads the Java source and the XML metadata for `test_case_number`,
    sends the source to the model, decodes the JSON reply and returns the
    comparison of that reply with the metadata.
    """
    # The Java source and its metadata share one URL prefix and differ only in
    # the file extension.
    testcode_prefix = "https://raw.githubusercontent.com/OWASP-Benchmark/BenchmarkJava/master/src/main/java/org/owasp/benchmark/testcode/BenchmarkTest"

    code = fetch_webpage_content(construct_url(testcode_prefix, test_case_number, "java"))
    metadata = fetch_and_parse_xml(construct_url(testcode_prefix, test_case_number, "xml"))

    # The model answers with a JSON string; decode it into a dict.
    analysis_result = json.loads(static_analysis_tool(code))

    return compare_results(analysis_result, metadata)

In [None]:
def run_all_test_cases(num_test_cases):
    """Run test cases 1..num_test_cases and save the results to a CSV file.

    Args:
        num_test_cases: Number of the last test case to run (inclusive).

    Returns:
        A DataFrame with one row per test case, built from the dicts returned
        by run_test_case. The same frame is written, without its index, to a
        CSV named after the model, temperature and benchmark timestamp.
    """
    # Collect plain dicts and build the frame once at the end.
    # DataFrame.append was removed in pandas 2.0, and growing a frame row by
    # row copies it on every iteration.
    records = []

    for i in range(1, num_test_cases + 1):
        # Test case numbers are zero-padded to five digits ('00001', '00002', ...).
        test_case_number = f"{i:05d}"
        print("Running test: " + str(test_case_number))
        records.append(run_test_case(test_case_number))

    df = pd.DataFrame(records)

    # This name is rebuilt verbatim by the cell that analyses the results, so
    # the format must not change here alone.
    # NOTE(review): str(benchmark_datetime) contains ':' and a space, which some
    # file systems reject; confirm the target platform.
    file_name = "results-" + llm_model + "-temperature" + str(temperature) + "-benchmark-datetime-" + str(benchmark_datetime) + ".csv"
    df.to_csv(file_name, index=False)

    return df

In [None]:
def analyze_results(csv_path):
    """Count confusion-matrix entries from a saved results CSV.

    Reads `csv_path` and compares the 'vulnerability_found' column (the model's
    verdict) with the 'metadata_vulnerability_exists' column (the expected
    verdict).

    Returns:
        The tuple (TP, TN, FP, FN).
    """
    results = pd.read_csv(csv_path)
    reported = results['vulnerability_found']
    expected = results['metadata_vulnerability_exists']

    # Explicit comparisons against True and False, not negation, so a value
    # that is neither is left out of every count.
    reported_yes, reported_no = reported == True, reported == False
    expected_yes, expected_no = expected == True, expected == False

    TP = (reported_yes & expected_yes).sum()
    TN = (reported_no & expected_no).sum()
    FP = (reported_yes & expected_no).sum()
    FN = (reported_no & expected_yes).sum()

    return TP, TN, FP, FN

In [None]:
# Run the benchmark over test cases 1..num_test_cases; the results are written
# to a CSV by run_all_test_cases.
num_test_cases = 2740
run_all_test_cases(num_test_cases)

# Rebuild the name of the CSV that run_all_test_cases wrote, then load it back
# to count the confusion-matrix entries.
file_name = f"results-{llm_model}-temperature{temperature}-benchmark-datetime-{benchmark_datetime}.csv"
TP, TN, FP, FN = analyze_results(file_name)

for label, count in (
    ("True Positives", TP),
    ("True Negatives", TN),
    ("False Positives", FP),
    ("False Negatives", FN),
):
    print(f'{label}: {count}')