# Performance Optimization and Monitoring

This notebook complements `12_performance_optimization.md` with runnable code for baselining, caching, routing, concurrency, profiling, and monitoring in LLM applications.

Notes:
- Some cells use optional libraries (e.g., `tiktoken`, `langchain_openai`, `langgraph`). Guarded imports are used so the notebook remains runnable even if those packages are absent.
- Replace stubs or mocks with live integrations when you have credentials and dependencies configured.


## 0) Setup and Utilities
Imports and environment checks for optional dependencies.

In [None]:
# Setup: imports and optional dependencies
import time
import json
import asyncio
import random
import hashlib
from typing import Dict, Any, List

try:
    import tiktoken
except Exception:
    tiktoken = None

try:
    from langchain_core.prompts import PromptTemplate
except Exception:
    PromptTemplate = None

try:
    from langchain_openai import ChatOpenAI
except Exception:
    ChatOpenAI = None

try:
    from langgraph.graph import StateGraph, START, END
except Exception:
    StateGraph = None
    START = None
    END = None

# Report which optional integrations imported successfully (True = available).
availability = {
    'tiktoken': tiktoken is not None,
    'PromptTemplate': PromptTemplate is not None,
    'ChatOpenAI': ChatOpenAI is not None,
    'LangGraph': StateGraph is not None,
}
print(availability)

## 1) Baseline Metrics
A minimal measurement harness for latency and tokens per request.

In [None]:
def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    """Count the tokens in *text* for *model*.

    Uses ``tiktoken`` when it is importable, falling back to the
    ``cl100k_base`` encoding for model names tiktoken does not recognize.
    Without ``tiktoken``, approximates the count as the number of
    whitespace-separated words (at least 1 for non-empty text).

    Args:
        text: The text to measure.
        model: Model name used to select the tiktoken encoding.

    Returns:
        The token count; ``0`` for an empty string on both code paths.
    """
    # Empty input is 0 tokens regardless of backend; the naive fallback used
    # to report 1 here, inflating averages relative to the tiktoken path.
    if not text:
        return 0
    if tiktoken is not None:
        try:
            enc = tiktoken.encoding_for_model(model)
        except KeyError:
            # Unknown model name (e.g. a mock): use a general-purpose encoding.
            enc = tiktoken.get_encoding("cl100k_base")
        return len(enc.encode(text))
    # naive fallback: whitespace-delimited words, floor of 1 for non-empty text
    return max(1, len(text.strip().split()))

class Metrics:
    """Accumulate request latency, error, and token statistics.

    ``record`` expects ``start`` to be a ``time.time()`` timestamp taken when
    the request began; latency is measured against ``time.time()`` at the
    moment ``record`` is called.
    """

    def __init__(self):
        self.requests = 0
        self.errors = 0
        self.total_time = 0.0   # seconds, summed across all recorded requests
        self._times = []        # per-request latencies in seconds
        self.total_tokens = 0   # input + output tokens across all requests

    def record(self, start: float, in_tokens: int, out_tokens: int, error: bool = False):
        """Record one request that began at *start* (a ``time.time()`` value)."""
        self.requests += 1
        self.errors += int(error)
        dt = time.time() - start
        self.total_time += dt
        self._times.append(dt)
        self.total_tokens += in_tokens + out_tokens

    @staticmethod
    def _percentile(sorted_vals: List[float], q: float) -> float:
        """Return the *q*-quantile (0 <= q <= 1) of ascending *sorted_vals*.

        Linearly interpolates between the two nearest ranks (the same rule as
        NumPy's default ``percentile``). The previous floored index biased
        tail percentiles low on small samples. Returns 0.0 for an empty list.
        """
        if not sorted_vals:
            return 0.0
        pos = q * (len(sorted_vals) - 1)
        lo = int(pos)
        hi = min(lo + 1, len(sorted_vals) - 1)
        return sorted_vals[lo] + (sorted_vals[hi] - sorted_vals[lo]) * (pos - lo)

    def summary(self) -> Dict[str, Any]:
        """Return request/error counts, mean and p50/p95 latency (ms), and mean tokens."""
        times = sorted(self._times)
        return {
            "requests": self.requests,
            "errors": self.errors,
            "avg_ms": (self.total_time / max(1, self.requests)) * 1000,
            "p50_ms": self._percentile(times, 0.50) * 1000,
            "p95_ms": self._percentile(times, 0.95) * 1000,
            "avg_tokens": self.total_tokens / max(1, self.requests),
        }

