# Foundations of Agentic AI  CA1

## Imports, Environment and API Keys

In [19]:
from pydantic import BaseModel, Field
import os, sqlite3, json, csv
from dotenv import load_dotenv
from openai import OpenAI
from langsmith.wrappers import wrap_openai
from langsmith import traceable
from openai import pydantic_function_tool
load_dotenv()

api_key = os.getenv("GEMINI_API_KEY")
api_base = os.getenv("GEMINI_API_BASE")
model = os.getenv("GEMINI_API_MODEL")

## Client, Album and AlbumList Classes

In [2]:
client = wrap_openai(OpenAI(
 api_key=api_key,
 base_url=api_base,
))

class Album(BaseModel):
    index: int = Field(description="The database index of the album")
    year: int = Field(description="The year the album was released")
    title: str = Field(description="The title of the album")
    artist: str = Field(description="The name of the artist who made the album")
    genre: str = Field(description="The music genre of the album")
    subgenre: str = Field(description="The music subgenre of the album")
    price: float = Field(description="The orice of the album")

class AlbumList(BaseModel):
    albums: list[Album]

## Classes for Pydantic Tools

In [22]:
class getAlbumByTitle(BaseModel):
    """Tool that queries albums by title and returns an Album object containing the database index, Album Year, Album Title, Artist Name, Album Genre, Album Subgenre  and Album Price"""
    albumTitle: str = Field(description="The title of the album.")

    def run(self) -> str:
        try:
            with sqlite3.connect('music.db') as connection:
                cursor = connection.cursor()

                sql_query = "SELECT * FROM music WHERE album = ?"

                cursor.execute(sql_query,[self.albumTitle])
                result = cursor.fetchone()

                if result is None:
                    return "No album found with that title"

                keys = ['index','year','title','artist','genre','subgenre','price']
                vals = list(result)

                result_dict = dict(zip(keys, vals))

                album = Album(**result_dict)

                return album.model_dump_json() # Return as JSON string


        except sqlite3.IntegrityError as e:
            print(f"Error: Integrity constraint violated - {e}")

        except sqlite3.OperationalError as e:
            print(f"Error: Operational issue - {e}")

        except sqlite3.Error as e:
            print(f"Error: Generic Sqlite3 error - {e}")        

        except Exception as e:
            print(f"An unexpected error occurred: {e}")


In [23]:
class getAlbumByArtist(BaseModel):
    """Tool that queries albums by Artist and returns a list of  Album objects containing the database index, Album Year, Album Title, Artist Name, Album Genre, Album Subgenre  and Album Price"""
    albumArtist: str = Field(description="The Artist name.")

    def run(self) -> list[str]:
        try:
            with sqlite3.connect('music.db') as connection:
                cursor = connection.cursor()

                sql_query = "SELECT * FROM music WHERE artist LIKE ?"

                cursor.execute(sql_query,['%' + self.albumArtist + '%'])
                results = cursor.fetchall()

                albums = []

                for result in results:
                    keys = ['index','year','title','artist','genre','subgenre','price']
                    vals = list(result)

                    result_dict = dict(zip(keys, vals))

                    album = Album(**result_dict)
                    albums.append(album.model_dump_json())
                    print(f"Albums returned by Artist: {albums}")

                return albums


        except sqlite3.IntegrityError as e:
            print(f"Error: Integrity constraint violated - {e}")

        except sqlite3.OperationalError as e:
            print(f"Error: Operational issue - {e}")

        except sqlite3.Error as e:
            print(f"Error: Generic Sqlite3 error - {e}")        

        except Exception as e:
            print(f"An unexpected error occurred: {e}")

In [24]:
class getAlbumByYear(BaseModel):
    """Tool that queries albums by Year and returns a list of  Album objects containing the database index, Album Year, Album Title, Artist Name, Album Genre, Album Subgenre  and Album Price"""
    albumYear: str = Field(description="The Year.")

    def run(self) -> list[str]:
        try:
            with sqlite3.connect('music.db') as connection:
                cursor = connection.cursor()

                sql_query = "SELECT * FROM music WHERE year = ?"

                cursor.execute(sql_query, [self.albumYear])
                results = cursor.fetchall()

                albums = []

                for result in results:
                    keys = ['index','year','title','artist','genre','subgenre','price']
                    vals = list(result)

                    result_dict = dict(zip(keys, vals))

                    album = Album(**result_dict)
                    albums.append(album.model_dump_json())

                return albums


        except sqlite3.IntegrityError as e:
            print(f"Error: Integrity constraint violated - {e}")

        except sqlite3.OperationalError as e:
            print(f"Error: Operational issue - {e}")

        except sqlite3.Error as e:
            print(f"Error: Generic Sqlite3 error - {e}")        

        except Exception as e:
            print(f"An unexpected error occurred: {e}")

