In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Generate All Bronze / Silver / Gold Notebooks (Workspace)

# COMMAND ----------

import requests
import json
from datetime import datetime

# ----------------------------------------------------------------------
# 1. Configuration
# ----------------------------------------------------------------------
# Root workspace folder; one sub-folder per layer (Bronze / Silver / Gold)
# is placed directly beneath it.
BASE = "/Users/dhruvil@uciny.com/ShopFast/Layers"
BRONZE, SILVER, GOLD = (f"{BASE}/{layer_name}" for layer_name in ("Bronze", "Silver", "Gold"))

# REST credentials come from the running notebook's own context: the API
# base URL of this workspace and the API token of the current session.
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url, token = ctx.apiUrl().get(), ctx.apiToken().get()

# ----------------------------------------------------------------------
# 2. Notebook template
# ----------------------------------------------------------------------
# Text of every generated notebook, in Databricks notebook source format:
# cells are separated by "# COMMAND ----------" lines and markdown cells are
# written as "# MAGIC %md" comment lines.
# Placeholders filled with str.format() by the generation loop:
#   {title}  - notebook heading
#   {layer}  - "Bronze" / "Silver" / "Gold"
#   {tables} - target table name (also used in the saveAsTable example)
#   {desc}   - one-line description
#   {ts}     - generation timestamp
# Because the whole string goes through str.format(), any literal "{" or "}"
# added here must be written as "{{" / "}}".
# The two trailing spaces on the Layer / Target table(s) / Description lines
# are markdown hard line breaks; do not strip them.
TEMPLATE = """# Databricks notebook source
# MAGIC %md
# MAGIC # {title}
# MAGIC
# MAGIC **Layer**: {layer}  
# MAGIC **Target table(s)**: `{tables}`  
# MAGIC **Description**: {desc}  
# MAGIC **Generated**: {ts}

# COMMAND ----------

# MAGIC %md
# MAGIC ## 1. Setup & Imports

# COMMAND ----------

from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable

# COMMAND ----------

# MAGIC %md
# MAGIC ## 2. Read Source Data

# COMMAND ----------

# TODO: Implement source data reading
# Example:
# df = spark.read.format("...").load("...")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 3. Data Quality & Validation

# COMMAND ----------

# TODO: Add data quality checks
# Example: Check for nulls, duplicates, schema validation

# COMMAND ----------

# MAGIC %md
# MAGIC ## 4. Write to Delta Table

# COMMAND ----------

# TODO: Write to Delta table
# Example:
# df.write \
#   .format("delta") \
#   .mode("append") \
#   .option("mergeSchema", "true") \
#   .saveAsTable("{tables}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## 5. Logging & Monitoring

# COMMAND ----------

# TODO: Add logging and alerts
# Record counts, execution time, data quality metrics
"""

