## SILVER LAYER — Cleaned & Standardized Data
Purpose of Silver

Prepare data for analysis by cleaning, standardizing, and structuring it.
Silver is where data quality rules are applied.

##### Silver Transformation Code — Explanation

What this code does

Reads data from Bronze tables
Cleans obvious data quality issues (nulls, data types)
Standardizes column names and formats
Converts timestamps into proper date/time fields
Creates analysis-ready tables at correct grain

##### Key transformations explained

Timestamp parsing
Converts raw timestamp strings into proper date/time columns
(e.g., event_time → event_date)

Data type standardization
Converts numeric fields (amounts, quantities) into numeric types

De-duplication (if applied)
Removes duplicate rows based on business keys

Filtering invalid records
Removes rows that cannot be used meaningfully in analysis

##### What is still NOT done

No KPIs

No aggregations

No funnel logic

No business metrics

###### Why this layer exists

Ensures downstream metrics are trustworthy
Keeps business logic separate from raw ingestion
Makes SQL analysis simpler and safer


In [1]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
import re

# ----------------------------
# 0) Create Silver database
# ----------------------------
# IF NOT EXISTS makes this a no-op on re-runs, so the cell can be executed repeatedly.
create_silver_db_sql = "CREATE DATABASE IF NOT EXISTS silver"
spark.sql(create_silver_db_sql)

# ----------------------------
# 1) Helpers
# ----------------------------
def to_snake(s: str) -> str:
    """Convert a column name to snake_case.

    Runs of characters other than ASCII letters/digits collapse to a single
    "_", word boundaries inside camelCase/PascalCase names and after
    acronyms are split with "_", and the result is lower-cased with any
    leading/trailing "_" removed.

    >>> to_snake("Order ID")
    'order_id'
    >>> to_snake("HTTPStatus")
    'http_status'
    """
    s = s.strip()
    s = re.sub(r'[^0-9a-zA-Z]+', '_', s)
    # Acronym followed by a capitalised word: "HTTPStatus" -> "HTTP_Status".
    # Without this step the acronym and the next word were fused ("httpstatus").
    s = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', s)
    # Lower-case letter or digit followed by an upper-case letter: "userId" -> "user_Id".
    s = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s)
    return s.lower().strip('_')

def snake_case_columns(df):
    """Return `df` with every column renamed to its to_snake() form.

    All renames happen in one projection (DataFrame.toDF) rather than one
    withColumnRenamed call per column.

    A column whose name contains no letters or digits (to_snake returns "")
    keeps its original name instead of being renamed to an empty string.

    Raises:
        ValueError: if two columns would end up with the same name (compared
            case-insensitively). Left unchecked, Spark would carry duplicate
            columns that fail later as ambiguous references or on write.
    """
    old_cols = list(df.columns)
    new_cols = [to_snake(c) or c for c in old_cols]

    seen = {}
    for old, new in zip(old_cols, new_cols):
        key = new.lower()
        if key in seen:
            raise ValueError(
                f"snake_case_columns: columns {seen[key]!r} and {old!r} "
                f"both map to {new!r}"
            )
        seen[key] = old

    if new_cols == old_cols:
        return df
    return df.toDF(*new_cols)

def pick_col(df, candidates):
    """Return the first name in `candidates` that is a column of `df`, or None.

    Candidates are checked in the order given, so earlier entries take
    precedence when several of them are present.
    """
    available = frozenset(df.columns)
    return next((name for name in candidates if name in available), None)

def clean_numeric_str(col):
    """Return column `col` with every character except digits, '.' and '-' removed.

    Strips thousands separators, currency symbols and whitespace
    (e.g. "$1,234.50" -> "1234.50"). The result is still a string column;
    the caller casts it to the numeric type it needs.

    NOTE(review): if `col` is not a string column, Spark converts it to text
    before the replace, and scientific notation such as "1.0E7" would lose
    its "E". Confirm that Bronze numeric fields arrive as strings.
    """
    non_numeric_chars = r"[^0-9\.\-]"
    source = F.col(col)
    return F.regexp_replace(source, non_numeric_chars, "")

