In [None]:
## patent_parser    2025-0909-1508 记录代码

# parsers.py  -> 专利.md

from collections import OrderedDict
import os, re, json
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union


"""   
专利类别: 
- 发明专利       - 普通版PDF文件解析ok
- 实用新型专利   - 普通版PDF文件解析ok
- 外观设计专利   - 图片多、文字少，并且很有可能会是打印版PDF源文件    -- 这类直接不处理了，文字内容不多没啥意义。 


# 中国专利的著录项目
(10)	专利文献标识    
(12)	专利文献名称
(15)	专利文献更正数据
(19)	公布或公告专利文献的国家机构名称
(21)	申请号	
(22)	申请日 
(30)	优先权数据
(43)	申请公布日
(45)	授权公告日	
(48)	更正文献出版日                          
(51)	国际专利分类	
(54)	发明或实用新型名称
(56)	对比文件
(57)	摘要
(62)	分案原申请数据	
(66)	本国优先权数据
(71)	申请人
(72)	发明人
(73)	专利权人
(74)	专利代理机构及代理人
(83)	生物保藏信息
(85)	PCT国际申请进入国家阶段日	
(86)	PCT国际申请的申请数据
(87)	PCT国际申请的公布数据	

--- 我解析之后会得到markdown：
...
...
# (54) 发明或实用新型名称    -------- 分开，  上面是、初步筛选的信息、 然后是正文--正文部分再来匹配


我需要做的：抠出 图片的引用， 检索到 图片的相关描述， ---> {图1/2/3  : ["图片的一句话/简短描述", 图片的路径]，
                                                        摘要图：图片的路径}

	


	









基本及系统级别提示词：

# 一些基本的附加知识（可能有用）
1.发明专利
定义：发明专利是指对产品、方法或者其改进所提出的新的技术方案。它强调技术方案的新颖性、创造性和实用性。
特点：发明专利的申请和审查过程较为复杂，通常需要经过初审、公布和实质审查等多个阶段。获得授权后，发明专利的保护期限为20年。 

2. 实用新型专利
定义：实用新型专利是针对产品的形状、构造或者它们的结合所提出的适于实用的新技术方案。与发明专利相比，实用新型专利更注重产品的实用性和新颖性，而不强调创造性。
特点：实用新型专利的申请流程相对简单，通常只需经过初审和授权两个阶段，保护期限为10年。 

3. 外观设计专利
定义：外观设计专利是指对产品的形状、图案或者其结合以及色彩与形状、图案的结合所作出的富有美感并适于工业应用的新设计。
特点：外观设计专利主要保护产品的外观美感，申请过程相对简单，通常只需经过初审和授权两个阶段，保护期限为15年。 


路径：专利文件路径：
    docs_patents/
        + xxxxx/   # 某一个专利的子文件夹
            + images/       # 专利markdown中的图片
            + full.md       # 专利markdown文件
            + *_origin.pdf  # 原始专利pdf文件
            + *.json  # 目前用不上的文件
        + xxxxx/   # 某一个专利的子文件夹
            + images/       # 专利markdown中的图片
            + full.md       # 专利markdown文件
            + *_origin.pdf  # 原始专利pdf文件
            + *.json  # 目前用不上的文件
        + ...
        
--> docs_patents/
        + xxxxx/   # 某一个专利的子文件夹
            + images/                 # 专利markdown中的图片
            + full_filtered.md        # 清理后的专利markdown文件
            + MetaDict_filtered.json  # 专利MetaDict
            + .                       # 目前用不上的文件
        + xxxxx/   # 某一个专利的子文件夹
            + images/                 # 专利markdown中的图片
            + full_filtered.md        # 清理后的专利markdown文件
            + MetaDict_filtered.json  # 专利MetaDict
            + .                       # 目前用不上的文件
        + ...
"""

