In [0]:
# Databricks notebook source
# BRONZE LAYER: TRANSACTION INGESTION
# MAGIC %run ../config/project_config
from pyspark.sql import functions as F
from delta.tables import DeltaTable
from datetime import datetime

In [0]:
%run "/Workspace/Users/sundarasandeepteja@gmail.com/E-Commerce Analytics Medallion Architecture with GenAI/config/project_config"

In [0]:
# Section banner for the transactions part of the bronze run.
print("‚ñ† BRONZE LAYER: Transaction Ingestion")
print("=" * 50)

# Read raw data: the parquet dataset under RAW_DATA_PATH/transactions.
# RAW_DATA_PATH is not defined in this notebook; presumably it is provided by
# the project_config notebook executed via %run.
txn_source_path = f"{RAW_DATA_PATH}/transactions"
raw_txn = spark.read.parquet(txn_source_path)
raw_txn_rows = raw_txn.count()
print(f"Raw records: {raw_txn_rows}")

In [0]:
# Add metadata columns: lineage (_ingested_at, _source_file, _batch_id),
# the partition key (_ingestion_date) and the change-detection hash.
batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")

# _row_hash is what the MERGE into BRONZE_TRANSACTIONS_TABLE compares
# ("target._row_hash != source._row_hash") before rewriting a matched row, and
# that MERGE rewrites every column. The hash therefore has to cover every raw
# column; hashing a subset silently drops changes to the other columns.
# - Columns are sorted so the hash does not depend on parquet column order.
# - Nulls are mapped to a sentinel because concat_ws skips nulls, so
#   ("a", null, "b") and ("a", "b", null) would otherwise both hash "a|b".
# NOTE: rows stored under a different hash definition will all look changed
# once, on the first MERGE after this definition is deployed.
txn_hash_inputs = [
    F.coalesce(F.col(col_name).cast("string"), F.lit("__NULL__"))
    for col_name in sorted(raw_txn.columns)
]

txn_metadata_columns = {
    "_ingested_at": F.current_timestamp(),
    "_source_file": F.lit(f"{RAW_DATA_PATH}/transactions"),
    "_batch_id": F.lit(batch_id),
    "_ingestion_date": F.current_date(),
    "_row_hash": F.md5(F.concat_ws("|", *txn_hash_inputs)),
}
bronze_txn = raw_txn.withColumns(txn_metadata_columns)
print(f"Added {len(txn_metadata_columns)} metadata columns")

In [0]:
from delta.tables import DeltaTable

# Write to Delta Lake: create the bronze transactions table on the first run,
# otherwise upsert this batch into it keyed on transaction_id.
table_exists = spark.catalog.tableExists(BRONZE_TRANSACTIONS_TABLE)

# Dedicated directory for this table. A Delta table owns its whole location
# (VACUUM removes files under it that the table's log does not reference), so
# registering the table at the shared bronze/ root would endanger any other
# bronze data written under that root.
# NOTE(review): Unity Catalog may refuse table locations inside a volume
# (/Volumes/...); confirm this is a permitted external location.
bronze_path = "/Volumes/workspace/default/ecommerce_project_volume/bronze/transactions"
if not table_exists:
    bronze_txn.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("_ingestion_date") \
        .option("path", bronze_path) \
        .saveAsTable(BRONZE_TRANSACTIONS_TABLE)
    print(f"‚ñ† Created: {BRONZE_TRANSACTIONS_TABLE} at {bronze_path}")
else:
    # Merge for incremental loads using table name.
    delta_table = DeltaTable.forName(spark, BRONZE_TRANSACTIONS_TABLE)
    # Update every column except the merge key and the partition column.
    # Keeping _ingestion_date fixed leaves a changed row in the partition it was
    # first ingested into instead of moving it to today's partition. The
    # customer and ratings merges in this notebook also leave it out.
    not_updated = {"transaction_id", "_ingestion_date"}
    set_dict = {
        col: f"source.{col}" for col in bronze_txn.columns if col not in not_updated
    }
    delta_table.alias("target").merge(
        bronze_txn.alias("source"),
        "target.transaction_id = source.transaction_id"
    ).whenMatchedUpdate(
        # Only rewrite rows whose content hash actually changed.
        condition="target._row_hash != source._row_hash",
        set=set_dict
    ).whenNotMatchedInsertAll().execute()
    # An existing table is not necessarily located at bronze_path (it may
    # predate the path above), so only the table name is reported.
    print(f"‚ñ† Merged into: {BRONZE_TRANSACTIONS_TABLE}")

