In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import os

In [0]:
# Show the Spark version exposed through the SPARK_VERSION environment
# variable (renders None when the variable is not set).
display(os.environ.get('SPARK_VERSION'))

# The run is parameterised by the "arrival_date" notebook widget; its value is
# embedded verbatim in both input file names below.
date_str = dbutils.widgets.get("arrival_date")

# Daily extracts for the requested arrival date, one file per source.
booking_data = f"/Volumes/incremental_load/default/orders_data/booking_data/bookings_{date_str}.csv"
customer_data = f"/Volumes/incremental_load/default/orders_data/customer_data/customers_{date_str}.csv"

# Echo the resolved paths, one per line, so the run output records its inputs.
print(booking_data, customer_data, sep="\n")

In [0]:
# Reader settings shared by both extracts: the first row is a header, column
# types are inferred from the data, and quoted fields may span several lines.
csv_reader_options = {
    "header": "true",
    "inferSchema": "true",
    "multiline": "true",
}

# Customer extract for the requested arrival date.
customer_df = (
    spark.read
    .format("csv")
    .options(**csv_reader_options)
    .load(customer_data)
)
customer_df.printSchema()
display(customer_df)

# Booking extract for the same arrival date.
booking_df = (
    spark.read
    .format("csv")
    .options(**csv_reader_options)
    .load(booking_data)
)
booking_df.printSchema()
display(booking_df)

In [0]:
# Data quality gate: both extracts are verified with PyDeequ and the notebook
# aborts with ValueError if either verification does not report "Success".
# Both result tables are rendered before any exception is raised, so a failed
# run still shows which constraints were violated.

# Data quality checks for customer_df: the key must be present and unique, the
# descriptive columns must be fully populated, and the extract must not be empty.
customer_check = (
    Check(spark, CheckLevel.Error, "Customer Data Quality Check")
    .isComplete("customer_id")
    .isUnique("customer_id")
    .isComplete("customer_name")
    .isComplete("customer_address")
    .isComplete("email")
    .hasSize(lambda x: x > 0)
)

customer_verification = (
    VerificationSuite(spark)
    .onData(customer_df)
    .addCheck(customer_check)
    .run()
)

customer_df_check = VerificationResult.checkResultsAsDataFrame(spark, customer_verification)
# display() renders the per-constraint rows; print() on a Spark DataFrame only
# emits its column signature, which hides the actual check outcomes.
display(customer_df_check)

# Data quality checks for booking_df: the key must be present and unique, every
# booking must reference a customer, the numeric measures must not be negative,
# and the extract must not be empty.
booking_check = (
    Check(spark, CheckLevel.Error, "Booking Data Quality Check")
    .isComplete("booking_id")
    .isUnique("booking_id", hint="Booking ID is not unique throught")
    .isComplete("customer_id")
    .isNonNegative("amount")
    .isNonNegative("quantity")
    .isNonNegative("discount")
    .hasSize(lambda x: x > 0)
)

booking_verification = (
    VerificationSuite(spark)
    .onData(booking_df)
    .addCheck(booking_check)
    .run()
)

booking_df_check = VerificationResult.checkResultsAsDataFrame(spark, booking_verification)
display(booking_df_check)

# Stop the pipeline before any table is written if either extract failed.
if booking_verification.status != "Success":
    raise ValueError("Data Quality Checks Failed for Booking Data")

if customer_verification.status != "Success":
    raise ValueError("Data Quality Checks Failed for Customer Data")

In [0]:
# Stamp every incoming booking with the time of this load.
booking_df_incremental = booking_df.withColumn("ingestion_time", current_timestamp())

# Attach customer attributes to each booking via the shared customer_id key
# (join type is left at the PySpark default).
df_joined = booking_df_incremental.join(customer_df, "customer_id")

# Net amount is the booked amount minus its discount; only rows with a
# strictly positive quantity are kept.
net_amount_expr = col("amount") - col("discount")
df_transformed = (
    df_joined
    .withColumn("net_amount", net_amount_expr)
    .filter(col("quantity") > 0)
)

# Daily totals per customer and booking type.
df_transformed_agg = (
    df_transformed
    .groupBy("customer_id", "booking_type")
    .agg(
        _sum("net_amount").alias("total_amount"),
        _sum("quantity").alias("total_quantity"),
    )
)

In [0]:
# Fold today's per-customer/booking-type totals into the running fact table.
fact_table_path = "incremental_load.default.booking_fact"

# Use the public catalog API rather than reaching into the private
# spark._jsparkSession JVM handle, which is an internal detail of the session.
fact_table_exists = spark.catalog.tableExists(fact_table_path)

if fact_table_exists:
    # Stack the stored totals with today's totals and re-aggregate, so each
    # (customer_id, booking_type) pair ends up with a single cumulative row.
    # NOTE(review): re-running the notebook for the same arrival_date adds that
    # day's totals again — confirm whether reruns need to be guarded against.
    existing_fact_df = spark.read.format("delta").table(fact_table_path)
    df_combined = existing_fact_df.unionByName(df_transformed_agg, allowMissingColumns = True)

    df_final_agg = df_combined.groupBy("customer_id", "booking_type") \
        .agg(
            _sum("total_amount").alias("total_amount"),
            _sum("total_quantity").alias("total_quantity")
        )
else:
    # First load: today's totals become the initial content of the fact table.
    df_final_agg = df_transformed_agg

# display() renders the rows; print() on a Spark DataFrame only emits its
# column signature.
display(df_final_agg)

In [0]:
# Replace the fact table with the freshly computed cumulative totals; the
# overwriteSchema option lets the stored schema be replaced along with the data.
(
    df_final_agg.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(fact_table_path)
)

In [0]:
# Maintain the customer dimension as a type-2 slowly changing dimension.
# Assumes the customer extract carries valid_from and valid_to columns, and
# that open rows use valid_to = '9999-12-31' — TODO confirm against the source.
scd2_table_path = "incremental_load.default.customer_dim"

# Use the public catalog API rather than reaching into the private
# spark._jsparkSession JVM handle, which is an internal detail of the session.
scd2_table_exists = spark.catalog.tableExists(scd2_table_path)

if not scd2_table_exists:
    # First load: the extract itself becomes the initial dimension.
    customer_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(scd2_table_path)
else:
    deltaTable = DeltaTable.forName(spark, scd2_table_path)
    # display() renders the current dimension rows; print() on a Spark
    # DataFrame only emits its column signature.
    display(deltaTable.toDF())

    # SCD2 Merge: for every customer present in today's extract, close the
    # currently open dimension row by setting its valid_to to the incoming
    # row's valid_from. This happens for every matched customer, whether or
    # not any attribute actually changed.
    deltaTable.alias("scd").merge(
        customer_df.alias("updates"),
        "scd.customer_id = updates.customer_id and scd.valid_to = '9999-12-31'"
    ).whenMatchedUpdate(
        set={
            "valid_to": "updates.valid_from"
        }
    ).execute()

    # Append today's rows as the new versions.
    # NOTE(review): the merge and this append are two separate writes, so a
    # failure in between leaves closed rows without their successor versions.
    customer_df.write.format("delta").mode("append").saveAsTable(scd2_table_path)
