In [19]:
# import pandas as pd
# from typing import TypedDict, List, Dict
# from langgraph.graph import StateGraph

import os
import json
import random
import re
import pandas as pd
from typing import TypedDict, List, Dict, Optional

from dotenv import load_dotenv
from langgraph.graph import StateGraph
from langchain_groq import ChatGroq

load_dotenv()

True

In [20]:
# Reference tables shared by the agent functions below. Each is read once from
# a CSV file resolved relative to the current working directory.

# Looked up by machine_id; machine_feasibility_agent reads uptime_percentage
# and machine_type from it.
PLAN = pd.read_csv("plan_data.csv")
# Filtered by preferred_machine_type; operator_matching_agent reads
# operator_id, operator_name and efficiency_score from it.
OPS = pd.read_csv("operator_efficiency.csv")
# Looked up by machine_id; machine_feasibility_agent reads is_available,
# criticality_level and risk_of_failure from it.
AVAIL = pd.read_csv("machine_availability.csv")
# Filtered by part_id; its rows supply machine_id, cycle_time_seconds and
# setup_time_minutes.
PARTS = pd.read_csv("part_cycle_time.csv")

In [21]:
class CapacityState(TypedDict):
    """State dictionary handed from node to node in the capacity-planning graph."""

    # --- Request inputs ---
    target_qty: int  # number of units; multiplied by the per-unit cycle time
    deadline_hrs: int  # time budget in hours that every estimate is compared against
    part_id: str  # value matched against the part_id column of PARTS

    # --- Working data filled in by the agent nodes ---
    feasible_machines: List[Dict]  # raw PARTS rows first, then filtered machine summaries
    machine_operator_pairs: List[Dict]  # machine/operator candidates that fit the deadline
    final_recommendation: Optional[Dict]  # selected pair, or None when nothing was selected
    explanation: str  # text describing the selection, or why none was made
    historical_memory: List[Dict]  # outcome records loaded from memory.json

In [22]:
# def part_analysis_agent(state: CapacityState):
#     part_df = PARTS[PARTS["part_id"] == state["part_id"]]

#     machines = part_df.to_dict("records")
#     state["feasible_machines"] = machines
#     return state

def part_analysis_agent(state: CapacityState):
    """Seed ``feasible_machines`` with every PARTS row for the requested part.

    Each row is stored as a plain dict (one per matching PARTS record). The
    state is updated in place and returned.
    """
    is_requested_part = PARTS["part_id"] == state["part_id"]
    state["feasible_machines"] = PARTS.loc[is_requested_part].to_dict("records")
    return state

In [23]:
# def machine_feasibility_agent(state: CapacityState):
#     feasible = []

#     for m in state["feasible_machines"]:
#         plan_row = PLAN[PLAN["machine_id"] == m["machine_id"]]
#         avail_row = AVAIL[AVAIL["machine_id"] == m["machine_id"]]

#         if plan_row.empty or avail_row.empty:
#             continue

#         plan = plan_row.iloc[0]
#         avail = avail_row.iloc[0]

#         if not avail["is_available"]:
#             continue
        
#         if plan["uptime_percentage"] <= 0:
#             continue
        
#         if avail["criticality_level"] == "High" and avail["risk_of_failure"] > 0.15:
#             continue

#         prod_time = (m["cycle_time_seconds"] * state["target_qty"]) / 3600
#         prod_time += m["setup_time_minutes"] / 60
#         effective_time = prod_time / plan["uptime_percentage"]

#         if effective_time <= state["deadline_hrs"]:
#             feasible.append({
#                 "machine_id": m["machine_id"],
#                 "machine_type": plan["machine_type"],
#                 "effective_time": round(effective_time, 2),
#                 "risk": avail["risk_of_failure"]
#             })
       
   

#     state["feasible_machines"] = feasible
#     return state

