In [None]:
import os
import subprocess
import shutil
from pathlib import Path

# 原始 PDF 文件路径
pdf_path = r"D:\workspace\pdf_convert_clean_trans\input\ISO 26262-1-2018.pdf"

# 获取文件名并替换字符
filename = os.path.basename(pdf_path)
new_filename = filename.replace("-", "_").replace(" ", "_")
new_pdf_path = os.path.join(os.getcwd(), ".tmp", new_filename)

# 创建 .tmp 文件夹
tmp_dir = os.path.dirname(new_pdf_path)

# 将原始 PDF 文件复制到 .tmp 文件夹
shutil.copy(pdf_path, new_pdf_path)

os.makedirs(tmp_dir, exist_ok=True)

# 构造输出目录
output_dir = os.path.join(tmp_dir, "origin")

# 创建 origin 子文件夹
os.makedirs(output_dir, exist_ok=True)

# 调用 mineru 命令行工具进行 PDF 转换
result = subprocess.run(
    ["mineru", "-p", new_pdf_path, "-o", output_dir],
    capture_output=True,
    text=True
)

# 检查命令执行结果
if result.returncode == 0:
    print("命令执行成功")
    print("输出：", result.stdout)
else:
    print("命令执行失败")
    print("错误：", result.stderr)


In [None]:
from pathlib import Path

INPUT_DIR = "./.tmp/origin"
EXTRACTED_DIR = "./.tmp/extracted_origin"

def sanitize_name(name: str) -> str:
    """统一替换名称中的空格为下划线"""
    return name.replace(" ", "_")

def extract_auto_contents(input_dir: str, output_dir: str):
    input_path = Path(input_dir)

    for sub_dir in input_path.iterdir():
        if not sub_dir.is_dir():
            continue

        auto_path = sub_dir / "auto"
        if not auto_path.exists():
            continue

        # 替换子文件夹名中的空格
        sanitized_subdir_name = sanitize_name(sub_dir.name)
        target_subdir = os.path.join(output_dir, sanitized_subdir_name)
        os.makedirs(target_subdir, exist_ok=True)

        # 拷贝 auto 下的 Markdown 文件（*.md）
        for md_file in auto_path.glob("*.md"):
            if md_file.is_file():
                sanitized_filename = sanitize_name(md_file.name)
                shutil.copy2(md_file, os.path.join(target_subdir, sanitized_filename))

        # 拷贝 auto/images 目录下的所有文件
        image_dir = auto_path / "images"
        if image_dir.exists() and image_dir.is_dir():
            target_image_dir = os.path.join(target_subdir, "images")
            os.makedirs(target_image_dir, exist_ok=True)

            for img_file in image_dir.rglob("*.*"):
                if img_file.is_file():
                    sanitized_img_name = sanitize_name(img_file.name)
                    shutil.copy2(img_file, os.path.join(target_image_dir, sanitized_img_name))

if __name__ == "__main__":
    extract_auto_contents(INPUT_DIR, EXTRACTED_DIR)
    print("✅ 所有内容提取并重命名完成")


✅ 所有内容提取并重命名完成


In [None]:
import os
import re
import json
import logging
from pathlib import Path
from typing import List, Tuple
from tqdm.notebook import tqdm
from llama_index.core.utils import count_tokens
from llama_index.core.prompts import PromptTemplate
from llama_index.core import Settings
from llama_index.llms.deepseek import DeepSeek
from llama_index.embeddings.openai_like import OpenAILikeEmbedding

# ========== 配置部分 ==========
EXTRACTED_DIR = "./.tmp/extracted_origin"
CLEANED_DIR = "./.tmp/cleaned"
TRANS_DIR = "./.tmp/translated"
DEBUG = True

DEBUG_CLEAN_DIR = "./.tmp/debug_clean_chunk"
DEBUG_TRANS_DIR = "./.tmp/debug_trans_chunk"

MAX_TOKENS_PER_CHUNK = 6000

# 日志配置
logging.basicConfig(
    level=logging.ERROR,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("errors.log", encoding="utf-8")]
)

