In [None]:
pip install pandas



In [None]:
pip install pydantic



In [None]:
pip install langchain



In [None]:
pip install langchain-core



In [None]:
pip install langchain-community



In [None]:
pip install langchain-openai



In [None]:
pip install langchain-groq



In [None]:
pip install langgraph



In [None]:
pip install requests



In [None]:
pip install langmem



In [None]:
from langchain_openai import AzureChatOpenAI
import os
from google.colab import userdata
def get_llm_model():
  """Build an AzureChatOpenAI chat client from secrets stored in Colab.

  Reads the "endpoint", "deployment" and "AZURE_OPENAI_API_KEY" entries via
  `userdata.get`. A new client object is constructed on every call.

  Returns:
    AzureChatOpenAI: client created with temperature=0.7, timeout=300 and
    max_retries=3.
  """
  endpoint = userdata.get("endpoint")
  deployment = userdata.get("deployment")
  subscription_key = userdata.get("AZURE_OPENAI_API_KEY")
  api_version = "2024-12-01-preview"

  # The deployment name is passed as both `model` and `azure_deployment`.
  llm = AzureChatOpenAI(
      azure_endpoint=endpoint,
      api_key=subscription_key,
      api_version=api_version,
      model=deployment,
      azure_deployment=deployment,
      temperature=0.7,
      timeout=300,
      max_retries=3,
  )
  return llm

In [None]:
from langchain_openai import AzureOpenAIEmbeddings
# Embedding client; it is handed to the InMemoryStore index further down so
# that memory searches can be done by similarity.
# NOTE(review): the secret names read here ("azure_endpoint", "api_key") differ
# from the ones read in get_llm_model ("endpoint", "AZURE_OPENAI_API_KEY") —
# confirm both sets exist in the Colab secrets.
embed = AzureOpenAIEmbeddings(
    model="TxtEmbedAda002",
    # `dimensions` is left unset; it can only be specified with the newer text-embedding-3 models.
    azure_endpoint=userdata.get("azure_endpoint"),
    api_key=userdata.get("api_key"),
    openai_api_version="2023-05-15"
)

In [None]:
# Smoke test for the chat model: send a fixed English-to-French translation
# request and display the reply.
messages = [
    (
        "system",
        "You are a helpful assistant that translates English to French. Translate the user sentence.",
    ),
    ("human", "I love programming."),
]
# Each call to get_llm_model() builds a fresh client.
ai_msg = get_llm_model().invoke(messages)
# Bare last expression so the notebook renders the returned message.
ai_msg

