<a href="https://colab.research.google.com/github/Theedon/AIAgentDemos/blob/main/langchain_callbacks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%%capture

!pip install  langchain_experimental langchain_openai langchain_google_community
!pip install --upgrade openai
!pip install -qU httpx

## Initialize Agent and Functions

In [None]:
import os
from google.colab import userdata

# Copy each Colab secret into the process environment under the same name,
# one after another in a fixed order.
for secret_name in (
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "OPENAI_API_VERSION",
    "OPENAI_API_KEY",
    "GOOGLE_API_KEY",
    "GOOGLE_CSE_ID",
):
    os.environ[secret_name] = userdata.get(secret_name)

In [None]:
import os
# Relative directory for generated charts (CustomPythonReplTool saves its PNGs
# under a "charts" folder in the current working directory).
charts_dir = "charts"
# exist_ok=True keeps the cell idempotent on re-runs and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(charts_dir, exist_ok=True)

In [None]:
from langchain.tools import BaseTool
import os
import ast
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from datetime import datetime
import uuid

class CustomPythonReplTool(BaseTool):
    """LangChain tool that executes a Python snippet and reports its outcome.

    The value of a trailing expression statement is captured and returned as
    text. If the snippet leaves a matplotlib figure whose current axes contain
    data, the figure is saved as a PNG under ``<cwd>/charts`` and the path of
    that file is returned instead.
    """

    name: str = "custom_python_repl"
    description: str = "A custom REPL tool for executing Python code. Use this to evaluate python scripts and return results."

    def _run(self, code: str) -> str:
        """Execute ``code`` and return a textual result.

        Args:
            code: Python source to execute.

        Returns:
            ``"Chart saved at <path>"`` when the snippet produced a chart;
            otherwise ``str()`` of the trailing expression's value, or
            ``"no result returned"`` when the snippet does not end in an
            expression. Any exception raised while parsing or running the
            snippet is reported as ``"Error: <message>"`` instead of being
            propagated.

        Note:
            The snippet runs through ``exec`` with the full builtins; this is
            NOT a sandbox, so only feed it code you are willing to run.
        """
        try:
            tree = ast.parse(code)
            last_node = tree.body[-1] if tree.body else None

            # Rewrite a trailing expression statement into `_result = <expr>`
            # so its value can be read back from the namespace after exec().
            if isinstance(last_node, ast.Expr):
                assign = ast.Assign(
                    targets=[ast.Name(id="_result", ctx=ast.Store())],
                    value=last_node.value,
                )
                ast.copy_location(assign, last_node)
                tree.body[-1] = ast.fix_missing_locations(assign)

            # A single dict is used as both globals and locals. With two
            # separate dicts, exec() behaves like a class body: functions
            # defined by the snippet cannot see the snippet's other top-level
            # names (imports, helper functions, themselves for recursion).
            namespace = {"__builtins__": __builtins__}
            compiled_code = compile(tree, filename="<ast>", mode="exec")
            exec(compiled_code, namespace)

            # A figure with data on its current axes means the snippet plotted something.
            if plt.get_fignums() and plt.gca().has_data():
                chart_path = os.path.join(os.getcwd(), "charts")
                os.makedirs(chart_path, exist_ok=True)

                # Timestamp plus a short random suffix keeps file names unique.
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                unique_id = uuid.uuid4().hex[:8]
                chart_file = os.path.join(chart_path, f"chart_{timestamp}_{unique_id}.png")

                plt.savefig(chart_file)
                return f"Chart saved at {chart_file}"

            return str(namespace.get("_result", "no result returned"))

        except Exception as e:
            return f"Error: {str(e)}"

        finally:
            # Drop every figure so state never leaks into the next call.
            # (No plt.clf() first: with no open figure it would create one
            # only for it to be closed again.)
            plt.close('all')

    async def _arun(self, code: str) -> str:
        """Async execution is not supported; always raises NotImplementedError."""
        raise NotImplementedError("Asynchronous execution is not implemented yet.")


In [None]:
from langchain_core.tools import Tool
from langchain_experimental.utilities import PythonREPL

import dotenv
from langchain_core.tools import StructuredTool
from langchain_google_community import GoogleSearchAPIWrapper

# Google search wrapper exposed as a structured tool.
# NOTE(review): presumably configured from GOOGLE_API_KEY / GOOGLE_CSE_ID in the environment — confirm.
search = GoogleSearchAPIWrapper()
google_search = StructuredTool.from_function(
    func=search.run,
    name="google_search",
    description="Search Google for recent results.",
)

