In [None]:
import json
import re
from pathlib import Path
from typing import List, Dict, Union, Optional
from PIL import Image
import pytesseract
from datetime import datetime
import matplotlib.pyplot as plt

# Point pytesseract at the Tesseract executable (path of a default Windows install).
pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

# ===========================================================
#  工具函数：读取配置文件
# ===========================================================
def load_config(config_path: str) -> Dict:
    """Read a UTF-8 encoded JSON file and return its parsed content.

    :param config_path: path of the JSON file to read
    :return: the decoded JSON document
    """
    with open(config_path, 'r', encoding='utf-8') as handle:
        raw_json = handle.read()
    return json.loads(raw_json)

# ===========================================================
#  工具函数：裁剪表格区域
# ===========================================================
def crop_table_area(img: Image.Image, config: Dict) -> Image.Image:
    """Crop the table region out of ``img``.

    The region is described by ``config["table_area"]["bounds"]`` as four
    ratios of the image size (``left_ratio``/``right_ratio`` scale the width,
    ``top_ratio``/``bottom_ratio`` scale the height). Each product is
    truncated to an int before cropping.

    :param img: image providing ``size`` and ``crop``
    :param config: configuration holding the ratio bounds
    :return: the result of ``img.crop`` on the computed box
    """
    ratios = config["table_area"]["bounds"]
    width, height = img.size

    # Horizontal edges first, then vertical ones.
    x_start, x_end = int(width * ratios["left_ratio"]), int(width * ratios["right_ratio"])
    y_start, y_end = int(height * ratios["top_ratio"]), int(height * ratios["bottom_ratio"])

    # The crop box is ordered (left, upper, right, lower).
    box = (x_start, y_start, x_end, y_end)
    return img.crop(box)

# ===========================================================
#  工具函数：OCR 识别
# ===========================================================
def try_ocr(img: Image.Image, lang: str = "chi_sim") -> Optional[str]:
    """Run pytesseract OCR on ``img`` and return the recognized text.

    :param img: image handed to ``pytesseract.image_to_string``
    :param lang: language code passed through to pytesseract
    :return: the recognized text with surrounding whitespace stripped, or
        ``None`` when nothing was recognized or the OCR call raised
    """
    try:
        recognized = pytesseract.image_to_string(img, lang=lang)
        if not recognized:
            return None
        return recognized.strip()
    except Exception as err:
        # Best effort: report the failure and let the caller carry on.
        print(f"[try_ocr] OCR 失败: {err}")
        return None


In [None]:
# ===========================================================
#  工具函数：解析 OCR 文本
# ===========================================================
def parse_ocr_text(ocr_text: str, config: Dict) -> List[Dict]:
    """Turn raw OCR text into a list of ``{"item", "pool", "time"}`` records.

    Only lines containing a timestamp are kept. A full ``date time`` match is
    preferred; a date-only match is the fallback. The text to the left of the
    timestamp is split into columns, and the columns named by
    ``config["table_area"]["column_indices"]`` are picked out.

    :param ocr_text: text produced by OCR; empty or ``None`` yields ``[]``
    :param config: configuration holding the ``item`` / ``pool`` column indices
    :return: one dict per line that contained a timestamp, in input order
    """
    if not ocr_text:
        return []

    column_indices = config["table_area"]["column_indices"]

    def cell(cells, key):
        # A column that is absent from the line becomes an empty string.
        position = column_indices[key]
        return cells[position].strip() if len(cells) > position else ""

    records = []
    for raw_line in ocr_text.splitlines():
        row = raw_line.strip()
        if not row:
            continue

        stamp = re.search(r"(\d{4}[/-]\d{2}[/-]\d{2}[ T]\d{2}:\d{2}:\d{2})", row)
        if stamp is None:
            stamp = re.search(r"(\d{4}[/-]\d{2}[/-]\d{2})", row)
        if stamp is None:
            continue

        leading = row[:stamp.start()].strip(" |,-\t")

        # Pipe-separated columns win; otherwise split on runs of 2+ whitespace.
        if "|" in leading:
            cells = [piece.strip() for piece in leading.split("|")]
        else:
            cells = re.split(r"\s{2,}", leading)

        records.append({
            "item": cell(cells, "item"),
            "pool": cell(cells, "pool"),
            "time": stamp.group(1),
        })

    return records


