In [1]:
import pandas as pd
import json
import requests
import time
from tqdm import tqdm
from IPython.display import clear_output
import re
import os
import numpy as np

### Generating dates

In [39]:
import datetime


def generate_date_lists(start_year=2017, end_year=2024):
    """Generates lists of starting and ending dates for vulnerability scraping.




    Args:



        start_year: The starting year for the date range.




    Returns:



        A tuple containing two lists: starting_dates and ending_dates.



    """

    starting_dates = []

    ending_dates = []

    # Set the initial start date

    start_date = datetime.datetime(start_year, 1, 1)

    end_date_year = datetime.datetime(end_year, 1, 1)

    while start_date <= end_date_year:

        # Calculate the end date, 15 days after the start date

        end_date = start_date + datetime.timedelta(days=7)

        # Format the dates in the desired format

        starting_dates.append(start_date.strftime('%Y-%m-%dT%H:%M:%S.%f'))

        ending_dates.append(end_date.strftime('%Y-%m-%dT%H:%M:%S.%f'))

        # Increment the start date by 15 days

        start_date = end_date

    return starting_dates, ending_dates


# Example usage:
starting_dates, ending_dates = generate_date_lists(2010, 2024)


print(starting_dates)


print(ending_dates)

['2010-01-01T00:00:00.000000', '2010-01-08T00:00:00.000000', '2010-01-15T00:00:00.000000', '2010-01-22T00:00:00.000000', '2010-01-29T00:00:00.000000', '2010-02-05T00:00:00.000000', '2010-02-12T00:00:00.000000', '2010-02-19T00:00:00.000000', '2010-02-26T00:00:00.000000', '2010-03-05T00:00:00.000000', '2010-03-12T00:00:00.000000', '2010-03-19T00:00:00.000000', '2010-03-26T00:00:00.000000', '2010-04-02T00:00:00.000000', '2010-04-09T00:00:00.000000', '2010-04-16T00:00:00.000000', '2010-04-23T00:00:00.000000', '2010-04-30T00:00:00.000000', '2010-05-07T00:00:00.000000', '2010-05-14T00:00:00.000000', '2010-05-21T00:00:00.000000', '2010-05-28T00:00:00.000000', '2010-06-04T00:00:00.000000', '2010-06-11T00:00:00.000000', '2010-06-18T00:00:00.000000', '2010-06-25T00:00:00.000000', '2010-07-02T00:00:00.000000', '2010-07-09T00:00:00.000000', '2010-07-16T00:00:00.000000', '2010-07-23T00:00:00.000000', '2010-07-30T00:00:00.000000', '2010-08-06T00:00:00.000000', '2010-08-13T00:00:00.000000', '2010-08-

### Creating dates directories

In [41]:
import os

for year in range(2010, 2025):
    directory_name = str(year)
    os.mkdir(f"API responses/{directory_name}")
    print(f"Directory {directory_name} created.")

Directory 2010 created.
Directory 2011 created.
Directory 2012 created.
Directory 2013 created.
Directory 2014 created.
Directory 2015 created.
Directory 2016 created.
Directory 2017 created.
Directory 2018 created.
Directory 2019 created.
Directory 2020 created.
Directory 2021 created.
Directory 2022 created.
Directory 2023 created.
Directory 2024 created.


### Fetching API responses and saving to respective years folder

In [42]:
no_vulns_found = list()
excepted_list = list()
for i, (start, end) in enumerate(zip(starting_dates, ending_dates)):
    print(
        f"getting {start.split('-')[1]}-{start.split('-')[2].split('T')[0]} to {end.split('-')[1]}-{end.split('-')[2].split('T')[0]} CWEs {i+1}/{len(starting_dates)}")
    try:

        response = requests.get(
            f"https://services.nvd.nist.gov/rest/json/cves/2.0/?pubStartDate={start}&pubEndDate={end}")
        response.raise_for_status()  # Raise an exception for bad responses (4xx, 5xx)
        cwe_data = response.json()

        year = start.split('-')[0]
        print(f"length of vulns: {cwe_data['totalResults']}")
        if cwe_data['totalResults']:
            with open(f"API responses/{year}/{start.split('-')[1]}-{start.split('-')[2].split('T')[0]} to {end.split('-')[1]}-{end.split('-')[2].split('T')[0]}.json", "w") as f:
                json.dump(cwe_data, f, indent=4)
        else:
            no_vulns_found.append(start)
    except requests.RequestException as e:
        print(
            f"Error fetching description for {start.split('-')[1]}-{start.split('-')[2].split('T')[0]} to {end.split('-')[1]}-{end.split('-')[2].split('T')[0]}: {e}")
        # Add CWE ID to the exceptions list for furthe tries
        excepted_list.append(start)
    finally:  # Always sleep, even if there's an error
        time.sleep(5)

    if i % 10 == 0 and i != 0:
        clear_output(wait=True)

getting 10-27 to 11-03 CWEs 722/731
length of vulns: 686
getting 11-03 to 11-10 CWEs 723/731
length of vulns: 632
getting 11-10 to 11-17 CWEs 724/731
length of vulns: 699
getting 11-17 to 11-24 CWEs 725/731
length of vulns: 464
getting 11-24 to 12-01 CWEs 726/731
length of vulns: 456
getting 12-01 to 12-08 CWEs 727/731
length of vulns: 642
getting 12-08 to 12-15 CWEs 728/731
length of vulns: 671
getting 12-15 to 12-22 CWEs 729/731
length of vulns: 813
getting 12-22 to 12-29 CWEs 730/731
length of vulns: 327
getting 12-29 to 01-05 CWEs 731/731
length of vulns: 513


### Extracting knowledge to dataframe

In [None]:
def extract_vulns_from_json_files(directory_path):
    """
    Iterates through JSON files in a directory, extracts those with CWE IDs,
    and returns a dictionary mapping file names to CWE data.

    Args:
        directory_path (str): The path to the directory containing JSON files.

    Returns:
        dict: A dictionary where keys are file names and values are lists of CWE data.
    """

    descriptions_df = {
        "pubYear": list(),
        "Description": list(),
        "CWE-ID": list(),
        "vectorString": list(),
        "attackVector": list(),
        "attackComplexity": list(),
        "privilegesRequired": list(),
        "userInteraction": list(),
        "scope": list(),
        "confidentialityImpact": list(),
        "integrityImpact": list(),
        "availabilityImpact": list(),
    }
    years_list = list(range(2010, 2025))
    for year in years_list:
        for filename in os.listdir(os.path.join(directory_path, str(year))):
            if filename.endswith(".json"):
                filepath = os.path.join(directory_path, str(year), filename)
                print(filepath)
                with open(filepath, "r") as file:
                    try:
                        data = json.load(file)
                    except json.JSONDecodeError:
                        print(f"Skipping invalid JSON file: {filename}")
                        continue  # Skip to the next file if JSON is invalid
                    for entry in data["vulnerabilities"]:
                        # try:
                        descriptions_df["pubYear"].append(
                            entry["cve"]["published"].split("-")[0])  # Extract publication year
                        descriptions_df["Description"].append(
                            entry["cve"]["descriptions"][0]["value"])
                        if "weaknesses" in entry["cve"].keys():
                            descriptions_df["CWE-ID"].append(
                                entry["cve"]["weaknesses"][0]["description"][0]["value"])
                        else:
                            descriptions_df["CWE-ID"].append("None")

                        if "metrics" in entry["cve"].keys() and "cvssMetricV30" in entry["cve"]["metrics"].keys():
                            nist_metric = entry["cve"]["metrics"]["cvssMetricV30"]
                            print(nist_metric)
                            descriptions_df["vectorString"].append(
                                nist_metric[0]["cvssData"]["vectorString"] if nist_metric != [] else "None")
                            descriptions_df["attackVector"].append(
                                nist_metric[0]["cvssData"]["attackVector"] if nist_metric != [] else "None")
                            descriptions_df["attackComplexity"].append(
                                nist_metric[0]["cvssData"]["attackComplexity"] if nist_metric != [] else "None")
                            descriptions_df["privilegesRequired"].append(
                                nist_metric[0]["cvssData"]["privilegesRequired"] if nist_metric != [] else "None")
                            descriptions_df["userInteraction"].append(
                                nist_metric[0]["cvssData"]["userInteraction"] if nist_metric != [] else "None")
                            descriptions_df["scope"].append(
                                nist_metric[0]["cvssData"]["scope"] if nist_metric != [] else "None")
                            descriptions_df["confidentialityImpact"].append(
                                nist_metric[0]["cvssData"]["confidentialityImpact"] if nist_metric != [] else "None")
                            descriptions_df["integrityImpact"].append(
                                nist_metric[0]["cvssData"]["integrityImpact"] if nist_metric != [] else "None")
                            descriptions_df["availabilityImpact"].append(
                                nist_metric[0]["cvssData"]["availabilityImpact"] if nist_metric != [] else "None")
                        else:
                            descriptions_df["vectorString"].append("None")
                            descriptions_df["attackVector"].append("None")
                            descriptions_df["attackComplexity"].append("None")
                            descriptions_df["privilegesRequired"].append(
                                "None")
                            descriptions_df["userInteraction"].append("None")
                            descriptions_df["scope"].append("None")
                            descriptions_df["confidentialityImpact"].append(
                                "None")
                            descriptions_df["integrityImpact"].append("None")
                            descriptions_df["availabilityImpact"].append(
                                "None")
                        # except Exception as e:
                        #    print(f"exception:{e}")
                        #    continue

    return descriptions_df


directory = "API responses"  # Update with your actual directory path
cwe_data = extract_vulns_from_json_files(directory)

In [None]:
data_df = pd.DataFrame(cwe_data)
data_df

In [None]:
data_df["CWE-ID"].value_counts()

In [51]:
df1 = data_df[~(data_df["vectorString"] == "None")].copy()

In [52]:
df1["CWE-ID"].value_counts()

CWE-ID
CWE-79            6243
NVD-CWE-noinfo    6005
CWE-119           4375
CWE-200           3289
CWE-20            3272
                  ... 
CWE-620              1
CWE-130              1
CWE-115              1
CWE-524              1
CWE-1336             1
Name: count, Length: 252, dtype: int64

In [53]:
df1.drop_duplicates(inplace=True)
df1 = df1[df1["CWE-ID"] != "NVD-CWE-noinfo"]
df1 = df1[df1["CWE-ID"] != "NVD-CWE-Other"]
df1

Unnamed: 0,pubYear,Description,CWE-ID,vectorString,attackVector,attackComplexity,privilegesRequired,userInteraction,scope,confidentialityImpact,integrityImpact,availabilityImpact
4323,2010,"MIT Kerberos 5 (aka krb5) 1.3.x, 1.4.x, 1.5.x,...",CWE-310,CVSS:3.0/AV:N/AC:H/PR:N/UI:N/S:U/C:N/I:L/A:N,NETWORK,HIGH,NONE,NONE,UNCHANGED,NONE,LOW,NONE
4324,2010,MIT Kerberos 5 (aka krb5) 1.7.x and 1.8.x thro...,CWE-310,CVSS:3.0/AV:N/AC:H/PR:N/UI:N/S:U/C:N/I:L/A:N,NETWORK,HIGH,NONE,NONE,UNCHANGED,NONE,LOW,NONE
4328,2010,MIT Kerberos 5 (aka krb5) 1.8.x through 1.8.3 ...,CWE-310,CVSS:3.0/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L,NETWORK,LOW,LOW,NONE,UNCHANGED,LOW,LOW,LOW
6671,2011,contrib/pdfmark/pdfroff.sh in GNU troff (aka g...,CWE-254,CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:L/A:L,NETWORK,LOW,NONE,NONE,UNCHANGED,NONE,LOW,LOW
7722,2011,Memory leak in the NAT implementation in Cisco...,CWE-399,CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H,NETWORK,LOW,NONE,NONE,UNCHANGED,NONE,NONE,HIGH
...,...,...,...,...,...,...,...,...,...,...,...,...
192284,2023,A malicious user could use this issue to acces...,CWE-918,CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:N/A:N,NETWORK,LOW,NONE,NONE,CHANGED,HIGH,NONE,NONE
192285,2023,A malicious user could use this issue to get c...,CWE-29,CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H,NETWORK,LOW,NONE,NONE,UNCHANGED,HIGH,HIGH,HIGH
192286,2023,This vulnerability is capable of writing arbit...,CWE-434,CVSS:3.0/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H,NETWORK,LOW,LOW,NONE,UNCHANGED,HIGH,HIGH,HIGH
192287,2023,This vulnerability enables malicious users to ...,CWE-29,CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:L,NETWORK,LOW,NONE,NONE,CHANGED,HIGH,HIGH,LOW


In [55]:
df1["pubYear"].value_counts()

pubYear
2018    13027
2017    11317
2019     7951
2016     4700
2020     1797
2022     1740
2021     1371
2023     1052
2015      109
2014        7
2010        3
2011        3
2013        3
Name: count, dtype: int64

In [56]:
df1.to_csv("../43k_data_to_synthetic.csv", sep="\t", index=False)

In [3]:
def has_valid_cwe(weak_list):
    """
    Checks if the given data contains a valid CWE ID within the "weaknesses" list.

    Args:
        data (list): List of weaknesses.

    Returns:
        bool: True if a valid CWE ID is found, False otherwise.
    """

    cwe_pattern = re.compile(r"CWE-\d+")  # Regex to match CWE IDs

    if len(weak_list) != 0:
        for item in weak_list:
            for desc in item.get("description", []):
                if cwe_pattern.match(desc.get("value", "")):
                    return True

    return False

In [4]:
def extract_cwes_from_json_files(directory_path):
    """
    Iterates through JSON files in a directory, extracts those with CWE IDs,
    and returns a dictionary mapping file names to CWE data.

    Args:
        directory_path (str): The path to the directory containing JSON files.

    Returns:
        dict: A dictionary where keys are file names and values are lists of CWE data.
    """

    valid_cwe_descriptions = {
        "CWE-ID": list(),
        "Description": list()
    }
    years_list = list(range(2010, 2025))
    for year in years_list:
        for filename in os.listdir(os.path.join(directory_path, str(year))):
            if filename.endswith(".json"):
                filepath = os.path.join(directory_path, str(year), filename)
                print(filepath)
                with open(filepath, "r") as file:
                    try:
                        data = json.load(file)
                    except json.JSONDecodeError:
                        print(f"Skipping invalid JSON file: {filename}")
                        continue  # Skip to the next file if JSON is invalid
                    for entry in data["vulnerabilities"]:
                        try:
                            if len(entry["cve"]["weaknesses"]) != 0 and has_valid_cwe(entry["cve"]["weaknesses"]):
                                valid_cwe_descriptions["CWE-ID"].append(
                                    entry["cve"]["weaknesses"][0]["description"][0]["value"])
                                valid_cwe_descriptions["Description"].append(
                                    get_english_description(entry["cve"]["descriptions"]))
                        except Exception as e:
                            print(f"exception:{e}")
                            continue

    return valid_cwe_descriptions

In [5]:
directory = "API responses"  # Update with your actual directory path
cwe_data = extract_cwes_from_json_files(directory)

API responses\2010\01-01 to 01-08.json
API responses\2010\01-08 to 01-15.json
API responses\2010\01-15 to 01-22.json
API responses\2010\01-22 to 01-29.json
API responses\2010\01-29 to 02-05.json
API responses\2010\02-05 to 02-12.json
API responses\2010\02-12 to 02-19.json
API responses\2010\02-19 to 02-26.json
API responses\2010\02-26 to 03-05.json
exception:'weaknesses'
API responses\2010\03-05 to 03-12.json
API responses\2010\03-12 to 03-19.json
exception:'weaknesses'
API responses\2010\03-19 to 03-26.json
API responses\2010\03-26 to 04-02.json
API responses\2010\04-02 to 04-09.json
exception:'weaknesses'
API responses\2010\04-09 to 04-16.json
exception:'weaknesses'
API responses\2010\04-16 to 04-23.json
API responses\2010\04-23 to 04-30.json
exception:'weaknesses'
API responses\2010\04-30 to 05-07.json
exception:'weaknesses'
exception:'weaknesses'
API responses\2010\05-07 to 05-14.json
API responses\2010\05-14 to 05-21.json
exception:'weaknesses'
exception:'weaknesses'
exception:'we