# prompt cache 提示缓存与streaming流式输出

在大型语言模型(LLM)的推理过程中，生成第一个token是一个计算密集型任务，这个阶段被称为prefill phase或context phase。这个阶段的计算量较大，也和输入给模型的Prompt的长度呈线性关系。

生成首个token的时间直接影响用户感知的响应速度，因此被称为"首token时延"（Time To First Token, TTFT）。对于服务应用来说，TTFT是衡量用户体验的关键指标，理想情况下应控制在1秒左右。

TTFT受多个因素影响，包括：
1. 模型参数规模
2. 输入Prompt的长度
3. 批处理大小（Batch Size）
4. 可用的GPU资源

为了优化TTFT并提升用户体验，目前大模型厂商开放了两种主要策略（Prompt Cache（提示缓存）是Claude目前独有的）：

1. Prompt Cache（提示缓存）：
   通过缓存之前处理过的Prompt结果，我们可以显著减少生成首个token所需的计算时间。这对于处理相似或重复的查询特别有效。

2. Streaming流式输出：
   一旦首个token生成，我们可以立即开始向用户传送结果，而不是等待整个响应完成。这种方法可以大大提升用户感知的响应速度。

这两种技术的结合不仅可以减少用户等待的时间，还能提供更流畅的交互体验。在接下来的课程中，我们将深入探讨这些策略的实现和优化方法，以及通过实践看看它们如何在实际应用中提升LLM服务的性能和用户满意度。

例如streaming的流程：

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240904150008.png)

## Claude 8月新功能prompt cache 提示缓存降低首token时延与成本

Prompt Caching是8月15日最新推出的API使用的优化，可以预想之后的各大模型公司也将会相继推出这个功能，包括5月份Gemini也提过API缓存功能正在测试研发中。
它在 API 中通过`cache_control`使用。如果找到缓存版本，将直接使用它来大幅**减少处理时间响应时间**和**成本**；这种机制可以减少长提示的延迟高达85%，并**降低成本高达90%**，对于需要频繁与AI系统交互的应用来说，这意味着巨大的经济效益。想象一下，如果一家公司每天需要处理数百万次AI查询，这种成本节约将会产生显著的影响。

缓存的内容不是永久保存的，缓存的生命周期只开放出了“ephemeral”（短暂的）一个值，为5分钟。每次当缓存的数据被再次使用时，其生命周期会被刷新，即重新计算这5分钟的持续时间。

定价策略
- **写入成本**：将数据新写入缓存时的成本相对较高，比基本的数据处理费用高出25%。
- **读取成本**：一旦数据被写入缓存，之后使用这部分缓存的成本会大大降低，仅为基本处理费用的10%。

目前，这个8月15日推出的提示缓存功能处于公开测试阶段，现可支持Claude 3.5 Sonnet和Claude 3 Haiku模型，对Claude 3 Opus模型的支持也即将推出。

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240821104258.png)

它真的是才研究出来的吗？ 不是，system prompt可能就是它的雏形。

之前的system prompt，就可以让AI系统记住了常用的背景信息，不用每次都重新学习，这样它就能更快地开始回答用户的具体问题：


想象我们在一家餐厅工作，负责准备食材和做菜。

1. System Prompt就像是餐厅的基本准备工作：
   - 比如切好常用的蔬菜，准备好调料，把厨房工具摆放到位。

2. 用户的prompt就像是顾客点的菜：
   - 每个顾客可能点不同的菜，但是基本的准备工作是一样的。

3. 传统方法：
   - 每次有顾客来，你都要重新做一遍基本准备工作，然后再开始做菜。
   - 这样很浪费时间，特别是当餐厅很忙的时候。

4. 新方法 System Prompt：
   - 你在开始营业前就把基本准备工作做好，并且保持这个状态。
   - 当顾客来点菜时，你直接从这个准备好的状态开始做菜。
   - 这样每次做菜都会快很多，因为你不用重复基本准备工作。

5. 技术角度解释：
   - System Prompt的处理结果（Key和Value值）被保存在GPU的内存中。
   - 对于新的请求，直接使用保存的结果，而不是重新计算。
   - 这大大减少了每次请求的初始处理时间。

6. 好处：
   - 就像餐厅能更快地上菜一样，AI系统能更快地开始生成回应。
   - 特别是对第一个回应（首个token），速度提升会很明显。


Prompt Cache提供了更灵活、更个性化的缓存策略。它不仅可以缓存固定的系统指令，还能动态地管理和利用整个对话历史，从而在多轮对话中实现更高效的性能和更自然的交互。

底层实现：
1. System Prompt缓存：
   - 只缓存固定的系统提示部分。
   - 每次请求都使用相同的缓存内容。

2. Prompt Cache：
   - 可以缓存整个输入提示的不同部分。
   - 使用模板来管理不同部分的缓存。
   - 对于多轮对话，可以缓存历史对话内容。
   - 可以动态更新缓存，添加新的对话内容。

故事化比喻：
想象一个图书管理员（AI模型）在图书馆工作。

1. System Prompt缓存就像是：
   - 图书管理员每天早上记住图书馆的基本规则和布局。
   - 这些信息整天不变，帮助快速回答基本问题。

2. Prompt Cache（特别是Session Prompt Cache）就像是：
   - 图书管理员不仅记住基本规则，还为每位读者保留一个笔记本。
   - 笔记本分为几个部分：图书馆规则、读者喜好、今天的对话记录等。
   - 当读者问新问题时，管理员快速查阅笔记本相关部分，无需重新思考已讨论过的内容。
   - 随着对话进行，管理员在笔记本中添加新信息，但只在必要时更新。

区别：
1. 灵活性：
   - System Prompt缓存像是固定的图书馆规则手册。
   - Prompt Cache像是可定制的、动态更新的笔记本。

2. 范围：
   - System Prompt缓存仅覆盖通用指令。
   - Prompt Cache可以包括整个对话历史和特定用户信息。

3. 更新频率：
   - System Prompt缓存通常保持不变。
   - Prompt Cache可以在对话过程中不断更新。

4. 资源利用：
   - Prompt Cache虽然可能占用更多内存，但通过减少重复计算，可能overall更节省资源。

5. 个性化：
   - System Prompt缓存对所有用户都一样。
   - Prompt Cache可以为每个用户或每次对话session定制。

并且，Prompt Cache可以节省成本，Prompt Cache也并不完全是后端的工程化，而是内嵌在大模型的推理过程中的。

In [1]:
%pip install anthropic bs4

Collecting bs4
  Downloading bs4-0.0.2-py2.py3-none-any.whl.metadata (411 bytes)
Downloading bs4-0.0.2-py2.py3-none-any.whl (1.2 kB)
Installing collected packages: bs4
Successfully installed bs4-0.0.2

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
import anthropic
import time
import requests
from bs4 import BeautifulSoup

In [27]:
client = anthropic.Anthropic()
MODEL_NAME = "claude-3-5-sonnet-20240620"
# MODEL_NAME = "claude-3-haiku-20240307"

In [26]:
def fetch_article_content(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    
    # 移除脚本和样式元素
    for script in soup(["script", "style"]):
        script.decompose()
    
    # 获取文本
    text = soup.get_text()
    
    # 将文本分割成行，并移除每行开头和结尾的空格
    lines = (line.strip() for line in text.splitlines())
    # 将多个标题分割成单独的行
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # 删除空行
    text = '\n'.join(chunk for chunk in chunks if chunk)
    
    return text

# 获取文章内容
book_url = "https://www.gutenberg.org/cache/epub/7337/pg7337.txt"
book_content = fetch_article_content(book_url)

print(f"从书中获取了 {len(book_content)} 个字符。")
print("前500个字符：")
print(book_content[:500])

从书中获取了 26132 个字符。
前500个字符：
The Project Gutenberg eBook of 道德經
This ebook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever. You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this ebook or online
at www.gutenberg.org. If you are not located in the United States,
you will have to check the laws of the country where you are located
before using this eBook.
Title: 道德經
Author: Lao


第 1 部分：非缓存 API 调用

In [30]:
def make_non_cached_api_call():
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "<书籍>" + book_content + "</书籍>",
                },
                {
                    "type": "text",
                    "text": "这本书的标题是什么？只输出标题。"
                }
            ]
        }
    ]

    start_time = time.time()
    response = client.messages.create(
        model=MODEL_NAME,
        max_tokens=300,
        messages=messages,

    )
    end_time = time.time()

    return response, end_time - start_time


non_cached_response, non_cached_time = make_non_cached_api_call()

print(f"非缓存 API 调用时间: {non_cached_time:.2f} 秒")
print(f"非缓存 API 调用输入 tokens: {non_cached_response.usage.input_tokens}")
print(f"非缓存 API 调用输出 tokens: {non_cached_response.usage.output_tokens}")

print("\n总结 (非缓存):",non_cached_response)



非缓存 API 调用时间: 2.45 秒
非缓存 API 调用输入 tokens: 13424
非缓存 API 调用输出 tokens: 7

总结 (非缓存): Message(id='msg_01WvFNGUW9GNhDpwJxqBZkex', content=[TextBlock(text='道德經', type='text')], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason='end_turn', stop_sequence=None, type='message', usage=Usage(input_tokens=13424, output_tokens=7))


第 2 部分：缓存的 API 调用

In [31]:
def make_cached_api_call():
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "<书籍>" + book_content + "</书籍>",
                    "cache_control": {"type": "ephemeral"}
                },
                {
                    "type": "text",
                    "text": "这本书的标题是什么？只输出标题。"
                }
            ]
        }
    ]

    start_time = time.time()
    response = client.messages.create(
        model=MODEL_NAME,
        max_tokens=300,
        messages=messages,
        extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"}
    )
    end_time = time.time()

    return response, end_time - start_time

cached_response, cached_time = make_cached_api_call()

print(f"缓存 API 调用时间: {cached_time:.2f} 秒")
print(f"缓存 API 调用输入 tokens: {cached_response.usage.input_tokens}")
print(f"缓存 API 调用输出 tokens: {cached_response.usage.output_tokens}")

print(cached_response)


缓存 API 调用时间: 3.07 秒
缓存 API 调用输入 tokens: 20
缓存 API 调用输出 tokens: 7

总结 (缓存):
Message(id='msg_013jDVJTEspsvRYfsz4sPR8M', content=[TextBlock(text='道德經', type='text')], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason='end_turn', stop_sequence=None, type='message', usage=Usage(input_tokens=20, output_tokens=7, cache_creation_input_tokens=13404, cache_read_input_tokens=0))


多轮对话中使用提示缓存

让多轮对话中的system prompt和最后两个用户输入被标记为短暂缓存(ephemeral),而其他历史消息保持不变从而降低成本与响应时间。

In [32]:
class ConversationHistory:
    def __init__(self):
        # 初始化一个空列表来存储对话轮次
        self.turns = []

    def add_turn_assistant(self, content):
        # 将助手的对话轮次添加到对话历史中
        self.turns.append({
            "role": "assistant",
            "content": [
                {
                    "type": "text",
                    "text": content
                }
            ]
        })

    def add_turn_user(self, content):
        # 将用户的对话轮次添加到对话历史中
        self.turns.append({
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": content
                }
            ]
        })

    def get_turns(self):
        # 逆序遍历再逆回来是为了方便获取最近的两个用户轮次。通过逆序遍历，可以很容易地找到最后两个用户的发言。如果正序遍历，就需要遍历整个列表才能确定最后两个用户轮次。
        result = []
        user_turns_processed = 0
        # 逆序遍历对话轮次
        for turn in reversed(self.turns):
            if turn["role"] == "user" and user_turns_processed < 2:
                # 添加最后两个用户轮次，添加缓存控制标记
                result.append({
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": turn["content"][0]["text"],
                            "cache_control": {"type": "ephemeral"}
                        }
                    ]
                })
                user_turns_processed += 1
            else:
                # 其他轮次按原样添加
                result.append(turn)
        # 返回原顺序的轮次
        return list(reversed(result))



假设我们有以下对话历史:

