#### Runable 协议

In [8]:
from langchain_ollama import OllamaLLM
# from langchain_openai import ChatOpenAI

model = OllamaLLM(model="deepseek-r1:1.5b", base_url="http://localhost:11434")

chunks = []

for chunk in model.stream("天空是什么颜色"):
    chunks.append(chunk)
    print(chunk, end="|", flush=True)
    # print(chunk.content, end="|", flush=True)
    
print("\n",dir(chunk)) # chunk 是一个字符串类型，并不像OpenAI的ChatOpenAI那样有content属性

<think>|
|嗯|，|用户|问|“|天空|是什么|颜色|”，|我|得|先|想想|这个问题|。|首先|，|天空|本身|是不是|颜色|呢|？|我记得|有时候|天空|看起来|像|淡|蓝色|，|但|其实|它|不是|直接|的颜色|。|可能|是因为|光|的|折射|和|散|射|，|使得|光线|显得|比较|柔和|，|而不是|纯粹|的颜色|。

|接下来|，|颜色|这个词|本身|指的是|物体|对|可见|光|的能量|有|吸收|或|反射|的能力|。|而|天空|是否|真的|“|颜色|”|还要|看|具体情况|。|比如|，|如果|我在|下雨|天|，|雨|点|会|发出|不同|颜色|的|光|，|那|天空|的颜色|可能|与|雨|点|的颜色|有关|，|或者|我|看到|的颜色|更多|是|周围的|空气|里的|光线|。

|还有|，|有时候|天空|看起来|很|蓝|，|这是因为|地球|上的|大气|层|吸收|了|蓝|光|，|让|光线|更|柔和|地|反射|回去|，|形成|一片|蓝| sky|。|这种|现象|叫做|天|蓝色|，|而不是|颜色|本身|。

|另外|，|如果|在|月|夜|的时候|，|天空|可能|看起来|像|黑色|的|，|因为|没有|空气中|气体|的|散|射|和|折射|，|所以|更多的|光线|被|吸收|，|导致|整个|天空|比较|暗|淡|。|但这|只是|我的|个人|感受|，并|不是|科学|上的|结论|。

|我还|得|考虑|一下|，|人类|如何|感知|颜色|。|我们的|视觉|系统|无法|直接|感知|“|颜色|”|本身|，|而是|通过|反射|、|折射|和|色|散|来|呈现|物体|的颜色|。|因此|，“|颜色|”|并不是|一种|独立|的存在|，|而|是一种|通过|光线|反射|的现象|。

|另外|，|天空|的颜色|可能会|随|时间|变化|，|比如|春天|天|更|蓝|，|夏天|天|更|绿|，|秋天|天|更|黄|，|冬天|天|更|白|。|这些|现象|也|与|季节|的变化|有关|，|而不是|“|颜色|”的|本身|。

|还有|，|天空|的颜色|可能|受到|天气|条件|的影响|，|比如|下雨|天|、|阴|天|、|晴|天|等|，|都会|影响|天空|的颜色|看起来|如何|。

|所以|，|回到|用户|的问题|，“|天空|是什么|颜色|”，|我|需要|解释|一下|，|因为|天空|不是一个|直接|的颜色|，|而|是一种

In [9]:
chunks[5],chunks[10],chunks[15]

('问', '”，', '这个问题')

##### 异步调用

In [10]:
from langchain_ollama import OllamaLLM
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts.chat import HumanMessagePromptTemplate

# ChatPromptTemplate.from_messages()期望传入的格式极为严格，
# 每个message必须包含role和content属性，并且content属性必须是一个字符串。
# 如果传入的message不满足这些要求，ChatPromptTemplate.from_messages()会抛出一个ValueError异常。


# 修改这部分，使用HumanMessagePromptTemplate创建符合格式的消息
prompt = ChatPromptTemplate.from_messages([
    HumanMessagePromptTemplate.from_template("给我讲一个关于{topic}的故事")
])

model = OllamaLLM(model="deepseek-r1:1.5b", base_url="http://localhost:11434")
# parser = StrOutputParser()
# chain = prompt | model | parser
chain = prompt | model

async def async_stream():
    async for chunk in chain.stream({"topic":"天空"}):
        print(chunk, end="|", flush=True)

# asyncio.run(async_stream()) # chain.stream({"topic":"天空"}) 返回的是一个普通的生成器对象，
# 而不是一个实现了异步迭代协议（即具有 __aiter__ 和 __anext__ 方法）的异步可迭代对象

# 因此，在调用 asyncio.run() 时，Python 会尝试将生成器对象转换为异步可迭代对象，
# 但由于生成器对象本身并不实现异步迭代协议，因此会引发 TypeError 异常。

In [None]:
import asyncio
from langchain_ollama import OllamaLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts.chat import HumanMessagePromptTemplate


