# Setup our vulnerability data science lab environment

First we'll import all the libraries we need. A couple of them need installed first. JQ is a pythonic implementation of jq; a tool for querying json really fast. When looking at 25 years of vulnerabilities it is enormously useful.

In [1]:
#!pip install requests
#!pip install hurst
#!pip install jq

In [2]:
# Install the missing jq module
!pip install jq

# Install the missing sklearn module
!pip install scikit-learn

# Re-run the imports
import requests
import gzip as gz
import shutil
import pandas as pd
import json
import datetime
import tqdm
import os
import jq
import json
import itertools
import numpy as np
import datetime
from sklearn.metrics import mean_absolute_error as mae
from sklearn.metrics import mean_absolute_percentage_error as mape
import re
from pandas.plotting import autocorrelation_plot

Collecting scikit-learn
  Downloading scikit_learn-1.7.0-cp312-cp312-macosx_12_0_arm64.whl.metadata (31 kB)
Collecting scipy>=1.8.0 (from scikit-learn)
  Downloading scipy-1.15.3-cp312-cp312-macosx_14_0_arm64.whl.metadata (61 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Downloading joblib-1.5.1-py3-none-any.whl.metadata (5.6 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Downloading scikit_learn-1.7.0-cp312-cp312-macosx_12_0_arm64.whl (10.7 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.7/10.7 MB[0m [31m33.4 MB/s[0m eta [36m0:00:00[0m1m29.9 MB/s[0m eta [36m0:00:01[0m
[?25hDownloading joblib-1.5.1-py3-none-any.whl (307 kB)
Downloading scipy-1.15.3-cp312-cp312-macosx_14_0_arm64.whl (22.4 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m22.4/22.4 MB[0m [31m34.6 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0m
[?25

The folders where we will store the data as a gzip, and as a json need to be created.

In [3]:
file_exists = os.path.exists('CVE-NVD')
if not file_exists:
  os.mkdir('CVE-NVD')
  os.mkdir('CVE-NVD/GZIP')
  os.mkdir('CVE-NVD/JSON')

Let's also setup some other folders for MITRE's advance views of CVE data.

In [4]:
file_exists = os.path.exists('CVE-MITRE')
if not file_exists:
  os.mkdir('CVE-MITRE')
  os.mkdir('CVE-MITRE/CSV')

# convert datetime obj to string
str_current_datetime = str(current_datetime)
  
# create a file object along with extension
file_name = str_current_datetime+".txt"# Download the CVE data from NVD and MITRE

Now we'll download the NVD data for every year since 1999. Don't worry it's faster than you think.  
PROTIP: The progress bar comes for free from the tqdm package. Just wrap a for loop in tqdm.tqdm() it and you get a progress bar for free.
Now after this tutorial if you keep this notebook, you'll always be able to fetch all this CVE data easily. Handy for many more things than just forecasting.

In [5]:
import time
import requests
import os
import tqdm

# Placeholder for your API key
API_KEY = ""

# Base URL for the NVD API
BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

# Create directories if they don't exist
file_exists = os.path.exists('CVE-NVD')
if not file_exists:
    os.mkdir('CVE-NVD')
    os.mkdir('CVE-NVD/JSON')

# Rate limit: 50 requests per 30 seconds
RATE_LIMIT = 50
RATE_LIMIT_WINDOW = 30  # seconds

# Counter for requests
request_count = 0
start_time = time.time()

# Pagination parameters
start_index = 0
results_per_page = 2000  # Maximum allowed by the API

while True:
    params = {
        "startIndex": start_index,
        "resultsPerPage": results_per_page,
    }
    headers = {'apiKey': API_KEY}

    response = requests.get(BASE_URL, params=params, headers=headers)

    # Rate limiting logic
    request_count += 1
    if request_count >= RATE_LIMIT:
        elapsed_time = time.time() - start_time
        if elapsed_time < RATE_LIMIT_WINDOW:
            time.sleep(RATE_LIMIT_WINDOW - elapsed_time)
        request_count = 0
        start_time = time.time()

    if response.status_code == 200:
        data = response.json()
        total_results = data.get("totalResults", 0)

        # Save the current page of results
        with open(f'CVE-NVD/JSON/cve_data_{start_index}.json', 'w') as f:
            f.write(response.text)

        # Check if we have fetched all results
        if start_index + results_per_page >= total_results:
            break

        # Update the start index for the next page
        start_index += results_per_page
    else:
        print(f"Failed to fetch data: {response.status_code}")
        break

# Convert the data to panda dataframes and csv files

Here we start to use JQ to make queiries specific to CVE json structure. We pull out the CVE-ID, the published date, the assigner, and the CVSSv2 base score.

In [39]:
# Combined jq query to extract all relevant vulnerability data
vuln_query = jq.compile("""
  .vulnerabilities[] | {
    cve_id: .cve.id,
    published: .cve.published,
    sourceIdentifier: .cve.sourceIdentifier,
    description: [.cve.descriptions[].value],
    cvss_v2_score: (if .cve.metrics.cvssMetricV2 and (.cve.metrics.cvssMetricV2 | length > 0) 
                     then .cve.metrics.cvssMetricV2[0].cvssData.baseScore 
                     else null end),
    cvss_v2_exploitability_score: (if .cve.metrics.cvssMetricV2 and (.cve.metrics.cvssMetricV2 | length > 0) 
                                    then .cve.metrics.cvssMetricV2[0].exploitabilityScore 
                                    else null end),
    cvss_v2_vector: (if .cve.metrics.cvssMetricV2 and (.cve.metrics.cvssMetricV2 | length > 0) 
                     then .cve.metrics.cvssMetricV2[0].cvssData.vectorString 
                     else null end),
    cvss_v3_score: (if .cve.metrics.cvssMetricV31 and (.cve.metrics.cvssMetricV31 | length > 0) 
                     then .cve.metrics.cvssMetricV31[0].cvssData.baseScore
                     elif .cve.metrics.cvssMetricV30 and (.cve.metrics.cvssMetricV30 | length > 0) 
                     then .cve.metrics.cvssMetricV30[0].cvssData.baseScore 
                     else null end),
    cvss_v3_exploitability_score: (if .cve.metrics.cvssMetricV31 and (.cve.metrics.cvssMetricV31 | length > 0) 
                                    then .cve.metrics.cvssMetricV31[0].exploitabilityScore 
                                    elif .cve.metrics.cvssMetricV30 and (.cve.metrics.cvssMetricV30 | length > 0) 
                                    then .cve.metrics.cvssMetricV30[0].exploitabilityScore 
                                    else null end),
    cvss_v3_vector: (if .cve.metrics.cvssMetricV31 and (.cve.metrics.cvssMetricV31 | length > 0) 
                     then .cve.metrics.cvssMetricV31[0].cvssData.vectorString 
                     elif .cve.metrics.cvssMetricV30 and (.cve.metrics.cvssMetricV30 | length > 0) 
                     then .cve.metrics.cvssMetricV30[0].cvssData.vectorString 
                     else null end),
    cpe_criteria: [.cve.configurations[]?.nodes[].cpeMatch[]? | select(.vulnerable == true) | .criteria],
    cwe: [.cve.weaknesses[]?.description[].value]
  }
""")

# Function to process a single file and extract vulnerabilities
def process_file(file_path):
    with open(file_path, 'r') as f:
        data = json.load(f)  # Load the JSON data from the file
    
    # Apply the jq query to extract vulnerabilities
    vuln_data = vuln_query.input(data).all()  # List of dictionaries for each vulnerability
    
    return vuln_data

# Function to process multiple files in a directory with progress bar
def process_directory(directory_path):
    all_vulns = []  # List to hold vulnerabilities from all files
    json_files = [f for f in os.listdir(directory_path) if f.endswith('.json')]  # Filter JSON files
    
    # Use tqdm to create a progress bar for file processing
    for filename in tqdm.tqdm(json_files, desc="Processing Files", unit="file"):
        file_path = os.path.join(directory_path, filename)
        
        # Process each file
        vuln_data = process_file(file_path)
        all_vulns.extend(vuln_data)  # Append the extracted data from this file
    
    # Return a list of all vulnerabilities found
    return all_vulns

# Define the directory where your JSON files are stored
json_dir = 'CVE-NVD/JSON/'

# Process all JSON files in the directory
vulnerabilities = process_directory(json_dir)

# Convert the list of dictionaries to a pandas DataFrame
df = pd.DataFrame(vulnerabilities)

# Optional: Clean up list-based fields (like 'description', 'cpe_criteria', 'cwe')
df['description'] = df['description'].apply(lambda x: ', '.join(x) if isinstance(x, list) else '')
df['cpe_criteria'] = df['cpe_criteria'].apply(lambda x: ', '.join(x) if isinstance(x, list) else '')
df['cwe'] = df['cwe'].apply(lambda x: ', '.join(x) if isinstance(x, list) else '')

# Show the first few rows of the DataFrame
print(df.head())


Processing Files: 100%|███████████████████████████████████████████████████████████████████████████| 150/150 [01:00<00:00,  2.48file/s]


           cve_id                published          sourceIdentifier  \
0  CVE-2024-35296  2024-07-26T10:15:02.713       security@apache.org   
1  CVE-2024-41684  2024-07-26T12:15:02.763  vdisclose@cert-in.org.in   
2  CVE-2024-41685  2024-07-26T12:15:02.977  vdisclose@cert-in.org.in   
3  CVE-2024-41686  2024-07-26T12:15:03.113  vdisclose@cert-in.org.in   
4  CVE-2024-41687  2024-07-26T12:15:03.250  vdisclose@cert-in.org.in   

                                         description  cvss_v2_score  \
0  Invalid Accept-Encoding header can cause Apach...            NaN   
1  This vulnerability exists in SyroTech SY-GPON-...            NaN   
2  This vulnerability exists in SyroTech SY-GPON-...            NaN   
3  This vulnerability exists in SyroTech SY-GPON-...            NaN   
4  This vulnerability exists in SyroTech SY-GPON-...            NaN   

   cvss_v2_exploitability_score cvss_v2_vector  cvss_v3_score  \
0                           NaN           None            8.2   
1         

Save all the data we just filtered to a CSV file, for future use.

In [40]:
all_items.sort_index()
all_items.to_csv('NVD-Vulnerability-Volumes.csv')

# Now we want to clone this data frame and explode the cpe column so we can do vendor and product forecast

In [41]:
def process_cpe_dataframe(df):
    # Remove CVE-ID from lists and explode
    df['v2.3 CPE'] = df['v2.3 CPE'].str[1:]
    df = df.explode('v2.3 CPE')
    
    def extract_cpe_parts(cpe_str):
        if pd.isna(cpe_str):
            return pd.Series({
                'Part': None, 'Vendor': None, 'Product': None,
                'Version': None, 'Update': None, 'Edition': None,
                'Language': None, 'SW_Edition': None, 'Target_SW': None,
                'Target_HW': None, 'Other': None
            })
        
        cpe_str = cpe_str.strip('"')
        parts = cpe_str.split(':')
        
        # Ensure we have enough parts
        if len(parts) >= 13:
            return pd.Series({
                'Part': parts[2],
                'Vendor': parts[4],
                'Product': parts[5],
                'Version': parts[6],
                'Update': parts[7],
                'Edition': parts[8],
                'Language': parts[9],
                'SW_Edition': parts[10],
                'Target_SW': parts[11],
                'Target_HW': parts[12],
                'Other': parts[13] if len(parts) > 13 else None
            })
        return pd.Series({
            'Part': None, 'Vendor': None, 'Product': None,
            'Version': None, 'Update': None, 'Edition': None,
            'Language': None, 'SW_Edition': None, 'Target_SW': None,
            'Target_HW': None, 'Other': None
        })
    
    # Apply the extraction and create new columns
    df[['Part', 'Vendor', 'Product', 'Version', 'Update', 
        'Edition', 'Language', 'SW_Edition', 'Target_SW',
        'Target_HW', 'Other']] = df['v2.3 CPE'].apply(extract_cpe_parts)
    
    return df

In [42]:
cpe_df = process_cpe_dataframe(all_items)

In [43]:
cpe_df.to_csv('Vendor-Product-Vulnerability-Volumes.csv')

If you want to read that file in the future, without fetching all the data again, just uncoment the cell below.

In [44]:
#all_items = pd.read_csv('NVD-Vulnerability-Volumes.csv',index_col=['Publication'],parse_dates=['Publication'], low_memory=False)
#all_items = all_items.sort_index()

In [45]:
all_items.head()

Unnamed: 0_level_0,ID,ASSIGNER,DESCRIPTION,Count,v2 CVSS,v2 Vector,v2 Exploitability Score,v3 CVSS,v3 Vector,v3 Exploitability Score,CWE,v2.3 CPE
Publication,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2017-12-06 14:29:00.957,"""CVE-2017-13154""","""security@android.com""","""An elevation of privilege vulnerability in th...",1.0,"[7.2, 7.2, 7.8, 7.8, 7.8, 10, 4.6, 9.3, 4.6, 5...","[""AV:L/AC:L/Au:N/C:C/I:C/A:C"", ""AV:L/AC:L/Au:N...","[3.9, 3.9, 10, 10, 10, 10, 3.9, 8.6, 3.9, 10, ...","[7.8, 7.8, 7.5, 7.5, 7.5, 9.8, 7.8, 7.8, 7.8, ...","[""CVSS:3.0/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H...","[1.8, 1.8, 3.9, 3.9, 3.9, 3.9, 1.8, 1.8, 1.8, ...","[""CWE-416"", ""CWE-434"", ""CWE-200"", ""CWE-200"", ""...",[]
2017-12-06 14:29:01.003,"""CVE-2017-13156""","""security@android.com""","""Existe una vulnerabilidad de elevaci\u00f3n d...",1.0,,,,,,,,[]
2017-12-06 14:29:01.037,"""CVE-2017-13157""","""security@android.com""","""An elevation of privilege vulnerability in th...",1.0,,,,,,,,[]
2017-12-06 14:29:01.067,"""CVE-2017-13158""","""security@android.com""","""Existe una vulnerabilidad de elevaci\u00f3n d...",1.0,,,,,,,,[]
2017-12-06 14:29:01.113,"""CVE-2017-13159""","""security@android.com""","""An information disclosure vulnerability in th...",1.0,,,,,,,,[]


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=7acd54e3-f1e9-4bb5-a625-0a781a5b944c' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>