In [1]:
import numpy as np
import os
import time
import gc
from transformers import AutoTokenizer
from threading import Lock
from utils.session import Session
from config import InferenceConfig
import torch
import torch_npu

class Inference:
    def __init__(self, config: InferenceConfig) -> None:
        # Initialize the necessary variables
        self.max_input_length = config.max_input_length
        self.max_output_length = config.max_output_length
        self.tokenizer = AutoTokenizer.from_pretrained(
            config.tokenizer_dir, trust_remote_code=True
        )
        self.session = Session.fromConfig(config)
        self.session_type = config.session_type
        self.kv_cache_length = config.kv_cache_length
        self.state: dict = {"code": 200, "isEnd": False, "message": ""}
        self.reset()
        self.lock = Lock()
        self.first = True
        print("[INFO] init success")

        # Set device to NPU
        # self.device = torch.device("npu" if torch.npu.is_available() else "cpu")
        self.device = torch.device("npu")
        print("[INFO] NPU context set successfully")

    def generate_cache(self, prompt: str):
        if len(prompt) == 0:
            return
        self.first = False
        input_ids = np.asarray(self.tokenizer.encode(prompt), dtype=np.int64).reshape(1, -1)
        logits = self.session.run(input_ids)[0]
        next_token = self.sample_logits(logits[0][-1:])
        return next_token, logits

    def sample_logits(self, logits: np.ndarray, sampling_method: str = "greedy", sampling_value: float = None, temperature: float = 1.0) -> np.ndarray:
        if temperature == 0 or sampling_method == "greedy":
            return np.argmax(logits, axis=-1).astype(np.int64)
        elif sampling_method == "top_k" or sampling_method == "top_p":
            assert sampling_value is not None
            logits = logits.astype(np.float32)
            logits /= temperature
            probs = np.exp(logits) / np.sum(np.exp(logits))
            sorted_probs = np.sort(probs)[:, ::-1]
            sorted_indices = np.argsort(probs)[:, ::-1]

            if sampling_method == "top_k":
                index_of_interest = int(sampling_value)
            elif sampling_method == "top_p":
                p = sampling_value
                cumulative_probs = np.cumsum(sorted_probs, axis=-1)
                for index_of_interest, cumulative_prob in enumerate(cumulative_probs[0]):
                    if cumulative_prob > p:
                        break

            probs_of_interest = sorted_probs[:, : index_of_interest + 1]
            indices_of_interest = sorted_indices[:, : index_of_interest + 1]
            probs_of_interest /= np.sum(probs_of_interest)
            return np.array([np.random.choice(indices_of_interest[0], p=probs_of_interest[0])])
        else:
            raise Exception(f"Unknown sampling method {sampling_method}")

    def predict(self, prompt, history=None, system_prompt="You are a helpful assistant.", max_new_tokens=1024):
        if history is None:
            history = []
        messages = [{"role": "system", "content": system_prompt}]
        for (use_msg, bot_msg) in history:
            messages.append({"role": "user", "content": use_msg})
            messages.append({"role": "assistant", "content": bot_msg})
        messages.append({"role": "user", "content": prompt})

        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        if self.session_type == "pytorch":
            input_ids = self.tokenizer([text], return_tensors="pt")["input_ids"].to(torch.long).reshape(1, -1)
        else:
            raise Exception(f"unknown session_type {self.session_type}")

        input_ids = input_ids[:, -self.max_input_length:]
        self.first = False
        ids_list = []
        input_length = input_ids.shape[1]
        max_output_len = self.max_output_length - input_length
        max_output_len = min(max_output_len, max_new_tokens)

        for i in range(max_output_len):
            logits = self.session.run(input_ids)
            input_ids = self.sample_logits(logits[0][-1:])
            input_ids = input_ids.reshape(1, -1)
            ids_list.append(input_ids[0].item())
            text_out = self.tokenizer.decode(ids_list)

            # Early stop if EOS token is generated
            if input_ids[0] == self.tokenizer.eos_token_id:
                break

        return text_out

    def reset(self):
        self.first = True
        self.session.run_times = 0
        self.session.reset()

    def getState(self):
        with self.lock:
            return self.state.copy()

# Hardcoded configuration (you can set values directly here)
config = InferenceConfig(
    hf_model_dir="/data/shaos/data/Qwen2.5-VL-3B-Instruct",
    om_model_path="path_to_om_model",
    onnx_model_path="path_to_onnx_model",
    cpu_thread=4,
    session_type="pytorch",
    max_batch=1,
    max_output_length=2048,
    max_input_length=1024,
    kv_cache_length=2048,
    max_prefill_length=4,
    dtype="float32",
    torch_dtype="float32",
    #tokenizer_dir="path_to_tokenizer"
)

