# 🥉 Bronze Layer: Data Ingestion
**Load classification results and learning database into Delta Lake**

This notebook:
1. Loads configuration from setup notebook
2. Ingests classification JSON files → Delta Lake
3. Migrates learning_database.json → Delta Lake
4. Enables time travel and ACID transactions

## Load Configuration

In [None]:
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Load the configuration file produced by the setup notebook.
config_path = "/dbfs/tamu-datathon-config.json"
if not os.path.exists(config_path):
    # FileNotFoundError is a subclass of Exception, so any existing
    # `except Exception` handling keeps working while the failure is specific.
    raise FileNotFoundError("‚ùå Configuration not found! Run 00_setup_and_verify.ipynb first")

# Read explicitly as UTF-8 rather than relying on the platform default encoding.
with open(config_path, 'r', encoding='utf-8') as f:
    config = json.load(f)

# Fail fast and name every missing key at once, instead of hitting a bare
# KeyError partway through the summary below.
required_config_keys = (
    'repo_path',
    'results_path',
    'num_classification_files',
    'num_learning_entries',
    'delta_base',
    'learning_db_path',
    'bronze_path',
)
missing_config_keys = [key for key in required_config_keys if key not in config]
if missing_config_keys:
    raise KeyError(
        f"Configuration at {config_path} is missing required keys: {missing_config_keys}"
    )

print("=" * 80)
print("üìã LOADED CONFIGURATION")
print("=" * 80)
print(f"Repository: {config['repo_path']}")
print(f"Results: {config['results_path']}")
print(f"Classification files: {config['num_classification_files']}")
print(f"Learning entries: {config['num_learning_entries']}")
print(f"Delta base: {config['delta_base']}")

# Paths used by the ingestion cells that follow.
results_path = config['results_path']
learning_db_path = config['learning_db_path']
bronze_path = config['bronze_path']

## Step 1: Ingest Classification Results (Bronze Layer)

In [None]:
print("\n" + "=" * 80)
print("üì• INGESTING CLASSIFICATION RESULTS")
print("=" * 80)

try:
    # Every *.json file in the results directory is treated as a classification
    # result, except the learning database, which has its own ingestion step.
    # Sorted because os.listdir() returns entries in arbitrary order; a stable
    # order makes repeated runs ingest the files in the same sequence.
    json_files = sorted(
        name for name in os.listdir(results_path)
        if name.endswith('.json') and name != 'learning_database.json'
    )

    if len(json_files) == 0:
        print("‚ö†Ô∏è  No classification files found. Creating sample data...")
        # Single placeholder record so the write below still produces a table.
        sample_data = [{
            "document_id": "sample_001",
            "filename": "sample_document.pdf",
            "classification": "Public",
            "confidence": 0.95,
            "additional_labels": ["General Content"],
            "requires_review": False,
            "safety_check": {"is_safe": True}
        }]
        classifications_df = spark.createDataFrame(sample_data)
    else:
        # Parse every file on the driver and collect the records in one list.
        all_data = []
        for json_file in json_files:
            file_path = os.path.join(results_path, json_file)
            # Read explicitly as UTF-8 rather than the platform default encoding.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # A file may hold a single record (JSON object) or a batch (JSON
            # array). Flatten batches so each record becomes its own row
            # instead of one nested list being passed to Spark as a "row".
            if isinstance(data, list):
                all_data.extend(data)
            else:
                all_data.append(data)

        if not all_data:
            # Spark cannot infer a schema from an empty list; report the real
            # cause instead of letting createDataFrame fail obscurely.
            raise ValueError(
                f"No classification records found in {len(json_files)} JSON file(s) under {results_path}"
            )

        classifications_df = spark.createDataFrame(all_data)
        print(f"‚úÖ Loaded {len(all_data)} classification records")

    # Write to Delta Lake, replacing any previous table contents and schema.
    bronze_classifications_path = f"{bronze_path}/classifications"
    classifications_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(bronze_classifications_path)

    print(f"\n‚úÖ Bronze layer created: {bronze_classifications_path}")

    # Read the table back to confirm the write is visible.
    bronze_df = spark.read.format("delta").load(bronze_classifications_path)
    print(f"‚úÖ Verified: {bronze_df.count()} records in Delta Lake")

except Exception as e:
    # Print a short summary, then re-raise so the notebook run stops here.
    print(f"‚ùå Error: {e}")
    raise

## Step 2: Ingest Learning Database

