In [None]:
#|default_exp core

# Cosette's source

## Setup

In [None]:
#| export
from fastcore import imghdr
from fastcore.utils import *
from fastcore.meta import delegates

import inspect, typing, mimetypes, base64, json, ast
from collections import abc
from random import choices
from string import ascii_letters,digits

from openai import types
from openai import Completion,OpenAI,NOT_GIVEN
from openai.resources import chat
from openai.resources.chat import Completions
from openai.types.chat.chat_completion import ChatCompletion, ChatCompletionMessage
from openai.types.completion_usage import CompletionUsage

from toolslm.funccall import *

try: from IPython import display
except: display=None

In [None]:
#| hide
from nbdev import show_doc

In [None]:
#| export
empty = inspect.Parameter.empty

In [None]:
#| exports
models = 'gpt-4o', 'gpt-4-turbo', 'gpt-4', 'gpt-4-32k', 'gpt-3.5-turbo', 'gpt-3.5-turbo-instruct', 'o1-preview', 'o1-mini'

In [None]:
model = models[0]

For examples, we'll use GPT-4o.

## OpenAI SDK

In [None]:
cli = OpenAI().chat.completions

In [None]:
m = {'role': 'user', 'content': "I'm Jeremy"}
r = cli.create(messages=[m], model=model, max_tokens=100)
r

### Formatting output

In [None]:
#| exports
def find_block(r:abc.Mapping, # The message to look in
              ):
    "Find the message in `r`."
    m = nested_idx(r, 'choices', 0)
    if not m: return m
    if hasattr(m, 'message'): return m.message
    return m.delta

In [None]:
#| exports
def contents(r):
    "Helper to get the contents from response `r`."
    blk = find_block(r)
    if not blk: return r
    if hasattr(blk, 'content'): return getattr(blk,'content')
    return blk

In [None]:
contents(r)

In [None]:
#| exports
@patch
def _repr_markdown_(self:ChatCompletion):
    det = '\n- '.join(f'{k}: {v}' for k,v in dict(self).items())
    res = contents(self)
    if not res: return f"- {det}"
    return f"""{contents(self)}

<details>

- {det}

</details>"""

In [None]:
r

In [None]:
r.usage

In [None]:
#| exports
def usage(inp=0, # Number of prompt tokens
          out=0, # Number of completion tokens
          reas=0 # Number of reasoning tokens
         ):
    "Slightly more concise version of `CompletionUsage`."
    return CompletionUsage(prompt_tokens=inp, completion_tokens=out, reasoning_tokens=reas, total_tokens=inp+out+reas)

In [None]:
usage(5)

In [None]:
#| exports
@patch
def __repr__(self:CompletionUsage): return f'In: {self.prompt_tokens}; Out: {self.completion_tokens}; Reasoning: {self.completion_tokens_details.reasoning_tokens}; Total: {self.total_tokens}'

In [None]:
r.usage

In [None]:
#| exports
@patch
def __add__(self:CompletionUsage, b):
    "Add together each of `input_tokens` and `output_tokens`"
    return usage(self.prompt_tokens+b.prompt_tokens, self.completion_tokens+b.completion_tokens, self.completion_tokens_details.reasoning_tokens+b.completion_tokens_details.reasoning_tokens)

In [None]:
r.usage+r.usage

In [None]:
#| export
def wrap_latex(text, md=True):
    "Replace OpenAI LaTeX codes with markdown-compatible ones"
    text = re.sub(r"\\\((.*?)\\\)", lambda o: f"${o.group(1)}$", text)
    res = re.sub(r"\\\[(.*?)\\\]", lambda o: f"$${o.group(1)}$$", text, flags=re.DOTALL)
    if md: res = display.Markdown(res)
    return res

### Creating messages

In [None]:
def mk_msg(content, role='user', **kwargs):
    "Helper to create a `dict` appropriate for a message. `kwargs` are added as key/value pairs to the message"
    if hasattr(content, 'content'): content,role = content.content,content.role
    if isinstance(content, ChatCompletion): return find_block(content)
    return dict(role=role, content=content, **kwargs)

In [None]:
prompt = "I'm Jeremy"
m = mk_msg(prompt)
r = cli.create(messages=[m], model=model, max_tokens=100)
r

In [None]:
msgs = [mk_msg(prompt), mk_msg(r), mk_msg('I forgot my name. Can you remind me please?')]
msgs

In [None]:
cli.create(messages=msgs, model=model, max_tokens=200)

## Client

In [None]:
#| exports
class Client:
    def __init__(self, model, cli=None):
        "Basic LLM messages client."
        self.model,self.use = model,usage(0,0)
        self.c = (cli or OpenAI()).chat.completions

In [None]:
c = Client(model)
c.use

In [None]:
#| exports
@patch
def _r(self:Client, r:ChatCompletion):
    "Store the result of the message and accrue total usage."
    self.result = r
    if getattr(r,'usage',None): self.use += r.usage
    return r

