In [None]:
from datasets import load_dataset
dataset = load_dataset(".cache/modelscope/datasets/AI-ModelScope/chinese-fineweb-edu-v2",
                       split="train",
                       num_proc=40)

dataset

In [13]:
import os
# Dataset({
#     features: ['text', 'score', '__index__', 'source'],
#     num_rows: 187668844
# })

# 1. 定义保存分类后数据集的根目录
output_dir = '/DATA/disk2/yuhang/.cache/bit_brain_data/chinese-fineweb-classification'
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

print(f"\n分类后的数据集将被保存在 '{output_dir}' 目录下。\n")

# 2. 获取所有唯一的 'source' 值
unique_sources = dataset.unique('source')
print(f"找到的唯一 'source' 值: {unique_sources}")

# 3. 外层循环：遍历每个 'source'
for source_value in unique_sources:
    print(f"\n--- 正在处理 Source: '{source_value}' ---")
    
    # 根据 source 创建子目录
    source_dir = os.path.join(output_dir, f"source_{source_value}")
    if not os.path.exists(source_dir):
        os.makedirs(source_dir)
        
    # 过滤出当前 source 的所有数据
    # datasets.filter() 非常高效，它不会立即加载数据到内存
    source_dataset = dataset.filter(lambda example: example['source'] == source_value,
                                    num_proc=40)
    
    # 4. 定义 score 的划分范围 (bins)
    # 根据数据集本身的score分布进行划分
    score_bins = [
        (0.6, 0.75),
        (0.75, 1.0)
    ]
    print(f"  将 'score' 按照预设范围进行划分。")
  
    # 5. 内层循环：遍历每个 score 范围
    for i, (lower_bound, upper_bound) in enumerate(score_bins):
        
        # 定义一个清晰的目录名来表示范围
        range_name = f"score_{lower_bound}_to_{upper_bound}"
        
        # 处理最后一个区间的边界，使其包含上界 (e.g., score == 1.0)
        if i == len(score_bins) - 1:
            print(f"  --> 正在处理 Score 范围: [{lower_bound}, {upper_bound}]...")
            # 过滤条件： lower_bound <= score <= upper_bound
            final_filtered_dataset = source_dataset.filter(
                lambda example: lower_bound <= example['score'] <= upper_bound,
                num_proc=40 # 多进程加速
            )
        else:
            print(f"  --> 正在处理 Score 范围: [{lower_bound}, {upper_bound})...")
            # 过滤条件： lower_bound <= score < upper_bound
            final_filtered_dataset = source_dataset.filter(
                lambda example: lower_bound <= example['score'] < upper_bound,
                num_proc=40 # 多进程加速
            )

        # 优化：如果过滤后的数据集为空，则跳过保存
        if len(final_filtered_dataset) == 0:
            print(f"      范围 '{range_name}' 内没有数据，跳过保存。")
            continue
            
        # 6. 将最终过滤出的数据集保存到磁盘
        save_path = os.path.join(source_dir, range_name)
        
        # 检查是否已有数据，若有则可以跳过或覆盖
        if os.path.exists(save_path):
            print(f"      目录 '{save_path}' 已存在，跳过保存。")
            continue
            
        final_filtered_dataset.save_to_disk(save_path)
        print(f"      数据已保存到: '{save_path}'，包含 {len(final_filtered_dataset)} 行。")

print("\n--- 所有分类和保存任务已完成！ ---")



{'completion': '昭通机场（ZPZT）是位于中国云南昭通的民用机场，始建于1935年，1960年3月开通往返航班“昆明－昭通”，原来属军民合用机场。1986年机场停止使用。1991年11月扩建，于1994年2月恢复通航。是西南地区「文明机场」，通航城市昆明。 机场占地1957亩，飞行区等级为4C，有一条跑道，长2720米，宽48米，可供波音737及以下机型起降。机坪面积6600平方米，停机位2个，航站楼面积1900平方米。位于城东6公里处，民航路与金鹰大道交叉处。\n航点\n客服电话\n昭通机场客服电话：0870-2830004',
 'source': 'wikipedia.zh2307'}

