In [None]:
import os
from typing import Any, List

import datasets
from datasets import Dataset

# Download & Pre-process Datasets

## open-text-books

In [None]:
# Set local name
# Used below as the sub-directory under both ori_datasets_dir and
# preprocessed_datasets_dir, and passed as the first argument to save_as_parquet.
local_name: str = "open-text-books"

In [None]:
# Download original dataset
# Raw files are cached under <ori_datasets_dir>/<local_name>.
# NOTE(review): download_ori_dataset is defined outside this view; the cells below
# index the result like a DatasetDict with a "train" split — confirm.
ori_save_dir: str = os.path.join(ori_datasets_dir, local_name)
os.makedirs(ori_save_dir, exist_ok=True)

open_text_books: Any = download_ori_dataset("izumi-lab/open-text-books", cache_dir=ori_save_dir)

In [None]:
# Show basic information
# Sanity-check the structure: container type and splits, the type of one "train"
# row, its keys, and the "text" field of the first sample.
print(type(open_text_books), open_text_books.keys())
print(type(open_text_books["train"]), type(open_text_books["train"][0]))
print(open_text_books["train"][0].keys(), type(open_text_books["train"][0]["text"]))
print(open_text_books["train"][0]["text"])

In [None]:
# Get only text for training
# NOTE: this rebinds `open_text_books` from the dataset object to its "text" column,
# so re-running this cell (or the info cell above) fails until the download cell is re-run.
open_text_books = open_text_books["train"]["text"]

In [None]:
# Show length distribution
# show_sample_length_distribution is defined outside this view; the meaning of
# split_percent (presumably the bucket granularity) should be confirmed there.
show_sample_length_distribution(open_text_books, split_percent=0.05)

In [None]:
# Pre-process: split samples by paragraphs
# NOTE(review): split_sample_by_paragraphs is defined outside this view; presumably
# texts longer than threshold_length characters are split and shorter ones kept — confirm.
# Rebinds `open_text_books`, so this cell is not idempotent.
open_text_books = split_sample_by_paragraphs(open_text_books, threshold_length=512)

In [None]:
# Show length distribution again
# Same plot as before pre-processing, for a before/after comparison.
show_sample_length_distribution(open_text_books, split_percent=0.05)

In [None]:
# Show some samples after pre-processing
# The first five samples, separated by blank lines.
print("\n\n".join(open_text_books[:5]))

In [None]:
# Save pre-processed data as Parquet files
# Output goes to <preprocessed_datasets_dir>/<local_name>.
# NOTE(review): save_as_parquet is defined outside this view; batch_size is
# presumably the number of samples per Parquet file — confirm.
batch_size: int = 10_000_000
save_dir: str = os.path.join(preprocessed_datasets_dir, local_name)
os.makedirs(save_dir, exist_ok=True)

save_as_parquet(local_name, open_text_books, batch_size=batch_size, save_dir=save_dir)

## c4-subset

## GPU加速预处理工具

In [None]:
# GPU加速预处理环境检测
import torch
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
from tqdm.auto import tqdm

# Detect whether CUDA GPUs are usable. The module-level flags set here
# (gpu_available, device_count, cpu_count, use_gpu, use_multiprocess) are read by
# the helper functions defined in later cells.
gpu_available = torch.cuda.is_available()
device_count = torch.cuda.device_count() if gpu_available else 0
cpu_count = mp.cpu_count()

print(f"GPU可用: {gpu_available}")
if gpu_available:
    print(f"GPU数量: {device_count}")
    for i in range(device_count):
        # Name and total memory (GiB) of each visible device.
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        print(f"GPU {i} 内存: {torch.cuda.get_device_properties(i).total_memory / 1024**3:.1f} GB")
print(f"CPU核心数: {cpu_count}")

# Choose the processing strategy.
# NOTE(review): the device queries above may initialize CUDA in this process; worker
# processes later forked by ProcessPoolExecutor generally cannot reuse that CUDA
# state, so GPU work inside them may fail and fall back to the CPU — confirm.
use_gpu = gpu_available and device_count > 0
use_multiprocess = cpu_count > 1
print(f"使用GPU加速: {use_gpu}")
print(f"使用多进程: {use_multiprocess}")

### 可选：安装GPU加速库

如果要使用GPU加速，请运行以下命令安装所需的库：

```bash
# 安装RAPIDS库（包含cuDF）
conda install -c rapidsai -c conda-forge -c nvidia cudf python=3.9 cudatoolkit=11.8

# 或者使用pip安装（需要已安装CUDA）
pip install cudf-cu11 --extra-index-url=https://pypi.nvidia.com
pip install cupy-cuda11x
```

如果没有可用的GPU，或 cuDF/CuPy 未能成功安装（导入失败），代码会在运行时自动回退到CPU处理。