# 包装生成器为异步可迭代对象的类
class AsyncGeneratorWrapper:
    def __init__(self, gen):
        # 初始化时传入一个生成器对象
        self.gen = gen

    def __aiter__(self):
        # 使该类的实例成为异步可迭代对象
        return self

    async def __anext__(self):
        try:
            # 使用 asyncio.to_thread 避免阻塞事件循环，将同步的 next 调用放到单独线程执行
            return await asyncio.to_thread(next, self.gen)
        except StopIteration:
            # 当生成器耗尽时，抛出异步迭代结束的异常
            raise StopAsyncIteration
        except Exception as e:
            # 捕获并打印其他可能出现的异常
            print(f"An error occurred: {e}")
            # 出现异常时也结束异步迭代
            raise StopAsyncIteration


# 创建一个聊天提示模板，接收一个 topic 参数，用于生成用户的提问消息
prompt = ChatPromptTemplate.from_messages([
    HumanMessagePromptTemplate.from_template("给我讲一个关于{topic}的故事")
])

# 初始化 Ollama 语言模型，指定模型名称和服务地址
model = OllamaLLM(model="deepseek-r1:1.5b", base_url="http://localhost:11434")
# 初始化输出解析器，将模型的输出解析为字符串
parser = StrOutputParser()
# 构建一个链式处理流程，先使用提示模板处理输入，再经过模型生成输出，最后用解析器解析输出
chain = prompt | model | parser


async def async_stream():
    # 调用链式处理流程的 stream 方法，传入 topic 参数，得到一个生成器
    gen = chain.stream({"topic": "天空"})
    # 将生成器包装成异步可迭代对象
    async_gen = AsyncGeneratorWrapper(gen)
    # 异步迭代生成器，逐块输出内容
    async for chunk in async_gen:
        # 打印每一块内容，以 | 作为分隔符，并立即刷新输出缓冲区
        print(chunk, end="|", flush=True)


# 根据环境决定是否使用 asyncio.run
try:
    # 获取当前正在运行的事件循环
    loop = asyncio.get_running_loop()
    if loop.is_running():
        # 如果事件循环正在运行，创建一个任务来运行异步函数
        loop.create_task(async_stream())
    else:
        # 如果事件循环未运行，使用 asyncio.run 来运行异步函数
        asyncio.run(async_stream())
except RuntimeError:
    # 如果获取运行的事件循环时出现错误，使用 asyncio.run 来运行异步函数
    asyncio.run(async_stream())

<think>|
|嗯|，|用户|让我|给|一个|关于|天空|的故事|。|这|可能|是一个|挺|有趣的|请求|，|因为|天空|通常|是我们|比较|熟悉|的概念|，|但|有时候|我们可以|忽略|它的|伟大|和|独特|之处|。

|首先|，|我|需要|确定|故事|的主题|是什么|。|可能是|从|自然|元素|开始|，|比如|星星|、|阳光|或者|云|朵|，|这样|更容易|引起|读者|的兴趣|。|另外|，|主角|的|设定|也很|重要|，|比如|他|来自|哪里|，|遇到|什么|特别|的事情|或者|事件|，|这样|故事|会|更|生动|有趣|。

|考虑到|用户|可能|希望|故事|有|深度|，|我可以|加入|一些|象征|意义|或|情感|元素|。|例如|，|天空|中的|某个|地方|可能|代表|一个|特定|的人|、|文化|或|历史|背景|，|赋予|了|它|独特的| meaning|。

|接下来|，|我会|构思|一个|简单|但|富有|画面|感|的故事|。|比如|，|主角|来自|一个小| town|，|那里|有一|片|荒|地|，|后来|变成了|一个|重要的|自然|保护区|。|这样|，|故事|不仅|讲述了|天空|的主题|，|还|融入|了|个人|的经历|和|情感|，|让|读者|更容易|产生|共鸣|。

|在|语言|风格|上|，|要|保持|口语|化|，|避免|过于|正式|或|学术|化的|表达|，|让|故事|更|易于|理解和|接受|。|同时|，|通过|细腻|的|描写|，|如|天气|的变化|、|动物|的行为|，|来|生动|描绘|天空|的存在|和|影响|。

|最后|，|确保|故事|有一个|引|人|入|胜|的|开头|，|点|明|主题|，并|逐渐|展开|情节|，|引导|读者|跟随|主角|的情感|和|经历|。|这样|，|一个|关于|天空|的故事|就能|既|符合|用户的|请求|，|又能|引发|读者|的兴趣|。
|</think>|

|好的|！|让我们|讲|一个|关于|天空|的故事|——|关于|一个|特别|的地方|和|它的|伟大|之处|。

|---