# Timestamp parsing:
# Without a format, to_timestamp() uses Spark's default string-to-timestamp
# rules, which usually work for ISO or "yyyy-MM-dd HH:mm:ss" values.
# Pass `fmt` (a Spark datetime pattern) when the exact source format is known.
def parse_ts(col, fmt=None):
    """Return a Column that parses column `col` into a timestamp.

    Args:
        col: name of the column to parse.
        fmt: optional Spark datetime pattern (e.g. "dd/MM/yyyy HH:mm").
            None (the default) keeps to_timestamp's default parsing.

    Values that cannot be parsed typically become NULL. This holds in Spark's
    default non-ANSI mode; verify it if ANSI mode is enabled.
    """
    if fmt is None:
        return F.to_timestamp(F.col(col))
    return F.to_timestamp(F.col(col), fmt)

# ----------------------------
# 2) Read Bronze tables
# ----------------------------
# Each Bronze table is named "bronze.<entity>_raw". Read every one and
# normalise its column names to snake_case immediately, so all later lookups
# can assume snake_case names.
_bronze_entities = ("customers", "sessions", "events", "products", "orders", "order_items")
(
    customers_raw,
    sessions_raw,
    events_raw,
    products_raw,
    orders_raw,
    order_items_raw,
) = [snake_case_columns(spark.table(f"bronze.{entity}_raw")) for entity in _bronze_entities]

# ----------------------------
# 3) CUSTOMERS (Silver)
# ----------------------------
# Resolve Bronze column-name variants onto the standard Silver names.
# The standard name comes first in each candidate list. A Bronze table that
# already has it therefore keeps it, instead of having an alternative renamed
# on top of it, which would leave two columns with the same name.
cust_id = pick_col(customers_raw, ["user_id", "customer_id", "userid", "id"])
# NOTE(review): a Bronze `signup_date` column is not in this candidate list, so
# it passes through unparsed. Confirm whether it should map to created_at.
cust_created = pick_col(customers_raw, ["created_at", "created_on", "signup_time", "registration_time"])
cust_city = pick_col(customers_raw, ["city"])
cust_state = pick_col(customers_raw, ["state", "state_name"])
cust_device = pick_col(customers_raw, ["device_type", "device"])
cust_source = pick_col(customers_raw, ["marketing_source", "source", "acquisition_source"])

customers = customers_raw
# If no id column is found, the table is written without user_id and later
# joins will need adjustment.
if cust_id and cust_id != "user_id":
    customers = customers.withColumnRenamed(cust_id, "user_id")

# Minimal string standardization: trim ids and turn blank or whitespace-only
# ids into NULL, so the null filter below drops them (dropna ignores "").
if "user_id" in customers.columns:
    trimmed_id = F.trim(F.col("user_id"))
    customers = customers.withColumn("user_id", F.when(trimmed_id != "", trimmed_id).cast("string"))

if cust_created and cust_created != "created_at":
    customers = customers.withColumnRenamed(cust_created, "created_at")

if "created_at" in customers.columns:
    customers = customers.withColumn("created_at", parse_ts("created_at"))

# Optional descriptive fields: rename to the standard name when an alternative is present.
if cust_city and cust_city != "city":
    customers = customers.withColumnRenamed(cust_city, "city")
if cust_state and cust_state != "state":
    customers = customers.withColumnRenamed(cust_state, "state")
if cust_device and cust_device != "device_type":
    customers = customers.withColumnRenamed(cust_device, "device_type")
if cust_source and cust_source != "marketing_source":
    customers = customers.withColumnRenamed(cust_source, "marketing_source")

# Drop rows missing the critical key, then keep one row per user_id.
# NOTE: dropDuplicates does not guarantee which of the duplicate rows survives.
if "user_id" in customers.columns:
    customers = customers.dropna(subset=["user_id"]).dropDuplicates(["user_id"])

# Write (full overwrite of the Delta table)
(customers.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("silver.customers")
)

# ----------------------------
# 4) SESSIONS (Silver)
# ----------------------------
# Resolve Bronze column-name variants onto the standard Silver names.
# The standard name comes first in each list, so an existing standard column
# is never shadowed by a renamed alternative, which would create duplicate columns.
sess_id = pick_col(sessions_raw, ["session_id", "sessionid", "sid", "id"])
sess_user = pick_col(sessions_raw, ["user_id", "customer_id", "userid"])
sess_start = pick_col(sessions_raw, ["session_start", "start_time", "session_start_time", "start"])
sess_end = pick_col(sessions_raw, ["session_end", "end_time", "session_end_time", "end"])
sess_device = pick_col(sessions_raw, ["device_type", "device"])

