In [1]:
# !pip install scipy --quiet
# !pip install tenacity --quiet
# !pip install tiktoken --quiet
# !pip install termcolor --quiet
# !pip install openai --quiet

In [1]:
import json
from openai import OpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt
from termcolor import colored  

GPT_MODEL = "gpt-3.5-turbo-0613"
client = OpenAI()

## Defining some utilities functions

In [2]:
@retry(wait=wait_random_exponential(multiplier=1, max=40), stop=stop_after_attempt(3))
def chat_completion_request(messages, tools=None, tool_choice=None, model=GPT_MODEL):
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            tools=tools,
            tool_choice=tool_choice,
        )
        return response
    except Exception as e:
        print("Unable to generate ChatCompletion response")
        print(f"Exception: {e}")
        return e

In [3]:
def pretty_print_conversation(messages):
    role_to_color = {
        "system": "red",
        "user": "green",
        "assistant": "blue",
        "function": "magenta",
    }
    
    for message in messages:
        if message["role"] == "system":
            print(colored(f"system: {message['content']}\n", role_to_color[message["role"]]))
        elif message["role"] == "user":
            print(colored(f"user: {message['content']}\n", role_to_color[message["role"]]))
        elif message["role"] == "assistant" and message.get("function_call"):
            print(colored(f"assistant: {message['function_call']}\n", role_to_color[message["role"]]))
        elif message["role"] == "assistant" and not message.get("function_call"):
            print(colored(f"assistant: {message['content']}\n", role_to_color[message["role"]]))
        elif message["role"] == "function":
            print(colored(f"function ({message['name']}): {message['content']}\n", role_to_color[message["role"]]))


### This is how to define the functions that will be called by the GPT.

    You need to provide:
        -name
        -description
        -parameters

In [5]:
def get_current_weater(location, format):
    print ("get_current_weather called")
    print(f"Location is: {location}")
    print(f"Format is: {format}")

In [6]:
def get_n_day_weather_forecast(location, format, num_days):
    print ("get_current_weather called")
    

In [7]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_current_weather",
            "description": "Get the current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    },
                    "format": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "The temperature unit to use. Infer this from the users location.",
                    },
                },
                "required": ["location", "format"],
            },
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_n_day_weather_forecast",
            "description": "Get an N-day weather forecast",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "The city and state, e.g. San Francisco, CA",
                    },
                    "format": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "The temperature unit to use. Infer this from the users location.",
                    },
                    "num_days": {
                        "type": "integer",
                        "description": "The number of days to forecast",
                    }
                },
                "required": ["location", "format", "num_days"]
            },
        }
    },
]

### BOT prompts

In [8]:
messages = []
messages.append({"role": "system", "content": "Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous."})
messages.append({"role": "user", "content": "What's the weather like today"})
chat_response = chat_completion_request(
    messages, tools=tools
)
assistant_message = chat_response.choices[0].message
messages.append(assistant_message)
assistant_message

ChatCompletionMessage(content='Sure, could you please provide me with the location?', role='assistant', function_call=None, tool_calls=None)

In [9]:
messages.append({"role": "user", "content": "I'm in Glasgow, Scotland."})
chat_response = chat_completion_request(
    messages, tools=tools
)

In [10]:
assistant_message = chat_response.choices[0].message
messages.append(assistant_message)
assistant_message

ChatCompletionMessage(content=None, role='assistant', function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_klqxLrMp0hrJ4JaMcDmxFKzQ', function=Function(arguments='{\n  "location": "Glasgow, Scotland",\n  "format": "celsius"\n}', name='get_current_weather'), type='function')])

**From this you can then call a specific function, passing the correct parameters - see example below.**


In [11]:
if assistant_message.tool_calls:
    print("tool_calls was called")
    
    location = json.loads(assistant_message.tool_calls[0].function.arguments)["location"]
    format = json.loads(assistant_message.tool_calls[0].function.arguments)["format"]

    get_current_weater(location, format)

tool_calls was called
get_current_weather called
Location is: Glasgow, Scotland
Format is: celsius


# Database query example

Calling a function from GPT generated inputs

In [5]:
def get_table_names(conn):
    """Return a list of table names."""
    table_names = []
    tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
    for table in tables.fetchall():
        table_names.append(table[0])
    return table_names


def get_column_names(conn, table_name):
    """Return a list of column names."""
    column_names = []
    columns = conn.execute(f"PRAGMA table_info('{table_name}');").fetchall()
    for col in columns:
        column_names.append(col[1])
    return column_names


def get_database_info(conn):
    """Return a list of dicts containing the table name and columns for each table in the database."""
    table_dicts = []
    for table_name in get_table_names(conn):
        columns_names = get_column_names(conn, table_name)
        table_dicts.append({"table_name": table_name, "column_names": columns_names})
    return table_dicts


In [6]:
import sqlite3

conn = sqlite3.connect("./data/Chinook.db")
print("Opened database successfully")

Opened database successfully


In [7]:
database_schema_dict = get_database_info(conn)
database_schema_string = "\n".join(
    [
        f"Table: {table['table_name']}\nColumns: {', '.join(table['column_names'])}"
        for table in database_schema_dict
    ]
)


### Tools 

We'll define a function specification for the function we'd like the API to generate arguments for. Notice that we are inserting the database schema into the function specification. This will be important for the model to know about.

In [8]:
tools = [
    {
        "type": "function",
        "function": {
            "name": "ask_database",
            "description": "Use this function to answer user questions about music. Input should be a fully formed SQL query.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": f"""
                                SQL query extracting info to answer the user's question.
                                SQL should be written using this database schema:
                                {database_schema_string}
                                The query should be returned in plain text, not in JSON.
                                """,
                    }
                },
                "required": ["query"],
            },
        }
    }
]