def machine_feasibility_agent(state: CapacityState):
    """Reduce ``feasible_machines`` to machines that are usable and fast enough.

    A candidate is dropped when it has no PLAN or AVAIL row, is flagged as not
    available, has a non-positive ``uptime_percentage``, or is a "High"
    criticality machine whose ``risk_of_failure`` exceeds 0.15. For the rest,
    the run time (cycle time x quantity, plus setup) is divided by
    ``uptime_percentage``; only machines whose result fits within
    ``deadline_hrs`` are kept.

    Each kept entry has ``machine_id``, ``machine_type``, ``effective_time``
    (hours, rounded to 2 decimals) and ``risk``. The state is updated in place
    and returned.
    """
    shortlisted = []

    for candidate in state["feasible_machines"]:
        machine_id = candidate["machine_id"]
        plan_matches = PLAN.loc[PLAN["machine_id"] == machine_id]
        avail_matches = AVAIL.loc[AVAIL["machine_id"] == machine_id]

        # Both tables must know the machine; only the first matching row is used.
        if plan_matches.empty or avail_matches.empty:
            continue
        plan, avail = plan_matches.iloc[0], avail_matches.iloc[0]

        unusable = (
            not avail["is_available"]
            or plan["uptime_percentage"] <= 0
            or (avail["criticality_level"] == "High" and avail["risk_of_failure"] > 0.15)
        )
        if unusable:
            continue

        # Seconds of machining converted to hours, plus setup minutes converted to hours.
        run_hours = candidate["cycle_time_seconds"] * state["target_qty"] / 3600
        setup_hours = candidate["setup_time_minutes"] / 60
        effective_time = (run_hours + setup_hours) / plan["uptime_percentage"]

        if effective_time > state["deadline_hrs"]:
            continue

        shortlisted.append({
            "machine_id": machine_id,
            "machine_type": plan["machine_type"],
            "effective_time": round(effective_time, 2),
            "risk": avail["risk_of_failure"]
        })

    state["feasible_machines"] = shortlisted
    return state


In [24]:
# def operator_matching_agent(state: CapacityState):
#     pairs = []

#     for machine in state["feasible_machines"]:
#         eligible_ops = OPS[
#             OPS["preferred_machine_type"] == machine["machine_type"]
#         ]

#         for _, op in eligible_ops.iterrows():
#             adjusted_time = machine["effective_time"] / op["efficiency_score"]

#             if adjusted_time <= state["deadline_hrs"]:
#                 pairs.append({
#                     "machine_id": machine["machine_id"],
#                     "operator_id": op["operator_id"],
#                     "operator_name": op["operator_name"],
#                     "final_time": round(adjusted_time, 2),
#                     "operator_efficiency": op["efficiency_score"],
#                     "risk": machine["risk"]
#                 })

#     state["machine_operator_pairs"] = pairs
#     return state


def operator_matching_agent(state: CapacityState):
    """Pair each feasible machine with the operators who can meet the deadline on it.

    Operators are eligible for a machine when their ``preferred_machine_type``
    equals the machine's ``machine_type``. The machine's ``effective_time`` is
    divided by the operator's ``efficiency_score``; the pair is kept when the
    result does not exceed ``deadline_hrs``.

    Operators whose ``efficiency_score`` is zero, negative or NaN are skipped:
    dividing by such a score yields an infinite or negative time, and a
    negative time would wrongly pass the deadline check.

    Each kept pair has ``machine_id``, ``machine_type``, ``operator_id``,
    ``operator_name``, ``final_time`` (hours, rounded to 2 decimals),
    ``operator_efficiency`` and ``risk``. The state is updated in place and
    returned.
    """
    deadline_hrs = state["deadline_hrs"]
    pairs = []

    for machine in state["feasible_machines"]:
        eligible_ops = OPS[
            OPS["preferred_machine_type"] == machine["machine_type"]
        ]

        for _, op in eligible_ops.iterrows():
            # Cast to built-in floats: the pairs are serialised with json later,
            # which does not accept every numpy scalar type.
            efficiency = float(op["efficiency_score"])

            # `not x > 0` also rejects NaN, which `x <= 0` would let through.
            if not efficiency > 0:
                continue

            adjusted_time = float(machine["effective_time"]) / efficiency

            if adjusted_time <= deadline_hrs:
                pairs.append({
                    "machine_id": machine["machine_id"],
                    "machine_type": machine["machine_type"],
                    "operator_id": op["operator_id"],
                    "operator_name": op["operator_name"],
                    "final_time": round(adjusted_time, 2),
                    "operator_efficiency": efficiency,
                    "risk": machine["risk"]
                })

    state["machine_operator_pairs"] = pairs
    return state

