In [1]:
from toolkit.langchain import models
import add_packages
import os
from pprint import pprint

from my_configs import constants

from toolkit.langchain import (
	tools, document_loaders, text_splitters, text_embedding_models, stores,
	prompts, utils, output_parsers, agents, documents, runnables,
	histories
)

# Quickstart

Build an agent with two tools: one for online searches and one for specific data retrieval from an index.

## Define tools

In [None]:
# Build the two tools the agent can call: Tavily for online search and a
# retriever over a locally built vector index.
tool_tavily_search = tools.TavilySearchResults()

# Load the LangSmith docs page and split it into overlapping chunks.
loader = document_loaders.WebBaseLoader("https://docs.smith.langchain.com/")
document = loader.load()
# Named `document_chunks` (not `documents`) so the `documents` module imported
# from toolkit.langchain in the first cell is not overwritten in the kernel.
document_chunks = text_splitters.RecursiveCharacterTextSplitter(
  chunk_size=1000, chunk_overlap=200,
).split_documents(document)

# Embed the chunks into a Chroma vector store and expose it as a retriever tool.
embeddings = text_embedding_models.OpenAIEmbeddings()
vectorstore = stores.chroma.Chroma.from_documents(document_chunks, embeddings)
retriever = vectorstore.as_retriever()
tool_retriever = stores.create_retriever_tool(
  retriever=retriever,
  name="langsmith_search",
  description="Search for information about LangSmith. For any questions about LangSmith, you must use this tool!",
)

# Tools handed to the agent in the cells below.
my_tools = [
  tool_tavily_search,
  tool_retriever,
]

## Create agent

In [None]:

# Chat model that drives the agent's decisions.
llm = models.chat_openai

# Pull the agent prompt from the LangChain Hub.
prompt = prompts.hub.pull("hwchase17/openai-functions-agent")

# Build the agent from the LLM, prompt, and tools; the agent takes the input
# and decides which action to take.
# AgentExecutor runs those actions on the agent's behalf (verbose=True logs
# each step).
agent = agents.create_openai_functions_agent(llm=llm, tools=my_tools, prompt=prompt)
agent_executor = agents.AgentExecutor(agent=agent, tools=my_tools, verbose=True)

## Run agent

In [None]:

# Stateless call: only the "input" key is supplied, no chat history is passed.
agent_executor.invoke({"input": "hi"})

## Adding in memory

This agent is stateless: it does not remember previous interactions. To give it memory, pass in the previous `chat_history`. The variable must be named `chat_history` because the prompt used here expects that name; if a different prompt is used, the variable name could be changed.

Keep track of messages automatically by wrapping in a RunnableWithMessageHistory. 

In [None]:
# In-memory registry of chat histories, kept in a module-level dict and keyed
# by the (user_id, conversation_id) pair.
store = {}

def get_session_history(
  user_id: str, conversation_id: str
) -> histories.BaseChatMessageHistory:
  """
  Return the chat history for one (user, conversation) pair, creating it on
  first use.

  RunnableWithMessageHistory calls this factory at runtime; the two arguments
  are filled from the "configurable" section of the invocation config, as
  declared through ConfigurableFieldSpec entries in history_factory_config.

  Args:
    user_id: identifier of the user.
    conversation_id: identifier of the conversation.

  Returns:
    The ChatMessageHistory stored in `store` under (user_id, conversation_id).
  """
  session_key = (user_id, conversation_id)
  try:
    return store[session_key]
  except KeyError:
    # First message of this conversation: register an empty history.
    fresh_history = histories.ChatMessageHistory()
    store[session_key] = fresh_history
    return fresh_history

# Wrap the executor so each call loads and saves messages through
# get_session_history, selected by the user_id / conversation_id config values.
agent_with_memory = runnables.RunnableWithMessageHistory(
  agent_executor,
  get_session_history,
  input_messages_key="input",  # latest input message
  # Must match the history variable of the agent prompt ("chat_history");
  # any other key is never read by the prompt, so memory would be lost.
  history_messages_key="chat_history",
  history_factory_config=[
    runnables.ConfigurableFieldSpec(
      id="user_id", annotation=str, name="User ID", default="",
      description="Unique identifier for the user.", is_shared=True,
    ),
    runnables.ConfigurableFieldSpec(
      id="conversation_id", annotation=str, name="Conversation ID", default="",
      description="Unique identifier for the conversation.", is_shared=True,
    ),
  ]
)

# First turn: introduce a fact for the agent to remember.
print(agent_with_memory.invoke(
    {"input": "Hi, I'm Bob"},
    config={"configurable": {"user_id": "123", "conversation_id": "1"}}
))

# Second turn, same user/conversation ids: the stored history is replayed.
print(agent_with_memory.invoke(
    {"input": "What is my name?"},
    config={"configurable": {"user_id": "123", "conversation_id": "1"}}
))

In [None]:
# One shared in-memory history object used for every session in this demo.
message_history = histories.ChatMessageHistory()

agent_with_chat_history = runnables.RunnableWithMessageHistory(
    agent_executor,
    # The factory must accept a session id, as most real-world setups need one.
    # It is ignored here: the same in-memory ChatMessageHistory is always returned.
    lambda session_id: message_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)

agent_with_chat_history.invoke(
    {"input": "hi! I'm bob"},
    # A session_id has to be supplied even though the factory above ignores it.
    config={"configurable": {"session_id": "<foo>"}},
)

agent_with_chat_history.invoke(
    {"input": "what's my name?"},
    # Same placeholder session id as the first call.
    config={"configurable": {"session_id": "<foo>"}},
)

# Agent Types


### Tool Calling

In [None]:


# Tool-calling agent: one Tavily search tool limited to three results.
llm = models.chat_openai
my_tools = [
	tools.TavilySearchResults(max_results=3)
]

# Prompt built by the project's helper for tool-calling agents.
prompt = prompts.create_prompt_tool_calling_agent()

# Project wrapper class; agent_type selects the tool-calling variant.
agent = agents.MyStatelessAgent(
	llm=llm, tools=my_tools, prompt=prompt, agent_type='tool_calling',
)

In [None]:
# Sample prompts used by the next cells (selected by index).
questions = [
	"hi! my name is bob",
	"what's my name? Don't use tools to look this up unless you NEED to",
	"tell me a super long story about an apple",
	"What is LangChain",
	"What I have just ask you?"
]

In [None]:
# Streaming variant: stream_agent is awaited (top-level await works in a
# notebook cell), then the collected result is pretty-printed.
input_message = questions[1]
result = await agent.stream_agent(input_message)
pprint(result)

In [None]:
# Blocking variant of the previous cell, using the same question.
input_message = questions[1]
result = agent.invoke_agent(input_message)

### OpenAI tools

OpenAI models detect function calls and provide input for API calls. Model outputs JSON object with function arguments for more reliable and useful function calls.

OpenAI uses the term "functions" for the capability to invoke a single function, and "tools" for the capability to invoke one or more functions.

Using tools allows the model to request multiple functions to be called when needed.

In [None]:
# Initialize tools: a single Tavily search tool returning one result.
tool_tavily_search = tools.TavilySearchResults(max_results=1)
my_tools = [
  tool_tavily_search
]

# Pull the OpenAI-tools agent prompt from the LangChain Hub.
prompt = prompts.hub.pull("hwchase17/openai-tools-agent")

# Choose the LLM that will drive the agent
llm = models.chat_openai

# Construct the OpenAI Tools agent
agent = agents.create_openai_tools_agent(llm, my_tools, prompt)

