In [None]:
#!/usr/bin/env python3
"""
ORCID 女性姓名变化检测程序 - 修复版本
添加了对难以处理的XML文件的错误处理和复制功能
解决了多进程返回结果时的 Pickling RecursionError 问题
"""

import os
import csv
import logging
import multiprocessing as mp
from multiprocessing import Pool, Manager, Lock
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import signal
import sys
from functools import partial
import shutil
import traceback

from bs4 import BeautifulSoup
import gender_guesser.detector as gender
import Levenshtein
from datetime import datetime


# 增加递归限制
sys.setrecursionlimit(15000)

# 全局配置
CONFIG = {
    'base_path': '/hy-tmp/orcid/ORCID_2024_10_summaries',
    'output_file': '/hy-tmp/female_name_changes.csv',
    'log_file': '/hy-tmp/orcid_processing.log',
    'problem_files_dir': '/hy-tmp/problem_xml_files',  # 问题文件存储目录
    'max_workers': 2,  
    'chunk_size': 10000,  # 每个进程处理的文件数量
    'batch_size': 10,  # 批量写入CSV的记录数
    'max_file_size': 10 * 1024 * 1024,  # 10MB 文件大小限制
}

def setup_problem_files_dir():
    """创建问题文件存储目录"""
    problem_dir = Path(CONFIG['problem_files_dir'])
    problem_dir.mkdir(parents=True, exist_ok=True)
    
    # 创建子目录
    (problem_dir / 'recursion_error').mkdir(exist_ok=True)
    (problem_dir / 'parse_error').mkdir(exist_ok=True)
    (problem_dir / 'large_files').mkdir(exist_ok=True)
    (problem_dir / 'other_errors').mkdir(exist_ok=True)
    
    return problem_dir

def copy_problem_file(file_path: Path, error_type: str, error_msg: str = ""):
    """复制问题文件到指定目录"""
    try:
        problem_dir = Path(CONFIG['problem_files_dir'])
        error_dir = problem_dir / error_type
        
        # 创建唯一的文件名（避免重名）
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        new_filename = f"{file_path.stem}_{timestamp}{file_path.suffix}"
        dest_path = error_dir / new_filename
        
        shutil.copy2(file_path, dest_path)
        
        # 记录错误信息
        log_file = error_dir / "error_log.txt"
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(f"{datetime.now().isoformat()}\t{file_path}\t{new_filename}\t{error_msg}\n")
            
        return True
    except Exception as e:
        print(f"无法复制问题文件 {file_path}: {e}")
        return False

def safe_parse_xml(file_path: Path) -> Optional[BeautifulSoup]:
    """安全的XML解析函数，包含各种错误处理"""
    try:
        # 检查文件大小
        file_size = file_path.stat().st_size
        if file_size > CONFIG['max_file_size']:
            copy_problem_file(file_path, 'large_files', f"File size: {file_size} bytes")
            return None
        
        # 尝试解析XML
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
            
            # 检查内容长度
            if len(content) > 10_000_000:  # 10MB文本内容限制
                copy_problem_file(file_path, 'large_files', f"Content length: {len(content)} chars")
                return None
            
            soup = BeautifulSoup(content, 'lxml-xml')
            return soup
            
    except RecursionError as e:
        copy_problem_file(file_path, 'recursion_error', f"RecursionError: {str(e)}")
        return None
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}"
        if "recursion" in str(e).lower():
            copy_problem_file(file_path, 'recursion_error', error_msg)
        elif any(keyword in str(e).lower() for keyword in ['parse', 'xml', 'malformed']):
            copy_problem_file(file_path, 'parse_error', error_msg)
        else:
            copy_problem_file(file_path, 'other_errors', error_msg)
        return None

def extract_name_info(soup: BeautifulSoup, gender_detector) -> Optional[Dict]:
    """从BeautifulSoup对象中提取姓名信息和性别"""
    try:
        person_name_tag = soup.find("person:name")
        if not person_name_tag:
            return None

        # 提取person:name基本信息
        person_created_date_tag = person_name_tag.find("common:created-date")
        given_names_tag = person_name_tag.find("personal-details:given-names")
        family_name_tag = person_name_tag.find("personal-details:family-name")

        # <-- FIX: Convert all extracted text to plain Python strings
        person_created_date = person_created_date_tag.get_text(strip=True) if person_created_date_tag else None
        given_name = given_names_tag.get_text(strip=True) if given_names_tag else None
        family_name = family_name_tag.get_text(strip=True) if family_name_tag else None

        # 性别识别 - 只处理女性
        if not given_name:
            return None
        
        gender_result = gender_detector.get_gender(given_name.split()[0])
        if gender_result not in ['female', 'mostly_female']:
            return None 

        # 构建基准姓名 (given_name + family_name)
        if not family_name:
            return None
        base_name = f"{given_name} {family_name}".strip()

        # 提取other-name信息 - 可能有多个
        other_names_data = []
        other_name_tags = soup.find_all("other-name:other-name")
        
        for other_name_tag in other_name_tags:
            other_created_date_tag = other_name_tag.find("common:created-date")
            other_name_content_tag = other_name_tag.find("other-name:content")
            
            # <-- FIX: Convert all extracted text to plain Python strings
            other_created_date = other_created_date_tag.get_text(strip=True) if other_created_date_tag else None
            other_name_content = other_name_content_tag.get_text(strip=True) if other_name_content_tag else None
            
            if other_created_date and other_name_content:
                other_names_data.append({
                    'other_name_content': other_name_content,
                    'other_created_date': other_created_date
                })

        if not other_names_data:
            return None

        return {
            'person_created_date': person_created_date,
            'given_name': given_name,
            'family_name': family_name,
            'base_name': base_name,
            'other_names_data': other_names_data,
            'gender': gender_result
        }

    except RecursionError:
        raise  # 让上层处理
    except Exception as e:
        return None