# Stock REPL utility and the custom REPL tool defined in this notebook.
python_repl = PythonREPL()
custom_repl_tool = CustomPythonReplTool()

# Only the custom REPL is handed to the agent. Alternatives that are currently
# switched off:
#   Tool(name="python_repl", func=python_repl.run,
#        description="A Python shell. Use this to execute python code.")
#   google_search
tools = [custom_repl_tool]


# Smoke-test snippets; every one ends in an expression whose value is the
# interesting result.
queries = [
    "2 + 2",                             # Basic arithmetic
    "'Hello' + ' ' + 'World'",           # String concatenation
    "[1, 2, 3] + [4, 5, 6]",             # List operations
    "{'name': 'Alice', 'age': 30}['name']",  # Dictionary access
    """
def greet(name):
    return f'Hello, {name}!'
greet('John')
"""  # Function definition and call
]

# Run the custom tool on each snippet and print the result. The custom tool
# reports the value of the trailing expression; running the same snippets
# through `python_repl.run` gave empty results because none of them print.
for query in queries:
    tool_result = custom_repl_tool.run(query)
    print(f"Query: {query}\nTool Result: {tool_result}\n")




Query: 2 + 2
Tool Result: 

Query: 'Hello' + ' ' + 'World'
Tool Result: 

Query: [1, 2, 3] + [4, 5, 6]
Tool Result: 

Query: {'name': 'Alice', 'age': 30}['name']
Tool Result: 

Query: 
def greet(name):
    return f'Hello, {name}!'
greet('John')

Tool Result: 



In [None]:
from google.colab import userdata
from langchain_openai import ChatOpenAI
from langchain.llms import OpenAI

from langchain_openai import (
    AzureChatOpenAI,
    AzureOpenAIEmbeddings,
)

# Chat model shared by the rest of the notebook: the Azure OpenAI deployment
# named "4o-mini-deploy".
# NOTE(review): endpoint, key and API version are presumably read from the
# AZURE_OPENAI_* / OPENAI_API_VERSION environment variables set earlier — confirm.
LLM = AzureChatOpenAI(
    azure_deployment="4o-mini-deploy",
    # cache=redis_cache,
)
# Non-Azure alternatives, kept for reference:
# LLM = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# LLM = ChatOpenAI(model="gpt-4o-mini", temperature=0)

In [None]:
from langchain_core.prompts import ChatPromptTemplate