# Running the prediction
inference = Inference(config)
prompt = "What is the capital of France?"
output = inference.predict(prompt)
print("Generated Output:", output)


Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

[INFO] init success
[INFO] NPU context set successfully
Generated Output: The capital of France is Paris.<|im_end|>


# 1. 导入库

In [34]:
import numpy as np
import os
import time
import gc
from transformers import AutoTokenizer
from threading import Lock
from utils.session import Session
from config import InferenceConfig
import torch
import torch_npu

numpy：用于处理数组和进行数学运算的库。

os：提供与操作系统交互的功能，如文件处理。

time 和 gc：用于时间相关的操作和垃圾回收。

AutoTokenizer：来自 Hugging Face，用于加载预训练模型的分词器，将文本转化为模型可以理解的token。

Lock：来自threading模块，用于线程同步。

Session：自定义类，可能用于管理模型的会话。

InferenceConfig：自定义配置类，包含模型的超参数设置。

torch 和 torch_npu：PyTorch库，torch_npu用于处理NPU（神经处理单元）相关操作。

# 2. Inference 类

Inference 类是脚本的核心，负责与模型交互，包括加载模型、处理输入、生成预测以及管理会话状态。

## 初始化 (__init__ 方法)

In [35]:
class Inference:
    def __init__(self, config: InferenceConfig) -> None:
        # Initialize the necessary variables
        self.max_input_length = config.max_input_length
        self.max_output_length = config.max_output_length
        self.tokenizer = AutoTokenizer.from_pretrained(
            config.tokenizer_dir, trust_remote_code=True
        )
        self.session = Session.fromConfig(config)
        self.session_type = config.session_type
        self.kv_cache_length = config.kv_cache_length
        self.state: dict = {"code": 200, "isEnd": False, "message": ""}
        self.reset()
        self.lock = Lock()
        self.first = True
        print("[INFO] init success")

        # Set device to NPU
        # self.device = torch.device("npu" if torch.npu.is_available() else "cpu")
        self.device = torch.device("npu")
        print("[INFO] NPU context set successfully")

self.max_input_length 和 self.max_output_length：从配置中读取最大输入和输出长度，用于控制模型处理的文本长度。

AutoTokenizer：加载一个预训练的分词器，将输入的文本转换成模型可以理解的token。

Session：自定义的类，用于管理和配置模型会话。

self.device：将计算设备设置为 NPU（如果可用），否则默认为CPU。

## 缓存生成 (generate_cache 方法)

In [36]:
    def generate_cache(self, prompt: str):
        if len(prompt) == 0:
            return
        self.first = False
        input_ids = np.asarray(self.tokenizer.encode(prompt), dtype=np.int64).reshape(1, -1)
        logits = self.session.run(input_ids)[0]
        next_token = self.sample_logits(logits[0][-1:])
        return next_token, logits

    Inference.generate_cache = generate_cache

这个方法接收一个提示文本（prompt），将其转换为token（input_ids），然后通过模型生成预测的下一个token及其logits（原始输出）。

## 采样下一个token (sample_logits 方法)

In [37]:
    def sample_logits(self, logits: np.ndarray, sampling_method: str = "greedy", sampling_value: float = None, temperature: float = 1.0) -> np.ndarray:
        if temperature == 0 or sampling_method == "greedy":
            return np.argmax(logits, axis=-1).astype(np.int64)
        elif sampling_method == "top_k" or sampling_method == "top_p":
            assert sampling_value is not None
            logits = logits.astype(np.float32)
            logits /= temperature
            probs = np.exp(logits) / np.sum(np.exp(logits))
            sorted_probs = np.sort(probs)[:, ::-1]
            sorted_indices = np.argsort(probs)[:, ::-1]

            if sampling_method == "top_k":
                index_of_interest = int(sampling_value)
            elif sampling_method == "top_p":
                p = sampling_value
                cumulative_probs = np.cumsum(sorted_probs, axis=-1)
                for index_of_interest, cumulative_prob in enumerate(cumulative_probs[0]):
                    if cumulative_prob > p:
                        break

            probs_of_interest = sorted_probs[:, : index_of_interest + 1]
            indices_of_interest = sorted_indices[:, : index_of_interest + 1]
            probs_of_interest /= np.sum(probs_of_interest)
            return np.array([np.random.choice(indices_of_interest[0], p=probs_of_interest[0])])
        else:
            raise Exception(f"Unknown sampling method {sampling_method}")

    Inference.sample_logits = sample_logits

