In [1]:
#|default_exp asink

# The async version

## Setup

In [2]:
#| export
import inspect, typing, mimetypes, base64, json
from collections import abc
try: from IPython import display
except: display=None

from anthropic import AsyncAnthropic, AsyncAnthropicBedrock, AsyncAnthropicVertex
from anthropic.types import Usage, TextBlock, Message, ToolUseBlock
from anthropic.resources import messages

import toolslm
from toolslm.funccall import *

from fastcore.meta import delegates
from fastcore.utils import *
from claudette.core import *
from msglm import mk_msg_anthropic as mk_msg, mk_msgs_anthropic as mk_msgs

In [3]:
#| hide
from nbdev import show_doc

## Async SDK

In [4]:
model = models[1]
cli = AsyncAnthropic()

In [None]:
prompt = "I'm Jeremy"
m = mk_msg(prompt)
r = await cli.messages.create(messages=[m], model=model, max_tokens=100)
r

In [None]:
msgs = mk_msgs([prompt, r, "I forgot my name. Can you remind me please?"]) 
msgs

In [None]:
await cli.messages.create(messages=msgs, model=model, max_tokens=200)

In [8]:
#| exports
class AsyncClient(Client):
    def __init__(self, model, cli=None, log=False, cache=False):
        "Async Anthropic messages client."
        self.model,self.use = model,usage()
        self.text_only = model in text_only_models
        self.log = [] if log else None
        self.c = (cli or AsyncAnthropic(default_headers={'anthropic-beta': 'prompt-caching-2024-07-31'}))
        self.cache = cache

In [9]:
#| exports
@patch
async def _log(self:AsyncClient, final, prefill, msgs, maxtok=None, sp=None, temp=None, stream=None, stop=None, **kwargs):
    "Store the result of the message and accrue total usage."
    self._r(final, prefill)
    if self.log is not None: self.log.append({
        "msgs": msgs, "prefill": prefill,
        "maxtok": maxtok, "sp": sp, "temp": temp, "stream": stream, "stop": stop, **kwargs,
        "result": self.result, "use": self.use, "stop_reason": self.stop_reason, "stop_sequence": self.stop_sequence
    })
    return self.result

In [10]:
c = AsyncClient(model)

In [None]:
c._r(r)
c.use

In [12]:
#| exports
@patch
async def _stream(self:AsyncClient, msgs:list, prefill='', **kwargs):
    "Stream response from Claude."
    async with self.c.messages.stream(model=self.model, messages=mk_msgs(msgs, cache=self.cache, cache_last_ckpt_only=self.cache), **kwargs) as s:
        if prefill: yield prefill
        async for o in s.text_stream: yield o
        await self._log(await s.get_final_message(), prefill, msgs, **kwargs)

In [13]:
#| exports
@patch
def _precall(self:AsyncClient, msgs, prefill, stop, kwargs):
    "Prepare messages for a call to Claude."
    pref = [prefill.strip()] if prefill else []
    if not isinstance(msgs,list): msgs = [msgs]
    if stop is not None:
        if not isinstance(stop, (list)): stop = [stop]
        kwargs["stop_sequences"] = stop
    msgs = mk_msgs(msgs+pref, cache=self.cache, cache_last_ckpt_only=self.cache)
    return msgs

In [14]:
@patch
@delegates(Client.__call__)
async def __call__(self:AsyncClient,
             msgs:list, # List of messages in the dialog
             sp='', # The system prompt
             temp=0, # Temperature
             maxtok=4096, # Maximum tokens
             prefill='', # Optional prefill to pass to Claude as start of its response
             stream:bool=False, # Stream response?
             stop=None, # Stop sequence
             tools:Optional[list]=None, # List of tools to make available to Claude
             tool_choice:Optional[dict]=None, # Optionally force use of some tool
             **kwargs):
    "Make an async call to Claude."
    if tools: kwargs['tools'] = [get_schema(o) for o in listify(tools)]
    if tool_choice: kwargs['tool_choice'] = mk_tool_choice(tool_choice)
    msgs = self._precall(msgs, prefill, stop, kwargs)
    if any(t == 'image' for t in get_types(msgs)): assert not self.text_only, f"Images are not supported by the current model type: {self.model}"
    if stream: return self._stream(msgs, prefill=prefill, max_tokens=maxtok, system=sp, temperature=temp, **kwargs)
    res = await self.c.messages.create(
        model=self.model, messages=msgs, max_tokens=maxtok, system=sp, temperature=temp, **kwargs)
    return await self._log(res, prefill, msgs, maxtok, sp, temp, stream=stream, stop=stop, **kwargs)

In [None]:
c = AsyncClient(model, log=True)
c.use

In [None]:
c.model = models[1]
await c('Hi')