# Demo: ten fake requests, each sleeping 10-50 ms, with random token counts
m = Metrics()
for _ in range(10):
    started = time.time()
    time.sleep(random.uniform(0.01, 0.05))
    m.record(started, random.randint(10, 60), random.randint(20, 120), error=False)
m.summary()

## 2) Prompt and Context Optimization
Refactor templates to keep outputs concise and bound context size.

In [None]:
if PromptTemplate is None:
    print("PromptTemplate unavailable; skipping")
else:
    # Grounded RAG prompt: answer from the context only, admit ignorance, stay brief.
    rag_template = (
        "Answer using only the context. If missing, say 'I don't know'.\n"
        "Context:\n{context}\n\n"
        "Question: {question}\n"
        "Answer (concise, 2-3 sentences):"
    )
    rag_prompt = PromptTemplate(template=rag_template, input_variables=["context", "question"])
    print("PromptTemplate ready")

## 3) Token Budgeting and Early Exit
Pre-truncate long inputs to a token budget using `tiktoken` when present; otherwise fall back to counting whitespace-separated words as tokens.

In [None]:
def truncate_by_tokens(text: str, max_tokens: int = 200, model: str = "gpt-3.5-turbo") -> str:
    """Trim *text* to at most *max_tokens* tokens for *model*.

    Uses ``tiktoken`` when importable (falling back to ``cl100k_base`` for
    unknown model names); otherwise treats whitespace-separated words as
    tokens and re-joins the kept words with single spaces.

    Args:
        text: Input text.
        max_tokens: Token budget; any value <= 0 keeps nothing.
        model: Model name used to select the tiktoken encoding.

    Returns:
        *text* unchanged when it already fits the budget, otherwise its
        leading *max_tokens* tokens; ``""`` for a non-positive budget.
    """
    if max_tokens <= 0:
        # A negative slice bound would drop tokens from the END instead of
        # keeping none, so treat any non-positive budget as "keep nothing".
        return ""
    if tiktoken is not None:
        try:
            enc = tiktoken.encoding_for_model(model)
        except KeyError:
            enc = tiktoken.get_encoding("cl100k_base")
        toks = enc.encode(text)
        if len(toks) <= max_tokens:
            return text
        return enc.decode(toks[:max_tokens])
    # naive fallback: split words
    words = text.split()
    if len(words) <= max_tokens:
        # Match the tiktoken path: return the original text untouched when it fits.
        return text
    return " ".join(words[:max_tokens])

# Demo: a 200-word input trimmed to a 50-token budget (word counts before/after)
sample = "Lorem ipsum " * 100
before_words = len(sample.split())
after_words = len(truncate_by_tokens(sample, max_tokens=50).split())
before_words, after_words

## 4) Batching and Concurrency
Use asyncio with a semaphore to bound concurrency; add backoff/retry for transient errors.

In [None]:
import math
from asyncio import Semaphore

class RateLimitError(Exception):
    """Simulated HTTP 429 raised by the mock model call."""


async def call_model(prompt: str, max_latency_ms: int = 50) -> str:
    """Mock async LLM call.

    Sleeps a random 5 ms to *max_latency_ms* ms, fails with
    ``RateLimitError`` on roughly 5% of calls, and otherwise echoes the first
    20 characters of *prompt* prefixed with ``"out:"``.
    """
    delay_s = random.uniform(0.005, max_latency_ms / 1000)
    await asyncio.sleep(delay_s)
    rate_limited = random.random() < 0.05
    if rate_limited:
        raise RateLimitError("429: Too Many Requests")
    return "out:" + prompt[:20]

async def backoff_retry(fn, *args, retries: int = 3, base_ms: int = 50, **kwargs):
    """Await ``fn(*args, **kwargs)``, retrying when it raises ``RateLimitError``.

    Makes up to *retries* guarded attempts, sleeping ``base_ms * 2**attempt``
    milliseconds after each rate-limited one. A final unguarded attempt
    follows, so a persistent failure propagates to the caller.
    """
    attempt = 0
    while attempt < retries:
        try:
            return await fn(*args, **kwargs)
        except RateLimitError:
            delay_ms = base_ms * 2 ** attempt
            await asyncio.sleep(delay_ms / 1000)
        attempt += 1
    # Last chance: no handler here, so the exception (if any) surfaces.
    return await fn(*args, **kwargs)