# ----------------------------------------------------------------------
# 3. Notebook definitions
# ----------------------------------------------------------------------
# Layer name -> list of (file_name, title, target_table, description) tuples.
# The creation loop writes each entry to "<layer folder>/<file_name>" and
# substitutes title / target_table / description into TEMPLATE.
# Keys are expected to be "Bronze", "Silver" or "Gold": the creation loop
# chooses the destination folder from the key.
NOTEBOOKS = {
    "Bronze": [
        ("b01_bronze_web_orders_ingestion", "Ingest Web Orders (PostgreSQL CDC)", "bronze_web_orders", "CDC from PostgreSQL using Debezium/Fivetran"),
        ("b02_bronze_web_order_items_ingestion", "Ingest Web Order Items", "bronze_web_order_items", "Line items from web orders"),
        ("b03_bronze_web_inventory_ingestion", "Ingest Web Inventory Snapshot", "bronze_web_inventory", "Real-time inventory view from website DB"),
        ("b04_bronze_web_products_ingestion", "Ingest Product Catalog", "bronze_web_products", "Master product data with pricing & suppliers"),
        ("b05_bronze_app_orders_ingestion", "Ingest Mobile App Orders (MongoDB)", "bronze_app_orders", "Change streams from MongoDB Atlas"),
        ("b06_bronze_app_order_items_ingestion", "Ingest Mobile Order Items", "bronze_app_order_items", "Flattened array items"),
        ("b07_bronze_app_cart_events_ingestion", "Ingest Cart Events", "bronze_app_cart_events", "Add/remove events for inventory holds"),
        ("b08_bronze_app_inventory_sync_ingestion", "Ingest App Inventory Cache", "bronze_app_inventory_sync", "30-min sync cycle"),
        ("b09_bronze_wh_east_inventory_ingestion", "Ingest East Warehouse CSV", "bronze_wh_east_inventory", "Daily CSV via AutoLoader"),
        ("b10_bronze_wh_west_inventory_ingestion", "Ingest West Warehouse CSV", "bronze_wh_west_inventory", "Pipe-delimited, schema-on-read"),
        ("b11_bronze_wh_central_inventory_ingestion", "Ingest Central Warehouse CSV", "bronze_wh_central_inventory", "Detailed export with damaged & in-transit"),
        ("b12_bronze_pos_manhattan_inventory_ingestion", "Ingest Manhattan POS Inventory (API)", "bronze_pos_manhattan_inventory", "REST polling every 5 min"),
        ("b13_bronze_pos_la_transactions_ingestion", "Ingest LA POS Transactions (Kafka)", "bronze_pos_la_transactions", "Real-time Kafka stream"),
        ("b14_bronze_pos_la_transaction_items_ingestion", "Ingest LA Transaction Items", "bronze_pos_la_transaction_items", "Line items with inventory impact"),
        ("b15_bronze_pos_la_inventory_adjustments_ingestion", "Ingest LA Inventory Adjustments", "bronze_pos_la_inventory_adjustments", "Damage/theft/return events"),
    ],
    "Silver": [
        ("s01_silver_orders_unification", "Unify Orders (Web + Mobile)", "silver_orders", "Harmonise schema, dedupe, SCD prep"),
        ("s02_silver_order_items_unification", "Unify Order Items", "silver_order_items", "Standard fields across channels"),
        ("s03_silver_inventory_unified_reconciliation", "Reconcile Inventory Across Sources", "silver_inventory_unified", "Merge 5 sources, calculate ATP"),
        ("s04_silver_inventory_discrepancies_detection", "Detect Inventory Discrepancies", "silver_inventory_discrepancies", "Negative qty, reserved > physical, etc."),
        ("s05_silver_products_master_harmonization", "Harmonise Product Master", "silver_products", "SCD Type-2 prep, supplier mapping"),
        ("s06_silver_transactions_unification", "Unify Transactions (All Channels)", "silver_transactions", "Web, app, POS sales in one table"),
        ("s07_silver_transaction_items_unification", "Unify Transaction Line Items", "silver_transaction_items", "With inventory impact"),
        ("s08_silver_inventory_movements_consolidation", "Consolidate Inventory Movements", "silver_inventory_movements", "Sales, returns, transfers, adjustments"),
        ("s09_silver_data_quality_checks", "Run Data-Quality Suite", "silver_data_quality_checks", "Great Expectations + custom rules"),
        ("s10_silver_late_arriving_data_handling", "Handle Late-Arriving Data", "silver_late_arriving_data", "Re-process delayed warehouse CSVs"),
    ],
    "Gold": [
        ("g01_dim_product_scd_type2", "Build Dim Product (SCD Type 2)", "dim_product", "Track name, category, supplier changes"),
        ("g02_dim_product_price_history_scd", "Build Price History (SCD Type 2)", "dim_product_price_history", "List price, cost, promotions"),
        ("g03_dim_location_master", "Build Dim Location", "dim_location", "Warehouses, stores, online"),
        ("g04_dim_date_calendar", "Build Date Dimension", "dim_date", "Holidays, fiscal calendar"),
        ("g05_dim_customer_scd_type2", "Build Dim Customer (SCD Type 2)", "dim_customer", "Segment, tier, LTV changes"),
        ("g06_fact_inventory_snapshot_daily", "Build Fact Inventory Snapshot", "fact_inventory_snapshot", "Point-in-time ATP, value, coverage"),
        ("g07_fact_sales_transactions_lineitem", "Build Fact Sales Transactions", "fact_sales_transactions", "Line-item grain, margin, channel"),
        ("g08_fact_sales_velocity_rolling", "Build Sales Velocity & Forecast", "fact_sales_velocity", "7/30-day avg, stockout prediction"),
        ("g09_fact_stockout_events_impact", "Build Fact Stockout Events", "fact_stockout_events", "Revenue loss, root cause, resolution"),
        ("g10_fact_inventory_movements_audit", "Build Fact Inventory Movements", "fact_inventory_movements", "Full audit trail"),
        ("g11_agg_daily_inventory_summary", "Build Daily Inventory Summary", "agg_daily_inventory_summary", "Pre-aggregated for dashboards"),
        ("g12_agg_product_performance_monthly", "Build Monthly Product Performance", "agg_product_performance_monthly", "Revenue rank, fill-rate, stockouts"),
        ("g13_gold_inventory_alerts_realtime", "Generate Real-Time Alerts", "gold_inventory_alerts", "Low-stock, stockout, demand-spike"),
        ("g14_gold_business_metrics_kpis", "Update Business KPIs", "gold_business_metrics", "Cancellation rate, sync latency, recovered revenue"),
    ]
}

