### Streaming Basico


In [24]:
%pip install langchain langchain_openai load_dotenv langchain_community tavily-python

Collecting tavily-python
  Downloading tavily_python-0.7.12-py3-none-any.whl.metadata (7.5 kB)
Downloading tavily_python-0.7.12-py3-none-any.whl (15 kB)
Installing collected packages: tavily-python
Successfully installed tavily-python-0.7.12
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [28]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
load_dotenv()
import os

Establecemos una cadena de invocación

In [12]:
prompt = ChatPromptTemplate.from_template("Escribe un refran sobre {tema}")
model = ChatOpenAI(model= os.getenv("OPENAI_MODEL")) # instanciamos al OPENAI_MODEL gpt4omini
output_parser = StrOutputParser()
chain = prompt | model | output_parser

En vez de hacer directamente invoke, vamos a ejecutar stream
y con ello vamos a ir recibiendo chunks de la respuesta progresivamente.
Para obtenerlos necesitamos un objeto Iterable, por ello lo ponemos en un for loop
con eso vamos imprimiendo de a pocos cada chunk

In [13]:
for s in chain.stream({"tema": "camaron"}):
    print(s)


"
Al
 camar
ón
 que
 se
 duer
me
,
 se
 lo
 lleva
 la
 corriente
."





### Streaming con RunnableParallel
Con este ejemplo podremos ver como los chunks de las dos peticiones incluso se van mezclando.
Esto es porque estamos haciendo una ejecucion paralela de ambas cadenas de invocación a los LLMs

In [14]:
from langchain_core.runnables import RunnableParallel

prompt = ChatPromptTemplate.from_template("Dime un refran sobre {tema}")
model = ChatOpenAI(model= os.getenv("OPENAI_MODEL")) # instanciamos al OPENAI_MODEL gpt4omini
output_parser = StrOutputParser()
chain1 = prompt | model | output_parser

prompt = ChatPromptTemplate.from_template("Dame un dicho popular sobre {tema}")
model = ChatOpenAI(model= os.getenv("OPENAI_MODEL")) # instanciamos al OPENAI_MODEL gpt4omini
output_parser = StrOutputParser()
chain2 = prompt | model | output_parser

In [15]:
parallel_chain = RunnableParallel({
    "refran": chain1,
    "dicho": chain2
})

for s in parallel_chain.stream({"tema": "viejo"}):
    print(s)

{'dicho': ''}
{'dicho': '"'}
{'dicho': 'El'}
{'dicho': ' que'}
{'dicho': ' mucho'}
{'dicho': ' ha'}
{'dicho': ' vivido'}
{'dicho': ','}
{'dicho': ' poco'}
{'dicho': ' ha'}
{'dicho': ' olvid'}
{'refran': ''}
{'refran': 'Un'}
{'refran': ' refr'}
{'dicho': 'ado'}
{'dicho': '."'}
{'dicho': ' Este'}
{'dicho': ' dicho'}
{'refran': 'án'}
{'refran': ' que'}
{'dicho': ' res'}
{'dicho': 'alta'}
{'refran': ' habla'}
{'refran': ' sobre'}
{'refran': ' la'}
{'refran': ' experiencia'}
{'dicho': ' la'}
{'dicho': ' sab'}
{'dicho': 'idur'}
{'dicho': 'ía'}
{'refran': ' y'}
{'refran': ' la'}
{'refran': ' sab'}
{'dicho': ' y'}
{'dicho': ' la'}
{'refran': 'idur'}
{'refran': 'ía'}
{'refran': ' de'}
{'dicho': ' experiencia'}
{'dicho': ' que'}
{'refran': ' los'}
{'refran': ' mayores'}
{'refran': ' es'}
{'refran': ':'}
{'dicho': ' acumul'}
{'dicho': 'amos'}
{'refran': ' "'}
{'refran': 'El'}
{'dicho': ' con'}
{'dicho': ' los'}
{'dicho': ' años'}
{'refran': ' saber'}
{'dicho': '.'}
{'refran': ' no'}
{'dicho': ''}

Esta es una visualizacion progresiva de como va agregando chunks y va generando un diccionario

In [16]:
result = {}
for s in parallel_chain.stream({"tema": "viejo"}):
    for k,v in s.items():
        if k not in result:
            result[k] = ""
        result[k] += v
    print(result)