def process_single_file(file_path: Path, port_name: str, gender_detector) -> Optional[Dict]:
    """处理单个XML文件 - 增强错误处理"""
    try:
        # 使用安全的XML解析
        soup = safe_parse_xml(file_path)
        if not soup:
            return None
            
        name_info = extract_name_info(soup, gender_detector)
        if not name_info:
            return None

        is_change, change_data = is_name_change_candidate(name_info)
        if is_change and change_data:
            path_tag = soup.find("common:path")
            # <-- FIX: Convert the ORCID ID to a plain Python string
            orcid_id = path_tag.get_text(strip=True) if path_tag else "unknown"
            
            return {
                'port_name': port_name,
                'id': orcid_id,
                'person_name': change_data[0],
                'person_date': change_data[1],
                'other_name': change_data[2],
                'other_date': change_data[3],
                'gender': name_info['gender']
            }
        return None

    except RecursionError as e:
        copy_problem_file(file_path, 'recursion_error', f"RecursionError in process_single_file: {str(e)}")
        return None
    except Exception as e:
        # 对于其他异常，不复制文件（因为safe_parse_xml已经处理了）
        return None

def _extract_name_parts(full_name: str) -> Dict[str, str]:
    """从全名中提取姓、名、中间名"""
    parts = full_name.strip().split()
    if not parts:
        return {'given': '', 'middle': '', 'family': ''}
    
    given = parts[0]
    if len(parts) == 1:
        return {'given': given, 'middle': '', 'family': ''}
    
    family = parts[-1]
    middle = ' '.join(parts[1:-1])
    return {'given': given, 'middle': middle, 'family': family}

def is_name_change_candidate(name_info: Dict) -> Tuple[bool, Optional[List]]:
    """
    判断是否为姓名变化候选者
    """
    try:
        # --- 1. 数据提取 ---
        base_name = name_info['base_name']
        person_created_date = name_info['person_created_date']
        other_names_data = name_info['other_names_data']
        family_name = name_info['family_name']
        given_name = name_info['given_name']

        if not all([base_name, person_created_date, other_names_data, family_name, given_name]):
            return False, None

        # 解析person:name的创建日期 
        try:
            person_date = datetime.fromisoformat(person_created_date.replace('Z', '+00:00'))
        except ValueError:
            return False, None

        # --- 2. 遍历 other-names，进行筛选和判断 ---
        for other_data in other_names_data:
            other_name_content = other_data.get('other_name_content')
            other_created_date = other_data.get('other_created_date')
            
            if not other_name_content or not other_created_date:
                continue

            try:
                other_date = datetime.fromisoformat(other_created_date.replace('Z', '+00:00'))
            except ValueError:
                continue

            # 时间差过滤
            if abs((person_date - other_date).days) < 30:
                continue

            # --- 3. 姓名标准化与结构化 ---
            cleaned_base = ' '.join(str(base_name).lower().strip().replace('.', '').replace(',', '').replace('-', ' ').split())
            cleaned_other = ' '.join(str(other_name_content).lower().strip().replace('.', '').replace(',', '').replace('-', ' ').split())
            
            # Levenshtein 距离
            if Levenshtein.distance(cleaned_base, cleaned_other) < 2:
                continue

            base_parts = _extract_name_parts(cleaned_base)
            other_parts = _extract_name_parts(cleaned_other)

            if not all([base_parts['given'], other_parts['given'], base_parts['family'], other_parts['family']]):
                continue

            # --- 4. 核心逻辑判断 ---
            
            # a) 验证是否为同一个人 (名字部分容错判断)
            if Levenshtein.distance(base_parts['given'], other_parts['given']) >= 2:
                continue
            
            # b) 获取并比较完整的姓氏部分
            base_full_surname = f"{base_parts['middle']} {base_parts['family']}".strip()
            other_full_surname = f"{other_parts['middle']} {other_parts['family']}".strip()

            if Levenshtein.distance(base_full_surname, other_full_surname) < 2:
                continue

            # c) 排除只增加/减少中间名的情况
            is_subset_change = (base_full_surname in other_full_surname or other_full_surname in base_full_surname)
            if is_subset_change and Levenshtein.distance(base_parts['family'], other_parts['family']) <= 1:
                continue

            # d) 排除缩写情况
            if (len(base_parts['family']) == 1 and len(other_parts['family']) > 1 and base_parts['family'] == other_parts['family'][0]) or \
               (len(other_parts['family']) == 1 and len(base_parts['family']) > 1 and other_parts['family'] == base_parts['family'][0]):
                continue

            # --- 5. 确认是候选者 ---
            
            if person_date < other_date:
                older_name, newer_name = cleaned_base, cleaned_other
                older_date_str, newer_date_str = person_created_date, other_created_date
            else:
                older_name, newer_name = cleaned_other, cleaned_base
                older_date_str, newer_date_str = other_created_date, person_created_date

            return True, [older_name, older_date_str, newer_name, newer_date_str]

        return False, None

    except RecursionError:
        raise  # 让上层处理
    except Exception as e:
        return False, None

