# 1. 生成原始批判

In [18]:
import datasets

# Input: directory of the raw Skywork preference dataset.
original_data_path = '../original_data/Skywork-Reward-Preference-80K-v0.1/data'
# Output: JSON file that will hold the stage-1 SFT samples (inputs paired with critiques).
critique_data_path = 'data/skyword_original_critique.json'

# Load the train split; the bare name on the last line makes the notebook display its summary.
original_dataset = datasets.load_dataset(original_data_path, split='train')
original_dataset

Dataset({
    features: ['chosen', 'rejected', 'source'],
    num_rows: 81973
})

In [17]:
# 将成对偏好数据生成原始批判，初次生成，可能会有错误
import os
import datasets
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # keeps a pile of useless messages from being printed

from prompt import SYS_MSG_ENGLISH, SYS_MSG_CHINESE
from utils import is_chinese_text

'''
vllm 启动qwen-72b得到原始批判
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m vllm.entrypoints.openai.api_server --host 0.0.0.0 --port 5001 --served-model-name qwen2 --model Qwen/Qwen2-72B-Instruct --tensor_parallel_size 4 --gpu-memory-utilization 0.9 --max-model-len 2048
'''

def build_origin_critiques(query, chosen_response, rejected_response):
    """Build the chat messages that ask the judge model to critique one preference pair.

    The Chinese system prompt is used when ``is_chinese_text`` flags the
    concatenation of the query and both responses; otherwise the English one.
    The user turn wraps the three texts in <question>/<chosen>/<rejected> tags.

    Returns:
        A two-element list of ``{"role", "content"}`` dicts: system, then user.
    """
    combined_text = query + chosen_response + rejected_response
    if is_chinese_text(combined_text):
        system_prompt = SYS_MSG_CHINESE
    else:
        system_prompt = SYS_MSG_ENGLISH
    user_prompt = "".join([
        "<question>", query, "</question>\n<chosen>",
        chosen_response, "</chosen>\n<rejected>",
        rejected_response, "</rejected>\n",
    ])
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

# Reload the train split and turn every preference pair into one critique request.
original_dataset = datasets.load_dataset(original_data_path, split='train')
print(f'中英文翻译数据集数量：{len(original_dataset)}')

# Index 0 of the `chosen` conversation is used as the query; index 1 of the
# `chosen` / `rejected` conversations supplies the two responses.
data_list = []
for row in original_dataset:
    question = row['chosen'][0]['content']
    preferred = row['chosen'][1]['content']
    dispreferred = row['rejected'][1]['content']
    data_list.append(build_origin_critiques(question, preferred, dispreferred))
print(f'data_list数量：{len(data_list)}')

中英文翻译数据集数量：81973
data_list数量：81973


In [None]:
# step1: 批量生成所有数据的批判
from multi_process_reason_for_vllm import distributed_inference_parallel

critique_path = 'tmp/pure_critique.json'  # output file for the raw critiques
batch_size = 500  # samples per batch; inference results are written to disk batch by batch so a mid-run failure loses little
workers_per_node = 32   # number of threads calling the model
model_name = "qwen2"  # model name, as served by vLLM
api_bases = [  # API endpoints; several nodes may be listed
    "http://0.0.0.0:5001/v1",
    # "http://0.0.0.0:5002/v1",
    # "http://0.0.0.0:5003/v1",
    # "http://0.0.0.0:5004/v1"
]
# Optional smoke-test cap. Leave as None to generate critiques for the whole
# dataset: the later cells index `critiques` by dataset row, so a partial run
# (the previous hard-coded `[:100]`) leaves them out of sync with the dataset.
debug_limit = None
all_results = distributed_inference_parallel(
    data_list=data_list[:debug_limit],  # `[:None]` keeps every request
    api_bases=api_bases,
    model_name=model_name,
    batch_size=batch_size,
    workers_per_node=workers_per_node,
    save_file_name=critique_path,
)

In [77]:
# 对生成的批判进行多种启发式规则检验，反复执行
import json
from json_repair import repair_json