In [25]:
# def optimization_agent(state: CapacityState):
#     pairs = state["machine_operator_pairs"]


#     if not pairs:
#         state["final_recommendation"] = {
#             "status": "NO_FEASIBLE_PLAN",
#             "reason": "No machine-operator combination met constraints"
#         }
#         return state

#     best_score = -1
#     best = None

#     for pair in pairs:
#         utilization = pair["final_time"] / state["deadline_hrs"]

#         score = (
#             (1 - utilization) * 0.4 +
#             pair["operator_efficiency"] * 0.4 +
#             (1 - pair["risk"]) * 0.2
#         )

#         if score > best_score:
#             best_score = score
#             best = pair

#     state["final_recommendation"] = best
#     return state



def memory_retrieval_agent(state: CapacityState):
    """Load past run outcomes from ``memory.json`` into ``historical_memory``.

    The file is read from the current working directory. Memory is a
    best-effort aid, so none of the following aborts the pipeline; each yields
    an empty history instead:

    * the file does not exist (normal on a first run);
    * the file is empty, truncated or otherwise not valid JSON (a warning is
      issued);
    * the file holds valid JSON that is not a list (a warning is issued).

    The state is updated in place and returned.
    """
    import warnings

    try:
        with open("memory.json", "r", encoding="utf-8") as f:
            memory = json.load(f)
    except FileNotFoundError:
        memory = []
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        warnings.warn(f"memory.json could not be parsed ({err}); using empty history.")
        memory = []

    # Downstream code iterates over a list of record dicts.
    if not isinstance(memory, list):
        warnings.warn("memory.json does not contain a JSON list; using empty history.")
        memory = []

    state["historical_memory"] = memory
    return state


def learning_agent(state: CapacityState):
    """Tag every candidate pair with totals derived from its past outcomes.

    For each pair, every history record with the same ``machine_id`` and
    ``operator_id`` adds 0.1 to ``learning_penalty`` when the run failed, or
    0.05 to ``learning_reward`` when it succeeded. Pairs with no matching
    history get 0 for both. The pairs are modified in place and the state is
    returned.
    """
    past_runs = state["historical_memory"]
    candidates = state["machine_operator_pairs"]

    for candidate in candidates:
        failure_total = 0
        success_total = 0

        for run in past_runs:
            same_machine = run["machine_id"] == candidate["machine_id"]
            if not (same_machine and run["operator_id"] == candidate["operator_id"]):
                continue

            if run["success"]:
                success_total += 0.05
            else:
                failure_total += 0.1

        candidate["learning_penalty"] = failure_total
        candidate["learning_reward"] = success_total

    state["machine_operator_pairs"] = candidates
    return state

In [9]:
from langchain_groq import ChatGroq
import os

In [26]:
# Shared Groq chat model; llm_selection_agent calls llm.invoke() on it.
# The API key is taken from the GROQ_API_KEY environment variable, which
# load_dotenv() (called right after the imports) may populate from a .env file.
llm = ChatGroq(
    model="llama-3.1-8b-instant",
    temperature=0.2,  # NOTE(review): presumably kept low for consistent selections — confirm
    groq_api_key=os.getenv("GROQ_API_KEY")
)

In [27]:
# def llm_explanation_agent(state: CapacityState):
#     rec = state["final_recommendation"]

#     if rec is None or rec.get("status") == "NO_FEASIBLE_PLAN":
#         state["explanation"] = (
#             "No feasible machine-operator plan could be generated "
#             "for the given target quantity, deadline, and part."
#         )
#         return state

#     prompt = f"""
# You are an AI Capacity Planning Expert.

# Explain why the following machine–operator combination
# was selected for the production plan.

# Production Target:
# - Quantity: {state['target_qty']}
# - Deadline: {state['deadline_hrs']} hours
# - Part ID: {state['part_id']}

