### 1. 实现金融领域的专有名词标准化系统


In [1]:
from pymilvus import model
from pymilvus import MilvusClient, DataType, FieldSchema, CollectionSchema
import pandas as pd
from tqdm import tqdm
import logging
from dotenv import load_dotenv
import torch
import os

# 加载环境变量（如果后续用到 API key 等）
load_dotenv()

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 初始化嵌入模型（BGE-M3，适合多语言金融文本）
embedding_function = model.dense.SentenceTransformerEmbeddingFunction(
    model_name='BAAI/bge-m3',
    device='cuda:0' if torch.cuda.is_available() else 'cpu',
    trust_remote_code=True
)

# 文件路径（金融术语 CSV）
file_path = "data/financial_terms.csv"  # 建议重命名文件以反映内容
db_path = "db/financial_terms_bge_m3.db"

# Milvus 集合名称（语义清晰）
collection_name = "financial_term_embeddings"

# 连接 Milvus（使用本地文件存储）
client = MilvusClient(db_path)

# 加载金融术语 CSV
logging.info("Loading financial terms from CSV...")
df = pd.read_csv(file_path, header=None, names=["term", "category"], dtype=str).fillna("")

# 确保有两列
if df.shape[1] < 2:
    raise ValueError("CSV must contain at least two columns: term and category")

# 获取向量维度（用样本文本）
sample_embedding = embedding_function(["Sample financial term"])[0]
vector_dim = len(sample_embedding)

# 定义 Milvus Collection Schema（金融场景字段）
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=vector_dim),
    FieldSchema(name="term", dtype=DataType.VARCHAR, max_length=200),       # 金融术语（如 "A Round Financing"）
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=50),    # 类别（如 "FINTERP"）
    FieldSchema(name="source_file", dtype=DataType.VARCHAR, max_length=255),
]

schema = CollectionSchema(
    fields=fields,
    description="Financial terminology vector collection using BGE-M3 embeddings"
)

# 创建集合（如果不存在）
if not client.has_collection(collection_name):
    client.create_collection(collection_name=collection_name, schema=schema)
    logging.info(f"Created collection: {collection_name}")

# 创建向量索引（COSINE 相似度）
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="vector",
    index_type="AUTOINDEX",
    metric_type="COSINE",
    params={"nlist": 128}  # 可根据数据规模调整；小数据集可降低
)
client.create_index(collection_name=collection_name, index_params=index_params)
logging.info("Vector index created.")



  from pkg_resources import DistributionNotFound, get_distribution
2025-12-26 03:09:03,030 - INFO - Load pretrained SentenceTransformer: BAAI/bge-m3
2025-12-26 03:09:09,138 - DEBUG - Created new connection using: 8ec1c13d52234604837e246abe858e79
2025-12-26 03:09:09,139 - INFO - Loading financial terms from CSV...
2025-12-26 03:09:09,935 - DEBUG - Successfully created an index on collection: financial_term_embeddings
2025-12-26 03:09:09,936 - INFO -  Vector index created.


In [5]:
# 批量插入数据
batch_size = 512  # 金融术语通常较短，可适当增大；也可保持 1024

for start_idx in tqdm(range(0, len(df), batch_size), desc="Embedding & inserting batches"):
    end_idx = min(start_idx + batch_size, len(df))
    batch_df = df.iloc[start_idx:end_idx]

    # 仅使用术语文本（第一列）生成嵌入
    terms = batch_df["term"].astype(str).tolist()
    try:
        embeddings = embedding_function(terms)
    except Exception as e:
        logging.error(f"❌ Embedding generation failed at batch {start_idx // batch_size + 1}: {e}")
        continue

    # 构建插入数据
    data = [
        {
            "vector": emb,
            "term": term,
            "category": cat,
            "source_file": os.path.basename(file_path),
        }
        for emb, (term, cat) in zip(embeddings, zip(batch_df["term"], batch_df["category"]))
    ]

    # 插入 Milvus
    try:
        res = client.insert(collection_name=collection_name, data=data)
        logging.debug(f"Inserted batch {start_idx // batch_size + 1}, IDs: {res['ids'][:3]}...")
    except Exception as e:
        logging.error(f" Insert failed at batch {start_idx // batch_size + 1}: {e}")

logging.info("All financial terms inserted successfully.")



Embedding & inserting batches: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████| 32/32 [00:10<00:00,  3.15it/s]
2025-12-25 08:19:38,641 - INFO -  All financial terms inserted successfully.


In [6]:
# ------------------------
# 示例：查询相似金融术语
# ------------------------
query_term = "A Round Financing"
logging.info(f" Searching for similar terms to: '{query_term}'")

query_vec = embedding_function([query_term])[0]

search_results = client.search(
    collection_name=collection_name,
    data=[query_vec.tolist()],
    limit=5,
    output_fields=["term", "category"]
)

logging.info("Top matches:")
for hit in search_results[0]:
    logging.info(f"  - {hit['entity']['term']} ({hit['entity']['category']}) | score: {hit['distance']:.4f}")



2025-12-25 08:19:57,844 - INFO - Searching for similar terms to: 'A Round Financing'
2025-12-25 08:19:57,890 - INFO - Top matches:
2025-12-25 08:19:57,890 - INFO -   - A Round Financing (FINTERM) | score: 1.0000
2025-12-25 08:19:57,890 - INFO -   - Financing (FINTERM) | score: 0.7270
2025-12-25 08:19:57,891 - INFO -   - Back-to-Back Loan (FINTERM) | score: 0.7172
2025-12-25 08:19:57,891 - INFO -   - To Fund (FINTERM) | score: 0.7160
2025-12-25 08:19:57,892 - INFO -   - Cash-Flow Financing (FINTERM) | score: 0.7159


