# B1 Orders Pipeline - Bronze to Gold

**Purpose**: Transform B1 sales orders into a standardized gold layer fact table for demand planning and analytics.

**Source**: `ent_dse_dev.1_b1_bronze.sales_order` + `sales_order_rows`  
**Target**: `resources/gold/orders_fact`

**Architecture**: Small, single-purpose tools (`get_spark`, `DagChain`, `polish`, `TableIndexer`) are composed Unix-style, and data moves through the medallion layers from bronze to gold.

## 1. Setup & Initialization

In [None]:
# Core tools
from tools.tool__workstation import get_spark
from tools.tool__dag_chainer import DagChain
from tools.tool__table_polisher import polish
from tools.tool__table_indexer import TableIndexer

# PySpark functions for DataFrame operations
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Initialize Spark session and workflow chain
spark = get_spark("local_delta")
chain__gold_orders = DagChain()

print("✅ Spark session initialized")
print("✅ DAG chain ready for orders pipeline")
print("✅ PySpark functions imported")

## 2. Base Data Import & Join

Join sales order headers with their line items on `ZSOURCE` and `DocEntry`, keeping all columns from both tables and dropping canceled orders and non-item lines.

In [None]:
# Load source tables and join with minimal filtering - preserve all columns
headers = spark.table("ent_dse_dev.`1_b1_bronze`.sales_order")
rows = spark.table("ent_dse_dev.`1_b1_bronze`.sales_order_rows")

# Join and apply only essential filters - keep ALL columns from both tables
chain__gold_orders.dag__import_orders_base = headers.join(
    rows, 
    ["ZSOURCE", "DocEntry"], 
    "inner"
).filter(
    (F.col("CANCELED") != "Y") &  # Exclude canceled orders
    (F.col("ItemCode").isNotNull())  # Exclude non-item lines
)

print("✅ Base orders data loaded with ALL source columns")
print(f"   Total columns: {len(chain__gold_orders.dag__import_orders_base.columns)}")
print(f"   Total records: {chain__gold_orders.dag__import_orders_base.count():,}")
print(f"   Sample columns: {chain__gold_orders.dag__import_orders_base.columns[:10]}")

# Show structure
chain__gold_orders.look(0)
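
One subtlety in the filter above: Spark SQL uses three-valued logic, so `F.col("CANCELED") != "Y"` evaluates to NULL when `CANCELED` is NULL, and `filter` keeps only rows where the condition is true. The plain-Python sketch below mimics that behavior, with `None` standing in for NULL, so the effect can be checked without a Spark session. The helper names and sample values are illustrative assumptions and not part of the pipeline.

```python
from typing import Optional

def sql_ne(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    """SQL-style `a != b`: unknown (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b

def sql_and(x: Optional[bool], y: Optional[bool]) -> Optional[bool]:
    """SQL-style AND: False wins over unknown, otherwise unknown propagates."""
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return True

def row_is_kept(canceled: Optional[str], item_code: Optional[str]) -> bool:
    """Mirror of the notebook filter: a row survives only if the condition is True."""
    condition = sql_and(sql_ne(canceled, "Y"), item_code is not None)
    return condition is True

print(row_is_kept("N", "A100"))   # True: active order with an item
print(row_is_kept("Y", "A100"))   # False: canceled order
print(row_is_kept(None, "A100"))  # False: CANCELED is NULL, so the condition is unknown
print(row_is_kept("N", None))     # False: non-item line
```

If rows with a NULL `CANCELED` value should be kept, one option is to compare against `F.coalesce(F.col("CANCELED"), F.lit("N"))` instead. Whether that is appropriate depends on how the bronze tables populate the column.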

## 3. Data Standardization

Apply table polishing for consistent column naming and formatting.

In [None]:
# Apply standardization
chain__gold_orders.dag__polish_orders = polish(
    chain__gold_orders.dag__import_orders_base
)

print("✅ Table polishing applied")
print("Column standardization:")
for col in chain__gold_orders.dag__polish_orders.columns[:10]:
    print(f"  - {col}")
print("  ... (and more)")

## 4. Entity Indexing

Create dimensional indices for customers, items, and warehouses.

In [None]:
# Initialize indexer with polished data
indexer = TableIndexer(chain__gold_orders.dag__polish_orders)

# Create entity indices
print("Creating entity indices...")

customer_result = indexer.customer("zsource", "customer_code")
print(f"✅ Customer indexing: {customer_result['mapping'].count():,} unique customers")

item_result = indexer.item("zsource", "item_code")
print(f"✅ Item indexing: {item_result['mapping'].count():,} unique items")

warehouse_result = indexer.warehouse("zsource", "warehouse_code")
print(f"✅ Warehouse indexing: {warehouse_result['mapping'].count():,} unique warehouses")

# Get the fully indexed DataFrame
chain__gold_orders.dag__indexed_orders = indexer.filtered_indexed_df
print(f"✅ Indexed orders: {chain__gold_orders.dag__indexed_orders.count():,} records")

## 5. Business Logic Enrichment

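Add calculated fields: delivered and open fill rates, order age in days, fiscal year and quarter (taken here from the calendar date in `docdate`), delivery performance, and delivered and open line values.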

In [None]:
# Add business enrichments using simple withColumns pattern (case study style)
chain__gold_orders.dag__enriched = chain__gold_orders.pick(-1).withColumns({
    "calc_delivery_fill_rate": F.when(F.col("quantity") > 0, F.col("delivrdqty") / F.col("quantity")).otherwise(0),
    "calc_open_fill_rate": F.when(F.col("quantity") > 0, F.col("openqty") / F.col("quantity")).otherwise(0),
    "calc_order_age_days": F.datediff(F.current_date(), F.to_date(F.col("docdate"), "yyyy-MM-dd")),
    "calc_fiscal_year": F.year(F.to_date(F.col("docdate"), "yyyy-MM-dd")),
    "calc_fiscal_quarter": F.quarter(F.to_date(F.col("docdate"), "yyyy-MM-dd")),
    "calc_delivery_performance": F.when(F.col("actdeldate").isNull(), "Pending")
                                   .when(F.to_date(F.col("actdeldate"), "yyyy-MM-dd") > F.to_date(F.col("docduedate"), "yyyy-MM-dd"), "Late")
                                   .otherwise("On Time"),
    "calc_delivered_value": F.col("delivrdqty") * F.col("price"),
    "calc_open_value": F.col("openqty") * F.col("price")
})

print("✅ Business enrichments added using simple withColumns pattern")
chain__gold_orders.look(-1)
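
To make the edge cases of the enrichment rules concrete, the sketch below mirrors the fill-rate and delivery-performance expressions in plain Python, with `None` standing in for NULL. It is an illustration for checking expected values without Spark, not the pipeline code; the function names and sample dates are assumptions.

```python
from datetime import date
from typing import Optional

def fill_rate(part_qty: Optional[float], quantity: Optional[float]) -> Optional[float]:
    """Mirror of F.when(quantity > 0, part_qty / quantity).otherwise(0)."""
    if quantity is None or not quantity > 0:
        return 0.0  # NULL or non-positive quantity falls through to otherwise(0)
    if part_qty is None:
        return None  # a NULL numerator propagates through the division
    return part_qty / quantity

def delivery_performance(actual: Optional[date], due: Optional[date]) -> str:
    """Mirror of the chained F.when(...) used for calc_delivery_performance."""
    if actual is None:
        return "Pending"
    if due is not None and actual > due:
        return "Late"
    return "On Time"  # also reached when the due date is NULL

print(fill_rate(6, 8))                                           # 0.75
print(fill_rate(5, 0))                                           # 0.0
print(delivery_performance(None, date(2024, 3, 1)))              # Pending
print(delivery_performance(date(2024, 3, 5), date(2024, 3, 1)))  # Late
print(delivery_performance(date(2024, 3, 5), None))              # On Time
```

The last case is worth noting: a delivered line with no due date is classified as "On Time", because a comparison against NULL is never true and the expression falls through to `otherwise`.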

## 6. Data Quality Validation

Drop rows that fail basic data quality checks: a missing customer, item, or document date, a non-positive quantity, or a negative price.

In [None]:
# Simple quality filtering using basic checks (case study style)
chain__gold_orders.dag__quality_filtered = chain__gold_orders.pick(-1).filter(
    (F.col("cardcode").isNotNull()) &
    (F.col("itemcode").isNotNull()) &
    (F.col("quantity") > 0) &
    (F.col("price") >= 0) &
    (F.col("docdate").isNotNull())
)

print("✅ Quality filtering applied using simple filter operations")
print(f"Records after quality filter: {chain__gold_orders.dag__quality_filtered.count():,}")
chain__gold_orders.trace(shape=True)
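
The filter above drops failing rows outright. Where it helps to see why a row failed, the same five checks can be expressed as named rules. The sketch below does this in plain Python over a dict-shaped row. The rule names and the `failed_checks` helper are hypothetical; the pipeline itself does not compute flags.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

# Each rule returns True when the row passes. A missing value (None) counts as
# a failure, matching how the Spark filter treats an unknown condition.
RULES: Dict[str, Callable[[Row], bool]] = {
    "has_customer": lambda r: r.get("cardcode") is not None,
    "has_item": lambda r: r.get("itemcode") is not None,
    "positive_quantity": lambda r: r.get("quantity") is not None and r["quantity"] > 0,
    "non_negative_price": lambda r: r.get("price") is not None and r["price"] >= 0,
    "has_docdate": lambda r: r.get("docdate") is not None,
}

def failed_checks(row: Row) -> List[str]:
    """Return the names of the rules the row fails, in rule order."""
    return [name for name, rule in RULES.items() if not rule(row)]

# Illustrative rows; the values are made up for the example.
good = {"cardcode": "C001", "itemcode": "A100", "quantity": 4, "price": 9.5, "docdate": "2024-03-01"}
bad = {"cardcode": None, "itemcode": "A100", "quantity": 0, "price": 9.5, "docdate": "2024-03-01"}

print(failed_checks(good))  # []
print(failed_checks(bad))   # ['has_customer', 'positive_quantity']
```

A row passes the notebook's filter exactly when this list is empty.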

## 7. Final Gold Layer Structure

Add a line-level key and ETL metadata columns to produce the final fact table.

In [None]:
# Add the order line key and ETL metadata columns (case study style)
chain__gold_orders.dag__final_gold = chain__gold_orders.pick(-1).withColumns({
    "meta_order_line_key": F.concat_ws("-", F.col("zsource"), F.col("docentry"), F.col("linenum")),
    "meta_etl_processed_at": F.current_timestamp(),
    "meta_etl_source": F.lit("b1_orders_pipeline_v1")
})

print(f"✅ Final gold orders: {chain__gold_orders.dag__final_gold.count():,} records")
print(f"   Total columns: {len(chain__gold_orders.dag__final_gold.columns)}")

# Show final workflow
chain__gold_orders.trace(shape=True)
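
A note on `meta_order_line_key`: Spark's `concat_ws` skips NULL inputs instead of returning NULL, so a missing component silently shortens the key. The plain-Python mirror below shows the effect; the source system code and numbers are made-up sample values.

```python
from typing import Optional

def concat_ws(sep: str, *values: Optional[object]) -> str:
    """Plain-Python mirror of Spark's concat_ws: NULL (None) inputs are skipped."""
    return sep.join(str(v) for v in values if v is not None)

full_key = concat_ws("-", "B1US", 1042, 0)
missing_line = concat_ws("-", "B1US", 1042, None)
missing_entry = concat_ws("-", "B1US", None, 1042)

print(full_key)       # B1US-1042-0
print(missing_line)   # B1US-1042
print(missing_entry)  # B1US-1042 (collides with the previous key)
```

The inner join on `DocEntry` should already exclude NULL document entries, but `linenum` is not covered by the quality filter, so a non-null check on it may be worth adding if the key must be unique.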

## 8. Persistence & Quality Reporting

Save the fact table and the entity mappings to the gold layer as Delta tables.

In [None]:
# Write to gold layer (case study style)
(chain__gold_orders.pick(-1)
    .write.format("delta")
    .mode("overwrite")
    .save("resources/gold/orders_fact"))

print("✅ Data written to resources/gold/orders_fact")

# Save entity mappings
for kind, mapping in {
    "customer": customer_result['mapping'],
    "item": item_result['mapping'], 
    "warehouse": warehouse_result['mapping']
}.items():
    (mapping.write.format("delta")
        .mode("overwrite")
        .save(f"resources/gold/map__{kind}s"))

print("✅ Entity mappings saved")
print("🎉 B1 Orders Pipeline Complete!")

## 9. Pipeline Inspection & Analytics

Summarize the pipeline steps, the number of indexed entities, and a sample of the final data.

In [None]:
# Pipeline inspection using built-in DagChain methods
print("📋 B1 Orders Pipeline Summary")
print("=" * 40)

# Use built-in trace method
chain__gold_orders.trace(shape=True)

# Simple summary
print("\n✅ Pipeline complete:")
print(f"   - {customer_result['mapping'].count():,} customers indexed")
print(f"   - {item_result['mapping'].count():,} items indexed") 
print(f"   - {warehouse_result['mapping'].count():,} warehouses indexed")

# Use built-in look method for final check
print("\nFinal data sample:")
chain__gold_orders.look(-1)

## 10. Quick Access Functions

A helper for loading the gold fact table, plus a reminder of the `DagChain` inspection methods.

In [None]:
# Quick data access through a single small helper
def load_orders():
    return spark.read.format("delta").load("resources/gold/orders_fact")

print("✅ Simple utility function available:")
print("  - load_orders() → Access gold orders fact table")
print("\nUse DagChain built-in methods for inspection:")
print("  - chain__gold_orders.trace() → See workflow steps") 
print("  - chain__gold_orders.look(n) → View DataFrame n")
print("  - chain__gold_orders.pick(n) → Get DataFrame n")
print("\n🎉 Clean, tool-focused B1 Orders Pipeline!")
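
The notebook relies on `pick(-1)` returning the most recently assigned `dag__*` step. The real `DagChain` implementation is not shown here, so the sketch below is a hypothetical stand-in (`MiniChain`) that illustrates one way such a pattern could work: attributes with the `dag__` prefix are recorded in assignment order and retrieved by index. It uses plain lists instead of DataFrames and makes no claim about how `DagChain` is actually built.

```python
class MiniChain:
    """Hypothetical stand-in for DagChain: records `dag__*` attributes in assignment order."""

    def __init__(self):
        # Bypass __setattr__ so the bookkeeping list is not treated as a step.
        object.__setattr__(self, "_steps", [])

    def __setattr__(self, name, value):
        # Remember the name of each new step the first time it is assigned.
        if name.startswith("dag__") and name not in self._steps:
            self._steps.append(name)
        object.__setattr__(self, name, value)

    def pick(self, n):
        """Return the n-th recorded step; negative indices count from the end."""
        return getattr(self, self._steps[n])

    def trace(self):
        """Return the step names in the order they were first assigned."""
        return list(self._steps)

chain = MiniChain()
chain.dag__import = [1, 2, 3, 4]
# The right-hand side runs before the assignment, so pick(-1) is still dag__import.
chain.dag__filtered = [x for x in chain.pick(-1) if x % 2 == 0]
chain.dag__doubled = [x * 2 for x in chain.pick(-1)]

print(chain.trace())   # ['dag__import', 'dag__filtered', 'dag__doubled']
print(chain.pick(0))   # [1, 2, 3, 4]
print(chain.pick(-1))  # [4, 8]
```

This is why each cell in the notebook can build on `pick(-1)` without naming the previous step explicitly.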