In [4]:
########################################################################################################
# The RWKV Language Model - https://github.com/BlinkDL/RWKV-LM
########################################################################################################

print("RWKV Chat Simple Demo")  # 打印一个简单的消息，表明这是 RWKV 聊天的简单演示。
import os, copy, types, gc, sys, re  # 导入操作系统、对象复制、类型、垃圾回收、系统、正则表达式等包
import numpy as np  # 导入 numpy 库
from prompt_toolkit import prompt  # 从 prompt_toolkit 导入 prompt，用于命令行输入
import torch  # 导入 pytorch 库

RWKV Chat Simple Demo


In [5]:
# 优化 PyTorch 设置，允许使用 tf32 
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.allow_tf32 = True
torch.backends.cuda.matmul.allow_tf32 = True

os.environ["RWKV_V7_ON"] = '1' # 启用 RWKV-7 模型
os.environ["RWKV_JIT_ON"] = "1" # 启用 JIT 编译
os.environ["RWKV_CUDA_ON"] = "1" # 禁用原生 CUDA 算子，改成 '1' 表示启用 CUDA 算子（速度更快，但需要 c++ 编译器和 CUDA 库）

In [None]:
from rwkv.model import RWKV  # 从 RWKV 模型库中导入 RWKV 类，用于加载和操作 RWKV 模型。
from rwkv.utils import PIPELINE  # 从 RWKV 工具库中导入 PIPELINE，用于数据的编码和解码

args = types.SimpleNamespace()

args.strategy = "cuda fp16"  # 模型推理的设备和精度，使用 CUDA （GPU）并采用 FP16 精度
args.MODEL_NAME = r"/models/rwkv7-g1b-1.5b-20251202-ctx8192"  # 指定 RWKV 模型的路径，建议写绝对路径

  node.children.setdefault(char, TrieNode())


ninja: no work to do.

### RWKV-7 "Goose" enabled ###

ninja: no work to do.


In [7]:
print(f"Loading model - {args.MODEL_NAME}")# 打印模型的加载消息
model = RWKV(model=args.MODEL_NAME, strategy=args.strategy)  # 加载 RWKV 模型。
pipeline = PIPELINE(model, "rwkv_vocab_v20230424")  # 初始化 PIPELINE ，使用 RWKV-World 词表处理输入和输出的编码/解码。

Loading model - /home/rwkv250918/tys/models/rwkv7-g1b-1.5b-20251202-ctx8192
Loading /home/rwkv250918/tys/models/rwkv7-g1b-1.5b-20251202-ctx8192 (cuda fp16)



In [8]:
from collections import namedtuple
Decode_Parameters = namedtuple('Decode_Parameters', 
                               ['GEN_TEMP', 'GEN_TOP_P', 'GEN_alpha_presence', 
                                'GEN_alpha_frequency', 'GEN_penalty_decay', 'GEN_max_tokens'])

deparams = Decode_Parameters(GEN_TEMP=0.7,               # 温度参数。高温增加内容随机性，使之更具创造性，过高会导致内容不合理
                             GEN_TOP_P=0.3,              # 选择累计概率。低值内容质量高但是保守，高值允许发散，过高导致内容不合理
                             GEN_alpha_presence=0.3,     # 存在惩罚，防止一个词被反复使用。过低可能语句重复死循环，过高可能文本不自然
                             GEN_alpha_frequency=0.3,    # 频率惩罚，抑制高频重复词
                             GEN_penalty_decay=0.996,    # 控制前两个惩罚的衰减速度
                             GEN_max_tokens=5000)        # 模型生成文本时的最大 token 数

In [9]:
model_tokens = []
model_state = None

def run_rnn(ctx):
    CHUNK_LEN = 256  # 对输入进行分块处理
    global model_tokens, model_state # 定义两个全局变量，用于更新 token 和 state
    ctx = ctx.replace("\r\n", "\n")  # 将文本中的 CRLF（Windows 系统的换行符）转换为 LF（Linux 系统的换行符）
    tokens = pipeline.encode(ctx)  # 基于 RWKV 模型的词汇表，将文本编码为 tokens
    tokens = [int(x) for x in tokens]  # 将 tokens 转换为整数（int）列表，确保类型一致性
    model_tokens += tokens  # 将 tokens 添加到全局的模型 token 列表中

    while len(tokens) > 0:  # 使用一个 while 循环执行模型前向传播，直到所有 tokens 处理完毕
        out, model_state = model.forward(tokens[:CHUNK_LEN], model_state)  # 模型前向传播，处理大小为 CHUNK_LEN 的 token 列表，并更新模型状态
        tokens = tokens[CHUNK_LEN:]  # 移除已处理的 tokens 块，并继续处理剩余的 tokens
    
    return out