class patentMD_parser:
    """
    Parse one patent Markdown file together with its sibling assets.

    The parser reads the Markdown text, strips image references, extracts
    bibliographic metadata and produces:
      1) a MetaDict (``OrderedDict`` following the ``meta_schema`` skeleton);
      2) a cleaned Markdown file ``<stem>_filtered.md`` plus
         ``MetaDict_filtered.json``, both written to ``out_dir``.

    Parsing highlights:
      1) basic metadata: title / apply_time / applicant / address /
         inventors / publication number;
      2) figure metadata: abstract figure (``abs_im``) and drawings
         (图1/图2/...) -> {"fig_list": {"图1": ["description", "abs path"], ...}};
      3) the body with image markup (``![](...)`` and "图X ..." caption
         lines) removed, intended as the text to be vectorised.
    """

    # Last character of a publication number that marks a granted document.
    # U = utility model, S = design, C = granted invention (legacy code),
    # B = granted invention (current code).  "A" is an unexamined application.
    _GRANTED_KIND_CODES = ("U", "C", "S", "B")

    # --------------------------- initialisation ---------------------------
    def __init__(self, markdown_file: Union[str, Path], out_dir: Union[str, Path, None] = None):
        """
        Args:
            markdown_file: path of the patent Markdown file (e.g. ``full.md``).
            out_dir: directory for the generated files; defaults to the
                directory that contains ``markdown_file``.

        Raises:
            AssertionError: if no ``*_origin.pdf`` file sits next to the
                Markdown file.
        """
        self.markdown_file = str(Path(markdown_file))
        self.base_dir: Path = Path(self.markdown_file).parent
        self.images_dir: Path = self.base_dir / "images"
        self.out_dir: Path = Path(out_dir) if out_dir is not None else self.base_dir
        self.pdfp = next(self.base_dir.glob("*_origin.pdf"), None)
        # Guard against None first: Path(None) would raise an unrelated TypeError.
        assert self.pdfp is not None and Path(self.pdfp).is_file(), (
            f"no *_origin.pdf found next to {self.markdown_file}"
        )
        self.text: str = ""  # raw Markdown text, filled by pipeline()

        self.meta_schema = OrderedDict({
            "publ_no": "",         # (10) application publication no. or grant announcement no.
            "publ_date": "",       # (43) application publication date or (45) grant announcement date
            "is_granted": False,   # only granted documents have a patent number
            "patent_no": "",       # patent number derived from the application number (when granted)
            "apply_no": '',        # (21) application number
            "apply_time": "",      # (22) filing date
            "title": "",           # (54) title of the invention / utility model
            "applicant": "",       # (71) applicant or (73) patentee
            "address": "",         # "<postcode> <address>"
            "inventors": "",       # (72) inventors (a list once parsed)
            "doc_type": "",        # (12) document type, e.g. "实用新型专利"
            "tech_field": "",      # body of the "技术领域" section (without the heading)

            "root_dir": "",        # absolute path of the patent directory
            "pdf_path": "",        # path of the original PDF
            "fig_list": {},        # {"abs_im": ["摘要图", abs_path], "图1": ["desc", abs_path], ...}
        })

    def __call__(self):
        """Run :meth:`pipeline` and return its result."""
        return self.pipeline()

    # ============== main entry point ==============
    def pipeline(self):
        """
        Run the whole extraction.

        Returns:
            ``(meta_schema, out_path)`` when ``self.out_dir`` is set (the
            default), otherwise just ``meta_schema``.
        """
        # 1) read the whole document
        self.text = self._load_md_text()

        # 2) figure metadata (abstract figure + drawings)
        img_meta = self._extract_img_metadata()  # {"fig_list": {...}}

        # 3) strip image references (including those inside the abstract)
        text_wo_imgs = self._filter_lines(self.text)
        text_wo_imgs = self._clean_abstract_images(text_wo_imgs)
        text_wo_imgs = self._filter_trailing_image_blocks(text_wo_imgs)

        # 4) structured fields -- taken from the FULL text so nothing that was
        #    filtered out above is lost
        meta_blocks = self._extract_meta_blocks(self.text)

        # 4.1) body of the "技术领域" section, without its heading
        tech_field_txt = self._extract_section_plain_text(self.text, r"技术领域", strip_para_tags=True)
        if tech_field_txt:
            meta_blocks["tech_field"] = tech_field_txt

        # 5) publication number
        pubno_info = self._extract_pubno()  # {"publ_no": "...", "pdf_path": "..."}

        # 6) merge everything into the MetaDict
        self.meta_schema["root_dir"] = str(self.base_dir.resolve())
        self.meta_schema.update(img_meta)
        self.meta_schema.update(meta_blocks)
        self.meta_schema.update(pubno_info)

        # 6-2) derive patent_no / is_granted from the publication number
        self.MetaDict_norm()

        # 6-3) drop key/value pairs whose value is empty
        self.meta_schema = self.MetaDict_NoEmptyValue(self.meta_schema)

        # 7) persist when an output directory is configured
        if self.out_dir is not None:
            out_path = self._write_structured_md(text_wo_imgs)
            return self.meta_schema, out_path
        else:
            print("no mdz")
        return self.meta_schema

    # ============== basic helpers ==============
    def _load_md_text(self) -> str:
        """Read the Markdown file as UTF-8, ignoring undecodable bytes."""
        return Path(self.markdown_file).read_text(encoding="utf-8", errors="ignore")

    @staticmethod
    def _find_section(text: str, title_pattern: str) -> Optional[str]:
        """
        Return the text that follows the heading matching ``title_pattern``
        up to the next ``# `` heading (or the end of the text).

        ``title_pattern`` is a regex fragment without the leading ``#``.
        Returns None when the heading is not found.
        """
        hdr = re.search(rf"^\s*#{{1,3}}\s*{title_pattern}\s*$", text, flags=re.MULTILINE)
        if not hdr:
            return None
        start = hdr.end()
        nxt = re.search(r"^\s*#\s+", text[start:], flags=re.MULTILINE)
        return text[start: start + nxt.start()] if nxt else text[start:]

    # ============== Chinese numerals (used for figure numbers) ==============
    @staticmethod
    def _chs_num_to_int(s: str) -> Optional[int]:
        """
        Convert a figure number written with Chinese numerals (up to the
        tens, e.g. "十二", "二十一") or ASCII digits to an int.

        Returns None for empty or unrecognised input.
        """
        m = {"零":0,"一":1,"二":2,"两":2,"三":3,"四":4,"五":5,"六":6,"七":7,"八":8,"九":9,"十":10}
        s = s.strip()
        if not s:
            return None
        if s == "十":
            return 10
        if len(s) == 1 and s in m:
            return m[s]
        if s[0] == "十":  # "十X" -> 10 + X
            tail = m.get(s[1:], 0) if s[1:] else 0
            return 10 + tail
        if "十" in s:  # "X十Y" -> X * 10 + Y
            left, right = s.split("十", 1)
            left_v = m.get(left, 1) if left else 1
            right_v = m.get(right, 0) if right else 0
            return left_v * 10 + right_v
        if s.isdigit():
            return int(s)
        if s in m:
            return m[s]
        return None

    # ============== figure metadata (abstract figure / drawings) ==============
    def _extract_abstract_img(self) -> Tuple[str, Dict[str, Dict[str, Union[List[str], None]]]]:
        """
        Locate the first image inside the "(57) 摘要" section.

        Returns:
            ``(abstract_text_without_images, {"fig_list": {"abs_im": ...}})``
            where ``abs_im`` is ``["摘要图", absolute_path]`` or None.
        """
        block = self._find_section(self.text, r"\(?57\)?\s*摘要")
        if not block:
            return "", {"fig_list": {"abs_im": None}}
        img_m = re.search(r'!\[.*?\]\((.*?)\)', block)
        if not img_m:
            return block.strip(), {"fig_list": {"abs_im": None}}
        rel_path = img_m.group(1).strip()
        abs_path = str((self.base_dir / rel_path).resolve())
        cleaned = re.sub(r'!\[.*?\]\(.*?\)\s*\n?', "", block).strip()
        return cleaned, {"fig_list": {"abs_im": ["摘要图", abs_path]}}

    def _parse_figure_descriptions(self) -> Dict[str, str]:
        """Parse the "附图说明" section into ``{"图N": description}``."""
        desc_map: Dict[str, str] = {}
        block = self._find_section(self.text, r"附图说明")
        if not block:
            return desc_map
        # Normalise full-width punctuation so one pattern covers both forms.
        block = block.replace("：", ":").replace("；", ";")
        pat = re.compile(
            r"(?:\[\d+\]\s*)?图\s*([0-9一二三四五六七八九十]+)\s*(?:为|是|:)\s*(.+?)(?=(?:；|;|。|\.|\n|$))",
            flags=re.IGNORECASE | re.DOTALL
        )
        for num, desc in pat.findall(block):
            idx = self._chs_num_to_int(num) or (int(num) if num.isdigit() else None)
            if not idx:
                continue
            key = f"图{idx}"
            desc_map[key] = re.sub(r"[\s;；。.\u3000]+$", "", desc.strip())
        return desc_map

    def _build_fig_map(self) -> Dict[str, str]:
        """
        Map ``"图N"`` to the absolute image path.

        Matches an image line followed (optionally after blank lines) by a
        line starting with "图N".  Images that do not exist on disk are skipped.
        """
        img_map: Dict[str, str] = {}
        img_iter = re.finditer(
            r'!\[.*?\]\((.*?)\)\s*(?:\n[ \t]*){0,2}(图[0-9一二三四五六七八九十]+)',
            self.text, flags=re.IGNORECASE
        )
        for m in img_iter:
            rel_path = m.group(1).strip()
            tag = m.group(2).strip()
            mnum = re.match(r"图\s*([0-9一二三四五六七八九十]+)", tag)
            if not mnum:
                continue
            idx_raw = mnum.group(1)
            idx = self._chs_num_to_int(idx_raw) or (int(idx_raw) if idx_raw.isdigit() else None)
            if not idx:
                continue
            key = f"图{idx}"
            abs_path = str((self.base_dir / rel_path).resolve())
            if Path(abs_path).exists():
                img_map[key] = abs_path
        return img_map

    def _extract_img_metadata(self) -> Dict[str, Dict[str, Union[List[str], None]]]:
        """Combine abstract figure, figure descriptions and image paths into ``fig_list``."""
        _, abs_meta = self._extract_abstract_img()
        abs_item = abs_meta["fig_list"].get("abs_im")
        desc_map = self._parse_figure_descriptions()
        img_map = self._build_fig_map()
        fig_list: Dict[str, Union[List[str], None]] = {
            key: [desc_map.get(key, ""), path] for key, path in img_map.items()
        }
        if abs_item and isinstance(abs_item, list) and len(abs_item) == 2:
            fig_list["abs_im"] = abs_item
        return {"fig_list": fig_list}

    # ============== text cleaning ==============
    def _filter_lines(self, md_text: str) -> str:
        """
        Remove every image reference together with the "图X ..." caption line
        that follows it, then remove stand-alone "图N" lines.

        In-sentence references such as "如图2所示" are kept.
        """
        # 1) image + the caption line right after it
        pattern = re.compile(
            r'!\[.*?\]\([^)]*\)[ \t]*(?:\n[ \t]*)*\n?[ \t]*图[0-9一二三四五六七八九十]+[：:\s]*[^\n]*(?:\n|$)',
            flags=re.IGNORECASE | re.MULTILINE
        )
        out = pattern.sub('', md_text)
        # 2) bare images
        out = re.sub(r'!\[.*?\]\([^)]*\)\s*\n?', '', out)
        # 3) lines that consist only of "图N" (optionally followed by a colon)
        out = re.sub(r'(?m)^\s*图[0-9一二三四五六七八九十]+\s*$', '', out)
        out = re.sub(r'(?m)^\s*图[0-9一二三四五六七八九十]+\s*[：:]\s*$', '', out)
        # 4) collapse runs of blank lines
        out = re.sub(r'\n{3,}', '\n\n', out).strip() + "\n"
        return out

    def _clean_abstract_images(self, md_text: str) -> str:
        """
        Safety net: remove image markup left inside the "(57) 摘要" section.

        Only the image markup is removed; the section's surrounding newlines
        are kept so the heading, the abstract and the next heading are never
        joined onto one line.
        """
        sec = self._find_section(md_text, r"\(?57\)?\s*摘要")
        if not sec:
            return md_text
        sec_clean = re.sub(r'!\[.*?\]\([^)]*\)', '', sec)
        if sec_clean == sec:  # nothing to remove
            return md_text
        return md_text.replace(sec, sec_clean, 1)

    def _filter_trailing_image_blocks(self, text: str) -> str:
        """
        Delete the run of figure blocks at the very end of the text.

        Two repeated forms are supported:
          a) image (+ blank lines) + "图N..." line
          b) a bare "图N..." line without an image
        """
        new_text = re.sub(
            r'('
            r'\n\s*(?:!\[.*?\]\([^)]+\)\s*)?(?:\n[ \t]*)*图[0-9一二三四五六七八九十]+[^\n]*\s*'
            r')+$',
            '',
            text,
            flags=re.MULTILINE | re.IGNORECASE
        ).rstrip() + "\n"
        return new_text

    # ============== date format ==============
    def _normalize_date(self, s: str) -> str:
        """Normalise a date such as "2020年9月2日" or "2020-09-02" to dotted form."""
        s = (s or "").strip()
        if not s:
            return ""
        s = s.replace("年", ".").replace("月", ".").replace("日", "")
        s = s.replace("/", ".").replace("-", ".")
        s = re.sub(r'\.+', '.', s).strip(".")
        return s

    # ============== metadata extraction (from the FULL text) ==============
    def _extract_meta_blocks(self, full_text: str) -> Dict[str, Union[str, List[str]]]:
        """
        Extract the bibliographic fields from the complete Markdown text.

        Returns a dict with the keys title, apply_no, apply_time, applicant,
        address, inventors (list of names), publ_date and doc_type.  A field
        that cannot be found is returned as an empty string / empty list.
        """
        # (12) document type; stays "" when the code is absent
        doc_type = ""
        m12 = re.search(r'\(12\)\s*([^\n]+)', full_text)
        if m12:
            doc_type = re.sub(r"\s+", "", m12.group(1))

        # (54) heading: the title is the line right below it
        m_title = re.search(r'(?m)^#\s*\(54\)\s*(?:实用新型|发明)\s*名称\s*\n(.+)$', full_text)
        title = (m_title.group(1).strip() if m_title else "")

        # (21) application number
        m_apply_no = re.search(r'\(21\)\s*申请号\s*([0-9]+(?:\.[0-9A-Za-z])?)', full_text)
        apply_no = (m_apply_no.group(1).strip() if m_apply_no else "")

        # (22) filing date, e.g. 2020.09.02 / 2020-09-02 / 2020年09月02日;
        # normalised the same way as publ_date
        m_apply = re.search(r'\(22\)\s*申请日\s*([0-9.\-年月日/]+)', full_text)
        apply_time = self._normalize_date(m_apply.group(1)) if m_apply else ""

        # (73) patentee or (71) applicant, followed by the address (which may
        # be written as "地址<postcode><address>").  The name may span lines.
        applicant, address = "", ""
        block_pat = re.compile(
                r'\((?P<code>71|73)\)'                    # (71) or (73)
                r'\s*(?:申\s*请\s*人|专\s*利\s*权\s*人)?'  # optional label
                r'\s*[:：]?\s*'
                r'(?P<name>.*?)'                          # applicant / patentee (non-greedy)
                r'(?:地址|住\s*址)\s*[:：]?\s*'            # address label
                r'(?:(?P<zip>\d{6})\s*[，,、]?\s*)?'      # optional 6-digit postcode
                r'(?P<addr>[^\(\)\r\n]+)',                # address body
                flags=re.S | re.I
            )
        m = block_pat.search(full_text)
        if m:
            applicant = m.group('name').strip()
            zip_code = m.group('zip') or ''
            addr = m.group('addr').strip()
            address = f"{zip_code} {addr}".strip()
        # TODO: single-line fallbacks for applicant / address when the
        # combined pattern above does not match.

        # (72) inventors / designers -> list of names.
        # Capture from the (72) line up to the next "\n(NN)" field or the end.
        inventors_list = []
        m_72 = re.search(
            r'(?ms)^\(72\)\s*(?:发明人|设计人)\s*(.+?)(?=\n\(\d{2}\)|\Z)',
            full_text
        )
        if m_72:
            # collapse all whitespace (incl. full-width spaces / newlines)
            raw_inv = re.sub(r'\s+', ' ', m_72.group(1)).strip()

            # unify the usual separators and conjunctions into "、"
            normalized = re.sub(r'\s*(?:、|，|,|；|;|／|/|和|与|及)\s*', '、', raw_inv)

            # Chinese names separated only by a space: treat the space as a separator
            normalized = re.sub(r'(?<=[\u4e00-\u9fff])\s+(?=[\u4e00-\u9fff])', '、', normalized)

            seen = set()
            for p in (part.strip() for part in normalized.split('、')):
                # drop a trailing "等" / "等人" / "等等人"
                p = re.sub(r'(?:等(?:等)?(?:人)?)$', '', p)
                if p and p not in seen:
                    inventors_list.append(p)
                    seen.add(p)

        # (43)/(45) or a "公开/公告/公布/授权公告 日" keyword -- first match wins
        pub_date = ""
        for pat in [
            r'\(45\)\s*授权公告日\s*([0-9.\-年月日/]+)',
            r'\(43\)\s*(?:公开|公告|公布)\s*(?:日|日期)?\s*([0-9.\-年月日/]+)',
            r'(?:授权公告|公开|公告|公布)\s*(?:日|日期)\s*[:：]?\s*([0-9.\-年月日/]+)',
        ]:
            m = re.search(pat, full_text)
            if m:
                pub_date = self._normalize_date(m.group(1))
                break

        return {
            "title": title,
            "apply_no": apply_no,
            "apply_time": apply_time,
            "applicant": applicant,
            "address": address,
            "inventors": inventors_list,
            "publ_date": pub_date,
            "doc_type": doc_type,
        }

    # publication number -> granted flag and patent number
    def MetaDict_norm(self):
        """
        Derive ``patent_no`` and ``is_granted`` from ``publ_no`` / ``apply_no``.

        The last character of the publication number is the kind code:
        codes in ``_GRANTED_KIND_CODES`` mean the document is granted and the
        patent number is ``"ZL" + apply_no``; anything else (e.g. "A", an
        unexamined application) has no patent number, so the key is removed.
        """
        publ_no = (self.meta_schema.get("publ_no") or "").strip()
        if not publ_no:
            # Without a publication number the patent number cannot be derived.
            # pop() keeps this safe when the key is already gone.
            self.meta_schema.pop('patent_no', None)
            return

        appl_no = self.meta_schema.get("apply_no", None)
        if not appl_no:
            return

        if publ_no[-1] in self._GRANTED_KIND_CODES:
            self.meta_schema['patent_no'] = f"ZL{appl_no}"
            self.meta_schema['is_granted'] = True
        else:
            self.meta_schema.pop('patent_no', None)
        return

    def _is_empty(self, x) -> bool:
        """Return True for None, "" and empty dict/list/tuple/set; False and 0 are not empty."""
        if x is None:
            return True
        if isinstance(x, str):
            return x == ""  # whitespace-only strings are deliberately kept
        if isinstance(x, (dict, OrderedDict, list, tuple, set)):
            return len(x) == 0
        return False

    def MetaDict_NoEmptyValue(self, x) -> OrderedDict:
        """
        Recursively remove empty values from dict / OrderedDict / list / tuple.

        "Empty" is defined by :meth:`_is_empty`; False and 0 are kept.  The
        container type of the input is preserved.
        """
        # mappings: keep the original mapping type
        if isinstance(x, (dict, OrderedDict)):
            cls = OrderedDict if isinstance(x, OrderedDict) else dict
            out = cls()
            for k, v in x.items():
                v2 = self.MetaDict_NoEmptyValue(v)  # clean first ...
                if not self._is_empty(v2):          # ... then decide whether to keep
                    out[k] = v2
            return out

        # sequences: drop the items that are empty after cleaning
        if isinstance(x, (list, tuple)):
            cleaned = (self.MetaDict_NoEmptyValue(item) for item in x)
            kept = [item for item in cleaned if not self._is_empty(item)]
            return tuple(kept) if isinstance(x, tuple) else kept

        # atoms are returned unchanged
        return x

    def _extract_section_plain_text(self, full_text: str, title_pattern: str, strip_para_tags: bool = False) -> str:
        """
        Return the body of the section whose heading matches ``title_pattern``
        (heading excluded), or "" when the section does not exist.

        With ``strip_para_tags`` the paragraph markers such as ``[0001]`` at
        the start of a line are removed.
        """
        sec = self._find_section(full_text, title_pattern)
        if not sec:
            return ""
        txt = sec.strip()
        if strip_para_tags:
            txt = re.sub(r'(?m)^\s*\[\d+\]\s*', '', txt)
        return txt

    # ============== publication number ==============
    def _extract_pubno(self) -> Dict[str, str]:
        """
        Extract the publication / grant announcement number (e.g.
        CN110405743A, CN211234567U) from the Markdown text.

        Lookup order:
          1) a line carrying the "(11)" code;
          2) a "公告号 / 公布号 / 申请公布号 / 授权公告号" keyword.

        Returns:
            ``{"publ_no": <number or "">, "pdf_path": <PDF path as str or "">}``
        """
        text = self.text
        publ_no = ""

        m_11 = re.search(r'\(11\)[^\n]*?(CN[0-9]{7,12}[A-Z0-9]?)', text, flags=re.IGNORECASE)
        if m_11:
            publ_no = m_11.group(1).upper()
        else:
            m_kw = re.search(
                r'(公告号|公布号|申请公布号|授权公告号)\s*[:：]?\s*(CN[0-9]{7,12}[A-Z0-9]?)',
                text, flags=re.IGNORECASE
            )
            if m_kw:
                publ_no = m_kw.group(2).upper()

        return {"publ_no": publ_no, "pdf_path": str(self.pdfp) if self.pdfp else ""}

    # ============== output ==============
    def _write_structured_md(self, content: str) -> Path:
        """
        Write ``content`` to ``<stem>_filtered.md`` and the current
        ``meta_schema`` to ``MetaDict_filtered.json`` inside ``out_dir``.

        Returns the path of the written Markdown file.
        """
        self.out_dir.mkdir(parents=True, exist_ok=True)
        in_name = Path(self.markdown_file).stem
        out_path = self.out_dir / f"{in_name}_filtered.md"
        out_path.write_text(content, encoding="utf-8")

        json_path = self.out_dir / "MetaDict_filtered.json"
        with open(json_path, "w", encoding='utf-8') as fj:
            json.dump(self.meta_schema, fj, ensure_ascii=False, indent=4, default=str)
        return out_path