In [25]:
class getAlbumByGenre(BaseModel):
    """Tool that queries albums by Genre and returns a list of  Album objects containing the database index, Album Year, Album Title, Artist Name, Album Genre, Album Subgenre  and Album Price"""
    albumGenre: str = Field(description="The Album Genre name.")

    def run(self) -> list[str]:
        try:
            with sqlite3.connect('music.db') as connection:
                cursor = connection.cursor()

                sql_query = "SELECT * FROM music WHERE genre LIKE ? OR subgenre LIKE ?"

                cursor.execute(sql_query, ['%'+self.albumGenre+'%', '%'+self.albumGenre+'%'])
                results = cursor.fetchall()
                print(results)

                albums = []

                for result in results:
                    keys = ['index','year','title','artist','genre','subgenre','price']
                    vals = list(result)

                    result_dict = dict(zip(keys, vals))

                    album = Album(**result_dict)
                    albums.append(album.model_dump_json())

                return albums


        except sqlite3.IntegrityError as e:
            print(f"Error: Integrity constraint violated - {e}")

        except sqlite3.OperationalError as e:
            print(f"Error: Operational issue - {e}")

        except sqlite3.Error as e:
            print(f"Error: Generic Sqlite3 error - {e}")        

        except Exception as e:
            print(f"An unexpected error occurred: {e}")

In [26]:
class getAlbumByGenreAndYear(BaseModel):
    """Tool that queries albums by Genre and Year and returns a list of  Album objects containing the database index, Album Year, Album Title, Artist Name, Album Genre, Album Subgenre  and Album Price"""
    albumGenre: str = Field(description="The Album Genre name.")
    albumYear: str = Field(description="The Year.")

    def run(self) -> list[str]:
        try:
            with sqlite3.connect('music.db') as connection:
                cursor = connection.cursor()

                sql_query = "SELECT * FROM music WHERE genre = ? and year=?"

                cursor.execute(sql_query, [self.albumGenre, self.albumYear])
                results = cursor.fetchall()
                print(results)

                albums = []

                for result in results:
                    keys = ['index','year','title','artist','genre','subgenre','price']
                    vals = list(result)

                    result_dict = dict(zip(keys, vals))

                    album = Album(**result_dict)
                    albums.append(album.model_dump_json())

                return albums


        except sqlite3.IntegrityError as e:
            print(f"Error: Integrity constraint violated - {e}")

        except sqlite3.OperationalError as e:
            print(f"Error: Operational issue - {e}")

        except sqlite3.Error as e:
            print(f"Error: Generic Sqlite3 error - {e}")        

        except Exception as e:
            print(f"An unexpected error occurred: {e}")

In [27]:
class writeToCsv(BaseModel):
    """Tool that writes to a csv file"""
    customerQuery: str = Field(description="The customer query.")
    queryAnswer: str = Field(description="The answer to the customer query.")

    def run(self) -> str:
        try:
            with open('email_output.csv','a', encoding='UTF8') as f:
                writer = csv.writer(f)
                data = [self.customerQuery, self.queryAnswer, 'yes']
                writer.writerow(data)
            return self.queryAnswer
   

        except Exception as e:
            print(f"An unexpected error occurred: {e}")

## Function to execute tools

In [None]:
# write function to execute tool. Take function name and parameters and return result of function
@traceable(name="AD Tool Call")
def execute_function(tool_call, tool_lookup) -> tuple[str, str]:
    function_name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)
    tool = tool_lookup[function_name](**args)
    return function_name, tool.run()


## Albums Workflow

In [30]:
@traceable(name="Albums Workflow")
def albumWorkflow(customerQuery: str) -> str:

    tools = [getAlbumByTitle, getAlbumByArtist, getAlbumByGenre, getAlbumByYear, getAlbumByGenreAndYear, writeToCsv]
    tool_lookup = {tool.__name__: tool for tool in tools}
    messages = [
        {"role": "system", "content": "You are a useful music shop assistant that answers customer queries providing information on albums and writes the answer to a csv file"},
        {"role": "user", "content": customerQuery}

    ]

    response = client.chat.completions.create (
    model=model,
    messages=messages,
    temperature=0,
    tools=[pydantic_function_tool(tool) for tool in tools]
    )


    if not response.choices[0].message.tool_calls:
        return response
    
    tool_calls = response.choices[0].message.tool_calls


    # Add the assistant's message with tool calls
    messages.append({
        "role": "assistant",
        "content": response.choices[0].message.content,
        "tool_calls": [
            {
                "id": tc.id,
                "type": "function",
                "function": {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments
                }
            }
            for tc in tool_calls
        ]
    })

    # Execute tools
    for tool_call in tool_calls:
        _, tool_result = execute_function(tool_call, tool_lookup)
        
        # Add tool response to messages
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": str(tool_result)  
        })



    query_response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0
    )
    
    messages.append({
        "role": "assistant",
        "content": query_response.choices[0].message.content
    })

    messages.append(
        {
            "role" : "user",
            "content" : "write the customer query response to a csv file"
        }
    )



    
    write_response = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
        tools=[pydantic_function_tool(tool) for tool in tools]
    )

    if not write_response.choices[0].message.tool_calls:
        # No tool calls needed, return direct response
        return write_response

    tool_calls = write_response.choices[0].message.tool_calls
    #print(f"Tools to run: {tool_calls}")

    # Add the assistant's message with tool calls
    messages.append({
        "role": "assistant",
        "content": write_response.choices[0].message.content,
        "tool_calls": [
            {
                "id": tc.id,
                "type": "function",
                "function": {
                    "name": tc.function.name,
                    "arguments": tc.function.arguments
                }
            }
            for tc in tool_calls
        ]
    })

    # Execute tools
    for tool_call in tool_calls:
        _, tool_result = execute_function(tool_call, tool_lookup)
        
        # Add tool response to messages
        messages.append({
            "role": "tool",
            "tool_call_id": tool_call.id,
            "content": str(tool_result)  
        })

    return query_response.choices[0].message.content