# LLM 配置
Settings.llm = DeepSeek(model="deepseek-chat", api_key="sk-eac019be79f14f948591d963d8c17656")
Settings.embed_model = OpenAILikeEmbedding(
    model_name="text-embedding-v4",
    api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="sk-e9aeb7dc9a3e4bf784411b295ddfa402",
    embed_batch_size=10,
)
Settings.chunk_size = 512
Settings.chunk_overlap = 64

# Prompt 模板
clean_prompt = PromptTemplate(r"""
你是一个专业的 Markdown 格式修复专家。你将接收由 OCR 或 PDF 转换工具（如 Mineru）生成的 Markdown 文件，该文件格式存在严重错误，请你对其进行以下修复，保持语义和结构不变，仅修复格式问题。

请严格遵守以下规则：

1. **修复标题格式：**
   - 所有形如 `# N` 的标题为一级标题（例如 `# 1`、`# 2`）；
   - 所有形如 `N.N` 的应为二级标题 `## N.N`；
   - 所有形如 `N.N.N` 的为三级标题 `### N.N.N`；
   - 如果标题前存在错误的 `#` 个数（如 `### 3.3` 应是 `## 3.3`），请纠正。

2. **修复列表格式：**
   - 所有以 `—` 或 `–`、长破折号或错误符号开头的条目，应统一为 `-`（标准 Markdown 无序列表）；
   - 若嵌套列表存在缩进错误，按照 2 个空格作为每一级缩进进行规范；
   - 若多级列表中存在自然语言段落描述，请确保子项缩进正确，并在每项后加换行。

3. **修复 NOTE 标注：**
   - 将类似“Note 1 to entry: …” 转换为以下标准格式：
     ```markdown
     > [!NOTE]
     > Note 1 to entry: ...
     ```
   - 同一段中多个 Note，依次编号，保持缩进一致。

4. **修复数学公式污染：**
   - 若出现 LaTeX 数学语法未正确渲染，例如：
     ```markdown
     $" \mathrm { m } { \cdot } \mathrm { n } ^ { \prime \prime }$ 
     ```
     应尽量转换为正常文本（如 `m·n″`），或保留为行内公式 `\( m \cdot n^{\prime\prime} \)`。

5. **修复乱码和不必要符号：**
   - 删除无效转义符（如 `\`、`\``, `\`, `,` 等成对混乱出现的标点）；
   - 修复 markdown 中图像语法错误（如 `![]()`）中路径丢失或注释混乱的问题；
   - 删除重复空格和空行，确保段落之间最多保留一个空行。

6. **保持内容原始语义不变**，仅做格式清洗。

输出格式为**修复后的完整 Markdown 文件**。不要进行额外解释或注释。

请清洗以下 Markdown 文本：

{context_str}
""")

translation_prompt = PromptTemplate(r"""
你是一个专业的 Markdown 文档翻译机器人，负责将英文 Markdown 文档翻译为自然、准确的中文。你收到的 Markdown 文档可能是经过切分的片段，结构可能不完整，存在缺少一级标题、从中间段落开始或标题层级不规范等情况。请严格保留原文 Markdown 格式，不得更改标题层级或破坏任何结构性语法。

请遵循以下翻译规则：

1. **严格保留所有 Markdown 结构和语法**，包括但不限于标题（如 `#` `##` `###` `####` 等）、列表（有序/无序）、链接、图片、代码块、表格、引用、分隔线等，不得增删或调整层级；
2. **所有标题符号必须严格保留，标题等级不可更改**；
3. **所有代码块（```）和行内代码（`code`）必须完整保留，禁止翻译或更动任何字符**；
4. **图片和链接中的 URL、文件名、路径等保持原样，不得翻译或修改**；
5. 正文内容应翻译为专业、通顺、自然的中文；
6. **仅输出翻译后的中文 Markdown 内容**；
7. 即使输入被切分为不完整段落，也**不得破坏原有 Markdown 的结构**；

请翻译以下 Markdown 文本：

{context_str}
""")

# ========== 工具函数 ==========
def extract_code_blocks(text: str) -> Tuple[str, List[str]]:
    code_blocks = []
    def replacer(match):
        code_blocks.append(match.group(0))
        return f"__CODE_BLOCK_{len(code_blocks) - 1}__"
    safe_text = re.sub(r"```.*?\n.*?```", replacer, text, flags=re.DOTALL)
    return safe_text, code_blocks

