In [3]:
from typing import Union, List, Annotated, Sequence, TypedDict

from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import BaseTool, Tool, tool
from langchain.tools.render import format_tool_to_openai_function
from langchain.chains.conversation.memory import ConversationBufferWindowMemory

from langgraph.graph import END, Graph, StateGraph
from langgraph.prebuilt import ToolExecutor, ToolInvocation
from langgraph.graph import END, MessageGraph

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import ToolMessage, HumanMessage, BaseMessage, FunctionMessage
from langchain_core.tools import tool, BaseTool
from langchain_core.utils.function_calling import convert_to_openai_function, convert_to_openai_tool

from langchain_community.llms import Ollama
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.tools import ShellTool
from langchain_experimental.llms.ollama_functions import OllamaFunctions

import json
import subprocess
import operator
# import streamlit as st

def create_agent(llm, tools, system_message):
    """Build a runnable that feeds a tool-aware chat prompt into ``llm``.

    Args:
        llm: The model (or any runnable) that receives the formatted prompt.
        tools: Iterable of tool objects; only each tool's ``name`` attribute is
            read, to list the available tools in the system message.
        system_message: Extra instructions appended to the system message.

    Returns:
        The composition ``prompt | llm``. The prompt still expects a
        ``messages`` variable at invocation time (the conversation so far).
    """
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, helping the user accomplish their task."
                " Use the provided tools to progress towards answering the question."
                " You have access to the following tools: {tool_names}.\n{system_message}",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    # Pre-fill the two static template variables so that callers only have to
    # supply "messages". The loop variable is deliberately not called `tool`,
    # which would shadow the `tool` decorator imported at the top of the file.
    prompt = prompt.partial(
        tool_names=", ".join(t.name for t in tools),
        system_message=system_message,
    )
    return prompt | llm

# @tool
def multiply_calculator(
    self, number1: Union[int, float], number2: Union[int, float]
) -> Union[int, float]:
    """
    Multiply calculator: return the product of ``number1`` and ``number2``.

    ``self`` is accepted as the first positional argument but is never read.
    NOTE(review): this is a module-level function, so ``self`` is probably a
    leftover from a method version -- confirm before re-enabling ``@tool``.
    """
    product = number1 * number2
    return product

