# Medallion Pipeline (Databricks)

This pipeline implements a Medallion architecture on Databricks for processing e-commerce data (items and user events). The pipeline follows a layered approach:

- Bronze Layer: Ingests raw data into Delta tables without significant transformation, preserving the original structure for auditing and replayability.
- Silver Layer: Cleans, validates, and refines the data (e.g., casting types, handling nulls, parsing timestamps), making it queryable and reliable for analysis.
- Gold Layer: Aggregates and models the refined data into a business-oriented datamart (e.g., top items by views), optimized for reporting and decision-making.

The pipeline uses Spark SQL and DataFrames to read CSV sources (each event row carries a JSON payload), applies transformations, and stores the results in Delta Lake tables in Unity Catalog. It culminates in a datamart that ranks items by view count per year.

## 1. Imports and configuration

In [None]:
# `spark`, `dbutils` and `display` are provided by the Databricks notebook runtime
from pyspark.sql.functions import (
    col, lit, from_json, to_timestamp, year, coalesce, rank, row_number
)
from pyspark.sql.types import (
    StructType, StructField, StringType,
    LongType, DoubleType, DecimalType
)
from pyspark.sql.window import Window

In [None]:
CATALOG_NAME = "workspace" 
SCHEMA_NAME = "default" 
VOLUME_NAME = "merkle_landing_zone"

volume_path = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}"
item_file_path = f"{volume_path}/item.csv"
event_file_path = f"{volume_path}/event.csv"

item_source_url = "https://merkle-de-interview-case-study.s3.eu-central-1.amazonaws.com/de/item.csv"
event_source_url = "https://merkle-de-interview-case-study.s3.eu-central-1.amazonaws.com/de/event.csv"

# The copy targets are the same files the Bronze readers load
item_target_path = item_file_path
event_target_path = event_file_path

spark.sql(f"USE CATALOG {CATALOG_NAME}")

## 2. Copy data files from S3 to the Unity Catalog volume

In [None]:
dbutils.fs.cp(item_source_url, item_target_path, recurse=True)
dbutils.fs.cp(event_source_url, event_target_path, recurse=True)

display(dbutils.fs.ls(volume_path))

path,name,size,modificationTime
dbfs:/Volumes/workspace/default/merkle_landing_zone/event.csv,event.csv,155970517,1761286720000
dbfs:/Volumes/workspace/default/merkle_landing_zone/item.csv,item.csv,184207,1761286717000
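
The listing reports `size` in bytes and `modificationTime` as milliseconds since the Unix epoch. A small plain-Python check, assuming that millisecond convention, turns the two values into readable UTC times; `epoch_ms_to_utc` is a hypothetical helper, not part of `dbutils`.

```python
from datetime import datetime, timezone


def epoch_ms_to_utc(epoch_ms: int) -> datetime:
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)


# modificationTime values copied from the listing above
event_modified = epoch_ms_to_utc(1761286720000)
item_modified = epoch_ms_to_utc(1761286717000)

print(event_modified.isoformat())  # 2025-10-24T06:18:40+00:00
print(item_modified.isoformat())   # 2025-10-24T06:18:37+00:00
```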


## 3. LAYER 1: BRONZE

- Ingests raw data into Delta tables without significant transformation

#### 3.A. BRONZE Schema

In [None]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS bronze_db")
spark.sql(f"USE SCHEMA bronze_db")


#### 3.B. BRONZE (Item Table)

Defines a string-only schema for the item CSV (so the raw values are ingested without casting), then:
- reads the CSV
- writes it as a Delta table
In [None]:
# Define a schema where all fields are strings to ingest raw CSV data without immediate type casting
item_string_schema = StructType([
    StructField("adjective", StringType(), True),
    StructField("category", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("id", StringType(), True),
    StructField("modifier", StringType(), True),
    StructField("name", StringType(), True),
    StructField("price", StringType(), True)
])

# Read the CSV file using the string schema, with header
bronze_item_df = (spark.read
    .format("csv")
    .option("header", "true")
    .schema(item_string_schema)
    .load(item_file_path)
)

# Write the DataFrame as a Delta table in bronze_db, overwriting if exists
(bronze_item_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{CATALOG_NAME}.bronze_db.item")
)

display(spark.sql(f"SELECT * FROM {CATALOG_NAME}.bronze_db.item LIMIT 10"))

adjective,category,created_at,id,modifier,name,price
fuzzy,contraption,2014-01-15 21:36:09,2512.0,carrying_case,fuzzy contraption carrying_case,150.0
,instrument,2013-05-14 05:20:50,482.0,refill,instrument refill,35.2
industrial-strength,module,2014-02-04 19:28:32,2446.0,,industrial-strength module,300.0
digital,tool,2013-02-25 12:23:18,1312.0,carrying_case,digital tool carrying_case,16.5
miniature,device,2013-08-05 17:20:45,3556.0,cleaner,miniature device cleaner,16.5
rechargable,contraption,2013-09-12 06:27:01,131.0,cleaner,rechargable contraption cleaner,195.0
,instrument,2013-10-07 09:38:14,1178.0,how-to-manual,instrument how-to-manual,0.0
prize-winning,mechanism,2013-09-09 09:32:18,110.0,storage_unit,prize-winning mechanism storage_unit,41.25
,mechanism,2013-10-25 22:53:25,47.0,,mechanism,15.0
organic,tool,2013-05-10 10:19:33,1696.0,,organic tool,37.5
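
Keeping every column as a string means values such as `2512.0` for `id` reach Bronze exactly as written, and casting is deferred to Silver. The plain-Python sketch below mimics that behaviour on a few rows copied from the output above; the header order is taken from the schema, and mapping empty fields to `None` is an assumption meant to mirror Spark's default CSV null handling.

```python
import csv
import io

# A few rows copied from the Bronze item output above (file layout assumed to match the schema order)
raw_csv = """adjective,category,created_at,id,modifier,name,price
fuzzy,contraption,2014-01-15 21:36:09,2512.0,carrying_case,fuzzy contraption carrying_case,150.0
,instrument,2013-05-14 05:20:50,482.0,refill,instrument refill,35.2
,mechanism,2013-10-25 22:53:25,47.0,,mechanism,15.0
"""


def read_raw(text: str) -> list:
    """Read CSV text keeping every value as a string; empty fields become None."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        rows.append({key: (value if value != "" else None) for key, value in record.items()})
    return rows


bronze_rows = read_raw(raw_csv)
for row in bronze_rows:
    print(row["id"], row["adjective"], row["price"])
# 2512.0 fuzzy 150.0
# 482.0 None 35.2
# 47.0 None 15.0
```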


#### 3.C. BRONZE (Event Table)

In [None]:
event_string_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_time", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_payload", StringType(), True)
])

# multiLine: "true" allows quoted fields to span multiple lines, in case a JSON payload contains newlines
# escape: '"' sets the escape character to a double quote, so doubled quotes ("") inside a quoted field are read as literal quotes and the embedded JSON survives parsing
bronze_event_df = (spark.read
    .format("csv")
    .option("header", "true")
    .schema(event_string_schema)
    .option("multiLine", "true")
    .option("escape", '"')
    .load(event_file_path)
)

(bronze_event_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(f"{CATALOG_NAME}.bronze_db.event")
)

display(spark.sql(f"SELECT * FROM {CATALOG_NAME}.bronze_db.event LIMIT 10"))

event_id,event_time,user_id,event_payload
b9de71c5c3cc4cd7a97e50b832106e5a,2017-06-26 11:23:39,178481.0,"{""event_name"":""view_item"",""platform"":""android"",""parameter_name"":""item_id"",""parameter_value"":""3526""}"
23267713c9ea44419331731f50b6a8db,2017-06-27 10:46:39,178481.0,"{""event_name"":""view_item"",""platform"":""android"",""parameter_name"":""item_id"",""parameter_value"":""1514""}"
1b7822fa7b854e01970218ae8f721fe0,2017-06-27 11:15:39,178481.0,"{""event_name"":""view_item"",""platform"":""android"",""parameter_name"":""item_id"",""parameter_value"":""3712""}"
2a7a188a626841ac94befcc419f06af4,2016-10-05 20:43:10,154133.0,"{""event_name"":""view_item"",""platform"":""android"",""parameter_name"":""item_id"",""parameter_value"":""3586""}"
631d657264cc4616a4528f759509b25d,2016-10-04 03:29:10,154133.0,"{""event_name"":""view_item"",""platform"":""android"",""parameter_name"":""item_id"",""parameter_value"":""1061""}"
05e4df2fa9044bf9a49a7351fdd4a6cd,2016-07-21 01:17:43,119514.0,"{""event_name"":""view_item"",""platform"":""web"",""parameter_name"":""item_id"",""parameter_value"":""550""}"
22d3cdd566534fbdaa5e2982c1e64a75,2016-11-24 12:10:06,164581.0,"{""event_name"":""view_item"",""platform"":""web"",""parameter_name"":""item_id"",""parameter_value"":""982""}"
cb0c825fe3804f42b602106580845b88,2016-11-23 21:13:06,164581.0,"{""event_name"":""view_item"",""platform"":""web"",""parameter_name"":""item_id"",""parameter_value"":""858""}"
181f7f97ab10440fb444810e085ded46,2016-11-25 20:06:06,164581.0,"{""event_name"":""view_item"",""platform"":""web"",""parameter_name"":""item_id"",""parameter_value"":""3363""}"
bbbd68b2dbd04ecabacaa00e130a52f9,2018-02-06 06:19:55,232806.0,"{""event_name"":""view_item"",""platform"":""web"",""parameter_name"":""item_id"",""parameter_value"":""2344""}"
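
The `escape` option matters because each payload is a quoted CSV field whose inner quotes are doubled (`""`). Python's `csv` module follows the same doubled-quote convention by default, so a plain-Python sketch can show what the Bronze reader recovers from one of the lines above and that the result is valid JSON.

```python
import csv
import io
import json

# One line from the Bronze event output above: the JSON payload is a quoted CSV field
# whose inner double quotes are escaped by doubling them ("")
raw_line = (
    'b9de71c5c3cc4cd7a97e50b832106e5a,2017-06-26 11:23:39,178481.0,'
    '"{""event_name"":""view_item"",""platform"":""android"",'
    '""parameter_name"":""item_id"",""parameter_value"":""3526""}"'
)

# csv.reader treats "" inside a quoted field as a literal quote (doublequote=True is the default)
event_id, event_time, user_id, payload_text = next(csv.reader(io.StringIO(raw_line)))
payload = json.loads(payload_text)

print(payload_text)                # {"event_name":"view_item","platform":"android","parameter_name":"item_id","parameter_value":"3526"}
print(payload["parameter_value"])  # 3526
```

Note that `parameter_value` is a JSON string, which is why the Gold layer later casts it to a numeric `item_id`.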


## 4. LAYER 2: SILVER

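- cleans the Bronze item data (casting types, replacing null adjectives and modifiers with "unknown")
- refines the Bronze event data by parsing the JSON payload and casting types
- adds the event year
- filters out rows whose timestamp could not be parsed
- writes the events as a Delta table partitioned by event_year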

In [None]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS silver_db")
spark.sql(f"USE SCHEMA silver_db")


#### 4.A. SILVER (Item Table)

In [None]:
# Load Bronze item data
bronze_item = spark.table(f"{CATALOG_NAME}.bronze_db.item")

# Transform the Bronze item data into Silver by casting types, handling nulls, and selecting/aliasing columns for clarity and consistency
silver_item_df = (bronze_item
  .withColumn("item_id", col("id").cast(DoubleType()).cast(LongType())) # Raw IDs look like "2512.0": cast to Double, then Long, and rename to 'item_id'
  .withColumn("created_at_ts", to_timestamp(col("created_at")))
  .withColumn("price_decimal", col("price").cast(DecimalType(10, 2))) # Cast 'price' to Decimal with precision 10 and scale 2 for monetary values
  .withColumn("adjective_clean", coalesce(col("adjective"), lit("unknown"))) # Handle null 'adjective' by replacing with 'unknown' using coalesce
  .withColumn("modifier_clean", coalesce(col("modifier"), lit("unknown")))
  .select(
    col("item_id"),
    col("name").alias("item_name"),
    col("category"),
    col("adjective_clean").alias("adjective"),
    col("modifier_clean").alias("modifier"),
    col("price_decimal").alias("price"),
    col("created_at_ts")
  )
)

(silver_item_df.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable(f"{CATALOG_NAME}.silver_db.item")
)
display(spark.sql(f"SELECT * FROM {CATALOG_NAME}.silver_db.item LIMIT 10"))

item_id,item_name,category,adjective,modifier,price,created_at_ts
2512,fuzzy contraption carrying_case,contraption,fuzzy,carrying_case,150.0,2014-01-15T21:36:09.000Z
482,instrument refill,instrument,unknown,refill,35.2,2013-05-14T05:20:50.000Z
2446,industrial-strength module,module,industrial-strength,unknown,300.0,2014-02-04T19:28:32.000Z
1312,digital tool carrying_case,tool,digital,carrying_case,16.5,2013-02-25T12:23:18.000Z
3556,miniature device cleaner,device,miniature,cleaner,16.5,2013-08-05T17:20:45.000Z
131,rechargable contraption cleaner,contraption,rechargable,cleaner,195.0,2013-09-12T06:27:01.000Z
1178,instrument how-to-manual,instrument,unknown,how-to-manual,0.0,2013-10-07T09:38:14.000Z
110,prize-winning mechanism storage_unit,mechanism,prize-winning,storage_unit,41.25,2013-09-09T09:32:18.000Z
47,mechanism,mechanism,unknown,unknown,15.0,2013-10-25T22:53:25.000Z
1696,organic tool,tool,organic,unknown,37.5,2013-05-10T10:19:33.000Z
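
The Silver item rules can be restated in plain Python to make each cast explicit: parse `id` as a float and truncate it to an integer, quantize `price` to two decimal places, and fall back to `"unknown"` for missing labels. This is a sketch for illustration only; the half-up rounding in `to_price` is an assumption about how `DecimalType(10, 2)` rounds, and the helper names are hypothetical.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP


def to_item_id(raw_id):
    """'2512.0' -> 2512: parse as float, then truncate (mirrors cast(DoubleType()).cast(LongType()))."""
    return None if raw_id is None else int(float(raw_id))


def to_price(raw_price):
    """Quantize to 2 decimal places; half-up rounding is assumed."""
    if raw_price is None:
        return None
    return Decimal(raw_price).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def clean_label(value):
    """Equivalent of coalesce(value, lit('unknown'))."""
    return value if value is not None else "unknown"


def to_silver_item(row: dict) -> dict:
    return {
        "item_id": to_item_id(row["id"]),
        "item_name": row["name"],
        "category": row["category"],
        "adjective": clean_label(row["adjective"]),
        "modifier": clean_label(row["modifier"]),
        "price": to_price(row["price"]),
        "created_at_ts": datetime.strptime(row["created_at"], "%Y-%m-%d %H:%M:%S"),
    }


# The Bronze row for item 47 from the output above (empty fields represented as None)
bronze_row = {"adjective": None, "category": "mechanism", "created_at": "2013-10-25 22:53:25",
              "id": "47.0", "modifier": None, "name": "mechanism", "price": "15.0"}
silver_row = to_silver_item(bronze_row)
print(silver_row["item_id"], silver_row["adjective"], silver_row["modifier"], silver_row["price"])
# 47 unknown unknown 15.00
```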


#### 4.B. SILVER (Event Table)

In [None]:
payload_schema = StructType([
    StructField("event_name", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("parameter_name", StringType(), True),
    StructField("parameter_value", StringType(), True)
])

# Load Bronze event data
bronze_event = spark.table(f"{CATALOG_NAME}.bronze_db.event")

# Transform the Bronze event data: convert 'event_time' to a timestamp, cast 'user_id' to long via double (raw values look like "178481.0"),
# parse 'event_payload' into a struct using the schema, extract the year from the timestamp, and select/alias the relevant columns for the Silver layer
silver_event_df = (bronze_event
  .withColumn("event_at_ts", to_timestamp(col("event_time")))
  .withColumn("user_id", col("user_id").cast(DoubleType()).cast(LongType()))
  .withColumn("payload", from_json(col("event_payload"), payload_schema)) # Parse the JSON string in event_payload using the defined schema
  .withColumn("event_year", year(col("event_at_ts")))
  .select(
    col("event_id"),
    col("event_at_ts"),
    col("user_id"),
    col("payload.event_name").alias("event_name"),
    col("payload.platform").alias("platform"),
    col("payload.parameter_name").alias("parameter_name"),
    col("payload.parameter_value").alias("parameter_value"),
    col("event_year")
  )
)

# Filter the DataFrame to remove rows where the timestamp conversion failed (null timestamps), ensuring data quality
silver_event_df_filtered = silver_event_df.filter(col("event_at_ts").isNotNull()) 

(silver_event_df_filtered.write
  .format("delta")
  .mode("overwrite")
  .partitionBy("event_year")
  .saveAsTable(f"{CATALOG_NAME}.silver_db.event")
)

display(spark.sql(f"SELECT * FROM {CATALOG_NAME}.silver_db.event LIMIT 10"))

event_id,event_at_ts,user_id,event_name,platform,parameter_name,parameter_value,event_year
2a7a188a626841ac94befcc419f06af4,2016-10-05T20:43:10.000Z,154133,view_item,android,item_id,3586,2016
631d657264cc4616a4528f759509b25d,2016-10-04T03:29:10.000Z,154133,view_item,android,item_id,1061,2016
05e4df2fa9044bf9a49a7351fdd4a6cd,2016-07-21T01:17:43.000Z,119514,view_item,web,item_id,550,2016
22d3cdd566534fbdaa5e2982c1e64a75,2016-11-24T12:10:06.000Z,164581,view_item,web,item_id,982,2016
cb0c825fe3804f42b602106580845b88,2016-11-23T21:13:06.000Z,164581,view_item,web,item_id,858,2016
181f7f97ab10440fb444810e085ded46,2016-11-25T20:06:06.000Z,164581,view_item,web,item_id,3363,2016
0fb7065155394dc3ba90c6c554dbe8d5,2016-03-03T07:13:36.000Z,108121,view_item,web,item_id,332,2016
fdbc95f4e3644a7caf0a3e143b42bfb3,2016-03-03T18:31:36.000Z,108121,view_item,web,item_id,1132,2016
e8b3c6358f924cf3912f9a0b88758d5f,2016-04-12T10:49:42.000Z,96931,view_item,web,item_id,1533,2016
3018677b46514e018a40bc2aeac6390d,2016-04-11T00:34:42.000Z,96931,view_item,web,item_id,2362,2016
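
The Silver event cell combines three rules: rows whose timestamp does not parse are dropped, the payload is parsed against a fixed set of fields, and the year is derived from the parsed timestamp. The plain-Python sketch below restates those rules; it assumes the `YYYY-MM-DD HH:MM:SS` format seen in the data and numeric `user_id` strings, and it approximates `from_json` by ignoring unknown keys and yielding `None` for missing fields or unparseable payloads. The two "bad" rows are invented for illustration.

```python
import json
from datetime import datetime

PAYLOAD_FIELDS = ("event_name", "platform", "parameter_name", "parameter_value")


def parse_ts(text):
    """Return a datetime, or None when the value is missing or not in 'YYYY-MM-DD HH:MM:SS' form."""
    try:
        return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        return None


def parse_payload(text):
    """Approximate from_json with a fixed schema: missing keys or malformed JSON give None fields."""
    try:
        data = json.loads(text)
    except (TypeError, ValueError):
        data = {}
    if not isinstance(data, dict):
        data = {}
    return {field: data.get(field) for field in PAYLOAD_FIELDS}


def to_silver_events(bronze_rows):
    silver = []
    for row in bronze_rows:
        ts = parse_ts(row["event_time"])
        if ts is None:  # same rule as filter(col("event_at_ts").isNotNull())
            continue
        user_id = row["user_id"]
        silver.append({
            "event_id": row["event_id"],
            "event_at_ts": ts,
            "user_id": int(float(user_id)) if user_id is not None else None,
            **parse_payload(row["event_payload"]),
            "event_year": ts.year,
        })
    return silver


bronze_events = [
    {"event_id": "2a7a188a626841ac94befcc419f06af4", "event_time": "2016-10-05 20:43:10", "user_id": "154133.0",
     "event_payload": '{"event_name":"view_item","platform":"android","parameter_name":"item_id","parameter_value":"3586"}'},
    # Hypothetical malformed rows, not taken from the dataset
    {"event_id": "bad-ts", "event_time": "not a timestamp", "user_id": "1.0", "event_payload": "{}"},
    {"event_id": "bad-json", "event_time": "2017-01-01 00:00:00", "user_id": "2.0", "event_payload": "{oops"},
]
silver_events = to_silver_events(bronze_events)
print([(e["event_id"], e["event_year"], e["event_name"]) for e in silver_events])
```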


## 5. LAYER 3: GOLD

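- loads the Silver event table
- keeps view_item events whose parameter is an item_id
- aggregates total views and per-platform view counts per item and year
- finds each item's most-used platform and ranks items by total views within each year
- joins the Silver item table for item names and categories
- writes the result as a Delta table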

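Table design (gold_db.top_item):
- denormalized fact table keyed by the composite (item_id, event_year)
- total_views and item_rank_in_year as metrics, most_used_platform as a descriptive attribute
- item_name and category denormalized from the item dimension
- not partitioned as written; partitioning by event_year could be added to speed up year-based queries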
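
Because the table is meant to hold one row per (`item_id`, `event_year`), it is worth checking that the composite key is actually unique; for example, the left join with the Silver item table would duplicate rows if an `item_id` appeared twice there. In Spark this check could be written as `groupBy("item_id", "event_year").count().filter(col("count") > 1)`. The plain-Python sketch below shows the same idea; `duplicate_keys` is a hypothetical helper, and the duplicate row is invented.

```python
from collections import Counter


def duplicate_keys(rows, key_cols=("item_id", "event_year")):
    """Return composite-key values that occur more than once; an empty list means the key is unique."""
    counts = Counter(tuple(row[c] for c in key_cols) for row in rows)
    return [key for key, n in counts.items() if n > 1]


# Rows shaped like gold_db.top_item; the first two use values from the datamart query output
# at the end of this notebook, the third is a made-up duplicate
rows = [
    {"item_id": 597, "event_year": 2018, "total_views": 33},
    {"item_id": 2422, "event_year": 2018, "total_views": 29},
    {"item_id": 597, "event_year": 2018, "total_views": 1},
]
print(duplicate_keys(rows))      # [(597, 2018)]
print(duplicate_keys(rows[:2]))  # []
```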

In [None]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS gold_db")
spark.sql(f"USE SCHEMA gold_db")


In [None]:
# Load the Silver event table from the specified catalog and schema
silver_events = spark.table(f"{CATALOG_NAME}.silver_db.event")

# Create a base DataFrame by filtering for 'view_item' events where the parameter is 'item_id' and the value is not null
# select the year, cast parameter_value to long as 'item_id', and platform
views_base_df = (silver_events
  .filter(
    (col("event_name") == "view_item") & 
    (col("parameter_name") == "item_id") &
    (col("parameter_value").isNotNull())
  )
  .select(
    col("event_year"),
    col("parameter_value").cast(DoubleType()).cast(LongType()).alias("item_id"),
    col("platform")
  )
)

# Aggregate total views per item and year; GroupedData.count() names its output column "count", which is renamed to 'total_views'
total_views_df = (views_base_df
  .groupBy("item_id", "event_year")
  .count()
  .withColumnRenamed("count", "total_views")
)

# Aggregate platform-specific views per item, year, and platform, renaming the count to 'platform_views'
platform_counts_df = (views_base_df
  .groupBy("item_id", "event_year", "platform")
  .count()
  .withColumnRenamed("count", "platform_views")
)

# Define a window partitioned by item_id and event_year, ordered descending by platform_views to rank platforms;
# platform is a secondary sort key so that ties are broken deterministically (alphabetically, nulls last)
platform_window = (Window
  .partitionBy("item_id", "event_year")
  .orderBy(col("platform_views").desc(), col("platform").asc_nulls_last())
)

# Add a row number over the window to identify the top platform,
# keep row number 1, and select item_id, year, and platform aliased as 'most_used_platform'
top_platform_df = (platform_counts_df
  .withColumn("rn", row_number().over(platform_window))
  .filter(col("rn") == 1)
  .select(
    col("item_id"),
    col("event_year"),
    col("platform").alias("most_used_platform")
  )
)

# Join the total views DataFrame with the top platform DataFrame on item_id and event_year using a left join
datamart_df = (total_views_df
  .join(top_platform_df, ["item_id", "event_year"], "left")
)

# Define a window partitioned by event_year, ordered descending by total_views to rank items within each year
item_rank_window = (Window
  .partitionBy("event_year")
  .orderBy(col("total_views").desc())
)

# Add a rank column over the window to assign ranks based on total_views within each year
final_datamart_df = (datamart_df
  .withColumn("item_rank_in_year", rank().over(item_rank_window))
)

# Load selected columns (item_id, item_name, category) from the Silver item table
silver_item = spark.table(f"{CATALOG_NAME}.silver_db.item").select("item_id", "item_name", "category")

# Join the final datamart with item details on item_id using a left join
# select the desired columns including names and category for denormalization
final_datamart_with_names_df = (final_datamart_df
  .join(silver_item, "item_id", "left")
  .select(
      "item_id",
      "item_name",
      "category",
      "event_year",
      "total_views",
      "item_rank_in_year",
      "most_used_platform"
  )
)

# Write the final datamart as a Delta table in overwrite mode to gold_db.top_item
# This creates or replaces the Gold layer datamart with the schema
# item_id (long), item_name (string), category (string), event_year (int), total_views (long), item_rank_in_year (int), most_used_platform (string),
# designed for business queries on top items by views, ranked per year
(final_datamart_with_names_df.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable(f"{CATALOG_NAME}.gold_db.top_item")
)
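
The Gold aggregation can be restated without Spark to make the tie handling visible. `row_number()` only gives a stable "most used platform" if the window has a total order; when two platforms have the same count, ordering by `platform_views` alone leaves the choice to Spark. The plain-Python sketch below counts views per (item, year), counts views per platform, and breaks ties alphabetically. It assumes the events are already filtered to `view_item` rows and that platforms are non-null strings; `build_views` and the toy events are hypothetical.

```python
from collections import Counter


def build_views(view_events):
    """view_events: iterable of (event_year, item_id, platform) tuples.
    Returns {(item_id, event_year): (total_views, most_used_platform)}."""
    total_views = Counter()
    platform_views = Counter()
    for event_year, item_id, platform in view_events:
        total_views[(item_id, event_year)] += 1
        platform_views[(item_id, event_year, platform)] += 1

    top_platform = {}
    for (item_id, event_year, platform), n in platform_views.items():
        key = (item_id, event_year)
        best = top_platform.get(key)
        # Highest count wins; on a tie, the alphabetically first platform wins
        if best is None or n > best[1] or (n == best[1] and platform < best[0]):
            top_platform[key] = (platform, n)

    return {key: (n, top_platform[key][0]) for key, n in total_views.items()}


# Toy events, not taken from the dataset
events = [
    (2018, 597, "web"), (2018, 597, "web"), (2018, 597, "android"),
    (2018, 159, "android"), (2018, 159, "web"),  # a 1-1 tie between platforms
]
result = build_views(events)
print(result)  # {(597, 2018): (3, 'web'), (159, 2018): (2, 'android')}
```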

In [None]:
# Query Datamart

top_items_df = spark.sql(f"""
  SELECT * FROM {CATALOG_NAME}.gold_db.top_item
  WHERE event_year IS NOT NULL
  ORDER BY event_year DESC, item_rank_in_year ASC
  LIMIT 20
""")
display(top_items_df)

item_id,item_name,category,event_year,total_views,item_rank_in_year,most_used_platform
597,widget opener,widget,2018,33,1,web
2422,glossy contraption,contraption,2018,29,2,web
3882,module,module,2018,29,2,web
2337,reflective contraption,contraption,2018,28,4,web
159,miniature module,module,2018,28,4,web
1144,rechargable contraption,contraption,2018,28,4,web
3763,digital widget charger,widget,2018,27,7,android
3884,organic apparatus,apparatus,2018,27,7,web
2954,analog contraption warmer,contraption,2018,27,7,web
639,reflective mechanism,mechanism,2018,27,7,web
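
The `item_rank_in_year` values above (1, 2, 2, 4, 4, 4, 7, ...) follow `rank()` semantics: tied items share a rank and the next rank skips ahead by the size of the tie. The plain-Python sketch below reproduces the 2018 ranks from the `total_views` column and contrasts them with `dense_rank()`, which Spark also provides if a gap-free ranking is preferred. The helper names are illustrative only.

```python
def rank_desc(values):
    """SQL RANK() over values sorted descending: ties share a rank and the following rank skips ahead."""
    ordered = sorted(values, reverse=True)
    ranks = []
    for i, value in enumerate(ordered):
        if i > 0 and value == ordered[i - 1]:
            ranks.append(ranks[-1])  # tie: reuse the previous rank
        else:
            ranks.append(i + 1)      # new value: rank equals its 1-based position
    return ranks


def dense_rank_desc(values):
    """SQL DENSE_RANK() over values sorted descending: ties share a rank and no ranks are skipped."""
    distinct = sorted(set(values), reverse=True)
    position = {value: i + 1 for i, value in enumerate(distinct)}
    return [position[value] for value in sorted(values, reverse=True)]


# total_views for 2018 as shown in the datamart output above
views_2018 = [33, 29, 29, 28, 28, 28, 27, 27, 27, 27]
print(rank_desc(views_2018))        # [1, 2, 2, 4, 4, 4, 7, 7, 7, 7]
print(dense_rank_desc(views_2018))  # [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
```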
