# LangGraph Memory Patterns: Notebook Walkthrough

This notebook accompanies the blog post on Memory Patterns in Agentic AI and demonstrates practical implementations of hierarchical memory, consolidation, and semantic retrieval within an automotive service assistant use case.

### 🔍 What You'll Learn

- How to structure **short-term**, **episodic**, and **long-term** memory for an LLM-powered agent.
- Techniques to consolidate episodic events and promote context to long-term stores.
- How to use **semantic retrieval** from vector and graph stores to reduce hallucinations and enhance contextual reasoning.
- Examples using memory orchestration tools compatible with **LangGraph** 

### 🚗 Use Case: Automotive Service Center

To make these patterns concrete, we use an AI agent that assists in diagnosing and resolving car service complaints. The agent leverages different memory tiers to recall past interactions, reason over historical data, and ground its responses in semantic context (e.g., similar issues in other vehicles).

This notebook was tested on SageMaker Studio Jupyter Lab

In [None]:
!pip install boto3 --upgrade

In [None]:
!pip install langgraph langchain langchain_aws

In [None]:
!pip install --force-reinstall --no-deps --ignore-installed pytz==2023.3

In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
!pip install chromadb
!pip install sentence-transformers

In [1]:
%load_ext autoreload
%autoreload 2

## 🧪 Generating Test Data for Memory Stores

In this section, we simulate realistic memory data for the agent to use across different memory tiers. These datasets serve as the foundation for testing memory orchestration, retrieval patterns, and consolidation logic.

### 🗂️ Memory Store Overview

- **Episodic Memory**  
  Stores past service interactions keyed by session, customer, and vehicle identifiers. These are structured as timestamped events such as repair logs, customer complaints, and agent resolutions.

- **Long-Term Memory**  
  Contains aggregated knowledge distilled from episodic memory. Examples include issue summaries per vehicle or service recommendations based on historical frequency.

- **Semantic Store**  
  Two parallel systems are populated for semantic memory:
  - **Vector Store**: Uses embedding-based similarity to match issue descriptions and resolutions.
  - **Graph Store**: Represents vehicle issues, causes, and resolutions as nodes and relationships for traversal-based reasoning.

### 📋 Synthetic Use Case: Automotive Service Logs

We generate structured data such as:
- Multiple repair sessions for the same vehicle (VIN)
- Repeating issues (e.g., battery drain, brake wear)
- Variations in resolutions and cost estimates
- Semantic embedding-ready entries (e.g., "engine knocking on cold start")
- Graph edges representing causal and similarity links between events

These examples are used to:
- Validate hierarchical memory reads
- Test consolidation (e.g., summarizing similar events)
- Run semantic retrieval on issue queries

In [2]:
from agentic_memory.base import BaseCheckPointer,BaseEpisodicStore, BaseLongTermStore
from agentic_memory.implementation import CheckPointerInMemory, EpisodicStoreFile, LongTermStoreFile
from agentic_memory.orchestrator import MultiTierMemoryOrchestrator
import os, json
from agentic_memory.automotive import RepairCostEstimate, AutomotiveKnowledgeToolkit

In [3]:
from gen_data import generate_long_term_data, generate_episodic_data

generate_episodic_data()
generate_long_term_data()

In [4]:
# Initialize memory stores
short_term = CheckPointerInMemory()
episodic = EpisodicStoreFile()
long_term = LongTermStoreFile()

In [5]:
for result in long_term.search("A/C"):
    print(result)

In [None]:
from agentic_memory.retrievers import SemanticStoreRetrieval, GraphRetrieval
ssr = SemanticStoreRetrieval(long_term)
ssr.build()

In [7]:
from agentic_memory.retrievers import GraphRetrieval
gr = GraphRetrieval(long_term)
gr.build()

# Initialize memory stores

In [8]:
from agentic_memory.base import BaseCheckPointer,BaseEpisodicStore, BaseLongTermStore
from agentic_memory.implementation import CheckPointerInMemory, EpisodicStoreFile, LongTermStoreFile
from agentic_memory.orchestrator import MultiTierMemoryOrchestrator
import os, json
from agentic_memory.automotive import RepairCostEstimate, AutomotiveKnowledgeToolkit