# Script entry point: run the parser over every "<sub-dir>/full.md" found
# directly below root_dir and print the resulting metadata.
if __name__ == '__main__':

    root_dir = r".log/SimplePDF"
    sub_dirs = list(Path(root_dir).glob("*/full.md"))
    print(len(sub_dirs))  # number of Markdown files found
    for md in sub_dirs:
        mdp = Path(md)
        parsers = patentMD_parser(mdp)
        # Calling the parser runs its pipeline; the metadata is then read
        # from parsers.meta_schema below.
        doc = parsers()
        print(parsers.meta_schema)

In [None]:
from pathlib import Path 

# Sample patent PDF and Markdown paths, relative to the working directory
# (Windows-style ".\" prefix).
zhuanli_path_pdf = Path(r".\demo.pdf")
zhuanli_path_md = Path(r".\demo.md")

In [None]:
from pypdf import PdfReader


# Scratch comparison of three PDF text extractors on the first page of one file.
pdfp = r"demo.pdf"  # printed (scanned) PDF: none of the extractors work on it


# pypdf: result is so-so
reader = PdfReader(pdfp)
text = ""
for page in reader.pages:
    text += page.extract_text()
    # print(text)   # author's note: around the second-to-last line of the output
    break  # first page only


# PyMuPDF: result is so-so
import fitz  # PyMuPDF
doc = fitz.open(pdfp)
for page in doc:
    text = page.get_text()
    # print(text)
    break  # first page only