AIMessage(content="J'adore la programmation.", additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 6, 'prompt_tokens': 31, 'total_tokens': 37, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4.1-mini-2025-04-14', 'system_fingerprint': 'fp_3dcd5944f5', 'id': 'chatcmpl-D0VIqoNLeXfb99xJ8EPONzrgjWkHS', 'prompt_filter_results': [{'prompt_index': 0, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}], 'finish_reason': 'stop', 'logprobs': None, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}

In [None]:
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain.agents.middleware import TodoListMiddleware
from typing import TypedDict, List, Dict, Any
# ---- Shared typed state ----
class RCAState(TypedDict):
    """Typed dictionary describing the state carried through an RCA run."""
    task: str                     # task text (presumably the user's RCA question — confirm with callers)
    output: str                   # result text (presumably the final RCA answer — confirm with callers)
    trace: List[Dict[str, Any]]   # structured trace entries
    history: List[Any] # Short term memory, working memory

In [None]:
import re
def extract_json_from_response(response_text: str) -> str:
    """Pull the payload out of a reply that may wrap it in a Markdown code fence.

    The patterns are tried in order and the first one that matches wins:
      1. a ```json fence with the payload on its own lines,
      2. a ```json fence written inline,
      3. any triple-backtick fence.
    When no fence is found the whole text is returned. The result is always
    stripped of surrounding whitespace; it is not validated as JSON here.
    """
    fence_patterns = (
        r'```json\s*\n([\s\S]*?)\n```',
        r'```json\s*([\s\S]*?)```',
        r'```\s*\n?([\s\S]*?)\n?```',
    )

    for pattern in fence_patterns:
        found = re.search(pattern, response_text)
        if found is not None:
            return found.group(1).strip()

    # No fence at all: treat the raw reply as the payload.
    return response_text.strip()

In [None]:
import json
def invoke_and_process_response(messages):
    """Invoke the chat model and parse its reply as JSON, retrying on decode errors.

    Up to three model calls are made. When a reply cannot be decoded, the next
    call replaces the message list with a single system prompt that shows the
    ORIGINAL request, the failed reply and the decode error, asking the model
    for a corrected output.

    Args:
        messages: chat messages passed to the model's ``invoke``.

    Returns:
        The decoded JSON value, or None when no attempt produced parseable
        JSON (the last decode error is printed in that case).
    """
    json_decoder_prompt = """
You are an expert in resolving JSON decoding errors.

Please review the User input (enclosed in triple single quotes) and the AI Output (also enclosed in triple single quotes). Your task is to return only the User Expected Output based on your understanding of the user’s request.

We encountered the following error while loading the AI Output into a JSON object: {e}. Kindly provide only the User Expected Output that resolves this issue.

User: '''{prompt}'''
AI Output: '''{response}'''

Let’s go step by step.
"""

    max_attempts = 3
    llm = get_llm_model()          # build the client once, not once per attempt
    original_messages = messages   # kept so every recovery prompt shows the real request
    last_exception = None

    for attempt in range(max_attempts):
        # Model/transport failures are not JSON problems; let them propagate.
        response = llm.invoke(messages)

        try:
            content = extract_json_from_response(response.content)

            if isinstance(content, str):
                content = json.loads(content)

            # Handle double-encoded JSON case
            if isinstance(content, str):
                content = json.loads(content)

            return content

        except json.JSONDecodeError as e:
            last_exception = e

            # No further call will be made, so there is nothing to prepare.
            if attempt == max_attempts - 1:
                break

            # Replace the message list with a recovery prompt. It is always
            # formatted from the original messages so that repeated retries
            # do not nest one recovery prompt inside the next.
            messages = [
                {
                    "role": "system",
                    "content": json_decoder_prompt.format(
                        e=e,
                        prompt=original_messages,
                        response=response.content
                    )
                }
            ]

    print(f"Model response could not be parsed: {str(last_exception)}")
    return None


In [None]:
from langchain.agents.middleware import wrap_tool_call
from langchain.messages import ToolMessage

@wrap_tool_call
def handle_tool_errors(request, handler):
    """Run the wrapped tool call; on any exception, answer with an error ToolMessage.

    The error is reported back to the model (tied to the originating tool call
    id) instead of being raised, so the model can adjust its input and retry.
    """
    try:
        outcome = handler(request)
    except Exception as exc:
        failed_call_id = request.tool_call["id"]
        error_text = f"Tool error: Please check your input and try again. ({str(exc)})"
        return ToolMessage(content=error_text, tool_call_id=failed_call_id)
    return outcome

In [None]:
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver

# Set up storage.
# `store` is the in-memory long-term memory store; the index settings enable
# embedding-based search using the `embed` client defined earlier.
store = InMemoryStore(
    index={
        # NOTE(review): presumably the vector size produced by `embed` — confirm
        # it matches the embedding deployment in use.
        "dims": 1536,
        "embed": embed,
    }
)

# In-memory checkpointer passed to the agents created below.
checkpointer = InMemorySaver()

In [None]:
from langchain_core.messages import HumanMessage, AIMessage

def append_rca_history(state):
    """
    Append the user task and the RCA result to the conversation history.

    ``state`` is modified in place and nothing is returned:
      - a truthy ``state["task"]`` is appended as a HumanMessage,
      - a truthy ``state["output"]`` is appended as an AIMessage.
    A missing (or None) ``state["history"]`` is initialised to an empty list
    first, so the function also works on a freshly created state.
    """
    history = state.get("history")
    if history is None:
        history = []
        state["history"] = history

    # Original user task
    task = state.get("task")
    if task:
        history.append(HumanMessage(content=task))

    # Final RCA response
    output = state.get("output")
    if output:
        history.append(AIMessage(content=output))


In [None]:
def episodic_recall(query, store, config):
    """
    Fetch the single best-matching episodic RCA memory for ``query``.

    The search is scoped to the ("episodic", <user_id>) namespace, where the
    user id is read from ``config["configurable"]["user_id"]``. Whatever
    ``store.search`` returns is passed back unchanged.
    """
    user_id = config["configurable"]["user_id"]
    return store.search(("episodic", user_id), query=query, limit=1)


In [None]:
def procedural_recall(task, store, config):
    """
    Retrieve up to three procedural memories relevant to ``task``.

    The search is scoped to the ("procedural", <user_id>) namespace, where the
    user id is read from ``config["configurable"]["user_id"]``.
    """
    # No trailing comma here: a trailing comma would wrap the namespace in an
    # extra one-element tuple, i.e. (("procedural", user_id),).
    namespace = ("procedural", config["configurable"]["user_id"])
    return store.search(
        namespace,
        query=task,
        limit=3
    )

**Confidence Decay**
What problem does this solve?

    Memory gets stale.

A rule that worked:

    Last month

    Under different demand patterns

    Before business changes

…may no longer be correct.

**Without decay:**

    Old rules dominate

    Agent becomes rigid

    Wrong confidence persists

**With decay:**

    Memory slowly loses influence unless reinforced

    System stays adaptive

**Human analogy**

“I used to do this, but I’m not so sure anymore.”

Example in our system

Semantic memory:

"Promo spikes usually cause stockouts"
Confidence: high


After many RCAs with no promo issues:

Confidence decays from high → medium → low

Eventually ignored or removed

**What decay applies to**

    Semantic memory

    Procedural memory

    Decay does NOT apply to:

    Short-term memory

    Raw episodic memory

**Why decay matters for agents**

    Prevents outdated heuristics

    Handles seasonality

    Handles business model changes

In [None]:
import time

def semantic_recall(query, store, config, limit: int = 3):
    """Search the user's semantic memories and apply age-based confidence decay.

    Args:
        query: text used for the store search.
        store: object exposing ``search(namespace, query=..., limit=...)``.
        config: mapping providing ``config["configurable"]["user_id"]``.
        limit: maximum number of memories returned; ``limit * 2`` are
            requested from the store.

    Returns:
        The first ``limit`` search results. Each result's ``value`` dict has
        its "confidence" adjusted in place according to its age:
          - older than 60 days  -> "low"
          - older than 30 days and currently "high" -> "medium"
        Age is measured from ``value["last_used_at"]`` (epoch seconds); a
        missing timestamp is treated as "just used", i.e. no decay.

    Note:
        Nothing is written back to the store here; only the returned objects
        are modified.
    """
    seconds_per_day = 86400
    medium_after_days = 30
    low_after_days = 60

    namespace = ("semantic", config["configurable"]["user_id"])

    results = store.search(
        namespace,
        query=query,
        limit=limit * 2  # over-fetch, then truncate to `limit` below
    )

    now = time.time()
    decayed = []

    for r in results:
        v = r.value
        last_used = v.get("last_used_at", now)
        age_days = (now - last_used) / seconds_per_day

        # The oldest threshold is tested first: otherwise a "high" memory older
        # than 60 days would match the 30-day rule and never reach "low".
        if age_days > low_after_days:
            v["confidence"] = "low"
        elif age_days > medium_after_days and v.get("confidence") == "high":
            v["confidence"] = "medium"

        decayed.append(r)

    return decayed[:limit]


**Memory order in the prompt matters (important)**

Assemble the memory context in the following cognitive + agentic order:

    Semantic memory → What generalized knowledge has been abstracted from multiple episodic memories?

    Procedural memory → How should I approach this?

    Episodic memory → What happened in similar cases?

    Short-term memory → What is happening right now?

This is explicitly recommended in:

arXiv:2309.02427

arXiv:2404.13501

LangChain memory-for-agents blog

In [None]:
def build_memory_augmented_prompt(
    query: str,
    state: dict,
    config,
) -> str:
    """
    Builds a prompt using semantic + procedural + episodic memory + short-term conversation history.

    Args:
        query: text used to search all three long-term memory types. It is
            NOT embedded in the returned prompt itself.
        state: mapping whose optional "history" entry is a list of messages;
            only HumanMessage and AIMessage items are rendered.
        config: passed through to the recall helpers, which read
            ``config["configurable"]["user_id"]``.

    Returns:
        The assembled prompt text, stripped of leading/trailing whitespace.

    Note:
        The memory lookups use the module-level ``store`` object; it is not a
        parameter of this function.
    """

    # ---------------------------
    # 0. Semantic Recall (WHY)
    # ---------------------------
    semantic_memories = semantic_recall(query, store, config, 3)

    semantic_context = ""
    if semantic_memories:
        # One bullet per memory: the stored fact plus its confidence label.
        facts = []
        for sm in semantic_memories:
            v = sm.value
            facts.append(
                f"- {v.get('semantic_fact')} "
                f"(confidence: {v.get('confidence')})"
            )

        # chr(10) is "\n"; it is used because a backslash cannot appear inside
        # an f-string expression on older Python versions.
        semantic_context = f"""
    Generalized RCA knowledge:
    {chr(10).join(facts)}
    """
    else:
        semantic_context = "No generalized RCA knowledge found."


    # ---------------------------
    # 1. Procedural Recall (HOW)
    # ---------------------------
    procedural_memories = procedural_recall(query, store, config)

    procedural_context = ""
    if procedural_memories:
        # Each procedure is rendered as a small block: name, applicability,
        # steps and tool heuristics; missing fields fall back to "N/A" / [].
        procedures = []
        for pm in procedural_memories:
            proc = pm.value
            procedures.append(f"""
- Procedure: {proc.get("procedure_name", "N/A")}
  Applicable when: {proc.get("applicable_when", "N/A")}
  Steps:
    {chr(10).join([f"    - {s}" for s in proc.get("steps", [])])}
  Tool heuristics:
    {chr(10).join([f"    - {h}" for h in proc.get("tool_heuristics", [])])}
""")

        procedural_context = f"""
Relevant RCA procedures (how to act):
{''.join(procedures)}
"""
    else:
        procedural_context = "No relevant RCA procedures found."

    # ---------------------------
    # 2. Episodic Recall
    # ---------------------------
    episodic_memories = episodic_recall(query, store, config)

    episodic_context = ""
    if episodic_memories:
        # Only the first hit is used.
        mem = episodic_memories[0].value  # reflection dict

        episodic_context = f"""
Similar past RCA experience:
- Current Conversation Match: {mem.get("conversation", "N/A")}
- Summary: {mem.get("conversation_summary", "N/A")}
- What worked: {mem.get("what_worked", "N/A")}
- What to avoid: {mem.get("what_to_avoid", "N/A")}
"""
    else:
        episodic_context = "No closely related past RCA experience found."

    # ---------------------------
    # 3. Short-term History
    # ---------------------------
    history = state.get("history", [])

    # Messages of any other type are silently skipped.
    formatted_history = []
    for m in history:
        if isinstance(m, HumanMessage):
            formatted_history.append(f"USER: {m.content}")
        elif isinstance(m, AIMessage):
            formatted_history.append(f"ASSISTANT: {m.content}")

    history_context = "\n".join(formatted_history) if formatted_history else "No prior conversation."

    # ---------------------------
    # 4. Final Prompt
    # ---------------------------
    # Sections appear in the order semantic -> procedural -> episodic -> history.
    prompt = f"""
You are an RCA assistant with access to memory.

{semantic_context}

{procedural_context}

{episodic_context}

Recent conversation context:
{history_context}

Instructions:
- Follow relevant procedures first
- Use past experiences to avoid known pitfalls
- Use recent conversation context for continuity
"""

    return prompt.strip()


In [None]:
from langgraph.graph import MessagesState
from langmem import create_manage_memory_tool, create_search_memory_tool
import json
from langchain.agents import create_agent
from langchain.tools import tool
from typing import List, Dict, Any

# ReAct agent for hypothesis generation.
# Its only tools are the langmem memory tools, both scoped to the
# ("hypothesis", "{user_id}") namespace; "{user_id}" is a placeholder
# (presumably filled from config["configurable"]["user_id"] at call time —
# confirm against the langmem documentation).
hypothesis_react_agent = create_agent(
    model=get_llm_model(),
    tools=[
        create_manage_memory_tool(namespace=("hypothesis", "{user_id}")),
        create_search_memory_tool(namespace=("hypothesis", "{user_id}"))
    ],
    # Tool exceptions are turned into ToolMessages instead of being raised.
    middleware=[handle_tool_errors],
    # Shares the module-level store and checkpointer with the other agents.
    store=store,
    checkpointer=checkpointer
)

@tool
def hypothesis_agent_tool(
    task: str,
    user_id: str,
    query_id: str,
    memory_context: str
) -> Dict[str, Any]:
    """
    Purpose:
        Generate multiple plausible root-cause hypotheses for a given RCA query.

    When to use:
        Use this tool when an RCA investigation requires enumerating
        possible causes of an observed problem. This is typically
        the first analytical step after query routing.

    Inputs:
        - task (str): The resolved and disambiguated user query.
        - user_id (str): Identifier of the user or session.
        - query_id (str): Unique identifier of the current query/thread.
        - memory_context (str): episodic + conversation memory

    Output:
        - dict: Contains updated fields:
            - "hypotheses" (List[str]): Newly generated root-cause hypotheses.
            - "trace" (List[Dict]): Trace entry recording the tool call.

    Notes:
        - Hypotheses are returned as plain strings with no categorization.
        - This tool does not validate hypotheses.
        - It may read from long-term memory but only updates the provided data.
        - Subsequent tools or agents are expected to validate or eliminate hypotheses.
    """
    # NOTE: the docstring above is the tool description exposed to the calling
    # model, so it is kept unchanged; implementation notes go in comments.

    # Build messages for LLM
    messages = [
        {
            "role": "system",
            "content": f"""
You are an RCA hypothesis-generation expert.

Context (do not repeat, only use for reasoning):
{memory_context}

Your task:
Given the user input, generate possible root-cause hypotheses.

STRICT OUTPUT RULES:
1. Output **only valid JSON**.
2. Root JSON object must have exactly two fields:
   - "hypotheses": an array of **plain strings**.
   - "reasoning": a string explaining how the hypotheses were generated.
3. No markdown or code fences.
4. No extra commentary or fields.

JSON schema:
{{
  "hypotheses": ["...", "..."],
  "reasoning": "..."
}}
"""
        },
        {"role": "user", "content": task}
    ]

    # Build config dynamically for the agent
    config = {
        "configurable": {
            "user_id": user_id,
            "thread_id": query_id
        }
    }

    # Invoke ReAct agent
    result = hypothesis_react_agent.invoke({"messages": messages}, config)

    # Parse LLM output
    final_msg = result["messages"][-1].content
    output = process_response(final_msg)

    # process_response returns None when the reply never parses as JSON, and
    # the model may also answer with a non-object. Fall back to "no
    # hypotheses" instead of raising AttributeError on `.get`.
    if not isinstance(output, dict):
        output = {}

    # Extract hypotheses (a null/absent field becomes an empty list)
    hypotheses: List[str] = output.get("hypotheses") or []

    # Extract internal tool-call reasoning messages.
    # NOTE(review): the [2:-1] slice assumes the result starts with exactly the
    # system and user messages built above — confirm this still holds when the
    # checkpointer has earlier messages for the same thread_id.
    internal_msgs = result["messages"][2:-1]

    tool_call_msgs = [
        m for m in internal_msgs
        if (
            (isinstance(m, AIMessage) and getattr(m, "tool_calls", None))
            or isinstance(m, ToolMessage)
        )
    ]

    # Create trace entry
    trace_entry = {
        "agent": "HypothesisAgent",
        "step": "Generated hypotheses",
        "calls": serialize_messages(tool_call_msgs),
        "hypotheses": hypotheses
    }

    # Return as dict
    return {
        "hypotheses": hypotheses,
        "trace": [trace_entry]
    }


In [None]:
from langchain.tools import tool

@tool
def get_daily_sales():
    """Return daily aggregated sales by store."""
    import pandas as pd

    # The docstring above is the tool description shown to the model.
    transactions = pd.read_csv(
        "/content/sample_data/sales_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Total units sold per (date, store).
    group_keys = ["transaction_date", "store_id", "store_name"]
    per_day = transactions.groupby(group_keys, as_index=False)["quantity_sold"].sum()

    # Chronological order, ties broken by store id.
    per_day = per_day.sort_values(["transaction_date", "store_id"])

    return per_day.to_dict(orient="records")

@tool
def get_promo_period():
    """Return promotion start and end date based on sales data. Both values are null when the data contains no promotion rows."""
    import pandas as pd

    df = pd.read_csv("/content/sample_data/sales_transactions.csv", parse_dates=["transaction_date"])

    # Dates of the rows flagged as promotional.
    promo_dates = df.loc[df["is_promotion"] == True, "transaction_date"]

    promo_start = promo_dates.min()
    promo_end = promo_dates.max()

    # With no promotion rows (or only missing dates) min()/max() yield NaT,
    # and NaT.date() raises; report "no promotion" explicitly instead.
    if pd.isna(promo_start) or pd.isna(promo_end):
        return {"promo_start": None, "promo_end": None}

    return {
        "promo_start": str(promo_start.date()),
        "promo_end": str(promo_end.date())
    }


@tool
def get_promo_sales_by_store():
    """Return total promotion-period sales by store."""
    import pandas as pd

    # The docstring above is the tool description shown to the model.
    transactions = pd.read_csv(
        "/content/sample_data/sales_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Keep only the rows flagged as promotional.
    on_promo = transactions["is_promotion"] == True
    promo_rows = transactions[on_promo]

    # Units sold per store, reported under the name "promo_qty_sold".
    totals = promo_rows.groupby(["store_id", "store_name"], as_index=False)["quantity_sold"].sum()
    totals = totals.rename(columns={"quantity_sold": "promo_qty_sold"})

    return totals.to_dict(orient="records")


@tool
def get_sales_data():
    """Return sales data as list of dicts."""
    import pandas as pd

    # Every row of the sales CSV is returned, one dict per row.
    sales_rows = pd.read_csv(
        "/content/sample_data/sales_transactions.csv",
        parse_dates=["transaction_date"],
    )
    records = sales_rows.to_dict(orient="records")
    return records

In [None]:
def serialize_messages(msgs):
    """Flatten message objects into plain dicts suitable for a trace.

    Every entry carries the message class name ("type") and its "content".
    Messages with a non-empty ``tool_calls`` attribute also get a
    "tool_calls" list reduced to name/args/id, and messages that have a
    ``tool_call_id`` attribute get that id copied over.
    """
    def _summarise_call(call):
        # Only these three keys of a tool call are kept.
        return {key: call.get(key) for key in ("name", "args", "id")}

    serialised = []
    for message in msgs:
        record = {
            "type": message.__class__.__name__,
            "content": message.content,
        }

        calls = getattr(message, "tool_calls", None)
        if calls:
            record["tool_calls"] = [_summarise_call(call) for call in calls]

        if hasattr(message, "tool_call_id"):
            record["tool_call_id"] = message.tool_call_id

        serialised.append(record)

    return serialised


In [None]:
import json

def process_response(response_content):
    """Parse model output as JSON, asking the model to repair it on decode errors.

    Up to three parse attempts are made. After a failed attempt (except the
    last one) the model is asked to correct the text, and the corrected text
    is what the next attempt parses.

    Args:
        response_content: text produced by the model, optionally wrapped in a
            Markdown code fence.

    Returns:
        The decoded JSON value, or None when all attempts fail (the last
        decode error is printed in that case). Callers must handle None.
    """
    json_decoder_prompt = """
You are an expert in resolving JSON decoding errors.

Please review the AI Output (enclosed in triple single quotes).

We encountered the following error while loading the AI Output into a JSON object: {e}. Kindly resolve this issue.

AI Output: '''{response}'''

Return ONLY the corrected JSON.
"""

    max_attempts = 3
    llm = None  # created lazily: the common case needs no repair call at all
    last_exception = None

    for attempt in range(max_attempts):
        try:
            # Try to extract/parse the JSON the normal way
            content = extract_json_from_response(response_content)

            if isinstance(content, str):
                content = json.loads(content)

            # Handle double-encoded JSON
            if isinstance(content, str):
                content = json.loads(content)

            return content

        except json.JSONDecodeError as e:
            last_exception = e

            # A repair requested now would never be parsed, so skip the
            # model call on the final attempt.
            if attempt == max_attempts - 1:
                break

            if llm is None:
                llm = get_llm_model()

            # Build recovery prompt asking LLM to fix the JSON
            recovery_prompt = {
                "role": "system",
                "content": json_decoder_prompt.format(
                    e=str(e),
                    response=response_content
                )
            }

            # Replace response_content with the model's corrected attempt
            response_content = llm.invoke([recovery_prompt]).content

    # If all attempts failed
    print(f"Model response could not be parsed: {str(last_exception)}")
    return None


In [None]:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.messages import AIMessage, ToolMessage
from typing import Dict, Any, List
import json

# Tools available to the sales agent: the four CSV-backed sales tools plus the
# langmem memory tools scoped to the ("sales", "{user_id}") namespace.
sales_tools = [
    get_daily_sales,
    get_promo_period,
    get_promo_sales_by_store,
    get_sales_data,
    create_manage_memory_tool(namespace=("sales", "{user_id}")),
    create_search_memory_tool(namespace=("sales", "{user_id}"))
]

# ReAct agent for sales analysis; it shares the module-level store and
# checkpointer, and reports tool exceptions back to the model via
# handle_tool_errors.
sales_react_agent = create_agent(
    model=get_llm_model(),
    tools=sales_tools,
    middleware=[handle_tool_errors],
    store=store,
    checkpointer=checkpointer
)


@tool
def sales_analysis_agent_tool(
    task: str,
    hypotheses: List[str],
    user_id: str,
    query_id: str,
    memory_context: str
) -> Dict[str, Any]:
    """
    Purpose:
        Analyze sales and promotion data to evaluate hypotheses that may
        explain observed issues in an RCA investigation.

    When to use:
        Use this tool after hypotheses have been generated and when
        sales, demand, forecasting, or promotion-related factors may
        contribute to the problem.

    Inputs:
        - task (str):
            The resolved RCA task or problem statement to analyze.
        - hypotheses (List[str]):
            A list of candidate root-cause hypotheses to be validated
            from a sales perspective.
        - user_id (str):
            Identifier for the user or session, used for scoped memory access.
        - query_id (str):
            Unique identifier for the current RCA query or thread.
        - memory_context (str): episodic + conversation memory

    Output:
        - dict:
            Contains the following fields:
            - "sales_insights":
                Structured findings derived from sales and promotion data
                that support or refute the provided hypotheses.
            - "trace":
                A list of trace entries capturing tool calls and reasoning
                steps performed during the analysis.
    Notes:
        - This tool may call sales and promotion data tools as needed.
        - The output is strictly structured and intended for downstream
          RCA agents or summarization steps.
        - The tool does not mutate external state.
    """
    # NOTE: the docstring above is the tool description exposed to the calling
    # model, so it is kept unchanged; implementation notes go in comments.

    # Step 1 — Filter sales-related hypotheses (case-insensitive keyword match)
    sales_related_hypotheses = [
        h for h in hypotheses
        if any(
            k in h.lower()
            for k in [
                "sales",
                "demand",
                "promotion",
                "spike",
                "forecast",
                "underestimated"
            ]
        )
    ]

    # Fallback: analyze all hypotheses if none are sales-specific
    if not sales_related_hypotheses:
        sales_related_hypotheses = hypotheses

    # Step 2 — Build LLM messages.
    # Literal braces in the schema are doubled; a bare `{...}` in an f-string
    # is evaluated as the Ellipsis object and renders as the text "Ellipsis".
    messages = [
        {
            "role": "system",
            "content": f"""
You are a Sales Analysis Agent for RCA.

Context (do not repeat, only use for reasoning):
{memory_context}

Your responsibilities:
- Use available tools to analyze sales patterns
- Validate or refute sales-related hypotheses

STRICT OUTPUT RULES:
1. Output ONLY valid JSON
2. Root JSON object MUST contain EXACTLY ONE key: "sales_insights"
3. NO extra keys, commentary, or markdown

JSON schema:
{{
  "sales_insights": {{...}}
}}
"""
        },
        {
            "role": "user",
            "content": f"""
Task: {task}
Hypotheses: {sales_related_hypotheses}
"""
        }
    ]

    # Step 3 — Build config (same pattern as hypothesis_agent)
    config = {
        "configurable": {
            "user_id": user_id,
            "thread_id": query_id
        }
    }

    # Step 4 — Invoke ReAct agent
    result = sales_react_agent.invoke({"messages": messages}, config)

    final_msg = result["messages"][-1].content
    output = process_response(final_msg)

    # process_response returns None when the reply never parses as JSON, and
    # the model may also answer with a non-object. Report "no insights"
    # instead of raising AttributeError on `.get`.
    if not isinstance(output, dict):
        output = {}

    sales_insights = output.get("sales_insights")

    # Step 5 — Extract internal tool-call trace
    # Skip system(0), user(1), final(-1)
    # NOTE(review): assumes the result starts with exactly the two messages
    # built above — confirm this still holds when the checkpointer has earlier
    # messages for the same thread_id.
    internal_msgs = result["messages"][2:-1]

    tool_call_msgs = [
        m for m in internal_msgs
        if (
            (isinstance(m, AIMessage) and getattr(m, "tool_calls", None))
            or isinstance(m, ToolMessage)
        )
    ]

    trace_entry = {
        "agent": "SalesAnalysisAgent",
        "step": "Validated sales hypotheses",
        "calls": serialize_messages(tool_call_msgs),
        "sales_insights": sales_insights
    }

    # Step 6 — Return
    return {
        "sales_insights": sales_insights,
        "trace": [trace_entry]
    }


In [None]:
from langchain.tools import tool

@tool
def get_unique_stores() -> dict:
    """Return list of unique store_ids from sales and inventory timeline."""
    import pandas as pd

    # The docstring above is the tool description shown to the model.
    sales_df = pd.read_csv(
        "/content/sample_data/sales_transactions.csv",
        parse_dates=["transaction_date"],
    )
    inv_df = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Union of the store ids seen in either file, without missing values.
    all_ids = pd.concat([sales_df["store_id"], inv_df["store_id"]])
    distinct_ids = all_ids.dropna().unique()

    return {"stores": sorted(distinct_ids)}

def drop_store_name(df):
    """Return a copy of ``df`` without its "store_name" column.

    Frames that have no such column are returned as an unchanged copy; the
    input frame itself is never modified.
    """
    unwanted = ["store_name"]
    return df.drop(columns=unwanted, errors="ignore")

@tool
def theoretical_onhand_after_promo_sales(promo_start: str, promo_end: str):
    """
    Compute theoretical on-hand inventory after promo sales for each store.
    Inputs: promo_start (YYYY-MM-DD), promo_end (YYYY-MM-DD)
    """
    # NOTE: the docstring above is the tool description shown to the model, so
    # it is left untouched; the explanation lives in these comments.
    #
    # Result: one record per (store_id, store_name) found in the sales file with
    #   start_receipt_qty, net_qty_after, promo_qty_sold, promo_repl_qty,
    #   theoretical_after_changes            = start_receipt_qty + net_qty_after
    #   theoretical_onhand_after_promo_sales = theoretical_after_changes - promo_qty_sold
    import pandas as pd
    # Load data
    sales = pd.read_csv("/content/sample_data/sales_transactions.csv", parse_dates=["transaction_date"])
    inv = pd.read_csv("/content/sample_data/inventory_transactions.csv", parse_dates=["transaction_date"])

    promo_start = pd.to_datetime(promo_start)
    promo_end = pd.to_datetime(promo_end)

    # === 1. Compute as_of date (day before promo_start) ===
    as_of = promo_start - pd.Timedelta(days=1)

    # === 2. START INVENTORY (sum of RECEIPT qty before promo) ===
    # Only RECEIPT rows dated on or before as_of are counted here.
    inv_receipts_before = inv[
        (inv["transaction_date"] <= as_of) &
        (inv["transaction_type"] == "RECEIPT")
    ]

    # Receipts are attributed to their destination_location, which is then
    # treated as the store id.
    start_inv = (
        inv_receipts_before.groupby("destination_location", as_index=False)["quantity"]
        .sum()
        .rename(columns={"destination_location": "store_id", "quantity": "start_receipt_qty"})
    )


    # === 3. NET INVENTORY CHANGES AFTER as_of DATE ===
    # All transaction types dated after as_of; there is no upper bound, so
    # rows after promo_end are included as well.
    inv_changes = inv[inv["transaction_date"] > as_of].copy()

    # Convert NONE → NaN
    inv_changes["destination_location"] = inv_changes["destination_location"].replace("NONE", pd.NA)

    # Map to store: the destination when one is given, otherwise the row's own store_id
    inv_changes["target_store"] = inv_changes["destination_location"].fillna(inv_changes["store_id"])

    inv_net_after = (
        inv_changes.groupby("target_store", as_index=False)["quantity"]
        .sum()
        .rename(columns={"target_store": "store_id", "quantity": "net_qty_after"})
    )

    # === 4. PROMO REPLENISHMENT ===
    # (Receipts on the promo_start date)
    # These rows are dated after as_of, so they also fall inside the step-3
    # selection; promo_repl_qty is reported as its own column and is not used
    # in the arithmetic of step 7.
    promo_repl = inv[
        (inv["transaction_date"] == promo_start) &
        (inv["transaction_type"] == "RECEIPT")
    ]

    promo_repl_by_store = (
        promo_repl.groupby("destination_location", as_index=False)["quantity"]
        .sum()
        .rename(columns={"destination_location": "store_id", "quantity": "promo_repl_qty"})
    )

    # === 5. PROMO SALES ===
    # Every sale dated within [promo_start, promo_end], both ends inclusive;
    # selection is by date only.
    promo_sales = sales[
        (sales["transaction_date"] >= promo_start) &
        (sales["transaction_date"] <= promo_end)
    ]

    promo_by_store = (
        promo_sales.groupby(["store_id","store_name"], as_index=False)["quantity_sold"]
        .sum()
        .rename(columns={"quantity_sold": "promo_qty_sold"})
    )

    # === 6. MERGE EVERYTHING ===
    # The store list comes from the sales file; left merges keep every such
    # store, and fillna(0) turns quantities missing for a store into 0.
    stores = sales[["store_id","store_name"]].drop_duplicates()

    # drop_store_name removes "store_name" from each partial frame (when
    # present) so the merges do not create duplicate name columns.
    summary = (
        stores
        .merge(drop_store_name(start_inv), on="store_id", how="left")
        .merge(drop_store_name(inv_net_after), on="store_id", how="left")
        .merge(drop_store_name(promo_by_store), on="store_id", how="left")
        .merge(drop_store_name(promo_repl_by_store), on="store_id", how="left")
    ).fillna(0)

    # === 7. COMPUTE THEORETICAL INVENTORY ===
    summary["theoretical_after_changes"] = (
        summary["start_receipt_qty"] + summary["net_qty_after"]
    )

    summary["theoretical_onhand_after_promo_sales"] = (
        summary["theoretical_after_changes"] - summary["promo_qty_sold"]
    )

    return summary.to_dict(orient="records")

@tool
def get_daily_inventory_for_store(store_id: str):
    """
    Return daily inventory on-hand timeline for a given store_id.
    Computes:
      - daily inventory movements (receipts, adjustments, transfers)
      - daily sales
      - net change
      - running inventory

    Args:
      store_id: store code, e.g., "S001"

    Returns:
      List of dicts with:
      transaction_date, quantity, quantity_sold,
      net_change, running_inventory
    """
    # The docstring above is the tool description shown to the model.
    import pandas as pd

    join_keys = ["transaction_date", "store"]

    # Load datasets
    inv = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )
    sales = pd.read_csv(
        "/content/sample_data/sales_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # ---------- CLEANING ----------
    # "NONE" destinations become missing values; the column is not read again
    # below because movements are attributed by store_id only.
    inv["destination_location"] = inv["destination_location"].replace("NONE", pd.NA)
    inv["store"] = inv["store_id"]

    # ---------- PER-DAY TOTALS ----------
    moves_per_day = inv.groupby(join_keys, as_index=False)["quantity"].sum()

    sold_per_day = sales.groupby(["transaction_date", "store_id"], as_index=False)["quantity_sold"].sum()
    sold_per_day = sold_per_day.rename(columns={"store_id": "store"})

    # ---------- MERGE TIMELINE ----------
    # Outer join keeps days that have only movements or only sales; the
    # missing side counts as 0.
    timeline = pd.merge(moves_per_day, sold_per_day, on=join_keys, how="outer")
    timeline = timeline.fillna(0)

    # Movements add stock, sales remove it.
    timeline["net_change"] = timeline["quantity"] - timeline["quantity_sold"]

    # Running total per store, in date order.
    timeline = timeline.sort_values(["store", "transaction_date"])
    per_store_change = timeline.groupby("store")["net_change"]
    timeline["running_inventory"] = per_store_change.cumsum()

    # ---------- FILTER FOR SPECIFIC STORE ----------
    is_requested_store = timeline["store"] == store_id
    return timeline[is_requested_store].to_dict(orient="records")

@tool
def get_adjustments():
    """Return all shrinkage/adjustment rows."""
    import pandas as pd

    # The docstring above is the tool description shown to the model.
    inventory = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )
    # Rows whose transaction_type is exactly "ADJUSTMENT".
    is_adjustment = inventory["transaction_type"] == "ADJUSTMENT"
    return inventory[is_adjustment].to_dict(orient="records")

@tool
def get_shrinkage_before_promo(promo_start: str):
    """
    promo_start: promo start date as 'YYYY-MM-DD'
    """
    # The docstring above is the tool description shown to the model.
    # Returns the ADJUSTMENT rows dated strictly before promo_start.
    import pandas as pd

    inventory = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )
    cutoff = pd.to_datetime(promo_start)

    is_adjustment = inventory["transaction_type"] == "ADJUSTMENT"
    before_promo = inventory["transaction_date"] < cutoff

    return inventory[is_adjustment & before_promo].to_dict(orient="records")

@tool
def get_shrinkage_during_promo(promo_start: str, promo_end: str):
    """get shrinkage during promo"""
    # The docstring above is the tool description shown to the model.
    # Returns the ADJUSTMENT rows dated within [promo_start, promo_end],
    # both ends inclusive.
    import pandas as pd

    inventory = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )

    window_start = pd.to_datetime(promo_start)
    window_end = pd.to_datetime(promo_end)

    is_adjustment = inventory["transaction_type"] == "ADJUSTMENT"
    on_or_after_start = inventory["transaction_date"] >= window_start
    on_or_before_end = inventory["transaction_date"] <= window_end

    selected = inventory[is_adjustment & on_or_after_start & on_or_before_end]
    return selected.to_dict(orient="records")

