# Phase 2B Production Pipeline - Delta Lake with YAML Config

**Version:** v1.2.0-alpha.2-phase2b  
**Focus:** Production-ready Delta Lake pipelines using YAML + Key Vault

---

## What You'll Learn

✅ **Best Practices:**
- YAML configuration (not hardcoded Python)
- Key Vault authentication (not direct keys)
- Delta Lake as default format
- Multi-account storage setup
- Error handling and validation

✅ **Real-World Scenarios:**
- Bronze → Silver → Gold architecture
- CSV to Delta conversion
- Time travel for auditing
- VACUUM maintenance
- Version comparison

---

## Prerequisites

**Azure Setup:**
1. Storage account with containers (bronze, silver, gold)
2. Key Vault with storage keys stored as secrets
3. Managed Identity or Service Principal with access

**Local Setup:**
```bash
pip install -e ".[pandas,azure]"
```

## Part 1: Create Production YAML Configuration

In [None]:
import yaml
from pathlib import Path

# Configuration for production Delta Lake pipeline
config = {
    "project": "delta_lake_production",
    "description": "Production Delta Lake pipeline with Bronze/Silver/Gold architecture",
    "engine": "pandas",
    "connections": {
        # Local for testing
        "local": {"type": "local", "base_path": "./pipeline_data"},
        # Bronze layer (raw data from sources)
        "bronze": {
            "type": "azure_adls",
            "account": "your_account_name",  # ← Update this
            "container": "your_container_name",  # ← Update this
            "path_prefix": "raw",
            "auth_mode": "key_vault",  # ← Best practice!
            "auth": {
                "key_vault_name": "your_key_vault",  # ← Update this
                "secret_name": "your_secret_name",  # ← Update this
            },
        },
        # Silver layer (cleaned, validated data)
        "silver": {
            "type": "azure_adls",
            "account": "your_account_name",  # ← Update this
            "container": "your_container_name",  # ← Update this
            "path_prefix": "clean",
            "auth_mode": "key_vault",
            "auth": {
                "key_vault_name": "your_key_vault",  # ← Update this
                "secret_name": "your_secret_name",  # ← Update this
            },
        },
        # Gold layer (aggregated, business-ready data)
        "gold": {
            "type": "azure_adls",
            "account": "your_account_name",  # ← Update this
            "container": "your_container_name",  # ← Update this
            "path_prefix": "aggregated",
            "auth_mode": "key_vault",
            "auth": {
                "key_vault_name": "your_key_vault",  # ← Update this
                "secret_name": "your_secret_name",  # ← Update this
            },
        },
    },
    "story": {"connection": "local", "path": "stories/", "enabled": True},
    "retry": {"max_attempts": 3, "backoff_seconds": 2.0},
    "logging": {"level": "INFO"},
    "pipelines": [
        # Pipeline 1: Bronze → Silver (CSV to Delta)
        {
            "pipeline": "bronze_to_silver",
            "name": "Bronze to Silver - Sales Data",
            "description": "Convert raw CSV to cleaned Delta Lake tables",
            "nodes": [
                # Read raw CSV from Bronze
                {
                    "name": "read_raw_sales",
                    "read": {
                        "connection": "bronze",
                        "path": "sales/raw_sales.csv",
                        "format": "csv",
                    },
                },
                # Clean and validate (single SQL statement)
                {
                    "name": "clean_sales",
                    "depends_on": ["read_raw_sales"],
                    "transform": {
                        "steps": [
                            """
                            SELECT 
                                *,
                                now() as processed_at
                            FROM read_raw_sales 
                            WHERE order_id IS NOT NULL 
                            AND amount > 0
                            """
                        ]
                    },
                },
                # Write to Silver as Delta (best practice!)
                {
                    "name": "write_silver_sales",
                    "depends_on": ["clean_sales"],
                    "write": {
                        "connection": "silver",
                        "path": "sales/sales.delta",  # ← Delta format!
                        "format": "delta",
                        "mode": "append",  # ← Incremental loads
                    },
                },
            ],
        },
        # Pipeline 2: Silver → Gold (Aggregation)
        {
            "pipeline": "silver_to_gold",
            "name": "Silver to Gold - Daily Aggregates",
            "description": "Create business-ready aggregated tables",
            "nodes": [
                # Read from Silver Delta
                {
                    "name": "read_silver_sales",
                    "read": {
                        "connection": "silver",
                        "path": "sales/sales.delta",
                        "format": "delta",
                    },
                },
                # Aggregate by date
                {
                    "name": "aggregate_daily",
                    "depends_on": ["read_silver_sales"],
                    "transform": {
                        "steps": [
                            """
                            SELECT 
                                DATE(processed_at) as date,
                                COUNT(*) as order_count,
                                SUM(amount) as total_amount,
                                AVG(amount) as avg_amount,
                                MIN(amount) as min_amount,
                                MAX(amount) as max_amount
                            FROM read_silver_sales
                            GROUP BY DATE(processed_at)
                            ORDER BY date DESC
                            """
                        ]
                    },
                },
                # Write to Gold as Delta (full refresh; no partitioning is configured here)
                {
                    "name": "write_gold_daily",
                    "depends_on": ["aggregate_daily"],
                    "write": {
                        "connection": "gold",
                        "path": "sales/daily_summary.delta",
                        "format": "delta",
                        "mode": "overwrite",  # ← Full refresh for aggregates
                    },
                },
            ],
        },
        # Pipeline 3: Time Travel Audit
        {
            "pipeline": "audit_delta_versions",
            "name": "Audit Trail - Compare Versions",
            "description": "Compare current vs previous Delta versions",
            "nodes": [
                # Read current version
                {
                    "name": "read_current",
                    "read": {
                        "connection": "silver",
                        "path": "sales/sales.delta",
                        "format": "delta",
                    },
                },
                # Read previous version (time travel!)
                {
                    "name": "read_previous",
                    "read": {
                        "connection": "silver",
                        "path": "sales/sales.delta",
                        "format": "delta",
                        "options": {
                            "versionAsOf": 0  # ← Time travel!
                        },
                    },
                },
                # Compare versions
                {
                    "name": "compare_versions",
                    "depends_on": ["read_current", "read_previous"],
                    "transform": {
                        "steps": [
                            """
                            SELECT 
                                'Current' as version_type,
                                COUNT(*) as row_count,
                                SUM(amount) as total_amount
                            FROM read_current
                            UNION ALL
                            SELECT 
                                'Previous' as version_type,
                                COUNT(*) as row_count,
                                SUM(amount) as total_amount
                            FROM read_previous
                            """
                        ]
                    },
                },
                # Save audit report
                {
                    "name": "save_audit",
                    "depends_on": ["compare_versions"],
                    "write": {
                        "connection": "local",
                        "path": "audit/version_comparison.csv",
                        "format": "csv",
                    },
                },
            ],
        },
    ],
}

# Save configuration
config_path = Path("config_production.yaml")
with open(config_path, "w") as f:
    yaml.dump(config, f, default_flow_style=False, sort_keys=False)

print("✅ Production YAML configuration created!")
print(f"\nLocation: {config_path.absolute()}")
print("\n📝 Next steps:")
print("1. Set 'account' and 'container' to your storage account and container names")
print("2. Set 'key_vault_name' to your Key Vault name")
print("3. Set 'secret_name' to a secret that exists in Key Vault, for example:")
print("   - bronze-storage-key")
print("   - silver-storage-key")
print("   - gold-storage-key")
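
Before moving on, it can help to check mechanically that no placeholder values are left in the configuration. The sketch below is not part of odibi; `find_placeholders` is a hypothetical helper, and it assumes every placeholder is a string starting with `your_`, as in the config above.

```python
def find_placeholders(node, prefix="your_", path=""):
    """Return dotted paths of string values that still start with the placeholder prefix."""
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            child_path = f"{path}.{key}" if path else str(key)
            found += find_placeholders(value, prefix, child_path)
    elif isinstance(node, list):
        for i, value in enumerate(node):
            found += find_placeholders(value, prefix, f"{path}[{i}]")
    elif isinstance(node, str) and node.startswith(prefix):
        found.append(path)
    return found


# Small stand-in for the real config dict
sample = {
    "connections": {
        "bronze": {
            "account": "your_account_name",
            "auth": {"key_vault_name": "mycompany-kv"},
        }
    }
}
print(find_placeholders(sample))  # ['connections.bronze.account']
```

Calling `find_placeholders(config)` on the dictionary built above should list every connection field that still needs a real value; an empty list means nothing with the `your_` prefix remains.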

## Part 2: Review the Generated Configuration

In [None]:
# Display the configuration
print("Generated Configuration:")
print("=" * 70)
with open("config_production.yaml", "r") as f:
    print(f.read())

## Part 3: Update Configuration with Your Azure Details

**⚠️ IMPORTANT:** Before running pipelines, update the YAML file:

```yaml
connections:
  bronze:
    account: "mycompanystorage"      # ← Your storage account
    auth:
      key_vault_name: "mycompany-kv"   # ← Your Key Vault
      secret_name: "bronze-storage-key" # ← Secret in Key Vault
```

**Key Vault Setup:**
1. Store your storage account keys as secrets in Key Vault
2. Grant your identity access to Key Vault (Get Secret permission)
3. Use DefaultAzureCredential (works in Databricks, local with `az login`)
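
To confirm the Key Vault setup independently of the pipeline, the secret can be read directly with the Azure SDK. This is a minimal sketch, not odibi's internal implementation: `vault_url` and `fetch_storage_key` are hypothetical helpers, the URL pattern assumes the public Azure cloud, and it assumes the `azure-identity` and `azure-keyvault-secrets` packages are installed.

```python
def vault_url(key_vault_name: str) -> str:
    """Build the vault URL (assumes the public Azure cloud DNS suffix)."""
    return f"https://{key_vault_name}.vault.azure.net"


def fetch_storage_key(key_vault_name: str, secret_name: str) -> str:
    """Read one secret value using DefaultAzureCredential."""
    # Imported lazily so this module still loads where the Azure SDK is absent.
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient

    client = SecretClient(
        vault_url=vault_url(key_vault_name),
        credential=DefaultAzureCredential(),
    )
    return client.get_secret(secret_name).value


print(vault_url("mycompany-kv"))  # https://mycompany-kv.vault.azure.net
```

If `fetch_storage_key("mycompany-kv", "bronze-storage-key")` raises an authorization error, the identity most likely lacks the Get Secret permission from step 2.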

In [None]:
# Manually edit the file or update programmatically
import yaml

# Option 1: Edit config_production.yaml manually in VS Code
print("📝 Edit config_production.yaml and update, for bronze, silver, and gold:")
print("\n1. account and container → your actual storage account and container names")
print("2. auth.key_vault_name and auth.secret_name → your actual Key Vault and secret names")
print("\nThen run the next cell to validate.")

# Option 2: Update programmatically (uncomment and modify)
# with open("config_production.yaml", "r") as f:
#     config = yaml.safe_load(f)
#
# # Update values
# for conn_name in ["bronze", "silver", "gold"]:
#     config["connections"][conn_name]["account"] = "your_actual_account"
#     config["connections"][conn_name]["auth"]["key_vault_name"] = "your_actual_kv"
#
# # Save
# with open("config_production.yaml", "w") as f:
#     yaml.dump(config, f, default_flow_style=False, sort_keys=False)

## Part 4: Validate Configuration (Local Testing First)

In [None]:
import sys
from pathlib import Path

sys.path.insert(0, str(Path.cwd().parent))

from odibi.pipeline import Pipeline
from odibi.config import ProjectConfig

# Load and validate configuration
try:
    with open("config_production.yaml", "r") as f:
        config_dict = yaml.safe_load(f)

    # Validate with Pydantic
    project_config = ProjectConfig(**config_dict)

    print("✅ Configuration is valid!")
    print(f"\nConnections defined: {list(project_config.connections.keys())}")
    print(f"Pipelines defined: {[p.pipeline for p in project_config.pipelines]}")

except Exception as e:
    print(f"❌ Configuration error: {e}")
    print("\nPlease fix the configuration and try again.")
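
Schema validation may or may not catch a `depends_on` entry that names a node that does not exist; this notebook does not show which checks `ProjectConfig` performs. As a hedged complement, the sketch below does that check on the raw dictionary. `check_dependencies` is a hypothetical helper that only assumes the `nodes` / `name` / `depends_on` layout used in the config above.

```python
def check_dependencies(pipeline: dict) -> list:
    """Return a list of problems: duplicate node names and unknown dependencies."""
    problems = []
    seen = set()
    for node in pipeline["nodes"]:
        if node["name"] in seen:
            problems.append(f"duplicate node name: {node['name']}")
        seen.add(node["name"])
    for node in pipeline["nodes"]:
        for dep in node.get("depends_on", []):
            if dep not in seen:
                problems.append(f"{node['name']} depends on unknown node: {dep}")
    return problems


example = {
    "pipeline": "bronze_to_silver",
    "nodes": [
        {"name": "read_raw_sales"},
        {"name": "clean_sales", "depends_on": ["read_raw_sales"]},
        {"name": "write_silver_sales", "depends_on": ["clean_sale"]},  # typo on purpose
    ],
}
print(check_dependencies(example))
# ['write_silver_sales depends on unknown node: clean_sale']
```

Running it over `config_dict["pipelines"]` before the first pipeline run gives a quick signal for misspelled node names.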

## Part 5: Create Sample Data for Testing

In [None]:
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
from pathlib import Path

# Create sample sales data
np.random.seed(42)
n_rows = 1000

sample_data = pd.DataFrame(
    {
        "order_id": range(1, n_rows + 1),
        "customer": [f"Customer_{i % 100}" for i in range(n_rows)],
        "product": np.random.choice(["Laptop", "Phone", "Tablet", "Monitor", "Keyboard"], n_rows),
        "amount": np.random.uniform(50, 2000, n_rows).round(2),
        "quantity": np.random.randint(1, 10, n_rows),
        "region": np.random.choice(["North", "South", "East", "West"], n_rows),
        "order_date": [
            (datetime.now() - timedelta(days=np.random.randint(0, 90))).strftime("%Y-%m-%d")
            for _ in range(n_rows)
        ],
    }
)

# Add some nulls for testing (10%)
null_indices = np.random.choice(n_rows, size=int(n_rows * 0.1), replace=False)
sample_data.loc[null_indices, "order_id"] = None

# Add some invalid amounts (5%)
invalid_indices = np.random.choice(n_rows, size=int(n_rows * 0.05), replace=False)
sample_data.loc[invalid_indices, "amount"] = -100

# Save in correct location (with sales/ subdirectory)
output_path = Path("pipeline_data/input/sales")
output_path.mkdir(parents=True, exist_ok=True)
sample_data.to_csv(output_path / "raw_sales.csv", index=False)

print("✅ Sample data created!")
print(f"\nLocation: {output_path / 'raw_sales.csv'}")
print(f"Rows: {len(sample_data):,}")
print(
    f"Nulls: {sample_data['order_id'].isna().sum()} ({sample_data['order_id'].isna().sum() / len(sample_data) * 100:.1f}%)"
)
print(f"Invalid amounts: {(sample_data['amount'] < 0).sum()}")
print("\nSample:")
print(sample_data.head(10))
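
To know what the Silver table should contain before running anything, the `clean_sales` filter (`order_id IS NOT NULL AND amount > 0`) can be mirrored in plain Python. This is a sketch under the assumption that nulls are written to the CSV as empty fields, which is the pandas `to_csv` default; `count_valid_rows` is a hypothetical helper.

```python
import csv
import io


def count_valid_rows(csv_text: str) -> int:
    """Count rows that would pass: order_id present and amount strictly positive."""
    valid = 0
    for row in csv.DictReader(io.StringIO(csv_text)):
        if row["order_id"] == "" or row["amount"] == "":
            continue  # treated as NULL, so the SQL predicate is not true
        if float(row["amount"]) > 0:
            valid += 1
    return valid


sample_csv = "order_id,amount\n1,10.5\n,20.0\n3,-100\n4,0\n"
print(count_valid_rows(sample_csv))  # 1
```

For the generated file, `count_valid_rows(Path("pipeline_data/input/sales/raw_sales.csv").read_text())` gives the row count to expect after the first Bronze → Silver run.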

## Part 6: Test Locally First (Before Cloud)

**Best Practice:** Always test with local connections before running against cloud.

In [None]:
# Create local test configuration
local_config = {
    "project": "delta_lake_production",
    "description": "Production Delta Lake pipeline with Bronze/Silver/Gold architecture",
    "engine": "pandas",
    "connections": {
        "bronze": {"type": "local", "base_path": "./pipeline_data/input"},
        "silver": {"type": "local", "base_path": "./pipeline_data/silver"},
        "gold": {"type": "local", "base_path": "./pipeline_data/gold"},
        "local": {"type": "local", "base_path": "./pipeline_data"},
    },
    "story": {"connection": "local", "path": "stories/", "enabled": True},
    "retry": {"max_attempts": 3, "backoff_seconds": 2.0},
    "logging": {"level": "INFO"},
    "pipelines": config_dict["pipelines"],  # Use same pipelines
}

# Save local config
with open("config_local.yaml", "w") as f:
    yaml.dump(local_config, f, default_flow_style=False, sort_keys=False)

print("✅ Local test configuration created!")
print("\nThis uses local file system instead of Azure for testing.")
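
Rather than retyping the whole configuration, the local variant can be derived from the production dictionary so that the two never drift apart. The following is a minimal sketch; `to_local_config` and its `overrides` parameter are hypothetical names, and it assumes a `local` connection needs only `type` and `base_path`, as in the cell above.

```python
import copy


def to_local_config(prod_config: dict, base_dir: str = "./pipeline_data", overrides=None) -> dict:
    """Copy a config and replace every non-local connection with a local one."""
    overrides = overrides or {}
    local = copy.deepcopy(prod_config)  # leave the production dict untouched
    for name, conn in local["connections"].items():
        if conn.get("type") == "local":
            continue
        base_path = overrides.get(name, f"{base_dir}/{name}")
        local["connections"][name] = {"type": "local", "base_path": base_path}
    return local


prod = {
    "connections": {
        "local": {"type": "local", "base_path": "./pipeline_data"},
        "bronze": {"type": "azure_adls", "account": "your_account_name"},
        "silver": {"type": "azure_adls", "account": "your_account_name"},
    },
    "pipelines": [],
}
local = to_local_config(prod, overrides={"bronze": "./pipeline_data/input"})
print(local["connections"]["bronze"])  # {'type': 'local', 'base_path': './pipeline_data/input'}
print(local["connections"]["silver"])  # {'type': 'local', 'base_path': './pipeline_data/silver'}
```

The `overrides` entry for `bronze` reproduces the `./pipeline_data/input` path used in this notebook; everything else (pipelines, retry, logging) is carried over unchanged.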

## Part 7: Run Pipeline - Bronze to Silver (Local)

In [None]:
# Load pipeline from local config
manager = Pipeline.from_yaml("config_local.yaml")

print("Available pipelines:")
for pipeline_name in manager._pipelines.keys():
    print(f"  - {pipeline_name}")

print("\n" + "=" * 70)
print("Running: bronze_to_silver")
print("=" * 70)

# Run Bronze → Silver pipeline
result = manager.run("bronze_to_silver")

print("\n✅ Pipeline completed!")
print("\nResults:")
print(f"  Completed: {len(result.completed)} nodes")
print(f"  Failed: {len(result.failed)} nodes")
print(f"  Skipped: {len(result.skipped)} nodes")
print(f"  Duration: {result.duration:.2f}s")

if result.completed:
    print("\nCompleted nodes:")
    for node_name in result.completed:
        print(f"  ✅ {node_name}")

if result.failed:
    print("\nFailed nodes:")
    for node_name in result.failed:
        print(f"  ❌ {node_name}")

## Part 8: Verify Delta Table Created

In [None]:
from odibi.engine.pandas_engine import PandasEngine
from odibi.connections.local import LocalConnection

engine = PandasEngine()
silver_conn = LocalConnection(base_path="./pipeline_data/silver")

# Read the Delta table we just created
df_silver = engine.read(connection=silver_conn, format="delta", path="sales/sales.delta")

print("✅ Delta table in Silver layer:")
print(f"\nRows: {len(df_silver):,}")
print(f"Columns: {list(df_silver.columns)}")
print("\nData quality after cleaning:")
print(f"  Nulls in order_id: {df_silver['order_id'].isna().sum()} (should be 0)")
print(f"  Invalid amounts: {(df_silver['amount'] <= 0).sum()} (should be 0)")
print("\nSample:")
print(df_silver.head())

## Part 9: Check Delta Table History

In [None]:
# Get Delta table history
history = engine.get_delta_history(connection=silver_conn, path="sales/sales.delta")

print("Delta Table History:")
print("=" * 70)

for entry in history:
    print(f"\nVersion {entry['version']}:")
    print(f"  Operation: {entry['operation']}")
    print(f"  Timestamp: {entry['timestamp']}")
    if "operationMetrics" in entry and "numOutputRows" in entry["operationMetrics"]:
        print(f"  Rows: {entry['operationMetrics']['numOutputRows']}")

## Part 10: Run Pipeline - Silver to Gold (Aggregation)

In [None]:
print("=" * 70)
print("Running: silver_to_gold")
print("=" * 70)

# Run Silver → Gold pipeline
result = manager.run("silver_to_gold")

print("\n✅ Pipeline completed!")
print("\nResults:")
print(f"  Completed: {len(result.completed)} nodes")
print(f"  Failed: {len(result.failed)} nodes")
print(f"  Duration: {result.duration:.2f}s")

if result.completed:
    print("\nCompleted nodes:")
    for node_name in result.completed:
        print(f"  ✅ {node_name}")

if result.failed:
    print("\nFailed nodes:")
    for node_name in result.failed:
        print(f"  ❌ {node_name}")

In [None]:
# Read Gold layer aggregates
gold_conn = LocalConnection(base_path="./pipeline_data/gold")

df_gold = engine.read(connection=gold_conn, format="delta", path="sales/daily_summary.delta")

print("✅ Gold layer aggregates:")
print(f"\nDays: {len(df_gold)}")
print("\nDaily Summary:")
print(df_gold.sort_values("date", ascending=False).head(10))

## Part 11: Run Second Load (Test Append Mode)

In [None]:
# Create more sample data
new_data = pd.DataFrame(
    {
        "order_id": range(n_rows + 1, n_rows + 101),
        "customer": [f"Customer_{i % 100}" for i in range(100)],
        "product": np.random.choice(["Laptop", "Phone", "Tablet"], 100),
        "amount": np.random.uniform(100, 1500, 100).round(2),
        "quantity": np.random.randint(1, 5, 100),
        "region": np.random.choice(["North", "South"], 100),
        "order_date": [datetime.now().strftime("%Y-%m-%d") for _ in range(100)],
    }
)

# Append to the same CSV the pipeline reads (bronze path: sales/raw_sales.csv)
new_data.to_csv("pipeline_data/input/sales/raw_sales.csv", mode="a", header=False, index=False)

print("✅ Added 100 more orders to source CSV")

# Run pipeline again (should append to Delta)
# Note: as configured, the read node loads the whole CSV, so this run appends
# every valid row again, not only the 100 new ones.
result = manager.run("bronze_to_silver")

print("\n✅ Second pipeline run completed!")

# Check Delta table
df_silver_v2 = engine.read(connection=silver_conn, format="delta", path="sales/sales.delta")

print(f"\nDelta table now has {len(df_silver_v2):,} rows")
print(f"Added {len(df_silver_v2) - len(df_silver):,} new rows")
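
Because the source CSV is read in full on every run, `mode: append` on its own can write the same orders to Silver more than once. One common remedy is to drop rows whose key is already in the target before appending. The sketch below shows the idea in plain Python; `new_rows_only` is a hypothetical helper, not an odibi feature, and it assumes `order_id` uniquely identifies an order.

```python
def new_rows_only(incoming: list, existing_ids: set, key: str = "order_id") -> list:
    """Keep only rows whose key is not already in the target (and not repeated in the batch)."""
    seen = set(existing_ids)
    fresh = []
    for row in incoming:
        if row[key] in seen:
            continue
        seen.add(row[key])
        fresh.append(row)
    return fresh


existing = {1, 2, 3}
batch = [
    {"order_id": 3, "amount": 10.0},  # already loaded
    {"order_id": 4, "amount": 25.0},  # new
    {"order_id": 4, "amount": 25.0},  # duplicate inside the batch
]
print(new_rows_only(batch, existing))  # [{'order_id': 4, 'amount': 25.0}]
```

The same effect could be expressed in the pipeline's SQL step with an anti-join against the current Silver table, or avoided entirely by landing each load in its own source file.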

## Part 12: Time Travel - Compare Versions

In [None]:
# Create audit directory
from pathlib import Path

Path("pipeline_data/audit").mkdir(parents=True, exist_ok=True)
print("✅ Audit directory created")

print("=" * 70)
print("Running: audit_delta_versions")
print("=" * 70)

# Run audit pipeline (uses time travel)
result = manager.run("audit_delta_versions")

print("\n✅ Audit pipeline completed!")
print("\nResults:")
print(f"  Completed: {len(result.completed)} nodes")
print(f"  Failed: {len(result.failed)} nodes")
print(f"  Duration: {result.duration:.2f}s")

if result.completed:
    print("\nCompleted nodes:")
    for node_name in result.completed:
        print(f"  ✅ {node_name}")

if result.failed:
    print("\nFailed nodes:")
    for node_name in result.failed:
        print(f"  ❌ {node_name}")

# Only read the file if pipeline succeeded
if len(result.failed) == 0 and len(result.completed) > 0:
    # Read audit report
    audit_df = pd.read_csv("pipeline_data/audit/version_comparison.csv")

    print("\nVersion Comparison:")
    print("=" * 70)
    print(audit_df)

    # Calculate differences
    current = audit_df[audit_df["version_type"] == "Current"].iloc[0]
    previous = audit_df[audit_df["version_type"] == "Previous"].iloc[0]

    print("\nChanges:")
    print(f"  Rows added: {current['row_count'] - previous['row_count']:,}")
    print(f"  Amount added: ${current['total_amount'] - previous['total_amount']:,.2f}")
else:
    print("\n❌ Pipeline failed - check story for details")
    if result.story_path:
        print(f"Story: {result.story_path}")

## Part 13: VACUUM - Clean Old Files (Production Pattern)

In [None]:
# Production VACUUM pattern
print("VACUUM Maintenance:")
print("=" * 70)

# VACUUM with 7-day retention (production default)
result = engine.vacuum_delta(
    connection=silver_conn,
    path="sales/sales.delta",
    retention_hours=168,  # 7 days
    dry_run=True,  # Preview first
)

print(f"\nDry run: Would delete {result['files_deleted']} files")

# In production, run weekly:
print("\n💡 Production VACUUM schedule:")
print("""\n# Weekly VACUUM job
tables = [
    ('silver', 'sales/sales.delta'),
    ('gold', 'sales/daily_summary.delta')
]

for conn_name, table_path in tables:
    result = engine.vacuum_delta(
        connection=connections[conn_name],
        path=table_path,
        retention_hours=168  # Keep 7 days for time travel
    )
    print(f"{table_path}: cleaned {result['files_deleted']} files")
""")

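The `retention_hours` value determines both which files VACUUM may delete and how far back time travel keeps working. As a rough sketch of the rule (Delta Lake removes data files that the current table version no longer references and that are older than the retention threshold), the helpers below compute the cutoff. `vacuum_cutoff` and `is_eligible` are hypothetical names for illustration, not part of odibi or the Delta libraries.

```python
from datetime import datetime, timedelta, timezone


def vacuum_cutoff(retention_hours: float, now=None) -> datetime:
    """Files older than this timestamp are candidates for deletion."""
    now = now or datetime.now(timezone.utc)
    return now - timedelta(hours=retention_hours)


def is_eligible(file_time: datetime, still_referenced: bool, cutoff: datetime) -> bool:
    """A file is removable only if the current version no longer uses it and it is old enough."""
    return (not still_referenced) and file_time < cutoff


now = datetime(2024, 1, 8, tzinfo=timezone.utc)
cutoff = vacuum_cutoff(168, now=now)
print(cutoff.isoformat())  # 2024-01-01T00:00:00+00:00
print(is_eligible(datetime(2023, 12, 30, tzinfo=timezone.utc), False, cutoff))  # True
print(is_eligible(datetime(2024, 1, 5, tzinfo=timezone.utc), False, cutoff))    # False
```

With `retention_hours=168`, versions whose files were replaced more than seven days ago can no longer be read with `versionAsOf` once VACUUM has run, so the retention window should cover the longest audit look-back you need.
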
## Part 14: Deploy to Production (Azure ADLS + Key Vault)

**Now that local testing works, deploy to production:**

In [None]:
print("Production Deployment Checklist:")
print("=" * 70)

checklist = [
    (
        "1. Azure Storage Setup",
        [
            "✓ Storage account created",
            "✓ Containers: bronze, silver, gold",
            "✓ Storage keys copied",
        ],
    ),
    (
        "2. Key Vault Setup",
        [
            "✓ Key Vault created",
            "✓ Secrets added: bronze-storage-key, silver-storage-key, gold-storage-key",
            "✓ Access policy: Get Secret permission for your identity",
        ],
    ),
    (
        "3. Authentication",
        [
            "✓ Local: 'az login' completed",
            "✓ Databricks: Managed Identity configured",
            "✓ DefaultAzureCredential working",
        ],
    ),
    (
        "4. Configuration",
        [
            "✓ config_production.yaml updated with real values",
            "✓ All your_* placeholders replaced",
            "✓ Configuration validated",
        ],
    ),
    (
        "5. Data Upload",
        [
            "✓ Sample CSV uploaded to bronze/raw/sales/raw_sales.csv",
            "✓ File accessible via Storage Explorer",
        ],
    ),
]

for step, items in checklist:
    print(f"\n{step}:")
    for item in items:
        print(f"  {item}")

print("\n" + "=" * 70)
print("Once checklist complete, run:")
print("\nmanager = Pipeline.from_yaml('config_production.yaml')")
print("manager.run('bronze_to_silver')")

In [None]:
# Uncomment when ready to run against production Azure

# # Load production config
# prod_manager = Pipeline.from_yaml("config_production.yaml")
#
# # Run Bronze → Silver with Key Vault auth
# result = prod_manager.run("bronze_to_silver")
#
# print("✅ Production pipeline completed!")
# print("\nDelta table written to:")
# print("  abfss://silver@<account>.dfs.core.windows.net/clean/sales/sales.delta")

print("Uncomment code above when ready for production deployment.")
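
To see where a node's `path` ends up in ADLS Gen2, the pieces of a connection can be combined into an `abfss://` URI. This is a sketch only: `abfss_uri` is a hypothetical helper, and it assumes the engine joins `path_prefix` and the node `path` in that order, which is what the printed example path suggests.

```python
def abfss_uri(account: str, container: str, path_prefix: str, path: str) -> str:
    """Build abfss://<container>@<account>.dfs.core.windows.net/<prefix>/<path>."""
    parts = [p.strip("/") for p in (path_prefix, path) if p]
    return f"abfss://{container}@{account}.dfs.core.windows.net/" + "/".join(parts)


print(abfss_uri("mycompanystorage", "silver", "clean", "sales/sales.delta"))
# abfss://silver@mycompanystorage.dfs.core.windows.net/clean/sales/sales.delta
```

Comparing this URI with what Storage Explorer shows is a quick way to confirm that `container` and `path_prefix` are set as intended.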

## Part 15: Summary and Best Practices

In [None]:
print("Phase 2B Production Pipeline - Summary")
print("=" * 70)

print("\n✅ What You Learned:")
print("\n1. YAML Configuration:")
print("   - All settings in version-controlled YAML")
print("   - No hardcoded credentials in code")
print("   - Separate configs for local vs production")

print("\n2. Key Vault Authentication (Best Practice):")
print("   - Credentials stored in Azure Key Vault")
print("   - DefaultAzureCredential for access")
print("   - No secrets in code or config files")

print("\n3. Delta Lake as Default:")
print("   - format='delta' for all persistent tables")
print("   - ACID transactions prevent partial writes")
print("   - Time travel for auditing")
print("   - Schema evolution support")

print("\n4. Multi-Layer Architecture:")
print("   - Bronze: Raw data (CSV from sources)")
print("   - Silver: Cleaned, validated Delta tables")
print("   - Gold: Aggregated, business-ready Delta tables")

print("\n5. Testing Strategy:")
print("   - Test locally first (fast, free)")
print("   - Validate configuration before deployment")
print("   - Deploy to production when local tests pass")

print("\n6. Maintenance Patterns:")
print("   - Weekly VACUUM (retention_hours=168)")
print("   - Time travel for auditing changes")
print("   - History tracking for debugging")

print("\n💡 Best Practices Applied:")
print("   ✓ Key Vault (not direct keys)")
print("   ✓ YAML config (not Python hardcoding)")
print("   ✓ Delta Lake (not CSV/Parquet)")
print("   ✓ Local testing (before cloud)")
print("   ✓ Append mode (incremental loads)")
print("   ✓ Version control (config files in git)")
print("   ✓ Validation (null removal, data quality)")
print("   ✓ VACUUM (storage cost optimization)")

## Cleanup (Optional)

In [None]:
# Uncomment to clean up local test data

# import shutil
#
# shutil.rmtree("./pipeline_data", ignore_errors=True)
# Path("config_local.yaml").unlink(missing_ok=True)
# Path("config_production.yaml").unlink(missing_ok=True)
# print("✅ Test data cleaned up")

print("Uncomment code above to remove test data")