## 批量为招聘数据匹配上市公司股票代码（Polars）
- 使用天眼查桥接表（`原文件导入名称` → `系统匹配企业名称`）对齐名称。
- 使用上市公司信息 JSON（如 `STK_LISTEDCOINFOANL.json`）获取（中文全称 → 股票代码）。
- 规范化公司名称（去除中英文括号及其中内容、trim 空白）。
- 对 `batch_1.parquet` 到 `batch_5.parquet` 逐个处理，输出 `_with_code.parquet`。


In [1]:
import polars as pl
import re
import json
from pathlib import Path

BASE = Path(r"D:/BaiduNetdiskDownload/招聘数据/合并上市公司")

# 规范化：去除中英文括号，并去除两端空白、全角空格
PAREN_PATTERN = re.compile(r"[（()）]")
FULLWIDTH_SPACE = "\u3000"

def normalize_company_name(name: str | None) -> str | None:
    if name is None:
        return None
    s = str(name).strip().replace(FULLWIDTH_SPACE, " ")
    if not s:
        return s
    s = PAREN_PATTERN.sub("", s)
    s = s.strip()
    return s

normalize_company_name


<function __main__.normalize_company_name(name: str | None) -> str | None>

In [2]:
# 测试规范化
samples = [
    "南京银行股份有限公司(上海分行)",
    "深圳A科技有限公司（集团）",
    "  北京XX公司  ",
    None,
    "阿里巴巴(中国)有限公司（杭州滨江）",
]
[normalize_company_name(s) for s in samples]


['南京银行股份有限公司上海分行', '深圳A科技有限公司集团', '北京XX公司', None, '阿里巴巴中国有限公司杭州滨江']

In [3]:
# 构建上市公司（中文全称 → 股票代码）映射
# STK_LISTEDCOINFOANL.json 可能为 JSON 行或数组，使用惰性读取

import ijson

listed_name_to_code: dict[str, str] = {}
listed_short_to_code: dict[str, str] = {}

stk_json_path = BASE / "STK_LISTEDCOINFOANL.json"
with open(stk_json_path, "r", encoding="utf-8") as f:
    try:
        # 先尝试读取整个文件作为JSON
        content = f.read().strip()
        
        # 检查是否为JSONL格式（每行一个JSON对象）
        if '\n' in content and not content.startswith('['):
            # JSONL格式处理
            for line in content.split('\n'):
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                    symbol = obj.get("Symbol") or obj.get("股票代码")
                    full_name = obj.get("FullName") or obj.get("中文全称")
                    short_name = obj.get("ShortName") or obj.get("股票简称")
                    if symbol and full_name:
                        norm = normalize_company_name(full_name)
                        if norm:
                            listed_name_to_code.setdefault(norm, str(symbol))
                    if symbol and short_name:
                        listed_short_to_code.setdefault(str(short_name).strip(), str(symbol))
                except json.JSONDecodeError:
                    continue
        else:
            # 标准JSON格式处理
            obj = json.loads(content)
            if isinstance(obj, list):
                for it in obj:
                    symbol = it.get("Symbol") or it.get("股票代码")
                    full_name = it.get("FullName") or it.get("中文全称")
                    short_name = it.get("ShortName") or it.get("股票简称")
                    if symbol and full_name:
                        norm = normalize_company_name(full_name)
                        if norm:
                            listed_name_to_code.setdefault(norm, str(symbol))
                    if symbol and short_name:
                        listed_short_to_code.setdefault(str(short_name).strip(), str(symbol))
            elif isinstance(obj, dict):
                # 如果是字典，尝试处理其值
                for key, it in obj.items():
                    if isinstance(it, dict):
                        symbol = it.get("Symbol") or it.get("股票代码")
                        full_name = it.get("FullName") or it.get("中文全称")
                        short_name = it.get("ShortName") or it.get("股票简称")
                        if symbol and full_name:
                            norm = normalize_company_name(full_name)
                            if norm:
                                listed_name_to_code.setdefault(norm, str(symbol))
                        if symbol and short_name:
                            listed_short_to_code.setdefault(str(short_name).strip(), str(symbol))
                    elif isinstance(it, list):
                        # 如果值是列表，处理列表中的每个元素
                        for item in it:
                            if isinstance(item, dict):
                                symbol = item.get("Symbol") or item.get("股票代码")
                                full_name = item.get("FullName") or item.get("中文全称")
                                short_name = item.get("ShortName") or item.get("股票简称")
                                if symbol and full_name:
                                    norm = normalize_company_name(full_name)
                                    if norm:
                                        listed_name_to_code.setdefault(norm, str(symbol))
                                if symbol and short_name:
                                    listed_short_to_code.setdefault(str(short_name).strip(), str(symbol))
                
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Error reading {stk_json_path}: {e}")
        print("Please check the file format and encoding")

