流式输出的细节

词表中的 token 单独解码时可能出现乱码；如何在流式输出中正确打印中英文，对后续自己逐字检验生成效果有借鉴意义。

# 常规方式加载模型

In [1]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer, Qwen2ForCausalLM
device = "cuda"  # the device to load the model onto

# Raw string: "\l", "\p" and "\Q" in this Windows path are invalid escape sequences in a normal
# string literal (DeprecationWarning, and SyntaxWarning on Python 3.12+). The raw string has the
# exact same value without relying on that fallback behavior.
model_path = r'D:\learning\python\pretrain_checkpoint\Qwen2.5-1.5B-Instruct'
model: Qwen2ForCausalLM = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.bfloat16, device_map=device)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Chat-format conversation; rendered to a prompt string by the tokenizer's chat template.
messages = [
    {"role": "system", "content": "你是一个人工智能助手"},
    {"role": "user", "content": '写一个谜语'}
]
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
# Tokenize the rendered prompt and move the tensors to the same device as the model.
model_inputs = tokenizer(text, return_tensors="pt").to(device)

# generated_ids = model.generate(
#     max_new_tokens=64,
#     do_sample=True,
#     **model_inputs,
# )
# generated_ids1 = [
#     output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
# ]

# response = tokenizer.batch_decode(generated_ids1, skip_special_tokens=True)
# print(response)



## TextStreamer 基础流式输出

In [None]:
# Print tokens to stdout while generate() runs. skip_special_tokens=False means special tokens
# are decoded and printed in the streamed output, while the final decode below drops them.
streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)

generated_ids = model.generate(
    **model_inputs,
    streamer=streamer,
    do_sample=True,
    max_new_tokens=64,
)

# Every row of generated_ids starts with the (padded) prompt; keep only the newly generated ids.
prompt_len = model_inputs.input_ids.shape[1]
generated_ids1 = [row[prompt_len:] for row in generated_ids]

decoded = tokenizer.batch_decode(generated_ids1, skip_special_tokens=True)
response = decoded[0]
print("最终结果：", response)

In [18]:
from queue import Queue
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from ..models.auto import AutoTokenizer

class BaseStreamer:
    """Minimal streamer interface: ``put`` receives token ids, ``end`` signals the end of the stream.

    Both hooks must be overridden by subclasses; the base implementations only raise.
    """

    def put(self, value):
        """Receive a new chunk of token ids. Subclasses must implement this."""
        raise NotImplementedError

    def end(self):
        """Signal that no more tokens will arrive. Subclasses must implement this."""
        raise NotImplementedError