In [None]:
def load_dataset_streaming(dataset_name: str, cache_dir: str, streaming_chunk_size: int = 1000):
    """Stream the "train" split of a Hugging Face dataset and group its texts into chunks.

    The dataset is opened with ``streaming=True``, so records are fetched lazily.
    However, every chunk is kept in the returned list, so the whole split still ends
    up in memory.

    Args:
        dataset_name: Dataset id passed to ``datasets.load_dataset``.
        cache_dir: Cache directory passed to ``datasets.load_dataset``.
        streaming_chunk_size: Number of samples per chunk; must be positive.

    Returns:
        List of chunks, each a list of ``item["text"]`` strings. Every chunk has
        ``streaming_chunk_size`` items except possibly the last, which may be shorter.

    Raises:
        ValueError: If ``streaming_chunk_size`` is not positive. Such a size would
            never consume any records and would loop forever.
    """
    import itertools

    if streaming_chunk_size <= 0:
        raise ValueError(f"streaming_chunk_size must be positive, got {streaming_chunk_size}")

    print(f"开始流式加载数据集: {dataset_name}")

    # Streaming mode: records are pulled from the iterator on demand.
    dataset = datasets.load_dataset(dataset_name, cache_dir=cache_dir, streaming=True)
    train_iter = iter(dataset["train"])

    chunks = []
    while True:
        current_chunk = [item["text"] for item in itertools.islice(train_iter, streaming_chunk_size)]
        if not current_chunk:
            break  # stream exhausted exactly on a chunk boundary
        chunks.append(current_chunk)
        if len(current_chunk) < streaming_chunk_size:
            break  # short final chunk: the stream is exhausted
        print(f"已加载 {len(chunks)} 个数据块，每块 {streaming_chunk_size} 个样本")

    print(f"数据集加载完成，总共 {len(chunks)} 个数据块")
    return chunks

In [None]:
def process_text_chunk_gpu(texts: List[str], threshold_length: int = 512) -> List[str]:
    """Split over-long texts into paragraphs, computing text lengths on the GPU with cuDF.

    The contract is the same as ``process_text_chunk_cpu``:

    - Texts of at most ``threshold_length`` characters are kept whole; empty strings
      are dropped.
    - Longer texts are split on newline characters, each piece is stripped, and empty
      pieces are dropped.
    - Output follows input order, so the result does not depend on which path ran.

    Falls back to ``process_text_chunk_cpu`` in three cases: the module-level
    ``use_gpu`` flag is false, cuDF cannot be imported, or computing the lengths on
    the GPU raises.
    """
    if not use_gpu:
        return process_text_chunk_cpu(texts, threshold_length)

    try:
        import cudf

        # Only the per-text character counts are computed on the GPU.
        lengths = cudf.Series(texts).str.len().to_pandas().tolist()
    except ImportError:
        print("cuDF不可用，回退到CPU处理")
        return process_text_chunk_cpu(texts, threshold_length)
    except Exception as e:
        print(f"GPU处理出错: {e}，回退到CPU处理")
        return process_text_chunk_cpu(texts, threshold_length)

    paragraphs: List[str] = []
    for text, length in zip(texts, lengths):
        if length <= threshold_length:
            if text:
                paragraphs.append(text)
        else:
            # Split on real newlines, not on the two-character sequence backslash + "n".
            paragraphs.extend(piece for piece in (t.strip() for t in text.split("\n")) if piece)
    return paragraphs

def process_text_chunk_cpu(texts: List[str], threshold_length: int = 512) -> List[str]:
    """Split over-long texts into paragraphs on the CPU.

    Texts of at most ``threshold_length`` characters are kept unchanged; empty
    strings are dropped. Longer texts are split on newline characters, each piece is
    stripped, and empty pieces are discarded. The output preserves input order.

    Args:
        texts: Raw texts to process.
        threshold_length: Maximum length (in characters) of a text kept whole.

    Returns:
        Flat list of non-empty paragraphs.
    """
    paragraphs: List[str] = []
    for text in texts:
        if len(text) <= threshold_length:
            if text:
                paragraphs.append(text)
        else:
            # Split on real newlines, not on the two-character sequence backslash + "n".
            paragraphs.extend(piece for piece in (t.strip() for t in text.split("\n")) if piece)
    return paragraphs

In [None]:
def process_chunks_parallel(chunks: List[List[str]], threshold_length: int = 512, max_workers: int = None) -> List[str]:
    """Split every chunk into paragraphs, using worker processes when possible.

    Each chunk is passed to ``process_text_chunk_gpu``, which falls back to the CPU
    itself. When the module-level ``use_multiprocess`` flag is set and there is more
    than one chunk, a ``ProcessPoolExecutor`` is used. Otherwise chunks are processed
    sequentially in this process.

    Args:
        chunks: Lists of raw texts; each list is one work item.
        threshold_length: Texts longer than this are split into paragraphs.
        max_workers: Process count; defaults to ``min(cpu_count, len(chunks))``.

    Returns:
        Paragraphs of all chunks, concatenated in the order of ``chunks``, whatever
        order the workers finish in. A chunk whose processing raises is reported and
        its paragraphs are omitted.
    """
    if max_workers is None:
        max_workers = min(cpu_count, len(chunks))

    print(f"使用 {max_workers} 个进程并行处理 {len(chunks)} 个数据块")

    # Results keyed by chunk index so the final merge is deterministic.
    per_chunk: dict = {}

    if use_multiprocess and len(chunks) > 1:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            future_to_chunk = {
                executor.submit(process_text_chunk_gpu, chunk, threshold_length): i
                for i, chunk in enumerate(chunks)
            }

            paragraph_count = 0
            with tqdm(total=len(chunks), desc="处理数据块") as pbar:
                for future in as_completed(future_to_chunk):
                    chunk_idx = future_to_chunk[future]
                    try:
                        per_chunk[chunk_idx] = future.result()
                        paragraph_count += len(per_chunk[chunk_idx])
                    except Exception as e:
                        print(f"处理数据块 {chunk_idx} 时出错: {e}")
                    # Advance for failed chunks too, so the bar reaches 100%.
                    pbar.update(1)
                    pbar.set_postfix({"已处理段落": paragraph_count})
    else:
        for i, chunk in enumerate(tqdm(chunks, desc="处理数据块")):
            try:
                per_chunk[i] = process_text_chunk_gpu(chunk, threshold_length)
            except Exception as e:
                print(f"处理数据块 {i} 时出错: {e}")

    all_paragraphs = [p for idx in sorted(per_chunk) for p in per_chunk[idx]]

    print(f"处理完成，总共得到 {len(all_paragraphs):,} 个段落")
    return all_paragraphs

