In [None]:
%pip install -U langchain langchain-core langchain-openai langchain-community python-dotenv
%pip install -U langchain-classic

In [None]:
import random
from typing import TypedDict, List, Annotated
import operator
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, AnyMessage
from langchain_openai import ChatOpenAI
from IPython.display import Image, display
from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv("API_KEY")
base_url = os.getenv("OPENAI_ENDPOINT")
model_name = "gpt-4o-mini"
temp=0.0

llm = ChatOpenAI(
    base_url=base_url,
    api_key=api_key,
    model=model_name,
    temperature=temp
)

## **Data Processing (Sequential)**
- Both node a and b are updating the output in the State
- They are doing that sequentially here, so there's no problem

In [None]:
class State(TypedDict):
    input: int
    output: int
    
def node_a(state: State):
    input_value = state['input']
    offset = random.randint(1,10)
    output =  input_value + offset
    print(
        f"NODE A:\n "
        f"->input:{input_value}\n " 
        f"->offset:{offset}\n "
        f"->output:{output}\n "
    )
    return {"output": output}

def node_b(state: State):
    input_value = state['output'] # Coming from output
    offset = random.randint(1,10)
    output =  input_value + offset
    print(
        f"NODE B:\n "
        f"->input:{input_value}\n " 
        f"->offset:{offset}\n "
        f"->output:{output}\n "
    )
    return {"output": output}


In [None]:
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
graph = workflow.compile()
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
graph.invoke(
    input = {
        "input": 1,
    }, 
)

## **Data Processing (Parallel)**
- We both node a and b tries to update the State output key value at the same time, we get a problem.


In [None]:
class State(TypedDict):
    input: int
    output: int
    
def node_a(state: State):
    input_value = state['input']
    offset = random.randint(1,10)
    output =  input_value + offset
    print(
        f"NODE A:\n "
        f"->input:{input_value}\n " 
        f"->offset:{offset}\n "
        f"->output:{output}\n "
    )
    return {"output": output}

def node_b(state: State):
    input_value = state['input'] # Coming from input
    offset = random.randint(1,10)
    output =  input_value + offset
    print(
        f"NODE B:\n "
        f"->input:{input_value}\n " 
        f"->offset:{offset}\n "
        f"->output:{output}\n "
    )
    return {"output": output}


In [None]:
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge(START, "node_b")
workflow.add_edge("node_a", END)
workflow.add_edge("node_b", END)
graph = workflow.compile()
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
graph.invoke(
    input = {
        "input": 1,
    }, 
)

## **Data Processing with a Reducer**
- To solve the problem, we use a reducer: a function that defines how to merge or update state when multiple nodes modify the same state key
- This is done by using Annotation (see below example): output: Annotated[List[int], operator.add]


In [None]:
help(operator.add)
operator.add([1,2,3],[4,5,6])

In [None]:
class State(TypedDict):
    input: int
    output: Annotated[List[int], operator.add]
    
def node_a(state: State):
    input_value = state['input']
    offset = random.randint(1,10)
    output =  input_value + offset
    print(
        f"NODE A:\n "
        f"->input:{input_value}\n " 
        f"->offset:{offset}\n "
        f"->output:{output}\n "
    )
    return {"output": [output]} # now it's a List of ints

def node_b(state: State):
    input_value = state['input'] # Coming from input
    offset = random.randint(1,10)
    output =  input_value + offset
    print(
        f"NODE B:\n "
        f"->input:{input_value}\n " 
        f"->offset:{offset}\n "
        f"->output:{output}\n "
    )
    return {"output": [output]} # now it's a List of ints


In [None]:
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge(START, "node_b")
workflow.add_edge("node_a", END)
workflow.add_edge("node_b", END)
graph = workflow.compile()
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
graph.invoke(
    input = {
        "input": 1,
    }, 
)

## **Call LLMs with Custom State and Langgraph operator**

In [None]:
# adding two lists of messages
operator.add(
    [
        SystemMessage("You're a helpful assistant"),
        HumanMessage("Hi!")
    ],
    [
        AIMessage("Hello! How can I assist you today?")
    ]
)

In [None]:
# add_messages does the same thing
add_messages(
    left = [
        SystemMessage("You're a helpful assistant"),
        HumanMessage("Hi!")
    ],
    right = AIMessage("Hello! How can I assist you today?")
)

In [None]:
class State(TypedDict):
    messages:Annotated[List[AnyMessage], add_messages]
    
def model(state: State):
    messages = state["messages"]
    response = llm.invoke(messages)
    return {"messages": response}

In [None]:
workflow = StateGraph(State)
workflow.add_node("model", model)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
graph = workflow.compile()
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

In [None]:
result = graph.invoke(
    input={
        "messages": [HumanMessage("What's the name of Ash's first pokémon?")]}, 
)

for message in result['messages']:
    message.pretty_print()

### **Call LLMs with Langgraph MessagesState**
- We can use MessagesState which include messages list and reducer function (add_messages) without having to define them explicitly
- we can still add other data fields to the State class

In [None]:
def model(state: MessagesState):
    messages = state["messages"]
    response = llm.invoke(messages)
    return {"messages": response}

workflow = StateGraph(MessagesState)
workflow.add_node("model", model)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
graph = workflow.compile()

In [None]:
result = graph.invoke(
    input={
        "messages": [HumanMessage("What's the name of Ash's first pokémon?")]}, 
)

for message in result['messages']:
    message.pretty_print()

In [None]:
result = graph.invoke(
    input={
        "messages": [HumanMessage("What's the name of Ash's second pokémon?")]}, 
)

for message in result['messages']:
    message.pretty_print()