In [None]:
import os
import math
from pathlib import Path

def get_dir_size(path):
    """
    计算指定路径下所有文件和子文件夹的总大小。
    这是一个递归函数，意味着它会调用自己来处理子文件夹。
    """
    total_size = 0
    # Path(path).rglob('*') 会遍历目录下的所有内容，包括子文件夹里的
    # 这是一种更现代、更简洁的写法
    for file in Path(path).rglob('*'):
        # 确保我们只计算文件的大小
        if file.is_file():
            total_size += file.stat().st_size
    return total_size

def format_size(size_bytes):
    """
    将字节大小格式化为人类易读的形式 (B, KB, MB, GB, TB)。
    """
    if size_bytes == 0:
        return "0B"
    # 定义大小单位的元组
    size_names = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    # 使用log计算单位的索引
    i = int(math.floor(math.log(size_bytes, 1024)))
    # 计算转换后的大小
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return f"{s} {size_names[i]}"

# 1. 定义数据集的根目录
# 根据你的日志，数据保存在这个路径下
root_dir = '/DATA/disk2/yuhang/.cache/bit_brain_data/chinese-fineweb-classification'
# 我们只关心高质量数据集
target_score_dir_name = 'score_0.75_to_1.0'

# 用一个字典来存储每个source和它对应的大小
source_sizes = {}

print(f"开始扫描目录: {root_dir}\n")

# 2. 遍历根目录，找到所有的 source_* 文件夹
# os.scandir 比 os.listdir 更高效，因为它在扫描时就获取了文件信息
try:
    for entry in os.scandir(root_dir):
        # 确保它是一个目录并且以 'source_' 开头
        if entry.is_dir() and entry.name.startswith('source_'):
            # 提取 source 的名字，比如从 'source_CCI3' 提取出 'CCI3'
            source_name = entry.name.replace('source_', '')
            
            # 构造我们感兴趣的高质量数据文件夹的完整路径
            high_score_path = os.path.join(entry.path, target_score_dir_name)
            
            # 3. 检查路径是否存在，然后计算大小
            if os.path.exists(high_score_path):
                print(f"正在计算 '{source_name}' 的大小...")
                # 调用我们写的函数来获取文件夹大小
                size = get_dir_size(high_score_path)
                source_sizes[source_name] = size
            else:
                # 如果某个source没有这个分数段的数据，也打印出来，方便我们知晓
                print(f"  - 警告: 路径 '{high_score_path}' 不存在，跳过。")
except FileNotFoundError:
    print(f"错误: 根目录 '{root_dir}' 不存在，请检查路径是否正确。")

# 4. 计算总大小和各自的比例
total_size = sum(source_sizes.values())

print("\n--- 结果分析 ---")
if total_size == 0:
    print("未能计算任何数据的大小，总大小为 0。请检查目录结构和路径是否正确。")
else:
    # 使用我们写的格式化函数，让结果更易读
    print(f"所有 'score_0.75_to_1.0' 数据的总大小: {format_size(total_size)}\n")
    print("各 source 数据大小及其占比:")
    
    # 按照大小从大到小排序，这样结果看起来更有条理
    sorted_sources = sorted(source_sizes.items(), key=lambda item: item[1], reverse=True)
    
    for source, size in sorted_sources:
        # 计算百分比
        proportion = (size / total_size) * 100
        # 使用 f-string 格式化输出，让表格对齐，更美观
        print(f"  - {source:<20}: {format_size(size):<10} ({proportion:.2f}%)")


In [None]:
# 读取所有高质量数据集（score_0.75_to_1.0）并合并为一个dataset对象
from datasets import load_from_disk, concatenate_datasets
import os
# 定义数据集的根目录和目标分数目录
root_dir = '/DATA/disk2/yuhang/.cache/bit_brain_data/chinese-fineweb-classification'
target_score_dir_name = 'score_0.75_to_1.0'