# Selected Combination:
# - Machine ID: {rec['machine_id']}
# - Operator: {rec['operator_name']} ({rec['operator_id']})

# Performance Metrics:
# - Estimated Production Time: {rec['final_time']} hours
# - Operator Efficiency Score: {rec['operator_efficiency']}
# - Machine Failure Risk: {rec['risk']}

# Explain clearly but in crisp manner:
# 1. Why this machine is suitable
# 2. Why this operator is optimal
# 3. How risks were minimized
# 4. Why this choice ensures on-time delivery
# """

#     response = llm.invoke(prompt)
#     state["explanation"] = response.content
#     return state


def _extract_json_object(text):
    """Return the first JSON object embedded in *text*, or None if there is none."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at `start` and ignores
            # whatever follows it, so trailing prose or a second object is harmless.
            candidate, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)
    return None


def llm_selection_agent(state: CapacityState):
    """Ask the LLM to pick one machine-operator pair and record its choice.

    With no candidate pairs the LLM is not called: ``final_recommendation``
    is set to None and ``explanation`` says that nothing was feasible.

    Otherwise the candidates are sent to the LLM, and the first JSON object in
    its reply is taken as the decision. The decision is matched back to the
    offered pairs by ``machine_id`` and ``operator_id``:

    * no parsable JSON object -> recommendation None, explanation says so;
    * an object that matches no offered pair -> recommendation None,
      explanation says so (the model invented or garbled a combination);
    * a match -> the recommendation is the offered pair's own data, plus any
      extra fields the LLM added (such as ``reasoning``). The offered values
      win on conflicts, so later steps always see the computed numbers and all
      expected keys.

    ``explanation`` is set to the decision's ``reasoning`` text (empty when
    absent). The state is updated in place and returned.
    """
    pairs = state["machine_operator_pairs"]

    if not pairs:
        state["final_recommendation"] = None
        state["explanation"] = "No feasible machine-operator combination found."
        return state

    # The last instruction asks for the "reasoning" field that is read below;
    # without it the explanation would normally come back empty.
    prompt = f"""
You are an AI Capacity Planning Optimization Engine.

Production Requirements:
- Quantity: {state['target_qty']}
- Deadline: {state['deadline_hrs']} hours
- Part ID: {state['part_id']}

Available Machine-Operator Options:
{json.dumps(pairs, indent=2)}