```python
turns = [
    {"role": "user", "content": [{"text": "你好"}]},
    {"role": "assistant", "content": [{"text": "你好!有什么我可以帮助你的吗?"}]},
    {"role": "user", "content": [{"text": "今天天气怎么样?"}]},
    {"role": "assistant", "content": [{"text": "对不起,我是一个AI助手,没有实时天气信息。你可以查看天气预报app或网站获取准确信息。"}]},
    {"role": "user", "content": [{"text": "谢谢,那你能告诉我北京的著名景点吗?"}]}
]
```

get_turns是如何处理这个对话历史的呢？

1. 函数首先会逆序遍历这个列表。

2. 它会找到最后两个用户的输入,也就是:
   - "谢谢,那你能告诉我北京的著名景点吗?"
   - "今天天气怎么样?"

3. 这两个用户输入会被添加到结果中,并标记为短暂缓存:

```python
result = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "谢谢,那你能告诉我北京的著名景点吗?",
                "cache_control": {"type": "ephemeral"}
            }
        ]
    },
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "今天天气怎么样?",
                "cache_control": {"type": "ephemeral"}
            }
        ]
    }
]
```

4. 然后,其他所有轮次(包括AI的回复和更早的用户输入)会被原样添加到结果中。

5. 最后,整个结果会被再次逆序,恢复到原来的顺序。

最终的结果看起来是这样的:

```python
final_result = [
    {"role": "user", "content": [{"text": "你好"}]},
    {"role": "assistant", "content": [{"text": "你好!有什么我可以帮助你的吗?"}]},
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "今天天气怎么样?",
                "cache_control": {"type": "ephemeral"}
            }
        ]
    },
    {"role": "assistant", "content": [{"text": "对不起,我是一个AI助手,没有实时天气信息。你可以查看天气预报app或网站获取准确信息。"}]},
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "谢谢,那你能告诉我北京的著名景点吗?",
                "cache_control": {"type": "ephemeral"}
            }
        ]
    }
]
```

注意,只有最后两个用户输入被标记为短暂缓存(ephemeral),而其他所有内容保持不变。

In [33]:
# 初始化对话历史
conversation_history = ConversationHistory()

# 系统消息包含书籍内容
system_message = f"<file_contents> {book_content} </file_contents>"

# 预定义问题用于我们的模拟
questions = [
    "这部书的标题是什么？",
    "老子是谁？",
    "有什么重要的观点并解释？",
    "这部书主要几个主题是什么？"
]

def simulate_conversation():
    for i, question in enumerate(questions, 1):
        print(f"\n轮次 {i}:")
        print(f"用户: {question}")
        
        # 将用户输入添加到对话历史中
        conversation_history.add_turn_user(question)

        # 记录性能测量的开始时间
        start_time = time.time()

        # 向助手发起 API 调用
        response = client.messages.create(
            model=MODEL_NAME,
            extra_headers={
              "anthropic-beta": "prompt-caching-2024-07-31"
            },
            max_tokens=300,
            system=[
                {"type": "text", "text": system_message, "cache_control": {"type": "ephemeral"}},
            ],
            messages=conversation_history.get_turns(),
        )

        
        end_time = time.time()

        # 提取助手的回复
        assistant_reply = response.content[0].text
        conversation_history.add_turn_assistant(assistant_reply)
        print(f"助手: {assistant_reply}")
        
        
        # 打印 token 使用信息
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        input_tokens_cache_read = getattr(response.usage, 'cache_read_input_tokens', '---')
        input_tokens_cache_create = getattr(response.usage, 'cache_creation_input_tokens', '---')
        print(f"用户输入 tokens: {input_tokens}")
        print(f"输出 tokens: {output_tokens}")
        print(f"输入 tokens (缓存读取): {input_tokens_cache_read}")
        print(f"输入 tokens (缓存写入): {input_tokens_cache_create}")

        # 计算并打印经过的时间
        elapsed_time = end_time - start_time

        # 计算输入提示的缓存百分比
        total_input_tokens = input_tokens + (int(input_tokens_cache_read) if input_tokens_cache_read != '---' else 0)
        percentage_cached = (int(input_tokens_cache_read) / total_input_tokens * 100 if input_tokens_cache_read != '---' and total_input_tokens > 0 else 0)

        print(f"输入提示的 {percentage_cached:.1f}% 被缓存 ({total_input_tokens} tokens)")
        print(f"花费时间: {elapsed_time:.2f} 秒")

        # 将助手的回复添加到对话历史中
       

1. total_input_tokens（总输入tokens）:
   
   含义：这是处理请求时使用的总token数。它包括两部分：
   - 新处理的tokens (input_tokens)
   - 从缓存中读取的tokens (input_tokens_cache_read，如果有的话)

2. percentage_cached（缓存百分比）:
   计算方法：(int(input_tokens_cache_read) / total_input_tokens * 100 if input_tokens_cache_read != '---' and total_input_tokens > 0 else 0)
   
   含义：这表示在总输入tokens中，有多少百分比是从缓存中读取的。

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240904150255.png)

In [34]:
simulate_conversation()


轮次 1:
用户: 这部书的标题是什么？
助手: 这部书的标题是《道德經》。
用户输入 tokens: 4
输出 tokens: 17
输入 tokens (缓存读取): 0
输入 tokens (缓存写入): 13416
输入提示的 0.0% 被缓存 (4 tokens)
花费时间: 3.21 秒

轮次 2:
用户: 老子是谁？
助手: 老子是中国古代著名的哲学家、思想家,被认为是道家学派的创始人。关于老子的具体生平,历史上存在很多说法和争议,但通常认为他生活在春秋时期(公元前6世纪左右)。

以下是关于老子的一些普遍看法:

1. 老子的真名可能是李耳,老子是他的尊称,意为"老先生"。

2. 他被认为是《道德经》(也称《老子》)的作者,这本书是道家思想的重要经典。

3. 传说老子曾在周朝担任守藏室之职(管理皇家图书馆)。

4. 有说法称孔子曾向老子请教礼仪,这表明老子在当时已经是一位受人尊敬的智者。

5. 据说晚年的老子离开了自己工作的地方,骑着青牛西行。在关令尹喜的请求下,他留下了
用户输入 tokens: 4
输出 tokens: 300
输入 tokens (缓存读取): 13416
输入 tokens (缓存写入): 27
输入提示的 100.0% 被缓存 (13420 tokens)
花费时间: 5.77 秒

轮次 3:
用户: 有什么重要的观点并解释？
助手: 《道德经》中包含了许多重要的哲学观点，以下是一些核心思想及其解释：

1. 道：
"道"是老子哲学的核心概念，指宇宙的本源和运行规律。道是无形的，却是万物的根源。

2. 无为：
强调顺应自然，不过度干预。这不是完全不作为，而是遵循事物的自然发展规律。

3. 柔弱胜刚强：
认为柔软和谦卑反而更有力量，过于强硬和固执会导致失败。

4. 返璞归真：
主张回归简单纯朴的生活方式，摒弃人为的复杂。

5. 辩证思维：
如"祸福相依"，强调事物的对立统一性。

6. 清静无为的统治：
主张统治者应该减少干预，让百姓自然安定。

7. 知足常乐：
提倡满足于现状，
用户输入 tokens: 4
输出 tokens: 300
输入 tokens (缓存读取): 13443
输入 tokens (缓存写入): 319
输入提示的 100.0% 被缓存 (13447 tokens)
花费时间:

可以用提示缓存优化的场景：

1. 对话型助手:

通过缓存长指令或上传的文档,可以降低成本并加快长对话的响应速度。
实际应用:客服聊天机器人可以预先加载产品手册、常见问题等信息,从而更快速准确地回答客户问题。

2. 编程助手:

通过在提示中保留代码库的摘要版本,改善代码自动补全和问答功能。
实际应用:IDE插件可以分析整个项目代码,为开发者提供更智能的代码建议和上下文相关的帮助。

3. 大文档处理:

在提示中包含完整的长篇材料(包括图像),而不增加响应延迟。
实际应用:法律文件审核系统可以一次性处理整份合同,快速找出关键条款或潜在风险。

4. 详细指令集:

通过共享大量指令、流程和示例来微调AI的响应。
实际应用:在教育平台上,可以为AI助教提供大量优秀作文范例,使其能更准确地评分和提供反馈。

5. 主动搜索和工具使用:

提高需要多轮工具调用和迭代修改场景的性能。
实际应用:智能家居系统可以根据用户习惯、天气预报等信息,主动调整家电设置,无需频繁请求服务器。

6. 与长篇内容对话:

将整个文档嵌入提示中,让用户能与任何知识库进行问答交互。
实际应用:企业可以将所有内部文档、培训材料输入AI系统,员工可以随时查询,获得精准答案。

## Streaming流式响应提升用户体验

对于 LLMs，流式处理已成为一项越来越受欢迎的功能。Streaming是在开始生成token时快速返回，而不是等待创建完整响应后再返回所有的内容，这样也可以模拟真人对话的节奏感，让用户感觉不那么“机器人”。就像你体验过的 GPT、Claude.ai、GLM等 AI 能力一样，它们在用户提问后，都采用了打字机形式的回复方式，让用户感受到数据在不断流入屏幕，从而提高了聊天的流畅性，**并极大的加快了首次回应给用户的响应的时间**，即Time to First Token (TTFT)。

1. Streaming流式响应的作用:
   - 逐个令牌(token)流式传输响应
   - 用户可以实时看到响应的生成过程
   

2. streaming流式响应的优点:
   - 避免长时间等待完整响应
   - 提高效率和交互性
   - 使对话感觉更自然,更像人类思考过程


服务端来发送streaming：


需要实时数据的场景,传统的HTTP请求-响应模型存在一些局限性——客户端主动发请求，服务端被动地返回响应的单向通信模式。这种模型在客户端需要实时获取结果的场景下是不合适的，因为这意味着客户端需要不断地轮询，所以最好的做法是服务端生成结果之后，主动推送给客户端。

可能有小伙伴知道WebSocket，WebSocket是一种双向通信协议,允许服务器和客户端之间建立持久连接,双方都可以主动发送数据，常用于实时聊天应用、在线游戏中。但选用它有些太重了，因为这意味着要从 HTTP 切换到 WebSocket 协议，以及它是双向通信的，而我们只需要接收就可以了。

有没有办法，我们仍然使用 HTTP 协议，同时还能让服务端主动推送数据呢？

铛铛铛铛～  SSE 技术来了，它的英文全称是 Server-Sent Events（服务端推送事件）。通过 SSE 可以让服务端即时推送数据到客户端，而不需要客户端轮询服务端以获取更新。

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240904183611.png)

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240904153837.png)



Anthropic API 的流式响应也采用 Server-Sent Events (SSE) 格式，这种基于 HTTP 的单向数据流协议。 服务器会不断地发送事件数据，每个事件包含一个类型和一些数据。 例如：

```
event: message_start
data: {"message": {"id": "...", "role": "assistant", "content": []}}

event: content_block_start
data: {"content_block": {"index": 0, "type": "text"}}

event: content_block_delta
data: {"delta": {"type": "text_delta", "text": "Hello,"}}

event: content_block_delta
data: {"delta": {"type": "text_delta", "text": " world!"}}

.......
```

- 流式传输就像用水管接水一样，数据不是一次性全部传输完成的，而是像水流一样持续传输。这样做的优势在于：

实时性强： 可以更快地获取到第一批数据，不用等到所有数据都传输完毕。

节省内存： 不需要将所有数据都存储在内存中，可以边接收边处理。

处理大数据： 对于非常大的数据，流式传输可以避免内存溢出。

这个时候需要大量的解析和状态管理，不过不用担心，我们导入的“Anthropic”里的`MessageStream` 类等会解析 SSE 数据流，并将其转换为 Python 对象，例如 `MessageStartEvent`、`ContentBlockStartEvent`、`ContentBlockDeltaEvent` 等。 这些对象包含了事件的类型和相关数据，方便用户处理。




既然Claude公司已经在sdk里面帮助我们处理包装过服务器显示的原始数据转换为用户友好的python消息对象了，来方便我们处理流式响应。

对于我们来说，想要使用streaming的功能，我们一个是要开启它，一个是要解析包装过的流式响应结果，Python对象就可以了。

一共有create(stream=true),stream()两种方式，我们先看第一种

