In [21]:
import json
import pandas as pd
from typing import Any, Dict, List
import datetime
import requests
import gzip


In [23]:
def process_nvd_json(source: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load NVD feed (local JSON or remote .json.gz URL) and return:
      - cve_df: main CVE info
      - cpe_df: expanded CPE matches
    """
    # fetch & decompress if URL
    if source.startswith(("http://", "https://")):
        resp = requests.get(source)
        raw = json.loads(gzip.decompress(resp.content))
    else:
        with open(source, "r", encoding="ISO-8859-1") as f:
            raw = json.load(f)

    cve_rows, cpe_rows = [], []
    for item in raw["CVE_Items"]:
        cve_id = item["cve"]["CVE_data_meta"]["ID"]
        # core fields
        cve = {
            "id": cve_id,
            "published_date": item.get("publishedDate"),
            "last_modified_date": item.get("lastModifiedDate"),
            "description": "",
            "cwe": "",
        }
        # English description
        for d in item["cve"]["description"]["description_data"]:
            if d.get("lang") == "en":
                cve["description"] = d.get("value", "")
                break
        # first CWE if present
        try:
            pt = item["cve"]["problemtype"]["problemtype_data"][0]["description"][0]
            if pt.get("value", "").startswith("CWE-"):
                cve["cwe"] = pt["value"]
        except Exception:
            pass
        # CVSS v3
        try:
            cv3 = item["impact"]["baseMetricV3"]["cvssV3"]
            cve["cvss3_base_score"]    = cv3.get("baseScore")
            cve["cvss3_vector"]        = cv3.get("vectorString", "")
            cve["cvss3_base_severity"] = cv3.get("baseSeverity", "")
        except Exception:
            cve.update({
                "cvss3_base_score": None,
                "cvss3_vector": "",
                "cvss3_base_severity": "",
            })

        # gather CPE matches
        for node in item.get("configurations", {}).get("nodes", []):
            for m in node.get("cpe_match", []):
                entry = {
                    "cve_id": cve_id,
                    "cpe23Uri": m.get("cpe23Uri", ""),
                    "vulnerable": m.get("vulnerable", False),
                    "versionStartIncluding": m.get("versionStartIncluding", ""),
                    "versionEndIncluding": m.get("versionEndIncluding", ""),
                }
                parts = entry["cpe23Uri"].split(":")
                if len(parts) > 4:
                    entry["vendor"]  = parts[3]
                    entry["product"] = parts[4]
                    entry["version"] = parts[5] if len(parts) > 5 else ""
                cpe_rows.append(entry)

        cve_rows.append(cve)

    cve_df = pd.DataFrame(cve_rows)
    cpe_df = pd.DataFrame(cpe_rows)

    # clean types
    cve_df["published_date"]       = pd.to_datetime(cve_df["published_date"], errors="coerce")
    cve_df["last_modified_date"]   = pd.to_datetime(cve_df["last_modified_date"], errors="coerce")
    cve_df["cvss3_base_score"]     = pd.to_numeric(cve_df["cvss3_base_score"], errors="coerce")

    # dedupe & sort
    cve_df.drop_duplicates("id", inplace=True)
    cpe_df.drop_duplicates(["cve_id", "cpe23Uri"], inplace=True)
    cve_df.sort_values("id", inplace=True)
    cpe_df.sort_values(["cve_id","cpe23Uri"], inplace=True)

    return cve_df, cpe_df


def process_kev_json(url: str) -> pd.DataFrame:
    """
    Download CISA KEV JSON feed and return DataFrame with cve_id + kev_date.
    """
    data = requests.get(url).json().get("vulnerabilities", [])
    df = pd.DataFrame(data)[["cveID","dateAdded"]].rename(
        columns={"cveID":"cve_id", "dateAdded":"kev_date"}
    )
    df["cve_id"]   = df["cve_id"].str.upper()
    df["kev_date"] = pd.to_datetime(df["kev_date"], errors="coerce")
    df.drop_duplicates("cve_id", inplace=True)
    return df


def process_cwe_xml(url: str) -> pd.DataFrame:
    """
    Download CWE XML zip, parse with ElementTree, return DataFrame of cwe_id, name, description.
    """
    z = requests.get(url).content
    with zipfile.ZipFile(BytesIO(z)) as zp:
        xml_name = next(n for n in zp.namelist() if n.endswith(".xml"))
        with zp.open(xml_name) as f:
            tree = ET.parse(f)
            root = tree.getroot()

    records = []
    # XML uses namespace; find all Weakness elements
    for w in root.findall(".//Weakness"):
        cwe_id = w.get("ID")
        name_el = w.find("Name")
        desc_el = w.find(".//Description_Text")
        records.append({
            "cwe_id": cwe_id or "",
            "name":    name_el.text if name_el is not None else "",
            "description": desc_el.text if desc_el is not None else ""
        })

    df = pd.DataFrame(records).drop_duplicates("cwe_id")
    return df[["cwe_id","name","description"]]


# Example usage:
if __name__ == "__main__":
    nvd_url = "https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-2024.json.gz"
    cve_df, cpe_df = process_nvd_json(nvd_url)

    kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
    kev_df = process_kev_json(kev_url)

    cwe_url = "https://cwe.mitre.org/data/xml/cwec_latest.xml.zip"
    cwe_df = process_cwe_xml(cwe_url)


NameError: name 'zipfile' is not defined