In [None]:
print("\n" + "=" * 80)
print("üìö INGESTING LEARNING DATABASE")
print("=" * 80)

# Placeholder record used whenever there is no real feedback to ingest.
# Defined once so that the "file missing" and "file present but empty" cases
# write the same schema (including `timestamp`) to the bronze table.
sample_learning = [{
    "document_id": "sample_001",
    "original_classification": "Public",
    "corrected_classification": "Confidential",
    "approved": False,
    "feedback_notes": "Contains internal business information",
    "timestamp": "2024-11-09T10:00:00"
}]

try:
    learning_data = []
    if os.path.exists(learning_db_path):
        # Read explicitly as UTF-8 rather than the platform default encoding.
        with open(learning_db_path, 'r', encoding='utf-8') as f:
            learning_raw = json.load(f)

        # `or []` also covers a file where "learning_entries" is present but null.
        learning_data = learning_raw.get('learning_entries') or []

    if len(learning_data) > 0:
        learning_df = spark.createDataFrame(learning_data)
        print(f"‚úÖ Loaded {len(learning_data)} learning records")
    else:
        # No file, or no entries in it: fall back to the placeholder record.
        learning_df = spark.createDataFrame(sample_learning)
        print("‚úÖ Created sample learning data")

    # Write to Delta Lake, replacing any previous table contents and schema.
    bronze_learning_path = f"{bronze_path}/learning_database"
    learning_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(bronze_learning_path)

    print(f"\n‚úÖ Learning database ingested: {bronze_learning_path}")

    # Run Delta's OPTIMIZE command on the freshly written table.
    spark.sql(f"OPTIMIZE delta.`{bronze_learning_path}`")
    print("‚úÖ Optimized Delta Lake storage")

except Exception as e:
    # Print a short summary, then re-raise so the notebook run stops here.
    print(f"‚ùå Error: {e}")
    raise

## Step 3: Verify Bronze Layer

In [None]:
section_rule = "=" * 80
print("\n" + section_rule)
print("‚úÖ BRONZE LAYER VERIFICATION")
print(section_rule)

# Locations of the two bronze tables written by the ingestion steps.
classifications_location = f"{bronze_path}/classifications"
learning_location = f"{bronze_path}/learning_database"

# Row counts, read back from Delta rather than taken from in-memory frames.
classifications_count = (
    spark.read.format("delta")
    .load(classifications_location)
    .count()
)
learning_count = (
    spark.read.format("delta")
    .load(learning_location)
    .count()
)

print("üìä Bronze Layer Summary:")
for table_label, row_total in (
    ("Classifications", classifications_count),
    ("Learning Database", learning_count),
):
    print(f"   {table_label}: {row_total} records")

# Display one row from each table as a quick look at the stored records.
for sample_heading, table_location in (
    ("\nüîç Sample Classification Record:", classifications_location),
    ("\nüîç Sample Learning Record:", learning_location),
):
    print(sample_heading)
    display(spark.read.format("delta").load(table_location).limit(1))

## Step 4: Enable Time Travel

In [None]:
history_rule = "=" * 80
print("\n" + history_rule)
print("‚è∞ DELTA LAKE HISTORY")
print(history_rule)

# Path-based Delta table reference, shared by the history query and the hint below.
learning_table_ref = f"delta.`{bronze_path}/learning_database`"

# Fetch the table's commit history and show only the columns of interest.
history_df = spark.sql(f"DESCRIBE HISTORY {learning_table_ref}")
history_columns = ["version", "timestamp", "operation", "operationMetrics"]
display(history_df.select(*history_columns))

print("\n‚úÖ Time Travel enabled!")
print(f"   Query past versions: SELECT * FROM {learning_table_ref} VERSION AS OF 0")

## ✅ Ingestion Complete!

Bronze layer created with:
- ✅ Classification results in Delta Lake
- ✅ Learning database in Delta Lake
- ✅ Time Travel enabled
- ✅ ACID transactions enabled

**Next**: Run `03_pattern_mining.ipynb`

In [None]:
# Record where each bronze table lives so later notebooks can read the
# locations from the shared config file.
config.update({
    'bronze_classifications': f"{bronze_path}/classifications",
    'bronze_learning': f"{bronze_path}/learning_database",
})

# Rewrite the config file in place with the two new entries included.
with open(config_path, 'w') as config_file:
    json.dump(config, config_file, indent=2)

print("üíæ Configuration updated for next notebooks")