<a href="https://colab.research.google.com/github/Tilak202400/CVs/blob/main/CVs_scan1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
import sqlite3
import datetime

# Define database connection
conn = sqlite3.connect("vulnerabilities.db")
cursor = conn.cursor()

# Drop the existing table if it exists to ensure the correct schema
cursor.execute("DROP TABLE IF EXISTS vulnerabilities")

# Create table with the correct columns
cursor.execute("""
    CREATE TABLE vulnerabilities (
        id TEXT PRIMARY KEY,
        description TEXT,
        cvss_score REAL,
        severity TEXT,
        published_date TEXT  -- This column was missing
    )
""")
conn.commit()

# Function to classify vulnerability severity
def classify_severity(cvss_score):
    if cvss_score >= 9.0:
        return "Critical"
    elif cvss_score >= 7.0:
        return "High"
    elif cvss_score >= 4.0:
        return "Medium"
    else:
        return "Low"

# Get date range for the last month
end_date = datetime.date.today()
start_date = end_date - datetime.timedelta(days=30)  # Last 30 days

# Convert to API date format
start_date_str = start_date.strftime("%Y-%m-%dT00:00:00.000")
end_date_str = end_date.strftime("%Y-%m-%dT23:59:59.999")

# Function to fetch CVEs from NVD API
def fetch_cve_data(start_index=0):
    url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?pubStartDate={start_date_str}&pubEndDate={end_date_str}&startIndex={start_index}"
    print(f"Fetching CVEs from: {url}")

    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data.get("vulnerabilities", []), data.get("totalResults", 0)
    else:
        print(f"Error fetching data: {response.status_code}")
        return [], 0

# Function to process and store vulnerabilities
def process_and_store_vulnerabilities():
    start_index = 0
    while True:
        vulnerabilities, total_results = fetch_cve_data(start_index)

        if not vulnerabilities:
            break  # Stop if no more data

        for vuln in vulnerabilities:
            cve = vuln.get("cve", {})
            cve_id = cve.get("id", "Unknown ID")

            # Extract description (English preferred)
            descriptions = cve.get("descriptions", [{}])
            description = "No description available"
            for desc in descriptions:
                if desc.get("lang") == "en":
                    description = desc.get("value", description)

            # Extract CVSS score
            cvss_score = None
            metrics = cve.get("metrics", {})
            if "cvssMetricV31" in metrics:
                cvss_score = metrics["cvssMetricV31"][0]["cvssData"]["baseScore"]
            elif "cvssMetricV30" in metrics:
                cvss_score = metrics["cvssMetricV30"][0]["cvssData"]["baseScore"]
            elif "cvssMetricV2" in metrics:
                cvss_score = metrics["cvssMetricV2"][0]["cvssData"]["baseScore"]

            severity = classify_severity(cvss_score or 0.0)

            # Extract published date
            published_date = cve.get("published", "Unknown Date")

            # Store in database
            cursor.execute("INSERT OR IGNORE INTO vulnerabilities VALUES (?, ?, ?, ?, ?)",
                           (cve_id, description, cvss_score or 0.0, severity, published_date))
            conn.commit()

        start_index += len(vulnerabilities)  # Move to next batch
        if start_index >= total_results:
            break  # Stop when all CVEs are fetched

# Run the process
process_and_store_vulnerabilities()

# Retrieve all stored vulnerabilities from last month
cursor.execute("SELECT * FROM vulnerabilities WHERE published_date BETWEEN ? AND ?", (start_date_str, end_date_str))
rows = cursor.fetchall()

# Display results
if rows:
    print("\nStored Vulnerabilities (Last Month):")
    for row in rows:
        print(f"ID: {row[0]}, Description: {row[1]}, CVSS Score: {row[2]}, Severity: {row[3]}, Published Date: {row[4]}")
else:
    print("No CVEs found in the database for the last month.")

# Close connection
conn.close()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[  503.782940] Workqueue: kfd_process_wq kfd_process_wq_release [amdgpu]
[  503.789640] RIP: 0010:dma_fence_signal+0x74/0xa0
[  503.877620] Call Trace:
[  503.880066]  <TASK>
[  503.882168]  ? __warn+0xcd/0x260
[  503.885407]  ? dma_fence_signal+0x74/0xa0
[  503.889416]  ? report_bug+0x288/0x2d0
[  503.893089]  ? handle_bug+0x53/0xa0
[  503.896587]  ? exc_invalid_op+0x14/0x50
[  503.900424]  ? asm_exc_invalid_op+0x16/0x20
[  503.904616]  ? dma_fence_signal+0x74/0xa0
[  503.908626]  kfd_process_wq_release+0x6b/0x370 [amdgpu]
[  503.914081]  process_one_work+0x654/0x10a0
[  503.918186]  worker_thread+0x6c3/0xe70
[  503.921943]  ? srso_alias_return_thunk+0x5/0xfbef5
[  503.926735]  ? srso_alias_return_thunk+0x5/0xfbef5
[  503.931527]  ? __kthread_parkme+0x82/0x140
[  503.935631]  ? __pfx_worker_thread+0x10/0x10
[  503.939904]  kthread+0x2a8/0x380
[  503.943132]  ? __pfx_kthread+0x10/0x10
[  503.946882]  ret_from_fork+0x2d/0x

In [1]:
import requests
import sqlite3
import datetime

# --------------------------
# Set up the SQLite database
# --------------------------
conn = sqlite3.connect("vulnerabilities.db")
cursor = conn.cursor()

# Create the table with the appropriate columns (if it doesn't exist)
cursor.execute("""
    CREATE TABLE IF NOT EXISTS vulnerabilities (
        id TEXT PRIMARY KEY,
        description TEXT,
        cvss_score REAL,
        severity TEXT,
        published_date TEXT
    )
""")
conn.commit()

