Step 2: Silver Layer — create useful relational tables from the table of arrays that is the output of the bronze layer.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Announce the start of the Silver stage. The leading emoji in this literal had
# been stored as a mis-decoded byte sequence; it is restored to the intended
# open-book character (U+1F4D6).
print("📖 Starting Silver layer...")

# Name of the Bronze table this notebook reads from; `bronze_df` is reused by
# the parsing cell further down.
bronze_table = "bronze_db.cve_bronze_2024"
bronze_df = spark.table(bronze_table)

# count() triggers a Spark job over the table; printSchema() only prints the
# column names and types.
print("Bronze rows:", bronze_df.count())
bronze_df.printSchema()


üìñ Starting Silver layer...
Bronze rows: 38753
root
 |-- containers: string (nullable = true)
 |-- cveMetadata: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- dataType: string (nullable = true)
 |-- dataVersion: string (nullable = true)
 |-- _ingestion_timestamp: timestamp (nullable = true)
 |-- _ingestion_date: date (nullable = true)
 |-- _year: integer (nullable = true)
 |-- _record_id: long (nullable = true)



In [0]:
# Schemas used to parse the JSON text stored in the Bronze `containers` column.
# Every field is declared nullable, and only the fields listed here are
# extracted by from_json; anything else in the JSON is ignored.

# One entry of cna.descriptions.
description_schema = (
    StructType()
    .add("lang", StringType(), True)
    .add("value", StringType(), True)
)

# One entry of an affected product's `versions` array.
version_schema = (
    StructType()
    .add("version", StringType(), True)
    .add("status", StringType(), True)
)

# One entry of cna.affected: vendor/product plus its list of versions.
affected_schema = (
    StructType()
    .add("vendor", StringType(), True)
    .add("product", StringType(), True)
    .add("versions", ArrayType(version_schema), True)
)

# CVSS 3.x payload; shared by the 3.1 and 3.0 keys below.
# Note that baseScore is read as a string, not as a numeric type.
cvss_v3_schema = (
    StructType()
    .add("baseScore", StringType(), True)
    .add("baseSeverity", StringType(), True)
    .add("vectorString", StringType(), True)
)

# One entry of cna.metrics; only the CVSS 3.1 and 3.0 keys are parsed.
metrics_entry_schema = (
    StructType()
    .add("cvssV3_1", cvss_v3_schema, True)
    .add("cvssV3_0", cvss_v3_schema, True)
)

# Top-level `containers` object: only the `cna` container is parsed.
containers_schema = StructType().add(
    "cna",
    StructType()
    .add("descriptions", ArrayType(description_schema), True)
    .add("affected", ArrayType(affected_schema), True)
    .add("metrics", ArrayType(metrics_entry_schema), True),
    True,
)

print("Schema for parsing ready.")


Schema for parsing ready.


In [0]:
# Decode the JSON text held in `containers` into a typed struct
# (`containers_parsed`) so nested fields can be addressed by dot path in the
# cells below. All original Bronze columns are kept alongside it.
containers_struct = F.from_json(F.col("containers"), containers_schema)
bronze_parsed = bronze_df.withColumn("containers_parsed", containers_struct)

# NOTE(review): this message is printed before any Spark action has run, so it
# does not by itself confirm that every row parsed — confirm if that matters.
print("Parsed 'containers' successfully.")

# Preview three rows of the metadata map next to the parsed struct.
(
    bronze_parsed
    .select("cveMetadata", "containers_parsed")
    .show(3, truncate=False)
)


Parsed 'containers' successfully.
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cveMetadata                                                                                                                                                                                                                              

In [0]:
from pyspark.sql.functions import col, to_timestamp, coalesce, element_at

def _first_or_null(array_col):
    """Return the first element of ``array_col``, or NULL if it is NULL or empty.

    The size guard prevents ``element_at`` from being evaluated on an empty
    array, where it raises instead of returning NULL when Spark's ANSI mode
    is enabled.
    """
    return F.when(F.size(array_col) > 0, F.element_at(array_col, 1))


def _first_cvss(metrics_col, field_name):
    """Return the ``field_name`` struct of the first metrics entry that has one.

    Scans the whole ``metrics`` array rather than only its first entry, so a
    CVSS 3.x score is still found when an entry without it comes first.
    """
    entries_with_field = F.filter(
        metrics_col, lambda entry: entry.getField(field_name).isNotNull()
    )
    return _first_or_null(entries_with_field).getField(field_name)