def load_state(STATE_NAME: str=None):
    global model_tokens, model_state
    if STATE_NAME != None:
        print('加载state...')
        args = model.args
        state_raw = torch.load(STATE_NAME + '.pth')

        state_init = [None for i in range(args.n_layer * 3)]  # 初始化状态列表
        for i in range(args.n_layer): #开始循环，遍历每一层。
            dev = torch.device('cuda') # 根据实际情况设置设备
            atype = torch.float16 # 根据实际情况设置数据类型（FP32/FP16 或 int8 等）
            # 初始化模型的状态
            state_init[i*3+0] = torch.zeros(args.n_embd, dtype=atype, requires_grad=False, device=dev).contiguous()
            state_init[i*3+1] = state_raw[f'blocks.{i}.att.time_state'].to(dtype=torch.float, device=dev).requires_grad_(False).contiguous()
            state_init[i*3+2] = torch.zeros(args.n_embd, dtype=atype, requires_grad=False, device=dev).contiguous()
        
        model_state = copy.deepcopy(state_init)
    
    else:
        # 没有state时使用固定语句做prefill
        init_ctx = "User: hi" + "\n\n"
        init_ctx += "Assistant: Hi. I am your assistant and I will provide expert full response in full details. Please feel free to ask any question and I will always answer it." + "\n\n"
        run_rnn(init_ctx)  # 运行 RNN 模式对初始提示文本进行 prefill
        print(init_ctx, end="")  # 打印初始化对话文本
    pass

STATE_NAME = '/home/rwkv250918/tys/state_tuning/251222/rwkv-9'
load_state(STATE_NAME)

加载state...


In [10]:
def chat(msg):
    global model_tokens, model_state, deparams
    msg = msg.strip()  # 使用 strip 方法去除消息的首尾空格
    msg = re.sub(r"\n+", "\n", msg)  # 替换多个换行符为单个换行符
    
    if len(msg) > 0:  # 如果处理完后，用户输入的消息非空
        occurrence = {}  # 使用 occurrence 字典这个字典用于记录每个 token 在生成上下文中出现的次数，等会用在实现重复惩罚（Penalty）
        out_tokens = []  # 使用 out_tokens 列表记录即将输出的 tokens
        out_last = 0  # 用于记录上一次生成的 token 位置

        out = run_rnn("User: " + msg + "\n\nAssistant: ")  # 将用户输入拼接成 RWKV 数据集的对话格式，进行 prefill  
        print("\nAssistant: ", end="")  # 打印 "Assistant:" 标签

        for i in range(deparams.GEN_max_tokens):  
            for n in occurrence: 
                out[n] -= deparams.GEN_alpha_presence + occurrence[n] * deparams.GEN_alpha_frequency  # 应用存在惩罚和频率惩罚参数
            out[0] -= 1e10  # 禁用 END_OF_TEXT 

            token = pipeline.sample_logits(out, temperature=deparams.GEN_TEMP, top_p=deparams.GEN_TOP_P)  # 采样生成下一个 token

            out, model_state = model.forward([token], model_state)  # 模型前向传播
            model_tokens += [token] 
            out_tokens += [token]  # 将新生成的 token 添加到输出的 token 列表中

            for xxx in occurrence:
                occurrence[xxx] *= deparams.GEN_penalty_decay  # 应用衰减重复惩罚
            occurrence[token] = 1 + (occurrence[token] if token in occurrence else 0)  # 更新 token 的出现次数

            tmp = pipeline.decode(out_tokens[out_last:])  # 将最新生成的 token 解码成文本
            if ("\ufffd" not in tmp) and (not tmp.endswith("\n")):  # 当生成的文本是有效 UTF-8 字符串且不以换行符结尾时
                print(tmp, end="", flush=True) #实时打印解码得到的文本
                out_last = i + 1 #更新输出位置变量 out_last 

            if "\n\n" in tmp:  # 如果生成的文本包含双换行符，表示模型的响应已结束（可以将 \n\n 改成其他停止词）
                print(tmp, end="", flush=True) # 实时打印解码得到的文本
                break #结束本轮推理
    else:
        print("!!! Error: please say something !!!")  # 如果用户没有输入消息，提示“输入错误，说点啥吧！”
    pass

In [11]:
chat('你好呀')


Assistant: 喵~主人好呀！*轻轻蹭了蹭主人的手* 今天想和主人玩什么呀？我最喜欢陪主人玩耍了呢！要不要一起去阳台上晒太阳？或者我们可以一起画画，我最擅长用爪子画小猫咪了哦！*开心地摇着尾巴* 主人今天看起来心情很好呢，是不是有什么开心的事情发生啦？