In [None]:
c._r(r)
c.use

In [None]:
#| export
def get_stream(r):
    for o in r:
        o = contents(o)
        if o and isinstance(o, str): yield(o)

In [None]:
#| exports
@patch
@delegates(Completions.create)
def __call__(self:Client,
             msgs:list, # List of messages in the dialog
             sp:str='', # System prompt
             maxtok=4096, # Maximum tokens
             stream:bool=False, # Stream response?
             **kwargs):
    "Make a call to LLM."
    if stream: kwargs['stream_options'] = {"include_usage": True}
    if sp: msgs = [mk_msg(sp, 'system')] + list(msgs)
    r = self.c.create(
        model=self.model, messages=msgs, max_tokens=maxtok, stream=stream, **kwargs)
    if not stream: return self._r(r)
    else: return get_stream(map(self._r, r))

In [None]:
msgs = [mk_msg('Hi')]

In [None]:
c(msgs)

In [None]:
c.use

In [None]:
for o in c(msgs, stream=True): print(o, end='')

In [None]:
c.use

## Tool use

In [None]:
def sums(
    a:int,  # First thing to sum
    b:int # Second thing to sum
) -> int: # The sum of the inputs
    "Adds a + b."
    print(f"Finding the sum of {a} and {b}")
    return a + b

In [None]:
#| export
def mk_openai_func(f): return dict(type='function', function=get_schema(f, 'parameters'))

In [None]:
#| export
def mk_tool_choice(f): return dict(type='function', function={'name':f})

In [None]:
sysp = "You are a helpful assistant. When using tools, be sure to pass all required parameters, at minimum."

In [None]:
a,b = 604542,6458932
pr = f"What is {a}+{b}?"
tools=[mk_openai_func(sums)]
tool_choice=mk_tool_choice("sums")

In [None]:
msgs = [mk_msg(pr)]
r = c(msgs, sp=sysp, tools=tools)
r

In [None]:
m = find_block(r)
m

In [None]:
tc = m.tool_calls
tc

In [None]:
func = tc[0].function
func

In [None]:
#| exports
def _mk_ns(*funcs:list[callable]) -> dict[str,callable]:
    "Create a `dict` of name to function in `funcs`, to use as a namespace"
    return {f.__name__:f for f in funcs}

In [None]:
#| exports
def call_func(fc:types.chat.chat_completion_message_tool_call.Function, # Function block from message
              ns:Optional[abc.Mapping]=None, # Namespace to search for tools, defaults to `globals()`
              obj:Optional=None # Object to search for tools
             ):
    "Call the function in the tool response `tr`, using namespace `ns`."
    if ns is None: ns=globals()
    if not isinstance(ns, abc.Mapping): ns = _mk_ns(*ns)
    func = getattr(obj, fc.name, None)
    if not func: func = ns[fc.name]
    return func(**ast.literal_eval(fc.arguments))

In [None]:
ns = _mk_ns(sums)
res = call_func(func, ns=ns)
res

In [None]:
#| exports
def mk_toolres(
    r:abc.Mapping, # Tool use request response
    ns:Optional[abc.Mapping]=None, # Namespace to search for tools
    obj:Optional=None # Class to search for tools
    ):
    "Create a `tool_result` message from response `r`."
    r = mk_msg(r)
    tcs = getattr(r, 'tool_calls', [])
    res = [r]
    for tc in (tcs or []):
        func = tc.function
        cts = str(call_func(func, ns=ns, obj=obj))
        res.append(mk_msg(str(cts), 'tool', tool_call_id=tc.id, name=func.name))
    return res

In [None]:
tr = mk_toolres(r, ns=ns)
tr

In [None]:
msgs += tr

In [None]:
res = c(msgs, sp=sysp, tools=tools)
res

In [None]:
class Dummy:
    def sums(
        self,
        a:int,  # First thing to sum
        b:int=1 # Second thing to sum
    ) -> int: # The sum of the inputs
        "Adds a + b."
        print(f"Finding the sum of {a} and {b}")
        return a + b

In [None]:
tools = [mk_openai_func(Dummy.sums)]

o = Dummy()
msgs = mk_toolres("I'm Jeremy")
r = c(msgs, sp=sysp, tools=tools)
msgs += mk_toolres(r, obj=o)
res = c(msgs, sp=sysp, tools=tools)
res

In [None]:
msgs

In [None]:
tools = [mk_openai_func(Dummy.sums)]

o = Dummy()
msgs = mk_toolres(pr)
r = c(msgs, sp=sysp, tools=tools)
msgs += mk_toolres(r, obj=o)
res = c(msgs, sp=sysp, tools=tools)
res

In [None]:
#| exports

def _mock_id(): return 'call_' + ''.join(choices(ascii_letters+digits, k=24))