sessions = sessions_raw
if sess_id and sess_id != "session_id":
    sessions = sessions.withColumnRenamed(sess_id, "session_id")
if sess_user and sess_user != "user_id":
    sessions = sessions.withColumnRenamed(sess_user, "user_id")
if sess_start and sess_start != "session_start":
    sessions = sessions.withColumnRenamed(sess_start, "session_start")
if sess_end and sess_end != "session_end":
    sessions = sessions.withColumnRenamed(sess_end, "session_end")
if sess_device and sess_device != "device_type":
    sessions = sessions.withColumnRenamed(sess_device, "device_type")

# Type casts (Silver responsibility)
if "session_start" in sessions.columns:
    sessions = sessions.withColumn("session_start", parse_ts("session_start"))
if "session_end" in sessions.columns:
    sessions = sessions.withColumn("session_end", parse_ts("session_end"))

# Clean IDs: trim, and turn blank or whitespace-only ids into NULL so the null
# filter below removes them instead of keeping "" as a real id.
for c in ["session_id", "user_id"]:
    if c in sessions.columns:
        trimmed = F.trim(F.col(c))
        sessions = sessions.withColumn(c, F.when(trimmed != "", trimmed).cast("string"))

# Add session_date for downstream
if "session_start" in sessions.columns:
    sessions = sessions.withColumn("session_date", F.to_date(F.col("session_start")))

# Drop critical nulls + dedupe by session_id
# NOTE: dropDuplicates does not guarantee which of the duplicate rows survives.
required = [c for c in ["session_id", "user_id", "session_start"] if c in sessions.columns]
if required:
    sessions = sessions.dropna(subset=required)
if "session_id" in sessions.columns:
    sessions = sessions.dropDuplicates(["session_id"])

(sessions.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("silver.sessions")
)

# ----------------------------
# 5) EVENTS / CLICKSTREAM (Silver)
# ----------------------------
# Resolve Bronze column-name variants onto the standard Silver names.
# The standard name comes first in each list, so an existing standard column
# is never shadowed by a renamed alternative, which would create duplicate columns.
evt_sess = pick_col(events_raw, ["session_id", "sessionid", "sid"])
evt_user = pick_col(events_raw, ["user_id", "customer_id", "userid"])
evt_type = pick_col(events_raw, ["event_type", "event_name", "event", "action"])
evt_time = pick_col(events_raw, ["event_time", "timestamp", "event_timestamp", "time"])
evt_prod = pick_col(events_raw, ["product_id", "item_id", "sku", "productid"])

events = events_raw
if evt_sess and evt_sess != "session_id":
    events = events.withColumnRenamed(evt_sess, "session_id")
if evt_user and evt_user != "user_id":
    events = events.withColumnRenamed(evt_user, "user_id")
if evt_type and evt_type != "event_type":
    events = events.withColumnRenamed(evt_type, "event_type")
if evt_time and evt_time != "event_time":
    events = events.withColumnRenamed(evt_time, "event_time")
if evt_prod and evt_prod != "product_id":
    events = events.withColumnRenamed(evt_prod, "product_id")

# Minimal standardization (still not KPI logic): trim, lower-case and
# replace internal whitespace runs with "_" (e.g. "Add To Cart" -> "add_to_cart").
if "event_type" in events.columns:
    events = events.withColumn(
        "event_type",
        F.lower(F.regexp_replace(F.trim(F.col("event_type")), r"\s+", "_"))
    )

# Timestamp cast (Silver responsibility)
if "event_time" in events.columns:
    events = events.withColumn("event_time", parse_ts("event_time"))

# ID cleanup: trim, and turn blank or whitespace-only ids into NULL so they are
# not treated as real ids (required ones are then dropped below).
for c in ["event_id", "session_id", "user_id", "product_id"]:
    if c in events.columns:
        trimmed = F.trim(F.col(c))
        events = events.withColumn(c, F.when(trimmed != "", trimmed).cast("string"))

