# ThreeService ML — Runnable Dry Run + Lecture Notes

This notebook is designed for **Google NotebookLM / lecture-style teaching**. It **runs the same commands** we used in the repo to demonstrate:

1. **ML inference** in both supported formats
   - Format A: `records` (already-featured rows)
   - Format B: raw Server snapshot (`nodes/requests/shipments/batches`)
2. The **transfer planner** endpoint (separate module; not KMeans/IsolationForest)
3. The most important **edge cases** and what errors you’ll see

---

## Big picture (say this out loud)

Our ML subsystem is *not* a classical supervised model that predicts a labeled outcome. It is an **unsupervised signal generator**:

- **KMeans** assigns each region-time bucket to a **cluster** (similar behavior groups).
- **IsolationForest** assigns an **anomaly score** and an **anomaly flag** (unusual behavior).

The output is useful to *guide decisions*, but **it does not directly plan shipments**.
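To make the two signals concrete, here is a minimal scikit-learn sketch on a small made-up feature matrix. The hyperparameters (`n_clusters=3`, `contamination=0.05`) are illustrative, not our training config. Our own `anomaly_score` may also use a different sign convention than sklearn's `decision_function`, so treat this as a conceptual demo only.

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest

# Made-up data: 20 region-time buckets x 3 numeric features.
rng = np.random.default_rng(0)
X = rng.normal(loc=100.0, scale=10.0, size=(20, 3))
X[-1] = [400.0, 5.0, 900.0]  # one deliberately unusual bucket

# KMeans: groups buckets with similar feature profiles (illustrative k=3).
kmeans = KMeans(n_clusters=3, n_init=10, random_state=0).fit(X)
cluster_id = kmeans.predict(X)

# IsolationForest: points that are easy to isolate get low scores.
iforest = IsolationForest(n_estimators=100, contamination=0.05, random_state=0).fit(X)
anomaly_score = iforest.decision_function(X)  # sklearn: negative => anomalous
is_anomaly = iforest.predict(X) == -1         # sklearn marks outliers with -1

print("cluster ids:", cluster_id)
print("anomalous bucket indices:", is_anomaly.nonzero()[0])
```

Neither model sees a label: the cluster ids and the anomaly flag are both derived purely from the shape of the feature data.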

## 0) Setup

This notebook expects you to start it from the repo root (the directory that contains the `ml/` folder), because the setup cell resolves every path from `Path.cwd()`.

We will run the Python modules exactly as the Node gateway does:

- `python -m src.infer --model-dir <runDir>`
- `python -m src.transfer_planner ...`

We’ll also show the intermediate engineered features for the raw snapshot path.
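Both commands follow a simple process contract: JSON payload on stdin, JSON result on stdout, non-zero exit code on failure. The sketch below is a hypothetical stand-in for such a module (the function `main` and its `count` output are illustrative, not the real `src.infer` code). It only shows the contract that the `run_module` helper in the next cell depends on.

```python
import io
import json
import sys


def main(stdin=None, stdout=None, stderr=None) -> int:
    """Hypothetical CLI body: read JSON from stdin, write JSON to stdout."""
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    stderr = sys.stderr if stderr is None else stderr

    raw = stdin.read()
    if not raw.strip():
        # Report on stderr and signal failure through the exit code.
        print("ValueError: No inference payload supplied.", file=stderr)
        return 1

    payload = json.loads(raw)
    records = payload.get("records", []) if isinstance(payload, dict) else []
    json.dump({"count": len(records)}, stdout)
    return 0


# A real module would end with `sys.exit(main())`; here we call it directly.
out = io.StringIO()
exit_code = main(io.StringIO('{"records": [{"a": 1}, {"a": 2}]}'), out, io.StringIO())
print(exit_code, out.getvalue())
```

Keeping stdout reserved for the JSON result is what lets the caller parse it with a single `json.loads`.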

In [None]:
from __future__ import annotations

import json
import subprocess
import sys
from pathlib import Path

REPO_ROOT = Path.cwd()
ML_DIR = REPO_ROOT / "ml"
EXAMPLES_DIR = ML_DIR / "examples"
RUN_DIR = ML_DIR / "artifacts" / "20251101T153949Z"  # point this at your own training run

def require_paths() -> None:
    if not ML_DIR.exists():
        raise RuntimeError(
            f"Expected ML dir at {ML_DIR}. Start this notebook from the repo root."
        )
    if not RUN_DIR.exists():
        raise RuntimeError(f"Expected trained run dir at {RUN_DIR}.")

def run_module(module: str, args: list[str], payload: object | None, cwd: Path) -> dict:
    """Run `python -m <module> <args>` with payload piped to stdin and parse JSON output."""
    if payload is None:
        stdin_text = ""
    elif isinstance(payload, str):
        stdin_text = payload
    else:
        stdin_text = json.dumps(payload)

    proc = subprocess.run(
        [sys.executable, "-m", module, *args],
        input=stdin_text,
        text=True,
        capture_output=True,
        cwd=str(cwd),
    )

    if proc.returncode != 0:
        cmd = f"{sys.executable} -m {module} {' '.join(args)}"
        message = (
            f"Command failed ({proc.returncode})\n"
            f"CMD: {cmd}\n\n"
            f"STDOUT:\n{proc.stdout}\n\n"
            f"STDERR:\n{proc.stderr}\n"
        )
        raise RuntimeError(message)

    return json.loads(proc.stdout)

require_paths()
print("Python:", sys.executable)
print("ML_DIR:", ML_DIR)
print("RUN_DIR:", RUN_DIR)

## 1) What is inside a trained run directory?

A *run directory* is the ML artifact bundle created during training. Inference needs it.

It contains:
- `metadata.json` (most important: `feature_columns`)
- `kmeans_model.joblib`
- `isolation_forest_model.joblib`
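A minimal sketch of how such a bundle can be written and read back with `joblib`, assuming a flat `{"feature_columns": [...]}` metadata layout. Our real `metadata.json` may nest the list under `feature_summary` (the next cell checks both places), and the helper names `save_run_dir` / `load_run_dir` are hypothetical.

```python
import json
import tempfile
from pathlib import Path

import joblib
import numpy as np
from sklearn.cluster import KMeans
from sklearn.ensemble import IsolationForest


def save_run_dir(run_dir: Path, X: np.ndarray, feature_columns: list[str]) -> None:
    """Fit both models and write the three-file bundle (hypothetical layout)."""
    run_dir.mkdir(parents=True, exist_ok=True)
    kmeans = KMeans(n_clusters=2, n_init=10, random_state=0).fit(X)
    iforest = IsolationForest(random_state=0).fit(X)
    joblib.dump(kmeans, run_dir / "kmeans_model.joblib")
    joblib.dump(iforest, run_dir / "isolation_forest_model.joblib")
    meta = {"feature_columns": feature_columns}
    (run_dir / "metadata.json").write_text(json.dumps(meta), encoding="utf-8")


def load_run_dir(run_dir: Path):
    """Read the schema first, then the two fitted models."""
    meta = json.loads((run_dir / "metadata.json").read_text(encoding="utf-8"))
    kmeans = joblib.load(run_dir / "kmeans_model.joblib")
    iforest = joblib.load(run_dir / "isolation_forest_model.joblib")
    return meta["feature_columns"], kmeans, iforest


with tempfile.TemporaryDirectory() as tmp:
    run_dir = Path(tmp) / "demo_run"
    X = np.random.default_rng(0).normal(size=(30, 2))
    save_run_dir(run_dir, X, ["requested_kg", "incoming_kg"])
    bundle_files = sorted(p.name for p in run_dir.iterdir())
    cols, kmeans, iforest = load_run_dir(run_dir)
    labels = kmeans.predict(X)

print(bundle_files, cols)
```

The schema travels with the models, which is why inference can align any input to the exact training columns.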

In [None]:
metadata_path = RUN_DIR / 'metadata.json'
meta = json.loads(metadata_path.read_text(encoding='utf-8'))

print('feature_frequency:', meta.get('config', {}).get('feature_frequency'))
feature_columns = meta.get('feature_summary', {}).get('feature_columns') or meta.get('feature_columns')
print('feature_columns count:', len(feature_columns) if feature_columns else 0)
print('first 10 feature columns:', (feature_columns or [])[:10])

# This matters: inference will force incoming data into exactly these columns.

## 2) Inference Format B (raw Server snapshot) — the ‘full pipeline’ path

This is what happens when Backend-A sends `nodes/requests/shipments/batches` to the ML service.

Pipeline steps (lecture-friendly):
1. Convert JSON arrays into DataFrames
2. Feature engineer into **region+time buckets** (`state`, `district`, `period_start`)
3. Align to `feature_columns` from `metadata.json`
4. Predict `cluster_id` and `anomaly_score`
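Step 2 is the heart of the pipeline. The pandas sketch below shows the bucketing idea for a single metric, assuming request rows carry `state`, `district`, a `createdAt` timestamp and a `quantity_kg` value. Those field names are hypothetical, and `prepare_feature_frame` computes many more columns than this.

```python
import pandas as pd

# Hypothetical request rows; field names are illustrative only.
requests = pd.DataFrame([
    {"state": "KA", "district": "Mysuru", "createdAt": "2025-10-03T10:00:00Z", "quantity_kg": 120.0},
    {"state": "KA", "district": "Mysuru", "createdAt": "2025-10-21T08:30:00Z", "quantity_kg": 80.0},
    {"state": "KA", "district": "Mandya", "createdAt": "2025-11-02T12:00:00Z", "quantity_kg": 50.0},
])


def bucket_requests(df: pd.DataFrame, freq: str = "M") -> pd.DataFrame:
    """Sum request quantities per (state, district, period_start) bucket."""
    # Parse as UTC, then drop the timezone so Period conversion does not warn.
    ts = pd.to_datetime(df["createdAt"], utc=True).dt.tz_localize(None)
    out = df.assign(period_start=ts.dt.to_period(freq).dt.start_time)
    return (
        out.groupby(["state", "district", "period_start"], as_index=False)["quantity_kg"]
        .sum()
        .rename(columns={"quantity_kg": "requested_kg"})
    )


buckets = bucket_requests(requests)
print(buckets)
```

The two October Mysuru requests collapse into one row: the model never sees individual requests, only these aggregates.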

We use a small example snapshot file that intentionally includes:
- request `items` as list AND as dict AND as null
- shipment `batchIds` as list AND as single value
- batch missing `freshnessPct` / `shelf_life_hours` to trigger defaults
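The list/dict/null and list/single-value variants need normalizing before aggregation. A minimal sketch of that coercion, with hypothetical helper names. The choice to treat a dict `items` as one item is an assumption, not necessarily what our feature engineering does.

```python
def as_item_list(items) -> list[dict]:
    """Coerce a request's `items` (list, dict, or null) into a list of dicts."""
    if items is None:
        return []
    if isinstance(items, dict):
        return [items]  # assumption: a dict is one item, not a mapping of items
    if isinstance(items, list):
        return [item for item in items if isinstance(item, dict)]
    return []


def as_id_list(value) -> list[str]:
    """Coerce a shipment's `batchIds` (list or single value) into a list of strings."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return [str(v) for v in value if v is not None]
    return [str(value)]


print(as_item_list({"crop": "rice", "qty": 10}), as_id_list("b-17"))
```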

In [None]:
payload_server = json.loads(
    (EXAMPLES_DIR / "predict_server_snapshot_example.json").read_text(encoding="utf-8")
)

result_server = run_module(
    module="src.infer",
    args=["--model-dir", str(RUN_DIR)],
    payload=payload_server,
    cwd=ML_DIR,
)

print("count:", result_server["count"])
print("missing_feature_columns (count):", len(result_server.get("missing_feature_columns", [])))

for row in result_server["results"]:
    print("\n---")
    summary = {
        k: row.get(k)
        for k in ["state", "district", "period_start", "cluster_id", "is_anomaly", "anomaly_score"]
    }
    print(summary)
    for k in ["requested_kg", "incoming_kg", "outgoing_kg", "produced_kg", "avg_travel_time_minutes"]:
        if k in row:
            print(f"{k}: {row[k]}")

### 2.1 (Optional) Show the engineered feature rows *before* modeling

This makes the model’s behavior easy to teach: it’s operating on aggregated numeric columns.

We call `prepare_feature_frame(...)` directly; it is the same function training uses.

In [None]:
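import pandas as pd

# `src.*` lives under ml/, which is not on sys.path when running from the repo root.
if str(ML_DIR) not in sys.path:
    sys.path.insert(0, str(ML_DIR))

from src.feature_engineering import prepare_feature_frame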

nodes_df = pd.DataFrame(payload_server.get('nodes') or [])
requests_df = pd.DataFrame(payload_server.get('requests') or [])
shipments_df = pd.DataFrame(payload_server.get('shipments') or [])
batches_df = pd.DataFrame(payload_server.get('batches') or [])

features_df, feature_meta = prepare_feature_frame(
    nodes_df=nodes_df,
    requests_df=requests_df,
    shipments_df=shipments_df,
    batches_df=batches_df,
    freq=str(payload_server.get('freq') or 'M'),
    festival_csv_path=None,
    income_csv_path=None,
)

print('feature rows:', len(features_df))
key_cols = ['state', 'district', 'period_start']
print('key columns:', key_cols)
print('engineered numeric columns (sample):', [c for c in features_df.columns if c not in key_cols][:12])
display(features_df.head(10))

## 3) Inference Format A (`records`) — the ‘already-featured’ path

Here we skip feature engineering and give feature rows directly.

Key teaching point: **inference forces the input into the `feature_columns` listed in `metadata.json`**.
So if your record includes only a handful of numeric features, inference adds the rest as `0.0` and lists them in `missing_feature_columns`.
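A minimal pandas sketch of that alignment step, assuming it behaves like a `reindex` with zero fill. The helper name `align_to_schema` is hypothetical; the real inference code may also handle non-numeric values differently.

```python
import pandas as pd


def align_to_schema(df: pd.DataFrame, feature_columns: list[str]) -> tuple[pd.DataFrame, list[str]]:
    """Force `df` into exactly `feature_columns`, filling gaps with 0.0."""
    missing = [c for c in feature_columns if c not in df.columns]
    # reindex drops unexpected columns and adds missing ones filled with 0.0
    aligned = df.reindex(columns=feature_columns, fill_value=0.0)
    # Coerce to numbers; unparseable values and pre-existing NaNs also become 0.0
    aligned = aligned.apply(pd.to_numeric, errors="coerce").fillna(0.0)
    return aligned, missing


records = pd.DataFrame([{"state": "KA", "district": "Mysuru", "requested_kg": "12.5"}])
X, missing = align_to_schema(records, ["requested_kg", "incoming_kg", "outgoing_kg"])
print(missing)
print(X)
```

Zero-filling keeps inference from crashing on partial input, but a row that is mostly zeros can look unusual to IsolationForest, so a long `missing_feature_columns` list is worth checking.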

In [None]:
payload_records = json.loads((EXAMPLES_DIR / 'predict_records_example.json').read_text(encoding='utf-8'))
result_records = run_module(
    module='src.infer',
    args=['--model-dir', str(RUN_DIR)],
    payload=payload_records,
    cwd=ML_DIR,
)

print('count:', result_records['count'])
print('missing_feature_columns (count):', len(result_records.get('missing_feature_columns', [])))
print('example missing feature columns (first 8):', result_records.get('missing_feature_columns', [])[:8])

for row in result_records['results']:
    print({k: row[k] for k in ['state','district','period_start','cluster_id','is_anomaly','anomaly_score']})

## 4) Transfer planner (separate module) — runnable example

This is a different Python module: `src.transfer_planner`.

It does **inventory balancing**, not clustering/anomaly detection. It consumes:
- `nodes[]` with capacities and coordinates
- `batches[]` with current inventory positions

And returns suggestions in two categories:
- `warehouse_to_warehouse`
- `farm_to_warehouse`

In [None]:
payload_transfer = json.loads((EXAMPLES_DIR / 'transfer_planner_example.json').read_text(encoding='utf-8'))
plan = run_module(
    module='src.transfer_planner',
    args=[
        '--mode', 'all',
        '--max-pairs', '5',
        '--min-transfer-kg', '200',
        '--overstock-ratio', '0.8',
        '--understock-ratio', '0.4',
        '--target-ratio', '0.6',
    ],
    payload=payload_transfer,
    cwd=ML_DIR,
)

print('counts:', plan['counts'])

def summarize_suggestion(s):
    return {
        'type': s.get('type'),
        'suggested_quantity_kg': s.get('suggested_quantity_kg'),
        'distance_km': s.get('distance_km'),
        'source': (s.get('source') or {}).get('mongoId'),
        'target': (s.get('target') or {}).get('mongoId'),
    }

print('\nWarehouse→Warehouse:')
for s in plan.get('warehouse_to_warehouse', []):
    print(summarize_suggestion(s))

print('\nFarm→Warehouse:')
for s in plan.get('farm_to_warehouse', []):
    print(summarize_suggestion(s))
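The CLI flags above suggest a fill-ratio rule for warehouses. The sketch below is one plausible reading, and it is an assumption rather than the planner's actual logic. A warehouse counts as overstocked above `overstock_ratio` of capacity and understocked below `understock_ratio`; transfers aim to bring both toward `target_ratio`; moves smaller than `min_transfer_kg` are dropped. It is a greedy toy that ignores distance, and `Warehouse` / `suggest_transfers` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Warehouse:
    node_id: str
    capacity_kg: float
    stock_kg: float


def suggest_transfers(
    warehouses: list[Warehouse],
    overstock_ratio: float = 0.8,
    understock_ratio: float = 0.4,
    target_ratio: float = 0.6,
    min_transfer_kg: float = 200.0,
    max_pairs: int = 5,
) -> list[dict]:
    """Greedy toy planner: move surplus above target into deficits below target."""
    # Surplus: kg an overstocked warehouse can give while staying at target fill.
    surplus = {
        w.node_id: w.stock_kg - target_ratio * w.capacity_kg
        for w in warehouses
        if w.stock_kg > overstock_ratio * w.capacity_kg
    }
    # Deficit: kg an understocked warehouse can take to reach target fill.
    deficit = {
        w.node_id: target_ratio * w.capacity_kg - w.stock_kg
        for w in warehouses
        if w.stock_kg < understock_ratio * w.capacity_kg
    }
    suggestions = []
    for src in sorted(surplus, key=surplus.get, reverse=True):
        for dst in sorted(deficit, key=deficit.get, reverse=True):
            qty = min(surplus[src], deficit[dst])
            if qty < min_transfer_kg:
                continue
            suggestions.append({"source": src, "target": dst, "suggested_quantity_kg": round(qty, 1)})
            surplus[src] -= qty
            deficit[dst] -= qty
            if len(suggestions) >= max_pairs:
                return suggestions
    return suggestions


fleet = [
    Warehouse("W1", capacity_kg=1000, stock_kg=950),
    Warehouse("W2", capacity_kg=1000, stock_kg=100),
    Warehouse("W3", capacity_kg=1000, stock_kg=500),
]
print(suggest_transfers(fleet))
```

W3 sits between the two thresholds, so it is neither a source nor a target; raising `min_transfer_kg` above the available surplus suppresses the suggestion entirely.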

## 5) ‘All the cases’: Edge cases you should be able to explain

In a lecture, it’s powerful to show that the system is defensive and predictable.

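Below we run a few **intentional failures** and explain what each error means. Note that `run_module` raises a `RuntimeError` whenever the subprocess exits with a non-zero code, so that is the exception type each cell prints; the `ValueError` named under each case should appear inside the captured STDOUT/STDERR included in that message.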

### Case 5.1 — Empty payload
Expected: `ValueError("No inference payload supplied.")`

In [None]:
try:
    _ = run_module('src.infer', ['--model-dir', str(RUN_DIR)], payload='', cwd=ML_DIR)
except Exception as exc:
    print(type(exc).__name__)
    print(str(exc)[-600:])  # tail keeps the final traceback line

### Case 5.2 — Payload is not `records` and not a Server snapshot
Expected: `ValueError("Payload must be an object with 'records' or raw Server data...")`
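Cases 5.1 and 5.2 both come from payload dispatch. A minimal sketch of that decision, assuming `records` must be a list and that any of the four snapshot keys marks a Server snapshot. `classify_payload` is a hypothetical name, and the error strings only mirror the messages quoted in this notebook; they are not copied from `src.infer`.

```python
import json

SNAPSHOT_KEYS = ("nodes", "requests", "shipments", "batches")


def classify_payload(raw: str) -> str:
    """Return 'records' or 'server_snapshot', or raise ValueError."""
    if not raw or not raw.strip():
        raise ValueError("No inference payload supplied.")
    payload = json.loads(raw)
    if isinstance(payload, dict) and isinstance(payload.get("records"), list):
        return "records"
    if isinstance(payload, dict) and any(key in payload for key in SNAPSHOT_KEYS):
        return "server_snapshot"
    raise ValueError("Payload must be an object with 'records' or raw Server data.")


print(classify_payload('{"records": []}'), classify_payload('{"nodes": []}'))
```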

In [None]:
try:
    bad_payload = {'hello': 'world'}
    _ = run_module('src.infer', ['--model-dir', str(RUN_DIR)], payload=bad_payload, cwd=ML_DIR)
except Exception as exc:
    print(type(exc).__name__)
    print(str(exc)[-600:])  # tail keeps the final traceback line

### Case 5.3 — Server snapshot provided but feature engineering yields 0 rows

This commonly happens if there are no requests/shipments/batches (and no festival/income blocks).
Expected: `ValueError("No feature rows could be generated from provided Server data.")`
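Why does a nodes-only snapshot produce nothing? Buckets are keyed by `(state, district, period_start)`, and only time-stamped events can supply a `period_start`. A minimal sketch of that guard, assuming each event table already carries the three key columns. `bucket_keys` is a hypothetical helper, and the festival/income inputs are left out.

```python
import pandas as pd

KEYS = ["state", "district", "period_start"]


def bucket_keys(event_frames: list[pd.DataFrame]) -> pd.DataFrame:
    """Collect distinct (state, district, period_start) keys from event tables."""
    # Assumption: only time-stamped events create buckets; nodes alone do not.
    frames = [f[KEYS] for f in event_frames if not f.empty]
    if not frames:
        raise ValueError("No feature rows could be generated from provided Server data.")
    return pd.concat(frames, ignore_index=True).drop_duplicates(ignore_index=True)


events = pd.DataFrame([
    {"state": "KA", "district": "Mysuru", "period_start": "2025-10-01"},
    {"state": "KA", "district": "Mysuru", "period_start": "2025-10-01"},
])
print(bucket_keys([events, pd.DataFrame()]))
```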

In [None]:
try:
    emptyish_snapshot = {
        'freq': 'M',
        'nodes': payload_server.get('nodes') or [],
        'requests': [],
        'shipments': [],
        'batches': [],
    }
    _ = run_module('src.infer', ['--model-dir', str(RUN_DIR)], payload=emptyish_snapshot, cwd=ML_DIR)
except Exception as exc:
    print(type(exc).__name__)
    print(str(exc)[-700:])  # tail keeps the final traceback line

### Case 5.4 — Training guardrails (demonstrated locally, without MongoDB)

Training requires at least **2 aggregated rows** and at least **1 numeric feature column**.
We can show these validations by calling the training helpers directly.
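For comparison, a minimal standalone version of the two guardrails. `validate_training_frame` is a hypothetical helper, not `build_feature_matrix` itself. The two-row minimum also matches a hard limit in scikit-learn: `KMeans` refuses to fit when there are fewer samples than clusters.

```python
import pandas as pd


def validate_training_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical pre-training checks mirroring the two guardrails above."""
    numeric = df.select_dtypes(include="number")
    if numeric.shape[1] == 0:
        raise ValueError("No numeric feature columns to train on.")
    if len(numeric) < 2:
        raise ValueError("Need at least 2 aggregated rows to train.")
    return numeric


for frame in (
    pd.DataFrame([{"state": "X", "district": "Y", "period_start": "2026-01-01"}]),
    pd.DataFrame([{"f1": 1.0, "f2": 2.0}]),
    pd.DataFrame([{"f1": 1.0, "f2": 2.0}, {"f1": 3.0, "f2": 4.0}]),
):
    try:
        print("ok:", list(validate_training_frame(frame).columns))
    except ValueError as exc:
        print("rejected:", exc)
```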

In [None]:
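import pandas as pd

# `src.*` lives under ml/, which is not on sys.path when running from the repo root.
if str(ML_DIR) not in sys.path:
    sys.path.insert(0, str(ML_DIR))

from src.models import build_feature_matrix, train_unsupervised_models
from src.config import load_config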

config = load_config()

# (a) No numeric columns
try:
    no_numeric = pd.DataFrame([{'state': 'X', 'district': 'Y', 'period_start': '2026-01-01T00:00:00'}])
    _matrix, _cols = build_feature_matrix(no_numeric)
except Exception as exc:
    print('No numeric columns ->', type(exc).__name__, str(exc))

# (b) Only 1 sample row for unsupervised models
try:
    one_row = pd.DataFrame([{'f1': 1.0, 'f2': 2.0}])
    _ = train_unsupervised_models(one_row, config)
except Exception as exc:
    print('Need >= 2 samples ->', type(exc).__name__, str(exc))

## 6) Practical lecture summary (copy/paste)

- The ML model trains on **aggregated region+time rows**, not raw shipments/requests.
- It outputs **clusters** (KMeans) and **anomaly signals** (IsolationForest).
- Inference supports 2 input formats:
  - feature rows (`records`)
  - raw snapshot (`nodes/requests/shipments/batches`)
- Inference tolerates partial input: missing expected numeric features are filled with **0** using the run’s `feature_columns` schema.
- Transfer planner is a **separate** module that proposes inventory balancing transfers using capacities + distances.

For the cleanest demos, train the run with the same scikit-learn version you’ll use for inference; loading joblib/pickle models saved by a different version produces version-mismatch warnings.