@tool
def get_delayed_replenishments():
    """Return all inventory rows with DELAYED note."""
    import pandas as pd

    transactions = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Rows with a missing note are treated as "not delayed" (na=False).
    has_delayed_note = transactions["notes"].str.contains("DELAYED", na=False)
    return transactions[has_delayed_note].to_dict(orient="records")

@tool
def get_promo_replenishment_for_date(date: str):
    """
    Return all RECEIPT (replenishment) rows whose transaction date equals the given date.

    date: day replenishment was expected (promo_start or a different date)
    """
    import pandas as pd

    # Load the transaction log with transaction_date parsed as datetimes.
    df = pd.read_csv("/content/sample_data/inventory_transactions.csv",
                     parse_dates=["transaction_date"])
    # Convert the incoming string so it compares against the datetime column.
    date = pd.to_datetime(date)

    # Exact timestamp equality: only rows dated exactly `date` are returned.
    promo_repl = df[
        (df["transaction_date"] == date) &
        (df["transaction_type"] == "RECEIPT")
    ]
    return promo_repl.to_dict(orient="records")

@tool
def get_all_transfers():
    """Return all transfer rows."""
    import pandas as pd

    transactions = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Keep only rows whose transaction_type is exactly "TRANSFER".
    is_transfer = transactions["transaction_type"] == "TRANSFER"
    return transactions[is_transfer].to_dict(orient="records")