def get_prompt_for_openai_functions_agent():
    """Build the agent's chat prompt.

    The template consists of, in order: a fixed system message, a
    ``chat_history`` placeholder, the human ``input`` message and an
    ``agent_scratchpad`` placeholder.

    Returns:
        The ``ChatPromptTemplate`` built from those four messages.
    """
    message_specs = [
        ("system", "You are a helpful assistant"),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
    return ChatPromptTemplate.from_messages(message_specs)

In [None]:
from langchain.callbacks.base import BaseCallbackHandler
from uuid import UUID
# In-memory stand-in for a database: DatabaseCallback.save_to_database appends
# each stored result to this module-level list.
output_store = []
from typing import Dict, Any, Optional, List, Union
from langchain.schema import AgentAction, AgentFinish, BaseMessage, LLMResult

class DatabaseCallback(BaseCallbackHandler):
    """Callback handler that stores the agent's final answer and traces chain/agent events on stdout."""

    def on_agent_finish(self, output, **kwargs):
        """Persist the agent's final answer when the agent finishes.

        Args:
            output: Finish object handed over by the agent; its
                ``return_values`` mapping is read for the ``"output"`` entry.
            kwargs: Additional context data (unused).
        """
        result = output.return_values.get("output")
        # Missing or empty answers are neither printed nor stored.
        if result:
            print(result)
            self.save_to_database(result)

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        """Trace the start of a chain run."""
        print("Callback: on_chain_start triggered")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Trace the end of a chain run."""
        print("Callback: on_chain_end triggered")

    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Trace a chain failure together with the error that caused it."""
        # Exactly one line per error; this hook reports chain errors only.
        print(f"Callback: on_chain_error triggered with error: {error}")

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> None:
        """Trace the action the agent decided to take."""
        print(f"Callback: on_agent_action triggered with action: {action}")

    def save_to_database(self, result):
        """Store ``result``; currently appends it to the module-level ``output_store`` list."""
        # Real database persistence would go here.
        output_store.append(result)

In [None]:
from typing import Dict, Any, List, Union, Optional
from langchain.schema import AgentAction, AgentFinish, BaseMessage, LLMResult

class ExperimentCallbackHandler(BaseCallbackHandler):
    """Debugging handler that announces each chain/agent callback on stdout."""

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        """Announce that a chain run started."""
        print("Callback: on_chain_start triggered")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Announce that a chain run ended."""
        print("Callback: on_chain_end triggered")

    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Announce a chain failure and show the error."""
        message = f"Callback: on_chain_error triggered with error: {error}"
        print(message)

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> None:
        """Announce the action chosen by the agent."""
        message = f"Callback: on_agent_action triggered with action: {action}"
        print(message)

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None:
        """Announce the agent's final result."""
        message = f"Callback: on_agent_finish triggered with result: {finish}"
        print(message)

    # NOTE: the hooks below do NOT work with create_tool_calling_agent.
    # def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:
    #     print("Callback: on_llm_start triggered")

    # def on_chat_model_start(self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs: Any) -> None:
    #     print("Callback: on_chat_model_start triggered")

    # def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
    #     print(f"Callback: on_llm_new_token triggered with token: {token}")

    # def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
    #     print("Callback: on_llm_end triggered")

    # def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
    #     print(f"Callback: on_llm_error triggered with error: {error}")


    # def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> None:
    #     print(f"Callback: on_tool_start triggered with input: {input_str}")

    # def on_tool_end(self, output: Any, **kwargs: Any) -> None:
    #     print(f"Callback: on_tool_end triggered with output: {output}")

    # def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
    #     print(f"Callback: on_tool_error triggered with error: {error}")

    # def on_text(self, text: str, **kwargs: Any) -> None:
    #     print(f"Callback: on_text triggered with text: {text}")



In [None]:
from langchain.callbacks.base import BaseCallbackHandler
from uuid import UUID, uuid4
# NOTE(review): this rebinds output_store to a new empty list. The
# DatabaseCallback cell already creates it, and save_to_database appends to
# whatever list this name currently refers to, so earlier entries are dropped.
output_store = []
from typing import Dict, Any, Optional, List, Union
from langchain.schema import AgentAction, AgentFinish, BaseMessage, LLMResult

class MetadataCallbackHandler(BaseCallbackHandler):
    """Remembers the request metadata seen at chain start and prints it when the agent finishes."""

    def __init__(self):
        super().__init__()
        # Most recently captured metadata; empty until a chain start supplies some.
        self.metadata = {}

    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        """Capture metadata for the run that is starting.

        The ``"metadata"`` entry of ``inputs`` is preferred; when it is absent
        the ``metadata`` keyword passed along with the callback is used.

        Args:
            serialized: Serialized description of the chain (unused).
            inputs: Inputs of the chain that is starting.
            kwargs: Extra callback context; ``kwargs["metadata"]`` is the fallback source.
        """
        # Start events are not guaranteed to carry a dict as inputs, so guard
        # before calling .get().
        metadata = inputs.get("metadata") if isinstance(inputs, dict) else None
        if not metadata:
            metadata = kwargs.get("metadata")
        if not metadata:
            # Keep what was captured earlier instead of overwriting it with {}.
            return
        print(metadata)
        self.metadata = metadata

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> None:
        """Print the captured metadata once the agent has finished."""
        print(f"metadata: {self.metadata}")

  # def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
  #       print("outputs", outputs)
  #       print("kwargs", kwargs)


In [None]:
from re import VERBOSE
# prompt: write the code to create an agent that uses the pythonREPL tool

from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.llms import OpenAI
from langchain.agents import (
    AgentExecutor,
    # create_openai_functions_agent,
    create_tool_calling_agent,
)
from langchain.callbacks.manager import CallbackManager

llm = LLM

# Handlers attached to the executor; several can be active at the same time.
callback_manager = CallbackManager([
    # ExperimentCallbackHandler(),
    # DatabaseCallback(),
    MetadataCallbackHandler(),
])

# Tool-calling agent built from the shared LLM, the tool list and the chat prompt.
prompt = get_prompt_for_openai_functions_agent()
agent = create_tool_calling_agent(
    llm=llm,
    tools=tools,
    prompt=prompt,
)

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    # `callback_manager=` is deprecated on chains; `callbacks=` accepts the
    # same manager object.
    callbacks=callback_manager,
    verbose=True,
    handle_parsing_errors=True,
    return_intermediate_steps=True,
)

