# CacheSaver Tutorial

**CacheSaver** is a caching library for LLM inference that minimizes API costs through intelligent caching, request deduplication, and batching. It works as a **drop-in replacement** for your existing LLM client — same API, transparent caching.

This tutorial walks through practical scenarios where CacheSaver saves you time and money:
1. **Quickstart** — swap in CacheSaver with one import change
2. **Reproducibility** — get identical results across runs
3. **Error recovery** — resume experiments without re-paying for completed work
4. **Iterative development** — modify algorithms without re-running unchanged steps
5. **Parallelism** — speed up experiments with async execution
6. **Complex applications** — ReAct agents, Tree-of-Thought, and RAG pipelines

## 0. Setup

In [None]:
# pip install cachesaver openai

In [None]:
import logging
logging.getLogger("asyncio").setLevel(logging.CRITICAL)

import shutil, os, time, asyncio, json

---
## 1. LLM Inference: Quickstart

### 1a. OpenAI — without vs. with CacheSaver

First, a standard OpenAI call:

In [None]:
from openai import AsyncOpenAI

client = AsyncOpenAI()  # uses OPENAI_API_KEY env var

response = await client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "What is the capital of France? Answer in one word."}],
)
print("OpenAI SDK:", response.choices[0].message.content)

Now the same call with CacheSaver — just change the import:

In [None]:
# Clean cache for a fresh demo
shutil.rmtree("./cache/quickstart", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

client = AsyncOpenAI(namespace="quickstart", cachedir="./cache/quickstart")

# First call — hits the API
response = await client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "What is the capital of France? Answer in one word."}],
)
print("CacheSaver (1st call):", response.choices[0].message.content)

In [None]:
client = AsyncOpenAI(namespace="quickstart", cachedir="./cache/quickstart")

# Second call — same prompt, returned from cache (no API cost)
response = await client.chat.completions.create(
    model="gpt-4.1-nano",
    messages=[{"role": "user", "content": "What is the capital of France? Answer in one word."}],
)
print("CacheSaver (2nd call, cached):", response.choices[0].message.content)

The second call returns instantly with the exact same result — no API request was made.
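This tutorial does not show how CacheSaver derives its cache keys. As a rough mental model only, a request can be identified by hashing its content. The sketch below assumes a key built from the model name, the messages, and any sampling parameters; `request_key` is a hypothetical helper, not part of the CacheSaver API.

```python
import hashlib
import json

def request_key(model, messages, **params):
    """Derive a stable key from the request content (illustrative only)."""
    payload = {"model": model, "messages": messages, "params": params}
    # sort_keys makes the serialization independent of dict insertion order
    canonical = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

msgs = [{"role": "user", "content": "What is the capital of France? Answer in one word."}]

key_a = request_key("gpt-4.1-nano", msgs)
key_b = request_key("gpt-4.1-nano", list(msgs))              # equal content, new list object
key_c = request_key("gpt-4.1-nano", msgs, temperature=0.0)   # different parameters

print(key_a == key_b)  # True: same content, same key
print(key_a == key_c)  # False: changing a parameter changes the key
```

Under this model, two calls map to the same cache entry only if the model, messages, and parameters all match.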

### 1b. HuggingFace Transformers — without vs. with CacheSaver



CacheSaver also wraps local models. With standard `transformers`:

```python
from transformers import AutoModelForCausalLM, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B-Instruct")
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-1B-Instruct", device_map="auto")

messages = [{"role": "user", "content": "What is the capital of France? Answer in one word."}]
inputs = tokenizer.apply_chat_template(messages, tokenize=True, return_tensors="pt").to(model.device)
outputs = model.generate(inputs, max_new_tokens=20)
print(tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True))
```

With CacheSaver (same result, with automatic caching and batching):

```python
from cachesaver.models.transformers import AsyncHFTransformers

client = AsyncHFTransformers(
    model_name="meta-llama/Llama-3.2-1B-Instruct",
    namespace="hf_demo",
    cachedir="./cache/hf",
    batch_size=4,
)

# First call — runs GPU inference
response = await client.chat.completions.create(
    messages=[{"role": "user", "content": "What is the capital of France? Answer in one word."}],
    max_new_tokens=20,
)
print("First call:", response)

# Second call — returned from cache, no GPU needed
response = await client.chat.completions.create(
    messages=[{"role": "user", "content": "What is the capital of France? Answer in one word."}],
    max_new_tokens=20,
)
print("Cached:", response)
```

> **Note:** The HF Transformers sections are shown as code blocks because they require a GPU and the `transformers` package (`pip install cachesaver[transformers]`). The rest of this tutorial uses the OpenAI API.

---
## 2. CacheSaver: Making everything fully reproducible!

### 2a. Reproducibility

Run an experiment, then re-run it from scratch — CacheSaver guarantees identical results from cache.

**Namespaces** track which cached responses have been used, so even prompts that appear multiple times get the correct response on replay.
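To make the idea of "tracking which cached responses have been used" concrete, here is a toy sketch. It assumes a cache that stores a list of responses per prompt and one read cursor per namespace. The class `ReplayCache` and its methods are hypothetical and only illustrate the bookkeeping; they are not CacheSaver's actual implementation.

```python
from collections import defaultdict

class ReplayCache:
    """Toy list-valued cache with one read cursor per (namespace, key)."""

    def __init__(self):
        self.responses = defaultdict(list)  # key -> responses in arrival order
        self.cursor = defaultdict(int)      # (namespace, key) -> next index to serve

    def get(self, namespace, key, produce):
        idx = self.cursor[(namespace, key)]
        if idx == len(self.responses[key]):
            # Every stored response was already served: produce a new one
            self.responses[key].append(produce())
        self.cursor[(namespace, key)] = idx + 1
        return self.responses[key][idx]

    def restart(self, namespace):
        """Simulate a fresh run by rewinding this namespace's cursors."""
        for ns_key in list(self.cursor):
            if ns_key[0] == namespace:
                self.cursor[ns_key] = 0

samples = iter(["heads", "tails", "never requested"])
cache = ReplayCache()

# The same prompt is asked twice within one run and gets two distinct samples
run1 = [cache.get("exp", "flip a coin", lambda: next(samples)) for _ in range(2)]
cache.restart("exp")
# After a restart, the same two samples are replayed in the same order
run2 = [cache.get("exp", "flip a coin", lambda: next(samples)) for _ in range(2)]

print(run1)  # ['heads', 'tails']
print(run2)  # ['heads', 'tails']
```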

In [None]:
shutil.rmtree("./cache/reproducibility", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

sentences = [
    "I love this product, it works great!",
    "Terrible experience, would not recommend.",
    "It's okay, nothing special.",
    "Absolutely fantastic, exceeded my expectations!",
    "Worst purchase I've ever made.",
]

async def classify_sentiment(sentences, namespace):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/reproducibility")
    results = []
    for s in sentences:
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": f"Classify the sentiment as positive/negative/neutral (one word only): {s}"}],
        )
        results.append(response.choices[0].message.content)
    return results

# Run 1 — calls the API
run1 = await classify_sentiment(sentences, namespace="experiment_v1")
print("Run 1:", run1)

In [None]:
# Run 2 — same namespace, same results from cache (simulates a fresh restart)
run2 = await classify_sentiment(sentences, namespace="experiment_v1")
print("Run 2:", run2)
print("Identical?", run1 == run2)

Both runs produce identical results. The second run costs nothing — all responses came from cache.

### 2b. Repeat experiment after an error

Imagine processing 10 items and hitting an error on item 7. Without CacheSaver, you'd re-pay for items 1–6 on retry. With CacheSaver, only items 7–10 hit the API.
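The arithmetic can be checked offline with a stand-in for the API. This sketch uses a plain dictionary as the cache and a hypothetical `fake_llm` function that records every "paid" call; none of these names come from CacheSaver.

```python
call_log = []   # every prompt that reached the (fake) API
cache = {}      # prompt -> response

def fake_llm(prompt):
    """Stand-in for a paid API call."""
    call_log.append(prompt)
    return f"summary of {prompt}"

def cached_llm(prompt):
    if prompt not in cache:
        cache[prompt] = fake_llm(prompt)
    return cache[prompt]

def process(items, fail_at=None):
    results = []
    for i, item in enumerate(items):
        if fail_at is not None and i == fail_at:
            raise RuntimeError(f"simulated error at item {i + 1}")
        results.append(cached_llm(item))
    return results

items = [f"topic {n}" for n in range(1, 11)]

try:
    process(items, fail_at=6)   # index 6 is item 7
except RuntimeError:
    pass
calls_after_crash = len(call_log)

results = process(items)        # retry without the error
calls_after_retry = len(call_log)

print(calls_after_crash)                      # 6
print(calls_after_retry - calls_after_crash)  # 4
```

The retry pays for four calls (items 7 to 10) instead of ten.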

In [None]:
shutil.rmtree("./cache/error_recovery", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

topics = [
    "Climate change",
    "Artificial intelligence",
    "Space exploration",
    "Renewable energy",
    "Cybersecurity",
    "Mental health awareness",
    "Quantum computing",
    "Sustainable agriculture",
    "The future of work",
    "Robotics"
]

items = [f"Write a one-sentence summary of topic {t}." for t in topics]

async def process_items(items, namespace, simulate_error_at=None):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/error_recovery")
    results = []
    for i, item in enumerate(items):
        if simulate_error_at is not None and i == simulate_error_at:
            raise RuntimeError(f"Simulated error at item {i+1}!")
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": item}],
        )
        text = response.choices[0].message.content
        results.append(text)
        print(f"  Item {i+1}: {text}")
    return results

# First attempt: crashes before item 7 (index 6) is processed
print("=== Attempt 1 (will fail at item 7) ===")
try:
    results = await process_items(items, namespace="error_demo", simulate_error_at=6)
except RuntimeError as e:
    print(f"Error: {e}")
    print("Items 1-6 are cached, items 7-10 still need to be run.")

In [None]:
# Second attempt: no error this time
# Items 1-6 come from cache instantly; only items 7-10 call the API
print("=== Attempt 2 (no error) ===")
results = await process_items(items, namespace="error_demo")
print(f"\nAll {len(results)} items completed successfully!")

### 2c. Repeat experiment when you forgot to add logs

You ran an experiment but forgot to log intermediate results. With CacheSaver, just add the logging and re-run — the cached responses are reused, so you get the exact same results with your new logging, at zero extra cost.

In [None]:
shutil.rmtree("./cache/logging_demo", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

questions = [
    "What is photosynthesis?",
    "Explain gravity in one sentence.",
    "What causes rain?",
]

# Version 1: no logging, just collect results
async def run_without_logging(questions, namespace):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/logging_demo")
    results = []
    for q in questions:
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": q}],
        )
        results.append(response.choices[0].message.content)
    return results

print("=== Run without logging ===")
results_v1 = await run_without_logging(questions, namespace="logging_exp")
for r in results_v1:
    print(f"  {r[:80]}...")

In [None]:
# Version 2: oops, we forgot to log! Add logging and re-run.
# Same responses come from cache — no extra API cost.
async def run_with_logging(questions, namespace):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/logging_demo")
    log = []
    results = []
    for q in questions:
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": q}],
        )
        answer = response.choices[0].message.content
        results.append(answer)
        log.append({"question": q, "answer": answer, "model": "gpt-4.1-nano"})
    return results, log

print("=== Re-run with logging (all cached) ===")
results_v2, log = await run_with_logging(questions, namespace="logging_exp")
print("Results identical?", results_v1 == results_v2)
print("\nLog entries:")
for entry in log:
    print(f"  Q: {entry['question']}")
    print(f"  A: {entry['answer'][:80]}...")
    print()

---
## 3. CacheSaver: Making development easier and stress-free!

### 3a. Algorithm A → Algorithm A + modifications

You have a pipeline (Algorithm A) that summarizes articles and extracts keywords. You then modify it (Algorithm A') to also add sentiment analysis. CacheSaver ensures the summarization step is cached — only the new sentiment calls hit the API.

In [None]:
shutil.rmtree("./cache/algorithm", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

articles = [
    "Scientists discover high-temperature superconductor that works at room temperature, potentially revolutionizing energy.",
    "New study finds that remote workers report higher job satisfaction but struggle with collaboration.",
    "Global chip shortage eases as major fabs increase production capacity by 40 percent.",
    "Research team develops biodegradable plastic from seaweed that decomposes in 6 weeks.",
    "City implements congestion pricing, reducing downtown traffic by 25 percent in first month.",
]

# Algorithm A: summarize + extract keywords
async def algorithm_a(articles, namespace):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/algorithm")
    results = []
    for article in articles:
        # Step 1: Summarize
        summary_resp = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": f"Summarize in one sentence: {article}"}],
        )
        summary = summary_resp.choices[0].message.content

        # Step 2: Extract keywords
        kw_resp = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": f"Extract 3 keywords (comma-separated): {article}"}],
        )
        keywords = kw_resp.choices[0].message.content

        results.append({"summary": summary, "keywords": keywords})
    return results

print("=== Algorithm A ===")
results_a = await algorithm_a(articles, namespace="algo_exp")
for i, r in enumerate(results_a):
    print(f"\nArticle {i+1}:")
    print(f"  Summary:  {r['summary'][:80]}...")
    print(f"  Keywords: {r['keywords']}")

In [None]:
# Algorithm A': add sentiment analysis after summarization
# Summarize and keyword steps are CACHED — only sentiment calls hit the API
async def algorithm_a_prime(articles, namespace):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/algorithm")
    results = []
    for article in articles:
        # Step 1: Summarize (CACHED)
        summary_resp = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": f"Summarize in one sentence: {article}"}],
        )
        summary = summary_resp.choices[0].message.content

        # Step 2: Extract keywords (CACHED)
        kw_resp = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": f"Extract 3 keywords (comma-separated): {article}"}],
        )
        keywords = kw_resp.choices[0].message.content

        # Step 3: Sentiment analysis (NEW — hits API)
        sent_resp = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[{"role": "user", "content": f"What is the sentiment of this text? Answer positive/negative/neutral: {article}"}],
        )
        sentiment = sent_resp.choices[0].message.content

        results.append({"summary": summary, "keywords": keywords, "sentiment": sentiment})
    return results

print("=== Algorithm A' (modified — only sentiment calls are new) ===")
results_a_prime = await algorithm_a_prime(articles, namespace="algo_exp")
for i, r in enumerate(results_a_prime):
    print(f"\nArticle {i+1}:")
    print(f"  Summary:   {r['summary'][:80]}...")
    print(f"  Keywords:  {r['keywords']}")
    print(f"  Sentiment: {r['sentiment']}")

# Verify that the summarization and keyword results are identical
for i in range(len(articles)):
    assert results_a[i]["summary"] == results_a_prime[i]["summary"]
    assert results_a[i]["keywords"] == results_a_prime[i]["keywords"]
print("\nSummary and keyword results are identical to Algorithm A.")

---
## 4. Parallelism: Async Capabilities

### 4a. CacheSaver is fully asynchronous

Use `asyncio.gather` to run multiple requests concurrently.
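If you want to see the effect without spending API credits, the sketch below replaces the LLM call with `asyncio.sleep`. The function `fake_request` and the 50 ms latency are assumptions for illustration only.

```python
import asyncio
import time

async def fake_request(prompt, latency=0.05):
    """Stand-in for an LLM call that takes `latency` seconds."""
    await asyncio.sleep(latency)
    return prompt.upper()

async def run_sequential(prompts):
    # Each request starts only after the previous one has finished
    return [await fake_request(p) for p in prompts]

async def run_concurrent(prompts):
    # All requests are in flight at the same time
    return await asyncio.gather(*(fake_request(p) for p in prompts))

def timed(coro):
    t0 = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - t0

demo_prompts = ["a", "b", "c", "d", "e"]
sequential_out, sequential_s = timed(run_sequential(demo_prompts))
concurrent_out, concurrent_s = timed(run_concurrent(demo_prompts))

print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
print(sequential_out == list(concurrent_out))  # True: same results, same order
```

Inside a notebook, where an event loop is already running, call `await run_concurrent(demo_prompts)` directly instead of going through `asyncio.run`.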

In [None]:
shutil.rmtree("./cache/parallel", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

client = AsyncOpenAI(namespace="parallel_demo", cachedir="./cache/parallel")

prompts = [
    "What is the largest ocean?",
    "What is the smallest country?",
    "What is the longest river?",
    "What is the tallest mountain?",
    "What is the deepest lake?",
]

async def ask(prompt):
    response = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role": "user", "content": f"{prompt} Answer in one word."}],
    )
    return response.choices[0].message.content

# Sequential
t0 = time.time()
sequential_results = []
for p in prompts:
    sequential_results.append(await ask(p))
seq_time = time.time() - t0
print(f"Sequential: {seq_time:.2f}s")
print("Results:", sequential_results)

In [None]:
async def ask2(prompt):
    response = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role": "user", "content": f"{prompt} Answer in one word."}],
    )
    return response.choices[0].message.content

# Parallel
t0 = time.time()
parallel_results = await asyncio.gather(*[ask2(p) for p in prompts])
par_time = time.time() - t0
print(f"Parallel:   {par_time:.2f}s")
print("Results:", list(parallel_results))
print(f"\nSpeedup: {seq_time / par_time:.1f}x")

### 4b. Deterministic ordering with async agents

A common pattern: make one LLM call requesting `n=5` outputs, then dispatch each output to a separate "agent" (another LLM call) for further processing. These 5 agents run concurrently with `asyncio.gather`.

**The problem:** async tasks finish in unpredictable order. If you build your own cache by storing results in completion order, replaying gives agents the wrong outputs — the mapping is scrambled.

**CacheSaver solves this** because it caches based on the *content* of each request, not the order they arrive. Every replay is deterministic regardless of which agent finishes first.
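The ordering problem itself is easy to reproduce without any LLM. In this sketch, `worker` is a hypothetical stand-in for an agent: it sleeps for a random time and records when it finished. `asyncio.gather` still returns results in submission order, while the completion log shows the order a naive "append as they finish" cache would have stored.

```python
import asyncio
import random

async def worker(worker_id, completion_log):
    await asyncio.sleep(random.uniform(0.0, 0.05))  # variable processing time
    completion_log.append(worker_id)                # order in which workers finish
    return f"result-{worker_id}"

async def main():
    completion_log = []
    results = await asyncio.gather(*(worker(i, completion_log) for i in range(5)))
    return results, completion_log

results, completion_log = asyncio.run(main())

print(results)         # always result-0 ... result-4, in submission order
print(completion_log)  # a permutation of 0..4 that can differ between runs
```

In a notebook, use `results, completion_log = await main()` instead of `asyncio.run(main())`.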

In [None]:
import random

shutil.rmtree("./cache/ordering", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

client = AsyncOpenAI(namespace="ordering_demo", cachedir="./cache/ordering")

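# Step 1 (hard-coded for this demo): five identical story starters,
# standing in for the outputs of a single LLM call with n=5.
starters = ["As the moon cast a silvery glow, the tiny owl unintentionally unlocked a secret world hidden within the ancient forest’s shadows."] * 5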
print("Generated 5 story starters:")
for i, s in enumerate(starters):
    print(f"  Agent {i}: {s}")

# Step 2: Dispatch each starter to a parallel "agent" that continues the story.
# Each agent sleeps for a random interval after its LLM call to simulate
# variable processing time, so the completion order is non-deterministic.
async def agent(agent_id, starter):
    resp = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "user", "content": f"Continue this story in one sentence: {starter}"},
        ],
    )
    continuation = resp.choices[0].message.content
    await asyncio.sleep(random.uniform(0.0, 0.5))  # simulated processing delay
    return continuation

continuations_run1 = await asyncio.gather(*[agent(i, s) for i, s in enumerate(starters)])

print("\nResults (agent → continuation):")
for i, (starter, cont) in enumerate(zip(starters, continuations_run1)):
    print(f"  Agent {i}: {cont[:80]}...")

In [None]:
# Run 2 — replay from cache with different random delays.
# Despite agents finishing in a completely different order, each agent
# gets the exact same continuation as before.
print("=== Run 2 (cached) — different agent completion order, same results ===")
client_replay = AsyncOpenAI(namespace="ordering_demo", cachedir="./cache/ordering")

async def agent_replay(agent_id, starter):
    resp = await client_replay.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "user", "content": f"Continue this story in one sentence: {starter}"},
        ],
    )
    continuation = resp.choices[0].message.content
    await asyncio.sleep(random.uniform(0.0, 0.5))  # simulated processing delay
    return continuation

random.seed(None)  # new random delays → different completion order
continuations_run2 = await asyncio.gather(*[agent_replay(i, s) for i, s in enumerate(starters)])

print("\nResults (agent → continuation):")
for i, cont in enumerate(continuations_run2):
    print(f"  Agent {i}: {cont[:80]}...")

print(f"Same continuations? {continuations_run1 == list(continuations_run2)}")
print("\nEach agent got the same output despite their asynchronicity")

### 4c. CacheSaver makes experiments faster

Run a batch of 10 requests: first sequentially, then in parallel, then from cache.

In [None]:
shutil.rmtree("./cache/batch_speed", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

batch_prompts = [f"Give me a fun fact about the number {i}. One sentence only." for i in range(1, 11)]

# Sequential
client = AsyncOpenAI(namespace="batch_seq", cachedir="./cache/batch_speed")

t0 = time.time()
seq_results = []
for p in batch_prompts:
    resp = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role": "user", "content": p}],
    )
    seq_results.append(resp.choices[0].message.content)
seq_time = time.time() - t0
print(f"Sequential (10 requests): {seq_time:.2f}s")

In [None]:
# Parallel with asyncio.gather

async def fetch(prompt):
    resp = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content

t0 = time.time()
par_results = await asyncio.gather(*[fetch(p) for p in batch_prompts])
par_time = time.time() - t0
print(f"Parallel (10 requests):   {par_time:.2f}s  ({seq_time / par_time:.1f}x faster)")

In [None]:
# Cached — near-instant
client = AsyncOpenAI(namespace="batch_par", cachedir="./cache/batch_speed")

async def fetch_cached(prompt):
    resp = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content

t0 = time.time()
cached_results = await asyncio.gather(*[fetch_cached(p) for p in batch_prompts])
cache_time = time.time() - t0
print(f"Cached (10 requests):     {cache_time:.2f}s")
print(f"\nSummary:")
print(f"  Sequential: {seq_time:.2f}s")
print(f"  Parallel:   {par_time:.2f}s")
print(f"  Cached:     {cache_time:.2f}s")

---
## 5. Complex AI Applications

### 5a. ReAct Agent with CacheSaver

A **ReAct** agent interleaves thinking and acting: it reasons about what to do, calls a tool, observes the result, and repeats.

We implement a simple ReAct loop with two mock tools and use CacheSaver so re-running the agent is free.
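The fragile part of any text-based ReAct loop is turning the model's reply into a tool call. As a small sketch, here is one way to extract `tool_name(argument)` from a reply with a regular expression. The helper `parse_action` is hypothetical and offered as an alternative to manual string slicing; it returns `None` when no well-formed action line is present.

```python
import re

# Matches a line such as: Action: lookup_population("France")
ACTION_RE = re.compile(r"^Action:\s*(\w+)\((.*)\)\s*$", re.MULTILINE)

def parse_action(reply):
    """Return (tool_name, argument) or None if the reply has no action line."""
    match = ACTION_RE.search(reply)
    if match is None:
        return None
    tool_name, raw_arg = match.groups()
    # Drop surrounding whitespace and optional quotes around the argument
    return tool_name, raw_arg.strip().strip("\"'")

reply = "Thought: I need France's population.\nAction: lookup_population(\"France\")"
print(parse_action(reply))                             # ('lookup_population', 'France')
print(parse_action("Thought: done.\nAnswer: 5.81"))    # None
print(parse_action("Action: calculator((1 + 2) * 3)")) # ('calculator', '(1 + 2) * 3')
```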

In [None]:
shutil.rmtree("./cache/react", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

# Define simple tools
def lookup_population(country):
    """Look up the population of a country (mock data)."""
    data = {
        "france": 67_390_000,
        "belgium": 11_590_000,
        "germany": 83_200_000,
        "japan": 125_700_000,
        "brazil": 214_000_000,
    }
    return data.get(country.lower().strip(), "Unknown country")

def calculator(expression):
    """Evaluate a simple math expression."""
    try:
        return str(round(eval(expression, {"__builtins__": {}}), 2))
    except Exception as e:
        return f"Error: {e}"

tools = {"lookup_population": lookup_population, "calculator": calculator}

REACT_SYSTEM = """You are a helpful assistant that can use tools to answer questions.

Available tools:
- lookup_population(country): Returns the population of a country.
- calculator(expression): Evaluates a math expression.

To use a tool, respond EXACTLY in this format:
Thought: <your reasoning>
Action: <tool_name>(<argument>)

When you have the final answer, respond EXACTLY:
Thought: <your reasoning>
Answer: <final answer>

Always start with a Thought."""

async def react_agent(client, question):
    
    messages = [
        {"role": "system", "content": REACT_SYSTEM},
        {"role": "user", "content": question},
    ]

    for step in range(6):  # max steps
        response = await client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=messages,
        )
        reply = response.choices[0].message.content
        print(f"\n--- Step {step + 1} ---")
        print(reply)

        # Check for final answer
        if "Answer:" in reply:
            answer = reply.split("Answer:")[-1].strip()
            return answer

        # Parse action
        if "Action:" in reply:
            action_line = reply.split("Action:")[-1].strip()
            # Parse tool_name(argument)
            paren_idx = action_line.index("(")
            tool_name = action_line[:paren_idx].strip()
            argument = action_line[paren_idx+1:].rstrip(")")
            argument = argument.strip().strip('"').strip("'")

            if tool_name in tools:
                observation = str(tools[tool_name](argument))
            else:
                observation = f"Unknown tool: {tool_name}"

            print(f"Observation: {observation}")
            messages.append({"role": "assistant", "content": reply})
            messages.append({"role": "user", "content": f"Observation: {observation}"})
        else:
            messages.append({"role": "assistant", "content": reply})
            messages.append({"role": "user", "content": "Please continue. Use a tool or provide a final Answer."})

    return "Max steps reached"

print("=== ReAct Agent — Run 1 ===")
client = AsyncOpenAI(namespace="react_demo", cachedir="./cache/react")
answer = await react_agent(
    client,
    "What is the population of France divided by the population of Belgium? Give the ratio.",
)
print(f"\nFinal answer: {answer}")

In [None]:
# Re-run — all LLM calls cached, deterministic replay
print("=== ReAct Agent — Run 2 (cached) ===")
answer2 = await react_agent(
    client,
    "What is the population of France divided by the population of Belgium? Give the ratio."
)
print(f"\nFinal answer: {answer2}")

### 5b. Tree-of-Thought BFS with CacheSaver

**Tree-of-Thought (ToT)** explores multiple reasoning paths in parallel. At each depth:
1. Generate N candidate "thoughts" for each current state
2. Evaluate/score each candidate
3. Keep the top-K and expand further

CacheSaver caches all LLM calls, so re-running the search is free.
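The three steps above form a breadth-first beam search. The sketch below shows only the control flow, with the LLM replaced by deterministic stand-ins: `expand` and `score` are hypothetical functions, and states are lists of integers rather than stories.

```python
def expand(state, branching):
    """Stand-in for the generation step: propose `branching` continuations."""
    return [state + [choice] for choice in range(branching)]

def score(state):
    """Stand-in for the evaluation step: prefer larger sums."""
    return sum(state)

def beam_bfs(depth=3, branching=3, keep_top_k=2):
    states = [[]]  # start from a single empty state
    for _ in range(depth):
        # 1. Generate candidates for every surviving state
        candidates = [c for s in states for c in expand(s, branching)]
        # 2. Score them and 3. keep the top-K (sort is stable on ties)
        candidates.sort(key=score, reverse=True)
        states = candidates[:keep_top_k]
    return states

print(beam_bfs())  # [[2, 2, 2], [2, 2, 1]]
```

With `branching=3`, `depth=3`, and `keep_top_k=2`, this evaluates 3 + 6 + 6 = 15 candidates in total, which is why caching pays off quickly when the real generate and score steps are LLM calls.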

In [None]:
shutil.rmtree("./cache/tot", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI

PROBLEM = """Creative writing task: Write a short 3-sentence story about a robot learning to paint.
Build the story one sentence at a time."""

async def tot_bfs(problem, namespace, branching=3, depth=3, keep_top_k=2):
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/tot")

    # Initial states: just the problem
    states = [""]

    for d in range(depth):
        print(f"\n=== Depth {d+1} ===")
        candidates = []

        # Generate candidates for each state
        for state in states:
            prompt = f"{problem}\n\nStory so far: {state if state else '(empty)'}\n\nWrite the next sentence only:"

            # Generate multiple candidates
            gen_tasks = []
            for _ in range(branching):
                gen_tasks.append(
                    client.chat.completions.create(
                        model="gpt-4.1-nano",
                        messages=[{"role": "user", "content": prompt}],
                    )
                )
            responses = await asyncio.gather(*gen_tasks)

            for resp in responses:
                next_sentence = resp.choices[0].message.content.strip()
                new_state = f"{state} {next_sentence}".strip()
                candidates.append(new_state)

        # Evaluate candidates
        scores = []
        eval_tasks = []
        for cand in candidates:
            eval_prompt = (
                f"{problem}\n\nPartial story: {cand}\n\n"
                f"Rate this partial story from 1-10 for creativity and coherence. "
                f"Reply with ONLY a number."
            )
            eval_tasks.append(
                client.chat.completions.create(
                    model="gpt-4.1-nano",
                    messages=[{"role": "user", "content": eval_prompt}],
                )
            )
        eval_responses = await asyncio.gather(*eval_tasks)

        for i, resp in enumerate(eval_responses):
            try:
                score = float(resp.choices[0].message.content.strip())
            except ValueError:
                score = 5.0  # default
            scores.append(score)
            print(f"  Candidate (score={score}): {candidates[i][:100]}...")

        # Keep top-K
        ranked = sorted(zip(scores, candidates), reverse=True)
        states = [cand for _, cand in ranked[:keep_top_k]]
        print(f"  Kept top {keep_top_k}")

    return states[0]  # best story

print("=== Tree-of-Thought BFS — Run 1 ===")
best_story = await tot_bfs(PROBLEM, namespace="tot_demo")
print(f"\nBest story:\n{best_story}")

In [None]:
# Re-run — all cached, deterministic
print("=== Tree-of-Thought BFS — Run 2 (cached) ===")
t0 = time.time()
best_story_2 = await tot_bfs(PROBLEM, namespace="tot_demo")
print(f"\nBest story:\n{best_story_2}")
print(f"\nTime: {time.time() - t0:.2f}s (all cached)")
print(f"Same result? {best_story == best_story_2}")

### 5c. RAG Pipeline A — Simple Retrieval + Generation

A minimal RAG pipeline: retrieve relevant chunks from a knowledge base using keyword overlap, then generate an answer with the LLM.
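Keyword-overlap retrieval is sensitive to how text is split into words. As a small illustration, the sketch below compares plain whitespace splitting with a hypothetical `tokenize` helper that strips punctuation. With whitespace splitting, `"python?"` in the query does not match `"python"` in the passage.

```python
import re

def tokenize(text):
    """Lowercase and keep alphanumeric runs, dropping punctuation."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def whitespace_tokens(text):
    return set(text.lower().split())

def overlap(query, chunk, tokenizer):
    return len(tokenizer(query) & tokenizer(chunk))

query = "Who created Python?"
chunk = "Python was created by Guido van Rossum and first released in 1991."

print(overlap(query, chunk, whitespace_tokens))  # 1  ("created")
print(overlap(query, chunk, tokenize))           # 2  ("created", "python")
```

For a ten-passage knowledge base the difference rarely changes the top result, but it matters more as the corpus grows.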

In [None]:
shutil.rmtree("./cache/rag", ignore_errors=True)

from cachesaver.models.openai import AsyncOpenAI
import math

# Simple knowledge base
KNOWLEDGE_BASE = [
    "The Eiffel Tower is 330 meters tall and is located in Paris, France. It was built in 1889 for the World's Fair.",
    "The Great Wall of China is over 21,000 kilometers long. It was built over many centuries starting from the 7th century BC.",
    "Photosynthesis is the process by which plants convert sunlight, water, and CO2 into glucose and oxygen.",
    "The human heart beats approximately 100,000 times per day, pumping about 7,500 liters of blood.",
    "Python was created by Guido van Rossum and first released in 1991. It emphasizes code readability.",
    "The Pacific Ocean is the largest ocean, covering more than 165 million square kilometers.",
    "DNA stands for deoxyribonucleic acid. It carries genetic instructions for the development of all living organisms.",
    "The speed of light in a vacuum is approximately 299,792,458 meters per second.",
    "Mount Everest is 8,849 meters tall, making it the highest point on Earth above sea level.",
    "The Amazon Rainforest produces about 20% of the world's oxygen and is home to 10% of all species.",
]

def simple_retrieve(query, knowledge_base, top_k=3):
    """Retrieve most relevant chunks using word overlap (bag-of-words)."""
    query_words = set(query.lower().split())
    scored = []
    for chunk in knowledge_base:
        chunk_words = set(chunk.lower().split())
        overlap = len(query_words & chunk_words)
        scored.append((overlap, chunk))
    scored.sort(reverse=True)
    return [chunk for _, chunk in scored[:top_k]]

async def rag_pipeline_a(query, namespace):
    """Simple RAG: retrieve + generate."""
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/rag")

    # Retrieve
    retrieved = simple_retrieve(query, KNOWLEDGE_BASE)
    context = "\n".join(f"- {chunk}" for chunk in retrieved)

    # Generate
    response = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Answer the question based on the provided context. Be concise."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
    )
    return response.choices[0].message.content, retrieved

# Test queries
rag_queries = [
    "How tall is the Eiffel Tower?",
    "What is photosynthesis?",
    "How fast is the speed of light?",
    "Tell me about the Amazon Rainforest.",
    "Who created Python?",
]

print("=== RAG Pipeline A ===")
rag_a_results = []
for q in rag_queries:
    answer, chunks = await rag_pipeline_a(q, namespace="rag_a")
    rag_a_results.append({"query": q, "answer": answer})
    print(f"\nQ: {q}")
    print(f"A: {answer}")

### 5d. RAG Pipeline B — Retrieve + Re-rank + Generate

An enhanced pipeline that adds an LLM-based **re-ranking** step: retrieve more chunks, use the LLM to pick the most relevant ones, then generate.
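LLMs do not always follow a "numbers only" instruction, so the re-ranking output needs tolerant parsing. A minimal sketch, assuming the reply contains 1-based passage numbers somewhere in free-form text; `parse_ranked_indices` is a hypothetical helper that falls back to the first passages when nothing usable is found.

```python
import re

def parse_ranked_indices(text, n_passages, fallback=2):
    """Extract 1-based passage numbers from text as unique 0-based indices."""
    seen = []
    for token in re.findall(r"\d+", text):
        idx = int(token) - 1
        if 0 <= idx < n_passages and idx not in seen:
            seen.append(idx)
    # Nothing valid found: keep the first `fallback` retrieved passages
    return seen or list(range(min(fallback, n_passages)))

print(parse_ranked_indices("2, 4", 5))               # [1, 3]
print(parse_ranked_indices("Passages 3 and 1.", 5))  # [2, 0]
print(parse_ranked_indices("none", 5))               # [0, 1]
print(parse_ranked_indices("9, 2", 5))               # [1]
```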

In [None]:
async def rag_pipeline_b(query, namespace):
    """Enhanced RAG: retrieve → re-rank (LLM) → generate."""
    client = AsyncOpenAI(namespace=namespace, cachedir="./cache/rag")

    # Retrieve more candidates
    retrieved = simple_retrieve(query, KNOWLEDGE_BASE, top_k=5)
    chunks_text = "\n".join(f"{i+1}. {chunk}" for i, chunk in enumerate(retrieved))

    # Re-rank with LLM
    rerank_response = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "You are a relevance judge. Given a question and numbered passages, return ONLY the numbers of the top 2 most relevant passages, comma-separated."},
            {"role": "user", "content": f"Question: {query}\n\nPassages:\n{chunks_text}"},
        ],
    )
    rerank_text = rerank_response.choices[0].message.content

    # Parse re-ranked indices
    try:
        indices = [int(x.strip()) - 1 for x in rerank_text.replace(" ", "").split(",") if x.strip().isdigit()]
        reranked = [retrieved[i] for i in indices if 0 <= i < len(retrieved)]
    except (ValueError, IndexError):
        reranked = retrieved[:2]  # fallback

    if not reranked:
        reranked = retrieved[:2]

    context = "\n".join(f"- {chunk}" for chunk in reranked)

    # Generate
    response = await client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {"role": "system", "content": "Answer the question based on the provided context. Be concise."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
    )
    return response.choices[0].message.content, reranked

print("=== RAG Pipeline B (with re-ranking) ===")
rag_b_results = []
for q in rag_queries:
    answer, reranked = await rag_pipeline_b(q, namespace="rag_b")
    rag_b_results.append({"query": q, "answer": answer})
    print(f"\nQ: {q}")
    print(f"A: {answer}")

### 5e. Benchmarking the two RAG pipelines

Compare Pipeline A and Pipeline B on the same questions, then show that re-running the benchmark from cache is near-instant.

In [None]:
print("=== Benchmark: Pipeline A vs Pipeline B ===")
print(f"{'Query':<45} {'Pipeline A':<40} {'Pipeline B':<40}")
print("-" * 125)
for a, b in zip(rag_a_results, rag_b_results):
    q = a["query"]
    ans_a = a["answer"][:37] + "..." if len(a["answer"]) > 40 else a["answer"]
    ans_b = b["answer"][:37] + "..." if len(b["answer"]) > 40 else b["answer"]
    print(f"{q:<45} {ans_a:<40} {ans_b:<40}")

In [None]:
# Re-run the full benchmark — all cached, near-instant
print("=== Re-running full benchmark (all cached) ===")
t0 = time.time()

rag_a_cached = []
rag_b_cached = []
for q in rag_queries:
    ans_a, _ = await rag_pipeline_a(q, namespace="rag_a")
    ans_b, _ = await rag_pipeline_b(q, namespace="rag_b")
    rag_a_cached.append(ans_a)
    rag_b_cached.append(ans_b)

cache_time = time.time() - t0
print(f"Time for full benchmark (cached): {cache_time:.2f}s")
print(f"Results identical to first run? A={[r['answer'] for r in rag_a_results] == rag_a_cached}, B={[r['answer'] for r in rag_b_results] == rag_b_cached}")

---

## Summary

CacheSaver gives you:

| Feature | Benefit |
|---|---|
| **Transparent caching** | Never pay twice for the same LLM call |
| **Namespace isolation** | Reproducible experiments across runs |
| **Drop-in replacement** | Change one import, keep your code |
| **Async-native** | Speed up experiments with `asyncio.gather` |
| **Error recovery** | Resume from where you left off |
| **Iterative development** | Modify pipelines without re-running cached steps |

For more providers (Anthropic, Gemini, Together AI, Groq, vLLM, OpenRouter, HuggingFace), see the `providers_example.ipynb` notebook.