# Optimize: compact files and co-locate rows by the given columns.
spark.sql(f"OPTIMIZE {BRONZE_TRANSACTIONS_TABLE} ZORDER BY (customer_id, product_id)")
display(spark.table(BRONZE_TRANSACTIONS_TABLE).count())

In [0]:
# Products are reloaded as a full snapshot on every run (mode "overwrite"),
# so there is no MERGE or row hash for this table.
raw_products = spark.read.parquet(f"{RAW_DATA_PATH}/products")
batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")
bronze_products = raw_products.withColumns({
    "_ingested_at": F.current_timestamp(),
    # Same lineage column the other bronze tables carry.
    "_source_file": F.lit(f"{RAW_DATA_PATH}/products"),
    "_batch_id": F.lit(batch_id),
    "_ingestion_date": F.current_date(),
})
# overwriteSchema lets the snapshot replace the table when the raw schema (or
# the metadata column set) differs from the existing table. Without it, Delta
# rejects an overwrite whose schema does not match.
bronze_products.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(BRONZE_PRODUCTS_TABLE)
print(f"‚ñ† Created: {BRONZE_PRODUCTS_TABLE}")

In [0]:
# Databricks notebook source
# ======================================
# BRONZE LAYER: CUSTOMER INGESTION
# ======================================

# MAGIC %run ../config/project_config

from pyspark.sql import functions as F
from delta.tables import DeltaTable
from datetime import datetime

print("ü•â BRONZE LAYER: Customer Ingestion")
print("=" * 50)

# ======================================
# STEP 1: READ RAW DATA
# ======================================
print("\nüì• Step 1: Reading raw customer data...")

# Raw customer parquet lives under RAW_DATA_PATH/customers
# (RAW_DATA_PATH presumably comes from the project_config notebook).
customers_source = f"{RAW_DATA_PATH}/customers"
raw_customers = spark.read.parquet(customers_source)
raw_count = raw_customers.count()

# Report row count (with thousands separators) and column count.
for summary_line in (
    f"  Records found: {raw_count:,}",
    f"  Columns: {len(raw_customers.columns)}",
):
    print(summary_line)

# ======================================
# STEP 2: ADD METADATA COLUMNS
# ======================================
print("\nüè∑Ô∏è Step 2: Adding metadata columns...")

batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")

# Inputs for the change-detection hash. The MERGE in step 3 only rewrites a
# matched customer when _row_hash differs, and it copies first_name, last_name,
# registration_date and birth_year as well as the contact/segment columns.
# The hash therefore has to cover every raw column; a subset would let edits
# to the uncovered columns go unnoticed.
# - Columns are sorted so the hash does not depend on parquet column order.
# - Nulls map to a sentinel because concat_ws skips nulls, so
#   ("a", null, "b") and ("a", "b", null) would otherwise both hash "a|b".
# NOTE: customers stored under a different hash definition will all look
# changed once, on the first MERGE after this definition is deployed.
customer_hash_inputs = [
    F.coalesce(F.col(col_name).cast("string"), F.lit("__NULL__"))
    for col_name in sorted(raw_customers.columns)
]

bronze_customers = raw_customers.withColumns({
    # Ingestion timestamp
    "_ingested_at": F.current_timestamp(),

    # Source file path
    "_source_file": F.lit(f"{RAW_DATA_PATH}/customers"),

    # Batch identifier
    "_batch_id": F.lit(batch_id),

    # Ingestion date (for partitioning)
    "_ingestion_date": F.current_date(),

    # Row hash for change detection, over every raw column
    "_row_hash": F.md5(F.concat_ws("|", *customer_hash_inputs)),

    # PII flag - marks this table contains sensitive data
    "_contains_pii": F.lit(True),

    # Processing status
    "_is_processed": F.lit(False)
})

print(f"  ‚úÖ Added 7 metadata columns")
print(f"  ‚úÖ PII flag set to True")

# ======================================
# STEP 3: WRITE TO DELTA LAKE
# ======================================
print("\nüíæ Step 3: Writing to Delta Lake...")

# First run creates the partitioned table; later runs upsert on customer_id.
table_exists = spark.catalog.tableExists(BRONZE_CUSTOMERS_TABLE)

