# How to stream runnables

* The LangChain Runnable interface supports both synchronous and asynchronous streaming through the `stream` and `astream` methods. Streaming makes applications feel more responsive because intermediate results are emitted as they are generated, instead of only after the final output is complete.

# Key Components for Streaming in LangChain
* LLMs and Chat Models: Language models are usually the slowest part of LLM applications, so streaming their output token-by-token can improve responsiveness. LangChain's streaming API lets us process each token immediately rather than waiting for the entire output.

* Streaming Methods:

   a. `stream` (sync): Streams the final output in chunks.

   b. `astream` (async): The asynchronous version, for code that already runs inside an event loop, such as an async web server or a Jupyter notebook.
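To make the difference between the two methods concrete, here is a minimal sketch in plain Python with no LangChain dependency. It assumes a hypothetical fake model that emits a fixed list of tokens: `stream` is an ordinary generator consumed with `for`, and `astream` is an async generator consumed with `async for`. The names mirror LangChain's methods, but these functions are illustrative stand-ins, not the library's implementation.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator, List

# Hypothetical canned reply that the fake model "generates" token by token.
TOKENS = ["The", " sky", " is", " blue", "."]


def stream(prompt: str) -> Iterator[str]:
    """Sync streaming sketch: a generator that yields each token as soon as it is ready."""
    for token in TOKENS:
        time.sleep(0.01)  # simulate per-token generation latency (blocks the thread)
        yield token


async def astream(prompt: str) -> AsyncIterator[str]:
    """Async streaming sketch: an async generator; awaiting lets other tasks run."""
    for token in TOKENS:
        await asyncio.sleep(0.01)  # non-blocking wait for the next token
        yield token


def consume_sync() -> List[str]:
    received = []
    for chunk in stream("what color is the sky?"):
        received.append(chunk)
        print(chunk, end="|", flush=True)
    print()
    return received


async def consume_async() -> List[str]:
    received = []
    async for chunk in astream("what color is the sky?"):
        received.append(chunk)
        print(chunk, end="|", flush=True)
    print()
    return received


sync_chunks = consume_sync()
async_chunks = asyncio.run(consume_async())
```

In a Jupyter notebook an event loop is already running, so `asyncio.run(...)` raises an error there; use `await consume_async()` instead. That same running loop is why the top-level `async for` cell below works in the notebook.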

In [6]:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-4o-mini")

chunks = []
for chunk in model.stream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)


|The| color| of| the| sky| can| vary| depending| on| several| factors|,| including| time| of| day|,| weather| conditions|,| and| atmospheric| composition|.| Generally|,| during| a| clear| day|,| the| sky| appears| blue| due| to| Ray|leigh| scattering|,| where| shorter| blue| wavelengths| of| sunlight| are| scattered| in| all| directions| by| the| gases| and| particles| in| the| atmosphere|.| At| sunrise| and| sunset|,| the| sky| can| display| a| range| of| colors|,| including| red|,| orange|,| and| pink|,| due| to| the| angle| of| the| sun| and| the| increased| distance| the| light| travels| through| the| atmosphere|.| On| cloudy| or| storm|y| days|,| the| sky| can| appear| gray| or| over|cast|.||

In [7]:
## Asynchronous streaming

chunks = []
async for chunk in model.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

|The| color| of| the| sky| typically| appears| blue| during| the| day| due| to| the| scattering| of| sunlight| by| the| Earth's| atmosphere|.| This| phenomenon|,| known| as| Ray|leigh| scattering|,| causes| shorter| wavelengths| of| light| (|blue| and| violet|)| to| scatter| more| than| longer| wavelengths| (|red|,| orange|,| yellow|).| However|,| the| sky| can| also| appear| in| various| shades|,| such| as| gray| during| over|cast| weather|,| orange| or| pink| during| sunrise| and| sunset|,| and| even| black| at| night| when| the| sun| is| below| the| horizon|.||

In [8]:
chunks[0]

AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-db79cf26-58b0-48b2-93e5-fcd34e8697e7')
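The first chunk can be empty, as it is here. In LangChain, message chunks support `+`, which concatenates their content, so adding the collected chunks together rebuilds the complete reply. The sketch below shows that accumulation with a hypothetical, simplified `MessageChunk` class standing in for `AIMessageChunk`; the metadata field and its `finish_reason` value are assumptions added for illustration.

```python
import operator
from dataclasses import dataclass, field
from functools import reduce


@dataclass
class MessageChunk:
    """Hypothetical, simplified stand-in for a streamed message chunk."""

    content: str = ""
    response_metadata: dict = field(default_factory=dict)

    def __add__(self, other: "MessageChunk") -> "MessageChunk":
        # Concatenate the text and merge metadata (later chunks win on key conflicts).
        return MessageChunk(
            content=self.content + other.content,
            response_metadata={**self.response_metadata, **other.response_metadata},
        )


streamed = [
    MessageChunk(""),  # an empty first chunk, like chunks[0] above
    MessageChunk("The"),
    MessageChunk(" sky"),
    MessageChunk(" is blue."),
    MessageChunk("", {"finish_reason": "stop"}),  # hypothetical final metadata chunk
]

full = reduce(operator.add, streamed)
print(full.content)
print(full.response_metadata)
```

With the real chunks collected above, the same fold (`reduce(operator.add, chunks)`) produces a single `AIMessageChunk` holding the whole answer.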

# Chains with LangChain Expression Language (LCEL)
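Using the LangChain Expression Language (LCEL), you can build chains of components (e.g., prompt templates, models, and output parsers) with the `|` operator. The resulting chain exposes the same `stream` and `astream` methods, and because parsers such as `StrOutputParser` process each chunk as it arrives, the model's output still streams all the way to the end of the chain.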
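Why does a whole chain stream and not just the model? A plausible mental model is that each step transforms an input stream into an output stream, and composing lazy generators lets the first token flow through every step before the second one is produced. The sketch below uses a hypothetical `Step` class with `__or__`, a fake model, and a pass-through string parser; it illustrates the idea and is not how LCEL is implemented internally.

```python
from typing import Callable, Iterator, List

events: List[str] = []  # records the order in which the steps run


class Step:
    """Hypothetical pipeline step wrapping a function from an input stream to an output stream."""

    def __init__(self, transform: Callable[[Iterator], Iterator]) -> None:
        self.transform = transform

    def __or__(self, other: "Step") -> "Step":
        # Composition: this step's output stream becomes the next step's input stream.
        return Step(lambda stream: other.transform(self.transform(stream)))

    def stream(self, value) -> Iterator:
        # Start the pipeline with a one-item input stream.
        return self.transform(iter([value]))


def format_prompt(inputs: Iterator[dict]) -> Iterator[str]:
    for d in inputs:
        yield f"tell me a joke about {d['topic']}"


def fake_model(prompts: Iterator[str]) -> Iterator[dict]:
    # Hypothetical model: streams a canned reply one word at a time.
    for _prompt in prompts:
        for word in ["Why", " did", " the", " parrot", " talk?"]:
            events.append(f"model:{word.strip()}")
            yield {"content": word}


def str_parser(messages: Iterator[dict]) -> Iterator[str]:
    # Passes each chunk's text through as soon as it arrives.
    for message in messages:
        events.append(f"parser:{message['content'].strip()}")
        yield message["content"]


chain = Step(format_prompt) | Step(fake_model) | Step(str_parser)

pieces = []
for piece in chain.stream({"topic": "parrot"}):
    pieces.append(piece)
    print(piece, end="|", flush=True)
print()
```

The `events` log alternates between model and parser, showing that the parser handles each word before the model produces the next one. A parser that needed the full text before emitting anything would break this interleaving and, with it, streaming.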

In [9]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)


|Why| did| the| par|rot| wear| a| rain|coat|?

|Because| it| wanted| to| be| a| poly|uns|aturated|!||

# Handling JSON Streaming
To stream JSON output while it is still being generated:

* Use JsonOutputParser, which can parse partial (incomplete) JSON.

* Use generator functions (yield) to process the partial data incrementally as it streams.

* JsonOutputParser (imported from langchain_core.output_parsers): Parses a language model's text output into JSON (Python dicts and lists). This is especially useful for structured tasks where the response must follow a JSON format.

* model: The chat model defined earlier (`ChatOpenAI(model="gpt-4o-mini")`), which generates the raw text output. Other chat models, such as Hugging Face models, could be used in its place.

* JsonOutputParser(): The parser step in the chain. While streaming, it parses all the text received so far, even when the JSON is still incomplete, and emits the resulting object each time it changes.

* | (Pipe Operator): Connects components in the chain, so the model's output is fed into the JsonOutputParser.
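The core idea behind parsing partial JSON can be sketched in a few lines: accumulate the streamed text, temporarily close any open string, object, or array, and try to parse; if that fails (for example, because of a dangling key or comma), retry on a shorter prefix. The functions below (`parse_partial_json`, `stream_partial_objects`) are hypothetical, simplified illustrations of that technique, not LangChain's implementation, and the chunk boundaries are made up.

```python
import json
from typing import Any, Iterable, Iterator, Optional


def _close_open_json(text: str) -> str:
    """Append the quote and brackets needed to close whatever `text` leaves open."""
    closers = []  # expected closing characters, innermost last
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()
    return text + ('"' if in_string else "") + "".join(reversed(closers))


def parse_partial_json(text: str) -> Optional[Any]:
    """Best-effort parse of incomplete JSON; returns None if nothing parses yet."""
    # Try the longest prefix first; trimming drops dangling keys, commas, and colons.
    for end in range(len(text), 0, -1):
        try:
            return json.loads(_close_open_json(text[:end]))
        except json.JSONDecodeError:
            continue
    return None


def stream_partial_objects(chunks: Iterable[str]) -> Iterator[Any]:
    """Accumulate text chunks and yield the parsed object whenever it changes."""
    buffer = ""
    previous = None
    for chunk in chunks:
        buffer += chunk
        parsed = parse_partial_json(buffer)
        if parsed is not None and parsed != previous:
            previous = parsed
            yield parsed


# Hypothetical chunk boundaries, as a model might stream them.
chunks = ['{"countries": [', '{"name": "Fra', 'nce", "popula', 'tion": 652', '73511}]}']
results = list(stream_partial_objects(chunks))
for obj in results:
    print(obj)
```

As in the notebook output below, values can appear truncated before they are final (a name such as 'Fra', a population of 652): a partial number or string is still valid JSON once closed, so consumers should treat only the last entries as provisional.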

In [16]:
from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of LangChain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 652}]}
{'countries': [{'name': 'France', 'population': 652735}]}
{'countries': [{'name': 'France', 'population': 65273511}]}
{'countries': [{'name': 'France', 'population': 65273511}, {}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467547}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 46754778}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 46754778}, {}]}
{'countries': [{'name': 'France', 'population': 65273511}, {'name': 'Spain', 'population': 467

* chain.astream: The asynchronous streaming method; it runs the chain and yields output while the model is still generating it.
* Prompt: The input string instructs the model to produce a specific JSON structure: an outer "countries" key holding a list of objects with `name` and `population` keys.
* async for: Consumes the asynchronous iterator returned by astream, so each result is handled as soon as it arrives instead of after the entire response is complete.
* print(text, flush=True): Prints each result immediately. Each result is the entire JSON object parsed so far, not only the newest fragment, which is why every line of the output above repeats and extends the previous one.
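Because every streamed result is a growing snapshot, a generator function is a natural way to turn it into something more useful. The sketch below, built around a hypothetical `complete_countries` generator, yields each country record exactly once, as soon as it can no longer change: while streaming, only the last entry in "countries" may still be growing, so every earlier entry is finished. The input reuses a subset of the partial results printed above.

```python
from typing import Any, Iterable, Iterator


def complete_countries(snapshots: Iterable[Any]) -> Iterator[dict]:
    """Yield each country record once, as soon as a later record has started."""
    emitted = 0  # number of records already yielded
    countries: list = []
    for snapshot in snapshots:
        if not isinstance(snapshot, dict) or not isinstance(snapshot.get("countries"), list):
            continue  # nothing usable yet (e.g. the initial {})
        countries = snapshot["countries"]
        # Every entry except the last is complete, because the model has moved on.
        while emitted < len(countries) - 1:
            yield countries[emitted]
            emitted += 1
    # When the stream ends, the last entry is complete as well.
    while emitted < len(countries):
        yield countries[emitted]
        emitted += 1


# A subset of the partial results printed above.
snapshots = [
    {},
    {"countries": []},
    {"countries": [{}]},
    {"countries": [{"name": ""}]},
    {"countries": [{"name": "France"}]},
    {"countries": [{"name": "France", "population": 652}]},
    {"countries": [{"name": "France", "population": 65273511}]},
    {"countries": [{"name": "France", "population": 65273511}, {}]},
    {"countries": [{"name": "France", "population": 65273511}, {"name": "Spain"}]},
    {"countries": [{"name": "France", "population": 65273511}, {"name": "Spain", "population": 46754778}]},
]

records = list(complete_countries(snapshots))
print(records)
```

To apply the same logic to the live `chain.astream(...)` output, the loop would become an async generator using `async for`; the record-completion rule stays the same.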

# Handling Non-streaming Components
Some components, such as retrievers, do not stream: calling `stream` on them yields their whole result as a single chunk. A chain that contains such components can still stream:

* Place the non-streaming components early in the chain.

* Streaming of partial output begins after the last non-streaming step and continues from there.

Combined with LCEL chains, this lets complex LLM applications stay responsive by delivering intermediate output as soon as it is available.
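A minimal sketch of both points, using hypothetical stand-ins rather than LangChain classes: `stream_whole` shows how a step that cannot stream still fits a streaming interface (its full result becomes one chunk, matching the retriever output below), and `answer` shows that output starts flowing only once the blocking retrieval step has finished.

```python
import time
from typing import Callable, Iterator, List

events: List[str] = []  # records when retrieval finishes relative to the first token


def retrieve(question: str) -> List[str]:
    """Hypothetical non-streaming step: returns every document in one go."""
    time.sleep(0.05)  # stand-in for a vector-store lookup
    events.append("retrieved")
    return ["harrison worked at kensho", "harrison likes spicy food"]


def stream_whole(step: Callable, value) -> Iterator:
    """Fallback for a step that cannot stream: its full result is a single chunk."""
    yield step(value)


def generate(question: str, docs: List[str]) -> Iterator[str]:
    """Hypothetical streaming step: yields the answer token by token."""
    for token in ["Harrison", " worked", " at", " Kensho", "."]:
        events.append(f"token:{token.strip()}")
        yield token


def answer(question: str) -> Iterator[str]:
    docs = retrieve(question)  # blocks: nothing is yielded until retrieval finishes
    yield from generate(question, docs)  # after the last non-streaming step, output streams


retriever_chunks = list(stream_whole(retrieve, "where did harrison work?"))
print(retriever_chunks)  # a single chunk holding the whole list

events.clear()
tokens = []
for token in answer("Where did harrison work?"):
    tokens.append(token)
    print(token, end="|", flush=True)
print()
```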

In [10]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(metadata={}, page_content='harrison worked at kensho'),
  Document(metadata={}, page_content='harrison likes spicy food')]]

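In [12]:
# Packages needed by the imports in the cells above (run this first in a fresh environment)
%pip install langchain langchain-community langchain-openai faiss-cpu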



In [14]:
retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
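When a dict such as `{"context": ..., "question": ...}` appears in an LCEL pipe, LangChain converts it into a `RunnableParallel`: every value receives the same input, and the outputs are collected under the matching keys, which then fill the prompt's `{context}` and `{question}` variables. `RunnablePassthrough()` simply returns its input unchanged. The sketch below imitates that step with a hypothetical `run_parallel` helper, a fake retriever, and plain `str.format`; it is an illustration, not LangChain's code.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_parallel(branches: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Hypothetical helper: run every branch on the same input and collect results by key."""
    with ThreadPoolExecutor() as pool:
        futures = {key: pool.submit(branch, value) for key, branch in branches.items()}
        return {key: future.result() for key, future in futures.items()}


def passthrough(value: Any) -> Any:
    # Plays the role of RunnablePassthrough(): returns its input unchanged.
    return value


def fake_retriever(question: str) -> List[str]:
    # Hypothetical retriever returning the two texts indexed above.
    return ["harrison worked at kensho", "harrison likes spicy food"]


template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

variables = run_parallel(
    {"context": fake_retriever, "question": passthrough},
    "Where did harrison work?",
)
prompt_text = template.format(**variables)
print(prompt_text)
```

Since the retriever branch does not stream, this parallel step is the last non-streaming point in `retrieval_chain`; tokens start flowing once the model receives the formatted prompt.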

In [15]:
for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)

|H|arrison| worked| at| Kens|ho|.| Kens|ho| is| known| for| its| innovative| approach| to| data| analysis|,| leveraging| advanced| algorithms| to| provide| insights| in| real| time|.| The| atmosphere| in| the| office| is| vibrant| and| collaborative|,| with| team| members| often| brainstorming| ideas| over| lunch|.| Additionally|,| Kens|ho| hosts| regular| workshops| to| keep| employees| updated| on| the| latest| trends| in| technology| and| data| science|.||