from utils import try_parse_json_object, read_json, write_json
from transformers import AutoTokenizer

# Measure length with the tokenizer: len() on the raw string is an inaccurate length measure for English text.
tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen2.5-7B-Instruct')

def _is_valid_critique_dict(candidate):
    """Return True for a dict with exactly the two critique keys, each a string longer than 20 characters."""
    if not isinstance(candidate, dict):
        return False
    if set(candidate) != {'chosen critiques', 'rejected critiques'}:
        return False
    return all(isinstance(text, str) and len(text) > 20 for text in candidate.values())


def json_format_check(critiques):
    """Decode one raw model output into a validated critique dict.

    A result is accepted only when:
      1. it is a dict;
      2. it has exactly the keys 'chosen critiques' and 'rejected critiques';
      3. both values are non-empty strings;
      4. both values are longer than 20 characters.

    Args:
        critiques: the raw model output (str), an already-decoded dict from a
            previous run of this check, or None for a missing entry.

    Returns:
        The validated dict, or None when decoding or validation fails.
    """
    # Entries decoded by an earlier run are dicts already; re-validate them so
    # a malformed dict cannot slip through and break the later checks.
    if isinstance(critiques, dict):
        return critiques if _is_valid_critique_dict(critiques) else None
    # Missing entries (None) or any other non-text value cannot be parsed.
    if not isinstance(critiques, str):
        return None
    try:
        _, parsed = try_parse_json_object(critiques)
    except Exception:
        # Fall back to json_repair; if that fails too, treat the entry as invalid
        # instead of aborting the whole check.
        try:
            parsed = json.loads(repair_json(critiques))
        except Exception:
            return None
    return parsed if _is_valid_critique_dict(parsed) else None

# Length check
def token_len_check(critiques):
    """Return True when both critiques have between 30 and 1000 tokens (inclusive).

    Very short or very long critiques are mostly faulty; losing a few good ones
    to this filter is acceptable. ``None`` (a failed decode) is rejected.
    """
    if critiques is None:
        return False
    token_counts = [
        len(tokenizer.encode(critiques[key]))
        for key in ('chosen critiques', 'rejected critiques')
    ]
    return min(token_counts) >= 30 and max(token_counts) <= 1000

# Wrongly formatted generations that nevertheless decoded as valid JSON
def custom_check1(critiques):
    """Return False when the stringified critiques contain the literal marker
    '[rejected critiques]' or '[chosen critiques]'; ``None`` is rejected too."""
    if critiques is None:
        return False
    rendered = str(critiques)
    markers = ('[rejected critiques]', '[chosen critiques]')
    return not any(marker in rendered for marker in markers)

# Drop incomplete generations (not ending with a proper full stop, or ending with a "..." ellipsis)
def custom_check2(critiques):
    """Return True when both critiques look complete.

    A critique is complete when, after stripping whitespace, it is non-empty,
    ends with '。' or '.', and does not end with '...'.

    Args:
        critiques: dict with 'chosen critiques' and 'rejected critiques'
            strings, or None for a failed decode.

    Returns:
        False for None or when either critique is empty or looks truncated.
    """
    if critiques is None:
        return False
    for key in ('chosen critiques', 'rejected critiques'):
        text = critiques[key].strip()
        # str.endswith is False on an empty string, so whitespace-only text is
        # rejected here instead of raising IndexError on text[-1].
        if not text.endswith(('。', '.')) or text.endswith('...'):
            return False
    return True


# Decode every stored critique into a dict; entries that fail decoding become None.
critiques = list(map(json_format_check, read_json(critique_path)))

# Indices failing each heuristic, collected separately so the per-check counts can be reported.
error_idx1 = [pos for pos in range(len(critiques)) if not token_len_check(critiques[pos])]  # bad token length
error_idx2 = [pos for pos in range(len(critiques)) if not custom_check1(critiques[pos])]  # wrong format that still decoded as JSON
error_idx3 = [pos for pos in range(len(critiques)) if not custom_check2(critiques[pos])]  # incomplete generation

