<a href="https://colab.research.google.com/github/DominicDin/teest/blob/main/%E2%80%9Cdzx_Dataset_for_LLM_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Take Home Test: Reformat a Public Dataset for LLM Training

### Objective

The goal of this task is to prepare public datasets for more effective use in training and fine-tuning Large Language Models (LLMs). You are required to reformat a specific subset of a public dataset into a structured, consistent format to facilitate its usability.

### Detailed Instructions

#### 1. Dataset Selection and Preparation

- **Dataset:** You are assigned the `Headline` subset of the [AdaptLLM/finance-tasks](https://huggingface.co/datasets/AdaptLLM/finance-tasks) dataset.

- **Task Description:** Each entry in the `input` column contains multiple "Yes" or "No" questions alongside their respective answers. Your task is to:

  - Develop a Python script to parse and separate each question and its answer from the entry.
  - Save each question-answer pair in a structured JSON format as follows:
    ```json
    {
      "id": "<unique_identifier>",
      "Question": "<question_text>",
      "Answer": "<answer_text>"
    }
    ```

  - You are encouraged to introduce additional attributes if needed to preserve the integrity and completeness of the information. Adding relevant tag information is strongly recommended.
- **Automation Requirement:** The task must be completed using Python. Manual editing or data manipulation is strictly prohibited. Your script should efficiently handle variations in data format within the column.

#### 2. Deliverables

- **Reformatted Dataset:** Provide the schema of the final format you adopted for saving the results.
- **Transformation Code:** Submit the complete code used for converting the dataset into the designated format.
- **Statistics:** Report the total number of question-answer pairs extracted from the dataset.
- **Performance Metrics:** Document the time taken to complete the dataset cleanup and transformation process.


In [5]:
#!/usr/bin/env python3
"""
金融数据集处理器 - AdaptLLM/finance-tasks 标题子集处理工具
==========================================================

这个脚本专门用于处理AdaptLLM/finance-tasks数据集的Headline子集，
它能够从文本中提取问答对，并将其重新格式化为结构化的JSON格式，
便于后续用于大语言模型的训练和微调。

主要功能:
1. 从HuggingFace加载指定数据集
2. 使用多种正则表达式模式提取问答对
3. 清理和标准化文本内容
4. 生成结构化的JSON输出
5. 提供详细的处理统计和性能监控

作者: 数据集处理助手
日期: 2025-05-24
版本: 1.0
"""

import json
import re
import time
import hashlib
from typing import List, Dict, Any, Tuple
from datetime import datetime
import pandas as pd
from datasets import load_dataset
import logging

# ============================================================================
# 1. 日志配置和参数设置模块
# ============================================================================

class DatasetConfig:
    """
    数据集配置类 - 集中管理所有处理参数

    这个类包含了整个处理流程中需要的所有配置参数，
    包括数据集名称、文件路径、正则表达式模式等。
    统一管理便于维护和修改。
    """

    # 数据集基本信息
    DATASET_NAME = "AdaptLLM/finance-tasks"  # HuggingFace上的数据集名称
    SUBSET_NAME = "headline"                 # 要处理的子集名称
    OUTPUT_FILE = "finance_headline_qa_pairs.json"  # 输出文件名
    LOG_FILE = "processing.log"             # 日志文件名

    # 问答提取的正则表达式模式
    # 这些模式用于识别文本中的问答对，支持多种常见格式
    QA_PATTERNS = [
        r'Q:\s*(.*?)\s*A:\s*(Yes|No)',      # Q: 问题 A: 答案 格式
        r'Question:\s*(.*?)\s*Answer:\s*(Yes|No)',  # Question: 问题 Answer: 答案 格式
        r'(\d+\.\s*.*?)\s*(Yes|No)',        # 数字编号 + 问题 + 答案 格式
        r'(.*?\?)\s*(Yes|No)',              # 以问号结尾的问题 + 答案 格式
    ]

def setup_logging() -> logging.Logger:
    """
    设置日志系统配置

    配置日志记录器，使其能够同时将日志信息输出到文件和控制台。
    这样既能实时查看处理进度，又能保存完整的处理记录。

    返回:
        logging.Logger: 配置好的日志记录器实例
    """
    logging.basicConfig(
        level=logging.INFO,  # 设置日志级别为INFO
        format='%(asctime)s - %(levelname)s - %(message)s',  # 日志格式
        handlers=[
            logging.FileHandler(DatasetConfig.LOG_FILE),  # 文件输出
            logging.StreamHandler()  # 控制台输出
        ]
    )
    return logging.getLogger(__name__)

# ============================================================================
# 2. 数据加载模块
# ============================================================================

class DatasetLoader:
    """
    数据集加载器 - 负责从HuggingFace加载和预处理数据集

    这个类封装了数据集加载的所有操作，包括：
    - 从HuggingFace datasets库加载指定数据集
    - 验证数据集结构的完整性
    - 转换为pandas DataFrame便于后续处理
    """

    def __init__(self, logger: logging.Logger):
        """
        初始化数据加载器

        参数:
            logger: 日志记录器实例
        """
        self.logger = logger

    def load_finance_dataset(self) -> pd.DataFrame:
        """
        加载AdaptLLM/finance-tasks数据集的headline子集

        这个方法使用HuggingFace的datasets库来加载指定的数据集，
        然后将其转换为pandas DataFrame格式，便于后续的数据处理操作。

        返回:
            pd.DataFrame: 加载后的数据集DataFrame

        异常:
            Exception: 当数据集加载失败时抛出异常
        """
        try:
            self.logger.info(f"开始加载数据集: {DatasetConfig.DATASET_NAME}")
            self.logger.info(f"目标子集: {DatasetConfig.SUBSET_NAME}")

            # 使用HuggingFace datasets库加载数据集
            # split='train' 表示加载训练集部分
            dataset = load_dataset(
                DatasetConfig.DATASET_NAME,
                DatasetConfig.SUBSET_NAME,
                split='train'
            )

            # 将HuggingFace Dataset对象转换为pandas DataFrame
            # 这样可以利用pandas强大的数据处理功能
            df = dataset.to_pandas()

            self.logger.info(f"数据集加载成功! 数据形状: {df.shape}")
            self.logger.info(f"数据列名: {list(df.columns)}")

            return df

        except Exception as e:
            self.logger.error(f"数据集加载失败: {str(e)}")
            raise

    def validate_dataset_structure(self, df: pd.DataFrame) -> bool:
        """
        验证数据集结构是否符合预期

        检查数据集是否包含必需的列，以及数据的基本完整性。
        这是数据处理前的重要验证步骤，确保后续处理不会出错。

        参数:
            df (pd.DataFrame): 要验证的数据集

        返回:
            bool: 验证通过返回True

        异常:
            ValueError: 当数据集结构不符合要求时抛出异常
        """
        # 定义必需的列名
        required_columns = ['input']

        # 检查是否包含所有必需的列
        if not all(col in df.columns for col in required_columns):
            missing_cols = [col for col in required_columns if col not in df.columns]
            raise ValueError(f"缺少必需的列: {missing_cols}")

        # 检查input列是否完全为空
        if df['input'].isnull().all():
            raise ValueError("input列完全为空，无法处理")

        self.logger.info("数据集结构验证通过")
        return True

# ============================================================================
# 3. 问答提取模块
# ============================================================================

class QuestionAnswerExtractor:
    """
    问答对提取器 - 核心处理模块

    这是整个系统的核心组件，负责从原始文本中提取问答对。
    主要功能包括：
    - 使用多种正则表达式模式匹配问答对
    - 清理和标准化文本内容
    - 验证提取结果的质量
    - 去除重复的问答对
    - 生成结构化的输出记录
    """

    def __init__(self, logger: logging.Logger):
        """
        初始化问答提取器

        参数:
            logger: 日志记录器实例
        """
        self.logger = logger
        # 统计信息字典，用于跟踪处理效果
        self.extraction_stats = {
            'total_entries': 0,        # 总处理条目数
            'successful_extractions': 0,  # 成功提取的条目数
            'failed_extractions': 0,   # 提取失败的条目数
            'total_qa_pairs': 0        # 总问答对数量
        }

    def extract_qa_pairs(self, text: str) -> List[Dict[str, str]]:
        """
        从给定文本中提取问答对

        这是核心提取方法，使用多种正则表达式模式来识别文本中的问答对。
        方法会尝试所有预定义的模式，确保最大程度地提取有效信息。

        参数:
            text (str): 包含问答对的输入文本

        返回:
            List[Dict[str, str]]: 提取到的问答对列表
        """
        # 处理空值情况
        if not text or pd.isna(text):
            return []

        qa_pairs = []
        text_cleaned = self._clean_text(text)  # 清理文本

        # 尝试使用每个正则表达式模式进行匹配
        for pattern in DatasetConfig.QA_PATTERNS:
            matches = re.findall(pattern, text_cleaned, re.IGNORECASE | re.DOTALL)

            # 处理每个匹配结果
            for match in matches:
                if len(match) == 2:  # 确保匹配到问题和答案两部分
                    question, answer = match
                    question = self._clean_question(question.strip())
                    answer = answer.strip()

                    # 验证问答对的有效性
                    if self._validate_qa_pair(question, answer):
                        qa_pairs.append({
                            'question': question,
                            'answer': answer
                        })

        # 去除重复的问答对，但保持原有顺序
        qa_pairs = self._remove_duplicate_pairs(qa_pairs)

        return qa_pairs

    def _clean_text(self, text: str) -> str:
        """
        清理和标准化输入文本

        对原始文本进行预处理，包括：
        - 规范化空白字符和换行符
        - 统一问答标记格式
        - 去除多余的符号

        参数:
            text (str): 原始输入文本

        返回:
            str: 清理后的文本
        """
        # 规范化空白字符，将多个连续空格替换为单个空格
        text = re.sub(r'\s+', ' ', text.strip())

        # 标准化问题标记格式
        text = re.sub(r'Q\d*[\.:]\s*', 'Q: ', text)        # Q1: -> Q:
        text = re.sub(r'Question\d*[\.:]\s*', 'Question: ', text)  # Question1: -> Question:
        text = re.sub(r'A\d*[\.:]\s*', 'A: ', text)        # A1: -> A:
        text = re.sub(r'Answer\d*[\.:]\s*', 'Answer: ', text)      # Answer1: -> Answer:

        return text

    def _clean_question(self, question: str) -> str:
        """
        清理和格式化问题文本

        对提取出的问题文本进行进一步处理：
        - 去除编号和项目符号
        - 为疑问句添加问号（如果缺少）
        - 统一格式

        参数:
            question (str): 原始问题文本

        返回:
            str: 清理后的问题文本
        """
        # 去除开头的数字编号和项目符号
        question = re.sub(r'^\d+[\.\)]\s*', '', question)  # 1. 或 1)
        question = re.sub(r'^[•\-\*]\s*', '', question)    # • - *

        # 如果是疑问句但没有问号，则添加问号
        # 通过检查是否包含疑问词来判断
        question_words = ['is', 'are', 'was', 'were', 'do', 'does', 'did',
                         'can', 'could', 'will', 'would', 'should', 'has', 'have']

        if (not question.endswith('?') and
            any(word in question.lower() for word in question_words)):
            question += '?'

        return question.strip()

    def _validate_qa_pair(self, question: str, answer: str) -> bool:
        """
        验证提取的问答对是否有效

        对提取的问答对进行质量检查，包括：
        - 检查长度是否合理
        - 验证答案是否为Yes/No
        - 避免无意义的问题

        参数:
            question (str): 问题文本
            answer (str): 答案文本

        返回:
            bool: 有效返回True，否则返回False
        """
        # 检查最小长度要求
        if len(question.strip()) < 5 or len(answer.strip()) < 2:
            return False

        # 检查答案是否为Yes或No
        if answer.lower() not in ['yes', 'no']:
            return False

        # 避免无意义或重复的问题
        if question.lower() in ['yes', 'no', 'question', 'answer']:
            return False

        return True

    def _remove_duplicate_pairs(self, qa_pairs: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        去除重复的问答对

        使用哈希值来识别重复的问答对，确保结果中不包含重复内容，
        同时保持原有的顺序。

        参数:
            qa_pairs (List[Dict[str, str]]): 问答对列表

        返回:
            List[Dict[str, str]]: 去重后的问答对列表
        """
        seen = set()  # 用于存储已见过的哈希值
        unique_pairs = []

        for pair in qa_pairs:
            # 为问答对创建唯一的哈希标识
            pair_content = f"{pair['question'].lower()}-{pair['answer'].lower()}"
            pair_hash = hashlib.md5(pair_content.encode()).hexdigest()

            # 如果是新的问答对，则添加到结果中
            if pair_hash not in seen:
                seen.add(pair_hash)
                unique_pairs.append(pair)

        return unique_pairs

    def process_dataset(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
        """
        处理整个数据集，提取所有问答对

        这是数据集级别的处理方法，遍历数据集中的每一行，
        提取其中的问答对，并生成结构化的记录。

        参数:
            df (pd.DataFrame): 输入数据集

        返回:
            List[Dict[str, Any]]: 结构化的问答记录列表
        """
        all_qa_records = []
        self.extraction_stats['total_entries'] = len(df)

        self.logger.info(f"开始处理 {len(df)} 条数据集条目...")

        # 遍历数据集的每一行
        for idx, row in df.iterrows():
            try:
                input_text = row['input']
                qa_pairs = self.extract_qa_pairs(input_text)

                if qa_pairs:
                    self.extraction_stats['successful_extractions'] += 1

                    # 为每个问答对创建结构化记录
                    for qa_idx, qa_pair in enumerate(qa_pairs):
                        record = self._create_qa_record(
                            original_idx=idx,
                            qa_idx=qa_idx,
                            question=qa_pair['question'],
                            answer=qa_pair['answer'],
                            source_row=row
                        )
                        all_qa_records.append(record)
                        self.extraction_stats['total_qa_pairs'] += 1
                else:
                    self.extraction_stats['failed_extractions'] += 1
                    self.logger.debug(f"在条目 {idx} 中未找到问答对")

            except Exception as e:
                self.extraction_stats['failed_extractions'] += 1
                self.logger.error(f"处理条目 {idx} 时出错: {str(e)}")

        self.logger.info(f"提取完成，共找到 {len(all_qa_records)} 个问答对")
        return all_qa_records

    def _create_qa_record(self, original_idx: int, qa_idx: int, question: str,
                         answer: str, source_row: pd.Series) -> Dict[str, Any]:
        """
        创建结构化的问答记录

        将提取的问答对转换为标准化的JSON记录格式，
        包含问题、答案以及相关的元数据信息。

        参数:
            original_idx (int): 原始数据集中的索引
            qa_idx (int): 该条目中问答对的索引
            question (str): 问题文本
            answer (str): 答案文本
            source_row (pd.Series): 原始数据行

        返回:
            Dict[str, Any]: 结构化的问答记录
        """
        # 生成唯一标识符
        unique_id = f"finance_headline_{original_idx:06d}_{qa_idx:03d}"

        # 创建基础记录结构
        record = {
            "id": unique_id,
            "Question": question,
            "Answer": answer,
            "metadata": {
                "source_dataset": DatasetConfig.DATASET_NAME,
                "subset": DatasetConfig.SUBSET_NAME,
                "original_index": original_idx,
                "qa_pair_index": qa_idx,
                "extraction_timestamp": datetime.now().isoformat(),
                "tags": self._generate_tags(question, answer)
            }
        }

        # 添加源数据的其他字段（如果存在）
        for col in source_row.index:
            if col not in ['input'] and pd.notna(source_row[col]):
                record["metadata"][f"source_{col}"] = source_row[col]

        return record

    def _generate_tags(self, question: str, answer: str) -> List[str]:
        """
        为问答对生成相关标签

        基于问题内容和答案，自动生成相关的标签，
        这些标签有助于后续的分类和检索。

        参数:
            question (str): 问题文本
            answer (str): 答案文本

        返回:
            List[str]: 相关标签列表
        """
        # 基础标签
        tags = ["finance", "headline", "binary_classification"]

        # 基于答案的标签
        tags.append(f"answer_{answer.lower()}")

        # 基于内容的标签分析
        question_lower = question.lower()

        # 价格相关
        if any(word in question_lower for word in ['price', 'cost', 'dollar', '$', 'money']):
            tags.append("pricing")

        # 公司相关
        if any(word in question_lower for word in ['company', 'corporation', 'firm', 'business']):
            tags.append("corporate")

        # 市场相关
        if any(word in question_lower for word in ['market', 'stock', 'share', 'trading']):
            tags.append("market")

        # 财务表现相关
        if any(word in question_lower for word in ['revenue', 'profit', 'earnings', 'income']):
            tags.append("financial_performance")

        # 趋势分析相关
        if any(word in question_lower for word in ['increase', 'decrease', 'rise', 'fall', 'growth']):
            tags.append("trend_analysis")

        return tags

    def get_extraction_statistics(self) -> Dict[str, Any]:
        """
        获取提取过程的详细统计信息

        计算并返回提取过程的各种统计指标，包括成功率、
        平均问答对数量等，用于评估处理效果。

        返回:
            Dict[str, Any]: 提取统计信息
        """
        # 计算成功率
        success_rate = (self.extraction_stats['successful_extractions'] /
                       max(1, self.extraction_stats['total_entries'])) * 100

        # 计算平均问答对数量
        avg_pairs_per_entry = (self.extraction_stats['total_qa_pairs'] /
                              max(1, self.extraction_stats['successful_extractions']))

        return {
            **self.extraction_stats,
            'success_rate_percent': round(success_rate, 2),
            'average_pairs_per_successful_entry': round(avg_pairs_per_entry, 2)
        }

# ============================================================================
# 4. 输出和序列化模块
# ============================================================================

class DatasetSerializer:
    """
    数据集序列化器 - 负责保存和格式化处理结果

    这个类处理所有与数据输出相关的操作：
    - 将问答记录保存为JSON文件
    - 生成数据格式的文档说明
    - 确保输出格式的一致性和可读性
    """

    def __init__(self, logger: logging.Logger):
        """
        初始化序列化器

        参数:
            logger: 日志记录器实例
        """
        self.logger = logger

    def save_qa_records(self, qa_records: List[Dict[str, Any]],
                       output_file: str = None) -> str:
        """
        保存问答记录到JSON文件

        将处理好的问答记录以JSON格式保存到文件，
        使用UTF-8编码确保中文等特殊字符正确显示。

        参数:
            qa_records (List[Dict[str, Any]]): 问答记录列表
            output_file (str, optional): 输出文件路径

        返回:
            str: 保存文件的路径
        """
        if output_file is None:
            output_file = DatasetConfig.OUTPUT_FILE

        try:
            # 以UTF-8编码保存JSON文件，确保格式美观
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(qa_records, f, indent=2, ensure_ascii=False)

            self.logger.info(f"成功保存 {len(qa_records)} 条记录到 {output_file}")
            return output_file

        except Exception as e:
            self.logger.error(f"保存记录失败: {str(e)}")
            raise

    def generate_schema_documentation(self, qa_records: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        生成输出格式的文档说明

        创建详细的数据格式文档，包括字段说明、数据类型、
        示例记录等，便于理解和使用输出数据。

        参数:
            qa_records (List[Dict[str, Any]]): 样本记录

        返回:
            Dict[str, Any]: 格式文档
        """
        schema = {
            "format_version": "1.0",
            "description": "从AdaptLLM/finance-tasks headline子集提取的结构化问答对",
            "record_structure": {
                "id": {
                    "type": "string",
                    "description": "问答对的唯一标识符",
                    "pattern": "finance_headline_XXXXXX_XXX",
                    "example": "finance_headline_000001_001"
                },
                "Question": {
                    "type": "string",
                    "description": "提取的问题文本"
                },
                "Answer": {
                    "type": "string",
                    "description": "问题的答案（Yes或No）",
                    "enum": ["Yes", "No"]
                },
                "metadata": {
                    "type": "object",
                    "description": "记录的附加元数据信息",
                    "properties": {
                        "source_dataset": {
                            "type": "string",
                            "description": "源数据集名称"
                        },
                        "subset": {
                            "type": "string",
                            "description": "数据集子集名称"
                        },
                        "original_index": {
                            "type": "integer",
                            "description": "在原始数据集中的索引"
                        },
                        "qa_pair_index": {
                            "type": "integer",
                            "description": "在该条目中的问答对索引"
                        },
                        "extraction_timestamp": {
                            "type": "string",
                            "format": "ISO8601",
                            "description": "提取时间戳"
                        },
                        "tags": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "自动生成的内容标签"
                        }
                    }
                }
            },
            "sample_record": qa_records[0] if qa_records else None,
            "total_records": len(qa_records),
            "usage_notes": [
                "每个记录包含一个问题和对应的Yes/No答案",
                "id字段确保每个问答对的唯一性",
                "metadata包含丰富的上下文信息",
                "tags字段便于分类和检索",
                "所有文本均经过清理和标准化处理"
            ]
        }

        return schema

# ============================================================================
# 5. 性能监控模块
# ============================================================================

class PerformanceMonitor:
    """
    性能监控器 - 跟踪和报告处理性能

    监控整个处理流程的性能指标，包括：
    - 总处理时间
    - 各个阶段的耗时
    - 处理速度统计
    """

    def __init__(self, logger: logging.Logger):
        """
        初始化性能监控器

        参数:
            logger: 日志记录器实例
        """
        self.logger = logger
        self.start_time = None      # 开始时间
        self.end_time = None        # 结束时间
        self.checkpoints = {}       # 检查点时间记录

    def start_monitoring(self):
        """开始性能监控"""
        self.start_time = time.time()
        self.logger.info("开始性能监控")

    def checkpoint(self, name: str):
        """
        记录性能检查点

        在处理过程的关键节点记录时间，
        用于分析各个阶段的耗时情况。

        参数:
            name (str): 检查点名称
        """
        if self.start_time is None:
            self.start_monitoring()

        elapsed_time = time.time() - self.start_time
        self.checkpoints[name] = elapsed_time
        self.logger.info(f"检查点 '{name}': {elapsed_time:.2f} 秒")

    def finish_monitoring(self) -> Dict[str, float]:
        """
        结束监控并返回性能指标

        计算总处理时间和各阶段用时，
        生成完整的性能报告。

        返回:
            Dict[str, float]: 性能指标字典
        """
        self.end_time = time.time()
        total_time = self.end_time - self.start_time

        # 构建性能指标字典
        metrics = {
            'total_processing_time_seconds': round(total_time, 2),
            'total_processing_time_minutes': round(total_time / 60, 2),
            # 添加各检查点的用时
            **{f"{name}_seconds": round(time_val, 2)
               for name, time_val in self.checkpoints.items()}
        }

        self.logger.info(f"总处理时间: {metrics['total_processing_time_seconds']} 秒")
        return metrics

# ============================================================================
# 6. 主要协调模块
# ============================================================================

class FinanceDatasetProcessor:
    """
    金融数据集处理器 - 主要协调类

    这是整个系统的核心协调器，负责：
    - 协调各个模块的工作
    - 控制整个处理流程
    - 处理异常和错误
    - 生成最终报告

    使用示例:
        processor = FinanceDatasetProcessor()
        results = processor.process_dataset()
    """

    def __init__(self):
        """
        初始化处理器，创建所有必要的组件实例
        """
        self.logger = setup_logging()                           # 日志记录器
        self.loader = DatasetLoader(self.logger)                # 数据加载器
        self.extractor = QuestionAnswerExtractor(self.logger)   # 问答提取器
        self.serializer = DatasetSerializer(self.logger)       # 数据序列化器
        self.monitor = PerformanceMonitor(self.logger)          # 性能监控器

    def process_dataset(self) -> Dict[str, Any]:
        """
        执行完整的数据集处理流程

        这是整个系统的主要入口方法，按顺序执行以下步骤：
        1. 加载数据集
        2. 提取问答对
        3. 保存结果
        4. 生成报告

        返回:
            Dict[str, Any]: 包含处理结果和统计信息的字典
        """
        self.logger.info("开始执行金融数据集处理流程")
        self.monitor.start_monitoring()

        try:
            # 步骤1: 加载数据集
            self.logger.info("步骤1: 正在加载数据集...")
            df = self.loader.load_finance_dataset()
            self.loader.validate_dataset_structure(df)
            self.monitor.checkpoint("数据集加载完成")

            # 步骤2: 提取问答对
            self.logger.info("步骤2: 正在提取问答对...")
            qa_records = self.extractor.process_dataset(df)
            self.monitor.checkpoint("问答提取完成")

            # 步骤3: 保存结果
            self.logger.info("步骤3: 正在保存处理结果...")
            output_file = self.serializer.save_qa_records(qa_records)
            self.monitor.checkpoint("数据保存完成")

            # 步骤4: 生成文档和统计信息
            self.logger.info("步骤4: 正在生成文档和统计...")
            schema_doc = self.serializer.generate_schema_documentation(qa_records)
            extraction_stats = self.extractor.get_extraction_statistics()
            performance_metrics = self.monitor.finish_monitoring()

            # 编译最终结果
            results = {
                "processing_summary": {
                    "status": "已完成",
                    "output_file": output_file,
                    "total_qa_pairs": len(qa_records)
                },
                "extraction_statistics": extraction_stats,
                "performance_metrics": performance_metrics,
                "schema_documentation": schema_doc
            }

            self.logger.info("数据集处理成功完成!")
            self._print_summary(results)

            return results

        except Exception as e:
            self.logger.error(f"数据集处理失败: {str(e)}")
            raise

    def _print_summary(self, results: Dict[str, Any]):
        """
        打印格式化的处理结果摘要

        在控制台输出美观的处理结果摘要，包括：
        - 处理状态
        - 统计信息
        - 性能指标

        参数:
            results (Dict[str, Any]): 处理结果字典
        """
        print("\n" + "="*80)
        print("🏦 金融数据集处理结果摘要")
        print("="*80)

        # 处理摘要
        summary = results["processing_summary"]
        print(f"📊 处理状态: {summary['status']}")
        print(f"📁 输出文件: {summary['output_file']}")
        print(f"💬 问答对总数: {summary['total_qa_pairs']:,}")

        # 提取统计
        stats = results["extraction_statistics"]
        print(f"\n📈 提取统计信息:")
        print(f"  • 处理条目总数: {stats['total_entries']:,}")
        print(f"  • 成功提取条目: {stats['successful_extractions']:,}")
        print(f"  • 提取失败条目: {stats['failed_extractions']:,}")
        print(f"  • 成功率: {stats['success_rate_percent']:.1f}%")
        print(f"  • 平均每条目问答对数: {stats['average_pairs_per_successful_entry']:.1f}")

        # 性能指标
        perf = results["performance_metrics"]
        print(f"\n⏱️  性能指标:")
        print(f"  • 总处理时间: {perf['total_processing_time_seconds']:.2f} 秒")
        print(f"  • 总处理时间: {perf['total_processing_time_minutes']:.2f} 分钟")

        # 显示各阶段用时
        if '数据集加载完成_seconds' in perf:
            print(f"  • 数据加载用时: {perf['数据集加载完成_seconds']:.2f} 秒")
        if '问答提取完成_seconds' in perf:
            extract_time = perf['问答提取完成_seconds'] - perf.get('数据集加载完成_seconds', 0)
            print(f"  • 问答提取用时: {extract_time:.2f} 秒")
        if '数据保存完成_seconds' in perf:
            save_time = perf['数据保存完成_seconds'] - perf.get('问答提取完成_seconds', 0)
            print(f"  • 数据保存用时: {save_time:.2f} 秒")

        print("="*80)
        print("✅ 处理完成! 可以查看输出文件获取详细结果。")
        print("="*80 + "\n")

# ============================================================================
# 7. 主运行函数
# ============================================================================

def main():
    """
    主运行函数 - 程序入口点

    这是整个程序的入口函数，负责：
    1. 初始化处理器
    2. 执行处理流程
    3. 保存详细结果
    4. 处理异常情况

    可以直接运行脚本或导入后调用此函数。

    返回:
        Dict[str, Any]: 处理结果，如果处理失败则返回None
    """
    print("🚀 开始启动金融数据集处理器...")
    print("📋 任务: 处理AdaptLLM/finance-tasks数据集的headline子集")
    print("🎯 目标: 提取问答对并转换为结构化JSON格式\n")

    try:
        # 初始化并运行处理器
        processor = FinanceDatasetProcessor()
        results = processor.process_dataset()

        # 保存详细的处理结果供后续参考
        results_file = "processing_results.json"
        with open(results_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        print(f"📄 详细处理结果已保存到: {results_file}")
        print("🎉 所有任务已成功完成!")

        return results

    except KeyboardInterrupt:
        print("\n⚠️  用户中断了处理过程")
        return None

    except Exception as e:
        print(f"\n❌ 处理过程中发生错误: {str(e)}")
        print("💡 请检查日志文件获取详细错误信息")
        raise

# ============================================================================
# 8. 辅助函数 - 单独测试各模块
# ============================================================================

def test_data_loading():
    """
    测试数据加载功能
    用于单独测试数据集加载是否正常工作
    """
    print("🧪 测试数据加载模块...")
    logger = setup_logging()
    loader = DatasetLoader(logger)

    try:
        df = loader.load_finance_dataset()
        loader.validate_dataset_structure(df)
        print(f"✅ 数据加载测试成功! 数据集形状: {df.shape}")
        return df
    except Exception as e:
        print(f"❌ 数据加载测试失败: {str(e)}")
        return None

def test_qa_extraction(sample_text: str = None):
    """
    测试问答提取功能
    用于单独测试问答对提取是否正常工作

    参数:
        sample_text (str): 测试用的样本文本
    """
    print("🧪 测试问答提取模块...")
    logger = setup_logging()
    extractor = QuestionAnswerExtractor(logger)

    # 使用示例文本进行测试
    if sample_text is None:
        sample_text = """
        Q: Is the company's revenue over $1 million? A: Yes
        Question: Did the stock price increase? Answer: No
        1. Was there a merger announcement? Yes
        Does the earnings report show profit? No
        """

    try:
        qa_pairs = extractor.extract_qa_pairs(sample_text)
        print(f"✅ 问答提取测试成功! 提取到 {len(qa_pairs)} 个问答对:")
        for i, pair in enumerate(qa_pairs, 1):
            print(f"  {i}. Q: {pair['question']}")
            print(f"     A: {pair['answer']}")
        return qa_pairs
    except Exception as e:
        print(f"❌ 问答提取测试失败: {str(e)}")
        return None

def show_usage_examples():
    """
    显示使用示例和说明
    """
    print("""
🔧 使用方法和示例:

1. 直接运行脚本:
   python finance_dataset_processor.py

2. 在Python代码中使用:
   from finance_dataset_processor import main, FinanceDatasetProcessor

   # 方法1: 使用main函数
   results = main()

   # 方法2: 直接使用处理器类
   processor = FinanceDatasetProcessor()
   results = processor.process_dataset()

3. 测试特定模块:
   from finance_dataset_processor import test_data_loading, test_qa_extraction

   # 测试数据加载
   df = test_data_loading()

   # 测试问答提取
   qa_pairs = test_qa_extraction()

4. 输出文件说明:
   - finance_headline_qa_pairs.json: 主要的问答对数据
   - processing_results.json: 详细的处理结果和统计
   - processing.log: 完整的处理日志

📊 输出JSON格式示例:
{
  "id": "finance_headline_000001_001",
  "Question": "Does the company's revenue exceed $1 billion?",
  "Answer": "Yes",
  "metadata": {
    "source_dataset": "AdaptLLM/finance-tasks",
    "subset": "headline",
    "original_index": 1,
    "qa_pair_index": 1,
    "extraction_timestamp": "2025-05-24T10:30:45.123456",
    "tags": ["finance", "headline", "binary_classification", "answer_yes"]
  }
}
""")

# ============================================================================
# 9. 程序入口点
# ============================================================================

if __name__ == "__main__":
    # 检查命令行参数
    import sys


    main()

🚀 开始启动金融数据集处理器...
📋 任务: 处理AdaptLLM/finance-tasks数据集的headline子集
🎯 目标: 提取问答对并转换为结构化JSON格式



The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme:   0%|          | 0.00/8.23k [00:00<?, ?B/s]

ERROR:__main__:数据集加载失败: Invalid pattern: '**' can only be an entire path component
ERROR:__main__:数据集处理失败: Invalid pattern: '**' can only be an entire path component



❌ 处理过程中发生错误: Invalid pattern: '**' can only be an entire path component
💡 请检查日志文件获取详细错误信息


ValueError: Invalid pattern: '**' can only be an entire path component