https://langchain-ai.github.io/langgraph/how-tos/streaming/#stream-custom-data

In [18]:
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

joke_model = init_chat_model(model="openai:gpt-4o-mini", tags=["joke"]) 
poem_model = init_chat_model(model="openai:gpt-4o-mini", tags=["poem"]) 


class State(TypedDict):
      topic: str
      joke: str
      poem: str


async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # Note: Passing the config through explicitly is required for python < 3.11
      # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      joke_response = await joke_model.ainvoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}],
            config, 
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}],
            config, 
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
)

async for msg, metadata in graph.astream(
      {"topic": "cats"},
      stream_mode="messages", 
):
    if metadata["tags"] == ["joke"]: 
        print(msg.content, end="|", flush=True)

Writing joke...
|Why| was| the| cat| sitting| on| the| computer|?

|Because| it| wanted| to| keep| an| eye| on| the| mouse|!||

Writing poem...


In [19]:
from typing import TypedDict
from langgraph.graph import START, StateGraph 
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")


class State(TypedDict):
      topic: str
      joke: str
      poem: str


def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # write both the joke and the poem concurrently
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
)

for msg, metadata in graph.stream( 
    {"topic": "cats"},
    stream_mode="messages",
):
    if msg.content and metadata["langgraph_node"] == "write_poem": 
        print(msg.content, end="|", flush=True)

In| shadows| soft|,| they| weave| and| play|,|  
|With| whispers| light|,| they| greet| the| day|.|  
|A| flick| of| tail|,| a| gentle| p|ounce|,|  
|In| every| corner|,| grace| amounts|.|  

|With| emerald| eyes| like| twilight| dreams|,|  
|They| chase| the| sun| through| golden| beams|.|  
|In| quiet| moments|,| curled| and| warm|,|  
|They| bring| a| hush|,| a| soothing| charm|.|  

|M|yster|ious| hearts| wrapped| in| fur|,|  
|In| every| p|urr|,| the| world|‚Äôs| a| blur|.|  
|Oh|,| feline| friends|,| with| hearts| so| free|,|  
|You|‚Äôve| found| a| place| inside| of| me|.|  |

In [20]:
import asyncio
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

# Inicializamos el modelo con soporte para streaming
model = ChatOpenAI(model="gpt-4o")

# Definimos el estado del grafo
class State(TypedDict):
    topic: str
    joke: str
    poem: str

# Nodo para generar una broma
def write_joke(state: State):
    topic = state["topic"]
    joke_response = model.invoke(
        [{"role": "user", "content": f"Escribe una broma sobre {topic}"}]
    )
    return {"joke": joke_response.content}

# Nodo para generar un poema
def write_poem(state: State):
    topic = state["topic"]
    poem_response = model.invoke(
        [{"role": "user", "content": f"Escribe un poema corto sobre {topic}"}]
    )
    return {"poem": poem_response.content}

# Construimos el grafo
graph = (
    StateGraph(State)
    .add_node(write_joke)
    .add_node(write_poem)
    .add_edge(START, "write_joke")
    .add_edge(START, "write_poem")
    .compile()
)

# Funci√≥n principal para ejecutar el grafo con astream()
async def run_async_stream():
    input_state = {"topic": "cats"}
    
    async for result in graph.astream(
        input_state,
        stream_mode="messages"
    ):
        if isinstance(result, tuple):
            message_chunk, metadata = result
            if metadata["langgraph_node"] == "write_poem":
                print(message_chunk.content, end="|", flush=True)

# Ejecutar la funci√≥n en un entorno notebook
await run_async_stream()


|En| la| sombra| silenc|iosa| del| hogar|,|  
|un| gato| se| ac|urr|uca|,| empieza| a| so|√±ar|.|  
|Con| pasos| suaves| y| ojos| de| luna|,|  
|rec|orre| la| noche|,| su| fiel| fortuna|.|  

|Su| ron|rone|o| es| un| dulce| cantar|,|  
|un| arr|ullo| que| calma|,| te| invita| a| descansar|.|  
|Ma|estro| del| sig|ilo|,| del| salto| preciso|,|  
|en| su| mundo| fel|ino| no| hay| contr|atiem|pos| ni| h|ilos|.|  

|Con| un| salto| elegante| y| mirada| ser|ena|,|  
|expl|ora| su| reino| sin| pr|isa| ni| pena|.|  
|M|ister|ioso| guard|i|√°n| de| secretos| antiguos|,|  
|prote|ge| los| sue√±os|,| amigo| de| siglos|.|  

|Oh|,| gato| en|igm|√°tico| de| andar| tan| ligero|,|  
|en| tu| presencia| encontramos| un| mundo| sinc|ero|.|  
|En|igma| y| tern|ura| en| un| solo| ser|,|  
|en| cada| pel|aje|,| una| historia| por| ver|.|  ||

# Ejemplo m√°s complejo

In [2]:
from typing import TypedDict
from langgraph.graph import START, END, StateGraph
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", streaming=True)

class State(TypedDict):
    question: str
    formal_question: str
    translated_question: str
    elaboration: str
    summary: str

