In [None]:
"""
This script fetches all CVE records from the NVD API using pagination and stores them in a local `cve_data.json` file.  
It handles large data retrieval efficiently by batching requests with a maximum of 2000 records per API call.

"""

import requests
import time
import json
from tqdm import tqdm

# Endpoint of the NVD CVE API (version 2.0) that every page request is sent to.
BASE_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

# HTTP headers passed along with each request to BASE_URL.
HEADERS = {"Content-Type": "application/json", "User-Agent": "Mozilla/5.0"}

def fetch_cve_batch(start_index=0, results_per_page=2000, max_retries=3, retry_wait=30, timeout=60):
    """Fetch one page of CVE records from the NVD API.

    Args:
        start_index: Zero-based offset of the first record to return
            (sent as the ``startIndex`` query parameter).
        results_per_page: Number of records requested in this call
            (sent as the ``resultsPerPage`` query parameter).
        max_retries: How many additional attempts to make after a transient
            failure (HTTP 429/503 or a transport-level error).
        retry_wait: Seconds to wait before each retry. The default matches the
            30-second rolling window NVD documents for its rate limit.
        timeout: Seconds to wait for the server before giving up on a request,
            so a stalled connection cannot hang the whole download.

    Returns:
        The decoded JSON body as a dict on success, otherwise ``None``.
    """
    params = {
        "startIndex": start_index,
        "resultsPerPage": results_per_page
    }
    # Status codes worth retrying: rate limiting and temporary unavailability.
    retryable_statuses = (429, 503)

    for attempt in range(max_retries + 1):
        try:
            response = requests.get(BASE_URL, headers=HEADERS, params=params, timeout=timeout)
        except requests.RequestException as exc:
            # Timeouts and connection problems are reported and retried
            # instead of propagating out of the download loop.
            print(f"Error: request failed ({exc})")
            retryable = True
        else:
            if response.status_code == 200:
                try:
                    return response.json()
                except ValueError:
                    # A 200 response with a non-JSON body cannot be used.
                    print("Error: response body is not valid JSON")
                    return None
            print(f"Error: {response.status_code}")
            retryable = response.status_code in retryable_statuses

        if not retryable or attempt == max_retries:
            return None
        time.sleep(retry_wait)

    return None

def fetch_all_cves(request_delay=6.0):
    """Download every CVE record by paging through the NVD API.

    The first page is fetched to learn ``totalResults``; the remaining pages
    are then requested in steps of 2000 records.

    Args:
        request_delay: Seconds to pause before each follow-up request. NVD
            recommends sleeping six seconds between requests; shorter pauses
            run into HTTP 429 for clients without an API key.

    Returns:
        A list with the ``vulnerabilities`` entries of all pages fetched.
        The list is empty when the first request fails, and partial when a
        later page fails (a message is printed in that case).
    """
    all_cves = []
    results_per_page = 2000
    start_index = 0

    # The first response carries totalResults, which bounds the paging loop.
    print("Fetching initial page to get total number of results...")
    initial = fetch_cve_batch(start_index, results_per_page)
    if not initial:
        return []

    total = initial.get("totalResults", 0)
    print(f"Total CVEs: {total}")

    all_cves.extend(initial.get("vulnerabilities", []))
    start_index += results_per_page

    for i in tqdm(range(start_index, total, results_per_page), desc="Fetching pages"):
        # Sleep to avoid rate limiting
        time.sleep(request_delay)
        batch = fetch_cve_batch(i, results_per_page)
        if batch and "vulnerabilities" in batch:
            all_cves.extend(batch["vulnerabilities"])
        else:
            print(f"Failed to fetch batch at index {i}")
            # Make it explicit that what is returned is not the full data set.
            print(f"Stopping early with {len(all_cves)} of {total} CVEs; the result is incomplete.")
            break

    return all_cves

if __name__ == "__main__":
    cve_data = fetch_all_cves()
    print(f"Fetched {len(cve_data)} CVEs.")

    if cve_data:
        # Save to JSON
        with open("cve_data.json", "w", encoding="utf-8") as f:
            json.dump(cve_data, f, indent=2, ensure_ascii=False)

        print("Saved all CVEs to cve_data.json")
    else:
        # fetch_all_cves() returns an empty list when the first request fails;
        # writing it out would replace any earlier download with "[]".
        print("No CVEs were fetched; cve_data.json was left untouched.")


In [None]:
"""
This script preprocesses raw CVE data from `cve_data.json`, extracting key fields like ID, scores, and dates while handling missing values.  
It also removes duplicate entries based on CVE ID and saves the cleaned result to `nvd_cleaned.json`.

"""

