An end-to-end agentic chatbot that answers research questions by fetching live data from multiple sources, analyzing it, and synthesizing a coherent answer via a local Ollama LLM.
Built to demonstrate when and why to use Python's three concurrency primitives — asyncio, threading, and subprocess / ProcessPoolExecutor — in a single realistic system.
```
User Question
      │
      ▼
┌─────────────────────────────────────────────────────────┐
│                      ORCHESTRATOR                       │
│                orchestrator/pipeline.py                 │
│                                                         │
│  [LangChain + ChatOllama]                               │
│  Step 1 ── extract_topics_chain → search keywords       │
│                                                         │
│  [asyncio.gather — all run in parallel]                 │
│  Step 2 ──┬── fetch_agent     ── ASYNCIO                │
│           ├── memory_agent    ── THREADING              │
│           ├── analysis_agent  ── SUBPROCESS             │
│           └── embedding_agent ── PROCESS POOL           │
│                                                         │
│  [LangChain + ChatOllama]                               │
│  Step 3 ── synthesis_chain → final answer               │
│                                                         │
│  [Background tasks — fire-and-forget]                   │
│  Step 4 ──┬── token_optimizer (async, every N turns)    │
│           └── memory save (thread, non-blocking)        │
└─────────────────────────────────────────────────────────┘
      │
      ▼
Answer + updated session state
```
Why: Fetching from Wikipedia and HackerNews is pure network I/O. The CPU sits idle while waiting for bytes. `asyncio.gather()` fires all three requests simultaneously; total time = slowest source, not the sum.

Why not threads: `aiohttp` already speaks async natively. Using threads for async-capable I/O adds thread overhead with zero benefit.
```python
results = await asyncio.gather(
    _fetch_wikipedia_summary(session, query),
    _fetch_wikipedia_search(session, query),
    _fetch_hackernews(session, query),
    return_exceptions=True,  # a failed source becomes a value, not a crash
)
```

Why: `sqlite3` is a synchronous library with no async API. Calling `conn.execute()` directly inside an async function blocks the entire event loop, freezing every other agent for the duration of the disk write. `loop.run_in_executor()` with a `ThreadPoolExecutor` wraps the blocking call in a Future the event loop can await without freezing.
```python
async def save_state(state):
    # get_running_loop() is the idiomatic call from inside a coroutine
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_db_executor, _save_state_sync, state, db_path)
```

Memory architecture:

| Layer | Storage | Access Pattern |
|---|---|---|
| Short-term | `ConversationState` in RAM | Instant, every turn |
| Long-term | SQLite on disk | Fire-and-forget thread write |
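
A hypothetical read path for this split (names are illustrative; the real registry lives in `core/state.py`): the RAM layer answers instantly, and SQLite is only touched off the event loop.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

_db_executor = ThreadPoolExecutor(max_workers=1)  # single writer plays nice with SQLite

@dataclass
class ConversationState:
    session_id: str
    messages: list = field(default_factory=list)

_sessions: dict[str, ConversationState] = {}      # short-term layer, in RAM

def _load_state_sync(session_id: str) -> ConversationState:
    # Stand-in for the real SQLite read; runs on the executor thread.
    return ConversationState(session_id)

async def get_state(session_id: str) -> ConversationState:
    # Hot path: the in-memory registry answers instantly, every turn.
    if session_id in _sessions:
        return _sessions[session_id]
    # Cold path: pull long-term memory without blocking the event loop.
    loop = asyncio.get_running_loop()
    state = await loop.run_in_executor(_db_executor, _load_state_sync, session_id)
    _sessions[session_id] = state
    return state
```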
Why: The analysis script runs in a fully isolated OS process. Guarantees:

- A crash in the script cannot corrupt the orchestrator's state
- `asyncio.wait_for()` enforces a hard timeout; `proc.kill()` terminates it if exceeded
- In production: safely run LLM-generated or untrusted code with resource limits (a sketch of both sides follows)
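
The child side is a minimal sketch, assuming a JSON-over-stdin/stdout protocol; the field names here are illustrative, not the real `analysis_script.py`:

```python
# Hypothetical shape of analysis_script.py: the child reads one JSON payload
# from stdin, does its work, and writes one JSON result to stdout.
import json
import sys

def main() -> None:
    payload = json.loads(sys.stdin.read())   # sent by the orchestrator
    texts = [a.get("text", "") for a in payload.get("articles", [])]
    result = {
        "n_articles": len(texts),
        "total_chars": sum(len(t) for t in texts),
    }
    json.dump(result, sys.stdout)             # travels back over the pipe

if __name__ == "__main__":
    main()
```

The orchestrator side spawns it and enforces the hard timeout: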
```python
proc = await asyncio.create_subprocess_exec(
    sys.executable, str(_SCRIPT_PATH),
    stdin=asyncio.subprocess.PIPE,
    stdout=asyncio.subprocess.PIPE,
)
stdout, _ = await asyncio.wait_for(proc.communicate(input=payload), timeout=15)
```

Why: Embedding generation is CPU-bound — matrix multiplications peg the CPU. Python's GIL means threads can only take turns on one core for CPU work.
`ProcessPoolExecutor` spawns separate Python interpreters, each with its own GIL. The numpy math runs on a real separate core, in parallel with the event loop.
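
The function handed to the pool must be a module-level, picklable callable, since each worker interpreter imports it afresh. A toy stand-in for what `utils/embeddings.py` might host (the real work is model inference; this bag-of-words version is purely illustrative):

```python
import zlib
import numpy as np

def _compute_embeddings(texts: list[str]) -> np.ndarray:
    # Pure CPU work, no I/O: hash tokens into a fixed-size bag-of-words
    # vector and L2-normalize. crc32 is deterministic across processes,
    # unlike the built-in hash(), which is seeded per interpreter.
    dim = 256
    vectors = np.zeros((len(texts), dim), dtype=np.float32)
    for i, text in enumerate(texts):
        for token in text.lower().split():
            vectors[i, zlib.crc32(token.encode()) % dim] += 1.0
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    return vectors / np.maximum(norms, 1e-9)
```

The agent then dispatches it to the pool: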
```python
with ProcessPoolExecutor(max_workers=2) as pool:
    embeddings = await loop.run_in_executor(pool, _compute_embeddings, texts)
```

`utils/token_optimizer.py` prevents unbounded context growth:
```
Turn 1   Turn 2   Turn 3   Turn 4   Turn 5   Turn 6
  ▼        ▼        ▼        ▼        ▼        ▼
──────────────────────────────────────────────────
┌─────────┐
│SUMMARIZE│ ← LLM compresses turns 1-3
└─────────┘
        summary + [Turn 4, Turn 5, Turn 6]
```
- After `N` messages (default 6), older messages are compressed into a rolling summary via an async LLM call (sketched below)
- The most recent `K` messages (default 3) are always kept verbatim
- The summary is then synced to SQLite via the thread agent (fire-and-forget)
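
A minimal sketch of that compression step, with a stubbed-out `summarize()` standing in for the real async ChatOllama call (the real `Message` lives in `core/models.py`):

```python
from dataclasses import dataclass

@dataclass
class Message:
    role: str
    content: str

async def summarize(text: str) -> str:
    # Stand-in for the async LLM call (ChatOllama in the real pipeline).
    return text[:200]

async def optimize(messages: list[Message], summary: str,
                   n: int = 6, k: int = 3) -> tuple[str, list[Message]]:
    if len(messages) <= n:                  # below the threshold: nothing to do
        return summary, messages
    old, recent = messages[:-k], messages[-k:]
    blob = "\n".join(f"{m.role}: {m.content}" for m in old)
    new_summary = await summarize(f"{summary}\n{blob}")   # rolling compression
    return new_summary, recent                            # last K kept verbatim
```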
```
research_agent/
├── main.py                      Entry point (interactive or --demo)
├── requirements.txt
│
├── core/
│   ├── config.py                All tunable parameters
│   ├── models.py                Shared dataclasses (Article, Message, …)
│   └── state.py                 In-memory session registry
│
├── agents/
│   ├── async_fetch/
│   │   └── fetch_agent.py       ASYNCIO — Wikipedia + HackerNews
│   ├── threaded_memory/
│   │   └── memory_agent.py      THREADING — SQLite short/long-term memory
│   └── subprocess_analysis/
│       ├── analysis_agent.py    SUBPROCESS — spawns the script below
│       └── analysis_script.py   Isolated analysis (runs in child process)
│
├── utils/
│   ├── embeddings.py            PROCESS POOL — CPU-bound vector generation
│   └── token_optimizer.py       LLM summarization after N conversation turns
│
└── orchestrator/
    └── pipeline.py              LangChain + Ollama, coordinates all agents
```
- Python 3.10+
- Ollama installed and running locally
```bash
# Install Ollama — https://ollama.com
curl -fsSL https://ollama.com/install.sh | sh

# Pull the default model
ollama pull llama3.2

# Ollama starts automatically; verify it's up
curl http://localhost:11434/api/tags
```

```bash
# Clone / download the project
cd research_agent

# Create a virtual environment
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt
```

```bash
python main.py
```

```
You: What is the transformer architecture?
Assistant: The transformer is a neural network architecture introduced in
"Attention Is All You Need" (Vaswani et al., 2017)...
You: How does self-attention work?
Assistant: Self-attention allows each token in the sequence to attend to
every other token...
```

```bash
python main.py --session abc123
```

```bash
python main.py --demo
```

Fires three questions about transformers, showing the full pipeline including token optimization.
All knobs are in `core/config.py`:

| Parameter | Default | Effect |
|---|---|---|
| `ollama_model` | `llama3.2` | Any model you've pulled with `ollama pull` |
| `summarize_after_n_messages` | `6` | Token optimization threshold |
| `keep_recent_n_messages` | `3` | Messages kept verbatim after compression |
| `subprocess_timeout_seconds` | `15` | Hard kill for analysis script |
| `embedding_workers` | `2` | ProcessPoolExecutor workers |
| `fetch_timeout_seconds` | `8` | Per-source HTTP timeout |
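
The file itself is likely no more than a small settings object; a sketch assuming a frozen dataclass (field names mirror the table above, the exact shape is a guess):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Config:
    ollama_model: str = "llama3.2"
    summarize_after_n_messages: int = 6
    keep_recent_n_messages: int = 3
    subprocess_timeout_seconds: int = 15
    embedding_workers: int = 2
    fetch_timeout_seconds: int = 8

config = Config()  # imported by the agents and the orchestrator
```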
| Situation | Primitive used |
|---|---|
| Network / API calls (async library) | `asyncio` |
| Sync library blocking the event loop | `threading` + `run_in_executor` |
| Isolated / sandboxed code execution | `subprocess` |
| CPU-bound work (numpy, model inference) | `ProcessPoolExecutor` |
| Background task (non-blocking) | `asyncio.create_task()` |
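
One pattern worth spelling out for the last row: `asyncio.create_task()` schedules work without awaiting it, but you must keep a reference or the task can be garbage-collected before it finishes. A minimal sketch (names hypothetical):

```python
import asyncio

_background_tasks: set[asyncio.Task] = set()

async def _save_memory() -> None:
    await asyncio.sleep(0)   # stand-in for the real non-blocking save

def fire_and_forget(coro) -> None:
    # create_task() schedules the coroutine without awaiting it. Keeping a
    # reference stops the task from being garbage-collected mid-flight.
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

async def main() -> None:
    fire_and_forget(_save_memory())   # returns immediately; work lands later
    await asyncio.sleep(0.1)          # give the background task time to finish

asyncio.run(main())
```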