In [None]:
c.use

In [None]:
q = "Concisely, what is the meaning of life?"
pref = 'According to Douglas Adams,'
await c(q, prefill=pref)

In [None]:
c.use

In [None]:
async for o in await c(q, prefill=pref, stream=True): print(o, end='')

In [None]:
c.use

In [22]:
def sums(
    a:int,  # First thing to sum
    b:int=1 # Second thing to sum
) -> int: # The sum of the inputs
    "Adds a + b."
    print(f"Finding the sum of {a} and {b}")
    return a + b

In [23]:
a,b = 604542,6458932
pr = f"What is {a}+{b}?"
sp = "You are a summing expert."

In [None]:
tools=[sums]
choice = mk_tool_choice('sums')
choice

In [None]:
msgs = mk_msgs(pr)
r = await c(msgs, sp=sp, tools=tools, tool_choice=choice)
r

In [None]:
tr = mk_toolres(r, ns=globals())
tr

In [None]:
msgs += tr
r = contents(await c(msgs, sp=sp, tools=sums))
r

## Structured Output

In [28]:
#|export
@patch
@delegates(Client.structured)
async def structured(self:AsyncClient,
               msgs:list, # List of messages in the dialog
               tools:Optional[list]=None, # List of tools to make available to Claude
               obj:Optional=None, # Class to search for tools  
               ns:Optional[abc.Mapping]=None, # Namespace to search for tools
               **kwargs):
    "Return the value of all tool calls (generally used for structured outputs)"
    tools = listify(tools)
    res = await self(msgs, tools=tools, tool_choice=tools, **kwargs)
    if ns is None: ns=mk_ns(*tools)
    if obj is not None: ns = mk_ns(obj)
    cts = getattr(res, 'content', [])
    tcs = [call_func(o.name, o.input, ns=ns) for o in cts if isinstance(o,ToolUseBlock)]
    return tcs

In [None]:
await c.structured(pr, sums)

In [None]:
c

## AsyncChat

In [31]:
#| exports
class AsyncChat(Chat):
    def __init__(self,
                 model:Optional[str]=None, # Model to use (leave empty if passing `cli`)
                 cli:Optional[Client]=None, # Client to use (leave empty if passing `model`)
                 sp='', # Optional system prompt
                 tools:Optional[list]=None, # List of tools to make available to Claude
                 temp=0, # Temperature
                 cont_pr:Optional[str]=None, # User prompt to continue an assistant response: assistant,[user:"..."],assistant
                 cache: bool = False):
        "Anthropic async chat client."
        assert model or cli
        assert cont_pr != "", "cont_pr may not be an empty string"
        self.c = (cli or AsyncClient(model, cache=cache))
        if tools: tools = [tool(t) for t in tools]
        self.h,self.sp,self.tools,self.cont_pr,self.temp,self.cache = [],sp,tools,cont_pr,temp,cache

    @property
    def use(self): return self.c.use

In [32]:
#| exports
@patch(as_prop=True)
def cost(self: AsyncChat) -> float: return self.c.cost

In [None]:
sp = "Never mention what tools you use."
chat = AsyncChat(model, sp=sp)
chat.c.use, chat.h

In [34]:
#| exports
@patch
async def _stream(self:AsyncChat, res):
    "Handle streaming response from Claude."
    async for o in res: yield o
    self.h += mk_toolres(self.c.result, ns=self.tools, obj=self)

In [35]:
#| exports
@patch
async def _append_pr(self:AsyncChat,
               pr=None,  # Prompt / message
              ):
    "Append prompt to history, handling role alternation."
    prev_role = nested_idx(self.h, -1, 'role') if self.h else 'assistant' # First message should be 'user'
    if pr and prev_role == 'user': await self() # already user request pending
    self._post_pr(pr, prev_role)

In [36]:
#| exports 
@patch
def _post_pr(self:AsyncChat, pr, prev_role):
    "Process prompt after role check."
    if pr is None and prev_role == 'assistant':
        if self.cont_pr is None:
            raise ValueError("Prompt must be given after assistant completion, or use `self.cont_pr`.")
        pr = self.cont_pr # No user prompt, keep the chain
    if pr: self.h.append(mk_msg(pr, cache=self.cache))

In [37]:
#| exports
@patch
async def __call__(self:AsyncChat,
             pr=None,  # Prompt / message
             temp=None, # Temperature
             maxtok=4096, # Maximum tokens
             stream=False, # Stream response?
             prefill='', # Optional prefill to pass to Claude as start of its response
             tool_choice:Optional[dict]=None, # Optionally force use of some tool
             **kw):
    "Make an async call to Claude via the chat interface."
    if temp is None: temp=self.temp
    await self._append_pr(pr)
    res = await self.c(self.h, stream=stream, prefill=prefill, sp=self.sp, temp=temp, maxtok=maxtok,
                 tools=self.tools, tool_choice=tool_choice,**kw)
    if stream: return self._stream(res)
    self.h += mk_toolres(self.c.result, ns=self.tools)
    return res

