按照上述代码，对“宏观月度数据Verison20”表格进行清洗：
1. 保留表格中第一行与所有第一列包含时间的数据行，其余全部删除
2. 标记表格内的缺失值
3. 对经过季调的水平值进行计算环比增速，并重命名后替代原列
4. 对于未经季调的水平值进行季调与环比增速计算，并重命名后替代原列
5. 从第一个“至多有一个缺失值”的行开始的时序数据
6. 对表格的变量命名进行英文简化

In [1]:
# -*- coding: utf-8 -*-
import re, math
import numpy as np
import pandas as pd
from pathlib import Path
from pandas.tseries.offsets import MonthEnd
from statsmodels.tsa.seasonal import STL

# ---------- 路径 ----------
cands = [Path("/mnt/data/宏观月度数据Verison20.xlsx"),
         Path("宏观月度数据Verison20.xlsx")]
IN_PATH  = next((p for p in cands if p.exists()), cands[0])
OUT_PATH = IN_PATH.with_name(IN_PATH.stem + "_仅时间行_清洗后_月末索引_abbr.xlsx")

# ---------- 规则 ----------
SA_TOKENS = ["季调","季节调整","x-13","x13","seasadj","seasonally adjusted","sa"]
GROWTH_TOKENS = ["环比","同比","增速","增长率","变化率","变动率","mom","qoq","yoy","%","pct","rate"]

def is_growth_col(name:str)->bool:
    s = str(name).lower()
    return any(t in s for t in [k.lower() for k in GROWTH_TOKENS])

def is_sa_level_col(name:str)->bool:
    s = str(name).lower()
    return (not is_growth_col(name)) and any(t in s for t in [k.lower() for k in SA_TOKENS])

# ---------- 时间识别 ----------
def is_number(x):
    return isinstance(x, (int,float)) and not isinstance(x,bool) and not math.isnan(x)

def looks_like_time(v):
    if v is None or (isinstance(v,float) and math.isnan(v)): return False
    if isinstance(v, pd.Timestamp): return True
    s = str(v).strip()
    if s == "": return False
    if re.match(r"^\d{4}[-/.]\d{1,2}([-/\.]\d{1,2})?$", s):
        try: pd.to_datetime(s, errors="raise"); return True
        except: pass
    if re.match(r"^\d{4}年\d{1,2}月(\d{1,2}日)?$", s):
        try:
            ss = s.replace("年","-").replace("月","-").replace("日","").strip("-")
            pd.to_datetime(ss, errors="raise"); return True
        except: pass
    if re.match(r"^\d{4}[-]?Q[1-4]$", s, flags=re.I): return True
    if re.match(r"^\d{6}$", s):
        try: pd.to_datetime(s, format="%Y%m", errors="raise"); return True
        except: pass
    if re.match(r"^\d{8}$", s):
        try: pd.to_datetime(s, format="%Y%m%d", errors="raise"); return True
        except: pass
    if is_number(v) and 1 <= float(v) <= 60000:
        try: pd.to_datetime(v, unit="D", origin="1899-12-30"); return True
        except: pass
    try: pd.to_datetime(s, errors="raise", yearfirst=True); return True
    except: return False

# ---------- 工具 ----------
def to_num(series: pd.Series) -> pd.Series:
    s = series.astype(str).str.replace("%","",regex=False).str.replace(",","",regex=False)
    return pd.to_numeric(s, errors="coerce")

def month_index(col):
    dt = pd.to_datetime(col, errors="coerce")
    return (dt.dt.to_period("ME").dt.to_timestamp("ME")).astype("datetime64[ns]")

def sa_and_mom_from_level(y: pd.Series) -> pd.DataFrame:
    """y: 月度水平，DatetimeIndex(月末)。返回 sa, mom_full, mom_masked。"""
    y = y[~y.index.duplicated()].sort_index()
    idx = pd.date_range(y.index.min(), y.index.max(), freq="ME")
    y2  = y.reindex(idx)
    obs = y2.notna()

    # 只为让 STL 能跑的内部填充；不用于回写
    yfill = y2.interpolate(method="time").ffill().bfill()
    out = pd.DataFrame(index=idx)
    out["obs"] = obs
    if yfill.notna().sum() >= 24:
        stl = STL(yfill, period=12, robust=True).fit()
        sa = yfill - stl.seasonal
        mom = sa.pct_change() * 100.0
        valid = obs & obs.shift(1, fill_value=False)  # 不向前取值：仅相邻两期原始观测都存在
        out["sa"] = sa
        out["mom_full"] = mom
        out["mom_masked"] = mom.where(valid)
    else:
        out[["sa","mom_full","mom_masked"]] = np.nan
    return out