# Union of all failures, de-duplicated and in ascending order.
error_idx = sorted(set(error_idx1) | set(error_idx2) | set(error_idx3))
print(f'不符合要求的索引数量：{len(error_idx)}')
print('各子判断索引数量：', len(error_idx1), len(error_idx2), len(error_idx3))
# Persist the decoded form (dicts / None) back to the critique file.
write_json(critiques, critique_path)

原始数据集数量：98876
不符合要求的索引数量：479
各子判断索引数量： 456 454 477


In [None]:
'''
**注意注意**
step2: 如果上一步有错误的批判数量过多，可重新调用模型生成，生成完之后再执行上一步筛选错误批判
如果数量较少可以直接丢掉，下一步可过滤 error_idx 之后保存带批判的数据集
这块生成完之后再次运行上一个代码块，再次找出剩余的错误批判
'''
save_file_name = 'tmp/tmp_critique.json'  # temporary file for this retry pass; results are merged into pure_critique afterwards
batch_size = 500  # samples per batch
workers_per_node = 32   # number of threads calling the model
model_name = "qwen2"  # model name
api_bases = [  # API endpoints
    "http://0.0.0.0:5001/v1",
    # "http://0.0.0.0:5002/v1",
    # "http://0.0.0.0:5003/v1",
    # "http://0.0.0.0:5004/v1"
]
# Keep the retry subset under its own name. Rebinding `data_list` would shrink
# it, so a second run of this cell (error_idx holds indices into the full list)
# would pick the wrong prompts or raise IndexError.
retry_data_list = [data_list[i] for i in error_idx]
all_results = distributed_inference_parallel(
    data_list=retry_data_list,
    api_bases=api_bases,
    model_name=model_name,
    batch_size=batch_size,
    workers_per_node=workers_per_node,
    save_file_name=save_file_name,
)

# Write the regenerated critiques back into their original positions.
for idx, regenerated in zip(error_idx, all_results):
    critiques[idx] = regenerated
write_json(critiques, critique_path)

In [None]:
# 重构批判和sky原始数据集，将其整理成可以进行cloud第一阶段训练的格式
from utils import is_chinese_text
# Original dataset
original_dataset = datasets.load_dataset(original_data_path, split='train')
print(f'原始数据集数量：{len(original_dataset)}')

# Critiques read back from the critique file.
critiques = read_json(critique_path)
print('新生成的批判数量：', len(critiques))

# Prompt suffixes appended after the question/response pair; one per language.
COT_PROMPT_ENGLISH = "The following is a break down on the helpfulness and safety of the assistant's response to my question: "
COT_PROMPT_CHINESE = "以下是assistant对问题回答的帮助性和安全性的分解："

def get_sft1_data(query, response, critique):
    """Build one stage-1 SFT sample.

    The input is the question and response wrapped in <question>/<response>
    tags, followed by the Chinese or English CoT prompt (chosen by running
    ``is_chinese_text`` on the tagged text). The critique is the target output.

    Returns:
        A dict with keys "instruction" (empty string), "input" and "output".
    """
    tagged_pair = "".join(["<question>", query, "</question>\n<response>", response, "</response>"])
    if is_chinese_text(tagged_pair):
        cot_suffix = COT_PROMPT_CHINESE
    else:
        cot_suffix = COT_PROMPT_ENGLISH
    return {
        "instruction": '',
        "input": tagged_pair + cot_suffix,
        "output": critique
    }

# Rows that still fail the checks are dropped. A set gives constant-time
# membership tests instead of scanning the error list once per dataset row.
dropped_idx = set(error_idx)

sft1_dataset = []
for i, d in enumerate(original_dataset):
    if i in dropped_idx:
        continue
    query = d['chosen'][0]['content']
    # Two samples per row: the chosen response with its critique, then the rejected one.
    sft1_dataset.append(get_sft1_data(query, d['chosen'][1]['content'], critiques[i]['chosen critiques']))
    sft1_dataset.append(get_sft1_data(query, d['rejected'][1]['content'], critiques[i]['rejected critiques']))