In [None]:
await chat("I'm Jeremy")
await chat("What's my name?")

In [None]:
q = "Concisely, what is the meaning of life?"
pref = 'According to Douglas Adams,'
await chat(q, prefill=pref)

In [None]:
chat = AsyncChat(model, sp=sp)
async for o in await chat("I'm Jeremy", stream=True): print(o, end='')

In [None]:
pr = f"What is {a}+{b}?"
chat = AsyncChat(model, sp=sp, tools=[sums])
r = await chat(pr)
r

In [None]:
pr += " Say the answer in a sentence."
chat = AsyncChat(model, sp=sp, tools=[sums])
r = await chat(pr)
r

# Async Tool Loop

In [43]:
#| exports
@patch
@delegates(AsyncChat.__call__)
async def toolloop(self:AsyncChat,
             pr, # Prompt to pass to Claude
             max_steps=10, # Maximum number of tool requests to loop through  
             trace_func:Optional[callable]=None, # Function to trace tool use steps (e.g `print`)
             cont_func:Optional[callable]=noop, # Function that stops loop if returns False
             **kwargs):
    "Add prompt `pr` to dialog and get a response from Claude, automatically following up with `tool_use` messages"
    n_msgs = len(self.h)
    r = await self(pr, **kwargs)
    for i in range(max_steps):
        if r.stop_reason!='tool_use': break
        if trace_func: trace_func(self.h[n_msgs:]); n_msgs = len(self.h)
        r = await self(**kwargs)
        if not (cont_func or noop)(self.h[-2]): break
    if trace_func: trace_func(self.h[n_msgs:])
    return r

Let's create some data to test out async tools.

In [49]:
orders = {
    "O1": dict(id="O1", product="Widget A", quantity=2, price=19.99, status="Shipped"),
    "O2": dict(id="O2", product="Gadget B", quantity=1, price=49.99, status="Processing"),
    "O3": dict(id="O3", product="Gadget B", quantity=2, price=49.99, status="Shipped")}

customers = {
    "C1": dict(name="John Doe", email="john@example.com", phone="123-456-7890",
               orders=[orders['O1'], orders['O2']]),
    "C2": dict(name="Jane Smith", email="jane@example.com", phone="987-654-3210",
               orders=[orders['O3']])
}

Next we define the tools.

In [50]:
def get_customer_info(
    customer_id:str # ID of the customer
): # Customer's name, email, phone number, and list of orders
    "Retrieves a customer's information and their orders based on the customer ID"
    print(f'- Retrieving customer {customer_id}')
    return customers.get(customer_id, "Customer not found")

def get_order_details(
    order_id:str # ID of the order
): # Order's ID, product name, quantity, price, and order status
    "Retrieves the details of a specific order based on the order ID"
    print(f'- Retrieving order {order_id}')
    return orders.get(order_id, "Order not found")

def cancel_order(
    order_id:str # ID of the order to cancel
)->bool: # True if the cancellation is successful
    "Cancels an order based on the provided order ID"
    print(f'- Cancelling order {order_id}')
    if order_id not in orders: return False
    orders[order_id]['status'] = 'Cancelled'
    return True

We can see the async tool loop in action.

In [None]:
# Set up the tools and chat
tools = [get_customer_info, get_order_details, cancel_order]
chat = AsyncChat(model, tools=tools)

# Example using async toolloop with a simple query
async def example_email_query():
    r = await chat.toolloop('Can you tell me the email address for customer C1?')
    print(contents(r))

# Example with a more complex multi-step tool process
async def example_cancel_orders():
    chat = AsyncChat(model, tools=tools)
    r = await chat.toolloop('Please cancel all orders for customer C1 for me.', trace_func=print)
    print(contents(r))
    
    # Verify the order was canceled
    r = await chat.toolloop('What is the status of order O2?')
    print(contents(r))

# Execute the examples
await example_email_query()
await example_cancel_orders()

# Image example

In [None]:
fn = Path('samples/puppy.jpg')
img = fn.read_bytes()
display.Image(img)

In [None]:
q = "In brief, what color flowers are in this image?"
msg = mk_msg([img, q])
await c([msg])

In [None]:
chat = AsyncChat(model, sp=sp, cache=True)
await chat("Lorem ipsum dolor sit amet" * 150)
chat.use

In [None]:
await chat("Whoops, sorry about that!")
chat.use

## Export -

In [48]:
#|hide
#|eval: false
from nbdev.doclinks import nbdev_export
nbdev_export()