# ----------------------------------------------------------------------
# 4. Helper functions
# ----------------------------------------------------------------------

def create_directory(path, *, timeout=30):
    """Create a folder in the Databricks workspace via the Workspace API.

    Sends ``POST {workspace_url}/api/2.0/workspace/mkdirs`` authenticated with
    the module-level ``token``.

    Args:
        path: Absolute workspace path of the folder to create.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would hang the notebook
            indefinitely.

    Returns:
        True if the API answered HTTP 200. Returns False on any other status
        or on a network-level error; in both cases the reason is printed.
    """
    headers = {"Authorization": f"Bearer {token}"}
    try:
        response = requests.post(
            f"{workspace_url}/api/2.0/workspace/mkdirs",
            headers=headers,
            json={"path": path},
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Timeouts or connection errors are reported like API failures, so a
        # transient glitch does not abort the whole generation run.
        print(f"✗ Failed to create directory {path}: {exc}")
        return False

    ok = response.status_code == 200
    if ok:
        print(f"✓ Directory created/verified: {path}")
    else:
        print(f"✗ Failed to create directory {path}: {response.text}")
    return ok

def create_notebook(path, content, *, overwrite=True, timeout=30):
    """Import ``content`` as a Python notebook at ``path`` in the workspace.

    Sends ``POST {workspace_url}/api/2.0/workspace/import`` with
    ``format="SOURCE"`` and ``language="PYTHON"``, authenticated with the
    module-level ``token``.

    Args:
        path: Absolute workspace path of the notebook to create.
        content: Notebook source text. It is sent base64-encoded, as the
            import endpoint requires.
        overwrite: Whether to replace an existing object at ``path``.
            Defaults to True, matching the original behaviour. NOTE: with
            True, re-running the generator replaces a notebook that has since
            been filled in with the empty template. Pass False to protect
            existing work; the API then rejects the import for existing paths.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True on HTTP 200. Returns False otherwise, and the failure details
        are printed.
    """
    import base64

    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "path": path,
        "format": "SOURCE",
        "language": "PYTHON",
        "content": base64.b64encode(content.encode("utf-8")).decode("ascii"),
        "overwrite": overwrite,
    }

    try:
        response = requests.post(
            f"{workspace_url}/api/2.0/workspace/import",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
    except requests.RequestException as exc:
        # Report transport failures per notebook instead of crashing the loop.
        print(f"✗ Failed to create {path}: {exc}")
        return False

    if response.status_code == 200:
        return True
    print(f"✗ Failed to create {path}: {response.text}")
    return False

# ----------------------------------------------------------------------
# 5. Create all notebooks
# ----------------------------------------------------------------------

# Timestamp written into every generated notebook header. This is a naive
# datetime in the driver process's local time; no timezone is recorded.
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
total_created = 0
total_failed = 0

# Explicit layer -> folder mapping. An unrecognised layer key in NOTEBOOKS is
# rejected up front instead of being silently written into the Gold folder.
layer_folders = {"Bronze": BRONZE, "Silver": SILVER, "Gold": GOLD}
unknown_layers = sorted(set(NOTEBOOKS) - set(layer_folders))
if unknown_layers:
    raise ValueError(
        f"NOTEBOOKS contains unknown layer(s) {unknown_layers}; "
        f"expected one of {sorted(layer_folders)}"
    )

print("=" * 70)
print("GENERATING BRONZE/SILVER/GOLD NOTEBOOKS")
print("=" * 70)

# Create base directories and remember failures so they appear in the summary.
failed_dirs = []
for folder in [BASE, BRONZE, SILVER, GOLD]:
    if not create_directory(folder):
        failed_dirs.append(folder)

print("\n" + "=" * 70)

# Create notebooks
for layer, items in NOTEBOOKS.items():
    folder = layer_folders[layer]
    print(f"\n>>> Creating {len(items)} {layer} notebooks in {folder}")
    print("-" * 70)

    for fname, title, tables, desc in items:
        notebook_path = f"{folder}/{fname}"
        content = TEMPLATE.format(
            title=title,
            layer=layer,
            tables=tables,
            desc=desc,
            ts=ts
        )

        if create_notebook(notebook_path, content):
            print(f"✓ Created: {notebook_path}")
            total_created += 1
        else:
            print(f"✗ Failed: {notebook_path}")
            total_failed += 1

# Summary
print("\n" + "=" * 70)
print("SUMMARY")
print("=" * 70)
print(f"✓ Successfully created: {total_created} notebooks")
if total_failed > 0:
    print(f"✗ Failed: {total_failed} notebooks")
if failed_dirs:
    print(f"✗ Directories not created: {', '.join(failed_dirs)}")
print(f"\nNotebooks location: {BASE}")
print("=" * 70)