## Executing SQL queries
Now let's implement the function that will actually excute queries against the database.

In [9]:
def ask_database(conn, query):
    """Function to query SQLite database with a provided SQL query."""
    try:
        results = str(conn.execute(query).fetchall())
    except Exception as e:
        results = f"query failed with error: {e}"
    return results

def execute_function_call(message):
    if message.tool_calls[0].function.name == "ask_database":
        query = json.loads(message.tool_calls[0].function.arguments)["query"]
        results = ask_database(conn, query)
    else:
        results = f"Error: function {message.tool_calls[0].function.name} does not exist"
    return results

## BOT responses

In [10]:
messages = []
messages.append({"role": "system", "content": "Answer user questions by generating SQL queries against the Chinook Music Database."})
messages.append({"role": "user", "content": "Hi, who are the top 5 artists by number of tracks?"})
chat_response = chat_completion_request(messages, tools)

In [11]:
messages

[{'role': 'system',
  'content': 'Answer user questions by generating SQL queries against the Chinook Music Database.'},
 {'role': 'user',
  'content': 'Hi, who are the top 5 artists by number of tracks?'}]

In [13]:
chat_response.status

AttributeError: 'ChatCompletion' object has no attribute 'status'

In [12]:
chat_response

ChatCompletion(id='chatcmpl-9GSEF9HkdQbTgt9jDKE91hWG3HCUV', choices=[Choice(finish_reason='tool_calls', index=0, logprobs=None, message=ChatCompletionMessage(content=None, role='assistant', function_call=None, tool_calls=[ChatCompletionMessageToolCall(id='call_V2DZaAbmZsCkMAz1ADE71W9l', function=Function(arguments='{\n  "query": "SELECT artists.Name, COUNT(tracks.TrackId) AS TrackCount FROM artists INNER JOIN albums ON artists.ArtistId = albums.ArtistId INNER JOIN tracks ON albums.AlbumId = tracks.AlbumId GROUP BY artists.Name ORDER BY TrackCount DESC LIMIT 5"\n}', name='ask_database'), type='function')]))], created=1713708787, model='gpt-3.5-turbo-0613', object='chat.completion', system_fingerprint=None, usage=CompletionUsage(completion_tokens=67, prompt_tokens=407, total_tokens=474))

In [17]:
assistant_message = chat_response.choices[0].message
assistant_message.content = str(assistant_message.tool_calls[0].function)
messages.append({"role": assistant_message.role, "content": assistant_message.content})

In [19]:
if assistant_message.tool_calls:
    results = execute_function_call(assistant_message)
    messages.append({"role": "function", "tool_call_id": assistant_message.tool_calls[0].id, "name": assistant_message.tool_calls[0].function.name, "content": results})
pretty_print_conversation(messages)

[31msystem: Answer user questions by generating SQL queries against the Chinook Music Database.
[0m
[32muser: Hi, who are the top 5 artists by number of tracks?
[0m
[34massistant: Function(arguments='{\n  "query": "SELECT artists.Name, COUNT(tracks.TrackId) AS num_tracks FROM artists JOIN albums ON artists.ArtistId = albums.ArtistId JOIN tracks ON albums.AlbumId = tracks.AlbumId GROUP BY artists.Name ORDER BY num_tracks DESC LIMIT 5;"\n}', name='ask_database')
[0m
[35mfunction (ask_database): [('Iron Maiden', 213), ('U2', 135), ('Led Zeppelin', 114), ('Metallica', 112), ('Lost', 92)]
[0m


In [20]:
messages.append({"role": "user", "content": "What is the name of the album with the most tracks?"})
chat_response = chat_completion_request(messages, tools)
assistant_message = chat_response.choices[0].message
assistant_message.content = str(assistant_message.tool_calls[0].function)
messages.append({"role": assistant_message.role, "content": assistant_message.content})

In [23]:
if assistant_message.tool_calls:
    results = execute_function_call(assistant_message)
    messages.append({"role": "function", "tool_call_id": assistant_message.tool_calls[0].id, "name": assistant_message.tool_calls[0].function.name, "content": results})
pretty_print_conversation(messages)

[31msystem: Answer user questions by generating SQL queries against the Chinook Music Database.
[0m
[32muser: Hi, who are the top 5 artists by number of tracks?
[0m
[34massistant: Function(arguments='{\n  "query": "SELECT artists.Name, COUNT(tracks.TrackId) AS num_tracks FROM artists JOIN albums ON artists.ArtistId = albums.ArtistId JOIN tracks ON albums.AlbumId = tracks.AlbumId GROUP BY artists.Name ORDER BY num_tracks DESC LIMIT 5;"\n}', name='ask_database')
[0m
[35mfunction (ask_database): [('Iron Maiden', 213), ('U2', 135), ('Led Zeppelin', 114), ('Metallica', 112), ('Lost', 92)]
[0m
[32muser: What is the name of the album with the most tracks?
[0m
[34massistant: Function(arguments='{\n  "query": "SELECT albums.Title, COUNT(tracks.TrackId) AS num_tracks FROM albums JOIN tracks ON albums.AlbumId = tracks.AlbumId GROUP BY albums.Title ORDER BY num_tracks DESC LIMIT 1;"\n}', name='ask_database')
[0m
[35mfunction (ask_database): [('Greatest Hits', 57)]
[0m
