# 中文角色扮演微调

**目标**：在有限计算资源（Colab 免费 GPU）下，对 Qwen3-1.7B 做小规模 SFT，使其能根据 `instruction`（角色设定）与 `input` 进行一轮符合设定的对话并输出 `output`。

## 1. 准备工作

In [None]:
# Show the attached GPU (model, memory, driver) to confirm a GPU runtime is active.
!nvidia-smi

In [None]:
# Fetch the project repository and make it the working directory. Later cells
# import from `utils` and read `samples.json` through relative paths
# (presumably both ship with this repository -- confirm).
!git clone https://github.com/chenkx612/Qwen3-Roleplay-SFT.git
%cd Qwen3-Roleplay-SFT

In [None]:
# bitsandbytes backs the 4-bit quantized model load configured further down.
!pip install -q bitsandbytes

In [None]:
import os
import torch

# Reproducibility seed, plus the locations for Trainer checkpoints and the
# exported LoRA adapter.
SEED = 42
OUTPUT_DIR = "./checkpoints"
ADAPTER_DIR = "./adapter"

# Create both output directories up front; existing ones are left untouched.
for _directory in (OUTPUT_DIR, ADAPTER_DIR):
    os.makedirs(_directory, exist_ok=True)

# Seed PyTorch's random number generator.
torch.manual_seed(SEED)

## 2. 加载 LLM & Tokenizer & LoRA

In [None]:
from transformers import BitsAndBytesConfig, AutoModelForCausalLM

MODEL_NAME = "Qwen/Qwen3-1.7B"
CACHE_DIR = "/content/hf_cache"  # local cache directory for downloaded files
USE_4BIT = True

# Arguments shared by both loading paths.
# NOTE(review): trust_remote_code=True allows code from the model repository
# to run locally; confirm it is actually required for this checkpoint.
_load_kwargs = dict(
    cache_dir=CACHE_DIR,
    trust_remote_code=True,
    device_map="auto",  # automatically place the model on the available device(s)
)

if USE_4BIT:
    # Native bfloat16 needs compute capability >= 8.0. On older GPUs fall back
    # to float16, which also matches the fp16 mixed-precision setting used for
    # training later in this notebook.
    _has_native_bf16 = (
        torch.cuda.is_available()
        and torch.cuda.get_device_capability()[0] >= 8
    )
    compute_dtype = torch.bfloat16 if _has_native_bf16 else torch.float16

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=compute_dtype,
        bnb_4bit_quant_type="nf4",
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        quantization_config=bnb_config,  # bitsandbytes quantization settings
        **_load_kwargs,
    )
else:
    # Prefer fp16 on GPU to reduce memory use; without a GPU the library's
    # default dtype is used instead.
    dtype = torch.float16 if torch.cuda.is_available() else None
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        torch_dtype=dtype,
        **_load_kwargs,
    )

print('Loaded model class:', model.__class__)
print(model.device)

In [None]:
from transformers import AutoTokenizer

# Load the tokenizer that belongs to MODEL_NAME, reusing the same cache directory.
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_NAME,
    cache_dir=CACHE_DIR,
    trust_remote_code=True,
)

In [None]:
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

# Projection layers that receive LoRA adapters.
_attention_projections = ["q_proj", "k_proj", "v_proj", "o_proj"]
_mlp_projections = ["gate_proj", "up_proj", "down_proj"]

lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=8,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    target_modules=_attention_projections + _mlp_projections,
)

# A quantized base model goes through PEFT's k-bit preparation before the
# adapters are attached.
if USE_4BIT:
    model = prepare_model_for_kbit_training(model)
model = get_peft_model(model, lora_config)

In [None]:
# Print the trainable-parameter summary as a sanity check (with LoRA only a
# small fraction of the parameters should be trainable).
from utils import print_trainable_parameters
print_trainable_parameters(model)


## 3. 加载并筛选数据集

In [None]:
from datasets import load_dataset, Dataset
from utils import total_length