def process_file_chunk(args):
    """处理文件块的工作函数 - 增强错误处理"""
    file_paths, port_name, process_id = args
    
    # 在子进程中初始化 gender detector
    try:
        local_gender_detector = gender.Detector()
    except Exception as e:
        print(f"Process {process_id}: Failed to initialize gender detector: {e}")
        return [], 0, 0, 0  # 增加错误文件计数
    
    results = []
    processed = 0
    error_count = 0
    
    for file_path in file_paths:
        try:
            # 检查文件大小
            file_size = file_path.stat().st_size
            if file_size > CONFIG['max_file_size']:
                copy_problem_file(file_path, 'large_files', f"File size: {file_size} bytes")
                error_count += 1
                processed += 1
                continue
            
            result = process_single_file(file_path, port_name, local_gender_detector)
            if result:
                results.append(result)
            processed += 1
            
            # 每处理100000个文件报告一次进度
            if processed % 100000 == 0:
                print(f"Process {process_id}: Processed {processed}/{len(file_paths)} files in {port_name}, errors: {error_count}")
                
        except RecursionError as e:
            copy_problem_file(file_path, 'recursion_error', f"RecursionError: {str(e)}")
            error_count += 1
            processed += 1
        except MemoryError as e:
            copy_problem_file(file_path, 'large_files', f"MemoryError: {str(e)}")
            error_count += 1
            processed += 1
        except Exception as e:
            error_msg = f"{type(e).__name__}: {str(e)}"
            if "recursion" in str(e).lower():
                copy_problem_file(file_path, 'recursion_error', error_msg)
            elif any(keyword in str(e).lower() for keyword in ['parse', 'xml', 'malformed']):
                copy_problem_file(file_path, 'parse_error', error_msg)
            else:
                copy_problem_file(file_path, 'other_errors', error_msg)
            error_count += 1
            processed += 1
    
    print(f"Process {process_id} completed: {processed} files, {len(results)} results, {error_count} errors")
    return results, processed, len(results), error_count

def get_file_chunks(base_path: str, chunk_size: int = 1000) -> List[Tuple]:
    """获取文件分块信息"""
    base_path = Path(base_path)
    chunks = []
    chunk_id = 0
    
    for portfolio_folder in base_path.iterdir():
        if not portfolio_folder.is_dir():
            continue
            
        # 获取文件夹中的所有XML文件
        xml_files = list(portfolio_folder.glob("*.xml"))
        if not xml_files:
            xml_files = [f for f in portfolio_folder.iterdir() if f.is_file()]  # 如果没有.xml扩展名
            
        # 将文件分块
        for i in range(0, len(xml_files), chunk_size):
            file_chunk = xml_files[i:i + chunk_size]
            chunks.append((file_chunk, portfolio_folder.name, chunk_id))
            chunk_id += 1
    
    return chunks
    
def write_results_batch(results_batch: List[Dict], output_file: str, write_header: bool = False):
    """批量写入结果到CSV文件"""
    mode = 'w' if write_header else 'a'
    with open(output_file, mode, newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=['port_name', 'id', 'person_name', 'person_date', 
                                             'other_name', 'other_date', 'gender'])
        if write_header:
            writer.writeheader()
        writer.writerows(results_batch)

def signal_handler(signum, frame):
    """信号处理器"""
    print("\n收到中断信号，正在安全退出...")
    sys.exit(0)

def write_summary_report(total_processed: int, total_results: int, total_errors: int, duration: float):
    """写入处理总结报告"""
    summary_file = Path(CONFIG['problem_files_dir']) / 'processing_summary.txt'
    with open(summary_file, 'w', encoding='utf-8') as f:
        f.write(f"ORCID 姓名变化检测处理总结\n")
        f.write(f"处理时间: {datetime.now().isoformat()}\n")
        f.write(f"="*50 + "\n")
        f.write(f"总处理文件数: {total_processed:,}\n")
        f.write(f"发现姓名变化: {total_results:,}\n")
        f.write(f"错误文件数: {total_errors:,}\n")
        f.write(f"成功率: {((total_processed - total_errors) / total_processed * 100):.2f}%\n")
        f.write(f"处理时间: {duration/3600:.2f} 小时\n")
        f.write(f"平均速度: {total_processed/duration:.0f} 文件/秒\n")
        f.write(f"\n错误文件分类:\n")
        
        # 统计各类错误文件数量
        problem_dir = Path(CONFIG['problem_files_dir'])
        for error_type in ['recursion_error', 'parse_error', 'large_files', 'other_errors']:
            error_dir = problem_dir / error_type
            if error_dir.exists():
                error_files = list(error_dir.glob("*.xml"))
                f.write(f"  {error_type}: {len(error_files)} 个文件\n")

