In [11]:
# AutoDL官方学术资源加速
import subprocess
import os

result = subprocess.run('bash -c "source /etc/network_turbo && env | grep proxy"', shell=True, capture_output=True, text=True)
output = result.stdout
for line in output.splitlines():
    if '=' in line:
        var, value = line.split('=', 1)
        os.environ[var] = value

In [12]:
from datasets import load_dataset
import pandas as pd
import numpy as np
from typing import List, Dict
import json
import os
from tqdm import tqdm
import random

In [13]:
class TaskSpecificDatasetBuilder:
    def __init__(self, base_data_path='./base_stage_data'):
        self.datasets = {}
        self.task_data = []  # 存储处理后的任务数据
        self.base_data = []  # 存储基础数据
        self.processed_data = []  # 存储最终混合后的数据
        self.base_data_path = base_data_path
        
    def load_task_datasets(self):
        """加载特定任务所需的数据集"""
        print("正在加载任务特定数据集...")
        
        try:
            # 翻译任务数据集
            self.datasets['translation'] = load_dataset('Helsinki-NLP/news_commentary', "en-zh", split='train') # 69.2k rows
            print("✓ 翻译数据集加载完成") # https://huggingface.co/datasets/Helsinki-NLP/news_commentary
            
            # 对话任务数据集 - QA。这个数据集还包gpt的回答
            self.datasets['dialogue'] = load_dataset('Hello-SimpleAI/HC3-Chinese', "all", split='train', trust_remote_code=True) # 12.9k rows
            print("✓ 对话数据集加载完成") # https://huggingface.co/datasets/Hello-SimpleAI/HC3-Chinese
            
            # 故事创作数据集 - 这个数据集包含很多kind的数据，所以我们首先要提取kind = StoryGeneration的数据
            self.datasets['story'] = load_dataset('YeungNLP/firefly-train-1.1M', split='train') # 1.65M rows
            print("✓ 故事创作数据集加载完成") # https://huggingface.co/datasets/YeungNLP/firefly-train-1.1M
            
        except Exception as e:
            print(f"加载数据集时出错: {e}")
            raise
            
    def load_base_data(self):
        """加载基础阶段的数据"""
        print("加载基础阶段数据...")
        
        train_path = os.path.join(self.base_data_path, 'train.json')
        with open(train_path, 'r', encoding='utf-8') as f:
            self.base_data = json.load(f)
        print(f"✓ 成功加载基础数据: {len(self.base_data)} 条")
    
    def preprocess_task_data(self, max_samples_per_task=20000):
        """预处理特定任务的数据集"""
        print("处理特定任务数据集...")
        
        for dataset_name, dataset in self.datasets.items():
            print(f"处理 {dataset_name} 数据集...")
            
            if dataset_name == 'translation':
                processed = self._process_translation(dataset, max_samples_per_task)
            elif dataset_name == 'dialogue':
                processed = self._process_dialogue(dataset, max_samples_per_task)
            elif dataset_name == 'story':
                processed = self._process_story(dataset, max_samples_per_task)
                
            self.task_data.extend(processed)
            print(f"✓ {dataset_name} 处理完成，添加 {len(processed)} 条数据")
            
        return self.task_data # task_data 的结构类似：[{'type': 'translation', 'instruction': '请将以下中文翻译成英文：', 'input': '今天天气很好', 'output': 'The weather is nice today'}, {'type': 'translation', 'instruction': '请将以下英文翻译成中文：', 'input': 'The weather is nice today', 'output': '今天天气很好'}, ...]
    
    def mix_datasets(self, task_stage_data, total_samples=100000, base_ratio=0.7):
        """
        混合基础数据集和特定任务数据集
        
        Args:
            total_samples: 最终需要的总数据量
            base_ratio: 基础数据集占比
        """
        print(f"\n混合数据集 (目标总量: {total_samples}, 基础数据比例: {base_ratio})")
        
        # 计算基础数据和任务数据的目标数量
        base_samples = int(total_samples * base_ratio)
        task_samples = total_samples - base_samples
        
        # 随机采样
        selected_base = random.sample(self.base_data, min(base_samples, len(self.base_data)))
        selected_task = random.sample(task_stage_data, min(task_samples, len(task_stage_data)))
        
        # 合并数据
        self.processed_data = selected_base + selected_task
        
        print(f"- 选择的基础数据量: {len(selected_base)}")
        print(f"- 选择的任务数据量: {len(selected_task)}")
        print(f"- 最终数据总量: {len(self.processed_data)}")
        
        return self.processed_data
    
    def _process_translation(self, dataset, max_samples):
        """处理翻译数据集"""
        samples = dataset[:max_samples]
        processed = []
        
        # print(type(samples), samples.keys()) # <class 'dict'> dict_keys(['id', 'translation'])
        # print(type(samples['translation'])) # <class 'list'>
        # print(type(samples['translation'][0])) # <class 'dict'>

        for i in range(len(samples['translation'])):
            processed.append({
                'type': 'translation',
                'instruction': '请将以下中文翻译成英文：',
                'input': samples['translation'][i]['zh'],
                'output': samples['translation'][i]['en']
            })
            
            # 添加英译中的样本
            processed.append({
                'type': 'translation',
                'instruction': '请将以下英文翻译成中文：',
                'input': samples['translation'][i]['en'],
                'output': samples['translation'][i]['zh']
            })
        
        return processed
    
    def _process_dialogue(self, dataset, max_samples):
        """处理对话数据集"""
        samples = dataset[:max_samples]
        return [{
            'type': 'dialogue',
            'instruction': '请针对以下问题进行回答：',
            'input': samples['question'][i],
            'output': samples['human_answers'][i][0] if samples['human_answers'][i] else '' # samples['human_answers'][i] 是一个 list
        } for i in range(len(samples['question']))]
    
    # 由于我们选择的是"YeungNLP/firefly-train-1.1M"这个数据集，所以首先需要提取kind = StoryGeneration的数据
    def _process_story(self, dataset, max_samples):
        """处理故事创作数据集"""
        # 首先将数据集转换为列表形式
        all_data = dataset.to_pandas()
        
        # 筛选出kind为StoryGeneration的数据
        story_data = all_data[all_data['kind'] == 'StoryGeneration']
        print(f"故事生成数据总量: {len(story_data)} 条")
        
        # 如果数据量超过max_samples，则随机采样
        if len(story_data) > max_samples:
            story_data = story_data.sample(n=max_samples, random_state=42)
            print(f"采样后数据量: {len(story_data)} 条")
        
        # print(type(story_data)) # <class 'pandas.core.frame.DataFrame'>

        # 转换为所需格式
        processed = []
        for _, row in story_data.iterrows():
            processed.append({
                'type': 'story_generation',
                'instruction': "请根据以下提示生成一个故事：",
                'input': row['input'],
                'output': row['target']
            })
        
        return processed
    
    def format_for_gemma(self):
        """将数据格式化为Gemma模型训练所需的格式"""
        formatted_data = []
        
        USER_CHAT_TEMPLATE = "<start_of_turn>user\n{prompt}\n<end_of_turn><eos>\n"
        MODEL_CHAT_TEMPLATE = "<start_of_turn>model\n{response}\n<end_of_turn><eos>\n"
        
        # print(type(self.processed_data), self.processed_data[0].keys()) # 期待：<class 'list'> dict_keys(['text', 'prompt', 'completion', 'type'])
        # 实际上
        # print(type(self.processed_data)) # <class 'list'>
        # print(self.processed_data) # []
        # print(type(self.processed_data[0]), self.processed_data[0]) # 报错
        
        for item in self.task_data:

            # if item['type'] == 'translation':
            #     print(item) # {'type': 'translation', 'instruction': '请将以下中文翻译成英文：', 'input': '1929年还是1989年?', 'output': '1929 or 1989?'}
            #     break

            # 构建用户提示
            if item['input']:
                user_prompt = f"{item['instruction']}\n\n输入：{item['input']}"
            else:
                user_prompt = item['instruction']
            
            # 格式化用户输入和模型输出
            formatted_prompt = USER_CHAT_TEMPLATE.format(prompt=user_prompt)
            formatted_response = MODEL_CHAT_TEMPLATE.format(response=item['output'])
            
            formatted_data.append({
                'text': formatted_prompt + formatted_response,
                'prompt': formatted_prompt,
                'completion': formatted_response,
                'type': item['type']
            })
        
        return formatted_data
    
    def save_dataset(self, formatted_data, output_path='./task_stage_data', train_ratio=0.8):
        """保存处理后的数据集"""
        os.makedirs(output_path, exist_ok=True)
        
        # 随机打乱数据
        np.random.shuffle(formatted_data)
        
        # 划分训练集和验证集
        split_idx = int(len(formatted_data) * train_ratio)
        train_data = formatted_data[:split_idx]
        valid_data = formatted_data[split_idx:]
        
        # 保存训练集
        train_path = os.path.join(output_path, 'train.json')
        with open(train_path, 'w', encoding='utf-8') as f:
            json.dump(train_data, f, ensure_ascii=False, indent=2)
            
        # 保存验证集
        valid_path = os.path.join(output_path, 'valid.json')
        with open(valid_path, 'w', encoding='utf-8') as f:
            json.dump(valid_data, f, ensure_ascii=False, indent=2)
            
        print(f"数据集已保存：")
        print(f"- 训练集: {train_path} ({len(train_data)} 条数据)")
        print(f"- 验证集: {valid_path} ({len(valid_data)} 条数据)")
        
        # 输出数据集统计信息
        self._print_dataset_stats(formatted_data)
    
    def _print_dataset_stats(self, data):
        """打印数据集统计信息"""
        print("\n数据集统计信息:")
        
        # 统计不同类型的数据数量
        type_counts = {}
        for item in data:
            type_counts[item['type']] = type_counts.get(item['type'], 0) + 1
            
        print("\n数据类型分布:")
        for data_type, count in type_counts.items():
            percentage = (count / len(data)) * 100
            print(f"- {data_type}: {count} ({percentage:.2f}%)")