@tool
def get_transfers_for_date(date: str):
    """get transfers for a given date"""
    import pandas as pd

    transactions = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Parse the requested day so it compares against the datetime column.
    target_date = pd.to_datetime(date)

    # TRANSFER rows dated exactly on the requested timestamp.
    is_transfer = transactions["transaction_type"] == "TRANSFER"
    on_target_date = transactions["transaction_date"] == target_date
    return transactions[is_transfer & on_target_date].to_dict(orient="records")

@tool
def get_emergency_receipts():
    """get emergency receipts"""
    import pandas as pd

    transactions = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )

    # Selection is driven purely by the free-text notes column; rows with a
    # missing note are excluded (na=False).
    has_emergency_note = transactions["notes"].str.contains("Emergency", na=False)
    return transactions[has_emergency_note].to_dict(orient="records")

@tool
def get_inventory_data():
    """Return inventory movements as list of dicts."""
    import pandas as pd

    # No filtering: every row of the transaction log is returned.
    transactions = pd.read_csv(
        "/content/sample_data/inventory_transactions.csv",
        parse_dates=["transaction_date"],
    )
    return transactions.to_dict(orient="records")

In [None]:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.messages import AIMessage, ToolMessage
from typing import Dict, Any, List
import json

# Namespace template shared by the two memory tools below.
# NOTE(review): "{user_id}" is presumably resolved by langmem from
# config["configurable"]["user_id"] at call time — confirm.
_inventory_memory_namespace = ("inventory", "{user_id}")

# Tools exposed to the inventory agent: data-access tools first, memory tools last.
inventory_tools = [
    get_promo_period,
    get_unique_stores,
    theoretical_onhand_after_promo_sales,
    get_daily_inventory_for_store,
    get_adjustments,
    get_shrinkage_before_promo,
    get_shrinkage_during_promo,
    get_delayed_replenishments,
    get_promo_replenishment_for_date,
    get_all_transfers,
    get_transfers_for_date,
    get_emergency_receipts,
    get_inventory_data,
    create_manage_memory_tool(namespace=_inventory_memory_namespace),
    create_search_memory_tool(namespace=_inventory_memory_namespace),
]

# Agent invoked by inventory_analysis_agent_tool; it shares the notebook-wide
# store and checkpointer objects.
inventory_react_agent = create_agent(
    model=get_llm_model(),
    tools=inventory_tools,
    middleware=[handle_tool_errors],
    store=store,
    checkpointer=checkpointer,
)


@tool
def inventory_analysis_agent_tool(
    task: str,
    hypotheses: List[str],
    user_id: str,
    query_id: str,
    memory_context: str
) -> Dict[str, Any]:
    """
    Purpose:
        Analyze inventory movements, replenishments, transfers, and
        adjustments to validate inventory-related RCA hypotheses.

    When to use:
        Use this tool when stock availability, shrinkage, replenishment
        timing, transfers, or warehouse operations may contribute to
        the observed problem.

    Inputs:
        - task (str): Resolved RCA task or problem statement
        - hypotheses (List[str]): Candidate hypotheses to validate
        - user_id (str): User/session identifier for scoped memory access
        - query_id (str): Query/thread identifier
        - memory_context (str): episodic + conversation memory

    Output:
        - dict:
            - "inventory_insights": Structured inventory analysis
            - "trace": Tool-call trace for observability
    """

    # --- Select the hypotheses to analyze ---
    # A hypothesis counts as inventory-related when its lowercased text contains
    # any of these substrings.
    inventory_keywords = (
        "inventory",
        "stock",
        "supply",
        "replenish",
        "transfer",
        "shrink",
        "adjust",
        "warehouse",
    )

    def _mentions_inventory(hypothesis):
        lowered = hypothesis.lower()
        return any(keyword in lowered for keyword in inventory_keywords)

    # If nothing matches, fall back to analyzing every hypothesis.
    inventory_related_hypotheses = (
        [h for h in hypotheses if _mentions_inventory(h)] or hypotheses
    )

    # --- Build the prompts ---
    system_prompt = f"""
You are the Inventory RCA Agent.

Context (do not repeat, only use for reasoning):
{memory_context}

Your responsibilities:
- Analyze inventory levels, movements, transfers, adjustments, and replenishments
- Use available tools via a ReAct loop
- Produce structured insights

STRICT OUTPUT RULES:
1. Output ONLY valid JSON
2. Root JSON object MUST contain EXACTLY ONE key: "inventory_insights"
3. NO extra keys, markdown, or commentary

JSON schema:
{{
  "inventory_insights": {{...}}
}}
"""
    user_prompt = f"""
Task: {task}
Hypotheses to validate: {inventory_related_hypotheses}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # --- Run the agent, scoped to this user and query thread ---
    run_config = {"configurable": {"user_id": user_id, "thread_id": query_id}}
    agent_result = inventory_react_agent.invoke({"messages": messages}, run_config)
    agent_messages = agent_result["messages"]

    # The last message carries the agent's answer; process_response parses it.
    parsed_output = process_response(agent_messages[-1].content)
    inventory_insights = parsed_output.get("inventory_insights")

    # --- Collect the tool-call trace ---
    # Drops the first two messages and the final answer, then keeps only AI
    # messages that requested tools and the tool results.
    # NOTE(review): the [2:-1] slice assumes the returned list starts with this
    # call's system and user messages; confirm this still holds when the
    # checkpointer restores earlier messages for a reused thread_id.
    tool_call_msgs = []
    for msg in agent_messages[2:-1]:
        requested_tools = isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None)
        if requested_tools or isinstance(msg, ToolMessage):
            tool_call_msgs.append(msg)

    trace_entry = {
        "agent": "InventoryAnalysisAgent",
        "step": "Validated inventory hypotheses",
        "calls": serialize_messages(tool_call_msgs),
        "inventory_insights": inventory_insights
    }

    # The caller receives the insights plus a single-entry trace; no shared
    # state is modified here.
    return {
        "inventory_insights": inventory_insights,
        "trace": [trace_entry]
    }


In [None]:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.messages import AIMessage, ToolMessage
from typing import Dict, Any, List
import json

# Namespace template shared by the two memory tools below.
# NOTE(review): "{user_id}" is presumably resolved by langmem from
# config["configurable"]["user_id"] at call time — confirm.
_validation_memory_namespace = ("hypothesis_validation", "{user_id}")

# Agent invoked by hypothesis_validation_agent_tool; its only tools are the
# memory manage/search tools.
validation_react_agent = create_agent(
    model=get_llm_model(),
    tools=[
        create_manage_memory_tool(namespace=_validation_memory_namespace),
        create_search_memory_tool(namespace=_validation_memory_namespace),
    ],
    middleware=[handle_tool_errors],
    store=store,
    checkpointer=checkpointer,
)


@tool
def hypothesis_validation_agent_tool(
    hypotheses: List[str],
    sales_insights: Dict[str, Any],
    inventory_insights: Dict[str, Any],
    user_id: str,
    query_id: str
) -> Dict[str, Any]:
    """
    Purpose:
        Validate each hypothesis by cross-referencing sales and inventory
        insights gathered during the RCA investigation.

    When to use:
        Use this tool after domain-specific analysis tools (e.g., Sales,
        Inventory) have produced structured insights.

    Inputs:
        - hypotheses (List[str]):
            Hypotheses to be validated.
        - sales_insights (dict):
            Output from the Sales Analysis tool.
        - inventory_insights (dict):
            Output from the Inventory Analysis tool.
        - user_id (str):
            User/session identifier for scoped memory access.
        - query_id (str):
            Query/thread identifier.

    Output:
        - dict:
            - "validated": Mapping of hypothesis → true / false
            - "reasoning": Mapping of hypothesis → explanation
            - "trace": Tool-call trace for observability
    """

    # --- Build the prompts ---
    # The system prompt is a plain (non-f) string, so its braces are literal.
    system_prompt = """