if table_exists:
    print("  Table exists - performing MERGE operation...")

    delta_table = DeltaTable.forName(spark, BRONZE_CUSTOMERS_TABLE)

    # Columns copied from the source when a customer's hash changed: business
    # columns first, then lineage. _ingestion_date is deliberately absent, so
    # an updated customer stays in the partition it was first written to.
    copied_columns = [
        "first_name", "last_name", "email", "phone", "state", "segment",
        "registration_date", "birth_year", "is_active",
        "_ingested_at", "_source_file", "_batch_id", "_row_hash",
        "_contains_pii",
    ]
    customer_updates = {name: f"source.{name}" for name in copied_columns}
    # Reset the processing flag on changed rows (presumably so downstream
    # layers pick them up again).
    customer_updates["_is_processed"] = F.lit(False)

    merge_builder = (
        delta_table.alias("target")
        .merge(
            bronze_customers.alias("source"),
            "target.customer_id = source.customer_id"
        )
        .whenMatchedUpdate(
            condition="target._row_hash != source._row_hash",
            set=customer_updates
        )
        .whenNotMatchedInsertAll()
    )
    merge_builder.execute()

    print("  ‚úÖ Merge operation complete")
else:
    print("  Creating new Bronze table...")

    (
        bronze_customers.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("_ingestion_date")
        .option("overwriteSchema", "true")
        .saveAsTable(BRONZE_CUSTOMERS_TABLE)
    )

    print(f"  ‚úÖ Created table: {BRONZE_CUSTOMERS_TABLE}")

# ======================================
# STEP 4: OPTIMIZE WITH ZORDER
# ======================================
print("\n‚ö° Step 4: Optimizing table...")

# Z-order on the two columns used for the distribution summaries below.
customer_zorder_cols = ("segment", "state")
spark.sql(
    f"OPTIMIZE {BRONZE_CUSTOMERS_TABLE} ZORDER BY ({', '.join(customer_zorder_cols)})"
)

print("  ‚úÖ Optimized with ZORDER on (segment, state)")

# ======================================
# STEP 5: VERIFY AND SUMMARIZE
# ======================================
print("\n‚úÖ Step 5: Verification...")

customers_bronze_df = spark.table(BRONZE_CUSTOMERS_TABLE)
final_count = customers_bronze_df.count()
print(f"  Total records in Bronze: {final_count:,}")

# Row counts per segment, largest first.
print("\nüìä Segment Distribution:")
segment_counts = (
    customers_bronze_df
    .groupBy("segment")
    .agg(F.count("*").alias("count"))
    .orderBy(F.desc("count"))
)
segment_counts.show()

# The five states with the most customers.
print("üìä Top 5 States:")
top_states = (
    customers_bronze_df
    .groupBy("state")
    .agg(F.count("*").alias("count"))
    .orderBy(F.desc("count"))
    .limit(5)
)
top_states.show()

section_rule = "=" * 50
print("\n" + section_rule)
print("ü•â BRONZE CUSTOMER INGESTION COMPLETE!")
print(section_rule)

In [0]:
# Databricks notebook source
# ======================================
# BRONZE LAYER: RATINGS INGESTION
# ======================================

# MAGIC %run ../config/project_config

from pyspark.sql import functions as F
from delta.tables import DeltaTable
from datetime import datetime

print("ü•â BRONZE LAYER: Ratings Ingestion")
print("=" * 50)

# ======================================
# STEP 1: READ RAW DATA
# ======================================
print("\nüì• Step 1: Reading raw ratings data...")

# Raw ratings parquet under RAW_DATA_PATH/ratings (RAW_DATA_PATH presumably
# comes from the project_config notebook).
raw_ratings = spark.read.parquet("{}/ratings".format(RAW_DATA_PATH))
raw_count = raw_ratings.count()
column_total = len(raw_ratings.columns)

print(f"  Records found: {raw_count:,}")
print(f"  Columns: {column_total}")

# ======================================
# STEP 2: ADD METADATA COLUMNS
# ======================================
print("\nüè∑Ô∏è Step 2: Adding metadata columns...")

batch_id = datetime.now().strftime("%Y%m%d_%H%M%S")

# Inputs for the change-detection hash. The MERGE in step 3 only rewrites a
# matched rating when _row_hash differs, and it copies has_review and
# rating_date as well as the ids, rating and helpful_votes. The hash therefore
# has to cover every raw column; a subset would let edits to the uncovered
# columns go unnoticed.
# - Columns are sorted so the hash does not depend on parquet column order.
# - Nulls map to a sentinel because concat_ws skips nulls, so
#   ("a", null, "b") and ("a", "b", null) would otherwise both hash "a|b".
# NOTE: ratings stored under a different hash definition will all look
# changed once, on the first MERGE after this definition is deployed.
rating_hash_inputs = [
    F.coalesce(F.col(col_name).cast("string"), F.lit("__NULL__"))
    for col_name in sorted(raw_ratings.columns)
]