In [7]:
# ------------------------
# 示例：精确查询某个术语
# ------------------------
exact_query = client.query(
    collection_name=collection_name,
    filter='term == "AAA"',
    output_fields=["term", "category"]
)
logging.info(f"Exact match for 'AAA': {exact_query}")

2025-12-25 08:20:02,724 - INFO - Exact match for 'AAA': data: ["{'id': 463116899139452943, 'category': 'FINTERM', 'term': 'AAA'}"] 


### 2. 自学习并实践不同的数据导入方法。

#### TextLoader Load txt

In [2]:
#
# 01_LangChain_TXT.py
from langchain_community.document_loaders import TextLoader

loader = TextLoader("data/black.txt")
documents = loader.load()

print(documents)

[Document(metadata={'source': 'data/black.txt'}, page_content='在中国传统文化与现代游戏设计交织的奇幻世界里，《黑神话：悟空》凭借其深厚的文化底蕴和卓越的艺术表现力，成为了国产游戏的一颗璀璨明珠。本文将深入探讨这款游戏背后的故事情节、游戏角色设计以及场景设定，为那些渴望系统学习角色设计和场景设计的创作者们提供灵感和指导。通过对《黑神话：悟空》的剖析，我们可以窥见如何将传统元素与现代创意巧妙融合，从而创作出既有文化底蕴又充满创新精神的艺术作品。\n\n\n\n《黑神话：悟空》是一款以中国古典名著《西游记》为背景的国产单机游戏，它以其精美的画面、深刻的角色设定和精美的场景设计而备受瞩目。下面我们将详细介绍游戏的背景故事、游戏角色设计以及场景设定，并为希望学习角色设计和场景设计的用户提供一些借鉴思路。\n\n\n\n背景故事\n《黑神话：悟空》的故事背景设定在《西游记》原著之后的一个时空。在这个世界中，由于神仙们的争斗导致生灵涂炭，妖精们趁乱当上了假佛。玩家需要扮演天命之人，踏上新的征程，与孙悟空一起揭开真相，恢复世界的秩序。\n\n\n\n角色设计\n孙悟空：作为主角，他的设计既保留了经典的形象特征，如金箍棒和猴脸，同时也融入了一些新的元素，如更现代的服饰和战斗风格。孙悟空的设计注重体现力量感和灵动性，同时也要表现出他内心的挣扎和成长。\n\n其他角色：游戏中还包括了许多其他的经典角色，如猪八戒、沙僧等，他们的设计同样融合了传统与现代元素，以符合游戏的整体风格。此外，还有许多原创角色和妖怪，如犀将军、雾隐兽等，这些角色的设计同样精致且富有想象力。\n\n\n\n\n场景设定\n游戏的场景设定极具特色，不仅有经典的中国山水画风格，还结合了现代特效技术，打造出一个既神秘又壮丽的世界。场景设计强调氛围营造和视觉冲击力，具体包括：\n\n花果山：作为孙悟空的故乡，这里是一个生机勃勃的地方，充满了绿意盎然的树木和瀑布。\n\n天宫：天宫的设计体现了宏伟和庄严，建筑风格融合了传统与幻想元素，给人以震撼之感。\n\n地狱：地狱场景则更加黑暗和恐怖，用以衬托游戏中的冲突和挑战。\n\n \n\n\n学习借鉴思路\n对于希望系统学习角色设计和场景设计的用户来说，《黑神话：悟空》提供了一个很好的学习案例：\n\n深入研究原作：深

#### LangChain JSON Loader

In [11]:
# 02_LangChain_JSON.py
from langchain_community.document_loaders import JSONLoader

print("==== JSONLoader 加载结果 ====")

# 1. 主角信息
print("1. 主角信息：")
main_loader = JSONLoader(
    file_path="data/黑神话人物角色.json",
    jq_schema='.mainCharacter | "姓名: " + .name + ", 背景: " + .backstory',
    text_content=True
)
main_char = main_loader.load()
print(main_char)

# 2. 支持角色信息
print("\n2. 支持角色信息：")
support_loader = JSONLoader(
    file_path="data/黑神话人物角色.json",
    jq_schema='.supportCharacters[] | "姓名: " + .name + ", 背景: " + .background',
    text_content=True
)
support_chars = support_loader.load()
print(support_chars)

==== JSONLoader 加载结果 ====
1. 主角信息：
[Document(metadata={'source': '/data/home/pi/git/rag-project02-medical-nlp-box/backend/data/黑神话人物角色.json', 'seq_num': 1}, page_content='姓名: 孙悟空, 背景: 混沌初开之时，盘古开天辟地，天地灵气凝结成仙石，其中孕育出一只石猴。这只石猴拜师菩提老祖，修得一身本领，后被赐名孙悟空。')]

2. 支持角色信息：
[Document(metadata={'source': '/data/home/pi/git/rag-project02-medical-nlp-box/backend/data/黑神话人物角色.json', 'seq_num': 1}, page_content='姓名: 白龙马, 背景: 原为西海龙王三太子敖烈，因冒犯天条被贬为马，后随唐僧西行取经，成为孙悟空的伙伴。'), Document(metadata={'source': '/data/home/pi/git/rag-project02-medical-nlp-box/backend/data/黑神话人物角色.json', 'seq_num': 2}, page_content='姓名: 红孩儿, 背景: 牛魔王与铁扇公主之子，修炼火焰三昧，掌握三昧真火。'), Document(metadata={'source': '/data/home/pi/git/rag-project02-medical-nlp-box/backend/data/黑神话人物角色.json', 'seq_num': 3}, page_content='姓名: 六耳猕猴, 背景: 天地间与美猴王最像的存在，有着与孙悟空相似的能力。')]


#### 3. 重构 Load File (增加工具和参数)

In [12]:
from pypdf import PdfReader
from unstructured.partition.pdf import partition_pdf
import pdfplumber
import fitz  # PyMuPDF
import pypdfium2 as pdfium  # <<< 新增导入
import logging
import os
from datetime import datetime
import json

logger = logging.getLogger(__name__)

class LoadingService:
    """
    PDF文档加载服务类，提供多种PDF文档加载和处理方法。
    
    属性:
        total_pages (int): 当前加载PDF文档的总页数
        current_page_map (list): 存储当前文档的页面映射信息，每个元素包含页面文本和页码
    """
    
    def __init__(self):
        self.total_pages = 0
        self.current_page_map = []
    
    def load_pdf(self, file_path: str, method: str, strategy: str = None, chunking_strategy: str = None, chunking_options: dict = None) -> str:
        """
        加载PDF文档的主方法，支持多种加载策略。

        参数:
            file_path (str): PDF文件路径
            method (str): 加载方法，支持 'pymupdf', 'pypdf', 'pdfplumber', 'unstructured', 'pypdfium2'
            strategy (str, optional): 使用unstructured方法时的策略
            chunking_strategy (str, optional): 文本分块策略
            chunking_options (dict, optional): 分块选项配置

        返回:
            str: 提取的文本内容
        """
        try:
            if method == "pymupdf":
                return self._load_with_pymupdf(file_path)
            elif method == "pypdf":
                return self._load_with_pypdf(file_path)
            elif method == "pdfplumber":
                return self._load_with_pdfplumber(file_path)
            elif method == "unstructured":
                return self._load_with_unstructured(
                    file_path, 
                    strategy=strategy,
                    chunking_strategy=chunking_strategy,
                    chunking_options=chunking_options
                )
            elif method == "pypdfium2":
                return self._load_with_pypdfium2(file_path)
            else:
                raise ValueError(f"Unsupported loading method: {method}")
        except Exception as e:
            logger.error(f"Error loading PDF with {method}: {str(e)}")
            raise

    # --- 其他已有方法保持不变 ---
    def get_total_pages(self) -> int:
        return max(page_data['page'] for page_data in self.current_page_map) if self.current_page_map else 0

    def get_page_map(self) -> list:
        return self.current_page_map

    def _load_with_pymupdf(self, file_path: str) -> str:
        text_blocks = []
        try:
            with fitz.open(file_path) as doc:
                self.total_pages = len(doc)
                for page_num, page in enumerate(doc, 1):
                    text = page.get_text("text")
                    if text.strip():
                        text_blocks.append({
                            "text": text.strip(),
                            "page": page_num
                        })
            self.current_page_map = text_blocks
            return "\n".join(block["text"] for block in text_blocks)
        except Exception as e:
            logger.error(f"PyMuPDF error: {str(e)}")
            raise

    def _load_with_pypdf(self, file_path: str) -> str:
        text_blocks = []
        try:
            with open(file_path, "rb") as file:
                pdf = PdfReader(file)
                self.total_pages = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        text_blocks.append({
                            "text": page_text.strip(),
                            "page": page_num
                        })
            self.current_page_map = text_blocks
            return "\n".join(block["text"] for block in text_blocks)
        except Exception as e:
            logger.error(f"PyPDF error: {str(e)}")
            raise

    def _load_with_unstructured(self, file_path: str, strategy: str = "fast", chunking_strategy: str = "basic", chunking_options: dict = None) -> str:
        try:
            strategy_params = {
                "fast": {"strategy": "fast"},
                "hi_res": {"strategy": "hi_res"},
                "ocr_only": {"strategy": "ocr_only"}
            }            
         
            chunking_params = {}
            if chunking_strategy == "basic":
                chunking_params = {
                    "max_characters": chunking_options.get("maxCharacters", 4000),
                    "new_after_n_chars": chunking_options.get("newAfterNChars", 3000),
                    "combine_text_under_n_chars": chunking_options.get("combineTextUnderNChars", 2000),
                    "overlap": chunking_options.get("overlap", 200),
                    "overlap_all": chunking_options.get("overlapAll", False)
                }
            elif chunking_strategy == "by_title":
                chunking_params = {
                    "chunking_strategy": "by_title",
                    "combine_text_under_n_chars": chunking_options.get("combineTextUnderNChars", 2000),
                    "multipage_sections": chunking_options.get("multiPageSections", False)
                }
            
            params = {**strategy_params.get(strategy, {"strategy": "fast"}), **chunking_params}
            elements = partition_pdf(file_path, **params)
            
            text_blocks = []
            pages = set()
            for elem in elements:
                metadata = elem.metadata.__dict__
                page_number = metadata.get('page_number')
                if page_number is not None:
                    pages.add(page_number)
                    cleaned_metadata = {}
                    for key, value in metadata.items():
                        if key == '_known_field_names':
                            continue
                        try:
                            json.dumps({key: value})
                            cleaned_metadata[key] = value
                        except (TypeError, OverflowError):
                            cleaned_metadata[key] = str(value)
                    cleaned_metadata['element_type'] = elem.__class__.__name__
                    cleaned_metadata['id'] = str(getattr(elem, 'id', None))
                    cleaned_metadata['category'] = str(getattr(elem, 'category', None))
                    
                    text_blocks.append({
                        "text": str(elem),
                        "page": page_number,
                        "metadata": cleaned_metadata
                    })
            
            self.total_pages = max(pages) if pages else 0
            self.current_page_map = text_blocks
            return "\n".join(block["text"] for block in text_blocks)
            
        except Exception as e:
            logger.error(f"Unstructured error: {str(e)}")
            raise

    def _load_with_pdfplumber(self, file_path: str) -> str:
        text_blocks = []
        try:
            with pdfplumber.open(file_path) as pdf:
                self.total_pages = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages, 1):
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        text_blocks.append({
                            "text": page_text.strip(),
                            "page": page_num
                        })
            self.current_page_map = text_blocks
            return "\n".join(block["text"] for block in text_blocks)
        except Exception as e:
            logger.error(f"pdfplumber error: {str(e)}")
            raise

    # >>> 新增：PyPDFium2 支持 <<<
    def _load_with_pypdfium2(self, file_path: str) -> str:
        """
        使用 PyPDFium2 库加载PDF文档。
        特点：基于 PDFium（Chromium 使用的 PDF 引擎），支持高精度文本提取，性能良好。

        参数:
            file_path (str): PDF文件路径

        返回:
            str: 提取的文本内容
        """
        text_blocks = []
        try:
            pdf = pdfium.PdfDocument(file_path)
            self.total_pages = len(pdf)
            for page_num in range(self.total_pages):
                page = pdf[page_num]
                textpage = page.get_textpage()
                text = textpage.get_text_range()
                textpage.close()
                page.close()
                if text.strip():
                    text_blocks.append({
                        "text": text.strip(),
                        "page": page_num + 1  # 页码从1开始
                    })
            pdf.close()
            self.current_page_map = text_blocks
            return "\n".join(block["text"] for block in text_blocks)
        except Exception as e:
            logger.error(f"PyPDFium2 error: {str(e)}")
            raise

    def save_document(self, filename: str, chunks: list, metadata: dict, loading_method: str, strategy: str = None, chunking_strategy: str = None) -> str:
        try:
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
            base_name = filename.replace('.pdf', '').split('_')[0]
            
            if loading_method == "unstructured" and strategy:
                doc_name = f"{base_name}_{loading_method}_{strategy}_{chunking_strategy}_{timestamp}"
            else:
                doc_name = f"{base_name}_{loading_method}_{timestamp}"
            
            document_data = {
                "filename": str(filename),
                "total_chunks": int(len(chunks)),
                "total_pages": int(metadata.get("total_pages", 1)),
                "loading_method": str(loading_method),
                "loading_strategy": str(strategy) if loading_method == "unstructured" and strategy else None,
                "chunking_strategy": str(chunking_strategy) if loading_method == "unstructured" and chunking_strategy else None,
                "chunking_method": "loaded",
                "timestamp": datetime.now().isoformat(),
                "chunks": chunks
            }
            
            filepath = os.path.join("01-loaded-docs", f"{doc_name}.json")
            os.makedirs("01-loaded-docs", exist_ok=True)
            
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(document_data, f, ensure_ascii=False, indent=2)
                
            return filepath
            
        except Exception as e:
            logger.error(f"Error saving document: {str(e)}")
            raise

In [None]:
###测试 


In [13]:
service = LoadingService()
text = service.load_pdf("data/basic-text.pdf", method="pypdfium2")
print(f"Total pages: {service.get_total_pages()}")

Total pages: 1




In [14]:
text

"Sample Document for PDF Testing\r\nIntroduction\r\nThis is a simple document created to test basic PDF functionality. It includes various text formatting\r\noptions to ensure proper rendering in PDF readers.\r\nText Formatting Examples\r\n1. Bold text is used for emphasis.\r\n2. Italic text can be used for titles or subtle emphasis.\r\n3. Strikethrough is used to show deleted text.\r\nLists\r\nHere's an example of an unordered list:\r\nItem 1\r\nItem 2\r\nItem 3\r\nAnd here's an ordered list:\r\n1. First item\r\n2. Second item\r\n3. Third item\r\nQuote\r\nThis is an example of a block quote. It can be used to highlight important information or\r\ncitations.\r\nTable\r\nHeader 1 Header 2 Header 3\r\nRow 1, Col 1 Row 1, Col 2 Row 1, Col 3\r\nRow 2, Col 1 Row 2, Col 2 Row 2, Col 3\r\nThis document demonstrates various formatting options that should translate well to PDF format.\r\nThis sample PDF file is provided by Sample-Files.com. Visit us for more sample files and resources."

#### 3. 重构 Chunk File (不同方案和大小切块，保持JSON格式)

In [15]:
from datetime import datetime
import logging
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)