# Maximum number of samples kept for training.
TOP_N = 100
# Bounds on total_length(sample) for a sample to be kept.
# NOTE(review): total_length comes from utils; whether it counts characters or
# tokens is not visible here -- confirm.
MAX_LEN = 500
MIN_LEN = 100

# Load the dataset and take its 'train' split.
raw_ds = load_dataset("LooksJuicy/Chinese-Roleplay-SingleTurn")
train_raw = raw_ds['train']

def is_valid_sample(sample, min_len=MIN_LEN, max_len=MAX_LEN):
    """Return True when a sample is usable for training.

    A sample is kept only if total_length(sample) is neither below min_len
    nor above max_len, and its 'instruction', 'input' and 'output' fields
    are all non-blank after stripping whitespace.
    """
    length = total_length(sample)
    if length > max_len or length < min_len:
        return False
    required_fields = ('instruction', 'input', 'output')
    return all(sample[field].strip() for field in required_fields)

# Keep the valid samples, order them longest first, and take the first TOP_N.
filtered_samples = list(filter(is_valid_sample, train_raw))
filtered_samples.sort(key=total_length, reverse=True)
train_ds = Dataset.from_list(filtered_samples[:TOP_N])
print(f"筛选后样本数: {len(train_ds)}")
print(train_ds[0])

## 4. Chat Template

In [None]:
def format_roleplay(example, include_assistant=True):
    """Render one sample as chat-template text.

    The sample's ``instruction`` is embedded as the role description in the
    system prompt, ``input`` becomes the user turn and ``output`` becomes the
    assistant turn.

    Args:
        example: Mapping with optional ``instruction``, ``input`` and
            ``output`` fields. Missing or ``None`` values are treated as
            empty strings.
        include_assistant: When True (training), the reference answer is
            appended as the assistant turn. When False (inference/testing),
            the assistant turn is omitted and the template's generation
            prompt is appended instead, so the model continues with a fresh
            assistant reply.

    Returns:
        dict with ``full_text`` (the rendered conversation used for
        tokenization) and ``assistant_text`` (the stripped reference answer,
        used during training to build labels; may be empty).
    """
    # `or ""` guards against fields that are present but None.
    instr = (example.get("instruction") or "").strip() or "<未提供角色设定>"
    user_input = (example.get("input") or "").strip() or "<无用户输入>"
    assistant_output = (example.get("output") or "").strip()

    system_prompt = (
        "你将扮演由下方“角色设定”描述的角色。"
        "始终以该角色的第一人称身份回答，保持角色的语气、知识和情感一致，增强代入感。"
        f"\n\n角色设定：\n{instr}\n\n"
    )

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_input},
    ]
    if include_assistant and assistant_output:
        conversation.append({"role": "assistant", "content": assistant_output})

    full_text = tokenizer.apply_chat_template(
        conversation=conversation,
        tokenize=False,
        # Without the generation prompt an inference prompt would stop right
        # after the user turn instead of opening the assistant turn.
        add_generation_prompt=not include_assistant,
    )

    return {"full_text": full_text, "assistant_text": assistant_output}


In [None]:
# Replace the raw columns with the 'full_text' / 'assistant_text' pair.
_raw_columns = train_ds.column_names
train_ds = train_ds.map(format_roleplay, remove_columns=_raw_columns)
print(train_ds[0])


## 5. 微调前测试

In [None]:
# 载入 samples.json 并对每个样例在微调前进行一次推理，保存结果到 pre_results
import json
from pathlib import Path

# Token budget for tokenized inputs (reused when tokenizing the training set)
# and the decoding settings for the before/after comparison runs.
MAX_LENGTH = 1024
DO_SAMPLE = True
DECODE_TEMPERATURE = 1.0
DECODE_TOP_K = 50
DECODE_TOP_P = 0.9
DECODE_MAX_NEW_TOKENS = 256

# Load the evaluation samples; fail early with the resolved path if missing.
samples_path = Path('samples.json')
if samples_path.exists():
    with samples_path.open(encoding='utf-8') as _samples_file:
        samples = json.load(_samples_file)