def mock_tooluse(name:str, # The name of the called function
                 res,  # The result of calling the function
                 **kwargs): # The arguments to the function
    ""
    id = _mock_id()
    func = dict(arguments=json.dumps(kwargs), name=name)
    tc = dict(id=id, function=func, type='function')
    req = dict(content=None, role='assistant', tool_calls=[tc])
    resp = mk_msg('' if res is None else str(res), 'tool', tool_call_id=id, name=name)
    return [req,resp]

This function mocks the messages needed to implement tool use, for situations where you want to insert tool use messages into a dialog without actually calling into the model.

In [None]:
tu = mk_tooluse(name='sums', res=7063474, a=604542, b=6458932)
r = c([mk_msg(pr)]+tu, tools=tools)
r

## Chat

In [None]:
#| exports
class Chat:
    def __init__(self,
                 model:Optional[str]=None, # Model to use (leave empty if passing `cli`)
                 cli:Optional[Client]=None, # Client to use (leave empty if passing `model`)
                 sp='', # Optional system prompt
                 tools:Optional[list]=None,  # List of tools to make available
                 tool_choice:Optional[str]=None): # Forced tool choice
        "OpenAI chat client."
        assert model or cli
        self.c = (cli or Client(model))
        self.h,self.sp,self.tools,self.tool_choice = [],sp,tools,tool_choice
    
    @property
    def use(self): return self.c.use

In [None]:
sp = "Never mention what tools you use."
chat = Chat(model, sp=sp)
chat.c.use, chat.h

In [None]:
#| exports
@patch
@delegates(Completions.create)
def __call__(self:Chat,
             pr=None,  # Prompt / message
             stream:bool=False, # Stream response?
             **kwargs):
    "Add prompt `pr` to dialog and get a response"
    if isinstance(pr,str): pr = pr.strip()
    if pr: self.h.append(mk_msg(pr))
    if self.tools: kwargs['tools'] = [mk_openai_func(o) for o in self.tools]
    if self.tool_choice: kwargs['tool_choice'] = mk_tool_choice(tool_choice)
    res = self.c(self.h, sp=self.sp, stream=stream, **kwargs)
    self.h += mk_toolres(res, ns=self.tools, obj=self)
    return res

In [None]:
chat("I'm Jeremy")
chat("What's my name?")

In [None]:
chat = Chat(model, sp=sp)
for o in chat("I'm Jeremy", stream=True):
    o = contents(o)
    if o and isinstance(o, str): print(o, end='')

### Chat tool use

In [None]:
pr = f"What is {a}+{b}?"
pr

In [None]:
chat = Chat(model, sp=sp, tools=[sums])
r = chat(pr)
r

In [None]:
chat()

## Images

As everyone knows, when testing image APIs you have to use a cute puppy.

In [None]:
# Image is Cute_dog.jpg from Wikimedia
fn = Path('samples/puppy.jpg')
display.Image(filename=fn, width=200)

In [None]:
img = fn.read_bytes()

```js
{
  "type": "image_url",
  "image_url": {
    "url": f"data:image/jpeg;base64,{base64_image}"
  }
}
```

In [None]:
#| exports
def img_msg(data:bytes)->dict:
    "Convert image `data` into an encoded `dict`"
    img = base64.b64encode(data).decode("utf-8")
    mtype = mimetypes.types_map['.'+imghdr.what(None, h=data)]
    r = {'url': f"data:{mtype};base64,{img}"}
    return {'type': "image_url", "image_url": r}

In [None]:
#| exports
def text_msg(s:str)->dict:
    "Convert `s` to a text message"
    return {"type": "text", "text": s}

In [None]:
q = "In brief, what color flowers are in this image?"
msg = mk_msg([img_msg(img), text_msg(q)])

In [None]:
c([msg])

In [None]:
#| exports
def _mk_content(src):
    "Create appropriate content data structure based on type of content"
    if isinstance(src,str): return text_msg(src)
    if isinstance(src,bytes): return img_msg(src)
    return src

There's not need to manually choose the type of message, since we figure that out from the data of the source data.

In [None]:
_mk_content('Hi')

In [None]:
#| export
def mk_msg(content, # A string, list, or dict containing the contents of the message
           role='user', # Must be 'user' or 'assistant'
           **kwargs):
    "Helper to create a `dict` appropriate for a message. `kwargs` are added as key/value pairs to the message"
    if hasattr(content, 'content'): content,role = content.content,content.role
    if isinstance(content, ChatCompletion): return find_block(content)
    if content is not None and not isinstance(content, list): content=[content]
    content = [_mk_content(o) for o in content] if content else ''
    return dict(role=role, content=content, **kwargs)

In [None]:
c([mk_msg([img, q])])

## Export -

In [None]:
#|hide
#|eval: false
from nbdev.doclinks import nbdev_export
nbdev_export()