In [None]:
def save_paragraphs_incrementally(paragraphs: List[str], local_name: str, save_dir: str,
                                  batch_size: int = 50000, start_part: int = 0) -> None:
    """Write ``paragraphs`` to ``save_dir`` as consecutive Parquet shards.

    Shard ``k`` (0-based within this call) holds
    ``paragraphs[k * batch_size:(k + 1) * batch_size]`` in a single ``"text"`` column.
    It is written to ``{local_name}_part_{start_part + k}.parquet``, so successive
    calls can continue the numbering through ``start_part``. The directory is created
    if missing.
    """
    import gc

    os.makedirs(save_dir, exist_ok=True)

    n_batches = -(-len(paragraphs) // batch_size)  # ceiling division

    for batch_no, offset in enumerate(range(0, len(paragraphs), batch_size)):
        batch = paragraphs[offset:offset + batch_size]
        target = os.path.join(save_dir, f"{local_name}_part_{start_part + batch_no}.parquet")

        shard = Dataset.from_dict({"text": batch})
        shard.to_parquet(target)

        print(f"已保存第 {batch_no + 1}/{n_batches} 批次到 {target}，包含 {len(batch):,} 个样本")

        # Drop references to this shard before building the next one.
        del batch, shard
        gc.collect()

## c4-subset (GPU加速版本)

## c4-subset (多GPU超快版本) 🚀🚀🚀🚀

In [None]:
# 多GPU超快处理工具
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
import time

def process_text_chunk_multi_gpu(texts: List[str], threshold_length: int = 512, device_id: int = 0) -> List[str]:
    """Split over-long texts into paragraphs, computing lengths with cuDF on GPU ``device_id``.

    The contract is the same as ``process_text_chunk_cpu``:

    - Texts of at most ``threshold_length`` characters are kept whole; empty strings
      are dropped.
    - Longer texts are split on newline characters, stripped, and empty pieces are
      dropped.
    - Output follows input order, so the result does not depend on whether the GPU
      path or the CPU fallback ran.

    Any exception on the GPU side, including cuDF/CuPy not being installed, is printed
    and the chunk is handed to ``process_text_chunk_cpu`` instead.
    """
    try:
        import cudf
        import cupy as cp

        # Make `device_id` the current CUDA device for this thread before cuDF allocates.
        cp.cuda.Device(device_id).use()

        # Only the per-text character counts are computed on the GPU.
        lengths = cudf.Series(texts).str.len().to_pandas().tolist()
    except Exception as e:
        print(f"GPU {device_id} 处理出错: {e}，回退到CPU处理")
        return process_text_chunk_cpu(texts, threshold_length)

    paragraphs: List[str] = []
    for text, length in zip(texts, lengths):
        if length <= threshold_length:
            if text:
                paragraphs.append(text)
        else:
            paragraphs.extend(piece for piece in (t.strip() for t in text.split("\n")) if piece)
    return paragraphs

def multi_gpu_worker(gpu_id: int, input_queue: queue.Queue, output_queue: queue.Queue, threshold_length: int):
    """Thread body: process ``(chunk_id, texts)`` tasks on GPU ``gpu_id`` until a ``None`` sentinel.

    For every task, exactly one ``(chunk_id, paragraphs, gpu_id, seconds)`` tuple is
    put on ``output_queue``. If processing raises, the error is printed and
    ``paragraphs`` is ``[]``. This matters because the collector in
    ``process_with_multi_gpu`` waits for one result per chunk and would otherwise
    block forever. Every item taken from ``input_queue``, including the sentinel, is
    acknowledged with ``task_done``.
    """
    print(f"GPU {gpu_id} 工作进程启动")

    while True:
        try:
            task = input_queue.get(timeout=5)
        except queue.Empty:
            continue  # producer is slow; keep polling

        if task is None:  # end-of-work sentinel
            input_queue.task_done()
            break

        chunk_id, texts = task

        start_time = time.time()
        try:
            result = process_text_chunk_multi_gpu(texts, threshold_length, gpu_id)
        except Exception as e:
            print(f"GPU {gpu_id} 工作进程出错: {e}")
            result = []
        process_time = time.time() - start_time

        output_queue.put((chunk_id, result, gpu_id, process_time))
        input_queue.task_done()

    print(f"GPU {gpu_id} 工作进程结束")

def process_with_multi_gpu(chunks: List[List[str]], threshold_length: int = 512, num_gpus: int = 4) -> List[str]:
    """Process ``chunks`` with one worker thread per GPU and return paragraphs in chunk order.

    Chunks are fed through a bounded queue to ``num_gpus`` threads running
    ``multi_gpu_worker``; thread ``k`` uses GPU ``k``. Results are reassembled by chunk
    index, so the output order matches ``chunks`` whichever GPU finishes first.
    Per-GPU statistics are printed at the end.

    Falls back to ``process_chunks_parallel`` in three cases: ``num_gpus`` is less
    than 1 (no worker could consume the queue), no GPU is usable (``use_gpu``), or
    fewer than ``num_gpus`` devices are present.

    Args:
        chunks: Lists of raw texts; each list is one work item.
        threshold_length: Texts longer than this are split into paragraphs.
        num_gpus: Number of GPUs (worker threads) to use.

    Returns:
        All paragraphs, concatenated in the order of ``chunks``.
    """
    if num_gpus < 1 or not use_gpu or device_count < num_gpus:
        print(f"GPU不足，需要{num_gpus}张卡，实际{device_count}张，回退到单GPU/CPU处理")
        return process_chunks_parallel(chunks, threshold_length)

    print(f"🚀 启动 {num_gpus} 张GPU超快处理模式!")
    wall_start = time.time()

    # Bounded input queue: at most two pending chunks per GPU are held in memory.
    input_queue = queue.Queue(maxsize=num_gpus * 2)
    output_queue = queue.Queue()

    workers = []
    for gpu_id in range(num_gpus):
        worker = threading.Thread(
            target=multi_gpu_worker,
            args=(gpu_id, input_queue, output_queue, threshold_length)
        )
        worker.start()
        workers.append(worker)

    def submit_tasks():
        """Enqueue every chunk with its index, then one None sentinel per worker."""
        for i, chunk in enumerate(chunks):
            input_queue.put((i, chunk))
        for _ in range(num_gpus):
            input_queue.put(None)

    # Separate producer thread: the bounded queue would otherwise block this thread
    # before it starts collecting results.
    submit_thread = threading.Thread(target=submit_tasks)
    submit_thread.start()

    # Collect exactly one result per chunk.
    results = {}
    gpu_stats = {i: {"chunks": 0, "time": 0} for i in range(num_gpus)}
    with tqdm(total=len(chunks), desc="多GPU处理中") as pbar:
        for _ in range(len(chunks)):
            chunk_id, result, gpu_id, process_time = output_queue.get()
            results[chunk_id] = result
            gpu_stats[gpu_id]["chunks"] += 1
            gpu_stats[gpu_id]["time"] += process_time
            pbar.update(1)

    all_paragraphs = [p for i in range(len(chunks)) for p in results[i]]

    submit_thread.join()
    for worker in workers:
        worker.join()
    wall_time = time.time() - wall_start

    print(f"\n🎯 多GPU处理完成统计:")
    total_time = 0
    for gpu_id, stats in gpu_stats.items():
        avg_time = stats["time"] / max(stats["chunks"], 1)
        total_time += stats["time"]
        print(f"  GPU {gpu_id}: 处理了 {stats['chunks']} 个块, 平均耗时 {avg_time:.2f}s/块")

    # total_time is the sum of per-GPU busy time; wall_time is the elapsed time.
    print(f"  总处理时间: {total_time:.2f}s")
    print(f"  平均每GPU: {total_time/num_gpus:.2f}s")
    print(f"  墙钟时间: {wall_time:.2f}s")
    print(f"  总共得到 {len(all_paragraphs):,} 个段落")

    return all_paragraphs

In [None]:
# 🚀🚀🚀🚀 Multi-GPU "ultra" variant: configuration.
# NOTE: streaming_chunk_size / processing_threshold / save_batch_size are also
# rebound by the single-GPU configuration cell further down; the main loop uses
# whichever cell ran last.
local_name_ultra: str = "c4_15m_ultra"

# Tuned parameters (4-GPU version)
streaming_chunk_size = 500_000  # raw samples buffered before each multi-GPU pass
processing_threshold = 512
save_batch_size = 2_000_000    # paragraphs per Parquet file
num_gpus = 4                 # number of GPUs / worker threads

print(f"🚀🚀🚀🚀 启动4GPU超快处理模式!")
print(f"流式块大小: {streaming_chunk_size:,}")
print(f"文本长度阈值: {processing_threshold}")
print(f"保存批次大小: {save_batch_size:,}")
print(f"使用GPU数量: {num_gpus}")

# Output directories: raw-data cache and pre-processed Parquet files.
ori_save_dir_ultra = os.path.join(ori_datasets_dir, local_name_ultra)
preprocessed_save_dir_ultra = os.path.join(preprocessed_datasets_dir, local_name_ultra)
os.makedirs(ori_save_dir_ultra, exist_ok=True)
os.makedirs(preprocessed_save_dir_ultra, exist_ok=True)

In [None]:
# 🚀🚀🚀🚀 Multi-GPU streaming main loop.
# Streams teven/c4_15M and splits each buffered chunk of `streaming_chunk_size`
# samples evenly across `num_gpus` GPUs via process_with_multi_gpu. Paragraphs are
# written to Parquet whenever at least `save_batch_size` have accumulated. Uses the
# settings from the configuration cell above.
import gc


def _split_evenly(items: List[str], parts: int) -> List[List[str]]:
    """Split ``items`` into at most ``parts`` contiguous, non-empty slices.

    Slice sizes differ by at most one, and concatenating the slices gives back
    ``items`` exactly, so no sample is lost however small ``items`` is.
    """
    if not items:
        return []
    parts = max(1, min(parts, len(items)))
    bounds = [len(items) * k // parts for k in range(parts + 1)]
    return [items[bounds[k]:bounds[k + 1]] for k in range(parts)]


try:
    print("🚀 开始多GPU超快流式处理...")

    # Streaming mode: samples are fetched lazily.
    dataset_stream = datasets.load_dataset("teven/c4_15M", cache_dir=ori_save_dir_ultra, streaming=True)
    train_stream = dataset_stream["train"]

    # Counters and buffers.
    total_processed = 0
    total_saved_parts = 0
    chunk_buffer = []
    processed_buffer = []

    # Performance statistics.
    start_time = time.time()
    last_report_time = start_time

    print("🔥 开始流式处理...")

    for item in tqdm(train_stream, desc="🚀多GPU超快处理"):
        chunk_buffer.append(item["text"])
        if len(chunk_buffer) < streaming_chunk_size:
            continue

        # One contiguous sub-chunk per GPU.
        chunks = _split_evenly(chunk_buffer, num_gpus)

        chunk_start_time = time.time()
        processed_paragraphs = process_with_multi_gpu(chunks, processing_threshold, min(num_gpus, len(chunks)))
        chunk_process_time = time.time() - chunk_start_time

        processed_buffer.extend(processed_paragraphs)

        # Flush once enough paragraphs have accumulated.
        if len(processed_buffer) >= save_batch_size:
            save_paragraphs_incrementally(
                processed_buffer,
                local_name_ultra,
                preprocessed_save_dir_ultra,
                batch_size=save_batch_size,
                start_part=total_saved_parts
            )
            total_saved_parts += (len(processed_buffer) + save_batch_size - 1) // save_batch_size
            processed_buffer.clear()

        chunk_len = len(chunk_buffer)
        total_processed += chunk_len
        chunk_buffer.clear()
        gc.collect()

        # Performance report at most every 30 seconds.
        current_time = time.time()
        if current_time - last_report_time >= 30:
            elapsed_time = current_time - start_time
            avg_speed = total_processed / elapsed_time
            chunk_speed = chunk_len / max(chunk_process_time, 1e-9)

            print(f"\n📊 性能报告:")
            print(f"  已处理: {total_processed:,} 个原始样本")
            print(f"  已保存: {total_saved_parts} 个parquet文件")
            print(f"  平均速度: {avg_speed:.0f} 样本/秒")
            print(f"  当前块速度: {chunk_speed:.0f} 样本/秒")
            print(f"  预计4GPU加速比: {chunk_speed/avg_speed:.1f}x")

            last_report_time = current_time

    # Trailing partial chunk.
    if chunk_buffer:
        chunks = _split_evenly(chunk_buffer, num_gpus)
        processed_paragraphs = process_with_multi_gpu(chunks, processing_threshold, min(num_gpus, len(chunks)))
        processed_buffer.extend(processed_paragraphs)
        total_processed += len(chunk_buffer)
        chunk_buffer.clear()

    # Whatever has not been written yet (may be less than one full batch).
    if processed_buffer:
        save_paragraphs_incrementally(
            processed_buffer,
            local_name_ultra,
            preprocessed_save_dir_ultra,
            batch_size=save_batch_size,
            start_part=total_saved_parts
        )
        total_saved_parts += (len(processed_buffer) + save_batch_size - 1) // save_batch_size
        processed_buffer.clear()

    # Final statistics.
    total_time = time.time() - start_time
    avg_speed = total_processed / max(total_time, 1e-9)

    print(f"\n🎉🎉🎉🎉 多GPU超快处理完成!")
    print(f"📊 最终统计:")
    print(f"  总处理时间: {total_time:.2f} 秒 ({total_time/60:.1f} 分钟)")
    print(f"  总处理样本: {total_processed:,} 个")
    print(f"  平均处理速度: {avg_speed:.0f} 样本/秒")
    print(f"  保存的parquet文件数量: {total_saved_parts}")
    print(f"  🚀🚀🚀🚀 4GPU加速效果显著!")

except Exception as e:
    print(f"❌ 多GPU处理过程中出现错误: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# 🔥🔥🔥🔥 异步多GPU极速版本（实验性）
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def async_multi_gpu_processing():
    """Experimental asyncio variant of the multi-GPU streaming pipeline for teven/c4_15M.

    How it works:

    - Every ``streaming_chunk_size`` streamed samples become one asyncio task.
    - A task splits its chunk into ``num_gpus`` interleaved sub-chunks
      (``chunk[i::num_gpus]``) and runs ``process_text_chunk_multi_gpu`` on each one
      in a shared thread pool, with one GPU id per sub-chunk.
    - Roughly ``2 * num_gpus`` chunk tasks are kept in flight at most.
    - Finished paragraphs are buffered and written with
      ``save_paragraphs_incrementally`` to ``<preprocessed_datasets_dir>/c4_15m_extreme``
      whenever ``save_batch_size`` of them have accumulated, and once more at the end.

    Errors are printed (fatal ones with a traceback) rather than raised. The thread
    pool is always shut down.

    NOTE: output order follows the interleaved split and task completion order, so it
    does not match the input order.
    """
    local_name_extreme = "c4_15m_extreme"

    # Tuning parameters (local; they shadow the notebook-level globals of the same name).
    streaming_chunk_size = 50000    # samples per asyncio task
    processing_threshold = 512
    save_batch_size = 2000000      # paragraphs per Parquet file
    num_gpus = 4

    print(f"🔥🔥🔥🔥 启动异步4GPU极速处理模式!")
    print(f"流式块大小: {streaming_chunk_size:,}")
    print(f"保存批次大小: {save_batch_size:,}")

    # Output directories.
    ori_save_dir_extreme = os.path.join(ori_datasets_dir, local_name_extreme)
    preprocessed_save_dir_extreme = os.path.join(preprocessed_datasets_dir, local_name_extreme)
    os.makedirs(ori_save_dir_extreme, exist_ok=True)
    os.makedirs(preprocessed_save_dir_extreme, exist_ok=True)

    # Created before `try` so the `finally` clause can always release it.
    executor = ThreadPoolExecutor(max_workers=num_gpus * 2)
    processing_tasks = []

    try:
        dataset_stream = datasets.load_dataset("teven/c4_15M", cache_dir=ori_save_dir_extreme, streaming=True)
        train_stream = dataset_stream["train"]

        loop = asyncio.get_running_loop()
        total_processed = 0
        total_saved_parts = 0
        chunk_buffer = []
        processed_buffer = []
        chunk_id = 0

        start_time = time.time()

        async def process_chunk_async(chunk_data, chunk_id):
            """Process one chunk on all GPUs concurrently; return (chunk_id, paragraphs)."""
            sub_chunks = [chunk_data[i::num_gpus] for i in range(num_gpus)]
            sub_chunks = [sub for sub in sub_chunks if sub]  # drop empty sub-chunks
            futures = [
                loop.run_in_executor(
                    executor,
                    process_text_chunk_multi_gpu,
                    sub,
                    processing_threshold,
                    i % num_gpus
                )
                for i, sub in enumerate(sub_chunks)
            ]
            results = await asyncio.gather(*futures)
            return chunk_id, [p for result in results for p in result]

        def save_buffer(force: bool = False) -> None:
            """Flush processed_buffer to Parquet once it is large enough, or always when forced."""
            nonlocal total_saved_parts
            if not processed_buffer:
                return
            if not force and len(processed_buffer) < save_batch_size:
                return
            save_paragraphs_incrementally(
                processed_buffer,
                local_name_extreme,
                preprocessed_save_dir_extreme,
                batch_size=save_batch_size,
                start_part=total_saved_parts
            )
            total_saved_parts += (len(processed_buffer) + save_batch_size - 1) // save_batch_size
            processed_buffer.clear()

        def harvest(finished_tasks) -> None:
            """Move results of finished tasks into processed_buffer and stop tracking them."""
            for task in finished_tasks:
                processing_tasks.remove(task)
                try:
                    _, paragraphs = task.result()
                    processed_buffer.extend(paragraphs)
                except Exception as e:
                    print(f"异步任务处理错误: {e}")
            save_buffer()

        print("🚀 开始异步流式处理...")

        for item in tqdm(train_stream, desc="🔥异步多GPU极速处理"):
            chunk_buffer.append(item["text"])
            if len(chunk_buffer) < streaming_chunk_size:
                continue

            processing_tasks.append(asyncio.create_task(process_chunk_async(chunk_buffer, chunk_id)))
            chunk_id += 1
            total_processed += len(chunk_buffer)
            chunk_buffer = []  # the task now owns the previous list

            # Iterating the stream never yields to the event loop. Yield once so the
            # new task submits its sub-chunks to the thread pool right away.
            await asyncio.sleep(0)

            harvest([task for task in processing_tasks if task.done()])

            # Back-pressure: cap the number of in-flight chunk tasks.
            if len(processing_tasks) > num_gpus * 2:
                done, _ = await asyncio.wait(processing_tasks, return_when=asyncio.FIRST_COMPLETED)
                harvest(done)

            if total_processed % (streaming_chunk_size * 5) == 0:
                elapsed_time = time.time() - start_time
                speed = total_processed / elapsed_time
                print(f"📊 已处理: {total_processed:,}, 速度: {speed:.0f} 样本/秒, 待处理任务: {len(processing_tasks)}")

        # Trailing partial chunk.
        if chunk_buffer:
            processing_tasks.append(asyncio.create_task(process_chunk_async(chunk_buffer, chunk_id)))
            total_processed += len(chunk_buffer)
            chunk_buffer = []

        print("🔄 等待所有GPU任务完成...")
        for task in list(processing_tasks):
            try:
                _, paragraphs = await task
                processed_buffer.extend(paragraphs)
            except Exception as e:
                print(f"最终任务处理错误: {e}")
            processing_tasks.remove(task)
            save_buffer()

        # Whatever has not been written yet (may be less than one full batch).
        save_buffer(force=True)

        total_time = time.time() - start_time
        avg_speed = total_processed / max(total_time, 1e-9)

        print(f"\n🎉🎉🎉🎉 异步多GPU极速处理完成!")
        print(f"📊 最终统计:")
        print(f"  总处理时间: {total_time:.2f} 秒 ({total_time/60:.1f} 分钟)")
        print(f"  总处理样本: {total_processed:,} 个")
        print(f"  平均处理速度: {avg_speed:.0f} 样本/秒")
        print(f"  保存的parquet文件数量: {total_saved_parts}")
        print(f"  🔥🔥🔥🔥 异步4GPU极速效果!")

    except Exception as e:
        print(f"❌ 异步多GPU处理错误: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # After a failure, abandon chunk tasks still pending, then release the pool threads.
        for task in processing_tasks:
            task.cancel()
        executor.shutdown(wait=True)

# 运行异步处理（可选择运行）
# await async_multi_gpu_processing()

In [None]:
# 🔍 GPU utilization monitor
def monitor_gpu_usage():
    """Print name, memory use, utilization and temperature for every GPU that NVML reports.

    Devices are enumerated with ``nvmlDeviceGetCount()``, so the indices always match
    ``nvmlDeviceGetHandleByIndex``. NVML numbering can differ from the CUDA/torch
    numbering used elsewhere in this notebook.

    Works with pynvml versions whose ``nvmlDeviceGetName`` returns either ``bytes``
    or ``str``. After a successful ``nvmlInit``, NVML is always shut down again.
    Failures are printed, never raised.
    """
    try:
        import pynvml
    except ImportError:
        print("⚠️  pynvml未安装，无法监控GPU。可运行: pip install pynvml")
        return

    try:
        pynvml.nvmlInit()
    except Exception as e:
        print(f"⚠️  GPU监控出错: {e}")
        return

    try:
        print("🔍 GPU使用情况监控:")
        for i in range(pynvml.nvmlDeviceGetCount()):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)

            # Older pynvml releases return bytes here, newer ones return str.
            name = pynvml.nvmlDeviceGetName(handle)
            if isinstance(name, bytes):
                name = name.decode("utf-8")

            # Memory in GiB.
            meminfo = pynvml.nvmlDeviceGetMemoryInfo(handle)
            memory_used = meminfo.used / 1024**3
            memory_total = meminfo.total / 1024**3
            memory_percent = (meminfo.used / meminfo.total) * 100

            gpu_util = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
            temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)

            print(f"  GPU {i} ({name}):")
            print(f"    💾 内存: {memory_used:.1f}GB / {memory_total:.1f}GB ({memory_percent:.1f}%)")
            print(f"    🔥 利用率: {gpu_util}%")
            print(f"    🌡️  温度: {temp}°C")
    except Exception as e:
        print(f"⚠️  GPU监控出错: {e}")
    finally:
        pynvml.nvmlShutdown()


# Run the GPU monitor
monitor_gpu_usage()

In [None]:
# 🏁 Performance comparison on synthetic data (small-scale test)
def performance_benchmark(test_size: int = 10000):
    """Time CPU, single-GPU and 4-GPU paragraph splitting on synthetic texts.

    Builds ``test_size`` texts that cycle through three templates: short, medium, and
    a long two-paragraph text separated by a newline. Each available backend is run
    on them, and a table of time, throughput, paragraph count and speed-up relative
    to the CPU run is printed.

    GPU rows are skipped when the module-level ``use_gpu`` flag is false. The 4-GPU
    row additionally needs ``device_count >= 4`` and a non-empty test set.

    Returns:
        dict mapping backend label to ``{"time", "speed", "output"}``.
    """
    print(f"🏁 开始性能基准测试 (测试规模: {test_size:,} 样本)")

    # Three templates cycled by index. The long one contains a real newline, so the
    # splitters have an actual paragraph boundary to cut at.
    templates = (
        "短文本测试 " * 10,
        "中等长度文本测试 " * 50,
        "长文本测试需要分割 " * 100 + "\n" + "第二段 " * 100,
    )
    test_data = [templates[i % 3] for i in range(test_size)]

    results = {}

    def timed(fn, *args):
        """Run fn(*args); return (result, elapsed seconds) measured with a monotonic clock."""
        start = time.perf_counter()
        out = fn(*args)
        # Clamp so very fast runs cannot cause a division by zero below.
        return out, max(time.perf_counter() - start, 1e-9)

    def record(label, output, elapsed):
        """Store timing, throughput and paragraph count for one backend."""
        results[label] = {"time": elapsed, "speed": len(test_data) / elapsed, "output": len(output)}

    # 1. CPU
    print("🔄 测试CPU处理...")
    record("CPU", *timed(process_text_chunk_cpu, test_data, 512))

    # 2. Single GPU
    if use_gpu:
        print("🔄 测试单GPU处理...")
        record("单GPU", *timed(process_text_chunk_multi_gpu, test_data, 512, 0))

    # 3. Four GPUs
    if use_gpu and device_count >= 4 and test_data:
        print("🔄 测试4GPU处理...")
        # Up to 4 contiguous, non-empty chunks; every sample lands in exactly one.
        bounds = [len(test_data) * k // 4 for k in range(5)]
        chunks = [test_data[bounds[k]:bounds[k + 1]] for k in range(4)]
        chunks = [chunk for chunk in chunks if chunk]
        record("4GPU", *timed(process_with_multi_gpu, chunks, 512, 4))

    print(f"\n📊 性能基准测试结果:")
    print(f"{'方法':<10} {'时间(秒)':<10} {'速度(样本/秒)':<15} {'输出段落数':<10} {'加速比':<8}")
    print("-" * 60)

    baseline_speed = results["CPU"]["speed"]
    for method, result in results.items():
        speedup = result["speed"] / baseline_speed if baseline_speed else float("nan")
        print(f"{method:<10} {result['time']:<10.2f} {result['speed']:<15.0f} {result['output']:<10,} {speedup:<8.1f}x")

    return results


# Run the benchmark
benchmark_results = performance_benchmark(20000)

In [None]:
# Configuration for the single-GPU (with CPU fallback) c4_15m pipeline.
# NOTE: this rebinds streaming_chunk_size / processing_threshold / save_batch_size,
# which the multi-GPU main loop above also reads; run order decides the values used.
local_name_gpu: str = "c4_15m_gpu"

# Parameters
streaming_chunk_size = 1_000_000  # raw samples processed per chunk
processing_threshold = 512   # texts longer than this (characters) are split
save_batch_size = 2_000_000     # paragraphs per Parquet file

print(f"开始GPU加速处理 {local_name_gpu} 数据集")
print(f"流式块大小: {streaming_chunk_size:,}")
print(f"文本长度阈值: {processing_threshold}")
print(f"保存批次大小: {save_batch_size:,}")

# Output directories: raw-data cache and pre-processed Parquet files.
ori_save_dir_gpu = os.path.join(ori_datasets_dir, local_name_gpu)
preprocessed_save_dir_gpu = os.path.join(preprocessed_datasets_dir, local_name_gpu)
os.makedirs(ori_save_dir_gpu, exist_ok=True)
os.makedirs(preprocessed_save_dir_gpu, exist_ok=True)

In [None]:
# Stream the c4_15M dataset and pre-process it chunk by chunk (single GPU with CPU fallback).
# Uses the configuration cell above: streaming_chunk_size, processing_threshold,
# save_batch_size, local_name_gpu, ori_save_dir_gpu, preprocessed_save_dir_gpu.
import gc

try:
    print("开始流式加载c4_15M数据集...")
    dataset_stream = datasets.load_dataset("teven/c4_15M", cache_dir=ori_save_dir_gpu, streaming=True)
    train_stream = dataset_stream["train"]

    total_processed = 0
    total_saved_parts = 0
    current_chunk = []
    # Paragraphs carried across chunks until at least save_batch_size have accumulated.
    # Without this buffer, a chunk yielding fewer paragraphs would be silently discarded.
    processed_paragraphs = []

    print("开始流式处理...")

    for item in tqdm(train_stream, desc="流式处理"):
        current_chunk.append(item["text"])
        if len(current_chunk) < streaming_chunk_size:
            continue

        processed_paragraphs.extend(process_text_chunk_gpu(current_chunk, processing_threshold))

        # Flush once enough paragraphs have accumulated.
        if len(processed_paragraphs) >= save_batch_size:
            save_paragraphs_incrementally(
                processed_paragraphs,
                local_name_gpu,
                preprocessed_save_dir_gpu,
                batch_size=save_batch_size,
                start_part=total_saved_parts
            )
            total_saved_parts += (len(processed_paragraphs) + save_batch_size - 1) // save_batch_size
            processed_paragraphs.clear()

        total_processed += len(current_chunk)
        current_chunk.clear()
        gc.collect()

        if total_processed % (streaming_chunk_size * 10) == 0:
            print(f"已处理 {total_processed:,} 个原始样本，已保存 {total_saved_parts} 个parquet文件")

    # Trailing partial chunk.
    if current_chunk:
        processed_paragraphs.extend(process_text_chunk_gpu(current_chunk, processing_threshold))
        total_processed += len(current_chunk)
        current_chunk.clear()

    # Whatever has not been written yet (may be less than one full batch).
    if processed_paragraphs:
        save_paragraphs_incrementally(
            processed_paragraphs,
            local_name_gpu,
            preprocessed_save_dir_gpu,
            batch_size=save_batch_size,
            start_part=total_saved_parts
        )
        total_saved_parts += (len(processed_paragraphs) + save_batch_size - 1) // save_batch_size
        processed_paragraphs.clear()

    print(f"\n流式处理完成！")
    print(f"总共处理了 {total_processed:,} 个原始样本")
    print(f"保存的parquet文件数量: {total_saved_parts}")

except Exception as e:
    print(f"处理过程中出现错误: {e}")
    import traceback
    traceback.print_exc()

In [None]:
# Set local name
# Used below as the sub-directory under ori_datasets_dir and preprocessed_datasets_dir,
# and passed as the first argument to save_as_parquet.
local_name: str = "c4_15m"

In [None]:
# Download original datasets
# Non-streaming download: the raw dataset is cached under <ori_datasets_dir>/<local_name>.
# NOTE(review): download_ori_dataset is defined outside this view — confirm it
# returns a DatasetDict-like object with a "train" split, as the next cells assume.
ori_save_dir: str = os.path.join(ori_datasets_dir, local_name)
os.makedirs(ori_save_dir, exist_ok=True)

c4_15m: Any = download_ori_dataset("teven/c4_15M", cache_dir=ori_save_dir)

In [None]:
# Show basic information
# Sanity-check the structure: container type and splits, one "train" row, its keys,
# and the "text" field of the first sample.
print(type(c4_15m), c4_15m.keys())
print(type(c4_15m["train"]), type(c4_15m["train"][0]))
print(c4_15m["train"][0].keys(), type(c4_15m["train"][0]["text"]))
print(c4_15m["train"][0]["text"])

In [None]:
# Get only text for training
# NOTE: rebinds `c4_15m` to its "text" column, so this cell is not idempotent.
# Re-run the download cell before running it again.
c4_15m = c4_15m["train"]["text"]

In [None]:
# Memory optimization for large dataset
import gc
import psutil
import os

def print_memory_usage():
    """Report the resident set size (RSS) of this Python process, in MiB."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    print(f"Current memory usage: {rss_bytes / (1024 * 1024):.1f} MB")


print_memory_usage()
# Reclaim unreachable objects before the memory-heavy preprocessing cells below.
gc.collect()
print(f"Processing c4_15m dataset with {len(c4_15m):,} samples")

In [None]:
# Show length distribution
# Same helper as for open-text-books (defined outside this view).
show_sample_length_distribution(c4_15m, split_percent=0.05)

In [None]:
# Pre-process: split samples by paragraphs (using smaller batch size for c4_15m)
# NOTE(review): the open-text-books call does not pass batch_size; confirm that
# split_sample_by_paragraphs (defined outside this view) accepts it, and what it controls.
# Rebinds `c4_15m`, so this cell is not idempotent.
print(f"Starting preprocessing of {len(c4_15m):,} samples...")
c4_15m = split_sample_by_paragraphs(c4_15m, threshold_length=512, batch_size=500)

In [None]:
# Show length distribution again
# Compare with the distribution before splitting.
show_sample_length_distribution(c4_15m, split_percent=0.05)

In [None]:
# Show some samples after pre-processing
# The first five paragraphs, separated by blank lines.
print("\n\n".join(c4_15m[:5]))

In [None]:
# Save pre-processed data as Parquet files
# NOTE(review): 10_000 is 1000x smaller than the 10_000_000 used for open-text-books.
# If save_as_parquet's batch_size means samples per file, this produces many small
# files — confirm intent.
batch_size: int = 10_000
save_dir: str = os.path.join(preprocessed_datasets_dir, local_name)
os.makedirs(save_dir, exist_ok=True)

save_as_parquet(local_name, c4_15m, batch_size=batch_size, save_dir=save_dir)