# 测试并发

In [1]:
import asyncio
import aiohttp
import time
from datetime import datetime

# DeepSeek API configuration.
# The API key is a secret and must not be committed with the notebook, so it
# is read from the DEEPSEEK_API_KEY environment variable. If the variable is
# unset the key is an empty string.
import os

API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
API_URL = "https://api.deepseek.com/v1/chat/completions"
MODEL = "deepseek-chat"

async def call_deepseek_api(session, request_id):
    """Send one chat-completion request and report how long it took.

    Args:
        session: Object whose ``post(...)`` returns an async context manager
            yielding a response with ``status`` and an awaitable ``json()``.
        request_id: Identifier embedded in the prompt and in every log line.

    Returns:
        dict with keys ``id``, ``status`` (``"success"``, ``"failed"`` or
        ``"error"``), ``time`` (elapsed seconds) and either ``response``
        (decoded body on HTTP 200) or ``error`` (decoded body / exception text).
    """
    def _stamp():
        # Wall-clock time with millisecond precision, used as the log prefix.
        return datetime.now().strftime('%H:%M:%S.%f')[:-3]

    request_headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    body = {
        "model": MODEL,
        "messages": [
            {"role": "user", "content": f"这是测试请求 #{request_id}，请简单回复收到。"}
        ],
        "max_tokens": 50
    }

    began = time.time()
    print(f"[{_stamp()}] 请求 #{request_id} 开始")

    try:
        async with session.post(API_URL, json=body, headers=request_headers) as response:
            data = await response.json()
            elapsed = time.time() - began

            # Any non-200 status is reported as a failure together with its body.
            if response.status != 200:
                print(f"[{_stamp()}] 请求 #{request_id} 失败: {response.status}")
                return {"id": request_id, "status": "failed", "time": elapsed, "error": data}

            print(f"[{_stamp()}] 请求 #{request_id} 成功 (耗时: {elapsed:.2f}秒)")
            return {"id": request_id, "status": "success", "time": elapsed, "response": data}
    except Exception as exc:
        # Best-effort benchmark: turn any failure into an "error" record so a
        # single bad request does not abort the whole batch.
        elapsed = time.time() - began
        print(f"[{_stamp()}] 请求 #{request_id} 异常: {str(exc)}")
        return {"id": request_id, "status": "error", "time": elapsed, "error": str(exc)}

async def test_concurrent_requests(num_requests=5):
    """Fire ``num_requests`` API calls concurrently and print timing statistics.

    All requests share one ``aiohttp.ClientSession`` and are awaited together
    with ``asyncio.gather``, so the total wall time can be compared with the
    average per-request time to judge whether the calls really overlap.

    Args:
        num_requests: Number of concurrent requests to issue. Zero is allowed
            and yields an empty result list with an average time of 0.

    Returns:
        list: One result dict per request, as returned by ``call_deepseek_api``.
    """
    separator = '=' * 60
    print(f"\n{separator}")
    print(f"开始测试 {num_requests} 个并发请求")
    print(f"{separator}\n")

    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        # Create one coroutine per request, then run them all at once.
        tasks = [call_deepseek_api(session, i + 1) for i in range(num_requests)]
        results = await asyncio.gather(*tasks)

    total_time = time.time() - start_time

    print(f"\n{separator}")
    print(f"测试完成！总耗时: {total_time:.2f}秒")
    print(f"{separator}\n")

    # Aggregate the outcome; everything that is not "success" counts as failed.
    success_count = sum(1 for r in results if r["status"] == "success")
    failed_count = len(results) - success_count
    # Guard against an empty batch, which would otherwise divide by zero.
    avg_time = sum(r["time"] for r in results) / len(results) if results else 0.0

    print(f"成功: {success_count}/{num_requests}")
    print(f"失败: {failed_count}/{num_requests}")
    print(f"平均响应时间: {avg_time:.2f}秒")
    print(f"\n如果总耗时接近平均响应时间，说明支持真正的并发")
    print(f"如果总耗时接近平均响应时间×请求数，说明是串行执行")

    return results

# Run the benchmark with 10 concurrent requests.
# NOTE: a bare top-level `await` only works where the host supports it
# (this file carries notebook cell markers); a plain script would need
# asyncio.run(...) instead.
results = await test_concurrent_requests(10)



开始测试 10 个并发请求

[13:35:17.543] 请求 #1 开始
[13:35:17.543] 请求 #2 开始
[13:35:17.543] 请求 #3 开始
[13:35:17.543] 请求 #4 开始
[13:35:17.543] 请求 #5 开始
[13:35:17.544] 请求 #6 开始
[13:35:17.544] 请求 #7 开始
[13:35:17.544] 请求 #8 开始
[13:35:17.544] 请求 #9 开始
[13:35:17.544] 请求 #10 开始
[13:35:19.495] 请求 #6 成功 (耗时: 1.95秒)
[13:35:19.496] 请求 #4 成功 (耗时: 1.95秒)
[13:35:19.500] 请求 #9 成功 (耗时: 1.96秒)
[13:35:19.502] 请求 #2 成功 (耗时: 1.96秒)
[13:35:19.504] 请求 #3 成功 (耗时: 1.96秒)
[13:35:19.509] 请求 #7 成功 (耗时: 1.97秒)
[13:35:19.509] 请求 #5 成功 (耗时: 1.97秒)
[13:35:19.568] 请求 #10 成功 (耗时: 2.02秒)
[13:35:19.568] 请求 #8 成功 (耗时: 2.02秒)
[13:35:19.869] 请求 #1 成功 (耗时: 2.33秒)

测试完成！总耗时: 2.33秒

成功: 10/10
失败: 0/10
平均响应时间: 2.01秒

如果总耗时接近平均响应时间，说明支持真正的并发
如果总耗时接近平均响应时间×请求数，说明是串行执行


# 测试规则抽取摘要

In [3]:
import re
from pathlib import Path

def extract_abstract_from_md(file_path):
    """Extract the body text of the Abstract section from a Markdown file.

    The abstract starts after the first heading line whose letters spell
    "abstract", "abstracts" or "摘要" (spacing and punctuation are ignored, so
    "# A B S T R A C T" and "# Abstract:" both match) and ends at the next
    line that starts with a single '#' followed by text, or at end of file.

    The extracted text must contain more than 30 words. Each CJK character
    counts as one word, because Chinese text has no spaces and a purely
    whitespace-based count would reject every Chinese abstract.

    Args:
        file_path: str or Path pointing to the .md file.

    Returns:
        tuple: ``(found, text)``. ``found`` is False and ``text`` is ``""``
        when no heading matched, the text is too short, or the file could not
        be read.
    """
    def is_abstract_header(line):
        """Return True if *line* is a heading that names the abstract."""
        stripped = line.strip()
        if not stripped.startswith('#'):
            return False

        # A lone '#' with nothing after it is not a usable heading.
        after_hash = stripped[1:].strip()
        if not after_hash:
            return False

        # Keep only ASCII letters and CJK characters so decorated variants
        # such as "---- A B S T R A C T ----" collapse to plain "abstract".
        normalized = re.sub(r'[^a-zA-Z\u4e00-\u9fff]', '', after_hash).lower()
        return normalized in ('abstract', 'abstracts', '摘要')

    def count_words(text):
        """Count whitespace-separated tokens, treating each CJK char as a word."""
        return len(re.findall(r'[\u4e00-\u9fff]|[^\s\u4e00-\u9fff]+', text))

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        lines = content.split('\n')

        # Locate the abstract heading.
        abstract_start = -1
        for i, line in enumerate(lines):
            if is_abstract_header(line):
                abstract_start = i
                print(f"找到abstract标题在第{i+1}行: {line.strip()}")
                break

        if abstract_start == -1:
            print("未找到abstract标题")
            return (False, "")

        # The abstract ends at the next single-'#' heading that has text.
        abstract_end = len(lines)
        for i in range(abstract_start + 1, len(lines)):
            stripped = lines[i].strip()
            if stripped.startswith('#') and not stripped.startswith('##'):
                if stripped[1:].strip():
                    abstract_end = i
                    break

        # Skip the heading line itself; strip() also drops surrounding blank lines.
        abstract_text = '\n'.join(lines[abstract_start + 1:abstract_end]).strip()

        word_count = count_words(abstract_text)
        if word_count <= 30:
            print(f"摘要文本单词数不足: {word_count} 个单词 (需要大于30个)")
            return (False, "")

        return (True, abstract_text)

    except FileNotFoundError:
        print(f"错误:文件未找到 - {file_path}")
        return (False, "")
    except Exception as e:
        # Best-effort extraction: report the problem and signal "not found".
        print(f"错误:处理文件时出现异常 - {str(e)}")
        return (False, "")


