### TSC 交通信号控制微调 - Non-Thinking 版本


### Installation


In [None]:
import os
os.environ["UNSLOTH_VLLM_STANDBY"] = "1" # [NEW] Extra 30% context lengths! # To enable memory efficient GRPO with vLLM
os.environ["UNSLOTH_USE_MODELSCOPE"] = "1"

In [None]:
import subprocess 
import os 
result = subprocess.run('bash -c "source /etc/network_turbo && env | grep proxy"', shell=True, capture_output=True, text=True)
output = result.stdout
for line in output.splitlines():
	if '=' in line:
		var, value = line.split('=', 1)
		os.environ[var] = value


### 加载模型


In [None]:
from unsloth import FastLanguageModel
import torch

max_seq_length = 2048
lora_rank = 32

os.environ["HF_HOME"] = 'model'
os.environ["MODELSCOPE_CACHE"] = 'model'

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = "unsloth/Qwen3-4B-Instruct-2507",
    max_seq_length = max_seq_length,
    load_in_4bit = False,
    fast_inference = True,
    max_lora_rank = lora_rank,
    gpu_memory_utilization = 0.85,
)

model = FastLanguageModel.get_peft_model(
    model,
    r = lora_rank,
    target_modules = [
        "q_proj", "k_proj", "v_proj", "o_proj",
        "gate_proj", "up_proj", "down_proj",
    ],
    lora_alpha = lora_rank*2,
    use_gradient_checkpointing = "unsloth",
    random_state = 3407,
)


### 加载 TSC 数据集


In [None]:
import json
import re
from datasets import Dataset
from sklearn.model_selection import train_test_split

# 加载 TSC 数据集
with open('./data_TSC/tsc_sft_dataset.json', 'r', encoding='utf-8') as f:
    tsc_data = json.load(f)

print(f"总数据量: {len(tsc_data)}")

# 分割数据集：95% 训练，5% 测试
train_data, test_data = train_test_split(tsc_data, test_size=0.05, random_state=42)

print(f"训练集大小: {len(train_data)}")
print(f"测试集大小: {len(test_data)}")


In [None]:
# 提取答案函数 - 严格格式要求
def extract_phase_answer(text: str) -> str | None:
    """从输出中提取相位数字，严格要求格式为：下一个信号相位：数字"""
    # 只匹配严格格式：下一个信号相位：数字
    pattern = r'下一个信号相位[:：]\s*(\d+)'
    match = re.search(pattern, text)
    if match:
        return match.group(1)
    return None

# 准备训练数据集
def prepare_dataset(data):
    dataset_list = []
    for item in data:
        # 修改系统提示，强制格式为 "下一个信号相位：数字"
        system_prompt = "你是一位交通管理专家。你可以运用你的交通常识知识来解决交通信号控制任务。根据给定的交通场景和状态，预测下一个信号相位。你必须直接回答，格式必须是：下一个信号相位：{数字}（其中数字是0-9之间的单个数字）"
        
        prompt = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": item["input"]},
        ]
        answer = extract_phase_answer(item["output"])
        dataset_list.append({
            "prompt": prompt,
            "answer": answer,
        })
    return Dataset.from_list(dataset_list)

train_dataset = prepare_dataset(train_data)
test_dataset = prepare_dataset(test_data)

print(f"训练集样例:")
print(f"Prompt: {train_dataset[0]['prompt']}")
print(f"Answer: {train_dataset[0]['answer']}")


### 微调前测试模型准确率


In [None]:
from tqdm import tqdm

def evaluate_model(model, tokenizer, test_dataset, max_samples=100):
    """评估模型在测试集上的准确率"""
    correct = 0
    total = 0
    
    # 只测试前 max_samples 个样本以节省时间
    test_samples = min(max_samples, len(test_dataset))
    
    FastLanguageModel.for_inference(model)  # 启用推理模式
    
    for i in tqdm(range(test_samples), desc="评估中"):
        item = test_dataset[i]
        
        # 构建输入
        messages = item['prompt']
        inputs = tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,
            return_tensors="pt"
        ).to(model.device)
        
        # 生成回答
        outputs = model.generate(
            input_ids=inputs,
            max_new_tokens=128,
            temperature=0.7,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
        )
        
        # 解码输出
        response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
        
        # 提取预测的相位
        predicted_phase = extract_phase_answer(response)
        true_phase = item['answer']
        
        if predicted_phase == true_phase:
            correct += 1
        total += 1
        
        # 打印前5个样例
        if i < 5:
            print(f"\n样例 {i+1}:")
            print(f"真实相位: {true_phase}")
            print(f"预测相位: {predicted_phase}")
            print(f"模型回答: {response[:200]}...")
    
    accuracy = correct / total if total > 0 else 0
    print(f"\n准确率: {accuracy:.2%} ({correct}/{total})")
    return accuracy