## Run the workflow for all queries

In [31]:
# initialise output csv
header = ['email', 'response', 'run']
with open('email_output.csv','w', encoding='UTF8') as f:
    writer = csv.writer(f)
    writer.writerow(header)
# Read CSV file and print each line
with open('emails.csv', 'r') as file:
    csv_reader = csv.reader(file)
    next(csv_reader)
    
    for line in csv_reader: # Call the workflow for each line in the CSV
       res = albumWorkflow(line[0])

Albums returned by Artist: ['{"index":35,"year":1972,"title":"The Rise and Fall of Ziggy Stardust and the Spiders From Mars","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}']
Albums returned by Artist: ['{"index":35,"year":1972,"title":"The Rise and Fall of Ziggy Stardust and the Spiders From Mars","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}', '{"index":108,"year":1971,"title":"Hunky Dory","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}']
Albums returned by Artist: ['{"index":35,"year":1972,"title":"The Rise and Fall of Ziggy Stardust and the Spiders From Mars","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}', '{"index":108,"year":1971,"title":"Hunky Dory","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}', '{"index":251,"year":1977,"title":"Low","artist":"David Bowie","genre":"Electronic, Rock","subgenre":"Art 

## Albums ReAct Loop

In [33]:
@traceable(name="Albums react_loop")
def react_loop(messages, client, tools) -> str:
        tool_lookup = {tool.__name__: tool for tool in tools}
        tool_schemas = [pydantic_function_tool(tool) for tool in tools]
        while True:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=0,
                tools=tool_schemas
            )
            # get the tool calls from the response
            tools_to_run = response.choices[0].message.tool_calls
            if not tools_to_run:
                break

            messages.append({
                    "role": "assistant",
                    "content": response.choices[0].message.content,
                    "tool_calls": [
                        {
                            "id": tool_call.id,
                            "type": "function",
                            "function": {
                                "name": tool_call.function.name,
                                "arguments": tool_call.function.arguments
                            }
                        } for tool_call in tools_to_run
                    ]
            })

            # execute the tool calls
            for tool_call in tools_to_run:
                function_name, tool_response = execute_function(tool_call, tool_lookup)
                #print(f"executed {function_name}, \n tool response : \n {tool_response} ")

                # Add tool response to messages
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": str(tool_response)  
                })



        return response.choices[0].message.content

## Run the ReAct Loop for all queries

In [34]:
tools = [getAlbumByTitle, getAlbumByArtist, getAlbumByGenre, getAlbumByYear, getAlbumByGenreAndYear, writeToCsv]
messages = [
    {"role": "system", "content": "You are a useful music shop assistant that answers customer queries providing information on albums and writes the answer to a csv file"},
    {"role": "user", "content": "do you have any bowie or led zep?" }

]
# initialise output csv
header = ['email', 'response', 'run']
with open('email_output.csv','w', encoding='UTF8') as f:
    writer = csv.writer(f)
    writer.writerow(header)

with open('emails.csv', 'r') as file:
    csv_reader = csv.reader(file)
    next(csv_reader)
    
    for line in csv_reader: # Call the workflow for each line in the CSV
        messages = [
            {"role": "system", "content": "You are a useful music shop assistant that answers customer queries providing information on albums and writes the answer to a csv file"},
            {"role": "user", "content": f"{line[0]}" }
        ]
        response = react_loop(messages,client, tools)
        print(response)

Albums returned by Artist: ['{"index":35,"year":1972,"title":"The Rise and Fall of Ziggy Stardust and the Spiders From Mars","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}']
Albums returned by Artist: ['{"index":35,"year":1972,"title":"The Rise and Fall of Ziggy Stardust and the Spiders From Mars","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}', '{"index":108,"year":1971,"title":"Hunky Dory","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}']
Albums returned by Artist: ['{"index":35,"year":1972,"title":"The Rise and Fall of Ziggy Stardust and the Spiders From Mars","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}', '{"index":108,"year":1971,"title":"Hunky Dory","artist":"David Bowie","genre":"Rock","subgenre":"Classic Rock, Glam","price":0.0}', '{"index":251,"year":1977,"title":"Low","artist":"David Bowie","genre":"Electronic, Rock","subgenre":"Art 