# Reward Modeling Script: A Detailed Walkthrough


In [3]:
from reward_modeling import *

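## Initialization
- Import the required libraries and modules: the script starts by importing several libraries, including PyTorch, Transformers, and PEFT, which provide the core functionality for model training and data processing.
- Define the model class constant (MODEL_CLASSES): a dictionary that maps each model type to its configuration, model, and tokenizer classes, so the code can support multiple model architectures.
- Define the data classes: Python's dataclass decorator is used to define several data classes (ModelArguments, DataArguments, ScriptArguments) that store and manage the parameters.
- Parse command-line arguments: HfArgumentParser parses the command-line arguments and converts them into instances of the data classes defined above.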

### The data classes (ModelArguments, DataArguments, ScriptArguments)



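1. How do you use dataclasses.field to set default values and metadata for data class fields? Give an example.
2. What does the __post_init__ method do? Describe its purpose in a data class and give an example that validates field values.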
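
Both questions can be answered with one short sketch. The `LoraArguments` class below is hypothetical and exists only for illustration; it is not one of the script's argument classes. It shows `field(default=..., metadata=...)` attaching a default value and help text to each field, and `__post_init__` validating the values right after the generated `__init__` has run.

```python
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class LoraArguments:
    """Hypothetical argument container used only for illustration."""

    # `default` is the value used when the caller passes nothing;
    # `metadata` attaches free-form information such as help text.
    lora_rank: int = field(default=8, metadata={"help": "Rank of the LoRA update matrices."})
    lora_dropout: float = field(default=0.05, metadata={"help": "Dropout applied to LoRA layers."})
    peft_path: Optional[str] = field(default=None, metadata={"help": "Optional adapter checkpoint."})

    def __post_init__(self):
        # Called automatically after the generated __init__, so it is a
        # natural place to validate field values.
        if self.lora_rank <= 0:
            raise ValueError("lora_rank must be a positive integer")
        if not 0.0 <= self.lora_dropout < 1.0:
            raise ValueError("lora_dropout must be in [0, 1)")


args = LoraArguments(lora_rank=16)
help_text = {f.name: f.metadata["help"] for f in fields(args)}
print(args)
print(help_text["lora_rank"])
```

Constructing `LoraArguments(lora_rank=0)` raises `ValueError`, so an invalid configuration fails at construction time rather than deep inside training.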




In [6]:
from dataclasses import dataclass, field
from typing import Optional, Union, List, Dict, Any

# ModelArguments, DataArguments, and ScriptArguments are assumed to be defined here
# Their code was already given above, so the definitions are not repeated

# Test ModelArguments
model_args = ModelArguments(
    model_type="bert",
    model_name_or_path="bert-base-uncased",
    tokenizer_name_or_path="bert-base-uncased",
    load_in_4bit=False,
    load_in_8bit=False,
    cache_dir="./cache",
    use_fast_tokenizer=True,
    torch_dtype="float32",
    device_map="auto",
    trust_remote_code=True
)

# Test DataArguments
data_args = DataArguments(
    dataset_name="squad",
    dataset_config_name="v2",
    train_file_dir="./data/train",
    validation_file_dir="./data/valid",
    max_source_length=512,
    max_target_length=128,
    overwrite_cache=False,
    validation_split_percentage=5,
    preprocessing_num_workers=4
)

# Test ScriptArguments
script_args = ScriptArguments(
    use_peft=True,
    target_modules="all",
    lora_rank=8,
    lora_dropout=0.05,
    lora_alpha=32.0,
    modules_to_save=None,
    peft_path=None,
    template_name="vicuna"
)

# Print the instantiated objects to verify the parameters
print(model_args)
print(data_args)
print(script_args)

ModelArguments(model_type='bert', model_name_or_path='bert-base-uncased', tokenizer_name_or_path='bert-base-uncased', load_in_4bit=False, load_in_8bit=False, cache_dir='./cache', use_fast_tokenizer=True, torch_dtype='float32', device_map='auto', trust_remote_code=True)
DataArguments(dataset_name='squad', dataset_config_name='v2', train_file_dir='./data/train', validation_file_dir='./data/valid', max_source_length=512, max_target_length=128, max_train_samples=None, max_eval_samples=None, overwrite_cache=False, validation_split_percentage=5, preprocessing_num_workers=4)
ScriptArguments(use_peft=True, target_modules='all', lora_rank=8, lora_dropout=0.05, lora_alpha=32.0, modules_to_save=None, peft_path=None, template_name='vicuna')


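### The reward model's dedicated data collator (RewardDataCollatorWithPadding)

The RewardDataCollatorWithPadding class differs fundamentally from the ModelArguments, DataArguments, and ScriptArguments classes.

Why it needs special handling:
- A reward model is trained on paired data (chosen vs. rejected responses)
- The pairs require a dedicated batching scheme
- Responses of different lengths must be padded
- A specific input format may be required (for example, system prompt + conversation history + current question)
- It improves training efficiency
- It makes data processing more flexible

Main functions of RewardDataCollatorWithPadding:
- Process the "chosen" and "rejected" inputs separately
- Pad the inputs appropriately
- Organize the processed data into the format the model expects

Differences from the argument classes (such as ModelArguments):
1. Purpose: data processing vs. configuration management
2. Definition: a full class with methods vs. a simple data container
3. When it is used: continuously during training vs. set once before training starts
4. Complexity: contains data-processing logic vs. mostly attribute definitions
5. Customizability: adjustable to the task vs. mostly determined by external input
6. Lifecycle: called once per batch vs. unchanged throughout training

Summary: RewardDataCollatorWithPadding is a dynamic functional component, whereas the argument classes are static configuration containers, which reflects their different roles and responsibilities in the machine learning pipeline.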
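
To make the "pad each side separately" idea concrete, here is a minimal pure-Python sketch. It is not the script's `RewardDataCollatorWithPadding`: the function name `pad_pairs` and the padding ID `0` are assumptions, and it returns plain lists instead of tensors so that it runs without PyTorch.

```python
from typing import Dict, List


def pad_pairs(features: List[Dict[str, List[int]]], pad_id: int = 0) -> Dict[str, object]:
    """Pad the chosen and rejected sides of each preference pair separately."""
    batch: Dict[str, object] = {}
    for side in ("chosen", "rejected"):
        ids = [f[f"input_ids_{side}"] for f in features]
        masks = [f[f"attention_mask_{side}"] for f in features]
        width = max(len(seq) for seq in ids)  # longest sequence on this side
        batch[f"input_ids_{side}"] = [seq + [pad_id] * (width - len(seq)) for seq in ids]
        # Padding positions get mask value 0 so the model ignores them.
        batch[f"attention_mask_{side}"] = [m + [0] * (width - len(m)) for m in masks]
    batch["return_loss"] = True
    return batch


features = [
    {"input_ids_chosen": [1, 2, 3], "attention_mask_chosen": [1, 1, 1],
     "input_ids_rejected": [4, 5], "attention_mask_rejected": [1, 1]},
    {"input_ids_chosen": [1, 2], "attention_mask_chosen": [1, 1],
     "input_ids_rejected": [4, 5, 6], "attention_mask_rejected": [1, 1, 1]},
]
batch = pad_pairs(features)
print(batch["input_ids_chosen"])    # [[1, 2, 3], [1, 2, 0]]
print(batch["input_ids_rejected"])  # [[4, 5, 0], [4, 5, 6]]
```

Row `i` of the chosen tensors and row `i` of the rejected tensors always come from the same preference pair, which is what lets a pairwise loss compare them directly.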

In [4]:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from transformers import PreTrainedTokenizerBase
import torch

# Copy the RewardDataCollatorWithPadding class definition here (from the code provided)

# Create a mock tokenizer
class MockTokenizer:
    def pad(self, features, padding, max_length, pad_to_multiple_of, return_tensors):
        max_length = max(len(f['input_ids']) for f in features)
        padded_features = []
        for feature in features:
            pad_length = max_length - len(feature['input_ids'])
            padded_feature = {
                'input_ids': feature['input_ids'] + [0] * pad_length,
                'attention_mask': feature['attention_mask'] + [0] * pad_length
            }
            padded_features.append(padded_feature)
        return {
            'input_ids': torch.tensor([f['input_ids'] for f in padded_features]),
            'attention_mask': torch.tensor([f['attention_mask'] for f in padded_features])
        }

# Initialize a RewardDataCollatorWithPadding instance
tokenizer = MockTokenizer()
data_collator = RewardDataCollatorWithPadding(tokenizer)

# Prepare mock input data
mock_features = [
    {
        'input_ids_chosen': [1, 2, 3],
        'attention_mask_chosen': [1, 1, 1],
        'input_ids_rejected': [4, 5],
        'attention_mask_rejected': [1, 1]
    },
    {
        'input_ids_chosen': [1, 2],
        'attention_mask_chosen': [1, 1],
        'input_ids_rejected': [4, 5, 6],
        'attention_mask_rejected': [1, 1, 1]
    }
]

# Call the RewardDataCollatorWithPadding instance to process the data
batch = data_collator(mock_features)

# Print and verify the result
print("Processed batch:")
for key, value in batch.items():
    if isinstance(value, torch.Tensor):
        print(f"{key}:\n{value}\n")
    else:
        print(f"{key}: {value}\n")

# Verify
assert batch['input_ids_chosen'].shape == batch['attention_mask_chosen'].shape
assert batch['input_ids_rejected'].shape == batch['attention_mask_rejected'].shape
assert batch['return_loss'] == True

print("All assertions passed. The RewardDataCollatorWithPadding is working as expected.")

Processed batch:
input_ids_chosen:
tensor([[1, 2, 3],
        [1, 2, 0]])

attention_mask_chosen:
tensor([[1, 1, 1],
        [1, 1, 0]])

input_ids_rejected:
tensor([[4, 5, 0],
        [4, 5, 6]])

attention_mask_rejected:
tensor([[1, 1, 0],
        [1, 1, 1]])

return_loss: True

All assertions passed. The RewardDataCollatorWithPadding is working as expected.


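How RewardDataCollatorWithPadding is used in reward model training

Preference dataset structure: $\mathcal{D}_{\text{off}} = \{(x, a^w, a^l)\}$
- $x$: input (question/context)
- $a^w$: preferred response
- $a^l$: dispreferred response

Processing flow:

1. Raw data example
   - $x$: "What's the capital of France?"
   - $a^w$: "The capital of France is Paris."
   - $a^l$: "The capital of France is London."

2. Data preprocessing
   Convert to token IDs:
   - input_ids_chosen: [101, 2054, ..., 3000, 1012, 102]
   - input_ids_rejected: [101, 2054, ..., 2414, 1012, 102]

3. RewardDataCollatorWithPadding processing
   - Separate the "chosen" and "rejected" data
   - Pad the sequences
   - Build the attention masks
   - Organize the data format

4. Output
   Dictionary format:
   - input_ids_chosen
   - attention_mask_chosen
   - input_ids_rejected
   - attention_mask_rejected
   - return_loss: True

5. Used for training
   - Compute scores for the preferred and dispreferred responses
   - Compute the loss
   - Backpropagate and update the parameters

Advantages:
1. Ensures consistent sequence lengths within a batch
2. Keeps each preferred response paired with its dispreferred response
3. Lets the model learn from positive and negative examples at the same time

Conclusion: RewardDataCollatorWithPadding is tailored to the needs of reward model training and supports effective learning of human preferences.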
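
The "compute the loss" step is not spelled out in this section. A common choice for preference pairs, assumed here rather than taken from the script, is the pairwise ranking loss $-\log \sigma(r(x, a^w) - r(x, a^l))$, where $r$ is the scalar score the reward model assigns. The sketch below computes it in plain Python; the function name and the example scores are hypothetical.

```python
import math
from typing import Sequence


def pairwise_ranking_loss(chosen: Sequence[float], rejected: Sequence[float]) -> float:
    """Mean of -log(sigmoid(r_chosen - r_rejected)) over the batch."""
    losses = []
    for r_w, r_l in zip(chosen, rejected):
        margin = r_w - r_l
        # -log(sigmoid(m)) == log(1 + exp(-m)); branch to avoid overflow in exp.
        if margin >= 0:
            losses.append(math.log1p(math.exp(-margin)))
        else:
            losses.append(-margin + math.log1p(math.exp(margin)))
    return sum(losses) / len(losses)


equal = pairwise_ranking_loss([0.0], [0.0])           # no preference: log(2)
good = pairwise_ranking_loss([2.0, 1.5], [0.0, -0.5])  # chosen scored higher
bad = pairwise_ranking_loss([0.0, -0.5], [2.0, 1.5])   # chosen scored lower
print(equal, good, bad)
```

The loss shrinks as the preferred response is scored further above the dispreferred one, and grows when the ordering is reversed.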

### Data preparation




In [14]:
parser = HfArgumentParser((ModelArguments, DataArguments, TrainingArguments, ScriptArguments))
model_args, data_args, training_args, script_args = parser.parse_args_into_dataclasses()
prompt_template = get_conv_template(script_args.template_name)

# Preprocessing the datasets
full_max_length = data_args.max_source_length + data_args.max_target_length

def preprocess_reward_function(examples):
    """
    Turn the dataset into pairs of question + answer, where input_ids_chosen holds the preferred
        question + answer and input_ids_rejected holds the other.
    """
    new_examples = {
        "input_ids_chosen": [],
        "attention_mask_chosen": [],
        "input_ids_rejected": [],
        "attention_mask_rejected": [],
    }
    for system, history, question, chosen, rejected in zip(
            examples["system"],
            examples["history"],
            examples["question"],
            examples["response_chosen"],
            examples["response_rejected"]
    ):
        system_prompt = system or ""
        chosen_messages = history + [[question, chosen]] if history else [[question, chosen]]
        chosen_prompt = prompt_template.get_prompt(messages=chosen_messages, system_prompt=system_prompt)
        rejected_messages = history + [[question, rejected]] if history else [[question, rejected]]
        rejected_prompt = prompt_template.get_prompt(messages=rejected_messages, system_prompt=system_prompt)

        tokenized_chosen = tokenizer(chosen_prompt)
        tokenized_rejected = tokenizer(rejected_prompt)

        new_examples["input_ids_chosen"].append(tokenized_chosen["input_ids"])
        new_examples["attention_mask_chosen"].append(tokenized_chosen["attention_mask"])
        new_examples["input_ids_rejected"].append(tokenized_rejected["input_ids"])
        new_examples["attention_mask_rejected"].append(tokenized_rejected["attention_mask"])
    return new_examples

usage: ipykernel_launcher.py [-h] [--model_type MODEL_TYPE]
                             [--model_name_or_path MODEL_NAME_OR_PATH]
                             [--tokenizer_name_or_path TOKENIZER_NAME_OR_PATH]
                             [--load_in_4bit [LOAD_IN_4BIT]]
                             [--load_in_8bit [LOAD_IN_8BIT]]
                             [--cache_dir CACHE_DIR]
                             [--use_fast_tokenizer [USE_FAST_TOKENIZER]]
                             [--torch_dtype {auto,bfloat16,float16,float32}]
                             [--device_map DEVICE_MAP]
                             [--trust_remote_code [TRUST_REMOTE_CODE]]
                             [--no_trust_remote_code]
                             [--dataset_name DATASET_NAME]
                             [--dataset_config_name DATASET_CONFIG_NAME]
                             [--train_file_dir TRAIN_FILE_DIR]
                             [--validation_file_dir VALIDATION_FILE_DIR]
                 

SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
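
The `usage:` message and `SystemExit: 2` above appear because the parser reads `sys.argv`, which inside a notebook holds the kernel launcher's own arguments rather than the script's. `HfArgumentParser` is a subclass of `argparse.ArgumentParser`, so the usual remedy applies: pass the argument list explicitly. The sketch below shows the idea with the standard library only; `DemoArguments` and `parse_into_dataclass` are hypothetical stand-ins, not part of the script.

```python
import argparse
from dataclasses import dataclass, fields


@dataclass
class DemoArguments:
    """Hypothetical stand-in for the script's argument dataclasses."""
    template_name: str = "vicuna"
    lora_rank: int = 8


def parse_into_dataclass(argv):
    parser = argparse.ArgumentParser()
    for f in fields(DemoArguments):
        parser.add_argument(f"--{f.name}", type=f.type, default=f.default)
    # Passing argv explicitly keeps argparse away from sys.argv,
    # which inside Jupyter contains kernel-launcher flags.
    namespace = parser.parse_args(argv)
    return DemoArguments(**vars(namespace))


demo_args = parse_into_dataclass(["--lora_rank", "16"])
print(demo_args)  # DemoArguments(template_name='vicuna', lora_rank=16)
```

With the real parser the same approach should work through `parser.parse_args_into_dataclasses(args=[...])`, where the list holds the flags you would otherwise type on the command line (typically including `--output_dir` for `TrainingArguments`).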


In [None]:
# Get reward dataset for tuning the reward model.
if data_args.dataset_name is not None:
    # Downloading and loading a dataset from the hub.
    raw_datasets = load_dataset(
        data_args.dataset_name,
        data_args.dataset_config_name,
        cache_dir=model_args.cache_dir,
    )
    if "validation" not in raw_datasets.keys():
        raw_datasets["validation"] = load_dataset(
            data_args.dataset_name,
            data_args.dataset_config_name,
            split=f"train[:{data_args.validation_split_percentage}%]",
            cache_dir=model_args.cache_dir,
        )
        raw_datasets["train"] = load_dataset(
            data_args.dataset_name,
            data_args.dataset_config_name,
            split=f"train[{data_args.validation_split_percentage}%:]",
            cache_dir=model_args.cache_dir,
        )
else:
    data_files = {}
    if data_args.train_file_dir is not None and os.path.exists(data_args.train_file_dir):
        train_data_files = glob(f'{data_args.train_file_dir}/**/*.json', recursive=True) + glob(
            f'{data_args.train_file_dir}/**/*.jsonl', recursive=True)
        logger.info(f"train files: {', '.join(train_data_files)}")
        data_files["train"] = train_data_files
    if data_args.validation_file_dir is not None and os.path.exists(data_args.validation_file_dir):
        eval_data_files = glob(f'{data_args.validation_file_dir}/**/*.json', recursive=True) + glob(
            f'{data_args.validation_file_dir}/**/*.jsonl', recursive=True)
        logger.info(f"eval files: {', '.join(eval_data_files)}")
        data_files["validation"] = eval_data_files
    raw_datasets = load_dataset(
        'json',
        data_files=data_files,
        cache_dir=model_args.cache_dir,
    )
    # If no validation data is there, validation_split_percentage will be used to divide the dataset.
    if "validation" not in raw_datasets.keys():
        raw_datasets["validation"] = load_dataset(
            'json',
            data_files=data_files,
            split=f"train[:{data_args.validation_split_percentage}%]",
            cache_dir=model_args.cache_dir,
        )
        raw_datasets["train"] = load_dataset(
            'json',
            data_files=data_files,
            split=f"train[{data_args.validation_split_percentage}%:]",
            cache_dir=model_args.cache_dir,
        )
logger.info(f"Raw datasets: {raw_datasets}")

In [13]:
train_dataset = None
max_train_samples = 0
if training_args.do_train:
    if "train" not in raw_datasets:
        raise ValueError("--do_train requires a train dataset")
    train_dataset = raw_datasets['train']
    max_train_samples = len(train_dataset)
    if data_args.max_train_samples is not None and data_args.max_train_samples > 0:
        max_train_samples = min(len(train_dataset), data_args.max_train_samples)
        train_dataset = train_dataset.select(range(max_train_samples))
    logger.debug(f"Example train_dataset[0]: {train_dataset[0]}")
    with training_args.main_process_first(desc="Train dataset tokenization"):
        tokenized_dataset = train_dataset.shuffle().map(
            preprocess_reward_function,
            batched=True,
            num_proc=data_args.preprocessing_num_workers,
            remove_columns=train_dataset.column_names,
            load_from_cache_file=not data_args.overwrite_cache,
            desc="Running tokenizer on dataset",
        )
        train_dataset = tokenized_dataset.filter(
            lambda x: 0 < len(x['input_ids_rejected']) <= full_max_length and 0 < len(
                x['input_ids_chosen']) <= full_max_length
        )
        logger.debug(f"Num train_samples: {len(train_dataset)}")
        logger.debug("Tokenized training example:")
        logger.debug(tokenizer.decode(train_dataset[0]['input_ids_chosen']))

eval_dataset = None
max_eval_samples = 0
if training_args.do_eval:
    with training_args.main_process_first(desc="Eval dataset tokenization"):
        if "validation" not in raw_datasets:
            raise ValueError("--do_eval requires a validation dataset")
        eval_dataset = raw_datasets["validation"]
        max_eval_samples = len(eval_dataset)
        if data_args.max_eval_samples is not None and data_args.max_eval_samples > 0:
            max_eval_samples = min(len(eval_dataset), data_args.max_eval_samples)
            eval_dataset = eval_dataset.select(range(max_eval_samples))
        logger.debug(f"Example eval_dataset[0]: {eval_dataset[0]}")
        tokenized_dataset = eval_dataset.map(
            preprocess_reward_function,
            batched=True,
            num_proc=data_args.preprocessing_num_workers,
            remove_columns=eval_dataset.column_names,
            load_from_cache_file=not data_args.overwrite_cache,
            desc="Running tokenizer on dataset",
        )
        eval_dataset = tokenized_dataset.filter(
            lambda x: 0 < len(x['input_ids_rejected']) <= full_max_length and 0 < len(
                x['input_ids_chosen']) <= full_max_length
        )
        logger.debug(f"Num eval_samples: {len(eval_dataset)}")
        logger.debug("Tokenized eval example:")
        logger.debug(tokenizer.decode(eval_dataset[0]['input_ids_chosen']))

NameError: name 'training_args' is not defined
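
The `NameError` follows from the earlier parsing cell exiting before `training_args` was assigned. The filtering step itself is easy to check in isolation: a pair is kept only if both tokenized sides are non-empty and no longer than `full_max_length`. The sketch below mirrors that predicate on hypothetical in-memory samples, using the 512 and 128 limits from the `DataArguments` example earlier.

```python
# max_source_length + max_target_length, using the example values 512 and 128
full_max_length = 512 + 128


def keep_pair(example, limit=full_max_length):
    """Mirror of the filter predicate: both sides non-empty and within the limit."""
    return (0 < len(example["input_ids_rejected"]) <= limit
            and 0 < len(example["input_ids_chosen"]) <= limit)


samples = [
    {"input_ids_chosen": [1] * 10, "input_ids_rejected": [1] * 12},   # kept
    {"input_ids_chosen": [1] * 700, "input_ids_rejected": [1] * 12},  # chosen too long
    {"input_ids_chosen": [1] * 10, "input_ids_rejected": []},         # rejected empty
]
kept = [s for s in samples if keep_pair(s)]
print(len(kept))  # 1
```

Note that over-long pairs are dropped rather than truncated, so a small `full_max_length` can shrink the dataset noticeably.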

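## Model and tokenizer preparation

- Load the pretrained model configuration
- Load the pretrained model
- Load the tokenizer
- Set the tokenizer's special tokens (eos_token, bos_token, pad_token)

Applied to this code

To test the model and tokenizer preparation logic, we perform the following steps:

1. **Initialize the model arguments**: create a `ModelArguments` instance containing the model type, the model name or path, and related settings.
2. **Initialize the tokenizer**: create a tokenizer instance based on the model arguments.
3. **Verify the model and tokenizer**: make sure both load correctly and that the tokenizer's special tokens (such as `eos_token`, `bos_token`, `pad_token`) are set correctly.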

In [8]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Initialize the model and tokenizer from the given parameters.
# The model is assumed to have been downloaded to the local directory below.
model_directory = '/root/models/JimmyMa99/BaJie-Chat-mini/'
# A tokenizer holds no weights, so it takes no device_map.
tokenizer = AutoTokenizer.from_pretrained(model_directory, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_directory, trust_remote_code=True, torch_dtype=torch.bfloat16, device_map='cuda:0')


tokenizer_special_tokens = {
    'eos_token': '[EOS]',
    'bos_token': '[BOS]',
    'pad_token': '[PAD]'
}


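# Set the tokenizer's special tokens
tokenizer.add_special_tokens(tokenizer_special_tokens)
# Note: if any of these tokens is new to the vocabulary, the model's embedding matrix
# also needs resizing, e.g. model.resize_token_embeddings(len(tokenizer)).

# Verify that the model loaded correctly
assert model is not None, "Model is not loaded properly."

# Verify that the tokenizer loaded correctly
assert tokenizer is not None, "Tokenizer is not loaded properly."

# Verify that the tokenizer's special tokens are set correctly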
assert tokenizer.eos_token == '[EOS]', "EOS token is not set correctly."
assert tokenizer.bos_token == '[BOS]', "BOS token is not set correctly."
assert tokenizer.pad_token == '[PAD]', "PAD token is not set correctly."

print("Model and tokenizer are correctly loaded and configured.")

Model and tokenizer are correctly loaded and configured.


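## PEFT configuration
- If PEFT is enabled:
  - Configure the LoRA parameters with `LoraConfig`
  - Apply LoRA to the model with `get_peft_model`

PEFT configuration logic, step by step:

1. Decide whether to use PEFT
   - If `script_args.use_peft` is True, configure PEFT
   - Otherwise, fine-tune all parameters

2. PEFT configuration flow
   a. Check whether a pretrained PEFT model exists
      - If `script_args.peft_path` is not None, load the pretrained PEFT model
      - Otherwise, initialize a new PEFT model

   b. Steps for initializing a new PEFT model
      1) If 8-bit quantization is used (`model_args.load_in_8bit` is True)
         - Call `prepare_model_for_kbit_training` to prepare the model

      2) Determine the target modules (target_modules)
         - If target_modules is specified, split it on commas
         - If it contains 'all', use the `find_all_linear_names` function to find all linear layers

      3) Handle the modules to save (modules_to_save)
         - If modules_to_save is specified, split it on commas

      4) Configure LoRA
         - Create a LoraConfig object and set the task type, target modules, LoRA parameters, and so on

      5) Apply PEFT
         - Use the `get_peft_model` function to apply the LoRA configuration to the model

   c. Parameter handling
      - Cast all parameters that require gradients to float32

   d. Print the trainable parameter information
      - Call `model.print_trainable_parameters()`

3. Full-parameter fine-tuning
   - If PEFT is not used, print the trainable parameter information directly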

In [9]:
if script_args.use_peft:
    logger.info("Fine-tuning method: LoRA(PEFT)")
    if script_args.peft_path is not None:
        logger.info(f"Peft from pre-trained model: {script_args.peft_path}")
        model = PeftModel.from_pretrained(model, script_args.peft_path, is_trainable=True)
    else:
        logger.info("Init new peft model")
        if model_args.load_in_8bit:
            model = prepare_model_for_kbit_training(model)
        target_modules = script_args.target_modules.split(',') if script_args.target_modules else None
        if target_modules and 'all' in target_modules:
            target_modules = find_all_linear_names(model, int4=False, int8=model_args.load_in_8bit)
        modules_to_save = script_args.modules_to_save
        if modules_to_save is not None:
            modules_to_save = modules_to_save.split(',')
        logger.info(f"Peft target_modules: {target_modules}")
        logger.info(f"Peft lora_rank: {script_args.lora_rank}")
        peft_config = LoraConfig(
            task_type=TaskType.SEQ_CLS,
            target_modules=target_modules,
            inference_mode=False,
            r=script_args.lora_rank,
            lora_alpha=script_args.lora_alpha,
            lora_dropout=script_args.lora_dropout,
            modules_to_save=modules_to_save)
        model = get_peft_model(model, peft_config)
    for param in filter(lambda p: p.requires_grad, model.parameters()):
        param.data = param.data.to(torch.float32)
    model.print_trainable_parameters()
else:
    logger.info("Fine-tuning method: Full parameters training")
    print_trainable_parameters(model)

2024-07-09 14:47:33.192 | INFO     | __main__:<module>:2 - Fine-tuning method: LoRA(PEFT)
2024-07-09 14:47:33.193 | INFO     | __main__:<module>:7 - Init new peft model
2024-07-09 14:47:33.193 | INFO     | __main__:<module>:16 - Peft target_modules: ['output', 'w1', 'w2', 'w3', 'wo', 'wqkv']
2024-07-09 14:47:33.194 | INFO     | __main__:<module>:17 - Peft lora_rank: 8


trainable params: 8,621,056 || all params: 1,897,731,072 || trainable%: 0.45428228094059475
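
The small trainable fraction follows from how LoRA is built: each adapted linear layer with input size `d_in` and output size `d_out` gains two low-rank matrices, `A` of shape `r x d_in` and `B` of shape `d_out x r`, for `r * (d_in + d_out)` extra parameters. The sketch below works this out for a hypothetical 2048 x 2048 projection (the layer size is an assumption, not read from the model) and reproduces the percentage in the output above from the two reported counts.

```python
def lora_param_count(d_in: int, d_out: int, r: int) -> int:
    """Parameters added by one LoRA adapter: A is (r x d_in), B is (d_out x r)."""
    return r * (d_in + d_out)


# Hypothetical 2048 x 2048 projection with the rank from the log above (r = 8).
per_layer = lora_param_count(2048, 2048, 8)
print(per_layer)  # 32768

# Reproduce the percentage from the two counts reported above.
trainable, total = 8_621_056, 1_897_731_072
percent = 100 * trainable / total
print(percent)
```

The full weight matrix of that hypothetical layer would hold 2048 * 2048 = 4,194,304 parameters, so a rank-8 adapter trains under 1% of it.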


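## Training preparation
- Configure gradient checkpointing
- Enable gradients on the inputs
- Set up parallel training (if multiple GPUs are available)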


In [None]:
parser = HfArgumentParser((ModelArguments, DataArguments, TrainingArguments, ScriptArguments))
model_args, data_args, training_args, script_args = parser.parse_args_into_dataclasses()

# Initialize our Trainer
if training_args.gradient_checkpointing:
    model.gradient_checkpointing_enable()
    model.config.use_cache = False
else:
    model.config.use_cache = True
model.enable_input_require_grads()
if torch.cuda.device_count() > 1:
    # Keeps Trainer from trying its own DataParallelism when more than 1 gpu is available
    model.is_parallelizable = True
    model.model_parallel = True

## Custom trainer definition
- Implement the `RewardTrainer` class (a subclass of `Trainer`)
  - Override the `compute_loss` method
  - Override the `evaluate` method
  - Override the `prediction_step` method
  - Override the `save_model` method


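## Training process
- Initialize `RewardTrainer`
- If `do_train` is True:
  - Start training
  - Log the training metrics
  - Save the trained model and tokenizer
- If `do_eval` is True:
  - Run evaluation
  - Compute and log the evaluation metrics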

In [None]:
trainer = RewardTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset if training_args.do_train else None,
    eval_dataset=eval_dataset if training_args.do_eval else None,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    data_collator=RewardDataCollatorWithPadding(
        tokenizer=tokenizer, max_length=full_max_length, padding="max_length"
    ),
)

# Training
if training_args.do_train:
    logger.info("*** Train ***")
    logger.debug(f"Train dataloader example: {next(iter(trainer.get_train_dataloader()))}")
    checkpoint = None
    if training_args.resume_from_checkpoint is not None:
        checkpoint = training_args.resume_from_checkpoint
    train_result = trainer.train(resume_from_checkpoint=checkpoint)

    metrics = train_result.metrics
    metrics["train_samples"] = max_train_samples
    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()

    model.config.use_cache = True  # enable cache after training
    if trainer.is_world_process_zero():
        logger.debug(f"Training metrics: {metrics}")
        logger.info(f"Saving model checkpoint to {training_args.output_dir}")
        save_model(model, tokenizer, training_args)

# Evaluation
if training_args.do_eval:
    logger.info("*** Evaluate ***")
    metrics = trainer.evaluate()

    metrics["eval_samples"] = max_eval_samples
    try:
        perplexity = math.exp(metrics["eval_loss"])
    except OverflowError:
        perplexity = float("inf")
    metrics["perplexity"] = perplexity
    trainer.log_metrics("eval", metrics)
    trainer.save_metrics("eval", metrics)
    if trainer.is_world_process_zero():
        logger.debug(f"Eval metrics: {metrics}")

## Main function
- Calls all of the steps above in order


In [1]:
from dataclasses import dataclass
from typing import Optional, List, Sequence, Dict

@dataclass
class Conversation:
    name: str
    system_prompt: str
    messages: Optional[List[Sequence[str]]]
    roles: Optional[Sequence[str]]
    prompt: str
    sep: str
    stop_str: Optional[str] = "</s>"

    def get_prompt(self, messages: Optional[List[Sequence[str]]] = None, system_prompt: Optional[str] = "") -> str:
        return "".join(self._format_example(messages, system_prompt))

    def get_dialog(self, messages: Optional[List[Sequence[str]]] = None, system_prompt: Optional[str] = "") -> List[str]:
        return self._format_example(messages, system_prompt)

    def _format_example(self, messages: Optional[List[Sequence[str]]] = None, system_prompt: Optional[str] = "") -> List[str]:
        system_prompt = system_prompt or self.system_prompt
        system_prompt = system_prompt + self.sep if system_prompt else ""
        messages = messages or self.messages
        convs = []
        for turn_idx, [user_query, bot_resp] in enumerate(messages):
            if turn_idx == 0:
                convs.append(system_prompt + self.prompt.format(query=user_query))
                convs.append(bot_resp)
            else:
                convs.append(self.sep + self.prompt.format(query=user_query))
                convs.append(bot_resp)
        return convs

    def append_message(self, query: str, answer: str):
        self.messages.append([query, answer])

# Global registry
conv_templates: Dict[str, Conversation] = {}

def register_conv_template(template: Conversation):
    conv_templates[template.name] = template

def get_conv_template(name: str) -> Conversation:
    return conv_templates[name]

# Register an example template
register_conv_template(
    Conversation(
        name="example",
        system_prompt="This is an example system prompt.",
        messages=[],
        roles=("USER", "ASSISTANT"),
        prompt="USER: {query} ASSISTANT:",
        sep="</s>"
    )
)

# Use the registered template
example_template = get_conv_template("example")
print(example_template.get_prompt([["Hello", "Hi there!"]]))


This is an example system prompt.</s>USER: Hello ASSISTANT:Hi there!
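
The same formatting is what `preprocess_reward_function` relies on when it builds the chosen and rejected prompts from one record. The sketch below restates the template logic compactly so that it runs on its own. `build_prompt` and the sample `record` are hypothetical, and the fallback to the template's default system prompt is left out for brevity.

```python
from typing import List, Sequence

SEP = "</s>"
PROMPT = "USER: {query} ASSISTANT:"


def build_prompt(messages: List[Sequence[str]], system_prompt: str = "") -> str:
    """Compact restatement of the 'example' template's multi-turn formatting."""
    parts = []
    system = system_prompt + SEP if system_prompt else ""
    for turn_idx, (query, answer) in enumerate(messages):
        # The first turn carries the system prompt; later turns start with the separator.
        prefix = system if turn_idx == 0 else SEP
        parts.append(prefix + PROMPT.format(query=query))
        parts.append(answer)
    return "".join(parts)


# Hypothetical record using the field names read by preprocess_reward_function.
record = {
    "system": "",
    "history": [["Hi", "Hello!"]],
    "question": "What's the capital of France?",
    "response_chosen": "The capital of France is Paris.",
    "response_rejected": "The capital of France is London.",
}
chosen = build_prompt(record["history"] + [[record["question"], record["response_chosen"]]], record["system"])
rejected = build_prompt(record["history"] + [[record["question"], record["response_rejected"]]], record["system"])
print(chosen)
print(rejected)
```

The two prompts share the whole conversation prefix and differ only in the final answer, so any difference in the reward model's scores is attributable to that answer.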