# ------------------------------------
# Helper function: Classify severity
# ------------------------------------
def classify_severity(cvss_score):
    if cvss_score is None:
        return "Unknown"
    elif cvss_score >= 9.0:
        return "Critical"
    elif cvss_score >= 7.0:
        return "High"
    elif cvss_score >= 4.0:
        return "Medium"
    else:
        return "Low"

# ------------------------------------------------
# Calculate the date range for the last 365 days
# ------------------------------------------------
today = datetime.date.today()
one_year_ago = today - datetime.timedelta(days=365)

# We'll split the 365 days into 4 equal periods (approximately 91 days each)
period_duration = 365 // 4  # integer division (roughly 91 days)
periods = []
start = one_year_ago
for i in range(4):
    # For each period, end_date = start_date + period_duration - 1 day
    end = start + datetime.timedelta(days=period_duration - 1)
    # Make sure the last period ends at today's date
    if i == 3 or end > today:
        end = today
    periods.append((start, end))
    start = end + datetime.timedelta(days=1)

# --------------------------------------------------------
# Function to fetch CVEs from the NVD API for a given period
# --------------------------------------------------------
def fetch_cve_data(pub_start_str, pub_end_str, start_index=0):
    url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?pubStartDate={pub_start_str}&pubEndDate={pub_end_str}&startIndex={start_index}"
    print(f"Fetching CVEs from: {url}")
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        vulnerabilities = data.get("vulnerabilities", [])
        total_results = data.get("totalResults", 0)
        return vulnerabilities, total_results
    else:
        print(f"Error fetching data: {response.status_code}, {response.text}")
        return [], 0

# -----------------------------------------------------------
# Process and store vulnerabilities for a given time period
# -----------------------------------------------------------
def process_and_store_period(period_start, period_end):
    # Convert period dates to API's expected format:
    # Format: YYYY-MM-DDTHH:MM:SS.mmm (we use 00:00:00.000 and 23:59:59.999)
    pub_start_str = period_start.strftime("%Y-%m-%dT00:00:00.000")
    pub_end_str = period_end.strftime("%Y-%m-%dT23:59:59.999")

    start_index = 0
    while True:
        vulnerabilities, total_results = fetch_cve_data(pub_start_str, pub_end_str, start_index)
        if not vulnerabilities:
            break

        for vuln in vulnerabilities:
            try:
                cve = vuln.get("cve", {})
                cve_id = cve.get("id", "Unknown ID")

                # Extract English description if available
                descriptions = cve.get("descriptions", [{}])
                description = "No description available"
                for desc in descriptions:
                    if desc.get("lang") == "en":
                        description = desc.get("value", description)

                # Extract CVSS score from v3.1, v3.0 or v2
                cvss_score = None
                metrics = cve.get("metrics", {})
                if "cvssMetricV31" in metrics:
                    cvss_score = metrics["cvssMetricV31"][0]["cvssData"]["baseScore"]
                elif "cvssMetricV30" in metrics:
                    cvss_score = metrics["cvssMetricV30"][0]["cvssData"]["baseScore"]
                elif "cvssMetricV2" in metrics:
                    cvss_score = metrics["cvssMetricV2"][0]["cvssData"]["baseScore"]

                severity = classify_severity(cvss_score)

                # Extract published date; if not available, use a default value.
                published_date = cve.get("published", "Unknown Date")

                # Insert into database (using INSERT OR IGNORE to avoid duplicates)
                cursor.execute(
                    "INSERT OR IGNORE INTO vulnerabilities VALUES (?, ?, ?, ?, ?)",
                    (cve_id, description, cvss_score or 0.0, severity, published_date)
                )
                conn.commit()
            except Exception as e:
                print(f"Error processing CVE {cve_id}: {e}")

        start_index += len(vulnerabilities)
        if start_index >= total_results:
            break

# ----------------------------------------
# Process all periods and store the CVEs
# ----------------------------------------
for period in periods:
    print(f"\nProcessing period from {period[0]} to {period[1]}:")
    process_and_store_period(period[0], period[1])

# ----------------------------------------
# Retrieve and print all stored vulnerabilities
# ----------------------------------------
cursor.execute("SELECT * FROM vulnerabilities")
rows = cursor.fetchall()

if rows:
    print("\nStored Vulnerabilities (Last Year):")
    for row in rows:
        print(f"ID: {row[0]}, Description: {row[1]}, CVSS Score: {row[2]}, Severity: {row[3]}, Published Date: {row[4]}")
else:
    print("No CVEs found in the database.")

conn.close()


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
ID: CVE-2023-37017, Description: Open5GS MME versions <= 2.6.4 contain an assertion that can be remotely triggered via a malformed ASN.1 packet over the S1AP interface. An attacker may send an `S1Setup Request` message missing a required `Global eNB ID` field to repeatedly crash the MME, resulting in denial of service., CVSS Score: 8.6, Severity: High, Published Date: 2025-01-22T15:15:11.310
ID: CVE-2023-37018, Description: Open5GS MME versions <= 2.6.4 contains an assertion that can be remotely triggered via a malformed ASN.1 packet over the S1AP interface. An attacker may send a `UE Capability Info Indication` message missing a required `MME_UE_S1AP_ID` field to repeatedly crash the MME, resulting in denial of service., CVSS Score: 8.6, Severity: High, Published Date: 2025-01-22T15:15:11.410
ID: CVE-2023-37019, Description: Open5GS MME versions <= 2.6.4 contains an assertion that can be remotely triggered via a malforme