In [0]:
# Bronze Layer - Data Ingestion (Azure Storage)

# Storage account used whenever the upstream task did not publish one
# (e.g. interactive / standalone runs).
DEFAULT_STORAGE_ACCOUNT = "dataworks"

# Define the STORAGE_ACCOUNT variable
STORAGE_ACCOUNT = None
source_path_from_generation = None

# Get task values from data generation
try:
    # Resolve the storage account first so that a failure in a later lookup
    # cannot leave it unset; every path in this notebook is derived from it.
    STORAGE_ACCOUNT = dbutils.jobs.taskValues.get(
        taskKey="data_generation",
        key="STORAGE_ACCOUNT",
        default=DEFAULT_STORAGE_ACCOUNT,
        debugValue=DEFAULT_STORAGE_ACCOUNT,
    )

    # taskValues.get raises outside of a job when no debugValue is supplied, so
    # the source path gets the same fallback treatment as the other keys.
    fallback_source_path = f"abfss://bronze@{STORAGE_ACCOUNT}.dfs.core.windows.net/raw/"
    source_path_from_generation = dbutils.jobs.taskValues.get(
        taskKey="data_generation",
        key="source_path",
        default=fallback_source_path,
        debugValue=fallback_source_path,
    )

    expected_customers = dbutils.jobs.taskValues.get(taskKey="data_generation", key="customers_generated", debugValue=500)
    expected_products = dbutils.jobs.taskValues.get(taskKey="data_generation", key="products_generated", debugValue=100)
    expected_orders = dbutils.jobs.taskValues.get(taskKey="data_generation", key="orders_generated", debugValue=1000)
    expected_order_items = dbutils.jobs.taskValues.get(taskKey="data_generation", key="order_items_generated", debugValue=2000)

    print(f"📋 Expected data volumes from generation:")
    print(f"   Customers: {expected_customers}")
    print(f"   Products: {expected_products}")
    print(f"   Orders: {expected_orders}")
    print(f"   Order Items: {expected_order_items}")

except Exception as e:
    # Best effort: without upstream task values the volume validation is skipped
    # (expected_* = None), but ingestion itself can still run.
    print(f"⚠️ Could not retrieve task values (running standalone): {e}")
    expected_customers = expected_products = expected_orders = expected_order_items = None

# Guarantee usable values on every code path, including the failure path above.
if not STORAGE_ACCOUNT:
    STORAGE_ACCOUNT = DEFAULT_STORAGE_ACCOUNT
if not source_path_from_generation:
    source_path_from_generation = f"abfss://bronze@{STORAGE_ACCOUNT}.dfs.core.windows.net/raw/"

In [0]:
# Azure Storage Configuration
# Authentication
# Register the account key for the same storage account that the abfss:// paths
# are built from; fall back to "dataworks" when no account has been resolved.
# SECURITY: "access-Key" is a placeholder literal. Never commit a real account
# key here -- load it from a secret scope instead, e.g.
# dbutils.secrets.get(scope=<scope>, key=<key>).
storage_account_name = STORAGE_ACCOUNT or "dataworks"
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", "access-Key")

print("Starting Bronze Layer with Azure Storage...")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime

# Azure Storage Path Configuration
# Each entry maps "<name>_path" to a folder inside the "bronze" container of
# STORAGE_ACCOUNT. Insertion order is source -> bronze -> checkpoint.
CONFIG = {
    f'{name}_path': f'abfss://bronze@{STORAGE_ACCOUNT}.dfs.core.windows.net/{folder}/'
    for name, folder in (
        ('source', 'raw'),
        ('bronze', 'delta'),
        ('checkpoint', 'checkpoints'),
    )
}

In [0]:
# Create directories
# Best effort: a failure here is reported but does not stop the notebook.
# The actual exception is printed so real causes are not hidden behind a
# generic "may already exist" message.
for path in CONFIG.values():
    try:
        dbutils.fs.mkdirs(path)
        print(f"✅ Created/verified: {path}")
    except Exception as e:
        print(f"⚠️ Could not create/verify {path}: {e}")