# ---------- 重命名 ----------
def slugify(s: str) -> str:
    s = s.lower()
    s = re.sub(r"[ \t\r\n·、，,；;：:（）()［］\[\]{}<>“”\"'’‘]+","_", s)
    s = re.sub(r"[^a-z0-9_]", "", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s

BASE_MAP = [
    # 消费
    ("社会消费品零售总额","rs"), ("消费品零售总额","rs"), ("社会消费品零售","rs"),
    ("消费品零售","rs"), ("社零","rs"), ("社消","rs"), ("消费品","cons"),
    # 价格
    ("工业生产者出厂价格指数","ppi"), ("工业生产者购进价格指数","ppi_in"),
    ("居民消费价格指数","cpi"), ("核心","core"), ("食品","food"),
    ("非食品","nfood"), ("服务","srv"),
    # 实体
    ("规模以上工业增加值","ip"), ("工业增加值","ip"),
    ("固定资产投资","fai"), ("房地产开发投资","re_inv"),
    ("商品房销售面积","re_sa"), ("商品房销售额","re_sv"),
    ("进出口总额","trade"), ("出口","exp"), ("进口","imp"), ("贸易顺差","tb"),
    ("用电量","power"), ("发电量","gen"),
    # 财政与税
    ("一般公共预算收入","fiscal_rev"), ("财政收入","fiscal_rev"),
    ("税收收入","tax_rev"), ("税收合计","tax"), ("税收","tax"), ("税金","tax"),
    # 就业
    ("城镇调查失业率","ur"), ("调查失业率","ur"), ("失业率","ur"),
    ("城镇登记失业率","ur_reg"), ("登记失业率","ur_reg"),
    ("青年调查失业率","ur_youth"), ("16-24岁调查失业率","ur_youth"),
    # 金融货币
    ("人民币贷款","loan"), ("社会融资规模","tsf"),
    ("广义货币M2","m2"), ("货币供应量M2","m2"), ("货币供应M2","m2"),
    ("狭义货币M1","m1"), ("货币供应量M1","m1"), ("货币供应M1","m1"), ("M1货币供应量","m1"),
    ("存款准备金","rrr"), ("利率","rate"), ("汇率","fx"),
    # 其它
    ("房价指数","hpi"), ("制造业PMI","pmi_mfg"), ("非制造业PMI","pmi_nmi"),
    ("PMI","pmi"), ("GDP","gdp"), ("指数","idx"), ("价格","px"),
    ("产量","out"), ("销量","sales"), ("库存","inv"),
]

SUFFIX_RULES = [
    (["季调","季节调整","x-13","x13","seasadj","seasonally adjusted","sa"], "sa"),
    (["同比","yoy"], "yoy"),
    (["环比","mom"], "mom"),
    (["季度环比","qoq"], "qoq"),
    (["累计","累计值","累计同比","累计增速","cum"], "cum"),
    (["当月","本月","cur"], "cur"),
]

def map_base_token(name: str) -> str:
    s = str(name) if name is not None else ""
    for zh, en in BASE_MAP:
        if zh in s: return en
    low = s.lower()
    if re.search(r'(?<![0-9a-z])m0(?![0-9a-z])', low): return "m0"
    if re.search(r'(?<![0-9a-z])m1(?![0-9a-z])', low): return "m1"
    if re.search(r'(?<![0-9a-z])m2(?![0-9a-z])', low): return "m2"
    m = re.findall(r"[A-Za-z]{2,}", s)
    if m: return re.sub(r"[^A-Za-z0-9_]", "", m[0]).lower()
    if "消费" in s: return "cons"
    return "var"

def detect_suffixes(name: str):
    low = str(name).lower()
    found = []
    for keys, tag in SUFFIX_RULES:
        if any(k.lower() in low for k in keys): found.append(tag)
    order = ["sa","yoy","mom","qoq","cum","cur"]
    return [t for t in order if t in found]

def rename_columns(header_row):
    new_names, seen = [], {}
    for j, col in enumerate(header_row):
        if j == 0:
            nm = "date"
        else:
            base = map_base_token(col)
            suffix = detect_suffixes(col)
            nm = "_".join([base] + suffix) if suffix else base
            nm = slugify(nm)[:24] or f"col{j}"
        if nm in seen:
            seen[nm] += 1; nm = f"{nm}_{seen[nm]}"
        else:
            seen[nm] = 1
        new_names.append(nm)
    return new_names

# ---------- 主处理：每表 ----------
def process_sheet(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty: return df

    # 1) 仅保留首行 + 首列为时间的行
    keep = pd.Series(False, index=df.index)
    keep.iloc[0] = True
    first_col = df.iloc[:,0] if df.shape[1]>0 else pd.Series([None]*len(df))
    keep |= first_col.apply(looks_like_time)
    df = df.loc[keep].copy()

    # 2) 标记缺失
    df = df.replace(r"^\s*$", pd.NA, regex=True)

    if df.shape[0] <= 1 or df.shape[1] <= 1:
        return df

    header = df.iloc[0,:].astype(str).fillna("")
    body   = df.iloc[1:,:].copy()

    # 构造月末索引（不改首列单元格值）
    dt = month_index(body.iloc[:,0])
    body.index = dt

    # 3&4) 已季调水平 → 直接 MoM；未季调水平 → STL季调后 MoM；增长列不动
    for j in range(1, df.shape[1]):
        colname = header.iloc[j] if j < len(header) else f"col{j}"
        if is_growth_col(colname):  # 增速列跳过
            continue
        x = to_num(body.iloc[:, j])
        x.index = dt

        if is_sa_level_col(colname):
            mom = x.pct_change()*100.0
            valid = x.notna() & x.shift(1).notna()  # 不向前取值
            mom = mom.where(valid)
            body.iloc[:, j] = mom.values
            header.iloc[j] = f"{colname}_mom"
        else:
            tmp = sa_and_mom_from_level(x)
            mom = tmp["mom_masked"].reindex(body.index)
            body.iloc[:, j] = mom.values
            header.iloc[j] = f"{colname}_mom"

    # 5) 从首个“至多有一缺失”的行开始
    vars_ = body.iloc[:,1:]
    miss_cnt = vars_.isna().sum(axis=1)
    cond = (miss_cnt <= 1)
    if cond.any():
        start_idx = cond[cond].index[0]
        body = body.loc[start_idx:]

    # 回组 + 6) 英文简化命名
    out = pd.concat([header.to_frame().T, body.reset_index(drop=True)], ignore_index=True)
    out.iloc[0,:] = rename_columns(out.iloc[0,:].tolist())
    return out

# ---------- 执行 ----------
xl = pd.ExcelFile(IN_PATH)
wrote = False
with pd.ExcelWriter(OUT_PATH, engine="openpyxl", mode="w") as writer:
    for sh in xl.sheet_names:
        try:
            raw = xl.parse(sh, header=None, dtype=object)
        except Exception:
            continue
        out = process_sheet(raw)
        out.to_excel(writer, sheet_name=sh, header=False, index=False)
        wrote = True
    if not wrote:
        pd.DataFrame({"info":["no data after processing"]}).to_excel(
            writer, sheet_name="placeholder", index=False
        )

print(f"输出：{OUT_PATH}")

  idx = pd.date_range(y.index.min(), y.index.max(), freq="M")
  idx = pd.date_range(y.index.min(), y.index.max(), freq="M")


输出：宏观月度数据Verison20_仅时间行_清洗后_月末索引_abbr.xlsx


## -------------- 具体步骤 Version2 ----------------

In [1]:
# -*- coding: utf-8 -*-
import re
import math
import numpy as np
import pandas as pd
from pathlib import Path
from pandas.tseries.offsets import MonthEnd

# 输入候选（按存在优先）
candidates = [
    Path("/mnt/data/宏观月度数据Verison20.xlsx"),
    Path("宏观月度数据Verison20.xlsx"),
]
in_path = next((p for p in candidates if p.exists()), candidates[0])
out_path = in_path.with_name(in_path.stem + "_仅时间行_清洗后_月末索引.xlsx")

def is_number(x):
    return isinstance(x, (int, float)) and not isinstance(x, bool) and not math.isnan(x)

def looks_like_time(v):
    if v is None or (isinstance(v, float) and math.isnan(v)):
        return False
    if isinstance(v, (pd.Timestamp, )):
        return True
    s = str(v).strip()
    if s == "":
        return False
    # 常见格式
    if re.match(r"^\d{4}[-/.]\d{1,2}([-/\.]\d{1,2})?$", s):
        try: pd.to_datetime(s, errors="raise"); return True
        except: pass
    if re.match(r"^\d{4}年\d{1,2}月(\d{1,2}日)?$", s):
        try:
            ss = s.replace("年","-").replace("月","-").replace("日","").strip("-")
            pd.to_datetime(ss, errors="raise"); return True
        except: pass
    if re.match(r"^\d{4}[-]?Q[1-4]$", s, flags=re.I):
        return True
    if re.match(r"^\d{6}$", s):
        try: pd.to_datetime(s, format="%Y%m", errors="raise"); return True
        except: pass
    if re.match(r"^\d{8}$", s):
        try: pd.to_datetime(s, format="%Y%m%d", errors="raise"); return True
        except: pass
    if is_number(v) and 1 <= float(v) <= 60000:
        try: pd.to_datetime(v, unit="D", origin="1899-12-30"); return True
        except: pass
    try: pd.to_datetime(s, errors="raise", yearfirst=True); return True
    except: return False

def to_month_end_cell(v):
    if v is None or (isinstance(v, float) and math.isnan(v)):
        return v
    if isinstance(v, pd.Timestamp):
        return v + MonthEnd(0)
    if isinstance(v, (int, float)) and not isinstance(v, bool):
        if 1 <= float(v) <= 60000:
            try: return pd.to_datetime(v, unit="D", origin="1899-12-30") + MonthEnd(0)
            except: pass
        try: return pd.to_datetime(v, errors="raise", yearfirst=True) + MonthEnd(0)
        except: return v
    s = str(v).strip()
    for fmt in [None, "%Y%m", "%Y%m%d"]:
        try:
            dt = pd.to_datetime(s, format=fmt, errors="raise") if fmt else pd.to_datetime(s, errors="raise", yearfirst=True)
            return dt + MonthEnd(0)
        except: continue
    try:
        s2 = s.replace("年","-").replace("月","-").replace("日","").strip("-/ .")
        return pd.to_datetime(s2, errors="raise", yearfirst=True) + MonthEnd(0)
    except:
        return v

def mark_missing(df: pd.DataFrame) -> pd.DataFrame:
    return df.replace(r"^\s*$", pd.NA, regex=True)

def keep_header_and_time_rows(dfm: pd.DataFrame) -> pd.DataFrame:
    if dfm.shape[0] == 0: return dfm
    keep = pd.Series(False, index=dfm.index)
    keep.iloc[0] = True
    if dfm.shape[1] > 0:
        keep |= dfm.iloc[:, 0].apply(looks_like_time)
    return dfm.loc[keep]

def trim_from_first_eq1_missing(dfm: pd.DataFrame) -> pd.DataFrame:
    # 首行=表头保留；从“变量列恰有1个缺失”的首行起截断；无则退到全非缺失；再无则不截断
    if dfm.shape[0] <= 1 or dfm.shape[1] <= 1:
        return dfm
    header = dfm.iloc[:1]
    data   = dfm.iloc[1:]
    vars_  = data.iloc[:, 1:]
    miss_cnt = vars_.isna().sum(axis=1)
    cond = (miss_cnt == 1)
    if not cond.any():
        cond = vars_.notna().all(axis=1)
    if cond.any():
        start_idx = cond[cond].index[0]
        return pd.concat([header, dfm.loc[start_idx:]], axis=0)
    return dfm

# 逐表处理
xl = pd.ExcelFile(in_path)
wrote = False
with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
    for sh in xl.sheet_names:
        try:
            raw = xl.parse(sh, header=None, dtype=object)
        except Exception:
            continue
        # 1) 空白→缺失
        df1 = mark_missing(raw)
        # 2) 仅保留首行 + 首列可解析为时间的行
        df2 = keep_header_and_time_rows(df1)
        # 3) 从“恰有1个缺失”的首行起截断（否则退到全非缺失）
        df3 = trim_from_first_eq1_missing(df2)
        # 4) 首列日期改为当月月末（仅数据行，不动首行）
        if df3.shape[1] >= 1 and df3.shape[0] >= 2:
            df3.iloc[1:, 0] = df3.iloc[1:, 0].apply(to_month_end_cell)
        # 写回
        df3.to_excel(writer, sheet_name=sh, header=False, index=False)
        wrote = True
    if not wrote:
        pd.DataFrame({"info": ["no data after processing"]}).to_excel(
            writer, sheet_name="placeholder", index=False
        )

print(f"输出：{out_path}")

输出：宏观月度数据Verison20_仅时间行_清洗后_月末索引.xlsx


In [8]:
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from pathlib import Path
from pandas.tseries.offsets import MonthEnd
from statsmodels.tsa.seasonal import STL
try:
    from statsmodels.tsa.x13 import x13_arima_analysis as x13aa
    HAS_X13 = True
except Exception:
    HAS_X13 = False

IN_PATH  = Path("宏观月度数据Verison20_仅时间行_清洗后_月末索引.xlsx")
OUT_PATH = IN_PATH.with_name(IN_PATH.stem + "_税收季调环比_fix.xlsx")

TAX_TOKENS = ["税","税收","税收收入","税收合计","tax","tax_rev","fiscal_rev"]
GROWTH_TOKENS = ["环比","同比","增速","增长率","变化率","变动率","mom","qoq","yoy","%","pct","rate"]

def is_tax_level_col(name: str) -> bool:
    if name is None: return False
    s = str(name).strip().lower()
    if not s: return False
    if any(tok in s for tok in (t.lower() for t in GROWTH_TOKENS)): return False
    return any(tok in s for tok in (t.lower() for t in TAX_TOKENS))

def to_num(x: pd.Series) -> pd.Series:
    s = x.astype(str).str.replace("%","",regex=False).str.replace(",","",regex=False)
    return pd.to_numeric(s, errors="coerce")

def month_end(col):
    dt = pd.to_datetime(col, errors="coerce")
    return dt + MonthEnd(0)

def sa_stl_or_x13(s: pd.Series) -> pd.Series:
    # s: 月度不齐频，索引为DatetimeIndex
    s = s[~s.index.duplicated()].sort_index()
    idx = pd.date_range(s.index.min(), s.index.max(), freq="M")
    s2  = s.reindex(idx)
    # 时间插值以便季调算法运行；不回写到结果
    s2_fill = s2.interpolate(method="time").ffill().bfill()
    if s2_fill.notna().sum() < 24:
        return pd.Series(index=idx, dtype="float64")  # 样本太短
    if HAS_X13:
        try:
            res = x13aa(endog=s2_fill, freq="M")
            sa = res.seasadj
            return sa.reindex(idx)
        except Exception:
            pass
    stl = STL(s2_fill, period=12, robust=True).fit()
    sa = s2_fill - stl.seasonal
    return sa.reindex(idx)

def mom_from_sa(sa: pd.Series, obs_mask: pd.Series) -> pd.Series:
    mom = sa.pct_change() * 100.0
    # 仅在“本期与上期均有原始观测”的位置保留
    valid = obs_mask & obs_mask.shift(1, fill_value=False)
    return mom.where(valid)

xl = pd.ExcelFile(IN_PATH)
with pd.ExcelWriter(OUT_PATH, engine="openpyxl", mode="w") as writer:
    for sh in xl.sheet_names:
        df = xl.parse(sh, header=None, dtype=object)
        if df.shape[0] <= 1 or df.shape[1] <= 1:
            df.to_excel(writer, sheet_name=sh, header=False, index=False)
            continue

        header = df.iloc[0, :].astype(str).fillna("")
        body   = df.iloc[1:, :].copy()

        # 时间索引
        dt = month_end(body.iloc[:, 0])
        body.index = dt

        # 处理税收列：覆盖为季调环比%
        for j in range(1, df.shape[1]):
            name = header.iloc[j] if j < len(header) else f"col{j}"
            if not is_tax_level_col(name):
                continue
            x = to_num(body.iloc[:, j])
            x.index = dt
            if x.notna().sum() < 12:
                continue
            sa = sa_stl_or_x13(x)
            obs_mask = x.reindex(sa.index).notna()
            mom = mom_from_sa(sa, obs_mask)
            # 回到原行索引并覆盖
            mom_on_rows = mom.reindex(body.index)
            body.iloc[:, j] = mom_on_rows.values  # 不改列名

        out = pd.concat([header.to_frame().T, body.reset_index(drop=True)], ignore_index=True)
        out.to_excel(writer, sheet_name=sh, header=False, index=False)

print(f"已输出：{OUT_PATH}")
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from pathlib import Path
from pandas.tseries.offsets import MonthEnd
from statsmodels.tsa.seasonal import STL
try:
    from statsmodels.tsa.x13 import x13_arima_analysis as x13aa
    HAS_X13 = True
except Exception:
    HAS_X13 = False

IN_PATH  = Path("宏观月度数据Verison20_仅时间行_清洗后_月末索引.xlsx")
OUT_PATH = IN_PATH.with_name(IN_PATH.stem + "_税收季调环比_fix.xlsx")

TAX_TOKENS = ["税","税收","税收收入","税收合计","tax","tax_rev","fiscal_rev"]
GROWTH_TOKENS = ["环比","同比","增速","增长率","变化率","变动率","mom","qoq","yoy","%","pct","rate"]

def is_tax_level_col(name: str) -> bool:
    if name is None: return False
    s = str(name).strip().lower()
    if not s: return False
    if any(tok in s for tok in (t.lower() for t in GROWTH_TOKENS)): return False
    return any(tok in s for tok in (t.lower() for t in TAX_TOKENS))

def to_num(x: pd.Series) -> pd.Series:
    s = x.astype(str).str.replace("%","",regex=False).str.replace(",","",regex=False)
    return pd.to_numeric(s, errors="coerce")

def month_end(col):
    dt = pd.to_datetime(col, errors="coerce")
    return dt + MonthEnd(0)

def sa_stl_or_x13(s: pd.Series) -> pd.Series:
    # s: 月度不齐频，索引为DatetimeIndex
    s = s[~s.index.duplicated()].sort_index()
    idx = pd.date_range(s.index.min(), s.index.max(), freq="M")
    s2  = s.reindex(idx)
    # 时间插值以便季调算法运行；不回写到结果
    s2_fill = s2.interpolate(method="time").ffill().bfill()
    if s2_fill.notna().sum() < 24:
        return pd.Series(index=idx, dtype="float64")  # 样本太短
    if HAS_X13:
        try:
            res = x13aa(endog=s2_fill, freq="M")
            sa = res.seasadj
            return sa.reindex(idx)
        except Exception:
            pass
    stl = STL(s2_fill, period=12, robust=True).fit()
    sa = s2_fill - stl.seasonal
    return sa.reindex(idx)

def mom_from_sa(sa: pd.Series, obs_mask: pd.Series) -> pd.Series:
    mom = sa.pct_change() * 100.0
    # 仅在“本期与上期均有原始观测”的位置保留
    valid = obs_mask & obs_mask.shift(1, fill_value=False)
    return mom.where(valid)

xl = pd.ExcelFile(IN_PATH)
with pd.ExcelWriter(OUT_PATH, engine="openpyxl", mode="w") as writer:
    for sh in xl.sheet_names:
        df = xl.parse(sh, header=None, dtype=object)
        if df.shape[0] <= 1 or df.shape[1] <= 1:
            df.to_excel(writer, sheet_name=sh, header=False, index=False)
            continue

        header = df.iloc[0, :].astype(str).fillna("")
        body   = df.iloc[1:, :].copy()

        # 时间索引
        dt = month_end(body.iloc[:, 0])
        body.index = dt

        # 处理税收列：覆盖为季调环比%
        for j in range(1, df.shape[1]):
            name = header.iloc[j] if j < len(header) else f"col{j}"
            if not is_tax_level_col(name):
                continue
            x = to_num(body.iloc[:, j])
            x.index = dt
            if x.notna().sum() < 12:
                continue
            sa = sa_stl_or_x13(x)
            obs_mask = x.reindex(sa.index).notna()
            mom = mom_from_sa(sa, obs_mask)
            # 回到原行索引并覆盖
            mom_on_rows = mom.reindex(body.index)
            body.iloc[:, j] = mom_on_rows.values  # 不改列名

        out = pd.concat([header.to_frame().T, body.reset_index(drop=True)], ignore_index=True)
        out.to_excel(writer, sheet_name=sh, header=False, index=False)

print(f"已输出：{OUT_PATH}")


已输出：宏观月度数据Verison20_仅时间行_清洗后_月末索引_税收季调环比_fix.xlsx
已输出：宏观月度数据Verison20_仅时间行_清洗后_月末索引_税收季调环比_fix.xlsx


  idx = pd.date_range(s.index.min(), s.index.max(), freq="M")
  idx = pd.date_range(s.index.min(), s.index.max(), freq="M")


In [5]:
# -*- coding: utf-8 -*-
import re
import pandas as pd
from pathlib import Path

in_path  = Path("宏观月度数据Verison20_仅时间行_清洗后_月末索引.xlsx")
out_path = in_path.with_name(in_path.stem + "_abbr.xlsx")

# --- 扩充映射：加入 M1 与 税收 同义词 ---
BASE_MAP = [
     # —— 消费（先放最具体） ——
    ("社会消费品零售总额", "rs"),
    ("消费品零售总额", "rs"),
    ("社会消费品零售", "rs"),
    ("消费品零售", "rs"),
    ("社零", "rs"),
    ("社消", "rs"),
    ("消费品", "cons"),   # 遇到仅“消费品”的通用列

    # —— 价格与物价 ——
    ("工业生产者出厂价格指数", "ppi"),
    ("工业生产者购进价格指数", "ppi_in"),
    ("居民消费价格指数", "cpi"),
    ("核心", "core"),
    ("食品", "food"),
    ("非食品", "nfood"),
    ("服务", "srv"),

    # —— 实体活动 ——
    ("规模以上工业增加值", "ip"),
    ("工业增加值", "ip"),
    ("固定资产投资", "fai"),
    ("房地产开发投资", "re_inv"),
    ("商品房销售面积", "re_sa"),
    ("商品房销售额", "re_sv"),
    ("进出口总额", "trade"),
    ("出口", "exp"),
    ("进口", "imp"),
    ("贸易顺差", "tb"),
    ("财政收入", "fiscal_rev"),
    ("税收收入", "tax_rev"),
    ("税收合计", "tax"),
    ("税收", "tax"),

    ("失业率", "ur"),
    ("用电量", "power"),
    ("发电量", "gen"),
    ("人民币贷款", "loan"),
    ("社会融资规模", "tsf"),

    ("广义货币M2", "m2"),
    ("货币供应量M2", "m2"),
    ("货币供应M2", "m2"),
    ("狭义货币M1", "m1"),
    ("货币供应量M1", "m1"),
    ("货币供应M1", "m1"),
    ("M1货币供应量", "m1"),

    ("存款准备金", "rrr"),
    ("利率", "rate"),
    ("汇率", "fx"),
    ("房价指数", "hpi"),
    ("制造业PMI", "pmi_mfg"),
    ("非制造业PMI", "pmi_nmi"),
    ("PMI", "pmi"),
    ("GDP", "gdp"),
    ("指数", "idx"),
    ("价格", "px"),
    ("产量", "out"),
    ("销量", "sales"),
    ("库存", "inv"),
]



# 派生后缀识别
SUFFIX_RULES = [
    (["季调","季节调整","x-13","x13","seasadj","seasonally adjusted","sa"], "sa"),
    (["同比","yoy"], "yoy"),
    (["环比","mom"], "mom"),
    (["季度环比","qoq"], "qoq"),
    (["累计","累计值","累计同比","累计增速","cum"], "cum"),
    (["当月","本月","cur"], "cur"),
]

def slugify(s: str) -> str:
    s = s.lower()
    s = re.sub(r"[ \t\r\n·、，,；;：:（）()［］\[\]{}<>“”\"'’‘]+", "_", s)
    s = re.sub(r"[^a-z0-9_]", "", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s

def map_base_token(name: str) -> str:
    s = str(name) if name is not None else ""
    # 1) 词典优先
    for zh, en in BASE_MAP:
        if zh in s:
            return en
    low = s.lower()
    # 2) 稳态识别 M0/M1/M2（避免落成 var）
    if re.search(r'(?<![0-9a-z])m0(?![0-9a-z])', low): return "m0"
    if re.search(r'(?<![0-9a-z])m1(?![0-9a-z])', low): return "m1"
    if re.search(r'(?<![0-9a-z])m2(?![0-9a-z])', low): return "m2"
    # 3) 若含英文缩写（≥2字母）直接用
    m = re.findall(r"[A-Za-z]{2,}", s)
    if m:
        return re.sub(r"[^A-Za-z0-9_]", "", m[0]).lower()
    # 4) 兜底
    return "var"

def detect_suffixes(name: str):
    found = []
    low = name.lower()
    for keys, tag in SUFFIX_RULES:
        if any(k.lower() in low for k in keys):
            found.append(tag)
    # 去重并固化顺序
    order = ["sa","yoy","mom","qoq","cum","cur"]
    return [t for t in order if t in found]

def rename_columns(header_row):
    new_names = []
    seen = {}
    for j, col in enumerate(header_row):
        if j == 0:
            base = "date"
            suffix = []
        else:
            col_s = str(col) if pd.notna(col) else f"col{j}"
            base = map_base_token(col_s)
            suffix = detect_suffixes(col_s)
        name = "_".join([base] + suffix) if suffix else base
        name = slugify(name)[:24] or f"col{j}"
        # 确保唯一
        if name in seen:
            seen[name] += 1
            name = f"{name}_{seen[name]}"
        else:
            seen[name] = 1
        new_names.append(name)
    return new_names

# 工作表名精简
SHEET_TAGS = [
    ("价格","price"), ("物价","price"), ("CPI","cpi"), ("PPI","ppi"),
    ("工业","ind"), ("制造","mfg"), ("消费","cons"), ("零售","retail"),
    ("投资","inv"), ("财政","fiscal"), ("税","tax"),
    ("金融","fin"), ("货币","money"), ("贸易","trade"),
    ("房地产","re"), ("房地","re"), ("就业","labor"),
    ("能源","energy"), ("电力","power")
]

def short_sheet_name(name: str, idx: int) -> str:
    parts = []
    n = name
    for zh, tag in SHEET_TAGS:
        if zh in n:
            parts.append(tag)
    if not parts:
        # 保留原英文/数字
        base = slugify(n)
        parts = [base] if base else [f"sh{idx+1}"]
    slug = "_".join(dict.fromkeys(parts))[:20] or f"sh{idx+1}"
    return slug

# 执行
xl = pd.ExcelFile(in_path)
with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
    for i, sh in enumerate(xl.sheet_names):
        df = xl.parse(sh, header=None, dtype=object)
        if df.empty:
            pd.DataFrame().to_excel(writer, sheet_name=short_sheet_name(sh, i), index=False, header=False)
            continue
        # 第一行为表头
        header = df.iloc[0, :].astype(str).fillna("")
        new_cols = rename_columns(header.tolist())
        df_out = df.copy()
        df_out.iloc[0, :] = new_cols
        # 表名压缩
        new_sheet = short_sheet_name(sh, i)
        df_out.to_excel(writer, sheet_name=new_sheet, index=False, header=False)

print(f"已输出：{out_path}")

已输出：宏观月度数据Verison20_仅时间行_清洗后_月末索引_abbr.xlsx


# --------------------------- 具体步骤 Version1 ------------------------------------

In [24]:
# -*- coding: utf-8 -*-
import re
import math
from datetime import datetime
import pandas as pd
from pathlib import Path

in_path  = Path("宏观月度数据Verison20.xlsx")   # 输入文件
out_path = in_path.with_name(in_path.stem + "_仅时间行.xlsx")  # 输出文件

def is_number(x):
    return isinstance(x, (int, float)) and not isinstance(x, bool) and not math.isnan(x)

# 判断“时间样式”
def looks_like_time(v):
    if v is None or (isinstance(v, float) and math.isnan(v)):
        return False
    # 直接是时间戳
    if isinstance(v, (pd.Timestamp, datetime)):
        return True
    s = str(v).strip()

    # 常见格式：YYYY-MM, YYYY/MM, YYYY.MM, YYYY-MM-DD, YYYY/MM/DD, YYYY.MM.DD
    if re.match(r"^\d{4}[-/.]\d{1,2}([-/\.]\d{1,2})?$", s):
        try:
            pd.to_datetime(s, errors="raise")
            return True
        except Exception:
            pass

    # 中文：YYYY年M月(可带日)
    if re.match(r"^\d{4}年\d{1,2}月(\d{1,2}日)?$", s):
        ss = s.replace("年","-").replace("月","-").replace("日","").strip("-")
        try:
            pd.to_datetime(ss, errors="raise")
            return True
        except Exception:
            pass

    # 季度：YYYYQ[1-4] 或 YYYY-Q[1-4]
    if re.match(r"^\d{4}[-]?Q[1-4]$", s, flags=re.IGNORECASE):
        return True

    # 紧凑：YYYYMM 或 YYYYMMDD
    if re.match(r"^\d{6}$", s):  # YYYYMM
        try:
            pd.to_datetime(s, format="%Y%m", errors="raise")
            return True
        except Exception:
            pass
    if re.match(r"^\d{8}$", s):  # YYYYMMDD
        try:
            pd.to_datetime(s, format="%Y%m%d", errors="raise")
            return True
        except Exception:
            pass

    # 纯数字，可能为 Excel 序列号（日序）
    if is_number(v) and 1 <= float(v) <= 60000:
        try:
            pd.to_datetime(v, unit="D", origin="1899-12-30")
            return True
        except Exception:
            pass

    # 兜底尝试
    try:
        pd.to_datetime(s, errors="raise", yearfirst=True, dayfirst=False)
        return True
    except Exception:
        return False

# 读取并处理所有工作表
xl = pd.ExcelFile(in_path)
with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
    for sh in xl.sheet_names:
        df = pd.read_excel(in_path, sheet_name=sh, header=None, dtype=object)
        if df.shape[0] == 0:
            df.to_excel(writer, sheet_name=sh, header=False, index=False)
            continue

        # 保留规则：第0行；或第0列可解析为时间的行
        keep_mask = pd.Series(False, index=df.index)
        keep_mask.iloc[0] = True
        first_col = df.iloc[:, 0] if df.shape[1] > 0 else pd.Series([None]*len(df))
        keep_mask |= first_col.apply(looks_like_time)

        out = df.loc[keep_mask]
        out.to_excel(writer, sheet_name=sh, header=False, index=False)

print(f"已生成：{out_path}")


已生成：宏观月度数据Verison20_仅时间行.xlsx


In [5]:
# -*- coding: utf-8 -*-
import pandas as pd
from pathlib import Path

# 输入候选
candidates = [Path("宏观月度数据Verison20_仅时间.xlsx"),
              Path("宏观月度数据Verison20_仅时间行.xlsx"),
              Path("宏观月度数据Verison20_仅时间行_清洗后.xlsx")]
in_path = next((p for p in candidates if p.exists()), candidates[0])
out_path = in_path.with_name(in_path.stem + "_从首个仅一缺失行起.xlsx")

def mark_missing(df: pd.DataFrame) -> pd.DataFrame:
    return df.replace(r"^\s*$", pd.NA, regex=True)

def trim_from_first_row_with_one_missing(df_marked: pd.DataFrame) -> pd.DataFrame:
    # 结构检查
    if df_marked.shape[0] <= 1 or df_marked.shape[1] <= 1:
        return df_marked
    # 首行保留，其余为数据；第1列视为时间列，不参与计缺失
    header = df_marked.iloc[:1]
    data   = df_marked.iloc[1:]
    vars_  = data.iloc[:, 1:]

    # 目标1：首个“恰好1个缺失”的行
    miss_cnt = vars_.isna().sum(axis=1)
    cond_eq1 = (miss_cnt == 1)
    if cond_eq1.any():
        start_idx = cond_eq1[cond_eq1].index[0]
        return pd.concat([header, df_marked.loc[start_idx:]], axis=0)

    # 回退1：首个“全非缺失”的行
    cond_full = vars_.notna().all(axis=1)
    if cond_full.any():
        start_idx = cond_full[cond_full].index[0]
        return pd.concat([header, df_marked.loc[start_idx:]], axis=0)

    # 回退2：不裁剪
    return df_marked

# 逐表处理并输出，确保至少写出一张表
xl = pd.ExcelFile(in_path)
wrote = False
with pd.ExcelWriter(out_path, engine="openpyxl", mode="w") as writer:
    for sh in xl.sheet_names:
        try:
            raw = xl.parse(sh, header=None, dtype=object)
        except Exception:
            continue
        dfm = mark_missing(raw)
        out = trim_from_first_row_with_one_missing(dfm)
        out.to_excel(writer, sheet_name=sh, header=False, index=False)
        wrote = True
    if not wrote:
        pd.DataFrame({"info": ["no data after processing"]}).to_excel(
            writer, sheet_name="placeholder", index=False
        )

print(f"输入：{in_path}")
print(f"已输出：{out_path}")


输入：宏观月度数据Verison20_仅时间行.xlsx
已输出：宏观月度数据Verison20_仅时间行_从首个仅一缺失行起.xlsx


In [6]:
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from pathlib import Path
from pandas.tseries.offsets import MonthEnd

in_path = Path("宏观月度数据Verison20_仅时间行_从首个仅一缺失行起.xlsx")
out_path = in_path.with_name(in_path.stem + "_月末索引.xlsx")

def to_month_end_cell(v):
    """将单元格转为当月月末日期；无法解析则原样返回。"""
    if v is None or (isinstance(v, float) and np.isnan(v)):
        return v
    # 已是时间戳
    if isinstance(v, pd.Timestamp):
        return v + MonthEnd(0)
    # Excel 日序
    if isinstance(v, (int, float)) and not isinstance(v, bool):
        if 1 <= float(v) <= 60000:
            try:
                dt = pd.to_datetime(v, unit="D", origin="1899-12-30")
                return dt + MonthEnd(0)
            except Exception:
                pass
        # 兜底常规解析
        try:
            dt = pd.to_datetime(v, errors="raise", yearfirst=True)
            return dt + MonthEnd(0)
        except Exception:
            return v
    # 字符串
    s = str(v).strip()
    if s == "":
        return np.nan
    # 常规解析 + 常见紧凑格式
    for fmt in [None, "%Y%m", "%Y%m%d"]:
        try:
            dt = pd.to_datetime(s, format=fmt, errors="raise") if fmt else pd.to_datetime(s, errors="raise", yearfirst=True)
            return dt + MonthEnd(0)
        except Exception:
            continue
    # 中文日期兜底
    try:
        s2 = s.replace("年","-").replace("月","-").replace("日","").strip("-/ .")
        dt = pd.to_datetime(s2, errors="raise", yearfirst=True)
        return dt + MonthEnd(0)
    except Exception:
        return v

xl = pd.ExcelFile(in_path)
with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
    for sh in xl.sheet_names:
        df = xl.parse(sh, header=None, dtype=object)
        if df.shape[1] >= 1:
            df.iloc[:, 0] = df.iloc[:, 0].apply(to_month_end_cell)
        df.to_excel(writer, sheet_name=sh, header=False, index=False)

print(f"已输出：{out_path}")


已输出：宏观月度数据Verison20_仅时间行_从首个仅一缺失行起_月末索引.xlsx


In [7]:
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from pathlib import Path

in_path = Path("宏观月度数据Verison20_仅时间行_从首个仅一缺失行起_月末索引.xlsx")
out_path = in_path.with_name(in_path.stem + "_环比替换.xlsx")

SA_TOKENS = ["季调","季节调整","季节性调整","x-13","x13","seasadj","seasonally adjusted","sa","adj"]
GROWTH_TOKENS = ["环比","同比","增速","增长率","变化率","变动率","mom","qoq","yoy","%","pct","rate"]

def is_sa_level_col(name: str) -> bool:
    if name is None:
        return False
    s = str(name).strip().lower()
    if s == "" or s == "nan":
        return False
    if any(tok in s for tok in GROWTH_TOKENS):
        return False
    return any(tok in s for tok in SA_TOKENS)

def to_numeric(series: pd.Series) -> pd.Series:
    s = series.astype(str).str.replace("%","", regex=False).str.replace(",","", regex=False)
    return pd.to_numeric(s, errors="coerce")

xl = pd.ExcelFile(in_path)
with pd.ExcelWriter(out_path, engine="openpyxl") as writer:
    for sh in xl.sheet_names:
        df = xl.parse(sh, header=None, dtype=object)
        if df.empty or df.shape[0] <= 1 or df.shape[1] <= 1:
            df.to_excel(writer, sheet_name=sh, header=False, index=False)
            continue

        header = df.iloc[0, :].astype(str).fillna("")
        data = df.iloc[1:, :].copy()

        for j in range(1, df.shape[1]):  # 跳过第1列时间
            colname = header.iloc[j] if j < len(header) else f"col{j}"
            if is_sa_level_col(colname):
                x = to_numeric(data.iloc[:, j])
                data.iloc[:, j] = x.pct_change() * 100.0

        out = pd.concat([header.to_frame().T, data], ignore_index=True)
        out.to_excel(writer, sheet_name=sh, header=False, index=False)

print(f"已输出：{out_path}")

已输出：宏观月度数据Verison20_仅时间行_从首个仅一缺失行起_月末索引_环比替换.xlsx


  data.iloc[:, j] = x.pct_change() * 100.0