class ChunkingService:
    """
    文本分块服务，支持多种分块策略并保留准确的页码信息。

    支持的分块方法：
    - 'by_pages': 每页作为一个 chunk
    - 'recursive': 使用 RecursiveCharacterTextSplitter 进行智能分块（可配置 chunk_size, overlap, separators）
    """

    def chunk_text(
        self,
        text: str,
        method: str,
        metadata: Dict[str, Any],
        page_map: List[Dict[str, Any]],
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        将文本按指定方法分块，并记录准确的页码范围。

        Args:
            text: 原始全文（用于校验，实际分块基于 page_map）
            method: 分块方法 ('by_pages', 'recursive')
            metadata: 文档元数据（如 filename, loading_method）
            page_map: 页面映射列表，每个元素含 'text' 和 'page'
            chunk_size: 分块最大字符数（仅 recursive 有效）
            chunk_overlap: 分块重叠字符数（仅 recursive 有效）
            separators: 分隔符列表（仅 recursive 有效）

        Returns:
            标准化文档结构（JSON 兼容）
        """
        if not page_map:
            raise ValueError("Page map is required for chunking.")

        try:
            if method == "by_pages":
                chunks = self._chunk_by_pages(page_map)
            elif method == "recursive":
                chunks = self._chunk_recursive(
                    page_map,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    separators=separators
                )
            else:
                raise ValueError(f"Unsupported chunking method: {method}")

            document_data = {
                "filename": metadata.get("filename", ""),
                "total_chunks": len(chunks),
                "total_pages": len(page_map),
                "loading_method": metadata.get("loading_method", ""),
                "chunking_method": method,
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
                "timestamp": datetime.now().isoformat(),
                "chunks": chunks
            }
            return document_data

        except Exception as e:
            logger.error(f"Error in chunk_text: {str(e)}")
            raise

    def _chunk_by_pages(self, page_map: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """按页面分块，每页一个 chunk"""
        chunks = []
        for page_data in page_map:
            page_text = page_protect(page_data["text"])
            chunks.append({
                "content": page_text,
                "metadata": {
                    "chunk_id": len(chunks) + 1,
                    "page_number": page_data["page"],
                    "page_range": str(page_data["page"]),
                    "word_count": len(page_text.split())
                }
            })
        return chunks

    def _chunk_recursive(
        self,
        page_map: List[Dict[str, Any]],
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        使用 RecursiveCharacterTextSplitter 进行跨页智能分块，
        并准确记录每个 chunk 所覆盖的页码范围。
        """
        if separators is None:
            separators = ["\n\n", "\n", ".", "!", "?", "。", "！", "？", " ", ""]

        # Step 1: 将 page_map 转换为带位置信息的文本块
        # 同时记录每个字符所属的页码（用于后续映射）
        full_text = ""
        char_to_page = []  # char_to_page[i] = 该字符所在的页码

        for page_data in page_map:
            page_text = page_protect(page_data["text"])
            if not page_text.strip():
                continue
            start_len = len(full_text)
            full_text += page_text + "\n\n"  # 添加分隔符避免段落粘连
            end_len = len(full_text)
            char_to_page.extend([page_data["page"]] * (end_len - start_len))

        if not full_text.strip():
            return []

        # Step 2: 使用 LangChain 分块
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=separators,
            length_function=len
        )
        split_texts = splitter.split_text(full_text)

        # Step 3: 为每个 chunk 计算页码范围
        chunks = []
        current_pos = 0
        for chunk_text in split_texts:
            if not chunk_text.strip():
                continue

            chunk_len = len(chunk_text)
            # 找到该 chunk 在 full_text 中的起止位置
            start_idx = full_text.find(chunk_text, current_pos)
            if start_idx == -1:
                # fallback: 按顺序推进（可能因重复文本找不到）
                start_idx = current_pos
            end_idx = start_idx + chunk_len

            # 边界保护
            start_idx = max(0, min(start_idx, len(char_to_page) - 1))
            end_idx = max(0, min(end_idx - 1, len(char_to_page) - 1))

            if start_idx >= len(char_to_page) or end_idx >= len(char_to_page):
                # 跳过异常 chunk
                continue

            start_page = char_to_page[start_idx]
            end_page = char_to_page[end_idx]

            page_range = (
                str(start_page) if start_page == end_page else f"{start_page}-{end_page}"
            )

            chunks.append({
                "content": chunk_text.strip(),
                "metadata": {
                    "chunk_id": len(chunks) + 1,
                    "page_number": start_page,  # 主页码（起始页）
                    "page_range": page_range,
                    "word_count": len(chunk_text.split())
                }
            })

            current_pos = end_idx + 1

        return chunks