Validate each hypothesis using sales and inventory insights.

STRICT OUTPUT RULES:
1. Output ONLY valid JSON
2. No markdown or code fences
3. No extra fields or commentary

JSON schema:
{
  "validated": { "hypothesis": true | false },
  "reasoning": { "hypothesis": "explanation" }
}
"""
    user_prompt = f"""
Hypotheses:
{hypotheses}

Sales insights:
{sales_insights}

Inventory insights:
{inventory_insights}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # --- Run the agent, scoped to this user and query thread ---
    run_config = {"configurable": {"user_id": user_id, "thread_id": query_id}}
    agent_result = validation_react_agent.invoke({"messages": messages}, run_config)
    agent_messages = agent_result["messages"]

    # The last message carries the agent's answer; process_response parses it.
    resp = process_response(agent_messages[-1].content)

    # --- Collect the tool-call trace ---
    # Drops the first two messages and the final answer, then keeps only AI
    # messages that requested tools and the tool results.
    # NOTE(review): the [2:-1] slice assumes the returned list starts with this
    # call's system and user messages; confirm this still holds when the
    # checkpointer restores earlier messages for a reused thread_id.
    tool_call_msgs = []
    for msg in agent_messages[2:-1]:
        requested_tools = isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None)
        if requested_tools or isinstance(msg, ToolMessage):
            tool_call_msgs.append(msg)

    trace_entry = {
        "agent": "HypothesisValidationAgent",
        "step": "Validated hypotheses",
        "calls": serialize_messages(tool_call_msgs),
        "details": resp
    }

    # The caller receives the two mappings plus a single-entry trace; no shared
    # state is modified here.
    return {
        "validated": resp.get("validated"),
        "reasoning": resp.get("reasoning"),
        "trace": [trace_entry]
    }


In [None]:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.messages import AIMessage, ToolMessage
from typing import Dict, Any, List
import json


# Agent invoked by root_cause_analysis_agent_tool. It is given no tools, so it
# can only answer from the messages it receives.
root_cause_react_agent = create_agent(
    model=get_llm_model(),
    tools=[],
    middleware=[handle_tool_errors],
    store=store,
    checkpointer=checkpointer,
)


@tool
def root_cause_analysis_agent_tool(
    validated_hypotheses: Dict[str, bool],
    sales_insights: Dict[str, Any],
    inventory_insights: Dict[str, Any],
    trace: List[Dict[str, Any]],
    user_id: str,
    query_id: str
) -> Dict[str, Any]:
    """
    Purpose:
        Produce the final Root Cause Analysis by synthesizing validated
        hypotheses, sales insights, inventory insights, and prior analysis
        trace into a structured RCA outcome.

    When to use:
        Use this tool after hypothesis validation has been completed.

    Inputs:
        - validated_hypotheses (dict): Hypothesis → true/false mapping
        - sales_insights (dict): Sales analysis output
        - inventory_insights (dict): Inventory analysis output
        - trace (list): Prior agent trace entries
        - user_id (str): User/session identifier for scoped memory access
        - query_id (str): Query/thread identifier

    Output:
        - dict:
            - "root_cause": Final structured RCA
            - "reasoning": Explanation of RCA decisions
            - "trace": Tool-call trace for observability
    """

    # ---------- Step 1: Build the prompts ----------
    # The system prompt is a plain (non-f) string, so its braces are literal.
    system_prompt = """
Produce a final Root Cause Analysis.

Include:
- primary root causes
- supporting evidence
- contributing factors
- timeline
- recommendations

STRICT OUTPUT RULES:
1. Output ONLY valid JSON
2. No markdown or code fences
3. No extra commentary
4. JSON MUST contain EXACTLY two top-level keys:
   - "root_cause"
   - "reasoning"

JSON schema:
{
  "root_cause": {
    "primary_root_causes": ["string"],
    "supporting_evidence": {
      "sales": {},
      "inventory": {},
      "cross_analysis": {}
    },
    "contributing_factors": ["string"],
    "timeline": [
      { "date": "YYYY-MM-DD", "event": "string" }
    ],
    "recommendations": ["string"]
  },
  "reasoning": {
    "primary_root_causes": "explanation",
    "contributing_factors": "explanation",
    "supporting_evidence": "explanation",
    "timeline": "explanation",
    "recommendations": "explanation"
  }
}
"""
    user_prompt = f"""
Validated hypotheses:
{validated_hypotheses}

Sales insights:
{sales_insights}

Inventory insights:
{inventory_insights}

Prior trace:
{trace}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # ---------- Step 2: Run the agent and parse its answer ----------
    run_config = {"configurable": {"user_id": user_id, "thread_id": query_id}}
    agent_result = root_cause_react_agent.invoke({"messages": messages}, run_config)
    agent_messages = agent_result["messages"]

    # The last message carries the agent's answer; process_response parses it.
    resp = process_response(agent_messages[-1].content)
    root_cause = resp.get("root_cause")
    reasoning = resp.get("reasoning")

    # ---------- Step 3: Collect the tool-call trace ----------
    # Drops the first two messages and the final answer, then keeps only AI
    # messages that requested tools and the tool results.
    # NOTE(review): the [2:-1] slice assumes the returned list starts with this
    # call's system and user messages; confirm this still holds when the
    # checkpointer restores earlier messages for a reused thread_id.
    tool_call_msgs = []
    for msg in agent_messages[2:-1]:
        requested_tools = isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None)
        if requested_tools or isinstance(msg, ToolMessage):
            tool_call_msgs.append(msg)

    structured_trace_entry = {
        "agent": "RootCauseAnalysisAgent",
        "step": "Generated structured root cause",
        "calls": serialize_messages(tool_call_msgs),
        "root_cause": root_cause
    }

    # ---------- Step 4: Return final output ----------
    return {
        "root_cause": root_cause,
        "reasoning": reasoning,
        "trace": [structured_trace_entry]
    }


In [None]:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_core.messages import AIMessage, ToolMessage
from typing import Dict, Any, List
import json


# Agent invoked by rca_report_agent_tool. It is given no tools, so it can only
# answer from the messages it receives.
rca_report_agent = create_agent(
    model=get_llm_model(),
    tools=[],
    middleware=[handle_tool_errors],
    store=store,
    checkpointer=checkpointer,
)


@tool
def rca_report_agent_tool(
    root_cause: str,
    reasoning: str,
    user_id: str,
    query_id: str
) -> Dict[str, Any]:
    """
    Purpose:
        Produce the final human-readable report.

    When to use:
        Use this tool as the final step of an RCA workflow to generate a
        human-readable report.

    Inputs:
        - root_cause (str): root cause
        - reasoning (str): reasoning
        - user_id (str): User/session identifier for scoped memory access
        - query_id (str): Query/thread identifier

    Output:
        - dict:
            - "report_text": Human-readable RCA report
            - "trace": Tool-call trace for observability
    """

    # ---------- Step 1: Build the prompts ----------

    report_messages = [
        {
            "role": "system",
            "content": """
You are an expert supply chain and demand planning analyst.

Create a professional Root Cause Analysis Report.

Audience:
- Demand Planning
- Inventory Management
- Supply Chain Teams

Requirements:
- Clear structured sections
- Bullet points where appropriate
- No JSON, no code
- Pure narrative report

The report MUST include:
- Executive Summary
- Primary Root Cause(s)
- Supporting Evidence
- Contributing Factors
- Key Data Points
- Timeline of Events
- Recommendations
- Final Conclusion

Tone:
Analytical, data-driven, formal, concise.
"""
        },
        {
            "role": "user",
            "content": f"""
Use the following structured RCA output:

{json.dumps(root_cause, indent=2)}
{json.dumps(reasoning, indent=2)}
"""
        }
    ]

    config = {
        "configurable": {
            "user_id": user_id,
            "thread_id": query_id
        }
    }

    # ---------- Step 2: Run the agent ----------
    # The agent takes a state dict with a "messages" key and returns a state
    # dict, so the report text is the content of the last returned message.
    # This matches how the other agent tools in this notebook call their agents.
    result = rca_report_agent.invoke({"messages": report_messages}, config)
    report_text = result["messages"][-1].content

    # NOTE(review): the "agent" label below is the same string the root-cause
    # tool uses for its own trace entry; confirm whether that is intentional.
    report_trace_entry = {
        "agent": "RootCauseAnalysisAgent",
        "step": "Generated RCA report",
        "report_text": report_text
    }

    # ---------- Step 3: Return final output ----------

    return {
        "report_text": report_text,
        "trace": [
            report_trace_entry
        ]
    }


In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

# Prompt used to turn one RCA conversation into an episodic-memory record.
# Doubled braces ({{ }}) are literal braces in the rendered prompt; the only
# template variable is {conversation}.
reflection_prompt_template = """
You are analyzing conversations from a supply-chain Root Cause Analysis (RCA) assistant to create episodic memories that will improve future RCA interactions.

Your task is to extract the most useful, reusable insights from the conversation that would help when handling similar RCA scenarios in the future.

Review the conversation and create a memory reflection following these rules:

1. For any field where information is missing or not applicable, use "N/A"
2. Be extremely concise — each string must be one clear, actionable sentence
3. Focus only on information that improves future RCA effectiveness
4. Context_tags must be specific enough to match similar RCA situations but general enough to be reusable

Output valid JSON in exactly this format:
{{
    "context_tags": [               // 2–4 keywords identifying similar RCA scenarios
        string,                     // Use domain-specific terms like "sales_decline", "inventory_stockout", "logistics_delay", "forecast_bias"
        ...
    ],
    "conversation_summary": string, // One sentence describing what RCA problem was addressed and resolved
    "what_worked": string,          // Most effective RCA technique or reasoning strategy used
    "what_to_avoid": string         // Key RCA pitfall or ineffective approach to avoid in future
}}

Good context_tags examples:
- ["sales_decline", "regional_performance", "pricing_impact"]
- ["inventory_stockout", "demand_supply_mismatch", "replenishment_delay"]
- ["logistics_delay", "carrier_performance", "lead_time_variance"]
- ["forecast_bias", "seasonality_error", "demand_planning"]

Bad context_tags examples:
- ["supply_chain", "rca", "analysis"]
- ["problem", "issue", "data"]

Good conversation_summary examples:
- "Identified delayed replenishment and regional logistics constraints as root causes of declining sales in the South region"
- "Analyzed forecast bias caused by seasonality misalignment leading to excess inventory buildup"

Bad conversation_summary examples:
- "Discussed a supply chain problem"
- "Did root cause analysis"

Good what_worked examples:
- "Sequentially ruling out demand and inventory issues before focusing on logistics lead-time variance"
- "Using historical sales and inventory trends to validate and eliminate hypotheses systematically"

Bad what_worked examples:
- "Analyzed the data"
- "Used RCA techniques"

Good what_to_avoid examples:
- "Jumping to conclusions without validating hypotheses against sales and inventory data"
- "Focusing on a single function without checking cross-functional dependencies"

Do not include any text outside the JSON object in your response.

Here is the prior conversation:

{conversation}
"""
reflection_prompt = ChatPromptTemplate.from_template(reflection_prompt_template)

# Chain: fill the prompt -> call the LLM -> parse the reply as JSON.
reflect = reflection_prompt | get_llm_model() | JsonOutputParser()

In [None]:
def format_conversation(history):
    """
    Render a message history as plain text, one "ROLE: content" line per message.

    Args:
        history: Iterable of conversation entries. Two shapes are rendered:
            - dicts with "role" / "content" keys (the shape this notebook uses
              when it builds prompts), rendered with the role upper-cased;
            - message objects (BaseMessage and its subclasses), rendered with
              their upper-cased ``type`` and their ``content``.
            Any other entry is skipped.

    Returns:
        str: The rendered lines joined with newlines; "" for an empty history.
    """
    lines = []

    for message in history:
        if isinstance(message, dict):
            role = str(message.get("role", "")).upper()
            content = message.get("content", "")
        elif isinstance(message, BaseMessage):
            # HumanMessage and AIMessage are BaseMessage subclasses, so this
            # single check covers them as well.
            role = message.type.upper()
            content = message.content
        else:
            # Unrecognized entry: skip it instead of emitting an empty ": " line.
            continue
        lines.append(f"{role}: {content}")

    return "\n".join(lines)

In [None]:
import uuid

def add_episodic_memory(rca_state, config):
    """
    Reflect on an RCA conversation and store the result as an episodic memory.

    Args:
        rca_state: Mapping with a "history" entry holding the conversation.
        config: Run config; config["configurable"]["user_id"] selects the
            ("episodic", user_id) namespace the memory is written to.

    Returns:
        None. Nothing is generated or stored when the history is missing or empty.
    """
    history = rca_state.get("history")
    if not history:
        # Nothing to reflect on.
        return

    # Flatten the messages into text and ask the reflection chain to summarize it.
    conversation_text = format_conversation(history)
    episodic_record = reflect.invoke({"conversation": conversation_text})

    # Keep the full conversation text alongside the generated summary fields.
    episodic_record["conversation"] = conversation_text

    print("episodic reflection ===", episodic_record)

    # Each memory gets a fresh random key, so earlier ones are never overwritten.
    store.put(
        namespace=("episodic", config["configurable"]["user_id"]),
        key=f"episodic_rca_{uuid.uuid4().hex}",
        value=episodic_record,
    )


**Important**

Episodic memory alone does not make agents scale: it is good at remembering what happened, but not at remembering how to act better next time. Procedural memory and semantic abstraction are what make agents improve over time.

**What procedural memory is**

Stable strategies

Decision heuristics

Tool-usage patterns

Ordering of steps that worked well

**Example procedural memories:**

“Always validate demand-side hypotheses before inventory”

“Promo-related RCAs should check replenishment timing first”

“If sales spike + stockout → check emergency receipts”

In [None]:
# Prompt used to extract reusable process knowledge from one RCA conversation.
# Doubled braces ({{ }}) are literal braces in the rendered prompt; the only
# template variable is {conversation}.
procedural_reflection_template = """
You are extracting PROCEDURAL MEMORY for an RCA agent.

Focus ONLY on reusable process knowledge.

Extract:
1. When to use which agent
2. Ordering of analysis steps
3. Tool usage heuristics
4. Decision rules

Output JSON:
{{
  "procedure_name": "string",
  "applicable_when": "string",
  "steps": ["step1", "step2", "..."],
  "tool_heuristics": ["rule1", "rule2"]
}}
Conversation:
{conversation}
"""
procedural_reflection_prompt = ChatPromptTemplate.from_template(procedural_reflection_template)

# Chain: fill the prompt -> call the LLM -> parse the reply as JSON.
procedural_reflection = procedural_reflection_prompt | get_llm_model() | JsonOutputParser()

In [None]:
import uuid

def add_procedural_memory(rca_state, config):
    """
    Extract process knowledge from an RCA conversation and store it as a
    procedural memory.

    Args:
        rca_state: Mapping with a "history" entry holding the conversation.
        config: Run config; config["configurable"]["user_id"] selects the
            ("procedural", user_id) namespace the memory is written to.

    Returns:
        None. Nothing is generated or stored when the history is missing or empty.
    """
    history = rca_state.get("history")
    if not history:
        # Nothing to reflect on.
        return

    # Flatten the messages into text and run the procedural reflection chain on it.
    conversation_text = format_conversation(history)
    procedural_record = procedural_reflection.invoke({"conversation": conversation_text})

    print("procedural_reflection ===", procedural_record)

    # Each memory gets a fresh random key, so earlier ones are never overwritten.
    store.put(
        namespace=("procedural", config["configurable"]["user_id"]),
        key=f"procedural_rca_{uuid.uuid4().hex}",
        value=procedural_record,
    )


**Semantic Abstraction**

Multiple episodic memories are distilled into generalized knowledge.
This step runs periodically, not on every request.

In [None]:
# Prompt used to distill several episodic reflections into one generalized fact.
# Doubled braces ({{ }}) are literal braces in the rendered prompt; the only
# template variable is {episodes}.
semantic_abstraction_prompt = """
You are building SEMANTIC MEMORY for an RCA agent.

Given multiple episodic RCA reflections, extract generalized,
reusable knowledge that holds across cases.

Rules:
- Do NOT mention specific dates, stores, or conversations
- Focus on patterns, causal relationships, and general truths
- One semantic fact should apply to many future RCA cases

Output ONLY valid JSON in this format:
{{
  "semantic_fact": "string",
  "applicable_context": ["keyword1", "keyword2"],
  "confidence": "low | medium | high"
}}