In [40]:
from dotenv import load_dotenv
from anthropic import Anthropic
load_dotenv()

#自动寻找 "ANTHROPIC_API_KEY" 的环境变量
client = Anthropic()

而要从 API 获取流式响应，只需将 stream=True 传递给 client.messages.create 即可。

观察在没有开启之前，响应是等LLM生成结束后一起返回给我们的：

In [41]:
response = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "为什么空调能吹出冷热两种不同的风？",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=300,
)
print("开始有响应了!")
print("========================")
print(response.content[0].text)




开始有响应了!
空调之所以能吹出冷热两种不同的风,主要是因为它的制冷和制热原理不同。

1. 制冷原理:

- 空调利用制冷剂在蒸发和凝结过程中吸收和释放热量的原理来实现制冷。
- 制冷剂在蒸发时吸收室内的热量,使室内温度下降;在凝结时释放热量到室外,使室外温度升高。
- 这就使得空调能吹出冷风。

2. 制热原理:

- 空调制热时利用制冷系统反向运行,使制冷剂在凝结过程中释放热量,从而使室内温度升高。
- 制冷剂在凝结时释放热量到室内,使室内温度升高。
- 这就使得空调能吹出热风。

3. 切换原理:

- 空调通过切换阀来切换制冷和制热模式,从而使出风口


In [45]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "有透明的金属吗？",
        }
    ],
    model="claude-3-5-sonnet-20240620",
    max_tokens=600,
    stream=True,
)
for event in stream:
    print(event)

RawMessageStartEvent(message=Message(id='msg_013U5kduJ8yCrUiDuBkc1Str', content=[], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=Usage(input_tokens=17, output_tokens=1)), type='message_start')
RawContentBlockStartEvent(content_block=TextBlock(text='', type='text'), index=0, type='content_block_start')
RawContentBlockDeltaEvent(delta=TextDelta(text='严', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='格来说,目', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='前还没有完', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='全透明的金', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='属。不', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='过,有一', type='text_del

In [43]:
for event in stream:
    print(event)

RawMessageStartEvent(message=Message(id='msg_01FfdRSTmFZYvgRdaAqcncar', content=[], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=Usage(input_tokens=17, output_tokens=1)), type='message_start')
RawContentBlockStartEvent(content_block=TextBlock(text='', type='text'), index=0, type='content_block_start')
RawContentBlockDeltaEvent(delta=TextDelta(text='虽然严', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='格意', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='义上没', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='有完全透明', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='的', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='纯金属,但', type='text_delta')

如果我们想要访问有关令牌使用情况的信息，则需要查看两个位置：

RawMessageStartEvent 包含我们的 input（prompt） token 使用信息

RawMessageDeltaEvent 包含有关total output token 的信息

- 获取例子

In [46]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "根据热力学第二定律，世界将越来越混乱。那为什么会产生能体现秩序的细胞、生物和人类？",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=1000,
    stream=True,
)
for event in stream:
    if event.type == "message_start":
        input_tokens = event.message.usage.input_tokens
        print("消息开始事件", flush=True)  
        print(f"已使用的输入令牌数: {input_tokens}", flush=True)  
        print("========================")  # 分隔线
    elif event.type == "content_block_delta":
        print(event.delta.text, flush=True, end="")  # 实时打印接收到的文本内容，不换行，下面会讲
    elif event.type == "message_delta":
        output_tokens = event.usage.output_tokens
        print("\n========================", flush=True)  # 分隔线
        print(f"已使用的输出令牌数: {output_tokens}", flush=True)  

消息开始事件
已使用的输入令牌数: 52
这个问题非常有趣。热力学第二定律确实说明了整个宇宙将趋于无序状态,也就是所谓的"熵增"。但是在地球上,我们看到生命在不断创造出一种局部的有序状态,这似乎和热力学定律相矛盾。

其实这并不矛盾,关键在于生命利用外界的自由能(比如太阳能)来维持和增强自身的有序状态。生物系统通过新陈代谢、光合作用等过程,从环境中摄取能量和物质,并用这些建立和维持其内部的有序结构。

虽然整个宇宙的熵会不断增加,但生命系统能够通过消耗外界能量来局部减熵,从而达到更高的有序程度。这就是生命得以产生和发展的关键所在。

人类社会也是如此,通过科技进步、知识积累等方式,持续创造出更高水平的有序状态。虽然这也会增加全球熵,但它依然是一个局部的秩序增加过程。

所以生命和人类社会的有序状态并不违背热力学第二定律,而是依托于它合理利用能量而实现的。这就是生命得以克服无序趋势而进化发展的奥秘所在。
已使用的输出令牌数: 412


我们关心的所有LLM生成内容都来自 RawContentBlockStartEvent，每个事件都包含一个设置为“content_block_delta”的类型。要实际获取内容本身，我们希望访问 delta 中的 text 属性。让我们尝试专门打印出生成的文本：

In [47]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "有透明的金属吗？",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=400,
    temperature=0,
    stream=True,
)
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text)

是
的,有
一
些
透明的金
属材
料。
以
下是几
种常见的
透明金属
:

1
. 金
属玻
璃(
Metal
lic Glass):
这是
一种非
晶态
的
金属合
金,具有
金
属的性
质但
又
具
有玻
璃的透
明性。它
们
通常由多
种金属元
素组成,
如
钯
、铂、
铜
等。

2
. 金属
薄膜:
一
些
金属在
制
成
极
薄的
膜层
时
,
会变
得部
分透明。
例
如金
、
银、铝
等金
属在
制
成几
纳米厚
的
薄膜
时,就
会
呈现出
一
定
程
度的透明
性
。

3.
 
掺杂金
属
氧化物:
通
过在
金属
氧化物中
掺杂其
他元
素,可以
制
造出一
些透明的
金属氧
化物材
料。例如
掺杂
铟
的
氧化
铟(
I
TO)就
是一种常
见的透明
导
电材
料。

4
. 金属
纳米
颗粒:
一
些金属在
制
成纳米
尺度的
颗粒时
,也
会表
现出一定
程度的透
明性。这
是
由于纳
米尺度
下的量
子效应造
成的。


这
些透明金
属材料在
电
子显
示、
太
阳能电
池、建
筑玻
璃等领
域都有广
泛应用
。它
们结
合
了金属的
导
电性和透
明性,在
许
多领
域都有重
要的作


在 Python 中，`print()` 函数的默认行为是输出内容写入缓冲区，然后再从缓冲区中输出到终端。并在行末自动添加一个换行符（`'\n'`）。

所以为了在输出逐渐到达时保持实时更新，并且将输出显示在同一行，以便更清晰地查看完整的响应，在处理实时流数据时常特定设置 `flush=True` 和 `end=""`。


1. **`end=""`**:
   - 这个参数定义了 `print()` 语句输出的结尾字符。默认情况下是换行符 `'\n'`，但当你设置 `end=""` 时，输出内容将不会自动换行，而是会在同一行继续输出。
   - **应用场景**：在处理流数据时，可能需要将所有数据连续打印在同一行，而不是逐个事件打印时都换行。

2. **`flush=True`**:
   - `print()` 函数将输出内容写入缓冲区，然后再从缓冲区中输出到终端。`flush` 参数控制是否立即将缓冲区中的内容刷新到终端。
   - `flush=False`（默认）：`print()` 函数会根据系统的设置决定何时刷新缓冲区，有时输出可能会被延迟。
   - **`flush=True`**：强制 `print()` 立即将缓冲区的内容输出到终端。这对于希望实时看到输出的情况非常有用，尤其是在流式数据处理中，用户通常希望尽快看到新的数据。


In [49]:
stream = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "有透明的金属吗？",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=400,
    temperature=0,
    stream=True,
)
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text, flush=True, end="")

是的,有一些透明的金属材料。以下是几种常见的透明金属:

1. 金属玻璃(Metallic Glass):这是一种非晶态的金属合金,具有金属的性质但又具有玻璃的透明性。它们通常由多种金属元素组成,如钯、铂、铜等。

2. 金属薄膜:一些金属在制成极薄的膜层时,会变得部分透明。例如金、银、铝等金属在制成几纳米厚的薄膜时,就会呈现出一定程度的透明性。

3. 掺杂金属氧化物:通过在金属氧化物中掺杂其他元素,可以制造出一些透明的金属氧化物材料。例如掺杂铟的氧化铟(ITO)就是一种常见的透明导电材料。

4. 金属纳米颗粒:一些金属在制成纳米尺度的颗粒时,也会表现出一定程度的透明性。这是由于纳米尺度下的量子效应造成的。

这些透明金属材料在电子显示、太阳能电池、建筑玻璃等领域都有广泛应用。它们结合了金属的导电性和透明性,在许多领域都有重要的作

流式处理现在已经为我们所用了！并大大地提升了用户体验，但并不会神奇地提高 model 生成响应所需的总时间。在页面上展示首token的速度是快了多，但从请求开始到收到最终生成的token之间仍然需要相同的时间。

1. 回顾Time to First Token (TTFT) 定义与重要性：
   - TTFT是从发送请求到接收到AI模型生成的第一个内容单元所需的时间。
   - 它是衡量AI响应速度和用户体验的关键指标，对于提供快速、流畅的交互至关重要。

2. 流式传输对TTFT的改善：
   - 流式传输允许内容一旦生成就立即发送，而不是等待整个响应完成。
   - 这显著减少了用户的等待时间，提升了交互体验，特别适用于聊天机器人、长文本生成和实时翻译等场景。

我们用代码来看看时间方面的结果：

非流式处理方法开始。我们将要求模型生成一段很长的文本，但在 500 个标记处将其截断：

In [50]:
import time
def measure_non_streaming_ttft():
    start_time = time.time()

    response = client.messages.create(
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "写一篇长篇文章，解释唐朝的历史",
            }
        ],
        temperature=0,
        model="claude-3-haiku-20240307",
    )

    response_time = time.time() - start_time

    print(response.content[0].text)
    print(f"接收第一个token的时间: {response_time:.3f}秒")
    print(f"接收完整响应的时间: {response_time:.3f}秒")
    print(f"生成的总token数: {response.usage.output_tokens}")
    
    


In [52]:
measure_non_streaming_ttft()

好的,我将尽力为您撰写一篇详细的唐朝历史文章。以下是初稿:

唐朝历史概述

唐朝是中国历史上最辉煌灿烂的时期之一,其政治、经济、文化成就在世界范围内享有盛誉。唐朝始于公元618年,终于907年,历时近三百年,共有21位皇帝先后统治。

唐朝的兴起与李渊及其子李世民的功绩密不可分。李渊原为北周的高级将领,在隋朝末年趁乱起兵,最终在公元618年建立了唐朝,改国号为"大唐"。李世民即位后,通过一系列政治改革和军事行动,巩固了唐朝的统治地位,使其成为当时世界上最强大的帝国之一。

唐朝的鼎盛时期主要集中在贞观之治(626-649年)和开元盛世(713-756年)。这两个时期,唐朝的政治、经济、文化都达到了空前的繁荣。唐朝的版图一度遍及中国大部分地区,并且与周边国家保持密切的政治、经济、文化交流。唐朝的首都长安(今西安)更是成为当时世界上最繁华的城市之一。

唐朝的政治制度以"三省六部"为核心,实行中央集权的君主专制制度。同时,唐朝还建立了科举制度,为选拔人才提供了制度保障。在经济方面,唐朝实行"均田制"和"
接收第一个token的时间: 4.693秒
接收完整响应的时间: 4.693秒
生成的总token数: 500


现在让我们使用流式处理方法尝试相同的操作：

In [53]:
import time
def measure_streaming_ttft():
    start_time = time.time()

    stream = client.messages.create(
        max_tokens=500,
        messages=[
            {
                "role": "user",
                "content": "写一篇长篇文章，解释唐朝的历史",
            }
        ],
        temperature=0,
        model="claude-3-haiku-20240307",
        stream=True
    )
    have_received_first_token = False
    for event in stream:
        if event.type == "content_block_delta":
            if not have_received_first_token:
                ttft = time.time() - start_time
                have_received_first_token = True
            print(event.delta.text, flush=True, end="")
        elif event.type == "message_delta":
            output_tokens = event.usage.output_tokens
            total_time = time.time() - start_time

    print(f"\n接收第一个token的时间: {ttft:.3f}秒", flush=True)
    print(f"接收完整响应的时间: {total_time:.3f}秒", flush=True)
    print(f"生成的总token数: {output_tokens}", flush=True)


