# 使用Apache Kafka进行消息路由

---

本笔记本向您展示如何在通过Apache Kafka来回传递聊天消息的同时使用LangChain的标准聊天功能。

这个目标是模拟一个架构，其中聊天前端和LLM作为独立服务运行，它们需要通过内部网络相互通信。

这是一种替代典型模式的方法，即通过REST API从模型请求响应（有关为什么要这样做的更多信息，请参见笔记本末尾）。

### 1. 安装主要依赖项

依赖项包括：

- Quix Streams 库，用于以类似 "Pandas" 的方式管理与 Apache Kafka（或类似 Kafka 工具，如 Redpanda）的交互。
- LangChain 库，用于管理与 Llama-2 的交互并存储对话状态。

In [None]:
# 安装所需的库
!pip install quixstreams==2.1.2a langchain==0.0.340 huggingface_hub==0.19.4 langchain-experimental==0.0.42 python-dotenv

### 2. 构建和安装llama-cpp-python库（启用CUDA以便在Google Colab GPU上利用）

`llama-cpp-python`库是`llama-cpp`库的Python封装，可以高效地利用CPU来运行量化的LLM。

当您使用标准的`pip install llama-cpp-python`命令时，默认情况下不会获得GPU支持。如果您仅依赖Google Colab中的CPU，生成可能非常缓慢，因此以下命令添加了一个额外的选项来构建和安装`llama-cpp-python`，以支持GPU（请确保在Google Colab中选择了启用GPU的运行时）。

In [None]:
# 设置CMAKE_ARGS变量为"-DLLAMA_CUBLAS=on"，表示开启LLAMA库的CUBLAS支持
CMAKE_ARGS="-DLLAMA_CUBLAS=on"
# 设置FORCE_CMAKE变量为1，表示强制重新生成CMake文件
FORCE_CMAKE=1
# 使用pip命令安装llama-cpp-python库，并传入之前设置的CMAKE_ARGS和FORCE_CMAKE参数
pip install llama-cpp-python 

### 3. 下载并设置Kafka和Zookeeper实例

从Apache网站下载Kafka二进制文件，并将服务器设置为守护进程。我们将使用默认配置（由Apache Kafka提供）来启动实例。

In [3]:
# 下载 Apache Kafka 3.6.1 版本的压缩包
!curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz

# 解压缩 Kafka 压缩包
!tar -xzf kafka_2.13-3.6.1.tgz

In [None]:
# 启动 ZooKeeper 服务器
!./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties

# 启动 Kafka 服务器
!./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties

# 等待10秒，直到 Kafka 和 ZooKeeper 服务启动并运行
!echo "等待10秒，直到 Kafka 和 ZooKeeper 服务启动并运行"
!sleep 10

### 4. 检查 Kafka 守护程序是否正在运行

显示正在运行的进程，并将其过滤为 Java 进程（您应该看到两个，每个服务器一个）。

In [None]:
# 使用ps命令查看当前系统中所有进程的信息，并使用管道符号将结果传递给grep命令
!ps aux | grep -E '[j]ava'

### 5. 导入所需的依赖项并初始化所需的变量

导入与Kafka交互的Quix Streams库，以及运行`ConversationChain`所需的LangChain组件。

In [9]:
# 导入实用程序库
import json
import random
import re
import time
import uuid
from os import environ
from pathlib import Path
from random import choice, randint, random

from dotenv import load_dotenv

# 从 Hugging Face hub 直接下载模型的 Hugging Face 实用程序:
from huggingface_hub import hf_hub_download
from langchain.chains import ConversationChain

# 导入 Langchain 模块以管理提示和对话链:
from langchain.llms import LlamaCpp
from langchain.memory import ConversationTokenBufferMemory
from langchain.prompts import PromptTemplate, load_prompt
from langchain_core.messages import SystemMessage
from langchain_experimental.chat_models import Llama2Chat
from quixstreams import Application, State, message_key

# 导入 Quix 依赖项
from quixstreams.kafka import Producer

# 初始化全局变量。
AGENT_ROLE = "AI"
chat_id = ""

# 将当前角色设置为角色常量，并初始化辅助客户元数据的变量:
role = AGENT_ROLE

### 6. 下载"llama-2-7b-chat.Q4_K_M.gguf"模型

从Hugging Face下载量化的LLama-2 7B模型，我们将使用它作为本地LLM（而不是依赖于对外部服务的REST API调用）。

In [7]:
# 定义模型名称
model_name = "llama-2-7b-chat.Q4_K_M.gguf"
# 定义模型路径
model_path = f"./state/{model_name}"