# Create an agent executor by passing in the agent and tools, then run it.
# The query previously read "LangcChain"; the misspelling was sent verbatim to
# the model and the search tool.
agent_executor = agents.AgentExecutor(agent=agent, tools=my_tools, verbose=True)
agent_executor.invoke({"input": "What is LangChain?"})

### ReAct

Using an agent to implement the [ReAct](https://react-lm.github.io/) logic.

In [None]:
# Initialize tools: a single Tavily search tool returning one result.
tool_tavily_search = tools.TavilySearchResults(max_results=1)
my_tools = [
  tool_tavily_search,
]

# Pull the ReAct prompt from the LangChain Hub.
prompt = prompts.hub.pull("hwchase17/react")

# Choose the LLM that will drive the agent
llm = models.chat_openai

# Construct the ReAct agent
agent = agents.create_react_agent(llm=llm, tools=my_tools, prompt=prompt)

# Create an agent executor by passing in the agent and tools
# (verbose=True logs each step).
agent_executor = agents.AgentExecutor(agent=agent, tools=my_tools, verbose=True)

# Run Agent

In [None]:
# Run the ReAct executor built in the previous cell on a single question.
agent_executor.invoke({"input": "What is LangChain?"})

In [None]:
# ReAct with chat history: switch to the chat variant of the prompt.
# Reuses `llm` and `my_tools` defined in the ReAct cell above.
prompt = prompts.hub.pull("hwchase17/react-chat")

# Construct the ReAct agent and its executor with the new prompt.
agent = agents.create_react_agent(llm=llm, tools=my_tools, prompt=prompt)
agent_executor = agents.AgentExecutor(agent=agent, tools=my_tools, verbose=True)

# One shared in-memory history object used for every session in this demo.
message_history = histories.ChatMessageHistory()

agent_with_chat_history = runnables.RunnableWithMessageHistory(
    agent_executor,
    # The factory must accept a session id, as most real-world setups need one.
    # It is ignored here: the same in-memory ChatMessageHistory is always returned.
    lambda session_id: message_history,
    input_messages_key="input",
    history_messages_key="chat_history",
)

# Sample conversation turns; the next cell picks one by index.
questions = [
  "hi! I'm bob",
  "what's my name?",
  "Is there any actor with the same name as me?",
]

In [None]:
# Ask the third sample question through the history-aware wrapper.
agent_with_chat_history.invoke(
    {"input": questions[2]},
    # A session_id has to be supplied even though the history factory ignores it
    # (a single in-memory ChatMessageHistory is shared).
    config={"configurable": {"session_id": "<foo>"}},
)

### Self-ask with search

In [None]:
# Initialize tools: a single Tavily answer tool, registered under the name
# "Intermediate Answer".
# NOTE(review): the self-ask prompt presumably refers to the tool by exactly
# this name — confirm before renaming.
tool_tavily_answer = tools.TavilyAnswer(max_results=1, name="Intermediate Answer")
my_tools = [
  tool_tavily_answer
]

# Pull the self-ask-with-search prompt from the LangChain Hub.
prompt = prompts.hub.pull("hwchase17/self-ask-with-search")

# Choose the LLM that will drive the agent
llm = models.chat_openai

# Construct the Self Ask With Search Agent
agent = agents.create_self_ask_with_search_agent(llm, my_tools, prompt)

# Create an agent executor by passing in the agent and tools;
# handle_parsing_errors=True is enabled in addition to verbose logging.
agent_executor = agents.AgentExecutor(agent=agent, tools=my_tools, verbose=True,
                                      handle_parsing_errors=True)

# Run Agent

In [None]:
# Run the self-ask executor on a question that needs an intermediate lookup.
agent_executor.invoke(
    {"input": "What is the hometown of the reigning men's U.S. Open champion?"})

### XML Agent


### JSON Chat Agent


### Structured chat


## [OpenAI assistants](https://python.langchain.com/docs/modules/agents/agent_types/openai_assistants)

The Assistants API enables building AI assistants in applications. Assistants have instructions and use models, tools, and knowledge to answer user queries. The API supports Code Interpreter, Retrieval, and Function calling tools.

Interact with OpenAI Assistants using OpenAI tools or custom tools. With OpenAI tools, invoke the assistant directly for final answers. With custom tools, run the assistant and tool execution loop using the built-in AgentExecutor or write your own executor.

Different ways to interact with Assistants.

The OpenAIAssistantRunnable is compatible with the AgentExecutor. Pass it in as an agent directly to the executor. The AgentExecutor calls the invoked tools and uploads the tool outputs back to the Assistants API. Includes built-in LangSmith tracing.

Example: Building a math tutor that can write and run code.

In [None]:
from typing import Sequence, Union
from loguru import logger

# Custom tools handed to the assistant: DuckDuckGo search and E2B data analysis.
my_tools = [
  # OpenAI's built-in code interpreter, currently disabled:
  # {"type": "code_interpreter"},
  tools.DuckDuckGoSearchRun(),
  tools.E2BDataAnalysisTool(),
]


class OpenAIAssistant:
  """Wrap an OpenAI Assistant as a LangChain agent with an AgentExecutor.

  On construction the assistant is either created (when no `assistant_id` is
  given) or looked up by id, and an AgentExecutor is built around it.

  Attributes:
    name: assistant name used when creating a new assistant.
    instructions: system instructions used when creating a new assistant.
    my_tools: tools made available to the assistant and the executor.
    model: model identifier used when creating a new assistant.
    assistant_id: id of the assistant in use (filled in after creation).
    agent: the OpenAIAssistantRunnable instance.
    agent_executor: AgentExecutor wrapping `agent` and `my_tools`.
  """

  def __init__(
    self,
    name: str,
    instructions: str,
    tools: list[tools.BaseTool],
    model: str,
    assistant_id: Union[str, None] = None,
  ) -> None:
    """Store the settings, resolve the assistant and build its executor.

    Args:
      name: assistant name (used only when a new assistant is created).
      instructions: assistant instructions (used only on creation).
      tools: tools the assistant may call.
      model: model identifier (used only on creation).
      assistant_id: existing assistant id to reuse; None creates a new one.
    """
    self.name = name
    self.instructions = instructions
    self.my_tools = tools
    self.model = model
    self.assistant_id = assistant_id
    self.agent = None

    self._create_assistant()

    self.agent_executor = agents.AgentExecutor(
      agent=self.agent, tools=self.my_tools,
    )

  def _create_assistant(
    self,
  ):
    """Populate `self.agent`, creating a new assistant when no id is set."""
    if self.assistant_id is None:
      self.agent = agents.OpenAIAssistantRunnable.create_assistant(
        name=self.name,
        instructions=self.instructions,
        tools=self.my_tools,
        model=self.model,
        as_agent=True,
      )
      # Remember the generated id so it can be logged and reused.
      self.assistant_id = self.agent.assistant_id
      logger.info(f"Created: Assistant ID `{self.assistant_id}`")
    else:
      logger.info(f"Found: Assistant ID `{self.assistant_id}`")
      self.agent = agents.OpenAIAssistantRunnable(
        assistant_id=self.assistant_id,
        as_agent=True,
      )

  def execute_agent(self, input):
    """Run the assistant with a hand-written tool loop (no AgentExecutor).

    Each round invokes every requested tool, then submits the outputs back to
    the assistant, until an AgentFinish is returned.

    Args:
      input: payload passed to the assistant's `invoke`.

    Returns:
      The final AgentFinish produced by the assistant.

    Raises:
      RuntimeError: if the assistant returns an empty batch of actions, which
        would leave no run/thread ids to submit tool outputs to.
    """
    tool_map = {tool.name: tool for tool in self.my_tools}
    response = self.agent.invoke(input)
    while not isinstance(response, agents.AgentFinish):
      actions = list(response)
      if not actions:
        # Previously this relied on the leaked loop variable `action`, giving
        # an UnboundLocalError or a resubmission with stale ids.
        raise RuntimeError("Assistant returned no actions and no final answer.")

      tool_outputs = []
      for action in actions:
        tool_output = tool_map[action.tool].invoke(action.tool_input)
        print(action.tool, action.tool_input, tool_output, end="\n\n")
        tool_outputs.append(
          {"output": tool_output, "tool_call_id": action.tool_call_id}
        )

      # The run and thread ids are read from the last action of the batch,
      # as the original loop did implicitly.
      last_action = actions[-1]
      response = self.agent.invoke(
        {
          "tool_outputs": tool_outputs,
          "run_id": last_action.run_id,
          "thread_id": last_action.thread_id,
        }
      )

    return response

# Instantiate the wrapper; because an assistant_id is given, the existing
# assistant is looked up instead of a new one being created.
my_openai_assistant = OpenAIAssistant(
  name="langchain assistant",
  instructions=("You are a personal math tutor. Write and run code to answer "
                "math questions. You can also search the internet."),
  tools=my_tools,
  model=constants.MODELS["OPENAI"]["GPT-3.5-TURBO-0125"],
  assistant_id="asst_SYdC8LwpTHu0fedq102coNyo",  # pass None to create a new assistant instead
)

In [None]:
# Run the assistant through its AgentExecutor; the payload uses a "content" key.
# Alternative query kept for reference:
# query = {"content": "What is typical color of dogs?"}
query = {"content": "husky's color"}
result = my_openai_assistant.agent_executor.invoke(query)

In [None]:
# Display the executor's return value as the cell output.
result

# How-to


### Custom agent


In [None]:
# Create agent using OpenAI Tool Calling for reliability.
# Create it without memory, then add memory for conversation.

#* Load LLM
# Load the language model used to control the agent.
llm = models.chat_openai

#* Define Tools
# The function docstring matters: the notebook text on the @tool decorator
# says it is used as the tool's description, so it is left unchanged here.
# Python function to calculate word length.
@tools.tool
def get_word_length(word: str) -> int:
  """Returns the length of a word."""
  return len(word)

# Tools exposed to the agent.
my_tools = [
  get_word_length,
]

#* Create Prompt for OpenAI Function Calling.
# Input variables:
# - chat_history: earlier conversation turns (messages); injected by the
#   RunnableWithMessageHistory wrapper created at the end of this cell
# - input: user objective, string
# - agent_scratchpad: sequence of previous agent tool invocations and outputs (messages)
prompt = prompts.ChatPromptTemplate.from_messages([
  (
    "system",
    "You are very powerful assistant, but don't know current events."
  ),
  # Without this placeholder the stored history never reaches the model.
  prompts.MessagesPlaceholder(variable_name="chat_history"),
  (
    "user",
    "{input}"
  ),
  prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
])

#* Bind tools to LLM
# The agent learns which tools exist through OpenAI tool calling: the tools are
# bound to the model in OpenAI tool format so they are sent on every call.
llm_with_tools = llm.bind_tools(my_tools)

# * Create Agent & Adding memory
# Utility functions:
# - Component for formatting intermediate steps (agent action, tool output
# pairs) to input messages sent to model
# - Component for converting output message into agent action/agent finish.
agent = (
  {
    "input": lambda x: x["input"],
    "agent_scratchpad": lambda x: agents.format_to_openai_tool_messages(
      x["intermediate_steps"]
    ),
    # Forward the history into the prompt; default to an empty list so the
    # executor still works when invoked directly without the memory wrapper.
    "chat_history": lambda x: x.get("chat_history", []),
  }
  | prompt
  | llm_with_tools
  | agents.OpenAIToolsAgentOutputParser()
)

agent_executor = agents.AgentExecutor(agent=agent, tools=my_tools, verbose=True)

# One shared in-memory history; the session id passed to the factory is ignored.
message_history = histories.ChatMessageHistory()
agent_with_chat_history = runnables.RunnableWithMessageHistory(
  agent_executor,
  lambda session_id: message_history,
  input_messages_key="input",
  history_messages_key="chat_history",
)

# Sample conversation turns; the next cell picks one by index.
questions = [
  "Hello",
  "My name is Bob",
  "What is my name?",
  "What is the length of word bob?"
]

In [None]:

# Ask the fourth sample question (word length) through the history-aware wrapper.
agent_with_chat_history.invoke(
    {"input": questions[3]},
    config={"configurable": {"session_id": "<foo>"}},
)

### Returning Structured Output

The agent returns structured output instead of a single string.

Example, agent doing question-answering over sources. Output should include answer and list of sources used.

In [None]:
from typing import List
from langchain_core.pydantic_v1 import BaseModel, Field
import json

#* Retriever

# Create a retriever over mock data.
loader = document_loaders.TextLoader("../data/state_of_the_union.txt")
document = loader.load()

text_splitter = text_splitters.RecursiveCharacterTextSplitter(
  chunk_size=1000, chunk_overlap=0,
)
docs = text_splitter.split_documents(document)

# Add fake source information: tag every chunk with its index as "page_chunk"
# so the agent can cite which chunks it used.
for i, doc in enumerate(docs):
  doc.metadata["page_chunk"] = i

vectorstore = stores.chroma.Chroma.from_documents(
  docs, text_embedding_models.OpenAIEmbeddings(), collection_name="state-of-union"
)
retriever = vectorstore.as_retriever()

#* Tools
# Create tools for the agent, specifically one tool to wrap the retriever.
# The description is what the model reads when deciding whether to call the
# tool, so it must describe the indexed content (it previously held a pasted
# code comment).
retriever_tool = stores.create_retriever_tool(
  retriever=retriever,
  name="state-of-union-retriever",
  description="Query a retriever to get information about state of the union address",
)

_.

Create custom parsing logic by passing the Response schema to the LLM via functions parameter, similar to passing tools for the agent to use.

When Response function called by LLM, use as signal to return to user. 
When any other function called by LLM, treat as tool invocation.

Parsing logic:
- If no function is called, assume response to user is AgentFinish
- If Response function is called, respond to user with inputs (structured output) 
and return AgentFinish
- If any other function is called, treat as tool invocation and return AgentActionMessageLog

Using AgentActionMessageLog allows to attach a log of messages for future use in passing back to the agent prompt.


In [None]:

#* Response schema
# Two fields: answer and list of sources.
# This model is bound to the LLM as a function in the agent-creation code
# below, so the class docstring and field descriptions are left unchanged
# (presumably they are sent to the model as the function schema — confirm).
class Response(BaseModel):
  """Final response to the question being asked"""
  
  # Free-text answer shown to the user.
  answer: str = Field(
    description="The final response to the user"
  )
  # Integer ids of the chunks that support the answer.
  sources: List[int] = Field(
      description=("List of page chunks that contain answer to the question. "
                   "Only include a page chunk if it contains relevant information")
  )
  
#* Custom parsing logic
def parse(output):
  """Turn a raw LLM message into an agent step.

  Args:
    output: model message exposing `content` and `additional_kwargs`.

  Returns:
    - AgentFinish carrying the plain text when no function call is present;
    - AgentFinish carrying the decoded arguments when the "Response" function
      was called (structured final answer);
    - AgentActionMessageLog for any other function, treated as a tool call.
  """
  extra = output.additional_kwargs

  # Plain text reply: hand it straight back to the user.
  if "function_call" not in extra:
    text = output.content
    return agents.AgentFinish(return_values={"output": text}, log=text)

  # Decode the requested call; arguments arrive as a JSON string.
  call = extra["function_call"]
  called_name = call["name"]
  call_args = json.loads(call["arguments"])

  # Any function other than "Response" is a tool invocation; the original
  # message is kept in message_log.
  if called_name != "Response":
    return agents.AgentActionMessageLog(
      tool=called_name, tool_input=call_args, log="", message_log=[output]
    )

  # "Response" marks the final, structured answer.
  return agents.AgentFinish(return_values=call_args, log=str(call))

#* Create Agent
# prompt: placeholders for user's question and agent_scratchpad (intermediate steps)
prompt = prompts.ChatPromptTemplate.from_messages([
  ("system", "You are a helpful assistant"),
  ("user", "{input}"),
  prompts.MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# tools: attach the retriever tool and the Response schema to the LLM as
# functions, so the model can either call the tool or "call" Response to finish.
llm = models.chat_openai
llm_with_tools = llm.bind_functions([retriever_tool, Response])

# Pipeline: map executor inputs -> prompt -> LLM with functions -> custom parser.
agent = (
  {
    "input": lambda x: x["input"],
    # format agent_scratchpad from intermediate steps (AIMessages, FunctionMessages)
    "agent_scratchpad": lambda x: agents.format_to_openai_function_messages(
      x["intermediate_steps"]
    )
  }
  | prompt
  | llm_with_tools
  # custom output parser: parse LLM response
  | parse
)

# AgentExecutor: run agent-tool loop. Only the retriever is listed as a tool;
# Response is handled by the parser as the finish signal.
agent_executor = agents.AgentExecutor(
  tools=[retriever_tool], agent=agent, verbose=True,
)

In [None]:
#* Run agent
# When the model calls Response, the returned dictionary holds that function's
# arguments (expected: answer and sources keys).
# return_only_outputs=True is passed to the executor call.
agent_executor.invoke(
  {"input": "what did the president say about ketanji brown jackson"},
  return_only_outputs=True,
)

### Handle parsing errors, Access intermediate steps

Occasionally the LLM cannot determine which step to take because its output is not correctly formatted for the output parser. By default the agent raises an error in that case; this behavior can be controlled with `handle_parsing_errors`.

Include intermediary steps as a list of (action, observation) pairs in the return value to enhance agent insight


In [None]:
# Single Wikipedia tool taken from the project toolkit.
wikipedia_tool = tools.wikipedia
my_tools = [
  wikipedia_tool,
]

# ReAct agent; the executor is configured with handle_parsing_errors=True and
# return_intermediate_steps=True, the two options this section demonstrates.
prompt = prompts.hub.pull("hwchase17/react")
llm = models.chat_openai
agent = agents.create_react_agent(llm, my_tools, prompt)
agent_executor = agents.AgentExecutor(
  agent=agent, tools=my_tools, verbose=True, handle_parsing_errors=True,
  return_intermediate_steps=True,
)

In [None]:

#* Error
# Demonstrates parsing-error handling: the executor above was built with
# handle_parsing_errors=True.
# NOTE(review): the earlier note here described a malicious input that makes
# the agent fail to output an Action string, but the input below is an
# ordinary question — confirm whether a malformed input was intended.
agent_executor.invoke(
  {"input": "What is Leo DiCaprio's middle name?"}
)

### Cap the max number of iterations


### Timeouts for agents

### Streaming


In [None]:
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import tool
from langchain_core.callbacks import Callbacks
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
import random
import pprint


In [None]:
# Chat model with temperature 0 and streaming enabled.
model = ChatOpenAI(temperature=0, streaming=True)

In [None]:
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, TypeVar, Union
from uuid import UUID

from langchain_core.callbacks.base import AsyncCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult

# Get the prompt to use - you can modify this!
prompt = hub.pull("hwchase17/openai-tools-agent")
# print(prompt.messages) -- to see the prompt
my_tools = [
	tools.TavilySearchResults(max_results=3)
]
# The model is tagged "agent_llm"; the callback handler defined below filters
# tokens by tag.
agent = create_openai_tools_agent(
    model.with_config({"tags": ["agent_llm"]}), my_tools, prompt
)
# The executor run is given the name "Agent".
agent_executor = AgentExecutor(agent=agent, tools=my_tools).with_config(
    {"run_name": "Agent"}
)

# Here is a custom handler that will print the tokens to stdout.
# Instead of printing to stdout you can send the data elsewhere; e.g., to a streaming API response
class TokenByTokenHandler(AsyncCallbackHandler):
	"""Async callback handler that prints LLM tokens as they are generated.

	Only tokens coming from models whose run tags overlap `tags_of_interest`
	are printed. Tool starts and results are printed regardless of tags.
	All event hooks are coroutines, matching the AsyncCallbackHandler interface.
	"""

	def __init__(self, tags_of_interest: List[str]) -> None:
		"""A custom call back handler.

		Args:
				tags_of_interest: Only LLM tokens from models with these tags will be
													printed.
		"""
		self.tags_of_interest = tags_of_interest

	async def on_chain_start(
		self,
		serialized: Dict[str, Any],
		inputs: Dict[str, Any],
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		tags: Optional[List[str]] = None,
		metadata: Optional[Dict[str, Any]] = None,
		**kwargs: Any,
	) -> None:
		"""Run when chain starts running. Intentionally a no-op."""

	async def on_chain_end(
		self,
		outputs: Dict[str, Any],
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		tags: Optional[List[str]] = None,
		**kwargs: Any,
	) -> None:
		"""Run when chain ends running. Intentionally a no-op."""

	async def on_chat_model_start(
		self,
		serialized: Dict[str, Any],
		messages: List[List[BaseMessage]],
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		tags: Optional[List[str]] = None,
		metadata: Optional[Dict[str, Any]] = None,
		**kwargs: Any,
	) -> Any:
		"""Run when a chat model starts running. Intentionally a no-op."""
		# NOTE(review): kept as an explicit override; presumably the base
		# implementation is not a no-op — confirm before deleting this method.

	async def on_tool_start(
		self,
		serialized: Dict[str, Any],
		input_str: str,
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		tags: Optional[List[str]] = None,
		metadata: Optional[Dict[str, Any]] = None,
		inputs: Optional[Dict[str, Any]] = None,
		**kwargs: Any,
	) -> Any:
		"""Run when tool starts running; prints the serialized tool."""
		print(f"Tool: {serialized}")

	async def on_tool_end(
		self,
		output: Any,
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		**kwargs: Any,
	) -> Any:
		"""Run when tool ends running; prints the tool output."""
		print(f"Result: {output}")

	async def on_llm_end(
		self,
		response: LLMResult,
		*,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		tags: Optional[List[str]] = None,
		**kwargs: Any,
	) -> None:
		"""Run when LLM ends running; ends the streamed line for tracked models."""
		if self.get_overlap_tags(tags):
			# Tokens were printed with end="", so close the line.
			print()

	def get_overlap_tags(self, tags: Optional[List[str]]) -> List[str]:
		"""Return the sorted tags present in both `tags` and `tags_of_interest`.

		Returns an empty list when `tags` is None or empty.
		"""
		if not tags:
			return []
		return sorted(set(tags) & set(self.tags_of_interest or []))

	async def on_llm_new_token(
		self,
		token: str,
		*,
		chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None,
		run_id: UUID,
		parent_run_id: Optional[UUID] = None,
		tags: Optional[List[str]] = None,
		**kwargs: Any,
	) -> None:
		"""Run on new LLM token. Only available when streaming is enabled."""
		# Print non-empty tokens from tracked models without a trailing newline.
		if token and self.get_overlap_tags(tags):
			print(token, end="", flush=True)


# Track tokens from models tagged "tool_llm" or "agent_llm".
handler = TokenByTokenHandler(tags_of_interest=[
  "tool_llm", "agent_llm"
])

# Run the executor asynchronously with the handler attached as a callback
# (top-level await works in a notebook cell).
result = await agent_executor.ainvoke(
	{"input": "tell me a super long story about a dog"},
	{"callbacks": [handler]},
)

In [None]:
# Display the final result of the streamed run as the cell output.
result

### Structured Tools


### Running Agent as an Iterator


# Tools

### Toolkits


### Defining Custom Tools

Construct agent with list of Tools:

- Tool components:
    - name (str): required, must be unique within set of tools
    - description (str): optional but recommended for tool use determination
    - args_schema (Pydantic BaseModel): optional but recommended for more information or validation.

There are multiple ways to define a tool.

Many agents only work with functions that require single inputs


In [None]:
# Bare string literal used as a note describing the example tools defined in
# the following cells; as the cell's last expression it is echoed as output.
"""
Two functions: 
- A made up search function that always returns the string “LangChain” , 
requires one input
- A multiplier function that will multiply two numbers by eachother, requires 
multiple inputs
"""

#### @tool decorator



Defines a custom tool. 

The function name is the tool name, can be overridden by passing a string as the first argument. Pass tool name and JSON args into the tool decorator for customization.

The function’s docstring is used as the tool’s description - a docstring MUST be provided.

In [None]:
# Argument schema for the search tool below: a single string field `query`.
class SearchInput(tools.BaseModel):
  query: str = tools.Field(description="should be a search query")

# Mock search tool registered as "search_tool" with SearchInput as its
# argument schema and return_direct=True. It ignores the query and always
# returns "LangChain". The docstring is the tool description — leave it as is.
@tools.tool("search_tool", args_schema=SearchInput, return_direct=True)
def search(query: str) -> str:
  """Look up things online."""
  return "LangChain"

# Two-argument tool; with the bare decorator the function name is the tool
# name and the docstring is the tool description — leave it as is.
@tools.tool
def multiply(a: int, b: int) -> int:
  """Multiply two numbers."""
  return a * b

#### Subclass BaseTool

Define a custom tool by subclassing the BaseTool class for maximal control over the tool definition.


In [None]:
# Argument schema for CustomSearchTool: a single string field `query`.
class SearchInput(tools.BaseModel):
  query: str = tools.Field(description="should be a search query")
  
# Mock search tool defined by subclassing BaseTool. The synchronous path always
# returns "LangChain"; the asynchronous path is deliberately unsupported.
class CustomSearchTool(tools.BaseTool):
  # Field overrides carry explicit annotations: pydantic-v2 based BaseTool
  # rejects un-annotated overrides of inherited fields, and annotated
  # overrides also work on pydantic v1.
  name: str = "custom_search"
  description: str = "useful for when you need to answer questions about current events"
  args_schema: tools.typing.Type[tools.BaseModel] = SearchInput

  def _run(
    self, query: str,
    run_manager: tools.typing.Optional[tools.CallbackManagerForToolRun] = None,
  ) -> str:
    """Use the tool."""
    # Stub: the query is ignored.
    return "LangChain"

  async def _arun(
    self, query: str,
    run_manager: tools.typing.Optional[tools.AsyncCallbackManagerForToolRun] = None,
  ) -> str:
    """Use the tool asynchronously."""
    raise NotImplementedError("custom_search does not support async")
  

# Instantiate the tool and show its name, description and argument schema.
tool_search = CustomSearchTool()

print(tool_search.name)
print(tool_search.description)
print(tool_search.args)

In [None]:
# Argument schema for CustomCalculatorTool: two integer operands.
class CalculatorInput(tools.BaseModel):
  a: int = tools.Field(description="first number")
  b: int = tools.Field(description="second number")
  
# Multiplication tool defined by subclassing BaseTool. return_direct=True is
# set on the tool; the asynchronous path is not supported.
class CustomCalculatorTool(tools.BaseTool):
  # Field overrides carry explicit annotations: pydantic-v2 based BaseTool
  # rejects un-annotated overrides of inherited fields.
  name: str = "Calculator"
  description: str = "useful for when you need to answer questions about math"
  args_schema: tools.typing.Type[tools.BaseModel] = CalculatorInput
  return_direct: bool = True

  def _run(
    self, a: int, b: int,
    run_manager: tools.typing.Optional[tools.CallbackManagerForToolRun] = None
  ) -> int:
    """Use the tool."""
    # The product of two ints is an int; the annotation previously said str.
    return a * b

  async def _arun(
    self, a: int, b: int,
    run_manager: tools.typing.Optional[tools.AsyncCallbackManagerForToolRun] = None
  ) -> int:
    """Use the tool asynchronously."""
    # Raise (not return) the exception, as CustomSearchTool._arun does.
    # Returning it handed callers an exception object as the tool result.
    raise NotImplementedError("Calculator does not support async.")


#### StructuredTool dataclass

Use a StructuredTool dataclass for a mix of convenience and functionality.

You can define a custom args_schema to provide more information about inputs.

In [None]:
def search_function(query: str):
  # Stub search backend: `query` is ignored and the same canned answer is
  # returned for every call.
  canned_answer = "LangChain"
  return canned_answer

# Wrap the plain function as a StructuredTool with an explicit name and
# description; no args_schema or coroutine is supplied.
tool_search = tools.StructuredTool.from_function(
  func=search_function,
  name="Search",
  description="useful for when you need to answer questions about current events",
  # coroutine=
)

In [None]:
# Argument schema for the calculator tool: two integer operands.
class SchemaCalculator(tools.BaseModel):
  a: int = tools.Field(description="first number")
  b: int = tools.Field(description="second number")
  
def fn_calculator(a: int, b: int) -> int:
  """Multiply two numbers."""
  # Named intermediate keeps the returned value explicit.
  product = a * b
  return product

# Wrap fn_calculator as a StructuredTool with an explicit argument schema;
# return_direct=True is set on the tool.
tool_calculator = tools.StructuredTool.from_function(
  func=fn_calculator,
  name="Calculator",
  description="multiply numbers",
  args_schema=SchemaCalculator,
  return_direct=True,
  
  # coroutine=
)

# Show the tool's name, description and argument schema.
print(tool_calculator.name)
print(tool_calculator.description)
print(tool_calculator.args)

#### Handling Tool Errors

When tool encounters error and exception is not caught, agent will stop executing. To continue execution, raise ToolException and set handle_tool_error accordingly.

When ToolException is thrown, the agent will handle the exception based on the handle_tool_error variable of the tool, and the processing result will be returned to the agent as observation, and printed in red.

Set handle_tool_error to True, a unified string value, or a function. If set as a function, the function should take a ToolException as a parameter and return a str value.

Only raising a ToolException won’t be effective. Set the handle_tool_error of the tool because its default value is False.



In [None]:
# Argument schema for the calculator tool: two integer operands.
# (Same definition as in the StructuredTool cell; redefined so this cell is
# self-contained.)
class SchemaCalculator(tools.BaseModel):
  a: int = tools.Field(description="first number")
  b: int = tools.Field(description="second number")
  
def fn_calculator(a: int, b: int) -> int:
  """Multiply two numbers."""
  # Compute first, then return, so the result has a name when debugging.
  result = a * b
  return result

# Same calculator tool, now with an error handler: handle_tool_error is set to
# the project's tools._handle_error.
# NOTE(review): per the notes above, the handler only acts on ToolException;
# fn_calculator does not raise one — confirm the intended demonstration.
tool_calculator = tools.StructuredTool.from_function(
  func=fn_calculator,
  name="Calculator",
  description="multiply numbers",
  args_schema=SchemaCalculator,
  return_direct=True,
  handle_tool_error=tools._handle_error
  # coroutine=
)

# Show the tool's name, description and argument schema.
print(tool_calculator.name)
print(tool_calculator.description)
print(tool_calculator.args)

#### My way

In [None]:
# Argument schema for the calculator tool: two integer operands.
# NOTE(review): this cell repeats the "Handling Tool Errors" cell verbatim.
class SchemaCalculator(tools.BaseModel):
  a: int = tools.Field(description="first number")
  b: int = tools.Field(description="second number")
  
def fn_calculator(a: int, b: int) -> int:
  """Multiply two numbers."""
  # Bind the result to a name before returning it.
  multiplied = a * b
  return multiplied

# Preferred setup ("My way"): StructuredTool with explicit schema,
# return_direct=True and the project's error handler.
tool_calculator = tools.StructuredTool.from_function(
  func=fn_calculator,
  name="Calculator",
  description="multiply numbers",
  args_schema=SchemaCalculator,
  return_direct=True,
  handle_tool_error=tools._handle_error
  # coroutine=
)

# Show the tool's name, description and argument schema.
print(tool_calculator.name)
print(tool_calculator.description)
print(tool_calculator.args)

### Tools as OpenAI Functions

LangChain tools are used as OpenAI functions


In [None]:
# Rebinds `model` to the project's shared chat model.
model = models.chat_openai

tool_move_file = tools.MoveFileTool()
my_tools = [
  tool_move_file,
]

# Convert each tool to the OpenAI function format.
fns = [tools.convert_to_openai_function(t) for t in my_tools]

# Two ways to attach the tools: as converted functions, or directly as tools.
model_with_fns = model.bind_functions(fns)
model_with_tools = model.bind_tools(my_tools)

In [None]:
# Send one human message to the tool-bound model; the returned message is
# displayed as the cell output.
model_with_tools.invoke([
  prompts.HumanMessage(content="move file foo to bar"),
])

# Test

In [None]:

# Initialize tools: a single Tavily search tool returning one result.
tool_tavily_search = tools.TavilySearchResults(max_results=1)
my_tools = [
  tool_tavily_search,
]

# Pull the chat variant of the ReAct prompt (the plain variant is kept below
# for switching back).
prompt = prompts.hub.pull("hwchase17/react-chat")
# prompt = prompts.hub.pull("hwchase17/react")

# Choose the LLM that will drive the agent
llm = models.chat_openai

# Project wrapper class; agent_type selects the ReAct variant.
my_agent = agents.MyAgent(prompt=prompt, tools=my_tools, agent_type="react", llm=llm)

# Sample conversation turns; the next cell picks one by index.
questions = [
	"Hello",
	"My name is Bob",
	"what's my name?",
	"Is there any actor with the same name as me?",
]

In [None]:
# Ask the third sample question ("what's my name?").
response = my_agent.invoke_agent(questions[2])

# Agent Class

In [None]:
# Sample conversation turns for exercising the agent class defined below.
questions = [
	"Hello",
	"My name is Bob",
	"What is my name?"
]

In [40]:
import add_packages
import boto3
from loguru import logger
from typing import Union, Optional, List, Literal, AsyncGenerator, TypeAlias
from pydantic import BaseModel

from toolkit import utils

from langchain.agents import (
	create_openai_tools_agent, create_openai_functions_agent, 
	create_react_agent, create_self_ask_with_search_agent,
	create_xml_agent, create_tool_calling_agent,
	AgentExecutor
)
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.prompts.chat import BaseChatPromptTemplate
from langchain_core.runnables import Runnable
from langchain_core.tools import BaseTool
from langchain_core.agents import (
	AgentActionMessageLog, AgentFinish, AgentAction
)
from langchain_core.messages import AIMessage, HumanMessage, ChatMessage

from langchain.agents.openai_assistant import OpenAIAssistantRunnable
from langchain.agents.format_scratchpad.openai_tools import (
	format_to_openai_tool_messages, 
)
from langchain.agents.format_scratchpad import (
	format_to_openai_function_messages,
)
from langchain.agents.output_parsers.openai_tools import (
	OpenAIToolsAgentOutputParser,
)

from langchain_community.chat_message_histories.dynamodb import DynamoDBChatMessageHistory

#*==============================================================================

# boto3 DynamoDB resource handle, created when the cell runs.
# NOTE(review): no code in this cell references the `dynamodb` variable -
# confirm whether it is still needed.
dynamodb = boto3.resource("dynamodb")

#*==============================================================================

# Shared aliases for the chat-history settings used in the signatures below.
# Which storage backend holds the messages.
TypeHistoryType: TypeAlias = Literal["in_memory", "dynamodb"]
# Identifier of the user owning a session.
TypeUserId: TypeAlias = str
# Session identifier; None stands for "no session supplied".
TypeSessionId: TypeAlias = Union[str, None]

def create_markdown_textbox(input: str):
	"""Wrap ``input`` in a red-bordered HTML ``<div>``.

	Args:
		input: Text placed on its own line between the opening and closing tags.
			It is inserted as-is (no HTML escaping).

	Returns:
		The HTML snippet as a string, terminated by a newline.
	"""
	opening_tag = '<div style="border: 1px solid red; padding: 10px; margin: 10px;">'
	closing_tag = "</div>"
	# The trailing empty element produces the final newline after the closing tag.
	return "\n".join((opening_tag, f"{input}", closing_tag, ""))

# Settings bundle read by ChatHistory.__init__.
class SchemaChatHistory(BaseModel):
	# Storage backend: a plain Python list or a DynamoDB-backed history.
	history_type: TypeHistoryType = "in_memory"
	# Owner of the session; used as part of the DynamoDB key.
	user_id: TypeUserId = "admin"
	# Existing session to resume; None/empty makes ChatHistory start a new one.
	session_id: TypeSessionId = None
	# Number of most recent messages to keep (None is accepted by the schema).
	history_size: Union[int, None] = 10

class ChatHistory:
	"""Conversation history for one user session, kept in memory or in DynamoDB.

	The backend is selected by ``schema.history_type``:

	* ``"in_memory"`` - ``self.chat_history`` is a plain list of messages.
	* ``"dynamodb"`` - ``self.chat_history`` is a ``DynamoDBChatMessageHistory``
	  on table ``LangChainSessionTable`` keyed by ``SessionId`` and ``UserId``.

	When no session id is supplied, a new one is generated and an AI welcome
	message is stored as the first entry.
	"""

	def __init__(self, schema: SchemaChatHistory):
		"""Build the history backend described by ``schema``.

		Args:
			schema: History settings (backend type, user id, session id and
				maximum number of messages to keep).
		"""
		self.history_type = schema.history_type

		self.user_id = schema.user_id
		# A missing or empty session id means this is a brand-new conversation.
		self.is_new_session = not bool(schema.session_id)
		self.session_id = schema.session_id if schema.session_id else utils.generate_unique_id("uuid_name")

		self.history_size = schema.history_size

		if self.history_type == "in_memory":
			self.chat_history = []
		elif self.history_type == "dynamodb":
			self.chat_history = DynamoDBChatMessageHistory(
				table_name="LangChainSessionTable",
				session_id=self.session_id,
				key={
					"SessionId": self.session_id,
					"UserId": self.user_id,
				},
				history_size=self.history_size,
			)

		if self.is_new_session:
			# New conversations start with a greeting from the assistant.
			welcome_msg = "Hello! How can I help you today?"

			if self.history_type == "in_memory":
				self.chat_history.append(AIMessage(welcome_msg))
			elif self.history_type == "dynamodb":
				self.chat_history.add_ai_message(welcome_msg)

		logger.info(f"User Id: {self.user_id}")
		logger.info(f"Session Id: {self.session_id}")
		logger.info(f"History Type: {self.history_type}")

	async def _add_messages_to_history(
		self,
		msg_user: str,
		msg_ai: str,
	):
		"""Append one user/AI exchange to the history, user message first.

		Args:
			msg_user: Text of the user's message.
			msg_ai: Text of the assistant's reply.
		"""
		exchange = [HumanMessage(msg_user), AIMessage(msg_ai)]
		if self.history_type == "in_memory":
			self.chat_history.extend(exchange)
		elif self.history_type == "dynamodb":
			# One call for both messages; their order is preserved.
			await self.chat_history.aadd_messages(messages=exchange)

	async def _get_chat_history(self):
		"""Return the stored messages.

		For the in-memory backend this is the live list itself (not a copy);
		for DynamoDB it is the backend's ``messages`` attribute.
		"""
		if self.history_type == "in_memory":
			return self.chat_history
		elif self.history_type == "dynamodb":
			return self.chat_history.messages

	async def clear_chat_history(self):
		"""Remove every stored message from the active backend."""
		if self.history_type == "in_memory":
			self.chat_history = []
		elif self.history_type == "dynamodb":
			await self.chat_history.aclear()

	async def _truncate_chat_history(
		self,
	):
		"""Keep only the ``history_size`` most recent in-memory messages.

		A ``history_size`` of ``None`` or ``0`` means "no limit" and leaves the
		list untouched. Nothing is done here for the DynamoDB backend, which
		receives ``history_size`` at construction time.
		"""
		if self.history_type == "in_memory":
			# `-None` would raise TypeError and `[-0:]` keeps everything anyway,
			# so both are handled as "do not truncate".
			if not self.history_size:
				return
			self.chat_history = self.chat_history[-self.history_size:]
		elif self.history_type == "dynamodb":
			...

class MyStatelessAgent:
	"""Agent wrapper that builds a fresh ``ChatHistory`` for every call.

	The agent runnable and its ``AgentExecutor`` are created once in
	``__init__``. ``invoke_agent`` and ``astream_events_basic`` each construct a
	new ``ChatHistory`` from the supplied history settings, run the executor
	with the stored messages, and then record the new user/AI exchange.
	"""

	# Values accepted by the `mode` argument of invoke_agent.
	_SUPPORTED_MODES = ("sync", "async")

	# Errors that mean "this stream chunk does not have the expected shape".
	# Deliberately excludes BaseException subclasses such as GeneratorExit and
	# asyncio.CancelledError so the generator can still be closed or cancelled.
	_CHUNK_SHAPE_ERRORS = (KeyError, IndexError, TypeError, ValueError, AttributeError)

	def __init__(
		self,
		llm: Union[BaseChatModel, None],
		tools: list[BaseTool],
		prompt: Union[BaseChatPromptTemplate, None],

		agent_type: Literal[
			"tool_calling", "openai_tools", "react", "anthropic"
		] = "tool_calling",
		agent_verbose: bool = False,
	):
		"""Create the agent and the executor that runs it.

		Args:
			llm: Chat model driving the agent.
			tools: Tools the agent may call.
			prompt: Prompt template passed to the agent factory.
			agent_type: Which agent factory to use (see ``_create_agent``).
			agent_verbose: Forwarded to ``AgentExecutor(verbose=...)``.

		Raises:
			ValueError: If ``agent_type`` is not one of the supported values.
		"""
		self.llm = llm
		self.my_tools = tools
		self.prompt = prompt

		self.agent_type = agent_type
		self.agent_verbose = agent_verbose

		self.agent = self._create_agent()
		self.agent_executor = AgentExecutor(
			agent=self.agent, tools=self.my_tools, verbose=self.agent_verbose,
			handle_parsing_errors=True,
			return_intermediate_steps=False,
		)

	def _create_agent(self) -> Runnable:
		"""Build the agent runnable matching ``self.agent_type``.

		Returns:
			The runnable produced by the selected LangChain agent factory.

		Raises:
			ValueError: If ``self.agent_type`` is not supported.
		"""
		logger.info(f"Agent type: {self.agent_type}")

		if self.agent_type == "tool_calling":
			return create_tool_calling_agent(self.llm, self.my_tools, self.prompt)
		elif self.agent_type == "openai_tools":
			return create_openai_tools_agent(self.llm, self.my_tools, self.prompt)
		elif self.agent_type == "react":
			return create_react_agent(llm=self.llm, tools=self.my_tools, prompt=self.prompt)
		elif self.agent_type == "anthropic": # todo
			return create_xml_agent(llm=self.llm, tools=self.my_tools, prompt=self.prompt)
		else:
			raise ValueError(
				"Invalid agent type. Supported types are 'tool_calling', "
				"'openai_tools', 'react' and 'anthropic'."
			)

	def _create_chat_history(
		self,
		history_type: TypeHistoryType = "dynamodb",
		user_id: TypeUserId = "admin",
		session_id: TypeSessionId = None,
		history_size: Union[int, None] = 10,
	) -> ChatHistory:
		"""Build a ``ChatHistory`` from individual settings.

		Args:
			history_type: Storage backend for the messages.
			user_id: Owner of the session.
			session_id: Session to resume; ``None`` starts a new one.
			history_size: Maximum number of messages to keep.

		Returns:
			A new ``ChatHistory`` configured with the given settings.
		"""
		return ChatHistory(schema=SchemaChatHistory(
			history_type=history_type, user_id=user_id, session_id=session_id,
			history_size=history_size,
		))

	async def invoke_agent(
		self,
		input_message: str,
		callbacks: Optional[List] = None,
		mode: Literal["sync", "async"] = "async",

		history_type: TypeHistoryType = "dynamodb",
		user_id: TypeUserId = "admin",
		session_id: TypeSessionId = None,

		history_size: Union[int, None] = 10,
	):
		"""Run the agent once and return its final answer.

		Args:
			input_message: The user's message.
			callbacks: Optional callbacks passed to the executor's config.
			mode: ``"sync"`` uses ``invoke``; ``"async"`` uses ``ainvoke``.
			history_type: Storage backend for the chat history.
			user_id: Owner of the session.
			session_id: Session to resume; ``None`` starts a new one.
			history_size: Maximum number of messages to keep.

		Returns:
			The ``"output"`` entry of the executor's result.

		Raises:
			ValueError: If ``mode`` is neither ``"sync"`` nor ``"async"``.
		"""
		# Reject a bad mode before touching the history backend; otherwise the
		# call would fail later while subscripting a None result.
		if mode not in self._SUPPORTED_MODES:
			raise ValueError(
				f"Invalid mode {mode!r}. Supported modes are 'sync' and 'async'."
			)

		history = self._create_chat_history(
			history_type, user_id, session_id, history_size,
		)

		input_data = {
			"input": input_message, "chat_history": await history._get_chat_history()
		}

		configs = {"callbacks": callbacks if callbacks else []}

		if mode == "sync":
			result = self.agent_executor.invoke(input_data, configs)
		else:
			result = await self.agent_executor.ainvoke(input_data, configs)

		result = result["output"]

		await history._add_messages_to_history(input_message, result)
		if history_type == "in_memory":
			await history._truncate_chat_history()

		return result

	@classmethod
	def _tool_call_text(cls, event_name: str, data_chunk) -> Optional[str]:
		"""Best-effort extraction of displayable text from an ``on_chain_stream`` chunk.

		Returns ``None`` when the event name is not handled or the chunk does not
		have the expected shape.
		"""
		try:
			if event_name == "RunnableSequence":
				log: str = dict(data_chunk[0])["log"]
				# Collapse the log onto one line, then terminate it with a newline.
				text = log.replace("\n", "") + "\n"
			elif event_name == "RunnableLambda":
				text = dict(data_chunk[1])["content"]
			else:
				return None
		except cls._CHUNK_SHAPE_ERRORS:
			return None
		# Only text can be appended to the accumulated answer.
		return text if isinstance(text, str) else None

	async def astream_events_basic(
		self,
		input_message: str,

		history_type: TypeHistoryType = "dynamodb",
		user_id: TypeUserId = "admin",
		session_id: TypeSessionId = None,

		show_tool_call: bool = True,
		history_size: Union[int, None] = 10,
	) -> AsyncGenerator[str, None]:
		"""Stream the agent's answer as text chunks.

		Usage:
			async for chunk in agent.astream_events_basic("Hello"):
				print(chunk, end="", flush=True)

		Args:
			input_message: The user's message.
			history_type: Storage backend for the chat history.
			user_id: Owner of the session.
			session_id: Session to resume; ``None`` starts a new one.
			show_tool_call: Also yield text taken from ``on_chain_stream`` events
				named ``RunnableSequence`` / ``RunnableLambda``.
			history_size: Maximum number of messages to keep.

		Yields:
			Text chunks in the order they are produced. Once the stream is
			exhausted, their concatenation is stored as the AI reply.
		"""

		history = self._create_chat_history(
			history_type, user_id, session_id, history_size,
		)

		result = ""
		async for event in self.agent_executor.astream_events(
			input={"input": input_message, "chat_history": await history._get_chat_history()},
			version="v1",
		):
			event_event = event["event"]
			event_name = event["name"]

			# Events without a chunk carry nothing to stream; skipping them also
			# guarantees a previous event's chunk is never reused.
			try:
				event_data_chunk = event["data"]["chunk"]
			except (KeyError, TypeError):
				continue

			if event_event == "on_chat_model_stream":
				chunk = dict(event_data_chunk)["content"]
			elif show_tool_call and event_event == "on_chain_stream":
				chunk = self._tool_call_text(event_name, event_data_chunk)
			else:
				chunk = None

			if chunk is None:
				continue

			# The yield stays outside any try/except so closing or cancelling
			# the generator is never swallowed.
			result += chunk
			yield chunk

		await history._add_messages_to_history(input_message, result)
		if history_type == "in_memory":
			await history._truncate_chat_history()

	async def astream_events_basic_wrapper(
		self,
		input_message: str,
	):
		"""Stream the answer to stdout and return the full text.

		Uses the default history settings of ``astream_events_basic``.

		Args:
			input_message: The user's message.

		Returns:
			The concatenation of all streamed chunks.
		"""
		result = ""
		async for chunk in self.astream_events_basic(input_message):
			result += chunk
			print(chunk, end="", flush=True)
		return result

	@staticmethod
	def hello():
		"""Placeholder with no behaviour; static so it is callable on instances too."""
		...

In [41]:
# Build the pieces for MyStatelessAgent: chat model, one Tavily search tool
# (max_results=3) and the toolkit's tool-calling prompt.
# NOTE: rebinds the notebook-wide `llm`, `my_tools`, `prompt` and `agent` names.
llm = models.chat_openai
my_tools = [
	tools.TavilySearchResults(max_results=3)
]
prompt = prompts.create_prompt_tool_calling_agent()

agent = MyStatelessAgent(
	llm=llm,
	tools=my_tools,
	prompt=prompt,
	agent_type='tool_calling',
	agent_verbose=False,
)

[32m2024-05-21 08:27:46.707[0m | [1mINFO    [0m | [36m__main__[0m:[36m_create_agent[0m:[36m155[0m - [1mAgent type: tool_calling[0m


In [42]:
# Stream the answer chunk by chunk, including tool-call text.
# A non-empty session_id is passed, so ChatHistory treats this as an existing
# session and does not add the welcome message.
async for chunk in agent.astream_events_basic(
	input_message="Tell me a about apple inc",
	show_tool_call=True,
	history_type="dynamodb",
	user_id="user-ip",
	session_id="default",
):
	print(chunk, end="", flush=True)

[32m2024-05-21 08:27:46.877[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m92[0m - [1mUser Id: user-ip[0m
[32m2024-05-21 08:27:46.878[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m93[0m - [1mSession Id: default[0m
[32m2024-05-21 08:27:46.878[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m94[0m - [1mHistory Type: dynamodb[0m


Invoking: `tavily_search_results_json` with `{'query': 'Apple Inc'}`
[{"url": "https://en.wikipedia.org/wiki/Apple_Inc.", "content": "In total, Apple occupies almost 40% of the available office space in the city.[278]\nApple's headquarters for Europe, the Middle East and Africa (EMEA) are located in Cork in the south of Ireland, called the Hollyhill campus.[279] The facility, which opened in 1980, houses 5,500 people and was Apple's first location outside of the United States.[280] Apple's international sales and distribution arms operate out of the campus in Cork.[281]\nApple has two campuses near Austin, Texas: a 216,000-square-foot (20,100 m2) campus opened in 2014 houses 500 engineers who work on Apple silicon[282] and a 1.1-million-square-foot (100,000 m2) campus opened in 2021 where 6,000 people work in technical support, supply chain management, online store curation, and Apple Maps data management.\n As of the end of 2021[update], this broad line of products comprises about 11%

In [15]:
# Non-streaming call: the cell's value is the agent's final answer text.
# `mode` is left at its default ("async").
await agent.invoke_agent(
	input_message="Tell me about apple inc",
	history_type="dynamodb",
	user_id="user",
	session_id="default",
)

[32m2024-05-21 08:10:37.913[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m84[0m - [1mUser Id: user[0m
[32m2024-05-21 08:10:37.914[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m85[0m - [1mSession Id: default[0m
[32m2024-05-21 08:10:37.918[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m86[0m - [1mHistory Type: dynamodb[0m


"Apple Inc. is a renowned technology company known for its innovative products and services. Here are some key details about Apple Inc.:\n\n1. **Headquarters and Locations**: Apple's headquarters are situated in Cupertino, California. They have offices and campuses worldwide, including in Europe, the Middle East, Africa, and Texas.\n\n2. **Products and Services**: Apple offers a diverse range of products such as the iPhone, MacBook, Apple Card, and Apple TV+. Additionally, they provide services like AppleCare+, iCloud+, Apple Card credit card, Apple Pay, and digital content services like Apple Music and Apple TV+.\n\n3. **Recent Developments**: Apple unveiled its new VR headset, Vision Pro, and visionOS at WWDC 2023. They also launched 'Apple Pay Later,' a buy now, pay later service for Apple Wallet users in March 2023.\n\n4. **Corporate History**: Apple has a rich history, including the introduction of the Apple II and the popularization of VisiCalc, a spreadsheet program. The company

In [35]:
# Quick check of the HTML wrapper; the cell's value is the generated snippet.
create_markdown_textbox("hello")

'<div style="border: 1px solid red; padding: 10px; margin: 10px;">\nhello\n</div>\n'