In [9]:
import sys
import subprocess

def ensure_packages(packages):
    """Install every entry of ``packages`` that cannot be imported.

    Each name is used both as the import name and as the pip requirement, so
    this only suits distributions whose import name matches their pip name.

    Args:
        packages: Iterable of package names to check.

    Raises:
        subprocess.CalledProcessError: If ``pip install`` exits with a
            non-zero status.
    """
    import importlib  # local import: keeps this block self-contained

    installed_any = False
    for pkg in packages:
        try:
            importlib.import_module(pkg)
        except ImportError:
            # Run pip through the current interpreter so the package is
            # installed into the environment this kernel is using.
            subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
            installed_any = True

    if installed_any:
        # Import finders cache directory contents; refresh them so packages
        # installed a moment ago can be imported without a kernel restart.
        importlib.invalidate_caches()

# Install missing dependencies before the pandas import below is attempted.
# openpyxl is never imported directly in the visible code; presumably it is
# the engine pandas uses to read the .xlsx inputs (TODO confirm).
ensure_packages(["pandas", "openpyxl"])

import pandas as pd
from pathlib import Path

# Report the pandas version that was actually loaded.
print("依赖就绪：pandas", pd.__version__)

依赖就绪：pandas 2.2.2


In [None]:
# Region name: prefix of the input workbook filename and fallback value for
# the "生源地" column.
REGION_PREFIX = "云南"
# Years to process; each year maps to one input workbook and one output CSV.
YEARS = ["2017", "2018", "2019", "2020", "2021"]  # ← edit as needed
# Intermediate CSV that is written, read back, and then deleted for each year.
OUTPUT_CSV_PATH = "temp.csv"
# Worksheet to read; None means pandas.read_excel is called without sheet_name.
SHEET_NAME = None

In [None]:
# Columns kept in the cleaned output, in this order. All of them except "专业"
# and "生源地" must already exist in the input workbook.
RETAIN_COLUMNS = [
    "年份","学校","_985","_211","双一流","科类","批次","专业","最低分","最低分排名","全国统一招生代码","招生类型", "生源地"
]

# Exact (case-sensitive) spellings recognised as "yes" and "no".
YN_MAP_TRUE = {"是", "Y", "y", "1", "True", "true"}
YN_MAP_FALSE = {"否", "N", "n", "0", "False", "false"}

def to_binary(v):
    """Translate a yes/no style cell into 1 or 0.

    The value is converted to ``str`` and stripped of surrounding whitespace
    before being looked up in ``YN_MAP_TRUE`` / ``YN_MAP_FALSE``.

    Args:
        v: Raw cell value.

    Returns:
        1 for a "yes" spelling, 0 for a "no" spelling, and ``pd.NA`` for a
        missing value or any spelling found in neither set.
    """
    if pd.isna(v):
        return pd.NA
    token = str(v).strip()
    for flag, spellings in ((1, YN_MAP_TRUE), (0, YN_MAP_FALSE)):
        if token in spellings:
            return flag
    # Unknown spellings stay missing so they can be reviewed by hand later.
    return pd.NA

def process_excel_to_df(input_path: str, sheet: str | None = None) -> pd.DataFrame:
    """Read one Excel workbook and return it reduced to ``RETAIN_COLUMNS``.

    Steps: strip whitespace from the headers, verify the required columns,
    add the optional "专业" / "生源地" columns when absent, map the "_985" /
    "_211" / "双一流" flags to 1/0 with ``to_binary``, and convert "最低分" /
    "最低分排名" to nullable integers.

    Args:
        input_path: Path of the workbook to read.
        sheet: Worksheet name; ``None`` reads the first sheet.

    Returns:
        A new DataFrame whose columns are exactly ``RETAIN_COLUMNS``.

    Raises:
        ValueError: If any column of ``RETAIN_COLUMNS`` other than "专业" and
            "生源地" is missing from the workbook.
    """
    # Read every cell as a string to avoid mixed dtypes. sheet_name=0 is the
    # pandas default (first sheet) and is used when no sheet was requested.
    df = pd.read_excel(input_path, sheet_name=0 if sheet is None else sheet, dtype=str)

    # Strip surrounding whitespace from the header names.
    df.columns = [str(c).strip() for c in df.columns]

    # Every retained column except the two optional ones must be present.
    required_columns = [c for c in RETAIN_COLUMNS if c not in ["专业", "生源地"]]
    missing_required = [c for c in required_columns if c not in df.columns]
    if missing_required:
        raise ValueError(f"缺少列: {', '.join(missing_required)}")

    if "专业" not in df.columns:
        df["专业"] = pd.NA

    # "生源地" falls back to REGION_PREFIX when the column is absent, or
    # cell by cell when a value is missing, blank, or the literal text "nan".
    if "生源地" not in df.columns:
        df["生源地"] = REGION_PREFIX
    else:
        filled = df["生源地"].fillna(REGION_PREFIX).astype(str).str.strip()
        df["生源地"] = filled.replace({"": REGION_PREFIX, "nan": REGION_PREFIX})

    # Keep only the configured columns, in the configured order.
    df = df[RETAIN_COLUMNS].copy()

    # yes/no -> 1/0
    for col in ["_985","_211","双一流"]:
        df[col] = df[col].map(to_binary)

    # Numeric columns: unparseable text becomes missing, the rest become integers.
    for col in ["最低分","最低分排名"]:
        s = pd.to_numeric(df[col], errors="coerce")
        non_int = int((s.dropna() % 1 != 0).sum())
        if non_int > 0:
            print(f"警告：{col}发现非整数值 {non_int} 个，已四舍五入处理")
            # Series.round() sends exact halves to the nearest even integer
            # (512.5 -> 512). floor(x + 0.5) gives the round-half-up result
            # that the warning above promises; missing values stay missing.
            s = (s + 0.5) // 1
        df[col] = s.astype("Int64")

    return df


