# Function Calling

In [100]:
import os
import re
import sys
import json
import operator
from pprint import pprint
from datetime import datetime
from dotenv import load_dotenv
from typing import  Optional, Union
from langchain_community.llms.sambanova import SambaStudio, Sambaverse
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate
from langchain_core.tools import StructuredTool
from langchain_core.tools import ToolException
from langchain_core.tools import tool
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import Tool
from langchain_experimental.utilities import PythonREPL
from langchain_core.runnables import Runnable, RunnableLambda, RunnablePassthrough

current_dir = os.getcwd()
kit_dir = os.path.abspath(os.path.join(current_dir, "..")) 
repo_dir = os.path.abspath(os.path.join(kit_dir, ".."))

sys.path.append(kit_dir)
sys.path.append(repo_dir)

load_dotenv(os.path.join(repo_dir, ".env")) 



## Tools Definitions

### Basic tools

#### Get time

In [2]:
# tool schema
class GetTimeSchema(BaseModel):
    """Returns current date, current time or both."""
    kind: Optional[str] = Field(description='kind of information to retrieve "date", "time" or "both"')

In [3]:
# definition using @tool decorator
@tool(args_schema=GetTimeSchema)
def get_time(kind: str = "both") -> str:
    """Returns current date, current time or both.

    Args:
        kind: date, time or both
    """
    if kind == "date":
        date = datetime.now().strftime("%d/%m/%Y")
        return f"Current date: {date}"
    elif kind == "time":
        time = datetime.now().strftime("%H:%M:%S")
        return f"Current time: {time}"
    else:
        date = datetime.now().strftime("%d/%m/%Y")
        time = datetime.now().strftime("%H:%M:%S")
        return f"Current date: {date}, Current time: {time}"


In [4]:
get_time.invoke({"kind":"time"})

'Current time: 13:18:43'

In [5]:
get_time.get_input_schema().schema()

{'title': 'GetTimeSchema',
 'description': 'Returns current date, current time or both.',
 'type': 'object',
 'properties': {'kind': {'title': 'Kind',
   'description': 'kind of information to retrieve "date", "time" or "both"',
   'type': 'string'}}}

### Customized error handling tools

#### Calculator

In [6]:
# tool schema
class CalculatorSchema(BaseModel):
    """allow calculation of only basic operations: + - * and /
    with a string input expression """
    expression: str = Field(..., description="expression to calculate, example '12 * 3'")

In [36]:
#function to use in the tool
def calculator(expression: str ) -> Union[str, int, float]:
    """
    allow calculation of basic operations
    with a string input expression
    Args:
        expression: expression to calculate 
    """
    ops = {
        '+': operator.add,
        '-': operator.sub,
        '*': operator.mul,
        'x': operator.mul,
        'X': operator.mul,
        '÷': operator.truediv,
        '/': operator.truediv
    }
    tokens = re.findall(r'\d+\.?\d*|\+|\-|\*|\/|÷|x|X', expression)
    
    if not tokens:
        raise ToolException(f"Invalid expression '{expression}', should only contain one of the following operators + - * and /")
    
    current_value = float(tokens.pop(0))
    
    while tokens:
        # The next token should be an operator
        op = tokens.pop(0)
        
        # The next token should be a number
        if not tokens:
            raise ToolException(f"Incomplete expression '{expression}'")
        try:
            next_value = float(tokens.pop(0))
            
        except ValueError:
            raise ToolException("Invalid number format")
        
        except:
            raise ToolException("Invalid operation")
        
        # check division by 0
        if op in ['/','÷'] and next_value == 0:
            raise ToolException("cannot divide by 0")
        
        current_value = ops[op](current_value, next_value)
    
    return current_value


# tool error handler
def _handle_error(error: ToolException) -> str:
    return f"The following errors occurred during Calculator tool execution: `{error.args}`"


# tool definition
calculator = StructuredTool.from_function(
    func=calculator,
    args_schema=CalculatorSchema,
    handle_tool_error=_handle_error,#True,
)

In [37]:
calculator.invoke("7 / 0")

"The following errors occurred during Calculator tool execution: `('cannot divide by 0',)`"

In [38]:
calculator.get_input_schema().schema()

{'title': 'CalculatorSchema',
 'description': 'allow calculation of only basic operations: + - * and /\nwith a string input expression ',
 'type': 'object',
 'properties': {'expression': {'title': 'Expression',
   'description': "expression to calculate, example '12 * 3'",
   'type': 'string'}},
 'required': ['expression']}

### Langchain Tools

#### Python repl

In [39]:
# tool schema
class ReplSchema(BaseModel):
    "A Python shell. Use this to execute python commands. Input should be a valid python commands and expressions. If you want to see the output of a value, you should print it out with `print(...), if you need a specific module you should import it`."
    command: str = Field(..., description="python code to execute to evaluate")

In [40]:
# tool definition
python_repl = PythonREPL()
python_repl = Tool(
    name="python_repl",
    description="A Python shell. Use this to execute python commands. Input should be a valid python command. If you want to see the output of a value, you should print it out with `print(...)`.",
    func=python_repl.run,
    args_schema=ReplSchema
)