def page_protect(text: str) -> str:
    """清理文本，避免空或异常内容"""
    if not text:
        return ""
    return text.replace("\0", "").strip()

In [None]:
### 测试

In [24]:
page_map = [
    {"text": "第一页内容...", "page": 1},
    {"text": "第二页内容...", "page": 2},
]

metadata = {"filename": "data/sample-report.pdf", "loading_method": "pymupdf"}

service = ChunkingService()

# 按页分块
doc1 = service.chunk_text("", "by_pages", metadata, page_map)



In [25]:
doc1

{'filename': 'data/sample-report.pdf',
 'total_chunks': 2,
 'total_pages': 2,
 'loading_method': 'pymupdf',
 'chunking_method': 'by_pages',
 'chunk_size': 1000,
 'chunk_overlap': 200,
 'timestamp': '2025-12-26T01:18:47.081562',
 'chunks': [{'content': '第一页内容...',
   'metadata': {'chunk_id': 1,
    'page_number': 1,
    'page_range': '1',
    'word_count': 1}},
  {'content': '第二页内容...',
   'metadata': {'chunk_id': 2,
    'page_number': 2,
    'page_range': '2',
    'word_count': 1}}]}

In [26]:
#智能分块（2000字符，重叠200）
doc2 = service.chunk_text(
    text, 
    "recursive", 
    metadata, 
    page_map,
    chunk_size=2000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ".", "。"]
)

