# Test Retrieval Graph

This notebook tests the retrieval graph by connecting to the LangGraph server.

**Prerequisites:**
1. Start the LangGraph dev server: `cd backend && npm run dev`
2. Make sure the server is running at `http://localhost:2024`

In [2]:
# Install required packages (run once)
# %pip install langgraph-sdk
# Ollama is running locally for this test.

In [3]:
from langgraph_sdk import get_client

# Connect to the local LangGraph server
# Make sure you've started it with: cd backend && npm run dev
client = get_client(url="http://localhost:2024")

print("‚úÖ Connected to LangGraph server")


‚úÖ Connected to LangGraph server


In [39]:
# Create a new thread (conversation session)
thread = await client.threads.create()
print(f"‚úÖ Created thread: {thread['thread_id']}")


‚úÖ Created thread: b6b56110-e393-4b62-8adf-482374bd2e17


In [44]:
import asyncio

# Test with a simple query
test_query = "Add 600 + 3000"

print(f"Query: {test_query}\n")

result = await client.runs.create(
    thread_id=thread['thread_id'],
    assistant_id="retrieval_graph",
    input={"query": test_query},
    config={
        "configurable": {
            "queryModel": "ollama/llama3.2:1b"
        }
    }
)

# cancel the run after 100ms
run_id = result.get('run_id') if isinstance(result, dict) else getattr(result, 'run_id', None)
thread_id = thread['thread_id']
if run_id:
    await asyncio.sleep(0.1)
    
    target_run_id = run_id or globals().get('current_run_id')
    if not target_run_id:
        print("‚ùå No run ID found.")
    else:
        # Cancel the run
        await client.runs.cancel(thread_id=thread_id, run_id=target_run_id)
        print(f"‚úÖ Canceled")


# Get the thread state which contains messages
thread_state = await client.threads.get(thread_id=thread['thread_id'])

# Access messages from the thread's state
messages = thread_state.get('values', {}).get('messages', [])

if messages:
    # Get the latest message (last in the list)
    latest = messages[-1]
    latest_content = latest.get('content') if isinstance(latest, dict) else getattr(latest, 'content', None)
    if latest_content:
        print(f"Response: {latest_content}")
print(thread_state.get('values', {}).get('messages', []))

Query: Add 600 + 3000

‚úÖ Canceled
Response: First, add 60 and 300: 60 + 300 = 360.