def main():
    """主函数"""
    # 注册信号处理器
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    start_time = time.time()
    
    # 设置问题文件目录
    print("设置问题文件存储目录...")
    setup_problem_files_dir()
    
    # 获取文件分块
    print("正在分析文件结构...")
    chunks = get_file_chunks(CONFIG['base_path'], CONFIG['chunk_size'])
    total_chunks = len(chunks)
    
    print(f"发现 {total_chunks} 个文件块")
    if total_chunks == 0:
        print("没有找到需要处理的文件")
        return
    
    # 初始化输出文件
    write_results_batch([], CONFIG['output_file'], write_header=True)
    
    # 统计变量
    total_processed = 0
    total_results = 0
    total_errors = 0
    results_buffer = []
    
    try:
        # 使用进程池处理
        with ProcessPoolExecutor(max_workers=CONFIG['max_workers']) as executor:
            print(f"启动 {CONFIG['max_workers']} 个工作进程...")
            print(f"问题文件将保存到: {CONFIG['problem_files_dir']}")
            
            # 提交所有任务
            future_to_chunk = {executor.submit(process_file_chunk, chunk): i 
                              for i, chunk in enumerate(chunks)}
            
            # 处理完成的任务
            for future in as_completed(future_to_chunk):
                chunk_idx = future_to_chunk[future]
                try:
                    chunk_results, chunk_processed, chunk_found, chunk_errors = future.result()
                    
                    total_processed += chunk_processed
                    total_results += chunk_found
                    total_errors += chunk_errors
                    results_buffer.extend(chunk_results)
                    
                    # 批量写入结果
                    if len(results_buffer) >= CONFIG['batch_size']:
                        write_results_batch(results_buffer, CONFIG['output_file'])
                        results_buffer = []
                    
                    # 进度报告
                    progress = (chunk_idx + 1) / total_chunks * 100
                    print(f"进度: {progress:.1f}% ({chunk_idx + 1}/{total_chunks}), "
                          f"已处理: {total_processed:,}, 发现变化: {total_results:,}, 错误: {total_errors:,}")
                    
                except Exception as e:
                    print(f"处理块 {chunk_idx} 时出错: {e}")
                    print(f"错误详情: {traceback.format_exc()}")
    
    except KeyboardInterrupt:
        print("\n用户中断程序执行")
        return
    except Exception as e:
        print(f"程序执行出错: {e}")
        print(f"错误详情: {traceback.format_exc()}")
        return
    finally:
        # 写入剩余结果
        if results_buffer:
            write_results_batch(results_buffer, CONFIG['output_file'])
    
    # 最终统计
    end_time = time.time()
    duration = end_time - start_time
    
    print("\n" + "="*60)
    print("处理完成!")
    print(f"总处理文件数: {total_processed:,}")
    print(f"发现姓名变化: {total_results:,}")
    print(f"错误文件数: {total_errors:,}")
    print(f"成功率: {((total_processed - total_errors) / total_processed * 100):.2f}%" if total_processed > 0 else "N/A")
    print(f"处理时间: {duration/3600:.1f} 小时 ({duration/60:.1f} 分钟)")
    print(f"平均速度: {total_processed/duration:.0f} 文件/秒" if duration > 0 else "N/A")
    print(f"结果已保存至: {CONFIG['output_file']}")
    print(f"问题文件已保存至: {CONFIG['problem_files_dir']}")
    print("="*60)
    
    # 写入处理总结报告
    write_summary_report(total_processed, total_results, total_errors, duration)

if __name__ == "__main__":   
    main()

设置问题文件存储目录...
正在分析文件结构...
发现 2431 个文件块
启动 2 个工作进程...
问题文件将保存到: /hy-tmp/problem_xml_files
Process 1 completed: 9980 files, 7 results, 0 errors
进度: 0.1% (2/2431), 已处理: 9,980, 发现变化: 7, 错误: 0
Process 0 completed: 10000 files, 24 results, 0 errors
进度: 0.0% (1/2431), 已处理: 19,980, 发现变化: 31, 错误: 0
Process 2 completed: 10000 files, 28 results, 0 errors
进度: 0.1% (3/2431), 已处理: 29,980, 发现变化: 59, 错误: 0
Process 3 completed: 9856 files, 6 results, 0 errors
进度: 0.2% (4/2431), 已处理: 39,836, 发现变化: 65, 错误: 0
Process 5 completed: 9863 files, 11 results, 0 errors
进度: 0.2% (6/2431), 已处理: 49,699, 发现变化: 76, 错误: 0
Process 4 completed: 10000 files, 20 results, 0 errors
进度: 0.2% (5/2431), 已处理: 59,699, 发现变化: 96, 错误: 0
Process 6 completed: 10000 files, 29 results, 0 errors
进度: 0.3% (7/2431), 已处理: 69,699, 发现变化: 125, 错误: 0
Process 7 completed: 9992 files, 7 results, 0 errors
进度: 0.3% (8/2431), 已处理: 79,691, 发现变化: 132, 错误: 0
Process 9 completed: 10000 files, 10 results, 0 errors
进度: 0.4% (10/2431), 已处理: 89,691, 发现变化: 