def generar_pregunta(state: State) -> State:
    return {
        "question": "oye bro, c√≥mo s√© si los congresistas est√°n yendo a chambear?",
        "formal_question": "",
        "translated_question": "",
        "elaboration": "",
        "summary": ""
    }

def reescribir_pregunta(state: State) -> State:
    prompt = [{"role": "user", "content": f"Reformula esta pregunta en lenguaje formal: '{state['question']}'"}]
    response = llm.invoke(prompt)
    return {"formal_question": response.content}

def traducir_pregunta(state: State) -> State:
    prompt = [{"role": "user", "content": f"Traduce esta pregunta al ingl√©s: '{state['formal_question']}'"}]
    response = llm.invoke(prompt)
    return {"translated_question": response.content}

def elaborar_explicacion(state: State) -> State:
    prompt = [{"role": "user", "content": f"Explica en detalle esta pregunta: '{state['translated_question']}'"}]
    response = llm.invoke(prompt)
    return {"elaboration": response.content}

def resumir_intencion(state: State) -> State:
    prompt = [{"role": "user", "content": f"Resume en una l√≠nea la intenci√≥n original de esta pregunta: '{state['question']}'"}]
    response = llm.invoke(prompt)
    return {"summary": response.content}

graph = (
    StateGraph(State)
    .add_node("generar_pregunta", generar_pregunta)
    .add_node("reescribir_pregunta", reescribir_pregunta)
    .add_node("traducir_pregunta", traducir_pregunta)
    .add_node("elaborar_explicacion", elaborar_explicacion)
    .add_node("resumir_intencion", resumir_intencion)
    .add_edge(START, "generar_pregunta")
    .add_edge("generar_pregunta", "reescribir_pregunta")
    .add_edge("reescribir_pregunta", "traducir_pregunta")
    .add_edge("traducir_pregunta", "elaborar_explicacion")
    .add_edge("elaborar_explicacion", "resumir_intencion")
    .add_edge("resumir_intencion", END)
    .compile()
)



In [7]:
import asyncio

async def stream_graph():
    state = {}
    resultado = {
        "reescribir_pregunta": "",
        "traducir_pregunta": "",
        "elaborar_explicacion": "",
        "resumir_intencion": ""
    }

    print("üü¢ Streaming del grafo LangGraph con 5 nodos...\n")

    async for result in graph.astream(state, stream_mode="messages"):
        if isinstance(result, tuple):
            message_chunk, metadata = result
            nodo = metadata.get("langgraph_node")
            if nodo in resultado:
                token = message_chunk.content
                resultado[nodo] += token
                print(f"[{nodo}]: {token}", end="", flush=True)

    print("\n\n‚úÖ Finalizado")
    print("\nüîç Resultados por nodo:")
    for nodo, texto in resultado.items():
        print(f"\nüìå {nodo.upper()}:\n{texto.strip()}")

# Ejecutar en notebook
await stream_graph()

üü¢ Streaming del grafo LangGraph con 5 nodos...

[reescribir_pregunta]: [reescribir_pregunta]: ¬ø[reescribir_pregunta]: Pod[reescribir_pregunta]: r[reescribir_pregunta]: √≠as[reescribir_pregunta]:  indic[reescribir_pregunta]: arme[reescribir_pregunta]:  c√≥mo[reescribir_pregunta]:  puedo[reescribir_pregunta]:  verificar[reescribir_pregunta]:  la[reescribir_pregunta]:  asistencia[reescribir_pregunta]:  de[reescribir_pregunta]:  los[reescribir_pregunta]:  congres[reescribir_pregunta]: istas[reescribir_pregunta]:  a[reescribir_pregunta]:  sus[reescribir_pregunta]:  labores[reescribir_pregunta]: ?[reescribir_pregunta]: [traducir_pregunta]: [traducir_pregunta]: Could[traducir_pregunta]:  you[traducir_pregunta]:  tell[traducir_pregunta]:  me[traducir_pregunta]:  how[traducir_pregunta]:  I[traducir_pregunta]:  can[traducir_pregunta]:  verify[traducir_pregunta]:  the[traducir_pregunta]:  attendance[traducir_pregunta]:  of[traducir_pregunta]:  congress[traducir_pregunta]:  members[traducir_pr

In [9]:
import asyncio

async def stream_ultimo_nodo():
    state = {}
    full_message = ""

    print("üü¢ Streaming solo del nodo final: resumir_intencion...\n")

    async for result in graph.astream(state, stream_mode="messages"):
        if isinstance(result, tuple):
            message_chunk, metadata = result
            nodo = metadata.get("langgraph_node")
            if nodo == "resumir_intencion":
                token = message_chunk.content
                full_message += token
                print(token, end="", flush=True)

    print("\n\n‚úÖ Finalizado")
    print(f"\nüìå Resultado completo del nodo [resumir_intencion]:\n{full_message.strip()}")

# Ejecutar en notebook
await stream_ultimo_nodo()


üü¢ Streaming solo del nodo final: resumir_intencion...

La pregunta busca saber c√≥mo comprobar si los congresistas est√°n cumpliendo con sus funciones laborales.

‚úÖ Finalizado

üìå Resultado completo del nodo [resumir_intencion]:
La pregunta busca saber c√≥mo comprobar si los congresistas est√°n cumpliendo con sus funciones laborales.