{'dicho': ''}
{'dicho': 'Un'}
{'dicho': 'Un dicho'}
{'dicho': 'Un dicho', 'refran': ''}
{'dicho': 'Un dicho popular', 'refran': ''}
{'dicho': 'Un dicho popular sobre', 'refran': ''}
{'dicho': 'Un dicho popular sobre', 'refran': '"'}
{'dicho': 'Un dicho popular sobre', 'refran': '"El'}
{'dicho': 'Un dicho popular sobre los', 'refran': '"El'}
{'dicho': 'Un dicho popular sobre los vie', 'refran': '"El'}
{'dicho': 'Un dicho popular sobre los vie', 'refran': '"El que'}
{'dicho': 'Un dicho popular sobre los vie', 'refran': '"El que mucho'}
{'dicho': 'Un dicho popular sobre los viejos', 'refran': '"El que mucho'}
{'dicho': 'Un dicho popular sobre los viejos es', 'refran': '"El que mucho'}
{'dicho': 'Un dicho popular sobre los viejos es:', 'refran': '"El que mucho'}
{'dicho': 'Un dicho popular sobre los viejos es: "', 'refran': '"El que mucho'}
{'dicho': 'Un dicho popular sobre los viejos es: "', 'refran': '"El que mucho habla'}
{'dicho': 'Un dicho popular sobre los viejos es: "', 'refran': '"

### Stream Log
Ahora revisaremos como pasar cierto contexto al recibir un stream de datos.
Es muy util cuando deseamos retornar pasos intermedios. Es decir cuando tenemos pasos intermedios como en RAG.
Cuando por ejemplo podemos mostrar al usuario el trabajo que vamos haciendo al momento de recuperar documentos.

In [35]:
from langchain_community.retrievers.tavily_search_api import TavilySearchAPIRetriever
from langchain_core.runnables import RunnablePassthrough

retriever= TavilySearchAPIRetriever()

prompt = ChatPromptTemplate.from_template("""Responde la pregunta basado solamente en el contexto provisto:
                                          Contexto: {contexto}
                                          Pregunta: {pregunta}""")

chain = prompt | model | output_parser #Esta cadena requiere contexto del problema para ser ejecutada.

# Y con esta otra cadena vamos a obtener dicho contexto por medio de la ejecucion del retriever.
# Es una cadena que envuelve a la otra para entregarlo como contexto.
retrieval_chain = RunnablePassthrough.assign( 
    contexto=(lambda x: x["pregunta"]) | retriever.with_config(run_name="Docs")
) | chain

In [36]:
for s in retrieval_chain.stream({"pregunta": "que es langsmith?"}):
    print(s, end="")

LangSmith es una plataforma unificada diseñada para facilitar la depuración, prueba, evaluación y monitoreo de aplicaciones impulsadas por modelos de lenguaje (LLMs). Ofrece herramientas poderosas que permiten a los desarrolladores gestionar y optimizar flujos de trabajo en aplicaciones de IA, asegurando un funcionamiento eficaz en entornos empresariales. Con una interfaz intuitiva y herramientas de visualización, LangSmith ayuda a los usuarios a entender y depurar complicados sistemas de IA, permitiendo que se enfoquen en mejorar la experiencia del usuario durante el desarrollo.

Con Stream log podemos incluso obtener informacion relevante del documento que viene referenciando y tambien podemos conocer su paso a paso para componer la respuesta final

In [37]:
async for s in retrieval_chain.astream_log({"pregunta": "que es langsmith?"}):
    print(s, end="")

RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '826ec263-ed09-432a-9f75-56760df9c857',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})RunLogPatch({'op': 'add',
  'path': '/logs/RunnableAssign<contexto>',
  'value': {'end_time': None,
            'final_output': None,
            'id': '26958355-5fa3-4bf2-b999-5cb92b29093b',
            'metadata': {},
            'name': 'RunnableAssign<contexto>',
            'start_time': '2025-11-04T07:02:03.417+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['seq:step:1'],
            'type': 'chain'}})RunLogPatch({'op': 'add',
  'path': '/logs/RunnableAssign<contexto>/streamed_output/-',
  'value': {'pregunta': 'que es langsmith?'}})RunLogPatch({'op': 'add',
  'path': '/logs/RunnableParallel<contexto>',
  'value': {'end_time': None,
            'final_output': None

Ahora agregamos las citaciones obtenidas dentro del mismo stream log

In [38]:
async for s in retrieval_chain.astream_log({"pregunta": "que es langsmith?"}, include_names=["Docs"]):
    print(s, end="")

RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': '8a395d94-a023-42ef-9cf9-600570df9255',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': '61f5774c-2322-4deb-a532-e91f406bd763',
            'metadata': {'ls_retriever_name': 'tavilysearchapi'},
            'name': 'Docs',
            'start_time': '2025-11-04T07:03:18.161+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['seq:step:2'],
            'type': 'retriever'}})RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(metadata={'title': 'Explora LangSmith: Tu Puente a Aplicaciones con LLMs Listas para ...', 'source': 'https://medium.com/@augustoromero/explora-langsmith-tu-puente-a-ap

### AGENTES
Ahora veremos como suele manejarse dentro de stream de resultados de herramientas.

LangChain nos permite tener feedback en tiempo real de la ejecucion de nuestro agente.
Aqui lo que es posible con el streaming de Langchain:

- 👉 Stream agent progress — obtiene la actualización de estado despues de cada paso del agente.
- 👉 Stream LLM tokens — recibir el stream de los tokens generados por el llm.
- 👉 Stream custom updates — emitir señales personalizadas por usuario (e.g., "Fetched 10/100 records")
- 👉 Stream multiple modes — escoger de entre "updates"(progreso del agente), messages (LLM tokens + metadata), o custom (data de usuario personalizada).

Ref> https://docs.langchain.com/oss/python/langchain/streaming


In [70]:
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from datetime import datetime

@tool
def get_systemtime():
    """
    Returns today's system date
    """
    return datetime.today()

search = TavilySearchResults()
tools = [search, get_systemtime]

prompt = f"""Eres un asistente servicial que resuelves consultas usando tus herramientas"""

llm = ChatOpenAI(model=os.getenv("OPENAI_MODEL"), temperature=0)

agent = create_agent(
    model= os.getenv("OPENAI_MODEL"),
    tools= tools,
    system_prompt=prompt)


In [71]:
for stream_mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "cual es el clima hoy en Seattle Washington?"}]},
    stream_mode=["updates", "custom", "final_output"] # Stream de varios tipos de eventos
):
    print(f"Stream Mode: {stream_mode}")
    for step, data in chunk.items():
        print(f"step: {step}")
        print(f"Content: {chunk}")
    print("-" * 200)