|在一个|小| towns|，|有一|片|荒|凉|的|的土地|。|那里|有一|块|巨大的|岩石|，在|阳光|下|闪闪|发|亮|。|这块|岩石|就是|我们|今天|要|讲|的那个|地方|。

|那|年的|夏天|，|天空|格外|蓝|。|它|像|是一位|神秘|的

In [None]:
import asyncio
# 从 langchain_ollama 库导入 OllamaLLM 类，用于调用 Ollama 模型
from langchain_ollama import OllamaLLM
# 从 langchain_core.output_parsers 库导入 JsonOutputParser 类，用于将模型输出解析为 JSON 格式
from langchain_core.output_parsers import JsonOutputParser

# 初始化 Ollama 语言模型，指定模型名称为 deepseek-r1:1.5b，模型服务的基础 URL 为 http://localhost:11434
model = OllamaLLM(model="deepseek-r1:1.5b", base_url="http://localhost:11434")
# 创建一个链式调用，先使用模型进行推理，然后使用 JsonOutputParser 将输出解析为 JSON 格式
chain = (model | JsonOutputParser())

# 定义一个类，用于将同步生成器包装成异步生成器
class AsyncGeneratorWrapper:
    def __init__(self, gen):
        # 初始化时接收一个同步生成器对象
        self.gen = gen

    def __aiter__(self):
        # 使该类的实例成为异步可迭代对象
        return self

    async def __anext__(self):
        try:
            # 使用 asyncio.to_thread 将同步的 next 调用放到单独的线程中执行，避免阻塞事件循环
            return await asyncio.to_thread(next, self.gen)
        except StopIteration:
            # 当同步生成器耗尽时，抛出异步迭代结束的异常
            raise StopAsyncIteration

# 定义一个异步函数，用于流式处理模型的输出
async def async_stream():
    # 调用 chain 的 stream 方法，传入请求内容，得到一个同步生成器，该请求要求以特定 JSON 格式输出国家及人口列表
    gen = chain.stream(
        "以JSON 格式输出法国、西班牙和日本的国家及人口列表。"
        '使用一个带有“countries”外部键的字典，其中包含国家列表。'
        '每个国家都应该有键“name”和“population”。'
    )
    # 将同步生成器包装成异步生成器
    async_gen = AsyncGeneratorWrapper(gen)
    # 异步迭代异步生成器，逐块获取模型输出
    async for text in async_gen:
        # 打印每一块输出，并立即刷新输出缓冲区
        print(text, flush=True)

try:
    # 获取当前正在运行的事件循环
    loop = asyncio.get_running_loop()
    if loop.is_running():
        # 如果事件循环正在运行，创建一个任务来运行异步函数
        loop.create_task(async_stream())
    else:
        # 如果事件循环未运行，使用 asyncio.run 来运行异步函数
        asyncio.run(async_stream())
except RuntimeError:
    # 如果获取运行的事件循环时出现错误，使用 asyncio.run 来运行异步函数
    asyncio.run(async_stream())

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<async_stream() done, defined at C:\Users\33249\AppData\Local\Temp\ipykernel_11836\4150857872.py:31> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at c:\Users\33249\anaconda3\envs\xhd\Lib\asyncio\futures.py:387, Task.task_wakeup()]>>


