# core

> Cache your API calls and make your notebooks fast again.

### Introduction

We often call APIs while prototyping and testing our code. A single API call (e.g. an Anthropic chat completion) can take 100's of ms to run. This can really slow down development especially if our notebook contains many API calls 😞.

`cachy` caches API requests. It does this by saving the result of each API call to a local `cachy.jsonl` file. Before calling an API (e.g. OpenAI) it will check if the request already exists in `cachy.jsonl`. If it does it will return the cached result.

**How does it work?**

Under the hood popular SDK's like OpenAI, Anthropic and LiteLLM use `httpx.Client` and `httpx.AsyncClient`. 

`cachy` patches the `send` method of both clients and injects a simple caching mechanism:

- create a cache key from the request
- if the key exists in `cachy.jsonl` return the cached response
- if not, call the API and save the response to `cachy.jsonl`

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import hashlib,httpx,json
from fastcore.utils import *

`cachy.jsonl` contains one API response per line. 

Each line has the following format `{"key": key, "response": response}` 

- `key`: hash of the API request
- `response`: the API response. 

```json
{
    "key": "afc2be0c", 
    "response": "{\"id\":\"msg_xxx\",\"type\":\"message\",\"role\":\"assistant\",\"model\":\"claude-sonnet-4-20250514\",\"content\":[{\"type\":\"text\",\"text\":\"Coordination.\"}],\"stop_reason\":\"end_turn\",\"stop_sequence\":null,\"usage\":{\"input_tokens\":16,\"cache_creation_input_tokens\":0,\"cache_read_input_tokens\":0,\"cache_creation\":{\"ephemeral_5m_input_tokens\":0,\"ephemeral_1h_input_tokens\":0},\"output_tokens\":6,\"service_tier\":\"standard\"}}"
}
```

### Patching `httpx`

Patching a method is very straightforward. 

In our case we want to patch `httpx._client.Client.send` and `httpx._client.AsyncClient.send`. 

These methods are called when running `httpx.get`, `httpx.post`, etc. 