In [27]:
doc2 

{'filename': 'data/sample-report.pdf',
 'total_chunks': 1,
 'total_pages': 2,
 'loading_method': 'pymupdf',
 'chunking_method': 'recursive',
 'chunk_size': 2000,
 'chunk_overlap': 200,
 'timestamp': '2025-12-26T01:18:51.707378',
 'chunks': [{'content': '第一页内容...\n\n第二页内容...',
   'metadata': {'chunk_id': 1,
    'page_number': 1,
    'page_range': '1-2',
    'word_count': 2}}]}

In [29]:
# test_chunking.py
import json
from datetime import datetime
from typing import List, Dict, Any

# 如果你不想拆文件，也可以直接把 ChunkingService 类定义放在这里（见下方完整版）

# ------------------ 模拟 PDF 加载后的 page_map ------------------
page_map: List[Dict[str, Any]] = [
    {
        "text": "这是第一页的内容。它包含两个句子。第一个句子。第二个句子。",
        "page": 1
    },
    {
        "text": "这是第二页的内容。这一整页只有一段文字，但字符数很多，超过100个字符。",
        "page": 2
    },
    {
        "text": "第三页。\n\n这是一个新段落。\n\n这是另一个段落。",
        "page": 3
    }
]

metadata = {
    "filename": "sample.pdf",
    "loading_method": "pymupdf"
}