len(listed_name_to_code), len(listed_short_to_code)


(8094, 9348)

In [6]:
# 构建子公司 → 上市公司股票代码映射
# 读取 FN_Fn061.json / FN_Fn0611.json，字段：Stkcd（证券代码）、FN_Fn06101（子公司名称）

subsidiary_to_code: dict[str, str] = {}

for fname in ["FN_Fn061.json", "FN_Fn0611.json"]:
    fpath = BASE / fname
    if not fpath.exists():
        continue
    
    try:
        # 先尝试读取文件内容
        with open(fpath, "r", encoding="utf-8") as f:
            content = f.read().strip()
        
        if not content:
            continue
            
        # 尝试解析为标准JSON
        try:
            # 标准JSON格式处理
            obj = json.loads(content)
            if isinstance(obj, list):
                for it in obj:
                    code = it.get("Stkcd") or it.get("证券代码")
                    sub_name = it.get("FN_Fn06101") or it.get("子公司名称")
                    if not sub_name or not code:
                        continue
                    norm = normalize_company_name(sub_name)
                    if norm:
                        subsidiary_to_code.setdefault(norm, str(code))
            elif isinstance(obj, dict):
                # 如果是字典，尝试处理其值
                for key, it in obj.items():
                    if isinstance(it, dict):
                        code = it.get("Stkcd") or it.get("证券代码")
                        sub_name = it.get("FN_Fn06101") or it.get("子公司名称")
                        if not sub_name or not code:
                            continue
                        norm = normalize_company_name(sub_name)
                        if norm:
                            subsidiary_to_code.setdefault(norm, str(code))
                    elif isinstance(it, list):
                        # 如果值是列表，处理列表中的每个元素
                        for item in it:
                            if isinstance(item, dict):
                                code = item.get("Stkcd") or item.get("证券代码")
                                sub_name = item.get("FN_Fn06101") or item.get("子公司名称")
                                if not sub_name or not code:
                                    continue
                                norm = normalize_company_name(sub_name)
                                if norm:
                                    subsidiary_to_code.setdefault(norm, str(code))
                
        except json.JSONDecodeError:
            # 如果标准JSON解析失败，尝试逐行解析
            lines = content.strip().split('\n')
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                    code = obj.get("Stkcd") or obj.get("证券代码")
                    sub_name = obj.get("FN_Fn06101") or obj.get("子公司名称")
                    if not sub_name or not code:
                        continue
                    norm = normalize_company_name(sub_name)
                    if norm:
                        subsidiary_to_code.setdefault(norm, str(code))
                except json.JSONDecodeError:
                    continue
                
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Error reading {fpath}: {e}")
        print("Please check the file format and encoding")

len(subsidiary_to_code)


257849

In [7]:
# 构建天眼查桥接表：原文件导入名称（≈ batch 的公司名称） → 系统匹配企业名称（对齐上市库的名称）
# 这个桥接表的作用是将batch数据中的非标准公司名称映射到标准的企业名称

bridge_path = BASE / "tianyancha.parquet"
bridge_df = pl.read_parquet(bridge_path)

# 自动识别桥接表中的关键字段名称，因为不同数据源的字段名可能不同
# name_src_col: 对应batch数据中的原始公司名称字段
# name_std_col: 对应标准化后的企业名称字段
cols = {c: c for c in bridge_df.columns}
name_src_col = next((c for c in bridge_df.columns if c in ("原文件导入名称", "原始公司名称", "公司名称", "name_raw")), None)
name_std_col = next((c for c in bridge_df.columns if c in ("系统匹配企业名称", "标准企业名称", "匹配企业名称", "name_std")), None)

# 如果找不到必要的字段，抛出错误
if name_src_col is None or name_std_col is None:
    raise ValueError(f"桥接表缺少必要字段，现有列：{bridge_df.columns}")