print("="*50)
print("微调前模型准确率:")
print("="*50)
accuracy_before = evaluate_model(model, tokenizer, test_dataset, max_samples=500)


### 定义奖励函数


In [None]:
# 验证输出格式
def is_valid_format(text: str) -> bool:
    """验证文本是否满足严格格式：下一个信号相位：数字"""
    pattern = r'^下一个信号相位[:：]\s*\d+\s*$'
    return bool(re.match(pattern, text.strip()))

# 奖励函数：检查预测的相位是否正确
def correctness_reward_func(prompts, completions, answer, **kwargs) -> list[float]:
    responses = [completion[0]["content"] for completion in completions]
    q = prompts[0][-1]["content"][:100]  # 只显示前100字符
    extracted_responses = [extract_phase_answer(r) for r in responses]
    
    print(
        "-" * 20,
        f"\n问题:\n{q}...",
        f"\n正确答案:\n{answer[0]}",
        f"\n模型回答:\n{responses[0][:150]}...",
        f"\n提取结果:\n{extracted_responses[0]}",
    )
    
    return [2.0 if r == a else 0.0 for r, a in zip(extracted_responses, answer)]

# 奖励函数：严格验证格式 - 下一个信号相位：数字
def format_reward_func(completions, **kwargs) -> list[float]:
    responses = [completion[0]["content"] for completion in completions]
    # 严格检查格式是否为 "下一个信号相位：数字"
    rewards = []
    for r in responses:
        if is_valid_format(r):
            rewards.append(0.5)  # 格式正确得到1.0分
        else:
            rewards.append(0.0)  # 格式不正确得到0分
    return rewards

# 奖励函数：检查答案是否为数字
def digit_reward_func(completions, **kwargs) -> list[float]:
    responses = [completion[0]["content"] for completion in completions]
    extracted_responses = [extract_phase_answer(r) for r in responses]
    return [0.1 if r and r.isdigit() else 0.0 for r in extracted_responses]


### 配置并开始 GRPO 训练


In [None]:
from trl import GRPOConfig, GRPOTrainer

max_prompt_length = 512  # TSC 的输入比较长

training_args = GRPOConfig(
    learning_rate=5e-6,
    adam_beta1=0.9,
    adam_beta2=0.99,
    weight_decay=0.1,
    warmup_ratio=0.1,
    lr_scheduler_type="cosine",
    optim="paged_adamw_8bit",
    logging_steps=1,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=2,
    num_generations=6,
    max_prompt_length=max_prompt_length,
    max_completion_length=max_seq_length - max_prompt_length,
    max_steps=100,  # 根据需要调整
    save_steps=100,
    max_grad_norm=0.1,
    report_to="none",
    output_dir="outputs_tsc",
)

trainer = GRPOTrainer(
    model=model,
    processing_class=tokenizer,
    reward_funcs=[
        correctness_reward_func,
        digit_reward_func,
        format_reward_func,
    ],
    args=training_args,
    train_dataset=train_dataset,
)


In [None]:
print("开始训练...")
trainer.train()


In [None]:
### 检查训练后的权重是否被修改
import torch

print("检查微调后的权重...")
print("="*50)

# 获取 LoRA 模块的权重
lora_modules_with_weights = {}
for name, module in model.named_modules():
    if hasattr(module, 'lora_A') and hasattr(module, 'lora_B'):
        # 检查权重是否被修改（非零）
        A_norm = torch.norm(module.lora_A.default.weight if hasattr(module.lora_A, 'default') else module.lora_A.weight).item()
        B_norm = torch.norm(module.lora_B.default.weight if hasattr(module.lora_B, 'default') else module.lora_B.weight).item()
        
        if A_norm > 0 or B_norm > 0:
            lora_modules_with_weights[name] = (A_norm, B_norm)
            print(f"✓ {name}")
            print(f"  - lora_A norm: {A_norm:.6f}")
            print(f"  - lora_B norm: {B_norm:.6f}")

if not lora_modules_with_weights:
    print("⚠️  警告: 没有找到非零的 LoRA 权重！")