# Database setup
spark.sql("CREATE DATABASE IF NOT EXISTS ecommerce_bronze")
spark.sql("USE ecommerce_bronze")

print("✅ Azure paths and database configured")

In [0]:
# Define schemas for data validation
# Every column is declared as StringType; the ingestion cells cast the
# date/numeric/boolean columns afterwards. Only the leading key column of
# customers, products and orders is declared non-nullable.
customers_schema = StructType(
    [StructField("customer_id", StringType(), False)]
    + [
        StructField(column_name, StringType(), True)
        for column_name in (
            "first_name",
            "last_name",
            "email",
            "phone",
            "country",
            "city",
            "segment",
            "registration_date",
            "is_active",
        )
    ]
)

products_schema = StructType(
    [StructField("product_id", StringType(), False)]
    + [
        StructField(column_name, StringType(), True)
        for column_name in (
            "product_name",
            "category",
            "subcategory",
            "price",
            "cost",
            "stock_quantity",
            "is_active",
        )
    ]
)

orders_schema = StructType(
    [StructField("order_id", StringType(), False)]
    + [
        StructField(column_name, StringType(), True)
        for column_name in (
            "customer_id",
            "order_date",
            "order_timestamp",
            "status",
            "payment_method",
            "subtotal",
            "shipping_cost",
            "tax_amount",
            "total_amount",
            "currency",
        )
    ]
)

# Order items have no non-nullable column at all.
order_items_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in (
            "order_id",
            "product_id",
            "product_name",
            "category",
            "quantity",
            "unit_price",
            "discount_percent",
            "line_total",
        )
    ]
)

print("✅ Schemas defined for data validation")

In [0]:
# Ingest customers to Bronze container
print("🔄 Processing customers...")

# Read the raw CSV with the explicit all-string customers schema.
df_customers = (spark.read
                .format("csv")
                .option("header", "true")
                .schema(customers_schema)
                .load(f"{CONFIG['source_path']}customers"))

# Add metadata and type casting
df_customers = (df_customers
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("registration_date", F.to_date(F.col("registration_date")))
    .withColumn("is_active", F.col("is_active").cast("boolean"))
    .withColumn("_source_system", F.lit("ecommerce_csv"))
    .withColumn("_data_quality_flag", F.lit("validated"))
)

# Save to Bronze Delta table in Azure Storage
customers_path = f"{CONFIG['bronze_path']}customers"
(df_customers.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(customers_path))

# Create table in metastore
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ecommerce_bronze.customers
USING DELTA
LOCATION '{customers_path}'
""")

# Count the rows that were actually written. Calling df_customers.count() here
# would re-run the CSV read and every transformation a second time.
customers_ingested = spark.read.format("delta").load(customers_path).count()
print(f"✅ Customers: {customers_ingested} records ingested to {customers_path}")


In [0]:
# Ingest products to Bronze container
print("🔄 Processing products...")

# Read the raw CSV with the explicit all-string products schema.
df_products = (spark.read
               .format("csv")
               .option("header", "true")
               .schema(products_schema)
               .load(f"{CONFIG['source_path']}products"))

# Add metadata and type casting
df_products = (df_products
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("price", F.col("price").cast("double"))
    .withColumn("cost", F.col("cost").cast("double"))
    .withColumn("stock_quantity", F.col("stock_quantity").cast("int"))
    .withColumn("is_active", F.col("is_active").cast("boolean"))
    .withColumn("_source_system", F.lit("ecommerce_csv"))
    .withColumn("_data_quality_flag", F.lit("validated"))
)

# Save to Bronze Delta table
products_path = f"{CONFIG['bronze_path']}products"
(df_products.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(products_path))

# Create table in metastore
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ecommerce_bronze.products
USING DELTA
LOCATION '{products_path}'
""")

# Count the rows that were actually written. Calling df_products.count() here
# would re-run the CSV read and every transformation a second time.
products_ingested = spark.read.format("delta").load(products_path).count()
print(f"✅ Products: {products_ingested} records ingested to {products_path}")

In [0]:
# Ingest orders to Bronze container
print("🔄 Processing orders...")

