# Runnable Interface

https://python.langchain.com/docs/expression_language/interface/

Runnable是一个标准的接口定义. 实现了以下方法的就是一个Runnable:
- stream
- invoke
- batch

对应的异步方法包括:
- astream
- ainvoke
- abatch
- astream_log
- astream_events

接口未定义输入类型和输出类型，不同组件有不同的输入输出类型。


In [1]:
import os

# 初始化
API_KEY = os.getenv("UNION_API_KEY")
BASE_URL = os.getenv("UNION_BASE_URL")

## 一个简单的 PromptTemplate + ChatModel 链


In [3]:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model

### Input Schema: Runnable的输入的结构

In [4]:
# The input schema of the chain is the input schema of its first part, the prompt.
chain.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [5]:
prompt.input_schema.schema()

{'title': 'PromptInput',
 'type': 'object',
 'properties': {'topic': {'title': 'Topic', 'type': 'string'}}}

In [10]:
model.input_schema.schema()

{'title': 'ChatOpenAIInput',
 'anyOf': [{'type': 'string'},
  {'$ref': '#/definitions/StringPromptValue'},
  {'$ref': '#/definitions/ChatPromptValueConcrete'},
  {'type': 'array',
   'items': {'anyOf': [{'$ref': '#/definitions/AIMessage'},
     {'$ref': '#/definitions/HumanMessage'},
     {'$ref': '#/definitions/ChatMessage'},
     {'$ref': '#/definitions/SystemMessage'},
     {'$ref': '#/definitions/FunctionMessage'},
     {'$ref': '#/definitions/ToolMessage'}]}}],
 'definitions': {'StringPromptValue': {'title': 'StringPromptValue',
   'description': 'String prompt value.',
   'type': 'object',
   'properties': {'text': {'title': 'Text', 'type': 'string'},
    'type': {'title': 'Type',
     'default': 'StringPromptValue',
     'enum': ['StringPromptValue'],
     'type': 'string'}},
   'required': ['text']},
  'ToolCall': {'title': 'ToolCall',
   'type': 'object',
   'properties': {'name': {'title': 'Name', 'type': 'string'},
    'args': {'title': 'Args', 'type': 'object'},
    'id': {

### Output Schema: 输出的结构

In [11]:
# The output schema of the chain is the output schema of its last part, in this case a ChatModel, which outputs a ChatMessage
chain.output_schema.schema()

{'title': 'ChatOpenAIOutput',
 'anyOf': [{'$ref': '#/definitions/AIMessage'},
  {'$ref': '#/definitions/HumanMessage'},
  {'$ref': '#/definitions/ChatMessage'},
  {'$ref': '#/definitions/SystemMessage'},
  {'$ref': '#/definitions/FunctionMessage'},
  {'$ref': '#/definitions/ToolMessage'}],
 'definitions': {'ToolCall': {'title': 'ToolCall',
   'type': 'object',
   'properties': {'name': {'title': 'Name', 'type': 'string'},
    'args': {'title': 'Args', 'type': 'object'},
    'id': {'title': 'Id', 'type': 'string'}},
   'required': ['name', 'args', 'id']},
  'InvalidToolCall': {'title': 'InvalidToolCall',
   'type': 'object',
   'properties': {'name': {'title': 'Name', 'type': 'string'},
    'args': {'title': 'Args', 'type': 'string'},
    'id': {'title': 'Id', 'type': 'string'},
    'error': {'title': 'Error', 'type': 'string'}},
   'required': ['name', 'args', 'id', 'error']},
  'AIMessage': {'title': 'AIMessage',
   'description': 'Message from an AI.',
   'type': 'object',
   'proper

### Stream

In [12]:
for s in chain.stream({"topic": "bears"}):
    print(s.content, end="", flush=True)

Why did the bear bring a ladder to the picnic?

Because it wanted to reach the "bear" essentials on the top shelf!

### Invoke

In [13]:
chain.invoke({"topic": "bears"})

AIMessage(content="Why don't bears like fast food?\n\nBecause they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-da19421b-3e69-46fc-ae5c-608feb8f992d-0')

### Batch

In [14]:
chain.batch([{"topic": "bears"}, {"topic": "cats"}])

[AIMessage(content="Why don't bears wear shoes?\n\nBecause they already have bear feet!", response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 13, 'total_tokens': 27}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-1a47bc14-930c-470c-a670-23b774eb0f23-0'),
 AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!", response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 13, 'total_tokens': 30}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-9cc37846-936e-443d-b602-2c03b5853c0a-0')]

In [15]:
# 设置最大并发数
chain.batch([{"topic": "bears"}, {"topic": "cats"}], config={"max_concurrency": 5})

[AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!", response_metadata={'token_usage': {'completion_tokens': 13, 'prompt_tokens': 13, 'total_tokens': 26}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-d285000c-a1ff-4d5c-9246-8446161190a5-0'),
 AIMessage(content="Sure, here's a cat joke for you:\n\nWhy don't cats play poker in the wild?\n\nToo many cheetahs!", response_metadata={'token_usage': {'completion_tokens': 27, 'prompt_tokens': 13, 'total_tokens': 40}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-286593bb-e1c1-4796-b6db-554c838f2b8c-0')]

### Async Stream

In [16]:
async for s in chain.astream({"topic": "bears"}):
    print(s.content, end="", flush=True)

Sure, here's a bear-related joke for you:

Why don't bears like fast food?

Because they can't catch it!

### Async Invoke

In [17]:
await chain.ainvoke({"topic": "bears"})

AIMessage(content="Why don't bears wear shoes?\nBecause they have bear feet!", response_metadata={'token_usage': {'completion_tokens': 13, 'prompt_tokens': 13, 'total_tokens': 26}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-895dcae5-abd3-493e-a15b-89bfb9b803a2-0')

### Async Batch

In [18]:
await chain.abatch([{"topic": "bears"}])

[AIMessage(content="Why don't bears wear shoes? \n\nBecause they have bear feet!", response_metadata={'token_usage': {'completion_tokens': 14, 'prompt_tokens': 13, 'total_tokens': 27}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-f3fd2f70-ab99-4243-9b9a-9ebb231fee26-0')]

### Async Stream Event

In [21]:
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model.with_config(run_name="my_llm")
    | StrOutputParser()
)

async for event in retrieval_chain.astream_events(
    "where did harrison work?", version="v1", include_names=["Docs", "my_llm"]
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="|")
    elif kind in {"on_chat_model_start"}:
        print()
        print("Streaming LLM:")
    elif kind in {"on_chat_model_end"}:
        print()
        print("Done streaming LLM.")
    elif kind == "on_retriever_end":
        print("--")
        print("Retrieved the following documents:")
        print(event["data"]["output"]["documents"])
    elif kind == "on_tool_end":
        print(f"Ended tool: {event['name']}")
    else:
        pass

  warn_beta(


--
Retrieved the following documents:
[Document(page_content='harrison worked at kensho')]

Streaming LLM:
|H|arrison| worked| at| Kens|ho|.||
Done streaming LLM.


### Async Stream Intermediate Steps

In [23]:
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"]
):
    print("-" * 40)
    print(chunk)

----------------------------------------
RunLogPatch({'op': 'replace',
  'path': '',
  'value': {'final_output': None,
            'id': 'ea903ee7-5101-498e-b9e0-f05e89db5fdb',
            'logs': {},
            'name': 'RunnableSequence',
            'streamed_output': [],
            'type': 'chain'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs',
  'value': {'end_time': None,
            'final_output': None,
            'id': 'eaf4636f-20be-4b63-af84-b51a477d16e4',
            'metadata': {},
            'name': 'Docs',
            'start_time': '2024-04-16T03:48:52.996+00:00',
            'streamed_output': [],
            'streamed_output_str': [],
            'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
            'type': 'retriever'}})
----------------------------------------
RunLogPatch({'op': 'add',
  'path': '/logs/Docs/final_output',
  'value': {'documents': [Document(page_content='harrison worked at kensho')]}},
 

In [24]:
# 设置 diff=False，使用流式传输增量运行状态
async for chunk in retrieval_chain.astream_log(
    "where did harrison work?", include_names=["Docs"], diff=False
):
    print("-" * 70)
    print(chunk)

----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': 'b65b7748-12be-4319-8eda-8552fb2f2da7',
 'logs': {},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
----------------------------------------------------------------------
RunLog({'final_output': None,
 'id': 'b65b7748-12be-4319-8eda-8552fb2f2da7',
 'logs': {'Docs': {'end_time': None,
                   'final_output': None,
                   'id': '692f422b-f16f-45f2-ab77-4e3126a4838a',
                   'metadata': {},
                   'name': 'Docs',
                   'start_time': '2024-04-16T03:50:31.817+00:00',
                   'streamed_output': [],
                   'streamed_output_str': [],
                   'tags': ['map:key:context', 'FAISS', 'OpenAIEmbeddings'],
                   'type': 'retriever'}},
 'name': 'RunnableSequence',
 'streamed_output': [],
 'type': 'chain'})
-------------------------------------------------------------

### Parallelism - 并行

In [30]:
from langchain_core.runnables import RunnableParallel

chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = (
    ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")
    | model
)
combined = RunnableParallel(joke=chain1, poem=chain2)


In [31]:
%%time
chain1.invoke({"topic": "python"})

CPU times: total: 0 ns
Wall time: 1.19 s


AIMessage(content="Why did the python programmer always carry a notebook?\n\nBecause they couldn't remember anything without a list!", response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 13, 'total_tokens': 33}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-9e04677f-215e-4dcf-8256-409292096566-0')

In [32]:
%%time
chain2.invoke({"topic": "python"})

CPU times: total: 0 ns
Wall time: 844 ms


AIMessage(content='Python slithers, agile and sly,\nIn code it dances, creating its own sky.', response_metadata={'token_usage': {'completion_tokens': 20, 'prompt_tokens': 17, 'total_tokens': 37}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-d33fb56a-a276-43ac-895a-b7bae9c87c15-0')

In [33]:
%%time
combined.invoke({"topic": "bears"})

CPU times: total: 0 ns
Wall time: 1.88 s


{'joke': AIMessage(content="Sure, here's a bear-related joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they have bear feet!", response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 13, 'total_tokens': 37}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-454af089-ef36-495f-8505-52e9f918f745-0'),
 'poem': AIMessage(content="Majestic bears roam,\nNature's strength, beauty, and home.", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 17, 'total_tokens': 32}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-ef200da3-ceb6-43a1-8259-a6230a216da0-0')}

### Parallelism on batches - 批处理并行

In [34]:
%%time
chain1.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: total: 0 ns
Wall time: 931 ms


[AIMessage(content="Sure, here's a bear joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they have bear feet!", response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 13, 'total_tokens': 36}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-10523a5d-9f53-4c58-b426-b930e040f150-0'),
 AIMessage(content="Sure, here's a cat joke for you:\n\nWhy don't cats play poker in the wild?\n\nToo many cheetahs!", response_metadata={'token_usage': {'completion_tokens': 27, 'prompt_tokens': 13, 'total_tokens': 40}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-4a694810-5d2b-4d13-9856-0c4bc3d23698-0')]

In [35]:
%%time
chain2.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: total: 0 ns
Wall time: 1.07 s


[AIMessage(content='Gentle giants roam,\nWild grace in fur and bone.', response_metadata={'token_usage': {'completion_tokens': 13, 'prompt_tokens': 17, 'total_tokens': 30}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-63c4fd89-3b39-4522-9666-5ac6867acf61-0'),
 AIMessage(content='Whiskers dance, eyes gleam,\nSilent grace, feline dream.', response_metadata={'token_usage': {'completion_tokens': 17, 'prompt_tokens': 17, 'total_tokens': 34}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-4df8fc4c-9fd0-4450-a0ee-cb3b82d7fc23-0')]

In [36]:
%%time
combined.batch([{"topic": "bears"}, {"topic": "cats"}])

CPU times: total: 0 ns
Wall time: 1.25 s


[{'joke': AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!", response_metadata={'token_usage': {'completion_tokens': 13, 'prompt_tokens': 13, 'total_tokens': 26}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-ddf615a3-9afb-412d-92f0-329d77bc998d-0'),
  'poem': AIMessage(content='In forests deep, bears wander free,\nSymbols of strength, wild majesty.', response_metadata={'token_usage': {'completion_tokens': 16, 'prompt_tokens': 17, 'total_tokens': 33}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-0a5b9c63-841f-4b4f-a96b-a873f065507f-0')},
 {'joke': AIMessage(content="Sure, here's a cat joke for you:\n\nWhy was the cat sitting on the computer?\n\nBecause it wanted to keep an eye on the mouse!", response_metadata={'token_usage': {'completion_tokens': 30, 'prompt_tokens': 13, 'total_tokens': 43}, 'model_name': 'gpt-3.5-turbo', 'sys