<a href="https://colab.research.google.com/github/Kdavis2025/Projects/blob/main/CVE_GUI_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Business Case**: *Enhancing Vulnerability Intelligence through Automated CVE Ingestion and Analysis*


In today’s evolving cyber threat landscape, proactive vulnerability management is a cornerstone of modern cybersecurity strategy. The National Vulnerability Database (NVD) maintains an authoritative feed of Common Vulnerabilities and Exposures (CVEs), serving as a crucial resource for cybersecurity professionals and organizations aiming to stay ahead of potential exploits.

For a cybersecurity firm, the ability to automate the collection, analysis, and visualization of CVE data directly from the NVD API is a strategic advantage. This allows rapid identification of high-severity vulnerabilities, trend detection across years or industries, and the ability to prioritize remediation efforts effectively. Automating this process also reduces the overhead of manual data entry and ensures real-time relevance.

This project simulates a real-world scenario where our firm builds a vulnerability intelligence platform that consumes CVE data from the NVD, cleans and stores it in a reliable database, exposes searchable APIs for security analysts, and provides a clean UI for business visibility.

## **Data Science & Problem Statement:** *CVE Data Engineering and Risk Analysis Pipeline*
The primary objective of this project is to build a scalable data pipeline and analytics framework for CVE data retrieved from the NVD’s public API. This pipeline will support:

**1. Data Engineering:**

* Ingest CVE data via chunked paginated API calls.
* Implement batch processing for initial full load and incremental syncs using metadata such as lastModified.
* Use ETL best practices to clean, deduplicate, and normalize the dataset (e.g., handling nulls, resolving conflicts in version scores, converting nested JSON fields into relational schema).
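
The incremental sync mentioned above could be driven by the NVD API's `lastModStartDate` and `lastModEndDate` parameters. The following is a minimal sketch, assuming the API's documented 120-day cap on date ranges; the helper name `incremental_windows` and the timestamps are hypothetical and not part of this notebook.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the NVD API 2.0 caps a lastModStartDate/lastModEndDate range at 120 days.
MAX_WINDOW = timedelta(days=120)

def incremental_windows(last_sync, now, max_window=MAX_WINDOW):
    """Yield request-parameter dicts covering last_sync..now in windows of at most max_window."""
    start = last_sync
    while start < now:
        end = min(start + max_window, now)
        yield {
            "lastModStartDate": start.isoformat(timespec="seconds"),
            "lastModEndDate": end.isoformat(timespec="seconds"),
        }
        start = end

# Hypothetical timestamps for illustration only.
last_sync = datetime(2024, 1, 1, tzinfo=timezone.utc)
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
windows = list(incremental_windows(last_sync, now))
for window in windows:
    print(window)
```

Each dictionary would be merged into the `startIndex`/`resultsPerPage` parameters of a paginated request, so that only records modified since the previous run are fetched.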

**2. Data Storage:**

* Store CVE records in a normalized schema in a relational database (e.g., PostgreSQL) or document-based store (e.g., MongoDB) with appropriate indexing on fields such as CVE ID, year, and CVSS score.
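
As a rough illustration of the indexing idea for the MongoDB option, the sketch below keeps the index definitions as plain data and applies them through a collection's `create_index` method. The field names follow the documents written in Step 5, except `year`, which is a hypothetical derived field that this notebook does not currently store. The helper `ensure_indexes` and the index names are assumptions.

```python
# Index specifications as (keys, options) pairs.
# "year" is a hypothetical derived field; the other names match the Step 5 documents.
INDEX_SPECS = [
    ([("cveId", 1)], {"unique": True, "name": "cveId_unique"}),
    ([("year", 1)], {"name": "year"}),
    ([("cvssV3Score", -1)], {"name": "cvssV3Score_desc"}),
    ([("lastModifiedDate", -1)], {"name": "lastModified_desc"}),
]

def ensure_indexes(collection, specs=INDEX_SPECS):
    """Create each index on a PyMongo-style collection and return the index names."""
    return [collection.create_index(keys, **options) for keys, options in specs]
```

With the collection from Step 3 this would be called as `ensure_indexes(cve_collection)`. The unique index on `cveId` also backs the upsert-by-ID approach used during synchronization.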

**3. Data Analysis:**

* Enable filtering and querying CVE data by risk score (CVSS v2/v3), year, and recency.
* Analyze trends in vulnerability growth, average CVSS scores over time, and high-impact CVEs.
* Generate summary statistics (mean, mode, standard deviation) of CVSS scores to help risk teams prioritize patches.
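
The summary statistics above can be sketched with Python's standard `statistics` module. The function name `summarize_scores` and the sample scores are hypothetical; in practice the scores would come from the stored CVE documents.

```python
import statistics

def summarize_scores(scores):
    """Return count, mean, mode(s), and sample standard deviation of CVSS base scores."""
    clean = [s for s in scores if s is not None]  # skip CVEs without a score
    # statistics.mean raises StatisticsError on an empty list, so callers
    # should check that at least one score is present.
    return {
        "count": len(clean),
        "mean": statistics.mean(clean),
        "modes": statistics.multimode(clean),
        "stdev": statistics.stdev(clean) if len(clean) > 1 else 0.0,
    }

# Hypothetical scores for illustration; None stands for an unscored CVE.
sample = [7.5, 9.8, None, 5.0, 7.5]
summary = summarize_scores(sample)
print(summary)
```

`multimode` is used rather than `mode` so that ties are reported explicitly instead of silently returning the first most common value.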

**4. API Layer:**
Build RESTful APIs to expose CVE records with filters:

* /api/cves?id=CVE-2023-XXXXX

* /api/cves?year=2022

* /api/cves?score_gte=7.0

* /api/cves?modified_last_n_days=30
* Ensure APIs are secured with input validation, throttling, and JSON schema compliance.
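
Input validation for the filters listed above might look like the following sketch. It works on any mapping with a `.get` method, so the same function could receive Flask's `request.args`. The function name `validate_filters`, the error messages, and the accepted ranges are assumptions, not part of this notebook's implementation.

```python
import re

# CVE identifiers have the form CVE-YYYY-NNNN, with four or more digits in the last part.
CVE_ID_RE = re.compile(r"CVE-[0-9]{4}-[0-9]{4,}", re.IGNORECASE)

def validate_filters(args):
    """Validate raw query-string values and return (clean_filters, errors)."""
    clean, errors = {}, {}

    cve_id = args.get("id")
    if cve_id is not None:
        if CVE_ID_RE.fullmatch(cve_id):
            clean["id"] = cve_id.upper()
        else:
            errors["id"] = "expected the form CVE-YYYY-NNNN"

    year = args.get("year")
    if year is not None:
        if re.fullmatch(r"[0-9]{4}", year):
            clean["year"] = int(year)
        else:
            errors["year"] = "expected a four-digit year"

    score = args.get("score_gte")
    if score is not None:
        try:
            value = float(score)
        except ValueError:
            value = None
        # CVSS base scores range from 0.0 to 10.0; NaN fails this comparison.
        if value is not None and 0.0 <= value <= 10.0:
            clean["score_gte"] = value
        else:
            errors["score_gte"] = "expected a number from 0.0 to 10.0"

    days = args.get("modified_last_n_days")
    if days is not None:
        if re.fullmatch(r"[0-9]{1,4}", days) and int(days) > 0:
            clean["modified_last_n_days"] = int(days)
        else:
            errors["modified_last_n_days"] = "expected a positive whole number of days"

    return clean, errors
```

An endpoint could return HTTP 400 with the `errors` dictionary whenever it is non-empty, and build its database query only from `clean`.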

**5. Visualization:**

* Create a dashboard using HTML, CSS, and JavaScript (or optionally a JS framework like Vue/React) to display:

  * Top vulnerabilities by score.

  * Number of vulnerabilities per year.

  * Filtering by date or severity.
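
The data behind the first two dashboard panels could be prepared on the server side. The sketch below is one possible shape, assuming documents with the `cveId`, `cvssV2Score`, and `cvssV3Score` fields used later in this notebook; the function names and the sample documents (placeholder IDs with made-up scores, not real NVD data) are hypothetical.

```python
from collections import Counter

def best_score(doc):
    """Prefer the CVSS v3 score and fall back to v2; return None if neither is set."""
    v3 = doc.get("cvssV3Score")
    return v3 if v3 is not None else doc.get("cvssV2Score")

def dashboard_summary(docs, top_n=3):
    """Compute per-year counts and the top-N scored CVEs from stored documents."""
    # The year is the second component of an ID such as CVE-2023-0002.
    per_year = Counter(doc["cveId"].split("-")[1] for doc in docs)
    scored = [doc for doc in docs if best_score(doc) is not None]
    top = sorted(scored, key=best_score, reverse=True)[:top_n]
    return {
        "perYear": dict(sorted(per_year.items())),
        "top": [{"cveId": d["cveId"], "score": best_score(d)} for d in top],
    }

# Placeholder documents for illustration only.
docs = [
    {"cveId": "CVE-2022-0001", "cvssV3Score": 9.8},
    {"cveId": "CVE-2023-0002", "cvssV2Score": 5.0},
    {"cveId": "CVE-2023-0003"},
]
summary = dashboard_summary(docs)
print(summary)
```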

**6. Testing & Documentation:**

* Write unit tests for all core functionalities using PyTest or similar.

* Document the API with Swagger/OpenAPI format.

Step 1: Set Up the Environment

In [28]:
# Install required packages

!pip install flask pymongo requests apscheduler



Step 2: Initialize Flask Application

In [29]:
# Import the Flask framework and initialize the app.
# Lay the foundation for both API endpoints and UI pages

from flask import Flask, request, jsonify, render_template_string
app = Flask(__name__)

Step 3: Set Up the Database with PyMongo

In [30]:
# Set Up the Database with PyMongo: Establish a connection using PyMongo. You can use your local MongoDB server or a cloud-based solution.
# Create a Database and Collection: Define a database (e.g., nvd_cve_db) and a collection (e.g., cves) to store the CVE records.

import pymongo
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["nvd_cve_db"]
cve_collection = db["cves"]

Step 4: Write the Data Fetching Logic

In [31]:
# Define a Function to Fetch CVEs: Use the requests library to call the NVD API. The function should accept parameters like startIndex and resultsPerPage, then return the JSON response.
# Handle Errors: Check for a successful response before proceeding.

import requests

NVD_API_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def fetch_cves(start_index=0, results_per_page=100):
    params = {"startIndex": start_index, "resultsPerPage": results_per_page}
    # A timeout keeps the call from hanging indefinitely if the API is unresponsive.
    response = requests.get(NVD_API_BASE, params=params, timeout=30)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error fetching data: {response.status_code}")
        return None
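
The error handling above gives up on the first non-200 response. Public APIs may throttle or temporarily reject requests, so a retry with backoff can make a long sync more robust. The following is a minimal sketch under stated assumptions: the helper name `fetch_with_retry`, the set of retryable status codes, and the delay values are all hypothetical choices rather than documented NVD behavior.

```python
import time

# Assumption: these statuses are treated as "slow down and try again".
RETRYABLE = {403, 429, 503}

def fetch_with_retry(get, url, params, retries=3, base_delay=6.0, sleep=time.sleep):
    """Call get(url, params=..., timeout=...) and retry on throttling-style responses.

    `get` is any requests.get-compatible callable; it is injected so the
    function can be exercised without network access.
    """
    for attempt in range(retries + 1):
        response = get(url, params=params, timeout=30)
        if response.status_code == 200:
            return response.json()
        if response.status_code not in RETRYABLE or attempt == retries:
            return None
        sleep(base_delay * (2 ** attempt))  # exponential backoff between attempts
    return None
```

In this notebook it could be called as `fetch_with_retry(requests.get, NVD_API_BASE, {"startIndex": 0, "resultsPerPage": 100})`.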

Step 5: Process and Synchronize the Data

In [32]:
# Create a Synchronization Function (sync_cves): Loop through the API responses using the pagination parameters until all pages have been retrieved.
# Data Cleansing & Deduplication: Extract necessary fields such as CVE ID, description, publication dates, and CVSS scores.
# Convert date strings into Python datetime objects.
# Prepare each record for an upsert operation to avoid duplicate entries.
# Batch Upsert Operations: Use PyMongo’s bulk_write for efficient database operations.

from pymongo import UpdateOne
import datetime
import time

# Initialize the sync state
  # start_index: This variable is initialized to 0 and is used to keep track of the starting position for fetching data from the NVD API (pagination).
  # results_per_page: Set to 100, this variable determines how many CVE records to request per API call.
  # total_results: Initially set to infinity (float('inf')), this variable will store the total number of CVE records available from the API.
  # bulk_operations: This empty list will store database operations to be executed in batches for efficiency.

def sync_cves():
    start_index = 0
    results_per_page = 100
    total_results = float('inf')
    bulk_operations = []

    # Fetch and process data
    # The while loop iterates until all CVE records have been fetched.
    # fetch_cves (defined in Step 4) calls the NVD API to retrieve one page of CVE data.
    # Each record's CVE ID, description, dates, and scores are extracted into the doc dictionary.
    # UpdateOne prepares an upsert keyed on the CVE ID, so existing documents are updated and new ones are inserted.
    # These operations are collected in the bulk_operations list.

    while start_index < total_results:
        data = fetch_cves(start_index, results_per_page)
        if not data:
            break

        total_results = data.get("totalResults", 0)
        vulnerabilities = data.get("vulnerabilities", [])
        # Extract and normalize the fields of each CVE record on this page.

        for item in vulnerabilities:
            cve_item = item.get("cve", {})
            cve_id = cve_item.get("id")
            if not cve_id:
                continue

            description_list = cve_item.get("descriptions", [])
            description = description_list[0].get("value") if description_list else ""

            publishedDateStr = cve_item.get("published")
            lastModifiedStr = cve_item.get("lastModified")
            try:
                publishedDate = (datetime.datetime.fromisoformat(publishedDateStr.replace('Z', '+00:00'))
                                 if publishedDateStr else None)
                lastModifiedDate = (datetime.datetime.fromisoformat(lastModifiedStr.replace('Z', '+00:00'))
                                    if lastModifiedStr else None)
            except Exception:
                publishedDate = lastModifiedDate = None

            cvssV2Score, cvssV3Score = None, None
            # In the NVD 2.0 response, "metrics" is nested inside the "cve" object,
            # and CVSS v3 scores are keyed as "cvssMetricV31" or "cvssMetricV30".
            metrics = cve_item.get("metrics", {})
            if metrics.get("cvssMetricV2"):
                cvssV2Score = metrics["cvssMetricV2"][0]["cvssData"].get("baseScore")
            for v3_key in ("cvssMetricV31", "cvssMetricV30"):
                if metrics.get(v3_key):
                    cvssV3Score = metrics[v3_key][0]["cvssData"].get("baseScore")
                    break

            doc = {
                "cveId": cve_id,
                "description": description,
                "publishedDate": publishedDate,
                "lastModifiedDate": lastModifiedDate,
                "cvssV2Score": cvssV2Score,
                "cvssV3Score": cvssV3Score,
                "rawData": item
            }

            bulk_operations.append(
                UpdateOne({"cveId": cve_id}, {"$set": doc}, upsert=True)
            )
        # Database operations
        # If bulk_operations is non-empty, execute the batch with cve_collection.bulk_write,
        # which is more efficient than issuing one write per record.
        # The list is then cleared to prepare for the next page.
        if bulk_operations:
            cve_collection.bulk_write(bulk_operations)
            bulk_operations = []

        # Pagination and delay
        # start_index advances to the next page of results from the API.
        # time.sleep(1) pauses for 1 second to avoid overwhelming the NVD API with requests.
        start_index += results_per_page
        time.sleep(1)
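
The loop above stores `descriptions[0]`, whatever its language. Each description entry in the NVD response carries a `lang` field alongside `value`, so the first entry is not guaranteed to be English. A small helper along these lines could make the choice explicit; the name `pick_description` is hypothetical.

```python
def pick_description(descriptions, lang="en"):
    """Return the description text for `lang`, else the first available value, else ''."""
    for entry in descriptions or []:
        if entry.get("lang") == lang:
            return entry.get("value", "")
    # No entry in the requested language: fall back to the first one, if any.
    return descriptions[0].get("value", "") if descriptions else ""
```

Inside `sync_cves` this would replace the `description_list[0]` expression with `pick_description(cve_item.get("descriptions"))`.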

Step 6: Schedule Periodic Synchronization

In [33]:
# Integrate APScheduler: Schedule the sync_cves function to run periodically (for example, every hour) so that your database remains up to date.
# Start the Scheduler

from apscheduler.schedulers.background import BackgroundScheduler
import time
import signal
import sys

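# NOTE: this placeholder redefines sync_cves and shadows the real function from Step 5.
# Delete it (or skip this definition) to schedule the actual synchronization.
def sync_cves():
    print("Syncing CVEs...")  # Placeholder only; prints instead of syncing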

# Create the scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(func=sync_cves, trigger="interval", hours=1)
scheduler.start()

# Optional: Run sync immediately at startup
sync_cves()



Syncing CVEs...


Step 7: Build the API Endpoints with Flask

In [34]:
# get_cves: query stored CVEs with optional filters and limit/offset pagination
@app.route('/api/cves', methods=['GET'])
def get_cves():
    #Build the Query
    query = {}
    cveId = request.args.get("cveId")
    year = request.args.get("year")
    score = request.args.get("score", type=float)
    lastModifiedDays = request.args.get("lastModifiedDays", type=int)
    limit = request.args.get("limit", 10, type=int)
    offset = request.args.get("offset", 0, type=int)

    if cveId:
        query["cveId"] = cveId
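    if year:
        # Anchor the pattern so the year only matches the CVE-YYYY- prefix.
        # Note: this replaces the cveId filter when both parameters are supplied.
        query["cveId"] = {"$regex": f"^CVE-{year}-", "$options": "i"}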
    if score is not None:
        query["$or"] = [
            {"cvssV2Score": {"$gte": score}},
            {"cvssV3Score": {"$gte": score}}
        ]
    if lastModifiedDays is not None:
        date_threshold = datetime.datetime.utcnow() - datetime.timedelta(days=lastModifiedDays)
        query["lastModifiedDate"] = {"$gte": date_threshold}

    #Retrieve data from Database
    total_records = cve_collection.count_documents(query)
    cursor = cve_collection.find(query).sort("publishedDate", -1).skip(offset).limit(limit)
    cve_list = list(cursor)

    #Format the Response
    for doc in cve_list:
        doc["_id"] = str(doc["_id"])
        if doc.get("publishedDate"):
            doc["publishedDate"] = doc["publishedDate"].isoformat()
        if doc.get("lastModifiedDate"):
            doc["lastModifiedDate"] = doc["lastModifiedDate"].isoformat()
    return jsonify({"totalRecords": total_records, "data": cve_list})

Step 8: Develop the Frontend UI in JavaScript

In [35]:
list_template = """
<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>CVE List</title>
  <style>
    table { width: 100%; border-collapse: collapse; }
    th, td { border: 1px solid #ccc; padding: 8px; text-align: left; }
    tr:hover { background-color: #f2f2f2; cursor: pointer; }
  </style>
</head>
<body>
  <h1>CVE List</h1>
  <div>Total Records: <span id="total-records">0</span></div>
  <table id="cveTable">
    <thead>
      <tr>
        <th>CVE ID</th>
        <th>Description</th>
        <th>Published Date</th>
        <th>CVSS Score</th>
      </tr>
    </thead>
    <tbody></tbody>
  </table>
  <div>
    Results Per Page:
    <select id="resultsPerPage">
      <option value="10" selected>10</option>
      <option value="50">50</option>
      <option value="100">100</option>
    </select>
  </div>
  <script>
    const resultsPerPageSelect = document.getElementById('resultsPerPage');
    const totalRecordsSpan = document.getElementById('total-records');
    const tableBody = document.querySelector('#cveTable tbody');
    let currentPage = 0;

    async function fetchCVEs(limit=10, offset=0) {
      const res = await fetch(`/api/cves?limit=${limit}&offset=${offset}`);
      const result = await res.json();
      totalRecordsSpan.textContent = result.totalRecords;
      return result.data;
    }

    async function renderCVETable() {
      const limit = Number(resultsPerPageSelect.value);
      const data = await fetchCVEs(limit, currentPage * limit);
      tableBody.innerHTML = '';
      data.forEach(cve => {
        const row = document.createElement('tr');
        // Build cells with textContent so CVE text is never interpreted as HTML.
        const cells = [
          cve.cveId,
          `${(cve.description || '').substring(0, 100)}...`,
          new Date(cve.publishedDate).toLocaleDateString(),
          cve.cvssV2Score || cve.cvssV3Score || 'N/A'
        ];
        cells.forEach(text => {
          const cell = document.createElement('td');
          cell.textContent = text;
          row.appendChild(cell);
        });
        row.addEventListener('click', () => {
          window.location.href = `/cves/${cve.cveId}`;
        });
        tableBody.appendChild(row);
      });
    }

    resultsPerPageSelect.addEventListener('change', () => {
      currentPage = 0;
      renderCVETable();
    });
    renderCVETable();
  </script>
</body>
</html>
"""
# Flask route that serves the CVE list page
@app.route('/cves/list')
def cve_list_view():
    return render_template_string(list_template)
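
The list page links each row to `/cves/<cveId>`, and the tests in Step 9 request that path, but no detail route is defined in this notebook. The sketch below shows one way the detail page could be rendered. The function name `render_cve_detail`, the page layout, and the commented-out route are assumptions; the labels are chosen to match the strings the Step 9 detail test looks for.

```python
import html
import json

def render_cve_detail(doc):
    """Render one stored CVE document as a small HTML page, escaping every value."""
    score = doc.get("cvssV3Score")
    if score is None:
        score = doc.get("cvssV2Score")
    fields = [
        ("CVE ID:", doc.get("cveId", "")),
        ("Description:", doc.get("description", "")),
        ("Published Date:", doc.get("publishedDate") or "N/A"),
        ("CVSS Score:", "N/A" if score is None else score),
    ]
    rows = "\n".join(
        f"<p><strong>{label}</strong> {html.escape(str(value))}</p>"
        for label, value in fields
    )
    # default=str lets json.dumps handle datetime values in the stored document.
    raw = html.escape(json.dumps(doc.get("rawData", {}), indent=2, default=str))
    return (
        '<!DOCTYPE html>\n<html lang="en">\n<head><meta charset="UTF-8">'
        "<title>CVE Detail</title></head>\n<body>\n<h1>CVE Detail</h1>\n"
        f"{rows}\n<h2>Raw Data:</h2>\n<pre>{raw}</pre>\n</body>\n</html>"
    )

# Hypothetical wiring into this notebook's Flask app (not executed here):
# @app.route('/cves/<cve_id>')
# def cve_detail_view(cve_id):
#     doc = cve_collection.find_one({"cveId": cve_id}, {"_id": 0})
#     return (render_cve_detail(doc), 200) if doc else ("CVE not found", 404)
```

With a route like this, a request for an ID that is not in the database would return 404, so a test that expects 200 for `CVE-2023-12345` would also need that record to exist in the test database.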

Step 9: Write Unit Tests to validate CVE UI and Functionality

In [36]:
# Create a Test File (e.g., test_app.py): Use Python’s built-in unittest framework together with Flask-Testing to exercise the app.
# Test Your Endpoints: Utilize Flask’s test client to call your endpoints (for both the list and detail views) and verify that the responses contain the expected data.

!pip install flask_testing
import unittest
from flask_testing import TestCase

#Sets up the test Class
class TestApp(TestCase):
    def create_app(self):
        app.config['TESTING'] = True
        return app

#Writes Test Methods with simulated GET Request
    def test_cve_list_view(self):
        response = self.client.get('/cves/list')
        self.assertEqual(response.status_code, 200)  # 200 indicates a successful request
        self.assertIn(b'CVE List', response.data)
        self.assertIn(b'Total Records:', response.data)
        self.assertIn(b'Results Per Page:', response.data)
        self.assertIn(b'table', response.data)
        self.assertIn(b'thead', response.data)
        self.assertIn(b'tbody', response.data)
        self.assertIn(b'tr', response.data)
        self.assertIn(b'th', response.data)
        self.assertIn(b'td', response.data)

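    def test_cve_detail_view(self):
        # NOTE: no /cves/<cveId> route is defined in this notebook, so this test
        # fails with a 404 until a detail view is added.
        response = self.client.get('/cves/CVE-2023-12345')
        self.assertEqual(response.status_code, 200)  # 200 indicates a successful request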
        self.assertIn(b'CVE Detail', response.data)
        self.assertIn(b'CVE ID:', response.data)
        self.assertIn(b'Description:', response.data)
        self.assertIn(b'Published Date:', response.data)
        self.assertIn(b'CVSS Score:', response.data)
        self.assertIn(b'Raw Data:', response.data)



    def test_api_cves(self):
        response = self.client.get('/api/cves')
        self.assertEqual(response.status_code, 200)  # 200 indicates a successful request
        data = response.get_json()
        self.assertIn('totalRecords', data)
        self.assertIn('data', data)


    def test_api_cves_with_params(self):
        response = self.client.get('/api/cves?year=2023')
        self.assertEqual(response.status_code, 200)  # 200 indicates a successful request
        data = response.get_json()
        self.assertIn('totalRecords', data)
        self.assertIn('data', data)
        for item in data['data']:
            self.assertIn('cveId', item)
            self.assertIn('publishedDate', item)
            self.assertIn('lastModifiedDate', item)
            self.assertIn('cvssV2Score', item)
            self.assertIn('cvssV3Score', item)
            self.assertIn('rawData', item)
            self.assertEqual(item['cveId'][:8], 'CVE-2023')  # 'CVE-2023' is 8 characters long


    def test_api_cves_with_invalid_params(self):
        response = self.client.get('/api/cves?year=invalid')
        self.assertEqual(response.status_code, 200)  # 200 indicates a successful request
        data = response.get_json()
        self.assertIn('totalRecords', data)
        self.assertIn('data', data)
        self.assertEqual(data['totalRecords'], 0)
        self.assertEqual(len(data['data']), 0)

# Run the test suite when this cell is executed
if __name__ == '__main__':
    unittest.main()






E
ERROR: /root/ (unittest.loader._FailedTest./root/)
----------------------------------------------------------------------
AttributeError: module '__main__' has no attribute '/root/'

----------------------------------------------------------------------
Ran 1 test in 0.003s

FAILED (errors=1)


SystemExit: True

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


I encountered a problem while trying to instantiate my CVE GUI. I believe the issue lies in the 'app' library not being located within the designated folder. After spending a few days troubleshooting, I decided it was time to ask for help.
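
The traceback above is consistent with how `unittest.main()` behaves inside a notebook rather than with a problem locating `app`. Called without arguments, it reads `sys.argv`, which in a Jupyter or Colab kernel holds the kernel's own launch arguments, including a connection-file path. It then tries to load that path as a test name, which would explain the `'/root/'` attribute error, and it calls `sys.exit` afterwards, which would explain the `SystemExit`. The sketch below sidesteps both by loading and running a test class directly; `SmokeTest` and `run_in_notebook` are hypothetical stand-ins, and in this notebook the class passed in would be `TestApp`.

```python
import unittest

class SmokeTest(unittest.TestCase):
    """Stand-in test case; in the notebook this would be the TestApp class."""

    def test_truth(self):
        self.assertTrue(True)

def run_in_notebook(test_case):
    """Run one TestCase class without reading sys.argv or calling sys.exit."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(test_case)
    return unittest.TextTestRunner(verbosity=2).run(suite)

result = run_in_notebook(SmokeTest)
print("All tests passed:", result.wasSuccessful())
```

A commonly used alternative is `unittest.main(argv=['ignored'], exit=False)`, which keeps `unittest.main` but gives it a harmless argument list and stops it from exiting the kernel. Either way, the API tests would still need a reachable MongoDB instance to pass.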