In [53]:
# ===========================================================
#  工具函数：修复时间
# ===========================================================
def fix_timestamp(ts: str) -> Optional[str]:
    """Normalize a loosely formatted timestamp to ``YYYY-MM-DD HH:MM:SS``.

    Separators ``T``, ``.``, ``/`` and the CJK date markers are unified, every
    character other than digits, ``-``, ``:`` and space is dropped, and the
    result is tried against a list of known layouts. If none matches, the bare
    digits are interpreted as ``YYYYMMDDHHMMSS`` (14+ digits) or ``YYYYMMDD``
    (8-13 digits).

    :param ts: raw timestamp text
    :return: the normalized timestamp (date-only input gets ``00:00:00``),
        or ``None`` when ``ts`` is empty or cannot be interpreted
    """
    if not ts:
        return None

    s = ts.strip().replace("T", " ").replace(".", "-").replace("/", "-")
    s = re.sub(r'[年月]', '-', s)
    # The day marker ends the date part, so it becomes a separator to the
    # time rather than another dash. A trailing dash made inputs with
    # unpadded fields (e.g. "2025年1月2日 3:04:05") fall through to the digit
    # fallback, which then sliced the wrong eight digits.
    s = s.replace('日', ' ')
    s = re.sub(r'[^0-9\- :]', '', s)
    s = re.sub(r' +', ' ', s).strip(' -')

    candidates = (
        "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d",
        "%Y%m%d %H%M%S", "%Y%m%d%H%M%S",
    )

    for fmt in candidates:
        try:
            dt = datetime.strptime(s, fmt)
        except ValueError:
            # Layout does not match; try the next one.
            continue
        return dt.strftime("%Y-%m-%d %H:%M:%S")

    # Last resort: ignore all separators and read fixed-width digit groups.
    digits = re.sub(r'\D', '', s)
    if len(digits) >= 14:
        try:
            dt = datetime.strptime(digits[:14], "%Y%m%d%H%M%S")
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            pass
    elif len(digits) >= 8:
        try:
            dt = datetime.strptime(digits[:8], "%Y%m%d")
            return dt.strftime("%Y-%m-%d 00:00:00")
        except ValueError:
            pass
    return None


In [54]:
# ===========================================================
#  工具函数：纠正名称
# ===========================================================
def correct_name(name: str, valid_names: set) -> Dict[str, Union[str, bool]]:
    """Map a possibly mis-recognized name onto the closest known name.

    An exact member of ``valid_names`` is returned unchanged. Otherwise the
    candidate with the smallest edit distance is accepted when that distance
    is at most ``len(name) // 2``. Candidates at equal distance are resolved
    by string order, so the result does not depend on set iteration order.

    :param name: name as read by OCR
    :param valid_names: collection of known-good names; may be empty
    :return: ``{"name": <corrected or original name>, "is_valid": <bool>}``;
        ``is_valid`` is ``False`` when no acceptable candidate exists
    """
    if name in valid_names:
        return {"name": name, "is_valid": True}

    def edit_distance(s1: str, s2: str) -> int:
        # Levenshtein distance using two rolling rows; s2 is kept as the
        # shorter string so the rows stay as short as possible.
        if len(s1) < len(s2):
            s1, s2 = s2, s1

        previous_row = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1, start=1):
            current_row = [i]
            for j, c2 in enumerate(s2):
                insertions = previous_row[j + 1] + 1
                deletions = current_row[j] + 1
                substitutions = previous_row[j] + (c1 != c2)
                current_row.append(min(insertions, deletions, substitutions))
            previous_row = current_row

        return previous_row[-1]

    # Guard the empty case: min() over no candidates would raise ValueError.
    if valid_names:
        # Comparing (distance, candidate) pairs computes each distance once
        # and breaks ties deterministically by candidate string.
        best_distance, best_match = min(
            (edit_distance(name, candidate), candidate) for candidate in valid_names
        )
        if best_distance <= len(name) // 2:
            return {"name": best_match, "is_valid": True}

    # No acceptable correction: keep the original name and flag it invalid.
    print(f"[Warning] 无法纠正名称: {name}")
    return {"name": name, "is_valid": False}

In [55]:
# ===========================================================
#  工具函数：清理名称前后缀
# ===========================================================
def clean_name(name: str, prefix_patterns: List[str], suffix_patterns: List[str]) -> str:
    """
    Remove configured prefixes and suffixes from a name.

    Every pattern is applied with ``re.sub`` (all matches removed) and the
    intermediate result is whitespace-stripped after each pattern. Prefix
    patterns run first, then suffix patterns.

    :param name: original name
    :param prefix_patterns: regular expressions describing prefixes
    :param suffix_patterns: regular expressions describing suffixes
    :return: the cleaned name
    """
    cleaned = name
    for pattern_group in (prefix_patterns, suffix_patterns):
        for regex in pattern_group:
            without_match = re.sub(regex, "", cleaned)
            cleaned = without_match.strip()
    return cleaned


