In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, lit, explode, expr, array_contains
from pyspark.sql.utils import AnalysisException
import time
from pyspark.sql.functions import current_date,countDistinct

# Initialize Spark Session
#spark = SparkSession.builder.appName("OSV Data Processing").getOrCreate()

# Define Delta Table Path
delta_path = "#path to delta table"

# Root folder for the derived tables. Any trailing slash is stripped before
# joining so the result is "<delta_path>/derived" whether or not delta_path
# ends with "/" (plain concatenation yields "<delta_path>derived/..." when the
# slash is missing).
# NOTE(review): this keeps the derived tables inside the source table's own
# directory, as before. Confirm that maintenance on the source table (e.g.
# VACUUM) cannot remove their files, or move them to a sibling location.
derived_root = delta_path.rstrip("/") + "/derived"

# Pipeline overview:
#   1. Load the source Delta table and print data-quality diagnostics.
#   2. Flatten affected -> ranges -> events into one row per event.
#   3. Overwrite three derived Delta tables under `derived_root`.
# Relies on a `spark` session that already exists in the notebook environment.
try:
    # perf_counter() is monotonic, so the elapsed time cannot be skewed by
    # wall-clock adjustments while the job is running.
    start_time = time.perf_counter()

    # Load Delta Table
    df = spark.read.format("delta").load(delta_path)

    # Step 1: Data Quality Checks (duplicates, future dates, cardinality)

    # Rows sharing the same (id, summary, modified) triple are reported as duplicates.
    df_duplicates = (
        df.groupBy("id", "summary", "modified")
          .count()
          .filter(col("count") > 1)
    )
    print(f"Number of duplicate records: {df_duplicates.count()}")
    df_duplicates.show(truncate=False)

    # Records whose publication date lies after today's date.
    df_future_dates = df.filter(col("published") > current_date())
    print(f"Number of records with future dates: {df_future_dates.count()}")
    df_future_dates.show(truncate=False)

    # Cardinality overview: total rows versus distinct values of key columns.
    df.select([
        count("id").alias("Total Records"),
        countDistinct("id").alias("Unique IDs"),
        countDistinct("ecosystem").alias("Unique Ecosystems"),
        countDistinct("aliases").alias("Unique Aliases"),
    ]).show()

    # Step 2: Extract Required Fields for Derived Tables
    # Each explode() multiplies rows, so the result has one row per
    # (record, affected entry, range, event).
    # NOTE: explode() emits no row for a null or empty array, so records
    # without affected/ranges/events are absent from the derived tables.
    df_exploded = (
        df.withColumn("affected", explode(col("affected")))  # One row per affected entry
          .withColumn("ranges", explode(col("affected.ranges")))  # One row per range
          .withColumn("events", explode(col("ranges.events")))  # One row per event
          .withColumn("introduced", col("events.introduced"))  # Extract introduced version
          .withColumn("fixed_version", col("events.fixed"))  # Extract fixed version
          # Replaces any existing top-level "ecosystem" column with the package-level value.
          .withColumn("ecosystem", col("affected.package.ecosystem"))
          .withColumn("package_name", col("affected.package.name"))  # Extract package name
          .select("id", "ecosystem", "package_name", "introduced", "fixed_version", "published", "severity")
    )

    # Step 3: Create Derived Tables for Common Query Patterns

    # Derived Table 1: Get vulnerabilities by package
    vulnerabilities_by_package = df_exploded.select(
        "id", "ecosystem", "package_name", "severity", "introduced", "fixed_version"
    )
    (
        vulnerabilities_by_package.write.format("delta")
        .partitionBy("ecosystem")
        .mode("overwrite")
        .save(derived_root + "/vulnerabilities_by_package")
    )

    # Derived Table 2: Get vulnerabilities by ecosystem
    # Count distinct vulnerability ids: df_exploded holds one row per event, so
    # a plain row count would report events rather than vulnerabilities. The
    # output column keeps the name "count" for existing readers of this table.
    vulnerabilities_by_ecosystem = df_exploded.groupBy("ecosystem").agg(
        countDistinct("id").alias("count")
    )
    (
        vulnerabilities_by_ecosystem.write.format("delta")
        .mode("overwrite")
        .save(derived_root + "/vulnerabilities_by_ecosystem")
    )

    # Derived Table 3: Get fixed versions of vulnerabilities
    # Keeps only the event rows that carry a fixed version.
    fixed_versions = df_exploded.filter(col("fixed_version").isNotNull())
    (
        fixed_versions.write.format("delta")
        .partitionBy("ecosystem")
        .mode("overwrite")
        .save(derived_root + "/fixed_versions")
    )

    end_time = time.perf_counter()

    print(f"Data processing completed in {round(end_time - start_time, 2)} seconds.")

except AnalysisException as e:
    print(f"Spark AnalysisException: {e}")
    # Re-raise so a failed run is not reported as successful; the derived
    # tables may be missing or only partly rewritten at this point.
    raise
except Exception as e:
    print(f"Unexpected Error: {e}")
    raise

finally:
    # Stop Spark Session
    # NOTE(review): `spark` is not created in this cell (the builder line is
    # commented out in the file), so this stops a session owned by the
    # environment. Confirm that later cells do not need it.
    spark.stop()


StatementMeta(osv, 7, 2, Finished, Available, Finished)

Number of duplicate records: 0
+---+-------+--------+-----+
|id |summary|modified|count|
+---+-------+--------+-----+
+---+-------+--------+-----+

Number of records with future dates: 0
+---+-------+-------+-------+--------+---------+-----------------+--------+--------------+--------+---------+----+
|id |summary|details|aliases|modified|published|database_specific|affected|schema_version|severity|ecosystem|year|
+---+-------+-------+-------+--------+---------+-----------------+--------+--------------+--------+---------+----+
+---+-------+-------+-------+--------+---------+-----------------+--------+--------------+--------+---------+----+

+-------------+----------+-----------------+--------------+
|Total Records|Unique IDs|Unique Ecosystems|Unique Aliases|
+-------------+----------+-----------------+--------------+
|        42802|     42802|                9|         28015|
+-------------+----------+-----------------+--------------+

Data processing completed in 88.55 seconds.