In [4]:
#!/usr/bin/env python3
"""
ORCID 姓名变化检测程序 - 大文件重新处理版本
专门用于处理因体积过大而被主程序跳过的XML文件。
"""

import os
import csv
import logging
import multiprocessing as mp
from multiprocessing import Pool, Manager, Lock
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import signal
import sys
from functools import partial
import shutil
import traceback

from bs4 import BeautifulSoup
import gender_guesser.detector as gender
import Levenshtein
from datetime import datetime


# 增加递归限制
sys.setrecursionlimit(15000)

# 全局配置
CONFIG = {
    'base_path': '/hy-tmp/problem_xml_files/large_files',  # 指向大文件目录
    'output_file': '/hy-tmp/large_files_results.csv',    # 新的输出文件名
    'problem_files_dir': '/hy-tmp/problem_xml_files_reprocessing', # 新的问题文件目录
    'max_workers': 2,  # 根据你的CPU核心数调整
    'chunk_size': 50,  # 文件总数少，chunk可以小一点
    'batch_size': 10,
}

def setup_problem_files_dir():
    """创建问题文件存储目录"""
    problem_dir = Path(CONFIG['problem_files_dir'])
    problem_dir.mkdir(parents=True, exist_ok=True)
    
    # 创建子目录
    (problem_dir / 'recursion_error').mkdir(exist_ok=True)
    (problem_dir / 'parse_error').mkdir(exist_ok=True)
    (problem_dir / 'large_files').mkdir(exist_ok=True)
    (problem_dir / 'other_errors').mkdir(exist_ok=True)
    
    return problem_dir

def copy_problem_file(file_path: Path, error_type: str, error_msg: str = ""):
    """复制问题文件到指定目录"""
    try:
        problem_dir = Path(CONFIG['problem_files_dir'])
        error_dir = problem_dir / error_type
        
        # 创建唯一的文件名（避免重名）
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        new_filename = f"{file_path.stem}_{timestamp}{file_path.suffix}"
        dest_path = error_dir / new_filename
        
        shutil.copy2(file_path, dest_path)
        
        # 记录错误信息
        log_file = error_dir / "error_log.txt"
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(f"{datetime.now().isoformat()}\t{file_path}\t{new_filename}\t{error_msg}\n")
            
        return True
    except Exception as e:
        print(f"无法复制问题文件 {file_path}: {e}")
        return False

def safe_parse_xml(file_path: Path) -> Optional[BeautifulSoup]:
    """安全的XML解析函数 - 移除了文件大小限制以重新处理大文件"""
    try:
        # ## 核心修改：注释掉文件大小检查 ##
        # file_size = file_path.stat().st_size
        # if file_size > SOME_LIMIT:
        #     return None
        
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
            
        # ## 移除内容长度检查 ##
        # if len(content) > 10_000_000:
        #     return None
            
        soup = BeautifulSoup(content, 'lxml-xml')
        return soup
            
    except Exception as e:
        error_msg = f"{type(e).__name__}: {str(e)}"
        copy_problem_file(file_path, 'reprocessing_errors', error_msg)
        return None

def extract_name_info(soup: BeautifulSoup, gender_detector) -> Optional[Dict]:
    """从BeautifulSoup对象中提取姓名信息和性别"""
    try:
        person_name_tag = soup.find("person:name")
        if not person_name_tag:
            return None

        # 提取person:name基本信息
        person_created_date_tag = person_name_tag.find("common:created-date")
        given_names_tag = person_name_tag.find("personal-details:given-names")
        family_name_tag = person_name_tag.find("personal-details:family-name")

        # <-- FIX: Convert all extracted text to plain Python strings
        person_created_date = person_created_date_tag.get_text(strip=True) if person_created_date_tag else None
        given_name = given_names_tag.get_text(strip=True) if given_names_tag else None
        family_name = family_name_tag.get_text(strip=True) if family_name_tag else None

        # 性别识别 - 只处理女性
        if not given_name:
            return None
        
        gender_result = gender_detector.get_gender(given_name.split()[0])
        if gender_result not in ['female', 'mostly_female']:
            return None 

        # 构建基准姓名 (given_name + family_name)
        if not family_name:
            return None
        base_name = f"{given_name} {family_name}".strip()

        # 提取other-name信息 - 可能有多个
        other_names_data = []
        other_name_tags = soup.find_all("other-name:other-name")
        
        for other_name_tag in other_name_tags:
            other_created_date_tag = other_name_tag.find("common:created-date")
            other_name_content_tag = other_name_tag.find("other-name:content")
            
            # <-- FIX: Convert all extracted text to plain Python strings
            other_created_date = other_created_date_tag.get_text(strip=True) if other_created_date_tag else None
            other_name_content = other_name_content_tag.get_text(strip=True) if other_name_content_tag else None
            
            if other_created_date and other_name_content:
                other_names_data.append({
                    'other_name_content': other_name_content,
                    'other_created_date': other_created_date
                })

        if not other_names_data:
            return None

        return {
            'person_created_date': person_created_date,
            'given_name': given_name,
            'family_name': family_name,
            'base_name': base_name,
            'other_names_data': other_names_data,
            'gender': gender_result
        }

    except RecursionError:
        raise  # 让上层处理
    except Exception as e:
        return None

