In [0]:
import requests, io, zipfile, json
from pyspark.sql import functions as F

URL = "https://github.com/CVEProject/cvelistV5/archive/refs/heads/main.zip"
resp = requests.get(URL)
z = zipfile.ZipFile(io.BytesIO(resp.content))

import re
json_files = [f for f in z.namelist() if re.search(r"cves/2024/.+\.json$", f)]
records = []
for name in json_files[:5000]:  # smaller subset to stay within CE memory
    try:
        with z.open(name) as f:
            j = json.load(f)
            meta = j.get("cveMetadata", {})
            cna = j.get("containers", {}).get("cna", {}) if isinstance(j.get("containers"), dict) else {}
            records.append({
                "cveId": meta.get("cveId"),
                "state": meta.get("state"),
                "datePublished": meta.get("datePublished"),
                "dateUpdated": meta.get("dateUpdated"),
                "title": cna.get("title"),
                "metrics_json": json.dumps(cna.get("metrics", [])),
                "affected_json": json.dumps(cna.get("affected", [])),
                "descriptions_json": json.dumps(cna.get("descriptions", []))
            })
    except:
        pass

df_raw = spark.createDataFrame(records)
df_2024 = (df_raw
           .withColumn("date_published_ts", F.to_timestamp("datePublished"))
           .filter(F.year("date_published_ts") == 2024))

print("✅ df_2024 rebuilt:", df_2024.count(), "rows")


✅ df_2024 rebuilt: 3083 rows


In [0]:
from pyspark.sql import functions as F
import json

# UDF to extract CVSS v3.x fields safely
def extract_cvss_fields(metrics_json):
    try:
        metrics = json.loads(metrics_json)
        if isinstance(metrics, list) and metrics:
            m = metrics[0]
            v31 = m.get("cvssV3_1", {})
            v30 = m.get("cvssV3", {})
            base_score = v31.get("baseScore") or v30.get("baseScore")
            base_sev   = v31.get("baseSeverity") or v30.get("baseSeverity")
            return (base_score, base_sev)
    except Exception:
        pass
    return (None, None)

# Register UDF
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
spark.udf.register("extract_cvss_fields", extract_cvss_fields)

schema = StructType([
    StructField("score", DoubleType()),
    StructField("severity", StringType())
])

df_core = (
    df_2024
    .withColumn("cvss_struct", F.udf(extract_cvss_fields, schema)(F.col("metrics_json")))
    .withColumn("cvss_score", F.col("cvss_struct.score"))
    .withColumn("cvss_severity", F.col("cvss_struct.severity"))
    .withColumn("published_ts", F.to_timestamp("datePublished"))
    .withColumn("updated_ts",   F.to_timestamp("dateUpdated"))
    .select("cveId","title","state","published_ts","updated_ts","cvss_score","cvss_severity")
)

df_core.createOrReplaceTempView("cve_silver_core")

print("✅ Silver Core table ready:", df_core.count(), "rows")
df_core.show(5, truncate=False)


✅ Silver Core table ready: 3083 rows
+-------------+-----+---------+-----------------------+-----------------------+----------+-------------+
|cveId        |title|state    |published_ts           |updated_ts             |cvss_score|cvss_severity|
+-------------+-----+---------+-----------------------+-----------------------+----------+-------------+
|CVE-2024-0001|NULL |PUBLISHED|2024-09-23 17:25:00.509|2024-09-23 17:57:24.819|NULL      |CRITICAL     |
|CVE-2024-0002|NULL |PUBLISHED|2024-09-23 17:26:08.811|2024-09-23 18:04:46.783|NULL      |CRITICAL     |
|CVE-2024-0003|NULL |PUBLISHED|2024-09-23 17:27:30.114|2024-09-24 13:28:44.669|9.1       |CRITICAL     |
|CVE-2024-0004|NULL |PUBLISHED|2024-09-23 17:28:53.664|2024-09-24 13:37:36.931|9.1       |CRITICAL     |
|CVE-2024-0005|NULL |PUBLISHED|2024-09-23 17:34:11.321|2024-09-24 13:49:20.771|9.1       |CRITICAL     |
+-------------+-----+---------+-----------------------+-----------------------+----------+-------------+
only showing top 5

