# Imports and reading job parameters

In [0]:
# Imports
from pyspark.sql.functions import current_timestamp
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number, when, avg, sum
from datetime import date

# Job parameters: inclusive date window (YYYY-MM-DD) used to filter every stage below.
# Both default to today's date, so an unparameterized run processes the current day.
dbutils.widgets.text("date_param_min", date.today().strftime("%Y-%m-%d"), "Minimum date")
dbutils.widgets.text("date_param_max", date.today().strftime("%Y-%m-%d"), "Maximum date")

# Read the widgets and validate them before any data is touched. The later cells
# compare these strings with the date-typed partition_date column. A malformed value
# cannot be cast to a date, so those filters would match nothing (or fail deep inside
# a Spark job). A reversed window would silently select no rows at all.
try:
    min_day = date.fromisoformat(dbutils.widgets.get("date_param_min").strip())
    max_day = date.fromisoformat(dbutils.widgets.get("date_param_max").strip())
except ValueError as err:
    raise ValueError("date_param_min and date_param_max must be dates in YYYY-MM-DD format") from err
if min_day > max_day:
    raise ValueError(f"date_param_min ({min_day}) must not be after date_param_max ({max_day})")

# Normalized ISO strings; kept as str because the downstream filters expect strings.
min_date = min_day.isoformat()
max_date = max_day.isoformat()

# Simulating Database to Raw

In [0]:
# Read the source table that stands in for the production database.
source_df = spark.table("samples.bakehouse.sales_transactions")

# Derive the daily partition key from the transaction timestamp, and stamp each
# row with the load time for auditing.
audited_df = (
    source_df
    .withColumn("partition_date", col("dateTime").cast("date"))
    .withColumn("ingestion_timestamp", current_timestamp())
)

# Keep only the days inside the inclusive [min_date, max_date] window.
raw_batch_df = audited_df.filter(col("partition_date").between(min_date, max_date))

print("Amount of data ingested: ", raw_batch_df.count())

# Append the batch to the raw layer. Re-running the same days adds new rows
# instead of replacing earlier ones.
(
    raw_batch_df.write
    .mode("append")
    .option("mergeSchema", "true")
    .partitionBy("partition_date")
    .saveAsTable("bakehouse.raw.sales_transactions")
)

# Simulating Raw to Bronze

In [0]:
# Read what has landed in the raw layer, restricted to the requested window.
raw_df = spark.table("bakehouse.raw.sales_transactions")
window_df = raw_df.filter(col("partition_date").between(min_date, max_date))

# The raw layer is append-only, so a day may have been loaded several times.
# Loads are ranked within each partition_date by ingestion_timestamp, newest first.
# rank() (not row_number()) keeps every row that ties for the newest timestamp.
# current_timestamp() is fixed within one query, so a whole raw load shares a
# single timestamp, and the rank-1 rows form the latest load for that day.
newest_first = Window.partitionBy("partition_date").orderBy(col("ingestion_timestamp").desc())
latest_load_df = (
    window_df
    .withColumn("rank", rank().over(newest_first))
    .where(col("rank") == 1)
    .drop("rank")
)

# Re-stamp the rows with this layer's load time.
bronze_df = latest_load_df.withColumn("ingestion_timestamp", current_timestamp())

print("Amount of data ingested: ", bronze_df.count())

# Dynamic partition overwrite replaces only the partition_date values present in
# this batch and leaves all other days untouched.
(
    bronze_df.write
    .mode("overwrite")
    .options(mergeSchema="true", partitionOverwriteMode="dynamic")
    .partitionBy("partition_date")
    .saveAsTable("bakehouse.bronze.sales_transactions")
)

# Simulating Bronze to Silver

In [0]:
# Read the bronze layer for the requested window and exclude amex payments.
# NOTE(review): `!= "amex"` evaluates to null when paymentMethod is null, so rows
# with a missing payment method are dropped too. If they should be kept, use
# ~col("paymentMethod").eqNullSafe("amex") instead.
df = spark.table("bakehouse.bronze.sales_transactions")
filtered_df = df.filter((df["partition_date"] >= min_date) & (df["partition_date"] <= max_date)).filter(df["paymentMethod"] != "amex")

# Keep only the most recent version of each transactionID. row_number() picks
# exactly one row per transactionID; ties on ingestion_timestamp are broken arbitrarily.
ranked_df = filtered_df.withColumn("rank", row_number().over(Window.partitionBy("transactionID").orderBy(col("ingestion_timestamp").desc())))
deduped_df = ranked_df.filter(ranked_df["rank"] == 1).drop("rank")

# Data-quality gate: a record is invalid when quantity, totalPrice or unitPrice is
# negative. A null in one of these columns makes that comparison null, which falls
# through to otherwise(1), so nulls alone do not mark a record invalid.
check_df = deduped_df.withColumn("is_valid", when((deduped_df["quantity"] < 0) | (deduped_df["totalPrice"] < 0) | (deduped_df["unitPrice"] < 0), 0).otherwise(1))
valid_ratio = check_df.agg(avg(col("is_valid")).alias("valid_ratio")).collect()[0]["valid_ratio"]

# avg() over zero rows returns null, so an empty batch (no data for the selected
# dates) gives valid_ratio = None. Comparing None < 0.8 would raise TypeError.
if valid_ratio is None:
    print("No records found for the selected date range, skipping data quality check")
elif valid_ratio < 0.8:
    raise Exception("Percentage of valid records is less than 80%, check source")
else:
    print("Percentage of valid records is greater than 80%, proceeding with the pipeline")

# Drop the helper column and re-stamp the load time for this layer.
# NOTE(review): invalid records are only measured, not removed; they are still
# written to silver. Confirm this is intended.
final_df = check_df.drop("is_valid").withColumn("ingestion_timestamp", current_timestamp())
print("Amount of data ingested: ", final_df.count())
# Dynamic partition overwrite replaces only the partition_date values in this batch;
# an empty batch therefore replaces nothing.
final_df.write.mode("overwrite").option("mergeSchema", "true").partitionBy("partition_date").option("partitionOverwriteMode", "dynamic").saveAsTable("bakehouse.silver.sales_transactions")

# Simulating Silver to Gold

In [0]:
# Read the silver layer for the requested window.
df = spark.table("bakehouse.silver.sales_transactions")
filtered_df = df.filter((df["partition_date"] >= min_date) & (df["partition_date"] <= max_date))

# Discount logic: visa payments get 5% off totalPrice; every other method pays full price.
# The column is spelled "paymentMethod", matching the source schema, so it also
# resolves when spark.sql.caseSensitive is enabled.
transformed_df = filtered_df.withColumn("discounted_total_price", when(filtered_df["paymentMethod"] == "visa", filtered_df["totalPrice"] * 0.95).otherwise(filtered_df["totalPrice"]))

# Daily per-product totals. `sum` here is pyspark.sql.functions.sum (imported at
# the top of the notebook), not the Python builtin.
aggregated_df = transformed_df.groupBy("partition_date", "product").agg(
    sum("quantity").alias("total_quantity"),
    sum("totalPrice").alias("total_revenue"),
    sum("discounted_total_price").alias("total_discounted_revenue"),
)

# Stamp the load time for this layer.
final_df = aggregated_df.withColumn("ingestion_timestamp", current_timestamp())
print("Amount of data ingested: ", final_df.count())
# Dynamic partition overwrite replaces only the partition_date values present in this batch.
final_df.write.mode("overwrite").option("mergeSchema", "true").partitionBy("partition_date").option("partitionOverwriteMode", "dynamic").saveAsTable("bakehouse.gold.sales_transactions")