<a href="https://colab.research.google.com/github/So12344567/Gitclass/blob/master/Logistics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import yaml

# Embedded YAML rules document: canonical schema, status lifecycle, planner
# assignment, quality, shipment and data-quality settings. Every status named
# under allowed_transitions must also appear in allowed_statuses.
rules_yaml_string = """
version: 1

canonical:
  # Final canonical columns with target dtypes
  columns:
    part_number: string
    lot_id: string
    qty: int
    category: string
    planner: string
    status: string           # NEW | ASSIGNED | READY_TO_SHIP | IN_REPAIR | REPAIRED | SHIPPED | SCRAPPED | REJECTED_POST_REPAIR
    quality_status: string   # UNKNOWN | GENUINE | SCRAPPED | REJECTED_POST_REPAIR
    location: string         # current location
    ship_to: string          # target location
    ship_date: datetime
    scrap_reason: string
    created_at: datetime
    updated_at: datetime

  # Key + lifecycle rules
  primary_key: [part_number, lot_id]
  allowed_statuses:
    - NEW
    - ASSIGNED
    - READY_TO_SHIP
    - IN_REPAIR
    - REPAIRED
    - SHIPPED
    - SCRAPPED
    - REJECTED_POST_REPAIR
  allowed_transitions:
    NEW: [ASSIGNED, SCRAPPED]
    ASSIGNED: [READY_TO_SHIP, SCRAPPED]
    READY_TO_SHIP: [SHIPPED, SCRAPPED]
    IN_REPAIR: [REPAIRED, REJECTED_POST_REPAIR]
    REPAIRED: [READY_TO_SHIP, SCRAPPED]
    SHIPPED: []
    SCRAPPED: []
    REJECTED_POST_REPAIR: []
  ship_eligible_statuses: [READY_TO_SHIP, REPAIRED]
  required_for_shipping: [ship_to, qty]

planner_rules:
  # Assign planner by part category/prefix
  by_prefix:
    PWR: "Planner_A"
    MEC: "Planner_B"
    PCB: "Planner_C"
  default: "Planner_Default"

quality_rules:
  # Accepted quality labels
  valid_quality: [UNKNOWN, GENUINE, SCRAPPED, REJECTED_POST_REPAIR]
  # If quality = GENUINE → send to repair
  genuine_goes_to_repair: true

shipment_rules:
  # If ship_eligible and required fields present → count as "needs shipment"
  late_after_days: 3           # optional SLA metric for "late"

data_quality:
  required_columns: [part_number, qty, status, created_at]
  non_negative_qty: true
  dedupe_on: [part_number, lot_id]
"""

# Parse the embedded YAML text with PyYAML's safe loader.
rules = yaml.safe_load(rules_yaml_string)

In [None]:

# agent_orchestrator.py
import pandas as pd
from datetime import datetime
from pathlib import Path

# PyYAML is optional: when it is not installed, `yaml` is set to None and
# load_rules() falls back to its embedded rules.
try:
    import yaml
except ImportError:
    yaml = None

# ---------------- Config Loading ----------------
def load_rules(rules_path: str | None = None):
    if rules_path and yaml:
        with open(rules_path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)

    # Fallback embedded rules (same as YAML above but trimmed)
    return {
        "canonical": {
            "columns": {
                "part_number": "string",
                "lot_id": "string",
                "qty": "int",
                "category": "string",
                "planner": "string",
                "status": "string",
                "quality_status": "string",
                "location": "string",
                "ship_to": "string",
                "ship_date": "datetime",
                "scrap_reason": "string",
                "created_at": "datetime",
                "updated_at": "datetime",
            },
            "primary_key": ["part_number", "lot_id"],
            "allowed_statuses": ["NEW","ASSIGNED","READY_TO_SHIP","IN_REPAIR","REPAIRED","SHIPPED","SCRAPPED","REJECTED_POST_REPAIR"],
            "allowed_transitions": {
                "NEW": ["ASSIGNED","SCRAPPED"],
                "ASSIGNED": ["READY_TO_SHIP","SCRAPPED"],
                "READY_TO_SHIP": ["SHIPPED","SCRAPPED"],
                "IN_REPAIR": ["REPAIRED","REJECTED_POST_REPAIR"],
                "REPAIRED": ["READY_TO_SHIP","SCRAPPED"],
                "SHIPPED": [],
                "SCRAPPED": [],
                "REJECTED_POST_REPAIR": []
            },
            "ship_eligible_statuses": ["READY_TO_SHIP","REPAIRED"],
            "required_for_shipping": ["ship_to","qty"]
        },
        "planner_rules": {
            "by_prefix": {"PWR":"Planner_A","MEC":"Planner_B","PCB":"Planner_C"},
            "default": "Planner_Default"
        },
        "quality_rules": {
            "valid_quality": ["UNKNOWN","GENUINE","SCRAPPED","REJECTED_POST_REPAIR"],
            "genuine_goes_to_repair": True
        },
        "shipment_rules": {"late_after_days": 3},
        "data_quality": {
            "required_columns": ["part_number","qty","status","created_at"],
            "non_negative_qty": True,
            "dedupe_on": ["part_number","lot_id"]
        }
    }


