**Interface**

To make it as easy as possible to create custom chains, we’ve implemented a “Runnable” protocol, which most components implement. This standard interface makes it easy both to define custom chains and to invoke them in a uniform way. The standard interface includes:

1. **stream**: stream back chunks of the response
2. **invoke**: call the chain on an input
3. **batch**: call the chain on a list of inputs

These also have corresponding async methods:

1. **astream**: stream back chunks of the response async
2. **ainvoke**: call the chain on an input async
3. **abatch**: call the chain on a list of inputs async
4. **astream_log**: stream back intermediate steps as they happen, in addition to the final response
5. **astream_events**: (beta) stream events as they happen in the chain (introduced in langchain-core 0.1.14)
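To make the shape of the sync half of this interface concrete, here is a minimal, stdlib-only sketch of a Runnable-like protocol. `ToyRunnable`, `Reverse`, and `UpperCase` are hypothetical names, and the defaults shown (batch as repeated invoke, stream as a single chunk unless a component overrides it) are simplifying assumptions, not LangChain's actual implementation.

```python
from typing import Any, Iterator, List


class ToyRunnable:
    """Hypothetical base class mimicking the shape of a Runnable-style interface."""

    def invoke(self, input: Any) -> Any:
        raise NotImplementedError

    def batch(self, inputs: List[Any]) -> List[Any]:
        # Simplest possible default: call invoke once per input, preserving order.
        return [self.invoke(x) for x in inputs]

    def stream(self, input: Any) -> Iterator[Any]:
        # Default for components that cannot stream: yield the whole result as one chunk.
        yield self.invoke(input)


class Reverse(ToyRunnable):
    """Hypothetical component that only implements invoke."""

    def invoke(self, input: str) -> str:
        return input[::-1]


class UpperCase(ToyRunnable):
    """Hypothetical component that overrides stream to emit one character at a time."""

    def invoke(self, input: str) -> str:
        return input.upper()

    def stream(self, input: str) -> Iterator[str]:
        for ch in input:
            yield ch.upper()


print(Reverse().batch(["bears", "cats"]))  # ['sraeb', 'stac']
print(list(Reverse().stream("bears")))     # ['sraeb']
print(list(UpperCase().stream("cat")))     # ['C', 'A', 'T']
```

The point of the shared base class is that callers can use `invoke`, `batch`, and `stream` on any component, even one that only implemented `invoke`.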

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model

**Input Schema**

A description of the inputs accepted by a Runnable. This is a Pydantic model generated dynamically from the Runnable's structure. You can call .schema() on it to obtain a JSON Schema representation.
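As a rough illustration of what "generated dynamically" means for a prompt, the sketch below derives a JSON-Schema-style dict from the `{placeholders}` in a template string using only the standard library. The helper `input_schema_for_template` is hypothetical, and the exact titles and fields LangChain emits may differ; this only shows that the schema is computed from the template rather than written by hand.

```python
from string import Formatter
from typing import Any, Dict


def input_schema_for_template(template: str, title: str = "PromptInput") -> Dict[str, Any]:
    """Hypothetical helper: build a JSON-Schema-like dict from a format-string template."""
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for trailing literal text that has no placeholder.
    names = [field for _, field, _, _ in Formatter().parse(template) if field]
    unique = list(dict.fromkeys(names))  # de-duplicate while keeping first-seen order
    return {
        "title": title,
        "type": "object",
        "properties": {name: {"title": name.title(), "type": "string"} for name in unique},
        "required": unique,
    }


schema = input_schema_for_template("tell me a joke about {topic}")
print(schema["properties"])  # {'topic': {'title': 'Topic', 'type': 'string'}}
```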

In [None]:
# The input schema of the chain is the input schema of its first part, the prompt.
chain.input_schema.schema()

In [None]:
prompt.input_schema.schema()

In [None]:
model.input_schema.schema()

**Output Schema**

A description of the outputs produced by a Runnable. This is a Pydantic model generated dynamically from the Runnable's structure. You can call .schema() on it to obtain a JSON Schema representation.

In [None]:
# The output schema of the chain is the output schema of its last part, in this case a ChatModel, which outputs a ChatMessage
chain.output_schema.schema()
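The cell comments above (a chain's input schema is that of its first part; its output schema is that of its last part) follow from how a sequence is composed. Below is a minimal sketch with hypothetical `Step` and `ToySequence` classes, where a "schema" is just a descriptive string rather than a Pydantic model, assuming `|` builds a flat list of steps. The `toy_` variable names avoid overwriting the notebook's `prompt`, `model`, and `chain`.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Step:
    """Hypothetical pipeline step with declared input/output schema descriptions."""
    func: Callable[[Any], Any]
    input_schema: str
    output_schema: str

    def __or__(self, other: "Step") -> "ToySequence":
        return ToySequence([self, other])


@dataclass
class ToySequence:
    steps: List[Step]

    @property
    def input_schema(self) -> str:
        # Whatever the first step accepts is what the whole chain accepts.
        return self.steps[0].input_schema

    @property
    def output_schema(self) -> str:
        # Whatever the last step produces is what the whole chain produces.
        return self.steps[-1].output_schema

    def __or__(self, other: Step) -> "ToySequence":
        return ToySequence(self.steps + [other])

    def invoke(self, value: Any) -> Any:
        # Feed each step's output into the next step.
        for step in self.steps:
            value = step.func(value)
        return value


toy_prompt = Step(lambda d: f"tell me a joke about {d['topic']}", "dict with key 'topic'", "prompt text")
toy_model = Step(lambda text: f"A joke about {text.rsplit(' ', 1)[-1]}", "prompt text", "message text")
toy_parser = Step(str.upper, "message text", "str")

toy_chain = toy_prompt | toy_model | toy_parser
print(toy_chain.input_schema)                # dict with key 'topic'
print(toy_chain.output_schema)               # str
print(toy_chain.invoke({"topic": "bears"}))  # A JOKE ABOUT BEARS
```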

**Stream**

In [None]:
for s in chain.stream({"topic": "bears"}):
    print(s.content, end="", flush=True)

**Invoke**

In [None]:
chain.invoke({"topic": "bears"})

**Batch**

In [None]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}])

**Async Stream**

In [None]:
async for s in chain.astream({"topic": "bears"}):
    print(s.content, end="", flush=True)

**Async Invoke**

In [None]:
await chain.ainvoke({"topic": "bears"})

**Async Batch**

In [None]:
await chain.abatch([{"topic": "bears"}])
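The benefit of the async methods is that several requests can be in flight at once on a single event loop. The stdlib sketch below assumes a hypothetical `fake_ainvoke` coroutine that simulates a 0.2 s model call with `asyncio.sleep`, and builds a batch helper, `toy_abatch` (also hypothetical), on `asyncio.gather`. It illustrates the concurrency idea only and is not LangChain's `abatch` implementation. It is written as a standalone script; inside a notebook, where an event loop is already running, replace `asyncio.run(...)` with `await ...`.

```python
import asyncio
import time
from typing import Dict, List


async def fake_ainvoke(inputs: Dict[str, str]) -> str:
    """Hypothetical stand-in for a model call: waits 0.2 s, then returns a canned reply."""
    await asyncio.sleep(0.2)
    return f"a joke about {inputs['topic']}"


async def toy_abatch(batch: List[Dict[str, str]]) -> List[str]:
    # gather runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(*(fake_ainvoke(x) for x in batch))


start = time.perf_counter()
results = asyncio.run(toy_abatch([{"topic": "bears"}, {"topic": "cats"}, {"topic": "owls"}]))
elapsed = time.perf_counter() - start
print(results)
print(f"{elapsed:.2f}s")  # close to 0.2 s rather than 3 x 0.2 s, since the waits overlap
```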

**Parallelism**

Let’s take a look at how LangChain Expression Language supports parallel requests. For example, a RunnableParallel (often written as a dictionary) executes each of its elements in parallel.
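The idea behind this kind of parallel step can be sketched with a thread pool: every branch receives the same input, all branches run at once, and the results come back as a dict keyed by branch name. `toy_parallel`, `slow_joke`, and `slow_poem` are hypothetical; each branch sleeps 0.3 s to stand in for model latency, so the total wall time should be close to the slowest branch rather than the sum of both.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def toy_parallel(branches: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Hypothetical RunnableParallel-like helper: run every branch on the same input concurrently."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, value) for name, fn in branches.items()}
        return {name: fut.result() for name, fut in futures.items()}


def slow_joke(inputs: Dict[str, str]) -> str:
    time.sleep(0.3)  # simulated model latency
    return f"joke about {inputs['topic']}"


def slow_poem(inputs: Dict[str, str]) -> str:
    time.sleep(0.3)  # simulated model latency
    return f"poem about {inputs['topic']}"


start = time.perf_counter()
out = toy_parallel({"joke": slow_joke, "poem": slow_poem}, {"topic": "bears"})
elapsed = time.perf_counter() - start
print(out)  # {'joke': 'joke about bears', 'poem': 'poem about bears'}
print(f"{elapsed:.2f}s")  # roughly 0.3 s, not 0.6 s
```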

In [None]:
from langchain_core.runnables import RunnableParallel

chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = (
    ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")
    | model
)
combined = RunnableParallel(joke=chain1, poem=chain2)

In [None]:
%%time
chain1.invoke({"topic": "bears"})

In [None]:
%%time
chain2.invoke({"topic": "bears"})

In [None]:
%%time
combined.invoke({"topic": "bears"})

**Parallelism on batches**

Parallelism can be combined with other runnables. Let’s try to use parallelism with batches.
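Combining the two levels can be sketched the same way: an outer thread pool maps over the batch of inputs, and each input fans out to the branches through an inner pool. This is a hypothetical, self-contained illustration (`slow_branch`, `run_branches`, and `toy_batch_parallel` are made-up names, and the 0.3 s sleep stands in for a model call) of why a batch over a parallel step can finish in roughly one call's latency when enough workers are available. Real deployments are further limited by concurrency settings and provider rate limits.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def slow_branch(kind: str) -> Callable[[Dict[str, str]], str]:
    """Hypothetical factory for a branch that sleeps 0.3 s to simulate a model call."""
    def run(inputs: Dict[str, str]) -> str:
        time.sleep(0.3)
        return f"{kind} about {inputs['topic']}"
    return run


def run_branches(branches: Dict[str, Callable], inputs: Dict[str, str]) -> Dict[str, str]:
    # Inner level: every branch runs on the same input at the same time.
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, inputs) for name, fn in branches.items()}
        return {name: fut.result() for name, fut in futures.items()}


def toy_batch_parallel(branches: Dict[str, Callable], batch: List[Dict[str, str]]) -> List[Dict[str, str]]:
    # Outer level: each input in the batch is processed concurrently; map keeps input order.
    with ThreadPoolExecutor(max_workers=len(batch)) as pool:
        return list(pool.map(lambda inputs: run_branches(branches, inputs), batch))


branches = {"joke": slow_branch("joke"), "poem": slow_branch("poem")}
start = time.perf_counter()
results = toy_batch_parallel(branches, [{"topic": "bears"}, {"topic": "cats"}])
elapsed = time.perf_counter() - start
print(results)
print(f"{elapsed:.2f}s")  # about one call's latency, not 2 inputs x 2 branches x 0.3 s
```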

In [None]:
%%time
chain1.batch([{"topic": "bears"}, {"topic": "cats"}])

In [None]:
%%time
chain2.batch([{"topic": "bears"}, {"topic": "cats"}])

In [None]:
%%time
combined.batch([{"topic": "bears"}, {"topic": "cats"}])

# **Streaming With LangChain**

Streaming is critical in making applications based on LLMs feel responsive to end-users.

Important LangChain primitives like LLMs, parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.

This interface provides two general approaches to stream content:

1. sync stream and async astream: a default implementation of streaming that streams the final output from the chain.
2. async astream_events and async astream_log: these provide a way to stream both intermediate steps and final output from the chain.

Let’s take a look at both approaches and try to understand how to use them. 🥷

**Using Stream**

All Runnable objects implement a sync method called stream and an async variant called astream.

These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if all steps in the program know how to process an input stream; i.e., process an input chunk one at a time, and yield a corresponding output chunk.
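A small generator pipeline makes the chunk-by-chunk requirement concrete. In the hypothetical sketch below, `fake_tokens` stands in for a model, `shout` transforms each chunk as it arrives, and `join_all` needs its whole input before it can emit anything. Once a step like `join_all` is in the pipeline, every step after it receives a single chunk at the end.

```python
from typing import Iterable, Iterator, List


def fake_tokens() -> Iterator[str]:
    """Hypothetical stand-in for an LLM emitting tokens one at a time."""
    yield from ["Why ", "did ", "the ", "bear ", "nap?"]


def shout(chunks: Iterable[str]) -> Iterator[str]:
    # Streaming-friendly: each output chunk depends only on the current input chunk.
    for chunk in chunks:
        yield chunk.upper()


def join_all(chunks: Iterable[str]) -> Iterator[str]:
    # Not streaming-friendly: it must consume every input chunk before yielding once.
    yield "".join(chunks)


streamed: List[str] = list(shout(fake_tokens()))
blocked: List[str] = list(shout(join_all(fake_tokens())))
print(streamed)  # ['WHY ', 'DID ', 'THE ', 'BEAR ', 'NAP?']
print(blocked)   # ['WHY DID THE BEAR NAP?']
```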

The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of JSON results before the entire JSON is complete.

The best place to start exploring streaming is with the single most important component in LLM apps: the LLMs themselves!

**LLMs and Chat Models**

Large language models and their chat variants are the primary bottleneck in LLM-based apps. 🙊

Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.

The key strategy to make the application feel more responsive is to show intermediate progress; e.g., to stream the output from the model token by token.
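Streaming improves the time until the user sees the first output, not the total generation time. The sketch below uses a hypothetical `fake_astream` async generator that waits 0.05 s before each token and a hypothetical `measure` helper that records both numbers. It needs only `asyncio` and `time`, so it runs without any model provider. As written it is a standalone script; inside a notebook, where an event loop is already running, you would use `await measure(...)` instead of `asyncio.run(...)`.

```python
import asyncio
import time
from typing import AsyncIterator, List, Optional, Tuple


async def fake_astream(tokens: List[str], delay: float = 0.05) -> AsyncIterator[str]:
    """Hypothetical stand-in for a chat model's astream: one token every `delay` seconds."""
    for token in tokens:
        await asyncio.sleep(delay)
        yield token


async def measure(tokens: List[str]) -> Tuple[Optional[float], float, str]:
    start = time.perf_counter()
    first: Optional[float] = None
    received: List[str] = []
    async for token in fake_astream(tokens):
        if first is None:
            first = time.perf_counter() - start  # latency until the user sees something
        received.append(token)
    total = time.perf_counter() - start
    return first, total, "".join(received)


first, total, text = asyncio.run(measure(["Hello", "!", " I", " am", " a", " bot", "."]))
print(f"first token after {first:.2f}s, full reply after {total:.2f}s: {text}")
```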

In [None]:
# Showing the example using Anthropic, but you can use
# your favorite chat model!
from langchain_community.chat_models import ChatAnthropic

model = ChatAnthropic()

chunks = []
async for chunk in model.astream("hello. tell me something about yourself"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

In [None]:
chunks[0]

We got back something called an AIMessageChunk. This chunk represents a part of an AIMessage.

Message chunks are additive by design – one can simply add them up to get the state of the response so far!
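Here is a toy model of that additive behaviour, assuming a chunk is just a piece of text plus an optional id. `ToyMessageChunk` is hypothetical and much simpler than LangChain's `AIMessageChunk`, which merges more fields than `content`. Defining `__add__` is what lets `functools.reduce` fold a list of chunks into the response so far.

```python
import operator
from dataclasses import dataclass
from functools import reduce
from typing import List, Optional


@dataclass
class ToyMessageChunk:
    """Hypothetical message chunk: adding two chunks concatenates their content."""
    content: str
    id: Optional[str] = None

    def __add__(self, other: "ToyMessageChunk") -> "ToyMessageChunk":
        # Keep the first non-empty id and append the new text.
        return ToyMessageChunk(content=self.content + other.content, id=self.id or other.id)


toy_chunks: List[ToyMessageChunk] = [
    ToyMessageChunk("Hello", id="run-1"),
    ToyMessageChunk("! I'm"),
    ToyMessageChunk(" an assistant"),
    ToyMessageChunk("."),
]
so_far = reduce(operator.add, toy_chunks[:2])
full = reduce(operator.add, toy_chunks)
print(so_far.content)  # Hello! I'm
print(full.content)    # Hello! I'm an assistant.
```

Because `reduce(operator.add, ...)` relies only on `+`, the same pattern should also work on the real `chunks` list collected above.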

In [None]:
chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]

**Chains**

Virtually all LLM applications involve more steps than just a call to a language model.

Let’s build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, a model, and a parser, and verify that streaming works.

We will use StrOutputParser to parse the output from the model. This is a simple parser that extracts the content field from an AIMessageChunk, giving us the token returned by the model.
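Conceptually, a streaming string parser only has to map each incoming chunk to its `content`. The sketch below is a hypothetical stand-in for that behaviour: the name `toy_str_parser` is made up, and `SimpleNamespace` objects stand in for `AIMessageChunk`. Because it handles one chunk at a time, it does not block streaming.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator


def toy_str_parser(chunks: Iterable[Any]) -> Iterator[str]:
    """Hypothetical streaming parser: emit the text content of each message chunk as it arrives."""
    for chunk in chunks:
        yield chunk.content


fake_chunks = [SimpleNamespace(content=t) for t in ["Why ", "did ", "the ", "parrot ", "talk?"]]
print(list(toy_str_parser(fake_chunks)))  # ['Why ', 'did ', 'the ', 'parrot ', 'talk?']
```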

In [None]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)

**Working with Input Streams**

What if you wanted to stream JSON from the output as it was being generated?

If you were to rely on json.loads to parse the partial JSON, the parsing would fail, because the partial JSON wouldn’t be valid JSON.

You’d likely be at a complete loss as to what to do and conclude that it wasn’t possible to stream JSON.

Well, it turns out there is a way to do it: the parser needs to operate on the input stream and attempt to “auto-complete” the partial JSON into a valid state.

Let’s see such a parser in action to understand what this means.

In [None]:
from langchain_core.output_parsers import JsonOutputParser

# Due to a bug in older versions of LangChain, JsonOutputParser did not stream results from some models
chain = model | JsonOutputParser()
async for text in chain.astream(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
    print(text, flush=True)
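The “auto-complete” idea described above can be sketched in plain Python. The hypothetical `parse_partial_json` below closes any open string, object, or array in a prefix. If the result is still not valid JSON (for example, because of a dangling key or a trailing comma), it drops characters from the end until the prefix parses. This is a deliberately simple, quadratic-time illustration under those assumptions, not the algorithm `JsonOutputParser` uses internally. A truncated number is returned as-is, so a value can grow (for example `10`, then `1000`) as more text arrives.

```python
import json
from typing import Any, List, Optional


def _close_open_structures(prefix: str) -> str:
    """Append whatever closing quote/brackets the JSON prefix is still missing."""
    closers: List[str] = []
    in_string = False
    escaped = False
    for ch in prefix:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()
    return prefix + ('"' if in_string else "") + "".join(reversed(closers))


def parse_partial_json(text: str) -> Optional[Any]:
    """Hypothetical helper: best-effort parse of a JSON prefix, or None if nothing parses."""
    # Try the longest prefix first, trimming one character at a time on failure.
    for end in range(len(text), 0, -1):
        try:
            return json.loads(_close_open_structures(text[:end]))
        except json.JSONDecodeError:
            continue
    return None


# Placeholder numbers, not real population figures.
full_json = '{"countries": [{"name": "France", "population": 1000}, {"name": "Spain", "population": 2000}]}'
last = None
for i in range(8, len(full_json) + 8, 8):  # simulate the text arriving in 8-character chunks
    parsed = parse_partial_json(full_json[:i])
    if parsed != last:  # only print when the parsed state changes
        print(parsed)
        last = parsed
```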