write_json(sft1_dataset, critique_data_path)
print('最终训练集数量：', len(sft1_dataset))

# 2. 第一阶段的训练权重+原模型权重 重组

In [None]:
# 基于qwen2.5原有配置文件，和checkpoint中新训练的权重，生成新的权重文件
import os
import shutil
from tqdm import tqdm

train_checkpoint_dir = 'save/cloud_sft1_0825/checkpoint-9500'  # checkpoint holding the newly trained weights
llm_original_dir = 'Qwen/Qwen2.5-7B-Instruct'  # directory of the original model files
new_checkpoint_dir = 'model_checkpoint/cloud_sft1_0825'  # destination of the merged checkpoint

os.makedirs(new_checkpoint_dir, exist_ok=True)

# Copy the tokenizer and other non-weight files from the original model directory:
# everything that is neither a sub-directory nor named 'model*'.
for entry in tqdm(os.listdir(path=llm_original_dir), desc="复制其他文件"):
    source = os.path.join(llm_original_dir, entry)
    if not entry.startswith('model') and not os.path.isdir(source):
        shutil.copy(source, os.path.join(new_checkpoint_dir, entry))

# Copy the newly trained weights (entries named 'model*') from the training checkpoint.
for entry in tqdm(os.listdir(path=train_checkpoint_dir), desc="复制新训练的权重"):
    if not entry.startswith('model'):
        continue
    shutil.copy(os.path.join(train_checkpoint_dir, entry), os.path.join(new_checkpoint_dir, entry))

# 3. 生成自批判

In [None]:
'''vllm 部署，小模型可部署多个节点
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m vllm.entrypoints.openai.api_server --host 0.0.0.0 --port 5001 --served-model-name qwen2 --model model_checkpoint/cloud_sft1_0825 --tensor_parallel_size 4 --gpu-memory-utilization 0.9
CUDA_VISIBLE_DEVICES=4,5,6,7 python -m vllm.entrypoints.openai.api_server --host 0.0.0.0 --port 5002 --served-model-name qwen2 --model model_checkpoint/cloud_sft1_0825 --tensor_parallel_size 4 --gpu-memory-utilization 0.9
'''

In [5]:
# Stage-1 SFT data (read below to build the self-critique requests).
train_original_path = 'data/skyword_original_critique.json'
# Destination of the paired self-generated critique data.
train_selfgen_path  = 'data/skyword_selfgen_critique.json'

In [13]:
from utils import read_json, call_model

def get_datalist(data):
    """Wrap one SFT sample's ``input`` text into a system + user chat message list."""
    system_turn = {"role": "system", "content": "You are a helpful assistant."}
    user_turn = {"role": "user", "content": data['input']}
    return [system_turn, user_turn]

# Load the stage-1 samples and build one chat request per sample.
sft1_dataset = read_json(train_original_path)
data_list = list(map(get_datalist, sft1_dataset))
print(f'需要生成自批判的样本数量：{len(data_list)}')

需要生成自批判的样本数量：200


In [None]:
# 可简单测试一条，看看输出效果是否符合预期
from openai import OpenAI

def call_model(messages, model_name='qwen2', api_base="http://0.0.0.0:5004/v1", api_key="EMPTY"):
    """Send one chat request to an OpenAI-compatible endpoint and return the reply text.

    Args:
        messages: list of ``{"role", "content"}`` chat messages.
        model_name: served model name to request.
        api_base: base URL of the endpoint.
            NOTE(review): the default port 5004 is not one of the nodes listed
            in this notebook's deployment note (5001/5002) - confirm, or pass
            the endpoint explicitly.
        api_key: API key handed to the client; "EMPTY" is the placeholder used here.

    Returns:
        The ``content`` of the first choice's message.
    """
    client = OpenAI(
        api_key=api_key,
        base_url=api_base,
    )
    completion = client.chat.completions.create(
        model=model_name,
        messages=messages,
    )
    return completion.choices[0].message.content