def process_single_file(file_path: Path, port_name: str, gender_detector) -> Optional[Dict]:
    """处理单个XML文件 - 增强错误处理"""
    try:
        # 使用安全的XML解析
        soup = safe_parse_xml(file_path)
        if not soup:
            return None
            
        name_info = extract_name_info(soup, gender_detector)
        if not name_info:
            return None

        is_change, change_data = is_name_change_candidate(name_info)
        if is_change and change_data:
            path_tag = soup.find("common:path")
            # <-- FIX: Convert the ORCID ID to a plain Python string
            orcid_id = path_tag.get_text(strip=True) if path_tag else "unknown"
            
            return {
                'port_name': port_name,
                'id': orcid_id,
                'person_name': change_data[0],
                'person_date': change_data[1],
                'other_name': change_data[2],
                'other_date': change_data[3],
                'gender': name_info['gender']
            }
        return None

    except RecursionError as e:
        copy_problem_file(file_path, 'recursion_error', f"RecursionError in process_single_file: {str(e)}")
        return None
    except Exception as e:
        # 对于其他异常，不复制文件（因为safe_parse_xml已经处理了）
        return None

def _extract_name_parts(full_name: str) -> Dict[str, str]:
    """从全名中提取姓、名、中间名"""
    parts = full_name.strip().split()
    if not parts:
        return {'given': '', 'middle': '', 'family': ''}
    
    given = parts[0]
    if len(parts) == 1:
        return {'given': given, 'middle': '', 'family': ''}
    
    family = parts[-1]
    middle = ' '.join(parts[1:-1])
    return {'given': given, 'middle': middle, 'family': family}

def is_name_change_candidate(name_info: Dict) -> Tuple[bool, Optional[List]]:
    """
    判断是否为姓名变化候选者
    """
    try:
        # --- 1. 数据提取 ---
        base_name = name_info['base_name']
        person_created_date = name_info['person_created_date']
        other_names_data = name_info['other_names_data']
        family_name = name_info['family_name']
        given_name = name_info['given_name']

        if not all([base_name, person_created_date, other_names_data, family_name, given_name]):
            return False, None

        # 解析person:name的创建日期 
        try:
            person_date = datetime.fromisoformat(person_created_date.replace('Z', '+00:00'))
        except ValueError:
            return False, None

        # --- 2. 遍历 other-names，进行筛选和判断 ---
        for other_data in other_names_data:
            other_name_content = other_data.get('other_name_content')
            other_created_date = other_data.get('other_created_date')
            
            if not other_name_content or not other_created_date:
                continue

            try:
                other_date = datetime.fromisoformat(other_created_date.replace('Z', '+00:00'))
            except ValueError:
                continue

            # 时间差过滤
            if abs((person_date - other_date).days) < 30:
                continue

            # --- 3. 姓名标准化与结构化 ---
            cleaned_base = ' '.join(str(base_name).lower().strip().replace('.', '').replace(',', '').replace('-', ' ').split())
            cleaned_other = ' '.join(str(other_name_content).lower().strip().replace('.', '').replace(',', '').replace('-', ' ').split())
            
            # Levenshtein 距离
            if Levenshtein.distance(cleaned_base, cleaned_other) < 2:
                continue

            base_parts = _extract_name_parts(cleaned_base)
            other_parts = _extract_name_parts(cleaned_other)

            if not all([base_parts['given'], other_parts['given'], base_parts['family'], other_parts['family']]):
                continue

            # --- 4. 核心逻辑判断 ---
            
            # a) 验证是否为同一个人 (名字部分容错判断)
            if Levenshtein.distance(base_parts['given'], other_parts['given']) >= 2:
                continue
            
            # b) 获取并比较完整的姓氏部分
            base_full_surname = f"{base_parts['middle']} {base_parts['family']}".strip()
            other_full_surname = f"{other_parts['middle']} {other_parts['family']}".strip()

            if Levenshtein.distance(base_full_surname, other_full_surname) < 2:
                continue

            # c) 排除只增加/减少中间名的情况
            is_subset_change = (base_full_surname in other_full_surname or other_full_surname in base_full_surname)
            if is_subset_change and Levenshtein.distance(base_parts['family'], other_parts['family']) <= 1:
                continue

            # d) 排除缩写情况
            if (len(base_parts['family']) == 1 and len(other_parts['family']) > 1 and base_parts['family'] == other_parts['family'][0]) or \
               (len(other_parts['family']) == 1 and len(base_parts['family']) > 1 and other_parts['family'] == base_parts['family'][0]):
                continue

            # --- 5. 确认是候选者 ---
            
            if person_date < other_date:
                older_name, newer_name = cleaned_base, cleaned_other
                older_date_str, newer_date_str = person_created_date, other_created_date
            else:
                older_name, newer_name = cleaned_other, cleaned_base
                older_date_str, newer_date_str = other_created_date, person_created_date

            return True, [older_name, older_date_str, newer_name, newer_date_str]

        return False, None

    except RecursionError:
        raise  # 让上层处理
    except Exception as e:
        return False, None