Select ONLY the single BEST combination.
Return ONLY ONE JSON object.
Copy the chosen option's fields exactly as given and add a "reasoning" string field that briefly explains the choice.
"""

    response = llm.invoke(prompt)
    # Strip Markdown code fences the model may wrap around its JSON.
    raw_output = response.content.replace("```json", "").replace("```", "").strip()

    decision = _extract_json_object(raw_output)

    if decision is None:
        state["final_recommendation"] = None
        state["explanation"] = "LLM did not return valid JSON."
        return state

    # Compare as strings so "M01" matches regardless of how the ids were typed.
    chosen = next(
        (
            pair
            for pair in pairs
            if str(pair["machine_id"]) == str(decision.get("machine_id"))
            and str(pair["operator_id"]) == str(decision.get("operator_id"))
        ),
        None,
    )

    if chosen is None:
        state["final_recommendation"] = None
        state["explanation"] = "LLM selected a combination that is not among the feasible options."
        return state

    state["final_recommendation"] = {**decision, **chosen}
    state["explanation"] = str(decision.get("reasoning") or "")
    return state



In [28]:
# import json

# def memory_retrieval_agent(state: CapacityState):
#     try:
#         with open("memory.json", "r") as f:
#             memory = json.load(f)
#     except:
#         memory = []

#     state["historical_memory"] = memory
#     return state


def memory_update_agent(state: CapacityState):
    """Append the outcome of the current recommendation to ``memory.json``.

    Nothing is written when there is no recommendation, or when the
    recommendation lacks any of the fields that make up a memory record
    (``machine_id``, ``operator_id``, ``operator_name``, ``final_time``,
    ``risk``). The recommendation originates from LLM output, so a missing
    field is reported with a warning instead of raising ``KeyError``.

    The outcome itself is simulated: ``success`` is drawn at random, True with
    weight 0.8 and False with weight 0.2.

    The existing file is re-read from the current working directory, the new
    record is appended, and the file is rewritten. A missing file starts a new
    history. An unparsable or non-list file also starts a new history, with a
    warning, and is overwritten. ``historical_memory`` is set to the saved
    list and the state is returned.
    """
    import warnings

    rec = state["final_recommendation"]
    if rec is None:
        return state

    required_keys = ("machine_id", "operator_id", "operator_name", "final_time", "risk")
    missing_keys = [key for key in required_keys if key not in rec]
    if missing_keys:
        warnings.warn(
            f"Recommendation is missing {missing_keys}; outcome not recorded in memory.json."
        )
        return state

    # Placeholder for real production feedback: simulated 80/20 outcome.
    success = random.choices([True, False], weights=[0.8, 0.2])[0]

    record = {
        "part_id": state["part_id"],
        "machine_id": rec["machine_id"],
        "operator_id": rec["operator_id"],
        "operator_name": rec["operator_name"],
        "success": success,
        "time_taken": rec["final_time"],
        "risk": rec["risk"]
    }

    try:
        with open("memory.json", "r", encoding="utf-8") as f:
            memory = json.load(f)
    except FileNotFoundError:
        memory = []
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        warnings.warn(f"memory.json could not be parsed ({err}); starting a new history.")
        memory = []

    if not isinstance(memory, list):
        warnings.warn("memory.json does not contain a JSON list; starting a new history.")
        memory = []

    memory.append(record)

    with open("memory.json", "w", encoding="utf-8") as f:
        json.dump(memory, f, indent=4)

    state["historical_memory"] = memory
    return state

In [None]:
# def learning_agent(state: CapacityState) -> CapacityState:
#     print("LEARNING AGENT STATE:", state)

In [None]:
# def learning_agent(state: CapacityState):
#     rec = state["final_recommendation"]

#     if rec is None or rec.get("status") == "NO_FEASIBLE_PLAN":
#         return state

#     history = state["historical_memory"]
#     penalty = 0
#     reward = 0

#     for past in history:
#         if (
#             past["machine_id"] == rec["machine_id"]
#             and past["operator_id"] == rec["operator_id"]
#         ):
#             if not past["success"]:
#                 penalty += 0.1
#             else:
#                 reward += 0.05

#     rec["learning_penalty"] = penalty
#     rec["learning_reward"] = reward
#     state["final_recommendation"] = rec
#     return state

In [None]:
# import random
# def memory_update_agent(state: CapacityState):
#     """
#     Stores the final recommendation along with success/failure outcome
#     into memory.json for future learning.
#     """
#     rec = state["final_recommendation"]

#     # Simulate real-world outcome (success/failure)
#     success = random.choices([True, False], weights=[0.8, 0.2])[0]

#     # Record to store
#     record = {
#         "part_id": state["part_id"],
#         "machine_id": rec.get("machine_id", ""),
#         "operator_id": rec.get("operator_id", ""),
#         "operator_name": rec.get("operator_name", ""),
#         "success": success,
#         "time_taken": rec.get("final_time", 0),
#         "risk": rec.get("risk", 0)
#     }

#     # Load existing memory
#     try:
#         with open("memory.json", "r") as f:
#             memory = json.load(f)
#     except FileNotFoundError:
#         memory = []

#     # Append new record
#     memory.append(record)

#     # Save back to file
#     with open("memory.json", "w") as f:
#         json.dump(memory, f, indent=4)

#     # Update state
#     state["historical_memory"] = memory

#     return state

In [None]:
# from langgraph.graph import StateGraph

# graph = StateGraph(CapacityState)

# # Add nodes
# graph.add_node("part_agent", part_analysis_agent)
# graph.add_node("machine_agent", machine_feasibility_agent)
# graph.add_node("operator_agent", operator_matching_agent)
# graph.add_node("optimizer_agent", optimization_agent)
# graph.add_node("memory_reader", memory_retrieval_agent)
# graph.add_node("learning_agent", learning_agent)
# graph.add_node("llm_explainer", llm_explanation_agent)
# graph.add_node("memory_writer", memory_update_agent)