async def run_batch(prompts: List[str], max_concurrency: int = 5) -> List[str]:
    """Run ``call_model`` over *prompts* with at most *max_concurrency* calls in flight.

    Each call is wrapped in ``backoff_retry``. Results come back in input
    order because ``asyncio.gather`` preserves argument order.
    """
    limiter = Semaphore(max_concurrency)

    async def bounded_call(prompt):
        async with limiter:
            return await backoff_retry(call_model, prompt)

    pending = [bounded_call(prompt) for prompt in prompts]
    return await asyncio.gather(*pending)

# Demo
# Jupyter/IPython cells already execute inside a running asyncio event loop,
# where asyncio.run() raises "cannot be called from a running event loop".
# In that case run the coroutine on a fresh loop in a worker thread; in a
# plain script, asyncio.run() is used directly.
def _run_coro_blocking(coro):
    """Run *coro* to completion and return its result, even under a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

res = _run_coro_blocking(run_batch([f"q{i}" for i in range(20)], max_concurrency=8))
len(res), res[:5]

## 5) Caching Layers
Prompt/response cache keyed by normalized payload.

In [None]:
class ResponseCache:
    """In-memory response cache keyed by a hash of the prompt plus call parameters."""

    def __init__(self):
        self.store = {}  # hex digest -> cached response

    def key(self, prompt: str, **params) -> str:
        """Return the MD5 hex digest of the sorted-keys JSON of prompt + params."""
        canonical = json.dumps(dict(prompt=prompt, **params), sort_keys=True)
        digest = hashlib.md5(canonical.encode())
        return digest.hexdigest()

    def get(self, prompt: str, **params):
        """Return the cached value for prompt + params, or None on a miss."""
        cache_key = self.key(prompt, **params)
        return self.store.get(cache_key)

    def set(self, prompt: str, value: str, **params):
        """Store *value* under the key derived from prompt + params."""
        cache_key = self.key(prompt, **params)
        self.store[cache_key] = value

# Demo: a miss, then populate, then a hit with identical params
cache = ResponseCache()
p = "What is MMR retrieval?"
kparams = dict(model="gpt-3.5-turbo", temperature=0)
print(cache.get(p, **kparams))  # nothing cached yet -> None
cache.set(p, "MMR balances relevance and diversity.", **kparams)
print(cache.get(p, **kparams))  # served from the cache

## 6) Model Routing (Smart LLM Selection)
Route easy tasks to smaller models; complex tasks to larger ones. Uses `langchain_openai.ChatOpenAI` if available.

In [None]:
class Router:
    """Pick a cheaper or a stronger chat model depending on task complexity.

    Models are built with ``langchain_openai.ChatOpenAI`` when it is
    importable. If construction fails (for example, no API key configured),
    the corresponding attribute is left as ``None`` so the routing heuristic
    can still be exercised offline.
    """

    # Keywords that suggest multi-step reasoning; matched as substrings of the
    # lower-cased text.
    COMPLEX_KEYWORDS = ("analyze", "compare", "synthesize", "proof", "derive")
    # Inputs with more whitespace-separated words than this count as complex.
    MAX_SIMPLE_WORDS = 60

    def __init__(self, simple_model: str = "gpt-3.5-turbo", complex_model: str = "gpt-4"):
        self.simple = self._build(simple_model)
        self.complex = self._build(complex_model)

    @staticmethod
    def _build(model_name: str):
        """Return ``ChatOpenAI(model=model_name, temperature=0)``, or None if unavailable."""
        if not ChatOpenAI:
            return None
        try:
            return ChatOpenAI(model=model_name, temperature=0)
        except Exception as exc:  # e.g. missing API key; keep the router usable offline
            print(f"Router: could not create {model_name!r}: {exc}")
            return None

    def is_complex(self, text: str) -> bool:
        """Heuristic: True for long inputs or inputs containing a reasoning keyword."""
        too_long = len(text.split()) > self.MAX_SIMPLE_WORDS
        lowered = text.lower()
        return too_long or any(k in lowered for k in self.COMPLEX_KEYWORDS)

    def choose(self, text: str):
        """Return the complex model for complex inputs, otherwise the simple one."""
        return self.complex if self.is_complex(text) else self.simple

# Demo: exercise the routing heuristic only; no model is invoked
router = Router()
short_task = "Summarize this short sentence"
hard_task = "Analyze and compare approaches to distributed tracing across microservices with proofs"
router.is_complex(short_task), router.is_complex(hard_task)

## 7) Retrieval Efficiency (MMR)
Demonstration with simulated documents. Replace with actual vector DB retriever when available.

In [None]:
# Simulated corpus: (text, toy 2-D embedding) pairs for the retrieval demo
_corpus = [
    ("Vector search uses embeddings for semantic similarity.", [0.1, 0.9]),
    ("BM25 is a keyword-based relevance model.", [0.8, 0.2]),
    ("MMR balances relevance and diversity among candidates.", [0.5, 0.5]),
    ("RRF is a rank fusion technique combining lists.", [0.6, 0.4]),
    ("Hybrid search blends BM25 and vector results.", [0.7, 0.3]),
]
docs = [{"content": text, "vec": vec} for text, vec in _corpus]

def cosine(a, b):
    """Cosine similarity of vectors *a* and *b*.

    A 1e-9 term in the denominator keeps zero vectors from dividing by zero
    (they score 0).
    """
    import math

    def norm(vec):
        return math.sqrt(sum(c * c for c in vec))

    dot = sum(x * y for x, y in zip(a, b))
    return dot / (norm(a) * norm(b) + 1e-9)

# Query embedding and MMR settings for the demo
qvec = [0.45, 0.55]
k = 3             # number of documents to return
mmr_lambda = 0.5  # 1.0 = pure relevance, 0.0 = pure diversity

# Relevance of every document to the query
relevance = [cosine(d["vec"], qvec) for d in docs]

# Maximal Marginal Relevance: greedily add the candidate maximizing
#   mmr_lambda * sim(doc, query) - (1 - mmr_lambda) * max_{s in selected} sim(doc, s)
# so each pick trades relevance against redundancy with earlier picks.
# (Plain top-k by relevance ignores diversity entirely.)
selected = []
remaining = list(range(len(docs)))


def mmr_score(i):
    """MMR score of doc *i* given the docs already in ``selected``."""
    redundancy = max((cosine(docs[i]["vec"], docs[j]["vec"]) for j in selected), default=0.0)
    return mmr_lambda * relevance[i] - (1 - mmr_lambda) * redundancy


while remaining and len(selected) < k:
    best = max(remaining, key=mmr_score)
    selected.append(best)
    remaining.remove(best)

top = [docs[i]["content"] for i in selected]
"\n\n".join(s[:80] for s in top)

## 8) Profiling Hot Paths
Use a simple decorator to measure wall-clock times of functions.

In [None]:
def timed(fn):
    """Decorator that prints the wall-clock duration of each call to *fn*.

    Timing uses ``time.perf_counter``. The timing line is printed even when
    *fn* raises (the exception still propagates). ``functools.wraps`` keeps
    *fn*'s ``__name__``/``__doc__`` and exposes it as ``__wrapped__``.
    """
    import functools

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            dt = (time.perf_counter() - t0) * 1000
            print(f"[timed] {fn.__name__} took {dt:.1f} ms")
    return wrapper

@timed
def slow_op(n=20000):
    """Deliberately naive pure-Python loop: the sum of ``i % 7`` for ``i < n``."""
    return sum(i % 7 for i in range(n))

slow_op(50000)

## 9) Monitoring Counters
Track tokens, requests, and errors at a coarse level.

In [None]:
class Counters:
    """Coarse monitoring counters: request count, error count, and token totals."""

    def __init__(self):
        # Attribute order matters: callers print ``__dict__`` directly.
        self.token_in = 0
        self.token_out = 0
        self.requests = 0
        self.errors = 0

    def log_req(self, in_t: int, out_t: int, ok: bool = True):
        """Record one request's token usage; ``ok=False`` also counts an error."""
        self.requests += 1
        self.token_in += in_t
        self.token_out += out_t
        self.errors += 0 if ok else 1