def process_file_chunk(args):
    """处理文件块的工作函数 - 增强错误处理"""
    file_paths, port_name, process_id = args
    
    # 在子进程中初始化 gender detector
    try:
        local_gender_detector = gender.Detector()
    except Exception as e:
        print(f"Process {process_id}: Failed to initialize gender detector: {e}")
        return [], 0, 0, 0  # 增加错误文件计数
    
    results = []
    processed = 0
    error_count = 0
    
    for file_path in file_paths:
        try:
            # 检查文件大小
            # file_size = file_path.stat().st_size
            # if file_size > CONFIG['max_file_size']:
            #     copy_problem_file(file_path, 'large_files', f"File size: {file_size} bytes")
            #     error_count += 1
            #     processed += 1
            #     continue
            
            result = process_single_file(file_path, port_name, local_gender_detector)
            if result:
                results.append(result)
            processed += 1
            
            # 每处理100000个文件报告一次进度
            if processed % 100000 == 0:
                print(f"Process {process_id}: Processed {processed}/{len(file_paths)} files in {port_name}, errors: {error_count}")
                
        except RecursionError as e:
            copy_problem_file(file_path, 'recursion_error', f"RecursionError: {str(e)}")
            error_count += 1
            processed += 1
        except MemoryError as e:
            copy_problem_file(file_path, 'large_files', f"MemoryError: {str(e)}")
            error_count += 1
            processed += 1
        except Exception as e:
            error_msg = f"{type(e).__name__}: {str(e)}"
            if "recursion" in str(e).lower():
                copy_problem_file(file_path, 'recursion_error', error_msg)
            elif any(keyword in str(e).lower() for keyword in ['parse', 'xml', 'malformed']):
                copy_problem_file(file_path, 'parse_error', error_msg)
            else:
                copy_problem_file(file_path, 'other_errors', error_msg)
            error_count += 1
            processed += 1
    
    print(f"Process {process_id} completed: {processed} files, {len(results)} results, {error_count} errors")
    return results, processed, len(results), error_count

# --- 修改 get_file_chunks (简化文件发现) ---
def get_file_chunks(base_path: str, chunk_size: int) -> List[Tuple]:
    """获取文件分块信息 - 简化版，用于处理单个文件夹内的文件"""
    base_path = Path(base_path)
    chunks = []
    chunk_id = 0
    
    all_files = [f for f in base_path.iterdir() if f.is_file()]
    print(f"在 {base_path} 中找到 {len(all_files)} 个大文件进行重新处理。")
    
    if not all_files:
        return []
        
    for i in range(0, len(all_files), chunk_size):
        file_chunk = all_files[i:i + chunk_size]
        # port_name 在这里不重要，可以设为固定值
        chunks.append((file_chunk, 'large_files_reprocess', chunk_id))
        chunk_id += 1
    
    return chunks
    
def write_results_batch(results_batch: List[Dict], output_file: str, write_header: bool = False):
    """批量写入结果到CSV文件"""
    mode = 'w' if write_header else 'a'
    with open(output_file, mode, newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=['port_name', 'id', 'person_name', 'person_date', 
                                             'other_name', 'other_date', 'gender'])
        if write_header:
            writer.writeheader()
        writer.writerows(results_batch)

def signal_handler(signum, frame):
    """信号处理器"""
    print("\n收到中断信号，正在安全退出...")
    sys.exit(0)

def write_summary_report(total_processed: int, total_results: int, total_errors: int, duration: float):
    """写入处理总结报告"""
    summary_file = Path(CONFIG['problem_files_dir']) / 'processing_summary.txt'
    with open(summary_file, 'w', encoding='utf-8') as f:
        f.write(f"ORCID 姓名变化检测处理总结\n")
        f.write(f"处理时间: {datetime.now().isoformat()}\n")
        f.write(f"="*50 + "\n")
        f.write(f"总处理文件数: {total_processed:,}\n")
        f.write(f"发现姓名变化: {total_results:,}\n")
        f.write(f"错误文件数: {total_errors:,}\n")
        f.write(f"成功率: {((total_processed - total_errors) / total_processed * 100):.2f}%\n")
        f.write(f"处理时间: {duration/3600:.2f} 小时\n")
        f.write(f"平均速度: {total_processed/duration:.0f} 文件/秒\n")
        f.write(f"\n错误文件分类:\n")
        
        # 统计各类错误文件数量
        problem_dir = Path(CONFIG['problem_files_dir'])
        for error_type in ['recursion_error', 'parse_error', 'large_files', 'other_errors']:
            error_dir = problem_dir / error_type
            if error_dir.exists():
                error_files = list(error_dir.glob("*.xml"))
                f.write(f"  {error_type}: {len(error_files)} 个文件\n")


