In [1]:
!pip install requests



In [8]:
import csv
import os
import time
from datetime import datetime, timedelta
import requests

NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

###############################################################################
# 1. Build 120-day windows for a given year
###############################################################################

def build_pubdate_windows_for_year(year: int, window_days: int = 120):
    """
    NVD only allows max 120 days per request for pubStartDate/pubEndDate.
    So we slice the whole year into multiple windows.

    Returns a list of (start_iso, end_iso) strings like:
    '2025-01-01T00:00:00.000' , '2025-04-30T23:59:59.999'
    """
    # start of the year
    start_dt = datetime(year, 1, 1, 0, 0, 0)
    # first day AFTER the year ends
    stop_dt = datetime(year + 1, 1, 1, 0, 0, 0)

    windows = []
    cur_start = start_dt

    while cur_start < stop_dt:
        # cur_end is cur_start + window_days-1 days, but not past Dec 31 23:59:59
        cur_end = cur_start + timedelta(days=window_days-1)
        if cur_end >= stop_dt:
            # cap at end of the year, last moment
            cur_end = stop_dt - timedelta(milliseconds=1)

        # format like NVD examples:
        # 2021-08-04T00:00:00.000
        start_iso = cur_start.strftime("%Y-%m-%dT%H:%M:%S.000")
        end_iso   = cur_end.strftime("%Y-%m-%dT%H:%M:%S.999")

        windows.append((start_iso, end_iso))

        # next window starts the day after current window ends
        cur_start = cur_end + timedelta(milliseconds=1)

    return windows


###############################################################################
# 2. Helper small utilities (same idea as before)
###############################################################################

def pick_en(items, key="value"):
    """Pick English text from NVD's multilingual arrays."""
    if not items:
        return ""
    for it in items:
        if str(it.get("lang", "")).lower() == "en":
            return it.get(key, "") or ""
    return items[0].get(key, "") or ""

def short_title_from_description(desc: str, max_len=120):
    """Use first sentence of description as Title."""
    if not desc:
        return ""
    stop_chars = [".", "。", "؛", "؟", "!", "…"]
    cut_positions = [desc.find(ch) for ch in stop_chars if desc.find(ch) != -1]
    cut = min(cut_positions) if cut_positions else -1
    title = desc if cut == -1 else desc[:cut]
    title = title.strip()
    if len(title) > max_len:
        title = title[:max_len-1].rstrip() + "…"
    return title

def extract_cvss_v31(cve_obj):
    """Return (score, vector) from CVSS v3.1 if present, else v3.0, else ('','')."""
    metrics = cve_obj.get("metrics") or {}
    for key in ("cvssMetricV31", "cvssMetricV30"):
        arr = metrics.get(key)
        if isinstance(arr, list) and arr:
            cvss = arr[0].get("cvssData") or {}
            score = cvss.get("baseScore")
            vector = cvss.get("vectorString") or ""
            return (score if score is not None else "", vector)
    return ("", "")

def extract_affected_os(cve_obj):
    """
    Collect affected OS from CPEs where part == 'o' (operating system).
    Returns string like 'microsoft:windows_10:22h2; apple:macos:13'
    """
    out = set()

    # Look in configurations[].nodes[].cpeMatch[]
    for cfg in cve_obj.get("configurations") or []:
        for node in cfg.get("nodes") or []:
            for match in node.get("cpeMatch") or []:
                if not match.get("vulnerable"):
                    continue
                crit = match.get("criteria") or match.get("cpe23Uri") or ""
                parts = crit.split(":")
                # cpe:2.3:o:vendor:product:version:...
                if len(parts) >= 6 and parts[2] == "o":
                    vendor = parts[3] or "-"
                    product = parts[4] or "-"
                    version = parts[5]
                    if version in ("*", "-"):
                        version = ""
                    label = f"{vendor}:{product}" + (f":{version}" if version else "")
                    out.add(label)

    # Also check cpeNames[] (sometimes present)
    for cpen in cve_obj.get("cpeNames") or []:
        uri = cpen.get("cpeName") or ""
        parts = uri.split(":")
        if len(parts) >= 6 and parts[2] == "o":
            vendor = parts[3] or "-"
            product = parts[4] or "-"
            version = parts[5]
            if version in ("*", "-"):
                version = ""
            label = f"{vendor}:{product}" + (f":{version}" if version else "")
            out.add(label)

    return "; ".join(sorted(out)) if out else ""


###############################################################################
# 3. Fetch CVEs for ONE 120-day window (low-level)
###############################################################################

def fetch_cves_window(start_iso: str, end_iso: str, api_key: str = None, delay_no_key: float = 0.7):
    """
    Generator: returns CVE objects for a single (<=120 day) pubStartDate/pubEndDate window.
    Handles pagination with startIndex.
    """
    headers = {}
    if api_key:
        headers["apiKey"] = api_key

    start_index = 0
    page_size = 2000
    total = None

    while True:
        params = {
            "pubStartDate": start_iso,
            "pubEndDate": end_iso,
            "startIndex": start_index,
            "resultsPerPage": page_size,
        }

        r = requests.get(NVD_API_URL, params=params, headers=headers, timeout=60)
        # Helpful hint: for rate/validation/debug, NVD puts extra info in response headers['message']
        try:
            r.raise_for_status()
        except requests.HTTPError as e:
            # show the NVD diagnostic message if available
            msg = r.headers.get("message", "")
            raise RuntimeError(f"HTTP {r.status_code} for window {start_iso} -> {end_iso}. NVD says: {msg}") from e

        data = r.json()
        if total is None:
            total = data.get("totalResults", 0)

        vulns = data.get("vulnerabilities") or []
        for v in vulns:
            yield v.get("cve") or {}

        start_index += len(vulns)
        if start_index >= total or not vulns:
            break

        if not api_key:
            time.sleep(delay_no_key)


