# Bronze Layer Ingestion - Enterprise Data Platform

## Overview
This notebook ingests CSV files from OneLake Files into Delta tables (Bronze layer).

**Prerequisites:**
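- CSV files uploaded to Lakehouse Files/bronze/ (dimension files under `dimensions/`, fact files under one folder per domain, e.g. `sales/`, `supply_chain/`)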
- Lakehouse attached to this notebook

**Output:**
- Delta tables in Bronze layer
- Data quality report (null checks on key columns, printed to the notebook output)

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
import os
from datetime import datetime

# Log the wall-clock time at which this ingestion run started.
print(f"Bronze Ingestion Started: {datetime.now()}")
# NOTE(review): `spark` is not created anywhere in this notebook — presumably
# it is the session pre-provisioned by the notebook runtime; confirm.
print(f"Spark Version: {spark.version}")

## Configuration

In [None]:
# ---------------------------------------------------------------------------
# Configuration: source/target locations and the catalogue of tables to load
# ---------------------------------------------------------------------------

# Folder holding the raw CSV files; the ingestion cells build paths under it.
BRONZE_PATH = "Files/bronze"

# Name of the Delta table area; in the visible cells it is only echoed below.
TABLE_PATH = "Tables"

# Conformed dimensions, each expected as <BRONZE_PATH>/dimensions/<name>.csv
DIMENSION_TABLES = [
    "DimDate",
    "DimCustomer",
    "DimProduct",
    "DimEmployee",
    "DimGeography",
    "DimFacility",
    "DimProject",
    "DimAccount",
]

# Fact tables grouped by business domain (domain -> list of table names).
FACT_TABLES = {
    "Sales": [
        "FactSales",
        "FactReturns",
    ],
    "CRM": [
        "FactOpportunities",
        "FactActivities",
    ],
    "HR": [
        "FactAttrition",
        "FactHiring",
    ],
    "SupplyChain": [
        "FactInventory",
        "FactPurchaseOrders",
    ],
    "Manufacturing": [
        "FactProduction",
        "FactWorkOrders",
    ],
    "Finance": [
        "FactGeneralLedger",
        "FactBudget",
    ],
    "ESG": [
        "FactEmissions",
    ],
    "CallCenter": [
        "FactSupport",
    ],
    "ITOps": [
        "FactIncidents",
    ],
    "FinOps": [
        "FactCloudCosts",
    ],
    "RD": [
        "FactExperiments",
    ],
    "Quality": [
        "FactDefects",
        "FactSecurityEvents",
    ],
    "RiskCompliance": [
        "FactRisks",
        "FactAudits",
        "FactComplianceChecks",
    ],
}

# Echo the effective locations so the run log shows where data comes from/goes.
print(f"Bronze data source: {BRONZE_PATH}")
print(f"Target: Delta tables in {TABLE_PATH}")

## Utility Functions

In [None]:
def ingest_csv_to_delta(table_name: str, csv_path: str, overwrite: bool = True) -> bool:
    """
    Ingest a CSV file into a Delta table, inferring the schema from the data.

    The CSV is read with a header row, two metadata columns
    (`_ingestion_timestamp`, `_source_file`) are appended, and the result is
    saved as the table `table_name`.

    Args:
        table_name: Name of the target Delta table
        csv_path: Path to CSV file
        overwrite: If True (default), replace the table's data AND schema so
            the table mirrors the current file. If False, append the rows and
            merge any new columns into the existing schema.

    Returns:
        True if the table was written; False if any step raised. The error is
        printed rather than re-raised so a batch of tables can keep going.
    """
    try:
        print(f"\n{'='*80}")
        print(f"Ingesting: {table_name}")
        print(f"Source: {csv_path}")

        # Read CSV with schema inference
        df = (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("dateFormat", "yyyy-MM-dd")
            .load(csv_path)
        )

        # NOTE: count() is a separate Spark action, so the file is scanned
        # here and again by the write below.
        row_count = df.count()
        print(f"Rows read: {row_count:,}")
        print(f"Columns: {len(df.columns)}")

        # Add metadata columns
        df = df.withColumn("_ingestion_timestamp", current_timestamp()) \
               .withColumn("_source_file", lit(csv_path))

        # Pick the schema option that matches the write mode. Inferred column
        # types can change between loads; on overwrite only `overwriteSchema`
        # lets the new schema replace the old one, while `mergeSchema` is the
        # option that applies to additive changes on append.
        if overwrite:
            mode, schema_option = "overwrite", "overwriteSchema"
        else:
            mode, schema_option = "append", "mergeSchema"

        # Write to Delta table
        (
            df.write.format("delta")
            .mode(mode)
            .option(schema_option, "true")
            .saveAsTable(table_name)
        )

        print(f"✅ {table_name} created successfully ({row_count:,} rows)")
        return True

    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure to caller.
        print(f"❌ Error ingesting {table_name}: {str(e)}")
        return False

## Ingest Conformed Dimensions

In [None]:
# Step 1: load every conformed dimension from <BRONZE_PATH>/dimensions/
print("\n" + "=" * 80)
print("STEP 1: Ingesting Conformed Dimensions")
print("=" * 80)

# table name -> outcome returned by ingest_csv_to_delta (True / False)
dimension_results = {}

for table in DIMENSION_TABLES:
    dimension_results[table] = ingest_csv_to_delta(
        table,
        f"{BRONZE_PATH}/dimensions/{table}.csv",
    )

# Tally the successes by hand: the wildcard import of pyspark.sql.functions
# shadows the built-in sum(), so it is deliberately not used here.
success_count = 0
for ok in dimension_results.values():
    if ok:
        success_count += 1

print(f"\n✅ Dimensions ingested: {success_count}/{len(DIMENSION_TABLES)}")

## Ingest Fact Tables

In [None]:
# Ingest fact tables by domain
#
# For every domain, each expected CSV is probed first. A file that Spark
# cannot resolve is recorded as skipped (None); a file that exists is handed
# to ingest_csv_to_delta and recorded as True/False; any other probe error is
# recorded as a failure (False) instead of being mislabelled "file not found".
from pyspark.sql.utils import AnalysisException

print("\n" + "="*80)
print("STEP 2: Ingesting Fact Tables")
print("="*80)

# table name -> True (ingested) / False (failed) / None (skipped)
fact_results = {}

# Domain to folder mapping (folder names under BRONZE_PATH)
domain_folder_mapping = {
    "Sales": "sales",
    "CRM": "crm",
    "HR": "hr",
    "SupplyChain": "supply_chain",
    "Manufacturing": "manufacturing",
    "Finance": "finance",
    "ESG": "esg",
    "CallCenter": "call_center",
    "ITOps": "itops",
    "FinOps": "finops",
    "RD": "rd",
    "Quality": "quality_security",
    "RiskCompliance": "risk_compliance"
}

# Updated FACT_TABLES with all domains
FACT_TABLES_UPDATED = {
    "Sales": ["FactSales", "FactReturns"],
    "CRM": ["FactOpportunities", "FactActivities"],
    "HR": ["FactAttrition", "FactHiring"],
    "SupplyChain": ["FactInventory", "FactPurchaseOrders"],
    "Manufacturing": ["FactProduction", "FactWorkOrders"],
    "Finance": ["FactGeneralLedger", "FactBudget"],
    "ESG": ["FactEmissions"],
    "CallCenter": ["FactSupport"],
    "ITOps": ["FactIncidents"],
    "FinOps": ["FactCloudCosts"],
    "RD": ["FactExperiments"],
    "Quality": ["FactDefects", "FactSecurityEvents"],
    "RiskCompliance": ["FactRisks", "FactAudits", "FactComplianceChecks"]
}

for domain, tables in FACT_TABLES_UPDATED.items():
    print(f"\n--- {domain} Domain ---")
    # Fall back to the lower-cased domain name if no explicit folder is mapped.
    folder = domain_folder_mapping.get(domain, domain.lower())

    for table in tables:
        csv_path = f"{BRONZE_PATH}/{folder}/{table}.csv"

        # Check if file exists (some domains may not be generated yet).
        # Only the probe sits inside the try, so its errors are not confused
        # with anything that happens during the actual ingestion.
        try:
            spark.read.format("csv").option("header", "true").load(csv_path).limit(1).count()
        except AnalysisException:
            # Spark raises AnalysisException when the path does not exist.
            print(f"⏭️  Skipping {table} (file not found)")
            fact_results[table] = None
        except Exception as e:
            # Anything else (I/O, permissions, ...) is a real failure.
            print(f"❌ Error probing {table}: {str(e)}")
            fact_results[table] = False
        else:
            fact_results[table] = ingest_csv_to_delta(table, csv_path)

# Summary - use list.count() rather than sum(): the wildcard PySpark import
# shadows the built-in sum().
outcomes = list(fact_results.values())
success_count = outcomes.count(True)
skipped_count = outcomes.count(None)
failed_count = outcomes.count(False)

print(f"\n✅ Fact tables ingested: {success_count}")
print(f"⏭️  Skipped (not generated): {skipped_count}")
print(f"❌ Failed: {failed_count}")

## Verify Delta Tables

In [None]:
# Step 3: report the tables registered in the catalog and their sizes
print("\n" + "=" * 80)
print("STEP 3: Verification - Delta Tables Created")
print("=" * 80)

tables = spark.catalog.listTables()

# Only catalog entries whose type is MANAGED are included in the report.
delta_tables = []
for t in tables:
    if t.tableType == "MANAGED":
        delta_tables.append(t)

print(f"\nTotal Delta tables: {len(delta_tables)}\n")

# One line per table, ordered by name: row count and column count
for table in sorted(delta_tables, key=lambda x: x.name):
    df = spark.table(table.name)
    col_count = len(df.columns)
    row_count = df.count()
    print(f"  {table.name:30s} | {row_count:>10,} rows | {col_count:>3} columns")

## Data Quality Checks

In [None]:
# Basic data quality checks
#
# Each check counts the rows of a table in which at least one key column is
# null. Tables whose check cannot run are reported as skipped together with
# the reason, so a missing column is not mistaken for a missing table.
print("\n" + "="*80)
print("STEP 4: Data Quality Checks")
print("="*80)

# Human-readable descriptions of every failed check
quality_issues = []

# (table, key columns, wording for a failed check, wording for a passed check)
key_checks = [
    ("DimCustomer", ["customer_id"], "null customer_id values", "No null primary keys"),
    ("DimProduct", ["product_id"], "null product_id values", "No null primary keys"),
    (
        "FactSales",
        ["order_id", "customer_id", "product_id"],
        "null key values",
        "No null key columns",
    ),
]

for table_name, key_columns, issue_label, ok_label in key_checks:
    try:
        # OR together "is null" over all key columns of this table.
        any_key_null = col(key_columns[0]).isNull()
        for key_column in key_columns[1:]:
            any_key_null = any_key_null | col(key_column).isNull()
        null_count = spark.table(table_name).filter(any_key_null).count()
    except Exception as e:
        # Best-effort: skip this check but say why. `except Exception` (not a
        # bare except) lets KeyboardInterrupt/SystemExit stop the notebook.
        reason = str(e).splitlines()[0] if str(e) else type(e).__name__
        print(f"⏭️  {table_name} not available ({reason})")
        continue

    if null_count > 0:
        quality_issues.append(f"{table_name}: {null_count} {issue_label}")
    else:
        print(f"✅ {table_name}: {ok_label}")

# Summary
if quality_issues:
    print("\n⚠️  Data Quality Issues Found:")
    for issue in quality_issues:
        print(f"  - {issue}")
else:
    print("\n✅ All data quality checks passed!")

## Completion Summary

In [None]:
# Closing banner: completion time and how many tables were ingested
print("\n" + "=" * 80)
print("BRONZE INGESTION COMPLETE")
print("=" * 80)
print(f"Completion Time: {datetime.now()}")

# Tally successes without sum(): the wildcard PySpark import shadows the
# built-in. Dimension outcomes are counted by truthiness, fact outcomes by
# equality with True (skipped facts are None, failed ones False).
dim_count = 0
for outcome in dimension_results.values():
    if outcome:
        dim_count += 1
fact_count = list(fact_results.values()).count(True)

print(f"\nDimensions: {dim_count} tables")
print(f"Facts: {fact_count} tables")
print(f"\nNext Step: Run notebook 02_transform_to_silver.ipynb")
print("=" * 80)