In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import os

print(os.environ['SPARK_VERSION'])

# Get job parameters from Databricks
date_str = dbutils.widgets.get("current_date")
# date_str = "2024-07-26"

# Define file paths based on date parameter
booking_data = f"dbfs:/DataEngineering/bookings_daily_data/bookings_{date_str}.csv"
customer_data = f"dbfs:/DataEngineering/customers_daily_data/customers_{date_str}.csv"
print(booking_data)
print(customer_data)

# Read booking data
booking_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("multiLine", "true") \
    .load(booking_data)

booking_df.printSchema()
display(booking_data)

# Read customer data for scd2 merge
customer_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("multiLine", "true") \
    .load(customer_data)

customer_df.printSchema()
display(customer_df)

# Data Quality Checks on booking data
check_incremental = Check(spark, CheckLevel.Error, "Booking Data Check") \
    .hasSize(lambda x: x > 0) \
    .isUnique("booking_id", hint="Booking ID is not unique throught") \
    .isComplete("customer_id") \
    .isComplete("amount") \
    .isNonNegative("amount") \
    .isNonNegative("quantity") \
    .isNonNegative("discount")

# Data Quality Checks on customer data
check_scd = Check(spark, CheckLevel.Error, "Customer Data Check") \
    .hasSize(lambda x: x > 0) \
    .isUnique("customer_id") \
    .isComplete("customer_name") \
    .isComplete("customer_address") \
    .isComplete("phone_number") \
    .isComplete("email")

# Run the verification suite
booking_dq_check = VerificationSuite(spark) \
    .onData(booking_df) \
    .addCheck(check_incremental) \
    .run()

customer_dq_check = VerificationSuite(spark) \
    .onData(customer_df) \
    .addCheck(check_scd) \
    .run()

booking_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, booking_dq_check)
display(booking_dq_check_df)

customer_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, customer_dq_check)
display(customer_dq_check_df)

# Check if verification passed
if booking_dq_check.status != "Success":
    raise ValueError("Data Quality Checks Failed for Booking Data")

if customer_dq_check.status != "Success":
    raise ValueError("Data Quality Checks Failed for Customer Data")

# Add ingestion timestamp to booking data
booking_df_incremental = booking_df.withColumn("ingestion_time", current_timestamp())

# Join booking data with customer data
df_joined = booking_df_incremental.join(customer_df, "customer_id")

# Business transformation: calculate total cost after discount and filter
df_transformed = df_joined \
    .withColumn("total_cost", col("amount") - col("discount")) \
    .filter(col("quantity") > 0)

# Group by and aggregate df_transformed
df_transformed_agg = df_transformed \
    .groupBy("booking_type", "customer_id") \
    .agg(
        _sum("total_cost").alias("total_amount_sum"),
        _sum("quantity").alias("total_quantity_sum")
    )

# Check if the Delta table exists
fact_table_path = "gds_de_bootcamp.default.booking_fact"
fact_table_exists = spark._jsparkSession.catalog().tableExists(fact_table_path)

if fact_table_exists:
    # Read the existing fact table
    df_existing_fact = spark.read.format("delta").table(fact_table_path)
    
    # Combine the aggregated data
    df_combined = df_existing_fact.unionByName(df_transformed_agg, allowMissingColumns=True)
    
    # Perform another group by and aggregation on the combined data
    df_final_agg = df_combined \
        .groupBy("booking_type", "customer_id") \
        .agg(
            _sum("total_amount_sum").alias("total_amount_sum"),
            _sum("total_quantity_sum").alias("total_quantity_sum")
        )
else:
    # If the fact table doesn't exist, use the aggregated transformed data directly
    df_final_agg = df_transformed_agg

display(df_final_agg)

# Write the final aggregated data back to the Delta table
df_final_agg.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(fact_table_path)

scd_table_path = "gds_de_bootcamp.default.customer_scd"
scd_table_exists = spark._jsparkSession.catalog().tableExists(scd_table_path)

