# Batch Scoring Pipeline - Late Delivery Predictions for Open Deliveries

**Goal:** Generate late delivery risk predictions for **open deliveries** (not yet shipped).

**Use Case:** Enable the operations team to:
- Identify deliveries at high risk of shipping late
- Prioritize corrective actions for strategic accounts
- Proactively communicate with business teams about potential delays

**Workflow:**
1. Load trained regression model from MLflow
2. Get **open deliveries** using DAX (deliveries with no Goods Issue (GI) Date)
3. Generate predictions: days late + risk score + lateness bucket
4. Save predictions to Lakehouse table: `late_delivery_predictions`
5. Visualize high-risk deliveries
6. Enable Power BI reporting

### 🟦 1. Import Libraries

In [None]:
# Data manipulation
import pandas as pd
import numpy as np

# Model loading
import mlflow

# Semantic Link - Connect to Power BI
import sempy.fabric as fabric

# Spark for writing to Lakehouse
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

print("✅ All libraries imported")

### 🟦 2. Configuration

In [None]:
# Semantic model name
DATASET = "DLV Aging Columns & Measures"

# MLflow model details
MODEL_NAME = "POC-LateDelivery-Regression-AutoML"
MODEL_VERSION = "latest"  # or specify version like "1", "2", etc.

# Target variable (must match training notebook)
TARGET_COLUMN = "AGE_REQ_DATE"

# Output table name
OUTPUT_TABLE = "late_delivery_predictions"

# Workspace
ws = fabric.get_workspace_id()

print(f"📊 Dataset: {DATASET}")
print(f"🤖 Model: {MODEL_NAME}")
print(f"📌 Version: {MODEL_VERSION}")
print(f"🎯 Target: {TARGET_COLUMN}")
print(f"💾 Output Table: {OUTPUT_TABLE}")
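
Recording the literal string `"latest"` in the `model_version` column makes it hard to trace which registered model produced a row once a newer version is registered. A minimal sketch, assuming you collect the registered version strings yourself (for example via MLflow's `MlflowClient().search_model_versions(f"name='{MODEL_NAME}'")`, whose results carry a `version` attribute): the hypothetical helper `resolve_model_version` maps `"latest"` to the highest numeric version, which is an assumption about how you want "latest" interpreted.

```python
def resolve_model_version(requested: str, available_versions: list) -> str:
    """Return a concrete registry version for `requested`.

    Hypothetical helper: `available_versions` is assumed to be the list of
    version strings registered for the model (MLflow stores them as "1", "2", ...).
    """
    if requested != "latest":
        if requested not in available_versions:
            raise ValueError(f"Version {requested!r} is not registered")
        return requested
    if not available_versions:
        raise ValueError("No registered versions found")
    # Compare numerically so that "10" ranks above "9"
    return max(available_versions, key=int)


# Example with made-up version strings
print(resolve_model_version("latest", ["1", "2", "10"]))  # prints 10
print(resolve_model_version("2", ["1", "2", "10"]))       # prints 2
```

You could then set `MODEL_VERSION = resolve_model_version(MODEL_VERSION, versions)` before `model_uri` is built, so the output table stores a concrete version number instead of `"latest"`.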

### 🟦 3. Load Trained Model from MLflow

In [None]:
# Load model from MLflow registry
model_uri = f"models:/{MODEL_NAME}/{MODEL_VERSION}"

print(f"Loading model from: {model_uri}")
model = mlflow.sklearn.load_model(model_uri)

print("✅ Model loaded successfully!")
print(f"   Model type: {type(model).__name__}")

### 🟦 4. Load Open Deliveries from Semantic Model

Load **only open deliveries** (not yet shipped) that need predictions.

In [None]:
# DAX query: Get OPEN deliveries (no GI Date = not yet shipped)
# These are the deliveries we want to predict late delivery risk for
dax_query = """
EVALUATE
FILTER(
    Aging,
    ISBLANK(Aging[GI Date])
)
"""

# Execute DAX query
print("Loading open deliveries from semantic model...")
df_open = fabric.evaluate_dax(dataset=DATASET, dax_string=dax_query, workspace=ws)

# Clean column names
df_open.columns = [col.split('[')[-1].replace(']', '') if '[' in col else col for col in df_open.columns]

print(f"✅ Loaded {len(df_open):,} open deliveries (awaiting shipment)")
print(f"✅ Columns: {df_open.shape[1]}")

# Show sample
print(f"\nSample open deliveries:")
df_open[['Delivery Number', 'Plant', 'Brand', 'Channel', 'STRATEGIC_ACCOUNT', 
         'Delivery Created On', 'Req. Date Header', 'STATUS']].head(10)
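
`evaluate_dax` returns column names qualified with the table name (this notebook assumes the form `Aging[Plant]`), and the list comprehension in the cell above keeps only the text inside the brackets. A standalone version of the same rule, written as a named function so it can be tested; the sample names are illustrative:

```python
def clean_column_name(col: str) -> str:
    """Strip a DAX table prefix, e.g. 'Aging[Plant]' -> 'Plant'.

    Mirrors the notebook's list comprehension: names without '[' are
    returned unchanged.
    """
    if '[' in col:
        return col.split('[')[-1].replace(']', '')
    return col


raw_columns = ['Aging[Delivery Number]', 'Aging[Req. Date Header]', 'STATUS']
print([clean_column_name(c) for c in raw_columns])
# prints ['Delivery Number', 'Req. Date Header', 'STATUS']
```

Because only the text after the last `[` is kept, two columns from different tables with the same name would collide after cleaning; with a single-table `FILTER(Aging, ...)` query that cannot happen.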

### 🟦 5. Prepare Features

Extract the same features used during training.

**IMPORTANT:** These features must exactly match those used during training in notebook 02.

In [None]:
# Define features - MUST MATCH TRAINING NOTEBOOK
potential_features = [
    # Location & Routing
    'Plant',
    'Shipping Point',
    'EWM_CARRIER_CODE',
    
    # Product
    'Brand',
    'Channel',
    'Product Category',
    'Product Type',
    'Standard Or Custom',
    
    # Customer & Account
    'STRATEGIC_ACCOUNT',
    'Sold To - Key',
    
    # Delivery Attributes
    'Delivery Type',
    'DELIVERY_QTY',
    'DELIVERY_VALUE_USD',
    'Delivery Priority',
    'Shipping Condition',
    
    # Processing Status
    'Credit Status',
    'Distribution Status',
    'STATUS',
]

# Add temporal features if 'Delivery Created On' exists
if 'Delivery Created On' in df_open.columns:
    try:
        # Parse the date once and derive both features from it
        created_on = pd.to_datetime(df_open['Delivery Created On'])
        df_open['created_dayofweek'] = created_on.dt.dayofweek
        df_open['created_month'] = created_on.dt.month
        potential_features.extend(['created_dayofweek', 'created_month'])
        print("✅ Added temporal features")
    except Exception as e:
        print(f"⚠️ Could not create temporal features: {e}")

# Filter to available features
feature_cols = [f for f in potential_features if f in df_open.columns]

print(f"=== Features for Scoring ===")
print(f"Using {len(feature_cols)} features:")
for i, f in enumerate(feature_cols, 1):
    print(f"  {i}. {f}")

missing_features = [f for f in potential_features if f not in df_open.columns]
if missing_features:
    print(f"\n⚠️ Missing features: {missing_features}")

### 🟦 6. Feature Engineering

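Apply the same transformations used in training. The integer codes from `cat.codes` and the median fills below are computed from the current scoring batch, so they reproduce the training encoding only when the categories and distributions match those seen in training.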

In [None]:
# Extract features
X = df_open[feature_cols].copy()

# Encode categorical variables
# NOTE: cat.codes numbers categories by what appears in THIS batch, so the codes
# only match training if the set of categories is identical to training.
categorical_cols = X.select_dtypes(include=['object', 'string']).columns.tolist()

print("\n=== Encoding Categorical Features ===")
for col in categorical_cols:
    X[col] = X[col].fillna('Unknown')
    X[col] = X[col].astype("category").cat.codes
    print(f"  ✓ Encoded: {col}")

# Handle numeric NaNs
# NOTE: uses the scoring-batch median, which may differ from the training median
numeric_cols = X.select_dtypes(include=['number']).columns.tolist()
for col in numeric_cols:
    if X[col].isnull().sum() > 0:
        median_val = X[col].median()
        X[col] = X[col].fillna(median_val)
        print(f"  ✓ Filled NaNs in {col} with median: {median_val}")

print(f"\n✅ Features prepared: {X.shape[1]} columns, {X.shape[0]:,} rows")
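
Because `astype("category").cat.codes` numbers categories by what appears in the current batch, the same brand can receive a different code at scoring time than it did at training time. A minimal sketch of a stable alternative, assuming the training notebook saved the category list for each column (the `training_categories` dictionary below is hypothetical, for example loaded from a JSON artifact logged with the model): `pd.Categorical` with a fixed `categories` list yields the same codes on every run and maps values unseen in training to `-1`. If training saved `X_train[col].astype("category").cat.categories` after filling NaNs with `'Unknown'`, this reproduces the training codes exactly.

```python
import pandas as pd


def encode_with_training_categories(X: pd.DataFrame, training_categories: dict) -> pd.DataFrame:
    """Encode categorical columns using category lists fixed at training time.

    `training_categories` maps column name -> ordered list of categories
    (an assumption about what the training notebook persists).
    Values not seen in training are encoded as -1.
    """
    X = X.copy()
    for col, categories in training_categories.items():
        if col not in X.columns:
            continue
        values = X[col].fillna('Unknown')
        X[col] = pd.Categorical(values, categories=categories).codes
    return X


# Illustrative categories; 'Brand C' was not seen in training
training_categories = {'Brand': ['Brand A', 'Brand B', 'Unknown']}
batch = pd.DataFrame({'Brand': ['Brand B', 'Brand C', None]})
print(encode_with_training_categories(batch, training_categories)['Brand'].tolist())
# [1, -1, 2]
```

Whether `-1` is an acceptable input depends on the model; tree-based models usually tolerate it, but it is still a value the model never saw during training.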

### 🟦 7. Generate Predictions

In [None]:
# Generate predictions (flattened to 1-D in case the model returns a column vector)
print("🔮 Generating late delivery predictions...")
predicted_days_late = np.asarray(model.predict(X)).ravel()

print(f"✅ Generated {len(predicted_days_late):,} predictions")
print(f"\nPredicted {TARGET_COLUMN} statistics:")
print(f"  Mean: {predicted_days_late.mean():.2f} days")
print(f"  Median: {np.median(predicted_days_late):.2f} days")
print(f"  Min: {predicted_days_late.min():.2f} days")
print(f"  Max: {predicted_days_late.max():.2f} days")
print(f"  Std Dev: {predicted_days_late.std():.2f} days")

# Calculate late delivery metrics
late_count = (predicted_days_late > 0).sum()
on_time_count = (predicted_days_late <= 0).sum()

print(f"\n📊 Late Delivery Forecast:")
print(f"  Predicted LATE: {late_count:,} ({late_count/len(predicted_days_late)*100:.1f}%)")
print(f"  Predicted ON-TIME/EARLY: {on_time_count:,} ({on_time_count/len(predicted_days_late)*100:.1f}%)")

### 🟦 8. Create Output DataFrame with Risk Scores and Buckets

Combine predictions with business logic for late delivery risk management.

In [None]:
from datetime import datetime

# Create output dataframe with predictions
output_df = pd.DataFrame()

# Add key identifiers
if 'Delivery Number' in df_open.columns:
    output_df['Delivery_Number'] = df_open['Delivery Number']

# Add business-critical columns
key_columns = ['Plant', 'Brand', 'Channel', 'Product Category', 'STRATEGIC_ACCOUNT',
               'Sales Order', 'Delivery Created On', 'Req. Date Header', 
               'DELIVERY_QTY', 'DELIVERY_VALUE_USD', 'STATUS',
               'EWM_CARRIER_CODE', 'Credit Status', 'Distribution Status']

for col in key_columns:
    if col in df_open.columns:
        col_name = col.replace(' ', '_').replace('.', '')
        output_df[col_name] = df_open[col]

# Add predictions
output_df['predicted_days_late'] = predicted_days_late

# Add classification: is_late (binary)
output_df['is_late'] = (predicted_days_late > 0).astype(int)
output_df['is_late_label'] = output_df['is_late'].map({1: 'LATE', 0: 'ON-TIME'})

# Add lateness buckets (matching use case requirements: 0-2, 3-5, 6-9, 10+)
def categorize_lateness(days):
    if days <= 0:
        return 'On-time/Early'
    elif days <= 2:
        return '0-2 days late'
    elif days <= 5:
        return '3-5 days late'
    elif days <= 9:
        return '6-9 days late'
    else:
        return '10+ days late'

# predicted_days_late is a NumPy array (no .apply), so apply via the DataFrame column
output_df['lateness_bucket'] = output_df['predicted_days_late'].apply(categorize_lateness)

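# Add risk score (min-max normalized 0-1, where 1 = highest risk)
# Scores are relative to this batch: the highest prediction always gets 1,
# so compare risk_score within a run, not across runs
max_late = predicted_days_late.max()
min_late = predicted_days_late.min()
late_range = max_late - min_late
if late_range > 0:
    output_df['risk_score'] = ((predicted_days_late - min_late) / late_range).clip(0, 1)
else:
    # All predictions are identical, so there is no spread to rank
    output_df['risk_score'] = 0.0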

# Add priority flag for strategic accounts that are predicted late
output_df['high_priority'] = (
    (output_df['is_late'] == 1) & 
    (output_df.get('STRATEGIC_ACCOUNT', 'No') == 'Yes')
).astype(int)

# Add prediction timestamp
output_df['prediction_date'] = datetime.now()
output_df['model_name'] = MODEL_NAME
output_df['model_version'] = MODEL_VERSION

print(f"✅ Created output dataframe with {len(output_df):,} predictions")
print(f"\nOutput columns: {output_df.shape[1]}")
print(f"\nSample predictions:")
output_df.head(10)
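
`categorize_lateness` is applied row by row. An equivalent vectorized version uses `pd.cut` with right-closed bins: `(-inf, 0]` maps to on-time, and `(0, 2]`, `(2, 5]`, `(5, 9]`, `(9, inf)` map to the four lateness buckets, the same boundaries as the `if`/`elif` chain. With continuous predictions a value such as 2.4 lands in `3-5 days late`, because the bucket edges follow the integer labels. The sample values are made up.

```python
import numpy as np
import pandas as pd

BUCKET_LABELS = ['On-time/Early', '0-2 days late', '3-5 days late',
                 '6-9 days late', '10+ days late']


def bucket_lateness(predicted_days_late) -> pd.Series:
    """Vectorized equivalent of categorize_lateness using right-closed bins."""
    return pd.cut(
        pd.Series(predicted_days_late, dtype=float),
        bins=[-np.inf, 0, 2, 5, 9, np.inf],
        labels=BUCKET_LABELS,
        right=True,
    ).astype(str)


sample = np.array([-1.5, 0.0, 1.2, 2.4, 5.0, 9.1, 30.0])
print(bucket_lateness(sample).tolist())
# ['On-time/Early', 'On-time/Early', '0-2 days late', '3-5 days late',
#  '3-5 days late', '10+ days late', '10+ days late']
```

In the notebook it could replace the `.apply` call as `output_df['lateness_bucket'] = bucket_lateness(predicted_days_late).values`; `.values` avoids index alignment, since `output_df` keeps the index of `df_open`. Dropping `.astype(str)` keeps an ordered categorical, which sorts buckets in logical rather than alphabetical order.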

### 🟦 9. Save Predictions to Lakehouse

Write predictions to a Delta table for Power BI consumption.

**Note:** This table will be available in Power BI via Direct Lake mode.

In [None]:
# Convert to Spark DataFrame
spark_df = spark.createDataFrame(output_df)

# Table name
table_name = OUTPUT_TABLE

print(f"💾 Writing predictions to Lakehouse table: {table_name}")

# Write to Lakehouse (overwrite mode with schema update)
spark_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_name)

print(f"✅ Predictions saved to table: {table_name}")
print(f"   Rows written: {len(output_df):,}")
print(f"   Columns: {len(output_df.columns)}")
print(f"\n📊 Prediction Summary:")
print(f"   Total open deliveries: {len(output_df):,}")
print(f"   Predicted LATE: {output_df['is_late'].sum():,}")
print(f"   Predicted ON-TIME: {(1-output_df['is_late']).sum():,}")
print(f"   High priority (Strategic + Late): {output_df['high_priority'].sum():,}")
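
`spark.createDataFrame(output_df)` infers a Spark schema from the pandas data, and object columns that mix types (for example integers and strings in the same ID column) can make that inference fail. A minimal sketch, assuming string is an acceptable type for such columns in the Power BI model: the hypothetical helper `normalize_object_columns` converts every object column to strings and keeps missing values as `None`.

```python
import pandas as pd


def normalize_object_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Cast object-dtype columns to str, preserving missing values as None.

    Hypothetical pre-processing step before spark.createDataFrame(df);
    numeric and datetime columns are left untouched.
    """
    df = df.copy()
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].map(lambda v: None if pd.isna(v) else str(v))
    return df


# Illustrative data: an ID column mixing integers, strings and a missing value
mixed = pd.DataFrame({'Sales_Order': [1001, 'C-2002', None], 'qty': [1, 2, 3]})
clean = normalize_object_columns(mixed)
print(clean['Sales_Order'].tolist())  # ['1001', 'C-2002', None]
```

It would be called as `spark_df = spark.createDataFrame(normalize_object_columns(output_df))`.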

In [None]:
# Verify table was created (count the full table; a LIMIT 10 query would cap the count at 10)
print("\n=== Verification ===")
written_df = spark.table(OUTPUT_TABLE)
print(f"Table '{OUTPUT_TABLE}' contains {written_df.count():,} rows (showing first 10):")
written_df.show(10)

### 🟦 10. Visualize High-Risk Deliveries

Identify deliveries requiring immediate attention.

In [None]:
# Analyze high-risk deliveries
print("="*70)
print("HIGH-RISK LATE DELIVERY ANALYSIS")
print("="*70)

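# Late deliveries by bucket (reindex to keep buckets in logical order; sort_index would sort alphabetically)
bucket_order = ['On-time/Early', '0-2 days late', '3-5 days late', '6-9 days late', '10+ days late']
bucket_counts = output_df['lateness_bucket'].value_counts().reindex(bucket_order, fill_value=0)
print("\n📦 Deliveries by Lateness Bucket:")
print(bucket_counts)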

# Late deliveries by strategic account
if 'STRATEGIC_ACCOUNT' in output_df.columns:
    print("\n🎯 Late Deliveries by Account Type:")
    late_df = output_df[output_df['is_late'] == 1]
    strategic_summary = late_df.groupby('STRATEGIC_ACCOUNT').agg({
        'Delivery_Number': 'count',
        'predicted_days_late': 'mean',
        'DELIVERY_VALUE_USD': 'sum'
    }).round(2)
    strategic_summary.columns = ['Count', 'Avg Days Late', 'Total Value ($)']
    print(strategic_summary)

# Top 10 highest risk deliveries
print("\n🚨 TOP 10 HIGHEST RISK DELIVERIES:")
high_risk = output_df.nlargest(10, 'risk_score')[
    ['Delivery_Number', 'Brand', 'Channel', 'STRATEGIC_ACCOUNT', 
     'predicted_days_late', 'lateness_bucket', 'risk_score', 'DELIVERY_VALUE_USD']
]
print(high_risk.to_string(index=False))

# Late deliveries by brand/channel
if 'Brand' in output_df.columns and 'Channel' in output_df.columns:
    print("\n📊 Late Deliveries by Brand & Channel:")
    late_by_brand_channel = output_df[output_df['is_late'] == 1].groupby(['Brand', 'Channel']).size().reset_index(name='Late_Count')
    print(late_by_brand_channel.sort_values('Late_Count', ascending=False).head(15).to_string(index=False))

print("="*70)

### 🟦 11. Visualizations

#### Prediction Distribution

Shows the distribution of predicted days late.

In [None]:
# Prediction distribution histogram
plt.figure(figsize=(12, 5))

# Subplot 1: Prediction distribution
plt.subplot(1, 2, 1)
plt.hist(output_df['predicted_days_late'], bins=50, edgecolor='black', alpha=0.7)
plt.axvline(output_df['predicted_days_late'].mean(), color='red', linestyle='--', 
            label=f"Mean: {output_df['predicted_days_late'].mean():.1f}")
plt.axvline(output_df['predicted_days_late'].median(), color='green', linestyle='--', 
            label=f"Median: {output_df['predicted_days_late'].median():.1f}")
plt.xlabel('Predicted Days Late')
plt.ylabel('Frequency')
plt.title('Distribution of Predicted Days Late')
plt.legend()
plt.grid(True, alpha=0.3)

# Subplot 2: Box plot
plt.subplot(1, 2, 2)
plt.boxplot(output_df['predicted_days_late'], vert=True)
plt.ylabel('Predicted Days Late')
plt.title('Predicted Days Late - Box Plot')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

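#### Actual vs Predicted (if actuals available)

Compares predictions to actual values. Open deliveries have not shipped yet, so no actuals exist at scoring time: this cell only draws a chart if an actuals column has been joined to `output_df` (for example when backtesting after the deliveries ship); otherwise it prints a message.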

In [None]:
# Open deliveries have no actuals yet; this only plots if an actuals column was joined in
if 'actual_aging_days' in output_df.columns:
    valid_df = output_df[output_df['actual_aging_days'].notna()]
    
    if len(valid_df) > 0:
        plt.figure(figsize=(10, 6))
        plt.scatter(valid_df['actual_aging_days'], valid_df['predicted_days_late'], alpha=0.5)
        
        # Perfect prediction line
        min_val = min(valid_df['actual_aging_days'].min(), valid_df['predicted_days_late'].min())
        max_val = max(valid_df['actual_aging_days'].max(), valid_df['predicted_days_late'].max())
        plt.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2, label='Perfect Prediction')
        
        plt.xlabel('Actual Days Late')
        plt.ylabel('Predicted Days Late')
        plt.title('Actual vs Predicted Days Late')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()
    else:
        print("No valid actual values to compare")
else:
    print("Actual values not available for comparison")
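
Once the open deliveries ship and their actual `AGE_REQ_DATE` values are joined back to the stored predictions, the comparison in the cell above becomes possible. A minimal sketch of two simple backtest metrics (mean absolute error and the share of predictions within a tolerance), assuming aligned arrays of actual and predicted days late; the 2-day tolerance and the numbers below are illustrative:

```python
import numpy as np


def backtest_metrics(actual, predicted, tolerance_days: float = 2.0) -> dict:
    """Mean absolute error and share of predictions within `tolerance_days`.

    Rows with a missing actual value (delivery not yet shipped) are ignored.
    """
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    mask = ~np.isnan(actual)
    if not mask.any():
        return {'n': 0, 'mae': float('nan'), 'within_tolerance': float('nan')}
    abs_error = np.abs(actual[mask] - predicted[mask])
    return {
        'n': int(mask.sum()),
        'mae': float(abs_error.mean()),
        'within_tolerance': float((abs_error <= tolerance_days).mean()),
    }


# Third delivery has not shipped yet, so it is excluded
print(backtest_metrics([3.0, -1.0, np.nan, 10.0], [1.0, 0.0, 4.0, 13.0]))
# n=3, mae=2.0, within_tolerance about 0.667
```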

### 🟦 12. Power BI Integration Guide

## ✅ Predictions Complete!

Your late delivery predictions have been saved to the **`late_delivery_predictions`** table in your Lakehouse.

---

### 📊 Next Steps: Connect to Power BI

#### Option 1: Direct Lake (Recommended)

1. Open Power BI Desktop or Power BI Service
2. Create a new report
3. Connect to your Fabric Lakehouse
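4. Select the **`late_delivery_predictions`** table
5. Direct Lake reads the Delta table directly, so reports reflect the new predictions after you rerun this notebook (subject to the semantic model's Direct Lake refresh settings)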

#### Option 2: Import Mode

1. Get Data → Lakehouse
2. Select the **`late_delivery_predictions`** table
3. Click **Load**

---

### 📈 Recommended Power BI Visuals

**Page 1: Executive Dashboard**
- KPI Cards:
  - Total Open Deliveries
  - Predicted Late Deliveries (% of total)
  - Avg Predicted Days Late
  - High Priority (Strategic + Late) count
- Column Chart: Deliveries by lateness bucket
- Histogram: Distribution of predicted days late

**Page 2: Plant Analysis**
- Bar Chart: Avg Predicted Days Late by Plant
- Table: Plant details with predictions
- Stacked Bar Chart: Lateness bucket mix by Plant

**Page 3: Brand & Channel**
- Matrix: Brand × Channel with avg predicted days late
- Bar Chart: Top brands by predicted late count
- Pie Chart: Distribution by channel

**Page 4: Detailed Explorer**
- Table with filters:
  - Delivery Number
  - Plant
  - Brand
  - Channel
  - Predicted Days Late
  - Lateness Bucket
  - Risk Score

---

### 🔍 Sample DAX Measures

Use the DAX measures in `powerbi/dax/measures_basic.dax` and `measures_advanced.dax`.

Key measures:
```dax
Total Deliveries = COUNTROWS(late_delivery_predictions)

Avg Predicted Days Late = AVERAGE(late_delivery_predictions[predicted_days_late])

Predicted Late Deliveries = 
CALCULATE(
    COUNTROWS(late_delivery_predictions),
    late_delivery_predictions[is_late] = 1
)

High Priority Deliveries = 
CALCULATE(
    COUNTROWS(late_delivery_predictions),
    late_delivery_predictions[high_priority] = 1
)
```

---

### 🔄 Rerunning Predictions

To update predictions with new data:
1. Refresh your Power BI semantic model so the DAX query sees the latest open deliveries
2. Rerun this notebook (03_batch_scoring_pipeline.ipynb)
3. Direct Lake reports pick up the overwritten table; Import mode reports need a dataset refresh

---

**🎉 Your late delivery prediction pipeline is complete!**