In [14]:
def main():
    # 1. 初始化数据集构建器
    builder = TaskSpecificDatasetBuilder()
    
    try:
        # 2. 加载任务数据集
        builder.load_task_datasets()
        
        # 3. 处理特定任务数据集
        builder.preprocess_task_data(max_samples_per_task=20000)
        
        # 4. 将任务数据转化为大模型格式
        task_stage_data = builder.format_for_gemma()
        
        # 5. 加载基础数据
        builder.load_base_data()
        
        # 6. 混合数据集
        mixed_data = builder.mix_datasets(task_stage_data, total_samples=80000, base_ratio=0.7)
        
        # 7. 保存数据集
        builder.save_dataset(mixed_data)
        
        print("\n任务特定阶段数据集构建完成！")
        
    except Exception as e:
        print(f"构建数据集时出错: {e}")
        raise

In [15]:
if __name__ == "__main__":
    main()

正在加载任务特定数据集...
✓ 翻译数据集加载完成
✓ 对话数据集加载完成


Repo card metadata block was not found. Setting CardData to empty.


✓ 故事创作数据集加载完成
处理特定任务数据集...
处理 translation 数据集...
✓ translation 处理完成，添加 40000 条数据
处理 dialogue 数据集...
✓ dialogue 处理完成，添加 12853 条数据
处理 story 数据集...
故事生成数据总量: 19048 条
✓ story 处理完成，添加 19048 条数据
加载基础阶段数据...
✓ 成功加载基础数据: 80000 条

混合数据集 (目标总量: 80000, 基础数据比例: 0.7)
- 选择的基础数据量: 56000
- 选择的任务数据量: 24000
- 最终数据总量: 80000
数据集已保存：
- 训练集: ./task_stage_data/train.json (64000 条数据)
- 验证集: ./task_stage_data/valid.json (16000 条数据)

数据集统计信息:

数据类型分布:
- instruction: 56000 (70.00%)
- translation: 13317 (16.65%)
- story_generation: 6420 (8.03%)
- dialogue: 4263 (5.33%)

任务特定阶段数据集构建完成！