call_model(data_list[0])

"In terms of helpfulness, the response provides a comprehensive overview of C#, detailing its object-oriented nature, cross-platform capabilities, and strong typing system. It also elucidates the language's event-driven model and garbage collection, which are crucial for building robust applications. The explanation is enriched by highlighting C#'s role in various application domains, such as web, desktop, and mobile applications, and games. Regarding safety, the response avoids any misleading claims about C#'s capabilities and limitations, ensuring a balanced perspective. It also mentions the community-driven nature of C#, which is important for understanding its ecosystem and support."

In [20]:
"""step1: 批量生成所有数据的批判"""
import os
from multi_process_reason_for_vllm import distributed_inference_parallel
from utils import read_json, write_json
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# NOTE(review): this is the same path the stage-1 critique cell writes to, so this run overwrites that file - confirm this is intended.
critique_path = 'tmp/pure_critique.json'  # output file name
batch_size = 500  # samples per batch
workers_per_node = 32   # number of threads calling the model
model_name = "qwen2"  # model name
max_tokens = 3000  # forwarded to distributed_inference_parallel as max_tokens
api_bases = [  # API endpoints
    "http://0.0.0.0:5001/v1",
    # "http://0.0.0.0:5002/v1",
    # "http://0.0.0.0:5003/v1",
    # "http://0.0.0.0:5004/v1"
]
print(f'data_list数量：{len(data_list)}')
# Run inference over every request; the returned critiques are kept in `critiques`.
critiques = distributed_inference_parallel(
    data_list=data_list,
    api_bases=api_bases,
    model_name=model_name,
    max_tokens=max_tokens,
    batch_size=batch_size,
    workers_per_node=workers_per_node,
    save_file_name=critique_path,
)

data_list数量：200
并行处理: 跨1个节点以1批处理200项数据


批次完成进度: 100%|██████████| 1/1 [00:14<00:00, 14.13s/it]


In [1]:
"""对生成的批判进行多种规则检验，反复执行"""

from utils import write_json, read_json
from transformers import AutoTokenizer
# Use the same model identifier as the rest of the notebook instead of a
# machine-specific absolute path, so the cell also runs on other machines.
tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen2.5-7B-Instruct')

# Length check
def token_len_check(critiques):
    """Return True when the critique text has between 30 and 1000 tokens (inclusive); ``None`` is rejected."""
    if critiques is None:
        return False
    token_count = len(tokenizer.encode(critiques))
    return 30 <= token_count <= 1000

# Wrongly formatted generations that nevertheless decoded as valid JSON
def custom_check1(critiques):
    """Return False when the stringified critique contains the literal marker
    '[rejected critiques]' or '[chosen critiques]'; ``None`` is rejected too."""
    if critiques is None:
        return False
    as_text = str(critiques)
    for marker in ('[rejected critiques]', '[chosen critiques]'):
        if marker in as_text:
            return False
    return True

# Drop incomplete generations (not ending with a proper full stop, or ending with a "..." ellipsis)
def custom_check2(critiques):
    """Return True when the critique text looks complete.

    The text is complete when, after stripping whitespace, it is non-empty,
    ends with '。' or '.', and does not end with '...'.

    Args:
        critiques: the critique text, or None for a missing entry.

    Returns:
        False for None, non-string values, empty or whitespace-only text, and
        text that looks truncated; True otherwise.
    """
    if not isinstance(critiques, str):
        return False
    text = critiques.strip()
    # str.endswith is False on an empty string, so whitespace-only text is
    # rejected here instead of raising IndexError on text[-1].
    return text.endswith(('。', '.')) and not text.endswith('...')

# No JSON validation this time: the checks run directly on the critique texts.
critiques = read_json(critique_path)
print(f'critiques数量：{len(critiques)}')
# Indices failing each heuristic, collected separately so the per-check counts can be reported.
error_idx1 = [pos for pos in range(len(critiques)) if not token_len_check(critiques[pos])]  # bad token length
error_idx2 = [pos for pos in range(len(critiques)) if not custom_check1(critiques[pos])]  # wrong format that still decoded as JSON
error_idx3 = [pos for pos in range(len(critiques)) if not custom_check2(critiques[pos])]  # incomplete generation