ct = Counters()
for _in, _out, _ok in [(50, 120, True), (30, 90, False)]:
    ct.log_req(_in, _out, ok=_ok)
{name: getattr(ct, name) for name in ('requests', 'errors', 'token_in', 'token_out')}

## 10) Reliability Patterns: Circuit Breaker
Short-circuit repeated errors for a cooldown period to protect upstreams.

In [None]:
class CircuitBreaker:
    """Stop calling a failing function for a cooldown period.

    After ``threshold`` consecutive failures the breaker opens: calls made
    before the cooldown expires return ``{"error": "circuit-open"}`` without
    invoking ``fn``. After the cooldown the next call goes through. The
    failure count is reset only by a success, so one more failure re-opens
    the breaker immediately.

    Exceptions raised by ``fn`` are not re-raised; they are returned as
    ``{"error": str(exc)}``.
    """

    def __init__(self, threshold: int = 3, cooldown: float = 1.0):
        self.failures = 0                 # consecutive failures since the last success
        self.open_until = float("-inf")   # time.monotonic() deadline; earlier calls short-circuit
        self.threshold = threshold
        self.cooldown = cooldown          # seconds

    def call(self, fn, *args, **kwargs):
        """Invoke ``fn(*args, **kwargs)`` unless the breaker is currently open."""
        # Monotonic clock: wall-clock time (time.time) can jump (NTP, manual
        # changes), which would shorten or stretch the cooldown arbitrarily.
        if time.monotonic() < self.open_until:
            return {"error": "circuit-open"}
        try:
            res = fn(*args, **kwargs)
        except Exception as e:
            self.failures += 1
            if self.failures >= self.threshold:
                # Start the cooldown when the failure is observed, not before
                # a possibly slow call began.
                self.open_until = time.monotonic() + self.cooldown
            return {"error": str(e)}
        else:
            self.failures = 0
            return res

