# Cookbook

Real-world patterns and recipes for **httpxr**. Every snippet below is a self-contained, runnable cell.

In [None]:
# Install httpxr in this notebook environment (skip if already installed)
# !pip install httpxr

## OAuth2 Token Refresh

Use event hooks to automatically refresh expired OAuth2 tokens:

In [None]:
import httpxr

TOKEN = {"access_token": "initial_token", "expires_at": 0}


def refresh_token() -> str:
    """Call your OAuth2 token endpoint."""
    with httpxr.Client() as token_client:
        r = token_client.post(
            "https://auth.example.com/oauth/token",
            data={
                "grant_type": "client_credentials",
                "client_id": "my-client-id",
                "client_secret": "my-secret",
            },
        )
        r.raise_for_status()
        data = r.json()
        TOKEN["access_token"] = data["access_token"]
        return data["access_token"]


def add_auth(request: httpxr.Request) -> None:
    request.headers["authorization"] = f"Bearer {TOKEN['access_token']}"


def handle_401(response: httpxr.Response) -> None:
    if response.status_code == 401:
        token = refresh_token()
        response.request.headers["authorization"] = f"Bearer {token}"


# with httpxr.Client(
#     event_hooks={"request": [add_auth], "response": [handle_401]}
# ) as client:
#     r = client.get("https://api.example.com/data")
#     print(r.json())

## Microsoft Graph Pagination

Use `paginate()` with `@odata.nextLink` to fetch all pages from Microsoft Graph:

In [None]:
import httpxr

# with httpxr.Client() as client:
#     all_users = []
#     for page in client.paginate(
#         "GET",
#         "https://graph.microsoft.com/v1.0/users",
#         next_url="@odata.nextLink",
#         max_pages=50,
#         headers={"Authorization": "Bearer YOUR_TOKEN"},
#     ):
#         page.raise_for_status()
#         users = page.json().get("value", [])
#         all_users.extend(users)
#         print(f"Fetched {len(users)} users (total: {len(all_users)})")
#     print(f"Total users: {len(all_users)}")

## GitHub Issues with Link Header Pagination

Paginate GitHub's REST API using the `Link` response header:

In [None]:
import httpxr

with httpxr.Client(
    headers={"Accept": "application/vnd.github.v3+json"}
) as client:
    all_issues: list = []

    for page in client.paginate(
        "GET",
        "https://api.github.com/repos/python/cpython/issues",
        next_header="link",
        max_pages=5,
    ):
        issues = page.json()
        all_issues.extend(issues)
        print(f"Page: {len(issues)} issues")

    print(f"Total: {len(all_issues)} issues")

## Concurrent API Calls with Error Handling

Use `gather()` to fetch many resources in parallel with graceful error handling:

In [None]:
import httpxr

with httpxr.Client() as client:
    requests = [
        client.build_request("GET", f"https://httpbin.org/status/{200 if i % 5 != 0 else 500}")
        for i in range(10)
    ]

    responses = client.gather(requests, max_concurrency=5, return_exceptions=True)

    succeeded = 0
    failed = 0
    for i, resp in enumerate(responses):
        if isinstance(resp, Exception):
            print(f"Item {i}: FAILED — {resp}")
            failed += 1
        else:
            print(f"Item {i}: {resp.status_code}")
            succeeded += 1

    print(f"\n✓ {succeeded} succeeded, ✗ {failed} failed")

## Async Concurrent Requests

The `AsyncClient` exposes the same `gather()` API:

In [None]:
import asyncio
import httpxr


async def main() -> None:
    async with httpxr.AsyncClient() as client:
        requests = [
            client.build_request("GET", f"https://httpbin.org/delay/{i % 3}")
            for i in range(6)
        ]
        responses = await client.gather(requests, max_concurrency=3)

        for r in responses:
            print(f"{r.url} → {r.status_code}")


await main()  # Jupyter event loop — use asyncio.run(main()) in a script

## Download Large Files with Progress

Stream large files to disk without loading everything into memory:

In [None]:
import httpxr

url = "https://httpbin.org/stream-bytes/65536"  # 64 KB demo

with httpxr.Client() as client:
    with client.stream("GET", url) as response:
        response.raise_for_status()
        total = int(response.headers.get("content-length", 0))
        downloaded = 0

        with open("/tmp/demo_download.bin", "wb") as f:
            for chunk in response.iter_bytes(chunk_size=4096):
                f.write(chunk)
                downloaded += len(chunk)
                if total:
                    pct = downloaded / total * 100
                    print(f"\r{pct:.1f}%", end="", flush=True)

        print(f"\n✓ Downloaded {downloaded:,} bytes")

## Retry with Backoff

Use `RetryConfig` for automatic retries with exponential backoff:

In [None]:
import httpxr

retry = httpxr.RetryConfig(
    max_retries=3,
    backoff_factor=0.5,
    retry_on_status=[429, 500, 502, 503, 504],
    jitter=True,
)

with httpxr.Client() as client:
    response = client.get(
        "https://httpbin.org/get",
        extensions={"retry": retry},
    )
    print(response.status_code, response.elapsed)

## Custom SSL Certificates

Use a custom CA bundle or disable verification in development:

In [None]:
import httpxr