In [0]:
from pyspark.sql import types as T

# Schema for the affected array
affected_schema = T.ArrayType(
    T.StructType([
        T.StructField("vendor", T.StringType()),
        T.StructField("product", T.StringType()),
        T.StructField("versions", T.ArrayType(T.MapType(T.StringType(), T.StringType())))
    ])
)

df_aff = (
    df_2024
    .withColumn("affected_arr", F.from_json("affected_json", affected_schema))
    .withColumn("aff_exploded", F.explode_outer("affected_arr"))
    .select(
        F.col("cveId"),
        F.col("aff_exploded.vendor").alias("vendor"),
        F.col("aff_exploded.product").alias("product")
    )
)

df_aff.createOrReplaceTempView("cve_silver_affected")

print("✅ Silver Affected table ready:", df_aff.count(), "rows")
df_aff.show(10, truncate=False)


✅ Silver Affected table ready: 4989 rows
+-------------+------------------+-------------------+
|cveId        |vendor            |product            |
+-------------+------------------+-------------------+
|CVE-2024-0001|Pure Storage      |FlashArray         |
|CVE-2024-0002|PureStorage       |FlashArray         |
|CVE-2024-0003|PureStorage       |FlashArray         |
|CVE-2024-0004|PureStorage       |FlashArray         |
|CVE-2024-0005|PureStorage       |FlashArray         |
|CVE-2024-0005|PureStorage       |FlashBlade         |
|CVE-2024-0006|YugabyteDB        |YugabyteDB Anywhere|
|CVE-2024-0007|Palo Alto Networks|PAN-OS             |
|CVE-2024-0007|Palo Alto Networks|Prisma Access      |
|CVE-2024-0007|Palo Alto Networks|Cloud NGFW         |
+-------------+------------------+-------------------+
only showing top 10 rows


In [0]:
print("Unique CVEs in core:", df_core.select("cveId").distinct().count())
print("Unique CVEs in affected:", df_aff.select("cveId").distinct().count())

print("Top vendors by number of CVEs:")
df_aff.groupBy("vendor").count().orderBy(F.desc("count")).show(10, False)


Unique CVEs in core: 3083
Unique CVEs in affected: 3083
Top vendors by number of CVEs:
+-------------+-----+
|vendor       |count|
+-------------+-----+
|Red Hat      |1260 |
|code-projects|159  |
|Mozilla      |144  |
|n/a          |114  |
|Unknown      |97   |
|Google       |87   |
|PHPGurukul   |78   |
|D-Link       |73   |
|NVIDIA       |72   |
|NULL         |70   |
+-------------+-----+
only showing top 10 rows


In [0]:
df_core.createOrReplaceTempView("cve_core")
df_aff.createOrReplaceTempView("cve_affected")

# Example join query
spark.sql("""
SELECT a.vendor, COUNT(DISTINCT a.cveId) AS cve_count,
       ROUND(AVG(c.cvss_score),2) AS avg_score
FROM cve_affected a
JOIN cve_core c ON a.cveId = c.cveId
WHERE a.vendor IS NOT NULL
GROUP BY a.vendor
ORDER BY cve_count DESC
LIMIT 10
""").show(truncate=False)


+--------------+---------+---------+
|vendor        |cve_count|avg_score|
+--------------+---------+---------+
|code-projects |158      |5.74     |
|n/a           |114      |5.44     |
|Unknown       |92       |9.8      |
|Google        |87       |NULL     |
|PHPGurukul    |78       |5.23     |
|SourceCodester|70       |4.66     |
|IrfanView     |70       |NULL     |
|Red Hat       |58       |NULL     |
|1000 Projects |50       |NULL     |
|Mozilla       |50       |NULL     |
+--------------+---------+---------+

