# Llama 类定义

## 采样


In [None]:
# 用于解析和生成JSON数据
import json
import os
import time
import sys
# 用于处理文件路径的类，使得代码能够以面向对象的方式处理文件和目录路径。
from pathlib import Path
from typing import List, Literal, Optional, Tuple, TypedDict

In [None]:
import torch
import torch.nn.functional as F

In [None]:
# FairScale是一个PyTorch扩展库，用于进行大规模训练
from fairscale.nn.model_parallel.initialize import (
    # 函数用于初始化模型并行环境，模型并行指的是将一个大型模型分割成多个部分，然后在不同的处理器或机器上并行执行。
    initialize_model_parallel,
    # 函数用于获取当前设备在模型并行中的等级，在模型并行中，每个进程通常负责模型的一个部分。这个“排名”标识了进程处理的是模型的哪一部分。
    get_model_parallel_rank,
    # 函数则是用于检查模型并行环境是否已经初始化
    model_parallel_is_initialized,
)

In [None]:
from model import ModelArgs, Transformer
from tokenizer import Tokenizer

In [None]:
# Literal : 用于指定一个变量只能取固定的字面值
Role = Literal["system", "user", "assistant"]

# TypedDict : 用于为字典指定预期的键和对应值的类型
class Message(TypedDict):
    role: Role # 角色
    content: str # 消息的内容
    
class CompletionPrediction(TypedDict, total=False):
    '''这个字典主要用于存储文本生成任务的结果和相关信息'''
    # 将total参数设置为False，这个TypedDict被指定为非严格模式，这意味着不是所有的键都是必需的。
    generation: str # 生成的文本
    tokens: List[str] # 字符串列表，是生成文本的单词标记列表。此字段不是必需的。
    logprobs: List[float] # 浮点数列表，表示生成每个单词标记的对数概率。此字段也不是必需的。
    
    
class ChatPrediction(TypedDict, total=False):
    generation: Message # Message类，表示生成的聊天信息
    tokens: List[str] # 字符串列表，是生成文本的单词标记列表。此字段不是必需的。
    logprobs: List[str] # 浮点数列表，表示生成每个单词标记的对数概率。此字段也不是必需的。
    


In [None]:
# 定义为Message类型的列表。每个Message都是一个字典，包含role和content字段。

Dialog = List[Message]

In [None]:
# 分别表示一个指令的开始和结束标记
B_INST, E_INST = "[INST]", "[/INST]"

# 分别表示一个系统消息的开始和结束标记
B_SYS, E_SYS = "<<SYS>>\n", "\n<</SYS>>\n\n"

In [None]:
# 默认的系统提示
DEFAULT_SYSTEM_PROMPT = """\
You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature.

If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information."""


In [None]:
def sample_top_p(probs, p):
    '''这种采样方法是为了在文本生成过程中增加多样性，防止模型仅生成概率最高的词，同时也能防止生成概率极低的词。'''
    
    # 将概率probs降序排序，并返回排序后的值probs_sort和对应的索引probs_idx
    probs_sort, probs_idx = torch.sort(probs, dim=-1, descending=True)
    # 计算累积的概率
    probs_sum = torch.cumsum(probs_sort, dim=-1)
    # 标记出累加和大于阈值p的元素
    mask = probs_sum - probs_sort > p
    # 用0填充probs_sort中所有mask为True的位置，这意味着所有累积概率超过p的词都不会被考虑。
    probs_sort[mask] = 0.0
    # 对概率进行归一化，使得剩下的token的概率之和为1。
    probs_sort.div_(probs_sort.sum(dim=-1, keepdim=True))
    # 从修改后的probs_sort中随机抽样，得到下一个token的索引，torch.multinomial函数可以从给定的多项分布中抽取样本。
    next_token = torch.multinomial(probs_sort, num_samples=1)
    # 根据索引从排序之前的概率分布中找到对应的token
    next_token = torch.gather(probs_idx, -1, next_token)
    return next_token

In [None]:
import　torch
random_seed = 200
torch.manual_seed(random_seed)
input = torch.randint(0, 100, (2, 3, 4))
print("input:")
print(input)

