<a href="https://colab.research.google.com/github/anthonymalumbe/transferlearning.github.io/blob/main/final_gemma_transformer_with_tool_calling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Install dependencies
Run the cell below to install all the required dependencies.

In [None]:
!pip install -q -U torch transformers
!pip install -q -U tensorflow
!pip install -q -U keras keras-nlp
!pip install -q -U googlemaps
!pip install -q -U huggingface_hub
!pip install -q -U kagglehub
!pip install -q -U bitsandbytes
!pip install -q -U accelerate
!pip install -q -U requests
!pip install -q -U pandas

## Setup

### Select the Colab runtime
To complete this tutorial, you'll need to have a Colab runtime with sufficient resources to run the Gemma model. In this case, you can use a GPU:

1. In the upper-right of the Colab window, select **▾ (Additional connection options)**.
2. Select **Change runtime type**.
3. Under **Hardware accelerator**, select **A100 GPU**.

### Gemma setup

To complete this tutorial, you'll first need to complete the setup instructions at [Gemma setup](https://ai.google.dev/gemma/docs/setup). The Gemma setup instructions show you how to do the following:

* Get access to Gemma on kaggle.com.
* Select a Colab runtime with sufficient resources to run
  the Gemma 2B model.
* Generate and configure a Kaggle username and an API key as Colab secrets.

After you've completed the Gemma setup, move on to the next section, where you'll set environment variables for your Colab environment.


In [None]:
import os
import pathlib
from google.colab import userdata
from google.oauth2 import service_account

# Note: `userdata.get` is a Colab API. If you're not using Colab, set the env manually or use a different method to retrieve these credentials.
os.environ["KAGGLE_USERNAME"] = userdata.get("KAGGLE_USERNAME")
os.environ["KAGGLE_KEY"] = userdata.get("KAGGLE_KEY")

# Save the service account key to a file.
pathlib.Path('service_account_key.json').write_text(userdata.get('SERVICE_ACCOUNT_KEY'))
credentials = service_account.Credentials.from_service_account_file('service_account_key.json')

# Retrieve the Google Maps API key.
MAPS_API_KEY = userdata.get("GET_MAP_API")

# This sets the fraction of GPU memory that the XLA client may preallocate. The variable is read by JAX (not by TensorFlow or PyTorch). Adjust the value as needed.
os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = "1.0"

### Gemma

**About Gemma**

Gemma is a family of lightweight, state-of-the-art open models from Google, built from the same research and technology used to create the Gemini models. They are text-to-text, decoder-only large language models, available in English, with open weights, pre-trained variants, and instruction-tuned variants. Gemma models are well-suited for a variety of text generation tasks, including question answering, summarization, and reasoning. Their relatively small size makes it possible to deploy them in environments with limited resources such as a laptop, desktop or your own cloud infrastructure, democratizing access to state-of-the-art AI models and helping foster innovation for everyone.

**Prompt formatting**

Instruction-tuned (IT) models are trained with a specific formatter that annotates all instruction tuning examples with extra information, both at training and inference time. The formatter has two purposes:

* Indicating roles in a conversation, such as the system, user, or assistant roles.
* Delineating turns in a conversation, especially in a multi-turn conversation.

Below are the control tokens used by Gemma and their use cases. Note that the control tokens are reserved in, and specific to, the Gemma tokenizer.

* Token to indicate a user turn: `user`
* Token to indicate a model turn: `model`
* Token to indicate the beginning of dialogue turn: `<start_of_turn>`
* Token to indicate the end of dialogue turn: `<end_of_turn>`

Here's the [official documentation](https://ai.google.dev/gemma/docs/formatting) regarding prompting instruction-tuned models.
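
As a rough illustration of how these control tokens fit together, the sketch below assembles a single-turn prompt by hand. The helper name `build_gemma_prompt` is hypothetical and exists only for this example; in practice `tokenizer.apply_chat_template` builds the prompt for you (and, as the printed prompts later in this notebook show, also prepends `<bos>`).

```python
def build_gemma_prompt(user_message: str) -> str:
    """Wrap a user message in Gemma's turn markers and open a model turn."""
    return (
        "<start_of_turn>user\n"
        f"{user_message}<end_of_turn>\n"
        "<start_of_turn>model\n"  # Left open so the model writes its reply here
    )


prompt = build_gemma_prompt("What is 4 to the power of 3?")
print(prompt)
```

The trailing `<start_of_turn>model` line plays the same role as `add_generation_prompt=True`: it tells the model that the next tokens belong to its own turn.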

# Function Calling Using Gemma Open Source Models
### **How function calling works**

### Process Flow


[Figure: function calling process flow (FunctionCallingProcessFlow.svg)]

##### **Prepare your functions for Function Calling**
Write the functions (tools) that the Gemma model can call in Python, and give each one a docstring.

For the functions to be parsed correctly as tools, follow these rules:

*   The function should have a descriptive name
*   Every argument must have a type hint
*   The function must have a docstring in the standard Google style (in other words, an initial function description followed by an `Args:` block that describes the arguments, unless the function does not have any arguments).
*   Do not include types in the `Args:` block. In other words, write `a: The first number to multiply`, not `a (int): The first number to multiply`. Type hints should go in the function header instead.
*   The function can have a return type and a `Returns:` block in the docstring. However, these are optional because most tool-use models ignore them.
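
A minimal sketch of a function that follows these rules is shown below. Both `multiply` and the checker `check_tool` are illustrative names; the checker is only a rough stand-in for the rules listed here and is not how `transformers` validates tools.

```python
import inspect
from typing import get_type_hints


def multiply(a: int, b: int) -> int:
    """
    Multiply two numbers.

    Args:
        a: The first number to multiply.
        b: The second number to multiply.
    """
    return a * b


def check_tool(func) -> list[str]:
    """Return a list of rule violations (empty if the function looks fine)."""
    problems = []
    hints = get_type_hints(func)
    params = inspect.signature(func).parameters
    # Every argument must have a type hint in the function header.
    for name in params:
        if name not in hints:
            problems.append(f"argument '{name}' has no type hint")
    # The docstring must exist and, if there are arguments, contain an Args: block.
    doc = inspect.getdoc(func) or ""
    if not doc:
        problems.append("missing docstring")
    elif params and "Args:" not in doc:
        problems.append("docstring has no Args: block")
    return problems


print(check_tool(multiply))
```

Running a quick check like this before passing functions as tools can catch a missing type hint or `Args:` block early, before the chat template tries to build a schema from them.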



##### **Passing tool results to the model**
The sample code below shows the level of detail needed to list the available tools for your model, but what happens if the model actually wants to use one? In that case, you should:

*   Parse the model's output to get the tool name(s) and arguments.
*   Add the model's tool call(s) to the conversation.
*   Call the corresponding function(s) with those arguments.
*   Add the result(s) to the conversation.
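
The four steps above can be sketched without loading a model. In the sketch below, `fake_model_output` stands in for text the model might generate, and `get_time` is a hypothetical tool that returns a fixed value; the role names mirror the ones used later in this notebook.

```python
import json


def get_time(city: str) -> str:
    """Hypothetical tool that returns a fixed time for a city as a JSON string."""
    return json.dumps({"city": city, "time": "12:00"})


available_tools = {"get_time": get_time}
messages = [{"role": "user", "content": "What time is it in London?"}]

# Stand-in for text generated by the model.
fake_model_output = '{"name": "get_time", "arguments": {"city": "London"}}'

# 1. Parse the model's output to get the tool name and arguments.
tool_call = json.loads(fake_model_output)

# 2. Add the model's tool call to the conversation.
messages.append({"role": "model", "content": json.dumps(tool_call)})

# 3. Call the corresponding function with those arguments.
result = available_tools[tool_call["name"]](**tool_call["arguments"])

# 4. Add the result to the conversation.
messages.append({"role": "function-response", "content": result})

print(messages[-1])
```

After step 4, the conversation is passed back to the model so that it can phrase a final answer from the tool result.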

Let's walk through a tool use example, step by step. For this example, we will use a *gemma-2-27b-it* model fine-tuned for function calling.

First, let's load our model and tokenizer:

In [None]:
import os

def create_model_dir(base_dir=".", model_dir="models"):
    """
    Dynamically creates a directory for storing models if it doesn't already exist.

    Args:
        base_dir (str): The base directory where the model directory will be created.
                        Defaults to the current directory.
        model_dir (str): The name of the model directory to be created.
                         Defaults to "models".

    Returns:
        str: The absolute path to the model directory.

    Example:
        >>> create_model_dir("/content", "my_model_dir")
        '/content/my_model_dir'
    """

    # Construct the full path to the model directory
    model_dir_path = os.path.join(base_dir, model_dir)

    # Check if the directory exists, and create it if it doesn't
    if not os.path.exists(model_dir_path):
        os.makedirs(model_dir_path)
        print(f"Directory '{model_dir_path}' created successfully.")
    else:
        print(f"Directory '{model_dir_path}' already exists.")

    # Return the absolute path
    return os.path.abspath(model_dir_path)

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Define the path to the directory where the model will be stored
PATH_TO_MODEL_DIR = create_model_dir("/content", "my_model_dir")

# Load the pre-trained language model
model = AutoModelForCausalLM.from_pretrained(
    "DiTy/gemma-2-27b-it-function-calling-GGUF",  # The specific model ID from Hugging Face Model Hub
    device_map="auto",  # Automatically distribute the model across available devices (GPUs)
    torch_dtype=torch.bfloat16,  # Use bfloat16 precision for more efficient memory usage
                                  # (fall back to float16 or float32 if bfloat16 is not supported)
    cache_dir=PATH_TO_MODEL_DIR,  # Store the downloaded model in the specified directory to avoid re-downloading
)

# Load the corresponding tokenizer
tokenizer = AutoTokenizer.from_pretrained(
    "DiTy/gemma-2-27b-it-function-calling-GGUF",  # Same model ID as the model
    cache_dir=PATH_TO_MODEL_DIR,  # Store the downloaded tokenizer in the specified directory
)

Directory '/content/my_model_dir' created successfully.



In [None]:
# Define set of tools that can be used by the LLM
import requests
import googlemaps
import json


def multiply(first_int: int, second_int: int) -> int:
    """
    Multiply two integers together.

    Args:
        first_int: The first integer to be multiplied.
        second_int: The second integer to be multiplied.

    Returns:
        int: The product of the two integers.

    Example:
        >>> multiply(2, 3)
        6

    (operators: multiplied, *, times, product, etc.)
    """
    result = first_int * second_int
    return json.dumps(result)

def add(first_int: int, second_int: int) -> int:
    """
    Add two integers together.

    Args:
        first_int: The first integer to be added.
        second_int: The second integer to be added.

    Returns:
        int: The sum of the two integers.

    Example:
        >>> add(2, 3)
        5

    (operators: plus, added, +, addition, sum, summation)
    """
    result = first_int + second_int
    return json.dumps(result)


def exponentiate(base: int, exponent: int) -> int:
    """
    Raises `base` to the power of `exponent`.

    Args:
        base: The base integer.
        exponent: The exponent to raise the base by.

    Returns:
        int: The result of base raised to the power of exponent.

    Example:
        >>> exponentiate(2, 3)
        8

    (operators: power to, **, exp)
    """
    result = base ** exponent
    return json.dumps(result)


def get_exchange_rate(amount: float = None, date: str = "latest", from_currency: str = None, to_currency: str = None) -> str:
    """
    Fetches the exchange rate between two currencies on a given date and optionally calculates the equivalent amount.

    Args:
        amount: The amount to convert. If not provided, returns the exchange rate.
        date: The date of the exchange rate (default is "latest").
        from_currency: The currency code to convert from (e.g., "USD").
        to_currency: The currency code to convert to (e.g., "EUR").

    Returns:
        str: JSON string containing either the converted amount or the exchange rate.

    Example:
        >>> get_exchange_rate(100, "latest", "USD", "EUR")
        '{"amount": 85.5}'  # For example, if the conversion rate is 0.855

    (operators: exchange rate, currency conversion, convert)
    """
    print("(tool called: get_exchange_rate)")

    params = {
        "from": from_currency,
        "to": to_currency,
        "date": date,
    }
    url = f"https://api.frankfurter.app/{params['date']}"

    try:
        # API call to get the exchange rate
        api_response = requests.get(url, params=params)
        api_response.raise_for_status()  # Raise an exception for bad status codes
        data = api_response.json()  # Parse the JSON response

        rate = data["rates"][to_currency]  # Extract the exchange rate for the target currency

        if amount:
            result = {"amount": amount * rate}  # Calculate and store the converted amount
        else:
            result = {"rate": rate}  # Store the exchange rate

        return json.dumps(result)  # Return the result as a JSON string
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return json.dumps({"error": str(e)}) # Return the error as a JSON string



def search_places(search: str) -> str:
    """
    Searches for places based on a query string and returns relevant details. This function uses the Google Maps API to search for places (e.g., restaurants, cafes, parks) based on the provided search query. It returns a list of places, each with information such as name, address, location, opening hours, rating, and types.

    Args:
        search: The query string to search for places (e.g., "where is the nearest M&S in Croydon").

    Returns:
        A JSON-formatted string containing a list of places with details like name, address, location (latitude and longitude), whether it's open, rating, place ID, and types.

    Example:
        >>> search_places("Can you tell me where is nearest coffee shop to Central Park")
        [
            {
                'name': 'Starbucks',
                'address': '123 Central Park W, New York, NY 10023',
                'location': {'lat': 40.785091, 'lng': -73.968285},
                'open_now': True,
                'rating': 4.3,
                'place_id': 'abc123',
                'types': ['cafe', 'restaurant']
            },
        ]
    """
    print("(tool called: search_places)")

    gmaps = googlemaps.Client(key=MAPS_API_KEY)
    response = gmaps.places(search)

    # Initialize an empty list to store the search results
    results = []

    # Loop through the results in the response and extract relevant information
    for place in response['results']:
        results.append({
            'name': place['name'],
            'address': place['formatted_address'],
            'location': place['geometry']['location'],
            'open_now': place['opening_hours']['open_now'] if 'opening_hours' in place else None,
            'rating': place.get('rating', None),
            'place_id': place['place_id'],
            'types': place['types'],
        })

    return json.dumps(results)

tools = [add, exponentiate, multiply, get_exchange_rate, search_places]

In [None]:
# Map tool names (as they appear in the model's output) to the Python functions
function_map = {
    "add": add,
    "exponentiate": exponentiate,
    "multiply": multiply,
    "get_exchange_rate": get_exchange_rate,
    "search_places": search_places,
}

In [None]:
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

print(f"Device: {DEVICE}")
print(f"CUDA Version: {torch.version.cuda}")
print(f"Pytorch {torch.__version__}")

Device: cuda
CUDA Version: 12.1
Pytorch 2.5.0+cu121


In [None]:
# query_text = "where is the nearest Tesco in avenue park road, Se25 5dr"

To build the prompt for generation, use `apply_chat_template`.

To take our **functions (tools)** into account, **pass them as a list through the `tools` argument** and set **`add_generation_prompt=True`**.

Here are some example prompts that exercise different function calls.

**Calls the Googlemaps API**

get_messages = [
    {"role": "system", "content": "You are a helpful assistant with access to the following functions. Use them if required - "},  # The system message sets the persona of the assistant
    {"role": "user", "content": **"Hi, can you tell me where is the nearest Tesco in avenue park road, Se25 5dr?"**},
]

**Calls the frankfurter.app API**

get_messages = [
    {"role": "system", "content": "You are a helpful assistant with access to the following functions. Use them if required - "},  # The system message sets the persona of the assistant
    {"role": "user", "content": **"How much is 500AUD worth in HKD on the 1st of July 2024?"**},
]

**Calls local function**

get_messages = [
    {"role": "system", "content": "You are a helpful assistant with access to the following functions. Use them if required - "},  # The system message sets the persona of the assistant
    {"role": "user", "content": **"what's 4 to the power of 3?"**},
]

In [None]:
# Define the conversation history as a list of messages
get_messages = [
    {"role": "system", "content": "You are a helpful assistant with access to the following functions. Use them if required - "},  # The system message sets the persona of the assistant
    {"role": "user", "content": "Hi, can you tell me where is the nearest Tesco in avenue park road, Se25 5dr"},
]
# Prepare the inputs for the model using the tokenizer
inputs = tokenizer.apply_chat_template(
    get_messages,  # The conversation history
    tokenize=False,  # Don't tokenize the input yet (this will be done later)
    add_generation_prompt=True,  # Add a generation prompt to guide the model to generate a response
    tools=[add, exponentiate, multiply, get_exchange_rate, search_places]  # Provide the available tools (functions) to the model
)
print(inputs)

<bos><start_of_turn>user
You are a helpful assistant with access to the following functions. Use them if required - {
    "name": "add",
    "description": "Add two integers together.",
    "parameters": {
        "type": "object",
        "properties": {
            "first_int": {
                "type": "integer",
                "description": "The first integer to be added."
            },
            "second_int": {
                "type": "integer",
                "description": "The second integer to be added."
            }
        },
        "required": [
            "first_int",
            "second_int"
        ]
    },
    "return": {
        "type": "integer",
        "description": "int: The sum of the two integers.

Example:
    >>> add(2, 3)
    5

(operators: plus, added, +, addition, sum, summation)"
    }
},
{
    "name": "exponentiate",
    "description": "Raises `base` to the power of `exponent`.",
    "parameters": {
        "type": "object",
        "properties":

Now we can generate the model's response. Because `apply_chat_template` has already inserted the special tokens (note the `<bos>` at the start of the printed prompt), pass `add_special_tokens=False` when tokenizing so that they are not added a second time:

In [None]:
import gc
torch.cuda.empty_cache()
gc.collect()

139

In [None]:
# Define the IDs of tokens that indicate the end of the assistant's turn
terminator_ids = [
    tokenizer.eos_token_id,  # The end-of-sequence token ID
    tokenizer.convert_tokens_to_ids("<end_of_turn>"),  # A custom end-of-turn token ID
]

# Encode the input using the tokenizer
prompt_ids = tokenizer.encode(
    inputs,  # The input text
    add_special_tokens=False,  # Be careful because, after apply_chat_template,
                               # there is no need to add special tokens during
                               # tokenization. So, use add_special_tokens=False
    return_tensors='pt'  # Return PyTorch tensors
).to(model.device)  # Move the tensor to the same device as the model

# Generate a response using the model
generated_ids = model.generate(
    prompt_ids,  # The encoded input IDs
    max_new_tokens=512,  # The maximum number of tokens to generate
    eos_token_id=terminator_ids,  # The IDs of tokens that indicate the end of generation
    bos_token_id=tokenizer.bos_token_id,  # The beginning-of-sequence token ID
)

# Decode the generated IDs to obtain the text response
generated_response = tokenizer.decode(
    generated_ids[0][prompt_ids.shape[-1]:],  # Extract the generated part of the output
    skip_special_tokens=False  # Keep the special tokens in the decoded output for debugging
)
print(generated_response)





Function call: {"name": "search_places", "arguments": {"search": "where is the nearest Tesco in avenue park road, Se25 5dr"}}<end_of_turn>


In [None]:
generated_response

'Function call: {"name": "search_places", "arguments": {"search": "where is the nearest Tesco in avenue park road, Se25 5dr"}}<end_of_turn>'

In [None]:
# Split and select the last item in the list
gemma_response = generated_response.split('<start_of_turn>model')[-1]
# Remove leading and trailing spaces
gemma_response = gemma_response.strip()
# Remove the '<end_of_turn>' token
gemma_response = gemma_response.replace('<end_of_turn>', "")

In [None]:
gemma_response

'Function call: {"name": "search_places", "arguments": {"search": "where is the nearest Tesco in avenue park road, Se25 5dr"}}'

In [None]:
import re
# Remove "Function call:" using regular expression
cleaned_text = re.sub(r"^\s*Function call:\s*", "", gemma_response)

In [None]:
cleaned_text

'{"name": "search_places", "arguments": {"search": "where is the nearest Tesco in avenue park road, Se25 5dr"}}'

In [None]:
# Extract the schema from the string
schema = json.loads(cleaned_text)
# Call the function using the schema's arguments
result = search_places(
    schema["arguments"]["search"],
)

(tool called: search_places)
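
The cell above calls `search_places` directly. A more general approach, sketched below under the assumption that the model always answers in the `Function call: {...}<end_of_turn>` form seen earlier, looks the tool up by name instead. `parse_function_call`, `dispatch`, `echo`, and `demo_map` are hypothetical names used only for this example; in this notebook you would pass `function_map` in place of `demo_map`.

```python
import json
import re


def parse_function_call(text: str):
    """Return (name, arguments) from generated text, or None if no call is found."""
    text = text.replace("<end_of_turn>", "").strip()
    match = re.match(r"^\s*Function call:\s*(\{.*\})\s*$", text, flags=re.DOTALL)
    if match is None:
        return None
    try:
        call = json.loads(match.group(1))
    except json.JSONDecodeError:
        return None
    if not isinstance(call, dict) or "name" not in call:
        return None
    return call["name"], call.get("arguments", {})


def dispatch(text: str, tool_map: dict) -> str:
    """Run the tool requested in `text`, returning its result or a JSON error string."""
    parsed = parse_function_call(text)
    if parsed is None:
        return json.dumps({"error": "no function call found"})
    name, arguments = parsed
    if name not in tool_map:
        return json.dumps({"error": f"unknown tool: {name}"})
    return tool_map[name](**arguments)


def echo(search: str) -> str:
    """Hypothetical stand-in tool that echoes its argument as JSON."""
    return json.dumps({"search": search})


demo_map = {"echo": echo}
output = 'Function call: {"name": "echo", "arguments": {"search": "Tesco"}}<end_of_turn>'
print(dispatch(output, demo_map))
```

Returning an error as a JSON string, rather than raising, means the failure can be added to the conversation like any other tool result, which matches how `get_exchange_rate` reports request errors.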


In [None]:
messages = get_messages
messages

[{'role': 'system',
  'content': 'You are a helpful assistant with access to the following functions. Use them if required - '},
 {'role': 'user',
  'content': 'Hi, can you tell me where is the nearest Tesco in avenue park road, Se25 5dr'}]

Great. Now we can add the model's tool call and the function's result to the conversation, and then pass the updated conversation back to the model:

In [None]:
import json
# Record the model's tool call as a "model" turn in the conversation
tool_call = {"name": schema["name"], "arguments": schema["arguments"]}
messages.append({"role": "model", "content": json.dumps(tool_call)})

In [None]:
messages

[{'role': 'system',
  'content': 'You are a helpful assistant with access to the following functions. Use them if required - '},
 {'role': 'user',
  'content': 'Hi, can you tell me where is the nearest Tesco in avenue park road, Se25 5dr'},
 {'role': 'model',
  'content': '{"name": "search_places", "arguments": {"search": "where is the nearest Tesco in avenue park road, Se25 5dr"}}'}]

In [None]:
response = result
# Parse the JSON string returned by the tool and pass only the first result
# back to the model, which keeps the prompt short.
locations_data = json.loads(response)
messages.append({"role": "function-response", "content": json.dumps(locations_data[0])})

In [None]:
messages

[{'role': 'system',
  'content': 'You are a helpful assistant with access to the following functions. Use them if required - '},
 {'role': 'user',
  'content': 'Hi, can you tell me where is the nearest Tesco in avenue park road, Se25 5dr'},
 {'role': 'model',
  'content': '{"name": "search_places", "arguments": {"search": "where is the nearest Tesco in avenue park road, Se25 5dr"}}'},
 {'role': 'function-response',
  'content': '{"name": "Tesco Express", "address": "1-9 S Norwood Hill, London SE25 6AA, United Kingdom", "location": {"lat": 51.3997177, "lng": -0.0753607}, "open_now": false, "rating": 3, "place_id": "ChIJt1VSpjwBdkgRQMITXZWkHG0", "types": ["supermarket", "convenience_store", "grocery_or_supermarket", "point_of_interest", "store", "food", "establishment"]}'}]

In [None]:
inputs = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True,  # adding prompt for generation
    tools = [add, exponentiate, multiply, get_exchange_rate, search_places]
)
print(inputs)

<bos><start_of_turn>user
You are a helpful assistant with access to the following functions. Use them if required - {
    "name": "add",
    "description": "Add two integers together.",
    "parameters": {
        "type": "object",
        "properties": {
            "first_int": {
                "type": "integer",
                "description": "The first integer to be added."
            },
            "second_int": {
                "type": "integer",
                "description": "The second integer to be added."
            }
        },
        "required": [
            "first_int",
            "second_int"
        ]
    },
    "return": {
        "type": "integer",
        "description": "int: The sum of the two integers.

Example:
    >>> add(2, 3)
    5

(operators: plus, added, +, addition, sum, summation)"
    }
},
{
    "name": "exponentiate",
    "description": "Raises `base` to the power of `exponent`.",
    "parameters": {
        "type": "object",
        "properties":

In [None]:
torch.cuda.empty_cache()

In [None]:
prompt_ids = tokenizer.encode(inputs, add_special_tokens=False, return_tensors='pt').to(model.device)
generated_ids = model.generate(
    prompt_ids,
    max_new_tokens=512,
    eos_token_id=terminator_ids,
    bos_token_id=tokenizer.bos_token_id,
)
generated_response = tokenizer.decode(generated_ids[0][prompt_ids.shape[-1]:], skip_special_tokens=False)  # `skip_special_tokens=False` for debug

print(generated_response)

The nearest Tesco to you is Tesco Express, located at 1-9 S Norwood Hill, London SE25 6AA, United Kingdom.<end_of_turn>


In [None]:
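# Alternative: let apply_chat_template tokenize the conversation directly and return tensors.
inputs = tokenizer.apply_chat_template(messages, tools=tools, add_generation_prompt=True, return_dict=True, return_tensors="pt")
inputs = {k: v.to(model.device) for k, v in inputs.items()}
# No custom eos_token_id is passed here, so the decoded output also ends with <eos>.
out = model.generate(**inputs, max_new_tokens=128)
print(tokenizer.decode(out[0][len(inputs["input_ids"][0]):]))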

The nearest Tesco to you is Tesco Express, located at 1-9 S Norwood Hill, London SE25 6AA, United Kingdom.<end_of_turn>
<eos>