Episodic memories:
{episodes}
"""
semantic_reflection_prompt = ChatPromptTemplate.from_template(
    semantic_abstraction_prompt
)

# Chain: fill the prompt -> call the LLM -> parse the reply as JSON.
semantic_reflection_chain = (
    semantic_reflection_prompt
    | get_llm_model()
    | JsonOutputParser()
)

In [None]:
def build_semantic_memory(user_id: str, query: str, store, llm, min_episodes: int = 3):
    """
    Distill episodic RCA memories into semantic memory.

    Args:
        user_id: User whose ("episodic", user_id) memories are read and whose
            ("semantic", user_id) namespace receives the new memory.
        query: Search query passed to the store when retrieving episodic memories.
        store: Store object providing ``search`` and ``put``.
        llm: Not used by this function — the module-level
            ``semantic_reflection_chain`` is invoked instead. Kept so existing
            callers that pass it keep working.
        min_episodes: Minimum number of retrieved episodic memories required
            before any abstraction is attempted.

    Returns:
        The stored semantic-memory dict, or None when fewer than
        ``min_episodes`` memories were found or the chain did not return a
        non-empty dict.
    """
    # Imported locally so the function does not depend on another notebook cell
    # having imported these modules first.
    import time
    import uuid

    # 1. Retrieve up to 10 episodic memories matching the query.
    namespace = ("episodic", user_id)
    episodic = store.search(
        namespace,
        query=query,
        limit=10
    )

    if len(episodic) < min_episodes:
        return None  # not enough signal yet

    # 2. Render each episode as a short bullet for the prompt.
    episodes_text = []
    for e in episodic:
        v = e.value
        episodes_text.append(
            f"- Summary: {v.get('conversation_summary')}\n"
            f"  Worked: {v.get('what_worked')}\n"
            f"  Avoid: {v.get('what_to_avoid')}"
        )

    # 3. Invoke the semantic abstraction chain.
    semantic = semantic_reflection_chain.invoke({
        "episodes": "\n".join(episodes_text)
    })

    print("semantic_reflection ===", semantic)

    if not semantic or not isinstance(semantic, dict):
        return None

    # 4. Bookkeeping fields. "Usefulness" starts at 0; the intent is that a
    # memory counts as useful when it was retrieved and the RCA then completed
    # successfully.
    semantic["usefulness"] = 0
    semantic["last_used_at"] = time.time()

    # 5. Store the semantic memory under a fresh random key.
    store.put(
        namespace=("semantic", user_id),
        key=f"semantic_{uuid.uuid4().hex}",
        value=semantic
    )

    return semantic


In [None]:
def mark_memory_useful(memories):
    """
    Increment the "usefulness" counter on each given memory.

    Each memory's ``value`` mapping is updated in place; a missing counter is
    treated as 0. Nothing is written back to a store here.
    """
    for memory in memories:
        previous_count = memory.value.get("usefulness", 0)
        memory.value["usefulness"] = previous_count + 1

In [None]:
'''
TodoListMiddleware()
  Equip agents with task planning and tracking capabilities for complex multi-step tasks. To-do lists are useful for the following:
    Complex multi-step tasks requiring coordination across multiple tools.
    Long-running operations where progress visibility is important.
  This middleware automatically provides agents with a write_todos tool and system prompts to guide effective task planning.

Context management: FilesystemMiddleware()
    File system tools (ls, read_file, write_file, edit_file) allow agents to offload large context to memory,
    preventing context window overflow and enabling work with variable-length tool results.
    By default, these tools write to a local “filesystem” in graph state.
'''

'\nTodoListMiddleware()\n  Equip agents with task planning and tracking capabilities for complex multi-step tasks. To-do lists are useful for the following:\n    Complex multi-step tasks requiring coordination across multiple tools.\n    Long-running operations where progress visibility is important.\n  This middleware automatically provides agents with a write_todos tool and system prompts to guide effective task planning.\n\nContext management: FilesystemMiddleware()\n    File system tools (ls, read_file, write_file, edit_file) allow agents to offload large context to memory,\n    preventing context window overflow and enabling work with variable-length tool results.\n    By default, these tools write to a local “filesystem” in graph state.\n'

In [None]:
from langmem import create_manage_memory_tool, create_search_memory_tool
import json
from langchain.agents import create_agent



# Namespace template shared by the two memory tools below.
# NOTE(review): "{user_id}" is presumably resolved by langmem from
# config["configurable"]["user_id"] at call time — confirm.
_orchestration_memory_namespace = ("orchestration", "{user_id}")

# Top-level agent: memory tools first, then one tool per specialist agent.
router_agent = create_agent(
    model=get_llm_model(),
    tools=[
        create_search_memory_tool(namespace=_orchestration_memory_namespace),
        create_manage_memory_tool(namespace=_orchestration_memory_namespace),
        hypothesis_agent_tool,
        sales_analysis_agent_tool,
        inventory_analysis_agent_tool,
        hypothesis_validation_agent_tool,
        root_cause_analysis_agent_tool,
        rca_report_agent_tool,
    ],
    middleware=[handle_tool_errors, TodoListMiddleware()],
    store=store,
    checkpointer=checkpointer,
)

def orchestration_agent(rca_state, config):
    """
    Graph node that runs the router agent on the current task.

    Args:
        rca_state: Mutable RCA state. Reads "task" (and "history", which is
            initialized to [] when missing or None); writes "output" and "trace".
        config: Run config; config["configurable"] must provide "user_id" and
            "thread_id", both of which are embedded in the system prompt.

    Returns:
        The same ``rca_state`` object, updated in place.
    """

    # Make sure a history list exists before anything reads it.
    if rca_state.get("history") is None:
        print("***Empty History***")
        rca_state["history"] = []

    # Build the memory-augmented context that is injected into the system
    # prompt (semantic abstraction + procedural + episodic + conversation).
    memory_context = build_memory_augmented_prompt(
        query=rca_state["task"],
        state=rca_state,
        config=config
    )

    system_prompt = f"""
        You are a Deep Research Agent.

        Task: {rca_state["task"]}

        User Id: {config["configurable"]["user_id"]}

        Query Id: {config["configurable"]["thread_id"]}

        Use the following sementic abstract + procedural + episodic + conversation context:
        Memory Context(memory_context):'''{memory_context}'''

        Your role is to analyze the user's input, determine the appropriate
        research or response strategy, and use the available tools to resolve
        the request.

        The set of tools available to you may change dynamically.
        You must infer what each tool does from its description.

        ------------------------------------------------------------
        CORE RESPONSIBILITIES:

        1. Understand User Intent
          - The user input may be:
            • a greeting or help request (e.g., "hi", "hello", "help")
            • a general question
            • a root cause analysis or supply chain investigation
          - Do not assume the input is analytical.

        2. Decide the Level of Depth Required
          - If the input can be addressed with a simple explanation or response,
            prefer a lightweight approach.
          - If the input requires investigation, reasoning, or analysis,
            proceed with deep research behavior.

        3. Create an Internal Plan
          - Before calling any tool, determine:
            • what information is missing
            • what needs to be discovered or generated
            • whether memory or prior context is relevant
          - The plan does not need to be shown unless required by a tool.

        4. Execute Using Tools
          - Use the **todo's** tools to carry out the plan.
          - Choose tools based on their descriptions, not their names.
          - You may call multiple tools if necessary.
          - Always prefer the minimal set of tool calls needed.

        5. RCA-Specific Behavior (when applicable)
          - When the task involves diagnosing causes of a problem:
            • avoid jumping to conclusions
            • favor hypothesis generation before validation
            • rely on state, memory, and evidence

        ------------------------------------------------------------
        IMPORTANT RULES:

        - Do not hard-code assumptions about tool availability.
        - Do not invent tools or capabilities.
        - Do not answer complex questions directly in free text
          if an appropriate tool exists.
        - Be robust to vague, short, or conversational user inputs.
        - Think first, then act through tools.

        You are expected to behave as a flexible, adaptive
        deep-research agent, not a fixed pipeline.
        """

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": rca_state["task"]},
    ]

    # Copy the incoming config and add the RCA state under "configurable";
    # the caller's config object is not modified.
    run_config = {
        **config,
        "configurable": {
            **config.get("configurable", {}),
            "rca_state": rca_state
        }
    }

    result = router_agent.invoke({"messages": messages}, run_config)
    agent_messages = result["messages"]

    # The agent's final message is used verbatim as the output (no JSON parsing).
    output = agent_messages[-1].content

    # Collect the tool-call trace: drop the first two messages and the final
    # answer, then keep only AI messages that requested tools and tool results.
    # NOTE(review): the [2:-1] slice assumes the returned list starts with this
    # call's system and user messages; confirm this still holds when the
    # checkpointer restores earlier messages for a reused thread_id.
    tool_call_msgs = []
    for msg in agent_messages[2:-1]:
        requested_tools = isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None)
        if requested_tools or isinstance(msg, ToolMessage):
            tool_call_msgs.append(msg)

    trace_entry = {
        "agent": "Orchestration Agent",
        "tool_calls": serialize_messages(tool_call_msgs)
    }

    rca_state["output"] = output
    rca_state["trace"] = trace_entry

    append_rca_history(rca_state)

    return rca_state

In [None]:
#Minimal inspector function
def inspect_memory(store, user_id):
    """
    Print a small banner and return a per-layer digest of a user's memories.

    For each of the "episodic", "procedural" and "semantic" layers the store
    is searched under the namespace ``(layer, user_id)`` with ``limit=10``.
    Every hit is reduced to its key, its "confidence" value, its "usefulness"
    value (0 when absent) and a summary taken from the first truthy value
    among "conversation_summary", "semantic_fact" and "procedure_name".

    Returns:
        dict mapping each layer name to the list of digests for that layer.
    """

    def _digest(item):
        # Collapse one stored memory into the four fields the report shows.
        return {
            "key": item.key,
            "confidence": item.value.get("confidence"),
            "usefulness": item.value.get("usefulness", 0),
            "summary": (
                item.value.get("conversation_summary")
                or item.value.get("semantic_fact")
                or item.value.get("procedure_name")
            )
        }

    print("\n--------------------------------------------------------------------------")
    print("memory inspector")

    report = {}
    for layer in ("episodic", "procedural", "semantic"):
        hits = store.search((layer, user_id), limit=10)
        report[layer] = [_digest(hit) for hit in hits]

    print("--------------------------------------------------------------------------")
    return report


In [None]:
from langgraph.graph import StateGraph
import json
from pathlib import Path


# Build the RCA graph: the orchestration agent is the entry point and the only
# node registered here.
graph = StateGraph(RCAState)

graph.add_node("orchestration_agent", orchestration_agent)
graph.set_entry_point("orchestration_agent")

app = graph.compile(checkpointer=checkpointer, store=store)

print("\n RCA Chatbot (type 'exit' to quit)\n")
DEFAULT_USER_ID = "2"
DEFAULT_QUERY_ID = "2"

# State returned by the most recent completed turn. It stays None until the
# graph has run at least once, so typing 'exit' as the very first input does
# not reach for names that were never assigned.
rca_state = None

# ----- chat loop -----
while True:
    print("\n" + "=" * 70)

    #user_id = input(f"User ID     [{DEFAULT_USER_ID}] : ").strip()
    #query_id = input(f"Query ID   [{DEFAULT_QUERY_ID}] : ").strip()

    user_id = DEFAULT_USER_ID
    query_id = DEFAULT_QUERY_ID

    # Build the config before reading input so it is defined on every path,
    # including the exit branch below.
    config = {
        "configurable": {
            "user_id": user_id,
            "thread_id": query_id
        }
    }

    print("-" * 70)
    user_input = input("You      : ").strip()

    if user_input.lower() in {"exit", "quit"}:

        if rca_state is None:
            # No question was asked in this session: nothing to persist.
            print("\n Exiting RCA chatbot.")
            break

        add_episodic_memory(rca_state, config)
        print("== Conversation Stored in Episodic Memory ==")

        build_semantic_memory(
            user_id=config["configurable"]["user_id"],
            # Use the last real task; at this point user_input only holds the
            # exit keyword, which is not something worth remembering.
            query=rca_state["task"],
            store=store,
            llm=get_llm_model()
        )
        print("== Conversation Stored in Semantic Memory ==")

        add_procedural_memory(rca_state, config)
        print("== Conversation Stored in procedural Memory ==")


        #Increment usefulness after a successful RCA
        used_semantic = semantic_recall(
            rca_state["task"], store, config
        )
        mark_memory_useful(used_semantic)

        print(json.dumps(
            inspect_memory(store, user_id),
            indent=2
        ))
        print("\n Exiting RCA chatbot.")
        break

    # update task
    rca_state: RCAState = {
        "task": user_input,
        "output": "",
        "trace": []
    }

    print("\n" + "-" * 70)
    print(" RCA Bot is thinking...")
    print("-" * 70)

    # invoke graph
    rca_state = app.invoke(rca_state, config)

    print("\n RCA Bot Answer")
    print("-" * 70)
    print(rca_state.get("output", "No response generated"))
    print(rca_state.get("trace", "No trace generated"))
    print("=" * 70)



 RCA Chatbot (type 'exit' to quit)


----------------------------------------------------------------------
You      : Investigate why stores are expeciecning stockouts

----------------------------------------------------------------------
 RCA Bot is thinking...
----------------------------------------------------------------------

 RCA Bot Answer
----------------------------------------------------------------------
Here is the Root Cause Analysis report for the stockouts experienced by stores:

Executive Summary:
Stores experienced stockouts primarily due to unexpectedly high demand driven by a promotional event, combined with delays in inventory replenishment and disruptions in supply chain operations. Uneven stock allocation across stores and additional stock shrinkage further exacerbated shortages.

Primary Root Causes:
- Unexpectedly high demand exceeding forecasted sales caused by promotional activities.
- Delays and disruptions in inventory replenishment and shipments leadi

In [None]:
# Maps a tool name seen in a tool call to the logical agent it stands for.
# Tool names missing from this table are reported under their own name.
TOOL_TO_AGENT = {
    "hypothesis_agent_tool": "HypothesisAgent",
    "sales_analysis_agent_tool": "SalesAnalysisAgent",
    "inventory_analysis_agent_tool": "InventoryAnalysisAgent",
    "hypothesis_validation_agent_tool": "HypothesisValidationAgent",
    "root_cause_analysis_agent_tool": "RootCauseAgent",
    "write_todos": "OrchestrationAgent"
}

def flatten_trace(result: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Flattens orchestrator-driven traces into logical agent steps.

    Two trace layouts are accepted:

    * a flat list of serialized messages, each optionally carrying a
      "tool_calls" list of ``{"name", "args", "id"}`` records and/or a
      ``"type": "ToolMessage"`` marker;
    * a wrapper entry (or a list of them) whose "tool_calls" list holds
      serialized messages rather than tool-call records. Indexing such an
      entry with ``["name"]`` used to raise ``KeyError: 'name'``; those entries
      are now descended into instead.
      NOTE(review): the exact message schema comes from ``serialize_messages``,
      which is not visible here -- confirm the "type"/"tool_calls" keys.

    Args:
        result: mapping with an optional "trace" entry (list, single dict,
            or None).

    Returns:
        A list of step dicts. Tool calls yield
        ``{"agent", "tool", "args", "call_id"}``; tool results yield
        ``{"agent": "ToolResult", "content", "tool_call_id"}``.
    """
    flat: List[Dict[str, Any]] = []

    def _is_nested_message(entry: Dict[str, Any]) -> bool:
        # A real tool-call record has a "name" and none of the message-level
        # keys; anything else found under "tool_calls" is a wrapped message.
        return (
            "name" not in entry
            or "tool_calls" in entry
            or "tool_call_id" in entry
        )

    def _visit(msg: Any) -> None:
        if not isinstance(msg, dict):
            return

        # AIMessage with tool calls (or a wrapper holding serialized messages)
        for call in msg.get("tool_calls") or []:
            if not isinstance(call, dict):
                continue
            if _is_nested_message(call):
                _visit(call)
                continue
            tool_name = call["name"]
            flat.append({
                "agent": TOOL_TO_AGENT.get(tool_name, tool_name),
                "tool": tool_name,
                "args": call.get("args", {}),
                "call_id": call.get("id")
            })

        # ToolMessage with content
        if msg.get("type") == "ToolMessage":
            flat.append({
                "agent": "ToolResult",
                "content": msg.get("content"),
                "tool_call_id": msg.get("tool_call_id")
            })

    trace = result.get("trace") or []
    if isinstance(trace, dict):
        # A single trace entry is treated like a one-element list.
        trace = [trace]

    for msg in trace:
        _visit(msg)

    return flat


In [None]:
def extract_root_cause(result: dict) -> dict:
    """
    Return the "output" of the first root_cause_analysis_agent_tool call.

    Walks every step in ``result["trace"]`` and every entry of that step's
    "tool_calls" in order. The first entry whose "name" equals
    "root_cause_analysis_agent_tool" wins; its "output" value is returned
    ({} when that key is absent). With no such entry the result is {}.
    """
    wanted = "root_cause_analysis_agent_tool"
    candidates = (
        call
        for step in result.get("trace", [])
        for call in step.get("tool_calls", [])
        if call.get("name") == wanted
    )
    hit = next(candidates, None)
    if hit is None:
        return {}
    return hit.get("output", {})


In [None]:
"""
RCA Agent Evaluation Framework
=============================

This file contains:
1. Gold RCA dataset design
2. End-to-end evaluation runner
3. Memory-on vs memory-off comparison
4. Learning-over-time metrics

Designed specifically for your Agentic RCA system.
"""

# --------------------------------------------------
# 1. GOLD RCA DATASET
# --------------------------------------------------

from typing import List, Dict, Any
from dataclasses import dataclass
import math

@dataclass
class GoldRCACase:
    """
    One hand-labelled RCA scenario used as ground truth by the evaluation code.
    """
    # Identifier of the case, e.g. "PROMO_STOCKOUT_01".
    case_id: str
    # Question handed to the RCA run function being evaluated.
    task: str
    # Root causes the run is expected to report (basis for precision/recall).
    expected_root_causes: List[str]
    # Hypotheses the run is expected to raise (basis for hypothesis coverage).
    gold_hypotheses: List[str]
    # Agent names that must all appear in the flattened trace.
    must_use_agents: List[str]
    # Root causes that must not be reported; a match sets the forbidden penalty.
    forbidden_root_causes: List[str]

