# Supply Chain Pipeline Orchestrator (v2)

**Run the entire v2 pipeline with a single click.** This notebook executes all
ingestion, transformation, and forecasting notebooks in the correct order using
`dbutils.notebook.run()`.

### Pipeline Stages
| Stage | Notebooks | Execution |
|-------|-----------|-----------|
| **Setup** | `00_setup_catalog_v2` | Sequential (must run first) |
| **Ingestion** | 01–04, 06–10, 12–13 | Parallel by default (independent tables); sequential when `parallel_ingestion` is `false` |
| **Transformation** | `01_unified_demand_signals_v2` → `02_dod_metrics_inputs_v2` | Sequential |
| **Forecasting** | Prophet, ARIMA, Random Forest | Parallel |

### Widgets
- **run_stages**: Comma-separated stages to run (`setup,ingestion,transformation,forecasting`)
- **parallel_ingestion**: Run ingestion notebooks in parallel (`true`/`false`)
- **timeout_seconds**: Timeout per notebook in seconds
- **continue_on_error**: Continue pipeline if a notebook fails (`true`/`false`)
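- **sam_api_key**: SAM.gov API key for `04_sam_entity_ingestion_v2` (optional override). When non-empty it is passed as the `sam_api_key` parameter to every ingestion notebook; when blank, no key parameter is passed.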


## Configuration


In [None]:
# Declare the job parameters in display order. Entries with a choices list
# become dropdowns; entries without one become free-text boxes.
# NOTE(review): sam_api_key is a plain-text widget, so the key is visible in the
# notebook UI and in run parameters; consider dbutils.secrets instead.
for _name, _default, _choices, _label in (
    ("run_stages", "setup,ingestion,transformation,forecasting", None, "Stages to run (comma-separated)"),
    ("parallel_ingestion", "true", ["true", "false"], "Parallel ingestion?"),
    ("timeout_seconds", "3600", None, "Timeout per notebook (seconds)"),
    ("continue_on_error", "false", ["true", "false"], "Continue on error?"),
    ("sam_api_key", "", None, "SAM.gov API key (optional override)"),
):
    if _choices is None:
        dbutils.widgets.text(_name, _default, _label)
    else:
        dbutils.widgets.dropdown(_name, _default, _choices, _label)


In [None]:
import time
import traceback
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed

# Parse widgets
# Stage names recognised by the Execute Pipeline cells.
VALID_STAGES = ("setup", "ingestion", "transformation", "forecasting")

# Names are case-insensitive; empty entries (e.g. from a trailing comma) are dropped.
RUN_STAGES = [s.strip().lower() for s in dbutils.widgets.get("run_stages").split(",") if s.strip()]
_unknown_stages = [s for s in RUN_STAGES if s not in VALID_STAGES]
if _unknown_stages:
    # Fail fast: each stage cell only checks `"<stage>" in RUN_STAGES`, so a typo
    # such as "ingest" would otherwise silently skip that stage.
    raise ValueError(
        f"Unknown stage(s) in run_stages: {_unknown_stages}. "
        f"Valid stages: {', '.join(VALID_STAGES)}"
    )

PARALLEL_INGESTION = dbutils.widgets.get("parallel_ingestion").lower() == "true"

_timeout_raw = dbutils.widgets.get("timeout_seconds")
try:
    # Used as the per-notebook timeout argument of dbutils.notebook.run.
    TIMEOUT = int(_timeout_raw)
except ValueError as err:
    raise ValueError(f"timeout_seconds must be a whole number of seconds, got {_timeout_raw!r}") from err

CONTINUE_ON_ERROR = dbutils.widgets.get("continue_on_error").lower() == "true"
SAM_API_KEY = dbutils.widgets.get("sam_api_key").strip()

print(f"Pipeline started at {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}")
print(f"  Stages       : {RUN_STAGES}")
print(f"  Parallel ingest: {PARALLEL_INGESTION}")
print(f"  Timeout/nb   : {TIMEOUT}s")
print(f"  Continue err  : {CONTINUE_ON_ERROR}")


## Pipeline Definition


In [None]:
# ── Notebook paths (relative to this notebook's location) ────────────────────
# Each path is "<stage folder>/<notebook name>". List order is the execution
# order wherever a stage runs sequentially.
SETUP_NOTEBOOKS = ["ingestion/" + name for name in (
    "00_setup_catalog_v2",
)]

INGESTION_NOTEBOOKS = ["ingestion/" + name for name in (
    "01_usaspending_ingestion_v2",
    "02_fpds_ingestion_v2",
    "03_subaward_ingestion_v2",
    "04_sam_entity_ingestion_v2",
    "06_tariff_trade_ingestion_v2",
    "07_commodity_ingestion_v2",
    "08_weather_ingestion_v2",
    "09_worldbank_wdi_ingestion_v2",
    "10_worldbank_wgi_ingestion_v2",
    "12_nyfed_gscpi_ingestion_v2",
    "13_wto_trade_barometer_ingestion_v2",
)]

TRANSFORMATION_NOTEBOOKS = ["transformation/" + name for name in (
    "01_unified_demand_signals_v2",
    "02_dod_metrics_inputs_v2",
)]

FORECASTING_NOTEBOOKS = ["forecasting/" + name for name in (
    "01_prophet_forecasting_v2",
    "02_arima_forecasting_v2",
    "03_random_forest_forecasting_v2",
)]


## Execution Helpers


In [None]:
class NotebookResult:
    """Outcome record for a single child-notebook run.

    Attributes are plain and mutable so the runner can fill them in as the
    run progresses. ``repr()`` yields the one-line progress/summary text.
    """

    # Status -> icon; any other status (e.g. "pending") shows an hourglass.
    _ICONS = {"success": "✅", "failed": "❌", "skipped": "⏭️", "running": "🔄"}

    def __init__(self, path: str, status: str = "pending", duration: float = 0.0, error: str = ""):
        self.path = path          # notebook path relative to the orchestrator
        self.status = status      # pending | running | success | failed | skipped
        self.duration = duration  # elapsed seconds
        self.error = error        # exception text when the run failed

    def __repr__(self):
        pieces = [f"{self._ICONS.get(self.status, '⏳')} {self.path}: {self.status}"]
        # A zero duration (never timed) is omitted, as is an empty error.
        if self.duration:
            pieces.append(f" ({self.duration:.1f}s)")
        if self.error:
            pieces.append(f" — {self.error[:120]}")
        return "".join(pieces)


def run_notebook(path: str, extra_params: dict = None) -> NotebookResult:
    """Run one child notebook via ``dbutils.notebook.run`` and record the outcome.

    Args:
        path: Notebook path relative to this orchestrator (``./`` is prepended).
        extra_params: Parameters forwarded to the child notebook; ``None``
            means no parameters.

    Returns:
        A NotebookResult with status ``success`` or ``failed``, the elapsed
        time, and the exception text on failure. The child's exit value is
        ignored, so a child that reports failure only through its
        ``dbutils.notebook.exit()`` value is still counted as a success.

    Raises:
        Exception: whatever ``dbutils.notebook.run`` raised, re-raised
            unchanged when CONTINUE_ON_ERROR is false (the result object is
            discarded in that case).
    """
    params = extra_params or {}
    result = NotebookResult(path, status="running")
    # monotonic() is not affected by system clock adjustments, unlike
    # time.time(), so the measured duration cannot be skewed or negative.
    start = time.monotonic()
    try:
        dbutils.notebook.run(f"./{path}", TIMEOUT, params)
        result.status = "success"
    except Exception as e:
        result.status = "failed"
        result.error = str(e)
        if not CONTINUE_ON_ERROR:
            raise
    finally:
        result.duration = time.monotonic() - start
    return result


def run_sequential(notebooks: list, stage_name: str, extra_params: dict = None) -> list:
    """Run notebooks one after another, in list order.

    Args:
        notebooks: Notebook paths relative to this orchestrator.
        stage_name: Label used in the progress banner.
        extra_params: Parameters passed unchanged to every notebook.

    Returns:
        A list of NotebookResult, one per notebook that was run.

    Raises:
        Exception: propagated from ``run_notebook``, which re-raises child
            failures when CONTINUE_ON_ERROR is false. The stop message is
            printed first.
    """
    results = []
    print(f"\n{'='*60}")
    print(f"  Stage: {stage_name} (sequential, {len(notebooks)} notebooks)")
    print(f"{'='*60}")
    for nb in notebooks:
        print(f"  ▶ Running {nb} ...")
        try:
            r = run_notebook(nb, extra_params)
        except Exception:
            # In strict mode run_notebook raises instead of returning a failed
            # result, so announce the stop here before propagating.
            print(f"  ⛔ Stopping pipeline — {nb} failed.")
            raise
        results.append(r)
        print(f"    {r}")
        if r.status == "failed" and not CONTINUE_ON_ERROR:
            # Defensive: only reachable if run_notebook returns a failed result
            # rather than raising.
            print(f"  ⛔ Stopping pipeline — {nb} failed.")
            break
    return results


def run_parallel(notebooks: list, stage_name: str, extra_params: dict = None, max_workers: int = None) -> list:
    """Run notebooks concurrently on a thread pool and collect their results.

    Each notebook is launched through ``run_notebook`` on a worker thread.
    Failures never propagate out of this function: an exception raised by
    ``run_notebook`` becomes a ``failed`` NotebookResult, so the caller decides
    whether to stop. Notebooks that are already running are not cancelled when
    a sibling fails.

    Args:
        notebooks: Notebook paths relative to this orchestrator.
        stage_name: Label used in the progress banner.
        extra_params: Parameters passed unchanged to every notebook.
        max_workers: Maximum number of concurrent runs; ``None`` (default)
            runs all notebooks at once.

    Returns:
        One NotebookResult per notebook, in the same order as ``notebooks``.
        Progress lines are still printed as runs complete.
    """
    print(f"\n{'='*60}")
    print(f"  Stage: {stage_name} (parallel, {len(notebooks)} notebooks)")
    print(f"{'='*60}")
    if not notebooks:
        # ThreadPoolExecutor rejects max_workers=0, so an empty stage is a no-op.
        return []

    def _run_timed(nb: str) -> NotebookResult:
        # In strict mode run_notebook re-raises and its result object is lost;
        # time the call here so the failed result keeps a real duration.
        start = time.monotonic()
        try:
            return run_notebook(nb, extra_params)
        except Exception as e:
            return NotebookResult(nb, status="failed", duration=time.monotonic() - start, error=str(e))

    workers = len(notebooks) if max_workers is None else max_workers
    results = [None] * len(notebooks)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Map each future to its position so results keep the input order
        # (and duplicate paths cannot overwrite each other).
        futures = {executor.submit(_run_timed, nb): i for i, nb in enumerate(notebooks)}
        for future in as_completed(futures):
            r = future.result()
            results[futures[future]] = r
            print(f"    {r}")
    return results


## Execute Pipeline


In [None]:
# Pipeline timer and accumulated results; both are read by the summary cells.
pipeline_start = time.time()
all_results = []

# ── Stage 1: Setup ───────────────────────────────────────────────────────────
if "setup" in RUN_STAGES:
    setup_results = run_sequential(SETUP_NOTEBOOKS, "Setup")
    all_results += setup_results
    if any(r.status == "failed" for r in setup_results) and not CONTINUE_ON_ERROR:
        # Raise rather than call dbutils.notebook.exit(): per the Databricks
        # docs, exit() completes the run successfully, which would hide the
        # failure from Jobs/Workflows.
        raise RuntimeError("FAILED at Setup stage")
else:
    print("\n⏭️  Skipping Setup stage")


In [None]:
# ── Stage 2: Ingestion ───────────────────────────────────────────────────────
if "ingestion" in RUN_STAGES:
    # The SAM.gov key (if supplied) is meant for 04_sam_entity_ingestion_v2, but
    # the same parameter dict is passed to every ingestion notebook.
    ingestion_params = {}
    if SAM_API_KEY:
        ingestion_params["sam_api_key"] = SAM_API_KEY

    if PARALLEL_INGESTION:
        # All ingestion notebooks write to independent tables so they can run in parallel
        ingestion_results = run_parallel(INGESTION_NOTEBOOKS, "Ingestion", ingestion_params)
    else:
        ingestion_results = run_sequential(INGESTION_NOTEBOOKS, "Ingestion", ingestion_params)
    all_results += ingestion_results

    # Check only this stage's results. all_results also holds earlier stages
    # and, when cells are re-run interactively, earlier attempts.
    failed_ingestion = [r for r in ingestion_results if r.status == "failed"]
    if failed_ingestion and not CONTINUE_ON_ERROR:
        # Sequential mode already raises from run_sequential. In parallel mode,
        # raise here too: dbutils.notebook.exit() would complete the run
        # successfully and hide the failure.
        raise RuntimeError(f"FAILED at Ingestion stage: {[r.path for r in failed_ingestion]}")
else:
    print("\n⏭️  Skipping Ingestion stage")


In [None]:
# ── Stage 3: Transformation ──────────────────────────────────────────────────
if "transformation" in RUN_STAGES:
    # Run in list order: unified demand signals first, then DoD metrics inputs.
    transformation_results = run_sequential(TRANSFORMATION_NOTEBOOKS, "Transformation")
    all_results += transformation_results
    # Use this stage's own results rather than filtering all_results by path
    # prefix, so failures from earlier interactive re-runs are not counted.
    if any(r.status == "failed" for r in transformation_results) and not CONTINUE_ON_ERROR:
        # Raise (not dbutils.notebook.exit) so the run is marked as failed.
        raise RuntimeError("FAILED at Transformation stage")
else:
    print("\n⏭️  Skipping Transformation stage")


In [None]:
# ── Stage 4: Forecasting ─────────────────────────────────────────────────────
# No per-stage failure check here: forecasting failures are only recorded in
# all_results for the summary and exit cells.
if "forecasting" not in RUN_STAGES:
    print("\n⏭️  Skipping Forecasting stage")
else:
    # The Prophet, ARIMA and Random Forest notebooks are independent of each
    # other, so they are launched concurrently.
    forecasting_results = run_parallel(FORECASTING_NOTEBOOKS, "Forecasting")
    all_results += forecasting_results


## Pipeline Summary


In [None]:
# Elapsed time since the Setup cell started the pipeline timer.
pipeline_duration = time.time() - pipeline_start
_rule = "=" * 70
_finished_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

print(f"\n{_rule}")
print(f"  PIPELINE COMPLETE — {_finished_at}")
print(f"  Total duration: {pipeline_duration:.1f}s ({pipeline_duration/60:.1f} min)")
print(f"{_rule}\n")

# Bucket results by status. `succeeded` and `failed` are also used by the
# final exit cell, so they are kept as module-level lists.
_buckets = {"success": [], "failed": [], "skipped": []}
for _res in all_results:
    if _res.status in _buckets:
        _buckets[_res.status].append(_res)
succeeded = _buckets["success"]
failed = _buckets["failed"]
skipped = _buckets["skipped"]

print(f"  ✅ Succeeded: {len(succeeded)}")
print(f"  ❌ Failed:    {len(failed)}")
print(f"  ⏭️  Skipped:   {len(skipped)}")
print()

for r in all_results:
    print(f"  {r}")


In [None]:
# Build summary DataFrame for display
import pandas as pd

# Explicit columns and Spark schema: when all_results is empty (e.g. every stage
# skipped), Spark may be unable to infer a schema from an empty pandas DataFrame
# (the non-Arrow path raises "can not infer schema from empty dataset"). That
# would fail this cell and stop the exit cell from running.
_SUMMARY_COLUMNS = ["Notebook", "Status", "Duration (s)", "Error"]
_SUMMARY_SCHEMA = "Notebook STRING, Status STRING, `Duration (s)` DOUBLE, Error STRING"

summary_data = [{
    "Notebook": r.path,
    "Status": r.status,
    "Duration (s)": round(r.duration, 1),
    "Error": r.error[:200] if r.error else ""
} for r in all_results]

summary_df = spark.createDataFrame(
    pd.DataFrame(summary_data, columns=_SUMMARY_COLUMNS),
    schema=_SUMMARY_SCHEMA,
)
display(summary_df)


In [None]:
# Exit with status.
# Per the Databricks docs, dbutils.notebook.exit() completes the run
# successfully; its message becomes the value returned to any caller of
# dbutils.notebook.run. In continue_on_error mode that message is kept as the
# failure report. In strict mode, failures that reach this cell (e.g. from the
# Forecasting stage, which has no per-stage check) must fail the run, so raise.
if failed:
    msg = f"FAILED: {len(failed)} notebook(s) failed — {[r.path for r in failed]}"
    print(f"\n⛔ {msg}")
    if not CONTINUE_ON_ERROR:
        raise RuntimeError(msg)
    dbutils.notebook.exit(msg)
else:
    msg = f"SUCCESS: All {len(succeeded)} notebooks completed in {pipeline_duration:.1f}s"
    print(f"\n✅ {msg}")
    dbutils.notebook.exit(msg)


## (Optional) Deploy as Databricks Workflow Job

To schedule this pipeline as an automated Databricks Workflow, uncomment and run
the cell below. It reads the job definition from `jobs/supply_chain_full_pipeline_v2.json`
(next to this notebook) and creates the Workflow via the Jobs API, or resets its
definition if a job with the same name already exists.

**Prerequisites:**
- Update `notebook_path` values in the JSON to match your workspace location
- The user running this cell needs permission to create and manage jobs (the cell calls the Jobs API with the notebook context's own API token)


In [None]:
# import requests, json, os
#
# # ── Uncomment and run this cell to deploy the Workflow job ──────────────────
#
# ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
#
# # Read the job definition from jobs/ next to this notebook. notebookPath() is a
# # workspace path (e.g. /Users/me/...); on recent runtimes workspace files are
# # readable from the driver under the /Workspace prefix.
# job_json_path = os.path.join(
#     "/Workspace" + os.path.dirname(ctx.notebookPath().get()),
#     "jobs", "supply_chain_full_pipeline_v2.json"
# )
# with open(job_json_path) as f:
#     job_def = json.load(f)
#
# # Alternative: read from an explicit workspace path
# # with open("/Workspace/SupplyChain/notebooks/v2/jobs/supply_chain_full_pipeline_v2.json") as f:
# #     job_def = json.load(f)
#
# host = ctx.apiUrl().get()
# token = ctx.apiToken().get()
# headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
#
# # Check if job already exists
# resp = requests.get(f"{host}/api/2.1/jobs/list", headers=headers, params={"name": "supply_chain_full_pipeline_v2"})
# resp.raise_for_status()
# existing = resp.json().get("jobs", [])
#
# if existing:
#     job_id = existing[0]["job_id"]
#     print(f"Job already exists (id={job_id}). Resetting definition...")
#     # jobs/reset takes the complete job settings under "new_settings"
#     resp = requests.post(f"{host}/api/2.1/jobs/reset", headers=headers, json={"job_id": job_id, "new_settings": job_def})
#     resp.raise_for_status()
#     print(f"✅ Job {job_id} updated.")
# else:
#     resp = requests.post(f"{host}/api/2.1/jobs/create", headers=headers, json=job_def)
#     resp.raise_for_status()
#     job_id = resp.json()["job_id"]
#     print(f"✅ Job created with id={job_id}")
#
# print(f"View job: {host}/#job/{job_id}")
