# Async & Streaming

Vediamo ora un'argomento neccesario se vogliamo creare dei grafi pronti per la produzione.

## Async

Fin'ora per eseguire un grafo, abbiamo solamente usato .invoke(), che è un metodo sincrono, questo può causare ritardi significativi quando gestiamo molteplici richieste dato che Python esegue in un singolo THREAD.

Vediamo meglio cosa vuol dire:

Con synchronous code, quando un User A fa una richiesta, il sistema invia una richiesta al modello e attende per la risposta. Durante questo tempo di attesa, il THREAD è bloccato e non può gestire un'altra richiesta.

Ad esempio, se avessimo avuto 5 richieste di cui ciascuna prende 2 secondi per processare il sistema le processa una dopo l'altra, risultando in totale di 10 secondi.

![alt text](sync.png)

Con un codice asincrono, un User A fa una richiesta, ma il THREAD non si blocca nell'attesa che la richiesta dell'user A venga risposta.

Ad esempio, quando l'user A invia una richiesta al modello, il sistema può immediatamente gestire altre richieste invece di sedersi in attesa che il modello risponda alla prima richiesta. Invece, il processo cicla lungo le richieste, facendo un avanzamento su tutte esse.

In questo modo, tutte le 5 richieste possono essere processate in concorrenza, completandole in circa 2 secondi.

![alt text](async.png)


Quando rilasciamo in produzione, un agente che gestisce molte richieste utilizzando codice asincrono è assulutamente essenziale. Is MANDATORY for real world applications!

Questo assicura che l'applicazione rimanga responsive ed efficiente anche sotto carico.

## Streaming

Un'altra tecnica spesso usata in applicazioni condivise è lo streaming.

Senza lo streaming, l'utente deve attendere che l'agente abbia generato l'intera risposta prima di vedere qualsiasi cosa nell'UI.

Questo può portare a ritardi, specialmente con output lunghi, e l'utente potrebbe chiudere l'applicazione.

Con lo streaming abilitato, la risposta è inviata all'utente in modo incrementale, token by token mentre viene generata.

Questo consente all'utente di vedere l'output venendo creato in real-time, risultando più fluido e veloce.

Tuttavia, streaming diventa più complicato in workflows che coinvolgono molteplici agenti.

In questi casi, non vogliamo streammare immediatamente i risultati, come dati parzialmente processati o decisioni fatte dall'agente.

Questo significa che l'user deve ancora aspettare che l'intero workflow sia completato prima di ricevere la risposta.

Quindi, mentre lo streaming è incredibilmente utile per agneti conversazionali semplici, può essere non pratico per workflow avanzati dove il risultato finale è l'unico output rilevante.

Per prima cosa creiamo un grafo semplice che non fa uso di streaming nè di operazioni ascinrone.

In modo da vedere cosa dobbiamo cambiare per farle.

In [12]:
from dotenv import load_dotenv
load_dotenv()

True

In [13]:
from typing import TypedDict, Literal
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage

from langgraph.graph import END, START, StateGraph, MessagesState
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode

In [14]:
@tool
def get_weather(location: str):
    """Call to get the current weather."""
    if location.lower() in ["munich"]:
        return "It's 15 degrees Celsius and cloudy."
    else:
        return "It's 32 degrees Celsius and sunny."


In [15]:
tools = [get_weather]
model = ChatOpenAI(model="gpt-4o").bind_tools(tools)

In [20]:
def call_model(state: MessagesState):
    messages = state['messages']
    response = model.invoke(messages)
    return {"messages": [response]}

def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state["messages"]

    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return END



In [21]:
workflow = StateGraph(MessagesState)

tool_node = ToolNode(tools)

In [22]:
workflow = StateGraph(MessagesState)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")

workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")

graph = workflow.compile(checkpointer=MemorySaver())

In [23]:
graph.invoke(
    {"messages": [HumanMessage(content="How is the weather in munich?")]},
    config = {"configurable": {"thread_id": 1}}
)