# Check if the customers table exists
if scd_table_exists:
    # Load the existing SCD table
    scd_table = DeltaTable.forName(spark, scd_table_path)
    display(scd_table.toDF())
    
    # Perform SCD2 merge logic
    scd_table.alias("scd") \
        .merge(
            source=customer_df.alias("updates"),
            condition="scd.customer_id = updates.customer_id and scd.valid_to = '9999-12-31'"
        ) \
        .whenMatchedUpdate(set={
            "valid_to": "updates.valid_from",
        }) \
        .execute()

    customer_df.write.format("delta").mode("append").saveAsTable(scd_table_path)
else:
    # If the SCD table doesn't exist, write the customer data as a new Delta table
    customer_df.write.format("delta").mode("overwrite").saveAsTable(scd_table_path)


3.5.0
root
 |-- booking_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- booking_date: date (nullable = true)
 |-- amount: integer (nullable = true)
 |-- booking_type: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: integer (nullable = true)
 |-- booking_status: string (nullable = true)
 |-- hotel_name: string (nullable = true)
 |-- flight_number: string (nullable = true)



'dbfs:/DataEngineering/bookings_daily_data/bookings_2024-07-26.csv'

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- email: string (nullable = true)
 |-- valid_from: date (nullable = true)
 |-- valid_to: date (nullable = true)



customer_id,customer_name,customer_address,phone_number,email,valid_from,valid_to
1026,Lori Odom,"66894 Pamela Ridge Apt. 701 Wilsonport, NV 55859",,santanachristopher@wilson-bailey.com,2024-01-30,9999-12-31
1030,David Odonnell,USNV Simmons FPO AE 08244,6864684148,kathleen41@hotmail.com,2024-08-30,9999-12-31
1035,Cheryl Weaver,"3716 Cunningham Station Apt. 567 Davidborough, TX 41021",893.223.0773x3326,smithcatherine@yahoo.com,2024-03-07,9999-12-31
1036,Rebecca Johnson,"674 Bishop Mission Suzannebury, NY 90306",018.713.0054x360,carol33@holt-higgins.info,2024-05-25,9999-12-31
1037,Lisa Hill,"6720 Brittany Streets Lake Sabrinaview, IN 22990",(646)830-3919x64651,cory15@hotmail.com,2024-11-01,9999-12-31
1038,Aaron Cooper,"37842 Haynes Isle Suite 421 South Marisa, PA 75690",249-334-3781x7626,crystal91@henderson-lane.net,2024-11-26,9999-12-31
1039,Betty Andrews,Unit 9441 Box 7301 DPO AA 92892,(055)647-0735,avilacody@yahoo.com,2024-04-10,9999-12-31
1047,Edward Stone,"31740 Martinez Trace Jonesview, NC 49949",884.266.5166x7808,zwhite@hotmail.com,2024-03-31,9999-12-31
1048,James Myers,"78527 Kelly Corner Powellbury, FL 03544",001-403-398-8094,vyoder@wiley-jones.com,2024-08-06,9999-12-31
1050,Scott Freeman,"528 John Hollow Theresabury, SC 37328",0970621868,emily78@gmail.com,2024-01-24,9999-12-31




check,check_level,check_status,constraint,constraint_status,constraint_message
Booking Data Check,Error,Success,SizeConstraint(Size(None)),Success,
Booking Data Check,Error,Success,"UniquenessConstraint(Uniqueness(List(booking_id),None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(customer_id,None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(amount,None,None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(amount is non-negative,COALESCE(CAST(amount AS DECIMAL(20,10)), 0.0) >= 0,None,List(amount),None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(quantity is non-negative,COALESCE(CAST(quantity AS DECIMAL(20,10)), 0.0) >= 0,None,List(quantity),None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(discount is non-negative,COALESCE(CAST(discount AS DECIMAL(20,10)), 0.0) >= 0,None,List(discount),None))",Success,