import json
from datetime import datetime

def extract_description(descriptions):
    """Return the text of the first English description.

    Args:
        descriptions: Iterable of dicts, each read through its ``lang`` and
            ``value`` keys.

    Returns:
        The ``value`` of the first entry whose ``lang`` is ``"en"`` (``None``
        if that entry has no ``value``), or ``"No description available"``
        when no entry is English.
    """
    english_values = (
        entry.get("value") for entry in descriptions if entry.get("lang") == "en"
    )
    return next(english_values, "No description available")

def extract_score(metrics, version):
    """Return the CVSS base score for the requested version, or None.

    The score is read from ``metrics["cvssMetric<version>"][0]["cvssData"]["baseScore"]``.

    NVD API 2.0 does not publish a ``cvssMetricV3`` key; CVSS v3 scores live
    under ``cvssMetricV31`` and ``cvssMetricV30``. A request for ``"V3"``
    therefore falls back to those keys, preferring v3.1 over v3.0.

    Args:
        metrics: The ``metrics`` mapping of a CVE record (may be empty or None).
        version: Version suffix such as ``"V2"``, ``"V3"``, ``"V30"`` or ``"V31"``.

    Returns:
        The base score of the first metric entry found, or ``None`` when no
        matching entry exists or the structure is not as expected.
    """
    suffixes = [version]
    if version == "V3":
        suffixes += ["V31", "V30"]

    for suffix in suffixes:
        try:
            return metrics[f"cvssMetric{suffix}"][0]["cvssData"]["baseScore"]
        except (KeyError, IndexError, TypeError):
            # Key absent, empty list, or unexpected shape: try the next key.
            continue
    return None

def _published_year(published):
    """Return the year of an ISO-8601 timestamp string, or None if unparseable."""
    try:
        # A trailing "Z" is removed first because datetime.fromisoformat()
        # only accepts it from Python 3.11 onwards.
        return datetime.fromisoformat(published.replace("Z", "")).year
    except (AttributeError, TypeError, ValueError):
        # Non-string value (e.g. None) or a string that is not a valid timestamp.
        return None


def preprocess_cve_data(input_path="cve_data.json", output_path="nvd_cleaned.json"):
    """Flatten raw CVE records into a compact, de-duplicated JSON file.

    Args:
        input_path: Path of the JSON file holding a list of raw records, each
            read through its ``cve`` key.
        output_path: Path the cleaned list is written to.

    Each output entry contains: ``id``, ``published``, ``lastModified``,
    ``description``, ``baseScoreV2``, ``baseScoreV3``, ``year``,
    ``sourceIdentifier`` and ``vulnStatus``. Records without an ID are
    skipped. When an ID occurs more than once, the last occurrence wins.
    """
    with open(input_path, "r", encoding="utf-8") as f:
        raw_data = json.load(f)

    cleaned_data = []

    for item in raw_data:
        cve = item.get("cve", {})
        cve_id = cve.get("id")
        if not cve_id:
            continue  # Skip if ID is missing

        published = cve.get("published", "")
        metrics = cve.get("metrics", {})

        cleaned_data.append({
            "id": cve_id,
            "published": published,
            "lastModified": cve.get("lastModified"),
            "description": extract_description(cve.get("descriptions", [])),
            "baseScoreV2": extract_score(metrics, "V2"),
            "baseScoreV3": extract_score(metrics, "V3"),
            "year": _published_year(published),
            "sourceIdentifier": cve.get("sourceIdentifier", ""),
            "vulnStatus": cve.get("vulnStatus", "")
        })

    # De-duplication based on CVE ID: later entries overwrite earlier ones.
    deduplicated_data = list({entry["id"]: entry for entry in cleaned_data}.values())

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(deduplicated_data, f, indent=2, ensure_ascii=False)

    print(f"Cleaned {len(deduplicated_data)} records and saved to {output_path}")

# Run the preprocessing step with its default input and output paths.
if __name__ == "__main__":
    preprocess_cve_data()


In [2]:
"""
This modification uploads only CVEs whose IDs are greater than or equal to CVE-2006-5072 to Firestore, 
as a workaround for the HTTP 429 "Quota Exceeded" error. When the script attempts to fetch and upload a large number of CVEs, 
it may hit API rate limits, which causes the 429 error.

"""

from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
from firebase_config import init_firestore
from fetch_utils import fetch_all_cves
from preprocess_utils import clean_and_deduplicate
import time

# Handle returned by init_firestore() (presumably a Firestore client; confirm
# in firebase_config). upload_to_firestore() writes documents through it.
db = init_firestore()