{}
{'country': {}}
{'country': {'France': {}}}
{'country': {'France': {'name': ''}}}
{'country': {'France': {'name': 'France'}}}
{'country': {'France': {'name': 'France', 'population': 6}}}
{'country': {'France': {'name': 'France', 'population': 67}}}
{'country': {'France': {'name': 'France', 'population': 670}}}
{'country': {'France': {'name': 'France', 'population': 6700}}}
{'country': {'France': {'name': 'France', 'population': 67000}}}
{'country': {'France': {'name': 'France', 'population': 670000}}}
{'country': {'France': {'name': 'France', 'population': 6700000}}}
{'country': {'France': {'name': 'France', 'population': 67000000}}}
{'country': {'France': {'name': 'France', 'population': 67000000}, 'Spain': {}}}
{'country': {'France': {'name': 'France', 'population': 67000000}, 'Spain': {'name': ''}}}
{'country': {'France': {'name': 'France', 'population': 67000000}, 'Spain': {'name': 'Spain'}}}
{'country': {'France': {'name': 'France', 'population': 67000000}, 'Spain': {'name': 'S

多线程调用

In [None]:

async def async_stream1():
    # 调用链式处理流程的 stream 方法，传入 topic 参数，得到一个生成器
    gen = chain.stream({"topic": "天空"})
    # 将生成器包装成异步可迭代对象
    async_gen = AsyncGeneratorWrapper(gen)
    # 异步迭代生成器，逐块输出内容
    async for chunk in async_gen:
        # 打印每一块内容，以 | 作为分隔符，并立即刷新输出缓冲区
        print(chunk, end="|", flush=True)
        
async def async_stream2():
    gen = chain.stream({"topic": "大海"})
    async_gen = AsyncGeneratorWrapper(gen)
    async for chunk in async_gen:
        print(chunk, end="|", flush=True)
        
async def main():
    # 同步调用
    # await async_stream1()
    # await async_stream2()
    
    # 异步调用（两个协程，它们内部的操作都是
    # 基于异步 I/O 等异步操作来实现的，所以它们代表的是两个协程，而不是线程。）
    await asyncio.gather(async_stream1(), async_stream2())
    
try:
    loop = asyncio.get_running_loop()
    if loop.is_running():
        loop.create_task(async_stream1())
        loop.create_task(async_stream2())
    else:
        asyncio.run(main())
except RuntimeError:
    asyncio.run(main())

#### 事件流

In [5]:
# 从 langchain_ollama 库导入 OllamaLLM 类，用于与 Ollama 模型进行交互
from langchain_ollama import OllamaLLM
# 从 langchain_core.output_parsers 库导入 JsonOutputParser 类，用于将模型输出解析为 JSON 格式
from langchain_core.output_parsers import JsonOutputParser
# 导入 asyncio 库，用于实现异步编程
import asyncio

# 初始化 Ollama 语言模型，指定使用的模型名称为 deepseek - r1:1.5b
# 并设置模型服务的基础 URL 为本地的 http://localhost:11434
model = OllamaLLM(model="deepseek-r1:1.5b", base_url="http://localhost:11434")
# 创建一个链式调用，先使用模型进行推理，然后使用 JsonOutputParser 将输出解析为 JSON 格式
chain = (model | JsonOutputParser())

# 定义一个类，用于将异步生成器包装成另一个异步可迭代对象
# 虽然代码里目前未使用该类，但保留以作后续扩展用途
class AsyncGeneratorWrapper:
    def __init__(self, gen):
        # 初始化时接收一个异步生成器对象
        self.gen = gen

    def __aiter__(self):
        # 使该类的实例成为异步可迭代对象
        return self

    async def __anext__(self):
        # 尝试获取异步生成器的下一个值
        return await self.gen.__anext__()

# 定义一个异步函数，用于流式处理模型的事件
async def async_stream():
    # 初始化一个空列表，用于存储模型返回的事件
    events = []
    # 异步迭代模型的事件流，该事件流是通过向模型输入 "hello" 并指定版本 "v2" 得到的
    async for event in model.astream_events(input="hello", version="v2"):
        # 将每个事件添加到 events 列表中
        events.append(event)
    # 打印存储所有事件的列表
    print(events)

try:
    # 获取当前正在运行的事件循环
    loop = asyncio.get_running_loop()
    if loop.is_running():
        # 如果事件循环正在运行，创建一个任务来运行异步函数
        loop.create_task(async_stream())
    else:
        # 如果事件循环未运行，使用 asyncio.run 来运行异步函数
        asyncio.run(async_stream())
except RuntimeError:
    # 如果获取运行的事件循环时出现错误，使用 asyncio.run 来运行异步函数
    asyncio.run(async_stream())

[{'event': 'on_llm_start', 'data': {'input': 'hello'}, 'name': 'OllamaLLM', 'tags': [], 'run_id': 'c32e4a2e-ecb3-4f2c-a8bc-8ff8e2fc121b', 'metadata': {'ls_provider': 'ollama', 'ls_model_type': 'llm', 'ls_model_name': 'deepseek-r1:1.5b'}, 'parent_ids': []}, {'event': 'on_llm_stream', 'run_id': 'c32e4a2e-ecb3-4f2c-a8bc-8ff8e2fc121b', 'name': 'OllamaLLM', 'tags': [], 'metadata': {'ls_provider': 'ollama', 'ls_model_type': 'llm', 'ls_model_name': 'deepseek-r1:1.5b'}, 'data': {'chunk': '<think>'}, 'parent_ids': []}, {'event': 'on_llm_stream', 'run_id': 'c32e4a2e-ecb3-4f2c-a8bc-8ff8e2fc121b', 'name': 'OllamaLLM', 'tags': [], 'metadata': {'ls_provider': 'ollama', 'ls_model_type': 'llm', 'ls_model_name': 'deepseek-r1:1.5b'}, 'data': {'chunk': '\n\n'}, 'parent_ids': []}, {'event': 'on_llm_stream', 'run_id': 'c32e4a2e-ecb3-4f2c-a8bc-8ff8e2fc121b', 'name': 'OllamaLLM', 'tags': [], 'metadata': {'ls_provider': 'ollama', 'ls_model_type': 'llm', 'ls_model_name': 'deepseek-r1:1.5b'}, 'data': {'chunk': 