In [38]:
    def reset(self):
        self.first = True
        self.session.run_times = 0
        self.session.reset()
    
    # 把这个函数挂到 Inference 类上，作为方法
    Inference.reset = reset
    



In [39]:
    def getState(self):
        with self.lock:
            return self.state.copy()

    Inference.getState = getState

sampling_method：决定如何选择下一个token（例如，贪婪算法（greedy）、Top-k采样、Top-p采样）。

temperature：控制生成文本的随机性。温度越高，生成的文本越多样化。

## 预测 (predict 方法)

In [40]:
    def predict(self, prompt, history=None, system_prompt="You are a helpful assistant.", max_new_tokens=1024):
        if history is None:
            history = []
        messages = [{"role": "system", "content": system_prompt}]
        for (use_msg, bot_msg) in history:
            messages.append({"role": "user", "content": use_msg})
            messages.append({"role": "assistant", "content": bot_msg})
        messages.append({"role": "user", "content": prompt})

        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        if self.session_type == "pytorch":
            input_ids = self.tokenizer([text], return_tensors="pt")["input_ids"].to(torch.long).reshape(1, -1)
        else:
            raise Exception(f"unknown session_type {self.session_type}")

        input_ids = input_ids[:, -self.max_input_length:]
        self.first = False
        ids_list = []
        input_length = input_ids.shape[1]
        max_output_len = self.max_output_length - input_length
        max_output_len = min(max_output_len, max_new_tokens)

        for i in range(max_output_len):
            logits = self.session.run(input_ids)
            input_ids = self.sample_logits(logits[0][-1:])
            input_ids = input_ids.reshape(1, -1)
            ids_list.append(input_ids[0].item())
            text_out = self.tokenizer.decode(ids_list)

            # Early stop if EOS token is generated
            if input_ids[0] == self.tokenizer.eos_token_id:
                break

        return text_out
    Inference.predict = predict

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'generate_cache', 'getState', 'predict', 'reset', 'sample_logits']


该方法通过递归生成token来实现文本生成。它会通过会话获取模型的输出，并根据指定的最大输出长度生成响应。

## 3. 配置设置

In [41]:

# Hardcoded configuration (you can set values directly here)
config = InferenceConfig(
    hf_model_dir="/data/shaos/data/Qwen2.5-VL-3B-Instruct",
    om_model_path="path_to_om_model",
    onnx_model_path="path_to_onnx_model",
    cpu_thread=4,
    session_type="pytorch",
    max_batch=1,
    max_output_length=2048,
    max_input_length=1024,
    kv_cache_length=2048,
    max_prefill_length=4,
    dtype="float32",
    torch_dtype="float32",
    #tokenizer_dir="path_to_tokenizer"
)



InferenceConfig：这个配置类包含了模型加载、设备设置、最大输入和输出长度等各种超参数和路径。

## 4. 执行预测

创建一个 Inference 对象，传入配置，并使用 predict 方法对给定的提示（"What is the capital of France?"）进行推理，生成并打印输出结果。

In [42]:

# Running the prediction
inference = Inference(config)
prompt = "What is the capital of France?"
output = inference.predict(prompt)
print("Generated Output:", output)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

[INFO] init success
[INFO] NPU context set successfully
Generated Output: The capital of France is Paris.<|im_end|>


In [None]:
关键概念和解释：

模型推理：指的是使用预训练的模型生成预测结果。在本脚本中，推理过程是通过传入文本提示并生成下一步最可能的token来实现的。

采样方法：脚本支持多种方法来选择下一个token（例如 贪婪（Greedy）、Top-k、Top-p）。贪婪方法简单地选择概率最高的token，而Top-k和Top-p则引入了一定的随机性，使生成的文本更加自然和多样。

会话管理：Session 类用于处理与模型的交互，它管理输入、输出、状态等。

NPU设备：如果支持NPU（神经处理单元），脚本会尝试使用NPU进行加速计算。如果没有NPU，它会回退到CPU或GPU。

状态重置：每次预测后，都会重置模型的状态，确保不会受到先前预测的影响。

建议的练习：

修改采样方法：尝试使用不同的采样方法（贪婪、Top-k、Top-p），观察它们如何影响生成文本的多样性和准确性。

调整输入输出长度：修改 max_input_length 和 max_output_length，看看这些参数如何影响模型的性能，尤其是在处理较长文本时。

优化推理速度：尝试批处理并设置不同的设备（例如使用GPU或CPU），以改善推理速度，尤其是对于大型模型。

使用不同的提示：更改 predict 方法中的提示，观察模型对不同类型的查询（事实性问题、创意问题等）的响应。