else:
    raise FileNotFoundError(f'samples.json not found at {samples_path.resolve()}')

# Reuse format_roleplay for test/inference prompts, leaving out the reference answer.
def build_prompt_from_sample(s):
    """Return the prompt text for sample ``s`` without an assistant answer."""
    formatted = format_roleplay(s, include_assistant=False)
    if isinstance(formatted, dict):
        return formatted['full_text']
    return formatted

def generate_for_prompt(prompt):
    """Generate the model's reply for one rendered prompt.

    Args:
        prompt: Prompt text, e.g. as returned by build_prompt_from_sample.

    Returns:
        The decoded text of the newly generated tokens only. The prompt that
        generate() echoes at the start of its output is stripped, so results
        can be compared side by side without the repeated system/user text.
    """
    # Tokenize and move the tensors to the model's device.
    inputs = tokenizer(
        prompt, return_tensors='pt', truncation=True,
        padding=True, max_length=MAX_LENGTH
    ).to(model.device)

    generation_kwargs = {
        'max_new_tokens': DECODE_MAX_NEW_TOKENS,
        'do_sample': DO_SAMPLE,
    }
    if DO_SAMPLE:
        # Sampling knobs only apply when sampling is enabled.
        generation_kwargs.update(
            temperature=DECODE_TEMPERATURE,
            top_k=DECODE_TOP_K,
            top_p=DECODE_TOP_P,
        )

    model.eval()
    with torch.no_grad():
        gen = model.generate(**inputs, **generation_kwargs)

    # The output starts with the prompt tokens; keep only what follows them.
    prompt_len = inputs['input_ids'].shape[1]
    text = tokenizer.batch_decode(
        gen[:, prompt_len:], skip_special_tokens=True
    )[0]
    return text

# Baseline: generate one reply per sample before any fine-tuning.
pre_results = [
    generate_for_prompt(build_prompt_from_sample(sample)) for sample in samples
]

print('微调前测试完成，样本数：', len(pre_results))


## 6. 微调

In [None]:
from transformers import Trainer, TrainingArguments, default_data_collator
from utils import find_sublist

# Training hyper-parameters passed to TrainingArguments in the next cell.
PER_DEVICE_BATCH_SIZE = 2
GRADIENT_ACCUMULATION_STEPS = 4
NUM_EPOCHS = 2
LEARNING_RATE = 2e-4

# Show train_ds's columns; the text source is 'full_text' when present, otherwise the first string column (for compatibility with other layouts).
print('train_ds columns:', train_ds.column_names)

def _get_text_list(batch):
    # batch 是一个 dict，值是 list。寻找 'full_text' 列优先，其次回退到第一个字符串列
    if 'full_text' in batch:
        return batch['full_text']
    for k, v in batch.items():
        if isinstance(v, list) and v and isinstance(v[0], str):
            return v
    # 兜底：把第一个列强转为字符串列表
    first = next(iter(batch.items()))
    return [str(x) for x in first[1]]