In [55]:
measure_streaming_ttft()

好的,我将尽力为您撰写一篇详细的唐朝历史文章。以下是初稿:

唐朝历史概述

唐朝是中国历史上最辉煌灿烂的时期之一,其政治、经济、文化成就在世界范围内享有盛誉。唐朝始于公元618年,终于公元907年,历时近三百年,共有21位皇帝先后统治。

唐朝的兴起始于李渊,他于公元618年推翻隋朝,建立了唐朝。唐太宗李世民是唐朝最伟大的皇帝,他不仅巩固了政权,还大幅扩张了领土,使唐朝成为当时世界上最强大的帝国之一。

唐朝的鼎盛时期主要集中在武则天和玄宗时期。武则天是唐朝历史上唯一的女性皇帝,她在位期间推行了一系列改革措施,极大地提高了中央集权的力度。玄宗时期则被称为"盛唐"时期,这一时期唐朝的政治、经济、文化都达到了顶峰。

唐朝的衰落始于安史之乱,这场持续数年的内战严重削弱了唐朝的实力。此后,唐朝陷入了长期的内乱和分裂,最终在公元907年被后梁所灭。

唐朝在政治、经济、文化等方面都取得了巨大成就。在政治方面,唐朝建立了高度集中的中央集权制度,并创立了科举制度
接收第一个token的时间: 0.975秒
接收完整响应的时间: 4.489秒
生成的总token数: 500


优化：使`with` 语句**资源管理**的方式

`with` 语句是 Python 中一种强大的语法结构，用于简化资源管理，确保资源在使用完毕后能够被正确释放。它主要用于处理文件、网络连接、数据库连接等需要手动关闭的资源，避免资源泄漏和程序错误。

**`with` 语句的基本语法：**

```python
with expression as target:
    # 代码块
```

**工作原理：**

1. **进入 `with` 语句块：**
   - `expression` 会被执行，其返回值（必须是一个实现了上下文管理器协议的对象，即对象实现 __enter__() 和 __exit__() 方法，用于管理资源的获取和释放。with 语句会自动调用这两个方法，确保资源在使用完毕后能够被正确清理。）会被赋值给 `target` 变量。
   - 上下文管理器协议要求对象实现 `__enter__()` 和 `__exit__()` 方法。
   - `__enter__()` 方法会被自动调用，用于准备资源，例如打开文件、建立网络连接等。
2. **执行代码块：**
   - 在 `with` 语句块中，可以使用 `target` 变量来访问和操作资源。
3. **退出 `with` 语句块：**
   - 无论代码块中是否发生异常，`__exit__()` 方法都会被自动调用，用于清理资源，例如关闭文件、断开连接等。

**示例：**

**1. 文件操作：**

```python
with open('myfile.txt', 'r') as f:
    contents = f.read()
    # 处理文件内容
# 文件会自动关闭，无需手动调用 f.close()
```

**2. 数据库连接：**

```python
with connect('mydb.sqlite') as conn:
    cursor = conn.cursor()
    # 执行数据库操作
# 数据库连接会自动关闭
```

`with` 语句确保无论代码块中发生什么（包括异常），`stream` 对象都会被正确关闭。这意味着与 API 的连接会被及时释放，避免资源泄漏。
它更安全可靠，能够保证资源的正确释放，避免潜在的资源泄漏问题。尤其是 **处理大型流式响应，程序可能出现异常，编写健壮的代码** 场景下，使用 `with` 语句更加重要，实际开发中应始终优先选择使用 `with` 语句来管理流式响应对象，养成良好的编程习惯。 



In [56]:
with client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "有透明的金属吗？",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=400,
    temperature=0,
    stream=True,
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            print(event.delta.text, flush=True, end="")

是的,有一些透明的金属材料。以下是几种常见的透明金属:

1. 金属玻璃(Metallic Glass):这是一种非晶态的金属合金,具有金属的性质但又具有玻璃的透明性。它们通常由多种金属元素组成,如钯、铂、铜等。

2. 金属薄膜:一些金属在制成极薄的膜层时,会变得部分透明。例如金、银、铝等金属在制成几纳米厚的薄膜时,就会呈现出一定程度的透明性。

3. 掺杂金属氧化物:通过在金属氧化物中掺杂其他元素,可以制造出一些透明的金属氧化物材料。例如掺杂铟的氧化铟(ITO)就是一种常见的透明导电材料。

4. 金属纳米颗粒:一些金属在制成纳米尺度的颗粒时,也会表现出一定程度的透明性。这是由于纳米尺度下的量子效应造成的。

这些透明金属材料在电子显示、太阳能电池、建筑玻璃等领域都有广泛应用。它们结合了金属的导电性和透明性,在许多领域都有重要的作

如果不用with则要手动管理

In [52]:
stream_raw = client.messages.create(
    messages=[
        {
            "role": "user",
            "content": "有透明的金属吗？",
        }
    ],
    model="claude-3-haiku-20240307",
    max_tokens=400,
    temperature=0,
    stream=True,
)
# 手动调用 __enter__ 方法获取流
stream = stream_raw.__enter__()
for event in stream:
    if event.type == "content_block_delta":
        print(event.delta.text, flush=True, end="")
# 手动调用 __exit__ 方法关闭连接
stream_raw.__exit__(None, None, None)
    
print("Exiting...")

是的,有一些透明的金属材料。以下是几种常见的透明金属:

1. 金属玻璃(Metallic Glass):这是一种非晶态的金属合金,具有金属的性质但又具有玻璃的透明性。它们通常由多种金属元素组成,如钯、铂、铜等。

2. 金属薄膜:一些金属在制成极薄的膜层时,会变得部分透明。例如金、银、铝等金属在制成几纳米厚的薄膜时,就

APIStatusError: {'type': 'error', 'error': {'details': None, 'type': 'overloaded_error', 'message': 'Overloaded'}}

做一个流式多轮对话机器人

In [57]:
from anthropic import Anthropic

# Initialize the Anthropic client
# client = Anthropic()

# ANSI color codes
BLUE = "\033[94m"
GREEN = "\033[92m"
RESET = "\033[0m"

def chat_with_claude():
    print("欢迎来到 Claude Chatbot!")
    print("使用 'quit' 来退出对话.")
    
    conversation = []
    
    while True:
        user_input = input(f"You: ")
        
        if user_input.lower() == 'quit':
            print("再见!")
            break
        
        conversation.append({"role": "user", "content": user_input})
        print(f"{BLUE}You: {RESET}", end="", flush=True)
        print(f"{GREEN}Claude: {RESET}", end="", flush=True)
        
        assistant_response = ""
        with client.messages.create(
            model="claude-3-5-sonnet-20240620",
            max_tokens=1000,
            messages=conversation,
            stream=True
        )as stream:
        
            
            for chunk in stream:
                if chunk.type == "content_block_delta":
                    content = chunk.delta.text
                    print(f"{GREEN}{content}{RESET}", end="", flush=True)
                    assistant_response += content
        
        print()  # New line after the complete response
        
        conversation.append({"role": "assistant", "content": assistant_response})

if __name__ == "__main__":
    chat_with_claude()

