In [None]:
#|default_exp funccall

# funccall source

In [None]:
#| exports
import inspect
from fastcore.utils import *
from fastcore.docments import docments

In [None]:
#| export
empty = inspect.Parameter.empty

## Function calling

Many LLMs do function calling (aka tool use) by taking advantage of JSON schema.

We'll use [docments](https://fastcore.fast.ai/docments.html) to make getting JSON schema from Python functions as ergonomic as possible. Each parameter (and the return value) should have a type, and a docments comment with the description of what it is. Here's an example:

In [None]:
def silly_sum(
    a:int, # First thing to sum
    b:int=1, # Second thing to sum
    c:list[int]=None, # A pointless argument
) -> int: # The sum of the inputs
    "Adds a + b."
    return a + b

This is what `docments` makes of that:

In [None]:
d = docments(silly_sum, full=True)
d

```json
{ 'a': { 'anno': <class 'int'>,
         'default': <class 'inspect._empty'>,
         'docment': 'First thing to sum'},
  'b': {'anno': <class 'int'>, 'default': 1, 'docment': 'Second thing to sum'},
  'c': {'anno': list[int], 'default': None, 'docment': 'A pointless argument'},
  'return': { 'anno': <class 'int'>,
              'default': <class 'inspect._empty'>,
              'docment': 'The sum of the inputs'}}
```