# For every configured year: clean the workbook, keep only the wanted batches,
# drop rows flagged 0 in all three flag columns, and export the result.
for YEAR_SUFFIX in YEARS:
    input_path = f"{REGION_PREFIX}_专业分数线_{YEAR_SUFFIX}.xlsx"
    final_csv_path = f"{YEAR_SUFFIX}.csv"

    # A missing workbook skips the year instead of aborting the whole run.
    if not Path(input_path).exists():
        print(f"[{YEAR_SUFFIX}] 跳过，未找到输入：{input_path}")
        continue

    # Read and clean (column selection, yes/no -> 1/0, numeric conversion).
    df_batch = process_excel_to_df(input_path, SHEET_NAME)

    try:
        df_batch.to_csv(OUTPUT_CSV_PATH, index=False, encoding="utf-8-sig")
        print(f"[{YEAR_SUFFIX}] 已导出中间版 CSV：{OUTPUT_CSV_PATH}（{len(df_batch)} 行）")

        # Re-read the intermediate CSV and coerce the flag columns to numbers.
        df_final_batch = pd.read_csv(OUTPUT_CSV_PATH)
        bin_cols = ["_985", "_211", "双一流"]
        df_final_batch[bin_cols] = df_final_batch[bin_cols].apply(pd.to_numeric, errors="coerce")

        # Restore the nullable integer dtype so the exported CSV carries
        # integers even when a column contains missing values.
        for col in ["最低分","最低分排名"]:
            s = pd.to_numeric(df_final_batch[col], errors="coerce")
            non_int = int((s.dropna() % 1 != 0).sum())
            if non_int > 0:
                print(f"警告：{col}发现非整数值 {non_int} 个，已四舍五入处理")
                s = s.round()
            df_final_batch[col] = s.astype("Int64")

        # Batch filter: ignore all whitespace, then keep rows whose batch
        # starts with either "本科一批" or "本科批".
        batch_series = df_final_batch["批次"].astype(str).str.replace(r"\s+", "", regex=True).str.strip()
        keep_mask = batch_series.str.startswith("本科一批") | batch_series.str.startswith("本科批")

        removed_batch = len(df_final_batch) - int(keep_mask.sum())
        df_final_batch = df_final_batch[keep_mask].copy()
        # The message names both kept prefixes; it used to mention only the first.
        print(f"[{YEAR_SUFFIX}] 已保留批次=本科一批/本科批，移除 {removed_batch} 行，剩余 {len(df_final_batch)}")

        # Post-filter: drop rows where all three flags equal 0. A missing
        # flag never compares equal to 0, so such rows are kept.
        mask_all_zero = (df_final_batch["_985"] == 0) & (df_final_batch["_211"] == 0) & (df_final_batch["双一流"] == 0)
        removed = int(mask_all_zero.sum())
        df_final_batch = df_final_batch[~mask_all_zero].copy()

        # Export the final version.
        df_final_batch.to_csv(final_csv_path, index=False, encoding="utf-8-sig")
        print(f"[{YEAR_SUFFIX}] 已导出最终版 CSV：{final_csv_path}（移除 {removed} 行，剩余 {len(df_final_batch)} 行）")
    finally:
        # Remove the intermediate file even when this year failed part-way,
        # so a stale temp file is never left behind. Deletion is best-effort:
        # a failure here is reported but does not stop the run.
        try:
            if Path(OUTPUT_CSV_PATH).exists():
                Path(OUTPUT_CSV_PATH).unlink()
                print(f"[{YEAR_SUFFIX}] 已删除中间版 CSV：{OUTPUT_CSV_PATH}")
            else:
                print(f"[{YEAR_SUFFIX}] 未找到中间版 CSV（可能已删除）：{OUTPUT_CSV_PATH}")
        except Exception as e:
            print(f"[{YEAR_SUFFIX}] 删除中间版 CSV 失败：{e}")