In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, current_timestamp, to_date, when

In [0]:
# Obtain the Spark session for this notebook: getOrCreate() returns the
# already-active session if there is one, otherwise it builds a new one
# registered under the application name below.
spark = (
    SparkSession.builder
    .appName("MedallionPipeline")
    .getOrCreate()
)

In [0]:
# Sample input rows, one tuple per transaction, in the order
# (customer_id, customer_name, transaction_amount, transaction_date, state).
# Amounts and dates are plain strings here; a few rows carry bad or missing
# values so the later cleansing steps have something to catch.
raw_data = [
    (1, "rohan", "100.50", "2023-01-15", None),          # state is missing
    (2, "rahul", "250.75", "2023-01-16", "NY"),          # fully valid row
    (3, "axar", "invalid", "2023-01-17", "CA"),          # amount is not numeric
    (4, "Sundar", "300.25", "invalid_date", "TX"),       # date is not a date
]

In [0]:
# Build the Bronze DataFrame with an explicit schema instead of letting Spark
# infer the types from the sample rows. Inference cannot determine a type for
# a column that holds only None values, and an explicit schema keeps the raw
# column types stable from run to run.
# The types match what inference produces for the current rows (Python int ->
# BIGINT, everything else STRING). Amount and date deliberately stay STRING at
# this stage; they are parsed later in the Silver step.
bronze_df = spark.createDataFrame(
    raw_data,
    schema=(
        "customer_id BIGINT, "
        "customer_name STRING, "
        "transaction_amount STRING, "
        "transaction_date STRING, "
        "state STRING"
    ),
)

In [0]:
# Append the load time to every raw row, then persist the result as the Bronze
# Delta table, replacing whatever was stored at that path before.
# NOTE(review): current_timestamp() is evaluated per query, so a later action
# on bronze_df_with_timestamp will presumably not show the value written here.
bronze_df_with_timestamp = bronze_df.select(
    "*",
    current_timestamp().alias("ingestion_timestamp"),
)
(
    bronze_df_with_timestamp.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/bronze/customer_transactions")
)

In [0]:
# Load the Bronze Delta table.
# NOTE(review): the following cell repeats this exact read, so this cell looks
# redundant -- consider removing one of the two.
silver_df = (
    spark.read
    .format("delta")
    .load("/mnt/bronze/customer_transactions")
)

In [0]:
# 2. SILVER LAYER - Data Cleansing and Validation
# Load the raw rows back from the Bronze Delta table.
silver_df = (
    spark.read
    .format("delta")
    .load("/mnt/bronze/customer_transactions")
)

# Build the cleansed projection in a single select; values that cannot be
# parsed become null instead of failing the row.
# NOTE(review): this relies on cast() yielding null for bad input, which
# presumably requires ANSI mode to be off -- confirm the cluster setting.
silver_cleaned_df = silver_df.select(
    col("customer_id"),
    # The name is already a string, so this cast leaves the text unchanged;
    # no trimming or case normalisation is applied here.
    col("customer_name").cast("string").alias("customer_name_clean"),
    # A failed string-to-double cast already yields null, so the cast alone
    # gives the same result as guarding it with an isNotNull() check.
    col("transaction_amount").cast("double").alias("transaction_amount_clean"),
    # Parse with the strict pattern only for values that are castable to a
    # date; when() without otherwise() returns null for all remaining rows.
    when(
        col("transaction_date").cast("date").isNotNull(),
        to_date(col("transaction_date"), "yyyy-MM-dd"),
    ).alias("transaction_date_clean"),
    col("state"),
    col("ingestion_timestamp"),
)

In [0]:
# Persist the cleansed rows as the Silver Delta table, replacing any previous
# contents at that path.
(
    silver_cleaned_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/silver/customer_transactions")
)

In [0]:
# 3. GOLD LAYER - Curated Business-Ready Data
# Load the cleansed rows from the Silver Delta table.
gold_df = (
    spark.read
    .format("delta")
    .load("/mnt/silver/customer_transactions")
)

In [0]:
# Create the business-ready aggregate: one row per state with the number of
# valid transactions and their total amount.
# Only rows whose amount AND date are both non-null are included. Rows with a
# null state are kept and end up in their own null group.
# The aggregates carry explicit aliases instead of using a dict followed by
# withColumnRenamed: the dict form depends on Spark's auto-generated column
# names ("sum(...)", "count(...)"), the rename is silently skipped if such a
# name ever differs, and the output column order is not guaranteed.
gold_curated_df = (
    gold_df
    .filter(
        col("transaction_amount_clean").isNotNull()
        & col("transaction_date_clean").isNotNull()
    )
    .groupBy("state")
    .agg(
        # count() ignores null customer_id values, same as the dict form did.
        F.count("customer_id").alias("transaction_count"),
        F.sum("transaction_amount_clean").alias("total_transaction_amount"),
    )
)

In [0]:
# Persist the per-state summary as the Gold Delta table, replacing any
# previous contents at that path.
(
    gold_curated_df.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/gold/customer_transaction_summary")
)

In [0]:
# Show the Bronze rows as they were persisted. Calling show() on
# bronze_df_with_timestamp would re-run current_timestamp() and print the time
# this cell executed rather than the ingestion time stored in the table.
# Rows are ordered by customer_id only to make the display stable.
print("Bronze Layer (Raw):")
(
    spark.read
    .format("delta")
    .load("/mnt/bronze/customer_transactions")
    .orderBy("customer_id")
    .show()
)

Bronze Layer (Raw):
+-----------+-------------+------------------+----------------+-----+--------------------+
|customer_id|customer_name|transaction_amount|transaction_date|state| ingestion_timestamp|
+-----------+-------------+------------------+----------------+-----+--------------------+
|          1|        rohan|            100.50|      2023-01-15| null|2025-03-20 05:04:...|
|          2|        rahul|            250.75|      2023-01-16|   NY|2025-03-20 05:04:...|
|          3|         axar|           invalid|      2023-01-17|   CA|2025-03-20 05:04:...|
|          4|       Sundar|            300.25|    invalid_date|   TX|2025-03-20 05:04:...|
+-----------+-------------+------------------+----------------+-----+--------------------+



In [0]:
# Display the cleansed Silver rows (show() defaults spelled out: first 20 rows,
# long cell values truncated).
print("Silver Layer (Cleansed):")
silver_cleaned_df.show(n=20, truncate=True)

Silver Layer (Cleansed):
+-----------+-------------------+------------------------+----------------------+-----+--------------------+
|customer_id|customer_name_clean|transaction_amount_clean|transaction_date_clean|state| ingestion_timestamp|
+-----------+-------------------+------------------------+----------------------+-----+--------------------+
|          4|             Sundar|                  300.25|                  null|   TX|2025-03-20 04:38:...|
|          3|               axar|                    null|            2023-01-17|   CA|2025-03-20 04:38:...|
|          2|              rahul|                  250.75|            2023-01-16|   NY|2025-03-20 04:38:...|
|          1|              rohan|                   100.5|            2023-01-15| null|2025-03-20 04:38:...|
+-----------+-------------------+------------------------+----------------------+-----+--------------------+



In [0]:
# Display the per-state Gold summary (show() defaults spelled out: first 20
# rows, long cell values truncated).
print("Gold Layer (Curated):")
gold_curated_df.show(n=20, truncate=True)

Gold Layer (Curated):
+-----+-----------------+------------------------+
|state|transaction_count|total_transaction_amount|
+-----+-----------------+------------------------+
|   NY|                1|                  250.75|
| null|                1|                   100.5|
+-----+-----------------+------------------------+