else:
    print(f"\n总共找到 {len(lora_modules_with_weights)} 个有非零权重的 LoRA 模块")

print("="*50)


### 保存模型


In [None]:
### 改进的权重保存方式 - 新的Cell
from peft import get_peft_model_state_dict
import os

print("保存微调后的权重...")
print("="*50)

# 方法1: 保存 LoRA 权重
save_dir_1 = "tsc_grpo_saved_lora_v2"
if os.path.exists(save_dir_1):
    import shutil
    shutil.rmtree(save_dir_1)

model.save_pretrained(save_dir_1)
print(f"✓ 已使用 save_pretrained 保存权重到: {save_dir_1}")

# 方法2: 保存完整的状态字典
save_dir_2 = "tsc_grpo_saved_lora_state_dict"
if os.path.exists(save_dir_2):
    import shutil
    shutil.rmtree(save_dir_2)
os.makedirs(save_dir_2, exist_ok=True)

# 保存 LoRA 状态字典
lora_state_dict = get_peft_model_state_dict(model)
torch.save(lora_state_dict, os.path.join(save_dir_2, "lora_state_dict.pt"))
print(f"✓ 已保存 LoRA 状态字典到: {save_dir_2}")

# 验证保存的权重是否非零
print("\n验证保存的权重...")
for key in list(lora_state_dict.keys())[:5]:
    val_norm = torch.norm(lora_state_dict[key]).item()
    print(f"  - {key}: norm = {val_norm:.6f}")

print("="*50)


In [None]:
model.save_lora("tsc_grpo_saved_lora")
print("模型已保存到 tsc_grpo_saved_lora")


In [None]:
### 改进的权重加载和评估方式
from peft import PeftModel
import os

print("\n改进的权重加载方式...")
print("="*50)

# 获取原始基础模型
from transformers import AutoModelForCausalLM, AutoTokenizer

# 从保存的 LoRA 适配器加载
lora_model_id = "tsc_grpo_saved_lora_v2"

if os.path.exists(lora_model_id):
    print(f"从 {lora_model_id} 加载 LoRA 权重...")
    
    # 获取基础模型（当前模型）
    base_model = model.base_model
    
    # 使用 PeftModel 加载 LoRA 权重
    model_with_lora = PeftModel.from_pretrained(base_model, lora_model_id)
    print(f"✓ 已成功加载 LoRA 权重")
    
    # 合并权重到基础模型中
    merged_model = model_with_lora.merge_and_unload()
    print(f"✓ 已将 LoRA 权重合并到基础模型")
    
    # 现在使用合并后的模型进行评估
    print("\n使用合并后的模型进行评估...")
    
    def evaluate_merged_model(model, tokenizer, test_dataset, max_samples=100):
        """使用合并后的模型评估准确率"""
        correct = 0
        total = 0
        
        test_samples = min(max_samples, len(test_dataset))
        FastLanguageModel.for_inference(model)
        
        for i in tqdm(range(test_samples), desc="评估中"):
            item = test_dataset[i]
            messages = item['prompt']
            inputs = tokenizer.apply_chat_template(
                messages,
                tokenize=True,
                add_generation_prompt=True,
                return_tensors="pt"
            ).to(model.device)
            
            outputs = model.generate(
                input_ids=inputs,
                max_new_tokens=128,
                temperature=0.7,
                do_sample=True,
                pad_token_id=tokenizer.pad_token_id,
            )
            
            response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)
            predicted_phase = extract_phase_answer(response)
            true_phase = item['answer']
            
            if predicted_phase == true_phase:
                correct += 1
            total += 1
            
            if i < 1:
                print(f"\n样例 {i+1}:")
                print(f"真实相位: {true_phase}")
                print(f"预测相位: {predicted_phase}")
                print(f"模型回答: {response[:200]}...")
        
        accuracy = correct / total if total > 0 else 0
        print(f"\n准确率: {accuracy:.2%} ({correct}/{total})")
        return accuracy
    
    accuracy_after_merged = evaluate_merged_model(merged_model, tokenizer, test_dataset, max_samples=500)
    
    print("\n" + "="*50)
    print("准确率对比（使用合并后的模型）")
    print("="*50)
    print(f"微调前准确率: {accuracy_before:.2%}")
    print(f"微调后准确率: {accuracy_after_merged:.2%}")
    print(f"提升幅度: {(accuracy_after_merged - accuracy_before):.2%}")
    print("="*50)
