随机抽取500条数据保存到新文件，数据不足则报错。

In [None]:
import random

# Source dataset and destination for the random subset.
# NOTE: the output file name says "sample600" although `num` below is 500; the
# name is left untouched because the upload step refers to this exact path.
input_file = r"F:\LLM\lora_finetune_dataset_functioncall.jsonl"
output_file = r"F:\LLM\lora_finetune_dataset_functioncall_sample600.jsonl"

# Number of records to draw; every message below is derived from this value so
# the reported count can no longer disagree with the actual sample size.
num = 500

# Read the records without their line terminator and skip blank lines, so that
# an empty line is never sampled as a record and a final line that lacks a
# trailing newline cannot be fused with the record written after it.
with open(input_file, 'r', encoding='utf-8') as f:
    lines = [line.rstrip("\n") for line in f if line.strip()]

if len(lines) < num:
    raise ValueError("数据不足{}条，当前仅有{}条数据。".format(num, len(lines)))

# random.sample draws `num` distinct records (sampling without replacement).
sample_lines = random.sample(lines, num)

with open(output_file, 'w', encoding='utf-8') as f:
    f.writelines(line + "\n" for line in sample_lines)
print("成功抽取{}条数据并保存到：".format(num), output_file)

### 创建Hugging Face数据仓库并上传文件

此代码创建一个Hugging Face数据仓库并将指定的JSONL文件上传到该仓库。


In [None]:
import os
from huggingface_hub import HfApi, create_repo, upload_file

# The access token is read from the environment; os.getenv returns None when
# the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")

# Target dataset repository, the local file to publish, and its name in the repo.
repo_id = "zyss1/ljz_chains_datasets"  
file_path = r"F:\LLM\lora_finetune_dataset_functioncall_sample600.jsonl"
path_in_repo = "lora_finetune_dataset_functioncall_sample600.jsonl"

# Step 1: make sure the dataset repository exists. exist_ok=True is passed so
# an already existing repository is accepted. A failure is only reported; the
# upload below is attempted regardless.
try:
    create_repo(repo_id, repo_type="dataset", exist_ok=True, token=HF_TOKEN)
    print(f"数据仓库 {repo_id} 创建或已存在。")
except Exception as e:
    print(f"创建仓库时出错：{e}")

# Step 2: push the JSONL file into the repository. Errors are reported, not raised.
upload_kwargs = {
    "path_or_fileobj": file_path,
    "path_in_repo": path_in_repo,
    "repo_id": repo_id,
    "token": HF_TOKEN,
    "repo_type": "dataset",
}
try:
    upload_file(**upload_kwargs)
    print(f"文件已成功上传到 {repo_id} 数据仓库。")
except Exception as e:
    print(f"上传文件时出错：{e}")

### 数据集处理与预处理

该代码加载Hugging Face上的数据集并对其进行预处理。它修复了文本中的JSON格式错误，处理了`<tool_call>`、`<tool_response>`和`<think>`标签。接着，它使用指定的模型和tokenizer进行消息的格式化，并将数据集划分为训练集和测试集。


In [None]:
from datasets import load_dataset
from transformers import AutoTokenizer
import re

# Hub identifiers for the tokenizer checkpoint and the dataset.
model_name = "Qwen/Qwen2.5-7B-Instruct"
dataset_name = "zyss1/ljz_chains_datasets"

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Jinja chat template: rejects a leading system message, wraps every message in
# <start_of_turn>{role}\n ... <end_of_turn><eos>, and optionally opens a model turn.
_chat_template_parts = [
    "{{ bos_token }}",
    "{% if messages[0]['role'] == 'system' %}{{ raise_exception('System role not supported') }}{% endif %}",
    "{% for message in messages %}",
    "{{ '<start_of_turn>' + message['role'] + '\n' + message['content'] | trim + '<end_of_turn><eos>\n' }}",
    "{% endfor %}",
    "{% if add_generation_prompt %}{{'<start_of_turn>model\n'}}{% endif %}",
]
tokenizer.chat_template = "".join(_chat_template_parts)

# Load the dataset and expose its "conversation" column under the name "messages".
dataset = load_dataset(dataset_name).rename_column("conversation", "messages")

# Show the type and, when it is a list, the content of the first training row.
print("Dataset sample structure:")
sample = dataset["train"][0]
first_messages = sample["messages"]
print(type(first_messages))
if isinstance(first_messages, list):
    print(first_messages)
else:
    print("Not a list")

import re
import json

def fix_escaped_quotes(text):
    r"""Drop the backslash in front of escaped single and double quotes.

    Every ``\'`` becomes ``'`` and every ``\"`` becomes ``"``; all other
    characters are returned unchanged.
    """
    without_single = text.replace("\\'", "'")
    return without_single.replace('\\"', '"')