# pdfplumber: first page only; the text is printed for inspection
import pdfplumber
with pdfplumber.open(str(pdfp)) as pdf:
    first_page = pdf.pages[0]
    textp = first_page.extract_text()
print(textp)


In [None]:
import re 
from typing import Dict 
from pathlib import Path 

def get_md_text(mdp):
    """Return the content of the Markdown file *mdp*, decoded as UTF-8 with undecodable bytes dropped."""
    md_file = Path(mdp)
    content = md_file.read_text(encoding="utf-8", errors="ignore")
    return content


def extract_pubno(mdp) -> Dict[str, str]:
    """
    Extract the publication / grant announcement number (e.g. CN110405743A,
    CN211234567U) for the patent whose Markdown file is *mdp*.

    Lookup order:
      1) a line carrying the "(11)" code;
      2) a "公告号 / 公布号 / 申请公布号 / 授权公告号" keyword;
      3) fallback scan of the whole text for "CN" + digits + optional letter;
      4) fallback scan of the file name and the parent directory path.

    Steps 3 and 4 reject application numbers such as "CN202430526527.2":
    the digit run must be complete and must not be followed by a
    ".<check character>" suffix.  (A plain "not followed by a dot" lookahead
    lets the regex backtrack and return a truncated number.)

    Returns:
        ``{"pubno": <number or "">, "pdf_path": <Path of *_origin.pdf or "">}``
    """
    md_path = Path(mdp)
    text = md_path.read_text(encoding="utf-8", errors="ignore")
    pdfp = next(md_path.parent.glob("*_origin.pdf"), None)

    def _result(number: str) -> Dict[str, str]:
        return {"pubno": number.upper(), "pdf_path": pdfp or ""}

    # 1) "(11)" line first
    m_11 = re.search(r'\(11\)[^\n]*?(CN[0-9]{7,12}[A-Z0-9]?)', text, flags=re.IGNORECASE)
    if m_11:
        return _result(m_11.group(1))

    # 2) keyword followed by the number
    m_kw = re.search(
        r'(公告号|公布号|申请公布号|授权公告号)\s*[:：]?\s*(CN[0-9]{7,12}[A-Z0-9]?)',
        text, flags=re.IGNORECASE
    )
    if m_kw:
        return _result(m_kw.group(2))

    # Whole digit run, optional kind letter, and no ".<char>" application suffix.
    loose = re.compile(r'(CN[0-9]{7,12}(?![0-9])[A-Z0-9]?)(?!\.[0-9A-Z])', flags=re.IGNORECASE)

    # 3) whole text, then 4) file name / parent directory path
    for haystack in (text, str(md_path.name), str(md_path.parent)):
        m_any = loose.search(haystack)
        if m_any:
            return _result(m_any.group(1))

    # nothing found
    return _result("")
mdp = r"sucai\CN202430526527.2-机械手（机器人灵巧手）.pdf-cf433862-c84e-4835-8712-c29434c3e6f5\full.md"
extract_pubno(mdp)
# Observed result 'pubno': 'CN20243052652' is wrong: the CN202430526527.2 in
# the file name is the application number, not the publication number.
# The application number (apply_id) still needs to be added to the MetaDict.
# Idea: read the first PDF page with pypdf's PdfReader and match on that text.

In [None]:
def is_granted(patent):
    """
    Decide from the publication number whether a patent is granted.

    Args:
        patent: mapping with ``publ_no`` and the application number under
            ``appl_no`` (``apply_no`` is accepted as a fallback key).

    Returns:
        ``(True, "ZL <application no.>")`` for a granted document (kind code
        U, C, S or B; the number is None when no application number is
        available), ``(False, None)`` for kind code A (unexamined
        application), ``(None, None)`` when the publication number is
        missing or its kind code is not recognised.
    """
    publ_no = (patent.get('publ_no') or '').strip()
    if not publ_no:
        # No publication number: the status cannot be determined.
        return None, None

    pub_kind = publ_no[-1]  # the kind code is the last character
    # U / S: granted utility model / design; C (legacy) and B: granted invention.
    if pub_kind in ('U', 'C', 'S', 'B'):
        appl_no = patent.get('appl_no') or patent.get('apply_no')
        return True, (f"ZL {appl_no}" if appl_no else None)
    if pub_kind == 'A':
        return False, None  # application only, no patent number
    return None, None
    




In [None]:
# 申请公布号： 文件名中获取 or 正文部分匹配 （CN + 申请号， (21)申请号 202411146096.2） ） or  PdfReader读取pdf的第一页
# 申请公布日： 正文部分匹配（(43)申请公布日 2024.10.11）  or  PdfReader读取pdf的第一页

# PdfReader读取pdf的第一页 会得到这样的结果：
"""   
(19)中华人民共和国国家知识产权局
(12)实用新型专利
(10)授权公告号 
(45)授权公告日 
(21)申请号 202021894937.5
(22)申请日 2020.09.02
(73)专利权人 杭州宇树科技有限公司
地址 310053 浙江省杭州市滨江区西兴街
道东流路88号1幢306室
(72)发明人 王兴兴　
(74)专利代理机构 浙江翔隆专利事务所(普通
合伙) 33206
代理人 许守金
(51)Int.Cl.
B25J 5/00(2006.01)
B25J 9/10(2006.01)
B62D 57/032(2006.01)
 
(54)实用新型名称
一种结构紧凑的回转动力单元以及应用其
的机器人
(57)摘要
本实用新型公开了一种结构紧凑的回转动
力单元以及应用其的机器人，属于动力单元以及
机器人技术领域。现有回转单元方案，需要在输
...
CN 213034612 U
2021.04.23
CN 213034612 U
"""
# 末尾处就是我们期待的

In [None]:
from pathlib import Path 
# Locate the original PDF that sits next to the sample Markdown file.
mdp = r"sucai\CN202430526527.2-机械手（机器人灵巧手）.pdf-cf433862-c84e-4835-8712-c29434c3e6f5\full.md"
pdfp = next(Path(mdp).parent.glob("*_origin.pdf"), None) # 202021894937.5
assert pdfp is not None

In [None]:
import pdfplumber
from pathlib import Path 
# Print the pdfplumber text of the first page of one patent's original PDF
# (absolute path on the author's machine).
mdp = r"D:\ddesktop\agentdemos\codespace\zhuanliParser\result\CN202130119955.X-机器人足端.pdf-b78a68ee-f572-4c41-8774-00c792a7761e\full.md"
pdfp = next(Path(mdp).parent.glob("*_origin.pdf"), None) 
# print(pdfp)
assert Path(pdfp).is_file() is True 
with pdfplumber.open(str(pdfp)) as pdf:
    first_page = pdf.pages[0]
    textp = first_page.extract_text()
print(textp)

In [None]:

import pdfplumber, re
pdfp = r"demo.pdf"

def pdfplumber_extract_publno(pdfp)->str:
    """
    Read the first page of the PDF with pdfplumber and return the publication
    number that follows a "公告号 / 公布号 / 申请公布号 / 授权公告号" keyword.

    The number may be written with spaces on the same line
    ("CN 213034612 U"); the returned value is upper-cased with all
    whitespace removed.  Returns None when nothing matches, including a PDF
    without pages or a first page that yields no text.
    """
    with pdfplumber.open(str(pdfp)) as pdf:
        # Guard both the empty document and a page that yields no text.
        textp = (pdf.pages[0].extract_text() if pdf.pages else "") or ""
    print(f"{textp=}")
    m_kw = re.search(
        # [ \t]* (not \s*) inside the number so the kind code is never taken
        # from the following line
        r'(公告号|公布号|申请公布号|授权公告号)\s*[:：]?\s*(CN[ \t]*[0-9]{7,12}[ \t]*[A-Z0-9]?)',
        textp, flags=re.IGNORECASE)
    if m_kw:
        return re.sub(r'\s+', '', m_kw.group(2)).upper()
    return None
pdfplumber_extract_publno(pdfp)

In [None]:
from collections import OrderedDict
from pathlib import Path
import pdfplumber
import re



