In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Catalog and schema shared with the Bronze layer; every table identifier
# below is built from this pair.
CATALOG = "workspace"
SCHEMA = "default"

_NAMESPACE = f"{CATALOG}.{SCHEMA}"

# Fully qualified table identifiers.
BRONZE_TABLE   = f"{_NAMESPACE}.cve_bronze_records"   # produced by the Bronze layer
CORE_CVE_TABLE = f"{_NAMESPACE}.cve_core"
# NOTE(review): the affected-products section of this notebook writes to
# "workspace.default.cve_affected", not to this name — confirm which is intended.
AFFECTED_TABLE = f"{_NAMESPACE}.cve_affected_products"

# Make the catalog and schema the session defaults.
for _statement in (f"USE CATALOG {CATALOG}", f"USE SCHEMA {SCHEMA}"):
    spark.sql(_statement)

# Override the number of shuffle partitions for this session.
spark.conf.set("spark.sql.shuffle.partitions", "8")


In [0]:
# ---------------------------------------------
# 0. Register Bronze table from existing Delta
# ---------------------------------------------
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Location of the existing Bronze Delta files.
bronze_delta_path = "/Volumes/workspace/default/cve_demo/bronze/2024_Nov13/"
# Reuse the identifier from the config cell instead of hard-coding it a second
# time, so the catalog/schema only has to be changed in one place.
bronze_table_name = BRONZE_TABLE

# Read the Delta files as-is; no transformation happens at this stage.
bronze_df = spark.read.format("delta").load(bronze_delta_path)

print("✅ Loaded Bronze Delta from:", bronze_delta_path)
# count() is an action: it scans the source once before the write below.
print("Row count in Bronze Delta:", bronze_df.count())
bronze_df.printSchema()

# Materialise the data as a managed table; "overwrite" replaces any previous
# contents so the cell can be re-run.
(
    bronze_df.write
             .format("delta")
             .mode("overwrite")
             .saveAsTable(bronze_table_name)
)

print("✅ Registered Bronze table as:", bronze_table_name)




✅ Loaded Bronze Delta from: /Volumes/workspace/default/cve_demo/bronze/2024_Nov13/
Row count in Bronze Delta: 38753
root
 |-- dataType: string (nullable = true)
 |-- dataVersion: string (nullable = true)
 |-- cveMetadata: string (nullable = true)
 |-- containers: string (nullable = true)
 |-- _ingestion_timestamp: timestamp (nullable = true)
 |-- _ingestion_date: date (nullable = true)
 |-- _year: integer (nullable = true)
 |-- _record_id: long (nullable = true)

✅ Registered Bronze table as: workspace.default.cve_bronze_records


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

core_table_name = "workspace.default.cve_core"

bronze_tbl = spark.table(bronze_table_name)


# --- 1.1 Define schema for JSON in 'containers' ---
def _string_field(name):
    """Return a nullable string field called `name`."""
    return T.StructField(name, T.StringType(), True)


def _cvss_field(name):
    """Return a nullable struct field `name` holding baseScore, baseSeverity and vectorString."""
    return T.StructField(
        name,
        T.StructType(
            [
                T.StructField("baseScore", T.DoubleType(), True),
                _string_field("baseSeverity"),
                _string_field("vectorString"),
            ]
        ),
        True,
    )


# One element of cna.descriptions: a language tag plus the text itself.
_description_entry = T.StructType([_string_field("lang"), _string_field("value")])

# One element of cna.affected[].versions: only the version string is parsed.
_version_entry = T.StructType([_string_field("version")])

# One element of cna.affected.
_affected_entry = T.StructType(
    [
        _string_field("vendor"),
        _string_field("product"),
        T.StructField("versions", T.ArrayType(_version_entry), True),
    ]
)

# One element of cna.metrics: each entry may carry any of the three CVSS blocks.
_metric_entry = T.StructType(
    [
        _cvss_field("cvssV3_1"),
        _cvss_field("cvssV3_0"),
        _cvss_field("cvssV2"),
    ]
)

schema_cna = T.StructType(
    [
        _string_field("title"),
        T.StructField("descriptions", T.ArrayType(_description_entry), True),
        T.StructField("affected", T.ArrayType(_affected_entry), True),
        T.StructField("metrics", T.ArrayType(_metric_entry), True),
    ]
)

# Only the "cna" container is parsed out of the raw JSON string.
schema_containers = T.StructType([T.StructField("cna", schema_cna, True)])

In [0]:
# --- 1.2 Parse JSON and get base fields per CVE ---
# cveMetadata is read with JSON-path lookups; containers is parsed into a
# typed struct using schema_containers.
parsed_core = bronze_tbl.select(
    F.get_json_object("cveMetadata", "$.cveId").alias("cve_id"),
    F.to_timestamp(
        F.get_json_object("cveMetadata", "$.datePublished")
    ).alias("published_date"),
    F.to_timestamp(
        F.get_json_object("cveMetadata", "$.dateUpdated")
    ).alias("last_modified_date"),
    F.from_json("containers", schema_containers).alias("containers_struct"),
)

