# Bronze Layer Ingestion - Databricks

This notebook reads raw Instacart CSV files from DBFS and writes them to Delta Lake format in the Bronze layer.

**Prerequisites:**
- Cluster is running and attached
- CSV files uploaded to `/FileStore/instacart/raw/`

**Output:**
- Delta tables in `/FileStore/instacart/bronze/`

In [None]:
# Configuration
RAW_PATH = "/FileStore/instacart/raw"
BRONZE_PATH = "/FileStore/instacart/bronze"

print(f"Raw data path: {RAW_PATH}")
print(f"Bronze output path: {BRONZE_PATH}")
print(f"Spark version: {spark.version}")

## Verify Raw Files Exist

Check that all six expected CSV files have been uploaded to the raw directory on DBFS.

In [None]:
# List files in raw directory
raw_files = dbutils.fs.ls(RAW_PATH)

print("Files in raw directory:")
for file in raw_files:
    print(f"  - {file.name} ({file.size / 1024 / 1024:.2f} MB)")

# Expected files
expected_files = ["orders.csv", "products.csv", "aisles.csv", 
                  "departments.csv", "order_products_train.csv", 
                  "order_products_prior.csv"]

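# Build the set of present names once instead of once per expected file
present_names = {file.name for file in raw_files}
missing_files = [f for f in expected_files if f not in present_names]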

if missing_files:
    print(f"\n⚠️ Missing files: {', '.join(missing_files)}")
    print("Upload missing files via Data → Create Table → Upload File")
else:
    print("\n✓ All required files found!")
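The cell above only prints a warning, so later cells still run when a file is missing. A minimal sketch of a fail-fast variant follows. `require_files` is a hypothetical helper, not part of the original notebook, and it takes plain lists of names so it can be exercised outside Databricks.

```python
def require_files(expected, present):
    """Raise FileNotFoundError if any expected file name is absent.

    Hypothetical helper: `expected` and `present` are plain iterables of
    file names, e.g. [f.name for f in dbutils.fs.ls(RAW_PATH)].
    """
    present_names = set(present)
    # Keep the order of `expected` so the error message is stable
    missing = [name for name in expected if name not in present_names]
    if missing:
        raise FileNotFoundError(f"Missing raw files: {', '.join(missing)}")
    return True
```

In the notebook this could be called as `require_files(expected_files, [f.name for f in raw_files])` right after the listing, so that ingestion does not start on an incomplete upload.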

## Helper Function: Ingest CSV to Bronze

In [None]:
from pyspark.sql.functions import current_timestamp, lit

def ingest_csv_to_bronze(csv_filename, table_name):
    """
    Read CSV from DBFS and write to Bronze Delta table
    
    Args:
        csv_filename: Name of CSV file (e.g., 'orders.csv')
        table_name: Name for Bronze table (e.g., 'orders')
    """
    raw_file_path = f"{RAW_PATH}/{csv_filename}"
    bronze_table_path = f"{BRONZE_PATH}/{table_name}"
    
    print(f"📥 Ingesting: {csv_filename} → {table_name}")
    
    try:
        # Read CSV with header and schema inference
        df = spark.read.csv(raw_file_path, header=True, inferSchema=True)
        
        # Add metadata columns
        df_with_metadata = df \
            .withColumn("ingestion_timestamp", current_timestamp()) \
            .withColumn("source_file", lit(csv_filename))
        
        # Write to Delta Lake
        df_with_metadata.write \
            .format("delta") \
            .mode("overwrite") \
            .save(bronze_table_path)
        
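        # Count rows from the written Delta table instead of re-scanning the CSV
        record_count = spark.read.format("delta").load(bronze_table_path).count()
        print(f"   ✓ Ingested {record_count:,} records\n")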
        
        return True
        
    except Exception as e:
        print(f"   ✗ Error: {e}\n")
        return False
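`inferSchema=True` makes Spark take an extra pass over each file to guess column types. One alternative is to pass an explicit schema, which `spark.read.csv` accepts as a DDL-formatted string. The sketch below rests on assumptions: the column names and types for `aisles.csv` and `departments.csv` are taken from the public Instacart dataset and should be checked against the actual header rows, and `BRONZE_SCHEMAS` and `read_raw_csv` are hypothetical names that do not appear in the original notebook.

```python
# Assumed column layouts; verify against the CSV headers before relying on them.
BRONZE_SCHEMAS = {
    "aisles.csv": "aisle_id INT, aisle STRING",
    "departments.csv": "department_id INT, department STRING",
}


def read_raw_csv(spark, raw_path, csv_filename):
    """Read one raw CSV, using an explicit schema when one is registered.

    Falls back to schema inference for files without an entry.
    """
    schema = BRONZE_SCHEMAS.get(csv_filename)
    path = f"{raw_path}/{csv_filename}"
    if schema is not None:
        # DDL string schema: Spark skips the inference pass over the file
        return spark.read.csv(path, header=True, schema=schema)
    return spark.read.csv(path, header=True, inferSchema=True)
```

Inside `ingest_csv_to_bronze`, the read step could then become `df = read_raw_csv(spark, RAW_PATH, csv_filename)`. The larger files would benefit most, but their schemas are left out here rather than guessed.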

## Ingest All Tables

Run ingestion for each CSV file.

In [None]:
print("=" * 80)
print("BRONZE LAYER INGESTION")
print("=" * 80)

ingestion_tasks = [
    ("orders.csv", "orders"),
    ("products.csv", "products"),
    ("aisles.csv", "aisles"),
    ("departments.csv", "departments"),
    ("order_products_train.csv", "order_products_train"),
    ("order_products_prior.csv", "order_products_prior")
]

results = []
for csv_file, table_name in ingestion_tasks:
    success = ingest_csv_to_bronze(csv_file, table_name)
    results.append((table_name, success))

print("=" * 80)
print("INGESTION SUMMARY")
print("=" * 80)

successful = sum(1 for _, success in results if success)
total = len(results)

for table_name, success in results:
    status = "✓" if success else "✗"
    print(f"{status} {table_name}")

print(f"\nCompleted {successful}/{total} ingestions successfully")
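Each table name in `ingestion_tasks` is the file name without its extension, and the file list repeats `expected_files` from the verification cell. A small sketch of deriving the pairs instead, assuming that naming convention continues to hold (`build_ingestion_tasks` is a hypothetical helper):

```python
from pathlib import PurePosixPath


def build_ingestion_tasks(csv_filenames):
    """Pair each CSV file name with a table name equal to its stem."""
    return [(name, PurePosixPath(name).stem) for name in csv_filenames]


tasks = build_ingestion_tasks(["orders.csv", "order_products_train.csv"])
print(tasks)
# [('orders.csv', 'orders'), ('order_products_train.csv', 'order_products_train')]
```

With this in place, `ingestion_tasks = build_ingestion_tasks(expected_files)` would keep the verification and ingestion cells working from a single list.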

## Verify Bronze Tables

Check that Delta tables were created and preview data.

In [None]:
# List Bronze tables
bronze_tables = dbutils.fs.ls(BRONZE_PATH)

print("Bronze Delta tables created:")
for table in bronze_tables:
    print(f"  - {table.name}")

In [None]:
# Preview orders table
orders_df = spark.read.format("delta").load(f"{BRONZE_PATH}/orders")

print(f"Orders table: {orders_df.count():,} records")
print("\nSample data:")
display(orders_df.limit(10))

In [None]:
# Preview products table
products_df = spark.read.format("delta").load(f"{BRONZE_PATH}/products")

print(f"Products table: {products_df.count():,} records")
print("\nSample data:")
display(products_df.limit(10))

## Summary

✅ **Bronze layer ingestion complete!**

**Next steps:**
1. Run `02_silver_transformation_databricks` to create cleaned, enriched tables
2. Check Bronze tables in Data Explorer: `/FileStore/instacart/bronze/`