###############################################################################
# 4. Fetch CVEs for the FULL YEAR by looping windows
###############################################################################

def fetch_cves_year(year: int, api_key: str = None):
    """
    Generator: loops all 120-day windows in the year and yields CVE objects.
    Removes duplicates if any.
    """
    seen = set()
    for (start_iso, end_iso) in build_pubdate_windows_for_year(year):
        # Debug print so you can see progress in Jupyter
        print(f"Fetching window {start_iso} -> {end_iso}")
        for cve in fetch_cves_window(start_iso, end_iso, api_key=api_key):
            cve_id = cve.get("id")
            if cve_id and cve_id not in seen:
                seen.add(cve_id)
                yield cve


###############################################################################
# 5. Save whole year to CSV (same columns you requested)
###############################################################################

def save_year_to_csv(year: int, out_csv: str, api_key: str = None):
    """
    Pull full year of CVEs (in <=120d chunks) and save CSV.
    """
    fields = [
        "CVE ID",
        "Title",
        "Description",
        "CVSS Score (3.1)",
        "Vector String",
        "Affected OS"
    ]

    count = 0
    with open(out_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()

        for cve in fetch_cves_year(year, api_key=api_key):
            cve_id = cve.get("id", "")
            desc_full = pick_en(cve.get("descriptions") or [])
            title = short_title_from_description(desc_full)
            score, vector = extract_cvss_v31(cve)
            affected_os = extract_affected_os(cve)

            writer.writerow({
                "CVE ID": cve_id,
                "Title": title,
                "Description": desc_full,
                "CVSS Score (3.1)": score,
                "Vector String": vector,
                "Affected OS": affected_os
            })
            count += 1

    return count


In [10]:
os.environ["NVD_API_KEY"] = "1af1f1a0-83fb-4619-8ab3-0464a43506af"
print("API key set.")

API key set.


In [12]:
# Can change to 2024, 2023, etc.
year_to_download = 2025 
output_file = f"cve_{year_to_download}.csv"
api_key = os.environ.get("NVD_API_KEY")

row_count = save_year_to_csv(year_to_download, output_file, api_key=api_key)

print(f"Done. Saved {row_count} CVEs from {year_to_download} to {output_file}")

Fetching window 2025-01-01T00:00:00.000 -> 2025-04-30T00:00:00.999
Fetching window 2025-04-30T00:00:00.000 -> 2025-08-27T00:00:00.999
Fetching window 2025-08-27T00:00:00.000 -> 2025-12-24T00:00:00.999
Fetching window 2025-12-24T00:00:00.000 -> 2025-12-31T23:59:59.999
Done. Saved 46805 CVEs from 2025 to cve_2025.csv


In [14]:
import pandas as pd
df = pd.read_csv(output_file)
df.head()

Unnamed: 0,CVE ID,Title,Description,CVSS Score (3.1),Vector String,Affected OS
0,CVE-2024-21675,Rejected reason: To maintain compliance with C...,Rejected reason: To maintain compliance with C...,,,
1,CVE-2024-21679,Rejected reason: To maintain compliance with C...,Rejected reason: To maintain compliance with C...,,,
2,CVE-2024-21688,Rejected reason: To maintain compliance with C...,Rejected reason: To maintain compliance with C...,,,
3,CVE-2024-21691,Rejected reason: To maintain compliance with C...,Rejected reason: To maintain compliance with C...,,,
4,CVE-2024-21692,Rejected reason: To maintain compliance with C...,Rejected reason: To maintain compliance with C...,,,


In [18]:
#Data Cleanup

import pandas as pd

df = pd.read_csv("cve_2025.csv")

# fill NaN (missing) titles with empty strings so .str works
df["Title"] = df["Title"].fillna("")

# build mask for non-rejected rows
mask = ~df["Title"].str.lower().str.startswith("rejected")

# filter out rejected rows
df_clean = df[mask].copy()

print("Before:", len(df), "rows")
print("After :", len(df_clean), "rows")

df_clean.head()

Before: 46805 rows
After : 45117 rows


Unnamed: 0,CVE ID,Title,Description,CVSS Score (3.1),Vector String,Affected OS
77,CVE-2024-56020,Improper Neutralization of Input During Web Pa...,Improper Neutralization of Input During Web Pa...,6.5,CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:C/C:L/I:L/A:L,
78,CVE-2024-56021,Improper Neutralization of Input During Web Pa...,Improper Neutralization of Input During Web Pa...,6.5,CVSS:3.1/AV:N/AC:L/PR:L/UI:R/S:C/C:L/I:L/A:L,
79,CVE-2024-11846,The does not sanitise and escape a parameter ...,The does not sanitise and escape a parameter ...,6.1,CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:L/I:L/A:N,
80,CVE-2025-0168,A vulnerability classified as critical has bee...,A vulnerability classified as critical has bee...,6.3,CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:L/I:L/A:L,
81,CVE-2024-56829,Huang Yaoshi Pharmaceutical Management Softwar...,Huang Yaoshi Pharmaceutical Management Softwar...,10.0,CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H,


In [19]:
df_clean.to_csv("cve_2024_clean.csv", index=False)

In [44]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 41241 entries, 0 to 41240
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   CVE ID            41241 non-null  object 
 1   Title             41241 non-null  object 
 2   Description       41241 non-null  object 
 3   CVSS Score (3.1)  34760 non-null  float64
 4   Vector String     34760 non-null  object 
 5   Affected OS       6139 non-null   object 
dtypes: float64(1), object(5)
memory usage: 1.9+ MB


In [21]:
df.shape

(40704, 6)