# Add event_date for downstream
if "event_time" in events.columns:
    events = events.withColumn("event_date", F.to_date(F.col("event_time")))

# Drop critical nulls
required = [c for c in ["session_id", "user_id", "event_type", "event_time"] if c in events.columns]
if required:
    events = events.dropna(subset=required)

# Deduplicate on a natural key. When Bronze provides event_id it is part of the
# key, so two events with different ids are never merged merely because they
# share session, type, timestamp and product. Exact re-deliveries of the same
# event (same id and same attributes) still collapse to one row.
dedupe_key = [
    c for c in ["event_id", "session_id", "user_id", "event_type", "event_time", "product_id"]
    if c in events.columns
]
if dedupe_key:
    events = events.dropDuplicates(dedupe_key)

(events.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("silver.events")
)

# ----------------------------
# 6) PRODUCTS (Silver)
# ----------------------------
# Resolve Bronze column-name variants onto the standard Silver names
# (standard name listed first in each candidate list).
prod_id = pick_col(products_raw, ["product_id", "item_id", "sku", "id"])
prod_title = pick_col(products_raw, ["title", "product_name", "name"])
prod_cat = pick_col(products_raw, ["category", "l0", "product_category"])
prod_brand = pick_col(products_raw, ["brand_name", "brand"])
prod_mrp = pick_col(products_raw, ["mrp", "list_price", "price"])

products = products_raw
if prod_id and prod_id != "product_id":
    products = products.withColumnRenamed(prod_id, "product_id")
if prod_title and prod_title != "title":
    products = products.withColumnRenamed(prod_title, "title")
if prod_cat and prod_cat != "category":
    products = products.withColumnRenamed(prod_cat, "category")
if prod_brand and prod_brand != "brand_name":
    products = products.withColumnRenamed(prod_brand, "brand_name")
if prod_mrp and prod_mrp != "mrp":
    products = products.withColumnRenamed(prod_mrp, "mrp")

# Trim product_id and turn blank or whitespace-only ids into NULL so the
# null filter below drops them (dropna ignores "").
if "product_id" in products.columns:
    trimmed_id = F.trim(F.col("product_id"))
    products = products.withColumn("product_id", F.when(trimmed_id != "", trimmed_id).cast("string"))

# MRP typing: strip non-numeric characters, then cast to DECIMAL(18,2).
# Values that still cannot be cast become NULL (Spark non-ANSI cast behavior).
if "mrp" in products.columns:
    products = products.withColumn("mrp", clean_numeric_str("mrp").cast(T.DecimalType(18,2)))

# Drop rows without a key, then keep one row per product_id.
# NOTE: dropDuplicates does not guarantee which of the duplicate rows survives.
if "product_id" in products.columns:
    products = products.dropna(subset=["product_id"]).dropDuplicates(["product_id"])

(products.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("silver.products")
)

# ----------------------------
# 7) ORDERS (Silver)
# ----------------------------
# Resolve Bronze column-name variants onto the standard Silver names.
# The standard name comes first in each list, so an existing standard column
# is never shadowed by a renamed alternative, which would create duplicate columns.
ord_id = pick_col(orders_raw, ["order_id", "orderid", "transaction_id", "purchase_id", "id"])
ord_user = pick_col(orders_raw, ["user_id", "customer_id", "userid"])
ord_sess = pick_col(orders_raw, ["session_id", "sessionid", "sid"])
ord_time = pick_col(orders_raw, ["order_time", "order_date", "transaction_time", "purchase_time", "timestamp"])
ord_status = pick_col(orders_raw, ["order_status", "status"])
ord_pay = pick_col(orders_raw, ["payment_method", "payment_type", "payment"])
ord_total = pick_col(orders_raw, ["net_gmv", "net_amount", "total_amount", "amount", "total"])

orders = orders_raw
if ord_id and ord_id != "order_id":
    orders = orders.withColumnRenamed(ord_id, "order_id")
if ord_user and ord_user != "user_id":
    orders = orders.withColumnRenamed(ord_user, "user_id")
if ord_sess and ord_sess != "session_id":
    orders = orders.withColumnRenamed(ord_sess, "session_id")
if ord_time and ord_time != "order_time":
    orders = orders.withColumnRenamed(ord_time, "order_time")
