# SalesLT to Retail Data Bronze Layer

**Objective**: Copy all SalesLT tables to the bronze layer of the Retail Data lakehouse using a simplified Fabric PySpark approach

**Prerequisites**:
- Microsoft Fabric environment with PySpark runtime
- Access to SalesLT tables (via shortcuts or direct tables)
- Write permissions to target bronze lakehouse
- Source lakehouse shortcut configured in bronze lakehouse

**Setup Strategy**:
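1. **Current Context**: The notebook runs with the bronze lakehouse (`RDS_Fabric_Foundry_workspace_Gaiye_Retail_Solution_Test_IDM_LH_bronze`) attached as its default lakehouse, so relative `Files/` paths and `saveAsTable` writes land there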
2. **Source Access**: Access SalesLT tables from `Gaiye_Test_Lakehouse` via shortcuts
3. **Data Flow**: Shortcut tables → Bronze Files + Tables with metadata enrichment
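The data flow above gives every table three names: the qualified source name read in Step 4, the Files folder, and the `bronze_`-prefixed lakehouse table. The pure-Python sketch below captures that naming convention. It uses the same `SOURCE_DATABASE` and `BRONZE_TARGET_PATH` values as Step 1. The `BronzeTargets` type and `bronze_targets` function are hypothetical and are not part of the notebook.

```python
from typing import NamedTuple

# Same values as the Step 1 configuration
SOURCE_DATABASE = "Gaiye_Test_Lakehouse"
BRONZE_TARGET_PATH = "Files/SalesLT/"


class BronzeTargets(NamedTuple):
    source_table: str     # qualified name used in Step 4's SELECT * FROM ...
    files_path: str       # parquet folder under the lakehouse Files area
    lakehouse_table: str  # managed table written with saveAsTable


def bronze_targets(table_name: str) -> BronzeTargets:
    """Derive source and target names for one table, mirroring Step 4's f-strings."""
    return BronzeTargets(
        source_table=f"{SOURCE_DATABASE}.{table_name}",
        files_path=f"{BRONZE_TARGET_PATH}{table_name}",
        lakehouse_table=f"bronze_{table_name}",
    )


print(bronze_targets("customer"))
```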

**Expected Tables**: address, customer, customeraddress, product, productcategory, productdescription, productmodel, productmodelproductdescription, salesorderdetail, salesorderheader

**Workflow Options**:
- **First Run**: Execute all steps (1-5) for initial setup and validation
- **Subsequent Runs**: Execute steps 1, 2, 4, 5 (Setup → Discover → Process → Validate) for regular data refreshes. Step 2 must run because it sets `TABLES_TO_PROCESS`, which Steps 4 and 5 read
- **Quick Refresh**: Step 3 can be skipped once the environment is validated and working
- **Optimized Refresh**: Execute steps 1, 2, 4 only (Setup → Discover → Process) for the fastest data updates once the pipeline is proven reliable
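Steps 4 and 5 both read `TABLES_TO_PROCESS`, and only Step 2 assigns it. Any run plan that includes Step 4 or Step 5 therefore also needs Step 2 in the same session. The pure-Python sketch below checks a plan against those variable dependencies. `STEP_PREREQUISITES` and `missing_prerequisites` are hypothetical names, and the table reflects only which notebook variables each step reads.

```python
# Hypothetical dependency table derived from the variables each cell reads:
# Steps 2-5 use Step 1's constants; Steps 4-5 also read TABLES_TO_PROCESS (Step 2).
STEP_PREREQUISITES = {
    1: set(),
    2: {1},
    3: {1},
    4: {1, 2},
    5: {1, 2},
}


def missing_prerequisites(steps):
    """Return {step: sorted missing prerequisite steps} for a planned run."""
    selected = set(steps)
    problems = {}
    for step in sorted(selected):
        missing = STEP_PREREQUISITES[step] - selected
        if missing:
            problems[step] = sorted(missing)
    return problems


print(missing_prerequisites([1, 2, 3, 4, 5]))  # {}
print(missing_prerequisites([1, 4, 5]))        # {4: [2], 5: [2]}
```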

## Step 1: Environment Setup

In [None]:
# Import required libraries (pandas is used implicitly by .toPandas() in Step 2)
from datetime import datetime
from pyspark.sql.functions import lit, current_timestamp

# Configuration
BRONZE_TARGET_PATH = "Files/SalesLT/"
SOURCE_SYSTEM = "SalesLT"
SOURCE_DATABASE = "Gaiye_Test_Lakehouse"
# Capture the clock once so LOAD_TIMESTAMP and LOAD_DATE always agree
_load_started = datetime.now()
LOAD_TIMESTAMP = _load_started.isoformat()
LOAD_DATE = _load_started.strftime("%Y-%m-%d")

# Expected SalesLT tables
EXPECTED_TABLES = [
    'address', 'customer', 'customeraddress', 'product', 
    'productcategory', 'productdescription', 'productmodel',
    'productmodelproductdescription', 'salesorderdetail', 'salesorderheader'
]

print("🚀 SalesLT to Retail Data Bronze Pipeline")
print("=" * 50)
print(f"✅ Libraries imported")
print(f"📅 Load timestamp: {LOAD_TIMESTAMP}")
print(f"🎯 Target path: {BRONZE_TARGET_PATH}")
print(f"📥 Source database: {SOURCE_DATABASE}")
print(f"📊 Expected tables: {len(EXPECTED_TABLES)}")
print(f"✅ Microsoft Fabric PySpark environment ready")

## Step 2: Discover Available Tables

In [None]:
# Discover source tables from Gaiye_Test_Lakehouse
print("🔍 DISCOVERING SOURCE TABLES")
print("=" * 50)

# Check source lakehouse context
print(f"🏠 Current context: Bronze lakehouse (target)")
print(f"📥 Source database: {SOURCE_DATABASE}")
print(f"📤 Target path: {BRONZE_TARGET_PATH}")
print(f"📋 Expected table names: {', '.join(EXPECTED_TABLES)}")
print()

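# Start empty so Steps 4-5 see no tables if discovery finds none
TABLES_TO_PROCESS = []
TABLE_MAPPING = {}

try: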
    # Get tables from source database specifically
    print(f"🔍 Querying tables from {SOURCE_DATABASE}...")
    source_tables_df = spark.sql(f"SHOW TABLES IN {SOURCE_DATABASE}").toPandas()
    
    print(f"✅ Total tables found in source: {len(source_tables_df)}")
    print(f"✅ Spark SQL connection confirmed")
    
    if len(source_tables_df) > 0:
        # Handle flexible column naming
        table_column = None
        for possible_col in ['tableName', 'table_name', 'name']:
            if possible_col in source_tables_df.columns:
                table_column = possible_col
                break
        
        if table_column is None:
            table_column = source_tables_df.columns[0]
            print(f"🔍 Using column '{table_column}' as table name")
        
        print(f"\n📋 All available tables in {SOURCE_DATABASE}:")
        for _, row in source_tables_df.iterrows():
            table_name = row[table_column]
            # Check if this matches our expected tables
            is_expected = any(expected.lower() == table_name.lower() for expected in EXPECTED_TABLES)
            marker = "🎯" if is_expected else "📋"
            print(f"   {marker} {table_name}")
    else:
        print(f"\n⚠️ No tables found in {SOURCE_DATABASE}!")
        print("💡 Verify the source lakehouse contains data")
    
    # Find matching tables from our expected list
    available_tables = []
    missing_tables = []
    table_mapping = {}  # Map expected names to actual names
    
    if len(source_tables_df) > 0:
        # Get actual table names (case-sensitive)
        actual_table_names = source_tables_df[table_column].tolist()
        actual_table_names_lower = [name.lower() for name in actual_table_names]
        
        print(f"\n🔍 Matching expected tables with available tables:")
        for expected_table in EXPECTED_TABLES:
            # Find case-insensitive match
            matching_indices = [i for i, name in enumerate(actual_table_names_lower) 
                              if name == expected_table.lower()]
            
            if matching_indices:
                # Use the actual table name (with correct case)
                actual_table_name = actual_table_names[matching_indices[0]]
                available_tables.append(actual_table_name)
                table_mapping[expected_table] = actual_table_name
                print(f"   ✅ Found: {expected_table} → {actual_table_name}")
            else:
                missing_tables.append(expected_table)
                print(f"   ❌ Missing: {expected_table}")
    
    print(f"\n📊 DISCOVERY SUMMARY")
    print(f"✅ Available tables found: {len(available_tables)}")
    print(f"❌ Missing tables: {len(missing_tables)}")
    
    if len(available_tables) == 0:
        print(f"\n⚠️ No expected tables found in {SOURCE_DATABASE}!")
        print("💡 Check that the source lakehouse contains the SalesLT tables")
        print("💡 Verify table names match expected format")
        print()
        print("🔧 TROUBLESHOOTING:")
        print("1. Ensure you have a shortcut to the source lakehouse")
        print("2. Refresh the lakehouse view in Fabric")
        print("3. Check the source lakehouse contains data")
        
        # Show what was actually found for debugging
        if len(source_tables_df) > 0:
            print(f"\n🔍 Debug - Available table names in source:")
            for _, row in source_tables_df.iterrows():
                print(f"   📋 '{row[table_column]}'")
    else:
        print(f"\n🎉 Ready to process {len(available_tables)} tables!")
        
        # Store for next steps (use actual table names with correct casing)
        TABLES_TO_PROCESS = available_tables
        TABLE_MAPPING = table_mapping
        print(f"📝 Tables to process: {', '.join(TABLES_TO_PROCESS)}")
        print(f"🚀 Source: {SOURCE_DATABASE} → Target: {BRONZE_TARGET_PATH}")
        
        if missing_tables:
            print(f"\n⚠️ Missing tables (will be skipped): {', '.join(missing_tables)}")
        
except Exception as e:
    print(f"❌ Failed to discover tables: {str(e)}")
    print()
    print("🔧 TROUBLESHOOTING:")
    print(f"1. Ensure {SOURCE_DATABASE} is accessible from this lakehouse")
    print("2. Check lakehouse attachment/shortcut configuration")
    print("3. Refresh the lakehouse view in Fabric")
    print("4. Verify source lakehouse permissions")
    TABLES_TO_PROCESS = []
    TABLE_MAPPING = {}

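The matching loop above scans the whole discovered-name list once for each expected table. A dictionary keyed by lowercase name produces the same case-insensitive result with one lookup per table. In the pure-Python sketch below, `match_tables` is a hypothetical helper. It returns the same three values the cell builds as `available_tables`, `missing_tables`, and `table_mapping`.

```python
def match_tables(expected, actual):
    """Case-insensitively match expected table names against discovered names.

    Returns (available, missing, mapping):
      available - discovered names (original casing), in expected-list order
      missing   - expected names with no match
      mapping   - expected name -> discovered name
    """
    # Index discovered names by lowercase; keep the first occurrence,
    # like matching_indices[0] in the notebook cell.
    by_lower = {}
    for name in actual:
        by_lower.setdefault(name.lower(), name)

    available, missing, mapping = [], [], {}
    for exp in expected:
        hit = by_lower.get(exp.lower())
        if hit is None:
            missing.append(exp)
        else:
            available.append(hit)
            mapping[exp] = hit
    return available, missing, mapping


available, missing, mapping = match_tables(
    ["address", "customer", "product"],
    ["Address", "Customer", "SomethingElse"],
)
print(available)  # ['Address', 'Customer']
print(missing)    # ['product']
print(mapping)    # {'address': 'Address', 'customer': 'Customer'}
```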
## Step 3: Test Bronze Layer Access

In [None]:
# Test write access to bronze layer
print("🧪 TESTING BRONZE LAYER ACCESS")
print("=" * 50)

test_path = f"{BRONZE_TARGET_PATH}_test_access"

try:
    # Create test data
    test_data = [("access_test", LOAD_TIMESTAMP, "success")]
    test_df = spark.createDataFrame(test_data, ["test_type", "timestamp", "status"])
    
    # Test write to bronze location
    print(f"📝 Testing write to: {test_path}")
    test_df.write.mode("overwrite").parquet(test_path)
    
    # Verify read access
    verify_df = spark.read.parquet(test_path)
    test_count = verify_df.count()
    
    print(f"✅ Write access confirmed")
    print(f"✅ Read access confirmed ({test_count} test records)")
    print(f"🎯 Target path ready: {BRONZE_TARGET_PATH}")
    
    # Display test data to confirm
    print("\n📋 Test data sample:")
    verify_df.show()
    
except Exception as e:
    print(f"❌ Bronze layer access test failed: {str(e)}")
    print("💡 Ensure you have write permissions to the current lakehouse")
    print("💡 Check Files directory structure and permissions")

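Step 3 leaves the `_test_access` folder behind in `Files/SalesLT/`. One way to make the probe clean up after itself is to pass in the write, read, and delete operations as callables. This also lets the probe logic be tested without Spark. The sketch below works under that assumption. `probe_access` is hypothetical. In a Fabric notebook, `delete` could wrap `notebookutils.fs.rm(path, True)`; check that NotebookUtils is available in your runtime.

```python
from typing import Callable


def probe_access(
    path: str,
    write: Callable[[str], None],
    read_count: Callable[[str], int],
    delete: Callable[[str], None],
    expected_rows: int = 1,
) -> bool:
    """Write a probe, read it back, and always try to remove it afterwards."""
    try:
        write(path)
        return read_count(path) == expected_rows
    except Exception as exc:  # any I/O failure means access is not confirmed
        print(f"Probe failed: {exc}")
        return False
    finally:
        try:
            delete(path)
        except Exception:
            pass  # nothing to clean up if the write never happened


# In the notebook (requires Spark, not run here):
# probe_access(test_path,
#              write=lambda p: test_df.write.mode("overwrite").parquet(p),
#              read_count=lambda p: spark.read.parquet(p).count(),
#              delete=lambda p: notebookutils.fs.rm(p, True))

# Local check with an in-memory stand-in for storage
store = {}
ok = probe_access(
    "Files/SalesLT/_test_access",
    write=lambda p: store.__setitem__(p, ["row"]),
    read_count=lambda p: len(store[p]),
    delete=lambda p: store.pop(p, None),
)
print(ok, store)  # True {}
```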
## Step 4: Process SalesLT Tables to Bronze

In [None]:
# Copy SalesLT tables to bronze layer with metadata enrichment
print("🚀 PROCESSING SALESLT TABLES TO BRONZE")
print("=" * 60)

if 'TABLES_TO_PROCESS' not in locals() or len(TABLES_TO_PROCESS) == 0:
    print("❌ No tables to process. Run previous steps first.")
else:
    print(f"📋 Processing {len(TABLES_TO_PROCESS)} tables")
    print(f"📥 Source: {SOURCE_DATABASE}")
    print(f"📤 Target: {BRONZE_TARGET_PATH}")
    print(f"📅 Load date: {LOAD_DATE}")
    print()
    
    # Processing results tracking
    results = []
    total_rows_processed = 0
    
    for i, table_name in enumerate(TABLES_TO_PROCESS, 1):
        print(f"[{i}/{len(TABLES_TO_PROCESS)}] Processing {table_name}...")
        
        try:
            # Read source table using qualified name
            print(f"   📖 Reading from {SOURCE_DATABASE}.{table_name}...")
            source_df = spark.sql(f"SELECT * FROM {SOURCE_DATABASE}.{table_name}")
            row_count = source_df.count()
            
            print(f"   ✅ Source data loaded: {row_count:,} rows")
            
            # Add bronze layer metadata columns
            print(f"   🏷️ Adding metadata columns...")
            bronze_df = source_df \
                .withColumn("_load_date", lit(LOAD_DATE)) \
                .withColumn("_load_timestamp", lit(LOAD_TIMESTAMP)) \
                .withColumn("_source_system", lit(SOURCE_SYSTEM)) \
                .withColumn("_source_table", lit(table_name)) \
                .withColumn("_processing_timestamp", current_timestamp()) \
                .withColumn("_record_source", lit("cross_lakehouse_copy")) \
                .withColumn("_load_method", lit("spark_sql_full_extract")) \
                .withColumn("_source_database", lit(SOURCE_DATABASE)) \
                .withColumn("_target_path", lit(f"{BRONZE_TARGET_PATH}{table_name}"))
            
            # Write to bronze layer as files
            table_target_path = f"{BRONZE_TARGET_PATH}{table_name}"
            print(f"   💾 Writing to Files: {table_target_path}")
            
            # A plain parquet overwrite replaces the folder and its schema;
            # "overwriteSchema" is a Delta option, so it is not needed here
            bronze_df.write \
                .mode("overwrite") \
                .parquet(table_target_path)
            
            print(f"   ✅ Files saved: {row_count:,} rows")
            
            # Also create a managed lakehouse table. bronze_df is not cached, so Spark
            # re-reads the source (and re-evaluates _processing_timestamp) for this write
            lakehouse_table_name = f"bronze_{table_name}"
            print(f"   🏢 Creating lakehouse table: {lakehouse_table_name}")
            
            try:
                # Create/replace lakehouse table
                bronze_df.write \
                    .mode("overwrite") \
                    .option("overwriteSchema", "true") \
                    .saveAsTable(lakehouse_table_name)
                
                print(f"   ✅ Table created: {lakehouse_table_name}")
                table_creation_status = "success"
                
            except Exception as table_error:
                table_error_msg = str(table_error)[:60]
                print(f"   ⚠️ Table creation failed: {table_error_msg}...")
                table_creation_status = "file_only"
            
            # Success tracking
            total_rows_processed += row_count
            results.append({
                "table": table_name,
                "rows": row_count,
                "status": "success",
                "target_path": table_target_path,
                "lakehouse_table": lakehouse_table_name,
                "table_status": table_creation_status
            })
            
            storage = "Files + Table" if table_creation_status == "success" else "Files only"
            print(f"   🎉 Successfully processed {row_count:,} rows ({storage})")
            
        except Exception as e:
            error_msg = str(e)[:100]
            results.append({
                "table": table_name,
                "rows": 0,
                "status": "failed",
                "error": error_msg
            })
            print(f"   ❌ Failed: {error_msg}...")
        
        print()
    
    # Processing summary
    successful = [r for r in results if r["status"] == "success"]
    failed = [r for r in results if r["status"] == "failed"]
    tables_created = [r for r in successful if r.get("table_status") == "success"]
    files_only = [r for r in successful if r.get("table_status") == "file_only"]
    
    print("🎉 PROCESSING SUMMARY")
    print("=" * 60)
    print(f"✅ Successfully processed: {len(successful)} tables")
    print(f"❌ Failed processing: {len(failed)} tables")
    print(f"📊 Total rows processed: {total_rows_processed:,}")
    print(f"📁 File outputs written: {len(successful)} (one per successful table)")
    print(f"🏢 Lakehouse tables created: {len(tables_created)}")
    print(f"⚠️ Files only (table creation failed): {len(files_only)}")
    print(f"📅 Processing completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    if successful:
        print(f"\n📁 Bronze layer file structure:")
        print(f"{BRONZE_TARGET_PATH}")
        for result in successful:
            table_marker = "🏢" if result.get("table_status") == "success" else "📁"
            print(f"├── {result['table']}/ ({result['rows']:,} rows) {table_marker}")
    
    if tables_created:
        print(f"\n🏢 Lakehouse tables created:")
        for result in tables_created:
            print(f"✅ {result['lakehouse_table']} ({result['rows']:,} rows)")
    
    if files_only:
        print(f"\n📁 Files only (table creation issues):")
        for result in files_only:
            print(f"⚠️ {result['table']} → Files saved, table creation failed")
    
    if failed:
        print(f"\n⚠️ Processing failures:")
        for result in failed:
            print(f"❌ {result['table']}: {result.get('error', 'Unknown error')}")
    
    print(f"\n🎯 Bronze data ready for downstream processing!")
    print(f"📁 Files stored in: {BRONZE_TARGET_PATH}")
    if tables_created:
        print(f"🏢 Lakehouse tables available: {len(tables_created)}")

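The nine metadata columns added above can be described as one dictionary and applied in a loop, which keeps the column list in one place. The pure-Python sketch below builds that dictionary. `bronze_metadata` is a hypothetical helper. `_processing_timestamp` is left out because the notebook sets it with Spark's `current_timestamp()` rather than a literal.

```python
def bronze_metadata(table_name, load_date, load_timestamp,
                    source_system="SalesLT",
                    source_database="Gaiye_Test_Lakehouse",
                    target_root="Files/SalesLT/"):
    """Literal metadata values that Step 4 attaches to every row of one table."""
    return {
        "_load_date": load_date,
        "_load_timestamp": load_timestamp,
        "_source_system": source_system,
        "_source_table": table_name,
        "_record_source": "cross_lakehouse_copy",
        "_load_method": "spark_sql_full_extract",
        "_source_database": source_database,
        "_target_path": f"{target_root}{table_name}",
    }


# In the notebook (requires Spark, not run here); column order differs slightly
# from the original chain because _processing_timestamp is added last:
# bronze_df = source_df
# for name, value in bronze_metadata(table_name, LOAD_DATE, LOAD_TIMESTAMP).items():
#     bronze_df = bronze_df.withColumn(name, lit(value))
# bronze_df = bronze_df.withColumn("_processing_timestamp", current_timestamp())
```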
## Step 5: Validate Bronze Layer Data

In [None]:
# Validate bronze layer data quality and completeness
print("🔍 VALIDATING BRONZE LAYER DATA")
print("=" * 60)

if 'TABLES_TO_PROCESS' not in locals() or len(TABLES_TO_PROCESS) == 0:
    print("❌ No tables to validate. Run previous steps first.")
else:
    validation_results = []
    total_bronze_rows = 0
    
    print(f"📋 Validating {len(TABLES_TO_PROCESS)} bronze files")
    print(f"📍 Target location: {BRONZE_TARGET_PATH}")
    print()
    
    for i, table_name in enumerate(TABLES_TO_PROCESS, 1):
        print(f"[{i}/{len(TABLES_TO_PROCESS)}] Validating {table_name}...")
        
        try:
            file_path = f"{BRONZE_TARGET_PATH}{table_name}"
            lakehouse_table_name = f"bronze_{table_name}"
            
            # Read bronze data files
            bronze_df = spark.read.parquet(file_path)
            bronze_count = bronze_df.count()
            
            # Check if lakehouse table exists and validate
            table_count = 0
            table_exists = False
            try:
                table_df = spark.table(lakehouse_table_name)
                table_count = table_df.count()
                table_exists = True
            except Exception:
                table_exists = False
            
            # Check metadata columns
            sample_row = bronze_df.select(
                "_load_date", 
                "_source_system", 
                "_source_table",
                "_load_method"
            ).first()
            
            # Get column count for schema validation
            column_count = len(bronze_df.columns)
            metadata_columns = [col for col in bronze_df.columns if col.startswith('_')]
            business_columns = [col for col in bronze_df.columns if not col.startswith('_')]
            
            total_bronze_rows += bronze_count
            validation_results.append({
                "table": table_name,
                "bronze_rows": bronze_count,
                "table_rows": table_count,
                "table_exists": table_exists,
                "total_columns": column_count,
                "business_columns": len(business_columns),
                "metadata_columns": len(metadata_columns),
                "load_date": sample_row._load_date if sample_row else "Unknown",
                "source_system": sample_row._source_system if sample_row else "Unknown",
                "load_method": sample_row._load_method if sample_row else "Unknown",
                "lakehouse_table": lakehouse_table_name,
                "status": "success"
            })
            
            print(f"   ✅ Files: {bronze_count:,} rows validated")
            if table_exists:
                print(f"   🏢 Table: {table_count:,} rows validated")
                row_match = "✅" if bronze_count == table_count else "⚠️"
                print(f"   {row_match} Row count match: Files={bronze_count:,}, Table={table_count:,}")
            else:
                print(f"   ⚠️ Lakehouse table not found: {lakehouse_table_name}")
            print(f"   📊 Columns: {len(business_columns)} business + {len(metadata_columns)} metadata")
            print(f"   📅 Load date: {sample_row._load_date if sample_row else 'Unknown'}")
            print(f"   🏷️ Source: {sample_row._source_system if sample_row else 'Unknown'}")
            
        except Exception as e:
            error_msg = str(e)[:80]
            validation_results.append({
                "table": table_name,
                "bronze_rows": 0,
                "status": "failed",
                "error": error_msg
            })
            print(f"   ❌ Validation failed: {error_msg}...")
        
        print()
    
    # Validation summary
    successful_validations = [r for r in validation_results if r["status"] == "success"]
    failed_validations = [r for r in validation_results if r["status"] == "failed"]
    tables_available = [r for r in successful_validations if r.get("table_exists", False)]
    files_only = [r for r in successful_validations if not r.get("table_exists", False)]
    
    print("🎯 VALIDATION SUMMARY")
    print("=" * 60)
    print(f"✅ Successfully validated: {len(successful_validations)} files")
    print(f"❌ Failed validations: {len(failed_validations)} files")
    print(f"📊 Total bronze rows: {total_bronze_rows:,}")
    print(f"📁 Files available: {len(successful_validations)}")
    print(f"🏢 Lakehouse tables available: {len(tables_available)}")
    print(f"⚠️ Files only (no table): {len(files_only)}")
    print(f"🏷️ Metadata enrichment: Load tracking added")
    
    if successful_validations:
        print(f"\n📋 Bronze layer inventory:")
        for result in successful_validations:
            table_marker = "🏢+📁" if result.get("table_exists") else "📁"
            table_info = f" | Table: {result.get('table_rows', 0):,}" if result.get("table_exists") else ""
            print(f"  • {result['table']}: {result['bronze_rows']:,} rows | {result['total_columns']} columns | {result['load_date']} {table_marker}{table_info}")
    
    if tables_available:
        print(f"\n🏢 Available lakehouse tables:")
        for result in tables_available:
            print(f"  ✅ {result['lakehouse_table']}: {result.get('table_rows', 0):,} rows")
    
    if files_only:
        print(f"\n📁 Files only (tables not created):")
        for result in files_only:
            print(f"  ⚠️ {result['table']}: File available, no lakehouse table")
    
    if failed_validations:
        print(f"\n⚠️ Validation failures:")
        for result in failed_validations:
            print(f"  ❌ {result['table']}: {result.get('error', 'Unknown error')}")
    
    print(f"\n🎉 Bronze layer validation complete!")
    print(f"📁 Files location: {BRONZE_TARGET_PATH}")
    if tables_available:
        print(f"🏢 Tables accessible via: SELECT * FROM bronze_[tablename]")
    print(f"🚀 Ready for silver layer processing")

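Step 5 compares Files and Table row counts, but it does not compare either with the source row counts recorded in Step 4's `results`. It also checks only four of the metadata columns. The pure-Python sketch below adds a stricter per-table check. `check_bronze_table` is hypothetical. It assumes you pass in the source count from `results` and the column list from `bronze_df.columns`.

```python
# The nine metadata columns Step 4 adds
REQUIRED_METADATA = {
    "_load_date", "_load_timestamp", "_source_system", "_source_table",
    "_processing_timestamp", "_record_source", "_load_method",
    "_source_database", "_target_path",
}


def check_bronze_table(source_rows, file_rows, columns, table_rows=None):
    """Return a list of readable issues; an empty list means the table passed."""
    issues = []
    if file_rows != source_rows:
        issues.append(f"Files row count {file_rows} != source {source_rows}")
    if table_rows is not None and table_rows != file_rows:
        issues.append(f"Table row count {table_rows} != Files {file_rows}")
    missing = REQUIRED_METADATA - set(columns)
    if missing:
        issues.append(f"Missing metadata columns: {sorted(missing)}")
    if all(c.startswith("_") for c in columns):
        issues.append("No business columns found")
    return issues


cols = sorted(REQUIRED_METADATA) + ["CustomerID", "FirstName"]
print(check_bronze_table(10, 10, cols, table_rows=10))  # []
```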
---

## Summary

This notebook provides a **simplified, streamlined approach** for copying SalesLT tables to the bronze layer in Microsoft Fabric with **dual storage**:

### ✅ **Key Features**:
- **Spark SQL and DataFrame API only** - No `dbutils` or other file-system utility dependencies
- **Dual storage strategy** - Both Files and managed Tables
- **Metadata enrichment** - Adds bronze layer tracking columns
- **Error handling** - Graceful failure handling and reporting
- **Built-in validation** - Row counts for Files and Tables, a Files-vs-Table row-count match, and metadata column checks
- **Streamlined workflow** - Essential steps only for team adoption
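The failure reporting listed above works by classifying each per-table entry in Step 4's `results` list by its `status` and `table_status` keys. The pure-Python function below performs the same classification on its own. `summarize_results` is hypothetical, but the dictionary keys match those the notebook writes.

```python
def summarize_results(results):
    """Count per-table outcomes using the keys Step 4 stores in each result dict."""
    summary = {"files_and_table": 0, "files_only": 0, "failed": 0, "rows": 0}
    for r in results:
        if r["status"] == "failed":
            summary["failed"] += 1
        elif r.get("table_status") == "success":
            summary["files_and_table"] += 1
        else:  # files written, saveAsTable failed ("file_only")
            summary["files_only"] += 1
        summary["rows"] += r["rows"]  # failed entries carry rows=0
    return summary
```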

### 🎯 **Output**:
- **Files**: Bronze layer data in `Files/SalesLT/` directory structure
- **Tables**: Managed lakehouse tables with `bronze_` prefix
- Each table saved as parquet files with load metadata
- Queryable tables for SQL analytics
- Validation reports for data quality assurance
- Ready for immediate downstream processing

### 💡 **Usage Patterns**:
- **File-based processing**: Use `spark.read.parquet("Files/SalesLT/tablename")`
- **SQL analytics**: Use `SELECT * FROM bronze_tablename`
- **Cross-lakehouse queries**: Join with other lakehouse data
- **Data quality checks**: Built-in validation and metadata tracking

### 🚀 **Next Steps**:
- Silver layer transformations
- Data quality rules implementation
- Incremental load patterns
- Gold layer aggregations