Note that this is an [AttrDict](https://fastcore.fast.ai/basics.html#attrdict) so we can treat it like an object, *or* a dict:

In [None]:
d.a.docment, d['a']['anno']

('First thing to sum', int)

In [None]:
#| exports
def _types(t:type)->tuple[str,Optional[str]]:
    "Tuple of json schema type name and (if appropriate) array item name."
    if t is empty: raise TypeError('Missing type')
    tmap = {int:"integer", float:"number", str:"string", bool:"boolean", list:"array", dict:"object"}
    tmap.update({k.__name__: v for k, v in tmap.items()})
    if getattr(t, '__origin__', None) in (list,tuple): return "array", tmap.get(t.__args__[0].__name__, "object")
    elif isinstance(t, str): return tmap.get(t, "object"), None
    else: return tmap.get(t.__name__, "object"), None

This internal function is needed to convert Python types into JSON schema types.

In [None]:
_types(list[int]), _types(int), _types('int')

(('array', 'integer'), ('integer', None), ('integer', None))

Will also convert custom types to the `object` type.

In [None]:
class Custom: a: int
_types(list[Custom]), _types(Custom)

(('array', 'object'), ('object', None))

In [None]:
#| exports
def _param(name, info):
    "json schema parameter given `name` and `info` from docments full dict."
    paramt,itemt = _types(info.anno)
    pschema = dict(type=paramt, description=info.docment or "")
    if itemt: pschema["items"] = {"type": itemt}
    if info.default is not empty: pschema["default"] = info.default
    return pschema

This private function converts a key/value pair from the `docments` structure into the `dict` that will be needed for the schema.

In [None]:
n,o = first(d.items())
print(n,'//', o)
_param(n, o)

a // {'docment': 'First thing to sum', 'anno': <class 'int'>, 'default': <class 'inspect._empty'>}


{'type': 'integer', 'description': 'First thing to sum'}

In [None]:
#| export
def _get_nested_schema(obj):
    "Generate nested JSON schema for a class or function"
    d = docments(obj, full=True)
    props,req,defs = {},{},{}
    for n,o in d.items():
        if n == 'return': continue
        props[n] = p = _param(n,o)
        if o.default is empty: req[n] = True
        t = o.anno.__args__[0] if hasattr(o.anno, '__args__') else o.anno
        if isinstance(t, type) and not issubclass(t, (int, float, str, bool)):
            defs[t.__name__] = _get_nested_schema(t)
            p['items' if p['type']=='array' else '$ref'] = f'#/$defs/{t.__name__}'
    res = dict(type='object', properties=props)
    if isinstance(obj, type): res['title'] = obj.__name__
    if req: res['required'] = list(req)
    if defs: res['$defs'] = defs
    return res

In [None]:
#| exports
def get_schema(f:callable, pname='input_schema')->dict:
    "Generate JSON schema for a class, function, or method"
    schema = _get_nested_schema(f)
    desc = f.__doc__
    assert desc, "Docstring missing!"
    d = docments(f, full=True)
    ret = d.pop('return')
    if ret.anno is not empty: desc += f'\n\nReturns:\n- type: {_types(ret.anno)[0]}'
    return {"name": f.__name__, "description": desc, pname: schema}

Putting this all together, we can now test getting a schema from `silly_sum`. The tool use spec doesn't support return annotations directly, so we put that in the description instead.

In [None]:
s = get_schema(silly_sum)
desc = s.pop('description')
print(desc)
s

Adds a + b.

Returns:
- type: integer


{'name': 'silly_sum',
 'input_schema': {'type': 'object',
  'properties': {'a': {'type': 'integer', 'description': 'First thing to sum'},
   'b': {'type': 'integer',
    'description': 'Second thing to sum',
    'default': 1},
   'c': {'type': 'array',
    'description': 'A pointless argument',
    'items': {'type': 'integer'},
    'default': None}},
  'required': ['a']}}

This also works with string annotations, e.g:

In [None]:
def silly_test(
    a: 'int',  # quoted type hint
):
    "Mandatory docstring"
    return a

get_schema(silly_test)

{'name': 'silly_test',
 'description': 'Mandatory docstring',
 'input_schema': {'type': 'object',
  'properties': {'a': {'type': 'integer', 'description': 'quoted type hint'}},
  'required': ['a']}}

`get_schema` also handles more complicated structures such as nested classes. This is useful for things like structured outputs.

In [None]:
class Turn:
    "Turn between two speakers"
    def __init__(
        self,
        speaker_a:int, # First speaker to speak
        speaker_b:int  # Second speaker to speak
    ): store_attr()
    
    __repr__ = basic_repr(['speaker_a', 'speaker_b'])

class Conversation:
    "A conversation between two speakers"
    def __init__(
        self,
        turn:list[Turn], # Turns of the conversation
    ): store_attr()
    
    __repr__ = basic_repr('turn')

s = get_schema(Conversation)
desc = s.pop('description')
print(desc)
s

A conversation between two speakers


{'name': 'Conversation',
 'input_schema': {'type': 'object',
  'properties': {'turn': {'type': 'array',
    'description': 'Turns of the conversation',
    'items': '#/$defs/Turn'}},
  'title': 'Conversation',
  'required': ['turn'],
  '$defs': {'Turn': {'type': 'object',
    'properties': {'speaker_a': {'type': 'integer',
      'description': 'First speaker to speak'},
     'speaker_b': {'type': 'integer',
      'description': 'Second speaker to speak'}},
    'title': 'Turn',
    'required': ['speaker_a', 'speaker_b']}}}}

::: {.callout-note}
Nested structures inside dictionaries such as `dict[str, Turn]` are currently not supported.
:::

### Python tool

In language model clients it's often useful to have a 'code interpreter' -- this is something that runs code, and generally outputs the result of the last expression (i.e like IPython or Jupyter). 

In this section we'll create the `python` function, which executes a string as Python code, with an optional timeout. If the last line is an expression, we'll return that -- just like in IPython or Jupyter, but without needing them installed.

In [None]:
#| exports
import ast, time, signal, traceback
from fastcore.utils import *

In [None]:
#| exports
def _copy_loc(new, orig):
    "Copy location information from original node to new node and all children."
    new = ast.copy_location(new, orig)
    for field, o in ast.iter_fields(new):
        if isinstance(o, ast.AST): setattr(new, field, _copy_loc(o, orig))
        elif isinstance(o, list): setattr(new, field, [_copy_loc(value, orig) for value in o])
    return new

This is an internal function that's needed for `_run` to ensure that location information is available in the abstract syntax tree (AST), since otherwise python complains.

In [None]:
#| exports
def _run(code:str ):
    "Run `code`, returning final expression (similar to IPython)"
    tree = ast.parse(code)
    last_node = tree.body[-1] if tree.body else None
    
    # If the last node is an expression, modify the AST to capture the result
    if isinstance(last_node, ast.Expr):
        tgt = [ast.Name(id='_result', ctx=ast.Store())]
        assign_node = ast.Assign(targets=tgt, value=last_node.value)
        tree.body[-1] = _copy_loc(assign_node, last_node)

    compiled_code = compile(tree, filename='<ast>', mode='exec')
    namespace = {}
    stdout_buffer = io.StringIO()
    saved_stdout = sys.stdout
    sys.stdout = stdout_buffer
    try: exec(compiled_code, namespace)
    finally: sys.stdout = saved_stdout
    _result = namespace.get('_result', None)
    if _result is not None: return _result
    return stdout_buffer.getvalue().strip()

This is the internal function used to actually run the code -- we pull off the last AST to see if it's an expression (i.e something that returns a value), and if so, we store it to a special `_result` variable so we can return it.

In [None]:
_run('import math;math.factorial(12)')

479001600

In [None]:
_run('print(1+1)')

'2'

We now have the machinery needed to create our `python` function.

In [None]:
#| exports
def python(code, # Code to execute
           timeout=5 # Maximum run time in seconds before a `TimeoutError` is raised
          ): # Result of last node, if it's an expression, or `None` otherwise
    """Executes python `code` with `timeout` and returning final expression (similar to IPython).
    Raised exceptions are returned as a string, with a stack trace."""
    def handler(*args): raise TimeoutError()
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)
    try: return _run(code)
    except Exception as e: return traceback.format_exc()
    finally: signal.alarm(0)

There's no builtin security here -- you should generally use this in a sandbox, or alternatively prompt before running code. It can handle multiline function definitions, and pretty much any other normal Python syntax.

In [None]:
python("""def factorial(n):
    if n == 0 or n == 1: return 1
    else: return n * factorial(n-1)
factorial(5)""")

120

If the code takes longer than `timeout` then it raises a `TimeoutError`.

In [None]:
try: python('import time; time.sleep(10)', timeout=1)
except TimeoutError: print('Timed out')

## Export -

In [None]:
#|hide
#|eval: false
from nbdev.doclinks import nbdev_export
nbdev_export()