def flaky(x):
    """Return ``x * 2``, but raise ``RuntimeError("boom")`` on about 60% of calls."""
    roll = random.random()
    if roll >= 0.6:
        return x * 2
    raise RuntimeError("boom")

cb = CircuitBreaker(threshold=2, cooldown=1.5)
# Six back-to-back calls; once two failures in a row occur the rest short-circuit.
outs = [cb.call(flaky, 3) for _ in range(6)]
outs

## 11) A/B Testing Scaffold
Switch between two prompt variants and measure differences in outcome metrics.

In [None]:
def choose_variant() -> str:
    """Pick prompt variant "A" or "B" with equal probability."""
    return "B" if random.random() >= 0.5 else "A"

def build_prompt(variant: str, context: str, question: str) -> str:
    """Render the A/B prompt: "A" is the terse form; any other variant uses the B wording."""
    if variant != "A":
        return f"Use context below. If unknown, say 'I don't know'.\n{context}\nQ: {question}\nA (concise):"
    return f"Answer using only context:\n{context}\nQ: {question}\nA:"

v = choose_variant()
prompt_text = build_prompt(v, "ctx: MMR and BM25", "Explain hybrid search")
prompt_text

## 12) Mini LLM Service (Cache + Metrics)
A service wrapper that counts tokens, uses a cache, and logs counters.
Uses a mock model to avoid external API calls; replace with `ChatOpenAI` as needed.

In [None]:
class MockModel:
    """Offline stand-in for a chat model: echoes the prompt reversed, capped at 80 chars."""

    def __init__(self, model_name: str = "mock-model"):
        self.model_name = model_name

    def invoke(self, prompt: str, **params):
        """Return ``{"content": ...}``; extra ``params`` are accepted and ignored."""
        reversed_prompt = "".join(reversed(prompt))
        return {"content": reversed_prompt[:80]}