def main():
    """主函数 - 用于重新处理"""
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    start_time = time.time()
    
    print(f"设置新的问题文件目录: {CONFIG['problem_files_dir']}")
    setup_problem_files_dir() # 你需要把这个函数复制过来
    
    print(f"正在从 {CONFIG['base_path']} 收集大文件...")
    chunks = get_file_chunks(CONFIG['base_path'], CONFIG['chunk_size'])
    total_chunks = len(chunks)
    
    if total_chunks == 0:
        print("没有找到需要重新处理的文件。")
        return
    
    print(f"发现 {total_chunks} 个文件块，开始重新处理...")
    write_results_batch([], CONFIG['output_file'], write_header=True) # 你需要把这个函数复制过来
    
    total_processed = 0
    total_results = 0
    results_buffer = []
    
    try:
        with ProcessPoolExecutor(max_workers=CONFIG['max_workers']) as executor:
            future_to_chunk_idx = {executor.submit(process_file_chunk, chunk): i for i, chunk in enumerate(chunks)}
            
            for i, future in enumerate(as_completed(future_to_chunk_idx)):
                try:
                    chunk_results, chunk_processed, chunk_found, _ = future.result()
                    total_processed += chunk_processed
                    total_results += chunk_found
                    results_buffer.extend(chunk_results)
                    
                    if len(results_buffer) >= CONFIG['batch_size']:
                        write_results_batch(results_buffer, CONFIG['output_file'])
                        results_buffer = []
                    
                    progress = (i + 1) / total_chunks * 100
                    print(f"重新处理进度: {progress:.1f}% ({i + 1}/{total_chunks})", end='\r')
                except Exception as e:
                    print(f"\n处理块时出错: {e}")
    finally:
        if results_buffer:
            write_results_batch(results_buffer, CONFIG['output_file'])
    
    duration = time.time() - start_time
    print(f"\n大文件重新处理完成！")
    print(f"总共处理文件数: {total_processed}")
    print(f"新发现姓名变化: {total_results}")
    print(f"处理时间: {duration:.1f} 秒")
    print(f"新结果已保存至: {CONFIG['output_file']}")

if __name__ == "__main__":
    main()

设置新的问题文件目录: /hy-tmp/problem_xml_files_reprocessing
正在从 /hy-tmp/problem_xml_files/large_files 收集大文件...
在 /hy-tmp/problem_xml_files/large_files 中找到 363 个大文件进行重新处理。
发现 8 个文件块，开始重新处理...
Process 0 completed: 50 files, 0 results, 0 errors
Process 1 completed: 50 files, 0 results, 0 errors
Process 3 completed: 50 files, 0 results, 0 errors
Process 2 completed: 50 files, 0 results, 0 errors
Process 4 completed: 50 files, 0 results, 0 errors
Process 5 completed: 50 files, 0 results, 0 errors
Process 7 completed: 13 files, 0 results, 0 errors
Process 6 completed: 50 files, 0 results, 0 errors
重新处理进度: 100.0% (8/8)
大文件重新处理完成！
总共处理文件数: 363
新发现姓名变化: 0
处理时间: 1117.0 秒
新结果已保存至: /hy-tmp/large_files_results.csv


In [5]:
#!/usr/bin/env python3
"""
合并ORCID姓名变化检测结果的脚本
"""
import pandas as pd
import os

# --- 配置 ---
ORIGINAL_RESULTS_FILE = '/hy-tmp/female_name_changes.csv'
LARGE_FILES_RESULTS_FILE = '/hy-tmp/large_files_results.csv'
MERGED_OUTPUT_FILE = '/hy-tmp/final_female_name_changes.csv'

def merge_results():

    df_original = pd.read_csv(ORIGINAL_RESULTS_FILE)
    print(f"原始结果包含 {len(df_original):,} 条记录。")

    df_large = pd.read_csv(LARGE_FILES_RESULTS_FILE)
    print(f"大文件结果包含 {len(df_large):,} 条记录。")

    df_merged = pd.concat([df_original, df_large], ignore_index=True)
    df_merged.drop_duplicates(subset=['id'], inplace=True)

    total_records = len(df_merged)
    print(f"合并后总记录数: {total_records:,}")

    print(f"正在保存最终结果至: {MERGED_OUTPUT_FILE}")
    df_merged.to_csv(MERGED_OUTPUT_FILE, index=False, encoding='utf-8')

if __name__ == "__main__":
    merge_results()

原始结果包含 42,389 条记录。
大文件结果包含 0 条记录。
合并后总记录数: 42,389
正在保存最终结果至: /hy-tmp/final_female_name_changes.csv