def coerce_types(df: pd.DataFrame, col_types: dict) -> pd.DataFrame:
    """Cast the columns named in *col_types* to their declared types.

    Columns absent from *df* are first added (in place) filled with ``pd.NA``.
    Type names ``int``/``int64`` become nullable ``Int64``, ``float``/
    ``float64`` become nullable ``Float64``, names starting with ``datetime``
    go through ``pd.to_datetime``; everything else, including ``string`` and
    ``str``, becomes the pandas ``string`` dtype. Values that cannot be
    parsed as numbers or dates are coerced to missing.

    Returns:
        *df* restricted to the keys of *col_types*, in that order.
    """
    int_names = ("int", "int64")
    float_names = ("float", "float64")
    text_names = ("string", "str")

    for name, kind in col_types.items():
        if name not in df.columns:
            df[name] = pd.NA
        column = df[name]

        if kind in int_names:
            converted = pd.to_numeric(column, errors="coerce").astype("Int64")
        elif kind in float_names:
            converted = pd.to_numeric(column, errors="coerce").astype("Float64")
        elif kind not in text_names and kind.startswith("datetime"):
            converted = pd.to_datetime(column, errors="coerce")
        else:
            # Explicit text types and unrecognised type names both end up here.
            converted = column.astype("string")
        df[name] = converted

    return df[list(col_types)]