# Manual smoke test for the abstract extractor
if __name__ == "__main__":
    print("测试修复后的abstract提取函数:")
    print("=" * 60)

    # First sample file
    test_file1 = "/Users/xiaokong/task/2025/paper_vis/vis/md/af705d1369467b0aa55cb59354a84a0e.md"
    print(f"测试文件1: {test_file1}")
    found, abstract1 = extract_abstract_from_md(test_file1)

    # The flag is echoed in both outcomes, so print it once up front.
    print(found)
    if not found:
        print("✗ 未找到Abstract内容")
    else:
        print("✓ 成功提取Abstract内容:")
        print(abstract1)

    print()

测试修复后的abstract提取函数:
测试文件1: /Users/xiaokong/task/2025/paper_vis/vis/md/af705d1369467b0aa55cb59354a84a0e.md
找到abstract标题在第14行: # ---- A B S T R A C T  ----
True
✓ 成功提取Abstract内容:
Keywords: Brain-inspired continual learning Context similarity assessment Neuronal discriminative expansion Neuronal selective reuse Sparse spiking neural networks  

Biological brains have the capability to adaptively coordinate relevant neuronal populations based on the task context to learn continuously changing tasks in real-world environments. However, existing spiking neural network-based continual learning algorithms treat each task equally, ignoring the guiding role of different task similarity associations for network learning, which limits knowledge utilization efficiency. Inspired by the context-dependent plasticity mechanism of the brain, we propose a Similarity-based Context Aware Spiking Neural Network (SCA-SNN) continual learning algorithm to efficiently accomplish task incremental learning and cl

# LLM抽取摘要

In [5]:
# LLM摘要语步提取系统
import asyncio
import aiohttp
import json
import time
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field, validator
from pathlib import Path

# DeepSeek API configuration.
# The API key is a secret and must not be committed with the notebook, so it
# is read from the DEEPSEEK_API_KEY environment variable. If the variable is
# unset the key is an empty string.
import os

API_KEY = os.environ.get("DEEPSEEK_API_KEY", "")
API_URL = "https://api.deepseek.com/v1/chat/completions"
MODEL = "deepseek-chat"

# Pydantic Schema定义
class AbstractSteps(BaseModel):
    """Four-step rhetorical structure of an academic abstract.

    Every field is required, is populated from the JSON key given as its
    alias, is capped at 200 characters, and is additionally limited to 35
    whitespace-separated words by ``validate_word_count``.
    """
    background_problem: str = Field(
        ...,
        alias="Background/Problem",
        max_length=200,
        description="背景/问题描述，不超过35个英文单词",
    )
    method_approach: str = Field(
        ...,
        alias="Method/Approach",
        max_length=200,
        description="方法/途径，不超过35个英文单词",
    )
    result: str = Field(
        ...,
        alias="Result",
        max_length=200,
        description="结果，不超过35个英文单词",
    )
    conclusion_contribution: str = Field(
        ...,
        alias="Conclusion/Contribution",
        max_length=200,
        description="结论/贡献，不超过35个英文单词",
    )

    # NOTE(review): the captured cell output reports this V1-style decorator as
    # deprecated under Pydantic 2.x; migrate to @field_validator when V1
    # compatibility is no longer needed.
    @validator('background_problem', 'method_approach', 'result', 'conclusion_contribution')
    def validate_word_count(cls, v):
        """Reject any step summary longer than 35 whitespace-separated words."""
        n_words = len(v.split())
        if n_words <= 35:
            return v
        raise ValueError(f"字段超过35个单词限制: {n_words} 个单词")

    class Config:
        # Allow construction by Python field name as well as by alias.
        allow_population_by_field_name = True
        json_encoders = {
            str: lambda s: s.strip()
        }

# System prompt: tells the model to locate the abstract (even inside a noisy
# document segment), split it into the four rhetorical steps, and reply with
# a raw JSON object only, each summary limited to 35 English words.
SYSTEM_PROMPT = """You are a highly specialized expert in academic text analysis and structured data extraction. Your **sole task** is to accurately identify and extract the four standard rhetorical steps of the academic abstract from the provided text, and output the result in a **strictly valid JSON format**.

**[Core Task & Output]**
1. The input text might be a **clean abstract** OR a **document segment** (including metadata, title, authors, and the abstract).
2. If the text is a segment, you **must first locate the Abstract** based on its semantic features (a condensed summary of background, method, and results), and **ignore all surrounding noise**.
3. Decompose the identified Abstract content into the following four standard rhetorical steps: Background/Problem, Method/Approach, Result, Conclusion/Contribution.

**[Format Requirements]**
1. **MUST** output a JSON object that strictly conforms to the provided schema.
2. **ABSOLUTELY DO NOT** output any introductory text, explanations, or text outside the raw JSON object.
3. Each summary **MUST NOT exceed 35 English words**, focusing on high-level summarization."""

# User prompt template. `{text}` is the only placeholder; the doubled braces
# around the JSON example are literal braces escaped for str.format().
USER_PROMPT_TEMPLATE = """Please analyze the text provided below. And decompose its content into the four standard rhetorical steps.

**[Text to Analyze]**
{text}

**[Expected JSON Schema]**
Please adhere strictly to this JSON structure for your output:

{{
    "Background/Problem": "The concise English summary for this step, no more than 35 words.",
    "Method/Approach": "The concise English summary for this step, no more than 35 words.",
    "Result": "The concise English summary for this step, no more than 35 words.",
    "Conclusion/Contribution": "The concise English summary for this step, no more than 35 words."
}}"""

def _strip_code_fence(content: str) -> str:
    """Remove a surrounding Markdown code fence (``` or ```json) from *content*."""
    content = content.strip()
    if content.startswith("```"):
        content = content[3:]
        # Drop an optional language tag in any letter case ("json", "JSON").
        if content[:4].lower() == "json":
            content = content[4:]
    if content.endswith("```"):
        content = content[:-3]
    return content.strip()