# Custom CA bundle
# with httpxr.Client(verify="/path/to/ca-bundle.crt") as client:
#     r = client.get("https://internal-api.corp.example.com/data")

# Disable SSL verification (development only!)
with httpxr.Client(verify=False) as client:
    r = client.get("https://httpbin.org/get")
    print(r.status_code)

## Session Cookies

Cookies persist automatically across requests within a client session:

In [None]:
import httpxr

with httpxr.Client() as client:
    # httpbin sets a cookie we can inspect
    client.get("https://httpbin.org/cookies/set/session_id/abc123")

    r = client.get("https://httpbin.org/cookies")
    print(r.json())

    for name in client.cookies:
        print(f"  {name} = {client.cookies[name]}")

## Logging All Requests

Use event hooks to log every request and response:

In [None]:
import logging
import httpxr

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("http")


def log_request(request: httpxr.Request) -> None:
    log.info(f"→ {request.method} {request.url}")


def log_response(response: httpxr.Response) -> None:
    elapsed = response.elapsed.total_seconds() * 1000
    log.info(f"← {response.status_code} ({elapsed:.0f}ms) {response.url}")


with httpxr.Client(
    event_hooks={"request": [log_request], "response": [log_response]}
) as client:
    client.get("https://httpbin.org/get")
    client.get("https://httpbin.org/status/404")

## Raw API for Maximum Speed

When you need the absolute lowest latency and don't need the full `Response` object:

In [None]:
import json
import httpxr

with httpxr.Client() as client:
    # Returns (status, headers_dict, body_bytes)
    status, headers, body = client.get_raw("https://httpbin.org/get")

    if status == 200:
        data = json.loads(body)
        print(f"status={status}, url={data.get('url')}")
    else:
        print(f"Error: {status}")

## Base URL for API Clients

Set a `base_url` to avoid repeating the host in every call:

In [None]:
import httpxr

with httpxr.Client(
    base_url="https://httpbin.org",
    headers={"X-Client": "httpxr"},
) as api:
    r_get = api.get("/get")
    r_post = api.post("/post", json={"name": "Alice"})
    print(r_get.status_code, r_post.status_code)

## Testing with MockTransport

Use `MockTransport` to unit-test code that uses httpxr without real network calls:

In [None]:
import httpxr


def mock_handler(request: httpxr.Request) -> httpxr.Response:
    if request.url.path == "/users":
        return httpxr.Response(200, json=[{"id": 1, "name": "Alice"}])
    return httpxr.Response(404)


transport = httpxr.MockTransport(mock_handler)

with httpxr.Client(transport=transport) as client:
    r = client.get("https://api.example.com/users")
    assert r.status_code == 200
    assert r.json()[0]["name"] == "Alice"
    print("✓ /users:", r.json())

    r = client.get("https://api.example.com/nope")
    assert r.status_code == 404
    print("✓ /nope: 404")

## OpenAI Chat Streaming (SSE)

Use `httpxr.sse` to stream chat completions from OpenAI or any SSE endpoint:

In [None]:
import json
import httpxr
from httpxr.sse import connect_sse

# Requires a valid OPENAI_API_KEY in the environment
import os

api_key = os.environ.get("OPENAI_API_KEY", "")
if not api_key:
    print("⚠  Set OPENAI_API_KEY to run this cell")
else:
    with httpxr.Client(timeout=60.0) as client:
        with connect_sse(
            client,
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={
                "model": "gpt-4o",
                "stream": True,
                "messages": [{"role": "user", "content": "Hello!"}],
            },
        ) as source:
            for event in source.iter_sse():
                if event.data == "[DONE]":
                    break
                chunk = json.loads(event.data)
                delta = chunk["choices"][0]["delta"]
                print(delta.get("content", ""), end="", flush=True)
        print()

## Download a File

Use `client.download()` for a one-liner file download:

In [None]:
import httpxr

with httpxr.Client() as client:
    resp = client.download(
        "https://httpbin.org/image/png",
        "/tmp/demo.png",
    )
    print(f"✓ {resp.status_code} — saved to /tmp/demo.png")

## Fast JSON Parsing with orjson

Use `response.json_bytes()` to feed raw bytes directly into [orjson](https://github.com/ijl/orjson) — no intermediate UTF-8 decode:

In [None]:
try:
    import orjson
    import httpxr

    with httpxr.Client() as client:
        response = client.get("https://httpbin.org/json")
        data = orjson.loads(response.json_bytes())
        print(data)
except ImportError:
    print("Install orjson: pip install orjson")

## Resilient API Client

Combine `RetryConfig` and `RateLimit` for a production-ready client that handles transient failures and respects API rate limits:

In [None]:
import httpxr

client = httpxr.Client(
    base_url="https://httpbin.org",
    retry=httpxr.RetryConfig(
        max_retries=3,
        backoff_factor=0.5,
        retry_on_status=[429, 500, 502, 503, 504],
        jitter=True,
    ),
    rate_limit=httpxr.RateLimit(
        requests_per_second=10.0,
        burst=20,
    ),
    timeout=30.0,
)

with client:
    for i in range(3):
        r = client.get(f"/get?item={i}")
        r.raise_for_status()
        print(f"item {i}: {r.status_code}")