if ord_status and ord_status != "order_status":
    orders = orders.withColumnRenamed(ord_status, "order_status")
if ord_pay and ord_pay != "payment_method":
    orders = orders.withColumnRenamed(ord_pay, "payment_method")
if ord_total and ord_total != "net_gmv":
    orders = orders.withColumnRenamed(ord_total, "net_gmv")

# Cast types
if "order_time" in orders.columns:
    orders = orders.withColumn("order_time", parse_ts("order_time"))
if "net_gmv" in orders.columns:
    orders = orders.withColumn("net_gmv", clean_numeric_str("net_gmv").cast(T.DecimalType(18,2)))

# Normalize ids (not KPI logic): trim, and turn blank or whitespace-only ids
# into NULL so required ones are dropped below instead of kept as "".
for c in ["order_id", "user_id", "session_id"]:
    if c in orders.columns:
        trimmed = F.trim(F.col(c))
        orders = orders.withColumn(c, F.when(trimmed != "", trimmed).cast("string"))

if "order_status" in orders.columns:
    orders = orders.withColumn("order_status", F.lower(F.trim(F.col("order_status"))))
if "payment_method" in orders.columns:
    orders = orders.withColumn("payment_method", F.lower(F.trim(F.col("payment_method"))))

# Add order_date
if "order_time" in orders.columns:
    orders = orders.withColumn("order_date", F.to_date(F.col("order_time")))

# Fallback: when none of the net-amount candidates exist but Bronze has
# total_usd, derive net_gmv from it. total_usd is kept unchanged alongside it.
# net_gmv is added after order_date to keep the same column order as the
# net_gmv-from-total_usd patch applied later in this notebook.
if "net_gmv" not in orders.columns and "total_usd" in orders.columns:
    orders = orders.withColumn("net_gmv", clean_numeric_str("total_usd").cast(T.DecimalType(18, 2)))

# Drop critical nulls + dedupe by order_id
# NOTE: dropDuplicates does not guarantee which of the duplicate rows survives.
required = [c for c in ["order_id", "user_id", "order_time"] if c in orders.columns]
if required:
    orders = orders.dropna(subset=required)
if "order_id" in orders.columns:
    orders = orders.dropDuplicates(["order_id"])

(orders.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("silver.orders")
)

# ----------------------------
# 8) ORDER ITEMS (Silver)
# ----------------------------
# Resolve Bronze column-name variants onto the standard Silver names
# (standard name listed first in each candidate list).
oi_order = pick_col(order_items_raw, ["order_id", "orderid", "transaction_id"])
oi_prod = pick_col(order_items_raw, ["product_id", "item_id", "sku"])
oi_qty = pick_col(order_items_raw, ["quantity", "qty"])
oi_price = pick_col(order_items_raw, ["selling_price", "unit_price", "price"])

order_items = order_items_raw
if oi_order and oi_order != "order_id":
    order_items = order_items.withColumnRenamed(oi_order, "order_id")
if oi_prod and oi_prod != "product_id":
    order_items = order_items.withColumnRenamed(oi_prod, "product_id")
if oi_qty and oi_qty != "quantity":
    order_items = order_items.withColumnRenamed(oi_qty, "quantity")
if oi_price and oi_price != "selling_price":
    order_items = order_items.withColumnRenamed(oi_price, "selling_price")

# Cast types: strip non-numeric characters first, then cast.
if "quantity" in order_items.columns:
    order_items = order_items.withColumn("quantity", clean_numeric_str("quantity").cast("int"))
if "selling_price" in order_items.columns:
    order_items = order_items.withColumn("selling_price", clean_numeric_str("selling_price").cast(T.DecimalType(18,2)))

# ID cleanup: trim, and turn blank or whitespace-only ids into NULL so the
# null filter below drops them (dropna ignores "").
for c in ["order_id", "product_id"]:
    if c in order_items.columns:
        trimmed = F.trim(F.col(c))
        order_items = order_items.withColumn(c, F.when(trimmed != "", trimmed).cast("string"))

# Drop critical nulls
required = [c for c in ["order_id", "product_id"] if c in order_items.columns]
if required:
    order_items = order_items.dropna(subset=required)