def _build_core_df(parsed_df):
    """Build the one-row-per-CVE core Silver DataFrame from ``parsed_df``.

    ``parsed_df`` must carry the ``cveMetadata`` map, the ``containers_parsed``
    struct and the ``_ingestion_*`` / ``_year`` / ``_record_id`` columns.

    Output columns (names, order and types are unchanged from the previous
    version of this cell):
      * cve_id, date_reserved_ts, date_published_ts, date_updated_ts, state
      * description_en: text of the first description whose ``lang`` starts
        with "en" (case-insensitive); falls back to the first description of
        any language so rows that previously had a value keep one.
      * cvss_base_score / cvss_base_severity / cvss_vector: CVSS 3.1 values,
        falling back field by field to CVSS 3.0.
      * the Bronze lineage columns, passed through untouched.

    Requires PySpark 3.1+ (``F.filter`` with a Python lambda).
    """
    cve_meta = F.col("cveMetadata")
    descriptions = F.col("containers_parsed.cna.descriptions")
    metrics = F.col("containers_parsed.cna.metrics")

    # Prefer an English description; a NULL `lang` makes the predicate NULL,
    # which the array filter treats as "no match".
    english_descriptions = F.filter(
        descriptions,
        lambda entry: F.lower(entry.getField("lang")).startswith("en"),
    )
    description_primary = F.coalesce(
        _first_or_null(english_descriptions),
        _first_or_null(descriptions),
    ).getField("value")

    cvss_v31 = _first_cvss(metrics, "cvssV3_1")
    cvss_v30 = _first_cvss(metrics, "cvssV3_0")

    def _cvss_field(name):
        # 3.1 wins; 3.0 is only used when the 3.1 value is NULL.
        return F.coalesce(cvss_v31.getField(name), cvss_v30.getField(name))

    return parsed_df.select(
        cve_meta["cveId"].alias("cve_id"),

        # Timestamps are stored as strings inside the cveMetadata map.
        F.to_timestamp(cve_meta["dateReserved"]).alias("date_reserved_ts"),
        F.to_timestamp(cve_meta["datePublished"]).alias("date_published_ts"),
        F.to_timestamp(cve_meta["dateUpdated"]).alias("date_updated_ts"),
        cve_meta["state"].alias("state"),

        description_primary.alias("description_en"),

        # NOTE(review): baseScore stays a string because the parsing schema
        # declares it as StringType — confirm whether a numeric cast is wanted.
        _cvss_field("baseScore").alias("cvss_base_score"),
        _cvss_field("baseSeverity").alias("cvss_base_severity"),
        _cvss_field("vectorString").alias("cvss_vector"),

        F.col("_ingestion_timestamp"),
        F.col("_ingestion_date"),
        F.col("_year"),
        F.col("_record_id"),
    )


core_df = _build_core_df(bronze_parsed)


print("Core Silver rows:", core_df.count())
core_df.show(5, truncate=False)


Core Silver rows: 38753
+-------------+-----------------------+-----------------------+-----------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+--------------------------------------------+--------------------------+---------------+-----+----------+
|cve_id       |date_reserved_ts       |date_published_ts      |date_updated_ts        |state    |description_en                                                                                                                         

In [0]:
def _flatten_affected(parsed_df):
    """Flatten ``containers_parsed.cna.affected`` into one row per product version.

    Both explosions use ``explode_outer``, so a CVE with no ``affected`` entries,
    or a product with no ``versions``, still yields a row with NULLs in the
    missing columns instead of being dropped.

    Returns a DataFrame with columns:
    cve_id, vendor, product, version, version_status.
    """
    # Stage 1: one row per (CVE, affected product).
    per_product = parsed_df.select(
        F.col("cveMetadata").getItem("cveId").alias("cve_id"),
        F.explode_outer(F.col("containers_parsed.cna.affected")).alias("aff"),
    )

    # Stage 2: one row per (CVE, product, version entry).
    affected_entry = F.col("aff")
    per_version = per_product.select(
        "cve_id",
        affected_entry.getField("vendor").alias("vendor"),
        affected_entry.getField("product").alias("product"),
        F.explode_outer(affected_entry.getField("versions")).alias("ver"),
    )

    # Stage 3: unpack the version struct into flat columns.
    version_entry = F.col("ver")
    return per_version.select(
        "cve_id",
        "vendor",
        "product",
        version_entry.getField("version").alias("version"),
        version_entry.getField("status").alias("version_status"),
    )


affected_df = _flatten_affected(bronze_parsed)

print("Affected Silver rows:", affected_df.count())
affected_df.show(5, truncate=False)