# Union of all failures, de-duplicated and in ascending order.
error_idx = sorted(set(error_idx1) | set(error_idx2) | set(error_idx3))
print(f'不符合要求的索引数量：{len(error_idx)}')
print('各子判断索引数量：', len(error_idx1), len(error_idx2), len(error_idx3))

  from .autonotebook import tqdm as notebook_tqdm


critiques数量：566004
不符合要求的索引数量：28
各子判断索引数量： 5 0 26


In [21]:
'''
**注意注意**
step2: 如果上一步有错误的批判数量过多，可重新调用模型生成，生成完之后再执行上一步筛选错误批判
如果数量较少可以直接丢掉，下一步可过滤 error_idx 之后保存带批判的数据集
这块生成完之后再次运行上一个代码块，再次找出剩余的错误批判
'''
# Retry results go to a temporary file. Passing critique_path here would let
# the retry run overwrite the full critique file with only the retried subset
# before the merge below has happened.
save_file_name = 'tmp/tmp_critique.json'
batch_size = 500  # samples per batch
workers_per_node = 32   # number of threads calling the model
model_name = "qwen2"  # model name
max_tokens = 3000  # same value the initial self-critique pass passes as max_tokens
api_bases = [  # API endpoints
    "http://0.0.0.0:5001/v1",
    # "http://0.0.0.0:5002/v1",
    # "http://0.0.0.0:5003/v1",
    # "http://0.0.0.0:5004/v1"
]
# Keep the retry subset under its own name. Rebinding `data_list` would shrink
# it, so a second run of this cell (error_idx holds indices into the full list)
# would pick the wrong prompts or raise IndexError.
retry_data_list = [data_list[i] for i in error_idx]
all_results = distributed_inference_parallel(
    data_list=retry_data_list,
    api_bases=api_bases,
    model_name=model_name,
    max_tokens=max_tokens,
    batch_size=batch_size,
    workers_per_node=workers_per_node,
    save_file_name=save_file_name,
)

# Write the regenerated critiques back into their original positions.
for idx, regenerated in zip(error_idx, all_results):
    critiques[idx] = regenerated
write_json(critiques, critique_path)

并行处理: 跨1个节点以6批处理512项数据


批次完成进度:  67%|██████▋   | 4/6 [05:03<01:49, 54.52s/it] 

API请求失败: Request timed out.


批次完成进度:  83%|████████▎ | 5/6 [35:09<11:26, 686.01s/it]

API请求失败: Request timed out.


批次完成进度: 100%|██████████| 6/6 [1:05:13<00:00, 652.24s/it] 


In [23]:
"""将子批判数据整理成 成对数据格式"""

sft1_dataset = read_json(file_path=train_original_path)
print(f'sft1数据集数量：{len(sft1_dataset)}')

critiques = read_json(critique_path)
print('新生成的自批判数量：', len(critiques))

# A set gives constant-time membership tests instead of scanning the error list twice per pair.
dropped_idx = set(error_idx)

# Samples are stored as consecutive (chosen, rejected) pairs: even index = chosen, odd index = rejected.
sft2_dataset = []
for i in range(0, len(sft1_dataset), 2):
    # Drop the whole pair when either of its critiques failed the checks.
    if i in dropped_idx or i + 1 in dropped_idx:
        continue
    sft2_dataset.append({
        'chosen_input': sft1_dataset[i]['input'],
        'rejected_input': sft1_dataset[i+1]['input'],
        'chosen_critique': critiques[i],
        'rejected_critique': critiques[i+1],
    })
print(f'sft2数据集数量：{len(sft2_dataset)}')
write_json(sft2_dataset, file_path=train_selfgen_path)

sft1数据集数量：200
新生成的自批判数量： 200
sft2数据集数量：100