check,check_level,check_status,constraint,constraint_status,constraint_message
Customer Data Check,Error,Error,SizeConstraint(Size(None)),Success,
Customer Data Check,Error,Error,"UniquenessConstraint(Uniqueness(List(customer_id),None,None))",Success,
Customer Data Check,Error,Error,"CompletenessConstraint(Completeness(customer_name,None,None))",Success,
Customer Data Check,Error,Error,"CompletenessConstraint(Completeness(customer_address,None,None))",Success,
Customer Data Check,Error,Error,"CompletenessConstraint(Completeness(phone_number,None,None))",Failure,Value: 0.9885057471264368 does not meet the constraint requirement!
Customer Data Check,Error,Error,"CompletenessConstraint(Completeness(email,None,None))",Success,


[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-3884623181828154>, line 82[0m
[1;32m     79[0m     [38;5;28;01mraise[39;00m [38;5;167;01mValueError[39;00m([38;5;124m"[39m[38;5;124mData Quality Checks Failed for Booking Data[39m[38;5;124m"[39m)
[1;32m     81[0m [38;5;28;01mif[39;00m customer_dq_check[38;5;241m.[39mstatus [38;5;241m!=[39m [38;5;124m"[39m[38;5;124mSuccess[39m[38;5;124m"[39m:
[0;32m---> 82[0m     [38;5;28;01mraise[39;00m [38;5;167;01mValueError[39;00m([38;5;124m"[39m[38;5;124mData Quality Checks Failed for Customer Data[39m[38;5;124m"[39m)
[1;32m     84[0m [38;5;66;03m# Add ingestion timestamp to booking data[39;00m
[1;32m     85[0m booking_df_incremental [38;5;241m=[39m booking_df[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mingestion_time[39m[38;5;124m"[39m, current_t

In [0]:
# %sql
# DROP TABLE gds_de_bootcamp.default.booking_fact

In [0]:
# Read the customer SCD table and display all data
# customer_scd_df = spark.read.format("delta").table("gds_de_bootcamp.default.customer_scd")
# display(customer_scd_df)

customer_id,customer_name,customer_address,phone_number,email,valid_from,valid_to
1026,Lori Odom,"66894 Pamela Ridge Apt. 701 Wilsonport, NV 55859",,santanachristopher@wilson-bailey.com,2024-01-30,9999-12-31
1030,David Odonnell,USNV Simmons FPO AE 08244,6864684148,kathleen41@hotmail.com,2024-08-30,9999-12-31
1035,Cheryl Weaver,"3716 Cunningham Station Apt. 567 Davidborough, TX 41021",893.223.0773x3326,smithcatherine@yahoo.com,2024-03-07,9999-12-31
1036,Rebecca Johnson,"674 Bishop Mission Suzannebury, NY 90306",018.713.0054x360,carol33@holt-higgins.info,2024-05-25,9999-12-31
1037,Lisa Hill,"6720 Brittany Streets Lake Sabrinaview, IN 22990",(646)830-3919x64651,cory15@hotmail.com,2024-11-01,9999-12-31
1038,Aaron Cooper,"37842 Haynes Isle Suite 421 South Marisa, PA 75690",249-334-3781x7626,crystal91@henderson-lane.net,2024-11-26,9999-12-31
1039,Betty Andrews,Unit 9441 Box 7301 DPO AA 92892,(055)647-0735,avilacody@yahoo.com,2024-04-10,9999-12-31
1047,Edward Stone,"31740 Martinez Trace Jonesview, NC 49949",884.266.5166x7808,zwhite@hotmail.com,2024-03-31,9999-12-31
1048,James Myers,"78527 Kelly Corner Powellbury, FL 03544",001-403-398-8094,vyoder@wiley-jones.com,2024-08-06,9999-12-31
1050,Scott Freeman,"528 John Hollow Theresabury, SC 37328",0970621868,emily78@gmail.com,2024-01-24,9999-12-31