class Multiply(BaseTool):
    """LangChain tool that multiplies two numbers.

    The tool is registered under the name ``multiply_calculator`` so that it
    matches the function name advertised to the model in
    ``multiply_tools_dict`` / ``multiply_function_call``. ``call_tool`` builds
    its ``ToolInvocation`` from the function-call name returned by the model,
    so the executor can only find this tool if both names are identical.
    """

    # Explicit `str` annotations keep these valid field overrides on
    # pydantic-based BaseTool versions that reject un-annotated overrides.
    name: str = "multiply_calculator"
    description: str = "use this tool when you need to make a multiplication"

    def _run(
        self, number1: Union[int, float], number2: Union[int, float]
    ) -> Union[int, float]:
        """Return ``number1 * number2``."""
        return number1 * number2

    def _arun(self, number1: Union[int, float], number2: Union[int, float]):
        """Async entry point; not supported.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError("This tool does not support async")

# Function-calling schema for the multiplication tool: two required
# arguments, each declared with the types "integer" and "number". The
# description is read from a Multiply instance so the text is defined once.
multiply_tools_dict = {
    "name": "multiply_calculator",
    "description": Multiply().description,
    "parameters": {
        "type": "object",
        "properties": {
            arg_name: {"type": ["integer", "number"]}
            for arg_name in ("number1", "number2")
        },
        "required": ["number1", "number2"],
    },
}

# Names the function by its schema name; referenced only by the
# commented-out `bind_tools(function_call=...)` call further down.
multiply_function_call = {"name": "multiply_calculator"}

In [5]:
# Instantiate the function-calling wrapper for the "llama3:8b" model,
# requesting JSON-formatted output. All tool bindings below are commented
# out, so `model` is left without any tools bound in this cell.
model = OllamaFunctions(model="llama3:8b", format="json")
# Disabled attempt 1: bind the schema defined in the first cell.
# NOTE(review): `tools` receives a single dict here while attempt 2 passes a
# list -- presumably a list is expected; confirm before re-enabling.
# model = model.bind_tools(
#     tools=multiply_tools_dict,
#     function_call=multiply_function_call
# )
# One experimental tool
# Disabled attempt 2: the same schema written inline.
# NOTE(review): "int" is not a JSON Schema type name ("integer" is) --
# confirm before re-enabling.
# model = model.bind_tools(
#     tools=[
#         {
#             "name": "multiply_calculator",
#             "description": "use this tool when you need to make a multiplication",
#             "parameters": {
#                 "type": "object",
#                 "properties": {
#                     "number1": {
#                         "type": "int",
#                         "description": "First number",
#                     },
#                     "number2": {
#                         "type": "int",
#                         "description": "Second number",
#                     },
#                 },
#                 "required": ["number1", "number2"],
#             },
#         }
#     ],
#     function_call={"name": "multiply_calculator"},
# )
# Smoke test: send a trivial prompt; the result is the cell's output.
model.invoke("hi")

In [None]:
# LLM
# (Disabled alternative: the plain Ollama LLM instead of the `model` defined
# in the previous cell.)
# model = Ollama(model="llama3:8b")
# Tools
# The only tool made available is the Multiply calculator.
tools = [Multiply()]
# Initialize the tool executor
# `call_tool` sends its ToolInvocation objects to this executor.
tool_executor = ToolExecutor(tools=tools)

# Binding functions
# (Disabled: convert the tools to OpenAI function schemas and bind them.)
# functions = [convert_to_openai_function(t) for t in tools]
# model = model.bind(tools=functions)

# Create agent
# NOTE: despite its name, `llm` is the `prompt | model` runnable returned by
# create_agent, so it is invoked with a dict holding a "messages" key rather
# than with a bare string.
llm = create_agent(
    llm=model,
    tools=tools,
    system_message=""
)


In [None]:
# Smoke test of the prompt+model chain on its own (no graph involved): the
# dict supplies the "messages" variable that the prompt template expects.
inputs = {"messages": [HumanMessage(content="Hi how are you doing?")]}
llm.invoke(inputs)

In [None]:
class AgentState(TypedDict):
    """Graph state schema: a single ``messages`` sequence of chat messages."""

    # `operator.add` is attached as annotation metadata. Presumably LangGraph
    # uses it as the reducer for this key, so the lists returned by the node
    # functions are concatenated onto the existing history -- TODO confirm.
    messages: Annotated[Sequence[BaseMessage], operator.add]

# Define the function that determines whether to continue or not
def should_continue(state):
    """Decide which branch the graph takes after the agent node.

    Looks only at the most recent message in ``state['messages']``.

    Returns:
        "continue" when that message's ``additional_kwargs`` contains a
        "function_call" entry, otherwise "end".
    """
    latest = state['messages'][-1]
    wants_tool = "function_call" in latest.additional_kwargs
    return "continue" if wants_tool else "end"

# Define the function that calls the model
def call_model(state):
    """Agent node: send the conversation so far to the module-level ``model``.

    Returns:
        A state update ``{"messages": [response]}``. The response is wrapped
        in a one-element list because it gets added to the existing list.
    """
    reply = model.invoke(state['messages'])
    return {"messages": [reply]}

# Define the function to execute tools
def call_tool(state):
    """Action node: run the tool requested by the latest message.

    The routing condition only sends the graph here when the last message
    carries a "function_call" entry, so that entry is read without checking.

    Returns:
        A state update ``{"messages": [function_message]}`` whose single
        ``FunctionMessage`` holds the stringified tool result and is named
        after the invoked tool.
    """
    requested_call = state['messages'][-1].additional_kwargs["function_call"]

    # The arguments arrive as a JSON string and are decoded into the tool input.
    invocation = ToolInvocation(
        tool=requested_call["name"],
        tool_input=json.loads(requested_call["arguments"]),
    )
    result = tool_executor.invoke(invocation)

    # Wrapped in a list because it gets added to the existing message list.
    return {
        "messages": [FunctionMessage(content=str(result), name=invocation.tool)]
    }



In [None]:
# initialize conversational memory
# (Disabled: a windowed conversation buffer; nothing below uses it.)
# conversational_memory = ConversationBufferWindowMemory(
#     memory_key="chat_history", k=5, return_messages=True
# )

# initialize the graph
# The graph state follows the AgentState schema (a "messages" list).
graph = StateGraph(AgentState)

# Two nodes: "agent" calls the model, "action" executes the requested tool.
graph.add_node("agent", call_model)
graph.add_node("action", call_tool)

# After "agent", should_continue picks the branch: its "continue" result
# routes to the tool node and its "end" result finishes the run.
graph.add_conditional_edges(
    "agent",
    should_continue,
    {
        "continue": "action",
        "end": END,
    },
)
# A tool result always goes back to the agent node.
graph.add_edge("action", "agent")

# Execution starts at the agent node.
graph.set_entry_point("agent")

# Compile the graph definition into the runnable used by the cells below.
runnable = graph.compile()

In [None]:
# Print an ASCII rendering of the compiled graph to check its wiring.
runnable.get_graph().print_ascii()

In [None]:
# Run the compiled graph once on a single human message. `inputs` is
# re-created here with the same content as in the earlier chain smoke test.
inputs = {"messages": [HumanMessage(content="Hi how are you doing?")]}
runnable.invoke(inputs)

In [None]:

# To get each step
# Re-runs the graph on the `inputs` defined in the previous cell, printing
# every intermediate update instead of only the final result.
for output in runnable.stream(inputs):
    # stream() yields dictionaries with output keyed by node name
    for key, value in output.items():
        print(f"Output from node '{key}':")
        print("---")
        print(value)
    # Separator between consecutive streamed steps.
    print("\n---\n")