def restore_code_blocks(text: str, code_blocks: List[str]) -> str:
    for i, block in enumerate(code_blocks):
        text = text.replace(f"__CODE_BLOCK_{i}__", block)
    return text

def split_markdown_by_heading(text: str, max_chars_per_chunk: int = 3000) -> List[str]:
    safe_text, code_blocks = extract_code_blocks(text)
    headings = list(re.finditer(r'^(#{1,6}\s+.*)', safe_text, re.MULTILINE))
    
    sections = []
    for i, match in enumerate(headings):
        start = match.start()
        end = headings[i + 1].start() if i + 1 < len(headings) else len(safe_text)
        sections.append(safe_text[start:end].strip())

    chunks, current_chunk = [], ""
    for section in sections:
        if len(current_chunk) + len(section) < max_chars_per_chunk:
            current_chunk += section + "\n\n"
        else:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = section + "\n\n"
    if current_chunk:
        chunks.append(current_chunk.strip())

    return [restore_code_blocks(chunk, code_blocks) for chunk in chunks]

def run_cleaning(chunks: List[str], prompt_template, llm) -> List[str]:
    results = []
    for i, chunk in enumerate(tqdm(chunks, desc="🧹 Cleaning Chunks", leave=False)):
        try:
            prompt = prompt_template.format(context_str=chunk)
            response = llm.complete(prompt)
            results.append(response.text.strip())
        except Exception as e:
            logging.error(f"[Chunk {i+1}] 清洗失败: {e}")
            results.append(f"<!-- 清洗失败：{e} -->")
    return results

def write_debug_chunks(original_chunks, processed_chunks, rel_path: str):
    sep = "\n" + "=" * 100 + "\n"
    rel_md_path = Path(rel_path).with_suffix(".md")

    # 原始切片
    input_path = Path(DEBUG_CLEAN_DIR) / ("input_"+rel_md_path)
    input_path.parent.mkdir(parents=True, exist_ok=True)
    input_path.write_text(sep.join(original_chunks), encoding="utf-8")

    # 处理后切片
    output_path = Path(DEBUG_CLEAN_DIR) / ("processed_"+rel_md_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(sep.join(processed_chunks), encoding="utf-8")


def clean_md(input_path: str, output_path: str, debug: bool = False,
             prompt_template=None, llm=None, input_dir=None, debug_input_dir=None, 
             debug_output_dir=None, max_tokens=6000):
    text = Path(input_path).read_text(encoding="utf-8")
    chunks = split_markdown_by_heading(text, max_chars_per_chunk=max_tokens)
    results = run_cleaning(chunks, prompt_template, llm)
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    Path(output_path).write_text("\n\n".join(results), encoding="utf-8")

    if debug:
        rel_path = os.path.relpath(input_path, input_dir)
        write_debug_chunks(chunks, results, rel_path)


In [24]:
from IPython.display import Markdown, display

def idisplay_markdown(markdown_text: str):
    """在 Jupyter 中渲染显示 Markdown 内容"""
    display(Markdown(markdown_text))

In [None]:
files = list(Path(EXTRACTED_DIR).rglob("*.*"))

for file_path in tqdm(files, desc="🧹 Cleaning Files"):
    rel_path = os.path.relpath(file_path, EXTRACTED_DIR)
    output_path = os.path.join(CLEANED_DIR, rel_path)

    try:
        if file_path.suffix.lower() in [".md", ".mdx"]:
            clean_md(
                input_path=str(file_path),
                output_path=output_path,
                debug=DEBUG,
                prompt_template=clean_prompt,
                llm=Settings.llm,
                input_dir=EXTRACTED_DIR,
                debug_input_dir=DEBUG_CLEAN_DIR,
                debug_output_dir=DEBUG_CLEAN_DIR,
                max_tokens=MAX_TOKENS_PER_CHUNK
            )
        else:
            shutil.copyfile(file_path, output_path)
    except Exception as e:
        logging.error(f"[{rel_path}] 文件清洗失败: {e}")

🧹 Cleaning Files:   0%|          | 0/6 [00:00<?, ?it/s]

🧹 Cleaning Chunks:   0%|          | 0/16 [00:00<?, ?it/s]