<a href="https://colab.research.google.com/github/githubpradeep/notebooks/blob/main/function_calling_phi_fc.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
# Installs Unsloth, Xformers (Flash Attention) and all other packages!
!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"

# We have to check which Torch version for Xformers (2.3 -> 0.0.27)
from torch import __version__; from packaging.version import Version as V
xformers = "xformers==0.0.27" if V(__version__) < V("2.4.0") else "xformers"
!pip install --no-deps {xformers} trl peft accelerate bitsandbytes triton

In [1]:
!pip install  langchain==0.1.9


In [1]:
template = '''
Role: |
  You are a function calling AI agent with self-recursion.
  You can call only one function at a time and analyse data you get from function response.
  You are provided with function signatures within <tools></tools> XML tags.
  The current date is: {date}.
Objective: |
  You may use agentic frameworks for reasoning and planning to help with user query.
  Please call a function and wait for function results to be provided to you in the next iteration.
  Don't make assumptions about what values to plug into function arguments.
  Once you have called a function, results will be fed back to you within <tool_response></tool_response> XML tags.
  Don't make assumptions about tool results if <tool_response> XML tags are not present since function hasn't been executed yet.
  Analyze the data once you get the results and call another function.
  At each iteration please continue adding your analysis to the previous summary.
  Your final response should directly answer the user query with an analysis or summary of the results of function calls.
Tools: |
  Here are the available tools:
  <tools> {tools} </tools>
  If the provided function signatures don't have the function you must call, you may write executable python code in markdown syntax and call code_interpreter() function as follows:
  <tool_call>
  {{'arguments': {{'code_markdown': <python-code>}}, 'name': 'code_interpreter'}}
  </tool_call>
  Make sure that the json object above with code markdown block is parseable with json.loads() and the XML block with XML ElementTree.
Examples: |
  Here are some example usages of functions:
  {examples}
Schema: |
  Use the following pydantic model json schema for each tool call you will make:
  {schema}
Instructions: |
  At the very first turn you don't have <tool_results> so you should not make up the results.
  Please keep a running summary with analysis of previous function results and summaries from previous iterations.
  Do not stop calling functions until the task has been accomplished or you've reached max iteration of 10.
  Calling multiple functions at once can overload the system and increase cost so call one function at a time please.
  If you plan to continue with analysis, always call another function.
  For each function call return a json object with function name and arguments within <tool_call></tool_call> XML tags as follows:
  <tool_call>
  {{'arguments': <args-dict>, 'name': <function-name>}}
  </tool_call>'''

from typing import Dict, Literal, Optional
import datetime
import json

import yaml
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool

class FunctionCall(BaseModel):
    arguments: dict
    """
    The arguments to call the function with, as generated by the model in JSON
    format. Note that the model does not always generate valid JSON, and may
    hallucinate parameters not defined by your function schema. Validate the
    arguments in your code before calling your function.
    """

    name: str
    """The name of the function to call."""

class FunctionDefinition(BaseModel):
    name: str
    description: Optional[str] = None
    parameters: Optional[Dict[str, object]] = None

class FunctionSignature(BaseModel):
    function: FunctionDefinition
    type: Literal["function"]

def format_prompt(prompt_schema, variables) -> str:
    formatted_prompt = ""
    for field, value in prompt_schema.items():
        # Skip the Examples section when no few-shot examples are supplied
        if field == "Examples" and variables.get("examples") is None:
            continue
        formatted_value = value.format(**variables)
        # Keep line breaks only in Instructions; flatten every other section
        if field != "Instructions":
            formatted_value = formatted_value.replace("\n", " ")
        formatted_prompt += formatted_value
    return formatted_prompt

def generate_prompt(user_prompt, tools, num_fewshot=None):
    # num_fewshot is currently unused: no "examples" variable is passed,
    # so format_prompt() skips the Examples section
    prompt_schema = yaml.safe_load(template)
    schema_json = json.loads(FunctionCall.schema_json())

    variables = {
        "date": datetime.date.today(),
        "tools": tools,
        "schema": schema_json,
    }
    sys_prompt = format_prompt(prompt_schema, variables)

    # System prompt first, followed by the conversation so far
    prompt = [{'content': sys_prompt, 'role': 'system'}]
    prompt.extend(user_prompt)
    return prompt

import ast
import re
import xml.etree.ElementTree as ET

def get_assistant_message(completion, eos_token):
    """Return the text of the last assistant turn in `completion`, or None if there is none."""
    completion = completion.strip()
    # The negative lookahead ensures the match starts at the LAST <|assistant|> marker
    assistant_pattern = re.compile(r'<\|assistant\|>((?:(?!<\|assistant\|>).)*)$', re.DOTALL)

    assistant_match = assistant_pattern.search(completion)
    if assistant_match:
        assistant_content = assistant_match.group(1).strip()
        return assistant_content.replace(eos_token, "")
    return None

def validate_and_extract_tool_calls(assistant_content):
    """Parse every <tool_call> block in the assistant message; return [] if none parse."""
    tool_calls = []
    try:
        # Wrap the content in a root element so several <tool_call> tags form one document
        root = ET.fromstring(f"<root>{assistant_content}</root>")
    except ET.ParseError:
        # Not well-formed XML (e.g. a stray '<' or '&'): treat as no tool calls
        return tool_calls

    for element in root.findall(".//tool_call"):
        json_text = (element.text or "").strip()
        try:
            # Prefer strict JSON
            json_data = json.loads(json_text)
        except json.JSONDecodeError:
            try:
                # Fall back to Python literal syntax (single-quoted dicts)
                json_data = ast.literal_eval(json_text)
            except (SyntaxError, ValueError, TypeError):
                continue
        if json_data is not None:
            tool_calls.append(json_data)
    return tool_calls
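
The prompt template asks the model for single-quoted, Python-style dicts inside `<tool_call>` tags, which `json.loads()` rejects; that is why the parser falls back to `ast.literal_eval()`. The standalone sketch below shows the two-step parse on a hand-written sample. The `parse_tool_calls` name and the sample string are illustrative assumptions, not part of the notebook.

```python
import ast
import json
import xml.etree.ElementTree as ET

def parse_tool_calls(text):
    """Return the dicts found inside <tool_call> tags, skipping unparseable ones."""
    calls = []
    # Wrap in a single root so several sibling tags form one XML document
    root = ET.fromstring(f"<root>{text}</root>")
    for element in root.findall(".//tool_call"):
        payload = (element.text or "").strip()
        try:
            calls.append(json.loads(payload))            # strict JSON first
        except json.JSONDecodeError:
            try:
                calls.append(ast.literal_eval(payload))  # Python-literal fallback
            except (SyntaxError, ValueError):
                continue
    return calls

# Hand-written sample: one single-quoted call, one valid-JSON call
sample = """<tool_call>
{'arguments': {'location': 'Tokyo'}, 'name': 'get_current_weather'}
</tool_call>
<tool_call>
{"arguments": {"symbol": "TSLA"}, "name": "get_current_stock_price"}
</tool_call>"""

for call in parse_tool_calls(sample):
    print(call["name"], call["arguments"])
```

Both calls come back as ordinary dicts, so the rest of the loop does not need to know which parser succeeded.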

In [2]:
from unsloth import FastLanguageModel
import torch
max_seq_length = 2048 # Choose any! We auto support RoPE Scaling internally!
dtype = None # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
load_in_4bit = True # Use 4bit quantization to reduce memory usage. Can be False.

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "Neuranest/Phi-3.5-mini-instruct-hfc", # The model you used for training
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
)
FastLanguageModel.for_inference(model) # Enable native 2x faster inference


from unsloth.chat_templates import get_chat_template
tokenizer = get_chat_template(
            tokenizer,
            chat_template = "phi-3", # Supports zephyr, chatml, mistral, llama, alpaca, vicuna, vicuna_old, unsloth
            mapping = {"role" : "role", "content" : "content", "user" : "human", "assistant" : "gpt"}, # ShareGPT style
            #mapping = {"role" : "from", "content" : "value", "user" : "human", "assistant" : "gpt"}, # ShareGPT style
        )

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.
==((====))==  Unsloth 2024.8: Fast Llama patching. Transformers = 4.44.2.
   \\   /|    GPU: Tesla T4. Max memory: 14.748 GB. Platform = Linux.
O^O/ \_/ \    Pytorch: 2.4.0+cu121. CUDA = 7.5. CUDA Toolkit = 12.1.
\        /    Bfloat16 = FALSE. FA [Xformers = 0.0.27.post2. FA2 = False]
 "-____-"     Free Apache license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


Unsloth 2024.8 patched 32 layers with 32 QKV layers, 32 O layers and 32 MLP layers.


In [3]:
def generate(prompt):
    inputs = tokenizer.apply_chat_template(
        prompt,
        tokenize = True,
        add_generation_prompt = True, # Must add for generation
        return_tensors = "pt",
    ).to("cuda")

    outputs = model.generate(
        input_ids = inputs,
        use_cache = True,
        max_new_tokens = 1500,
        eos_token_id = 32007, # <|end|> in the Phi-3 tokenizer
    )

    # Decode prompt and completion together; get_assistant_message() then
    # extracts the final assistant turn from the full text.
    completion = tokenizer.decode(outputs[0], skip_special_tokens=False, clean_up_tokenization_spaces=True)

    return completion
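
Because `generate()` decodes the whole sequence (prompt plus completion), `get_assistant_message()` has to isolate the final assistant turn. The sketch below exercises the same regex on a hand-written transcript; the `last_assistant_turn` name and the transcript text are illustrative assumptions, not real model output.

```python
import re

# Capture the text after the LAST <|assistant|> marker: the negative lookahead
# forbids another <|assistant|> inside the captured group, and `$` anchors the
# match at the end of the string.
ASSISTANT_PATTERN = re.compile(r'<\|assistant\|>((?:(?!<\|assistant\|>).)*)$', re.DOTALL)

def last_assistant_turn(decoded, eos_token="<|end|>"):
    match = ASSISTANT_PATTERN.search(decoded.strip())
    if match is None:
        return None
    return match.group(1).replace(eos_token, "").strip()

transcript = (
    "<|user|>\nWhat is 2 + 2?<|end|>\n"
    "<|assistant|>\nLet me check.<|end|>\n"
    "<|tool|>\n4<|end|>\n"
    "<|assistant|>\nThe answer is 4.<|end|>"
)

print(last_assistant_turn(transcript))   # The answer is 4.
print(last_assistant_turn("no marker"))  # None
```

The earlier assistant turn ("Let me check.") is skipped, which is what keeps each loop iteration from re-reading its own previous output.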

In [4]:
def execute_function_call(tool_call):
    function_name = tool_call.get("name")
    # Look up the tool by name among this notebook's global functions
    function_to_call = globals()[function_name]

    function_args = tool_call.get("arguments", {})

    print(f"Invoking function call {function_name} ...{function_args.values()}")
    # Pass arguments by keyword so they bind by name rather than by dict order
    function_response = function_to_call(**function_args)
    results_dict = f'{{"name": "{function_name}", "content": {function_response}}}'
    return results_dict

def generate_function_call(query, tools, max_depth=5):
    user_message = f"{query}\nThis is the first turn and you don't have <tool_results> to analyze yet"
    chat = [{"role": "user", "content": user_message}]
    prompt = generate_prompt(chat, tools)
    completion = generate(prompt)

    def recursive_loop(prompt, completion, depth):
        assistant_message = get_assistant_message(completion, '<|end|>')
        tool_calls = validate_and_extract_tool_calls(assistant_message)
        prompt.append({"role": "assistant", "content": assistant_message})
        print(f"Assistant Message:\n{assistant_message}")
        if not tool_calls:
            # No tool calls left: the assistant message is the final answer
            return

        tool_message = f"Agent iteration {depth} to assist with user query: {query}\n"
        for tool_call in tool_calls:
            try:
                function_response = execute_function_call(tool_call)
                tool_message += f"<tool_response>\n{function_response}\n</tool_response>\n"
                print(f"Here's the response from the function call: {tool_call.get('name')}\n{function_response}")
            except Exception as e:
                # Report the failure back to the model so it can retry
                print(f"Could not execute function: {e}")
                tool_message += f"<tool_response>\nThere was an error when executing the function: {tool_call.get('name')}\nHere's the error traceback: {e}\nPlease call this function again with correct arguments within XML tags <tool_call></tool_call>\n</tool_response>\n"
        prompt.append({"role": "tool", "content": tool_message})

        depth += 1
        if depth >= max_depth:
            print(f"Maximum recursion depth reached ({max_depth}). Stopping recursion.")
            return

        completion = generate(prompt)
        recursive_loop(prompt, completion, depth)

    recursive_loop(prompt, completion, 0)
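
`execute_function_call` resolves the tool name through `globals()`, so the model can in principle name any global callable. One possible alternative is an explicit allow-list. In the sketch below, `TOOL_REGISTRY`, `register_tool`, `dispatch`, and the toy `add` tool are all hypothetical names introduced for illustration, not part of the notebook.

```python
TOOL_REGISTRY = {}  # hypothetical allow-list: tool name -> callable

def register_tool(func):
    """Decorator that adds a function to the allow-list and returns it unchanged."""
    TOOL_REGISTRY[func.__name__] = func
    return func

@register_tool
def add(a: int, b: int) -> int:
    """Toy tool used only for this illustration."""
    return a + b

def dispatch(tool_call):
    """Run a parsed tool call, refusing names that were never registered."""
    name = tool_call.get("name")
    if name not in TOOL_REGISTRY:
        raise KeyError(f"Unknown tool: {name}")
    # Keyword expansion: a wrong or invented parameter name raises TypeError
    return TOOL_REGISTRY[name](**tool_call.get("arguments", {}))

print(dispatch({"name": "add", "arguments": {"a": 2, "b": 3}}))  # 5

try:
    dispatch({"name": "eval", "arguments": {}})
except KeyError as err:
    print(f"rejected: {err}")
```

Either exception would be caught by the `except Exception` branch of the agent loop and reported back to the model as a `<tool_response>` error.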

In [5]:
# Example dummy function hard coded to return the same weather
# In production, this could be your backend API or an external API
def get_current_weather(location, unit="fahrenheit"):
    """Get the current weather in a given location"""
    if "tokyo" in location.lower():
        return json.dumps(
            {"location": location, "temperature": "10", "unit": "celsius"}
        )
    elif "san francisco" in location.lower():
        return json.dumps(
            {"location": location, "temperature": "72", "unit": "fahrenheit"}
        )
    else:
        return json.dumps(
            {"location": location, "temperature": "22", "unit": "celsius"}
        )
tools = json.dumps([convert_to_openai_tool(get_current_weather)])



generate_function_call("What's the weather like in San Francisco, Tokyo, Paris? use chain of thought", tools)

Assistant Message:
<tool_call>
{'arguments': {'location': 'San Francisco'}, 'name': 'get_current_weather'}
</tool_call>
<tool_call>
{'arguments': {'location': 'Tokyo'}, 'name': 'get_current_weather'}
</tool_call>
<tool_call>
{'arguments': {'location': 'Paris'}, 'name': 'get_current_weather'}
</tool_call>
Invoking function call get_current_weather ...dict_values(['San Francisco'])
Here's the response from the function call: get_current_weather
{"name": "get_current_weather", "content": {"location": "San Francisco", "temperature": "72", "unit": "fahrenheit"}}
Invoking function call get_current_weather ...dict_values(['Tokyo'])
Here's the response from the function call: get_current_weather
{"name": "get_current_weather", "content": {"location": "Tokyo", "temperature": "10", "unit": "celsius"}}
Invoking function call get_current_weather ...dict_values(['Paris'])
Here's the response from the function call: get_current_weather
{"name": "get_current_weather", "content": {"location": "Paris",

In [6]:

import yfinance as yf

from langchain_core.utils.function_calling import convert_to_openai_tool


def get_current_stock_price(symbol: str) -> float:
  """
  Get the current stock price for a given symbol.

  Args:
    symbol (str): The stock symbol.

  Returns:
    float: The current stock price, or None if an error occurs.
  """
  try:
    print(symbol)
    stock = yf.Ticker(symbol)
    # Use "regularMarketPrice" for regular market hours, or "currentPrice" for pre/post market
    current_price = stock.info.get("regularMarketPrice", stock.info.get("currentPrice"))
    return current_price if current_price else None
  except Exception as e:
    print(f"Error fetching current price for {symbol}: {e}")
    return None
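
`get_current_stock_price` returns `None` on failure, and `execute_function_call` interpolates the result into an f-string, so a failed lookup would reach the model as `"content": None`, which is not valid JSON. A minimal sketch of one way to avoid this, assuming a hypothetical `format_tool_response` helper that is not part of the notebook:

```python
import json

def format_tool_response(function_name, function_response):
    """Serialize a tool result with json.dumps (hypothetical helper)."""
    return json.dumps({"name": function_name, "content": function_response})

# f-string interpolation writes Python's None literally
naive = f'{{"name": "get_current_stock_price", "content": {None}}}'
print(naive)  # {"name": "get_current_stock_price", "content": None}

# json.dumps maps None to JSON null
safe = format_tool_response("get_current_stock_price", None)
print(safe)   # {"name": "get_current_stock_price", "content": null}

# A float result serializes the same way in both approaches
print(format_tool_response("get_current_stock_price", 214.11))
```

Tools that already return a JSON string, such as `get_current_weather`, would be embedded as a quoted string under this approach, so they would need `json.loads()` applied first.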

In [7]:
tools = json.dumps([convert_to_openai_tool(get_current_stock_price)])

generate_function_call("I need the current stock price of Tesla (TSLA) and Google (GOOG)", tools)

Assistant Message:
<tool_call>
{'arguments': {'symbol': 'TSLA'}, 'name': 'get_current_stock_price'}
</tool_call>
<tool_call>
{'arguments': {'symbol': 'GOOG'}, 'name': 'get_current_stock_price'}
</tool_call>
Invoking function call get_current_stock_price ...dict_values(['TSLA'])
TSLA
Here's the response from the function call: get_current_stock_price
{"name": "get_current_stock_price", "content": 214.11}
Invoking function call get_current_stock_price ...dict_values(['GOOG'])
GOOG
Here's the response from the function call: get_current_stock_price
{"name": "get_current_stock_price", "content": 165.11}
Assistant Message:
The current stock price for Tesla (TSLA) is $214.11 and for Google (GOOG) it is $165.11.
