# Metadata-Driven Spark Batch Framework - Interactive Notebook

This notebook demonstrates how to use the Curation Framework for batch processing from Bronze (Lakeflow Streaming Tables) to Silver (Delta Tables).

## Features
- **High-Watermark Incremental Processing**: Process only records newer than the last processed watermark value
- **SCD Type 1**: Standard upsert (INSERT/UPDATE) that overwrites existing rows
- **SCD Type 2**: History tracking with effective dates

## Architecture
```
Bronze Layer (Lakeflow Streaming Tables) 
    → [Batch Framework with High Watermark] 
        → Silver Layer (Delta Tables)
```
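
The high-watermark step in the diagram above can be sketched in plain Python. This is a minimal illustration of the idea, not the framework's implementation: the helper names `sketch_high_watermark` and `sketch_incremental_read` and the sample rows are hypothetical, and `ingestion_ts` is used only because it is the default watermark column shown later in this notebook.

```python
from datetime import datetime


def sketch_high_watermark(target_rows, watermark_column="ingestion_ts"):
    """Return the newest watermark value in the target, or None if it is empty."""
    values = [row[watermark_column] for row in target_rows]
    return max(values) if values else None


def sketch_incremental_read(source_rows, watermark, watermark_column="ingestion_ts"):
    """Keep only source rows strictly newer than the watermark (all rows on a first run)."""
    if watermark is None:
        return list(source_rows)
    return [row for row in source_rows if row[watermark_column] > watermark]


# Hypothetical sample data standing in for the Bronze and Silver tables
bronze = [
    {"order_id": 1, "ingestion_ts": datetime(2024, 1, 1)},
    {"order_id": 2, "ingestion_ts": datetime(2024, 1, 2)},
    {"order_id": 3, "ingestion_ts": datetime(2024, 1, 3)},
]
silver = [{"order_id": 1, "ingestion_ts": datetime(2024, 1, 1)}]

watermark = sketch_high_watermark(silver)
new_rows = sketch_incremental_read(bronze, watermark)
print([row["order_id"] for row in new_rows])
```

Using a strict `>` comparison avoids reprocessing the row that set the watermark, at the cost of skipping late rows that share exactly the same timestamp.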

In [None]:
# Enable autoreload for development
%load_ext autoreload
%autoreload 2

# Configuration path
CONFIG_PATH = "conf/tables_config.json"

In [0]:
# Import the framework
from curation_framework import (
    BatchFrameworkOrchestrator,
    SilverProcessor,
    process_all_tables,
    process_single_table
)
from curation_framework.utils import (
    validate_table_config,
    table_exists,
    get_table_row_count
)

print("✓ Curation Framework loaded successfully")

## 1. Load Configuration

First, load the tables configuration and display the configured tables and global settings.

In [None]:
import json

# Load configuration
with open(CONFIG_PATH, 'r') as f:
    config = json.load(f)

# Display tables configuration
print("Configured Tables:")
print("-" * 60)
for table in config.get("tables", []):
    status = "✓ Enabled" if table.get("enabled", True) else "✗ Disabled"
    scd_type = f"SCD{table['scd_type']}"
    print(f"{status} | {table['table_name']} | {scd_type} | Keys: {table['business_key_columns']}")

print("\nGlobal Settings:")
print("-" * 60)
for key, value in config.get("global_settings", {}).items():
    print(f"  {key}: {value}")
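
For orientation, the sketch below shows one shape the configuration file might take, inferred only from the keys this notebook reads (`tables`, `global_settings`, `table_name`, `scd_type`, and so on). The values, including the `bronze.orders` and `silver.orders` table names and the `order_id` key, are hypothetical, and the real file may contain additional fields.

```python
import json

# Hypothetical example; only the key names are taken from this notebook
example_config = {
    "tables": [
        {
            "table_name": "silver_orders",
            "source_table": "bronze.orders",       # hypothetical value
            "target_table": "silver.orders",       # hypothetical value
            "scd_type": 1,
            "business_key_columns": ["order_id"],  # hypothetical value
            "watermark_column": "ingestion_ts",
            "transformation_sql_path": "conf/sql/orders_transform.sql",
            "enabled": True,
        }
    ],
    # The notebook does not show which global settings exist, so none are assumed here
    "global_settings": {},
}

# Round-trip through JSON to confirm the structure is serializable
round_tripped = json.loads(json.dumps(example_config))
print(json.dumps(example_config, indent=2))
```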

## 2. Validate Table Configurations

Validate each table configuration to ensure all required fields are present.

In [None]:
# Validate all table configurations
all_valid = True
for table_config in config.get("tables", []):
    errors = validate_table_config(table_config)
    if errors:
        print(f"✗ {table_config['table_name']}: {errors}")
        all_valid = False
    else:
        print(f"✓ {table_config['table_name']}: Valid")

if all_valid:
    print("\n✓ All configurations are valid!")
else:
    print("\n✗ Some configurations are invalid; fix the errors above before processing.")
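
To make the validation step concrete, here is a minimal sketch of the kind of checks such a validator could perform. It is not the implementation of `validate_table_config`: the function name `sketch_validate`, the list of required fields, and the error messages are all assumptions based on the keys this notebook uses.

```python
# Assumed set of required fields, inferred from the keys read in this notebook
REQUIRED_FIELDS = (
    "table_name",
    "source_table",
    "target_table",
    "scd_type",
    "business_key_columns",
)


def sketch_validate(table_config):
    """Return a list of human-readable errors; an empty list means the config passed."""
    errors = []
    for field in REQUIRED_FIELDS:
        if field not in table_config:
            errors.append(f"missing required field: {field}")
    # Only SCD types 1 and 2 are described in this notebook
    if "scd_type" in table_config and table_config["scd_type"] not in (1, 2):
        errors.append(f"unsupported scd_type: {table_config['scd_type']}")
    keys = table_config.get("business_key_columns")
    if keys is not None and (not isinstance(keys, list) or not keys):
        errors.append("business_key_columns must be a non-empty list")
    return errors


good = {
    "table_name": "silver_orders",
    "source_table": "bronze.orders",   # hypothetical value
    "target_table": "silver.orders",   # hypothetical value
    "scd_type": 2,
    "business_key_columns": ["order_id"],
}
bad = {"table_name": "broken", "scd_type": 3, "business_key_columns": []}

print(sketch_validate(good))
print(sketch_validate(bad))
```

Returning a list of errors rather than raising on the first problem matches how the cell above reports every issue per table in one pass.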

## 3. Process All Tables (Full Run)

Run the complete batch processing for all enabled tables.

In [None]:
# Process all enabled tables
# Uncomment the line below to run the full batch processing
# result = process_all_tables(CONFIG_PATH)

# To process specific tables only:
# result = process_all_tables(CONFIG_PATH, table_filter=["silver_orders", "silver_customers"])

print("To run batch processing, uncomment the process_all_tables() call above.")
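
The selection behaviour implied by `table_filter` can be sketched as follows. This assumes that disabled tables are skipped even when they are named in the filter, which the notebook does not confirm; `sketch_select_tables` and the `silver_products` entry are hypothetical.

```python
def sketch_select_tables(config, table_filter=None):
    """Return the names of enabled tables, optionally restricted to table_filter."""
    selected = []
    for table in config.get("tables", []):
        # Tables are treated as enabled unless explicitly disabled
        if not table.get("enabled", True):
            continue
        if table_filter is not None and table["table_name"] not in table_filter:
            continue
        selected.append(table["table_name"])
    return selected


demo_config = {
    "tables": [
        {"table_name": "silver_orders"},
        {"table_name": "silver_customers", "enabled": False},
        {"table_name": "silver_products"},  # hypothetical table
    ]
}

all_enabled = sketch_select_tables(demo_config)
filtered = sketch_select_tables(demo_config, ["silver_orders", "silver_customers"])
print(all_enabled)
print(filtered)
```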

## 4. Manual Processing Example

This section shows how to call the `SilverProcessor` class directly for step-by-step control over processing.

In [None]:
# Example: Manual processing with SilverProcessor
# This gives you more control over the processing steps

# Get the first table config as an example
table_config = config["tables"][0]
global_settings = config.get("global_settings", {})

print(f"Table: {table_config['table_name']}")
print(f"Source: {table_config['source_table']}")
print(f"Target: {table_config['target_table']}")
print(f"SCD Type: {table_config['scd_type']}")
print(f"Business Keys: {table_config['business_key_columns']}")
print(f"Watermark Column: {table_config.get('watermark_column', 'ingestion_ts')}")

In [None]:
# Example: Create a SilverProcessor instance
# processor = SilverProcessor(spark, table_config, global_settings)

# Step 1: Get high watermark (last processed timestamp)
# watermark = processor.get_high_watermark()
# print(f"High Watermark: {watermark}")

# Step 2: Read incremental data from source
# source_df = processor.read_incremental_source(watermark)
# print(f"Records to process: {source_df.count()}")

# Step 3: Apply transformations
# transformed_df = processor.apply_transformation(source_df)

# Step 4: Process based on SCD type
# if processor.scd_type == 1:
#     processor.process_scd_type1(transformed_df)
# elif processor.scd_type == 2:
#     processor.process_scd_type2(transformed_df)

print("Uncomment the code above to run manual processing steps.")
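
The difference between the two SCD branches in Step 4 can be illustrated with plain Python lists of dictionaries. This is a sketch of the general techniques, not of `process_scd_type1` or `process_scd_type2`: the function names, the `effective_from`, `effective_to`, and `is_current` columns, and the sample customers are all assumptions.

```python
from datetime import datetime


def sketch_scd1(target, updates, key):
    """SCD Type 1: overwrite the row for an existing key, insert it otherwise."""
    by_key = {row[key]: dict(row) for row in target}
    for row in updates:
        by_key[row[key]] = dict(row)
    return list(by_key.values())


def sketch_scd2(target, updates, key, now):
    """SCD Type 2: close the current version of a changed key and append a new one."""
    result = [dict(row) for row in target]  # copy so the input is not mutated
    for new in updates:
        current = next(
            (r for r in result if r[key] == new[key] and r["is_current"]), None
        )
        if current is not None:
            # Skip the update when no tracked attribute has changed
            if all(current.get(col) == val for col, val in new.items()):
                continue
            current["effective_to"] = now
            current["is_current"] = False
        result.append(
            {**new, "effective_from": now, "effective_to": None, "is_current": True}
        )
    return result


updates = [
    {"customer_id": 1, "city": "Bergen"},
    {"customer_id": 2, "city": "Paris"},
]

scd1_result = sketch_scd1([{"customer_id": 1, "city": "Oslo"}], updates, "customer_id")

scd2_target = [
    {
        "customer_id": 1,
        "city": "Oslo",
        "effective_from": datetime(2024, 1, 1),
        "effective_to": None,
        "is_current": True,
    }
]
scd2_result = sketch_scd2(scd2_target, updates, "customer_id", datetime(2024, 6, 1))

print(scd1_result)
for row in scd2_result:
    print(row)
```

In the Type 1 result the Oslo value is gone, while the Type 2 result keeps it as a closed historical row alongside the new current row.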

## 5. View SQL Transformation

Display the SQL transformation file configured for the example table selected in section 4, falling back to a sample path if none is configured.

In [None]:
# Read and display a sample SQL transformation
sql_path = table_config.get("transformation_sql_path", "conf/sql/orders_transform.sql")

try:
    with open(sql_path, 'r') as f:
        sql_content = f.read()
    print(f"SQL Transformation: {sql_path}")
    print("-" * 60)
    print(sql_content)
except FileNotFoundError:
    print(f"SQL file not found: {sql_path}")
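
To show what applying such a transformation involves, the sketch below runs a SELECT against a source relation using the standard-library `sqlite3` module as a stand-in for Spark. In Spark the analogous step would typically be registering the incremental DataFrame as a temporary view and running the query with `spark.sql`; whether the framework does this is an assumption. The SQL text, the `source_view` name, and the sample rows are hypothetical and do not reflect the contents of the real file.

```python
import sqlite3

# Hypothetical transformation; the real file's contents are not shown in this notebook
transform_sql = """
SELECT order_id, UPPER(status) AS status, amount
FROM source_view
WHERE amount IS NOT NULL
ORDER BY order_id
"""

conn = sqlite3.connect(":memory:")
# The table plays the role of the view holding the incremental source rows
conn.execute("CREATE TABLE source_view (order_id INTEGER, status TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO source_view VALUES (?, ?, ?)",
    [(1, "shipped", 10.0), (2, "pending", None), (3, "open", 5.5)],
)

transformed = conn.execute(transform_sql).fetchall()
conn.close()
print(transformed)
```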