In [41]:
python_repl.invoke({"command" : "for i in range(0,5):\n\tprint(i)"})

'0\n1\n2\n3\n4\n'

In [42]:
python_repl.get_input_schema().schema()

{'title': 'ReplSchema',
 'description': 'A Python shell. Use this to execute python commands. Input should be a valid python commands and expressions. If you want to see the output of a value, you should print it out with `print(...), if you need a specific module you should import it`.',
 'type': 'object',
 'properties': {'command': {'title': 'Command',
   'description': 'python code to execute to evaluate',
   'type': 'string'}},
 'required': ['command']}

#### SQL calling

### Default response tool

In [139]:
# tool schema
class ConversationalResponse(BaseModel):
    "Respond conversationally only if no other tools should be called for a given query, or if you have a final answer."
    response: str = Field(..., description="Conversational response to the user.")
    
ConversationalResponse.schema()

{'title': 'ConversationalResponse',
 'description': 'Respond conversationally only if no other tools should be called for a given query, or if you have a final answer.',
 'type': 'object',
 'properties': {'response': {'title': 'Response',
   'description': 'Conversational response to the user.',
   'type': 'string'}},
 'required': ['response']}

In [140]:
def get_tools_schemas(tools: Union[Tool, list] = None, default: Union[Tool, BaseModel]= None):
    
    if tools is None:
        pass
    elif isinstance(tools, Tool):
        tools = [tools]
    tools_schemas = []
    
    for tool in tools:
        tool_schema = tool.get_input_schema().schema()
        schema = {"name": tool.name, "description": tool_schema["description"], "properties": tool_schema["properties"]}
        if "required" in schema:
            schema["required"] = tool_schema["required"]
        tools_schemas.append(schema)  
        
    if default is not None:
        if isinstance(default, Tool):
            tool_schema=default.get_input_schema().schema()
        else:
            tool_schema = default.schema()
        schema = {"name": tool_schema["title"], "description": tool_schema["description"], "properties": tool_schema["properties"]}
        if "required" in schema:
            schema["required"] = tool_schema["required"]
        tools_schemas.append(schema)
              
    return tools_schemas

In [141]:
tools = [get_time, calculator, python_repl]

In [142]:
tools_schemas = get_tools_schemas(tools, default = ConversationalResponse)
tools_schemas = "\n".join([json.dumps(tool, indent=2) for tool  in tools_schemas])
pprint(tools_schemas)