# Deduplicate on order_id + product_id. Which qty/price row survives is not
# controlled here (dropDuplicates does not guarantee which row is kept).
# NOTE(review): if the same product can legitimately appear on several lines
# of one order, this key merges those lines. Confirm against the source data.
dedupe_key = [c for c in ["order_id", "product_id"] if c in order_items.columns]
if dedupe_key:
    order_items = order_items.dropDuplicates(dedupe_key)

(order_items.write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("silver.order_items")
)

# ----------------------------
# 9) Silver verification (schemas + counts)
# ----------------------------
# For each Silver table written above, print a header, its schema and its row count.
silver_tables = [
    f"silver.{entity}"
    for entity in ("customers", "sessions", "events", "products", "orders", "order_items")
]

for table_name in silver_tables:
    print("\n---", table_name, "---")
    written = spark.table(table_name)
    written.printSchema()
    print("rows =", written.count())


StatementMeta(, 506231e0-2c0c-4281-a1fc-be6c54df9823, 3, Finished, Available, Finished)


--- silver.customers ---
root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: string (nullable = true)
 |-- signup_date: string (nullable = true)
 |-- marketing_opt_in: string (nullable = true)

rows = 20000

--- silver.sessions ---
root
 |-- session_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- session_start: timestamp (nullable = true)
 |-- device_type: string (nullable = true)
 |-- source: string (nullable = true)
 |-- country: string (nullable = true)
 |-- session_date: date (nullable = true)

rows = 120000

--- silver.events ---
root
 |-- event_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- qty: string (nullable = true)
 |-- cart_size: string (nullable = true)
 |-- payment: string (nulla

In [5]:
from pyspark.sql import functions as F, types as T

orders = spark.table("silver.orders")

# Recompute net_gmv from total_usd. Strip everything except digits, '.' and '-'
# (the same rule as clean_numeric_str), then cast to DECIMAL(18,2).
# The guard avoids an unhelpful analysis error when total_usd is absent.
if "total_usd" in orders.columns:
    orders_silver = (
        orders
        .withColumn(
            "net_gmv",
            F.regexp_replace(F.col("total_usd"), r"[^0-9\.\-]", "")
            .cast(T.DecimalType(18, 2))
        )
    )

    # mergeSchema lets this overwrite add net_gmv to the existing Delta table
    # schema; without it Delta rejects the write as a schema mismatch.
    (
        orders_silver.write
        .mode("overwrite")
        .option("mergeSchema", "true")
        .format("delta")
        .saveAsTable("silver.orders")
    )
else:
    print("silver.orders has no total_usd column; net_gmv was not recomputed")


StatementMeta(, b6a041ba-2994-4a69-8e39-78f986eb4012, 7, Finished, Available, Finished)

In [6]:
# Print the current column names and types of silver.orders.
orders_tbl = spark.table("silver.orders")
orders_tbl.printSchema()


StatementMeta(, b6a041ba-2994-4a69-8e39-78f986eb4012, 8, Finished, Available, Finished)

root
 |-- order_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- order_time: timestamp (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- discount_pct: string (nullable = true)
 |-- subtotal_usd: string (nullable = true)
 |-- total_usd: string (nullable = true)
 |-- country: string (nullable = true)
 |-- device: string (nullable = true)
 |-- source: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- net_gmv: decimal(18,2) (nullable = true)



In [7]:
# Show the raw total_usd next to the typed net_gmv for the first 10 rows.
preview_cols = ["order_id", "total_usd", "net_gmv"]
(
    spark.table("silver.orders")
    .select(*preview_cols)
    .show(10, truncate=False)
)


StatementMeta(, b6a041ba-2994-4a69-8e39-78f986eb4012, 9, Finished, Available, Finished)

+--------+---------+-------+
|order_id|total_usd|net_gmv|
+--------+---------+-------+
|1       |85.72    |85.72  |
|10      |7.75     |7.75   |
|10002   |80.94    |80.94  |
|10004   |19.7     |19.70  |
|10005   |26.71    |26.71  |
|10006   |101.89   |101.89 |
|10007   |7.16     |7.16   |
|10008   |12.95    |12.95  |
|1001    |10.99    |10.99  |
|10013   |41.4     |41.40  |
+--------+---------+-------+
only showing top 10 rows