# 用来存储所有加载的数据集
datasets_list = []

print(f"开始加载高质量数据集 (score_0.75_to_1.0)...")

# 遍历根目录，找到所有的 source_* 文件夹
try:
    for entry in os.scandir(root_dir):
        # 确保它是一个目录并且以 'source_' 开头
        if entry.is_dir() and entry.name.startswith('source_'):
            # 提取 source 的名字
            source_name = entry.name.replace('source_', '')
            
            # 构造高质量数据文件夹的完整路径
            high_score_path = os.path.join(entry.path, target_score_dir_name)
            
            # 检查路径是否存在
            if os.path.exists(high_score_path):
                print(f"  正在加载 '{source_name}' 数据集...")
                try:
                    # 加载数据集
                    source_dataset = load_from_disk(high_score_path)
                    # 添加source信息到数据集中，方便后续分析
                    #source_dataset = source_dataset.add_column("source", [source_name] * len(source_dataset))
                    datasets_list.append(source_dataset)
                    print(f"    成功加载 {len(source_dataset)} 条数据")
                except Exception as e:
                    print(f"    错误: 无法加载 '{source_name}' 数据集: {e}")
            else:
                print(f"  跳过: '{source_name}' - 路径不存在")

    # 合并所有数据集
    if datasets_list:
        print(f"\n正在合并 {len(datasets_list)} 个数据集...")
        dataset = concatenate_datasets(datasets_list)
        print(f"合并完成！总共包含 {len(dataset)} 条高质量数据")
        
        # 显示数据集的基本信息
        print(f"\n数据集信息:")
        print(f"  - 总样本数: {len(dataset):,}")
        print(f"  - 列名: {dataset.column_names}")
        
    #     # 显示各个source的数据分布
    #     if 'source' in dataset.column_names:
    #         source_counts = {}
    #         for item in dataset:
    #             source = item['source']
    #             source_counts[source] = source_counts.get(source, 0) + 1
            
    #         print(f"\n各数据源分布:")
    #         for source, count in sorted(source_counts.items(), key=lambda x: x[1], reverse=True):
    #             percentage = (count / len(dataset)) * 100
    #             print(f"  - {source:<20}: {count:>10,} ({percentage:.2f}%)")
    # else:
    #     print("错误: 没有找到任何可用的数据集")
    #     dataset = None

except FileNotFoundError:
    print(f"错误: 根目录 '{root_dir}' 不存在，请检查路径是否正确。")
    dataset = None

In [25]:
# import os
# os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"  

from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(".cache/modelscope/models/Qwen/Qwen3-0.6B",
                                          use_fast=True)
tokenizer