# ---------------- Agents ----------------
class PlannerAgent:
    """Fills in missing planners from the part-number prefix and assigns NEW rows."""

    def __init__(self, rules: dict):
        planner_cfg = rules["planner_rules"]
        self.prefix_map = planner_cfg["by_prefix"]
        self.default_planner = planner_cfg["default"]

    def _planner_for(self, row):
        """Return the row's existing planner, or one derived from its part number."""
        current = row.get("planner")
        if pd.notna(current) and str(current).strip():
            return current
        # The first three characters of the part number select the planner.
        key = str(row.get("part_number"))[:3]
        return self.prefix_map.get(key, self.default_planner)

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Populate ``planner`` and move planned NEW rows to ASSIGNED.

        The frame is modified in place and also returned.
        """
        df["planner"] = df.apply(self._planner_for, axis=1)
        # Any NEW row that now carries a planner is considered assigned.
        is_new = df["status"].eq("NEW")
        has_planner = df["planner"].notna()
        df.loc[is_new & has_planner, "status"] = "ASSIGNED"
        return df

class DataQualityAgent:
    """Reports data-quality findings and drops duplicate rows.

    Findings are printed to stdout; apart from deduplication the data is
    returned unchanged.
    """

    def __init__(self, rules: dict):
        self.req_cols = rules["data_quality"]["required_columns"]
        self.allowed = set(rules["canonical"]["allowed_statuses"])
        self.non_negative = rules["data_quality"]["non_negative_qty"]
        self.dedupe_on = rules["data_quality"]["dedupe_on"]
        # Kept on the instance; run() does not consult the transition table.
        self.allowed_transitions = rules["canonical"]["allowed_transitions"]

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        """Check *df* against the configured rules and deduplicate it.

        Args:
            df: Parts table to inspect.

        Returns:
            *df* with duplicates on ``dedupe_on`` removed (last occurrence
            kept). Deduplication is skipped when a key column is absent.
        """
        issues = []

        # Required columns: an absent column is a finding, not a KeyError.
        for col in self.req_cols:
            if col not in df.columns:
                issues.append(f"Missing required column: {col}")
                continue
            missing = df[col].isna().sum()
            if missing:
                issues.append(f"Missing required column values: {col} -> {missing} rows")

        # Status values
        if "status" in df.columns:
            invalid_count = int((~df["status"].isin(self.allowed)).sum())
            if invalid_count:
                issues.append(f"Invalid status values in {invalid_count} rows")

        # Quantity checks
        if self.non_negative and "qty" in df.columns and (df["qty"] < 0).any():
            issues.append("Negative quantities found")

        # Deduplicate; deduping on a partial key could drop distinct rows, so
        # the step is skipped entirely when any key column is absent.
        absent_keys = [key for key in self.dedupe_on if key not in df.columns]
        if absent_keys:
            issues.append(f"Skipped deduplication; key columns absent: {absent_keys}")
        else:
            before = len(df)
            df = df.drop_duplicates(subset=self.dedupe_on, keep="last")
            dups = before - len(df)
            if dups:
                issues.append(f"Deduplicated {dups} rows on keys={self.dedupe_on}")

        if issues:
            print("DataQualityAgent findings:")
            for msg in issues:
                print("  -", msg)
        return df

class QualityAgent:
    """Applies quality-control outcomes to rows selected by part number."""

    def __init__(self, rules: dict):
        quality_cfg = rules["quality_rules"]
        self.valid_quality = set(quality_cfg["valid_quality"])
        self.send_genuine_to_repair = quality_cfg["genuine_goes_to_repair"]

    def run(self, df: pd.DataFrame, scrapped_parts: list[str], genuine_parts: list[str]) -> pd.DataFrame:
        """Mark scrapped and genuine parts; *df* is modified in place and returned.

        Scrapped parts get quality and status ``SCRAPPED`` with reason
        ``FAILED_QC``. Genuine parts get quality ``GENUINE`` and, when the
        rules say so, status ``IN_REPAIR``. Genuine marking runs second, so
        it overrides those fields for a part present in both lists.
        """
        stamp = pd.Timestamp.now()

        if scrapped_parts:
            is_scrapped = df["part_number"].isin(scrapped_parts)
            scrap_columns = ["quality_status", "status", "scrap_reason", "updated_at"]
            df.loc[is_scrapped, scrap_columns] = ["SCRAPPED", "SCRAPPED", "FAILED_QC", stamp]

        if not genuine_parts:
            return df

        is_genuine = df["part_number"].isin(genuine_parts)
        df.loc[is_genuine, "quality_status"] = "GENUINE"
        if self.send_genuine_to_repair:
            df.loc[is_genuine, ["status", "updated_at"]] = ["IN_REPAIR", stamp]
        return df

class RepairAgent:
    """Processes genuine parts: repair them, optionally scrap some post-repair."""
    def run(self, df: pd.DataFrame, post_repair_scrap_parts: list[str] | None = None) -> pd.DataFrame:
        now = pd.Timestamp.now()
        # Move IN_REPAIR -> REPAIRED
        mask_in = df["status"].eq("IN_REPAIR")
        df.loc[mask_in, ["status","updated_at"]] = ["REPAIRED", now]
        # Post-repair scrap
        if post_repair_scrap_parts:
            mask = df["part_number"].isin(post_repair_scrap_parts)
            df.loc[mask & df["status"].eq("REPAIRED"),
                   ["status","quality_status","scrap_reason","updated_at"]] = ["REJECTED_POST_REPAIR","REJECTED_POST_REPAIR","POST_REPAIR_QC_FAIL", now]
        # Repaired items can be marked READY_TO_SHIP
        mask_rep = df["status"].eq("REPAIRED")
        df.loc[mask_rep, "status"] = "READY_TO_SHIP"
        return df

class QuantityAgent:
    """Computes shipment and scrap metrics and marks parts as shipped."""

    def __init__(self, rules: dict):
        self.ship_eligible = set(rules["canonical"]["ship_eligible_statuses"])
        self.required_for_ship = rules["canonical"]["required_for_shipping"]

    def run_metrics(self, df: pd.DataFrame) -> dict:
        """Summarise quantities by lifecycle outcome.

        Returns:
            A dict with total quantity, row count, scrapped / shipped /
            needs-shipment quantities, and per-``ship_to`` breakdowns of the
            needs-shipment and shipped quantities.
        """
        shipped = df["status"].eq("SHIPPED")
        scrapped = df["status"].isin(["SCRAPPED", "REJECTED_POST_REPAIR"])

        # Needs shipment: eligible status, not shipped, required fields present
        eligible = df["status"].isin(list(self.ship_eligible))
        required_ok = pd.Series(True, index=df.index)
        for col in self.required_for_ship:
            present = df[col].notna()
            # Only string-dtype columns are additionally checked for "".
            if df[col].dtype == "string":
                present = present & (df[col].astype("string").str.len() > 0)
            required_ok = required_ok & present
        needs_ship = eligible & ~shipped & required_ok

        return {
            "total_parts": int(df["qty"].fillna(0).sum()),
            "rows": int(len(df)),
            "scrapped_qty": int(df.loc[scrapped, "qty"].fillna(0).sum()),
            "shipped_qty": int(df.loc[shipped, "qty"].fillna(0).sum()),
            "needs_shipment_qty": int(df.loc[needs_ship, "qty"].fillna(0).sum()),
            "needs_shipment_by_location": (
                df.loc[needs_ship].groupby("ship_to")["qty"].sum().dropna().astype(int).to_dict()
                if needs_ship.any() else {}
            ),
            "shipped_by_location": (
                df.loc[shipped].groupby("ship_to")["qty"].sum().dropna().astype(int).to_dict()
                if shipped.any() else {}
            )
        }

    def mark_shipped(self, df: pd.DataFrame, part_numbers: list[str], ship_to: str):
        """Mark the listed parts as shipped to *ship_to*.

        Only rows whose status is one of the configured
        ``ship_eligible_statuses`` are affected, so this agrees with what
        ``run_metrics`` counts as needing shipment. *df* is modified in place
        and returned.
        """
        now = pd.Timestamp.now()
        can_ship = df["status"].isin(list(self.ship_eligible))
        mask = df["part_number"].isin(part_numbers) & can_ship
        df.loc[mask, ["status","ship_to","ship_date","updated_at"]] = ["SHIPPED", ship_to, now, now]
        return df

# ---------------- Orchestrator ----------------
class Orchestrator:
    """Runs the agents in a fixed order and returns the final table with metrics."""

    def __init__(self, rules: dict):
        self.rules = rules
        self.planner = PlannerAgent(rules)
        self.dq = DataQualityAgent(rules)
        self.quality = QualityAgent(rules)
        self.repair = RepairAgent()
        self.quantity = QuantityAgent(rules)

    def run(self, df: pd.DataFrame,
            scrapped_parts: list[str],
            genuine_parts: list[str],
            post_repair_scrap_parts: list[str],
            ship_part_map: dict[str, list[str]]  # {location: [part_numbers]}
           ):
        """Execute the pipeline: planner, data quality, quality, repair, shipping.

        Args:
            df: Parts table to process.
            scrapped_parts: Part numbers that failed quality control.
            genuine_parts: Part numbers that passed quality control.
            post_repair_scrap_parts: Part numbers rejected after repair.
            ship_part_map: Destination mapped to the part numbers to ship
                there; a falsy value means nothing is shipped.

        Returns:
            A ``(df, metrics)`` tuple, where ``metrics`` is the result of
            ``QuantityAgent.run_metrics`` on the final table.
        """
        df = self.planner.run(df)
        df = self.dq.run(df)
        df = self.quality.run(df, scrapped_parts, genuine_parts)
        df = self.repair.run(df, post_repair_scrap_parts)

        # Ship the requested parts, one destination at a time.
        shipments = ship_part_map or {}
        for destination in shipments:
            df = self.quantity.mark_shipped(df, shipments[destination], destination)

        return df, self.quantity.run_metrics(df)

# ---------------- Sample Data & Demo ----------------
def sample_parts(n=60):
    """Create *n* sample part rows for the demo.

    The rows are split between two families: the first ``ceil(n / 2)`` are
    ``PWR0001..`` (lot ``LOT_A``, ship_to ``LOC_NORTH``) and the remaining
    ``n // 2`` are ``MEC0001..`` (lot ``LOT_B``, ship_to ``LOC_SOUTH``). With
    the default ``n=60`` that gives PWR0001..PWR0030 and MEC0001..MEC0030.
    Every row has qty=1, status NEW, quality UNKNOWN and location WH1.

    Args:
        n: Total number of rows to generate; must not be negative.

    Returns:
        A DataFrame with one row per part.

    Raises:
        ValueError: if *n* is negative.
    """
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")

    columns = [
        "part_number", "lot_id", "qty", "category", "planner", "status",
        "quality_status", "location", "ship_to", "ship_date", "scrap_reason",
        "created_at", "updated_at",
    ]
    families = (
        ("PWR", "LOT_A", "LOC_NORTH", (n + 1) // 2),
        ("MEC", "LOT_B", "LOC_SOUTH", n // 2),
    )

    rows = []
    now = pd.Timestamp.now()
    for prefix, lot_id, destination, count in families:
        for i in range(1, count + 1):
            rows.append({
                "part_number": f"{prefix}{i:04d}", "lot_id": lot_id, "qty": 1, "category": prefix,
                "planner": pd.NA, "status": "NEW", "quality_status": "UNKNOWN",
                "location": "WH1", "ship_to": destination, "ship_date": pd.NaT,
                "scrap_reason": pd.NA, "created_at": now, "updated_at": now
            })
    # Passing columns explicitly keeps the schema even when n == 0.
    return pd.DataFrame(rows, columns=columns)

if __name__ == "__main__":
    # Demo run on the embedded fallback rules (no rules file is passed).
    rules = load_rules(None)
    df = coerce_types(sample_parts(60), rules["canonical"]["columns"])

    # Initial QC outcome: all 30 PWR parts are scrapped, all 30 MEC parts are genuine.
    scrapped_initial = [f"PWR{i:04d}" for i in range(1, 31)]
    genuine_initial = [f"MEC{i:04d}" for i in range(1, 31)]

    # MEC0001..MEC0008 fail the post-repair check.
    post_repair_scrap = [f"MEC{i:04d}" for i in range(1, 9)]

    # MEC0009..MEC0020 (12 parts) are shipped south; the remaining repaired
    # parts stay READY_TO_SHIP so the "needs shipment" metric is non-zero.
    ship_map = {
        "LOC_SOUTH": [f"MEC{i:04d}" for i in range(9, 21)],
    }

    orch = Orchestrator(rules)
    final_df, metrics = orch.run(
        df, scrapped_initial, genuine_initial, post_repair_scrap, ship_map
    )

    print("\n=== METRICS ===")
    for k, v in metrics.items():
        print(f"{k}: {v}")

    print("\n=== SAMPLE OUTPUT ROWS ===")
    # Compact view: first 15 rows by part number, key columns only.
    cols = ["part_number","qty","planner","status","quality_status","ship_to","ship_date","scrap_reason"]
    preview = final_df[cols].sort_values("part_number").head(15)
    print(preview.to_string(index=False))



=== METRICS ===
total_parts: 60
rows: 60
scrapped_qty: 38
shipped_qty: 12
needs_shipment_qty: 10
needs_shipment_by_location: {'LOC_SOUTH': 10}
shipped_by_location: {'LOC_SOUTH': 12}

=== SAMPLE OUTPUT ROWS ===
part_number  qty   planner               status       quality_status   ship_to                  ship_date        scrap_reason
    MEC0001    1 Planner_B REJECTED_POST_REPAIR REJECTED_POST_REPAIR LOC_SOUTH                        NaT POST_REPAIR_QC_FAIL
    MEC0002    1 Planner_B REJECTED_POST_REPAIR REJECTED_POST_REPAIR LOC_SOUTH                        NaT POST_REPAIR_QC_FAIL
    MEC0003    1 Planner_B REJECTED_POST_REPAIR REJECTED_POST_REPAIR LOC_SOUTH                        NaT POST_REPAIR_QC_FAIL
    MEC0004    1 Planner_B REJECTED_POST_REPAIR REJECTED_POST_REPAIR LOC_SOUTH                        NaT POST_REPAIR_QC_FAIL
    MEC0005    1 Planner_B REJECTED_POST_REPAIR REJECTED_POST_REPAIR LOC_SOUTH                        NaT POST_REPAIR_QC_FAIL
    MEC0006    1 Planner_B REJECT