def fix_json_in_tags(text):
    """Repair the payload inside <tool_call>, <tool_response> and <think> tags.

    For every ``<tag>...</tag>`` span (matched lazily, newlines included) the
    payload first has its escaped quotes unescaped via ``fix_escaped_quotes``.
    If the result parses as JSON it is re-serialised with
    ``json.dumps(..., ensure_ascii=False)``; otherwise the unescaped payload
    is kept as plain text. The opening and closing tags are left untouched.

    Args:
        text: String that may contain any number of the three tag pairs.

    Returns:
        The string with each tagged payload replaced by its repaired form.
    """

    def process_tag_content(match):
        opening, content, closing = match.group(1), match.group(2), match.group(3)
        fixed_content = fix_escaped_quotes(content)
        try:
            json_obj = json.loads(fixed_content)
            formatted_json = json.dumps(json_obj, ensure_ascii=False)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError. Catching only parse-related
            # failures keeps the best-effort fallback without swallowing
            # KeyboardInterrupt/SystemExit as the previous bare except did.
            return opening + fixed_content + closing
        return opening + formatted_json + closing

    # The three tag kinds are processed one after another, in this order.
    tag_patterns = (
        r'(<tool_call>)(.*?)(</tool_call>)',
        r'(<tool_response>)(.*?)(</tool_response>)',
        r'(<think>)(.*?)(</think>)',
    )
    for pattern in tag_patterns:
        text = re.sub(pattern, process_tag_content, text, flags=re.DOTALL)

    return text

def preprocess(sample):
    """Clean one dataset row and render it with the tokenizer's chat template.

    Steps:
      1. Every message whose ``content`` is a string is passed through
         ``fix_json_in_tags``. For ``role == "model"`` messages the content
         is first tried as JSON: when it is an object with a ``steps`` list,
         the string fields ``think`` / ``tool_call`` / ``tool_response`` of
         each step are repaired individually and the parsed value is
         re-serialised with ``ensure_ascii=False``.
      2. A leading ``system`` message is folded into the following message
         (together with the fixed planning instruction) and then dropped.
      3. The remaining messages are rendered to text (``tokenize=False``).

    Args:
        sample: Mapping with a ``"messages"`` entry. The message dicts are
            modified in place.

    Returns:
        ``{"text": <rendered conversation>}``, or a fixed error text when
        ``sample["messages"]`` is not a list.
    """
    messages = sample["messages"]

    if not isinstance(messages, list):
        # NOTE(review): this placeholder ends up as a regular row in the mapped
        # dataset; confirm whether such rows should be filtered out instead.
        return {"text": "Error: unexpected message format"}

    step_fields = ("think", "tool_call", "tool_response")

    for msg in messages:
        if not (isinstance(msg, dict) and isinstance(msg.get("content"), str)):
            continue

        if msg.get("role") != "model":
            msg["content"] = fix_json_in_tags(msg["content"])
            continue

        try:
            content_dict = json.loads(msg["content"])
        except json.JSONDecodeError:
            # Not JSON at all: treat the content as free text with tags.
            msg["content"] = fix_json_in_tags(msg["content"])
            continue

        # json.loads may legitimately return a list, number, string, bool or
        # None; only a JSON object can carry "steps". Without this guard a
        # content such as "42" or "null" raised an uncaught TypeError.
        steps = content_dict.get("steps") if isinstance(content_dict, dict) else None
        if isinstance(steps, list):
            for step in steps:
                if not isinstance(step, dict):
                    continue
                for field in step_fields:
                    # Only string fields can be run through the regex repair.
                    if isinstance(step.get(field), str):
                        step[field] = fix_json_in_tags(step[field])

        msg["content"] = json.dumps(content_dict, ensure_ascii=False)

    if messages and isinstance(messages[0], dict) and "role" in messages[0]:
        if messages[0]["role"] == "system":
            # The chat template rejects a system role, so its text is prepended
            # to the next message before the system message is removed.
            system_message_content = messages[0]["content"]
            if len(messages) > 1:
                messages[1]["content"] = (
                    system_message_content +
                    "Also, before making a call to a function take the time to plan the function to take. "
                    "Make that thinking process between <think>{your thoughts}</think>\n\n" +
                    messages[1]["content"]
                )
            messages = messages[1:]

    return {"text": tokenizer.apply_chat_template(messages, tokenize=False)}

# Render every row to text (dropping the raw "messages" column), then hold out
# 10% of the "train" split as a test set.
rendered = dataset.map(preprocess, remove_columns=["messages"])
train_split = rendered["train"]
dataset = train_split.train_test_split(test_size=0.1)
print(dataset)

In [None]:
# Inspect the rendered text of one training row (index 85).
train_row = dataset["train"][85]
print(train_row["text"])

In [None]:
# Show the tokenizer's current padding and end-of-sequence tokens, one per line.
print(tokenizer.pad_token, tokenizer.eos_token, sep="\n")

In [None]:
# Load the unmodified tokenizer and show how it tokenizes the turn markers.
original_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
for turn_marker in ("<start_of_turn>", "<end_of_turn>"):
    print(original_tokenizer.tokenize(turn_marker))

### 特殊标记处理与验证

