In [67]:
import pandas as pd
import numpy as np
import os, sys
import json
import logging
from localutils.errorhandler import error_handler

In [4]:
# Module-level logger: handed to the @error_handler decorator on
# load_nvd_cve_data and used inside it to report each feed file that is loaded.
logger = logging.getLogger(__name__)

In [None]:
# Placeholder written to every column of a record that has no usable CVSS
# metric; callers filter such rows out with `cve_id == "not_found"`.
_NVD_NOT_FOUND = "not_found"

# Column order of the DataFrame returned by load_nvd_cve_data.
_NVD_CVE_COLUMNS = [
    "cve_id", "description", "published_date", "last_modified_date",
    "cvss_version", "cwe_id", "cvss_vector", "attack_vector",
    "attack_complexity", "privileges_required", "user_interaction",
    "base_score", "base_severity", "exploitability_score",
    "confidentiality_impact", "integrity_impact", "availability_impact",
]


def _nvd_feed_year(file_name: str):
    """Return the year encoded in a feed file name, or None if there is none.

    The year is read from the third "-"-separated token, up to its first "."
    (for example "nvdcve-1.1-2023.json" yields 2023). Files that are not
    ".json" or do not follow that pattern yield None instead of raising.
    """
    if not file_name.endswith(".json"):
        return None
    tokens = file_name.split("-")
    if len(tokens) < 3:
        return None
    try:
        return int(tokens[2].split(".")[0])
    except ValueError:
        return None


def _nvd_cve_row(cve_item: dict) -> tuple:
    """Flatten one "CVE_Items" entry into a tuple ordered like _NVD_CVE_COLUMNS.

    Records without a "baseMetricV3" or "baseMetricV2" entry under "impact"
    produce a row made entirely of the "not_found" placeholder. When both
    metrics are present the v3 one is used, matching the v3-style column names.
    Individual metric fields that are absent come back as None.
    """
    impact = cve_item.get("impact") or {}
    metric_v3 = impact.get("baseMetricV3")
    metric_v2 = impact.get("baseMetricV2")
    if not metric_v3 and not metric_v2:
        return (_NVD_NOT_FOUND,) * len(_NVD_CVE_COLUMNS)

    # Record-level fields; every level is guarded so that a missing or empty
    # container degrades to "" instead of raising.
    cve = cve_item.get("cve") or {}
    cve_id = (cve.get("CVE_data_meta") or {}).get("ID", "")
    description_data = (cve.get("description") or {}).get("description_data") or [{}]
    description = description_data[0].get("value", "")
    problemtype_data = (cve.get("problemtype") or {}).get("problemtype_data") or [{}]
    cwe_entries = problemtype_data[0].get("description") or []
    cwe_id = cwe_entries[0].get("value", "") if cwe_entries else ""

    if metric_v3:
        metric = metric_v3
        cvss = metric.get("cvssV3") or {}
        severity = cvss.get("baseSeverity")
        attack_vector = cvss.get("attackVector")
        attack_complexity = cvss.get("attackComplexity")
        privileges = cvss.get("privilegesRequired")
        user_interaction = cvss.get("userInteraction")
    else:
        # The v2 layout keeps severity and user interaction on the metric
        # itself and uses "access*" / "authentication" key names.
        metric = metric_v2
        cvss = metric.get("cvssV2") or {}
        severity = metric.get("severity")
        attack_vector = cvss.get("accessVector")
        attack_complexity = cvss.get("accessComplexity")
        privileges = cvss.get("authentication")
        user_interaction = metric.get("userInteractionRequired")

    return (
        cve_id,
        description,
        cve_item.get("publishedDate", ""),
        cve_item.get("lastModifiedDate", ""),
        cvss.get("version"),
        cwe_id,
        cvss.get("vectorString"),
        attack_vector,
        attack_complexity,
        privileges,
        user_interaction,
        cvss.get("baseScore"),
        severity,
        metric.get("exploitabilityScore"),
        cvss.get("confidentialityImpact"),
        cvss.get("integrityImpact"),
        cvss.get("availabilityImpact"),
    )


@error_handler(logger)
def load_nvd_cve_data(start_year: int, end_year: int, directory: str) -> pd.DataFrame:
    """Load CVE records from the JSON feed files in ``directory``.

    Only ".json" files whose name carries a year between ``start_year`` and
    ``end_year`` (both inclusive) are read; other files are skipped. Files are
    processed in sorted name order so the row order is reproducible.

    Args:
        start_year: First year to include.
        end_year: Last year to include.
        directory: Folder containing the feed files.

    Returns:
        A DataFrame with one row per entry of each file's "CVE_Items" list and
        the columns listed in ``_NVD_CVE_COLUMNS``. Entries without a CVSS v3
        or v2 metric hold "not_found" in every column.

    Note:
        The function is wrapped by ``error_handler(logger)`` from
        ``localutils``; how exceptions are reported or what is returned on
        failure is decided by that decorator, not by this body.
    """
    rows = []
    for file_name in sorted(os.listdir(directory)):
        year = _nvd_feed_year(file_name)
        if year is None or not start_year <= year <= end_year:
            continue
        file_path = os.path.join(directory, file_name)
        # JSON text is UTF-8; do not depend on the platform default encoding.
        with open(file_path, "r", encoding="utf-8") as handle:
            feed = json.load(handle)
        logger.info(f"Loaded {file_path}")
        rows.extend(_nvd_cve_row(item) for item in feed.get("CVE_Items") or [])

    return pd.DataFrame(rows, columns=_NVD_CVE_COLUMNS)

In [74]:
# Load the 2023-2025 feeds, then discard the rows whose cve_id is still the
# "not_found" placeholder (records the loader could not populate).
df = load_nvd_cve_data(2023, 2025, "data/download/NVDCVE")
placeholder_rows = df.index[df["cve_id"] == "not_found"]
df = df.drop(index=placeholder_rows)

In [None]:
# Bare expression on the cell's last line: Jupyter renders `df` as the output.
df