# # Define edges
# graph.set_entry_point("part_agent")
# graph.add_edge("part_agent", "machine_agent")
# graph.add_edge("machine_agent", "operator_agent")
# graph.add_edge("operator_agent", "optimizer_agent")
# graph.add_edge("optimizer_agent", "memory_reader")
# graph.add_edge("memory_reader", "learning_agent")
# graph.add_edge("learning_agent", "llm_explainer")
# graph.add_edge("llm_explainer", "memory_writer")

# # Compile the graph
# capacity_agent = graph.compile()



# Build the capacity-planning graph as a straight chain: every node is one
# agent function, and each node hands the state to the next one in the table.
graph = StateGraph(CapacityState)

# (node name, agent function), listed in execution order.
_PIPELINE = (
    ("part_agent", part_analysis_agent),
    ("machine_agent", machine_feasibility_agent),
    ("operator_agent", operator_matching_agent),
    ("memory_reader", memory_retrieval_agent),
    ("learning_agent", learning_agent),
    ("llm_selector", llm_selection_agent),
    ("memory_writer", memory_update_agent),
)

for _node_name, _node_fn in _PIPELINE:
    graph.add_node(_node_name, _node_fn)

graph.set_entry_point("part_agent")

# Connect each node to its successor in the table.
_node_names = [name for name, _fn in _PIPELINE]
for _upstream, _downstream in zip(_node_names, _node_names[1:]):
    graph.add_edge(_upstream, _downstream)

capacity_agent = graph.compile()

In [29]:
# input_state = {
#     "target_qty": 1500,
#     "deadline_hrs": 90,
#     "part_id": "P1009",
#     "feasible_machines": [],
#     "machine_operator_pairs": [],
#     "final_recommendation": {},
#     "explanation": "",
#     "historical_memory": []
# }

# result = capacity_agent.invoke(input_state)


# Example request: 500 units of part P1001 within 72 hours. The working fields
# start empty and are filled in by the graph nodes.
input_state: CapacityState = {
    "target_qty": 500,
    "deadline_hrs": 72,
    "part_id": "P1001",
    "feasible_machines": [],
    "machine_operator_pairs": [],
    "final_recommendation": None,
    "explanation": "",
    "historical_memory": []
}

result = capacity_agent.invoke(input_state)

best = result.get("final_recommendation")

print("\n===== AI AGENT RECOMMENDATION =====\n")

if best:
    # The recommendation originates from LLM-produced JSON, so any field may be
    # absent; show a placeholder rather than crash after the run has finished.
    print(f"Machine        : {best.get('machine_id', 'N/A')}")
    print(f"Operator       : {best.get('operator_name', 'N/A')} ({best.get('operator_id', 'N/A')})")
    print(f"Estimated Time : {best.get('final_time', 'N/A')} hrs")
    print(f"Efficiency     : {best.get('operator_efficiency', 'N/A')}")
    print(f"Risk           : {best.get('risk', 'N/A')}")
else:
    print("No feasible recommendation found.")

print("\n===== AI EXPLANATION =====\n")
# An empty explanation would otherwise leave this section blank.
print(result.get("explanation") or "No explanation was provided.")


===== AI AGENT RECOMMENDATION =====

Machine        : M01
Operator       : Operator_10 (O010)
Estimated Time : 9.59 hrs
Efficiency     : 0.97
Risk           : 0.05

===== AI EXPLANATION =====

**Capacity Planning Expert's Analysis**

**1. Why Machine M01 is suitable:**

Machine M01 was selected due to its high production capacity and reliability. The estimated production time of 9.59 hours for 500 units of Part ID P1001 indicates that M01 can efficiently process the required quantity within the given deadline. Additionally, the machine's low failure risk of 0.05% suggests that it is less prone to downtime, ensuring consistent production.

**2. Why Operator O010 is optimal:**

Operator O010 was chosen due to their high efficiency score of 0.97, indicating that they are highly skilled and productive. This score suggests that O010 can work efficiently on the machine, minimizing waste and maximizing output. Their expertise will help ensure that the production process runs smoothly, reduci