bronze_ratings = raw_ratings.withColumns({
    # Ingestion timestamp
    "_ingested_at": F.current_timestamp(),

    # Source file path
    "_source_file": F.lit(f"{RAW_DATA_PATH}/ratings"),

    # Batch identifier
    "_batch_id": F.lit(batch_id),

    # Ingestion date (for partitioning)
    "_ingestion_date": F.current_date(),

    # Row hash for change detection, over every raw column
    "_row_hash": F.md5(F.concat_ws("|", *rating_hash_inputs)),

    # Processing status
    "_is_processed": F.lit(False)
})

print(f"  ‚úÖ Added 6 metadata columns")

# ======================================
# STEP 3: WRITE TO DELTA LAKE
# ======================================
print("\nüíæ Step 3: Writing to Delta Lake...")

table_exists = spark.catalog.tableExists(BRONZE_RATINGS_TABLE)

if table_exists:
    print("  Table exists - performing MERGE operation...")

    delta_table = DeltaTable.forName(spark, BRONZE_RATINGS_TABLE)

    # Columns overwritten from the source when a rating's hash changed.
    # _ingestion_date is left out, so the row keeps its original partition.
    ratings_copy_cols = (
        "customer_id", "product_id", "rating", "has_review", "rating_date",
        "helpful_votes", "_ingested_at", "_source_file", "_batch_id",
        "_row_hash",
    )
    ratings_set = dict((c, "source." + c) for c in ratings_copy_cols)
    # Changed ratings get their processing flag cleared.
    ratings_set["_is_processed"] = F.lit(False)

    # Upsert keyed on rating_id: update on hash mismatch, insert new ids.
    merge_plan = delta_table.alias("target").merge(
        bronze_ratings.alias("source"),
        "target.rating_id = source.rating_id",
    )
    merge_plan = merge_plan.whenMatchedUpdate(
        condition="target._row_hash != source._row_hash",
        set=ratings_set,
    )
    merge_plan.whenNotMatchedInsertAll().execute()

    print("  ‚úÖ Merge operation complete")
else:
    print("  Creating new Bronze table...")

    ratings_writer = (
        bronze_ratings.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("_ingestion_date")
        .option("overwriteSchema", "true")
    )
    ratings_writer.saveAsTable(BRONZE_RATINGS_TABLE)

    print(f"  ‚úÖ Created table: {BRONZE_RATINGS_TABLE}")

# ======================================
# STEP 4: OPTIMIZE WITH ZORDER
# ======================================
print("\n‚ö° Step 4: Optimizing table...")

optimize_stmt = f"OPTIMIZE {BRONZE_RATINGS_TABLE} ZORDER BY (product_id, customer_id)"
spark.sql(optimize_stmt)

print("  ‚úÖ Optimized with ZORDER on (product_id, customer_id)")

# ======================================
# STEP 5: VERIFY AND SUMMARIZE
# ======================================
print("\n‚úÖ Step 5: Verification...")

ratings_bronze_df = spark.table(BRONZE_RATINGS_TABLE)
final_count = ratings_bronze_df.count()
print(f"  Total records in Bronze: {final_count:,}")

# Number of ratings per rating value, in ascending rating order.
print("\nüìä Rating Distribution:")
rating_histogram = (
    ratings_bronze_df
    .groupBy("rating")
    .agg(F.count("*").alias("count"))
    .orderBy("rating")
)
rating_histogram.show()

# Single-row summary: totals, how many carry a review, and averages.
print("üìä Review Statistics:")
review_flag = F.when(F.col("has_review").cast("boolean"), 1).otherwise(0)
review_stats = ratings_bronze_df.agg(
    F.count("*").alias("total_ratings"),
    F.sum(review_flag).alias("with_reviews"),
    F.avg("rating").alias("avg_rating"),
    F.avg("helpful_votes").alias("avg_helpful_votes"),
)
review_stats.show()

closing_rule = "=" * 50
print("\n" + closing_rule)
print("ü•â BRONZE RATINGS INGESTION COMPLETE!")
print(closing_rule)