async def call_deepseek_api(text: str, max_retries: int = 2) -> Optional[AbstractSteps]:
    """Ask the DeepSeek chat API to split *text* into the four abstract steps.

    Each attempt opens its own ``aiohttp.ClientSession``, posts the system and
    user prompts, strips an optional Markdown code fence from the reply, parses
    it as JSON and validates it with ``AbstractSteps``. A failed attempt is
    retried after a one-second pause.

    Args:
        text: Text to analyse; substituted into ``USER_PROMPT_TEMPLATE``.
        max_retries: Number of retries after the first attempt, so at most
            ``max_retries + 1`` requests are sent.

    Returns:
        The validated ``AbstractSteps`` object, or ``None`` when every attempt
        failed (non-200 status, malformed response, invalid JSON, validation
        error, or a request exception).
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": MODEL,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": USER_PROMPT_TEMPLATE.format(text=text)}
        ],
        "max_tokens": 500,
        "temperature": 0.1  # low randomness for more consistent output
    }

    for attempt in range(max_retries + 1):
        can_retry = attempt < max_retries
        try:
            print(f"🔄 LLM调用尝试 {attempt + 1}/{max_retries + 1}")

            async with aiohttp.ClientSession() as session:
                async with session.post(API_URL, json=payload, headers=headers) as response:
                    if response.status != 200:
                        print(f"❌ API调用失败: HTTP {response.status}")
                        if not can_retry:
                            return None
                        await asyncio.sleep(1)  # pause before the next attempt
                        continue

                    result = await response.json()

            if "choices" in result and len(result["choices"]) > 0:
                content = result["choices"][0]["message"]["content"].strip()
                print(f"📝 LLM原始回复: {content[:200]}...")

                try:
                    json_data = json.loads(_strip_code_fence(content))
                    print(f"✅ JSON解析成功")

                    abstract_steps = AbstractSteps(**json_data)
                    print(f"✅ Pydantic验证成功")
                    return abstract_steps
                except json.JSONDecodeError as e:
                    print(f"❌ JSON解析失败: {str(e)}")
                except Exception as e:
                    # Anything else raised here comes from building the model.
                    print(f"❌ Pydantic验证失败: {str(e)}")

                if can_retry:
                    print(f"🔄 准备重试...")
            else:
                print(f"❌ API响应格式异常")

        except Exception as e:
            # Connection errors, non-JSON bodies and unexpected response shapes.
            print(f"❌ 网络请求异常: {str(e)}")

        if can_retry:
            await asyncio.sleep(1)

    print(f"❌ 所有重试尝试均失败")
    return None

def extract_text_for_llm(file_path: str) -> tuple[bool, str]:
    """Pick the text that will be sent to the LLM for a Markdown paper.

    The rule-based extractor is tried first; if it finds no usable abstract
    the first 5000 characters of the file are used instead.

    Args:
        file_path: Path to the Markdown file.

    Returns:
        tuple: ``(True, abstract)`` when the rule-based extractor succeeded,
        ``(False, prefix)`` when the fallback was used, and ``(False, "")``
        when an exception occurred.
    """
    try:
        found, abstract_text = extract_abstract_from_md(file_path)

        # Fallback path: no abstract located, or only whitespace came back.
        if not (found and abstract_text.strip()):
            print(f"⚠️ 规则提取失败，使用前5000字符")
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            text_for_llm = content[:5000]
            print(f"📄 使用前5000字符 (实际长度: {len(text_for_llm)} 字符)")
            return False, text_for_llm

        print(f"✅ 规则提取成功，使用摘要文本 (长度: {len(abstract_text)} 字符)")
        return True, abstract_text

    except Exception as e:
        print(f"❌ 文件读取异常: {str(e)}")
        return False, ""

async def analyze_abstract_steps(file_path: str) -> Optional[AbstractSteps]:
    """Run the full abstract-step analysis for one Markdown file.

    Args:
        file_path: Path to the Markdown file.

    Returns:
        The ``AbstractSteps`` produced by the LLM call, or ``None`` when no
        text could be obtained or the LLM call failed.
    """
    banner = '=' * 80
    print(f"\n{banner}")
    print(f"🔍 开始分析文件: {file_path}")
    print(f"{banner}")

    # Step 1: choose the text to analyse (the "found" flag is not needed here).
    _, analysis_text = extract_text_for_llm(file_path)
    if not analysis_text.strip():
        print(f"❌ 无法获取分析文本")
        return None

    # Step 2: call the LLM and time the round trip.
    print(f"\n🤖 开始LLM语步分析...")
    started = time.time()
    steps = await call_deepseek_api(analysis_text)
    elapsed = time.time() - started

    if not steps:
        print(f"\n❌ 分析失败！耗时: {elapsed:.2f}秒")
        return None

    print(f"\n✅ 分析完成！耗时: {elapsed:.2f}秒")
    print(f"📊 语步分析结果:")
    print(f"   • 背景/问题: {steps.background_problem}")
    print(f"   • 方法/途径: {steps.method_approach}")
    print(f"   • 结果: {steps.result}")
    print(f"   • 结论/贡献: {steps.conclusion_contribution}")
    return steps

# 测试函数
async def test_abstract_analysis():
    """Analyse the sample files one after another and print a summary.

    Missing files are reported and skipped. A two-second pause follows every
    analysed file except the last entry of the list.

    Returns:
        list: ``(file_path, result)`` tuples for the files that were analysed.
    """
    rule = '=' * 80
    print(f"\n🧪 开始测试摘要语步分析系统")
    print(f"{rule}")

    # Sample files to analyse
    test_files = [
        "/Users/xiaokong/task/2025/paper_vis/vis/md/af705d1369467b0aa55cb59354a84a0e.md",
        "/Users/xiaokong/task/2025/paper_vis/vis/md/3791465d4e18e4033b5c7bd322c44df2.md"
    ]
    last_index = len(test_files)

    results = []
    for index, file_path in enumerate(test_files, 1):
        path = Path(file_path)
        print(f"\n📁 测试文件 {index}: {path.name}")

        if not path.exists():
            print(f"❌ 文件不存在: {file_path}")
            continue

        outcome = await analyze_abstract_steps(file_path)
        results.append((file_path, outcome))

        if index < last_index:
            print(f"\n⏳ 等待2秒后处理下一个文件...")
            await asyncio.sleep(2)

    # Summary
    print(f"\n{rule}")
    print(f"📈 测试结果汇总")
    print(f"{rule}")

    total_count = len(results)
    success_count = len([outcome for _, outcome in results if outcome is not None])

    print(f"✅ 成功: {success_count}/{total_count}")
    print(f"❌ 失败: {total_count - success_count}/{total_count}")

    return results

# Run the test
if __name__ == "__main__":
    # Run the async test. NOTE: a bare `await` outside a coroutine only works
    # where the host supports top-level await (this file carries notebook cell
    # markers); a plain script would need asyncio.run(...) instead.
    results = await test_abstract_analysis()


/var/folders/qy/w56f01vd5h35lc_7plwj_k_r0000gn/T/ipykernel_1880/74677222.py:43: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  @validator('background_problem', 'method_approach', 'result', 'conclusion_contribution')



🧪 开始测试摘要语步分析系统

📁 测试文件 1: af705d1369467b0aa55cb59354a84a0e.md

🔍 开始分析文件: /Users/xiaokong/task/2025/paper_vis/vis/md/af705d1369467b0aa55cb59354a84a0e.md
找到abstract标题在第14行: # ---- A B S T R A C T  ----
✅ 规则提取成功，使用摘要文本 (长度: 1752 字符)

🤖 开始LLM语步分析...
🔄 LLM调用尝试 1/3
📝 LLM原始回复: {
    "Background/Problem": "Biological brains adaptively learn tasks, but current spiking neural networks ignore task similarity, limiting knowledge reuse efficiency in continual learning.",
    "Met...
✅ JSON解析成功
✅ Pydantic验证成功

✅ 分析完成！耗时: 7.19秒
📊 语步分析结果:
   • 背景/问题: Biological brains adaptively learn tasks, but current spiking neural networks ignore task similarity, limiting knowledge reuse efficiency in continual learning.
   • 方法/途径: We propose SCA-SNN, which adaptively reuses or expands neurons based on contextual task similarity to improve knowledge utilization and reduce energy.
   • 结果: Experiments on multiple datasets show SCA-SNN outperforms existing SNN and DNN continual learning methods in task and class i

# 标题规范化

In [None]:
# 标题层级规范化系统
import re
from pathlib import Path
from typing import List, Tuple, Dict

class HeadingNormalizer:
    """Recover heading levels in Markdown where every heading is marked '#'.

    The level is inferred from the numbering at the start of the title
    ("1." is level 1, "1.1." level 2, "1.1.1." level 3); titles without
    numbering are treated as level 1.
    """

    def __init__(self):
        """Set up the keyword list and the rewrite patterns."""
        # Known un-numbered titles that are always level-1 headings.
        self.unnumbered_h1_keywords = [
            'Abstract', 'Introduction', 'Conclusion', 'References', 'Appendix',
            'Acknowledgments', 'Acknowledgements', 'Bibliography', 'Index',
            'Preface', 'Foreword', 'Summary', 'Executive Summary',
            'Table of Contents', 'List of Figures', 'List of Tables',
            'Nomenclature', 'Glossary', 'Abbreviations'
        ]

        # (pattern, replacement) pairs, most specific first; the first pattern
        # that matches a line decides how it is rewritten.
        self.patterns = [
            # Level 3: "# 1.1.1. Title"
            (r'^(\s*)#\s+(\d+\.\d+\.\d+\.\s+.*)$', r'\1### \2'),

            # Level 2: "# 1.1. Title"
            (r'^(\s*)#\s+(\d+\.\d+\.\s+.*)$', r'\1## \2'),

            # Level 1: "# 1. Title"
            (r'^(\s*)#\s+(\d+\.\s+.*)$', r'\1# \2'),

            # Un-numbered level 1: "# Abstract", "# Conclusion", ...
            (r'^(\s*)#\s+(' + '|'.join(self.unnumbered_h1_keywords) + r')(\s*.*)$', r'\1# \2\3'),
        ]

    def _rewrite_heading(self, line: str):
        """Return *line* rewritten by the first matching pattern, else None."""
        for pattern, replacement in self.patterns:
            if re.match(pattern, line):
                return re.sub(pattern, replacement, line)
        return None

    def normalize_headings(self, markdown_text: str) -> List[str]:
        """Return the level-1 headings of *markdown_text* in document order.

        Only lines of the form "# title" (a single '#') are considered. Lines
        matched by one of ``self.patterns`` are rewritten first; a rewritten
        line is kept when it is still a level-1 heading, and the rewritten
        form (single space after '#') is what gets returned. Lines matching no
        pattern are kept unchanged when ``_is_unnumbered_h1`` accepts them.

        Args:
            markdown_text: Full Markdown text of the paper.

        Returns:
            List[str]: Level-1 heading lines, stripped of surrounding whitespace.
        """
        h1_headings = []

        for line in markdown_text.split('\n'):
            if not re.match(r'^\s*#\s+', line):
                continue

            new_line = self._rewrite_heading(line)
            if new_line is None:
                if self._is_unnumbered_h1(line):
                    h1_headings.append(line.strip())
                continue

            # Judge the level from the leading '#' run only, so a title that
            # itself contains "##" is not mistaken for a deeper heading.
            level = len(re.match(r'\s*(#+)', new_line).group(1))
            if level != 1:
                continue

            if re.match(r'^\s*#\s+\d+\.', new_line) or self._is_unnumbered_h1(new_line):
                h1_headings.append(new_line.strip())

        return h1_headings

    def _is_unnumbered_h1(self, line: str) -> bool:
        """Decide whether a heading line is a level-1 heading.

        Rules, applied to the title with the leading '#' run removed:
        1. A title equal (case-insensitively) to a known keyword is level 1.
        2. A title that *starts* with multi-part numbering such as "1.1" or
           "1.1.1" is level 2 or deeper.
        3. Everything else, including "1 Title", "1. Title" and un-numbered
           titles, is level 1. Numbers elsewhere in the title ("GPT 3.5")
           do not affect the level.

        Args:
            line: The heading line.

        Returns:
            bool: True if the line is a level-1 heading.
        """
        title_text = re.sub(r'^\s*#+\s*', '', line).strip()

        lowered = title_text.lower()
        if any(lowered == keyword.lower() for keyword in self.unnumbered_h1_keywords):
            return True

        # Only numbering at the very start of the title marks a sub-heading.
        return re.match(r'\d+\.\d+', title_text) is None

    def extract_headings_only(self, markdown_text: str) -> List[str]:
        """Return every heading line (any number of '#'), stripped.

        Args:
            markdown_text: Markdown text.

        Returns:
            List[str]: All heading lines in document order.
        """
        return [
            line.strip()
            for line in markdown_text.split('\n')
            if re.match(r'^\s*#+\s+', line)
        ]

    def extract_h1_headings(self, markdown_text: str) -> List[str]:
        """Return all level-1 headings in document order.

        Every line starting with '#' is a candidate, whatever the length of
        its '#' run; the level is decided by ``_is_unnumbered_h1``.

        Args:
            markdown_text: Markdown text.

        Returns:
            List[str]: Stripped heading lines, including the leading '#'.
        """
        h1_headings = []

        for line in markdown_text.split('\n'):
            line = line.strip()
            if line.startswith('#') and self._is_unnumbered_h1(line):
                h1_headings.append(line)

        return h1_headings

    def analyze_heading_structure(self, markdown_text: str) -> dict:
        """Count headings per level as written (by the number of '#').

        Args:
            markdown_text: Markdown text.

        Returns:
            dict: ``total_headings``, ``h1_count`` .. ``h6_count`` and
            ``headings_by_level`` (level -> list of heading lines). Headings
            deeper than six levels get their own ``h<N>_count`` key.
        """
        headings = self.extract_headings_only(markdown_text)

        structure = {
            'total_headings': len(headings),
            'h1_count': 0,
            'h2_count': 0,
            'h3_count': 0,
            'h4_count': 0,
            'h5_count': 0,
            'h6_count': 0,
            'headings_by_level': {}
        }

        for heading in headings:
            level = len(re.match(r'^#+', heading).group())
            # .get() keeps runs of more than six '#' from raising KeyError.
            count_key = f'h{level}_count'
            structure[count_key] = structure.get(count_key, 0) + 1
            structure['headings_by_level'].setdefault(level, []).append(heading)

        return structure

    def process_markdown_file(self, file_path: str) -> List[str]:
        """Read a Markdown file and return its level-1 headings.

        Args:
            file_path: Path to the Markdown file.

        Returns:
            List[str]: Level-1 headings, or an empty list when the file is
            missing or cannot be processed.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            return self.normalize_headings(content)

        except Exception:
            # Best-effort by design: any failure, including a missing file,
            # is reported to the caller as "no headings".
            return []