Affected Silver rows: 179498
+-------------+---------+---------------------------------------------------------------------------------------------------------+-------+--------------+
|cve_id       |vendor   |product                                                                                                  |version|version_status|
+-------------+---------+---------------------------------------------------------------------------------------------------------+-------+--------------+
|CVE-2024-4856|Unknown  |FS Product Inquiry                                                                                       |0      |affected      |
|CVE-2024-4716|Campcodes|Complete Web-Based School Management System                                                              |1.0    |affected      |
|CVE-2024-4446|pt-guy   |Content Views – Post Grid & Filter, Recent Posts, Category Posts, & More (Gutenberg Blocks and Shortcode)|*      |affected      |
|CVE-2024-4758|Unknown  |Muslim Prayer 

In [0]:
def _overwrite_delta_table(df, table_name):
    """Write ``df`` to ``table_name`` as a Delta table, replacing existing rows.

    NOTE(review): with mode("overwrite"), the ``mergeSchema`` option merges new
    columns into the existing table schema rather than replacing that schema —
    confirm this is intended (as opposed to ``overwriteSchema``).
    """
    writer = (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
    )
    writer.saveAsTable(table_name)


# Make sure the target database exists before the first write.
spark.sql("CREATE DATABASE IF NOT EXISTS silver_db")

core_table_name = "silver_db.cve_core_2024"
affected_table_name = "silver_db.cve_affected_2024"

# Core table first, then the affected table — same order as before.
_overwrite_delta_table(core_df, core_table_name)
_overwrite_delta_table(affected_df, affected_table_name)

print("Silver tables written successfully.")


Silver tables written successfully.


In [0]:
# Sanity check by querying the core Silver table: the row count should match
# the "Core Silver rows" figure printed when core_df was built.
# The table name comes from `core_table_name` (set in the write cell) so this
# check always targets the table that was actually written.
spark.sql(f"SELECT COUNT(*) FROM {core_table_name}").show()



+--------+
|COUNT(*)|
+--------+
|   38753|
+--------+



In [0]:
# Sanity check by querying the affected Silver table: the row count should
# match the "Affected Silver rows" figure printed when affected_df was built.
# The table name comes from `affected_table_name` (set in the write cell).
spark.sql(f"SELECT COUNT(*) FROM {affected_table_name}").show()



+--------+
|COUNT(*)|
+--------+
|  179498|
+--------+



In [0]:
# Preview five rows of the persisted core table, untruncated, to eyeball the
# stored columns and their order.
core_preview = spark.sql("SELECT * FROM silver_db.cve_core_2024 LIMIT 5")
core_preview.show(truncate=False)


+--------------+-----------------------+-----------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+--------------------------------------------+--------------------------+---------------+-----+----------+-----------------------+
|cve_id        |date_published_ts      |date_updated_ts        |state    |description_en                                                                                                                                                                                                                                                                                            |cvss_base_score|cvss_base_severity|cvss_vector                                 |_ingestio

In [0]:
# Preview five rows of the persisted affected table, untruncated.
affected_preview = spark.sql("SELECT * FROM silver_db.cve_affected_2024 LIMIT 5")
affected_preview.show(truncate=False)

+--------------+--------------+-----------------------------+--------+--------------+
|cve_id        |vendor        |product                      |version |version_status|
+--------------+--------------+-----------------------------+--------+--------------+
|CVE-2024-33637|Solid Plugins |Solid Affiliate              |n/a     |affected      |
|CVE-2024-33695|ThemeNcode    |Fan Page Widget by ThemeNcode|n/a     |affected      |
|CVE-2024-33546|AA-Team       |WZone                        |n/a     |affected      |
|CVE-2024-33901|n/a           |n/a                          |n/a     |affected      |
|CVE-2024-33039|Qualcomm, Inc.|Snapdragon                   |QAM8255P|affected      |
+--------------+--------------+-----------------------------+--------+--------------+



In [0]:
%sql
-- Sanity check on the silver affected table: number of version rows per vendor,
-- largest first.
-- COUNT(version_status) skips rows whose version_status is NULL, so rows that
-- carry no version entry do not add to a vendor's total.
SELECT
    vendor,
    COUNT(version_status) AS count
FROM
    silver_db.cve_affected_2024
GROUP BY
    vendor
ORDER BY
    COUNT(version_status) DESC;


vendor,count
Linux,36048
Cisco,32445
"Qualcomm, Inc.",25316
Microsoft,13161
,7339
"Brother Industries, Ltd",4427
Autodesk,2966
Siemens,2545
Red Hat,2291
Apple,1692