# 处理桥接表数据：
# 1. 选择并重命名关键字段为统一的名称
# 2. 对原始名称和标准名称都进行规范化处理（去除括号等）
# 3. 去重，保留第一条记录（基于规范化后的原始名称）
bridge_df = bridge_df.select([
    pl.col(name_src_col).alias("name_raw"),        # 原始公司名称
    pl.col(name_std_col).alias("name_std")         # 标准企业名称
]).with_columns([
    # 对原始名称进行规范化处理
    pl.col("name_raw").map_elements(normalize_company_name, return_dtype=pl.Utf8).alias("name_raw_norm"),
    # 对标准名称进行规范化处理
    pl.col("name_std").map_elements(normalize_company_name, return_dtype=pl.Utf8).alias("name_std_norm")
]).unique(subset=["name_raw_norm"], keep="first")  # 基于规范化的原始名称去重

bridge_df.head()


name_raw,name_std,name_raw_norm,name_std_norm
str,str,str,str
"""广州权嚎门投资管理有限公司""","""广州权嚎门投资管理有限公司""","""广州权嚎门投资管理有限公司""","""广州权嚎门投资管理有限公司"""
"""深圳澳源达国际物流有限公司""","""深圳澳源达国际物流有限公司""","""深圳澳源达国际物流有限公司""","""深圳澳源达国际物流有限公司"""
"""厦门市望众达贸易有限公司""","""厦门市望众达贸易有限公司""","""厦门市望众达贸易有限公司""","""厦门市望众达贸易有限公司"""
"""合肥趁坦代驾服务有限公司""","""合肥趁坦代驾服务有限公司""","""合肥趁坦代驾服务有限公司""","""合肥趁坦代驾服务有限公司"""
"""二道区爱儿推推拿馆""","""二道区爱儿推推拿馆""","""二道区爱儿推推拿馆""","""二道区爱儿推推拿馆"""


In [8]:
# 生成用于连接的上市公司和子公司 DataFrame（便于 Polars join）

listed_df = pl.DataFrame({
    "listed_full_norm": list(listed_name_to_code.keys()),
    "stock_code": list(listed_name_to_code.values()),
})

listed_short_df = pl.DataFrame({
    "listed_short": list(listed_short_to_code.keys()),
    "stock_code": list(listed_short_to_code.values()),
})

subsidiary_df = pl.DataFrame({
    "subsidiary_norm": list(subsidiary_to_code.keys()),
    "stock_code": list(subsidiary_to_code.values()),
})

listed_df.head(), listed_short_df.head(), subsidiary_df.head()


