# Zep Agent Conversation with Full Trace

**Goal**: See every function call, LLM prompt/response, and Neo4j query from user input to database.

**Approach**: Use `graphiti_core` directly to simulate Zep Cloud API behavior, with comprehensive tracing.

The markdown description is generated by Claude and edited by me.

## 1. Architecture Overview

### Zep Cloud Production Architecture

In production, Zep Cloud has a multi-layer architecture:

```
┌───────────────────────────────────────────────────────────┐
│   Agent Application                                       │
│  (Uses Zep Python SDK: zep_cloud.client)                  │
└─────────────────────┬─────────────────────────────────────┘
                      │ HTTPS REST API
                      ▼
┌───────────────────────────────────────────────────────────┐
│  Zep Server (Go)                                          │
│  - Manages users, sessions, threads                       │
│  - Handles authentication and rate limiting               │
│  - Orchestrates memory operations                         │
│  Source: zep/legacy/src/api/apihandlers/                  │
└─────────────────────┬─────────────────────────────────────┘
                      │ HTTP REST API
                      ▼
┌───────────────────────────────────────────────────────────┐
│  Graphiti Server (Python FastAPI)                         │
│  - Wraps graphiti_core library                            │
│  - Provides REST endpoints for graph operations           │
│  - Handles async processing queue                         │
│  Source: zep-graphiti/server/graph_service/               │
└─────────────────────┬─────────────────────────────────────┘
                      │ Python function calls
                      ▼
┌───────────────────────────────────────────────────────────┐
│  graphiti_core (Python Library)                           │
│  - Temporal knowledge graph engine                        │
│  - Entity extraction and resolution                       │
│  - Bi-temporal data model                                 │
│  Source: zep-graphiti/graphiti_core/                      │
├─────────────────────┬─────────────────────────────────────┤
│                     │                                     │
│    ┌────────────────┴────────────────┐                    │
│    ▼                                 ▼                    │
│  LLM API                        Neo4j Database            │
│  (Entity extraction,            (Graph storage,           │
│   summarization,                 Cypher queries)          │
│   reranking)                                              │
└───────────────────────────────────────────────────────────┘
```

This resembles a **layered microservices** pattern:

| Layer | Component | Role | Responsibility |
|-------|-----------|------|----------------|
| **Client / Interface** | Zep Python SDK | HTTP Client Wrapper | Converts function calls (`memory.add()`) to REST API requests, handles retries and authentication. **Contains no business logic.** |
| **Gateway / Orchestration** | Zep Server (Go) | API Gateway & User Management | Multi-tenant management (AuthN/AuthZ), rate limiting, request routing. Dispatches tasks to downstream services. |
| **Worker / Execution** | Graphiti Server (FastAPI) | Async Task Runner | Maintains async job queue. Since LLM processing is slow (seconds to tens of seconds), it converts memory operations into async jobs to prevent blocking the gateway. |
| **Business Logic / "The Brain"** | Graphiti Core (Library) | Core Logic Engine | Contains all prompt templates (`prompts/*.py`), LLM interaction flow control (extract → dedupe → validate), and dynamic Cypher query generation (`*_db_queries.py`). **Stateless logic code.** |

**Evidence from Source Code:**

1. **Zep Python SDK** - HTTP client with no logic:
   ```python
   # zep/examples/python/graph_example/graph_example.py
   client = AsyncZep(api_key=API_KEY)  # Just an HTTP client
   await client.graph.add(graph_id=graph_id, data="...", type="text")
   ```

2. **Zep Server (Go)** - Routes to Graphiti:
   ```go
   // zep/legacy/src/store/memory_ce.go
   graphiti.I().PutMemory(ctx, session.SessionID, memoryMessages.Messages, true)
   graphiti.I().GetMemory(ctx, graphiti.GetMemoryRequest{...})
   ```

3. **Graphiti Server** - Async queue for slow LLM operations:
   ```python
   # zep-graphiti/server/graph_service/routers/ingest.py
   class AsyncWorker:
       def __init__(self):
           self.queue = asyncio.Queue()  # Async job queue
   
   @router.post('/messages', status_code=status.HTTP_202_ACCEPTED)  # Returns immediately
   async def add_messages(...):
       await async_worker.queue.put(partial(add_messages_task, m))  # Queue the job
   ```

4. **Graphiti Core** - All business logic:
   - `prompts/extract_nodes.py` - Entity extraction prompts
   - `prompts/dedupe_nodes.py` - Entity deduplication prompts
   - `prompts/extract_edges.py` - Relationship extraction prompts
   - `models/nodes/node_db_queries.py` - Dynamic Cypher generation (Neo4j/FalkorDB/Kuzu)
   - `graphiti.py:add_episode()` - Orchestrates the full extraction pipeline
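
The queueing pattern from item 3 can be illustrated without FastAPI. The following is a minimal sketch, not the Graphiti server's code: `SketchWorker`, `slow_task`, and `handle_request` are hypothetical names, and `asyncio.sleep` stands in for the slow LLM-backed work.

```python
import asyncio
from functools import partial


class SketchWorker:
    """Hypothetical stand-in for a queue-backed background worker."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.done: list = []

    async def run(self) -> None:
        # Pull jobs one at a time so slow work never blocks the caller.
        while True:
            job = await self.queue.get()
            try:
                await job()
            finally:
                self.queue.task_done()


async def slow_task(worker: SketchWorker, message: str) -> None:
    await asyncio.sleep(0.01)  # placeholder for LLM extraction
    worker.done.append(message)


async def handle_request(worker: SketchWorker, messages: list) -> int:
    # Enqueue each message and return at once, like an HTTP 202 Accepted.
    for m in messages:
        await worker.queue.put(partial(slow_task, worker, m))
    return 202


async def demo() -> tuple:
    worker = SketchWorker()
    runner = asyncio.create_task(worker.run())
    status = await handle_request(worker, ["hello", "world"])
    processed_at_return = list(worker.done)  # snapshot taken before any job ran
    await worker.queue.join()                # wait for the backlog to drain
    runner.cancel()
    return status, processed_at_return, list(worker.done)
```

Running `asyncio.run(demo())` shows the handler returning before any job has been processed, which is the property that keeps the gateway from blocking on LLM latency.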

### My Setup

I bypass Zep Server and Graphiti Server and call `graphiti_core` directly.

This gives **complete visibility** into every operation.

Later I may run the full stack with Zep Server and Graphiti Server and compare the results.

```
┌────────────────────────────────────────────────────────────┐
│  - ZepSimulator: Mimics Zep SDK patterns                   │
│  - TraceLogger: Captures all operations                    │
│  - Agent: Simple conversation loop                         │
└─────────────────────┬──────────────────────────────────────┘
                      │ Direct Python calls (OTEL tracing)
                      ▼
┌────────────────────────────────────────────────────────────┐
│  graphiti_core (Python Library)                            │
│  - Full source code access                                 │
│  - All internal operations visible                         │
├─────────────────────┬──────────────────────────────────────┤
│    ┌────────────────┴────────────────┐                     │
│    ▼                                 ▼                     │
│  vLLM Server                    Local Neo4j                │
│  (Qwen2.5-32B on H100)          (bolt://localhost:7687)    │
│  - HTTP traffic logged          - All queries logged       │
└────────────────────────────────────────────────────────────┘
```

## 2. Zep Cloud API → Graphiti Complete Mapping

This mapping was verified by reading the source code.

### API Mapping Table

| Zep Python SDK | Zep Server (Go) | Graphiti Server | graphiti_core | Purpose |
|----------------|-----------------|-----------------|---------------|----------|
| `thread.add_messages()` | `memory_ce.go:_initializeProcessingMemory()` | `POST /messages` | `add_episode()` | Store conversation messages |
| `thread.get_user_context()` | `memory_ce.go:_get()` | `POST /get-memory` | `search()` | Retrieve relevant context |
| `session.get_memory()` | `memory_ce.go:_get()` | `POST /get-memory` | `search()` | Get session memory |
| `graph.search()` | `memory_ce.go:_searchSessions()` | `POST /search` | `search()` | Search knowledge graph |
| `user.add()` | `userstore_ce.go:_processCreatedUser()` | `POST /entity-node` | `save_entity_node()` | Create user entity |
| `graph.add()` | N/A | `POST /entity-node` | `save_entity_node()` | Add structured data |
| `user.delete()` | `user_handlers.go:DeleteUserHandler()` | `DELETE /group/{group_id}` | `delete_group()` | Delete user and all data |
| `fact.get()` | `fact_handlers_ce.go:getFact()` | `GET /entity-edge/{uuid}` | `EntityEdge.get_by_uuid()` | Get a specific fact |
| `fact.delete()` | `fact_handlers_ce.go:deleteSessionFact()` | `DELETE /entity-edge/{uuid}` | `delete_entity_edge()` | Delete a fact |
| `session.delete_memory()` | `memory_handlers_ce.go:deleteMemory()` | `DELETE /group/{group_id}` | `delete_group()` | Delete session memory |
| `episode.delete()` | N/A | `DELETE /episode/{uuid}` | `delete_episodic_node()` | Delete an episode |

### Source Code Evidence

> **Just FYI, no need to read this.**

**1. thread.add_messages() → add_episode()**

From `zep/legacy/src/store/memory_ce.go`:
```go
func (dao *memoryDAO) _initializeProcessingMemory(...) error {
    err := graphiti.I().PutMemory(ctx, session.SessionID, memoryMessages.Messages, true)
}
```

From `zep-graphiti/server/graph_service/routers/ingest.py`:
```python
@router.post('/messages', status_code=status.HTTP_202_ACCEPTED)
async def add_messages(request: AddMessagesRequest, graphiti: ZepGraphitiDep):
    async def add_messages_task(m: Message):
        await graphiti.add_episode(
            uuid=m.uuid,
            group_id=request.group_id,
            name=m.name,
            episode_body=f'{m.role or ""}({m.role_type}): {m.content}',
            ...
        )
```
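
To make the `episode_body` format concrete, here is a small sketch that applies the same f-string to a stand-in message object. `SketchMessage` and `format_episode_body` are hypothetical names used only for illustration; the real `Message` model lives in the Graphiti server and may carry more fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchMessage:
    """Hypothetical stand-in for the server's Message model."""
    content: str
    role_type: str
    role: Optional[str] = None


def format_episode_body(m: SketchMessage) -> str:
    # Same f-string as in the router excerpt: "<role>(<role_type>): <content>".
    return f'{m.role or ""}({m.role_type}): {m.content}'


body = format_episode_body(
    SketchMessage(content="Hi, I'm Alice Chen.", role_type="user", role="Alice Chen")
)
print(body)
```

When `role` is missing, the `or ""` fallback leaves only the parenthesised role type as the prefix.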

**2. thread.get_user_context() → search()**

From `zep/legacy/src/store/memory_ce.go`:
```go
func (dao *memoryDAO) _get(...) (*models.Memory, error) {
    memory, err := graphiti.I().GetMemory(ctx, graphiti.GetMemoryRequest{
        GroupID:  groupID,
        MaxFacts: 5,
        Messages: mForRetrieval,
    })
}
```

From `zep-graphiti/server/graph_service/routers/retrieve.py`:
```python
@router.post('/get-memory', status_code=status.HTTP_200_OK)
async def get_memory(request: GetMemoryRequest, graphiti: ZepGraphitiDep):
    combined_query = compose_query_from_messages(request.messages)
    result = await graphiti.search(
        group_ids=[request.group_id],
        query=combined_query,
        num_results=request.max_facts,
    )
```

**3. user.delete() → delete_group()**

From `zep/legacy/src/api/apihandlers/user_handlers.go`:
```go
func DeleteUserHandler(appState *models.AppState) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        err := userStore.DeleteUser(ctx, userID)
    }
}
```

From `zep-graphiti/server/graph_service/zep_graphiti.py`:
```python
async def delete_group(self, group_id: str):
    edges = await EntityEdge.get_by_group_ids(self.driver, [group_id])
    for edge in edges:
        await edge.delete(self.driver)
    # Also deletes nodes and episodes
```

**4. fact.get() / fact.delete() → get_entity_edge() / delete_entity_edge()**

From `zep/legacy/src/api/apihandlers/fact_handlers_ce.go`:
```go
func getFact(...) (*models.Fact, error) { ... }
func deleteSessionFact(...) error { ... }
```

From `zep-graphiti/server/graph_service/routers/ingest.py`:
```python
@router.delete('/entity-edge/{uuid}')
async def delete_entity_edge(uuid: str, graphiti: ZepGraphitiDep):
    await graphiti.delete_entity_edge(uuid)
```

From `zep-graphiti/server/graph_service/routers/retrieve.py`:
```python
@router.get('/entity-edge/{uuid}')
async def get_entity_edge(uuid: str, graphiti: ZepGraphitiDep):
    return await graphiti.get_entity_edge(uuid)
```

## 3. Internal Flow: What Happens Inside Each Operation

### 3.1 add_episode() Internal Flow

When you call `graphiti.add_episode()`, here's what happens internally.

```
add_episode(episode_body="Alice Chen(user): Hi, I'm Alice Chen. I work at TechCorp...")
│
├─► Step 0: Query Previous Episodes
│   Neo4j: MATCH (e:Episodic) WHERE e.valid_at <= $reference_time AND e.group_id IN $group_ids
│          RETURN e ORDER BY e.valid_at DESC LIMIT $num_episodes
│   Purpose: Get conversation context for entity extraction
│
├─► Step 1: Extract Entities (LLM Call - extract_nodes.extract_message)
│   System: "You are an AI assistant that extracts entity nodes from conversational messages..."
│   User: "<ENTITY TYPES>...</ENTITY TYPES> <CURRENT MESSAGE>Alice Chen(user): Hi, I'm Alice Chen...</CURRENT MESSAGE>"
│   Response: {"extracted_entities": [{"name": "Alice Chen", "entity_type_id": 0}, {"name": "TechCorp", "entity_type_id": 0}]}
│
├─► Step 2: Search for Existing Entities (Neo4j - Parallel Queries)
│   For EACH extracted entity, run two parallel searches:
│   │
│   ├─► 2a: BM25 Fulltext Search
│   │   Neo4j: CALL db.index.fulltext.queryNodes("node_name_and_summary", $query, {limit: $limit})
│   │          YIELD node AS n, score WHERE n.group_id IN $group_ids
│   │
│   └─► 2b: Cosine Similarity Search
│       Neo4j: MATCH (n:Entity) WHERE n.group_id IN $group_ids
│              WITH n, vector.similarity.cosine(n.name_embedding, $search_vector) AS score
│              WHERE score > $min_score RETURN n ORDER BY score DESC
│
├─► Step 3: Deduplicate Entities (LLM Call - dedupe_nodes.nodes)
│   System: "You are a helpful assistant that determines whether or not ENTITIES extracted from a conversation are duplicates..."
│   User: "<ENTITIES>[extracted entities]</ENTITIES> <EXISTING ENTITIES>[candidates from Neo4j]</EXISTING ENTITIES>"
│   Response: {"entity_resolutions": [{"id": 0, "name": "Alice Chen", "duplicate_idx": -1, "duplicates": []}, ...]}
│   Purpose: Match new entities to existing ones or confirm they are new
│
├─► Step 4: Extract Relationships (LLM Call - extract_edges.edge)
│   System: "You are an expert fact extractor that extracts fact triples from text..."
│   User: "<ENTITIES>[resolved entities]</ENTITIES> <CURRENT_MESSAGE>...</CURRENT_MESSAGE> <REFERENCE_TIME>...</REFERENCE_TIME>"
│   Response: {"edges": [{"relation_type": "WORKS_AT", "source_entity_id": 0, "target_entity_id": 1,
│              "fact": "Alice Chen works at TechCorp as a senior software engineer.", "valid_at": "2026-01-31T13:35:08Z"}]}
│
├─► Step 5: Search for Existing Edges (Neo4j - Multiple Queries)
│   │
│   ├─► 5a: Direct Edge Lookup
│   │   Neo4j: MATCH (n:Entity {uuid: $source_node_uuid})-[e:RELATES_TO]->(m:Entity {uuid: $target_node_uuid}) RETURN e
│   │
│   ├─► 5b: BM25 Fulltext Search on Edges
│   │   Neo4j: CALL db.index.fulltext.queryRelationships("edge_name_and_fact", $query, {limit: $limit})
│   │          YIELD relationship AS rel, score MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
│   │
│   └─► 5c: Cosine Similarity Search on Edges
│       Neo4j: MATCH (n:Entity)-[e:RELATES_TO]->(m:Entity) WHERE e.group_id IN $group_ids
│              WITH DISTINCT e, n, m, vector.similarity.cosine(e.fact_embedding, $search_vector) AS score
│
├─► Step 6: Deduplicate Edges (LLM Call - dedupe_edges.resolve_edge) [Only if existing edges found]
│   System: "You are a helpful assistant that de-duplicates facts from fact lists..."
│   User: "<EXISTING FACTS>[...]</EXISTING FACTS> <FACT INVALIDATION CANDIDATES>[...]</FACT INVALIDATION CANDIDATES> <NEW FACT>...</NEW FACT>"
│   Response: {"duplicate_facts": [], "contradicted_facts": [], "fact_type": "DEFAULT"}
│   Purpose: Detect duplicate or contradicting facts
│
├─► Step 7: Generate Entity Summaries (LLM Calls - PARALLEL - extract_nodes.extract_summary)
│   For EACH new or updated entity, run in parallel:
│   System: "You are a helpful assistant that extracts entity summaries from the provided text..."
│   User: "<MESSAGES>[conversation history]</MESSAGES> <ENTITY>{name, summary, entity_types, attributes}</ENTITY>"
│   Response: {"summary": "Alice Chen works at TechCorp as a senior software engineer."}
│   Note: one LLM call per entity; the calls run concurrently
│
└─► Step 8: Write to Neo4j (Implicit - happens during steps above)
    Entities and edges are created/updated as they are processed.
    The EpisodicNode is also created to store the original message.
```
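
Step 3's response can be applied to the extracted entities roughly as follows. This is a sketch under stated assumptions, not the `graphiti_core` implementation: it assumes `duplicate_idx` indexes into the candidate list returned by Step 2 and that `-1` means "no duplicate found". The function name `apply_resolutions` and the sample data are hypothetical.

```python
def apply_resolutions(extracted: list, candidates: list, resolutions: list) -> list:
    """Map each extracted entity to an existing candidate, or keep it as new.

    Assumption: duplicate_idx indexes into `candidates`; -1 means no match.
    """
    resolved = []
    for r in resolutions:
        entity = extracted[r["id"]]
        idx = r["duplicate_idx"]
        if 0 <= idx < len(candidates):
            # Reuse the existing node instead of creating a second one.
            resolved.append({"entity": candidates[idx], "is_new": False})
        else:
            resolved.append({"entity": entity, "is_new": True})
    return resolved


extracted = [{"name": "Alice Chen"}, {"name": "TechCorp"}]
candidates = [{"name": "TechCorp", "uuid": "existing-1"}]  # illustrative data
resolutions = [
    {"id": 0, "name": "Alice Chen", "duplicate_idx": -1, "duplicates": []},
    {"id": 1, "name": "TechCorp", "duplicate_idx": 0, "duplicates": [0]},
]
result = apply_resolutions(extracted, candidates, resolutions)
```

In this example "Alice Chen" is kept as a new entity, while "TechCorp" resolves to the node that already exists in the graph.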

### 3.2 search() Internal Flow

When you call `graphiti.search()`, the system uses **two search methods in parallel** for edge retrieval.

```
search(query="What does Alice work on?", group_ids=["demo_session_..."], num_results=10)
│
├─► Step 1: Generate Query Embedding (Local Embedder)
│   Embedder: sentence-transformers/all-MiniLM-L6-v2
│   encode("What does Alice work on?") → [0.12, -0.34, ...] (384 dimensions)
│
├─► Step 2: Execute Search Methods (Parallel)
│   │
│   ├─► 2a: BM25 Fulltext Search on Relationships
│   │   Neo4j: CALL db.index.fulltext.queryRelationships("edge_name_and_fact", $query, {limit: $limit})
│   │          YIELD relationship AS rel, score
│   │          MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
│   │          WHERE e.group_id IN $group_ids
│   │          RETURN e, n, m ORDER BY score DESC
│   │
│   └─► 2b: Cosine Similarity Search on Relationships
│       Neo4j: MATCH (n:Entity)-[e:RELATES_TO]->(m:Entity)
│              WHERE e.group_id IN $group_ids
│              WITH DISTINCT e, n, m, vector.similarity.cosine(e.fact_embedding, $search_vector) AS score
│              WHERE score > $min_score
│              RETURN e, n, m ORDER BY score DESC LIMIT $limit
│
├─► Step 3: Combine and Deduplicate Results
│   Merge results from both search methods
│   Remove duplicates based on edge UUID
│   Sort by relevance score
│
└─► Step 4: Return Top Results
    Return: [EntityEdge(fact="Alice Chen is currently leading Project Phoenix.", ...),
             EntityEdge(fact="Project Phoenix has a deadline on February 15th.", ...),
             EntityEdge(fact="Alice Chen works at TechCorp as a senior software engineer.", ...)]
```
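
Step 3 (merge, deduplicate by UUID, sort) can be sketched in a few lines. This is only an illustration of the step as described above: it assumes both searches return `(uuid, score)` pairs with comparable scores, which is a simplification, and `graphiti_core`'s actual reranking may differ (for example, a rank-based fusion). `merge_results` is a hypothetical name.

```python
def merge_results(bm25: list, cosine: list, limit: int) -> list:
    """Merge two (uuid, score) lists, keep the best score per uuid, sort descending."""
    best = {}
    for uuid, score in [*bm25, *cosine]:
        if uuid not in best or score > best[uuid]:
            best[uuid] = score
    ranked = sorted(best.items(), key=lambda item: item[1], reverse=True)
    return ranked[:limit]


merged = merge_results(
    bm25=[("edge-a", 0.9), ("edge-b", 0.4)],
    cosine=[("edge-b", 0.7), ("edge-c", 0.2)],
    limit=2,
)
```

Here `edge-b` appears in both result sets but is returned once, with the higher of its two scores.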

**Search Methods by Data Type** (from `graphiti_core/search/search.py`):
- **EntityEdge**: BM25 Fulltext + Cosine Similarity (+ optional BFS, LLM rerank)
- **EntityNode**: BM25 Fulltext + Cosine Similarity (+ optional BFS)
- **EpisodicNode**: BM25 Fulltext only
- **Community**: BM25 Fulltext + Cosine Similarity
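
For readers unfamiliar with the cosine similarity filter used in the vector searches, here is a pure-Python sketch of what `vector.similarity.cosine(...)` followed by `WHERE score > $min_score` computes conceptually. In the real system Neo4j does this work; the function names and the toy two-dimensional vectors below are hypothetical.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def vector_search(query_vec: list, items: list, min_score: float, limit: int) -> list:
    """Score (name, vector) items, drop those at or below min_score, sort descending."""
    scored = [(name, cosine_similarity(query_vec, vec)) for name, vec in items]
    kept = [pair for pair in scored if pair[1] > min_score]
    return sorted(kept, key=lambda pair: pair[1], reverse=True)[:limit]


hits = vector_search(
    query_vec=[1.0, 0.0],
    items=[("x", [1.0, 0.0]), ("y", [0.0, 1.0]), ("z", [1.0, 1.0])],
    min_score=0.5,
    limit=10,
)
```

The orthogonal vector `y` scores 0 and is filtered out, while `z` (at 45 degrees) passes the threshold with a lower score than the exact match `x`.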

## 4. Environment Setup

The following cells configure the same environment as `graphiti_neo4j_otel_demo.ipynb`.

In [1]:
# Cell 4.1: Imports and Basic Logging Setup
import os
import sys
import json
import logging
import asyncio
from datetime import datetime, timezone
from collections.abc import Iterable
from pathlib import Path

print(f"Python: {sys.version}")
print(f"Working dir: {os.getcwd()}")

Python: 3.12.12 (main, Jan 14 2026, 19:35:58) [Clang 21.1.4 ]
Working dir: /mnt/data-disk-1/home/cpii.local/ericlo/projects/zep-repos/zep-graphiti/examples/neo4j_otel


In [2]:
# Cell 4.2: Load Environment Variables
from dotenv import load_dotenv

load_dotenv()

# Neo4j Configuration
neo4j_uri = os.environ.get('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.environ.get('NEO4J_USER', 'neo4j')
neo4j_password = os.environ.get('NEO4J_PASSWORD', 'password')

# Local LLM Configuration
local_llm_enabled = os.environ.get('LOCAL_LLM_ENABLED', 'false').lower() == 'true'
local_llm_base_url = os.environ.get('LOCAL_LLM_BASE_URL', 'http://localhost:8000/v1')
local_llm_model = os.environ.get('LOCAL_LLM_MODEL', 'Qwen/Qwen2.5-32B-Instruct')
local_llm_api_key = os.environ.get('LOCAL_LLM_API_KEY', 'vllm')

# Embedding Configuration
embedding_provider = os.environ.get('EMBEDDING_PROVIDER', 'local')
local_embedding_model = os.environ.get('LOCAL_EMBEDDING_MODEL', 'all-MiniLM-L6-v2')

print(f'Neo4j URI: {neo4j_uri}')
print(f'Local LLM Enabled: {local_llm_enabled}')
if local_llm_enabled:
    print(f'  Base URL: {local_llm_base_url}')
    print(f'  Model: {local_llm_model}')
print(f'Embedding Provider: {embedding_provider}')

Neo4j URI: bolt://localhost:7687
Local LLM Enabled: True
  Base URL: http://localhost:8801/v1
  Model: Qwen/Qwen2.5-32B-Instruct
Embedding Provider: local


In [3]:
# Cell 4.3: Configure OpenTelemetry Tracing
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

def setup_otel_tracing():
    """Configure OpenTelemetry to output traces to console"""
    resource = Resource(attributes={
        'service.name': 'zep-agent-full-trace',
        'service.version': '1.0.0',
    })
    
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(provider)
    
    return trace.get_tracer(__name__)

otel_tracer = setup_otel_tracing()
print("OpenTelemetry tracing configured")

OpenTelemetry tracing configured


In [4]:
# Cell 4.4: Define Local Embedder (sentence-transformers)
from graphiti_core.embedder.client import EmbedderClient
from sentence_transformers import SentenceTransformer

class SentenceTransformerEmbedder(EmbedderClient):
    """
    Local embedder using sentence-transformers.
    No API key required - runs entirely locally.
    """
    
    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        print(f"Loading sentence-transformers model: {model_name}")
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()
        print(f"Model loaded. Embedding dimension: {self.embedding_dim}")
    
    async def create(
        self, input_data: str | list[str] | Iterable[int] | Iterable[Iterable[int]]
    ) -> list[float]:
        """Create embedding for input text."""
        if isinstance(input_data, str):
            text = input_data
        elif isinstance(input_data, list) and len(input_data) > 0:
            text = input_data[0] if isinstance(input_data[0], str) else str(input_data[0])
        else:
            text = str(input_data)
        
        loop = asyncio.get_running_loop()
        embedding = await loop.run_in_executor(
            None, 
            lambda: self.model.encode(text, convert_to_numpy=True).tolist()
        )
        return embedding
    
    async def create_batch(self, input_data_list: list[str]) -> list[list[float]]:
        """Create embeddings for a batch of texts."""
        loop = asyncio.get_running_loop()
        embeddings = await loop.run_in_executor(
            None,
            lambda: self.model.encode(input_data_list, convert_to_numpy=True).tolist()
        )
        return embeddings

print("SentenceTransformerEmbedder class defined")



SentenceTransformerEmbedder class defined
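
The embedder above wraps a blocking `encode()` call in `run_in_executor` so the event loop stays responsive. The sketch below isolates that pattern using only the standard library; `blocking_encode` is a hypothetical stand-in for the model call and returns a fake two-number "embedding".

```python
import asyncio
import time


def blocking_encode(text: str) -> list:
    """Hypothetical stand-in for model.encode(); sleeps to simulate blocking work."""
    time.sleep(0.01)
    return [float(len(text)), float(text.count(" "))]


async def embed(text: str) -> list:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, lambda: blocking_encode(text))


async def embed_many(texts: list) -> list:
    # Each call runs in a worker thread, so the awaits can overlap.
    return await asyncio.gather(*(embed(t) for t in texts))
```

Calling `asyncio.run(embed_many([...]))` returns the results in input order, because `asyncio.gather` preserves the order of its awaitables.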


In [5]:
# Cell 4.5: vLLM Reranker Client
#
# Custom CrossEncoderClient that uses vLLM for reranking.
# Uses direct scoring (0-100) instead of logprobs for better compatibility.

import re
from graphiti_core.cross_encoder.client import CrossEncoderClient
from graphiti_core.helpers import semaphore_gather

class VLLMRerankerClient(CrossEncoderClient):
    """CrossEncoderClient implementation using vLLM with direct relevance scoring."""
    
    def __init__(self, client, model: str):
        self.client = client  # AsyncOpenAI client
        self.model = model
    
    async def rank(self, query: str, passages: list[str]) -> list[tuple[str, float]]:
        if not passages:
            return []
        if len(passages) == 1:
            return [(passages[0], 1.0)]
        
        async def score_passage(passage: str) -> tuple[str, float]:
            prompt = f"""Rate relevance 0-100. Query: {query} Passage: {passage} Number only:"""
            try:
                response = await self.client.chat.completions.create(
                    model=self.model,
                    messages=[{'role': 'user', 'content': prompt}],
                    temperature=0,
                    max_tokens=5,
                )
                score_text = response.choices[0].message.content.strip()
                match = re.search(r'\d+', score_text)
                score = min(100, max(0, int(match.group()))) / 100.0 if match else 0.5
                return (passage, score)
            except Exception as e:
                print(f'Reranker error: {e}')
                return (passage, 0.0)
        
        results = await semaphore_gather(*[score_passage(p) for p in passages])
        return sorted(results, key=lambda x: x[1], reverse=True)

print('VLLMRerankerClient class defined')

VLLMRerankerClient class defined
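
The score-parsing step inside `score_passage` can be checked in isolation. The sketch below assumes the intended pattern is "first run of digits in the reply" and reuses the same clamp to 0-100 and the same 0.5 fallback; `parse_score` is a hypothetical helper name, not part of the notebook.

```python
import re


def parse_score(score_text: str) -> float:
    """Extract the first integer from an LLM reply and map it to [0.0, 1.0]."""
    match = re.search(r'\d+', score_text)
    if match is None:
        return 0.5  # neutral fallback when the reply contains no number
    return min(100, max(0, int(match.group()))) / 100.0


for reply in ["85", "Score: 120", "n/a"]:
    print(repr(reply), "->", parse_score(reply))
```

Values above 100 are clamped to 1.0, and a reply without digits falls back to the neutral 0.5 rather than raising.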


In [6]:
# Cell 4.6: Initialize Graphiti with vLLM Reranker
#
# IMPORTANT: Make sure vLLM is running on port 8801:
# CUDA_VISIBLE_DEVICES=4,5 uv run vllm serve Qwen/Qwen2.5-32B-Instruct \
#     --port 8801 --api-key vllm --tensor-parallel-size 2 \
#     --max-model-len 16384 --enforce-eager --gpu-memory-utilization 0.85

from graphiti_core import Graphiti
from graphiti_core.llm_client.config import LLMConfig
from graphiti_core.llm_client.openai_generic_client import OpenAIGenericClient
from openai import AsyncOpenAI

# Timeout Configuration
LLM_TIMEOUT_SECONDS = 600  # 10 minutes per LLM call

# Initialize Local Embedder
print(f'Initializing local embedder with model: {local_embedding_model}')
embedder = SentenceTransformerEmbedder(model_name=local_embedding_model)

# Initialize LLM Client
if not local_llm_enabled:
    raise ValueError('LOCAL_LLM_ENABLED must be true in .env')

print(f'Using Local LLM at {local_llm_base_url}')
print(f'Model: {local_llm_model}')
print(f'Timeout: {LLM_TIMEOUT_SECONDS}s')

llm_config = LLMConfig(
    api_key=local_llm_api_key,
    model=local_llm_model,
    small_model=local_llm_model,
    base_url=local_llm_base_url,
)

custom_openai_client = AsyncOpenAI(
    api_key=local_llm_api_key,
    base_url=local_llm_base_url,
    timeout=LLM_TIMEOUT_SECONDS,
)

llm_client = OpenAIGenericClient(
    config=llm_config,
    client=custom_openai_client,
    max_tokens=4096
)

# Use VLLMRerankerClient with the same AsyncOpenAI client
cross_encoder = VLLMRerankerClient(client=custom_openai_client, model=local_llm_model)

# Initialize Graphiti
graphiti = Graphiti(
    uri=neo4j_uri,
    user=neo4j_user,
    password=neo4j_password,
    llm_client=llm_client,
    embedder=embedder,
    cross_encoder=cross_encoder,
    tracer=otel_tracer,
    trace_span_prefix='zep.graphiti',
)

print(f'Graphiti initialized with VLLMRerankerClient, connected to {neo4j_uri}')

Initializing local embedder with model: all-MiniLM-L6-v2
Loading sentence-transformers model: all-MiniLM-L6-v2
Model loaded. Embedding dimension: 384
Using Local LLM at http://localhost:8801/v1
Model: Qwen/Qwen2.5-32B-Instruct
Timeout: 600s
Graphiti initialized with VLLMRerankerClient, connected to bolt://localhost:7687


In [7]:
# Cell 4.7: Build Indices and Constraints
await graphiti.build_indices_and_constraints()
print("Indices and constraints built successfully")

Indices and constraints built successfully


## 5. Trace System Implementation

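The trace system records every event as JSON Lines (one JSON object per line). The `EnhancedTraceLogger` defined in Cell 5.1 writes to a separate file per category (main, LLM, Neo4j reads, Neo4j writes, embeddings, parallel operations, OTEL spans) and produces no console output.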
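
Because the trace output is JSON Lines, it is easy to post-process. The following sketch appends a few entries to a temporary file and then counts them by `level`. The file name `trace_sketch.jsonl` and the helper names are hypothetical; only the one-object-per-line layout and the `level` and `timestamp` fields mirror the logger in this section.

```python
import json
import tempfile
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path


def append_entry(path: Path, entry: dict) -> None:
    """Append one JSON object per line, stamping it with the current UTC time."""
    record = {**entry, "timestamp": datetime.now(timezone.utc).isoformat()}
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, default=str) + "\n")


def count_levels(path: Path) -> Counter:
    """Read a JSON Lines file and count entries per 'level' field."""
    with path.open(encoding="utf-8") as f:
        return Counter(json.loads(line)["level"] for line in f if line.strip())


def demo() -> Counter:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "trace_sketch.jsonl"  # hypothetical file name
        append_entry(path, {"level": "LLM_REQUEST", "call_number": 1})
        append_entry(path, {"level": "LLM_RESPONSE", "call_number": 1})
        append_entry(path, {"level": "NEO4J_QUERY", "query_number": 1})
        return count_levels(path)
```

The same `count_levels` approach works on any of the trace files, for example to confirm that every request entry has a matching response entry.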

In [8]:
# Cell 5.1: Enhanced TraceLogger - File-Only Output System
#
# This enhanced TraceLogger outputs ALL logs to files only (no console output).
# It captures: API calls, LLM calls, Neo4j queries (READ + WRITE), Embeddings, Parallel ops, OTEL spans.
#
# Log Files:
#   - trace_main.jsonl      : Main trace log (API, Graphiti calls, sections)
#   - trace_llm.jsonl       : LLM requests and responses
#   - trace_neo4j.jsonl     : Neo4j READ queries
#   - trace_neo4j_write.jsonl: Neo4j WRITE operations (CREATE, MERGE, SET)
#   - trace_embeddings.jsonl: Embedding generation calls
#   - trace_parallel.jsonl  : Parallel operation tracking
#   - trace_otel.jsonl      : OpenTelemetry spans

class EnhancedTraceLogger:
    """
    Enhanced trace logger that outputs to separate files with NO console output.
    Captures complete information about all operations including embeddings and writes.
    """
    
    def __init__(self, output_dir: str = '.'):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        # Open all log files
        self.files = {
            'main': open(self.output_dir / 'trace_main.jsonl', 'w'),
            'llm': open(self.output_dir / 'trace_llm.jsonl', 'w'),
            'neo4j': open(self.output_dir / 'trace_neo4j.jsonl', 'w'),
            'neo4j_write': open(self.output_dir / 'trace_neo4j_write.jsonl', 'w'),
            'embeddings': open(self.output_dir / 'trace_embeddings.jsonl', 'w'),
            'parallel': open(self.output_dir / 'trace_parallel.jsonl', 'w'),
            'otel': open(self.output_dir / 'trace_otel.jsonl', 'w'),
        }
        
        self.indent_level = 0
        self.counters = {
            'llm_call': 0,
            'neo4j_query': 0,
            'neo4j_write': 0,
            'embedding': 0,
            'parallel_task': 0,
            'otel_span': 0,
        }
        
        # Track parallel operations
        self.active_parallel_tasks = {}
        
        # Log initialization (only to file)
        self._write('main', {
            'level': 'INIT',
            'message': 'EnhancedTraceLogger initialized',
            'log_files': list(self.files.keys()),
        })
    
    def _write(self, file_key: str, entry: dict):
        """Write JSON entry to specified log file"""
        entry['timestamp'] = datetime.now(timezone.utc).isoformat()
        self.files[file_key].write(json.dumps(entry, default=str) + '\n')
        self.files[file_key].flush()
    
    def _indent(self) -> str:
        return '   ' * self.indent_level
    
    # ========== Main Trace Events ==========
    
    def log_main(self, event: str, data: dict = None):
        """Generic main log entry (for any high-level event)"""
        self._write('main', {'level': 'MAIN', 'event': event, 'data': data})
    
    def log_section(self, title: str):
        """Log a section header (NO console output)"""
        self._write('main', {'level': 'SECTION', 'title': title})
    
    def log_api_call(self, method: str, params: dict):
        """Log a Zep-equivalent API call"""
        self._write('main', {'level': 'API', 'method': method, 'params': params})
    
    def log_graphiti_call(self, method: str, params: dict = None):
        """Log a graphiti_core function call"""
        self._write('main', {'level': 'GRAPHITI_START', 'method': method, 'params': params})
        self.indent_level += 1
    
    def log_graphiti_step(self, step_num: int, description: str):
        """Log a step within a graphiti operation"""
        self._write('main', {'level': 'GRAPHITI_STEP', 'step': step_num, 'description': description})
    
    def log_graphiti_end(self, duration_ms: float = None, result_summary: str = None):
        """End a graphiti operation"""
        self.indent_level = max(0, self.indent_level - 1)
        self._write('main', {'level': 'GRAPHITI_END', 'duration_ms': duration_ms, 'result': result_summary})
    
    def log_result(self, description: str, data: object = None):
        """Log a result"""
        self._write('main', {'level': 'RESULT', 'description': description, 'data': str(data) if data else None})
    
    # ========== LLM Events ==========
    
    def log_llm_request(self, call_number: int, model: str, messages: list, **kwargs):
        """Log LLM request"""
        self.counters['llm_call'] = call_number
        self._write('llm', {
            'level': 'LLM_REQUEST',
            'call_number': call_number,
            'model': model,
            'messages': messages,
            'params': kwargs,
        })
    
    def log_llm_response(self, call_number: int, content: str, usage: dict, finish_reason: str = None):
        """Log LLM response"""
        self._write('llm', {
            'level': 'LLM_RESPONSE',
            'call_number': call_number,
            'content': content,
            'usage': usage,
            'finish_reason': finish_reason,
        })
    
    # ========== Neo4j Events ==========
    
    def log_neo4j_query(self, query_number: int, query: str, params: dict, is_write: bool = False):
        """Log Neo4j query - separates READ and WRITE operations"""
        file_key = 'neo4j_write' if is_write else 'neo4j'
        level = 'NEO4J_WRITE' if is_write else 'NEO4J_QUERY'
        
        # Detect query type. Search procedures are checked first: vector
        # searches start with MATCH and would otherwise be labelled READ.
        query_upper = query.strip().upper()
        query_lower = query.lower()
        query_type = 'UNKNOWN'
        if 'fulltext.query' in query_lower:
            query_type = 'BM25_SEARCH'
        elif 'vector.similarity' in query_lower:
            query_type = 'VECTOR_SEARCH'
        elif query_upper.startswith('CREATE'):
            query_type = 'CREATE'
        elif query_upper.startswith('MERGE'):
            query_type = 'MERGE'
        elif 'SET ' in query_upper:
            query_type = 'UPDATE'
        elif query_upper.startswith('DELETE') or 'DELETE ' in query_upper:
            query_type = 'DELETE'
        elif query_upper.startswith('MATCH'):
            query_type = 'READ'
        
        self._write(file_key, {
            'level': level,
            'query_number': query_number,
            'query_type': query_type,
            'query': query,
            'params': {k: str(v)[:500] for k, v in params.items()} if params else {},
        })
    
    def log_neo4j_result(self, query_number: int, duration_ms: float, record_count: int, 
                         records_preview: str, is_write: bool = False):
        """Log Neo4j query result"""
        file_key = 'neo4j_write' if is_write else 'neo4j'
        level = 'NEO4J_WRITE_RESULT' if is_write else 'NEO4J_RESULT'
        
        self._write(file_key, {
            'level': level,
            'query_number': query_number,
            'duration_ms': duration_ms,
            'record_count': record_count,
            'records_preview': records_preview,
        })
    
    # ========== Embedding Events ==========
    
    def log_embedding_start(self, operation: str, input_texts: list[str]):
        """Log embedding generation start"""
        self.counters['embedding'] += 1
        self._write('embeddings', {
            'level': 'EMBEDDING_START',
            'embedding_number': self.counters['embedding'],
            'operation': operation,
            'input_count': len(input_texts),
            'input_texts': [t[:200] for t in input_texts],  # Truncate for readability
        })
        return self.counters['embedding']
    
    def log_embedding_end(self, embedding_number: int, duration_ms: float, output_dims: list[int]):
        """Log embedding generation result"""
        self._write('embeddings', {
            'level': 'EMBEDDING_END',
            'embedding_number': embedding_number,
            'duration_ms': duration_ms,
            'output_dimensions': output_dims,
        })
    
    # ========== Parallel Operation Events ==========
    
    def log_parallel_start(self, task_group_id: str, task_names: list[str]):
        """Log start of parallel operation"""
        self.counters['parallel_task'] += 1
        self._write('parallel', {
            'level': 'PARALLEL_START',
            'task_group_id': task_group_id,
            'parallel_task_number': self.counters['parallel_task'],
            'task_count': len(task_names),
            'task_names': task_names,
        })
        self.active_parallel_tasks[task_group_id] = task_names
    
    def log_parallel_end(self, task_group_id: str, duration_ms: float, results_summary: str = None):
        """Log end of parallel operation"""
        task_names = self.active_parallel_tasks.pop(task_group_id, [])
        self._write('parallel', {
            'level': 'PARALLEL_END',
            'task_group_id': task_group_id,
            'task_count': len(task_names),
            'duration_ms': duration_ms,
            'results_summary': results_summary,
        })
    
    # ========== OTEL Span Events ==========
    
    def log_otel_span_start(self, span_name: str):
        """Log OTEL span start"""
        self._write('otel', {'level': 'OTEL_SPAN_START', 'span_name': span_name})
    
    def log_otel_span_end(self, span_name: str, duration_ms: float, attributes: dict = None, status: str = None):
        """Log OTEL span end"""
        self._write('otel', {
            'level': 'OTEL_SPAN_END',
            'span_name': span_name,
            'duration_ms': duration_ms,
            'attributes': attributes,
            'status': status,
        })
    
    # ========== Utilities ==========
    
    def close(self):
        """Close all log files"""
        for f in self.files.values():
            f.close()
    
    def get_summary(self) -> dict:
        """Get summary of logged events"""
        return {
            'counters': self.counters.copy(),
            'log_files': [str(self.output_dir / f'{k}.jsonl') for k in self.files.keys()],
        }

# Initialize the enhanced trace logger
trace_logger = EnhancedTraceLogger('.')

In [9]:
# Cell 5.2: Configure Logging (Suppress Console Output)
#
# This configures logging to suppress console output while still
# capturing to our trace files.

import logging

# Raise the root level to WARNING so INFO/DEBUG chatter stays off the console
# (warnings and errors are still printed)
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s | %(name)s | %(levelname)s | %(message)s',
    datefmt='%H:%M:%S',
)

# Suppress verbose loggers
logging.getLogger('graphiti_core').setLevel(logging.WARNING)
logging.getLogger('neo4j').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('httpcore').setLevel(logging.WARNING)

# No print statement here: setup cells are meant to run silently.

### 5.3 Deep Trace Hooks (LLM + Neo4j + OTEL)

This cell implements **non-invasive** hooks to capture:
1. **LLM calls**: Full prompt and response via httpx event hooks
2. **Neo4j queries**: Query text, parameters, and results via driver wrapper
3. **OTEL spans**: All internal operation spans via custom SpanProcessor
4. **Embeddings**: Input texts, timing, and output dimensions via the `TracedEmbedder` wrapper
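
The driver-wrapper idea can be tried without a database. The sketch below is only an illustration of the pattern: `FakeDriver` and the `events` list are hypothetical stand-ins, not part of graphiti_core or the Neo4j driver. It replaces a bound async method with a wrapper that times the call and records a summary, leaving the class itself untouched.

```python
import asyncio
import time
from functools import wraps


class FakeDriver:
    """Hypothetical stand-in for a database driver (not the Neo4j API)."""

    async def execute_query(self, query, **kwargs):
        await asyncio.sleep(0)  # pretend to do I/O
        return ["row-1", "row-2"]


def wrap_execute(driver, events):
    """Replace driver.execute_query with a wrapper that records each call."""
    original = driver.execute_query  # bound method, so `self` is preserved

    @wraps(original)
    async def traced(query, **kwargs):
        start = time.perf_counter()
        result = await original(query, **kwargs)
        events.append({
            "query": query,
            "record_count": len(result),
            "duration_ms": (time.perf_counter() - start) * 1000,
        })
        return result

    # The instance attribute shadows the class method for this object only.
    driver.execute_query = traced
    return driver


events = []
driver = wrap_execute(FakeDriver(), events)
rows = asyncio.run(driver.execute_query("MATCH (n) RETURN n LIMIT 2"))
```

Inside a notebook, where an event loop is already running, the last line would be `rows = await driver.execute_query(...)` instead of `asyncio.run(...)`.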

In [10]:
# Cell 5.3: Deep Trace Hooks (File-Only Output)
#
# These hooks capture complete information WITHOUT modifying graphiti_core source code.
# ALL output goes to files - NO console output.
#
# Captures:
# 1. LLM requests/responses via httpx event hooks
# 2. Neo4j queries (READ + WRITE) via driver wrapper
# 3. OTEL spans via custom SpanProcessor
# 4. Embedding generation via TracedEmbedder wrapper

import json
import time
from functools import wraps

import httpx
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor

# ============================================================================
# 1. LLM Request/Response Hook via httpx (NO console output)
# ============================================================================

class LLMTraceHook:
    """Captures LLM requests/responses - file output only."""
    
    def __init__(self, trace_logger: EnhancedTraceLogger):
        self.trace = trace_logger
        self.call_count = 0
    
    async def log_request(self, request: httpx.Request):
        """Called before each HTTP request"""
        # Only chat-completion requests count as LLM calls
        if '/v1/chat/completions' not in str(request.url):
            return
        
        self.call_count += 1
        # Store the number on the request so the matching response reuses it,
        # even when several calls are in flight at once
        # (relies on httpx exposing request.extensions as a mutable dict)
        request.extensions['trace_call_number'] = self.call_count
        
        try:
            body = json.loads(request.content.decode('utf-8'))
            self.trace.log_llm_request(
                call_number=self.call_count,
                model=body.get('model'),
                messages=body.get('messages', []),
                temperature=body.get('temperature'),
                max_tokens=body.get('max_tokens'),
            )
        except Exception:
            pass  # Silent fail - no console output
    
    async def log_response(self, response: httpx.Response):
        """Called after each HTTP response"""
        if '/v1/chat/completions' in str(response.url):
            try:
                await response.aread()
                body = json.loads(response.content.decode('utf-8'))
                choices = body.get('choices', [])
                usage = body.get('usage', {})
                content = choices[0].get('message', {}).get('content', '') if choices else ''
                # Fall back to the latest counter if the request carried no number
                call_number = response.request.extensions.get('trace_call_number', self.call_count)
                
                self.trace.log_llm_response(
                    call_number=call_number,
                    content=content,
                    usage=usage,
                    finish_reason=choices[0].get('finish_reason') if choices else None,
                )
            except Exception:
                pass  # Silent fail

# ============================================================================
# 2. Neo4j Query Hook with WRITE detection (NO console output)
# ============================================================================

import re  # word-boundary keyword matching for write detection

class Neo4jTraceHook:
    """Wraps Neo4j driver to capture all queries including WRITE operations."""
    
    # Heuristic: match whole keywords only, so property names such as
    # `created_at` or clauses such as OFFSET are not mistaken for writes.
    WRITE_PATTERN = re.compile(r'\b(CREATE|MERGE|SET|DELETE|REMOVE|DETACH)\b', re.IGNORECASE)
    
    def __init__(self, trace_logger: EnhancedTraceLogger):
        self.trace = trace_logger
        self.query_count = 0
    
    def _is_write_query(self, query: str) -> bool:
        """Detect if query modifies the database"""
        return bool(self.WRITE_PATTERN.search(query))
    
    def wrap_driver(self, driver):
        """Wrap the driver's execute_query method"""
        original_execute = driver.execute_query
        
        @wraps(original_execute)
        async def traced_execute(cypher_query, **kwargs):
            self.query_count += 1
            params = kwargs.get('params', {})
            is_write = self._is_write_query(cypher_query)
            
            # Log query (separates READ and WRITE)
            self.trace.log_neo4j_query(
                query_number=self.query_count,
                query=cypher_query,
                params=params,
                is_write=is_write,
            )
            
            # Execute original
            start_time = time.time()
            result = await original_execute(cypher_query, **kwargs)
            duration_ms = (time.time() - start_time) * 1000
            
            # Log result
            records = result.records if hasattr(result, 'records') else []
            record_count = len(records)
            
            records_preview = []
            for r in records[:5]:
                try:
                    records_preview.append(dict(r))
                except Exception:
                    records_preview.append(str(r))
            
            self.trace.log_neo4j_result(
                query_number=self.query_count,
                duration_ms=duration_ms,
                record_count=record_count,
                records_preview=str(records_preview)[:1000],
                is_write=is_write,
            )
            
            return result
        
        driver.execute_query = traced_execute
        return driver

# ============================================================================
# 3. OTEL Span Processor (NO console output)
# ============================================================================

class TraceSpanProcessor(SpanProcessor):
    """Custom SpanProcessor - file output only."""
    
    def __init__(self, trace_logger: EnhancedTraceLogger):
        self.trace = trace_logger
    
    def on_start(self, span: ReadableSpan, parent_context=None):
        span_name = span.name if hasattr(span, 'name') else str(span)
        self.trace.log_otel_span_start(span_name)
    
    def on_end(self, span: ReadableSpan):
        span_name = span.name
        duration_ns = span.end_time - span.start_time if span.end_time and span.start_time else 0
        duration_ms = duration_ns / 1_000_000
        attributes = dict(span.attributes) if span.attributes else {}
        
        self.trace.log_otel_span_end(
            span_name=span_name,
            duration_ms=duration_ms,
            attributes={k: str(v) for k, v in attributes.items()},
            status=str(span.status) if hasattr(span, 'status') else None,
        )
    
    def shutdown(self):
        pass
    
    def force_flush(self, timeout_millis=None) -> bool:
        return True

# ============================================================================
# 4. TracedEmbedder - Captures all embedding operations
# ============================================================================

class TracedEmbedder:
    """Wraps an embedder to log all embedding generation calls."""
    
    def __init__(self, embedder, trace_logger: EnhancedTraceLogger):
        self.embedder = embedder
        self.trace = trace_logger
    
    async def create(self, input_data):
        """Wrap single embedding creation"""
        # Normalize input
        if isinstance(input_data, str):
            texts = [input_data]
        elif isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], str):
            texts = input_data
        else:
            texts = [str(input_data)]
        
        emb_num = self.trace.log_embedding_start('create', texts)
        start = time.time()
        
        result = await self.embedder.create(input_data)
        
        duration_ms = (time.time() - start) * 1000
        self.trace.log_embedding_end(emb_num, duration_ms, [len(result)])
        
        return result
    
    async def create_batch(self, input_data_list: list[str]):
        """Wrap batch embedding creation"""
        emb_num = self.trace.log_embedding_start('create_batch', input_data_list)
        start = time.time()
        
        results = await self.embedder.create_batch(input_data_list)
        
        duration_ms = (time.time() - start) * 1000
        dims = [len(r) for r in results] if results else []
        self.trace.log_embedding_end(emb_num, duration_ms, dims)
        
        return results
    
    # Forward other attributes to wrapped embedder
    def __getattr__(self, name):
        return getattr(self.embedder, name)

# NO print statement - silent initialization
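
The `__getattr__` forwarding used by `TracedEmbedder` is a general proxy pattern. A minimal sketch, assuming a made-up `FakeEmbedder` (not a graphiti_core class), shows why only the intercepted method needs to be written out: Python calls `__getattr__` only when normal attribute lookup fails, so everything else falls through to the wrapped object.

```python
import asyncio


class FakeEmbedder:
    """Hypothetical embedder used only for this sketch."""

    model = "fake-embedding-model"

    async def create(self, text):
        return [0.0] * 4  # fixed 4-dimensional vector


class CountingProxy:
    """Intercepts create() and forwards every other attribute unchanged."""

    def __init__(self, inner):
        self.inner = inner
        self.calls = 0

    async def create(self, text):
        self.calls += 1
        return await self.inner.create(text)

    def __getattr__(self, name):
        # Reached only when normal lookup fails, so `create`, `inner`
        # and `calls` are never routed through here.
        return getattr(self.inner, name)


proxy = CountingProxy(FakeEmbedder())
vector = asyncio.run(proxy.create("hello"))
```

After the call, `proxy.calls` is 1 and `proxy.model` resolves to the wrapped object's attribute.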

In [11]:
# Cell 5.4: Apply Deep Trace Hooks (Including TracedEmbedder)
#
# This cell applies ALL hooks to the graphiti instance.
# Run AFTER Cell 4.5 (Graphiti initialization), Cell 5.1 (TraceLogger), and Cell 5.3 (hook classes).
#
# Applies:
# 1. Neo4j trace hook (READ + WRITE detection)
# 2. LLM trace hook (httpx event hooks)
# 3. OTEL span processor
# 4. TracedEmbedder (captures all embedding generation)

import httpx
from opentelemetry import trace as otel_trace

# 1. Apply Neo4j trace hook
neo4j_hook = Neo4jTraceHook(trace_logger)
neo4j_hook.wrap_driver(graphiti.driver)

# 2. Apply LLM trace hook via httpx event hooks
llm_hook = LLMTraceHook(trace_logger)
traced_http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(LLM_TIMEOUT_SECONDS),
    event_hooks={
        'request': [llm_hook.log_request],
        'response': [llm_hook.log_response],
    }
)
traced_openai_client = AsyncOpenAI(
    api_key=local_llm_api_key,
    base_url=local_llm_base_url,
    timeout=LLM_TIMEOUT_SECONDS,
    http_client=traced_http_client,
)
llm_client.client = traced_openai_client

# 3. Add custom span processor to OTEL
provider = otel_trace.get_tracer_provider()
if hasattr(provider, 'add_span_processor'):
    trace_span_processor = TraceSpanProcessor(trace_logger)
    provider.add_span_processor(trace_span_processor)

# 4. Wrap embedder with TracedEmbedder
traced_embedder = TracedEmbedder(embedder, trace_logger)
graphiti.clients.embedder = traced_embedder

# NO print statements - silent setup

### 5.5 Log Files Documentation

All trace logs are written to JSONL files in the current directory:

| File | Content | Use Case |
|------|---------|----------|
| `trace_main.jsonl` | High-level operations, API calls | Overview of what happened |
| `trace_llm.jsonl` | LLM requests and responses | Full prompts and outputs |
| `trace_neo4j.jsonl` | Neo4j READ queries | Retrieval operations |
| `trace_neo4j_write.jsonl` | Neo4j WRITE queries (CREATE, MERGE) | Graph modifications |
| `trace_embeddings.jsonl` | Embedding generation | Vector creation |
| `trace_parallel.jsonl` | Parallel operations | Concurrency analysis |
| `trace_otel.jsonl` | OpenTelemetry spans | Performance timing |

**Commands to View Logs:**

```bash
# View main trace
cat trace_main.jsonl | jq -c '{ts: .timestamp, event: .event, data: .data}'

# View all LLM calls
cat trace_llm.jsonl | jq -c 'select(.level=="LLM_REQUEST")'

# View Neo4j WRITE operations
cat trace_neo4j_write.jsonl | jq -c 'select(.level=="NEO4J_WRITE")'

# View embedding generation
cat trace_embeddings.jsonl | jq .

# Count operations
echo "LLM calls: $(grep -c LLM_REQUEST trace_llm.jsonl)"
echo "Neo4j READ: $(grep -c NEO4J_QUERY trace_neo4j.jsonl)"
echo "Neo4j WRITE: $(grep -c '"NEO4J_WRITE"' trace_neo4j_write.jsonl)"
```
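
The same counts can be computed in Python. A plain substring match on `NEO4J_WRITE` also matches `NEO4J_WRITE_RESULT` lines; parsing each line and comparing the exact `level` value avoids that. The sketch below assumes every line is a JSON object with a `level` key, as the logger methods suggest. The inline sample is made up for illustration, and `_write` may add fields not shown here.

```python
import json
from collections import Counter


def count_levels(lines):
    """Count trace events per exact `level` value, skipping blank lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        counts[event.get("level", "UNKNOWN")] += 1
    return counts


# Made-up sample in the shape passed to log_neo4j_query / log_neo4j_result.
sample = [
    '{"level": "NEO4J_WRITE", "query_number": 1, "query_type": "MERGE"}',
    '{"level": "NEO4J_WRITE_RESULT", "query_number": 1, "record_count": 1}',
    '{"level": "NEO4J_WRITE", "query_number": 2, "query_type": "CREATE"}',
    '',
]
counts = count_levels(sample)
```

An open file handle can be passed in place of `sample`, since the function only iterates over lines.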

## 6. ZepSimulator - Simulating Zep Cloud API

This class mimics the Zep Python SDK interface using graphiti_core directly.

**Search Modes:**
- `enable_bfs=False` (default): EDGE_HYBRID_SEARCH_RRF - BM25 + Cosine + RRF
- `enable_bfs=True`: EDGE_HYBRID_SEARCH_CROSS_ENCODER - BM25 + Cosine + BFS + vLLM cross_encoder
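
Reciprocal Rank Fusion merges several ranked lists by summing `1 / (k + rank)` for each item across the lists. The sketch below is a generic version of the technique, not graphiti_core's implementation. The constant `k=60` and the edge IDs are assumptions chosen for illustration.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked ID lists; k is a smoothing constant (assumed)."""
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            scores[item] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID for determinism.
    return sorted(scores, key=lambda item: (-scores[item], item))


bm25_hits = ["e1", "e2", "e3"]    # hypothetical fulltext ranking
cosine_hits = ["e2", "e3", "e4"]  # hypothetical vector ranking
fused = reciprocal_rank_fusion([bm25_hits, cosine_hits])
```

Items that appear in both lists (`e2`, `e3`) end up ahead of items that appear in only one, even when the single-list item was ranked first.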

In [12]:
# Cell 6.1: ZepSimulator Class (with BFS Search Support)

from graphiti_core.nodes import EpisodeType
from graphiti_core.search.search_config_recipes import (
    EDGE_HYBRID_SEARCH_RRF,
    EDGE_HYBRID_SEARCH_CROSS_ENCODER,
)
import time

# EDGE_HYBRID_SEARCH_CROSS_ENCODER includes BFS and uses the cross_encoder (VLLMRerankerClient)
# EDGE_HYBRID_SEARCH_RRF uses only BM25+Cosine with RRF reranking (no graph traversal)

class ZepSimulator:
    """
    Simulates Zep Cloud API behavior using graphiti_core directly.
    
    This provides the same interface patterns as the Zep Python SDK,
    allowing us to see exactly what happens internally.
    
    Mapping:
    - add_message() → zep_client.thread.add_messages() → graphiti.add_episode()
    - get_user_context() → zep_client.thread.get_user_context() → graphiti.search_()
    - graph_search() → zep_client.graph.search() → graphiti.search_()
    
    Search Modes:
    - enable_bfs=False (default): Uses EDGE_HYBRID_SEARCH_RRF (BM25+Cosine+RRF, no graph traversal)
    - enable_bfs=True: Uses EDGE_HYBRID_SEARCH_CROSS_ENCODER (BM25+Cosine+BFS + vLLM cross_encoder)
    """
    
    def __init__(self, graphiti_client: Graphiti, trace_logger: EnhancedTraceLogger):
        self.graphiti = graphiti_client
        self.trace = trace_logger
    
    async def add_message(
        self,
        group_id: str,
        role: str,
        name: str,
        content: str,
    ) -> dict:
        """
        Equivalent to: zep_client.thread.add_messages()
        
        This stores a message in the knowledge graph by:
        1. Extracting entities from the message (LLM call)
        2. Resolving entity duplicates (LLM call)
        3. Extracting relationships (LLM call)
        4. Creating nodes and edges in Neo4j
        """
        self.trace.log_main('add_message', {
            'group_id': group_id,
            'role': role,
            'name': name,
            'content': content,
        })
        
        # Format message as Zep does: "{role}({role_type}): {content}"
        # Here `name` fills Zep's role slot and `role` fills the role_type slot.
        episode_body = f"{name}({role}): {content}"
        
        start_time = time.time()
        
        # This is the actual graphiti call
        # NOTE: Do NOT pass uuid - let graphiti auto-generate it for new episodes.
        result = await self.graphiti.add_episode(
            name=name,
            episode_body=episode_body,
            source_description='Agent conversation message',
            reference_time=datetime.now(timezone.utc),
            source=EpisodeType.message,
            group_id=group_id,
        )
        
        duration_ms = (time.time() - start_time) * 1000
        episode_uuid = result.episode.uuid
        
        self.trace.log_main('add_message_complete', {
            'episode_uuid': episode_uuid,
            'duration_ms': duration_ms,
        })
        
        return {
            'uuid': episode_uuid,
            'duration_ms': duration_ms,
            'result': result,
        }
    
    async def get_user_context(
        self,
        group_id: str,
        query: str,
        max_facts: int = 10,
        enable_bfs: bool = False,
    ) -> list[str]:
        """
        Equivalent to: zep_client.thread.get_user_context()
        
        Args:
            group_id: The group to search within
            query: Natural language query
            max_facts: Maximum number of facts to return
            enable_bfs: If True, use BFS graph traversal (CROSS_ENCODER config)
                       If False, use standard hybrid search (RRF config, default)
        
        Search configurations:
        - enable_bfs=False: EDGE_HYBRID_SEARCH_RRF
          - BM25 fulltext search + Cosine vector similarity
          - RRF (Reciprocal Rank Fusion) for reranking
          - NO graph traversal
        
        - enable_bfs=True: EDGE_HYBRID_SEARCH_CROSS_ENCODER
          - BM25 + Cosine + BFS graph traversal
          - Cross-encoder (VLLMRerankerClient) for reranking
          - Uses graph structure for path-based retrieval
        """
        # Select search config based on enable_bfs
        if enable_bfs:
            search_config = EDGE_HYBRID_SEARCH_CROSS_ENCODER
            search_mode = 'BFS_CROSS_ENCODER'
        else:
            search_config = EDGE_HYBRID_SEARCH_RRF
            search_mode = 'HYBRID_RRF'
        
        self.trace.log_main('get_user_context', {
            'group_id': group_id,
            'query': query,
            'max_facts': max_facts,
            'enable_bfs': enable_bfs,
            'search_mode': search_mode,
            'search_config': str(search_config),
        })
        
        start_time = time.time()
        
        # Use search_() method which accepts SearchConfig
        results = await self.graphiti.search_(
            group_ids=[group_id],
            query=query,
            config=search_config,
        )
        
        duration_ms = (time.time() - start_time) * 1000
        
        # Extract facts from results (limit to max_facts)
        # SearchResults has .edges attribute containing EntityEdge objects
        facts = [edge.fact for edge in results.edges[:max_facts]]
        
        self.trace.log_main('get_user_context_complete', {
            'duration_ms': duration_ms,
            'num_results': len(results.edges),
            'facts_returned': len(facts),
            'facts': facts,
        })
        
        return facts
    
    async def graph_search(
        self,
        group_ids: list[str],
        query: str,
        max_facts: int = 10,
        enable_bfs: bool = False,
    ) -> list:
        """
        Equivalent to: zep_client.graph.search()
        
        Runs the same search as get_user_context, but accepts multiple groups
        and returns the EntityEdge objects instead of fact strings.
        
        Args:
            group_ids: List of groups to search within
            query: Natural language query
            max_facts: Maximum number of results
            enable_bfs: If True, use BFS graph traversal
        """
        if enable_bfs:
            search_config = EDGE_HYBRID_SEARCH_CROSS_ENCODER
            search_mode = 'BFS_CROSS_ENCODER'
        else:
            search_config = EDGE_HYBRID_SEARCH_RRF
            search_mode = 'HYBRID_RRF'
        
        self.trace.log_main('graph_search', {
            'group_ids': group_ids,
            'query': query,
            'max_facts': max_facts,
            'enable_bfs': enable_bfs,
            'search_mode': search_mode,
        })
        
        start_time = time.time()
        
        results = await self.graphiti.search_(
            group_ids=group_ids,
            query=query,
            config=search_config,
        )
        
        duration_ms = (time.time() - start_time) * 1000
        
        # Limit to max_facts - SearchResults has .edges attribute
        edges = results.edges[:max_facts]
        
        self.trace.log_main('graph_search_complete', {
            'duration_ms': duration_ms,
            'num_results': len(edges),
            'facts': [edge.fact for edge in edges],
        })
        
        return edges

# Initialize the simulator
zep = ZepSimulator(graphiti, trace_logger)
# NO print - silent initialization
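
The `enable_bfs=True` mode adds breadth-first graph traversal to the text and vector searches. As a rough illustration of what a depth-limited BFS does (a toy sketch over an in-memory adjacency dict, not graphiti_core's Cypher-based implementation), the function below collects the nodes reachable within a given number of hops. The graph contents are hypothetical.

```python
from collections import deque


def bfs_within_depth(adjacency, start, max_depth):
    """Return nodes reachable from `start` within `max_depth` hops, in visit order."""
    visited = {start}
    order = []
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        order.append(node)
        if depth == max_depth:
            continue  # do not expand beyond the depth limit
        for neighbor in adjacency.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append((neighbor, depth + 1))
    return order


# Hypothetical toy graph.
graph = {
    "Alice Chen": ["TechCorp", "Project Phoenix"],
    "Project Phoenix": ["February 15th"],
}
one_hop = bfs_within_depth(graph, "Alice Chen", 1)
two_hops = bfs_within_depth(graph, "Alice Chen", 2)
```

With one hop only the direct neighbours are returned; the second hop also reaches the node attached to `Project Phoenix`, which is the kind of indirectly connected fact that text similarity alone can miss.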

## 7. Agent Conversation Demo

This demonstrates a real-world conversation that shows:
- **Adding nodes**: User introduces themselves
- **Adding relationships**: User mentions their work
- **Updating**: User provides more information
- **Searching**: Agent retrieves context

The demo follows the `agent_memory_full_example` pattern:

- Turn 1: User introduces themselves → Creates Alice Chen, TechCorp entities
- Turn 2: User mentions project → Creates Project Phoenix, LEADS relationship
- Turn 3: User provides deadline → Updates with deadline info
- Turn 4: Search test → Retrieves context about Alice
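
The turns can also be driven by a single loop that stores a turn when it carries a `user_message` and searches otherwise. This is a sketch only: `StubSimulator` is a hypothetical in-memory stand-in with the same method names as `ZepSimulator`, so the example runs without Neo4j or an LLM, and `run_turns` is an illustrative helper, not part of the notebook.

```python
import asyncio


class StubSimulator:
    """Hypothetical stand-in for ZepSimulator; keeps messages in a list."""

    def __init__(self):
        self.messages = []

    async def add_message(self, group_id, role, name, content):
        self.messages.append(f"{name}({role}): {content}")
        return {"uuid": f"episode-{len(self.messages)}"}

    async def get_user_context(self, group_id, query, max_facts=10):
        # No real search: simply return the stored messages.
        return self.messages[:max_facts]


async def run_turns(sim, group_id, user_name, turns):
    """Store turns that have a user_message; run a search for the others."""
    outcomes = []
    for turn in turns:
        if "user_message" in turn:
            await sim.add_message(
                group_id=group_id,
                role="user",
                name=user_name,
                content=turn["user_message"],
            )
            outcomes.append(("added", turn["turn"]))
        else:
            facts = await sim.get_user_context(
                group_id=group_id,
                query=turn["search_query"],
            )
            outcomes.append(("searched", len(facts)))
    return outcomes


turns = [
    {"turn": 1, "user_message": "Hi, I'm Alice Chen."},
    {"turn": 2, "search_query": "Who is Alice?"},
]
outcomes = asyncio.run(run_turns(StubSimulator(), "demo", "Alice Chen", turns))
```

In the notebook itself the call would be awaited directly and given the real `zep` instance.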

In [13]:
# Cell 7.1: Define the Conversation

# Use a unique group_id for this demo session
GROUP_ID = f"demo_session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
USER_NAME = "Alice Chen"

# The conversation turns
CONVERSATION = [
    {
        "turn": 1,
        "description": "User introduces themselves - creates new entities",
        "user_message": "Hi, I'm Alice Chen. I work at TechCorp as a senior software engineer.",
        "expected_entities": ["Alice Chen (Person)", "TechCorp (Organization)"],
        "expected_relationships": ["Alice Chen WORKS_AT TechCorp"],
    },
    {
        "turn": 2,
        "description": "User mentions a project - adds more entities",
        "user_message": "I'm currently leading Project Phoenix, which is a cloud migration initiative.",
        "expected_entities": ["Project Phoenix (Project)"],
        "expected_relationships": ["Alice Chen LEADS Project Phoenix"],
    },
    {
        "turn": 3,
        "description": "User provides deadline - updates existing entity",
        "user_message": "The project deadline is February 15th, and we have 3 team members.",
        "expected_entities": [],
        "expected_relationships": ["Project Phoenix HAS_DEADLINE February 15th"],
    },
    {
        "turn": 4,
        "description": "Search test - retrieve context about Alice",
        "search_query": "What does Alice work on?",
        "expected_facts": ["Alice works at TechCorp", "Alice leads Project Phoenix"],
    },
]

print(f"Demo Group ID: {GROUP_ID}")
print(f"User: {USER_NAME}")
print(f"Conversation turns: {len(CONVERSATION)}")

Demo Group ID: demo_session_20260203_204107
User: Alice Chen
Conversation turns: 4


In [14]:
# Cell 7.2: Run Turn 1 - User Introduction

turn = CONVERSATION[0]
trace_logger.log_section(f"TURN {turn['turn']}: {turn['description']}")

print(f"\nUser: {turn['user_message']}")
print(f"\nExpected entities: {turn['expected_entities']}")
print(f"Expected relationships: {turn['expected_relationships']}")
print("\n" + "-"*70)

# Add the user message
result = await zep.add_message(
    group_id=GROUP_ID,
    role="user",
    name=USER_NAME,
    content=turn['user_message'],
)

print(f"\nOK: Turn 1 complete. Duration: {result['duration_ms']:.1f}ms")


User: Hi, I'm Alice Chen. I work at TechCorp as a senior software engineer.

Expected entities: ['Alice Chen (Person)', 'TechCorp (Organization)']
Expected relationships: ['Alice Chen WORKS_AT TechCorp']

----------------------------------------------------------------------
{
    "name": "zep.graphiti.llm.generate",
    "context": {
        "trace_id": "0xe810d2b2784e8ecd6e12a83c7ed0dcdf",
        "span_id": "0x701865dc243814cd",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": "0xad9c71ccdd255c31",
    "start_time": "2026-02-03T12:41:07.299406Z",
    "end_time": "2026-02-03T12:41:08.343357Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "llm.provider": "openai",
        "model.size": "medium",
        "max_tokens": 4096,
        "prompt.name": "extract_nodes.extract_message"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "zep-agent-full-trace",
  

In [15]:
# Cell 7.3: Run Turn 2 - Project Information

turn = CONVERSATION[1]
trace_logger.log_section(f"TURN {turn['turn']}: {turn['description']}")

print(f"\nUser: {turn['user_message']}")
print(f"\nExpected entities: {turn['expected_entities']}")
print(f"Expected relationships: {turn['expected_relationships']}")
print("\n" + "-"*70)

result = await zep.add_message(
    group_id=GROUP_ID,
    role="user",
    name=USER_NAME,
    content=turn['user_message'],
)

print(f"\nOK: Turn 2 complete. Duration: {result['duration_ms']:.1f}ms")


User: I'm currently leading Project Phoenix, which is a cloud migration initiative.

Expected entities: ['Project Phoenix (Project)']
Expected relationships: ['Alice Chen LEADS Project Phoenix']

----------------------------------------------------------------------
{
    "name": "zep.graphiti.llm.generate",
    "context": {
        "trace_id": "0x2dd356eff4f8e5c5388f8068b81fdc5d",
        "span_id": "0x0a8b46424f22ed33",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": "0x30337a89b7e12afe",
    "start_time": "2026-02-03T12:41:11.971473Z",
    "end_time": "2026-02-03T12:41:12.808769Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "llm.provider": "openai",
        "model.size": "medium",
        "max_tokens": 4096,
        "prompt.name": "extract_nodes.extract_message"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "zep-agent-full-trace",
           

In [16]:
# Cell 7.4: Run Turn 3 - Deadline Update

turn = CONVERSATION[2]
trace_logger.log_section(f"TURN {turn['turn']}: {turn['description']}")

print(f"\nUser: {turn['user_message']}")
print(f"\nExpected relationships: {turn['expected_relationships']}")
print("\n" + "-"*70)

result = await zep.add_message(
    group_id=GROUP_ID,
    role="user",
    name=USER_NAME,
    content=turn['user_message'],
)

print(f"\nOK: Turn 3 complete. Duration: {result['duration_ms']:.1f}ms")


User: The project deadline is February 15th, and we have 3 team members.

Expected relationships: ['Project Phoenix HAS_DEADLINE February 15th']

----------------------------------------------------------------------
{
    "name": "zep.graphiti.llm.generate",
    "context": {
        "trace_id": "0x775349768cc11717c2976afbfda8b53d",
        "span_id": "0x1444366963704662",
        "trace_state": "[]"
    },
    "kind": "SpanKind.INTERNAL",
    "parent_id": "0x34bd59f834b932ee",
    "start_time": "2026-02-03T12:41:16.171268Z",
    "end_time": "2026-02-03T12:41:17.009229Z",
    "status": {
        "status_code": "UNSET"
    },
    "attributes": {
        "llm.provider": "openai",
        "model.size": "medium",
        "max_tokens": 4096,
        "prompt.name": "extract_nodes.extract_message"
    },
    "events": [],
    "links": [],
    "resource": {
        "attributes": {
            "service.name": "zep-agent-full-trace",
            "service.version": "1.0.0"
        },
        "sc

In [17]:
# Cell 7.5: Run Turn 4 - Search Test

turn = CONVERSATION[3]
trace_logger.log_section(f"TURN {turn['turn']}: {turn['description']}")

print(f"\nSearch Query: {turn['search_query']}")
print(f"\nExpected facts: {turn['expected_facts']}")
print("\n" + "-"*70)

# Search for context
facts = await zep.get_user_context(
    group_id=GROUP_ID,
    query=turn['search_query'],
    max_facts=10,
)

print(f"\nOK: Turn 4 complete. Found {len(facts)} facts.")
print("\nRetrieved Facts:")
for i, fact in enumerate(facts):
    print(f"   {i+1}. {fact}")

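# Sample output from an earlier run with verbose tracing
# (group_id demo_session_20260131_213508); this run's output follows the cell.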
# ======================================================================
#   TURN 4: Search test - retrieve context about Alice
# ======================================================================

# Search Query: What does Alice work on?

# Expected facts: ['Alice works at TechCorp', 'Alice leads Project Phoenix']

# ----------------------------------------------------------------------

#  ZEP API: get_user_context (thread.get_user_context)()
#    group_id: demo_session_20260131_213508
#    query: What does Alice work on?
#    max_facts: 10
#  GRAPHITI: search()
#     Neo4j Query #36: CALL db.index.fulltext.queryRelationships("edge_name_and_fact", $query, {limit: $limit}) YIELD relationship AS rel, score MATCH (n:Entity)-[e:RELATES_...
#     Neo4j Query #37: MATCH (n:Entity)-[e:RELATES_TO]->(m:Entity) WHERE e.group_id IN $group_ids WITH DISTINCT e, n, m, vector.similarity.cosine(e.fact_embedding, $search_v...
#       → 3 records, 67.1ms
#       → 2 records, 60.2ms
# └─ Duration: 93.2ms
#    Result: Found 3 facts
#  Fact 1
#    Data: Alice Chen is currently leading Project Phoenix.
#  Fact 2
# ...
# Retrieved Facts:
#    1. Alice Chen is currently leading Project Phoenix.
#    2. Project Phoenix has a deadline on February 15th.
#    3. Alice Chen works at TechCorp as a senior software engineer.


Search Query: What does Alice work on?

Expected facts: ['Alice works at TechCorp', 'Alice leads Project Phoenix']

----------------------------------------------------------------------

OK: Turn 4 complete. Found 2 facts.

Retrieved Facts:
   1. Alice Chen is currently leading Project Phoenix.
   2. Alice Chen works at TechCorp as a senior software engineer.


## 8. Additional Search Tests

In [18]:
# Cell 8.1: Search for project deadline

trace_logger.log_section("ADDITIONAL SEARCH: Project Deadline")

query = "When is the project deadline?"
print(f"\nSearch Query: {query}")
print("\n" + "-"*70)

facts = await zep.get_user_context(
    group_id=GROUP_ID,
    query=query,
    max_facts=5,
)

print("\nRetrieved Facts:")
for i, fact in enumerate(facts):
    print(f"   {i+1}. {fact}")

# demo output
# ======================================================================
#   ADDITIONAL SEARCH: Project Deadline
# ======================================================================

# Search Query: When is the project deadline?

# ----------------------------------------------------------------------

#  ZEP API: get_user_context (thread.get_user_context)()
#    group_id: demo_session_20260131_213508
#    query: When is the project deadline?
#    max_facts: 5
#  GRAPHITI: search()
#     Neo4j Query #38: CALL db.index.fulltext.queryRelationships("edge_name_and_fact", $query, {limit: $limit}) YIELD relationship AS rel, score MATCH (n:Entity)-[e:RELATES_...
#     Neo4j Query #39: MATCH (n:Entity)-[e:RELATES_TO]->(m:Entity) WHERE e.group_id IN $group_ids WITH DISTINCT e, n, m, vector.similarity.cosine(e.fact_embedding, $search_v...
# 21:35:27 | neo4j.io | DEBUG | [#E860]  S: RECORD * 1
# 21:35:27 | neo4j.io | DEBUG | [#E860]  S: SUCCESS {'statuses': [{'gql_status': '00000', 'status_description': 'note: successful completion'}], 'type': 'r', 't_last': 1, 'db': 'neo4j'}
# 21:35:27 | neo4j.io | DEBUG | [#E860]  C: COMMIT
# 21:35:27 | neo4j.io | DEBUG | [#E860]  _: <CONNECTION> client state: TX_READY_OR_TX_STREAMING > READY
# 21:35:27 | neo4j.io | DEBUG | [#E86E]  S: SUCCESS {'bookmark': 'FB:kcwQLskJJyV+REC9lj1ew4ZBjkqQ'}
# 21:35:27 | neo4j.io | DEBUG | [#E86E]  _: <CONNECTION> server state: TX_READY_OR_TX_STREAMING > READY
# 21:35:27 | neo4j.pool | DEBUG | [#E86E]  _: <POOL> released bolt-104875
# 21:35:27 | neo4j.io | DEBUG | [#E860]  S: SUCCESS {'bookmark': 'FB:kcwQLskJJyV+REC9lj1ew4ZBjkqQ'}
# 21:35:27 | neo4j.io | DEBUG | [#E860]  _: <CONNECTION> server state: TX_READY_OR_TX_STREAMING > READY
# 21:35:27 | neo4j.pool | DEBUG | [#E860]  _: <POOL> released bolt-104890
# 21:35:27 | graphiti_core.search.search | DEBUG | search returned context for query When is the project deadline? in 89.6604061126709 ms
#       → 2 records, 64.8ms
#       → 2 records, 55.6ms
# └─ Duration: 91.1ms
#    Result: Found 2 facts
#  Fact 1
#    Data: Project Phoenix has a deadline on February 15th.
#  Fact 2
#    Data: Alice Chen is currently leading Project Phoenix.

# Retrieved Facts:
#    1. Project Phoenix has a deadline on February 15th.
#    2. Alice Chen is currently leading Project Phoenix.


Search Query: When is the project deadline?

----------------------------------------------------------------------

Retrieved Facts:
   1. The deadline for Project Phoenix is February 15th.
   2. Alice Chen is currently leading Project Phoenix.


In [19]:
# Cell 8.2: Search for company information

trace_logger.log_section("ADDITIONAL SEARCH: Company Information")

query = "What company does Alice work for?"
print(f"\nSearch Query: {query}")
print("\n" + "-"*70)

facts = await zep.get_user_context(
    group_id=GROUP_ID,
    query=query,
    max_facts=5,
)

print("\nRetrieved Facts:")
for i, fact in enumerate(facts):
    print(f"   {i+1}. {fact}")

# demo output
# ======================================================================
#   ADDITIONAL SEARCH: Company Information
# ======================================================================

# Search Query: What company does Alice work for?

# ----------------------------------------------------------------------

#  ZEP API: get_user_context (thread.get_user_context)()
#    group_id: demo_session_20260131_213508
#    query: What company does Alice work for?
#    max_facts: 5
#  GRAPHITI: search()
#     Neo4j Query #40: CALL db.index.fulltext.queryRelationships("edge_name_and_fact", $query, {limit: $limit}) YIELD relationship AS rel, score MATCH (n:Entity)-[e:RELATES_...
#     Neo4j Query #41: MATCH (n:Entity)-[e:RELATES_TO]->(m:Entity) WHERE e.group_id IN $group_ids WITH DISTINCT e, n, m, vector.similarity.cosine(e.fact_embedding, $search_v...
#       → 2 records, 70.1ms
#       → 3 records, 60.5ms
# 21:35:27 | graphiti_core.search.search | DEBUG | search returned context for query What company does Alice work for? in 95.98636627197266 ms
# └─ Duration: 97.9ms
#    Result: Found 3 facts
#  Fact 1
#    Data: Alice Chen is currently leading Project Phoenix.
#  Fact 2
#    Data: Alice Chen works at TechCorp as a senior software engineer.
#  Fact 3
#    Data: Project Phoenix has a deadline on February 15th.

# Retrieved Facts:
#    1. Alice Chen is currently leading Project Phoenix.
#    2. Alice Chen works at TechCorp as a senior software engineer.
#    3. Project Phoenix has a deadline on February 15th.


Search Query: What company does Alice work for?

----------------------------------------------------------------------

Retrieved Facts:
   1. The deadline for Project Phoenix is February 15th.
   2. Alice Chen works at TechCorp as a senior software engineer.
   3. Alice Chen is currently leading Project Phoenix.


### 8.3 BFS Search Comparison

This cell demonstrates the difference between standard hybrid search and BFS-enabled search.

In [20]:
# Cell 8.3: BFS Search Comparison
#
# This cell demonstrates the difference between standard hybrid search
# and BFS-enabled search that uses graph traversal.

trace_logger.log_section("BFS SEARCH COMPARISON")

query = "What projects is Alice working on and what are the deadlines?"
print(f"Search Query: {query}")
print("\n" + "-"*70)

# Standard search (no BFS)
print("\n[1] Standard Hybrid Search (RRF, no BFS):")
facts_standard = await zep.get_user_context(
    group_id=GROUP_ID,
    query=query,
    max_facts=5,
    enable_bfs=False,  # Default
)
for i, fact in enumerate(facts_standard):
    print(f"   {i+1}. {fact}")

# BFS-enabled search
print("\n[2] BFS Search (Cross-Encoder with graph traversal):")
facts_bfs = await zep.get_user_context(
    group_id=GROUP_ID,
    query=query,
    max_facts=5,
    enable_bfs=True,  # Enable BFS
)
for i, fact in enumerate(facts_bfs):
    print(f"   {i+1}. {fact}")

print("\n" + "-"*70)
print(f"Standard search found: {len(facts_standard)} facts")
print(f"BFS search found: {len(facts_bfs)} facts")
print("\nCheck trace_neo4j.jsonl for query differences.")

Search Query: What projects is Alice working on and what are the deadlines?

----------------------------------------------------------------------

[1] Standard Hybrid Search (RRF, no BFS):
   1. The deadline for Project Phoenix is February 15th.
   2. Alice Chen is currently leading Project Phoenix.
   3. Alice Chen works at TechCorp as a senior software engineer.

[2] BFS Search (Cross-Encoder with graph traversal):
   1. The deadline for Project Phoenix is February 15th.
   2. Alice Chen is currently leading Project Phoenix.
   3. Alice Chen works at TechCorp as a senior software engineer.

----------------------------------------------------------------------
Standard search found: 3 facts
BFS search found: 3 facts

Check trace_neo4j.jsonl for query differences.

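With only three edges in the graph, both searches above return the same three facts, so the difference is visible only in the Neo4j queries. To make the idea of BFS expansion concrete, here is a minimal in-memory sketch. It is not Graphiti's implementation (which runs the traversal inside Neo4j); the edge list, the `bfs_facts` helper, the undirected traversal, and the `"Feb 15th"` target node (taken from the Final Graph State diagram in section 10) are all assumptions made for illustration.

```python
from collections import deque

# Toy edge list mirroring the facts retrieved above: (source, target, fact).
EDGES = [
    ("Alice Chen", "TechCorp",
     "Alice Chen works at TechCorp as a senior software engineer."),
    ("Alice Chen", "Project Phoenix",
     "Alice Chen is currently leading Project Phoenix."),
    ("Project Phoenix", "Feb 15th",
     "The deadline for Project Phoenix is February 15th."),
]


def bfs_facts(origin: str, max_depth: int) -> list[str]:
    """Collect facts on edges reachable within max_depth hops of origin.

    Edges are treated as undirected (an assumption of this sketch).
    """
    seen_nodes = {origin}
    facts: list[str] = []
    queue = deque([(origin, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue  # do not expand beyond the hop limit
        for source, target, fact in EDGES:
            if node not in (source, target):
                continue
            if fact not in facts:
                facts.append(fact)
            neighbor = target if node == source else source
            if neighbor not in seen_nodes:
                seen_nodes.add(neighbor)
                queue.append((neighbor, depth + 1))
    return facts


for depth in (1, 2, 3):
    print(depth, len(bfs_facts("TechCorp", max_depth=depth)))
```

Starting from `TechCorp`, each extra hop pulls in one more fact, which is the kind of context a text-only match on "TechCorp" would miss.
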

## 9. Cleanup and Summary

In [21]:
# Cell 9.1: Close trace logger and show summary

trace_logger.log_section("DEMO COMPLETE")
summary = trace_logger.get_summary()
trace_logger.close()

print("\n" + "="*70)
print("  SUMMARY")
print("="*70)

print("\nTrace Log Files:")
for log_file in summary['log_files']:
    print(f"   - {log_file}")

print("\nOperation Counts:")
for key, count in summary['counters'].items():
    print(f"   - {key}: {count}")

print(f"\nDemo Group ID: {GROUP_ID}")
print("   - Use this to query the graph directly in Neo4j Browser")

print("\nNeo4j Browser Query:")
print(f"   MATCH (n) WHERE n.group_id = '{GROUP_ID}' RETURN n")

print("\n" + "-"*70)
print("Commands to view logs:")
print("   cat trace_main.jsonl | jq -c '{event: .event, data: .data}'")
print("   cat trace_llm.jsonl | jq -c 'select(.level==\"LLM_REQUEST\")'")
print("   cat trace_neo4j_write.jsonl | jq .")
print("   cat trace_embeddings.jsonl | jq .")


  SUMMARY

Trace Log Files:
   - main.jsonl
   - llm.jsonl
   - neo4j.jsonl
   - neo4j_write.jsonl
   - embeddings.jsonl
   - parallel.jsonl
   - otel.jsonl

Operation Counts:
   - llm_call: 17
   - neo4j_query: 0
   - neo4j_write: 0
   - embedding: 28
   - parallel_task: 0
   - otel_span: 0

Demo Group ID: demo_session_20260203_204107
   - Use this to query the graph directly in Neo4j Browser

Neo4j Browser Query:
   MATCH (n) WHERE n.group_id = 'demo_session_20260203_204107' RETURN n

----------------------------------------------------------------------
Commands to view logs:
   cat trace_main.jsonl | jq -c '{event: .event, data: .data}'
   cat trace_llm.jsonl | jq -c 'select(.level=="LLM_REQUEST")'
   cat trace_neo4j_write.jsonl | jq .
   cat trace_embeddings.jsonl | jq .


In [22]:
# Cell 9.2: View the raw trace log

print("First 20 lines of trace_raw.jsonl:\n")
with open('trace_raw.jsonl', 'r') as f:
    for i, line in enumerate(f):
        if i >= 20:
            print("...")
            break
        entry = json.loads(line)
        print(f"{i+1:3d}. [{entry.get('level', 'UNKNOWN'):15s}] {json.dumps(entry)[:100]}...")

First 20 lines of trace_raw.jsonl:

  1. [SECTION        ] {"level": "SECTION", "title": "TURN 1: User introduces themselves - creates new entities", "timestam...
  2. [API            ] {"level": "API", "method": "add_message (thread.add_messages)", "params": {"group_id": "demo_session...
  3. [GRAPHITI       ] {"level": "GRAPHITI", "method": "add_episode", "params": {"group_id": "demo_session_20260131_213508"...
  4. [OTEL_SPAN_START] {"level": "OTEL_SPAN_START", "span_name": "zep.graphiti.add_episode", "timestamp": "2026-01-31T13:35...
  5. [NEO4J_QUERY    ] {"level": "NEO4J_QUERY", "query_number": 1, "query": "\n                                    MATCH (e...
  6. [NEO4J_RESULT   ] {"level": "NEO4J_RESULT", "query_number": 1, "duration_ms": 30.347824096679688, "record_count": 0, "...
  7. [OTEL_SPAN_START] {"level": "OTEL_SPAN_START", "span_name": "zep.graphiti.llm.generate", "timestamp": "2026-01-31T13:3...
  8. [LLM_REQUEST    ] {"level": "LLM_REQUEST", "call_number": 1, "model": 

In [23]:
# Cell 9.3: Close Graphiti connection

await graphiti.close()
print("Graphiti connection closed.")

Graphiti connection closed.


## 10. Execution Results

This section documents the execution results from actual trace logs. For detailed step-by-step explanations, see `data_flow.md`.

### Execution Summary

| Turn | Input | Duration | LLM Calls | Neo4j Queries | Created |
|------|-------|----------|-----------|---------------|---------|
| 1 | "Hi, I'm Alice Chen. I work at TechCorp..." | 6813ms | 5 | 10 | Alice Chen, TechCorp, WORKS_AT |
| 2 | "I'm currently leading Project Phoenix..." | 6273ms | 6 | 10 | Project Phoenix, LEADS_PROJECT |
| 3 | "The deadline for Project Phoenix is Feb 15th..." | 5881ms | 6 | 15 | PROJECT_DEADLINE |
| 4 | Search: "What does Alice work on?" | ~130ms | 0 | 2 | - |

### Time Breakdown by Component

| Component | Turn 1 | Turn 2 | Turn 3 | Turn 4 (Search) |
|-----------|--------|--------|--------|-----------------|
| LLM Calls | 6106ms (89.6%) | 6009ms (95.8%) | 4835ms (82.2%) | 0ms |
| Neo4j Queries | 704ms (10.3%) | 264ms (4.2%) | 1047ms (17.8%) | 127ms (100%) |
| **Total** | **6813ms** | **6273ms** | **5881ms** | **~130ms** |

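The percentages in the table can be re-derived from the raw millisecond figures. A small sketch, with the numbers copied from the table above; the dictionary layout and the `share` helper are illustrative and not part of the trace tooling.

```python
# Per-turn timings in milliseconds, copied from the table above.
timings = {
    1: {"llm": 6106, "neo4j": 704, "total": 6813},
    2: {"llm": 6009, "neo4j": 264, "total": 6273},
    3: {"llm": 4835, "neo4j": 1047, "total": 5881},
}


def share(part_ms: float, total_ms: float) -> float:
    """Return part_ms as a percentage of total_ms, rounded to one decimal."""
    return round(100 * part_ms / total_ms, 1)


for turn, t in timings.items():
    print(
        f"Turn {turn}: LLM {share(t['llm'], t['total'])}%, "
        f"Neo4j {share(t['neo4j'], t['total'])}%"
    )
```

The LLM and Neo4j figures do not always sum exactly to the turn total (for example 6106 + 704 = 6810 against 6813 for Turn 1); the remaining few milliseconds are not attributed to either component in the table.
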
### Key Observations

| Turn | Key Difference |
|------|----------------|
| 1 | First episode: all entities/edges are new, no dedup matches |
| 2 | Alice Chen found as duplicate → reuse existing UUID, update summary |
| 3 | LEADS_PROJECT detected as duplicate → episode UUID appended to existing edge |
| 4 | Search: no LLM, only parallel BM25 + Cosine queries |

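The Turn 3 behavior (a duplicate edge is not re-created; the new episode is attached to the existing one) can be sketched as simple bookkeeping. This is a toy model, not Graphiti's code: the `Edge` class, its field names, the `merge_duplicate` helper, and the `episode-2` / `episode-3` identifiers are hypothetical, and the decision that two edges are duplicates is assumed to have been made already (by the LLM, as section 10.1 notes).

```python
from dataclasses import dataclass, field


@dataclass
class Edge:
    """Toy stand-in for a fact edge; field names are illustrative."""
    name: str
    fact: str
    episodes: list[str] = field(default_factory=list)


def merge_duplicate(existing: Edge, episode_uuid: str) -> Edge:
    """Record that another episode supports an already-stored edge."""
    if episode_uuid not in existing.episodes:
        existing.episodes.append(episode_uuid)
    return existing


# Edge created in Turn 2 (hypothetical episode identifiers).
edge = Edge(
    name="LEADS_PROJECT",
    fact="Alice Chen is currently leading Project Phoenix.",
    episodes=["episode-2"],
)

# Turn 3 mentions the same fact again: reuse the edge, append the episode.
merge_duplicate(edge, "episode-3")
print(edge.episodes)
```

Keeping a list of supporting episodes on the edge means the graph still holds one `LEADS_PROJECT` edge after Turn 3, while each mention remains traceable to its source message.
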
### Final Graph State

```
        TechCorp ◄──WORKS_AT── Alice Chen ──LEADS_PROJECT──► Project Phoenix
                                                                    │
                                                           PROJECT_DEADLINE
                                                                    ▼
                                                              (Feb 15th)
```

**Counts:** 3 Entities, 3 Edges, 3 Episodes

---

### 10.1 Neo4j's Role

To clarify Neo4j's actual role in this architecture:

Perhaps **Neo4j is NOT a "Graph Operator with LLM"**: it is just a storage engine with vector retrieval capabilities. So Zep is kind of like the systems in other agent memory papers, only a bit better: still an **AI Agent Orchestration Framework Built on Top of a Database**.

| Aspect | Neo4j's Role | NOT Neo4j's Role |
|--------|--------------|------------------|
| Storage | ✅ Store nodes, edges, embeddings | |
| Query | ✅ Execute Cypher, BM25, vector similarity | |
| Reasoning | | ❌ Handled by LLM in Python layer |
| Entity Resolution | | ❌ "Alice" vs "Alice Chen" decided by LLM |
| Graph Integrity | | ❌ Deduplication logic in Graphiti Core |

**Scalability Implications:**

- Python/LLM Layer: Processing involves heavy prompt construction, JSON parsing, and concurrent LLM requests. Graphiti Server is the layer that needs horizontal scaling (more Python workers).
- DB Layer: Neo4j handles storage and retrieval efficiently. Unless the graph reaches hundreds of millions of nodes, the database is unlikely to be the primary bottleneck.

---

### 10.2 Comparison with VikingMem

Graphiti/Zep is NOT a DBMS for agent memory, whereas VikingMem seems to be one. The core of Zep/Graphiti is still Python client-side logic: a hard-coded pipeline that relies on data shipping.
- DB usage:
  - Yes, Zep uses a graph DB to store all the data, including raw data and higher-level 'views', but the DB itself has no awareness of what it stores.
  - VikingMem, as I speculate based on the paper, has likely customized the computation/access layer of VikingDB, or perhaps wraps VikingDB within a tightly integrated system shell.
  - Zep has no built-in native operators like VikingMem's SUM/AVG/LLM_MERGE (code/query shipping) and no in-system processing. In VikingMem, TTL, TIMECOMPRESS, and LLM_MERGE are system primitives: instead of going DB -> Python, it modifies the kernel/computation path.
- Data model:
  - Graphiti has a schema (entity node, episode node, community node, etc.), but it seems to be descriptive, and the evolutionary logic between nodes is loose (flexible, though, since it is fully managed by an LLM that may hallucinate).
  - VikingMem's data model is prescriptive and deterministic, since operators are defined inside the data model. It explicitly states that attribute Y of Entity X must, and can only, be calculated from attribute B of Event A through operator OP, attempting to make the evolution of memory a deterministic function.
- Lifecycle: VikingMem also has lifecycle management, making consolidation (memory consolidation/forgetting) an automatic background process that the application layer does not call explicitly.
- Query optimization: Graphiti's search is a hard-coded Python pipeline, but VikingMem mentions multi-granular indexing and dynamic search, which suggests that the system can decide the best way to search automatically and use indexes effectively (a query optimizer?).

---

### 10.3 Comparison with LOTUS

Target:
- LOTUS's target is to save money, sacrificing a little accuracy compared to full LLM queries;
- Graphiti's main target is accuracy, not 'saving money'.

Methodology:
- Graphiti has 2 phases:
  - create/update: expensive ETL pipeline, LLM heavy, creating and maintaining the view on top of raw data
  - retrieval: only graph query, no LLM, fast, just retrieve the view
- LOTUS has 1 phase:
  - retrieval: no insertion and ETL, every operation is based on retrieval, LLM heavy, retrieve raw data, then process

---

### 10.4 Comparison with Mem0

| Feature | Mem0 | Zep |
|---------|------|-----|
| arch | Python App Server + Vector DB (+ Neo4j for graph) | Python App Server + Neo4j |
| raw msg | No | Yes |
| mode | pure ETL, only keeps views | ETL + raw data storage |
| view | NL facts (+ optional graph triplets) | KG (entity + facts) |
| write policy | Extract facts -> Store | Extract Entities + Facts -> Dedup -> Store |
| temporal | timestamp only | invalidation supported |

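To illustrate what "invalidation supported" means in the `temporal` row, here is a minimal sketch of point-in-time filtering over facts that carry a validity window. The `Fact` class and `facts_valid_at` helper are hypothetical (the field names follow the `valid_at` / `invalid_at` columns used in the Zep pseudo-SQL in this section), and the dates and the "Project Atlas" fact are invented purely for the example.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Fact:
    """Toy fact edge with a validity window."""
    text: str
    valid_at: datetime
    invalid_at: Optional[datetime] = None  # None: not invalidated


def facts_valid_at(facts: list[Fact], when: datetime) -> list[str]:
    """Return the text of every fact whose validity window contains `when`."""
    return [
        f.text
        for f in facts
        if f.valid_at <= when and (f.invalid_at is None or when < f.invalid_at)
    ]


# Hypothetical timeline: the dates and the second fact are invented.
facts = [
    Fact("Alice Chen is currently leading Project Phoenix.",
         valid_at=datetime(2026, 1, 1), invalid_at=datetime(2026, 3, 1)),
    Fact("Alice Chen is leading Project Atlas.",  # made-up successor fact
         valid_at=datetime(2026, 3, 1)),
]

print(facts_valid_at(facts, datetime(2026, 2, 1)))
print(facts_valid_at(facts, datetime(2026, 4, 1)))
```
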
Memory/View query policy

For **Mem0**:

```sql
CREATE VIEW user_memory AS
SELECT 
    memory_fact,           -- "Alice is vegetarian"
    timestamp,
    embedding
FROM memories
WHERE user_id = $user_id
  AND semantic_similarity(embedding, $query_embedding) > threshold
ORDER BY similarity DESC
LIMIT k;

-- optional: Graph Memory View (Mem0g)
CREATE VIEW user_graph_memory AS
SELECT 
    source_entity,         -- "Alice"
    relation,              -- "has_preference"
    target_entity,         -- "vegetarian"
    timestamp
FROM graph_triplets
WHERE user_id = $user_id
  AND (source_entity SIMILAR TO $query OR target_entity SIMILAR TO $query);
```

For **Zep**:

```sql
CREATE VIEW knowledge_graph AS
SELECT 
    e.fact,                -- "Alice works at TechCorp as a software engineer"
    e.source_node,         -- Entity: Alice
    e.target_node,         -- Entity: TechCorp
    e.created_at,
    e.valid_at,
    e.invalid_at           -- support temporal invalidation!
FROM edges e
WHERE e.group_id = $group_id
  AND (
    BM25_score(e.fact, $query) > threshold
    OR cosine_similarity(e.embedding, $query_embedding) > threshold
  )
ORDER BY RRF_fusion(BM25_rank, cosine_rank)
LIMIT k;

-- optional: BFS expansion View
CREATE VIEW expanded_knowledge AS
SELECT * FROM knowledge_graph
UNION
SELECT * FROM BFS_traverse(center_node, max_depth=2);

CREATE VIEW raw_episodes AS
SELECT content, timestamp
FROM episodic_nodes
WHERE group_id = $group_id
ORDER BY created_at DESC
LIMIT n;
```
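
The `RRF_fusion(BM25_rank, cosine_rank)` step in the Zep pseudo-SQL above stands for reciprocal rank fusion. Below is a minimal sketch of the textbook formula, where each item scores the sum of `1 / (k + rank)` over the lists it appears in. The function name `rrf_fuse`, the constant `k=60` (a commonly used default), and the short result labels are assumptions; the constants and tie-breaking in `graphiti_core` may differ.

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several ranked lists with reciprocal rank fusion.

    Each item scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            scores[item] = scores.get(item, 0.0) + 1.0 / (k + rank)
    # Sort by fused score, highest first; ties keep first-seen order.
    return sorted(scores, key=scores.get, reverse=True)


# Hypothetical result orders for the two parallel queries.
bm25_ranking = ["leads", "deadline", "works_at"]   # full-text (BM25) results
cosine_ranking = ["works_at", "leads"]             # vector-similarity results

print(rrf_fuse([bm25_ranking, cosine_ranking]))
```

Items returned by both queries (`leads`, `works_at`) outrank the item found by only one (`deadline`), which is the point of fusing ranks instead of comparing raw BM25 and cosine scores directly.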