def parse_PdfFontPage(pdf_path: str|Path) -> OrderedDict:
    """
    Read the first page of a patent PDF with pdfplumber and parse it into

    OrderedDict({
         "publ_no":   <(10) publication / grant announcement number>,
         "publ_date": <(45)/(43) announcement / publication date>,
         "doc_type":  <(12) document type, e.g. "实用新型专利">
      })

    Every field is "" when it cannot be found; a PDF without pages yields
    three empty fields.
    """
    pdf_path = Path(pdf_path)
    with pdfplumber.open(pdf_path) as pdf:
        if not pdf.pages:
            return OrderedDict(publ_no="", publ_date="", doc_type="")
        text = pdf.pages[0].extract_text() or ""

    # normalise common full-width characters
    page_text = text.replace("：", ":").replace("\u3000", " ")

    def to_dotted_date(raw: str) -> str:
        """Turn a date into YYYY.MM.DD (zero-padded) when it has three parts."""
        dotted = raw.strip()
        for old, new in (("年", "."), ("月", "."), ("日", ""), ("/", "."), ("-", ".")):
            dotted = dotted.replace(old, new)
        pieces = [piece for piece in dotted.split(".") if piece]
        if len(pieces) < 3:
            return dotted
        year, month, day = pieces[:3]
        return f"{year}.{month.zfill(2)}.{day.zfill(2)}"

    def to_cn_number(raw: str) -> str:
        """Upper-case, drop whitespace, ensure the CN prefix, trim trailing junk."""
        compact = re.sub(r"\s+", "", raw.upper())
        if not compact.startswith("CN"):
            compact = "CN" + compact
        return re.sub(r"[^A-Z0-9]+$", "", compact)

    def first_capture(patterns, flags=0) -> str:
        """Return the stripped group(1) of the first pattern that yields a non-empty capture."""
        for pattern in patterns:
            hit = re.search(pattern, page_text, flags)
            captured = hit.group(1).strip() if hit else ""
            if captured:
                return captured
        return ""

    # ---- (10) publication / grant announcement number ----
    raw_no = first_capture(
        [
            r"\(10\)\s*(?:授权公告号|公告号|公开号|申请公布号)\s*([A-Z]{0,2}\s*\d{6,12}\s*[A-Z0-9]?)",
            r"\(10\)[^\n]*?(CN\s*\d{6,12}\s*[A-Z0-9]?)",
        ],
        flags=re.IGNORECASE,
    )
    publ_no = to_cn_number(raw_no) if raw_no else ""

    # ---- (45)/(43) announcement / publication date ----
    raw_date = first_capture(
        [
            r"\(45\)\s*(?:授权公告日|公告日)\s*([0-9.\-年月日/]+)",
            r"\(43\)\s*(?:公开|公告|公布|申请公布)\s*(?:日|日期)?\s*([0-9.\-年月日/]+)",
            r"(?:授权公告|公开|公告|公布|申请公布)\s*(?:日|日期)\s*[:：]?\s*([0-9.\-年月日/]+)",
        ]
    )
    publ_date = to_dotted_date(raw_date) if raw_date else ""

    # ---- (12) document type, with all whitespace removed ----
    raw_type = first_capture([r"\(12\)\s*([^\n]+)"])
    doc_type = re.sub(r"\s+", "", raw_type) if raw_type else ""

    # ---- fallback: page-footer layout "CN ...\nYYYY.MM.DD" (last hit wins) ----
    if not publ_no:
        cn_hits = re.findall(r"\bCN\s*\d{6,12}\s*[A-Z0-9]?\b", page_text, flags=re.IGNORECASE)
        if cn_hits:
            publ_no = to_cn_number(cn_hits[-1])

    if not publ_date:
        date_hits = re.findall(r"\b\d{4}[./-]\d{1,2}[./-]\d{1,2}\b", page_text)
        if date_hits:
            publ_date = to_dotted_date(date_hits[-1])

    return OrderedDict(publ_no=publ_no, publ_date=publ_date, doc_type=doc_type)

# Batch check: parse the front page of every patent below root_dir
# (absolute path on the author's machine) and print the result.
root_dir = r"D:\ddesktop\agentdemos\codespace\zhuanliParser\result"
sub_dirs = list(Path(root_dir).glob("*/full.md"))
print(len(sub_dirs))  # number of Markdown files found
for md in sub_dirs:
    mdp = Path(md)
    # the original PDF sits next to full.md
    pdfp = next(mdp.parent.glob("*_origin.pdf"),None)
    assert pdfp is not None 
    xx = parse_PdfFontPage(pdfp)
    print(xx)


# parse_PdfFontPage(pdfp)


In [None]:
import fitz

# Print the PyMuPDF text of the first page; pdfp is whatever an earlier cell left bound.
doc = fitz.open(pdfp)
page = doc[0]
text = page.get_text()
print(text)

In [None]:





def get_FirstPdfPageText(pdf_path: str):
    """Return the pypdf-extracted text of the first page of *pdf_path* ("" when the PDF has no pages)."""
    first_page = next(iter(PdfReader(pdf_path).pages), None)
    if first_page is None:
        return ""
    # Only the first page is read.
    return "" + first_page.extract_text()
 


def _normalize_date(s: str) -> str:
    s = s.strip()
    s = s.replace("年", ".").replace("月", ".").replace("日", "")
    s = s.replace("/", ".").replace("-", ".")
    parts = [p for p in s.split(".") if p]
    if len(parts) >= 3:
        y, m, d = parts[0], parts[1], parts[2]
        # 统一补零
        m = m.zfill(2)
        d = d.zfill(2)
        return f"{y}.{m}.{d}"
    return s



def extract_pubno_from_text(textIn: str) -> str:
    """
    Return the first publication number found in *textIn*, upper-cased and
    with whitespace removed (e.g. 'CN 213034612 U' -> 'CN213034612U').

    Application numbers such as 'CN202430526527.2' are not publication
    numbers and are rejected: the digit run must be complete and must not
    be followed by a ".<check character>" suffix.  Returns "" when nothing
    matches or when *textIn* is empty / None.
    """
    m = re.search(
        r'(CN\s*\d{7,12}(?!\d)\s*[A-Z0-9]?)(?!\.[0-9A-Z])',
        textIn or "",
        flags=re.IGNORECASE,
    )
    if m:
        return re.sub(r'\s+', '', m.group(1).upper())
    return ""




def extract_pubdate_from_text(textIn: str) -> str:
    """
    Return the publication / announcement date found in *textIn*, normalised
    by ``_normalize_date``.

    Labelled fields ((45), (43), then a bare keyword) are tried first; if
    none matches, the chronologically latest date-like token in the text is
    used.  Returns "" when no date is found.
    """
    labelled_patterns = (
        r'\(45\)\s*授权公告日\s*([0-9.\-年月日/]+)',
        r'\(43\)\s*(?:公开|公告|公布|申请公布)\s*(?:日|日期)?\s*([0-9.\-年月日/]+)',
        r'(?:授权公告|公开|公告|公布|申请公布)\s*(?:日|日期)\s*[:：]?\s*([0-9.\-年月日/]+)',
    )
    for pattern in labelled_patterns:
        hit = re.search(pattern, textIn)
        if hit is not None:
            return _normalize_date(hit.group(1))

    # Fallback: collect everything that looks like a date and take the latest.
    loose_dates = re.findall(r'(20\d{2}[.\-/年]\d{1,2}[.\-/月]\d{1,2})', textIn)
    if not loose_dates:
        return ""

    def _chronological_key(raw: str):
        parsed = re.match(r'(\d{4})\.(\d{1,2})\.(\d{1,2})$', _normalize_date(raw))
        if parsed is None:
            return (0,0,0)
        return tuple(int(part) for part in parsed.groups())

    latest = max(loose_dates, key=_chronological_key)
    return _normalize_date(latest)



# texts = get_md_text(mdp)
# extract_pubno_from_text(texts)  # ''

# Try the publication-number extraction on the pypdf text of the first page.
texts = get_FirstPdfPageText(pdfp)
print(texts)
extract_pubno_from_text(texts)  # observed result: ''

 
# extract_pubdate_from_text(texts)  # ok


## ok
# def extract_applyno_from_fname(md_path: str):
#     sub_root_name = Path(md_path).parent.name
#     applyno = sub_root_name.split('-')[0]
#     print(applyno)
    
# extract_applyno_from_fname(md_path=mdp)
    



In [None]:
from __future__ import annotations
from pathlib import Path
import logging, os
from typing import List

from llama_index.core import Document, VectorStoreIndex, StorageContext, load_index_from_storage
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core.node_parser import SentenceWindowNodeParser, SentenceSplitter
from llama_index.core.ingestion import run_transformations
from llama_index.core.schema import BaseNode, MetadataMode, NodeWithScore

from llama_index.readers.file import PyMuPDFReader
from llama_index.readers.file.markdown import MarkdownReader
import chromadb

# ---------- logging ----------
# Configure the root logger once for this cell: INFO level, timestamps with
# millisecond precision, left-aligned level name and a 25-char logger name column.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d [%(levelname)-8s] %(name)25s - %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("patent_parser_min")

# ---------- paths ----------
# Persistence root, resolved relative to the current working directory.
# NOTE: the directory is created as an import-time side effect.
PERSIST_DIR = Path("../temp").resolve()
PERSIST_DIR.mkdir(parents=True, exist_ok=True)
# On-disk location handed to the Chroma persistent client.
CHROMA_DIR = PERSIST_DIR / "chroma_db"

# ---------- components ----------
def build_embedding():
    """Create the HuggingFace embedding model used by this demo."""
    model_name = "Qwen/Qwen3-Embedding-0.6B"
    return HuggingFaceEmbedding(model_name=model_name)

def build_vector_store():
    """Open (or create) the "patents" Chroma collection under CHROMA_DIR and wrap it."""
    chroma_client = chromadb.PersistentClient(path=str(CHROMA_DIR))
    patents = chroma_client.get_or_create_collection("patents")
    return ChromaVectorStore(chroma_collection=patents)