# A record can carry descriptions in several languages. Prefer an entry whose
# language tag starts with "en" and fall back to the first entry, instead of
# taking whichever language happens to be listed first.
descriptions_col = F.col("containers_struct.cna.descriptions")
english_descriptions = F.filter(
    descriptions_col,
    lambda entry: F.lower(entry["lang"]).startswith("en"),
)

base_core = parsed_core.select(
    "cve_id",
    "published_date",
    "last_modified_date",
    F.coalesce(
        # Guard the index so an empty filtered array is never dereferenced.
        F.when(F.size(english_descriptions) > 0, english_descriptions[0]["value"]),
        descriptions_col[0]["value"],
    ).alias("description"),
    F.col("containers_struct.cna.metrics").alias("metrics_array"),
)

# --- 1.3 Flatten metrics and compute a single cvss_score/vector per CVE ---
# explode_outer keeps CVEs without any metrics (metric_row is NULL for them).
metrics_exploded = base_core.select(
    "cve_id",
    F.explode_outer("metrics_array").alias("metric_row"),
)

# Per metric entry, take the newest CVSS version present: v3.1, then v3.0, then v2.
metrics_flat = metrics_exploded.select(
    "cve_id",
    F.coalesce(
        F.col("metric_row.cvssV3_1.baseScore"),
        F.col("metric_row.cvssV3_0.baseScore"),
        F.col("metric_row.cvssV2.baseScore"),
    ).alias("cvss_score"),
    F.coalesce(
        F.col("metric_row.cvssV3_1.vectorString"),
        F.col("metric_row.cvssV3_0.vectorString"),
        F.col("metric_row.cvssV2.vectorString"),
    ).alias("cvss_vector"),
)

# Pick score and vector from the SAME metric entry. Taking max(score) and
# first(vector) independently can pair a score with the vector of a different
# entry, and first() is order-dependent after a shuffle. Structs compare field
# by field with NULL sorting lowest, so max() over (score, vector) returns the
# highest-scoring entry together with its own vector.
metrics_agg = (
    metrics_flat
    .groupBy("cve_id")
    .agg(F.max(F.struct("cvss_score", "cvss_vector")).alias("best_metric"))
    .select(
        "cve_id",
        F.col("best_metric.cvss_score").alias("cvss_score"),
        F.col("best_metric.cvss_vector").alias("cvss_vector"),
    )
)

# One row per CVE: base fields plus the aggregated CVSS columns.
core_silver_df = (
    base_core
    .drop("metrics_array")
    .join(metrics_agg, on="cve_id", how="left")
    .dropDuplicates(["cve_id"])
)

core_silver_df.write.format("delta").mode("overwrite").saveAsTable(core_table_name)

print("✅ Silver core table saved as:", core_table_name)
display(spark.table(core_table_name).limit(10))


# ---------------------------------------------
# 2. Silver affected table (cve_affected)
# ---------------------------------------------
affected_table_name = "workspace.default.cve_affected"

bronze_tbl = spark.table(bronze_table_name)

# --- 2.1 Minimal schema for affected parsing ---
# Built bottom-up: version entry -> affected entry -> cna -> containers.
# Only the fields needed for this table are declared.
_affected_version_type = T.StructType().add("version", T.StringType(), True)
_affected_entry_type = (
    T.StructType()
    .add("vendor", T.StringType(), True)
    .add("product", T.StringType(), True)
    .add("versions", T.ArrayType(_affected_version_type), True)
)

schema_cna_affected = T.StructType().add(
    "affected", T.ArrayType(_affected_entry_type), True
)
schema_containers_affected = T.StructType().add("cna", schema_cna_affected, True)

parsed_affected = bronze_tbl.select(
    F.get_json_object("cveMetadata", "$.cveId").alias("cve_id"),
    F.from_json("containers", schema_containers_affected).alias("containers_struct"),
)

# --- 2.2 Explode affected and versions ---
# First explode: one row per (CVE, affected entry). explode_outer keeps CVEs
# whose affected list is missing or empty, with NULL vendor/product.
affected_step1 = (
    parsed_affected
    .select(
        "cve_id",
        F.explode_outer("containers_struct.cna.affected").alias("affected_entry"),
    )
    .select(
        "cve_id",
        F.col("affected_entry.vendor").alias("vendor"),
        F.col("affected_entry.product").alias("product"),
        F.col("affected_entry.versions").alias("raw_versions"),
    )
)

# Second explode: one row per (CVE, vendor, product, version). Entries without
# a versions array are kept with a NULL version.
affected_final = (
    affected_step1
    .select(
        "cve_id",
        "vendor",
        "product",
        F.explode_outer("raw_versions").alias("version_struct"),
    )
    .select(
        "cve_id",
        "vendor",
        "product",
        F.col("version_struct.version").alias("version"),
    )
)

affected_final.write.format("delta").mode("overwrite").saveAsTable(affected_table_name)

print("✅ Silver affected table saved as:", affected_table_name)
display(spark.table(affected_table_name).limit(10))

✅ Silver core table saved as: workspace.default.cve_core