In [9]:
from agentic_memory.retrievers import SemanticStoreRetrieval, GraphRetrieval
ssr = SemanticStoreRetrieval(long_term)
gr = GraphRetrieval(long_term)

Chroma collection loaded or created at semantic_vector_store


In [10]:
# Initialize memory stores
short_term = CheckPointerInMemory()
episodic = EpisodicStoreFile()
long_term = LongTermStoreFile()

In [11]:
#Create Auto toolkit
auto_tool_kit = AutomotiveKnowledgeToolkit()

# Create orchestrator
orchestrator = MultiTierMemoryOrchestrator(short_term, episodic, long_term,ssr)

## Validate Checkpointer memory
In this section we create a session, create entries to checkpoiner memory and save them by ending the session

In [12]:
# Start new session
session_id = orchestrator.create_session()
vin = "5NPE24AF6FH123456"
cust_id = "cust_016"
key = (cust_id, vin)

# Add diagnostic data
short_term.put(session_id, {"code": "C1234", "description": "ABS pump failure"})
short_term.put(session_id, {"code": "B4567", "description": "Low battery voltage"})

orchestrator.end_session(session_id,key)

In [13]:
episodic.get(key), key

([{'v': 1,
   'value': {'service_type': 'Steering Vibration Diagnosis',
    'mileage': 62000,
    'dealer': 'Precision Tire & Alignment',
    'technician_checks': ['Balanced all four wheels and checked for bent rims',
     'Inspected tie rods and control arms for wear',
     'Checked tire tread and inflation',
     'Test drove at highway speeds'],
    'issues_observed': ['Two wheels out of balance, one rim slightly bent',
     'Minor play in right outer tie rod end',
     'Front tires worn unevenly'],
    'customer_agreement': 'Customer approved wheel balancing and tie rod replacement. Deferred rim repair for future visit.',
    'service_notes': 'Balanced wheels, replaced tie rod, rotated tires, steering vibration reduced. Advised customer on rim repair options.',
    'service_date': '2025-07-24T20:10:01.254720+00:00'}},
  {'v': 1, 'value': {'code': 'C1234', 'description': 'ABS pump failure'}},
  {'v': 1, 'value': {'code': 'B4567', 'description': 'Low battery voltage'}}],
 ('cust_016',

## 🔄 Patterns Implementation in LangGraph

This section showcases how memory patterns can be orchestrated using **LangGraph**, a composable graph-based framework for building LLM agents. We use the same **automotive service assistant** use case to maintain consistency with the Strands section.

### ⚙️ Core Capabilities

- **Stateful Agent Architecture**  
  LangGraph enables a stepwise, conditional agent flow. We define `AgentState` to carry forward the full conversation state, vehicle identifiers, memory contexts, and final outputs.

- **Node-Based Execution**  
  Each operation (session creation, context gathering, resolution generation, tool execution, memory storage) is implemented as a LangGraph node.

- **Tool Binding and Semantic Reasoning**  
  The assistant uses tool-invocation capabilities provided by LangGraph + LangChain to:
  - Retrieve **hierarchical memory** context (`get_hierarchical_memory_context`)
  - Perform **semantic lookups** from prior issues (`get_semantic_memory_context`)
  - Fetch **vehicle information** based on VIN (`get_vehicle_info`)
  - Compute **repair cost estimates** (`estimate_repair_costs`)

- **Intelligent Resolution Parsing**  
  The `parse_response` function extracts structured data (JSON blocks) from LLM output to persist into episodic memory and return meaningful summaries.

### 📌 Graph Flow Overview

The workflow is defined using the following graph structure:

```text
START
  ↓
create_session
  ↓
generate_resolution
  ↓ ┌───────────── if tool_use found ──────────────┐
   └── tool_node ─→ generate_resolution ─→ should_continue
                                          ↓
                                        end_session
                                          ↓
                                         END


In [14]:
gr = GraphRetrieval(long_term)

In [15]:
from langgraph.graph import StateGraph, END, START
from langchain_core.tools import tool
from typing import TypedDict, Annotated, Optional, List, Tuple, Any
from typing_extensions import TypedDict, List
import uuid
import json
import random
from datetime import datetime, timezone
from langchain_aws import ChatBedrockConverse

from typing import (
    Annotated,
    Sequence,
    TypedDict,
)

from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
from langchain_core.messages import ToolMessage, SystemMessage, HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig

# Define the agent state
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    customer_id: str
    vin: str
    complaint: str
    session_id: Optional[str]
    resolution: Optional[str]
    cost_estimate: Tuple[Optional[str], Optional[float]]

class RepairCostEstimate(TypedDict):
    item: str
    labor_cost: float
    parts_cost: float
    total_cost: float


@tool
def get_hierarchical_memory_context(session_id: str, customer_id:str, vin:str) -> dict:
    """Retrieves hierarchical memory context for current session accepts customer_id:str, vin:str """
    print(f"TOOL[get_hierarchical_memory_context]: customer_id: {customer_id}, vin: {vin}, session_id : {session_id}")
    key = (customer_id, vin)
    return orchestrator.get_hierarchical_memory(
        session_id=session_id,
        key=key
    )


@tool
def get_semantic_memory_context(issue_description:str,make:str = None,model:str = None) -> dict:
    """Retrieves semantic memory context for current session. Accepts issue_description:str,make:str = None,model:str = None """
    print(f"TOOL[get_semantic_memory_context]: issue_description: {issue_description}, make: {make}, model : {model} ")
    return orchestrator.search_semantic_store(
        issue_description=issue_description,
        make= make,
        model=model
    )

def store_resolution(session_id: str, resolution: str, cost_estimate: List[RepairCostEstimate]):
    """Stores final resolution in short-term memory"""
    orchestrator.checkpointer.put(session_id, {
        "type": "resolution",
        "text": resolution,
        "cost": cost_estimate,
        "timestamp": datetime.now(timezone.utc).isoformat()
    })
    return "Resolution stored"


@tool
def estimate_repair_costs(repair_items: List[str]) -> List[RepairCostEstimate]:
    """
    Accepts a list of repair item strings and returns a list of dicts with cost breakdowns.
    Each dict contains: 'item', 'labor_cost', 'parts_cost', and 'total_cost'.
    """
    print(f"TOOL[estimate_repair_costs] repair_items:{repair_items}")
    try:
        estimates =  auto_tool_kit.estimate_repair_costs(repair_items)
        return estimates

    except Exception as e:
        print(e)

@tool
def get_vehicle_info(vin, json_path="vehicle_model.json"):
    """
    Returns (make, model, year) for a given VIN.
    """
    print(f"TOOL[get_vehicle_info] vin:{vin}")
    return auto_tool_kit.get_vehicle_info(vin)

def parse_response(messages: List[Any]) -> Tuple[Optional[str], Optional[float]]:
    """
    Given a list of message objects, find the last message with a JSON string containing
    'Resolution' and 'CostEstimates'. Return (resolution, total_cost).
    """
    for msg in reversed(messages):
        content = getattr(msg, 'content', None)
        if not content:
            continue
        # If content is a list (structured), check each element for JSON
        if isinstance(content, list):
            for element in content:
                if isinstance(element, dict) and 'text' in element:
                    text = element['text']
                    try:
                        json_start = text.index('{')
                        json_str = text[json_start:]
                        data = json.loads(json_str)
                        resolution = data.get('Resolution')
                        cost_estimates = data.get('CostEstimates', [])
                        return resolution, cost_estimates
                    except (ValueError, json.JSONDecodeError):
                        continue
        # If content is a string, check for JSON
        elif isinstance(content, str):
            try:
                json_start = content.index('{')
                json_str = content[json_start:]
                data = json.loads(json_str)
                resolution = data.get('Resolution')
                cost_estimates = data.get('CostEstimates', [])
                return resolution, cost_estimates
            except (ValueError, json.JSONDecodeError):
                continue
    return None, None


def create_session(state: AgentState) -> dict:
    """Creates new session for customer-vehicle interaction"""
    session_id = str(uuid.uuid4())
    return {"session_id": session_id}

def end_session(state: AgentState) -> dict:
    """Finalizes session and transfers data"""
    resolution, cost = parse_response(state["messages"])
    store_resolution(state["session_id"],resolution, cost )
    orchestrator.end_session(
        state["session_id"],
        (state["customer_id"], state["vin"])
    )
    return {"resolution": resolution, "cost_estimate": cost,"status": "Session ended"}


# Generate resolution tool
def generate_resolution(state: AgentState) -> dict:
    
    system_message = SystemMessage(content=f"""
    You are an automotive service assistant. You are provided with session Id, customer ID and VIN and customer complaints.
    Always:
    1. Use provided tools to get hierarchical memory context from past interactions. Identify vehicle's model, make and year. Search semantic information to get more information to resolve issues.  
    2. When you have a list of repair or fix items in your resolution, use the estimate_repair_costs tool to get detailed cost estimates for each item.
    3. Include cost estimates in USD where possible.
    4. Format your final response as a JSON object with the following fields:
           {{
             "VIN": "<vehicle identification number>",
             "Issue": "<summary of the customer's complaint or diagnosed issue>",
             "Resolution": "<detailed resolution steps or recommendations>",
             "CostEstimates": [
               {{
                 "item": "<repair item>",
                 "labor_cost": <labor cost as float>,
                 "parts_cost": <parts cost as float>,
                 "total_cost": <total cost as float>
               }},
               ...
             ]
           }}

    VIN: {state['vin']}
    session_id: {state['session_id']}
    customer_id: {state['customer_id']}

    """)

    user_message =  HumanMessage(content=""" complaint: {state['complaint']} """)
    prompt = [system_message] + [user_message] + list(state["messages"])
    response = llm.invoke(prompt)
    
    # Handle different response types
    if hasattr(response, "content"):
        content = response.content
    elif isinstance(response, dict) and "content" in response:
        content = response["content"]
    elif isinstance(response, str):
        content = response
    else:
        raise ValueError("Unexpected response type from LLM")
    return {"messages": state["messages"] + [AIMessage(content=content)]}


def tool_node(state: AgentState):
    messages = state["messages"]
    last_message = messages[-1]
    outputs = []
    # Ensure content is a list (structured format)
    content = getattr(last_message, "content", [])
    if not isinstance(content, list):
        content = [content]
    # Find all tool_use elements
    for element in content:
        if isinstance(element, dict) and element.get("type") == "tool_use":
            tool_name = element["name"]
            tool_args = element["input"]
            tool_call_id = element["id"]
            # Execute the tool
            tool_result = tools_by_name[tool_name].invoke(tool_args)
            outputs.append(
                ToolMessage(
                    content=json.dumps(tool_result),
                    name=tool_name,
                    tool_call_id=tool_call_id,
                )
            )
    # Append tool results as new messages
    return {"messages": messages + outputs}


def should_continue(state: AgentState):
    messages = state["messages"]
    if messages:
        last_message = messages[-1]
        content = getattr(last_message, "content", [])
        if not isinstance(content, list):
            content = [content]
        # Check for any tool_use elements
        for element in content:
            if isinstance(element, dict) and element.get("type") == "tool_use":
                return "continue"
        return "end"
    return "end"


# Initialize memory system (using your existing implementations)
orchestrator = MultiTierMemoryOrchestrator(short_term, episodic, long_term,gr)

llm = ChatBedrockConverse(model_id="us.amazon.nova-pro-v1:0")
tools = [get_hierarchical_memory_context,get_semantic_memory_context,get_vehicle_info,estimate_repair_costs]
tools_by_name = {tool.name: tool for tool in tools}
llm = llm.bind_tools(tools)

# Build LangGraph workflow
graph_builder = StateGraph(AgentState)

# Add nodes
graph_builder.add_node("create_session", create_session)
graph_builder.add_node("generate_resolution", generate_resolution)
graph_builder.add_node("tool_node", tool_node)
graph_builder.add_node("store_resolution", store_resolution)
graph_builder.add_node("end_session", end_session)

# Define edges
graph_builder.add_edge(START, "create_session")
graph_builder.add_edge("create_session", "generate_resolution")
graph_builder.add_edge("tool_node", "generate_resolution")
graph_builder.add_edge("end_session", END)

graph_builder.add_conditional_edges(
    "generate_resolution",
    should_continue,
    {
        "continue": "tool_node",
        "end": "end_session",
    },
)

# Compile the graph
automotive_workflow = graph_builder.compile()

# Example invocation
def handle_complaint_langgraph(customer_id: str, vin: str, complaint: str):
    """Processes automotive complaint through LangGraph"""
    # Initialize state
    initial_state = AgentState(
        customer_id=customer_id,
        vin=vin,
        complaint=complaint,
        session_id=None,
        #context=None,
        #resolution=None,
        #cost_estimate=None
    )
    
    # Execute workflow
    final_state = automotive_workflow.invoke(
        initial_state,
        # Inject dependencies via config
        config={"configurable": {"llm": llm}}
    )
    
    return {
        "vin": vin,
        "complaint": complaint,
        "resolution": final_state["resolution"],
        "cost": final_state["cost_estimate"]
    }

result = handle_complaint_langgraph(
    customer_id="cust_012",
    vin="2T1BURHE6JC074321",
    complaint="Engine misfire at highway speeds"
)

TOOL[get_hierarchical_memory_context]: customer_id: cust_012, vin: 2T1BURHE6JC074321, session_id : 0c89b0a3-c561-45ed-9c8a-cd06468c4171
TOOL[get_vehicle_info] vin:2T1BURHE6JC074321
TOOL[get_semantic_memory_context]: issue_description: Customer reported dashboard flicker when using power windows, make: Nissan, model : Sentra 
TOOL[estimate_repair_costs] repair_items:['Battery replacement', 'Wiring repair', 'Power window system inspection and repair']


In [16]:
print(f"Result: {result} ")

Result: {'vin': '2T1BURHE6JC074321', 'complaint': 'Engine misfire at highway speeds', 'resolution': 'Based on the previous diagnosis and the current complaint, the following steps are recommended:\n1. Replace the battery.\n2. Repair the wiring, particularly in the trunk light circuit.\n3. Inspect and repair the power window system to address the dashboard flicker issue.', 'cost': [{'item': 'Battery replacement', 'labor_cost': 163.07, 'parts_cost': 126.04, 'total_cost': 289.11}, {'item': 'Wiring repair', 'labor_cost': 61.82, 'parts_cost': 33.65, 'total_cost': 95.47}, {'item': 'Power window system inspection and repair', 'labor_cost': 167.61, 'parts_cost': 223.8, 'total_cost': 391.41}]} 


## Consolidator Pattern

In [17]:
from agentic_memory.compaction import Consolidator
consolidator = Consolidator(episodic,long_term)
consolidator.consolidate(('cust_789','1HGBH41JXMN109186'))


[
  {
    "issue_summary": "Engine misfire and low compression in cylinder 2",
    "resolution": "Replaced spark plugs and ignition coil for cylinder 2",
    "service_engineer": "Unnamed Technician",
    "service_date": "2025-07-24",
    "additional_notes": "Customer agreed to recommended repairs after explanation of findings and estimated costs. Cleared codes and performed test drive after replacement.",
    "cost": {
      "parts": 120.0,
      "labor": 150.0,
      "tax": 13.5,
      "total": 283.5
    }
  },
  {
    "issue_summary": "Slow coolant leak at lower radiator hose clamp",
    "resolution": "Replaced faulty clamp and topped off coolant",
    "service_engineer": "Unnamed Technician",
    "service_date": "2025-07-24",
    "additional_notes": "Advised customer to monitor coolant level.",
    "cost": {
      "parts": 20.0,
      "labor": 50.0,
      "tax": 4.5,
      "total": 74.5
    }
  },
  {
    "issue_summary": "Worn front brake pads and rotors, contaminated brake fluid"

'Consolidated 10 episodic events for 1HGBH41JXMN109186.'