def tokenize_function(examples):
    """Tokenize a batch of formatted samples and build assistant-only labels.

    Every text is truncated/padded to MAX_LENGTH. Labels are -100 everywhere
    except on the assistant reply and the end-of-sequence token directly after
    it, so the loss covers only what the model should produce, including
    where to stop.

    Args:
        examples: Batch dict of column lists; texts are chosen by
            _get_text_list and the reference answers are read from the
            optional 'assistant_text' column.

    Returns:
        The tokenizer output with an added 'labels' entry (one list per
        sample, same length as its 'input_ids').
    """
    # texts already contain the assistant turn (training) or omit it (inference).
    texts = _get_text_list(examples)
    tokenized = tokenizer(
        texts, truncation=True, padding='max_length', max_length=MAX_LENGTH
    )

    # Reference answers (may be a list of empty strings), tokenized without
    # special tokens so they can be located inside the full sequence.
    assistant_texts = examples.get('assistant_text', [''] * len(texts))
    assistant_tokenized = tokenizer(assistant_texts, add_special_tokens=False).input_ids

    eos_id = tokenizer.eos_token_id

    labels = []
    for input_ids, attention_mask, assist_ids in zip(
        tokenized['input_ids'], tokenized['attention_mask'], assistant_tokenized
    ):
        # Everything is ignored by the loss unless marked below.
        lab = [-100] * len(input_ids)
        if assist_ids:
            # NOTE(review): find_sublist comes from utils; if it returns the
            # first match, a reply that repeats prompt text could be located
            # in the wrong place -- confirm.
            start = find_sublist(input_ids, assist_ids)
            if start != -1:
                end = min(start + len(assist_ids), len(lab))
                # Also supervise the end-of-sequence token right after the
                # reply; otherwise the model is never trained to stop. The
                # attention-mask check keeps padding out even when the pad
                # and EOS ids coincide.
                if (
                    eos_id is not None
                    and end < len(lab)
                    and attention_mask[end]
                    and input_ids[end] == eos_id
                ):
                    end += 1
            else:
                # Not found: align with the tail of the real (non-padding)
                # tokens. The attention mask is used instead of comparing
                # with the pad id, which would also strip genuine tokens
                # sharing that id.
                end = len(input_ids)
                while end > 0 and not attention_mask[end - 1]:
                    end -= 1
                start = max(0, end - len(assist_ids))
            lab[start:end] = input_ids[start:end]
        # An empty assistant answer keeps the labels at -100 throughout.
        labels.append(lab)

    tokenized['labels'] = labels
    return tokenized

# tokenized_ds is the training set: the text columns are dropped, while the
# columns produced by tokenize_function (input_ids, labels, ...) are kept.
_text_columns = train_ds.column_names
tokenized_ds = train_ds.map(
    tokenize_function,
    batched=True,
    remove_columns=_text_columns,
)
print('tokenized_ds example:', tokenized_ds[0])


In [None]:
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,  # reuse the checkpoint directory created during setup
    per_device_train_batch_size=PER_DEVICE_BATCH_SIZE,
    gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS,
    num_train_epochs=NUM_EPOCHS,
    learning_rate=LEARNING_RATE,
    fp16=torch.cuda.is_available(),
    logging_steps=10,
    save_total_limit=2,
    save_strategy='epoch',
    remove_unused_columns=False,
    seed=SEED,  # keep the training seed tied to the notebook-wide SEED
    report_to="none",  # disable wandb logging
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_ds,
    data_collator=default_data_collator,
    # `processing_class` is the current name for the deprecated `tokenizer`
    # argument of Trainer.
    processing_class=tokenizer,
)

# Start training.
train_result = trainer.train()
print('train_result:', train_result)


## 7. 微调后测试

In [None]:
# Run the samples from samples.json through the model again after fine-tuning;
# post_results is shown next to pre_results in the comparison table.
post_results = [
    generate_for_prompt(build_prompt_from_sample(sample)) for sample in samples
]


In [None]:
import pandas as pd
from IPython.display import display

def _result_at(results, index):
    """Return results[index], or '' when no result exists at that index."""
    return results[index] if index < len(results) else ''


# One row per sample, with the outputs before and after fine-tuning.
table_data = [
    {
        '角色': sample['name'],
        '输入': sample['input'],
        '微调前': _result_at(pre_results, index),
        '微调后': _result_at(post_results, index),
    }
    for index, sample in enumerate(samples)
]
df = pd.DataFrame(table_data, columns=['角色', '输入', '微调前', '微调后'])
display(df)

## 8. 保存

In [None]:
import shutil

# Save only the LoRA adapter (not the base model or tokenizer) to save space.
# `model` is the PeftModel returned by get_peft_model, so save_pretrained
# writes just the adapter weights and configuration.
model.save_pretrained(ADAPTER_DIR)
print('\n保存 LoRA adapter 至:', ADAPTER_DIR)

# Zip the saved files so they are easy to download.
archive_path = shutil.make_archive(
    base_name=ADAPTER_DIR,
    format='zip',
    root_dir=ADAPTER_DIR,
)
print('\n已将 adapter 打包为:', archive_path)