# Demo run
if __name__ == "__main__":
    normalizer = HeadingNormalizer()

    # Case 1: headings as a PDF parser emits them, every one marked with a single '#'
    test_text_1 = """# Abstract
This is the abstract content.

# 1. Introduction
This is the introduction.

# 1.1 Background
Background information here.

# 1.1.1 Historical Context
Historical context details.

# 1.2 Related Work
Related work section.

# 1.2.1 Previous Studies
Previous studies details.

# 2. Methodology
Methodology section.

# 2.1 Data Collection
Data collection methods.

# 2.1.1 Sampling Strategy
Sampling strategy details.

# 3. Results
Results section.

# Conclusion
This is the conclusion.

# References
References list."""

    # Level-1 headings recovered from the sample text
    h1_headings = normalizer.normalize_headings(test_text_1)
    print("1级标题列表:", h1_headings)

    # Per-level heading counts of the sample text as written
    structure = normalizer.analyze_heading_structure(test_text_1)
    print("标题结构分析:", structure)

    # Optional: real files, silently skipped when they do not exist
    test_files = [
         "/Users/xiaokong/task/2025/paper_vis/vis/md/3791465d4e18e4033b5c7bd322c44df2.md",
    ]

    for file_path in test_files:
        if not Path(file_path).exists():
            continue
        h1_headings = normalizer.process_markdown_file(file_path)
        print(f"文件 {file_path} 的1级标题:", h1_headings)


