# LangChain Streaming

A deep dive into streaming output in LangChain, following [Streaming With LangChain](https://python.langchain.com/docs/expression_language/streaming).

## Setting up a local LLM with Ollama

In [1]:
%pip install langchain

Note: you may need to restart the kernel to use updated packages.


In [2]:
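# To run async code in a Jupyter Notebook, install the nest_asyncio package: it patches asyncio
# so that event loops can be nested, which lets async code run inside Jupyter's already-running loop.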
import nest_asyncio

nest_asyncio.apply()

In [3]:
from langchain_community.llms import Ollama
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# LLM model
# See: https://python.langchain.com/docs/integrations/llms/ollama
llm = Ollama(model="mistral:latest")

# Prompt and output parser
template = """Answer the Question: {question}"""
prompt = ChatPromptTemplate.from_template(template)
output_parser = StrOutputParser()

# Processing pipeline: prompt -> LLM -> string output parser
chain = prompt | llm | output_parser

In [4]:
chain.invoke({"question": "介绍一下北京"})

" I'd be happy to introduce you to Beijing, the magnificent capital city of China. Located in the northern part of the country, Beijing is a bustling metropolis with over 21 million residents. It is renowned for its rich history, vibrant culture, and significant role as a political, educational, and cultural center of China.\n\nBeijing is home to many of China's most famous landmarks, including the Forbidden City, which was once the imperial palace and now serves as a UNESCO World Heritage Site; the Temple of Heaven, an ancient religious site and architectural masterpiece; and the Great Wall of China, one of the Seven Wonders of the Medieval World.\n\nThe city is also famous for its delicious cuisine, such as Peking Roast Duck and Jiaozi (dumplings). Beijing's streets come alive with an intoxicating mix of old and new, traditional and modern. Modern high-rises stand proudly alongside ancient temples and hutongs (traditional alleyways).\n\nBeijing is a global city and is home to many in

## Two ways to stream output

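1. `astream()`: the default streaming method; it yields chunks of the chain's final output.
2. `astream_events()` and `astream_log()` (both async): these stream the intermediate steps of a chain as well as its final output.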

### 1. The astream method

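`astream()` returns an async iterator, so consume it with `async for`; each iteration receives the next chunk of output as soon as it is generated.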

In [5]:
chunks = []
async for chunk in chain.astream({"question": "hello. tell me something about yourself"}):
    chunks.append(chunk)
    print(chunk, end="|", flush=True)

 I|'|m| an| artificial| intelligence| designed| to| assist| with| information| and| answer| questions| to| the| best| of| my| ability|.| I| don|'|t| have| a| physical| self| or| personal| experiences|,| so| I| can|'|t| tell| you| about| myself| in| that| way|.| However|,| I| can| provide| you| with| accurate| information| on| a| wide| range| of| topics|,| solve| complex| problems|,| and| even| generate| creative| content|.| Let| me| know| how| I| can| help| you| today|!||
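Jupyter already runs an event loop, which is why the cell above can use `async for` at the top level. A plain Python script has no running loop, so the `async for` has to live inside a coroutine that is started with `asyncio.run()`. The sketch below shows that pattern with the standard library only; `fake_astream` is a hypothetical stand-in for `chain.astream(...)` that yields a hard-coded sentence word by word, so no model is needed.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_astream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for chain.astream(): yields the text word by word."""
    for i, word in enumerate(text.split(" ")):
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield word if i == 0 else " " + word


async def main() -> str:
    chunks: List[str] = []
    async for chunk in fake_astream("I'm an artificial intelligence"):
        chunks.append(chunk)
        print(chunk, end="|", flush=True)
    print()
    return "".join(chunks)


# In a script, asyncio.run() creates the event loop and drives main() to completion.
full_text = asyncio.run(main())
```

`asyncio.run()` refuses to start while another event loop is already running, which is the situation inside Jupyter unless nest_asyncio has been applied as in the setup cell.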

Inspect individual chunks:

In [6]:
chunks[0]

' I'

In [7]:
str = ""
for i in range(20):
    str += chunks[i]
str

" I'm an artificial intelligence designed to assist with information and answer questions to the best of my ability"

#### Streaming JSON output

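In function-calling scenarios the LLM has to return JSON-structured data. Here is an example of streaming JSON output; `JsonOutputParser` yields a progressively more complete object as the model generates tokens: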

In [8]:
from langchain_core.output_parsers import JsonOutputParser

chain = prompt | llm | JsonOutputParser()
async for text in chain.astream({"question": 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}):
    print(text, flush=True)

{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'Fr'}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 6}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 670}]}
{'countries': [{'name': 'France', 'population': 6702}]}
{'countries': [{'name': 'France', 'population': 67022}]}
{'countries': [{'name': 'France', 'population': 670220}]}
{'countries': [{'name': 'France', 'population': 6702200}]}
{'countries': [{'name': 'France', 'population': 67022000}]}
{'countries': [{'name': 'France', 'population': 67022000}, {}]}
{'countries': [{'name': 'France', 'population': 67022000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67022000}, {'name': 'Sp'}]}
{'countries': [{'name': 'France', 'population': 67022000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67022000}, {'name': 'Spain', 'population': 4}]}
{'countries': [{'nam
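Each line above is a complete Python dict even though the model has only produced a prefix of the JSON text. One way to get that behaviour is to re-parse the accumulated text after every chunk and close whatever strings, objects, and arrays are still open. The standard-library sketch below illustrates that idea; `parse_partial_json` and `stream_partial_objects` are hypothetical helpers written for this explanation, not LangChain's implementation, and they simply give up (return `None`) on prefixes that end inside a key, after a colon or comma, or inside a literal.

```python
import json
from typing import Any, Iterable, Iterator, Optional


def parse_partial_json(text: str) -> Optional[Any]:
    """Close any open string, object or array in a JSON prefix and try to parse it.

    Returns None when the prefix cannot be completed this simply, e.g. when it
    ends inside a key, after a colon or comma, or inside a literal like `tru`.
    """
    closers = []  # closing characters still owed, innermost last
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers:
            closers.pop()
    candidate = text + ('"' if in_string else "") + "".join(reversed(closers))
    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        return None


def stream_partial_objects(chunks: Iterable[str]) -> Iterator[Any]:
    """Re-parse the accumulated text after every chunk; yield only new states."""
    buffer = ""
    last = None
    for chunk in chunks:
        buffer += chunk
        parsed = parse_partial_json(buffer)
        if parsed is not None and parsed != last:
            last = parsed
            yield parsed


full_json = '{"countries": [{"name": "France", "population": 67022000}]}'
# Feed the text one character at a time, like a model emitting very small tokens.
for state in stream_partial_objects(full_json):
    print(state)
```

The printed states follow the same pattern as the notebook output: `{}`, then `{'countries': []}`, then names and numbers growing character by character. The real `JsonOutputParser` is more elaborate than this; the sketch only shows why the states grow step by step.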

Continuing the function-calling scenario, suppose the program returns JSON data and we need a function that extracts the key information from it:

In [9]:
# Non-streaming handler: it receives the complete parsed JSON
# and returns its result all at once
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""
    
    if "countries" not in inputs:
        return ""
    
    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""
    
    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names

chain = prompt | llm | JsonOutputParser() | _extract_country_names

async for text in chain.astream({"question": 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}):
    print(text, end="|", flush=True)

['France', 'Spain', 'Japan']|
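Only one item was printed: because `_extract_country_names` is an ordinary function, the chain cannot call it per chunk, so it waits for the upstream parser to finish and passes in the final object once. The standard-library sketch below models that behaviour with a hypothetical `pipe` helper: a plain function sees only the last (complete) upstream value, while a generator function is handed the upstream iterator and can emit results as they arrive. This is a simplified model of how plain functions and generator functions differ inside a streaming chain, not LangChain's actual code.

```python
import inspect
from typing import Any, Callable, Dict, Iterable, Iterator, List


def pipe(upstream: Iterable[Any], step: Callable) -> Iterator[Any]:
    """Hypothetical helper modelling one link of a streaming chain."""
    if inspect.isgeneratorfunction(step):
        # A generator step receives the live stream and can yield incrementally.
        yield from step(iter(upstream))
    else:
        # A plain function can only run once, on the final upstream value.
        last = None
        for last in upstream:
            pass
        yield step(last)


def names_at_end(state: Dict[str, Any]) -> List[str]:
    return [c.get("name") for c in state.get("countries", [])]


def names_as_they_arrive(states: Iterator[Dict[str, Any]]) -> Iterator[str]:
    seen = set()
    for state in states:
        for country in state.get("countries", []):
            name = country.get("name")
            if name and name not in seen:
                seen.add(name)
                yield name


partial_states = [
    {"countries": [{"name": "France"}]},
    {"countries": [{"name": "France"}, {"name": "Spain"}]},
    {"countries": [{"name": "France"}, {"name": "Spain"}, {"name": "Japan"}]},
]

print(list(pipe(partial_states, names_at_end)))          # [['France', 'Spain', 'Japan']]
print(list(pipe(partial_states, names_as_they_arrive)))  # ['France', 'Spain', 'Japan']
```

The plain function produces a single chunk (the whole list), while the generator produces one chunk per country, which is the difference between this cell and the next one.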

In [10]:
# Streaming handler: an async generator that consumes the input stream and emits results with yield
async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            if not isinstance(country, dict):
                continue
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)

chain = llm | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream('output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'):
    print(text, end="|", flush=True)

Fr|France|Sp|Spain|J|Japan|
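The streaming version emits partial names such as `Fr` and `Sp` because the partial JSON objects contain names that are still being generated, and each new prefix counts as a name not seen before. One possible refinement, sketched below under the assumption that entries in `countries` are generated in order, is to treat an entry as finished once a later entry has started, and to flush the remaining entries when the stream ends. `_extract_complete_country_names` and `fake_parser_output` are hypothetical names, and the demo feeds a hard-coded list of partial states instead of a real model, so it runs with the standard library alone.

```python
import asyncio


async def _extract_complete_country_names(input_stream):
    """Yield each country name once, after it can no longer grow."""
    emitted = set()
    countries = []

    def names_in(entries):
        for country in entries:
            if isinstance(country, dict) and country.get("name"):
                yield country["name"]

    async for state in input_stream:
        if not isinstance(state, dict) or not isinstance(state.get("countries"), list):
            continue
        countries = state["countries"]
        # Every entry except the last is complete: the parser has moved past it.
        for name in names_in(countries[:-1]):
            if name not in emitted:
                emitted.add(name)
                yield name

    # The stream has ended, so the last entry is complete too.
    for name in names_in(countries):
        if name not in emitted:
            emitted.add(name)
            yield name


async def fake_parser_output():
    """Hypothetical stand-in for the partial objects a JSON parser would stream."""
    france = {"name": "France", "population": 67022000}
    states = [
        {},
        {"countries": [{"name": "Fr"}]},
        {"countries": [france]},
        {"countries": [france, {"name": "Sp"}]},
        {"countries": [france, {"name": "Spain"}]},
        {"countries": [france, {"name": "Spain"}, {"name": "J"}]},
        {"countries": [france, {"name": "Spain"}, {"name": "Japan"}]},
    ]
    for state in states:
        yield state


async def main():
    return [name async for name in _extract_complete_country_names(fake_parser_output())]


print(asyncio.run(main()))  # ['France', 'Spain', 'Japan']
```

The trade-off is latency: each name appears only after the next entry starts, and the last one only when the stream ends. Since the chain above already accepts an async generator function, this variant should slot into the same position as `_extract_country_names_streaming`.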

#### Non-streaming components

Components that don't support streaming (such as retrievers) emit their final result as a single chunk when you call `stream()` on them.

In [13]:
%pip install faiss-cpu

Successfully installed faiss-cpu-1.7.4
Note: you may need to restart the kernel to use updated packages.


In [11]:
from langchain_community.embeddings import HuggingFaceEmbeddings

# Embedding Model
modelPath = "BAAI/bge-base-en-v1.5"

# Model configuration options: which device to compute on
# model_kwargs = {'device':'cpu'}  # CPU works on any machine
model_kwargs = {'device':'mps'} # 'mps' is PyTorch's backend for Apple Silicon GPUs

# Encoding options: normalize embeddings to unit length
encode_kwargs = {'normalize_embeddings': True}

# Initialize an instance of HuggingFaceEmbeddings with the specified parameters
embeddings = HuggingFaceEmbeddings(
    model_name=modelPath,     # Provide the pre-trained model's path
    model_kwargs=model_kwargs, # Pass the model configuration options
    encode_kwargs=encode_kwargs # Pass the encoding options
)


In [14]:
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=embeddings,
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks

[[Document(page_content='harrison worked at kensho'),
  Document(page_content='harrison likes spicy food')]]
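The retriever returned a single chunk holding the whole result list. That is what the default behaviour of a runnable without its own streaming support looks like: its `stream()` just calls `invoke()` and yields that one value. The standard-library sketch below models this fallback; the class names `Step`, `KeywordRetriever` and `WordStreamer` are hypothetical, and the keyword matching only imitates the retriever's output for this tiny example, not how FAISS similarity search works.

```python
from typing import Iterator, List


class Step:
    """Minimal model of a runnable with a default, non-incremental stream()."""

    def invoke(self, value):
        raise NotImplementedError

    def stream(self, value) -> Iterator:
        # Fallback: no partial results, so the whole answer is one chunk.
        yield self.invoke(value)


class KeywordRetriever(Step):
    """Hypothetical retriever: returns every text sharing a word with the query."""

    def __init__(self, texts: List[str]):
        self.texts = texts

    def invoke(self, query: str) -> List[str]:
        words = set(query.lower().replace("?", "").split())
        return [t for t in self.texts if words & set(t.lower().split())]


class WordStreamer(Step):
    """Hypothetical streaming step: overrides stream() to emit word by word."""

    def invoke(self, text: str) -> str:
        return "".join(self.stream(text))

    def stream(self, text: str) -> Iterator[str]:
        for i, word in enumerate(text.split(" ")):
            yield word if i == 0 else " " + word


toy_retriever = KeywordRetriever(["harrison worked at kensho", "harrison likes spicy food"])
print(list(toy_retriever.stream("where did harrison work?")))
# [['harrison worked at kensho', 'harrison likes spicy food']]
print(list(WordStreamer().stream("Harrison worked at Kensho")))
# ['Harrison', ' worked', ' at', ' Kensho']
```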

#### A chain that contains a non-streaming component can still stream

In [15]:
from langchain_core.runnables import RunnablePassthrough

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | llm
    | StrOutputParser()
)

In [17]:
for chunk in retrieval_chain.stream(
    "Where did harrison work?" "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)

 Based| on| the| given| context|,| Harrison| worked| at| K|ens|ho|.| Here| are| three| made|-|up| sentences| about| K|ens|ho|:|
|
|1|.| K|ens|ho| is| a| pione|ering| tech| company| known| for| its| innovative| solutions| in| the| financial| sector|,| where| Harrison| contributed| significantly| as| an| este|emed| team| member|.|
|2|.| The| office| environment| at| K|ens|ho| is| vibr|ant| and| dynamic|,| filled| with| intellectual| discussions| over| cups| of| coffee|,| fuel|ing| creativity| and| collaboration| among| employees| like| Harrison|.|
|3|.| Despite| being| a| cutting|-|edge| tech| company|,| K|ens|ho| maint|ains| a| unique| culture| that| values| cam|ar|ader|ie| and| fun| –| one| day|,| Harrison| even| brought| in| his| favorite| sp|icy| cu|isine| to| share| with| his| team|.||
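The chain above still streams token by token even though the retriever does not: the retrieval step and the prompt formatting finish first, and from then on the LLM and `StrOutputParser` pass tokens through as they are generated. A non-streaming step therefore delays the first chunk but does not stop the steps after it from streaming. The standard-library sketch below models this with plain generators; `retrieve`, `build_prompt`, `fake_llm` and `run_chain` are hypothetical stand-ins, and the dict built in `run_chain` plays the role of `{"context": ..., "question": RunnablePassthrough()}` in the real chain.

```python
from typing import Any, Dict, Iterator, List

DOCS = ["harrison worked at kensho", "harrison likes spicy food"]


def retrieve(question: str) -> List[str]:
    """Hypothetical non-streaming retriever: returns all matching docs at once."""
    words = set(question.lower().rstrip("?").split())
    return [d for d in DOCS if words & set(d.split())]


def build_prompt(inputs: Dict[str, Any]) -> str:
    context = "\n".join(inputs["context"])
    return (
        "Answer the question based only on the following context:\n"
        f"{context}\n\nQuestion: {inputs['question']}\n"
    )


def fake_llm(prompt: str) -> Iterator[str]:
    """Hypothetical streaming model: ignores the prompt, emits a canned answer word by word."""
    for i, word in enumerate("Harrison worked at Kensho.".split(" ")):
        yield word if i == 0 else " " + word


def run_chain(question: str) -> Iterator[str]:
    # Step 1 (blocking): like {"context": retriever, "question": RunnablePassthrough()}.
    inputs = {"context": retrieve(question), "question": question}
    # Step 2 (blocking): the prompt needs the complete inputs before it can be formatted.
    prompt_text = build_prompt(inputs)
    # Step 3 (streaming): every token is forwarded as soon as it is produced.
    yield from fake_llm(prompt_text)


for chunk in run_chain("Where did harrison work?"):
    print(chunk, end="|", flush=True)
print()
```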