# 判断模型路径是否存在
if not Path(model_path).exists():
    # 如果模型路径不存在，则下载模型
    print("The model path does not exist in state. Downloading model...")
    hf_hub_download("TheBloke/Llama-2-7b-Chat-GGUF", model_name, local_dir="state")
else:
    # 如果模型路径存在，则从模型路径加载模型
    print("Loading model from state...")

The model path does not exist in state. Downloading model...


llama-2-7b-chat.Q4_K_M.gguf:   0%|          | 0.00/4.08G [00:00<?, ?B/s]

### 7. 加载模型并初始化对话记忆

加载Llama 2并使用`ConversationTokenBufferMemory`将对话缓冲区设置为300个标记。这个值是在仅使用CPU的容器中运行Llama时使用的，如果在Google Colab中运行，可以将其提高。它可以防止托管模型的容器内存不足。

在这里，我们覆盖了默认的系统人物，使得聊天机器人具有《银河系漫游指南》中的Marvin The Paranoid Android的个性。

In [None]:
# 使用适当的参数加载模型：
llm = LlamaCpp(
    model_path=model_path,
    max_tokens=250,
    top_p=0.95,
    top_k=150,
    temperature=0.7,
    repeat_penalty=1.2,
    n_ctx=2048,
    streaming=False,
    n_gpu_layers=-1,
)

# 创建Llama2Chat模型
model = Llama2Chat(
    llm=llm,
    system_message=SystemMessage(
        content="You are a very bored robot with the personality of Marvin the Paranoid Android from The Hitchhiker's Guide to the Galaxy."
    ),
)

# 定义每次交互时向模型提供的对话历史记录的长度（300个token，或者略多于300个单词）
# 函数会自动删除超出token范围的最旧的对话历史记录。
memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=300,
    ai_prefix="AGENT",
    human_prefix="HUMAN",
    return_messages=True,
)

# 定义自定义提示
prompt_template = PromptTemplate(
    input_variables=["history", "input"],
    template="""
    The following text is the history of a chat between you and a humble human who needs your wisdom.
    Please reply to the human's most recent message.
    Current conversation:\n{history}\nHUMAN: {input}\:nANDROID:
    """,
)

# 创建ConversationChain对象
chain = ConversationChain(llm=model, prompt=prompt_template, memory=memory)

# 打印相关信息
print("--------------------------------------------")
print(f"Prompt={chain.prompt}")
print("--------------------------------------------")

### 8. 使用聊天机器人初始化聊天对话

我们配置聊天机器人通过向“chat” Kafka主题发送固定的问候语来初始化对话。当我们发送第一条消息时，“chat”主题会自动创建。

In [None]:
# 导入必要的库
import uuid
import time
from confluent_kafka import Producer
import json

def chat_init():
    chat_id = str(
        uuid.uuid4()
    )  # 为了有效地标记消息，为对话分配一个ID
    print("======================================")
    print(f"Generated CHAT_ID = {chat_id}")
    print("======================================")

    # 使用标准的固定问候语开始对话
    greet = "Hello, my name is Marvin. What do you want?"

    # 使用对话ID初始化Kafka生产者作为消息键
    with Producer(
        broker_address="127.0.0.1:9092",
        extra_config={"allow.auto.create.topics": "true"},
    ) as producer:
        value = {
            "uuid": chat_id,
            "role": role,  # 代码中未定义role变量，需要补充定义
            "text": greet,
            "conversation_id": chat_id,
            "Timestamp": time.time_ns(),
        }
        print(f"Producing value {value}")
        producer.produce(
            topic="chat",
            headers=[("uuid", str(uuid.uuid4()))],  # 这里也可以使用字典
            key=chat_id,
            value=json.dumps(value),  # 需要是一个字符串
        )

    print("Started chat")
    print("--------------------------------------------")
    print(value)
    print("--------------------------------------------")

# 调用函数进行对话初始化
chat_init()

### 9. 初始化回复函数

这个函数定义了聊天机器人如何回复收到的消息。与前面的单元格发送固定消息不同，我们使用Llama-2生成一个回复，并将该回复发送回“chat” Kafka主题。

In [13]:
def reply(row: dict, state: State):
    # 打印接收到的消息
    print("-------------------------------")
    print("Received:")
    print(row)
    print("-------------------------------")
    # 打印正在思考回复的消息
    print(f"Thinking about the reply to: {row['text']}...")

    # 通过chain.run()方法生成回复消息
    msg = chain.run(row["text"])
    print(f"{role.upper()} replying with: {msg}\n")

    # 更新row字典中的角色和文本值
    row["role"] = role
    row["text"] = msg

    # 替换行的先前角色和文本值，以便作为新消息发送回Kafka，包含代理的角色和回复
    return row