class LLMService:
    """Wrap a model with a response cache and usage counters.

    ``model`` must expose ``invoke(prompt, **params)``. Its return value may
    be a dict with a ``"content"`` key (as ``MockModel`` returns), an object
    with a string ``.content`` attribute (presumably a LangChain chat message;
    confirm for the model you plug in), or anything else, which is converted
    with ``str()``.
    """

    def __init__(self, model, cache: "ResponseCache" = None, counters: "Counters" = None):
        self.model = model
        self.cache = ResponseCache() if cache is None else cache
        self.counters = Counters() if counters is None else counters

    @staticmethod
    def _extract_text(resp) -> str:
        """Return the text of a model response (dict, message object, or other)."""
        if isinstance(resp, dict):
            return resp["content"]
        content = getattr(resp, "content", None)
        if isinstance(content, str):
            # Message objects: str(resp) would render the whole object repr.
            return content
        return str(resp)

    def invoke(self, prompt: str, **params):
        """Return the model's text for *prompt*, serving repeats from the cache.

        Cache hits log the prompt's input tokens and 0 output tokens. If the
        model raises, the failure is counted as an error and returned as an
        ``"error: ..."`` string rather than re-raised.
        """
        model_name = getattr(self.model, "model_name", "gpt-3.5-turbo")
        in_toks = count_tokens(prompt, model_name)
        cached = self.cache.get(prompt, **params)
        # Compare against None: an empty-string response is still a cache hit.
        if cached is not None:
            self.counters.log_req(in_toks, 0, ok=True)
            return cached
        try:
            resp = self.model.invoke(prompt, **params)
        except Exception as e:
            self.counters.log_req(in_toks, 0, ok=False)
            return f"error: {e}"
        out_text = self._extract_text(resp)
        out_toks = count_tokens(out_text, model_name)
        self.counters.log_req(in_toks, out_toks, ok=True)
        self.cache.set(prompt, out_text, **params)
        return out_text

svc = LLMService(MockModel())
query = "Explain RAG briefly."
out1 = svc.invoke(query, temperature=0)  # miss: calls the mock model
out2 = svc.invoke(query, temperature=0)  # hit: served from the cache
svc.counters.__dict__, out1, out2

## 13) Optional: LangGraph Events Streaming
If `langgraph` is installed, this cell builds a tiny one-node graph and streams its execution events; if `langgraph` is not installed, the demo is skipped.

In [None]:
if StateGraph is not None:
    from typing import TypedDict

    class State(TypedDict):
        input_text: str

    def node_echo(state: State):
        """Single graph node: append "_processed" to the input text."""
        return {"input_text": state["input_text"] + "_processed"}

    g = StateGraph(State)
    g.add_node("echo", node_echo)
    g.add_edge(START, "echo")
    g.add_edge("echo", END)
    graph = g.compile()

    async def _collect_events(app, inputs):
        """Return (elapsed_ms, event, name) for each event streamed by *app*.

        ``astream_events`` is an async iterator, so it must be consumed with
        ``async for`` (a plain ``for`` raises TypeError). Its event dicts have
        no "timestamp" key, so elapsed time is measured on receipt.
        """
        t0 = time.perf_counter()
        rows = []
        async for ev in app.astream_events(inputs, version="v2"):
            rows.append(((time.perf_counter() - t0) * 1000, ev.get("event"), ev.get("name")))
        return rows

    def _run_coro_blocking(coro):
        """Run *coro* to completion, using a worker thread if a loop is already running.

        Jupyter cells run inside an active event loop, where asyncio.run()
        raises; a fresh loop in another thread avoids that.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    for elapsed_ms, event, name in _run_coro_blocking(_collect_events(graph, {"input_text": "trace-me"})):
        print(f"{elapsed_ms:8.2f} ms  {event}  {name}")
else:
    print("LangGraph unavailable; skipping events demo.")

## Exercises

A) End-to-end metrics
- Wrap your main chain/agent with `Metrics` and `Counters`.
- Run 50 requests (mock or real) and compute avg/p50/p95 latency and avg tokens.

B) Batch embeddings
- Write a function that takes a list of texts, hashes them, and computes only missing vectors.
- Measure throughput (texts/sec) and cache hit rate on a repeated dataset.

C) Router tuning
- Implement a 2-tier router (fast vs accurate) and measure cost vs quality on an eval set.

D) Cache strategy
- Replace `ResponseCache` with Redis (or similar) and set TTL.
- Measure cache hit-ratio improvements on repeated queries.

E) Tracing integration
- Stream LangGraph events to a log with timestamps.
- Build a simple Gantt-like timeline from the recorded events.