{'messages': [HumanMessage(content='How is the weather in munich?', additional_kwargs={}, response_metadata={}, id='e78880be-96eb-4719-b5ec-d56b61368ac9'),
  AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_ldiWQ6RF5Tv7O04bzA9tQcMt', 'function': {'arguments': '{"location":"munich"}', 'name': 'get_weather'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 52, 'total_tokens': 68, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_eb9dce56a8', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-e6a1e731-787e-48fc-b6d0-b4cb83fa174a-0', tool_calls=[{'name': 'get_weather', 'args': {'location': 'munich'}, 'id': 'call_ldiWQ6RF5Tv7O04bzA9tQcMt', 'type': 'tool_call'}], usage_metadata={'inpu

In [24]:
# follow-up
graph.invoke(
    {"messages": [HumanMessage(content="What would you recommend to do in that city then?")]},
    config = {"configurable": {"thread_id": 1}}
)

{'messages': [HumanMessage(content='How is the weather in munich?', additional_kwargs={}, response_metadata={}, id='e78880be-96eb-4719-b5ec-d56b61368ac9'),
  AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_ldiWQ6RF5Tv7O04bzA9tQcMt', 'function': {'arguments': '{"location":"munich"}', 'name': 'get_weather'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 52, 'total_tokens': 68, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_eb9dce56a8', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-e6a1e731-787e-48fc-b6d0-b4cb83fa174a-0', tool_calls=[{'name': 'get_weather', 'args': {'location': 'munich'}, 'id': 'call_ldiWQ6RF5Tv7O04bzA9tQcMt', 'type': 'tool_call'}], usage_metadata={'inpu

### Getting production ready - async and streaming 

Per prima cosa, vogliamo usare lo streaming, ma non tutti i modelli lo supportano.

OpenAI si, basta passare il parametro setraming=True.

Per rendere il nmostro agente asincrono, dobbiamo cambiare la funzione di definizione e come lo invochiamo il modello.

Dunque se vogliamo usare la invocazione asicrona del modello, allora dobbiamo usare il metodo **ainvoke()**, che permette l'utilizzo del codice async.

Inoltre, per rendere il codeice asincrono, dobbiamo definre una funzione async, quindi dobbiamo usare **async def** al posto di solo def per definire la nostra funzione.

In [25]:
model = ChatOpenAI(model="gpt-4o", streaming=True).bind_tools(tools)

In [27]:
async def call_model(state: MessagesState):
    messages = state['messages']
    response = await model.ainvoke(messages)
    return {"messages": [response]}

def should_continue(state: MessagesState) -> Literal["tools", END]:
    messages = state["messages"]

    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools"
    return END

In [28]:
workflow = StateGraph(MessagesState)

tool_node = ToolNode(tools)

workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")

workflow.add_conditional_edges(
    "agent",
    should_continue,
    path_map={
        "tools": "tools", 
        END: END
    }
)

workflow.add_edge("tools", "agent")

graph = workflow.compile(checkpointer=MemorySaver())

In [29]:
inputs = {"messages": [HumanMessage(content="How is the weather in Munich?")]}

config = {"configurable": {"thread_id": 2}}

In [None]:
# ora invochiamo il nostro grafo asicronomamente
# nel caso fosse pubblico può gestire le invoccazioni del nostro grafo contemporeamente
# invece di far attandere una richiesta di invocazione del grafo finche 
# non risponde a quella precedente
await graph.ainvoke(
    input=inputs,
    config=config
)

{'messages': [HumanMessage(content='How is the weather in Munich?', additional_kwargs={}, response_metadata={}, id='683b6d24-f44a-4c87-97a4-9c2be647557a'),
  AIMessage(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_WXelB9dLnaZ5E0AdnU1pSc0Q', 'function': {'arguments': '{"location":"Munich"}', 'name': 'get_weather'}, 'type': 'function'}]}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_eb9dce56a8'}, id='run-4e47c833-4d65-430b-b99e-3d9f79598b25-0', tool_calls=[{'name': 'get_weather', 'args': {'location': 'Munich'}, 'id': 'call_WXelB9dLnaZ5E0AdnU1pSc0Q', 'type': 'tool_call'}]),
  ToolMessage(content="It's 15 degrees Celsius and cloudy.", name='get_weather', id='4df61a80-5195-46b8-b218-7e0205971974', tool_call_id='call_WXelB9dLnaZ5E0AdnU1pSc0Q'),
  AIMessage(content='The weather in Munich is currently 15 degrees Celsius and cloudy.', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'mode

Ora performiamo lo streaming effettivamente.

In sostanza abbiamo due modi per streammare gli output dal grafo:
1. stream_mode = "update" che streamma specifici stream changes fatto da ciascun nodo.

2. stream_mode = "messages" che streamma token by token outputs dalle invocazioni del chat model.

La funzione graph.astream() crea un interable e quindi possiamo ciclare quello che crea.

Con una applicazione che fa uso di una UI, normalmente si usa "messages" come stream_mode.

In [34]:
inputs = {"messages": [HumanMessage(content="How is the weather in Munich?")]}

async for output in graph.astream(inputs, stream_mode="updates", config=config):
    # stream_mode="updates" yields dictionaries with output keyed by node name
    for key, value in output.items():
        print(f"\nOutput from node '{key}':")
        print("---")
        print(value['messages'][-1].pretty_print())


Output from node 'agent':
---
Tool Calls:
  get_weather (call_cICAgzohnTyeXhVgpY0K6UQ7)
 Call ID: call_cICAgzohnTyeXhVgpY0K6UQ7
  Args:
    location: Munich
None

Output from node 'tools':
---
Name: get_weather

It's 15 degrees Celsius and cloudy.
None

Output from node 'agent':
---

The weather in Munich is currently 15 degrees Celsius and cloudy.
None


In [47]:
from langchain_core.messages import AIMessageChunk, HumanMessage


inputs = [HumanMessage(content="How is the weather in Munich?")]
gathered = None

async for msg, metadata in graph.astream({"messages": inputs}, stream_mode="messages", config=config):
    if msg.content and not isinstance(msg, HumanMessage):
        # I messaggi che crea graph.astream(stream_mode="messages")
        # sono AIMessageChunk, quindi prendiamo token by token
        # Print each token as it streams in 
        # il parametro flush è True altrimenti non
        # potremo vedere lo streaming mechanism live
        print(msg.content, end="|", flush=True)

    # Memorizziamo anche i final results di tale messaggio prodotto dall'llm
    # nella variabile gathered
    # Handle the AI message chunks for proper assembly 
    if isinstance(msg, AIMessageChunk):
        if gathered is None:
            gathered = msg.content
        else:
            gathered = gathered + msg.content 


It's 15 degrees Celsius and cloudy.|The weather in Munich is currently 15 degrees Celsius and cloudy.|

In [48]:
print(gathered)

None
