### Interface

- **Simplifying Custom Chain Creation with Runnable Protocol**:
  - LangChain introduces the "Runnable" protocol to facilitate the creation and invocation of custom chains.
  - This protocol is implemented in most components, offering a standard interface for both defining and invoking chains.

- **Standard Interface Methods**:
  1. **Synchronous Methods**:
     - `stream`: Streams back chunks of the response.
     - `invoke`: Calls the chain on an input.
     - `batch`: Processes a list of inputs through the chain.
  2. **Asynchronous Methods**:
     - `astream`: Asynchronously streams back response chunks.
     - `ainvoke`: Asynchronously calls the chain on an input.
     - `abatch`: Asynchronously processes a list of inputs.
     - `astream_log`: Streams back intermediate steps and the final response.
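To make the shape of this interface concrete, here is a minimal pure-Python sketch of the synchronous methods. `ToyRunnable`, `toy_prompt`, `shout`, and `toy_chain` are hypothetical names invented for illustration; this is not LangChain's implementation, only an approximation of how `invoke`, `batch`, `stream`, and the `|` operator relate to each other.

```python
from typing import Any, Callable, Iterator, List


class ToyRunnable:
    """Hypothetical stand-in for a Runnable; not LangChain's class."""

    def __init__(self, func: Callable[[Any], Any]):
        self.func = func

    def invoke(self, value: Any) -> Any:
        # Call the wrapped function on a single input.
        return self.func(value)

    def batch(self, values: List[Any]) -> List[Any]:
        # Process a list of inputs, preserving their order.
        return [self.invoke(v) for v in values]

    def stream(self, value: Any) -> Iterator[str]:
        # Yield the result in chunks; here, one word at a time.
        for word in str(self.invoke(value)).split(" "):
            yield word

    def __or__(self, other: "ToyRunnable") -> "ToyRunnable":
        # `a | b` builds a new runnable that feeds a's output into b.
        return ToyRunnable(lambda value: other.invoke(self.invoke(value)))


toy_prompt = ToyRunnable(lambda d: f"tell me a joke about {d['topic']}")
shout = ToyRunnable(lambda text: text.upper())
toy_chain = toy_prompt | shout

single = toy_chain.invoke({"topic": "bears"})
many = toy_chain.batch([{"topic": "bears"}, {"topic": "cats"}])
chunks = list(toy_chain.stream({"topic": "bears"}))

print(single)
print(many)
print(chunks)
```

Because every step exposes the same three methods, the composed chain can be called exactly like its parts, which is the point of having a standard interface.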

- **Input and Output Types by Component**:
  | Component      | Input Type                                   | Output Type       |
  | -------------- | -------------------------------------------- | ----------------- |
  | Prompt         | Dictionary                                   | PromptValue       |
  | ChatModel      | Single string, list of messages, PromptValue | ChatMessage       |
  | LLM            | Single string, list of messages, PromptValue | String            |
  | OutputParser   | LLM/ChatModel output                         | Varies            |
  | Retriever      | Single string                                | List of Documents |
  | Tool           | String or dictionary                         | Varies            |

- **Input and Output Schemas**:
  - All runnables feature auto-generated Pydantic models for inputs (`input_schema`) and outputs (`output_schema`), based on their structure.

- **Practical Example**:
  - To understand these methods, a basic `PromptTemplate + ChatModel` chain can be created and examined.

- **Runnable Schemas in LangChain**:
  - All Runnable components in LangChain provide schemas for input and output inspection:
    - `input_schema`: An auto-generated Pydantic model reflecting the structure of the Runnable, used for input validation.
    - `output_schema`: A similar auto-generated Pydantic model for the Runnable's output.
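As a rough illustration of what "auto-generated from the structure" can mean for a prompt, the sketch below derives a JSON-Schema-like dictionary from the placeholders in a template string. The helper `toy_input_schema` is hypothetical and uses only the standard library; LangChain itself builds Pydantic models, so treat this as an analogy rather than the library's mechanism.

```python
from string import Formatter
from typing import Any, Dict


def toy_input_schema(template: str, title: str = "PromptInput") -> Dict[str, Any]:
    """Build a JSON-Schema-like dict from the placeholders in a template."""
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for trailing literal text, so those entries are skipped.
    fields = [name for _, name, _, _ in Formatter().parse(template) if name]
    return {
        "title": title,
        "type": "object",
        "properties": {
            name: {"title": name.title(), "type": "string"} for name in fields
        },
    }


schema = toy_input_schema("tell me a joke about {topic}")
print(schema)
```

The single `{topic}` placeholder becomes the single `topic` property, which is why a prompt's input schema changes whenever its template variables change.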


In [1]:
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model

In [3]:
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [4]:
model.input_schema.schema()

{'title': 'ChatOpenAIInput',
 'anyOf': [{'type': 'string'},
  {'$ref': '#/definitions/StringPromptValue'},
  {'$ref': '#/definitions/ChatPromptValueConcrete'},
  {'type': 'array',
   'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'},
     {'$ref': '#/definitions/HumanMessage'},
     {'$ref': '#/definitions/ChatMessage'},
     {'$ref': '#/definitions/SystemMessage'},
     {'$ref': '#/definitions/FunctionMessage'},
     {'$ref': '#/definitions/ToolMessage'}]}}],
 'definitions': {'StringPromptValue': {'title': 'StringPromptValue',
   'description': 'String prompt value.',
   'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'type': {'title': 'Type',
     'default': 'StringPromptValue',
     'enum': ['StringPromptValue'],
     'type': 'string'}},
   'required': ['text']},
  'AIMessage': {'title': 'AIMessage',
   'description': 'A Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'str

In [5]:
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

### Output schema

- **Runnable Output Descriptions in LangChain**:
  - The output produced by a Runnable is defined through a Pydantic model.
  - This model is dynamically generated based on the structure of the Runnable.
  - To access a JSONSchema representation of this output, one can use the `.schema()` method on the Pydantic model.

In [6]:
chain.output_schema.schema()

{'title': 'ChatOpenAIOutput',
 'anyOf': [{'$ref': '#/definitions/AIMessage'},
  {'$ref': '#/definitions/HumanMessage'},
  {'$ref': '#/definitions/ChatMessage'},
  {'$ref': '#/definitions/SystemMessage'},
  {'$ref': '#/definitions/FunctionMessage'},
  {'$ref': '#/definitions/ToolMessage'}],
 'definitions': {'AIMessage': {'title': 'AIMessage',
   'description': 'A Message from an AI.',
   'type': 'object',
   'properties': {'content': {'title': 'Content',
     'anyOf': [{'type': 'string'},
      {'type': 'array',
       'items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}]},
    'additional_kwargs': {'title': 'Additional Kwargs', 'type': 'object'},
    'type': {'title': 'Type',
     'default': 'ai',
     'enum': ['ai'],
     'type': 'string'},
    'example': {'title': 'Example', 'default': False, 'type': 'boolean'}},
   'required': ['content']},
  'HumanMessage': {'title': 'HumanMessage',
   'description': 'A Message from a human.',
   'type': 'object',
   'properties': {'conten

### Stream

In [7]:
for s in chain.stream({"topic": "bears"}):
  print(s.content, end="", flush=True)


Why don't bears wear shoes?

Because they already have bear feet!

### Invoke

In [8]:
chain.invoke(
  {
    "topic": "bears"
  }
)

AIMessage(content="Why don't bears wear shoes?\n\nBecause they already have bear feet!")

### Batch

In [10]:
chain.batch(
  [
    {
      "topic": "bears"
    },
    {
      "topic": "cats"
    }
  ]
)

[AIMessage(content="Why don't bears wear shoes?\n\nBecause they already have bear feet!"),
 AIMessage(content="Sure, here's a joke about cats:\n\nWhy don't cats play poker in the wild?\n\nToo many cheetahs!")]

### Batch with plain string inputs

In [14]:
# You can also structure the chain as follows,
# so that inputs can be plain strings instead of dictionaries
from langchain_core.runnables import RunnablePassthrough
chain_v2 = { "topic": RunnablePassthrough()} | prompt | model

chain_v2.batch(
  [
    "bears",
    "cats",
    "birds",
    "snakes",
    "monkeys"
  ]
)

[AIMessage(content="Why don't bears ever wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content="Sure, here's a cat-themed joke for you:\n\nWhy don't cats play poker in the wild?\n\nBecause they're afraid of cheetahs!"),
 AIMessage(content="Sure, here's a bird joke for you:\n\nWhy don't seagulls fly over the bay?\n\nBecause then they would be bagels!"),
 AIMessage(content="Why don't snakes ever take time off work?\n\nBecause they don't like to hiss-tory!"),
 AIMessage(content="Why don't monkeys play cards in the wild?\n\nBecause there are too many cheetahs!")]

### Async stream

In [18]:
async for s in chain.astream({"topic": "bears"}):
  print(s.content, end="", flush=True)

Why did the bear bring a ladder to the party?

Because he wanted to "raise" the roof!

### Async invoke

In [19]:
await chain.ainvoke(
  {
    "topic": "bears"
  }
)

AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!")

### Async Batch

In [20]:
await chain_v2.abatch(
  [
    "bellman",
    "ice cream",
    "cars",
    "girls",
    "computers",
    "earth"
  ]
)

[AIMessage(content='Why did the bellman take up singing?\n\nBecause he wanted to be a bell-canto!'),
 AIMessage(content='Why did the ice cream go to therapy?\n\nBecause it had too many meltdowns!'),
 AIMessage(content='Why did the car go to therapy?\nBecause it had too many breakdowns!'),
 AIMessage(content='Why did the girl bring a ladder to the bar?\n\nBecause she heard the drinks were on the house!'),
 AIMessage(content='Why did the computer go to art school?\n\nBecause it had a lot of hard drive!'),
 AIMessage(content="Why did the Earth break up with the Moon? \n\nBecause it couldn't resist its gravitational pull!")]

### Async Stream Intermediate Steps

- **Intermediate Step Streaming in Runnables**:
  - LangChain Runnables feature the `.astream_log()` method, allowing for streaming of intermediate steps in real-time.
  - This functionality is beneficial for several purposes:
    - Showing progress to the user.
    - Utilizing intermediate results.
    - Debugging the chain.
  - All steps are streamed by default; steps can also be selectively included or excluded by name, tags, or metadata.

- **Streaming Output Format**:
  - The `.astream_log()` method produces JSONPatch operations.
  - These operations, when applied in sequence, construct the RunState, offering a comprehensive view of the chain's progress and intermediate states.
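The idea of applying operations in sequence can be sketched with a small hand-written patch applier. `apply_ops` is a hypothetical helper that supports only the `add` and `replace` operations on nested dicts and lists and ignores JSON Pointer escaping; the patches are abbreviated, made-up examples modeled on the kind of operations `.astream_log()` emits. A real client would more likely use an existing JSONPatch library.

```python
import copy
from typing import Any, Dict, List


def apply_ops(state: Any, ops: List[Dict[str, Any]]) -> Any:
    """Apply 'add'/'replace' JSONPatch ops to nested dicts and lists (toy subset)."""
    for op in ops:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op: {op['op']}")
        value = copy.deepcopy(op["value"])
        if op["path"] == "":
            # An empty path targets the whole document.
            state = value
            continue
        *parents, last = op["path"].lstrip("/").split("/")
        target = state
        for key in parents:
            target = target[int(key)] if isinstance(target, list) else target[key]
        if isinstance(target, list):
            if op["op"] == "add":
                # "-" means "append to the end of the list".
                target.insert(len(target) if last == "-" else int(last), value)
            else:
                target[int(last)] = value
        else:
            target[last] = value
    return state


# Abbreviated, illustrative patches (not real library output).
patches = [
    [{"op": "replace", "path": "",
      "value": {"final_output": None, "logs": {}, "streamed_output": []}}],
    [{"op": "add", "path": "/logs/Docs",
      "value": {"name": "Docs", "final_output": None, "end_time": None}}],
    [{"op": "add", "path": "/logs/Docs/final_output",
      "value": {"documents": ["harrison worked at kensho"]}},
     {"op": "add", "path": "/logs/Docs/end_time",
      "value": "2023-12-17T12:33:51.034"}],
    [{"op": "add", "path": "/streamed_output/-", "value": "kensho"}],
]

run_state = None
for ops in patches:
    run_state = apply_ops(run_state, ops)

print(run_state)
```

After the last patch, `run_state` holds the accumulated view of the run: the `Docs` sub-run with its final output and end time, plus the chunks streamed so far.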


In [21]:
from typing import TypedDict, List, Optional, Any, Dict

class LogEntry(TypedDict):

    id: str
    """ID of the sub-run."""
    name: str
    """Name of the object being run."""
    type: str
    """Type of the object being run, eg. prompt, chain, llm, etc."""
    tags: List[str]
    """List of tags for the run."""
    metadata: Dict[str, Any]
    """Key-value pairs of metadata for the run."""
    start_time: str
    """ISO-8601 timestamp of when the run started."""

    streamed_output_str: List[str]
    """List of LLM tokens streamed by this run, if applicable."""
    final_output: Optional[Any]
    """Final output of this run.
    Only available after the run has finished successfully."""
    end_time: Optional[str]
    """ISO-8601 timestamp of when the run ended.
    Only available after the run has finished."""


class RunState(TypedDict):
    id: str
    """ID of the run."""
    streamed_output: List[Any]
    """List of output chunks streamed by Runnable.stream()"""
    final_output: Optional[Any]
    """Final output of the run, usually the result of aggregating (`+`) streamed_output.
    Only available after the run has finished successfully."""

    logs: Dict[str, LogEntry]
    """Map of run names to sub-runs. If filters were supplied, this map will
    contain only the runs that matched the filters."""

### Streaming JSONPatch chunks

- **Streaming JSONPatch in HTTP Servers**:
  - The capability to stream JSONPatch operations from Runnables is particularly useful in web applications.
  - This streaming can be employed in HTTP servers, allowing the client-side to receive and apply these operations.

- **Client-Side Application**:
  - By applying the streamed JSONPatch operations on the client side, it's possible to rebuild and replicate the run state as it evolves.

- **LangServe Integration**:
  - LangServe provides tooling to serve any Runnable as a web server, which makes this streaming easier to integrate into web applications.

In [25]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

In [26]:
template = """
Answer the question based only on the following context: {context}.

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)
vectorstore = FAISS.from_texts(["harrison worked at kensho"], embedding=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

retrieval_chain = (
  {
    "context": retriever.with_config(run_name="Docs"),
    "question": RunnablePassthrough()
  }
  | prompt
  | model
  | StrOutputParser()
)

In [27]:
async for chunk in retrieval_chain.astream_log("where did harrison work?", include_names=["Docs"]):
  print("-" * 40)
  print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': 'c7f8162f-832d-4009-aa08-f725b01939c7',
            'logs': {},
            'streamed_output': []}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': '294ac588-c890-4c05-8a9a-00593212ac97',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2023-12-17T12:33:50.727',
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
            'type': 'retriever'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='harrison worked at kensho')]}},
 {'op': 'add',
  'path': '/logs/Docs/end_time',
  'value': '2023-12-17T12:33:51.034'})
------------------------

### Streaming the incremental RunState

- **Streaming Incremental RunState**:
  - To receive the accumulated RunState after each update instead of JSONPatch operations, pass `diff=False` to `.astream_log()`.
  - The output is more verbose because unchanged parts of the state are repeated in every chunk, but each chunk is a complete view of the run at that point.
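The trade-off between increments and full snapshots can be seen in a simplified model. In the sketch below, plain dictionary updates stand in for JSONPatch operations, and `snapshots`, `updates`, and the placeholder values are all hypothetical; it only illustrates why snapshot-style output repeats content.

```python
import copy
from typing import Any, Dict, Iterable, Iterator


def snapshots(updates: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Turn a stream of incremental updates into a stream of full states."""
    state: Dict[str, Any] = {}
    for update in updates:
        state.update(update)        # fold the increment into the running state
        yield copy.deepcopy(state)  # emit the whole state, repeated parts included


# Made-up increments: each one carries only what changed.
updates = [
    {"id": "run-1", "logs": {}, "final_output": None},
    {"logs": {"Docs": {"end_time": None}}},
    {"logs": {"Docs": {"end_time": "t1"}}},
]

states = list(snapshots(updates))
for state in states:
    print(state)
```

Every emitted state carries the `id` and `final_output` keys again even though only `logs` changed, which mirrors the repetition seen when streaming full states.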

In [28]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"], diff=False
):
    print("-" * 70)
    print(chunk)

----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '4be41e15-e758-457e-88c3-55f93ece8fbf',
 'logs': {},
 'streamed_output': []})
----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '4be41e15-e758-457e-88c3-55f93ece8fbf',
 'logs': {'Docs': {'end_time': None,
                   'final_output': None,
                   'id': '8c8a5ba7-80d2-4443-ad59-0017386fa3d2',
                   'metadata': {},
                   'name': 'Docs',
                   'start_time': '2023-12-17T12:37:11.723',
                   'streamed_output_str': [],
                   'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
                   'type': 'retriever'}},
 'streamed_output': []})
----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': '4be41e15-e758-457e-88c3-55f93ece8fbf',
 'logs': {'Docs': {'end_time': '2023-12-17T12:37:12.007

### Parallelism

- **Parallelism in LangChain Expression Language**:
  - LangChain Expression Language (LCEL) supports parallel processing of requests.
  - A `RunnableParallel` (often written as a plain dictionary) executes each of its entries concurrently on the same input.
  - This helps when the branches are independent and mostly waiting on I/O, such as model calls: the total wall time approaches that of the slowest branch rather than the sum of all branches.
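The effect on wall time can be reproduced without any model calls. The sketch below is a standard-library approximation, not LangChain's implementation: `run_parallel`, `slow_joke`, and `slow_poem` are hypothetical, and `time.sleep` stands in for a network-bound request.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(steps: Dict[str, Callable[[Any], Any]], value: Any) -> Dict[str, Any]:
    """Run every step on the same input concurrently; return results by key."""
    with ThreadPoolExecutor(max_workers=max(1, len(steps))) as pool:
        futures = {key: pool.submit(step, value) for key, step in steps.items()}
        return {key: future.result() for key, future in futures.items()}


def slow_joke(inputs: Dict[str, str]) -> str:
    time.sleep(0.2)  # stands in for a network-bound model call
    return f"joke about {inputs['topic']}"


def slow_poem(inputs: Dict[str, str]) -> str:
    time.sleep(0.2)  # stands in for a second, independent model call
    return f"poem about {inputs['topic']}"


start = time.perf_counter()
result = run_parallel({"joke": slow_joke, "poem": slow_poem}, {"topic": "bears"})
elapsed = time.perf_counter() - start

print(result)
print(f"elapsed: {elapsed:.2f}s")
```

Each branch sleeps for 0.2 seconds, so running them one after another would take about 0.4 seconds; run concurrently, the elapsed time should stay close to that of a single branch.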

In [29]:
from langchain_core.runnables import RunnableParallel

In [30]:
chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = (
    ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")
    | model
)
combined = RunnableParallel(joke=chain1, poem=chain2)

In [32]:
%%time
chain1.invoke(
  {
    "topic": "bears"
  }
)

CPU times: user 6.62 ms, sys: 2.17 ms, total: 8.79 ms
Wall time: 655 ms


AIMessage(content="Sure, here's a bear joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they already have bear feet!")

In [33]:
%%time
chain2.invoke(
  {
    "topic": "bears"
  }
)

CPU times: user 7.06 ms, sys: 1.67 ms, total: 8.73 ms
Wall time: 485 ms


AIMessage(content="In forest's embrace,\nMajestic bears leave their trace.")

In [34]:
%%time
combined.invoke({"topic": "bears"})

CPU times: user 30.4 ms, sys: 4.3 ms, total: 34.7 ms
Wall time: 516 ms


{'joke': AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 'poem': AIMessage(content="In forest's embrace, \nMajestic bears leave their trace.")}

### Parallelism on batches

In [35]:
%%time
chain1.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 28.7 ms, sys: 3.62 ms, total: 32.3 ms
Wall time: 645 ms


[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
 AIMessage(content="Sure, here's a cat-related joke for you:\n\nWhy don't cats play poker in the wild?\n\nBecause there are too many cheetahs!")]

In [36]:
%%time
chain2.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 23.8 ms, sys: 3.72 ms, total: 27.5 ms
Wall time: 560 ms


[AIMessage(content="In deep woods they roam,\nMajestic bears, nature's crown."),
 AIMessage(content='Whiskers dance on moonlit trails,\nSoft purrs weave tales of feline tales.')]

In [37]:
%%time
combined.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: user 54.5 ms, sys: 7.69 ms, total: 62.2 ms
Wall time: 507 ms


[{'joke': AIMessage(content='Why don\'t bears use cell phones? \n\nBecause they already have "paws"ome reception in the woods!'),
  'poem': AIMessage(content='In the wild they roam,\nMajestic bears find their home.')},
 {'joke': AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!"),
  'poem': AIMessage(content='Whiskers soft, eyes so bright,\nCats bring joy, day and night.')}]