# ------------------ 测试分块 ------------------
def main():

    print("=== 测试 1: by_pages（每页一个 chunk） ===")
    result1 = service.chunk_text(
        text="",  # 实际未使用，保留接口
        method="by_pages",
        metadata=metadata,
        page_map=page_map
    )
    print(json.dumps(result1, indent=2, ensure_ascii=False))

    print("\n" + "="*60 + "\n")

    print("=== 测试 2: recursive（固定大小 50 字符，重叠 10） ===")
    result2 = service.chunk_text(
        text="",
        method="recursive",
        metadata=metadata,
        page_map=page_map,
        chunk_size=50,          # 很小的 chunk_size，便于观察跨页
        chunk_overlap=10,
        separators=["\n\n", "\n", "。", ".", " ", ""]
    )
    print(json.dumps(result2, indent=2, ensure_ascii=False))

    # 保存到文件（可选）
    with open("chunks_by_pages.json", "w", encoding="utf-8") as f:
        json.dump(result1, f, indent=2, ensure_ascii=False)
    with open("chunks_recursive.json", "w", encoding="utf-8") as f:
        json.dump(result2, f, indent=2, ensure_ascii=False)

    print("\n测试完成！结果已保存到 chunks_by_pages.json 和 chunks_recursive.json")

if __name__ == "__main__":
    main()

=== 测试 1: by_pages（每页一个 chunk） ===
{
  "filename": "sample.pdf",
  "total_chunks": 3,
  "total_pages": 3,
  "loading_method": "pymupdf",
  "chunking_method": "by_pages",
  "chunk_size": 1000,
  "chunk_overlap": 200,
  "timestamp": "2025-12-26T01:19:15.936545",
  "chunks": [
    {
      "content": "这是第一页的内容。它包含两个句子。第一个句子。第二个句子。",
      "metadata": {
        "chunk_id": 1,
        "page_number": 1,
        "page_range": "1",
        "word_count": 1
      }
    },
    {
      "content": "这是第二页的内容。这一整页只有一段文字，但字符数很多，超过100个字符。",
      "metadata": {
        "chunk_id": 2,
        "page_number": 2,
        "page_range": "2",
        "word_count": 1
      }
    },
    {
      "content": "第三页。\n\n这是一个新段落。\n\n这是另一个段落。",
      "metadata": {
        "chunk_id": 3,
        "page_number": 3,
        "page_range": "3",
        "word_count": 3
      }
    }
  ]
}