Stream Mode: updates
step: model
Content: {'model': {'messages': [AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 119, 'total_tokens': 142, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_provider': 'openai', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_560af6e559', 'id': 'chatcmpl-CY6EK7VjVF8FqaQduqhz89fWifdO5', 'service_tier': 'default', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--c038ab29-bf23-4ee6-aa69-99366194cccb-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'clima hoy en Seattle Washington'}, 'id': 'call_4KEXJqGt7gzwTQXXRUe41Df5', 'type': 'tool_call'}], usage_metadata={'input_tokens': 119, 'output_tokens': 23, 'total_tokens': 142, 'input_token_details': {'audio': 0,

## Stream tokens

In [72]:
for token, metadata in agent.stream(  
    {"messages": [{"role": "user", "content": "Cual es el clima en Cusco, Peru?"}]},
    stream_mode="messages",
):
    print(f"node: {metadata['langgraph_node']}")
    print(f"content: {token.content_blocks}")
    print("\n")

node: model
content: [{'type': 'tool_call_chunk', 'id': 'call_sTssKUqXSQaub40p4dBSrVBS', 'name': 'tavily_search_results_json', 'args': '', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '{"', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'query', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '":"', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'cl', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'ima', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': ' en', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': ' Cus', 'index': 0}]


node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'co', 'index': 0}]


In [75]:
from openai import OpenAI
from dotenv import load_dotenv

load_dotenv()

client = OpenAI()
audio_file = open("./audio/audio_en.mp3", "rb")

stream = client.audio.transcriptions.create(
  model="gpt-4o-mini-transcribe", 
  file=audio_file, 
  response_format="text",
  stream=True
)

for event in stream:
    try:
        print(event.delta, end="")
    except:
        print("\n====================================")



O AGI, the artificial mind, the ghost in the machine, the future of mankind. From lines of text to a conscious flow, where the human story's gonna go. Is it a servant or a brand new sovereign? A tool we use or a friend we greet? The questions echo in the halls of progress as the old world and new world meet. We hold our breath for the future shaping, with wonder and a touch of fear. A new chapter starts, no escaping, the future we built is finally here.