1级标题列表: ['# Abstract', '# 1. Introduction', '# 2. Methodology', '# 3. Results', '# Conclusion', '# References']
标题结构分析: {'total_headings': 12, 'h1_count': 12, 'h2_count': 0, 'h3_count': 0, 'h4_count': 0, 'h5_count': 0, 'h6_count': 0, 'headings_by_level': {1: ['# Abstract', '# 1. Introduction', '# 1.1 Background', '# 1.1.1 Historical Context', '# 1.2 Related Work', '# 1.2.1 Previous Studies', '# 2. Methodology', '# 2.1 Data Collection', '# 2.1.1 Sampling Strategy', '# 3. Results', '# Conclusion', '# References']}}
文件 /Users/xiaokong/task/2025/paper_vis/vis/md/3791465d4e18e4033b5c7bd322c44df2.md 的1级标题: ['# A cerebrospinal fluid synaptic protein biomarker for prediction of cognitive resilience versus decline in Alzheimer’s disease', '# A list of authors and their affiliations appears at the end of the paper', '# Check for updates', '# Multicohort CSF proteomics for AD biomarker discovery', '# CSF YWHAG:NPTX2 versus established neurodegeneration AD biomarkers', '# CSF YWHAG:NPTX2 in normal

# LLM标题映射

In [None]:
"""
标题映射LLM模块
利用LLM对论文章节标题进行分类映射到四个标准泳道
"""

import json
import requests
import logging
from typing import List, Dict, Optional, Any
import time

class TitleMappingLLM:
    """标题映射LLM处理器"""
    
    def __init__(self, api_url: str = None, api_key: str = None, model: str = None):
        """
        初始化LLM处理器
        
        Args:
            api_url: LLM API地址
            api_key: API密钥
            model: 使用的模型名称
        """
        # 写死的LLM配置
        self.api_url = api_url or "https://api.deepseek.com/v1/chat/completions"
        self.api_key = api_key or "sk-8de35978ccec41e39e2b9ebfc90b7aa1"
        self.model = model or "deepseek-chat"
        
        # 设置日志
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # 系统提示词
        self.system_prompt = """You are a top-tier **Academic Paper Structure Analyst** specializing in **cross-disciplinary semantic filtering and classification**. Your task is to accurately map chapter titles, which represent the **core research logic flow**, from a provided list of titles into four standard swimlanes.

**[Core Filtering and Mapping Rules]**
1.  **Filtering (Noise Reduction):** You must **ignore and discard** the following types of titles:
    * **Non-chapter content:** Paper main titles, author lists, publication metadata (e.g., "Article", "Online content", "Check for updates", "Reporting summary", "Data availability", "Author contributions", "Competing interests", etc.).
    * **Boundary anchors:** "Abstract" (or its variants), "References", "Acknowledgements", "Appendix".
2.  **Lane Assignment (Classification):** Only assign the filtered **valid core chapters** to the following **Four Standard Swimlanes**.
3.  **Quota Constraint (Max: 2):** The number of titles assigned to each standard swimlane **must not exceed two (Max: 2)**. If multiple titles belong to the same swimlane, you must select the core titles that best represent the function of that swimlane.

**[Four Standard Swimlanes]**
1.  Context & Related Work
2.  Methodology & Setup
3.  Results & Analysis
4.  Conclusion

**[CRITICAL FORMATTING REQUIREMENTS]**
1.  **Strictly and uniquely** output a JSON object conforming to the JSON structure.
2.  The Key must be the **Standard Swimlane Name**, and the Value must be an **array** containing the **EXACT original title strings**.
3.  **PRESERVE EXACT FORMAT:** You MUST preserve the exact original format of titles including ALL symbols, numbers, punctuation, capitalization, and spacing (e.g., "# 1. Introduction", "# 2. Related Work", etc.).
4.  **Absolutely forbid** outputting any explanations, preambles, summaries, or extra text."""

        # 用户提示词模板
        self.user_prompt_template = """Please analyze the **raw title list** provided below, which originates from a paper parser. Strictly adhere to the **filtering and quota constraints** rules specified in the system instructions to classify and map the core chapter titles into the four standard swimlanes.

**[Title List to be Processed]**
{title_list}

**[Example of Desired JSON Structure]**
Please strictly output your results according to the following concise structure, where the **Key is the Swimlane Name and the Value is an array of EXACT original title strings** (preserving all formatting including "#", numbers, punctuation):

{{
  "Context & Related Work": ["# 1. Introduction", "# 2. Related Work"],
  "Methodology & Setup": ["# 3. Methodology"],
  "Results & Analysis": ["# 4. Results", "# 5. Discussion"],
  "Conclusion": ["# 6. Conclusion"]
}}"""

    def _call_llm_api(self, messages: List[Dict[str, str]], max_retries: int = 3) -> Optional[str]:
        """
        Call the LLM chat-completions API with retries.

        Args:
            messages: Chat messages (role/content dicts) to send.
            max_retries: Maximum number of attempts.

        Returns:
            The content of the first choice's message, or None when every
            attempt failed (transport error, non-200 status, or a response
            body that does not have the expected shape).
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        
        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.1,  # low temperature for consistent output
            "max_tokens": 1000
        }
        
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    self.api_url,
                    headers=headers,
                    json=payload,
                    timeout=30
                )
                
                if response.status_code == 200:
                    result = response.json()
                    return result["choices"][0]["message"]["content"]

                self.logger.warning(f"API调用失败，状态码: {response.status_code}, 响应: {response.text}")
                    
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"API调用异常 (尝试 {attempt + 1}/{max_retries}): {e}")
            except (KeyError, IndexError, TypeError, ValueError) as e:
                # A 200 response with a non-JSON or unexpectedly shaped body used to
                # escape the retry loop; treat it like any other failed attempt.
                self.logger.warning(f"API响应格式异常 (尝试 {attempt + 1}/{max_retries}): {e}")
                
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
                
        self.logger.error("LLM API调用失败，已达到最大重试次数")
        return None

    def _parse_json_response(self, response: str) -> Optional[Dict[str, List[str]]]:
        """
        解析LLM返回的JSON响应
        
        Args:
            response: LLM响应字符串
            
        Returns:
            解析后的字典，如果解析失败返回None
        """
        try:
            # 尝试直接解析
            result = json.loads(response)
            
            # 验证结构
            expected_keys = {
                "Context & Related Work",
                "Methodology & Setup", 
                "Results & Analysis",
                "Conclusion"
            }
            
            if not all(key in result for key in expected_keys):
                self.logger.warning("JSON结构不完整，缺少必要的键")
                return None
                
            # 验证值类型
            for key, value in result.items():
                if not isinstance(value, list):
                    self.logger.warning(f"键 '{key}' 的值不是列表类型")
                    return None
                    
            return result
            
        except json.JSONDecodeError as e:
            self.logger.warning(f"JSON解析失败: {e}")
            
            # 尝试提取JSON部分
            try:
                # 查找JSON开始和结束位置
                start_idx = response.find('{')
                end_idx = response.rfind('}')
                
                if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
                    json_str = response[start_idx:end_idx + 1]
                    result = json.loads(json_str)
                    return result
                    
            except json.JSONDecodeError:
                pass
                
            return None

    def map_titles(self, title_list: List[str]) -> Dict[str, List[str]]:
        """
        将标题列表映射到四个标准泳道
        
        Args:
            title_list: 原始标题列表
            
        Returns:
            干净的映射结果字典
        """
        if not title_list:
            self.logger.error("标题列表为空")
            return {}
            
        if not self.api_key:
            self.logger.error("API密钥未设置")
            return {}
        
        try:
            # 构建用户提示词
            title_list_str = "\n".join([f"'{title}'" for title in title_list])
            user_prompt = self.user_prompt_template.format(title_list=title_list_str)
            
            # 构建消息
            messages = [
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_prompt}
            ]
            
            self.logger.info(f"开始处理 {len(title_list)} 个标题")
            
            # 调用LLM API
            response = self._call_llm_api(messages)
            
            if response is None:
                self.logger.error("LLM API调用失败")
                return {}
            
            # 解析响应
            result = self._parse_json_response(response)
            
            if result is None:
                self.logger.error(f"LLM响应解析失败，原始响应: {response}")
                return {}
            
            # 验证结果
            total_mapped = sum(len(titles) for titles in result.values())
            self.logger.info(f"成功映射 {total_mapped} 个标题到四个泳道")
            
            return result
            
        except Exception as e:
            self.logger.error(f"标题映射过程中发生异常: {e}")
            return {}

    def map_titles_with_debug(self, title_list: List[str]) -> Dict[str, Any]:
        """
        带调试信息的标题映射
        
        Args:
            title_list: 原始标题列表
            
        Returns:
            包含详细调试信息的映射结果
        """
        debug_info = {
            "input_titles": title_list,
            "input_count": len(title_list),
            "api_url": self.api_url,
            "model": self.model,
            "timestamp": time.time()
        }
        
        result = self.map_titles(title_list)
        result["debug_info"] = debug_info
        
        return result


def main():
    """Demo entry point: map a sample title list and print the outcome."""
    # Sample input
    title_list = [
        "# A Q-learning approach to the continuous control problem of robot inverted pendulum balancing",
        "# Corresponding Author:",
        "# Abstract",
        "# 1. Introduction",
        "# 2. Proposed approach and background", 
        "# 3. Methodologies",
        "# 4. Results and discussion",
        "# 5. Conclusion",
        "# Acknowledgements",
        "# References"
    ]
    
    # Build the mapper (an API key must be supplied)
    mapper = TitleMappingLLM(
        api_key="your-api-key-here",  # replace with a real API key
        model="gpt-3.5-turbo"
    )
    
    # Run the mapping
    result = mapper.map_titles_with_debug(title_list)

    # map_titles_with_debug returns the lane -> titles mapping plus a single
    # "debug_info" entry; it has no 'success'/'result'/'statistics'/'error'
    # keys, so derive those views here instead of indexing them.
    debug_info = result.get("debug_info", {})
    lanes = {key: value for key, value in result.items() if key != "debug_info"}
    success = bool(lanes)  # map_titles returns {} on every failure path
    
    print("=== 标题映射结果 ===")
    print(f"成功: {success}")
    
    if success:
        print("映射结果:")
        print(json.dumps(lanes, indent=2, ensure_ascii=False))
        print("\n统计信息:")
        statistics = {
            "input_count": debug_info.get("input_count", len(title_list)),
            "mapped_count": sum(len(titles) for titles in lanes.values()),
            "per_lane": {lane: len(titles) for lane, titles in lanes.items()},
        }
        print(json.dumps(statistics, indent=2, ensure_ascii=False))
    else:
        # The failure reason is only written to the log by map_titles
        print("错误: 标题映射失败，详细原因请查看日志")


if __name__ == "__main__":
    main()


# 标题内容分割-抽取

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
根据标题提取Markdown文件内容的工具

功能：
- 根据标题列表和指定标题，提取该标题到下一个标题之间的内容
- 支持处理最后一个标题的情况（提取到文件结束）
"""

import os
import re
from typing import List, Optional


class ContentExtractor:
    """根据标题提取Markdown文件内容的类"""
    
    def __init__(self):
        """初始化内容提取器"""
        pass
    
    def _normalizeHeading(self, heading: str) -> str:
        """
        标准化标题格式，用于灵活匹配
        
        Args:
            heading: 原始标题
        
        Returns:
            str: 标准化后的标题
        """
        if not heading:
            return ""
        
        # 去除首尾空格
        normalized = heading.strip()
        
        # 如果标题以#开头，保留#和后面的内容
        if normalized.startswith('#'):
            # 提取#后面的内容
            content = normalized[1:].strip()
            return f"#{content}"
        else:
            # 如果没有#，直接返回去除空格后的内容
            return normalized
    
    def _extractHeadingContent(self, heading: str) -> str:
        """
        提取标题的核心内容（去除#、序号等）
        
        Args:
            heading: 标题
        
        Returns:
            str: 核心内容
        """
        if not heading:
            return ""
        
        # 去除首尾空格
        content = heading.strip()
        
        # 去除开头的#号
        if content.startswith('#'):
            content = content[1:].strip()
        
        # 去除开头的序号（如 "1. "、"2. "等）
        # 匹配模式：数字 + 点 + 空格
        content = re.sub(r'^\d+\.\s*', '', content)
        
        # 去除多余空格
        content = re.sub(r'\s+', ' ', content).strip()
        
        return content
    
    def _isHeadingMatch(self, heading1: str, heading2: str) -> bool:
        """
        判断两个标题是否匹配（支持灵活匹配）
        
        Args:
            heading1: 标题1
            heading2: 标题2
        
        Returns:
            bool: 是否匹配
        """
        # 标准化两个标题
        norm1 = self._normalizeHeading(heading1)
        norm2 = self._normalizeHeading(heading2)
        
        # 提取核心内容
        content1 = self._extractHeadingContent(norm1)
        content2 = self._extractHeadingContent(norm2)
        
        # 完全匹配
        if content1 == content2:
            return True
        
        # 忽略大小写匹配
        if content1.lower() == content2.lower():
            return True
        
        # 去除所有空格后匹配
        clean1 = re.sub(r'\s+', '', content1)
        clean2 = re.sub(r'\s+', '', content2)
        if clean1.lower() == clean2.lower():
            return True
        
        return False
    
    def extractContentByHeading(self, headingList: List[str], filePath: str, targetHeading: str) -> str:
        """
        根据标题列表和指定标题提取内容
        
        Args:
            headingList: 标题列表，例如 ['# Abstract', '# 1. Introduction', '# 2. Methods']
            filePath: Markdown文件路径
            targetHeading: 目标标题，例如 "# 5. Conclusion"
        
        Returns:
            str: 提取的内容文本
        """
        try:
            # 读取文件内容
            with open(filePath, 'r', encoding='utf-8') as file:
                content = file.read()
            
            # 按行分割内容
            lines = content.split('\n')
            
            # 找到目标标题在标题列表中的位置
            targetIndex = self._findHeadingIndex(headingList, targetHeading)
            if targetIndex == -1:
                return f"错误：在标题列表中未找到目标标题 '{targetHeading}'"
            
            # 找到目标标题在文件中的位置
            targetLineIndex = self._findHeadingInFile(lines, targetHeading)
            if targetLineIndex == -1:
                return f"错误：在文件中未找到目标标题 '{targetHeading}'"
            
            # 确定结束位置
            if targetIndex == len(headingList) - 1:
                # 如果是最后一个标题，提取到文件结束
                endLineIndex = len(lines)
            else:
                # 找到下一个标题在文件中的位置
                nextHeading = headingList[targetIndex + 1]
                endLineIndex = self._findHeadingInFile(lines, nextHeading)
                if endLineIndex == -1:
                    return f"错误：在文件中未找到下一个标题 '{nextHeading}'"
            
            # 提取内容
            extractedLines = lines[targetLineIndex:endLineIndex]
            extractedContent = '\n'.join(extractedLines)
            
            return extractedContent.strip()
            
        except FileNotFoundError:
            return f"错误：文件 '{filePath}' 不存在"
        except Exception as e:
            return f"错误：处理文件时发生异常 - {str(e)}"
    
    def _findHeadingIndex(self, headingList: List[str], targetHeading: str) -> int:
        """
        在标题列表中找到目标标题的索引（支持灵活匹配）
        
        Args:
            headingList: 标题列表
            targetHeading: 目标标题
        
        Returns:
            int: 标题索引，未找到返回-1
        """
        for i, heading in enumerate(headingList):
            if self._isHeadingMatch(heading, targetHeading):
                return i
        return -1
    
    def _findHeadingInFile(self, lines: List[str], heading: str) -> int:
        """
        在文件行中找到标题的位置（支持灵活匹配）
        
        Args:
            lines: 文件行列表
            heading: 要查找的标题
        
        Returns:
            int: 行索引，未找到返回-1
        """
        for i, line in enumerate(lines):
            if self._isHeadingMatch(line, heading):
                return i
        return -1
    
    def extractContentByHeadingWithDebug(self, headingList: List[str], filePath: str, targetHeading: str) -> dict:
        """
        带调试信息的提取内容方法
        
        Args:
            headingList: 标题列表
            filePath: Markdown文件路径
            targetHeading: 目标标题
        
        Returns:
            dict: 包含结果和调试信息的字典
        """
        result = {
            'success': False,
            'content': '',
            'debug_info': {
                'target_heading': targetHeading,
                'target_index': -1,
                'next_heading': '',
                'start_line': -1,
                'end_line': -1,
                'total_lines': 0,
                'error': ''
            }
        }
        
        try:
            # 读取文件内容
            with open(filePath, 'r', encoding='utf-8') as file:
                content = file.read()
            
            lines = content.split('\n')
            result['debug_info']['total_lines'] = len(lines)
            
            # 找到目标标题在标题列表中的位置
            targetIndex = self._findHeadingIndex(headingList, targetHeading)
            result['debug_info']['target_index'] = targetIndex
            
            if targetIndex == -1:
                result['debug_info']['error'] = f"在标题列表中未找到目标标题 '{targetHeading}'"
                return result
            
            # 找到目标标题在文件中的位置
            targetLineIndex = self._findHeadingInFile(lines, targetHeading)
            result['debug_info']['start_line'] = targetLineIndex
            
            if targetLineIndex == -1:
                result['debug_info']['error'] = f"在文件中未找到目标标题 '{targetHeading}'"
                return result
            
            # 确定结束位置
            if targetIndex == len(headingList) - 1:
                # 最后一个标题
                endLineIndex = len(lines)
                result['debug_info']['next_heading'] = '文件结束'
            else:
                # 找到下一个标题
                nextHeading = headingList[targetIndex + 1]
                result['debug_info']['next_heading'] = nextHeading
                endLineIndex = self._findHeadingInFile(lines, nextHeading)
                
                if endLineIndex == -1:
                    result['debug_info']['error'] = f"在文件中未找到下一个标题 '{nextHeading}'"
                    return result
            
            result['debug_info']['end_line'] = endLineIndex
            
            # 提取内容
            extractedLines = lines[targetLineIndex:endLineIndex]
            extractedContent = '\n'.join(extractedLines)
            
            result['success'] = True
            result['content'] = extractedContent.strip()
            
            return result
            
        except FileNotFoundError:
            result['debug_info']['error'] = f"文件 '{filePath}' 不存在"
            return result
        except Exception as e:
            result['debug_info']['error'] = f"处理文件时发生异常 - {str(e)}"
            return result


# def main():
#     """主函数，用于测试功能"""
#     extractor = ContentExtractor()
    
#     # 测试数据
#     headingList = [
#         '# A Q-learning approach to the continuous control problem of robot inverted pendulum balancing',
#         '# Corresponding Author:',
#         '# A Q-learning approach to the continuous control problem of robot inverted pendulum balancing',
#         '# Abstract',
#         '# 1. Introduction',
#         '# 2. Proposed approach and background',
#         '# 3. Methodologies',
#         '# 4. Results and discussion',
#         '# 5. Conclusion',
#         '# Acknowledgements',
#         '# References'
#     ]
    
#     filePath = "/Users/xiaokong/task/2025/paper_vis/vis/md/2dbbabd2678ba74fcd9b08aadae975ae.md"
#     targetHeading = "# 5. Conclusion"
    
#     print("=== 测试内容提取功能 ===")
#     print(f"目标标题: {targetHeading}")
#     print(f"文件路径: {filePath}")
#     print()
    
#     # 测试基本功能
#     content = extractor.extractContentByHeading(headingList, filePath, targetHeading)
#     print("=== 提取的内容 ===")
#     print(content)
#     print()
    
#     # 测试调试功能
#     debugResult = extractor.extractContentByHeadingWithDebug(headingList, filePath, targetHeading)
#     print("=== 调试信息 ===")
#     print(f"成功: {debugResult['success']}")
#     print(f"目标标题: {debugResult['debug_info']['target_heading']}")
#     print(f"目标索引: {debugResult['debug_info']['target_index']}")
#     print(f"下一个标题: {debugResult['debug_info']['next_heading']}")
#     print(f"开始行: {debugResult['debug_info']['start_line']}")
#     print(f"结束行: {debugResult['debug_info']['end_line']}")
#     print(f"总行数: {debugResult['debug_info']['total_lines']}")
#     if debugResult['debug_info']['error']:
#         print(f"错误: {debugResult['debug_info']['error']}")


# if __name__ == "__main__":
#     main()


# 从markdown中找到所有模块的内容

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
综合内容提取器
整合标题规范化、标题映射和内容提取功能

功能流程：
1. 使用NormalizeHeadings.py获取清洗后的一级标题列表
2. 使用TitleMappingLLM.py将标题映射到四个标准泳道
3. 使用extractContentByHeading.py根据映射结果提取具体内容
4. 返回按泳道组织的完整内容字典
"""

import os
from typing import Dict, List
from NormalizeHeadings import HeadingNormalizer
from TitleMappingLLM import TitleMappingLLM
from extractContentByHeading import ContentExtractor


class ComprehensiveContentExtractor:
    """综合内容提取器"""
    
    def __init__(self):
        """Initialize the comprehensive extractor with its three collaborators."""
        # Each stage of the pipeline is delegated to one of these objects,
        # all built with their default configuration.
        self.heading_normalizer = HeadingNormalizer()
        self.title_mapper = TitleMappingLLM()
        self.content_extractor = ContentExtractor()
    
    def extract_comprehensive_content(self, markdown_file_path: str) -> Dict[str, str]:
        """
        综合内容提取主函数
        
        Args:
            markdown_file_path: Markdown文件路径
        
        Returns:
            Dict[str, str]: 按四个标准泳道组织的完整内容字典
        """
        try:
            # 步骤1: 获取清洗后的一级标题列表
            print("步骤1: 获取清洗后的一级标题列表...")
            h1_headings = self.heading_normalizer.process_markdown_file(markdown_file_path)
            
            if not h1_headings:
                print("错误: 未能提取到一级标题")
                return {}
            
            print(f"提取到 {len(h1_headings)} 个一级标题:")
            for i, heading in enumerate(h1_headings, 1):
                print(f"  {i}. {heading}")
            print()
            
            # 步骤2: 将标题映射到四个标准泳道
            print("步骤2: 将标题映射到四个标准泳道...")
            mapping_result = self.title_mapper.map_titles(h1_headings)
            
            if not mapping_result:
                print("错误: 标题映射失败")
                return {}
            
            print("映射结果:")
            for lane, titles in mapping_result.items():
                print(f"  {lane}: {titles}")
            print()
            
            # 步骤3: 根据映射结果提取具体内容
            print("步骤3: 提取各泳道的具体内容...")
            final_result = {}
            
            for lane_name, mapped_titles in mapping_result.items():
                print(f"处理泳道: {lane_name}")
                lane_content = ""
                
                for title in mapped_titles:
                    print(f"  提取标题: {title}")
                    content = self.content_extractor.extractContentByHeading(
                        h1_headings, markdown_file_path, title
                    )
                    
                    # 检查是否提取成功
                    if content.startswith("错误："):
                        print(f"    警告: {content}")
                        continue
                    
                    # 拼接内容
                    if lane_content:
                        lane_content += "\n\n" + content
                    else:
                        lane_content = content
                    
                    print(f"    成功提取 {len(content)} 个字符")
                
                final_result[lane_name] = lane_content
                print(f"  泳道 '{lane_name}' 总内容长度: {len(lane_content)} 字符")
                print()
            
            print("=== 综合内容提取完成 ===")
            return final_result
            
        except Exception as e:
            print(f"综合内容提取过程中发生异常: {e}")
            return {}
    
    def extract_content_with_summary(self, markdown_file_path: str) -> Dict[str, any]:
        """
        带摘要信息的综合内容提取
        
        Args:
            markdown_file_path: Markdown文件路径
        
        Returns:
            Dict[str, any]: 包含内容和摘要信息的字典
        """
        try:
            # 获取清洗后的一级标题列表
            h1_headings = self.heading_normalizer.process_markdown_file(markdown_file_path)
            
            if not h1_headings:
                return {
                    'success': False,
                    'error': '未能提取到一级标题',
                    'content': {}
                }
            
            # 将标题映射到四个标准泳道
            mapping_result = self.title_mapper.map_titles(h1_headings)
            
            if not mapping_result:
                return {
                    'success': False,
                    'error': '标题映射失败',
                    'content': {}
                }
            
            # 提取具体内容
            final_result = {}
            extraction_stats = {}
            
            for lane_name, mapped_titles in mapping_result.items():
                lane_content = ""
                extraction_stats[lane_name] = {
                    'titles_count': len(mapped_titles),
                    'titles': mapped_titles,
                    'successful_extractions': 0,
                    'failed_extractions': 0
                }
                
                for title in mapped_titles:
                    content = self.content_extractor.extractContentByHeading(
                        h1_headings, markdown_file_path, title
                    )
                    
                    if content.startswith("错误："):
                        extraction_stats[lane_name]['failed_extractions'] += 1
                        continue
                    
                    extraction_stats[lane_name]['successful_extractions'] += 1
                    
                    if lane_content:
                        lane_content += "\n\n" + content
                    else:
                        lane_content = content
                
                final_result[lane_name] = lane_content
            
            return {
                'success': True,
                'content': final_result,
                'summary': {
                    'total_h1_headings': len(h1_headings),
                    'h1_headings': h1_headings,
                    'mapping_result': mapping_result,
                    'extraction_stats': extraction_stats,
                    'file_path': markdown_file_path
                }
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': f'处理过程中发生异常: {e}',
                'content': {}
            }


def main():
    """Demo entry point: run the comprehensive extractor on one test file."""
    print("=== 综合内容提取器测试 ===")
    
    # Build the extractor
    extractor = ComprehensiveContentExtractor()
    
    # Test file path (adjust to an existing Markdown file)
    test_file_path = "/Users/xiaokong/task/2025/paper_vis/vis/md/2dbbabd2678ba74fcd9b08aadae975ae.md"
    
    # Bail out early when the file is missing
    if not os.path.exists(test_file_path):
        print(f"错误: 测试文件不存在: {test_file_path}")
        print("请修改 test_file_path 变量为实际存在的Markdown文件路径")
        return
    
    print(f"测试文件: {test_file_path}")
    print()
    
    # Run the extraction
    result = extractor.extract_comprehensive_content(test_file_path)

    # The line below is labelled a preview, so show only the head of each lane
    # instead of dumping the whole section text.
    preview_limit = 200
    
    if result:
        print("=== 最终结果 ===")
        for lane_name, content in result.items():
            preview = content[:preview_limit]
            ellipsis = "..." if len(content) > preview_limit else ""
            print(f"\n泳道: {lane_name}")
            print(f"内容长度: {len(content)} 字符")
            print(f"内容预览: {preview}{ellipsis}")
            print("-" * 50)
    else:
        print("内容提取失败")
    
    print("\n=== 测试完成 ===")


if __name__ == "__main__":
    main()


# 匹配图表位置

{
  "Context & Related Work": [
    {
      "figure_id": "b357bf6b985700391e95e23414e4b79c293d3fc25bdec3f58908049d50871fa7", 
      "figure_caption": "....",
      "reference_text": ["....", "...", "..."]
    }
  ],
  "Methodology & Setup": [
    {
      "figure_id": "80b3149cfde69122d789174087de284b1b7ca2a42efb05472127cf3a3416e08e",
      "figure_caption": "....",
      "reference_text": ["...."]
    },
    // ... 更多图表
  ],
  "Results & Analysis": [],
  "Conclusion": []
}


1. 通过ComprehensiveContentExtractor获取到最终需要抽取点的原始文本
2. 通过merge_data得到mergedata，执行get_figure得到图表数据
3. 遍历每个caption去匹配到泳道
4. 形成figure_map数据以作为图表可视化数据

In [6]:
!pip install sumy

Collecting sumy
  Downloading sumy-0.11.0-py2.py3-none-any.whl.metadata (7.5 kB)
Collecting breadability>=0.1.20 (from sumy)
  Downloading breadability-0.1.20.tar.gz (32 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting pycountry>=18.2.23 (from sumy)
  Downloading pycountry-24.6.1-py3-none-any.whl.metadata (12 kB)
Downloading sumy-0.11.0-py2.py3-none-any.whl (97 kB)
Downloading pycountry-24.6.1-py3-none-any.whl (6.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.3/6.3 MB[0m [31m11.5 MB/s[0m  [33m0:00:00[0meta [36m0:00:01[0m
[?25hBuilding wheels for collected packages: breadability
[33m  DEPRECATION: Building 'breadability' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to 

# 总调度器