In [56]:
# ===========================================================
#  主流程
# ===========================================================
def run_pipeline(
    image_source: Union[str, List[str]],
    output_filename: str,
    uid: str = "1234567890",
    timezone: int = 8,
    lang: str = "zh-cn",
):
    """Run the whole flow: OCR -> parse -> name correction -> JSON export.

    Configuration is read from ``game_config.json`` and the known names from
    ``game_name.json`` in the current working directory. Each image is
    cropped to the table area, OCR'd and parsed; item and pool names are
    optionally cleaned, then corrected against the known names, and the
    timestamp is normalized. The result is written to
    ``<game_id>_<YYYYmmdd_HHMMSS>.json``.

    :param image_source: a directory (searched recursively for ``*.png``, in
        sorted order), a single image path, or an iterable of image paths
    :param output_filename: accepted for interface compatibility.
        NOTE(review): the export file has always been named after ``game_id``
        and this argument is not used for naming - confirm whether it should be.
    :param uid: written to the export ``info`` section
    :param timezone: written to the export ``info`` section
    :param lang: written to the export ``info`` section (not the OCR language)
    """
    config = load_config("game_config.json")
    game_data = load_config("game_name.json")
    game_id = config["game_info"]["game_id"]
    game_name = config["game_info"]["game_name"]

    # Sets of known-good names used for correction.
    valid_items = set(game_data["character"])  # character names
    valid_pools = set(game_data["pool"])  # pool names

    # Optional prefix/suffix cleaning of names.
    text_cfg = config["text_processing"]
    enable_clean_name = text_cfg["enable_clean_name"]
    prefix_patterns = text_cfg["patterns"]["prefix_patterns"]
    suffix_patterns = text_cfg["patterns"]["suffix_patterns"]

    if isinstance(image_source, (str, Path)):
        if Path(image_source).is_dir():
            # Sorted so the export order does not depend on the filesystem.
            image_paths = sorted(str(p) for p in Path(image_source).rglob("*.png"))
        else:
            # A lone path is one image; list() on a str would split it into
            # single characters.
            image_paths = [str(image_source)]
    else:
        image_paths = list(image_source)

    # One timestamp shared by the file name and the export metadata.
    export_timestamp = int(datetime.now().timestamp())
    export_moment = datetime.fromtimestamp(export_timestamp)
    export_time_str = export_moment.strftime("%Y%m%d_%H%M%S")

    all_entries = []

    for path in image_paths:
        # The context manager closes the underlying file once OCR is done.
        with Image.open(path) as img:
            cropped_img = crop_table_area(img, config)
            ocr_text = try_ocr(cropped_img)
        entries = parse_ocr_text(ocr_text, config)

        for entry in entries:
            # Clean names (if enabled).
            if enable_clean_name:
                entry["item"] = clean_name(
                    entry["item"], prefix_patterns, suffix_patterns
                )
                entry["pool"] = clean_name(
                    entry["pool"], prefix_patterns, suffix_patterns
                )

            # Correct names against the known lists.
            item_result = correct_name(entry["item"], valid_items)
            pool_result = correct_name(entry["pool"], valid_pools)

            entry["item"] = item_result["name"]
            entry["pool"] = pool_result["name"]
            entry["is_valid"] = item_result["is_valid"] and pool_result["is_valid"]

            # Normalize the timestamp (None when it cannot be interpreted).
            entry["time"] = fix_timestamp(entry["time"])

        all_entries.extend(entries)

    # Assemble the export document.
    export_data = {
        "info": {
            "game_id": game_id,
            "game_name": game_name,
            "export_timestamp": export_timestamp,
            "export_app": "ocr_export",
            "export_app_version": "v0.0.1",
            "export_time": export_moment.strftime("%Y-%m-%d %H:%M:%S"),
            "uid": uid,
            "timezone": timezone,
            "lang": lang,
            "total_entries": len(all_entries),
        },
        "data": all_entries,
    }
    # File naming is unchanged: the stem is the game id, not output_filename.
    output_file = f"{game_id}_{export_time_str}.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(export_data, f, ensure_ascii=False, indent=4)


# Example run: process every PNG under ./images with sample account metadata.
if __name__ == "__main__":
    image_source, output_filename = "./images", "output"
    uid, timezone, lang = "104140966", 8, "zh-cn"
    run_pipeline(
        image_source,
        output_filename,
        uid=uid,
        timezone=timezone,
        lang=lang,
    )

text: 贝丽尔(5十)                      命运亦需捧场               2025-11-26 23:38:48
冬                         命运亦需捧场             2025-11-25 23:16:24

铝玻璃                    命运亦需捧场            2025-11-24 23:07:44
丽忒&路易斯                 命运亦需捧场            2025-11-24 23:07:36
冬                    命运亦需捧场           2025-11-23 16:58:42

五色月(5十)                命运亦需捧场           2025-11-23 16:58:18
约翰 提托                    命运亦需捧场             2025-11-23 03:04:23
APPLe                      命运亦需捧场             2025-11-23 03:04:14
爱宠                     命运亦需捧场            2025-11-22 01:18:05

红斗篷                    命运亦需捧场            2025-11-22 01:17:41


entries: [{'item': '贝丽尔(5十)', 'pool': '命运亦需捧场', 'time': '2025-11-26 23:38:48'}, {'item': '冬', 'pool': '命运亦需捧场', 'time': '2025-11-25 23:16:24'}, {'item': '铝玻璃', 'pool': '命运亦需捧场', 'time': '2025-11-24 23:07:44'}, {'item': '丽忒&路易斯', 'pool': '命运亦需捧场', 'time': '2025-11-24 23:07:36'}, {'item': '冬', 'pool': '命运亦需捧场', 'time': '2025-11-23 16:58:42'}, {'item': '五色月(5十)',