def read_file_to_documents(path: Path) -> List[Document]:
    """Load one file into Documents, picking the reader by file extension.

    ``.pdf`` goes through PyMuPDFReader and ``.md`` through MarkdownReader
    (the extension match is case-insensitive). Any other file is read as
    UTF-8 text, ignoring undecodable bytes, and wrapped in a single Document
    that carries the file name in its metadata.
    """
    suffix = path.suffix.lower()
    if suffix == ".pdf":
        reader = PyMuPDFReader()
    elif suffix == ".md":
        reader = MarkdownReader()
    else:
        # Fallback: treat the file as plain text.
        raw_text = path.read_text(encoding="utf-8", errors="ignore")
        return [Document(text=raw_text, metadata={"file_name": path.name})]
    return reader.load_data(str(path))

def show_nodes(nodes: List[BaseNode], max_n: int = 5):
    """Print a short preview of the first ``max_n`` nodes.

    For each node this prints its id and source-document id, the first 160
    characters of its content (newlines flattened to spaces), up to six
    metadata entries, and the ids of its previous / next neighbours.
    """
    print("\n=== Sample BaseNodes ===")
    for position, node in enumerate(nodes[:max_n], start=1):
        content = node.get_content()
        preview = content[:160].replace("\n", " ")
        ellipsis = "..." if len(content) > 160 else ""
        shown_keys = list(node.metadata)[:6]
        meta_preview = {key: node.metadata.get(key) for key in shown_keys}

        print(f"[{position}] node_id={node.node_id}  ref_doc_id={node.ref_doc_id}")
        print("    text:", preview, ellipsis)
        print("    meta:", meta_preview)
        # Neighbour links
        prev_id = getattr(node.prev_node, "node_id", None)
        next_id = getattr(node.next_node, "node_id", None)
        print("    prev:", prev_id, "  next:", next_id)

def main(input_path: str, query: str | None = None, use_window: bool = True):
    """Index one file end-to-end and print what each stage produced.

    Steps: load the file into Documents, chunk + embed them once to inspect the
    raw nodes, build and persist a VectorStoreIndex backed by Chroma, optionally
    run a retrieval for ``query``, then reload the persisted storage and list
    the reference documents found in the docstore.

    Args:
        input_path: Path of the file to ingest (.pdf, .md, or plain text).
        query: Optional query; when given, the top-5 matches are printed.
        use_window: True -> SentenceWindowNodeParser; False -> SentenceSplitter
            with chunk_size=600 / chunk_overlap=200.

    Raises:
        AssertionError: if ``input_path`` does not exist.
    """
    path = Path(input_path).resolve()
    assert path.exists(), f"File not found: {path}"

    # 1) Load the file into Documents
    docs = read_file_to_documents(path)
    for d in docs:
        d.metadata.setdefault("file_name", path.name)
        d.metadata["doc_id"] = d.doc_id
        # Same as private-gpt: keep these metadata keys out of the embedding / LLM input
        d.excluded_embed_metadata_keys = ["doc_id"]
        d.excluded_llm_metadata_keys = ["file_name", "doc_id", "page_label"]

    # 2) Components
    embed = build_embedding()
    vs = build_vector_store()
    storage = StorageContext.from_defaults(vector_store=vs)

    # 3) Choose the chunker
    if use_window:
        # Same default as private-gpt: sentence + window (a later chat step can use
        # MetadataReplacementPostProcessor("window"))
        parser = SentenceWindowNodeParser.from_defaults()  # window_size defaults to 3 (per original note)
    else:
        # Fixed-size chunking example
        parser = SentenceSplitter.from_defaults(chunk_size=600, chunk_overlap=200)

    # 4) Run the transformations directly first, to look at the bare nodes
    nodes = list(run_transformations(docs, [parser, embed], show_progress=True))
    show_nodes(nodes, max_n=5)

    # 5) Build the index + vector store (and persist)
    index = VectorStoreIndex.from_documents(
        docs,
        storage_context=storage,
        embed_model=embed,
        transformations=[parser],   # parser only; embedding is handled by embed_model
        show_progress=True,
    )
    index.storage_context.persist(persist_dir=str(PERSIST_DIR))
    print(f"\n✅ Persisted to: {PERSIST_DIR}")

    # 6) Immediate retrieval (inspect NodeWithScore)
    if query:
        retriever = index.as_retriever(similarity_top_k=5)
        results: List[NodeWithScore] = retriever.retrieve(query)
        print("\n=== Top matches ===")
        for r in results:
            txt = r.node.get_content()[:120].replace("\n", " ")
            print(f"score={round(r.score or 0, 3)}  node_id={r.node.node_id}  -> {txt} ...")

    # 7) Simulate "after a restart": reload and inspect the docstore.
    # The persisted docstore / index store live under PERSIST_DIR, so the storage
    # context itself must be created with persist_dir; otherwise it starts with
    # empty in-memory stores and load_index_from_storage finds no index to load.
    # embed_model is passed explicitly so the reloaded index uses the same model
    # instead of the globally configured default.
    storage2 = StorageContext.from_defaults(vector_store=vs, persist_dir=str(PERSIST_DIR))
    _ = load_index_from_storage(storage2, embed_model=embed)
    # NOTE(review): with a vector store that keeps node text itself, the docstore
    # may legitimately be empty unless store_nodes_override=True is used at index
    # build time -- confirm against the installed LlamaIndex version.
    ref_infos = storage2.docstore.get_all_ref_doc_info() or {}
    print(f"\n=== DocStore has {len(ref_infos)} ref docs ===")
    for doc_id, info in list(ref_infos.items())[:3]:
        print("ref_doc:", doc_id, "  meta keys:", list((info.metadata or {}).keys()))
        # If this LlamaIndex version carries node_ids, show a few of the nodes:
        node_ids = getattr(info, "node_ids", None)
        if node_ids:
            some_nodes = storage2.docstore.get_nodes(node_ids[:3])
            print("  sample nodes:", [n.node_id for n in some_nodes])

# Demo entry point: index ./demo.md and run one sample query against it
# (the query string asks for an analysis of the patent abstract).
if __name__ == "__main__":
    
    inp = r"./demo.md"
    q = "解析一下专利的摘要"
    main(inp, q)
    

In [None]:
# llama-index 解析 专利pdf、专利md（MinerU解析得到的md）

from pydantic import BaseModel, Field
from typing import Literal, Any, List, Tuple, Union, Optional 
import abc,  itertools, multiprocessing, multiprocessing.pool, threading
from injector import inject, Injector, singleton 
from queue import Queue


from llama_index.core.data_structs import IndexDict
from llama_index.core.embeddings.utils import EmbedType
from llama_index.core.indices import VectorStoreIndex, load_index_from_storage
from llama_index.core.indices.base import BaseIndex
from llama_index.core.ingestion import run_transformations
from llama_index.core.schema import BaseNode, Document, TransformComponent
from llama_index.core.storage import StorageContext
from llama_index.core.readers import StringIterableReader
from llama_index.core.readers.base import BaseReader
from llama_index.core.readers.json import JSONReader




import logging
# Set to 'DEBUG' to have extensive logging turned on, even for libraries
ROOT_LOG_LEVEL = "INFO"
PRETTY_LOG_FORMAT = (
    "%(asctime)s.%(msecs)03d [%(levelname)-8s] %(name)+25s - %(message)s"
)
# NOTE(review): an earlier cell in this file already calls logging.basicConfig;
# if that cell ran first, this call leaves the existing root handlers untouched
# (basicConfig only reconfigures when force=True) -- confirm which format is wanted.
logging.basicConfig(level=ROOT_LOG_LEVEL, format=PRETTY_LOG_FORMAT, datefmt="%H:%M:%S")
# Route warnings.warn(...) output through the logging system as well.
logging.captureWarnings(True)
logger = logging.getLogger(__name__)

# Settings: directory the index storage context is persisted to.
# `zhuanli_path_pdf` is defined outside this view.
local_data_path = zhuanli_path_pdf




# File loaders for patent inputs ("zhuanli.pdf", "zhuanli.md") and images ("image.jpg")
def _try_loading_included_file_formats() -> dict[str, type[BaseReader]]:
    """Build the extension -> reader-class table for the supported file formats.

    The readers are imported lazily so that a missing optional dependency is
    reported at the point of use.

    Returns:
        Mapping from lower-case file extension (including the dot) to the
        reader class handling it: PDF, Markdown, and JPG/PNG/JPEG images.

    Raises:
        ImportError: if any of the llama-index file readers cannot be imported.
            The original import failure is attached as ``__cause__``.
    """
    try:
        from llama_index.readers.file.docs import PDFReader
        from llama_index.readers.file.markdown import MarkdownReader
        from llama_index.readers.file.image import ImageReader  # type: ignore
    except ImportError as e:
        # Chain the original error so the missing module name is not lost.
        raise ImportError(
            "Failed to import the llama_index.readers.file readers "
            "(PDFReader / MarkdownReader / ImageReader)"
        ) from e

    default_file_reader_cls: dict[str, type[BaseReader]] = {
        ".pdf": PDFReader,
        ".md": MarkdownReader,
        ".jpg": ImageReader,
        ".png": ImageReader,
        ".jpeg": ImageReader,
    }
    return default_file_reader_cls