index = torch.randint(0, 2, (2, 1, 2))
print("index:")
print(index)

output = input.gather(0, index)
print("output:")
print(output)


## Llama

In [None]:
class Llama:
    def __init__(self, model: Transformer, tokenizer: Tokenizer):
        self.model = model
        self.tokenizer = tokenizer
    
    # 构建 Llama 类的实例
    @staticmethod
    def build(
        ckpt_dir: str, # checkpoint的文件夹路径 
        tokenizer_path: str, # 分词器模型的文件路径
        max_seq_len: int, # 输入文本的最大序列长度
        max_batch_size: int, # 推理时的最大批处理大小
        model_parallel_size: Optional[int] = None, # 模型并行处理的进程数，是可选参数。
    ) -> "Llama":
        # 1. 检查分布式环境是否已初始化
        if not torch.distributed.is_initialized():
            # 如果没有，它将使用NCCL后端初始化分布式环境。
            # NCCL（Nvidia Collective Communications Library）是一个用于GPU之间通信的库，主要用于深度学习的训练中。
            torch.distributed.init_process_group("nccl")
        
        # 2. 检查模型并行环境是否已初始化
        if not model_parallel_is_initialized():
            # 初始化模型并行环境
            if model_parallel_size is None:
                # 从环境变量中获取 WORLD_SIZE 的值来确定模型并行的大小。如果未设置，则默认为 1。
                model_parallel_size = int(os.envirion.get("WORLD_SIZE", 1))
            # 使用指定的模型并行大小来初始化模型并行
            initialize_model_parallel(model_parallel_size)
            
        # 获取local rank，用于设置当前进程使用的GPU。
        local_rank = int(os.environ.get("LOCAL_RANK", 0))
        
        # 在分布式训练中，每个进程都会有一个local rank，它表示这个进程在当前节点中的rank。
        torch.cuda.set_device(local_rank)
        
        # 确保所有进程的一致性，方法设置了一个固定的随机种子 1
        torch.manual_seed(1)
        
        # 在多GPU环境中，这段代码确保只有主进程（local_rank == 0）会输出到标准输出，
        # 其他进程的输出被重定向到空设备，即不显示。
        if local_rank > 0:
            sys.stdout = open(os.devnull, "w")
        
        # 用于后续计算加载模型花费的时间
        start_time = time.time()
        
        # 搜索包含所有的.pth文件（PyTorch模型文件）的目录，确保至少找到一个这样的文件。
        # 这些文件是分布式训练结果，每个文件代表一个训练的部分。
        checkpoints = sorted(Path(ckpt_dir)).glob("*.pth")
        assert len(checkpoints) > 0, f"no checkpoint files found in {ckpt_dir}"
        
        # 检查模型并行的大小是否等于找到的.pth文件的数量。如果不等于，它将抛出一个异常，
        # 因为每一个模型并行的部分需要一个对应的.pth文件。
        assert model_parallel_size == len(checkpoints), \
        f"Loading a checkpoint for MP={len(checkpoints)} , but world size is {model_parallel_size}"
        
        # 根据模型并行的等级加载对应的.pth文件，并行等级决定了加载哪一个.pth文件。
        ckpt_path = checkpoints[get_model_parallel_rank()]
        checkpoint = torch.load(ckpt_path, map_location="cpu")
        
        # 打开并加载一个名为"params.json"的文件，这个文件包含了模型的参数
        with open(Path(ckpt_dir) / "params.json", "r") as f:
            params = json.loads(f.read())
            
        # 创建了一个ModelArgs实例
        model_args: ModelArgs = ModelArgs(
            max_seq_len=max_seq_len,
            max_batch_size=max_batch_size,
            **params, # 其他从JSON文件中读取的参数
        )
        
        # 初始化了一个Tokenizer实例
        tokenizer = Tokenizer(model_path=tokenizer_path)
        
        # 将分词器的词汇表大小设置为模型的参数
        model_args.vocab_size = tokenizer.n_words
        
        # 代码将默认的Tensor类型设置为半精度浮点数（HalfTensor）。
        # 这样做的目的是减少内存消耗并加速计算，但可能会导致计算的精度略有下降。
        torch.set_default_tensor_type(torch.cuda.HalfTensor)
        
        # 创建了一个Transformer模型的实例
        model = Transformer(model_args)
        
        # 从之前加载的模型参数检查点（checkpoint）中加载参数
        model.load_state_dict(checkpoint, strict=False)
        print(f"Loaded in {time.time() - start_time:.2f} seconds.")
        
        # 创建了一个Llama类的实例，传入了模型和分词器作为参数，并返回这个实例。
        return Llama(model, tokenizer)
    '''知识补充：
    模型并行涉及多个checkpoint文件的原因是什么？
    （1）分割模型的不同部分： 在模型并行设置中，一个大型的模型被分割成多个较小的部分。每个部分可以在不同的处理单元（例如GPU）上运行。
                          这意味着模型的不同部分可以并行处理，提高了计算效率和规模。

    （2）独立保存每部分的状态：由于模型被分割成多个部分，每个部分的状态（权重和偏差）通常单独保存在不同的检查点文件中。
                          这样做的主要原因是简化模型的保存和加载过程，尤其是在大规模分布式训练环境中。

    （3）加载时的并行恢复： 当加载模型用于推理或进一步训练时，这些检查点文件可以被并行地加载到相应的处理单元上。
                          这样可以快速恢复模型的完整状态，同时保持模型部分的并行性。
    '''
    
    
    @torch.inference_mode() # @torch.inference_mode() 是PyTorch的一个装饰器，用于优化模型推理（inference）时的性能。
    def generate(
        self,
        # 输入的提示token，是一个二维列表，每个元素是一个token列表。
        prompt_tokens: List[List[int]],
        # 最大的生成长度
        max_gen_len: int, 
        # 温度
        temperature: float = 0.6, 
        # 用于控制生成新token时采样的方式，只有概率在前top_p的token才会被考虑。
        top_p: float = 0.9, 
        # 如果为True，返回每个生成token的对数概率。
        logprobs: bool = False, 
        # 如果为True，输入的提示token也会被包含在生成的结果中。
        echo: bool = False, 
    ) -> Tuple[List[List[int]], Optional[List[List[float]]]]:
        params = self.model.params
        bsz = len(prompt_tokens)
        # 检查输入的批次大小是否超过了模型参数的最大批次大小
        assert bsz < params.max_batch_size, (bsz, params.max_batch_size)
        
        # 首先获取了输入提示tokens的最小和最大长度，然后确保最大长度不超过模型参数的最大序列长度。
        min_prompt_len = min(len(t) for t in prompt_tokens)
        max_prompt_len = max(len(t) for t in prompt_tokens)
        assert max_prompt_len < params.max_seq_len
        
        # 总长度不超过最大序列长度
        total_len = min(params.max_seq_len, max_gen_len + max_prompt_len)
        
        # 使用填充tokens初始化一个用于存储生成tokens的张量
        pad_id = self.tokenizer.pad_id
        tokens = torch.full((bsz, total_len), pad_id, dtype=torch.long, device="cuda")
        
        # 对于每一个输入的提示token，将其复制到tokens张量的对应行中
        for k, t in enumerate(prompt_tokens):
            tokens[k, : len(t)] = torch.tensor(t, dtype=torch.long, device="cuda")
            
        # 如果logprobs参数为True，还会创建一个与tokens张量大小相同的token_logprobs张量，
        # 用于存储生成的每个token的对话概率。
        if logprobs:
            token_logprobs = torch.zero_like(tokens, dtype=torch.float)
        
        # 在生成文本时每一步对模型进行前向传播，并可能计算每个生成的单词的对数概率。
        # 表示在前向传播过程中模型应该从哪个位置开始考虑输入token
        prev_pos = 0
        
        # 初始化一个布尔型的张量eos_reached，长度等于批次大小（bsz）。
        # 这个张量的每个元素都表示一个输入样本是否已经生成了结束token（EOS）。
        # 初始时，所有元素都被设置为False，表示还没有生成EOS。
        eos_reached = torch.tensor([False] * bsz, device="cuda")
        
        # 创建一个掩码input_text_mask，用于标记输入tokens中哪些位置的token是实际的输入token，哪些是padding token。
        input_text_mask = tokens != pad_id
        
        # cur_pos 表示当前要生成的token的位置
        for cur_pos in range(min_prompt_len, total_len):
            # 调用模型的前向传播函数，对输入tokens的一部分进行处理，从位置prev_pos到位置cur_pos。
            # 这会返回一个logits张量，该张量包含了模型对每个可能的下一个token的原始预测。
            logits = self.model.forward(tokens[:, prev_pos:cur_pos], prev_pos)
            
            # 检查是否需要计算每个生成的token的对数概率
            if logprobs:
                # F.cross_entropy是PyTorch提供的交叉熵损失函数，它需要两个输入：预测的概率（即logits）和实际的目标tokens。
                # 通过计算交叉熵，可以得到模型对每个token的预测概率的负对数值。负对数概率越低，表示模型对该token的预测越有信心。
                token_logprobs[:, prev_pos + 1 : cur_pos + 1] = -F.cross_entropy(
                    # input是预测输入，这里是logits的转置。cross_entropy需要的logits形状是 (N, C, *)，
                    # 其中N是批量大小，C是类别数，*是任何其他维度。在这里，由于logits最初的形状是(N, *, C)，
                    input = logits.transpose(1, 2),
                    target = tokens[:, prev_pos + 1 : cur_pos + 1],
                    # reduction参数决定了如何将每个样本的损失整合起来。设置为none表示不对损失进行整合，
                    # 而是返回一个和目标大小一致的张量，这样就可以得到每个token的单独损失。
                    reduction="none",
                    # 设置pad_id为忽略的目标值，这样pad token的损失就不会被计算进去。
                    ignore_index=pad_id,
                )
                
            if temperature > 0:
                # 使用温度对logits进行缩放，使生成的分布变得更锐利（温度低）或更平均（温度高）。
                # softmax函数用于将logits转换为概率分布。
                probs = torch.softmax(logits[:, -1] / temperature, dim=-1)
                
                # 使用sample_top_p函数从调整过的概率分布中抽样生成下一个token。
                # 这个函数首先按概率降序排序所有可能的token，
                # 然后选择一个概率累计达到top_p的最小集合，最后在这个集合中随机选择一个token。
                # 这种方法叫做"nucleus sampling"，它是一种平衡生成质量和多样性的方法。
                next_token = sample_top_p(probs, top_p)
            else:
                # 如果温度为0，则直接选择具有最高logits的token作为下一个token。
                next_token = torch.argmax(logits[:, -1], dim=-1)
                
            # reshape为一维向量
            next_token = next_token.reshape(-1)
            
            # 使用一个条件来决定是否应该使用输入tokens的当前位置的值，还是新生成的next_token。
            # input_text_mask是一个布尔值的mask，其中对于每一个位置，如果其值为True，
            # 表示这个位置在输入的文本中；如果为False，表示这个位置是需要生成的位置。
            # torch.where根据条件从两个选项中选择一个值。在我们的例子中，如果input_text_mask在cur_pos位置的值为True，
            # 我们会使用tokens[:, cur_pos]的值，否则使用新生成的next_token。
            next_token = torch.where(
                input_text_mask[:, cur_pos], tokens[:, cur_pos], next_token
            )
            
            # 将新生成的next_token放入tokens张量的当前位置
            tokens[:, cur_pos] = next_token
            
            # 检查是否已经生成了EOS token
            # 如果当前位置不是输入文本的一部分（~input_text_mask[:, cur_pos]返回True）
            # 并且生成的next_token 是EOS token（next_token == self.tokenizer.eos_id 返回True），
            # 那么eos_reached将被设为True。
            eos_reached |= (~input_text_mask[:, cur_pos]) & (next_token == self.tokenizer.eos_id)
            '''知识补充: 
            按位非运算符 ~，代码将输入文本的掩码值反转，如果当前位置是输入文本的一部分，结果将为False，反之则为True。
            通过位或运算符 |= 更新eos_reached变量。如果上述条件为True，则eos_reached将被设置为True。否则，如果eos_reached已经是 True，它将保持不变。
            '''
            
            # 将当前位置cur_pos保存到变量prev_pos中，以便在下一个循环中使用
            prev_pos = cur_pos
            
            # 如果所有的输入序列都已经生成了EOS token，那么就提前结束循环。
            # 在实际使用中，这可以帮助我们提高效率，因为一旦所有的序列都已经完成生成，就没有必要继续计算了。
            if all(eos_reached):
                break
                
            if logprobs:
                # 从PyTorch tensor转换为Python列表
                token_logprobs = token_logprobs.tolist()
                
            # 遍历，对每个生成的tokens进行处理
            out_tokens, out_logprobs = [], []
            for i, toks in enumerate(tokens.tolist()):
                # 如果echo为True，那么就从0开始，否则从输入prompt的长度开始。
                start = 0 if echo else len(prompt_tokens[i])
                # 截取了生成的tokens到指定的长度max_gen_len
                toks = toks[start : len(prompt_tokens[i]) + max_gen_len]
                probs = None
                
                if logprobs:
                    # 截取了对应的logprobs到指定的长度
                    probs = token_logprobs[i][start : len(prompt_tokens[i]) + max_gen_len]
                    
                # 是否存在eos_id（句子结束标记）
                if self.tokenizer.eos_id in toks:
                    # 获取了eos_id的位置
                    eos_idx = toks.index(self.tokenizer.eos_id)
                    # 截断
                    toks = toks[:eos_idx]
                    probs = probs[:eos_idx] if logprobs else None
                
                # 将处理后的tokens和logprobs添加到结果列表中
                out_tokens.append(toks)
                out_logprobs.append(probs)
                
            return (out_tokens, out_logprobs if logprobs else None)
        
        
        # 基于输入的提示文本进行文本生成
        def text_completion(
            self,
            prompts: List[str], # 一个字符串列表，每个字符串都是一个输入的提示文本。
            temperature: float = 0.6, 
            top_p: float = 0.9, # 用于控制生成新token时采样的方式，只有概率在前top_p的token才会被考虑。
            max_gen_len: Optional[int] = None, # 控制生成的文本的最大长度
            logprobs: bool = False, # 如果为True，返回每个生成token的log概率。
            echo: bool = False, # 如果为True，输入的提示token也会被包含在生成的结果中。
        ) -> List[CompletionPrediction]:
            if max_gen_len is None:
                # 设置为模型参数的最大序列长度减1
                max_gen_len = self.model.params.max_seq_len - 1
            
            # 将输入的提示文本编码成tokens，同时添加一个bos（begin of sequence）token在每个序列的开始，
            # 并且不在每个序列的结束添加eos（end of sequence）token。
            prompt_tokens = [self.tokenizer.encode(x, bos=True, eos=False) for x in prompts]
            
            # 生成文本，返回生成的tokens和相应的log概率
            generation_tokens, generation_logprobs = self.generate(
                prompt_tokens = prompt_tokens,
                max_gen_len = max_gen_len,
                temperature = temperature,
                top_p = top_p,
                logprobs = logprobs,
                echo = echo,
            )
            
            if logprobs:
                # 需要返回每个生成token的log概率
                return [
                    {
                        "generation": self.tokenizer.decode(t), # 解码后的生成文本
                        "tokens": [self.tokenizer.decode(x) for x in t], # 生成文本中的每个token解码后的形式
                        "logprobs": logprobs_i, # 每个生成token的log概率
                    }
                    for t, logprobs_i in zip(generation_tokens, generation_logprobs)
                ]
            
            # 如果logprobs为False，则返回一个字典列表。
            return [{"generation": self.tokenizer.decode(t)} for t in generation_tokens]
        
        
        
        
        def chat_completion(
            self,
            # 对话信息的列表，每个对话Dialog是一个字典列表，表示对话中的一条消息。
            # 对话中的每条消息都有一个角色（role）和内容（content）。
            dialogs: List[Dialog],
            temperature: float = 0.6,
            top_p: float = 0.9,
            max_gen_len: Optional[int] = None,
            logprobs: bool = False,
        ) -> List[ChatPrediction]:
            
            if max_gen_len is None:
                # 设置为模型参数的最大序列长度减1
                max_gen_len = self.model.params_max_seq_len - 1
                
            prompt_tokens = []
            for dialog in dialogs:
                # 确保每一个对话都是以系统提示开始的，并且消息会以user/assistant/user/assistant的顺序交替进行
                if dialog[0]["role"] != "system":
                    dialog = [
                        {
                            "role": "system",
                            "content": DEFAULT_SYSTEM_PROMPT,
                        }
                    ] + dialog
                    
                # 将对话中的前两条消息组合成一条，第一条消息的内容被包含在B_SYS和E_SYS标记中，
                # 并与第二条消息的内容组合起来。这是因为在该模型中，系统提示和用户的首条消息被视为一个整体，
                # 用于生成模型的第一个回复。
                dialog = [
                    {
                        "role": dialog[1]["role"],
                        "content": B_SYS + dialog[0]["content"] + E_SYS + dialog[1]["content"],
                    }
                ] + dialog[2:]
                
                # 使用assert来确保在对话中的消息是以用户和助手的顺序交替进行的
                assert all([msg["role"] == "user" for msg in dialog[::2]]) and \
                       all([msg["role"] == "assistant" for msg in dialog[1::2]]), \
                       ("model only supports 'system', 'user' and 'assistant' roles, "
                        "starting with 'system', then 'user' and alternating (u/a/u/a/u...)")
                        
                
                # ① 对于每一对prompt和answer，将其格式化为模型所需要的格式，这种格式包含B_INST（实例开始标记）和E_INST（实例结束标记）。
                # ② 对格式化的字符串进行编码，使其变为一串整数列表。
                # ③ 使用zip函数将prmpt和answer对应起来，这样可以确保每一对prompt和answer都是一组。
                # ④ 将对每一对消息编码的结果通过sum函数连接起来，形成一个列表，其中包含了整个对话的编码结果。
                dialog_tokens: List[int] = sum(
                    [
                        self.tokenizer.encode(
                            f"{B_INST} {(prompt['content']).strip()} {E_INST} {(answer['content']).strip()}",
                            bos = True,
                            eos = True,
                        )
                        for prompt, answer in zip(dialog[::2], dialog[1::2])
                    ],
                    []
                )
            
            
            # 确保对话的最后一条消息是由用户发送的。如果最后一条消息不是用户的，这将抛出一个错误。
            # 这是因为模型是设计用来生成对用户消息的回应的，所以需要用户的消息作为最后一个输入。
            assert (dialog[-1]["role"] == "user"), f"Last message must be from user, got {dialog[-1]['role']}"
            
            # 把这个最后的用户消息添加到dialog_tokens中。在添加时，
            # 它使用tokenizer.encode函数把最后一条用户消息转化成一个数字的列表（即tokens）。
            dialog_tokens += self.tokenizer.encode(
                f"{B_INST} {(dialog[-1]['content'].strip())} {E_INST}",
                bos = True,
                eos = False,
            )
            
            # 将处理好的 dialog_tokens 添加到 prompt_tokens 列表中，这个列表中的每个元素都代表一段对话的tokens。
            prompt_tokens.append(dialog_tokens)
            
        
        # 生成一些 tokens
        generation_tokens, generation_logprobs = self.generate(
            prompt_tokens = prompt_tokens, # 输入到模型的tokens
            max_gen_len = max_gen_len, # 文本的最大长度
            temperature = temperature, 
            top_p = top_p,
            logprobs = logprobs,
        )
        
        # 需要返回每个生成token的log概率
        if logprobs:
            return [
                {
                    "generation": {
                        "role": "assistant",
                        "content": self.tokenizer.decode(t),
                    },
                    "tokens": [self.tokenizer.decode(x) for x in t],
                    "logprobs": logprobs_i,
                }
                for t, logprobs_i in zip(generation_tokens, generation_logprobs)
            ]
        
        # 如果logprobs为False，则返回一个字典列表。
        return [
            {"generation": {"role": "assistant",
                            "content": self.tokenizer.decode(t)}}
            for t in generation_tokens
        ]
        