# Default lower bound (inclusive) used by filter_after_id().
START_CVE_ID = "CVE-2006-5072"

def _cve_id_key(cve_id):
    """Return ``(year, sequence)`` as ints for a ``CVE-YYYY-NNNN`` ID, or None if malformed."""
    try:
        _prefix, year, sequence = cve_id.split("-")
        return int(year), int(sequence)
    except (AttributeError, ValueError):
        return None

def filter_after_id(cve_list, start_id=START_CVE_ID):
    """Keep the entries whose CVE ID is greater than or equal to ``start_id``.

    IDs are compared by their numeric year and sequence number rather than as
    plain strings, because sequence numbers vary in length: as strings,
    ``"CVE-2006-10000"`` sorts before ``"CVE-2006-5072"`` and would be dropped.

    Args:
        cve_list: List of dicts, each with an ``"id"`` key.
        start_id: Inclusive lower bound, e.g. ``"CVE-2006-5072"``.

    Returns:
        A new list with the matching entries in their original order. If either
        ID of a comparison is not of the form ``CVE-<year>-<number>``, that
        comparison falls back to plain string ordering.
    """
    start_key = _cve_id_key(start_id)
    kept = []
    for cve in cve_list:
        key = _cve_id_key(cve["id"])
        if start_key is not None and key is not None:
            keep = key >= start_key
        else:
            # Unparseable ID: keep the string comparison as a best effort.
            keep = cve["id"] >= start_id
        if keep:
            kept.append(cve)
    return kept

def upload_to_firestore(cve_list):
    """Store every entry of ``cve_list`` in the ``cves`` collection.

    Each entry is written with ``set`` to the document named after its
    ``"id"`` value, and a confirmation line is printed per entry.
    """
    for record in cve_list:
        cve_id = record["id"]
        document = db.collection("cves").document(cve_id)
        document.set(record)
        print(f"Uploaded: {cve_id}")

def full_sync():
    """Run one complete fetch-and-upload pass.

    The output of fetch_all_cves() is passed through clean_and_deduplicate()
    and filter_after_id(); whatever remains is handed to upload_to_firestore()
    and its count is printed.
    """
    print("[FULL SYNC] Starting full data fetch...")
    records = filter_after_id(clean_and_deduplicate(fetch_all_cves()))
    upload_to_firestore(records)
    uploaded_count = len(records)
    print(f"[FULL SYNC] {uploaded_count} records uploaded.\n")

def incremental_sync():
    """Fetch and upload the CVEs modified during the last 24 hours.

    Builds a ``lastModStartDate`` / ``lastModEndDate`` window ending now (UTC)
    and passes it to fetch_all_cves() as ``url_params``. The result goes
    through clean_and_deduplicate() and filter_after_id() before being handed
    to upload_to_firestore().
    """
    # Local import: this cell only imports datetime and timedelta at the top.
    from datetime import timezone

    print("[INCREMENTAL SYNC] Fetching updates from last 24 hours...")
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop tzinfo so the formatted strings stay exactly as before
    # (naive ISO-8601 text followed by "Z").
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    yesterday = now - timedelta(days=1)

    url_params = {
        "lastModStartDate": yesterday.isoformat() + "Z",
        "lastModEndDate": now.isoformat() + "Z"
    }

    raw_data = fetch_all_cves(url_params=url_params)
    cleaned = clean_and_deduplicate(raw_data)
    filtered = filter_after_id(cleaned)
    upload_to_firestore(filtered)
    print(f"[INCREMENTAL SYNC] {len(filtered)} updated records uploaded.\n")

if __name__ == "__main__":
    # Recurring jobs are switched off for now; set to True to schedule them.
    # While off, the script exits after the one-off full sync instead of
    # announcing a scheduler that was never started and idling forever.
    run_scheduler = False

    full_sync()

    if run_scheduler:
        scheduler = BackgroundScheduler()
        scheduler.add_job(full_sync, 'cron', hour=1)
        scheduler.add_job(incremental_sync, 'interval', hours=2)  # Syncs every 2 hours
        scheduler.start()

        print("CVE Scheduler started. Press Ctrl+C to stop.")
        try:
            # BackgroundScheduler runs its jobs in the background, so the main
            # thread has to stay alive for them to keep firing.
            while True:
                time.sleep(10)
        except KeyboardInterrupt:
            scheduler.shutdown()
            print("Scheduler stopped.")


[FULL SYNC] Starting full data fetch...


Fetching CVEs:   7%|▋         | 10/152 [00:34<08:03,  3.40s/it]

Error: 429





KeyboardInterrupt: 