# Module-level reader registry, keyed by file extension; built once at import
# time and then extended with a JSON reader.
file_reader_cls = _try_loading_included_file_formats()
file_reader_cls.update({".json": JSONReader})
class IngestionHelper:
    """Helper class to transform a file into a list of documents.

    This class should be used to transform a file into a list of documents.
    These methods are thread-safe (and multiprocessing-safe).
    """

    @staticmethod
    def transform_file_into_documents(
        file_name: str, file_data: Path
    ) -> list[Document]:
        """Load ``file_data`` into documents and attach standard metadata.

        Args:
            file_name: Original name of the file; its extension selects the
                reader and it is stored as the ``file_name`` metadata entry.
            file_data: Path of the file to read.

        Returns:
            The loaded documents, with ``file_name`` / ``doc_id`` metadata set
            and the embedding / LLM metadata exclusions applied.
        """
        documents = IngestionHelper._load_file_to_documents(file_name, file_data)
        for document in documents:
            document.metadata["file_name"] = file_name
        IngestionHelper._exclude_metadata(documents)
        return documents

    @staticmethod
    def _load_file_to_documents(file_name: str, file_data: Path) -> list[Document]:
        """Read ``file_data`` with the reader registered for ``file_name``'s extension.

        Files with an unregistered extension are read as plain text.
        """
        logger.debug("Transforming file_name=%s into documents", file_name)
        # The registry keys are lower-case, so normalise the extension:
        # "X.PDF" must pick the PDF reader rather than fall back to plain text.
        extension = Path(file_name).suffix.lower()
        reader_cls = file_reader_cls.get(extension)
        if reader_cls is None:
            logger.debug(
                "No reader found for extension=%s, using default string reader",
                extension,
            )
            # Read as a plain text
            string_reader = StringIterableReader()
            return string_reader.load_data([file_data.read_text()])

        logger.debug("Specific reader found for extension=%s", extension)
        documents = reader_cls().load_data(file_data)

        # Sanitize NUL bytes in text which can't be stored in Postgres
        for document in documents:
            document.text = document.text.replace("\u0000", "")

        return documents

    @staticmethod
    def _exclude_metadata(documents: list[Document]) -> None:
        """Record each document's id in its metadata and set the exclusion lists (in place)."""
        logger.debug("Excluding metadata from count=%s documents", len(documents))
        for document in documents:
            document.metadata["doc_id"] = document.doc_id
            # We don't want the Embeddings search to receive this metadata
            document.excluded_embed_metadata_keys = ["doc_id"]
            # We don't want the LLM to receive these metadata in the context
            document.excluded_llm_metadata_keys = ["file_name", "doc_id", "page_label"]
    




class IngestedDoc(BaseModel):
    """Public description of one ingested document: its id plus curated metadata."""

    object: Literal["ingest.document"]
    doc_id: str = Field(examples=["c202d5e6-7b69-4869-81cc-dd574ee8ee11"])
    doc_metadata: dict[str, Any] | None = Field(
        examples=[
            {
                "page_label": "2",
                "file_name": "Sales Report Q3 2023.pdf",
            }
        ]
    )

    @staticmethod
    def curate_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``metadata`` without the unwanted internal keys.

        The input mapping is left untouched: callers pass in the live
        ``document.metadata`` / ``ref_doc_info.metadata`` dicts, and removing
        keys from those would silently alter the stored objects.
        """
        unwanted = ("doc_id", "window", "original_text")
        return {key: value for key, value in metadata.items() if key not in unwanted}

    @staticmethod
    def from_document(document: Document) -> "IngestedDoc":
        """Build an IngestedDoc from a Document, using its id and curated metadata."""
        return IngestedDoc(
            object="ingest.document",
            doc_id=document.doc_id,
            doc_metadata=IngestedDoc.curate_metadata(document.metadata),
        )

class BaseIngestComponent(abc.ABC):
    """Abstract interface of an ingestion backend.

    Holds the storage context, embedding model and transformation list that
    concrete components use to ingest, bulk-ingest and delete documents.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        embed_model: EmbedType,
        transformations: list[TransformComponent],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        component_type = type(self).__name__
        logger.debug("Initializing base ingest component type=%s", component_type)
        self.storage_context = storage_context
        self.embed_model = embed_model
        self.transformations = transformations

    @abc.abstractmethod
    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Ingest a single file and return the documents created from it."""

    @abc.abstractmethod
    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Ingest several ``(file_name, file_data)`` pairs and return all documents."""

    @abc.abstractmethod
    def delete(self, doc_id: str) -> None:
        """Remove the document identified by ``doc_id``."""