cve_id,published_date,last_modified_date,description,cvss_score,cvss_vector
CVE-2024-43168,2024-08-08T20:25:24.723Z,2025-11-03T22:04:59.198Z,"DISPUTE NOTE: this issue does not pose a security risk as it (according to analysis by the original software developer, NLnet Labs) falls within the expected functionality and security controls of the application. Red Hat has made a claim that there is a security risk within Red Hat products. NLnet Labs has no further information about the claim, and suggests that affected Red Hat customers refer to available Red Hat documentation or support channels. ORIGINAL DESCRIPTION: A heap-buffer-overflow flaw was found in the cfg_mark_ports function within Unbound's config_file.c, which can lead to memory corruption. This issue could allow an attacker with local access to provide specially crafted input, potentially causing the application to crash or allowing arbitrary code execution. This could result in a denial of service or unauthorized actions on the system.",4.8,CVSS:3.1/AV:L/AC:L/PR:L/UI:R/S:U/C:L/I:L/A:L
CVE-2024-43710,2025-01-23T06:06:38.572Z,2025-01-23T14:48:53.139Z,"A server side request forgery vulnerability was identified in Kibana where the /api/fleet/health_check API could be used to send requests to internal endpoints. Due to the nature of the underlying request, only endpoints available over https that return JSON could be accessed. This can be carried out by users with read access to Fleet.",4.3,CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:N/I:L/A:N
CVE-2024-43687,2024-10-04T19:41:15.354Z,2025-05-23T15:13:13.627Z,Improper Neutralization of Input During Web Page Generation (XSS or 'Cross-site Scripting') vulnerability in Microchip TimeProvider 4100 (banner config modules) allows Cross-Site Scripting (XSS).This issue affects TimeProvider 4100: from 1.0 before 2.4.7.,,
CVE-2024-43317,2024-08-19T19:22:52.808Z,2024-08-19T19:47:36.378Z,Improper Neutralization of Input During Web Page Generation (XSS or 'Cross-site Scripting') vulnerability in Metagauss User Registration Team RegistrationMagic allows Cross-Site Scripting (XSS).This issue affects RegistrationMagic: from n/a through 6.0.1.0.,4.3,CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:N/I:L/A:N
CVE-2024-43057,2025-03-03T10:07:24.713Z,2025-03-03T13:11:53.199Z,Memory corruption while processing command in Glink linux.,7.8,CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H
CVE-2024-43912,2024-08-26T10:11:16.868Z,2025-11-03T22:07:17.700Z,"In the Linux kernel, the following vulnerability has been resolved: wifi: nl80211: disallow setting special AP channel widths Setting the AP channel width is meant for use with the normal 20/40/... MHz channel width progression, and switching around in S1G or narrow channels isn't supported. Disallow that.",,
CVE-2024-43822,2024-08-17T09:21:42.997Z,2025-05-04T09:27:02.232Z,"In the Linux kernel, the following vulnerability has been resolved: ASoc: PCM6240: Return directly after a failed devm_kzalloc() in pcmdevice_i2c_probe() The value “-ENOMEM” was assigned to the local variable “ret” in one if branch after a devm_kzalloc() call failed at the beginning. This error code will trigger then a pcmdevice_remove() call with a passed null pointer so that an undesirable dereference will be performed. Thus return the appropriate error code directly.",,
CVE-2024-43521,2024-10-08T17:35:52.735Z,2025-07-08T15:39:07.655Z,Windows Hyper-V Denial of Service Vulnerability,7.5,CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H/E:U/RL:O/RC:C
CVE-2024-43391,2024-09-10T08:44:42.576Z,2025-08-22T06:31:58.682Z,"A low privileged remote attacker can perform configuration changes of the firewall services, including packet filter, packet forwarding, network access control or NAT through the FW_PORTFORWARDING.SRC_IP environment variable which can lead to a DoS.",8.1,CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:N/I:H/A:H
CVE-2024-43689,2024-10-21T01:27:15.253Z,2025-09-04T15:04:43.804Z,"Stack-based buffer overflow vulnerability exists in ELECOM wireless access points. By processing a specially crafted HTTP request, arbitrary code may be executed.",8.8,CVSS:3.0/AV:A/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H


✅ Silver affected table saved as: workspace.default.cve_affected


cve_id,vendor,product,version
CVE-2024-43223,EventPrime Events,EventPrime,
CVE-2024-43582,Microsoft,Windows 10 Version 1809,10.0.17763.0
CVE-2024-43582,Microsoft,Windows Server 2019,10.0.17763.0
CVE-2024-43582,Microsoft,Windows Server 2019 (Server Core installation),10.0.17763.0
CVE-2024-43582,Microsoft,Windows Server 2022,10.0.20348.0
CVE-2024-43582,Microsoft,Windows 11 version 21H2,10.0.22000.0
CVE-2024-43582,Microsoft,Windows 10 Version 21H2,10.0.19043.0
CVE-2024-43582,Microsoft,Windows 11 version 22H2,10.0.22621.0
CVE-2024-43582,Microsoft,Windows 10 Version 22H2,10.0.19045.0
CVE-2024-43582,Microsoft,Windows 11 version 22H3,10.0.22631.0
