# Bronze to Silver Layer Transformation

**Purpose:** Clean and filter raw transaction data from the Bronze layer  
**Input:** Bronze parquet files (all transaction types)  
**Output:** Silver parquet files (purchase transactions only, cleaned)

## Step 1: Import Libraries

In [None]:
from pyspark.sql.functions import col, to_date, lower

## Step 2: Read Bronze Layer Data

Load the raw data from the `bronze` folder of the `retail` container. Set `STORAGE_ACCOUNT` to your own storage account name before running the cell.

In [None]:
# Configuration - UPDATE THIS
STORAGE_ACCOUNT = "stgretail1763452716"  # Your storage account
BRONZE_PATH = f"abfss://retail@{STORAGE_ACCOUNT}.dfs.core.windows.net/bronze/Daniel-jcVv/azure-retail-transactions-api/refs/heads/main/data-source/retail_transactions_bronze.parquet"

# Read Bronze data
df_bronze = spark.read.parquet(BRONZE_PATH)

print(f"Bronze records loaded: {df_bronze.count()}")
df_bronze.show(5)
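
Before transforming, it can help to confirm that the Bronze data actually contains every column that Step 3 reads. The helper below is a minimal sketch, not part of the original notebook: `REQUIRED_COLUMNS` and `missing_columns` are hypothetical names, and the column list is taken from the columns referenced in Step 3.

```python
# Columns that the Step 3 transformation reads from the Bronze data.
REQUIRED_COLUMNS = [
    "event_id", "event_type", "event_timestamp", "customer_id",
    "product_id", "product_category", "payment_method", "amount", "location",
]


def missing_columns(available, required=REQUIRED_COLUMNS):
    """Return the required column names that are absent from `available`."""
    present = set(available)
    return [name for name in required if name not in present]


# In the notebook this would be called as:
#   missing = missing_columns(df_bronze.columns)
#   if missing:
#       raise ValueError(f"Bronze data is missing columns: {missing}")
```

Failing early here gives a clearer error than the analysis exception Spark would raise later, when a missing column is first referenced.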

## Step 3: Data Transformation

Apply business rules:
- Keep only rows where `event_type` is `'purchase'`
- Drop rows with a null `customer_id` or `amount`
- Convert `event_timestamp` to a date (`event_date`)
- Standardize `payment_method` to lowercase
- Cast `amount` to a numeric type (`float`)

In [None]:
df_silver = (
    df_bronze
    .filter(col("event_type") == "purchase")  # Keep purchases only (exact, case-sensitive match)
    # Cast before dropping nulls: with ANSI mode off, unparseable amounts become null
    .withColumn("amount", col("amount").cast("float"))
    .dropna(subset=["customer_id", "amount"])  # Drop rows missing a critical value
    .withColumn("event_date", to_date(col("event_timestamp")))  # Timestamp -> calendar date
    .withColumn("payment_method", lower(col("payment_method")))  # Lowercase for consistency
    .select(
        "event_id", "customer_id", "event_date", "product_id",
        "product_category", "payment_method", "amount", "location"
    )
)

print(f"Silver records after transformation: {df_silver.count()}")
df_silver.show(5)
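
To make the business rules in this step concrete, the sketch below applies the same logic to a few made-up records in plain Python, without Spark. It is only an illustration: `to_silver` and `sample_rows` are hypothetical, the values are invented, and it assumes `event_timestamp` arrives as an ISO 8601 string, which the notebook does not confirm.

```python
from datetime import datetime


def to_silver(row):
    """Apply the Step 3 rules to one record (a dict); return None if it is dropped."""
    if row.get("event_type") != "purchase":
        return None  # Not a purchase
    if row.get("customer_id") is None or row.get("amount") is None:
        return None  # Missing a critical value
    payment = row.get("payment_method")
    return {
        "event_id": row["event_id"],
        "customer_id": row["customer_id"],
        "event_date": datetime.fromisoformat(row["event_timestamp"]).date(),
        "product_id": row["product_id"],
        "product_category": row["product_category"],
        "payment_method": payment.lower() if payment is not None else None,
        "amount": float(row["amount"]),
        "location": row["location"],
    }


# Invented records: one valid purchase, one refund, one purchase without a customer.
base = {
    "event_timestamp": "2024-01-15T10:30:00", "product_id": "p1",
    "product_category": "books", "payment_method": "Credit_Card",
    "amount": "19.99", "location": "Lisbon",
}
sample_rows = [
    {**base, "event_id": "e1", "event_type": "purchase", "customer_id": "c1"},
    {**base, "event_id": "e2", "event_type": "refund", "customer_id": "c2"},
    {**base, "event_id": "e3", "event_type": "purchase", "customer_id": None},
]

silver = [r for r in map(to_silver, sample_rows) if r is not None]
print(silver)
```

Only the first record survives; the refund fails the `event_type` filter and the third record fails the null check.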

## Step 4: Write to Silver Layer

In [None]:
SILVER_PATH = f"abfss://retail@{STORAGE_ACCOUNT}.dfs.core.windows.net/silver/cleaned_transactions"

# "overwrite" replaces any existing output at SILVER_PATH on each run
df_silver.write.mode("overwrite").parquet(SILVER_PATH)

print(f"Silver layer written to: {SILVER_PATH}")
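
One way to confirm the write succeeded is to read the output back and compare row counts. The function below is a rough sketch under that assumption: `verify_row_count` is a hypothetical helper, not something the notebook defines, and it relies only on `spark.read.parquet(...)` and `DataFrame.count()`.

```python
def verify_row_count(spark, path, expected_count):
    """Re-read the parquet output at `path` and check its row count.

    Returns the count read back, or raises ValueError on a mismatch.
    """
    actual_count = spark.read.parquet(path).count()
    if actual_count != expected_count:
        raise ValueError(
            f"Row count mismatch at {path}: "
            f"expected {expected_count}, found {actual_count}"
        )
    return actual_count


# In the notebook this might be called as:
#   verify_row_count(spark, SILVER_PATH, df_silver.count())
```

Note that `df_silver.count()` re-runs the transformation unless the DataFrame has been cached, so on large inputs it may be worth storing the count once and reusing it.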