class BaseIngestComponentWithIndex(BaseIngestComponent, abc.ABC):
    """Ingest component that owns an index, guarded by a thread lock."""

    def __init__(
        self,
        storage_context: StorageContext,
        embed_model: EmbedType,
        transformations: list[TransformComponent],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(storage_context, embed_model, transformations, *args, **kwargs)

        self.show_progress = True
        # Thread lock! Not Multiprocessing lock
        self._index_thread_lock = threading.Lock()
        self._index = self._initialize_index()

    def _initialize_index(self) -> BaseIndex[IndexDict]:
        """Load the index from the storage context, creating an empty one if none exists."""
        # store_nodes_override=True forces nodes into the index and document
        # stores, which is what makes deleting them possible later.
        shared_options = {
            "storage_context": self.storage_context,
            "store_nodes_override": True,
            "show_progress": self.show_progress,
            "embed_model": self.embed_model,
            "transformations": self.transformations,
        }
        try:
            return load_index_from_storage(**shared_options)
        except ValueError:
            # There are no index in the storage context, creating a new one
            logger.info("Creating a new vector store index")
            fresh_index = VectorStoreIndex.from_documents([], **shared_options)
            fresh_index.storage_context.persist(persist_dir=local_data_path)
            return fresh_index

    def _save_index(self) -> None:
        """Persist the index's storage context to the configured data path."""
        storage = self._index.storage_context
        storage.persist(persist_dir=local_data_path)

    def delete(self, doc_id: str) -> None:
        """Delete ``doc_id`` from the index and docstore, then persist."""
        with self._index_thread_lock:
            # Remove the document from the index (and the docstore) ...
            self._index.delete_ref_doc(doc_id, delete_from_docstore=True)
            # ... and write the updated index back to disk.
            self._save_index()

class BatchIngestComponent(BaseIngestComponentWithIndex):
    """Parallelize the file reading and parsing on multiple CPU core.

    This also makes the embeddings to be computed in batches (on GPU or CPU).
    """

    def __init__(
        self,
        storage_context: StorageContext,
        embed_model: EmbedType,
        transformations: list[TransformComponent],
        count_workers: int,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(storage_context, embed_model, transformations, *args, **kwargs)
        # To use CPU and GPU efficiently the embedding step has to be part of
        # the transformations, hence at least two entries are required.
        assert len(self.transformations) >= 2, "Embeddings must be in the transformations"
        assert count_workers > 0, "count_workers must be > 0"
        self.count_workers = count_workers

        # Worker processes used to turn files into documents in parallel.
        self._file_to_documents_work_pool = multiprocessing.Pool(
            processes=self.count_workers
        )

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Ingest one file in the calling process and store its documents."""
        logger.info("Ingesting file_name=%s", file_name)
        loaded = IngestionHelper.transform_file_into_documents(file_name, file_data)
        logger.info(
            "Transformed file=%s into count=%s documents", file_name, len(loaded)
        )
        logger.debug("Saving the documents in the index and doc store")
        return self._save_docs(loaded)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Load all files through the worker pool, then store the combined documents."""
        per_file_documents = self._file_to_documents_work_pool.starmap(
            IngestionHelper.transform_file_into_documents, files
        )
        documents = [doc for batch in per_file_documents for doc in batch]
        logger.info(
            "Transformed count=%s files into count=%s documents",
            len(files),
            len(documents),
        )
        return self._save_docs(documents)

    def _save_docs(self, documents: list[Document]) -> list[Document]:
        """Run the transformations, insert the nodes into the index and persist."""
        logger.debug("Transforming count=%s documents into nodes", len(documents))
        new_nodes = run_transformations(
            documents,  # type: ignore[arg-type]
            self.transformations,
            show_progress=self.show_progress,
        )
        # Locking the index to avoid concurrent writes
        with self._index_thread_lock:
            logger.info("Inserting count=%s nodes in the index", len(new_nodes))
            self._index.insert_nodes(new_nodes, show_progress=True)
            docstore = self._index.docstore
            for doc in documents:
                docstore.set_document_hash(doc.get_doc_id(), doc.hash)
            logger.debug("Persisting the index and nodes")
            # persist the index and nodes
            self._save_index()
            logger.debug("Persisted the index and nodes")
        return documents



def get_ingestion_component(
    storage_context: StorageContext,
    embed_model: EmbedType,
    transformations: list[TransformComponent],
    settings: Settings,
) -> BaseIngestComponent:
    """Get the ingestion component for the given configuration.

    ``settings.embedding.ingest_mode`` selects the implementation: "batch" and
    "pipeline" get ``settings.embedding.count_workers`` workers; any other
    value falls back to the simple component.
    """
    mode = settings.embedding.ingest_mode

    if mode == "batch":
        return BatchIngestComponent(
            storage_context=storage_context,
            embed_model=embed_model,
            transformations=transformations,
            count_workers=settings.embedding.count_workers,
        )

    if mode == "pipeline":
        return PipelineIngestComponent(
            storage_context=storage_context,
            embed_model=embed_model,
            transformations=transformations,
            count_workers=settings.embedding.count_workers,
        )

    # Default: single-process ingestion without a worker count.
    return SimpleIngestComponent(
        storage_context=storage_context,
        embed_model=embed_model,
        transformations=transformations,
    )
    



@singleton
class IngestService:
    """Service facade for ingesting, listing and deleting documents."""

    @inject
    def __init__(
        self,
        llm_component: LLMComponent,
        vector_store_component: VectorStoreComponent,
        embedding_component: EmbeddingComponent,
        node_store_component: NodeStoreComponent,
    ) -> None:
        self.llm_service = llm_component
        self.storage_context = StorageContext.from_defaults(
            vector_store=vector_store_component.vector_store,
            docstore=node_store_component.doc_store,
            index_store=node_store_component.index_store,
        )
        sentence_window_parser = SentenceWindowNodeParser.from_defaults()
        embedding_model = embedding_component.embedding_model

        self.ingest_component = get_ingestion_component(
            self.storage_context,
            embed_model=embedding_model,
            transformations=[sentence_window_parser, embedding_model],
            settings=settings(),
        )

    def _ingest_data(self, file_name: str, file_data: AnyStr) -> list[IngestedDoc]:
        """Write in-memory data to a temporary file and ingest that file."""
        logger.debug("Got file data of size=%s to ingest", len(file_data))
        # llama-index mainly supports reading from files, so
        # we have to create a tmp file to read for it to work
        # delete=False to avoid a Windows 11 permission error.
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            try:
                path_to_tmp = Path(tmp.name)
                if not isinstance(file_data, bytes):
                    path_to_tmp.write_text(str(file_data))
                else:
                    path_to_tmp.write_bytes(file_data)
                return self.ingest_file(file_name, path_to_tmp)
            finally:
                # Close the handle first, then remove the file ourselves
                # (it was created with delete=False).
                tmp.close()
                path_to_tmp.unlink()

    def ingest_file(self, file_name: str, file_data: Path) -> list[IngestedDoc]:
        """Ingest the file at ``file_data`` and describe the resulting documents."""
        logger.info("Ingesting file_name=%s", file_name)
        produced = self.ingest_component.ingest(file_name, file_data)
        logger.info("Finished ingestion file_name=%s", file_name)
        return [IngestedDoc.from_document(doc) for doc in produced]

    def ingest_text(self, file_name: str, text: str) -> list[IngestedDoc]:
        """Ingest a text string under the given file name."""
        logger.debug("Ingesting text data with file_name=%s", file_name)
        return self._ingest_data(file_name, text)

    def ingest_bin_data(
        self, file_name: str, raw_file_data: BinaryIO
    ) -> list[IngestedDoc]:
        """Read a binary stream to the end and ingest its content."""
        logger.debug("Ingesting binary data with file_name=%s", file_name)
        payload = raw_file_data.read()
        return self._ingest_data(file_name, payload)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[IngestedDoc]:
        """Ingest several ``(file_name, path)`` pairs in one call."""
        logger.info("Ingesting file_names=%s", [f[0] for f in files])
        produced = self.ingest_component.bulk_ingest(files)
        logger.info("Finished ingestion file_name=%s", [f[0] for f in files])
        return [IngestedDoc.from_document(doc) for doc in produced]

    def list_ingested(self) -> list[IngestedDoc]:
        """List the reference documents currently known to the docstore.

        A ValueError raised while reading the docstore is logged as a warning
        and whatever was collected so far is returned.
        """
        found: list[IngestedDoc] = []
        try:
            ref_docs = self.storage_context.docstore.get_all_ref_doc_info()

            if not ref_docs:
                return found

            for doc_id, ref_doc_info in ref_docs.items():
                has_metadata = (
                    ref_doc_info is not None and ref_doc_info.metadata is not None
                )
                curated = (
                    IngestedDoc.curate_metadata(ref_doc_info.metadata)
                    if has_metadata
                    else None
                )
                found.append(
                    IngestedDoc(
                        object="ingest.document",
                        doc_id=doc_id,
                        doc_metadata=curated,
                    )
                )
        except ValueError:
            logger.warning("Got an exception when getting list of docs", exc_info=True)
        logger.debug("Found count=%s ingested documents", len(found))
        return found

    def delete(self, doc_id: str) -> None:
        """Delete an ingested document.

        :raises ValueError: if the document does not exist
        """
        logger.info(
            "Deleting the ingested document=%s in the doc and index store", doc_id
        )
        self.ingest_component.delete(doc_id)

In [None]:
import span_marker
from llama_index.extractors.entity import EntityExtractor 
from llama_index.core.node_parser import SentenceSplitter 

In [None]:
# 删除嵌套字典中，值为空的键值对

# ###1 返回新对象
from collections import OrderedDict

def _is_empty(x):
    """Tell whether ``x`` counts as empty.

    Empty means: None, the empty string, or a zero-length
    dict / OrderedDict / list / tuple / set. ``False``, ``0`` and
    whitespace-only strings are NOT empty.
    """
    if x is None:
        return True
    if isinstance(x, str):
        return x == ""        # use x.strip() == "" to treat all-whitespace as empty too
    return isinstance(x, (dict, OrderedDict, list, tuple, set)) and len(x) == 0


def del_empty(x):
    """Return a cleaned copy of ``x`` with empty values removed recursively.

    Works through dict / OrderedDict / list / tuple containers. A value is
    dropped when it is empty *after* its own contents were cleaned, so a
    container holding only empty values disappears as well. ``False`` and
    ``0`` are kept. OrderedDict, dict, list and tuple inputs yield the same
    container type; the input itself is not modified.
    """
    # Mappings: rebuild into the same mapping flavour.
    if isinstance(x, (dict, OrderedDict)):
        result = OrderedDict() if isinstance(x, OrderedDict) else dict()
        for key, value in x.items():
            cleaned = del_empty(value)      # clean first ...
            if _is_empty(cleaned):          # ... then decide on the cleaned value
                continue
            result[key] = cleaned
        return result

    # Sequences: filter the cleaned items, then restore the original type.
    if isinstance(x, (list, tuple)):
        kept = [item for item in map(del_empty, x) if not _is_empty(item)]
        return kept if isinstance(x, list) else tuple(kept)

    # Anything else is atomic and returned unchanged (False / 0 survive).
    return x


# Example: a patent metadata record in which several fields are still blank.
meta_schema = OrderedDict({
    "publ_no": "",
    "publ_date": "2024-06-01",
    "is_granted": False,
    "patent_no": "",
    "apply_no": "",
    "apply_time": "",
    "title": "一种可折叠无人机",
    "applicant": "",
    "address": "",
    "inventors": ["张三", "李四"],
    "doc_type": "发明专利",
    "tech_field": "",
    "root_dir": "/data/patents/2024",
    "pdf_path": "/data/patents/2024/xxx.pdf",
    "fig_list": {"abs_im": ["摘要图", ""], "图1": ["", ""]}
})

# del_empty returns a new object; meta_schema itself is left as-is.
cleaned = del_empty(meta_schema)
print(cleaned)
# Output: '图1' is dropped too, because its list ["", ""] is empty after cleaning.
# OrderedDict({'publ_date': '2024-06-01', 'is_granted': False, 'title': '一种可折叠无人机', 'inventors': ['张三', '李四'], 'doc_type': '发明专利', 'root_dir': '/data/patents/2024', 'pdf_path': '/data/patents/2024/xxx.pdf', 'fig_list': {'abs_im': ['摘要图']}})

In [None]:

#### 2 原地更新

from collections import OrderedDict

def _is_empty(x):
    """Return True for None, "" and zero-length dict/OrderedDict/list/tuple/set.

    ``False``, ``0`` and whitespace-only strings are not considered empty.
    """
    if x is None:
        return True
    if isinstance(x, str):
        return x == ""
    if isinstance(x, (dict, OrderedDict, list, tuple, set)):
        return len(x) == 0
    return False

def del_empty_inplace(x):
    """Recursively remove empty values from ``x``, mutating dicts and lists in place.

    A value is removed when it is empty after its own contents were cleaned.
    Dicts (incl. OrderedDict) and lists are modified and returned as the same
    object. Tuples are immutable, so a new filtered tuple is returned and the
    caller must use the return value. Any other value is returned unchanged.
    """
    if isinstance(x, (dict, OrderedDict)):
        # Iterate over a snapshot of the keys: entries are deleted while looping.
        for k in list(x.keys()):
            v2 = del_empty_inplace(x[k])
            if _is_empty(v2):
                del x[k]
            else:
                # Re-assign: a nested tuple comes back as a new object.
                x[k] = v2
        return x
    if isinstance(x, list):
        # Slice assignment keeps the same list object while filtering in a
        # single pass (repeated pop(i) would be quadratic on long lists).
        x[:] = [v for v in map(del_empty_inplace, x) if not _is_empty(v)]
        return x
    if isinstance(x, tuple):
        # Returns a NEW tuple; the caller has to take the return value.
        return tuple(v for v in map(del_empty_inplace, x) if not _is_empty(v))
    return x