(shape: (5, 2)
 ┌────────────────────────────────┬────────────┐
 │ listed_full_norm               ┆ stock_code │
 │ ---                            ┆ ---        │
 │ str                            ┆ str        │
 ╞════════════════════════════════╪════════════╡
 │ 深圳发展银行股份有限公司       ┆ 000001     │
 │ 平安银行股份有限公司           ┆ 000001     │
 │ 万科企业股份有限公司           ┆ 000002     │
 │ 金田实业集团股份有限公司       ┆ 000003     │
 │ 深圳市蛇口安达实业股份有限公司 ┆ 000004     │
 └────────────────────────────────┴────────────┘,
 shape: (5, 2)
 ┌──────────────┬────────────┐
 │ listed_short ┆ stock_code │
 │ ---          ┆ ---        │
 │ str          ┆ str        │
 ╞══════════════╪════════════╡
 │ 深发展A      ┆ 000001     │
 │ 平安银行     ┆ 000001     │
 │ 深万科A      ┆ 000002     │
 │ 万科A        ┆ 000002     │
 │ G 万科A      ┆ 000002     │
 └──────────────┴────────────┘,
 shape: (5, 2)
 ┌────────────────────────────┬────────────┐
 │ subsidiary_norm            ┆ stock_code │
 │ ---                        ┆ ---        │
 │ str     

In [9]:
# 处理 batch_* 数据：
# 1) 规范化 batch 的公司名称
# 2) 与桥接表用 name_raw_norm → name_std_norm 对齐
# 3) 优先用标准全称去上市公司表匹配，其次用简称匹配，再用子公司匹配
# 4) 输出带 stock_code 的 parquet

from tqdm import tqdm

batch_files = [BASE / f"batch_{i}.parquet" for i in range(1, 6)]

for fpath in tqdm(batch_files, desc="处理batch文件"):
    if not fpath.exists():
        continue
    df = pl.read_parquet(fpath)

    # 猜测 batch 公司名字段
    batch_name_col = next((c for c in df.columns if c in ("公司名称", "企业名称", "name", "company_name")), None)
    if batch_name_col is None:
        raise ValueError(f"{fpath.name} 缺少公司名称字段，现有列：{df.columns}")

    df = df.with_columns([
        pl.col(batch_name_col).map_elements(normalize_company_name, return_dtype=pl.Utf8).alias("name_raw_norm")
    ])

    # 与桥接表对齐，得到标准名称
    df = df.join(
        bridge_df.select(["name_raw_norm", "name_std_norm"]),
        on="name_raw_norm",
        how="left",
    )

    # 用标准全称匹配上市公司
    df = df.join(
        listed_df,
        left_on="name_std_norm",
        right_on="listed_full_norm",
        how="left",
        suffix="_listed_full",
    )

    # 若未命中，尝试用标准名称（可能为简称）去短名表匹配
    df = df.with_columns([
        pl.when(pl.col("stock_code").is_null())
          .then(pl.col("name_std_norm"))
          .otherwise(None)
          .alias("tmp_short")
    ])
    df = df.join(
        listed_short_df.rename({"stock_code": "stock_code_short"}),
        left_on="tmp_short",
        right_on="listed_short",
        how="left",
    )

    # 若还未命中，尝试子公司表（用原始规范化名称，以覆盖"公司名称"为子公司名的情形）
    df = df.join(
        subsidiary_df.rename({"stock_code": "stock_code_sub"}),
        left_on="name_raw_norm",
        right_on="subsidiary_norm",
        how="left",
    )

    # 汇总优先级：全称匹配 > 简称匹配 > 子公司匹配
    df = df.with_columns([
        pl.coalesce([pl.col("stock_code"), pl.col("stock_code_short"), pl.col("stock_code_sub")]).alias("stock_code_final")
    ]).drop([c for c in ["tmp_short", "listed_full_norm", "listed_short", "subsidiary_norm", "stock_code", "stock_code_short", "stock_code_sub"] if c in df.columns])

    out_path = fpath.with_name(fpath.stem + "_with_code.parquet")
    df.write_parquet(out_path)
    print(f"wrote: {out_path}")


处理batch文件:  20%|██        | 1/5 [01:27<05:49, 87.38s/it]

wrote: D:\BaiduNetdiskDownload\招聘数据\合并上市公司\batch_1_with_code.parquet


处理batch文件:  40%|████      | 2/5 [03:04<04:39, 93.20s/it]

wrote: D:\BaiduNetdiskDownload\招聘数据\合并上市公司\batch_2_with_code.parquet


处理batch文件:  60%|██████    | 3/5 [05:06<03:32, 106.20s/it]

wrote: D:\BaiduNetdiskDownload\招聘数据\合并上市公司\batch_3_with_code.parquet


处理batch文件:  80%|████████  | 4/5 [06:43<01:42, 102.64s/it]

wrote: D:\BaiduNetdiskDownload\招聘数据\合并上市公司\batch_4_with_code.parquet


处理batch文件: 100%|██████████| 5/5 [07:14<00:00, 86.90s/it] 

wrote: D:\BaiduNetdiskDownload\招聘数据\合并上市公司\batch_5_with_code.parquet





In [3]:
# 过滤出能匹配到股票代码的记录，分别与汇总保存
from pathlib import Path
import polars as pl

BASE = Path(".")  # 定义BASE路径

matched_paths = []
for i in range(1, 6):
    in_path = BASE / f"batch_{i}_with_code.parquet"
    if not in_path.exists():
        continue
    df = pl.read_parquet(in_path)
    if "stock_code_final" not in df.columns:
        # 兼容如果用户直接在原始 batch 上执行此单元格
        if "stock_code" in df.columns:
            code_col = "stock_code"
        else:
            raise ValueError(f"{in_path.name} 缺少 stock_code_final/stock_code 列")
    else:
        code_col = "stock_code_final"

    df_matched = df.filter(pl.col(code_col).is_not_null())
    out_path = BASE / f"batch_{i}_matched.parquet"
    df_matched.write_parquet(out_path)
    matched_paths.append(out_path)
    print(f"wrote: {out_path}, rows: {df_matched.height}")

# 汇总输出（若存在多个文件）
if matched_paths:
    dfs = [pl.read_parquet(p) for p in matched_paths]
    pl.concat(dfs, how="vertical_relaxed").write_parquet(BASE / "matched_all.parquet")
    print("wrote: matched_all.parquet")
else:
    print("未发现 matched 数据文件可汇总")


wrote: batch_1_matched.parquet, rows: 898915
wrote: batch_2_matched.parquet, rows: 931774
wrote: batch_3_matched.parquet, rows: 1076243
wrote: batch_4_matched.parquet, rows: 1407198
wrote: batch_5_matched.parquet, rows: 714201
wrote: matched_all.parquet