Qwen2TokenizerFast(name_or_path='/home/ytllm/.cache/modelscope/models/Qwen/Qwen3-0.6B', vocab_size=151643, model_max_length=131072, is_fast=True, padding_side='right', truncation_side='right', special_tokens={'eos_token': '<|im_end|>', 'pad_token': '<|endoftext|>', 'additional_special_tokens': ['<|im_start|>', '<|im_end|>', '<|object_ref_start|>', '<|object_ref_end|>', '<|box_start|>', '<|box_end|>', '<|quad_start|>', '<|quad_end|>', '<|vision_start|>', '<|vision_end|>', '<|vision_pad|>', '<|image_pad|>', '<|video_pad|>']}, clean_up_tokenization_spaces=False, added_tokens_decoder={
	151643: AddedToken("<|endoftext|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	151644: AddedToken("<|im_start|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	151645: AddedToken("<|im_end|>", rstrip=False, lstrip=False, single_word=False, normalized=False, special=True),
	151646: AddedToken("<|object_ref_start|>", rstrip=False, lstrip

In [None]:
# --- 1. 定义超参数 ---
# max_length 保持不变
max_length = 2048
# stride (步长) 是新参数。一个常见的选择是 max_length 的 1/4 或 1/2。
# 这里我们选择 512，意味着块之间有 2048 - 1024 = 1024 个 token 的重叠。
stride = 1024
# 新增：定义要保留的最小块长度，小于此长度的最后一个块将被丢弃
min_chunk_length = 256

# --- 2. 编写新的分词和切块函数 ---
def chunk_and_tokenize_function(examples):
    """
    这个函数接收一批文本 (examples)，然后对每个文本进行分词。
    如果分词后的长度超过 max_length，就使用滑动窗口将其切分成多个块。
    这个函数不仅将长文本切块，还会丢弃掉最后那个过短的块。
    """
    # 首先，对所有文本进行分词，但这次不进行截断和填充，以获取每个文本的完整 token 序列。
    # 我们只关心 'input_ids'
    full_tokenized = tokenizer(examples['text'], truncation=False, padding=False)
    
    # 用来存放最终切分好的所有块
    chunked_input_ids = []
    chunked_attention_mask = []
    
    # 遍历刚刚分词后的每一篇文章
    for input_ids in full_tokenized['input_ids']:
        # 获取当前文章的总长度
        doc_length = len(input_ids)
        
        # 如果文章本身就不够长，就直接填充它，作为一个样本
        if doc_length <= max_length:
            # 填充到 max_length
            padded_ids = input_ids + [tokenizer.pad_token_id] * (max_length - doc_length)
            chunked_input_ids.append(padded_ids)
            # attention_mask 中，真实 token 为 1，填充 token 为 0
            attention_mask = [1] * doc_length + [0] * (max_length - doc_length)
            chunked_attention_mask.append(attention_mask)
        
        # 如果文章太长，就开始滑动窗口切分
        else:
            # 从头开始切
            start_index = 0
            while start_index < doc_length:
                # 定义块的结束位置
                end_index = start_index + max_length
                
                # 从完整序列中切出这个块的 input_ids
                chunk = input_ids[start_index:end_index]
                
                # 如果块的长度小于我们设定的最小阈值，就跳出循环，不再处理这个及之后可能的块
                if len(chunk) < min_chunk_length:
                    break

                # 如果这是最后一个块，且长度不足 max_length，需要填充
                if len(chunk) < max_length:
                    padded_chunk = chunk + [tokenizer.pad_token_id] * (max_length - len(chunk))
                    attention_mask = [1] * len(chunk) + [0] * (max_length - len(chunk))
                # 如果是中间的完整块
                else:
                    padded_chunk = chunk
                    attention_mask = [1] * max_length
                
                chunked_input_ids.append(padded_chunk)
                chunked_attention_mask.append(attention_mask)
                
                # 如果窗口已经滑到了文章末尾，结束循环
                if end_index >= doc_length:
                    break
                    
                # 窗口向前滑动一个步长 (stride)
                start_index += stride

    # 返回一个字典，包含所有新生成的块
    return {
        'input_ids': chunked_input_ids,
        'attention_mask': chunked_attention_mask
    }


# --- 3. 应用新的函数 ---
# 注意：因为一行输入可能产生多行输出，我们必须移除所有原始列，
# 否则会因为行数不匹配而报错。
tokenized_dataset = dataset.map(
    chunk_and_tokenize_function,
    batched=True,           # 依然使用批处理以提高效率
    num_proc=40,            # 依然使用多进程
    remove_columns=dataset.column_names  # 移除所有旧列，非常重要！
)


In [None]:
tokenized_dataset.set_format(type="torch", columns=["input_ids", "attention_mask"])

In [27]:
tokenized_dataset.save_to_disk(".cache/pretrain_data/high_score_tokenized_data")

Saving the dataset (7/7 shards): 100%|██████████| 254547/254547 [00:02<00:00, 106823.51 examples/s]