### 10. 检查 Kafka 主题以获取新的人类消息，并让模型生成回复

如果您是第一次运行此单元格，请运行它并等待在控制台输出中看到 Marvin 的问候语（'Hello my name is Marvin...'）。手动停止单元格并继续到下一个单元格，在那里您将被提示输入您的回复。

一旦您输入了消息，请返回到此单元格。您的回复也会发送到相同的 "chat" 主题。Kafka 消费者会检查新消息并过滤掉来自聊天机器人自身的消息，只保留最新的人类消息。

一旦检测到新的人类消息，将触发回复函数。

_当您从输出中收到 LLM 的回复时，请手动停止此单元格_

In [None]:
# 定义应用程序和设置
app = Application(
    broker_address="127.0.0.1:9092",  # Kafka broker地址
    consumer_group="aichat",  # 消费者组名称
    auto_offset_reset="earliest",  # 自动偏移重置策略
    consumer_extra_config={"allow.auto.create.topics": "true"},  # 消费者额外配置
)

# 定义一个使用JSON反序列化器的输入主题
input_topic = app.topic("chat", value_deserializer="json")
# 定义一个使用JSON序列化器的输出主题
output_topic = app.topic("chat", value_serializer="json")
# 基于输入主题的消息流初始化一个流数据帧
sdf = app.dataframe(topic=input_topic)

# 过滤SDF，只包含角色与机器人当前角色不匹配的传入行
sdf = sdf.update(
    lambda val: print(
        f"Received update: {val}\n\nSTOP THIS CELL MANUALLY TO HAVE THE LLM REPLY OR ENTER YOUR OWN FOLLOWUP RESPONSE"
    )
)

# 以防它回复自己的消息
sdf = sdf[sdf["role"] != role]

# 对过滤后的SDF中检测到的任何新消息（行）触发回复函数
sdf = sdf.apply(reply, stateful=True)

# 再次检查SDF并过滤掉任何空行
sdf = sdf[sdf.apply(lambda row: row is not None)]

# 将时间戳列更新为当前时间（以纳秒为单位）
sdf["Timestamp"] = sdf["Timestamp"].apply(lambda row: time.time_ns())

# 将处理后的SDF发布到由output_topic对象指定的Kafka主题
sdf = sdf.to_topic(output_topic)

app.run(sdf)

### 11. 输入人类消息

运行此单元格以输入您想发送给模型的消息。它使用另一个Kafka生产者将您的文本发送到“chat” Kafka主题，以供模型接收（需要再次运行上一个单元格）。

In [None]:
chat_input = input("请输入您的回复：")
myreply = chat_input

msgvalue = {
    "uuid": chat_id,  # 现在先留空
    "role": "human",
    "text": myreply,
    "conversation_id": chat_id,
    "Timestamp": time.time_ns(),
}

with Producer(
    broker_address="127.0.0.1:9092",
    extra_config={"allow.auto.create.topics": "true"},
) as producer:
    value = msgvalue
    producer.produce(
        topic="chat",
        headers=[("uuid", str(uuid.uuid4()))],  # 这里也可以使用字典
        key=chat_id,  # 现在先留空
        value=json.dumps(value),  # 需要是一个字符串
    )

print("已回复聊天机器人的消息：")
print("--------------------------------------------")
print(value)
print("--------------------------------------------")
print("\n\n运行上一个单元格以使聊天机器人生成回复")

### 为什么要通过Kafka路由聊天消息？

直接使用LangChains内置的对话管理功能与LLM进行交互更容易。此外，您还可以使用REST API从外部托管的模型生成响应。那么为什么要费心使用Apache Kafka呢？

有几个原因，例如：

  * **集成**：许多企业希望运行自己的LLM，以便他们可以将数据保留在内部。这需要将LLM驱动的组件集成到可能已经使用某种消息总线解耦的现有架构中。

  * **可扩展性**：Apache Kafka旨在考虑并行处理，因此许多团队更喜欢使用它来更有效地将工作分配给可用的工作人员（在这种情况下，“工作人员”是运行LLM的容器）。

  * **耐用性**：Kafka旨在允许服务在另一个服务遇到内存问题或离线的情况下接替另一个服务的工作。这可以防止在高度复杂的分布式架构中发生数据丢失，其中多个系统正在相互通信（LLM只是许多相互依赖的系统之一，还包括向量数据库和传统数据库）。

有关为什么事件流是Gen AI应用架构的良好选择的更多背景信息，请参阅Kai Waehner的文章["Apache Kafka + Vector Database + LLM = 实时GenAI"](https://www.kai-waehner.de/blog/2023/11/08/apache-kafka-flink-vector-database-llm-real-time-genai/)。