欢迎来到 Claude Chatbot!
使用 'quit' 来退出对话.
[94mYou: [0m[92mClaude: [0m[92mHello[0m[92m! How can[0m[92m I assist you today?[0m[92m Feel[0m[92m free to ask me any[0m[92m questions or let[0m[92m me know if[0m[92m you[0m[92m need help with anything[0m[92m.[0m
[94mYou: [0m[92mClaude: [0m[92m浪花[0m[92m呈现[0m[92m白色的[0m[92m原因主要有[0m[92m以下几点：[0m[92m

1. 气[0m[92m泡反[0m[92m射：[0m[92m
当海浪[0m[92m拍打时[0m[92m，[0m[92m会[0m[92m产生大量细[0m[92m小的[0m[92m气泡。这[0m[92m些气泡能[0m[92m反[0m[92m射各[0m[92m种波[0m[92m长的光，[0m[92m综[0m[92m合起来呈[0m[92m现白色。[0m[92m

2. 光[0m[92m散[0m[92m射：
水[0m[92m中的微[0m[92m小颗粒[0m[92m（[0m[92m如[0m[92m盐分、[0m[92m沙粒等）[0m[92m会[0m[92m散[0m[92m射[0m[92m阳光。当光[0m[92m线穿过含[0m[92m有大[0m[92m量气[0m[92m泡和[0m[92m微[0m[92m粒的[0m[92m浪花时，会[0m[92m发[0m[92m生全[0m[92m方[0m[92m位散[0m[92m射，[0m[92m呈现出[0m[92m白[0m[92m色。

3.[0m[92m [0m[92m泡沫结[0m[92m构：
浪[0m[92m花中[0m[92m的[0m[92m泡沫由无[0m[92m数微[0m[92m小水[0m[92m

Claude API中流式响应有两种不同方法。

1. 刚才学的`create(stream=True)`方法：

这是在创建消息时启用流式响应的方法。当您使用API发送请求时，可以通过设置`"stream": true`参数来启用流式响应。

例如，使用curl发送请求时可以这样设置，当然，python的使用方法就像上面学的那样：

```bash
curl https://api.anthropic.com/v1/messages \
     --header "anthropic-version: 2023-06-01" \
     --header "content-type: application/json" \
     --header "x-api-key: $ANTHROPIC_API_KEY" \
     --data \
'{
 "model": "claude-3-5-sonnet-20240620",
 "messages": [{"role": "user", "content": "Hello"}],
 "max_tokens": 256,
 "stream": true
}'
```


2. `stream()`方法：

这是Claude的Python和TypeScript SDK提供的专门用于流式响应的方法。它提供了一种更直接的方式来处理流式响应。

例如，在Python中使用`stream()`方法：

```python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
    model="claude-3-5-sonnet-20240620",
) as stream:
  for text in stream.text_stream:
      print(text, end="", flush=True)
```


在TypeScript中使用`stream()`方法：

```typescript
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

await client.messages.stream({
    messages: [{role: 'user', content: "Hello"}],
    model: 'claude-3-5-sonnet-20240620',
    max_tokens: 1024,
}).on('text', (text) => {
    console.log(text);
});
```


为什么相同的效果但是有两个函数：

1. `create(stream=True)`是在API级别启用流式响应的通用方法，适用于所有支持流式响应的API调用。

2. `stream()`是SDK提供的专门方法，为开发者提供了更便捷的流式处理接口，可能包含一些特定于流式处理的功能。


- **使用的差异:** `create()` 函数主要用于创建消息，流式返回只是其可选功能之一；而 `stream()` 就没有 `stream` 参数，因为它总是以流式方式返回结果，也再包装了一下。

- **内部的差异:** `create()` 函数在 `stream=True` 时返回原始的流式事件数据`Stream[RawMessageStreamEvent]` 对象，可以迭代获取原始的流式事件数据。，需要用户自行解析和处理；而 `stream()` 函数返回 `MessageStreamManager` 对象，可以更方便地使用 `with` 语句管理流式连接，并通过 `MessageStream` 对象获取用户友好的事件和消息对象。

- 但效果都没差，都是以流式方式返回结果，不过使用stream会有更完善的包装和拓展性，如果是直接使用API，`create(stream=True)`是您的选择。使既然我们用了Python或TypeScript 的SDK，`stream()`方法就提供了更方便的接口。



In [58]:
from anthropic import Anthropic

# client = Anthropic()

def streaming_with_helpers():
    with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    ) as stream:
        # 进入 with 语句块后，API 请求才会真正发出
        for text in stream.text_stream:
            print(text, end="", flush=True)

streaming_with_helpers()

以下是一些有趣的动物趣事:

1. 海豚睡觉时只有半个大脑在休息,另一半保持清醒以监视周围环境和控制呼吸。

2. 蜜蜂在跳舞时会通过摇摆身体和振动翅膀来告诉同伴食物的方向和距离。

3. 章鱼有三颗心脏,其中两颗专门为鳃供血,另一颗为身体其他部位供血。

4. 袋鼠不能向后跳,只能向前或向上跳跃。

5. 企鹅会向伴侣求婚,它们会赠送鹅卵石作为礼物。

6. 蚂蚁能够举起相当于自身体重50倍的重物。

7. 长颈鹿的舌头长达50厘米,可以用来清洁自己的耳朵。

8. 河马的汗液呈粉红色,有助于保护皮肤免受阳光伤害。

9. 树懒动作缓慢到可以长苔藓,它们身上经常长满绿色植物。

10. 北极熊的皮肤其实是黑色的,只是它们的毛发是中空且透明的,反射阳光看起来是白色的。

这些动物的特性和行为都十分有趣,展示了大自然的神奇和多样性。

stream()还有什么辅助方法呢？

之前的 `client.messages.create(stream=True)` 需要手动检查每个事件的类型 (`event.type`)，才能获取文本内容 (`event.delta.text`)。这稍微有些繁琐。


新方法 `client.messages.stream()` 返回一个 `MessageStreamManager` 上下文管理器，它简化了 Anthropic Python SDK 中消息流式处理的代码，使开发者更容易获取和处理流式传输的文本内容和最终的完整消息。它提供了一些便捷的辅助方法，例如：

* `stream.text_stream`：这是一个迭代器，它可以直接 yield 流式传输的文本内容，无需手动检查事件类型。
* `stream.get_final_message()`：它返回最终累积的完整消息。

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240910143044.png)



In [60]:
from anthropic import Anthropic

# client = Anthropic()

def streaming_with_helpers():
    with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    ) as stream:
        # 进入 with 语句块后，API 请求才会真正发出
        for text in stream.text_stream:
            print(text, end="", flush=True)

    final_message = stream.get_final_message()
    print("\n\nSTREAM函数返回对象的得到最终消息的内置方法get_final_message()： ")
    print(final_message)
    
    final_text = stream.get_final_text()
    print("\n\nSTREAM函数返回对象的得到最终文本的内置方法get_final_text()： ")
    print(final_text)

streaming_with_helpers()

好的,我来给您分

KeyboardInterrupt: 

- 加餐内容：偏工程化streaming的底层逻辑及后端的数据结构，如果只是想要使用streaming不看这部分也不影响后续的学习使用。

我们讲了stream的使用和浅层的概念，但是它的本质是什么呢，为什么它能一直输出，面试的时候问Stream 类返回的对象有什么特殊之处，和普通的数据结构有什么不一样？

流式返回对象的是迭代器(Iterator)，迭代器适用于处理大量数据、需要节省内存或需要动态生成元素的情况.

我们拿熟悉的列表来对比一下

**列表 (List)**

* **数据结构:** 列表通常使用数组或动态数组来存储元素。这意味着所有元素都存储在连续的内存块中。
* **内存占用:** 列表需要预先分配足够的内存来存储所有元素，即使有些元素还没有被使用。
* **访问方式:** 列表支持随机访问，这意味着你可以通过索引快速访问任何元素。
* **创建方式:** 列表在创建时就需要确定所有元素。

**迭代器 (Iterator)**

* **数据结构:** 迭代器不直接存储元素，而是提供了一种访问元素的方式。它可以基于各种数据结构实现，例如列表、元组、字典、文件等。
* **内存占用:** 迭代器通常只占用少量内存，因为它不需要一次性存储所有元素。它按需生成或访问元素。
* **访问方式:** 迭代器只能顺序访问元素，你不能直接跳到某个特定位置。
* **创建方式:** 一种特殊的迭代器（叫生成器）可以动态生成元素，例如通过循环或读取文件。

- 可以通过 iter() 函数将可迭代对象（例如列表、元组、字典等）转换为迭代器。


In [61]:
my_list = [1, 2, 3]
my_iterator = iter(my_list)

print(next(my_iterator))  # 输出 1
print(next(my_iterator))  # 输出 2


1
2


In [63]:
print(next(my_iterator))

StopIteration: 

可以使用 for...in... 循环遍历一个迭代器时，Python 解释器会首先调用迭代器的 __iter__ 方法获取生成器对象本身，然后反复调用生成器的 __next__ 方法获取下一个元素，直到 StopIteration 异常被引发，表示所有元素都读取了，结束。看着很想普通的数据结构，但里面别有洞天。

In [64]:
my_list = [1, 2, 3]
my_iterator = iter(my_list)
for i in my_iterator:
    print(i)

1
2
3


迭代器的特性

**1. 惰性计算:** 

迭代器采用惰性计算的方式，只有在需要的时候才会计算下一个元素。而列表等普通对象会一次性将所有元素都存储在内存中。

例如，Stream 类在初始化时并不会立即读取所有数据，而是在每次调用 `__next__` 方法时才会读取一部分数据。

**2. 节省内存:**

由于惰性计算的特性，迭代器通常比列表等数据结构占用更少的内存，因为它不需要一次性存储所有元素。

**3. 单次遍历:**

迭代器只能遍历一次。 一旦迭代器遍历完所有元素，就不能再重新遍历了。 

**4. 可迭代对象:**

任何实现了 `__iter__` 方法的对象都是可迭代对象。 

**5. 广泛应用:**

迭代器广泛应用于 Python 的各种数据结构和算法中，例如 `for` 循环、列表推导式、生成器等。



想象你有一卷卫生纸，你想知道它有多少格。

列表: 就像你把整卷卫生纸都展开，然后一格一格地数。 你需要很大的空间来展开卫生纸，而且数完之后卫生纸就乱七八糟了。

迭代器: 就像你每次从卫生纸卷上撕下一格，数完之后扔掉。 你不需要很大的空间，而且卫生纸卷会越来越小，直到用完为止。

在 Python 中，迭代器是一种可以逐个返回其元素的对象。一个对象如果实现了 `__iter__` 和 `__next__` 这两个特殊方法，就被认为是一个迭代器。想象一下，你有一盒积木，你想把积木一个一个拿出来玩

__iter__ 方法: 就像整理好积木盒，确保积木按顺序摆放好，并准备好一个指示器，指向第一块积木。

__next__ 方法: 就像根据指示器，拿出当前指向的积木，并将指示器移动到下一块积木。

在 Stream 类的源码中，我们可以看到它实现了这两个方法，所以它是一个迭代器：

```python
class Stream(...):
    ...
    def __iter__(self) -> Iterator[_T]:
        for item in self._iterator:
            yield item

    def __next__(self) -> _T:
        return self._iterator.__next__()
    ...
```



In [65]:
with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    ) as stream:
        # 进入 with 语句块后，API 请求才会真正发出
        for text in stream.text_stream:
            print(text, end="", flush=True)

以下是一些有趣的动物趣事:

1. 章鱼有三颗心脏。两颗心脏专门用来给鳃供血,另一颗则负责给全身其他部位供血。

2. 蜂鸟是唯一能够倒着飞行的鸟类。它们灵活的翅膀使它们能够在空中悬停和向各个方向飞行。

3. 长颈鹿的舌头长达45-50厘米,可以用来清洁自己的耳朵。

4. 企鹅会向心爱的对象赠送小石子作为求爱礼物。

5. 树懒动作缓慢,一周只排便一次,而且每次都要花费大约一周的时间才能完成整个排便过程。

6. 大象是唯一无法跳跃的哺乳动物。它们的腿骨结构使它们无法同时将四肢离地。

7. 海豚睡觉时只有半个大脑在休息,另一半保持清醒以控制呼吸和警惕捕食者。

8. 蚂蚁能够举起相当于自身体重50倍的重物,相当于一个成年人能够举起一辆小汽车。

9. 北极熊的皮毛其实是透明的,只是因为反射光线才显得是白色。

10. 蜜蜂在飞行时会跳舞来告诉同伴食物的位置和方向。

这些动物的特性和行为都非常有趣,展示了大自然的奇妙和多样性。

In [68]:
for event in stream:
    print(event)

In [69]:
class MyIterator:
    def __init__(self, data):
        self.data = data
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index < len(self.data):
            result = self.data[self.index]
            self.index += 1
            return result
        else:
            raise StopIteration

# 创建迭代器对象
my_iter = MyIterator([1, 2, 3, 4, 5])


# 惰性计算演示：
print("开始迭代...")
for i in range(3):  
    item = next(my_iter) # 第3个元素在执行了前两次next才加载和计算元素，而没有一次性将所有元素加载到内存
    print(f"获取到元素：{item}") 
print("迭代暂停但并没有真正暂停，它的内部状态（例如 self.index 自我索引）已经更新到下一个元素的位置，for 循环控制了迭代的次数结束后保留了迭代器的状态，")

# 继续迭代：
print("继续迭代...")
for item in my_iter: # 从上次暂停的位置继续迭代
    print(f"获取到元素：{item}")

开始迭代...
获取到元素：1
获取到元素：2
获取到元素：3
迭代暂停但并没有真正暂停，它的内部状态（例如 self.index 自我索引）已经更新到下一个元素的位置，for 循环控制了迭代的次数结束后保留了迭代器的状态，
继续迭代...
获取到元素：4
获取到元素：5


- 加餐内容：偏工程化streaming的底层逻辑及后端的数据结构，如果只是想要使用streaming不看这部分也不影响后续的学习使用。

**Stream 类返回的对象是迭代器，这个说法是对的。但是还有吗？更准确地说，Stream 类返回的对象是一个生成器对象，它是一种特殊的迭代器，可以按需生成元素，因为有暂停和恢复执行的机制来接收外部输入，这也是将其作为一个协程来达到异步的必须条件。 面试加分就来了**

**两者区别：**

* **生成器可以使用 `yield` 关键字主动暂停执行，并将控制权交还给调用者。** 即可以可以在执行过程中暂停，并在之后的时间点恢复执行，并可以接收外部传入的值（大模型的流式输出）。
又因为它会暂停执行，等待接收新的数据块，然后处理数据块并返回结果。 这样可以避免阻塞主线程，提高程序的响应速度。实现异步处理流式响应。

* **普通迭代器没有主动暂停的能力，它只是被动地根据 `__next__` 方法的调用返回下一个元素。** `for` 循环或其他迭代工具可以控制迭代的次数和进度，但迭代器本身不会暂停。



可以看到普通的迭代器是积木盒里的积木是预先准备好的，我们这里的stream积木是一个一个现来的，不是事先就存在好的。它更是迭代器中的特殊的迭代器——生成器。是的相当于子类，拥有迭代器迭代的特性又有其他的功能。

通过**`yield` 关键字：**识别不是普通的迭代器而是生成器：


__iter__: 返回生成器对象本身。就像你准备了一个空的积木盒，并拿一支笔指着盒子的入口，告诉自己 "我已经准备好开始往盒子里放积木并玩了"。

__next__ : 返回生成器的下一个元素。如果没有更多元素，则引发 StopIteration 异常。就像你每次根据规则（生成器函数中的代码）制作一块新的积木，并把它放到盒子里（yield 语句），然后把笔指向下一块要制作的积木的位置。

生成器可以作为协程使用，实现异步编程。 在 Stream 类中，使用生成器作为协程可以实现异步处理流式响应，提高程序的效率和响应速度。

```python
class Stream(...):
    ...
    def __iter__(self) -> Iterator[_T]:
        for item in self._iterator:
            yield item

    def __next__(self) -> _T:
        return self._iterator.__next__()
    ...
```


In [73]:
stream =  client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
        stream=True
    ) 
        # 进入 with 语句块后，API 请求才会真正发出
for event in stream:
    print(event)

RawMessageStartEvent(message=Message(id='msg_01UhGkCZSHZxbQPyYZrxPDuY', content=[], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason=None, stop_sequence=None, type='message', usage=Usage(input_tokens=18, output_tokens=1)), type='message_start')
RawContentBlockStartEvent(content_block=TextBlock(text='', type='text'), index=0, type='content_block_start')
RawContentBlockDeltaEvent(delta=TextDelta(text='好的,我', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='来分', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='享几个有', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='趣的动物小', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='知', type='text_delta'), index=0, type='content_block_delta')
RawContentBlockDeltaEvent(delta=TextDelta(text='识:', type='text_delta'),

## 异步调用

之前学的所有都是在同步语境下的，即程序是顺序发生。在我们自己这样简单实用API中同步就足够了，因为很少发生阻塞的情况。

我们问问题也是一个一个问的，但如果我问两个问题，它也是回答了第一个再回答第二个的。

那如果我想他们都一起发送message的请求，一起处理，谁先回答完谁先返还给我呢？

包括我们之前成功的做出来了多轮对话机器人，能调用函数工具的对话机器人，还有上面的流式对话机器人

在实际的聊天机器人中，我们学会使用了流式输出，加快了用户看到首token的时间，已经离工程化、实际落地近了一些。

但如果是多个用户同时开启了我们的聊天机器人呢，相当于同时请求了message的API，我们还要把用户A的问题回答完了再开始回答用户B的吗

那用户z要多可怜呀（要等到前面23个的message都响应回答完毕才处理它的）

- sleep() 来模拟每个步骤所需的时间。在实际应用中，这可能是等待数据库查询、文件 I/O 或网络请求等。


In [4]:
import time

def answer_question(delay, name):
    print(f"开始回答用户{name}的问题")
    time.sleep(delay)  # 使用同步 sleep
    print(f"用户 {name}的问题得到了回答，回答用时 {delay} 秒")

def main():
    questions = [
        (5, "A"),
        (1, "B"),
        (3, "C"),
        (2, "D"),
        (4, "E"),
        (1, "F"),
        (6, "G"),
        (2, "H"),
        (5, "I"),
        (3, "J"),
        (4, "K"),
        (1, "L"),
        (7, "M"),
        (2, "N"),
        (5, "O"),
        (3, "P")
    ]

    start_time = time.time()

    for delay, name in questions:
        answer_question(delay, name)

    end_time = time.time()
    total_time = end_time - start_time
    print(f"\n所有问题回答完毕，总共用时 {total_time:.2f} 秒")

# 运行主函数
main()

开始回答用户A的问题
用户 A的问题得到了回答，回答用时 5 秒
开始回答用户B的问题
用户 B的问题得到了回答，回答用时 1 秒
开始回答用户C的问题
用户 C的问题得到了回答，回答用时 3 秒
开始回答用户D的问题
用户 D的问题得到了回答，回答用时 2 秒
开始回答用户E的问题
用户 E的问题得到了回答，回答用时 4 秒
开始回答用户F的问题
用户 F的问题得到了回答，回答用时 1 秒
开始回答用户G的问题
用户 G的问题得到了回答，回答用时 6 秒
开始回答用户H的问题
用户 H的问题得到了回答，回答用时 2 秒
开始回答用户I的问题


KeyboardInterrupt: 

怎么解决它呢？

我们先来简单介绍一下多进程，多线程，异步（协程），他们都可以解决这个问题

多进程:

    独立的内存空间
    适合CPU密集型任务
    不受GIL限制,可实现真正的并行


多线程:

    共享内存空间
    适合I/O密集型任务
    受全局解释器锁(GIL)限制



- 全局解释器锁(GIL)是Python解释器中的一个机制，它确保同一时刻只有一个线程可以执行Python字节码。

并发:在同一时间段内管理多个任务的执行，这些任务可能交替进行，但没有真正同时执行。

时间 -->

线程1: ----====----====----====----

线程2: --====----====----====------

并行:每个任务在不同的处理器核心上独立执行，同一时刻真正同时执行多个任务，需要多核处理器支持。

时间 -->

进程1: =================>

进程2: =================>

多线程:

    +---------------------------+
    |          进程             |
    |  +--------+  +--------+   |
    |  | 线程1  |  | 线程2  |   |
    |  +--------+  +--------+   |
    |  同一处理器核心，共享内存空间   |
    +---------------------------+

多进程:

    +------------+  +------------+
    |   进程1    |  |   进程2    |
    |            |  |            |
    | 独立内存   |  | 独立内存   |
    +------------+  +------------+

异步：
    
    +--------------------------------------------------+
    |                      进程                        |
    |  +--------------------------------------------+  |
    |  |                   线程                     |  |
    |  |  +----------------------------------------+|  |
    |  |  |              事件循环                  ||  |
    |  |  |  +------------+  +------------+        ||  |
    |  |  |  |   协程1    |  |   协程2    |        ||  |
    |  |  |  +------------+  +------------+        ||  |
    |  |  |  +------------+                        ||  |
    |  |  |  |   协程3    |                        ||  |
    |  |  |  +------------+                        ||  |
    |  |  +----------------------------------------+|  |
    |  |                     GIL                    |  |
    |  +--------------------------------------------+  |
    |                       |                          |
    |                   处理器核心                     |
    +--------------------------------------------------+

- CPU密集型任务：GIL严重限制了多线程的性能
- I/O密集型任务：由于GIL在I/O等待时释放，影响相对较小

所以I/O操作用有GIL锁限制的多线程及协成更划算

性能比较：
- CPU密集型任务：  多进程 > 多线程 ≈ 异步
- I/O密集型任务：  异步 ≥ 多线程 ≈ 多进程

我们的大模型回答及流式是I/O操作，是I/O密集型任务

结合使用异步与多线程两种方法是最佳选择。例如，在一个web应用中，您可能使用异步来处理HTTP请求，而使用多线程来处理后台任务。

线程是更外面的东西，和大模型API调用这种处理HTTP请求的是涉及到异步，所以我们要学异步的API调用

于是我们经常在实际项目里面会看到这样的代码：

```python
async with client.messages.stream(...) as stream:
    async for text in stream.text_stream:
        print(text, end="", flush=True)

final_message = await stream.get_final_message()
...
```

这里面async，await就是**异步**的标识。不用怕，这节课我们会熟悉它。


- **事件循环（Event Loop）**：管理和调度异步任务的核心机制，负责处理任务的执行、挂起和恢复。
- **async/await **：async def 定义异步函数，await 暂停函数执行，等待协程完成，例如，`await asyncio.sleep(delay)`：在此处，函数暂停执行，等待指定的 `delay` 秒。这种暂停是**非阻塞的**，因为它允许事件循环在等待期间处理其他任务。（我们的操作）
- **回调（Callback）：在异步操作完成时，被调用的函数或方法**，比如煎牛排的时候去拌沙拉了，不需要不断检查牛排是否完成，可以专注于其他任务。一旦牛排完成，铃声自动提醒，避免遗漏。之前是自己手写的，但现在async/await就实现了。

In [5]:
import asyncio

async def task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed after {delay} seconds")

async def main():
    tasks = [
        task("A", 3),
        task("B", 1),
        task("C", 2),
    ]
    await asyncio.gather(*tasks)

# asyncio.run(main())
await main()

Task A started
Task B started
Task C started
Task B completed after 1 seconds
Task C completed after 2 seconds
Task A completed after 3 seconds


事件循环的工作过程

1. **启动事件循环**：启动事件循环，并执行 `main` 协程。

2. **创建并调度任务**：`main` 函数创建三个任务（A、B、C），并通过 `asyncio.gather` 同时调度它们。

3. **执行任务A**：
   - 打印 "Task A started"
   - 遇到 `await asyncio.sleep(3)`，挂起自身，等待3秒。

4. **执行任务B**：
   - 打印 "Task B started"
   - 遇到 `await asyncio.sleep(1)`，挂起自身，等待1秒。

5. **执行任务C**：
   - 打印 "Task C started"
   - 遇到 `await asyncio.sleep(2)`，挂起自身，等待2秒。

6. **等待与恢复**：
   - **1秒后**：任务B的 `sleep` 结束，事件循环恢复任务B，打印 "Task B completed after 1 seconds"。
   - **2秒后**：任务C的 `sleep` 结束，事件循环恢复任务C，打印 "Task C completed after 2 seconds"。
   - **3秒后**：任务A的 `sleep` 结束，事件循环恢复任务A，打印 "Task A completed after 3 seconds"。

7. **结束事件循环**：所有任务完成，事件循环结束。

In [7]:
import asyncio
import time

async def answer_question(delay, name):
    print(f"开始回答用户{name}的问题")
    await asyncio.sleep(delay)  # 使用异步 sleep
    print(f"用户 {name}的问题得到了回答，回答用时 {delay} 秒")

async def main():
    tasks = [
        answer_question(5, "A"),
        answer_question(1, "B"),
        answer_question(3, "C"),
        answer_question(2, "D"),
        answer_question(4, "E"),
        answer_question(1, "F"),
        answer_question(6, "G"),
        answer_question(2, "H"),
        answer_question(5, "I"),
        answer_question(3, "J"),
        answer_question(4, "K"),
        answer_question(1, "L"),
        answer_question(7, "M"),
        answer_question(2, "N"),
        answer_question(5, "O"),
        answer_question(3, "P")
    ]
    await asyncio.gather(*tasks)

await main()

开始回答用户A的问题
开始回答用户B的问题
开始回答用户C的问题
开始回答用户D的问题
开始回答用户E的问题
开始回答用户F的问题
开始回答用户G的问题
开始回答用户H的问题
开始回答用户I的问题
开始回答用户J的问题
开始回答用户K的问题
开始回答用户L的问题
开始回答用户M的问题
开始回答用户N的问题
开始回答用户O的问题
开始回答用户P的问题
用户 B的问题得到了回答，回答用时 1 秒
用户 F的问题得到了回答，回答用时 1 秒
用户 L的问题得到了回答，回答用时 1 秒
用户 N的问题得到了回答，回答用时 2 秒
用户 D的问题得到了回答，回答用时 2 秒
用户 H的问题得到了回答，回答用时 2 秒
用户 C的问题得到了回答，回答用时 3 秒
用户 J的问题得到了回答，回答用时 3 秒
用户 P的问题得到了回答，回答用时 3 秒
用户 K的问题得到了回答，回答用时 4 秒
用户 E的问题得到了回答，回答用时 4 秒
用户 O的问题得到了回答，回答用时 5 秒
用户 A的问题得到了回答，回答用时 5 秒
用户 I的问题得到了回答，回答用时 5 秒
用户 G的问题得到了回答，回答用时 6 秒
用户 M的问题得到了回答，回答用时 7 秒


1. **创建任务**：`main` 函数创建了A-P个 `answer_question` 协程对象，并将它们传递给 `asyncio.gather`。

2. **调度任务**：事件循环开始调度这些协程。每个协程开始执行，打印 "开始回答用户X的问题"，然后遇到 `await asyncio.sleep(delay)`。

3. **挂起协程**：当协程执行到 `await asyncio.sleep(delay)` 时，它会挂起自己，告诉事件循环在指定的时间后恢复执行。这时，事件循环会继续调度其他任务，而不会阻塞等待。

4. **恢复执行**：一旦指定的 `delay` 时间过去，事件循环会将该协程重新调度执行，打印 "用户 X 的问题得到了回答，回答用时 Y 秒"。

非阻塞：主程序不会被长时间运行的操作阻塞。

高效：可以同时处理多个事件。这就是异步的特点啦！

同步处理时程序会按顺序逐个接收和处理流式数据中的事件。在处理完一块代码即一个事件之前，程序会阻塞，无法进行其他操作。
异步处理使用 `async` 和 `await` 关键字以异步的方式接收和处理流式数据。当程序等待数据时，它可以切换到其他任务，不会阻塞主线程。

| 特性    | 同步处理         | 异步处理        |
| ----- | ------------ | ----------- |
| 处理方式  | 阻塞式          | 非阻塞式        |
| 实时性   | 较低           | 较高          |
| 代码复杂度 | 简单           | 复杂          |
| 适用场景  | 数据量小，实时性要求不高 | 数据量大，需要保持响应 |


由此，异步可以处理很多种问题，比如：

场景：你正在开发一个旅游预订平台。

具体业务：用户搜索旅行套餐时，系统需要查询多个外部API：
- 航空公司API查询航班
- 酒店API查询房间可用性
- 租车公司API查询车辆
- 旅游景点API查询门票


场景：你在开发一个大模型的聊天应用或者RAG，Agent应用等。

- 发送和接收消息
    - 当用户发送消息时，客户端立即将消息显示在聊天界面上，同时异步发送到服务器。
    - 服务器接收到消息后，异步处理（如存储到数据库、进行内容过滤等），然后异步推送给其他在线用户。 
- 同步聊天历史
    - 当用户打开应用时，立即显示最近的消息，同时在后台异步加载更多历史消息。
    - 使用分页或虚拟滚动技术，异步加载更早的消息


在Claude的API调用中怎么能从同步消息转换为异步消息呢？首先，我们源头的小助手就要变，而不是像stream在发送消息的用不同的参数或函数就可以改变的了
原来的小助手是这样创建的


In [None]:
from anthropic import Anthropic

client = Anthropic()

用异步要导入这样的小助手

In [None]:
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

而之后的函数使用都是一样的，create(),stream()的使用方式，参数也都是一样的，这是因为在包里面都已经封装好了，比如：

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240912151228.png)

![](https://typora-photo1220.oss-cn-beijing.aliyuncs.com/DataAnalysis/LingYi/20240912151232.png)

** 异步编程的核心理念**

 **传统厨房的工作方式（同步编程）**

在传统的厨房中，厨师小明一次只能专注于一项任务。例如：

1. **做汤**（需要10分钟）
2. **煎牛排**（需要5分钟）
3. **准备沙拉**（需要3分钟）

**同步工作流程：**

**总时间：** 10 + 5 + 3 = **18分钟**

**问题：** 在等待过程中，小明无法同时处理其他任务，导致效率低下，尤其在高峰期时，顾客需要等待更长时间。

**引入异步编程的工作方式**

为了提高效率，小明决定采用**异步编程**的理念，允许他在等待某个任务完成时，继续处理其他任务。

**异步工作流程：**

1. **开始做汤**：记录需要10分钟，不阻塞继续做其他菜。
2. **开始煎牛排**：记录需要5分钟。
3. **开始准备沙拉**：记录需要3分钟。
4. **等待**：在不同任务的等待时间内，合理安排任务的完成顺序。

**总时间：** 10分钟（做汤）与其他任务重叠，整体准备时间显著减少。

 **异步函数与`async`/`await`**

**定义：**

- **异步函数（Async Function）**：使用 `async def` 定义的函数，能够在执行过程中挂起自身，等待某些操作完成。
- **`await` 关键字**：用于暂停当前异步函数的执行，等待某个异步操作完成，然后恢复执行。

**类比：**

- **异步函数**：像小明准备某道菜的具体步骤，例如“煮汤”。
- **`await`**：像设置一个计时器，在等待汤煮好的同时，小明可以开始准备其他菜。

为了保持非阻塞性，让事件循环能够在等待某个操作完成时，继续处理其他任务。

如果内部操作是同步的（如使用 time.sleep()），会阻塞整个事件循环，导致其他任务无法执行。
使用异步操作（如 await asyncio.sleep()），允许事件循环在等待期间调度和执行其他任务，保持高效的并发能力。

In [8]:
async def make_soup():
    print("开始煮汤")
    await asyncio.sleep(10)  # 模拟煮汤需要的时间
    print("汤煮好了")


async def cook_steak():
    print("开始煎牛排")
    await asyncio.sleep(5)  # 模拟煎牛排需要的时间
    print("牛排煎好了")


async def make_salad():
    print("开始准备沙拉")
    await asyncio.sleep(3)  # 模拟准备沙拉需要的时间
    print("沙拉准备好了")


async def prepare_full_meal():
    soup_task = asyncio.create_task(make_soup())
    steak_task = asyncio.create_task(cook_steak())
    salad_task = asyncio.create_task(make_salad())

    await asyncio.gather(soup_task, steak_task, salad_task) #并发运行多个协程,gather() 会等待所有传入的协程完成。
    print("所有菜品都准备好了！")

await prepare_full_meal()

开始煮汤
开始煎牛排
开始准备沙拉
沙拉准备好了
牛排煎好了
汤煮好了
所有菜品都准备好了！


**事件循环的详细工作机制** 

**任务的创建与调度**

当小明决定同时准备多道菜时，他会创建多个任务（协程），例如：

- `make_soup()`
- `cook_steak()`
- `make_salad()`

这些任务被传递给事件循环，事件循环负责调度它们的执行。

**挂起与恢复**

在异步函数中，遇到 `await` 时，当前任务会被挂起，控制权返回给事件循环，事件循环可以调度其他任务继续执行。

**具体流程：**

1. **开始任务**：事件循环启动 `make_soup()`、`cook_steak()`、`make_salad()`。
2. **遇到 `await`**：
   - `make_soup()` 开始煮汤，`await asyncio.sleep(10)` 暂停自己，告诉事件循环等待10分钟后恢复。
   - 同时，事件循环调度 `cook_steak()` 和 `make_salad()`。
3. **继续执行其他任务**：
   - `cook_steak()` 开始煎牛排，`await asyncio.sleep(5)` 暂停自己，告诉事件循环等待5分钟后恢复。
   - `make_salad()` 开始准备沙拉，`await asyncio.sleep(3)` 暂停自己，告诉事件循环等待3分钟后恢复。
4. **任务完成**：
   - 3分钟后，`make_salad()` 恢复执行，完成沙拉准备。
   - 5分钟后，`cook_steak()` 恢复执行，完成煎牛排。
   - 10分钟后，`make_soup()` 恢复执行，完成煮汤。
5. **所有任务完成**：事件循环确认所有任务都已完成，输出“所有菜品都准备好了！”

**非阻塞性**

由于使用了 `await`，每个任务在等待期间不会阻塞其他任务的执行。这种非阻塞性是异步编程的核心优势，能够显著提升程序的并发能力和响应性。

 **事件循环的调度策略**

事件循环采用**协作式调度**，即任务在适当的时机（如遇到 `await`）主动让出控制权。这与抢占式调度（操作系统中常见的多线程调度）不同，避免了线程之间的抢占冲突，提高了调度效率。

In [9]:
import asyncio
import time

async def answer_question(delay, name):
    print(f"开始回答用户{name}的问题")
    time.sleep(delay)  # 使用异步 sleep
    print(f"用户 {name}的问题得到了回答，回答用时 {delay} 秒")

async def main():
    tasks = [
        answer_question(5, "A"),
        answer_question(1, "B"),
        answer_question(2, "C"),
        answer_question(2, "D"),
        answer_question(4, "E"),
        answer_question(1, "F"),
        answer_question(6, "G"),

    ]
    await asyncio.gather(*tasks)

await main()

开始回答用户A的问题
用户 A的问题得到了回答，回答用时 5 秒
开始回答用户B的问题
用户 B的问题得到了回答，回答用时 1 秒
开始回答用户C的问题
用户 C的问题得到了回答，回答用时 2 秒
开始回答用户D的问题
用户 D的问题得到了回答，回答用时 2 秒
开始回答用户E的问题
用户 E的问题得到了回答，回答用时 4 秒
开始回答用户F的问题
用户 F的问题得到了回答，回答用时 1 秒
开始回答用户G的问题
用户 G的问题得到了回答，回答用时 6 秒


CancelledError: 

In [10]:
import asyncio
import time

async def answer_question(delay, name):
    print(f"开始回答用户{name}的问题")
    await asyncio.sleep(delay)  # 使用异步 sleep
    print(f"用户 {name}的问题得到了回答，回答用时 {delay} 秒")

async def main():
    tasks = [
        answer_question(5, "A"),
        answer_question(1, "B"),
        answer_question(2, "C"),
        answer_question(2, "D"),
        answer_question(4, "E"),
        answer_question(1, "F"),
        answer_question(6, "G"),

    ]
    await asyncio.gather(*tasks)

await main()

开始回答用户A的问题
开始回答用户B的问题
开始回答用户C的问题
开始回答用户D的问题
开始回答用户E的问题
开始回答用户F的问题
开始回答用户G的问题
用户 B的问题得到了回答，回答用时 1 秒
用户 F的问题得到了回答，回答用时 1 秒
用户 C的问题得到了回答，回答用时 2 秒
用户 D的问题得到了回答，回答用时 2 秒
用户 E的问题得到了回答，回答用时 4 秒
用户 A的问题得到了回答，回答用时 5 秒
用户 G的问题得到了回答，回答用时 6 秒


怎么实践：

1. 函数定义：使用 `async def` 定义异步函数。

2. 函数内部改成异步：当你用 `async def` 定义一个函数时，你就是在说"这个函数需要能够同时处理多个任务"。为了实现这一点，函数内部的操作也需要支持这种多任务处理的方式。
    - 如果方法涉及I/O操作或需要等待某些异步事件完成，那么这些方法应当是异步的，且需要使用 await 调用。
    - 如果方法只是简单地返回已经存在的数据，而不涉及任何阻塞或等待操作，那么它们可以是同步的，不需要 await。

3. 等待异步操作：在调用协程或异步方法时使用 `await`。

异步上下文管理器 (`async with`)**

**作用**：管理异步资源的获取和释放，确保资源在使用后被正确关闭。

**类比**：小明在使用高效厨房设备（如智能烤箱）时，确保设备在使用完毕后自动关闭，避免资源浪费或安全隐患。

异步迭代器 (`async for`)**

**作用**：用于异步地遍历数据流或生成器，适用于处理异步生成的数据。

In [15]:
from anthropic import Anthropic

client = Anthropic()

def streaming_with_helpers():
    with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    ) as stream:
        # 进入 with 语句块后，API 请求才会真正发出
        for text in stream.text_stream:
            print(text, end="", flush=True)

    final_message = stream.get_final_message()
    print("\n\nSTREAM函数返回对象的得到最终消息的内置方法get_final_message()： ")
    print(final_message)
    
    final_text = stream.get_final_text()
    print("\n\nSTREAM函数返回对象的得到最终文本的内置方法get_final_text()： ")
    print(final_text)

streaming_with_helpers()

好的,我来为您分享一些有趣的动物趣事:

1. 章鱼有三颗心脏。两颗心脏专门为鳃泵血,另一颗则为其余身体部位供血。

2. 蜂鸟是唯一能够倒着飞的鸟类。它们可以向后飞、上下飞,甚至可以倒立悬停在空中。

3. 熊猫的拇指其实是腕骨演化而来的。它们用这个"伪拇指"来抓握竹子。

4. 海豚睡觉时只有大脑一半在休息,另一半保持清醒状态以便呼吸和警惕捕食者。

5. 蚂蚁能够举起相当于自身体重50倍的重物。相比之下,人类只能举起大约自身体重的一半。

6. 长颈鹿的舌头长达45-50厘米,可以清洁自己的耳朵。

7. 企鹅会向伴侣求婚,并赠送鹅卵石作为定情信物。

8. 树懒的动作极其缓慢,以至于藻类可以在它们的毛发上生长。

这些动物的特征和行为都非常有趣,展现了大自然的神奇和多样性。希望这些趣事能让您感到有趣!

STREAM函数返回对象的得到最终消息的内置方法get_final_message()： 
Message(id='msg_01X5Q7YWa9vdFCoJXPrySqah', content=[TextBlock(text='好的,我来为您分享一些有趣的动物趣事:\n\n1. 章鱼有三颗心脏。两颗心脏专门为鳃泵血,另一颗则为其余身体部位供血。\n\n2. 蜂鸟是唯一能够倒着飞的鸟类。它们可以向后飞、上下飞,甚至可以倒立悬停在空中。\n\n3. 熊猫的拇指其实是腕骨演化而来的。它们用这个"伪拇指"来抓握竹子。\n\n4. 海豚睡觉时只有大脑一半在休息,另一半保持清醒状态以便呼吸和警惕捕食者。\n\n5. 蚂蚁能够举起相当于自身体重50倍的重物。相比之下,人类只能举起大约自身体重的一半。\n\n6. 长颈鹿的舌头长达45-50厘米,可以清洁自己的耳朵。\n\n7. 企鹅会向伴侣求婚,并赠送鹅卵石作为定情信物。\n\n8. 树懒的动作极其缓慢,以至于藻类可以在它们的毛发上生长。\n\n这些动物的特征和行为都非常有趣,展现了大自然的神奇和多样性。希望这些趣事能让您感到有趣!', type='text')], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason='end_turn', stop_sequence=None, type='message', usage=Usage(inpu

找错误

In [18]:
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def streaming_with_helpers():
    #异步上下文管理
    async with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    ) as stream:
        #异步迭代
        async for text in stream.text_stream:
            print(text, end="", flush=True)

    final_message = await stream.get_final_message()
    print("\n\nSTREAM函数返回对象的得到最终消息的内置方法get_final_message()： ")
    print(final_message)
    
    final_text = await stream.get_final_text()
    print("\n\nSTREAM函数返回对象的得到最终文本的内置方法get_final_text()： ")
    print(final_text)

await streaming_with_helpers()

好的,我来分享一些有趣的动物小知识:

1. 章鱼有三颗心脏。两颗心脏负责将血液泵送到鳃部,另一颗负责将血液输送到全身。

2. 海豚睡觉时只有半个大脑休息,另外半个大脑保持清醒以控制呼吸和警惕捕食者。

3. 蜂鸟是唯一能够倒着飞行的鸟类。

4. 长颈鹿的舌头长度可达45-50厘米,可以清洁自己的耳朵。

5. 企鹅会向心仪的对象赠送小石子作为求偶礼物。

6. 大象是唯一不能跳跃的哺乳动物。它们的腿骨结构不允许它们离地。

7. 蚂蚁能够举起相当于自身体重50倍的重物。

8. 树懒的动作非常缓慢,以至于藻类可以在它们的毛发上生长。

9. 猫科动物,如猫和老虎,都无法品尝甜味。

10. 蝙蝠总是向左转身飞出洞穴,原因尚不明确。

希望这些小知识能让你感到有趣!动物世界确实充满了奇妙之处。

STREAM函数返回对象的得到最终消息的内置方法get_final_message()： 
Message(id='msg_01YLaHn2uqEStYLAB1pr9zt7', content=[TextBlock(text='好的,我来分享一些有趣的动物小知识:\n\n1. 章鱼有三颗心脏。两颗心脏负责将血液泵送到鳃部,另一颗负责将血液输送到全身。\n\n2. 海豚睡觉时只有半个大脑休息,另外半个大脑保持清醒以控制呼吸和警惕捕食者。\n\n3. 蜂鸟是唯一能够倒着飞行的鸟类。\n\n4. 长颈鹿的舌头长度可达45-50厘米,可以清洁自己的耳朵。\n\n5. 企鹅会向心仪的对象赠送小石子作为求偶礼物。\n\n6. 大象是唯一不能跳跃的哺乳动物。它们的腿骨结构不允许它们离地。\n\n7. 蚂蚁能够举起相当于自身体重50倍的重物。\n\n8. 树懒的动作非常缓慢,以至于藻类可以在它们的毛发上生长。\n\n9. 猫科动物,如猫和老虎,都无法品尝甜味。\n\n10. 蝙蝠总是向左转身飞出洞穴,原因尚不明确。\n\n希望这些小知识能让你感到有趣!动物世界确实充满了奇妙之处。', type='text')], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason='end_turn', stop_sequence=None, type='message', usage=Usage(input_tokens

stream.text_stream 是一个异步迭代器。每次迭代都可能涉及一个异步操作（比如等待新的数据到达）。
Python中没有 await for 这样的语法。async for 已经隐含了在每次迭代中进行 await 的操作。
如果我们展开 async for 循环，它大致相当于：

In [None]:
iterator = stream.text_stream.__aiter__()
while True:
    try:
        text = await iterator.__anext__()
    except StopAsyncIteration:
        break
    print(text, end="", flush=True)

async with 语句实际上是对异步上下文管理器的一种简化使用。如果我们展开它，大致会是这样的结构：

In [None]:
async def streaming_with_helpers():
    context_manager = client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    )
    
    stream = await context_manager.__aenter__()
    try:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
    finally:
        await context_manager.__aexit__(None, None, None)

是否需要异步调用取决于这些方法的具体实现和功能。以下是判断的几个关键点：

a. 方法的实现细节
- 如果方法涉及I/O操作或需要等待某些异步事件完成，那么这些方法应当是异步的，且需要使用 await 调用。
如果方法只是简单地返回已经存在的数据，而不涉及任何阻塞或等待操作，那么它们可以是同步的，不需要 await。

b. 查看库的文档或源代码
- 最直接的方法是查看您所使用的客户端库的文档或源代码。通常，异步方法会被定义为 async def，并返回协程对象。

在我们的代码中，get_final_message() 和 get_final_text() 方法是异步的，但你没有使用 await 来等待它们完成。

其实我们知道这个道理，但为什么疏漏了呢？因为他藏在stream（）函数返回的对象里面

因为我们的client = AsyncAnthropic()是异步的小助手，它里面的对象，方法也都是异步的。

正确改造：

In [83]:
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

async def streaming_with_helpers():
    async with client.messages.stream(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "讲几个动物的趣事",
            }
        ],
        model="claude-3-5-sonnet-20240620",
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

    final_message = await stream.get_final_message()
    print("\n\nSTREAM函数返回对象的得到最终消息的内置方法get_final_message()： ")
    print(final_message)
    
    final_text = await stream.get_final_text()
    print("\n\nSTREAM函数返回对象的得到最终文本的内置方法get_final_text()： ")
    print(final_text)


await streaming_with_helpers()

好的,我来分享一些有趣的动物小知识:

1. 海豚睡觉时只有半个大脑在休息,另一半保持清醒以便呼吸和警惕捕食者。

2. 蜗牛可以睡眠长达3年之久。

3. 企鹅会向自己心仪的对象赠送小石子作为求偶礼物。

4. 北极熊的皮毛其实是透明的,看起来是白色是因为反射了阳光。

5. 章鱼有三颗心脏,其中两颗专门为鳃供血,另一颗为其他器官供血。

6. 袋鼠不能向后行走,因为它们强壮的尾巴和大脚掌不允许。

7. 蚂蚁能举起相当于自身体重50倍的重物。

8. 长颈鹿的舌头长达21英寸(约53厘米),可以清洁自己的耳朵。

9. 大象是唯一不会跳跃的哺乳动物。

10. 蝴蝶用脚来品尝食物。

这些动物的特性和行为是不是很有趣呢?大自然真是充满了奇妙!

STREAM函数返回对象的得到最终消息的内置方法get_final_message()： 
Message(id='msg_016yXmAyAhN1HDhA5QXW2Az5', content=[TextBlock(text='好的,我来分享一些有趣的动物小知识:\n\n1. 海豚睡觉时只有半个大脑在休息,另一半保持清醒以便呼吸和警惕捕食者。\n\n2. 蜗牛可以睡眠长达3年之久。\n\n3. 企鹅会向自己心仪的对象赠送小石子作为求偶礼物。\n\n4. 北极熊的皮毛其实是透明的,看起来是白色是因为反射了阳光。\n\n5. 章鱼有三颗心脏,其中两颗专门为鳃供血,另一颗为其他器官供血。\n\n6. 袋鼠不能向后行走,因为它们强壮的尾巴和大脚掌不允许。\n\n7. 蚂蚁能举起相当于自身体重50倍的重物。\n\n8. 长颈鹿的舌头长达21英寸(约53厘米),可以清洁自己的耳朵。\n\n9. 大象是唯一不会跳跃的哺乳动物。\n\n10. 蝴蝶用脚来品尝食物。\n\n这些动物的特性和行为是不是很有趣呢?大自然真是充满了奇妙!', type='text')], model='claude-3-5-sonnet-20240620', role='assistant', stop_reason='end_turn', stop_sequence=None, type='message', usage=Usage(input_tokens=18, output_tokens=385))


STREAM函数返回对象的得到最终文本的内置方法g

异步处理多个用户来提问的问题


In [20]:
import asyncio
import time
from anthropic import Anthropic, AsyncAnthropic

sync_client = Anthropic()
async_client = AsyncAnthropic()

def sync_streaming_with_helpers(model, content):
    print(f"\n开始同步流式处理，使用模型: {model}")
    with sync_client.messages.stream(
        max_tokens=1000,
        messages=[{"role": "user", "content": content}],
        model=model
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)


async def async_streaming_with_helpers(model, content):
    print(f"\n开始异步流式处理，使用模型: {model}")
    async with async_client.messages.stream(
        max_tokens=1000,
        messages=[{"role": "user", "content": content}],
        model=model
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
    

def run_sync():
    start_time = time.time()
    
    sync_streaming_with_helpers("claude-3-haiku-20240307", "写关于唐代的历史")
    sync_streaming_with_helpers("claude-3-sonnet-20240229", "写一首关于自然的诗。")
    
    end_time = time.time()
    print(f"\n同步执行总时间: {end_time - start_time:.2f} 秒")

async def run_async():
    start_time = time.time()
    
    tasks = [
        async_streaming_with_helpers("claude-3-haiku-20240307", "讲讲唐代的历史"),
        async_streaming_with_helpers("claude-3-sonnet-20240229", "写一首关于自然的诗。")
    ]
    await asyncio.gather(*tasks)
    
    end_time = time.time()
    print(f"\n异步执行总时间: {end_time - start_time:.2f} 秒")

# 在 Jupyter notebook 中，在不同的单元格中运行以下代码：

print("运行同步版本:")
run_sync()

print("\n\n运行异步版本:")
await run_async()

print("\n比较:")
print("1. 同步版本按顺序执行每个请求，总时间是两个请求时间的和。")
print("2. 异步版本并发执行请求，总时间接近单个最长请求的时间。")
print("3. 异步版本的输出会交错显示，而同步版本会按顺序完整显示每个响应。需要后续工程化的改进")
print("4. 当处理多个独立请求时，异步版本能显著提高效率，尤其是在有多个耗时操作的情况下。")

运行同步版本:

开始同步流式处理，使用模型: claude-3-haiku-20240307
这里给您概括性地介绍一下唐代的历史:

1. 唐朝成立于公元618年,是中国历史上最为繁荣昌盛的一个王朝,史称"盛唐"。唐朝采用"将相和"的分权制度,实行中央集权的君主专制政体。

2. 唐太宗李世民是唐朝最杰出的君主,他在位期间政治文化均达到了高峰。大唐盛世在他的统治下展现得淋漓尽致。

3. 唐代是中国历史上对外交往最为广泛的时期,中国文化在这个时期走向世界,与外国文明充分交流。唐朝的商贸、外交、文化影响力远远超过前代。

4. 唐朝后期出现安史之乱,战火纷飞,国力逐渐衰落。到公元907年唐朝灭亡,标志着中国历史上的"五代十国"时期的开始。

5. 唐代在政治、经济、文化等多个层面都取得了辉煌成就,被后世尊崇为"盛世"。它对后来的中国历史发展产生了深远影响。

总之,唐代是中国历史上最为璀璨夺目的王朝之一,其广阔的疆域、丰富的文化成就令后世景仰不已。
开始同步流式处理，使用模型: claude-3-sonnet-20240229
这是一首关于自然的诗:

曙光初现,田野苏醒
彩云霞映,朝露玲珑
群鸟啭鸣,溪水潺潺
青山环绕,万木苍苍

嫩芽探出清翠枝头
野花绽放缤纷芬芳
蜂儿翩翩,蝶儿穿梭
大自然中,生机勃勃

夕阳西下,云影变幻
皓月当空,繁星闪耀
林间微风徐徐吹拂
清澈溪流潺潺入梦

大自然赐予生命能量
阳光雨露孕育新生
感谢造物馈赠这馨香
尽享大自然的鸟语花香
同步执行总时间: 9.72 秒


运行异步版本:

开始异步流式处理，使用模型: claude-3-haiku-20240307

开始异步流式处理，使用模型: claude-3-sonnet-20240229
好的,下面我为您简单介绍一下唐代历史的一些重要事件:

1. 唐朝的建立

唐朝是中国历史上继隋朝之后的第二个统一王朝,由李渊于公元618年建立。开国君主李渊是以下是一首关于自然的诗:当时西部草原诸部的首领,凭借其政

大自然的馈赠

治智慧和军事实力逐步统一了中国。

2. 唐太宗的统治

在李渊去阳光温暖地拥抱着世后,他的儿子李世民即位,是唐朝最
大地重新焕发生机伟大的君主之一,他统一天下,治国有方
,被绿色新称为唐太宗。他实行贤臣任芽从土用政壤中策,制定了一套完善的政治、经济、军事体系,钻