In [ ]:
# Import necessary functions
from pyspark.sql.functions import col, to_timestamp, lit
from pyspark.sql.types import TimestampType, IntegerType, LongType

In [ ]:
# 1. Configuration: where Bronze data is read from and where Silver data lands.
bronze_catalog, bronze_schema = "main", "bronze"
silver_catalog, silver_schema = "main", "silver"

# Fully qualified "catalog.schema.table" identifiers used by the cells below.
users_bronze_table = ".".join((bronze_catalog, bronze_schema, "users"))
events_bronze_table = ".".join((bronze_catalog, bronze_schema, "events"))
silver_table_name = ".".join((silver_catalog, silver_schema, "user_activity"))

In [ ]:
# 2. Make sure the Silver schema exists; IF NOT EXISTS makes this safe to re-run.
spark.sql("CREATE SCHEMA IF NOT EXISTS {}.{}".format(silver_catalog, silver_schema))

In [ ]:
# 3. Load the Bronze tables as DataFrames (users first, then events).
users_df, events_df = (
    spark.read.table(table_name)
    for table_name in (users_bronze_table, events_bronze_table)
)

In [ ]:
# 4. Clean and transform the 'users' data:
#    - registration_date -> parsed with to_timestamp, then cast to TimestampType
#    - id -> cast to LongType (it is the key matched against events' user_id)
#    - Bronze ingestion metadata columns are dropped
#    - NULL country values are replaced with "UNKNOWN"
# NOTE(review): the cast after to_timestamp looks redundant, but in newer Spark
# versions to_timestamp's result type may follow spark.sql.timestampType —
# confirm before removing it.
users_df_cleaned = (
    users_df
    .withColumn(
        "registration_date",
        to_timestamp(col("registration_date")).cast(TimestampType()),
    )
    .withColumn("id", col("id").cast(LongType()))
    .drop("_ingestion_timestamp", "_source_file")
    .fillna({"country": "UNKNOWN"})
)

In [ ]:
# 5. Clean and transform the 'events' data:
#    - event_timestamp -> parsed with to_timestamp, then cast to TimestampType
#    - session_duration_sec -> cast to IntegerType; NULLs become 0
#    - user_id -> cast to LongType (it is the key matched against users' id)
#    - Bronze ingestion metadata columns are dropped
events_df_cleaned = (
    events_df
    .withColumn(
        "event_timestamp",
        to_timestamp(col("event_timestamp")).cast(TimestampType()),
    )
    .withColumn("session_duration_sec", col("session_duration_sec").cast(IntegerType()))
    .withColumn("user_id", col("user_id").cast(LongType()))
    .drop("_ingestion_timestamp", "_source_file")
    .fillna({"session_duration_sec": 0})
)

In [ ]:
# 6. Conform: attach user attributes to every event.
#    This is an inner join on events.user_id == users.id. Events with no
#    matching user (including events whose user_id is NULL) are therefore
#    not carried into the Silver table.
silver_activity_df = (
    events_df_cleaned
    .join(
        users_df_cleaned,
        on=events_df_cleaned["user_id"] == users_df_cleaned["id"],
        how="inner",
    )
    .select(
        # event attributes
        "event_id",
        "event_timestamp",
        "event_type",
        "session_duration_sec",
        "user_id",
        # user attributes
        "first_name",
        "last_name",
        "email",
        "country",
        "registration_date",
    )
)

In [ ]:
# 7. Persist the conformed data as a Delta table, replacing whatever the
#    table at silver_table_name currently holds.
silver_activity_df.write.saveAsTable(
    silver_table_name,
    format="delta",
    mode="overwrite",
)

print(f"Successfully wrote to {silver_table_name}")