=== 测试 2: recursive（固定大小 50 字符，重叠 10） ===
{
  "filename": "sample.pdf",
  "total_chunks": 3,
  "total_pages": 3,
  "loading_method": "pymup

### 3 重构 Parse File (解析PDF/MD中的表和图为文本，保存为JSON，含Metadata)。

### 重构后的代码（支持 PDF 表格 + 图像 + Markdown）

In [30]:
# parsing_service.py
import logging
import os
from typing import Dict, List, Any, Optional
from datetime import datetime
import fitz  # PyMuPDF
import pdfplumber
import pandas as pd
import json
import re

logger = logging.getLogger(__name__)


class ParsingService:
    """
    PDF/Markdown 文档解析服务类
    
    支持：
    - 真实 PDF 表格提取（使用 pdfplumber）
    - PDF 图像检测与元数据记录
    - Markdown 表格/图像识别
    - 统一 JSON 输出格式，含完整 metadata
    """

    def parse_document(
        self,
        file_path: str,
        method: str = "text_and_tables",
        metadata: Optional[Dict] = None,
        source_type: str = "pdf"  # 'pdf' or 'markdown'
    ) -> Dict[str, Any]:
        """
        解析 PDF 或 Markdown 文档，提取文本、表格、图像
        
        Args:
            file_path: 文件路径
            method: 解析方法（目前统一为 'text_and_tables'）
            metadata: 文档元数据
            source_type: 'pdf' 或 'markdown'
            
        Returns:
            标准化 JSON 结构
        """
        metadata = metadata or {}
        try:
            if source_type == "pdf":
                elements = self._parse_pdf_full(file_path)
            elif source_type == "markdown":
                with open(file_path, 'r', encoding='utf-8') as f:
                    md_text = f.read()
                elements = self._parse_markdown(md_text)
            else:
                raise ValueError("source_type must be 'pdf' or 'markdown'")

            document_data = {
                "metadata": {
                    "filename": os.path.basename(file_path),
                    "source_type": source_type,
                    "total_elements": len(elements),
                    "parsing_method": method,
                    "timestamp": datetime.now().isoformat(),
                    **metadata
                },
                "elements": elements
            }
            return document_data
        except Exception as e:
            logger.error(f"Error parsing document: {str(e)}")
            raise

    def _parse_pdf_full(self, file_path: str) -> List[Dict[str, Any]]:
        """使用 fitz + pdfplumber 联合提取文本、表格、图像"""
        elements = []

        # Step 1: 用 PyMuPDF 提取所有文本块 + 图像
        doc = fitz.open(file_path)
        page_text_blocks = {}  # page_num -> list of text blocks

        for page_num, page in enumerate(doc, start=1):
            # 提取文本块（带位置）
            blocks = page.get_text("dict")["blocks"]
            text_blocks = []
            for block in blocks:
                if "lines" in block:
                    block_text = ""
                    for line in block["lines"]:
                        for span in line["spans"]:
                            block_text += span["text"]
                    if block_text.strip():
                        text_blocks.append({
                            "text": block_text.strip(),
                            "bbox": block["bbox"]  # (x0, y0, x1, y1)
                        })
            page_text_blocks[page_num] = text_blocks

            # 提取图像
            image_list = page.get_images(full=True)
            for img_index, img in enumerate(image_list):
                xref = img[0]
                base_image = doc.extract_image(xref)
                # 生成图像描述（可扩展为 Vision API 调用）
                elements.append({
                    "type": "image",
                    "content": f"[Image {img_index + 1} on page {page_num}]",
                    "metadata": {
                        "page": page_num,
                        "image_id": img_index + 1,
                        "bbox": None,  # PyMuPDF 不直接返回图像 bbox，可选增强
                        "extension": base_image["ext"] if base_image else "unknown"
                    }
                })

        doc.close()

        # Step 2: 用 pdfplumber 提取表格
        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                tables = page.extract_tables()
                table_bboxes = page.find_tables()

                for idx, (table, table_bbox) in enumerate(zip(tables, table_bboxes)):
                    if not table or not table[0]:
                        continue

                    # 转为 pandas DataFrame 再转为 list of dict（结构化）
                    try:
                        df = pd.DataFrame(table[1:], columns=table[0])
                        # 去除 NaN，转为 JSON 安全格式
                        table_json = df.where(pd.notnull(df), "").to_dict(orient="records")
                        table_text = json.dumps(table_json, ensure_ascii=False)
                    except Exception:
                        # fallback: 转为文本表格
                        table_text = "\n".join([" | ".join([str(cell) or "" for cell in row]) for row in table])

                    elements.append({
                        "type": "table",
                        "content": table_text,
                        "metadata": {
                            "page": page_num,
                            "table_id": idx + 1,
                            "bbox": list(table_bbox.bbox) if table_bbox else None,
                            "num_rows": len(table),
                            "num_cols": len(table[0]) if table else 0
                        }
                    })

        # Step 3: 将剩余文本作为 "text" 元素
        for page_num, blocks in page_text_blocks.items():
            for block in blocks:
                # 简化：暂不判断是否在表格区域内
                elements.append({
                    "type": "text",
                    "content": block["text"],
                    "metadata": {
                        "page": page_num,
                        "bbox": block["bbox"]
                    }
                })

        # 按页码排序
        elements.sort(key=lambda x: x["metadata"]["page"])
        return elements

    def _parse_markdown(self, text: str) -> List[Dict[str, Any]]:
        """解析 Markdown 中的文本、表格、图像"""
        elements = []
        lines = text.split('\n')
        current_paragraph = []
        page = 1  # Markdown 无页码，设为 1

        i = 0
        while i < len(lines):
            line = lines[i].strip()

            # 图像语法: ![alt](url)
            img_match = re.match(r'!\[([^\]]*)\]\(([^)]+)\)', line)
            if img_match:
                if current_paragraph:
                    elements.append({
                        "type": "text",
                        "content": "\n".join(current_paragraph),
                        "metadata": {"page": page}
                    })
                    current_paragraph = []
                alt_text, url = img_match.groups()
                elements.append({
                    "type": "image",
                    "content": f"[Image: {alt_text or 'no alt'}]({url})",
                    "metadata": {"page": page, "url": url, "alt": alt_text}
                })
                i += 1
                continue

            # 表格识别（简单：检测分隔行）
            if line.startswith('|') and '|' in line:
                if current_paragraph:
                    elements.append({
                        "type": "text",
                        "content": "\n".join(current_paragraph),
                        "metadata": {"page": page}
                    })
                    current_paragraph = []

                # 读取表格行
                table_lines = []
                while i < len(lines) and lines[i].strip().startswith('|'):
                    table_lines.append(lines[i].strip())
                    i += 1

                # 转为结构化表格（简化）
                if len(table_lines) >= 2:
                    headers = [h.strip() for h in table_lines[0].split('|')[1:-1]]
                    rows = []
                    for t_line in table_lines[2:]:  # 跳过分隔行
                        cells = [c.strip() for c in t_line.split('|')[1:-1]]
                        if len(cells) == len(headers):
                            rows.append(dict(zip(headers, cells)))
                    table_text = json.dumps(rows, ensure_ascii=False)
                    elements.append({
                        "type": "table",
                        "content": table_text,
                        "metadata": {"page": page}
                    })
                continue

            # 普通文本
            if line:
                current_paragraph.append(lines[i])  # 保留原始缩进/换行
            else:
                if current_paragraph:
                    elements.append({
                        "type": "text",
                        "content": "\n".join(current_paragraph),
                        "metadata": {"page": page}
                    })
                    current_paragraph = []
            i += 1

        # 剩余段落
        if current_paragraph:
            elements.append({
                "type": "text",
                "content": "\n".join(current_paragraph),
                "metadata": {"page": page}
            })

        return elements

### 测试

In [31]:
# 解析 PDF
parser = ParsingService()
result = parser.parse_document("data/sample-report.pdf", source_type="pdf")
with open("parsed_pdf.json", "w", encoding="utf-8") as f:
    json.dump(result, f, indent=2, ensure_ascii=False)



  table_json = df.where(pd.notnull(df), "").to_dict(orient="records")
  table_json = df.where(pd.notnull(df), "").to_dict(orient="records")
  table_json = df.where(pd.notnull(df), "").to_dict(orient="records")
  table_json = df.where(pd.notnull(df), "").to_dict(orient="records")


In [32]:
result

{'metadata': {'filename': 'sample-report.pdf',
  'source_type': 'pdf',
  'total_elements': 89,
  'parsing_method': 'text_and_tables',
  'timestamp': '2025-12-26T01:31:31.409018'},
 'elements': [{'type': 'table',
   'content': '[{"null": "Multi-Page\\nReport\\n“A comprehensive and content-heavy report that\\nincludes text, images, and tables for thorough\\ntesting of pagination and complex layouts.”\\nPrepared By\\nSample Team\\nsample-files.com", "ulti-Page\\nReport": "ulti-Page\\nReport"}]',
   'metadata': {'page': 1,
    'table_id': 1,
    'bbox': [0.0, -26.570542941943604, 609.1195132927621, 834.41998360625],
    'num_rows': 2,
    'num_cols': 2}},
  {'type': 'text',
   'content': 'Sample Team',
   'metadata': {'page': 1,
    'bbox': (223.67361450195312,
     620.0859375,
     371.58587646484375,
     651.3339233398438)}},
  {'type': 'text',
   'content': 'Prepared By',
   'metadata': {'page': 1,
    'bbox': (230.87416076660156,
     581.9524536132812,
     364.38641357421875,
     

In [33]:
## sample.pdf
result1 = parser.parse_document("data/sample.pdf", source_type="pdf")

In [34]:
result1

{'metadata': {'filename': 'sample.pdf',
  'source_type': 'pdf',
  'total_elements': 114,
  'parsing_method': 'text_and_tables',
  'timestamp': '2025-12-26T01:50:17.793887'},
 'elements': [{'type': 'text',
   'content': 'ORIGINAL RESEARCHpublished: 11 September 2018doi: 10.3389/fonc.2018.00373',
   'metadata': {'page': 1,
    'bbox': (457.04681396484375,
     35.29071044921875,
     550.4570922851562,
     58.21261978149414)}},
  {'type': 'text',
   'content': 'Frontiers in Oncology | www.frontiersin.org1September 2018 | Volume 8 | Article 373',
   'metadata': {'page': 1,
    'bbox': (44.82889938354492,
     741.6016235351562,
     550.4484252929688,
     748.5755004882812)}},
  {'type': 'text',
   'content': 'Edited by:Charles A. Kunos,National Cancer Institute (NIH),United States',
   'metadata': {'page': 1,
    'bbox': (67.55380249023438,
     358.0904846191406,
     159.9248809814453,
     395.0997009277344)}},
  {'type': 'text',
   'content': 'Reviewed by:Vivek Verma,University of 

In [35]:
# 解析 Markdown
result_md = parser.parse_document("data/README.md", source_type="markdown")

In [36]:
result_md

{'metadata': {'filename': 'README.md',
  'source_type': 'markdown',
  'total_elements': 107,
  'parsing_method': 'text_and_tables',
  'timestamp': '2025-12-26T01:52:57.928700'},
 'elements': [{'type': 'text',
   'content': '# 手工制作一个RAG框架',
   'metadata': {'page': 1}},
  {'type': 'text',
   'content': '[学习链接](https://u.geekbang.org/subject/airag/1009927) https://u.geekbang.org/subject/airag/1009927',
   'metadata': {'page': 1}},
  {'type': 'text',
   'content': '一个从零开始实现的 RAG (Retrieval Augmented Generation) 系统，不依赖现有的 RAG 框架。该项目旨在提供一个轻量级、可定制的知识库问答解决方案。',
   'metadata': {'page': 1}},
  {'type': 'image',
   'content': '[Image: RAG Frontend](images/RAG-fontend.png)',
   'metadata': {'page': 1,
    'url': 'images/RAG-fontend.png',
    'alt': 'RAG Frontend'}},
  {'type': 'text', 'content': '## 项目概述', 'metadata': {'page': 1}},
  {'type': 'text',
   'content': '本项目是一个完全自主实现的 RAG 系统，通过将文档分块、向量化存储、相似度检索等核心功能模块化实现，使用户能够构建自己的知识库问答系统。',
   'metadata': {'page': 1}},
  {'type': 'text', 'content': '##