In [None]:
class TextStreamer(BaseStreamer):
    """
    Simple text streamer that prints the token(s) to stdout as soon as entire words are formed.

    <Tip warning={true}>

    The API for the streamer classes is still under development and may change in the future.

    </Tip>

    Parameters:
        tokenizer (`AutoTokenizer`):
            The tokenizer used to decode the tokens.
        skip_prompt (`bool`, *optional*, defaults to `False`):
            Whether to skip the prompt to `.generate()` or not. Useful e.g. for chatbots.
        decode_kwargs (`dict`, *optional*):
            Additional keyword arguments to pass to the tokenizer's `decode` method.

    Examples:

        ```python
        >>> from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer

        >>> tok = AutoTokenizer.from_pretrained("openai-community/gpt2")
        >>> model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
        >>> inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
        >>> streamer = TextStreamer(tok)

        >>> # Despite returning the usual output, the streamer will also print the generated text to stdout.
        >>> _ = model.generate(**inputs, streamer=streamer, max_new_tokens=20)
        An increasing sequence: one, two, three, four, five, six, seven, eight, nine, ten, eleven,
        ```
    """

    # Inclusive code-point ranges treated as "Chinese characters" by _is_chinese_char():
    # the CJK Unified Ideographs block, all of its extensions (A-I), and the two CJK
    # Compatibility Ideographs blocks. Extensions F, G, H and I were missing before, so text
    # ending in those characters was held back until the next space/newline.
    _CJK_RANGES = (
        (0x4E00, 0x9FFF),    # CJK Unified Ideographs
        (0x3400, 0x4DBF),    # Extension A
        (0x20000, 0x2A6DF),  # Extension B
        (0x2A700, 0x2B73F),  # Extension C
        (0x2B740, 0x2B81F),  # Extension D
        (0x2B820, 0x2CEAF),  # Extension E
        (0x2CEB0, 0x2EBEF),  # Extension F
        (0x2EBF0, 0x2EE5F),  # Extension I
        (0x30000, 0x3134F),  # Extension G
        (0x31350, 0x323AF),  # Extension H
        (0xF900, 0xFAFF),    # CJK Compatibility Ideographs
        (0x2F800, 0x2FA1F),  # CJK Compatibility Ideographs Supplement
    )

    def __init__(self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, **decode_kwargs):
        self.tokenizer = tokenizer
        self.skip_prompt = skip_prompt  # whether to suppress printing of the prompt
        self.decode_kwargs = decode_kwargs  # forwarded to tokenizer.decode()

        # State carried across put() calls while streaming.
        self.token_cache = []  # token ids not yet fully flushed (reset at a newline and in end())
        self.print_len = 0  # number of characters of decode(token_cache) already printed
        self.next_tokens_are_prompt = True  # True until the first put() chunk has been skipped as the prompt

    def put(self, value):
        """
        Receive new token ids, decode them, and print any text that now forms complete words.

        Only batch size 1 is supported: a 2-D input with one row is reduced to that row.

        Raises:
            ValueError: if ``value`` is 2-D and its first dimension is greater than 1.
        """
        # On the first call the prompt ids are passed, shaped (batch_size, seq_len).
        if len(value.shape) > 1 and value.shape[0] > 1:
            raise ValueError("TextStreamer only supports batch size 1")
        elif len(value.shape) > 1:
            value = value[0]

        # The first chunk is treated as the prompt; drop it when skip_prompt is set.
        if self.skip_prompt and self.next_tokens_are_prompt:
            self.next_tokens_are_prompt = False
            return

        # Re-decode the whole cache instead of only the new ids, so tokens that only form valid
        # text together (e.g. pieces of one multi-byte character) are decoded jointly.
        self.token_cache.extend(value.tolist())
        text = self.tokenizer.decode(self.token_cache, **self.decode_kwargs)

        if text.endswith("\n"):
            # A newline completes the line: print the rest and start a fresh cache.
            printable_text = text[self.print_len :]
            self.token_cache = []
            self.print_len = 0
        elif len(text) > 0 and self._is_chinese_char(ord(text[-1])):
            # The text ends in a CJK ideograph: print everything not printed yet.
            printable_text = text[self.print_len :]
            self.print_len += len(printable_text)
        else:
            # Otherwise print only up to the last space (simple heuristic so that a partially
            # decoded word is not emitted). E.g. text="Hello!" prints nothing yet;
            # text="Hello! I" prints "Hello! ".
            printable_text = text[self.print_len : text.rfind(" ") + 1]
            self.print_len += len(printable_text)

        self.on_finalized_text(printable_text)

    def end(self):
        """Flush any remaining cached text, emit the end-of-stream newline, and reset for the next prompt."""
        # Unlike put(), print ALL remaining decoded text, not just up to the last space.
        if len(self.token_cache) > 0:
            text = self.tokenizer.decode(self.token_cache, **self.decode_kwargs)
            printable_text = text[self.print_len :]
            self.token_cache = []
            self.print_len = 0
        else:
            printable_text = ""

        self.next_tokens_are_prompt = True
        self.on_finalized_text(printable_text, stream_end=True)

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Print ``text`` to stdout; subclasses may override this to send the text elsewhere."""
        # flush=True so each chunk appears immediately rather than sitting in the stdout buffer.
        # At stream end, end=None makes print() emit its default trailing newline.
        print(text, flush=True, end="" if not stream_end else None)

    def _is_chinese_char(self, cp):
        """Return True if code point ``cp`` lies in one of the CJK ideograph blocks in ``_CJK_RANGES``.

        "Chinese character" here means any character in the CJK Unified / Compatibility
        Ideographs blocks (see https://en.wikipedia.org/wiki/CJK_Unified_Ideographs_(Unicode_block)).
        Hangul, Hiragana and Katakana are separate Unicode blocks and are intentionally not
        included; they fall through to the space-based heuristic in put().
        """
        return any(low <= cp <= high for low, high in self._CJK_RANGES)

## TextIteratorStreamer 迭代打印流式输出

In [39]:
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
from threading import Thread

# generate() runs in a worker thread and pushes decoded text into the streamer's queue;
# the main thread consumes that queue by iterating over the streamer.
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = dict(model_inputs, streamer=streamer, max_new_tokens=100)
# Call .generate() in a separate thread so this thread is free to read the stream.
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()

generated_text = ""
for new_text in streamer:
    generated_text += new_text  # keep the full answer, not just the printed chunks
    print(new_text, end="", flush=True)
# The iterator stops once the streamer receives its end signal; make sure the worker has exited.
thread.join()


In [27]:
from queue import Queue
from typing import TYPE_CHECKING, Optional

class TextIteratorStreamer(TextStreamer):
    """
    Streamer that stores print-ready text in a queue, to be consumed by a downstream application
    as an iterator. Useful for applications that need non-blocking access to the generated text
    (e.g. in an interactive Gradio demo).

    Parameters:
        tokenizer (`AutoTokenizer`):
            The tokenizer used to decode the tokens.
        skip_prompt (`bool`, *optional*, defaults to `False`):
            Whether to skip the prompt to `.generate()` or not. Useful e.g. for chatbots.
        timeout (`float`, *optional*):
            Timeout for the text queue. If `None`, the queue blocks indefinitely. Useful to handle
            exceptions when `.generate()` is called in a separate thread.
        decode_kwargs (`dict`, *optional*):
            Additional keyword arguments to pass to the tokenizer's `decode` method.

    Examples:

        ```python
        >>> from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
        >>> from threading import Thread

        >>> tok = AutoTokenizer.from_pretrained("openai-community/gpt2")
        >>> model = AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
        >>> inputs = tok(["An increasing sequence: one,"], return_tensors="pt")
        >>> streamer = TextIteratorStreamer(tok)

        >>> # Run the generation in a separate thread, so that we can fetch the generated text in a non-blocking way.
        >>> generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=20)
        >>> thread = Thread(target=model.generate, kwargs=generation_kwargs)
        >>> thread.start()
        >>> generated_text = ""
        >>> for new_text in streamer:
        ...     generated_text += new_text
        >>> generated_text
        'An increasing sequence: one, two, three, four, five, six, seven, eight, nine, ten, eleven,'
        ```
    """

    def __init__(
        self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, timeout: Optional[float] = None, **decode_kwargs
    ):
        super().__init__(tokenizer, skip_prompt, **decode_kwargs)
        self.text_queue = Queue()  # text chunks, followed by stop_signal once the stream ends
        self.stop_signal = None  # sentinel placed in the queue to mark the end of the stream
        self.timeout = timeout  # timeout for queue put/get; None blocks indefinitely

    def on_finalized_text(self, text: str, stream_end: bool = False):
        """Put the new text in the queue. If the stream is ending, also put a stop signal in the queue."""
        self.text_queue.put(text, timeout=self.timeout)
        if stream_end:
            self.text_queue.put(self.stop_signal, timeout=self.timeout)

    def __iter__(self):
        """The streamer is its own iterator."""
        return self

    def __next__(self):
        """Return the next text chunk from the queue, blocking until one is available.

        Raises:
            queue.Empty: if ``timeout`` is set and no chunk arrives within it.
            StopIteration: when the stop signal is received.
        """
        value = self.text_queue.get(timeout=self.timeout)
        # stop_signal is a sentinel: compare by identity, not equality.
        if value is self.stop_signal:
            raise StopIteration()
        return value

# 本地加载模型并在前端展示

## streamlit 输出显示

见同级目录 streamlit_app_base.py 文件，使用 streamlit run streamlit_app_base.py 命令启动

## gradio测试流式输出

In [None]:
import gradio as gr
from threading import Thread
from typing import List
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, Qwen2ForCausalLM
device = "cuda"  # the device to load the model onto

# Raw string: "\l", "\p" and "\Q" in this Windows path are invalid escape sequences in a normal
# string literal (DeprecationWarning, and SyntaxWarning on Python 3.12+). The raw string has the
# exact same value without relying on that fallback behavior.
model_path = r'D:\learning\python\pretrain_checkpoint\Qwen2.5-1.5B-Instruct'
model: Qwen2ForCausalLM = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype=torch.bfloat16, device_map=device)
tokenizer = AutoTokenizer.from_pretrained(model_path)

def build_chat_messages(question, history):
    """Build a chat message list from a Gradio chat history plus the new question.

    Args:
        question: the new user message.
        history: previous turns. Accepts pair format ``[[user, assistant], ...]`` and
            messages format ``[{"role": ..., "content": ...}, ...]``. ``None`` or empty means
            there are no previous turns.

    Returns:
        A list of ``{"role", "content"}`` dicts that starts with the system prompt and ends
        with ``question``. History entries whose content is not a plain string (e.g. ``None``)
        or that have no role are skipped.
    """
    messages = [{"role": "system", "content": "你是一个人工智能助手"}]
    for turn in history or []:
        if isinstance(turn, dict):
            entries = [(turn.get("role"), turn.get("content"))]
        else:
            entries = [("user", turn[0]), ("assistant", turn[1])]
        for role, content in entries:
            if role and isinstance(content, str):
                messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": question})
    return messages


def chat(question, history):
    """Gradio chat handler: yield the growing answer while the local model generates it.

    generate() runs in a worker thread that feeds a TextIteratorStreamer; this generator
    reads from the streamer and yields the accumulated response after every chunk.
    """
    messages = build_chat_messages(question, history)
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    encoding = tokenizer(prompt, return_tensors="pt").to(device)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(encoding, streamer=streamer, max_new_tokens=1024)
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    response = ""
    for new_text in streamer:
        response += new_text
        yield response
    # The streamer has signalled the end of generation; let the worker thread finish.
    thread.join()

# Chat UI around the local-model chat() generator.
demo = gr.ChatInterface(fn=chat, title="聊天机器人", description="输入问题开始对话")

# Listen on all interfaces; if that does not work, try "localhost" or this machine's real IP address.
# share=True additionally requests a public Gradio share link — be careful with private models/data.
launch_options = dict(server_name="0.0.0.0", share=True)
demo.queue().launch(**launch_options)

# vllm 部署模型并前端展示

## streamlit 输出显示

见同级目录 streamlit_app_vllm.py 文件，使用 streamlit run streamlit_app_vllm.py 命令启动

## gradio测试流式输出

In [None]:
import gradio as gr
from openai import OpenAI

def build_chat_messages(question, history):
    """Build a chat message list from a Gradio chat history plus the new question.

    Args:
        question: the new user message.
        history: previous turns. Accepts pair format ``[[user, assistant], ...]`` and
            messages format ``[{"role": ..., "content": ...}, ...]``. ``None`` or empty means
            there are no previous turns.

    Returns:
        A list of ``{"role", "content"}`` dicts that starts with the system prompt and ends
        with ``question``. History entries whose content is not a plain string (e.g. ``None``)
        or that have no role are skipped.
    """
    messages = [{"role": "system", "content": "你是一个人工智能助手"}]
    for turn in history or []:
        if isinstance(turn, dict):
            entries = [(turn.get("role"), turn.get("content"))]
        else:
            entries = [("user", turn[0]), ("assistant", turn[1])]
        for role, content in entries:
            if role and isinstance(content, str):
                messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": question})
    return messages


def chat(question, history):
    """Gradio chat handler: stream an answer from the vLLM OpenAI-compatible server.

    Yields the accumulated response text after every streamed chunk.
    """
    messages = build_chat_messages(question, history)

    openai_api_key = "EMPTY"
    # Replace with your own ip+port. NOTE(review): 0.0.0.0 as a *client* destination may not
    # work on every OS (e.g. Windows); 127.0.0.1 or the server's real IP is safer.
    openai_api_base = "http://0.0.0.0:5001/v1"

    client = OpenAI(
        api_key=openai_api_key,
        base_url=openai_api_base,
    )
    response = client.chat.completions.create(
        model="qwen2.5",
        messages=messages,
        stream=True,
    )

    response_text = ""
    for chunk in response:
        # Some stream chunks (e.g. a usage-only chunk) carry no choices; indexing [0] would raise.
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta is not None:
            response_text += delta
        yield response_text


# Chat UI around the vLLM-backed chat() generator.
demo = gr.ChatInterface(
    fn=chat,
    title="聊天机器人",
    description="输入问题开始对话",
)

# Listen on all interfaces; if that does not work, try "localhost" or this machine's real IP address.
# share=True additionally requests a public Gradio share link — be careful with private models/data.
launch_options = {"server_name": "0.0.0.0", "share": True}
queued_demo = demo.queue()
queued_demo.launch(**launch_options)