else:
    print(f"❌ 错误: 找不到 {lora_model_id} 目录")


### 微调后测试模型准确率


In [None]:
### 诊断：比较原始权重和微调后权重
import torch
import numpy as np

print("\n" + "="*60)
print("诊断：分析权重变化")
print("="*60)

# 1. 检查保存的 LoRA 权重文件
saved_lora_path = "tsc_grpo_saved_lora/adapter_model.safetensors"
if os.path.exists(saved_lora_path):
    print(f"\n✓ 发现保存的 LoRA 权重文件: {saved_lora_path}")
    
    # 使用 safetensors 加载
    from safetensors.torch import load_file
    lora_weights = load_file(saved_lora_path)
    
    print(f"  - 权重数量: {len(lora_weights)}")
    
    # 检查权重是否非零
    zero_count = 0
    nonzero_count = 0
    
    for key, val in lora_weights.items():
        if torch.sum(torch.abs(val)) == 0:
            zero_count += 1
        else:
            nonzero_count += 1
            norm = torch.norm(val).item()
            if nonzero_count <= 3:
                print(f"  - {key}: norm = {norm:.6f}")
    
    print(f"\n  非零权重数: {nonzero_count}")
    print(f"  零权重数: {zero_count}")
    
    if zero_count == len(lora_weights):
        print("\n⚠️  严重警告: 所有 LoRA 权重都是零！")
        print("  可能原因:")
        print("    1. 训练过程没有实际更新权重")
        print("    2. 学习率过低")
        print("    3. 优化器配置有问题")
        print("    4. 奖励信号不足以驱动学习")

else:
    print(f"\n❌ 找不到文件: {saved_lora_path}")

# 2. 检查保存的不同 checkpoint 的权重
print("\n" + "-"*60)
print("检查多个 checkpoints 的权重:")
print("-"*60)

for checkpoint_dir in ["outputs_tsc/checkpoint-250", "outputs_tsc/checkpoint-500"]:
    checkpoint_lora = f"{checkpoint_dir}/adapter_model.safetensors"
    if os.path.exists(checkpoint_lora):
        lora_weights = load_file(checkpoint_lora)
        
        nonzero_count = sum(1 for val in lora_weights.values() if torch.sum(torch.abs(val)) > 0)
        total_params = sum(val.numel() for val in lora_weights.values())
        
        print(f"\n{checkpoint_dir}:")
        print(f"  - 非零权重数: {nonzero_count}/{len(lora_weights)}")
        print(f"  - 总参数数: {total_params:,}")
        
        # 找到最大的权重值
        max_norm = max((torch.norm(val).item() for val in lora_weights.values()), default=0)
        print(f"  - 最大权重范数: {max_norm:.6f}")

print("\n" + "="*60)


In [6]:
print("微调后模型准确率 (使用保存的LoRA权重):")
print("="*50)

# 使用 vLLM 方式加载保存的 LoRA 权重
lora_request = model.load_lora("tsc_grpo_saved_lora")
print("✓ 已成功加载保存的 LoRA 权重")

# 使用加载的 LoRA 进行评估
from vllm import SamplingParams
from tqdm import tqdm

def evaluate_model_with_lora(model, tokenizer, test_dataset, lora_request, max_samples=100):
    """使用 vLLM 和加载的 LoRA 权重评估模型准确率"""
    correct = 0
    total = 0
    
    # 只测试前 max_samples 个样本以节省时间
    test_samples = min(max_samples, len(test_dataset))
    
    sampling_params = SamplingParams(
        temperature=0.7,
        max_tokens=128,
    )
    
    for i in tqdm(range(test_samples), desc="评估中"):
        item = test_dataset[i]
        
        # 构建输入
        messages = item['prompt']
        text = tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )
        
        # 使用 fast_generate 和 lora_request 生成回答
        output = model.fast_generate(
            text,
            sampling_params=sampling_params,
            lora_request=lora_request,
        )[0].outputs[0].text
        
        # 提取预测的相位
        predicted_phase = extract_phase_answer(output)
        true_phase = item['answer']
        
        if predicted_phase == true_phase:
            correct += 1
        total += 1
        
        # 打印前5个样例
        if i < 1:
            print(f"\n样例 {i+1}:")
            print(f"真实相位: {true_phase}")
            print(f"预测相位: {predicted_phase}")
            print(f"模型回答: {output[:200]}...")
    
    accuracy = correct / total if total > 0 else 0
    print(f"\n准确率: {accuracy:.2%} ({correct}/{total})")
    return accuracy