# Read the raw CSV with the explicit all-string orders schema.
df_orders = (spark.read
             .format("csv")
             .option("header", "true")
             .schema(orders_schema)
             .load(f"{CONFIG['source_path']}orders"))

# Add metadata and type casting
df_orders = (df_orders
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("order_date", F.to_date(F.col("order_date")))
    .withColumn("order_timestamp", F.to_timestamp(F.col("order_timestamp")))
    .withColumn("subtotal", F.col("subtotal").cast("double"))
    .withColumn("shipping_cost", F.col("shipping_cost").cast("double"))
    .withColumn("tax_amount", F.col("tax_amount").cast("double"))
    .withColumn("total_amount", F.col("total_amount").cast("double"))
    .withColumn("_source_system", F.lit("ecommerce_csv"))
    .withColumn("_data_quality_flag", F.lit("validated"))
)

# Save to Bronze Delta table
orders_path = f"{CONFIG['bronze_path']}orders"
(df_orders.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(orders_path))

# Create table in metastore
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ecommerce_bronze.orders
USING DELTA
LOCATION '{orders_path}'
""")

# Count the rows that were actually written. Calling df_orders.count() here
# would re-run the CSV read and every transformation a second time.
orders_ingested = spark.read.format("delta").load(orders_path).count()
print(f"✅ Orders: {orders_ingested} records ingested to {orders_path}")

In [0]:
# Ingest order items to Bronze container
print("🔄 Processing order items...")

# Read the raw CSV with the explicit all-string order-items schema.
df_order_items = (spark.read
                  .format("csv")
                  .option("header", "true")
                  .schema(order_items_schema)
                  .load(f"{CONFIG['source_path']}order_items"))

# Add metadata and type casting
df_order_items = (df_order_items
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .withColumn("discount_percent", F.col("discount_percent").cast("double"))
    .withColumn("line_total", F.col("line_total").cast("double"))
    .withColumn("_source_system", F.lit("ecommerce_csv"))
    .withColumn("_data_quality_flag", F.lit("validated"))
)

# Save to Bronze Delta table
order_items_path = f"{CONFIG['bronze_path']}order_items"
(df_order_items.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(order_items_path))

# Create table in metastore
spark.sql(f"""
CREATE TABLE IF NOT EXISTS ecommerce_bronze.order_items
USING DELTA
LOCATION '{order_items_path}'
""")

# Count the rows that were actually written. Calling df_order_items.count()
# here would re-run the CSV read and every transformation a second time.
order_items_ingested = spark.read.format("delta").load(order_items_path).count()
print(f"✅ Order Items: {order_items_ingested} records ingested to {order_items_path}")

In [0]:
# Data Quality Validation
print("🔍 BRONZE LAYER DATA QUALITY CHECKS")
print("=" * 50)

# Resolve each Bronze table once; the handles are reused for counts and checks.
bronze_customers = spark.table('ecommerce_bronze.customers')
bronze_products = spark.table('ecommerce_bronze.products')
bronze_orders = spark.table('ecommerce_bronze.orders')
bronze_order_items = spark.table('ecommerce_bronze.order_items')

# Check record counts
customers_count = bronze_customers.count()
products_count = bronze_products.count()
orders_count = bronze_orders.count()
order_items_count = bronze_order_items.count()

print("📊 Record Counts:")
for label, row_total in (
    ("Customers", customers_count),
    ("Products", products_count),
    ("Orders", orders_count),
    ("Order Items", order_items_count),
):
    print(f"   {label}: {row_total:,}")

# Data quality checks: missing customer keys, negative product prices,
# and orders without a total.
null_customer_ids = bronze_customers.filter(F.col('customer_id').isNull()).count()
negative_prices = bronze_products.filter(F.col('price') < 0).count()
null_order_totals = bronze_orders.filter(F.col('total_amount').isNull()).count()

print("\n🔍 Data Quality Metrics:")
print(f"   Null customer IDs: {null_customer_ids}")
print(f"   Negative prices: {negative_prices}")
print(f"   Null order totals: {null_order_totals}")

# Calculate data quality score: 100 minus the share of issue rows across all
# ingested rows, floored at 0.
total_issues = sum((null_customer_ids, negative_prices, null_order_totals))
total_records = sum((customers_count, products_count, orders_count, order_items_count))

if total_issues:
    quality_score = max(0, 100 - (total_issues / total_records * 100))
    print(f"\n⚠️ DATA QUALITY SCORE: {quality_score:.1f}%")
else:
    quality_score = 100
    print(f"\n✅ DATA QUALITY SCORE: {quality_score}% (Perfect)")

# COMMAND ----------

# Performance Optimization
print("⚡ OPTIMIZING BRONZE TABLES")
print("=" * 30)

# Optimize all tables for better query performance
tables = ['customers', 'products', 'orders', 'order_items']

# Z-order columns for frequently queried tables. Tables not listed here get a
# plain OPTIMIZE.
zorder_columns = {
    'orders': 'order_date, customer_id',
    'customers': 'country, segment',
    'products': 'category',
}

# One OPTIMIZE per table: running a plain OPTIMIZE followed by OPTIMIZE ... ZORDER BY
# on the same table rewrites the files the first pass has just compacted.
for table in tables:
    print(f"🔧 Optimizing ecommerce_bronze.{table}...")
    optimize_sql = f"OPTIMIZE ecommerce_bronze.{table}"
    if table in zorder_columns:
        optimize_sql += f" ZORDER BY ({zorder_columns[table]})"
    spark.sql(optimize_sql)

print("✅ All Bronze tables optimized")

# COMMAND ----------

# Bronze Layer Summary and Task Values
# Timestamp comes from datetime.now(), i.e. naive local time of the driver.
bronze_ingestion_timestamp = str(datetime.now())

# Set task values for Silver layer
dbutils.jobs.taskValues.set(key="bronze_customers_count", value=customers_count)
dbutils.jobs.taskValues.set(key="bronze_products_count", value=products_count)
dbutils.jobs.taskValues.set(key="bronze_orders_count", value=orders_count)
dbutils.jobs.taskValues.set(key="bronze_order_items_count", value=order_items_count)
# quality_score may be the int 100 or a float; publish it as a float either way.
dbutils.jobs.taskValues.set(key="bronze_data_quality_score", value=float(quality_score))
dbutils.jobs.taskValues.set(key="bronze_total_issues", value=total_issues)
dbutils.jobs.taskValues.set(key="bronze_ingestion_timestamp", value=bronze_ingestion_timestamp)
dbutils.jobs.taskValues.set(key="bronze_container_path", value=f"abfss://bronze@{STORAGE_ACCOUNT}.dfs.core.windows.net/")
dbutils.jobs.taskValues.set(key="STORAGE_ACCOUNT", value=STORAGE_ACCOUNT)

# Validation against expected volumes
# Compare against None explicitly: an expected count of 0 is a valid
# expectation and must still be validated, while None means the upstream task
# values were unavailable.
if expected_customers is not None:
    validation_results = {
        "customers_match": customers_count == expected_customers,
        "products_match": products_count == expected_products,
        "orders_match": orders_count == expected_orders,
        "order_items_match": order_items_count == expected_order_items
    }

    all_validations_passed = all(validation_results.values())
    dbutils.jobs.taskValues.set(key="bronze_validation_passed", value=all_validations_passed)

    print(f"\n🔍 Data Volume Validation:")
    for check, result in validation_results.items():
        status = "✅" if result else "❌"
        print(f"   {status} {check}: {result}")

print("📋 BRONZE LAYER SUMMARY")
print("=" * 40)
print(f"📁 Storage Location: abfss://bronze@{STORAGE_ACCOUNT}.dfs.core.windows.net/")
print(f"📊 Total Records Processed: {total_records:,}")
print(f"🎯 Data Quality Score: {quality_score:.1f}%")
print(f"💾 Delta Tables Created: {len(tables)}")
print("⚡ Performance Optimizations: Applied")

print("\n✅ Bronze layer ingestion complete!")
print("📁 Data stored in Azure Bronze container")
print("📋 Task values set for Silver layer processing")
print("🔜 Next: Run 03_silver_processing.py")