In the example below we use `@patch` from [fastcore](https://fastcore.fast.ai/) to print `calling an API` when `httpx._client.Client.send` is run.

```python
@patch
def send(self:httpx._client.Client, r, **kwargs):
    print('calling an API')
    return self._orig_send(r, **kwargs)
```

### Cache Filtering

Now, let's build up our caching logic piece-by-piece.

The first thing we need to do is ensure that our caching logic only runs on specific urls.

For now, let's only cache API calls made to popular LLM providers like OpenAI, Anthropic, Google and DeepSeek. We can make this fully customizable later.

In [None]:
#| exports
doms = ("api.openai.com", "api.anthropic.com", "generativelanguage.googleapis.com", "api.deepseek.com")

In [None]:
#| exports
def _should_cache(url, doms): return any(dom in str(url) for dom in doms)

We could then use `_should_cache` like this.

```python
@patch
def send(self:httpx._client.Client, r, **kwargs):
    if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
    # insert caching logic
    ...
```

### Cache Key

The next thing we need to do is figure out if a response for the request `r` already exists in our cache. 

Recall that each line in `cachy.jsonl` has the following format `{"key": key, "response": response}`.

Our key needs to be unique and deterministic. One way to do this is to concatenate the request URL and content, then generate a hash from the result.

In [None]:
def _key(r): return hashlib.sha256(str(r.url.host).encode() + r.content).hexdigest()[:8]

We use `r.url.host` instead of `r.url` because when LiteLLM calls Gemini it includes the API key in a query param. See [#1](https://github.com/AnswerDotAI/cachy/issues/1).

If we used `r.url` we wouldn't be able to use dummy api keys when running our notebooks in a CI pipeline. 

Let's test this out.

In [None]:
r1 = httpx.Request('POST', 'https://api.openai.com/v1/chat/completions', content=b'some content')
r1

<Request('POST', 'https://api.openai.com/v1/chat/completions')>

In [None]:
_key(r1)

'80acdde4'

If we run it again we should get the same key.

In [None]:
_key(r1)

'80acdde4'

Let's modify the url and confirm we get a different key.

In [None]:
_key(httpx.Request('POST', 'https://api.anthropic.com/v1/messages', content=b'some content'))

'2707fa05'

Great. Let's update our patch.

```python
@patch
def send(self:httpx._client.Client, r, **kwargs):
    if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
    key = _key(r)
    # if cache hit return the response
    # else run the request, write to response the cache and return it
    ...
```

### Cache Reads/Writes

Now let's add some methods that will read from and write to `cachy.jsonl`.

In [None]:
#| exports
def _cache(key, cfp):
    with open(cfp, "r") as f:
        line = first(f, lambda l: json.loads(l)["key"] == key)
        return json.loads(line)["response"] if line else None

In [None]:
#| exports
def _write_cache(key, content, cfp):
    with open(cfp, "a") as f: f.write(json.dumps({"key":key, "response": content})+"\n")

Let's update our `patch`.

In [None]:
@patch
def send(self:httpx._client.Client, r, **kwargs):
    if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
    key = key(r)
    if res := _cache(key,"cachy.jsonl"): return httpx.Response(status_code=200, content=res, request=r)
    res = self._orig_send(r, **kwargs)
    content = res.read().decode()
    _write_cache(key, content, "cachy.jsonl")
    return httpx.Response(status_code=res.status_code, content=content, request=r)

### Streaming

Let's add support for streaming. 

First let's include an `is_stream` bool in our hash so that a non-streamed request will generate a different key to the same request when streamed. 

In [None]:
#| exports
def _key(r, is_stream=False):
    "Create a unique, deterministic id from the request `r`."
    return hashlib.sha256(f"{r.url.host}{is_stream}".encode() + r.content).hexdigest()[:8]

In the `patch` we need to `consume` the entire stream before writing it to the cache.

In [None]:
@patch
def send(self:httpx._client.Client, r, **kwargs):
    is_stream = kwargs.get("stream")
    if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
    key = _key(r, is_stream=False)
    if res := _cache(key,"cachy.jsonl"): return httpx.Response(status_code=200, content=res, request=r)
    res = self._orig_send(r, **kwargs)
    content = res.read().decode() if not is_stream else b''.join(list(res.iter_bytes())).decode()
    _write_cache(key, content, "cachy.jsonl")
    return httpx.Response(status_code=res.status_code, content=content, request=r)

### `enable_cachy` 

To make `cachy` as user friendly as possible let's make it so that we can apply our patch by running a single method at the top of our notebook.

```python
from cachy import enable_cachy

enable_cachy()
```

For this to work we'll need to wrap our patch.

In [None]:
def _apply_patch():    
    @patch    
    def send(self:httpx._client.Client, r, **kwargs):
        is_stream = kwargs.get("stream")
        if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
        key = _key(r, is_stream=False)
        if res := _cache(key,"cachy.jsonl"): return httpx.Response(status_code=200, content=res, request=r)
        res = self._orig_send(r, **kwargs)
        content = res.read().decode() if not is_stream else b''.join(list(res.iter_bytes())).decode()
        _write_cache(key, content, "cachy.jsonl")
        return httpx.Response(status_code=res.status_code, content=content, request=r)

In [None]:
def enable_cachy():  
    _apply_patch()

Great. Now, let's make `cachy` a little more customizable by making it possible to specify:

- the APIs (or domains) to cache
- the location of the cache file.

In [None]:
def enable_cachy(cache_dir=None, doms=doms):
    cfp = Path(cache_dir or getattr(Config.find("settings.ini"), "config_path", ".")) / "cachy.jsonl"
    cfp.touch(exist_ok=True)   
    _apply_patch(cfp, doms)

*Note: If our notebook is running in an nbdev project `Config.find("settings.ini").config_path` automatically finds the base dir.*

In [None]:
def _apply_patch(cfp, doms):    
    @patch
    def send(self:httpx._client.Client, r, **kwargs):
        is_stream = kwargs.get("stream")
        if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
        key = _key(r, is_stream=False)
        if res := _cache(key,cfp): return httpx.Response(status_code=200, content=res, request=r)
        res = self._orig_send(r, **kwargs)
        content = res.read().decode() if not is_stream else b''.join(list(res.iter_bytes())).decode()
        _write_cache(key,content,cfp)
        return httpx.Response(status_code=res.status_code, content=content, request=r)

### Async

Now let's add support for `async` requests.

In [None]:
#| exports
def _apply_async_patch(cfp, doms):    
    @patch
    async def send(self:httpx._client.AsyncClient, r, **kwargs):
        is_stream = kwargs.get("stream")
        if not _should_cache(r.url, doms): return await self._orig_send(r, **kwargs)
        key = _key(r, is_stream=False)
        if res := _cache(key, cfp): return httpx.Response(status_code=200, content=res, request=r)
        res = await self._orig_send(r, **kwargs)
        content = res.read().decode() if not is_stream else b''.join([c async for c in res.aiter_bytes()]).decode()
        _write_cache(key, content, cfp)
        return httpx.Response(status_code=res.status_code, content=content, request=r)

Let's rename our original patch.

In [None]:
#| exports
def _apply_sync_patch(cfp, doms):    
    @patch
    def send(self:httpx._client.Client, r, **kwargs):
        is_stream = kwargs.get("stream")
        if not _should_cache(r.url, doms): return self._orig_send(r, **kwargs)
        key = _key(r, is_stream=False)
        if res := _cache(key,cfp): return httpx.Response(status_code=200, content=res, request=r)
        res = self._orig_send(r, **kwargs)
        content = res.read().decode() if not is_stream else b''.join(list(res.iter_bytes())).decode()
        _write_cache(key,content,cfp)
        return httpx.Response(status_code=res.status_code, content=content, request=r)

Finally, let's update `enable_cachy`.

In [None]:
def enable_cachy(cache_dir=None, doms=doms):
    cfp = Path(cache_dir or getattr(Config.find("settings.ini"), "config_path", ".")) / "cachy.jsonl"
    cfp.touch(exist_ok=True)   
    _apply_sync_patch(cfp, doms)
    _apply_async_patch(cfp, doms)

### Patching `litellm`

A LiteLLM `ModelResponse(Stream)` has a random `id` and a `created` timestamp. These are not provided by the http response but generated client side. As a result we have to find a different way of making these identical on consecutive requests.

For the `ModelResponesStream` we can patch the `__init__` function. However, for the `ModelResponse` this is not enough, because in the `(a)completion` function litellm overwrites `model_response.created` after the object has been initialized. To resolve this we wrap the `(a)completion` and overwrite the response as required.

In [None]:
#| exports
def _apply_litellm_patch():
    try: import litellm
    except ImportError: return  
    
    @patch
    def __init__(self:litellm.ModelResponseStream, *args, **kwargs):
        result = self._orig___init__(*args, **kwargs)
        self.id,self.created  = 'cachy-dummy-id',1
        return result
    
    @patch
    def __init__(self:litellm.ModelResponse, *args, **kwargs):
        result = self._orig___init__(*args, **kwargs)
        self.id,self.created  = 'cachy-dummy-id',1
        return result
    
    original_completion = litellm.completion
    def wrapped_completion(*args, **kwargs):
        res = original_completion(*args, **kwargs)
        if hasattr(res, 'id'): res.id = 'cachy-dummy-id'
        if hasattr(res, 'created'): res.created = 1
        return res
    litellm.completion = wrapped_completion
    
    original_acompletion = litellm.acompletion
    async def wrapped_acompletion(*args, **kwargs):
        res = await original_acompletion(*args, **kwargs)
        if hasattr(res, 'id'): res.id = 'cachy-dummy-id'
        if hasattr(res, 'created'): res.created = 1
        return res
    litellm.acompletion = wrapped_acompletion

And finally we export our `enable_cachy` function.

In [None]:
#| exports
def enable_cachy(cache_dir=None, doms=doms):
    cfp = Path(cache_dir or getattr(Config.find("settings.ini"), "config_path", ".")) / "cachy.jsonl"
    cfp.touch(exist_ok=True)   
    _apply_sync_patch(cfp, doms)
    _apply_async_patch(cfp, doms)
    _apply_litellm_patch()

## Tests

Let's test `enable_cachy` on 3 SDKs (OpenAI, Anthropic, LiteLLM) for the scenarios below:

- sync requests with(out) streaming
- async requests with(out) streaming

Add some helper functions.

In [None]:
class mods: ant="claude-sonnet-4-20250514"; oai="gpt-4o"; gem="gemini/gemini-2.0-flash"

In [None]:
def mk_msgs(m): return [{"role": "user", "content": f"write 1 word about {m}"}]

In [None]:
enable_cachy()

### OpenAI

In [None]:
from openai import OpenAI

In [None]:
cli = OpenAI()

In [None]:
r = cli.responses.create(model=mods.oai, input=mk_msgs("openai sync"))
r

Collaboration

<details>

- id: resp_0adc61ccfb3938da0068c80baeb59c81a29e449ca5f3333fa2
- created_at: 1757940654.0
- error: None
- incomplete_details: None
- instructions: None
- metadata: {}
- model: gpt-4o-2024-08-06
- object: response
- output: [ResponseOutputMessage(id='msg_0adc61ccfb3938da0068c80baf67fc81a2b8fe99f555767ec5', content=[ResponseOutputText(annotations=[], text='Collaboration', type='output_text', logprobs=[])], role='assistant', status='completed', type='message')]
- parallel_tool_calls: True
- temperature: 1.0
- tool_choice: auto
- tools: []
- top_p: 1.0
- background: False
- conversation: None
- max_output_tokens: None
- max_tool_calls: None
- previous_response_id: None
- prompt: None
- prompt_cache_key: None
- reasoning: Reasoning(effort=None, generate_summary=None, summary=None)
- safety_identifier: None
- service_tier: default
- status: completed
- text: ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium')
- top_logprobs: 0
- truncation: disabled
- usage: ResponseUsage(input_tokens=15, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens=3, output_tokens_details=OutputTokensDetails(reasoning_tokens=0), total_tokens=18)
- user: None
- store: True

</details>

In [None]:
r = cli.responses.create(model=mods.oai, input=mk_msgs("openai sync"))
r

Collaboration

<details>

- id: resp_0adc61ccfb3938da0068c80baeb59c81a29e449ca5f3333fa2
- created_at: 1757940654.0
- error: None
- incomplete_details: None
- instructions: None
- metadata: {}
- model: gpt-4o-2024-08-06
- object: response
- output: [ResponseOutputMessage(id='msg_0adc61ccfb3938da0068c80baf67fc81a2b8fe99f555767ec5', content=[ResponseOutputText(annotations=[], text='Collaboration', type='output_text', logprobs=[])], role='assistant', status='completed', type='message')]
- parallel_tool_calls: True
- temperature: 1.0
- tool_choice: auto
- tools: []
- top_p: 1.0
- background: False
- conversation: None
- max_output_tokens: None
- max_tool_calls: None
- previous_response_id: None
- prompt: None
- prompt_cache_key: None
- reasoning: Reasoning(effort=None, generate_summary=None, summary=None)
- safety_identifier: None
- service_tier: default
- status: completed
- text: ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium')
- top_logprobs: 0
- truncation: disabled
- usage: ResponseUsage(input_tokens=15, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens=3, output_tokens_details=OutputTokensDetails(reasoning_tokens=0), total_tokens=18)
- user: None
- store: True

</details>

Let's test streaming.

In [None]:
r = cli.responses.create(model=mods.oai, input=mk_msgs("openai sync streaming"), stream=True)
for ch in r: print(ch)

ResponseCreatedEvent(response=Response(id='resp_0bedad8c74657bc30068c80bb26cf4819ca6920df9fbd23916', created_at=1757940658.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-2024-08-06', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, background=False, conversation=None, max_output_tokens=None, max_tool_calls=None, previous_response_id=None, prompt=None, prompt_cache_key=None, reasoning=Reasoning(effort=None, generate_summary=None, summary=None), safety_identifier=None, service_tier='auto', status='in_progress', text=ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium'), top_logprobs=0, truncation='disabled', usage=None, user=None, store=True), sequence_number=0, type='response.created')
ResponseInProgressEvent(response=Response(id='resp_0bedad8c74657bc30068c80bb26cf4819ca6920df9fbd23916', created_at=1757940658.0, error=None, incomplete_details=None, instruction

In [None]:
r = cli.responses.create(model=mods.oai, input=mk_msgs("openai sync streaming"), stream=True)
for ch in r: print(ch)

ResponseCreatedEvent(response=Response(id='resp_0bedad8c74657bc30068c80bb26cf4819ca6920df9fbd23916', created_at=1757940658.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-2024-08-06', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, background=False, conversation=None, max_output_tokens=None, max_tool_calls=None, previous_response_id=None, prompt=None, prompt_cache_key=None, reasoning=Reasoning(effort=None, generate_summary=None, summary=None), safety_identifier=None, service_tier='auto', status='in_progress', text=ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium'), top_logprobs=0, truncation='disabled', usage=None, user=None, store=True), sequence_number=0, type='response.created')
ResponseInProgressEvent(response=Response(id='resp_0bedad8c74657bc30068c80bb26cf4819ca6920df9fbd23916', created_at=1757940658.0, error=None, incomplete_details=None, instruction

Let's test async.

In [None]:
from openai import AsyncOpenAI

In [None]:
cli = AsyncOpenAI()

In [None]:
r = await cli.responses.create(model=mods.oai, input=mk_msgs("openai async"))
r

Innovative

<details>

- id: resp_07966b24b5f8fcde0068c80bc084cc819785b99bc0eaa06e47
- created_at: 1757940672.0
- error: None
- incomplete_details: None
- instructions: None
- metadata: {}
- model: gpt-4o-2024-08-06
- object: response
- output: [ResponseOutputMessage(id='msg_07966b24b5f8fcde0068c80bc1038481978db7b032ab75dbef', content=[ResponseOutputText(annotations=[], text='Innovative', type='output_text', logprobs=[])], role='assistant', status='completed', type='message')]
- parallel_tool_calls: True
- temperature: 1.0
- tool_choice: auto
- tools: []
- top_p: 1.0
- background: False
- conversation: None
- max_output_tokens: None
- max_tool_calls: None
- previous_response_id: None
- prompt: None
- prompt_cache_key: None
- reasoning: Reasoning(effort=None, generate_summary=None, summary=None)
- safety_identifier: None
- service_tier: default
- status: completed
- text: ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium')
- top_logprobs: 0
- truncation: disabled
- usage: ResponseUsage(input_tokens=15, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens=3, output_tokens_details=OutputTokensDetails(reasoning_tokens=0), total_tokens=18)
- user: None
- store: True

</details>

In [None]:
r = await cli.responses.create(model=mods.oai, input=mk_msgs("openai async"))
r

Innovative

<details>

- id: resp_07966b24b5f8fcde0068c80bc084cc819785b99bc0eaa06e47
- created_at: 1757940672.0
- error: None
- incomplete_details: None
- instructions: None
- metadata: {}
- model: gpt-4o-2024-08-06
- object: response
- output: [ResponseOutputMessage(id='msg_07966b24b5f8fcde0068c80bc1038481978db7b032ab75dbef', content=[ResponseOutputText(annotations=[], text='Innovative', type='output_text', logprobs=[])], role='assistant', status='completed', type='message')]
- parallel_tool_calls: True
- temperature: 1.0
- tool_choice: auto
- tools: []
- top_p: 1.0
- background: False
- conversation: None
- max_output_tokens: None
- max_tool_calls: None
- previous_response_id: None
- prompt: None
- prompt_cache_key: None
- reasoning: Reasoning(effort=None, generate_summary=None, summary=None)
- safety_identifier: None
- service_tier: default
- status: completed
- text: ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium')
- top_logprobs: 0
- truncation: disabled
- usage: ResponseUsage(input_tokens=15, input_tokens_details=InputTokensDetails(cached_tokens=0), output_tokens=3, output_tokens_details=OutputTokensDetails(reasoning_tokens=0), total_tokens=18)
- user: None
- store: True

</details>

Let's test async streaming.

In [None]:
r = await cli.responses.create(model=mods.oai, input=mk_msgs("openai async streaming"), stream=True)
async for ch in r: print(ch)

ResponseCreatedEvent(response=Response(id='resp_09667f14142ea18b0068c80bc16c8481a1988be17cf34e1823', created_at=1757940673.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-2024-08-06', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, background=False, conversation=None, max_output_tokens=None, max_tool_calls=None, previous_response_id=None, prompt=None, prompt_cache_key=None, reasoning=Reasoning(effort=None, generate_summary=None, summary=None), safety_identifier=None, service_tier='auto', status='in_progress', text=ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium'), top_logprobs=0, truncation='disabled', usage=None, user=None, store=True), sequence_number=0, type='response.created')
ResponseInProgressEvent(response=Response(id='resp_09667f14142ea18b0068c80bc16c8481a1988be17cf34e1823', created_at=1757940673.0, error=None, incomplete_details=None, instruction

In [None]:
r = await cli.responses.create(model=mods.oai, input=mk_msgs("openai async streaming"), stream=True)
async for ch in r: print(ch)

ResponseCreatedEvent(response=Response(id='resp_09667f14142ea18b0068c80bc16c8481a1988be17cf34e1823', created_at=1757940673.0, error=None, incomplete_details=None, instructions=None, metadata={}, model='gpt-4o-2024-08-06', object='response', output=[], parallel_tool_calls=True, temperature=1.0, tool_choice='auto', tools=[], top_p=1.0, background=False, conversation=None, max_output_tokens=None, max_tool_calls=None, previous_response_id=None, prompt=None, prompt_cache_key=None, reasoning=Reasoning(effort=None, generate_summary=None, summary=None), safety_identifier=None, service_tier='auto', status='in_progress', text=ResponseTextConfig(format=ResponseFormatText(type='text'), verbosity='medium'), top_logprobs=0, truncation='disabled', usage=None, user=None, store=True), sequence_number=0, type='response.created')
ResponseInProgressEvent(response=Response(id='resp_09667f14142ea18b0068c80bc16c8481a1988be17cf34e1823', created_at=1757940673.0, error=None, incomplete_details=None, instruction

### Anthropic

In [None]:
from anthropic import Anthropic

In [None]:
cli = Anthropic()

In [None]:
r = cli.messages.create(model=mods.ant, max_tokens=1024, messages=mk_msgs("ant sync"))
r

Coordination

<details>

- id: `msg_01AgKMiEZKSXzQYnSYWaducJ`
- content: `[{'citations': None, 'text': 'Coordination', 'type': 'text'}]`
- model: `claude-sonnet-4-20250514`
- role: `assistant`
- stop_reason: `end_turn`
- stop_sequence: `None`
- type: `message`
- usage: `{'cache_creation': {'ephemeral_1h_input_tokens': 0, 'ephemeral_5m_input_tokens': 0}, 'cache_creation_input_tokens': 0, 'cache_read_input_tokens': 0, 'input_tokens': 15, 'output_tokens': 5, 'server_tool_use': None, 'service_tier': 'standard'}`

</details>

In [None]:
r = cli.messages.create(model=mods.ant, max_tokens=1024, messages=mk_msgs("ant sync"))
r

Coordination

<details>

- id: `msg_01AgKMiEZKSXzQYnSYWaducJ`
- content: `[{'citations': None, 'text': 'Coordination', 'type': 'text'}]`
- model: `claude-sonnet-4-20250514`
- role: `assistant`
- stop_reason: `end_turn`
- stop_sequence: `None`
- type: `message`
- usage: `{'cache_creation': {'ephemeral_1h_input_tokens': 0, 'ephemeral_5m_input_tokens': 0}, 'cache_creation_input_tokens': 0, 'cache_read_input_tokens': 0, 'input_tokens': 15, 'output_tokens': 5, 'server_tool_use': None, 'service_tier': 'standard'}`

</details>

Let's test streaming.

In [None]:
r = cli.messages.create(model=mods.ant, max_tokens=1024, messages=mk_msgs("ant sync streaming"), stream=True)
for ch in r: print(ch)

RawMessageStartEvent(message=Message(id='msg_01Jnrv7itVfTgQGFkbGnoi6k', content=[], model='claude-sonnet-4-20250514', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=In: 16; Out: 1; Cache create: 0; Cache read: 0; Total Tokens: 17; Search: 0), type='message_start')
RawContentBlockStartEvent(content_block=TextBlock(citations=None, text='', type='text'), index=0, type='content_block_start')
RawContentBlockDeltaEvent(delta=TextDelta(text='**', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='Buffering**', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockStopEvent(index=0, type='content_block_stop')
RawMessageDeltaEvent(delta=Delta(stop_reason='end_turn', stop_sequence=None), type='message_delta', usage=MessageDeltaUsage(cache_creation_input_tokens=0, cache_read_input_tokens=0, input_tokens=16, output_tokens=8, server_tool_use=None))
RawMessageStopEvent(type='message_stop')


In [None]:
r = cli.messages.create(model=mods.ant, max_tokens=1024, messages=mk_msgs("ant sync streaming"), stream=True)
for ch in r: print(ch)

RawMessageStartEvent(message=Message(id='msg_01Jnrv7itVfTgQGFkbGnoi6k', content=[], model='claude-sonnet-4-20250514', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=In: 16; Out: 1; Cache create: 0; Cache read: 0; Total Tokens: 17; Search: 0), type='message_start')
RawContentBlockStartEvent(content_block=TextBlock(citations=None, text='', type='text'), index=0, type='content_block_start')
RawContentBlockDeltaEvent(delta=TextDelta(text='**', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='Buffering**', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockStopEvent(index=0, type='content_block_stop')
RawMessageDeltaEvent(delta=Delta(stop_reason='end_turn', stop_sequence=None), type='message_delta', usage=MessageDeltaUsage(cache_creation_input_tokens=0, cache_read_input_tokens=0, input_tokens=16, output_tokens=8, server_tool_use=None))
RawMessageStopEvent(type='message_stop')


Let's test async.

In [None]:
from anthropic import AsyncAnthropic

In [None]:
cli = AsyncAnthropic()

In [None]:
r = await cli.messages.create(model=mods.ant, max_tokens=1024, messages=mk_msgs("ant async"))
r

**concurrent**

<details>

- id: `msg_01R33rKFKM5BqeJprT6A6DVM`
- content: `[{'citations': None, 'text': '**concurrent**', 'type': 'text'}]`
- model: `claude-sonnet-4-20250514`
- role: `assistant`
- stop_reason: `end_turn`
- stop_sequence: `None`
- type: `message`
- usage: `{'cache_creation': {'ephemeral_1h_input_tokens': 0, 'ephemeral_5m_input_tokens': 0}, 'cache_creation_input_tokens': 0, 'cache_read_input_tokens': 0, 'input_tokens': 15, 'output_tokens': 6, 'server_tool_use': None, 'service_tier': 'standard'}`

</details>

In [None]:
r = await cli.messages.create(model=mods.ant, max_tokens=1024, messages=mk_msgs("ant async"))
r

**concurrent**

<details>

- id: `msg_01R33rKFKM5BqeJprT6A6DVM`
- content: `[{'citations': None, 'text': '**concurrent**', 'type': 'text'}]`
- model: `claude-sonnet-4-20250514`
- role: `assistant`
- stop_reason: `end_turn`
- stop_sequence: `None`
- type: `message`
- usage: `{'cache_creation': {'ephemeral_1h_input_tokens': 0, 'ephemeral_5m_input_tokens': 0}, 'cache_creation_input_tokens': 0, 'cache_read_input_tokens': 0, 'input_tokens': 15, 'output_tokens': 6, 'server_tool_use': None, 'service_tier': 'standard'}`

</details>

Let's test async streaming.

In [None]:
r = await cli.messages.create(model=mods.ant,max_tokens=1024,messages=mk_msgs("ant async streaming"), stream=True)
async for ch in r.response.aiter_bytes(): print(ch.decode())

event: message_start
data: {"type":"message_start","message":{"id":"msg_019CQEuc6f7D2ewx7Co1PcTW","type":"message","role":"assistant","model":"claude-sonnet-4-20250514","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":16,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}}       }

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}     }

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Async"}             }

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Iterable"}}

event: ping
data: {"type": "ping"}

event: content_block_stop
data: {"type":"content_block_stop","index":0     }

event: ping
data: {"type": "ping"}

event: ping
data: {"type": "p

In [None]:
r = await cli.messages.create(model=mods.ant,max_tokens=1024,messages=mk_msgs("ant async streaming"), stream=True)
async for ch in r.response.aiter_bytes(): print(ch.decode())

event: message_start
data: {"type":"message_start","message":{"id":"msg_019CQEuc6f7D2ewx7Co1PcTW","type":"message","role":"assistant","model":"claude-sonnet-4-20250514","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":16,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"cache_creation":{"ephemeral_5m_input_tokens":0,"ephemeral_1h_input_tokens":0},"output_tokens":1,"service_tier":"standard"}}       }

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}     }

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Async"}             }

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Iterable"}}

event: ping
data: {"type": "ping"}

event: content_block_stop
data: {"type":"content_block_stop","index":0     }

event: ping
data: {"type": "ping"}

event: ping
data: {"type": "p

### LiteLLM

Let's test the LiteLLM SDK by running sync/async calls with(out) streaming for OpenAI, Anthropic, & Gemini.

We'll also double check tool calls and citations.

#### Sync Tests

In [None]:
from litellm import completion

Let's define a helper method to display a streamed response.

In [None]:
def _stream(r): 
    for ch in r: print(ch.choices[0].delta.content or "")

##### Anthropic

Let's test `claude-sonnet-x`.

In [None]:
r = completion(model=mods.ant, messages=mk_msgs("lite: ant sync..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='**Streamlined**', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'citations': None, 'thinking_blocks': None}))], usage=Usage(completion_tokens=8, prompt_tokens=18, total_tokens=26, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=0, text_tokens=None, image_tokens=None), cache_creation_input_tokens=0, cache_read_input_tokens=0))

In [None]:
r = completion(model=mods.ant, messages=mk_msgs("lite: ant sync..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='**Streamlined**', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'citations': None, 'thinking_blocks': None}))], usage=Usage(completion_tokens=8, prompt_tokens=18, total_tokens=26, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=0, text_tokens=None, image_tokens=None), cache_creation_input_tokens=0, cache_read_input_tokens=0))

Now, with streaming enabled.

First lets verify the `id` and `created` patching is working here as well.

In [None]:
r = completion(model=mods.ant, messages=mk_msgs("lite: ant sync stream..."), stream=True)
os = [o for o in r]
os

[ModelResponseStream(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion.chunk', system_fingerprint=None, choices=[StreamingChoices(finish_reason=None, index=0, delta=Delta(provider_specific_fields=None, content='**', role='assistant', function_call=None, tool_calls=None, audio=None), logprobs=None)], provider_specific_fields=None, citations=None),
 ModelResponseStream(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion.chunk', system_fingerprint=None, choices=[StreamingChoices(finish_reason=None, index=0, delta=Delta(provider_specific_fields=None, content='Lightweight**', role=None, function_call=None, tool_calls=None, audio=None), logprobs=None)], provider_specific_fields=None, citations=None),
 ModelResponseStream(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion.chunk', system_fingerprint=None, choices=[StreamingChoices(finish_reason='stop', index=0, delta=Delta(provi

And let's verify it's compatible with litelllm's stream_chunk_builder:

In [None]:
from litellm import stream_chunk_builder

In [None]:
stream_chunk_builder(os)

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='**Lightweight**', role='assistant', tool_calls=None, function_call=None, provider_specific_fields=None))], usage=Usage(completion_tokens=7, prompt_tokens=19, total_tokens=26, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=None, audio_tokens=None, reasoning_tokens=0, rejected_prediction_tokens=None, text_tokens=None), prompt_tokens_details=None))

Success: the `id` and `created` fields are still patched!

##### OpenAI

Let's test `gpt-4o`.

In [None]:
r = completion(model=mods.oai, messages=mk_msgs("lite: oai sync..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gpt-4o-2024-08-06', object='chat.completion', system_fingerprint='fp_f33640a400', choices=[Choices(finish_reason='stop', index=0, message=Message(content='Efficiency', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'refusal': None}, annotations=[]), provider_specific_fields={})], usage=Usage(completion_tokens=1, prompt_tokens=18, total_tokens=19, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0, text_tokens=None), prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=0, cached_tokens=0, text_tokens=None, image_tokens=None)), service_tier='default')

In [None]:
r = completion(model=mods.oai, messages=mk_msgs("lite: oai sync..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gpt-4o-2024-08-06', object='chat.completion', system_fingerprint='fp_f33640a400', choices=[Choices(finish_reason='stop', index=0, message=Message(content='Efficiency', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'refusal': None}, annotations=[]), provider_specific_fields={})], usage=Usage(completion_tokens=1, prompt_tokens=18, total_tokens=19, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0, text_tokens=None), prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=0, cached_tokens=0, text_tokens=None, image_tokens=None)), service_tier='default')

Now, with streaming enabled.

In [None]:
r = completion(model=mods.oai, messages=mk_msgs("lite: oai sync stream..."), stream=True)
_stream(r)

Synchronization




In [None]:
r = completion(model=mods.oai, messages=mk_msgs("lite: oai sync stream..."), stream=True)
_stream(r)

Synchronization




##### Gemini

Let's test `2.0-flash`.

In [None]:
r = completion(model=mods.gem, messages=mk_msgs("lite: gem sync..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gemini-2.0-flash', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='Synchronization.\n', role='assistant', tool_calls=None, function_call=None, images=[], thinking_blocks=[], provider_specific_fields=None))], usage=Usage(completion_tokens=3, prompt_tokens=10, total_tokens=13, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=None, text_tokens=10, image_tokens=None)), vertex_ai_grounding_metadata=[], vertex_ai_url_context_metadata=[], vertex_ai_safety_results=[], vertex_ai_citation_metadata=[])

In [None]:
r = completion(model=mods.gem, messages=mk_msgs("lite: gem sync..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gemini-2.0-flash', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='Synchronization.\n', role='assistant', tool_calls=None, function_call=None, images=[], thinking_blocks=[], provider_specific_fields=None))], usage=Usage(completion_tokens=3, prompt_tokens=10, total_tokens=13, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=None, text_tokens=10, image_tokens=None)), vertex_ai_grounding_metadata=[], vertex_ai_url_context_metadata=[], vertex_ai_safety_results=[], vertex_ai_citation_metadata=[])

Now, with streaming enabled.

In [None]:
r = completion(model=mods.gem, messages=mk_msgs("lite: gem sync stream..."), stream=True)
_stream(r)

Eff
ortless




In [None]:
r = completion(model=mods.gem, messages=mk_msgs("lite: gem sync stream..."), stream=True)
_stream(r)

Eff
ortless




#### Async Tests

In [None]:
from litellm import acompletion

In [None]:
async def _astream(r):
    async for chunk in r: print(chunk.choices[0].delta.content or "")

##### Anthropic

Let's test `claude-sonnet-x`.

In [None]:
r = await acompletion(model=mods.ant, messages=mk_msgs("lite: ant async..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='**coroutine**', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'citations': None, 'thinking_blocks': None}))], usage=Usage(completion_tokens=8, prompt_tokens=18, total_tokens=26, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=0, text_tokens=None, image_tokens=None), cache_creation_input_tokens=0, cache_read_input_tokens=0))

In [None]:
r = await acompletion(model=mods.ant, messages=mk_msgs("lite: ant async..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='**coroutine**', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'citations': None, 'thinking_blocks': None}))], usage=Usage(completion_tokens=8, prompt_tokens=18, total_tokens=26, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=0, text_tokens=None, image_tokens=None), cache_creation_input_tokens=0, cache_read_input_tokens=0))

Now, with streaming enabled.

In [None]:
r = await acompletion(model=mods.ant, messages=mk_msgs("lite: ant async stream..."), stream=True)
await(_astream(r))

**
reactive**



In [None]:
r = await acompletion(model=mods.ant, messages=mk_msgs("lite: ant async stream..."), stream=True)
await(_astream(r))

**
reactive**



##### OpenAI

Let's test `gpt-4o`.

In [None]:
r = await acompletion(model=mods.oai, messages=mk_msgs("lite: oai async..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gpt-4o-2024-08-06', object='chat.completion', system_fingerprint='fp_f33640a400', choices=[Choices(finish_reason='stop', index=0, message=Message(content='Fast.', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'refusal': None}, annotations=[]), provider_specific_fields={})], usage=Usage(completion_tokens=2, prompt_tokens=18, total_tokens=20, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0, text_tokens=None), prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=0, cached_tokens=0, text_tokens=None, image_tokens=None)), service_tier='default')

In [None]:
r = await acompletion(model=mods.oai, messages=mk_msgs("lite: oai async..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gpt-4o-2024-08-06', object='chat.completion', system_fingerprint='fp_f33640a400', choices=[Choices(finish_reason='stop', index=0, message=Message(content='Fast.', role='assistant', tool_calls=None, function_call=None, provider_specific_fields={'refusal': None}, annotations=[]), provider_specific_fields={})], usage=Usage(completion_tokens=2, prompt_tokens=18, total_tokens=20, completion_tokens_details=CompletionTokensDetailsWrapper(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0, text_tokens=None), prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=0, cached_tokens=0, text_tokens=None, image_tokens=None)), service_tier='default')

Now, with streaming enabled.

In [None]:
r = await acompletion(model=mods.oai, messages=mk_msgs("lite: oai async stream..."), stream=True)
await(_astream(r))

Eff
icient
.




In [None]:
r = await acompletion(model=mods.oai, messages=mk_msgs("lite: oai async stream..."), stream=True)
await(_astream(r))

Eff
icient
.




##### Gemini

Let's test `2.0-flash`.

In [None]:
r = await acompletion(model=mods.gem, messages=mk_msgs("lite: gem async..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gemini-2.0-flash', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='Concurrency\n', role='assistant', tool_calls=None, function_call=None, images=[], thinking_blocks=[], provider_specific_fields=None))], usage=Usage(completion_tokens=2, prompt_tokens=10, total_tokens=12, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=None, text_tokens=10, image_tokens=None)), vertex_ai_grounding_metadata=[], vertex_ai_url_context_metadata=[], vertex_ai_safety_results=[], vertex_ai_citation_metadata=[])

In [None]:
r = await acompletion(model=mods.gem, messages=mk_msgs("lite: gem async..."))
r

ModelResponse(id='cachy-dummy-id', created=1, model='gemini-2.0-flash', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='stop', index=0, message=Message(content='Concurrency\n', role='assistant', tool_calls=None, function_call=None, images=[], thinking_blocks=[], provider_specific_fields=None))], usage=Usage(completion_tokens=2, prompt_tokens=10, total_tokens=12, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=None, text_tokens=10, image_tokens=None)), vertex_ai_grounding_metadata=[], vertex_ai_url_context_metadata=[], vertex_ai_safety_results=[], vertex_ai_citation_metadata=[])

Now, with streaming enabled.

In [None]:
r = await acompletion(model=mods.gem, messages=mk_msgs("lite: gem async stream..."), stream=True)
await(_astream(r))

Concurrency



In [None]:
r = await acompletion(model=mods.gem, messages=mk_msgs("lite: gem async stream..."), stream=True)
await(_astream(r))

Concurrency



#### Tool Calls

As a sanity check let's confirm that tool calls work.

In [None]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get the current weather in a given location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type":"string", "description":"The city e.g. Reims"},
                    "unit": {"type":"string", "enum":["celsius", "fahrenheit"]},
                },
                "required": ["location"],
            }
        }
    }
]

In [None]:
r = completion(model=mods.ant, messages=mk_msgs("Is it raining in Reims?"), tools=tools)
r

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='tool_calls', index=0, message=Message(content=None, role='assistant', tool_calls=[ChatCompletionMessageToolCall(index=0, function=Function(arguments='{"location": "Reims"}', name='get_current_weather'), id='toolu_0182nVBg1pTYTadKxS5qgCt4', type='function')], function_call=None, provider_specific_fields={'citations': None, 'thinking_blocks': None}))], usage=Usage(completion_tokens=57, prompt_tokens=427, total_tokens=484, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=0, text_tokens=None, image_tokens=None), cache_creation_input_tokens=0, cache_read_input_tokens=0))

In [None]:
r = completion(model=mods.ant, messages=mk_msgs("Is it raining in Reims?"), tools=tools)
r

ModelResponse(id='cachy-dummy-id', created=1, model='claude-sonnet-4-20250514', object='chat.completion', system_fingerprint=None, choices=[Choices(finish_reason='tool_calls', index=0, message=Message(content=None, role='assistant', tool_calls=[ChatCompletionMessageToolCall(index=0, function=Function(arguments='{"location": "Reims"}', name='get_current_weather'), id='toolu_0182nVBg1pTYTadKxS5qgCt4', type='function')], function_call=None, provider_specific_fields={'citations': None, 'thinking_blocks': None}))], usage=Usage(completion_tokens=57, prompt_tokens=427, total_tokens=484, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetailsWrapper(audio_tokens=None, cached_tokens=0, text_tokens=None, image_tokens=None), cache_creation_input_tokens=0, cache_read_input_tokens=0))

## Export -

In [None]:
#|hide
#|eval: false
import nbdev; nbdev.nbdev_export()