此代码定义一组自定义特殊标记，用它们初始化tokenizer，并验证这些标记是否已成功添加。验证分为几步：检查每个标记是否出现在特殊标记列表中，并且被识别为单个token（而不是被拆分成多个子词）；比较添加前后词汇表大小的变化；最后对一段包含多个特殊标记的文本做编码再解码，确认文本保持一致。


In [None]:
from enum import Enum
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

class ChatmlSpecialTokens(str, Enum):
    """Marker strings registered with the tokenizer as special tokens.

    The ``str`` mixin makes every member compare equal to its literal value.
    """

    # Opening/closing tag pairs.
    tools = "<tools>"
    eotools = "</tools>"
    think = "<think>"
    eothink = "</think>"
    tool_call = "<tool_call>"
    eotool_call = "</tool_call>"
    tool_response = "<tool_response>"
    eotool_response = "</tool_response>"
    answer = "<answer>"
    eoanswer = "</answer>"

    # Turn delimiters.
    start_of_turn = "<start_of_turn>"
    end_of_turn = "<end_of_turn>"

    # Padding and end-of-sequence markers.
    pad_token = "<pad>"
    eos_token = "<eos>"

    @classmethod
    def list(cls):
        """Return the string value of every member, in declaration order."""
        return [marker.value for marker in cls.__members__.values()]

model_name = "Qwen/Qwen2.5-7B-Instruct"

# Build the tokenizer with "<pad>" as padding token and every marker of
# ChatmlSpecialTokens registered as an additional special token.
special_token_values = ChatmlSpecialTokens.list()
tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    pad_token=ChatmlSpecialTokens.pad_token.value,
    additional_special_tokens=special_token_values,
)

# Jinja chat template: rejects a leading system message, wraps every message in
# <start_of_turn>{role}\n ... <end_of_turn><eos>, and optionally opens a model turn.
_chat_template_parts = [
    "{{ bos_token }}",
    "{% if messages[0]['role'] == 'system' %}{{ raise_exception('System role not supported') }}{% endif %}",
    "{% for message in messages %}",
    "{{ '<start_of_turn>' + message['role'] + '\n' + message['content'] | trim + '<end_of_turn><eos>\n' }}",
    "{% endfor %}",
    "{% if add_generation_prompt %}{{'<start_of_turn>model\n'}}{% endif %}",
]
tokenizer.chat_template = "".join(_chat_template_parts)

# Check 1: is each marker reported among the tokenizer's special tokens?
print("特殊标记检查:")
for marker in ChatmlSpecialTokens.list():
    if marker in tokenizer.all_special_tokens:
        membership = '在特殊标记列表中'
    else:
        membership = '不在特殊标记列表中'
    print(f"{marker}: {membership}")

# Check 2: does each marker tokenize to exactly one token?
print("\n分词测试:")
for marker in ChatmlSpecialTokens.list():
    pieces = tokenizer.tokenize(marker)
    piece_ids = tokenizer.encode(marker, add_special_tokens=False)
    print(f"{marker} -> {pieces} (token_ids: {piece_ids})")

    if len(pieces) != 1:
        print("✗ 错误: 被分解为多个token")
    else:
        print("✓ 正确: 被识别为单个token")

# Check 3: compare vocabulary sizes against the unmodified tokenizer.
original_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-7B-Instruct")
original_size = len(original_tokenizer)
modified_size = len(tokenizer)
print(f"\n原始词汇表大小: {original_size}")
print(f"修改后词汇表大小: {modified_size}")
print(f"增加的token数量: {modified_size - original_size}")

# Check 4: encode and decode a text containing several markers and compare
# the decoded string with the input.
test_text = "<start_of_turn>human\n你好<end_of_turn><eos>\n<start_of_turn>model\n<think>用户打招呼，我应该回应</think><tool_call>{'name': 'test'}</tool_call><answer>你好！有什么我可以帮助你的吗？</answer><end_of_turn><eos>"
encoded = tokenizer.encode(test_text)
decoded = tokenizer.decode(encoded)
round_trip_verdict = '是' if test_text == decoded else '否'
print("\n测试文本编码再解码:")
print(f"原始文本: {test_text}")
print(f"解码后文本: {decoded}")
print(f"文本是否保持一致: {round_trip_verdict}")

### 加载模型并调整token嵌入

该代码加载预训练模型并调整其token嵌入大小以匹配新的tokenizer词汇表。接着，模型被转换为bfloat16格式，以提高计算性能并减少内存占用。


In [None]:
# Load the checkpoint directly in bfloat16. Previously no dtype was requested
# at load time and the cast happened only afterwards, so the full model was
# first materialised at the loader's default precision (float32 in
# transformers 4.x — TODO confirm for the installed version), which inflates
# peak memory for a 7B model.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.bfloat16,
    attn_implementation='eager',
    device_map="auto"
)

# Grow the embedding matrix so it covers the special tokens added to the tokenizer.
model.resize_token_embeddings(len(tokenizer))

# Kept so that the resized embeddings are guaranteed to be bfloat16 as well.
model.to(torch.bfloat16)