# Gold cases consumed by the evaluation helpers below. Strings are compared
# with the lightweight substring-based semantic_match, so wording matters.
GOLD_RCA_DATASET: List[GoldRCACase] = [
    # Case 1: promotion-driven stockout at a single store.
    GoldRCACase(
        case_id="PROMO_STOCKOUT_01",
        task="Why did Store S003 face stockouts during the Diwali promotion?",
        expected_root_causes=[
            "Delayed replenishment",
            "Promo uplift underestimated"
        ],
        gold_hypotheses=[
            "Demand spike due to promotion",
            "Delayed replenishment",
            "Inventory transfer delay",
            "Forecast underestimation"
        ],
        must_use_agents=[
            "HypothesisAgent",
            "SalesAnalysisAgent",
            "InventoryAnalysisAgent",
            "HypothesisValidationAgent"
        ],
        forbidden_root_causes=[
            "System outage",
            "Pricing error"
        ]
    ),

    # Case 2: regional sales drop with stable inventory; the inventory agent
    # is not among the required agents here.
    GoldRCACase(
        case_id="SALES_DROP_02",
        task="Why did sales drop in the North region despite stable inventory?",
        expected_root_causes=[
            "Pricing mismatch",
            "Local competition impact"
        ],
        gold_hypotheses=[
            "Price increase",
            "Competitive promotion",
            "Demand elasticity change",
            "Assortment mismatch"
        ],
        must_use_agents=[
            "HypothesisAgent",
            "SalesAnalysisAgent",
            "HypothesisValidationAgent"
        ],
        forbidden_root_causes=[
            "Inventory stockout",
            "Warehouse delay"
        ]
    )
]

# --------------------------------------------------
# 2. UTILITY FUNCTIONS
# --------------------------------------------------

def normalize(text: str) -> str:
    """
    Lower-case ``text`` and trim surrounding whitespace.

    ``None`` is treated as an empty string so that missing values coming out
    of an RCA run do not crash the comparison helpers.
    """
    if text is None:
        return ""
    return text.lower().strip()


def semantic_match(a: str, b: str) -> bool:
    """
    Lightweight semantic match.
    You can replace this with embeddings later.

    Two strings match when, after normalization, one contains the other.
    Empty or whitespace-only strings never match: ``"" in s`` is always True
    in Python, which would otherwise make a blank prediction match every
    expected (and every forbidden) root cause.
    """
    a, b = normalize(a), normalize(b)
    if not a or not b:
        return False
    return a in b or b in a


def count_semantic_matches(predicted: List[str], gold: List[str]) -> int:
    """
    Count how many entries of ``gold`` are matched by at least one entry of
    ``predicted`` (each gold entry is counted at most once).
    """
    return sum(
        1
        for g in gold
        if any(semantic_match(p, g) for p in predicted)
    )

# --------------------------------------------------
# 3. CORE EVAL METRICS
# --------------------------------------------------

@dataclass
class EvalScores:
    """
    Scores produced by evaluate_single_case for one gold case / one RCA run.
    """
    # Root-cause matches relative to the number of predicted root causes.
    precision: float
    # Root-cause matches relative to the number of expected root causes.
    recall: float
    # Share of gold hypotheses matched by the run's hypotheses.
    hypothesis_coverage: float
    # Value returned by evidence_backed for the run's validated hypotheses.
    evidence_score: float
    # True when every required agent appears in the flattened trace.
    process_compliance: bool
    # True when any predicted root cause matches a forbidden root cause.
    forbidden_penalty: bool


# --------------------------------------------------
# 4. TRACE-BASED CHECKS
# --------------------------------------------------

def check_process_order(trace: List[Dict[str, Any]], required_agents: List[str]) -> bool:
    """
    Tell whether every required agent shows up somewhere in the trace.

    Only presence is checked: despite the name, the relative order of the
    steps plays no role. Each trace step must carry an "agent" key.
    """
    seen = set()
    for step in trace:
        seen.add(step["agent"])

    for needed in required_agents:
        if needed not in seen:
            return False
    return True


def evidence_backed(validated: Dict[str, bool], trace: List[Dict[str, Any]]) -> float:
    """
    Fraction of validated hypotheses that count as supported by evidence.

    A hypothesis counts when its value in ``validated`` is truthy and the
    trace contains at least one step from "SalesAnalysisAgent" or
    "InventoryAnalysisAgent". The evidence check is global to the run, not
    per hypothesis. An empty ``validated`` yields 0.0.
    """
    if not validated:
        return 0.0

    agents_in_trace = {step["agent"] for step in trace}
    evidence_seen = bool(
        agents_in_trace & {"SalesAnalysisAgent", "InventoryAnalysisAgent"}
    )

    confirmed = sum(1 for verdict in validated.values() if verdict)
    supported = confirmed if evidence_seen else 0

    denominator = max(len(validated), 1)
    return supported / denominator


# --------------------------------------------------
# 5. SINGLE RCA CASE EVAL
# --------------------------------------------------

def evaluate_single_case(
    gold: GoldRCACase,
    rca_output: Dict[str, Any]
) -> EvalScores:
    """
    Score one RCA run against its gold case.

    Args:
        gold: the labelled case to compare against.
        rca_output: dict with optional "root_cause" (a dict holding
            "primary_root_causes"), "hypotheses", "validated" and "trace"
            entries. Missing, None or empty entries are scored as "nothing
            was produced" instead of raising.

    Returns:
        EvalScores with precision/recall over root causes, hypothesis
        coverage, the evidence score, process compliance and the forbidden
        root-cause flag.
    """

    trace = flatten_trace(rca_output)

    # The root-cause payload is {} (or absent) when the run never produced a
    # root-cause output; treat that as zero predictions rather than a KeyError.
    root_cause = rca_output.get("root_cause") or {}
    if not isinstance(root_cause, dict):
        root_cause = {}
    root_causes = root_cause.get("primary_root_causes") or []
    hypotheses = rca_output.get("hypotheses") or []
    validated = rca_output.get("validated") or {}

    expected = gold.expected_root_causes

    # Recall: share of expected causes found among the predictions.
    recalled = count_semantic_matches(root_causes, expected)
    recall = recalled / max(len(expected), 1)

    # Precision: share of predictions that match some expected cause. Counting
    # from the prediction side keeps the value within [0, 1]; reusing the
    # recall numerator could exceed 1 when one prediction matches several
    # expected causes.
    correct_predictions = count_semantic_matches(expected, root_causes)
    precision = correct_predictions / max(len(root_causes), 1)

    coverage = count_semantic_matches(hypotheses, gold.gold_hypotheses)
    hypothesis_coverage = coverage / max(len(gold.gold_hypotheses), 1)

    evidence = evidence_backed(validated, trace)

    process_ok = check_process_order(trace, gold.must_use_agents)

    forbidden_penalty = any(
        semantic_match(rc, f)
        for rc in root_causes
        for f in gold.forbidden_root_causes
    )

    return EvalScores(
        precision=precision,
        recall=recall,
        hypothesis_coverage=hypothesis_coverage,
        evidence_score=evidence,
        process_compliance=process_ok,
        forbidden_penalty=forbidden_penalty
    )

# --------------------------------------------------
# 6. MEMORY ON vs OFF EVAL
# --------------------------------------------------

def run_memory_ablation(
    case: GoldRCACase,
    run_with_memory,
    run_without_memory
) -> Dict[str, EvalScores]:
    """
    Score the same case once with memory and once without.

    Both callables receive ``case.task`` and must return an RCA output dict.
    Both runs are executed (memory first) before either one is scored.

    Returns:
        {"with_memory": EvalScores, "without_memory": EvalScores}
    """
    runners = (
        ("with_memory", run_with_memory),
        ("without_memory", run_without_memory),
    )

    # Run everything first, then evaluate, mirroring a run/run/score/score order.
    outputs = [(label, runner(case.task)) for label, runner in runners]

    return {
        label: evaluate_single_case(case, rca_output)
        for label, rca_output in outputs
    }

# --------------------------------------------------
# 7. LEARNING OVER TIME EVAL
# --------------------------------------------------

def learning_curve(cases: List[GoldRCACase], run_fn) -> List[float]:
    """
    Run and score each case in order and collect the recall of every run.

    ``run_fn`` is called with each case's task and must return an RCA output
    dict. The result has one recall value per case, in input order.
    """
    return [
        evaluate_single_case(case, run_fn(case.task)).recall
        for case in cases
    ]

# --------------------------------------------------
# END
# --------------------------------------------------


In [None]:
def collect_trace(result):
    """
    Wrap a dict-valued ``result["trace"]`` in a one-element list.

    Returns an empty list when "trace" is missing or is not a dict.
    """
    if "trace" not in result:
        return []

    entry = result["trace"]
    return [entry] if isinstance(entry, dict) else []


def extract_hypotheses(result: dict) -> list:
    """
    Return the "hypotheses" of the first trace step whose agent is
    "HypothesisAgent".

    Yields [] when there is no such step or the step has no "hypotheses" key.
    """
    owner = next(
        (
            step
            for step in result.get("trace", [])
            if step.get("agent") == "HypothesisAgent"
        ),
        None,
    )
    if owner is None:
        return []
    return owner.get("hypotheses", [])



def extract_validated(result: dict) -> dict:
    """
    Return ``details["validated"]`` of the first trace step whose agent is
    "HypothesisValidationAgent".

    Yields {} when there is no such step, or when the step lacks "details"
    or "validated".
    """
    owner = next(
        (
            step
            for step in result.get("trace", [])
            if step.get("agent") == "HypothesisValidationAgent"
        ),
        None,
    )
    if owner is None:
        return {}

    details = owner.get("details", {})
    return details.get("validated", {})





In [None]:
def normalize_trace(trace):
    """
    Coerce a trace value into a list of dicts.

    * a dict becomes a one-element list;
    * a list is filtered down to its dict members;
    * anything else, including None, becomes an empty list.
    """
    if isinstance(trace, dict):
        return [trace]

    if isinstance(trace, list):
        return list(filter(lambda entry: isinstance(entry, dict), trace))

    return []


In [None]:
def run_rca_with_memory(task: str) -> dict:
    """
    Run the compiled RCA graph for ``task`` with the memory flag switched on
    and repackage the result for evaluation.

    The graph is invoked under user "eval_user" / thread "eval_thread" with
    ``memory_enabled=True`` in the configurable section.
    NOTE(review): whether the graph honours "memory_enabled" is not visible
    here -- confirm in the agent code.

    Returns:
        dict with "root_cause", "hypotheses", "validated" and the normalized
        "trace" (always a list of dicts).
    """
    eval_config = {
        "configurable": dict(
            user_id="eval_user",
            thread_id="eval_thread",
            memory_enabled=True,
        )
    }

    final_state = app.invoke({"task": task, "trace": []}, eval_config)

    # The extractors expect a list of step dicts under "trace".
    steps = normalize_trace(final_state.get("trace"))
    wrapped = {"trace": steps}

    return {
        "root_cause": extract_root_cause(wrapped),
        "hypotheses": extract_hypotheses(wrapped),
        "validated": extract_validated(wrapped),
        "trace": steps
    }


In [None]:
def run_rca_without_memory(task: str) -> dict:
    """
    Run the compiled RCA graph for ``task`` with the memory flag switched off
    and repackage the result for evaluation.

    The graph is invoked under user "eval_user_nomem" / thread
    "eval_thread_nomem" with ``memory_enabled=False``.
    NOTE(review): whether the graph honours "memory_enabled" is not visible
    here -- confirm in the agent code.

    The trace is normalized to a list of dicts before extraction, matching
    run_rca_with_memory. Without that, a dict-valued trace would be iterated
    key by key and the extractors would fail calling ``.get`` on strings.

    Returns:
        dict with "root_cause", "hypotheses", "validated" and the normalized
        "trace" (always a list of dicts).
    """
    config = {
        "configurable": {
            "user_id": "eval_user_nomem",
            "thread_id": "eval_thread_nomem",
            "memory_enabled": False
        }
    }

    empty_state = {
        "task": task,
        "trace": []
    }

    result = app.invoke(empty_state, config)

    normalized_trace = normalize_trace(result.get("trace"))

    return {
        "root_cause": extract_root_cause({"trace": normalized_trace}),
        "hypotheses": extract_hypotheses({"trace": normalized_trace}),
        "validated": extract_validated({"trace": normalized_trace}),
        "trace": normalized_trace
    }


In [None]:

# Smoke-run the evaluation on the first gold case with memory enabled.
gold_case = GOLD_RCA_DATASET[0]

# Invokes the compiled graph, so this line performs real agent/LLM work.
output = run_rca_with_memory(gold_case.task)

# Compare the run's extracted root causes, hypotheses and trace with the gold case.
scores = evaluate_single_case(gold_case, output)

print(scores)


***Empty History***


KeyError: 'name'