accuracy_after = evaluate_model_with_lora(model, tokenizer, test_dataset, lora_request, max_samples=500)


微调后模型准确率 (使用保存的LoRA权重):


NameError: name 'model' is not defined

In [5]:
### 【快速诊断】检查权重是否真的被微调

from safetensors.torch import load_file
import torch

print("\n" + "="*70)
print("【诊断】检查 LoRA 权重是否被微调")
print("="*70)

lora_weights = load_file("tsc_grpo_saved_lora/adapter_model.safetensors")

print(f"\n权重文件统计:")
print(f"  - 权重张量数: {len(lora_weights)}")

print(f"\n检查权重数值...")
nonzero_count = 0
weight_norms = []

for key, val in lora_weights.items():
    norm = torch.norm(val).item()
    weight_norms.append((key, norm))
    
    if norm > 0:
        nonzero_count += 1

# 排序并显示
weight_norms.sort(key=lambda x: x[1], reverse=True)

print(f"\n权重范数排序（前10个）:")
for i, (key, norm) in enumerate(weight_norms[:10], 1):
    status = "✓" if norm > 0 else "✗"
    print(f"  {i}. {status} {key}: {norm:.8f}")

print(f"\n统计结果:")
print(f"  ✓ 非零权重数: {nonzero_count}/{len(lora_weights)}")
print(f"  ✗ 零权重数: {len(lora_weights) - nonzero_count}/{len(lora_weights)}")

if nonzero_count == 0:
    print("\n" + "⚠️ "*35)
    print("严重问题: 所有 LoRA 权重都是零！")
    print("可能原因:")
    print("  1. 学习率过低（当前: 5e-6，建议: 1e-4）")
    print("  2. 奖励函数没有正确计算")
    print("  3. 梯度未能正确传播")
    print("  4. 训练中没有梯度更新")
    print("\n建议: 需要重新训练，使用更大的学习率")
    print("⚠️ "*35)
else:
    print(f"\n✓ 权重已被正确更新！")
    print(f"  非零权重占比: {nonzero_count/len(lora_weights)*100:.1f}%")
    print(f"  最大权重范数: {max(norm for _, norm in weight_norms):.8f}")

print("="*70)



【诊断】检查 LoRA 权重是否被微调

权重文件统计:
  - 权重张量数: 504

检查权重数值...

权重范数排序（前10个）:
  1. ✓ base_model.model.model.layers.0.self_attn.v_proj.lora_A.weight: 3.28125000
  2. ✓ base_model.model.model.layers.10.mlp.up_proj.lora_A.weight: 3.28125000
  3. ✓ base_model.model.model.layers.17.self_attn.v_proj.lora_A.weight: 3.28125000
  4. ✓ base_model.model.model.layers.22.self_attn.q_proj.lora_A.weight: 3.28125000
  5. ✓ base_model.model.model.layers.26.mlp.up_proj.lora_A.weight: 3.28125000
  6. ✓ base_model.model.model.layers.3.self_attn.q_proj.lora_A.weight: 3.28125000
  7. ✓ base_model.model.model.layers.32.self_attn.q_proj.lora_A.weight: 3.28125000
  8. ✓ base_model.model.model.layers.33.self_attn.o_proj.lora_A.weight: 3.28125000
  9. ✓ base_model.model.model.layers.6.self_attn.o_proj.lora_A.weight: 3.28125000
  10. ✓ base_model.model.model.layers.7.self_attn.o_proj.lora_A.weight: 3.28125000

统计结果:
  ✓ 非零权重数: 504/504
  ✗ 零权重数: 0/504

✓ 权重已被正确更新！
  非零权重占比: 100.0%
  最大权重范数: 3.28125000


### 对比微调前后准确率


In [None]:
print("\n" + "="*50)
print("准确率对比")
print("="*50)
print(f"微调前准确率: {accuracy_before:.2%}")
print(f"微调后准确率: {accuracy_after:.2%}")
print(f"提升幅度: {(accuracy_after - accuracy_before):.2%}")
print("="*50)


### HuggingFace 发布

In [None]:
model.push_to_hub_gguf(
    "DavidRay93/Qwen3-4B-TSC-GRPO-Test",
    tokenizer,
    quantization_method=["f16"],
    token="YOUR_HUGGINGFACE_TOKEN_HERE",
    temporary_location="/root/autodl-tmp/saved_models",  # 指定保存和转换的文件夹路径
)