In [None]:
from langchain.schema.runnable import RunnableConfig


def run_demo(agent_executor: AgentExecutor, query: list | str):
    """Run one query, or each query of a list, through the agent and print the results.

    Every invocation carries the same fixed demo metadata, both inside the
    input dict and in the runnable config. After each result a separator
    line is printed.

    Args:
        agent_executor: Executor whose ``invoke`` is called once per query.
        query: A single query string or a list of query strings.

    Returns:
        None; results are only printed.
    """
    metadata = {"user_id": "12345", "task_id": "abc123"}
    config = RunnableConfig(metadata=metadata)
    pending = [query] if isinstance(query, str) else query

    for position, question in enumerate(pending, start=1):
        payload = {"input": question, "metadata": metadata}
        res = agent_executor.invoke(input=payload, config=config)

        print(f"Result for query {position}: {res}")

        # Visual separator between consecutive results.
        print("\n" + "="*50 + f" End of Query {position} " + "="*50 + "\n")


In [None]:
# Demo prompts for the agent, run in order.
query_list = [
    "Convert 'hello world' to uppercase.",
    "Generate the first 10 prime numbers.",
    "What is the square root of 420?",
    "What is today's date (in dd/mm/yyyy) and the day of the week?",
    "I woke up around 8 o clock this morning, I spent the next quarter of the day washing dishes and the next half of the week working, if today is the 9th of december, when do i finish working",
    "Fetch the title of the website 'https://medium.com/bitgrit-data-science-publication/openai-code-interpreter-ecda4ff5839c'",
    "Write a Python snippet to create a file named 'test.txt' and write 'Hello, STELLAR' in it.",
]

# To try a single prompt instead: agent_executor.invoke({"input": query_list[2]})
run_demo(agent_executor, query_list)

{'user_id': '12345', 'task_id': 'abc123'}


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `custom_python_repl` with `{'code': "'hello world'.upper()"}`


[0m[36;1m[1;3mHELLO WORLD[0mmetadata: {'user_id': '12345', 'task_id': 'abc123'}
[32;1m[1;3mThe uppercase version of 'hello world' is **HELLO WORLD**.[0m

[1m> Finished chain.[0m
Result for query 1: {'input': "Convert 'hello world' to uppercase.", 'metadata': {'user_id': '12345', 'task_id': 'abc123'}, 'output': "The uppercase version of 'hello world' is **HELLO WORLD**.", 'intermediate_steps': [(ToolAgentAction(tool='custom_python_repl', tool_input={'code': "'hello world'.upper()"}, log='\nInvoking: `custom_python_repl` with `{\'code\': "\'hello world\'.upper()"}`\n\n\n', message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_3lcaUFwjjlZMn8yFNIpY4gAw', 'function': {'arguments': '{"code":"\'hello world\'.upper()"}', 'name': 'custom_python_repl'}, 'type': 'funct

In [None]:
# Assign res1 in this cell so it also works on a fresh kernel (Restart & Run All):
# invoke the agent on the first demo query with the demo metadata, then show the raw result.
res1 = agent_executor.invoke(
    {"input": query_list[0], "metadata": {"user_id": "12345", "task_id": "abc123"}}
)
res1

{'input': "Convert 'hello world' to uppercase.",
 'metadata': {'user_id': '12345', 'task_id': 'abc123'},
 'output': "The phrase 'hello world' in uppercase is: **HELLO WORLD**.",
 'intermediate_steps': [(ToolAgentAction(tool='custom_python_repl', tool_input={'code': "'hello world'.upper()"}, log='\nInvoking: `custom_python_repl` with `{\'code\': "\'hello world\'.upper()"}`\n\n\n', message_log=[AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_Xx3DtbOxO3wrSSSOLSSVhxRG', 'function': {'arguments': '{"code":"\'hello world\'.upper()"}', 'name': 'custom_python_repl'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_5154047bf2'}, id='run-4154d2c5-c3f4-4a51-ad8e-7f6a9705959a', tool_calls=[{'name': 'custom_python_repl', 'args': {'code': "'hello world'.upper()"}, 'id': 'call_Xx3DtbOxO3wrSSSOLSSVhxRG', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'custom_python_repl',

In [None]:
# Show every stored agent output together with its Python type.
for stored_output in output_store:
    print(f"DATA TYPE ==> {type(stored_output)}\nDATA CONTENT ==> {stored_output} \n-------------------------------------------\n")