### Import Libraries

In [0]:
from pyspark.sql.functions import col, from_json, to_timestamp, year
from pyspark.sql.types import StructType, StringType, DoubleType, StructField, TimestampType, LongType, IntegerType

### Define Function

_No helper functions are defined in this notebook yet; this section is a placeholder._

### Read Bronze Table

In [0]:
# Load the bronze-layer source tables as DataFrames; the cells below rename,
# cast and flatten them into the silver layer.
event_df_bronze = spark.table("workspace.bronze_layer.bronze_event")
item_df_bronze = spark.table("workspace.bronze_layer.bronze_item")


### Data pre-processing

In [0]:
# Item columns: business columns get an `item_` prefix (presumably to keep
# names unambiguous next to event columns); technical `tc_*` columns keep
# their names. Every bronze item column is kept — columns missing from this
# map pass through under their original name.
item_rename_map = {
    "adjective": "item_adjective",
    "category": "item_category",
    "created_at": "item_created_at",
    "id": "item_id",
    "modifier": "item_modifier",
    "name": "item_name",
    "price": "item_price",
    "tc_ingestion_timestamp": "tc_ingestion_timestamp",
    "tc_source_file": "tc_source_file",
    "tc_bronze_id": "tc_bronze_id"
}
item_df_silver = item_df_bronze.select(
    [col(c).alias(item_rename_map.get(c, c)) for c in item_df_bronze.columns]
)

# Event columns: unlike items, ONLY the columns listed here are selected, so
# any other bronze event column is dropped. Identity entries are kept on
# purpose to make the selected column set explicit.
# `event_payload` is carried through unparsed here; it is parsed with
# `from_json` and flattened in later cells.
# NOTE(review): assumes bronze event column names contain no dots — `col()`
# would treat a dotted name as a struct-field path. Confirm against the
# bronze schema.
event_rename_map = {
    "event_id": "event_id",
    "event_time": "event_time",
    "user_id": "event_user_id",
    "event_payload": "event_payload",
    "tc_ingestion_timestamp": "tc_ingestion_timestamp",
    "tc_source_file": "tc_source_file",
    "tc_bronze_id": "tc_bronze_id"
}
event_df_silver = event_df_bronze.select(
    [col(source).alias(target) for source, target in event_rename_map.items()]
)


In [0]:
# Schema used with `from_json` to parse the JSON string in `event_payload`
# into a struct. All four fields are read as nullable strings.
# The column casts themselves (including parsing `event_payload`) are applied
# in the next cell; a duplicate, immediately-overwritten cast that used to
# live here has been removed.
payload_schema = StructType([
    StructField("event_name", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("parameter_name", StringType(), True),
    StructField("parameter_value", StringType(), True)
])

In [0]:
# Apply the target silver types to the renamed item and event columns.
# ID columns go string -> double -> bigint, presumably so values stored as
# decimal strings (e.g. "42.0") still convert to integers.
# NOTE(review): routing through double loses precision for IDs above 2**53 —
# confirm the ID range before relying on this.
item_df_casted = (
    item_df_silver
    .withColumn("item_id", col("item_id").cast(DoubleType()).cast("bigint"))
    .withColumn("item_created_at", to_timestamp(col("item_created_at")))
    .withColumn("item_price", col("item_price").cast("double"))
)

# `event_payload` is parsed from its JSON string into a struct using
# `payload_schema`.
event_df_casted = (
    event_df_silver
    .withColumn("event_id", col("event_id").cast("string"))
    .withColumn("event_time", to_timestamp(col("event_time")))
    .withColumn("event_user_id", col("event_user_id").cast(DoubleType()).cast("bigint"))
    .withColumn("event_payload", from_json(col("event_payload"), payload_schema))
)

In [0]:
# Flatten the parsed `event_payload` struct into top-level columns and fix the
# silver column order: business columns, then payload fields, then the
# technical lineage columns. The struct column itself is not carried over.
technical_cols = ["tc_ingestion_timestamp", "tc_source_file", "tc_bronze_id"]
main_cols = [
    name for name in event_df_casted.columns
    if name != "event_payload" and name not in technical_cols
]

# Output column name -> struct field path inside `event_payload`.
payload_field_paths = {
    "event_name": "event_payload.event_name",
    "event_platform": "event_payload.platform",
    "event_parameter_name": "event_payload.parameter_name",
    "event_parameter_value": "event_payload.parameter_value",
}
unpacked_cols = list(payload_field_paths)

event_df_final = event_df_casted.select(
    *main_cols,
    *[col(path).alias(out_name) for out_name, path in payload_field_paths.items()],
    *technical_cols,
)

### Save Tables

In [0]:
# Derive the partition column from the event timestamp (a null event_time
# yields a null event_year).
event_df_final_with_year = event_df_final.withColumn("event_year", year(col("event_time")))

# Create the target schema in the SAME catalog the tables are written to.
# An unqualified "silver_layer" would be created in the session's current
# catalog, which is not necessarily `workspace`, and saveAsTable below would
# then fail because `workspace.silver_layer` does not exist.
spark.sql("CREATE SCHEMA IF NOT EXISTS workspace.silver_layer")

# Both writes use mode("overwrite"): each run replaces the full table content.
# Save the events as a Delta table partitioned by event year.
(event_df_final_with_year
    .write
    .mode("overwrite")
    .partitionBy("event_year")
    .format("delta")
    .saveAsTable("workspace.silver_layer.silver_event")
)

# Save the items as an unpartitioned Delta table.
(item_df_casted
    .write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("workspace.silver_layer.silver_item")
)

print("Saved event_df_final as workspace.silver_layer.silver_event partitioned by year.")
print("Saved item_df_casted as workspace.silver_layer.silver_item.")