Then, add 360 to the previous result: 360 + 360 = 720.
[{'content': 'Add 2 + 3', 'additional_kwargs': {}, 'response_metadata': {}, 'id': 'cc826b5e-96d3-482a-9989-38f4064c528c', 'type': 'human'}, {'content': 'The answer to 2 + 3 is 5.', 'additional_kwargs': {}, 'response_metadata': {'model': 'llama3.2:1b', 'created_at': '2025-12-29T11:25:47.664355Z', 'done': True, 'done_reason': 'stop', 'total_duration': 490024541, 'load_duration': 80910250, 'prompt_eval_count': 31, 'prompt_eval_duration': 163246792, 'eval_count': 13, 'eval_duration': 146090790}, 'tool_call_chunks': [], 'id': 'run-019b69db-6f25-7224-b857-f3be36278e53', 'tool_calls': [], 'invalid_tool_calls': [], 'type': 'ai'}, {'content': 'Add 6 + 30', 'additional_kwargs': {}, 'response_metadata': {}, 'id': '9b782a42-16b4-49a7-b60d-8eea98558134', 'type': 'human'}, {'content': '6 + 30 = 36.', 'additional_kwargs': {}, 'response_metadata': {'model': 'llam

In [36]:
# Get the thread state which contains messages
thread_state = await client.threads.get(thread_id=thread['thread_id'])

# Access messages from the thread's state
messages = thread_state.get('values', {}).get('messages', [])

if messages:
    # Get the latest message (last in the list)
    latest = messages[-1]
    latest_content = latest.get('content') if isinstance(latest, dict) else getattr(latest, 'content', None)
    if latest_content:
        print(f"Response: {latest_content}")
print(thread_state.get('values', {}).get('messages', []))

Response: Photosynthesis is the process by which plants, algae, and some bacteria convert light energy from the sun into chemical energy in the form of organic compounds, such as glucose. This process is essential for life on Earth, as it provides the primary source of energy and organic compounds for nearly all living organisms.

Here's a simplified overview of the photosynthesis process:

**Step 1: Light absorption**

Light energy from the sun is absorbed by pigments such as chlorophyll in plant cells. Chlorophyll plays a crucial role in absorbing light, specifically in the blue and red parts of the visible spectrum.

**Step 2: Energy transfer**

The absorbed light energy excites electrons in the pigment molecule, which are then transferred to a special molecule called an electron acceptor. This process is known as photoionization.

**Step 3: Water splitting**

Water (H2O) is split into hydrogen ions (H+) and oxygen gas (O2) through a process known as photolysis. This reaction is cat

In [4]:
## Test 1: Simple Query (Non-streaming)

# Test with a simple query
test_query = "What is the capital of France?"

print(f"Query: {test_query}\n")

result = await client.runs.create(
    thread_id=thread['thread_id'],
    assistant_id="retrieval_graph",
    input={"query": test_query},
    config={
        "configurable": {
            "queryModel": "ollama/llama3.2:1b"
        }
    }
)

print("‚úÖ Graph execution completed!")
print(f"\nResult: {result}")
print(f"\nStatus: {result.get('status', 'unknown')}")


Query: What is the capital of France?

‚úÖ Graph execution completed!

Result: {'run_id': '940e26b9-deeb-4c91-bf77-f7a4e330c979', 'thread_id': 'f4d5c8cb-309f-4535-8bbd-31b6ccade6bb', 'assistant_id': '571ade52-f5cd-582a-89d9-d79dc861a8ba', 'metadata': {'created_by': 'system', 'graph_id': 'retrieval_graph', 'assistant_id': '571ade52-f5cd-582a-89d9-d79dc861a8ba'}, 'status': 'pending', 'kwargs': {'input': {'query': 'What is the capital of France?'}, 'config': {'configurable': {'queryModel': 'ollama/llama3.2:1b', 'user-agent': 'langgraph-sdk-py/0.2.14', 'run_id': '940e26b9-deeb-4c91-bf77-f7a4e330c979', 'thread_id': 'f4d5c8cb-309f-4535-8bbd-31b6ccade6bb', 'graph_id': 'retrieval_graph', 'assistant_id': '571ade52-f5cd-582a-89d9-d79dc861a8ba'}, 'metadata': {'created_by': 'system', 'graph_id': 'retrieval_graph', 'assistant_id': '571ade52-f5cd-582a-89d9-d79dc861a8ba'}}, 'context': {}, 'stream_mode': ['values'], 'temporary': False, 'subgraphs': False, 'resumable': False}, 'multitask_strategy': 're

In [7]:
# Get the thread state which contains messages
thread_state = await client.threads.get(thread_id=thread['thread_id'])

# Access messages from the thread's state
messages = thread_state.get('values', {}).get('messages', [])

print("Messages in thread:")
for msg in messages:
    msg_type = msg.get('type') if isinstance(msg, dict) else getattr(msg, 'type', 'unknown')
    msg_content = msg.get('content') if isinstance(msg, dict) else getattr(msg, 'content', '')
    print(f"\n- {msg_type}: {msg_content}")

Messages in thread:

- human: What is the capital of France?

- ai: The capital of France is Paris.


In [8]:
## Test 2: Streaming Response

# Test with streaming
test_query_2 = "Tell me a joke"

print(f"Query: {test_query_2}\n")
print("Streaming response:")

# Remove 'await' here - stream() returns an async generator directly
stream = client.runs.stream(
    thread_id=thread['thread_id'],
    assistant_id="retrieval_graph",
    input={"query": test_query_2},
    stream_mode=["messages", "updates"],
    config={
        "configurable": {
            "queryModel": "ollama/llama3.2:1b"
        }
    }
)

async for chunk in stream:
    chunk_event = chunk.get('event') if isinstance(chunk, dict) else getattr(chunk, 'event', None)
    if chunk_event == 'messages':
        chunk_data = chunk.get('data') if isinstance(chunk, dict) else getattr(chunk, 'data', None)
        if chunk_data:
            for msg in chunk_data:
                msg_content = msg.get('content') if isinstance(msg, dict) else getattr(msg, 'content', None)
                if msg_content:
                    print(f"üì® {msg_content}")
    elif chunk_event == 'updates':
        print(f"üîÑ Update: {chunk}")
    else:
        print(f"üì¶ Chunk: {chunk}")

print("\n‚úÖ Streaming completed!")

Query: Tell me a joke

Streaming response:
üì¶ Chunk: StreamPart(event='metadata', data={'run_id': '1be2de3b-b35a-42e9-a35b-1dedeb429de7', 'attempt': 1})
üì¶ Chunk: StreamPart(event='messages/complete', data=[{'content': 'What is the capital of France?', 'additional_kwargs': {}, 'response_metadata': {}, 'id': 'c710cca0-6195-4f10-9bcf-1e81675351a4', 'type': 'human'}, {'content': 'The capital of France is Paris.', 'additional_kwargs': {}, 'response_metadata': {'model': 'llama3.2:1b', 'created_at': '2025-12-29T09:30:09.625641Z', 'done': True, 'done_reason': 'stop', 'total_duration': 822236500, 'load_duration': 83998500, 'prompt_eval_count': 32, 'prompt_eval_duration': 595260875, 'eval_count': 8, 'eval_duration': 87249834}, 'tool_call_chunks': [], 'id': 'run-019b6971-9020-7224-b84e-95533abaec8c', 'tool_calls': [], 'invalid_tool_calls': [], 'type': 'ai'}])
üì¶ Chunk: StreamPart(event='messages/metadata', data={'run-019b6971-efb5-7224-b84e-c1f387666309': {'metadata': {'created_by': 'syste

In [None]:
## Test 3: Multiple Queries in Same Thread

thread = await client.threads.create()
# Test multiple queries in the same thread (conversation history)
queries = [
    "What is 2 + 2?",
    "What about 3 + 3?",
    "Can you add those two answers together?"
]

import asyncio

for i, query in enumerate(queries, 1):
    print(f"\n{'='*50}")
    print(f"Query {i}: {query}")
    print('='*50)
    
    result = await client.runs.create(
        thread_id=thread['thread_id'],
        assistant_id="retrieval_graph",
        input={"query": query},
        config={
            "configurable": {
                # "queryModel": "ollama/qwen3:4b"
                # "queryModel": "ollama/llama3.2:1b"
            }
        }
    )
    
    # Wait for the run to complete by polling its status
    run_id = result.get('run_id') if isinstance(result, dict) else getattr(result, 'run_id', None)
    if run_id:
        while True:
            run_status = await client.runs.get(
                thread_id=thread['thread_id'],
                run_id=run_id
            )
            status = run_status.get('status') if isinstance(run_status, dict) else getattr(run_status, 'status', None)
            if status in ['success', 'error', 'cancelled']:
                break
            await asyncio.sleep(0.5)  # Poll every 500ms
    
    # Get the thread state which contains messages
    thread_state = await client.threads.get(thread_id=thread['thread_id'])
    
    # Access messages from the thread's state
    messages = thread_state.get('values', {}).get('messages', [])
    
    if messages:
        # Get the latest message (last in the list)
        latest = messages[-1]
        latest_content = latest.get('content') if isinstance(latest, dict) else getattr(latest, 'content', None)
        if latest_content:
            print(f"Response: {latest_content}")

print("\n‚úÖ All queries completed!")


Query 1: What is 2 + 2?
Response: 2 + 2 = 4.

Query 2: What about 3 + 3?
Response: 3 + 3 = 6.

Query 3: Can you add those two answers together?
Response: Let's add the two results:

4 (from 2 + 2) + 6 (from 3 + 3) = 10.

So, the total is 10.

‚úÖ All queries completed!