('{\n'
 '  "name": "get_time",\n'
 '  "description": "Returns current date, current time or both.",\n'
 '  "properties": {\n'
 '    "kind": {\n'
 '      "title": "Kind",\n'
 '      "description": "kind of information to retrieve \\"date\\", \\"time\\" '
 'or \\"both\\"",\n'
 '      "type": "string"\n'
 '    }\n'
 '  }\n'
 '}\n'
 '{\n'
 '  "name": "calculator",\n'
 '  "description": "allow calculation of only basic operations: + - * and '
 '/\\nwith a string input expression ",\n'
 '  "properties": {\n'
 '    "expression": {\n'
 '      "title": "Expression",\n'
 '      "description": "expression to calculate, example \'12 * 3\'",\n'
 '      "type": "string"\n'
 '    }\n'
 '  }\n'
 '}\n'
 '{\n'
 '  "name": "python_repl",\n'
 '  "description": "A Python shell. Use this to execute python commands. Input '
 'should be a valid python commands and expressions. If you want to see the '
 'output of a value, you should print it out with `print(...), if you need a '
 'specific module you should i

## Function Calling 

### LLM definition

In [349]:
llm = Sambaverse(
    sambaverse_model_name="Meta/Meta-Llama-3-70B-Instruct",
    model_kwargs={       
            "max_tokens_to_generate": 2048,
            "select_expert": "Meta-Llama-3-70B-Instruct",
            "process_prompt": True,
            "temperature": 0.01
        }
)

# llm= SambaStudio(
#     model_kwargs={       
#             "max_tokens_to_generate": 2048,
#             "select_expert": "Meta-Llama-3-8B-Instruct",
#             "process_prompt": False
#         }
# )


### Chain

In [351]:
function_calling_prompt =  """<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an helpful assistant and you have access to the following tools:

{tools}

You must always select one or more of the above tools and answer with only a list of JSON objects matching the following schema:

```json
[{{
  "tool": <name of the selected tool>,
  "tool_input": <parameters for the selected tool, matching the tool's JSON schema>
}}]
```

Think step by step
Do not call a tool if the input depends on another tool output you dont have yet
Do not try to answer until you get tools output, if you dont have an answer yet you can continue calling tools

<|eot_id|><|start_header_id|>user<|end_header_id|>
User: {usr_msg} 
<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Assistant:""" 
#You must wait to have a tools output before generating a final response, or calling other tools
function_calling_prompt_template = PromptTemplate.from_template(function_calling_prompt)

In [352]:
def jsonFinder(input_string):
    json_pattern = re.compile(r'(\{.*\}|\[.*\])', re.DOTALL)
    # Find the first JSON structure in the string
    json_match = json_pattern.search(input_string)
    if json_match:
        json_str = json_match.group(1)
        try: 
            json.loads(json_str)
        except:
            json_str=json_str.replace("'", '"')
    else:
        json_str = None #TODO convert to a conversational tool
    return(json_str)

In [353]:
parser = RunnableLambda(jsonFinder) | JsonOutputParser()

In [354]:
prompt_template = {"tools" : lambda x:tools_schemas, "usr_msg" : RunnablePassthrough()} | function_calling_prompt_template

In [355]:
default_fc_chain = prompt_template | llm | parser

### Tool execution

In [356]:
tools_map = {
    "get_time": get_time, 
    "calculator": calculator, 
    "python_repl": python_repl,
}

def execute(tools):
    tool_msg = "Tool '{name}'response: {response}"
    tools_msgs = []
    if len(tools)==1 and tools[0]["tool"].lower() == "conversationalresponse":
        final_answer = True
        return final_answer, tools[0]["tool_input"]["response"]
    for tool in tools:
        final_answer = False
        if tool["tool"].lower()!="conversationalresponse":
            response = tools_map[tool["tool"].lower()](tool["tool_input"])
            tools_msgs.append(tool_msg.format(name=tool["tool"], response=str(response)))
    return final_answer, tools_msgs

In [357]:
query = "hi"
response_tools = default_fc_chain.invoke(query)
print(response_tools)
final_answer, response_tools = execute(response_tools)
print(response_tools)

[{'tool': 'ConversationalResponse', 'tool_input': {'response': 'Hi! How can I assist you today?'}}]
Hi! How can I assist you today?


In [358]:
query = "it is time to go to sleep?"
response_tools = default_fc_chain.invoke(query)
pprint(response_tools)
final_answer, response_tools = execute(response_tools)
print(response_tools)

[{'tool': 'get_time', 'tool_input': {'kind': 'time'}}]
["Tool 'get_time'response: Current time: 16:35:45"]


In [359]:
query = "whats is 347 min in hours and minutes?"
response_tools = default_fc_chain.invoke(query)
print(response_tools)
final_answer, response_tools = execute(response_tools)
print(response_tools)

[{'tool': 'calculator', 'tool_input': {'expression': '347 / 60'}}]
["Tool 'calculator'response: 5.783333333333333"]


In [366]:
query =  "is this word is a palindrome? 'saippuakivikauppias'"
response_tools = default_fc_chain.invoke(query)
pprint(response_tools)
final_answer, response_tools = execute(response_tools)
print(response_tools)

[{'tool': 'python_repl',
  'tool_input': {'command': "print('saippuakivikauppias' "
                            "=='saippuakivikauppias'[::-1])"}}]
["Tool 'python_repl'response: True\n"]


In [367]:
query = "sort this list of elements alphabetically ['screwdriver', 'pliers', 'hammer']"
response_tools = default_fc_chain.invoke(query)
pprint(response_tools)
final_answer, response_tools = execute(response_tools)
print(response_tools)

[{'tool': 'python_repl',
  'tool_input': {'command': "print(sorted(['screwdriver', 'pliers', "
                            "'hammer']))"}}]
["Tool 'python_repl'response: ['hammer', 'pliers', 'screwdriver']\n"]


## Pipeline function calling generation

In [370]:
query =   "it is time to go to sleep, how many hours last to 10pm?"
response_tools = default_fc_chain.invoke(query)
final_answer, tools_msgs = execute(response_tools)
print(response_tools)
print(tools_msgs)

[{'tool': 'get_time', 'tool_input': {'kind': 'time'}}]
["Tool 'get_time'response: Current time: 16:38:31"]


In [375]:
response_prompt =  """|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an helpful assistant and you have access to the following tools:

{tools}

You must always select one or more of the above tools and answer with only a list of JSON objects matching the following schema:

```json
[{{
  "tool": <name of the selected tool>,
  "tool_input": <parameters for the selected tool, matching the tool's JSON schema>
}}]
```

Think step by step
Do not call a tool if the input depends on another tool output you dont have yet
Do not try to answer until you get tools output, if you dont have an answer yet you can continue calling tools

<|eot_id|><|start_header_id|>user<|end_header_id|>
User: {usr_msg} 
<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Assistant: {response_tools}

<|eot_id|><|start_header_id|>tools<|end_header_id|>

{tools_msgs}

<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Assistant: 
"""
response_prompt_template = PromptTemplate.from_template(response_prompt)

new_prompt_template = {"tools" : lambda x:tools_schemas, "usr_msg" : lambda x:query, "response_tools": lambda x:response_tools, "tools_msgs":RunnablePassthrough()} | response_prompt_template
new_chain =  new_prompt_template | llm | parser



In [376]:
print(str(new_chain.invoke(tools_msgs)))

OutputParserException: Invalid json output: [{"tool": "calculator", "tool_input": {"expression": "22 -"+ "16"}}]

In [None]:
json_correction_prompt = """|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a yaml format corrector tool<|eot_id|><|start_header_id|>user<|end_header_id|>
fix the following yaml file: {yaml} 
<|eot_id|><|start_header_id|>assistant<|end_header_id|>
fixed yaml:
"""