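# Dirty Data Cleaning Cheat Sheet (for Dataset Interviews)

This notebook is for getting results quickly on the spot: data profiling → splitting and leakage control → cleaning (missing values / outliers / duplicates / types / dirty categories) → a reproducible Pipeline → A/B comparison.

**Goals**:
- First get a baseline that runs end to end (minimal viable cleaning)
- Then focus effort on the largest noise sources (high missingness, obvious errors, strong outliers, leakage risk)
- Keep a log at every step: how many rows changed, how many values changed, and how the distributions shifted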


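## 0. Conventions before running
- Raw data is read-only: `df_raw` is kept as-is and never modified
- Every cleaning step produces a new object: `df = df.copy()`
- Any step that uses a statistic (mean / standard deviation / quantiles / encodings, etc.): fit on train only, then transform test
- Time series: lock the time split first, then do any feature engineering
- Deliverables:
  - `quality_report.csv` (column-level quality table)
  - `cleaning_log.json` (cleaning log)
  - `pipeline.pkl` (reproducible sklearn Pipeline, if needed)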
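
A minimal sketch of the fit-on-train / transform-on-test rule, using median imputation as the statistic. The toy frames and the column name `x` are assumptions made up for illustration, not part of the real data.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data: `x` has missing values in both splits.
train = pd.DataFrame({"x": [1.0, 2.0, np.nan, 4.0]})
test = pd.DataFrame({"x": [np.nan, 100.0]})

# "fit": the statistic is computed from the training rows only.
train_median = train["x"].median()

# "transform": the same frozen statistic is applied to both splits.
train_filled = train["x"].fillna(train_median)
test_filled = test["x"].fillna(train_median)

print(train_median, test_filled.tolist())
```

The extreme test value never influences the fill value, which is the point of the rule; the same pattern applies to clipping bounds, scalers, and encoders.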


In [None]:
import numpy as np
import pandas as pd

from dataclasses import dataclass, asdict
import json
from pathlib import Path

# General-purpose approach using only sklearn / pandas / numpy
from sklearn.model_selection import TimeSeriesSplit, GroupKFold
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.linear_model import Ridge, Lasso, LogisticRegression
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier


In [None]:
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

cleaning_log = []

def log_step(name: str, details: dict):
    rec = {"step": name, **details}
    cleaning_log.append(rec)

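def save_log(path=OUTPUT_DIR/"cleaning_log.json"):
    # Explicit UTF-8 because ensure_ascii=False keeps non-ASCII text; default=str covers values such as Timestamps.
    path.write_text(json.dumps(cleaning_log, ensure_ascii=False, indent=2, default=str), encoding="utf-8")
    return path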


## 1. Data loading
Replace the loading code below with the actual data path and format.


In [None]:
# Example: CSV
# df_raw = pd.read_csv("data.csv")

# Example: Parquet
# df_raw = pd.read_parquet("data.parquet")

# After this cell runs, make sure df_raw exists


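## 2. Quick profiling
Produce a column-level quality table: missing rate, number of unique values, suspected constant columns, numeric quantiles, top-k categories, monotonicity of the time column, and so on.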


In [None]:
def infer_column_roles(df: pd.DataFrame, time_hint=None, target_hint=None, id_hints=None):
    """Roughly infer column roles: time / target / id / numeric / categorical."""
    cols = list(df.columns)
    time_col = time_hint if time_hint in cols else None
    target_col = target_hint if target_hint in cols else None
    id_cols = [c for c in (id_hints or []) if c in cols]

    # First pass: numeric dtypes are numeric; everything else (object, category, datetime, ...) is treated as categorical
    numeric_cols = [c for c in cols if pd.api.types.is_numeric_dtype(df[c])]
    categorical_cols = [c for c in cols if (c not in numeric_cols)]
    # Exclude columns with a known role
    for c in [time_col, target_col, *id_cols]:
        if c in numeric_cols: numeric_cols.remove(c)
        if c in categorical_cols: categorical_cols.remove(c)

    return {
        "time_col": time_col,
        "target_col": target_col,
        "id_cols": id_cols,
        "numeric_cols": numeric_cols,
        "categorical_cols": categorical_cols,
    }

def quality_report(df: pd.DataFrame, topk=5) -> pd.DataFrame:
    rows = []
    n = len(df)
    for c in df.columns:
        s = df[c]
        missing = s.isna().mean()
        nunique = s.nunique(dropna=True)
        is_const = (nunique <= 1)
        dtype = str(s.dtype)

        row = {
            "col": c,
            "dtype": dtype,
            "missing_rate": float(missing),
            "nunique": int(nunique),
            "is_constant": bool(is_const),
        }

        if pd.api.types.is_numeric_dtype(s):
            q = s.quantile([0.0, 0.01, 0.05, 0.5, 0.95, 0.99, 1.0], interpolation="linear")
            row.update({f"q{int(k*100):02d}": float(v) if pd.notna(v) else np.nan for k, v in q.items()})
        else:
            vc = s.astype("object").value_counts(dropna=True).head(topk)
            row.update({f"top{i+1}": str(k) for i, k in enumerate(vc.index)})
            row.update({f"top{i+1}_cnt": int(v) for i, v in enumerate(vc.values)})

        rows.append(row)

    rep = pd.DataFrame(rows).sort_values(["missing_rate", "nunique"], ascending=[False, True])
    return rep

def monotonicity_check(df: pd.DataFrame, time_col: str):
    s = df[time_col]
    if not pd.api.types.is_datetime64_any_dtype(s):
        s2 = pd.to_datetime(s, errors="coerce")
    else:
        s2 = s
    valid = s2.dropna()
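    if len(valid) <= 2:
        # Keep the same keys as the normal return so downstream logging stays uniform.
        return {"is_monotonic_increasing": None, "duplicate_rate": None, "nat_rate": float(s2.isna().mean())}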
    return {
        "is_monotonic_increasing": bool(valid.is_monotonic_increasing),
        "duplicate_rate": float(valid.duplicated().mean()),
        "nat_rate": float(s2.isna().mean()),
    }


In [None]:
# === Parameters to fill in on site ===
TIME_COL = None      # e.g. "timestamp"
TARGET_COL = None    # e.g. "y"
ID_COLS = []         # e.g. ["asset_id", "instrument", "user_id"]

roles = infer_column_roles(df_raw, time_hint=TIME_COL, target_hint=TARGET_COL, id_hints=ID_COLS)
roles


In [None]:
rep = quality_report(df_raw)
rep.head(30)


In [None]:
rep_path = OUTPUT_DIR / "quality_report.csv"
rep.to_csv(rep_path, index=False)
log_step("quality_report", {"path": str(rep_path), "n_rows": int(len(df_raw)), "n_cols": int(df_raw.shape[1])})
rep_path


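### 2.1 Quick hazard checklist
- Is the primary key unique (e.g. `time + id`)?
- Can the time column be parsed, and is it out of order or duplicated?
- Does the target show obvious leakage (e.g. a variant of the target among the features, or future information)?
- Are there extremely high-missing or constant columns (drop them directly)?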


In [None]:
def check_key_uniqueness(df: pd.DataFrame, key_cols):
    if not key_cols:
        return {"key_cols": key_cols, "is_unique": None, "dup_rate": None}
    dup_rate = df.duplicated(subset=key_cols, keep=False).mean()
    return {"key_cols": key_cols, "is_unique": float(dup_rate) == 0.0, "dup_rate": float(dup_rate)}

# e.g. KEY_COLS = [TIME_COL] + ID_COLS (if these are supposed to be unique)
KEY_COLS = [c for c in ([TIME_COL] + ID_COLS) if c]
key_check = check_key_uniqueness(df_raw, KEY_COLS)
log_step("key_uniqueness_check", key_check)
key_check


In [None]:
if TIME_COL:
    mono = monotonicity_check(df_raw, TIME_COL)
    log_step("monotonicity_check", {"time_col": TIME_COL, **mono})
    print(mono)  # a bare expression inside an if-block is not displayed by the notebook


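## 3. Splitting (lock the boundary first, then clean)
Time series are split by time by default: train comes first, test comes after.

If entities exist (such as asset / instrument / user), the options are:
- Roll train/validation forward in time (TimeSeriesSplit)
- Or split by group (GroupKFold) to keep the same entity from leaking across folds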
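
The two options can be sketched on toy indices as below. The row count, the entity labels, and the fold counts are assumptions chosen only to keep the example small; `TimeSeriesSplit` also assumes the rows are already sorted by time.

```python
import numpy as np
from sklearn.model_selection import GroupKFold, TimeSeriesSplit

# Hypothetical toy setup: 12 rows already sorted by time, 4 entities.
n_rows = 12
X = np.arange(n_rows).reshape(-1, 1)
groups = np.repeat(["a", "b", "c", "d"], 3)

# Rolling time split: every validation fold lies strictly after its training fold.
tscv = TimeSeriesSplit(n_splits=3)
time_folds = list(tscv.split(X))
for train_idx, valid_idx in time_folds:
    assert train_idx.max() < valid_idx.min()

# Group split: no entity appears on both sides of a fold.
gkf = GroupKFold(n_splits=2)
group_folds = list(gkf.split(X, groups=groups))
for train_idx, valid_idx in group_folds:
    assert set(groups[train_idx]).isdisjoint(groups[valid_idx])
```

Which one fits depends on the question being asked: the time split tests generalization to the future, the group split tests generalization to unseen entities.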


In [None]:
def time_train_test_split(df: pd.DataFrame, time_col: str, test_size=0.2):
    d = df.copy()
    d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
    d = d.sort_values(time_col)
    n = len(d)
    cut = int(np.floor(n * (1 - test_size)))
    train = d.iloc[:cut].copy()
    test = d.iloc[cut:].copy()
    return train, test

# Usage:
# train_df, test_df = time_train_test_split(df_raw, TIME_COL, test_size=0.2)


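## 4. Minimal viable cleaning (get the baseline running first)
Steps:
- Parse the time column (if any)
- Drop constant columns / extremely high-missing columns
- Fix obvious type errors (strings mixed into numeric columns)
- Duplicate primary keys: aggregate or keep the latest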


In [None]:
def drop_constant_and_high_missing(df: pd.DataFrame, missing_thresh=0.95):
    before_cols = df.shape[1]
    miss = df.isna().mean()
    high_miss_cols = miss[miss >= missing_thresh].index.tolist()
    nunique = df.nunique(dropna=True)
    const_cols = nunique[nunique <= 1].index.tolist()

    drop_cols = sorted(set(high_miss_cols) | set(const_cols))
    out = df.drop(columns=drop_cols, errors="ignore")

    log_step("drop_constant_high_missing", {
        "missing_thresh": float(missing_thresh),
        "dropped_cols": drop_cols,
        "n_dropped": int(len(drop_cols)),
        "before_cols": int(before_cols),
        "after_cols": int(out.shape[1]),
    })
    return out

def coerce_numeric_like_columns(df: pd.DataFrame, cols):
    out = df.copy()
    changed = {}
    for c in cols:
        if c not in out.columns:
            continue
        if pd.api.types.is_numeric_dtype(out[c]):
            continue
        # Try converting object -> numeric
        s0 = out[c]
        s1 = pd.to_numeric(s0.astype("string").str.replace(",", "", regex=False)
                                 .str.replace("%", "", regex=False)
                                 .str.strip(), errors="coerce")
        # Record the change: newly created NaNs and valid counts
        changed[c] = {
            "before_nonnull": int(s0.notna().sum()),
            "after_nonnull": int(s1.notna().sum()),
            "new_nan": int((s1.isna() & s0.notna()).sum()),
        }
        out[c] = s1
    log_step("coerce_numeric_like", {"cols": cols, "changed": changed})
    return out

def deduplicate_by_key(df: pd.DataFrame, key_cols, keep="last", sort_col=None):
    out = df.copy()
    if sort_col:
        out = out.sort_values(sort_col)
    before = len(out)
    out = out.drop_duplicates(subset=key_cols, keep=keep)
    after = len(out)
    log_step("deduplicate", {"key_cols": key_cols, "keep": keep, "sort_col": sort_col, "before": int(before), "after": int(after)})
    return out


In [None]:
# === Example flow (adapt to the on-site data) ===
df = df_raw.copy()

# Parse and sort by time (if any)
if TIME_COL and TIME_COL in df.columns:
    df[TIME_COL] = pd.to_datetime(df[TIME_COL], errors="coerce")
    df = df.sort_values(TIME_COL)

# Drop constant columns / high-missing columns
df = drop_constant_and_high_missing(df, missing_thresh=0.98)

# List here any columns that should be numeric but were read as object
NUMERIC_LIKE = []  # e.g. ["price", "volume"]
df = coerce_numeric_like_columns(df, NUMERIC_LIKE)

# Deduplicate on the primary key (if needed)
if KEY_COLS:
    sort_col = TIME_COL if TIME_COL else None
    df = deduplicate_by_key(df, KEY_COLS, keep="last", sort_col=sort_col)

df.shape


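## 5. Outlier handling (no blind deletion)
Two kinds of outliers:
- Physically / business-wise impossible: negative prices, negative volume, extreme timestamps → set to missing or correct
- Extreme but possibly real: heavy-tailed distributions → clip / winsorize / log1p / robust scaler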
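
Because clipping bounds are quantiles, they fall under the fit-on-train rule. One way to keep them leakage-free is to learn the bounds on the training rows and reuse them on test, sketched below. The helper names `fit_clip_bounds` and `apply_clip_bounds` and the column `ret` are hypothetical.

```python
import pandas as pd

def fit_clip_bounds(train: pd.DataFrame, cols, q_low=0.01, q_high=0.99):
    """Learn per-column clipping bounds from the training rows only."""
    return {c: (train[c].quantile(q_low), train[c].quantile(q_high)) for c in cols}

def apply_clip_bounds(df: pd.DataFrame, bounds: dict) -> pd.DataFrame:
    """Clip each column to previously learned bounds; returns a new frame."""
    out = df.copy()
    for c, (lo, hi) in bounds.items():
        out[c] = out[c].clip(lower=lo, upper=hi)
    return out

# Hypothetical toy data: train covers 0..100, test contains values far outside it.
train = pd.DataFrame({"ret": [float(v) for v in range(101)]})
test = pd.DataFrame({"ret": [-50.0, 50.0, 500.0]})

bounds = fit_clip_bounds(train, ["ret"], q_low=0.05, q_high=0.95)
test_clipped = apply_clip_bounds(test, bounds)
print(bounds, test_clipped["ret"].tolist())
```

The test extremes are pulled back to the training quantiles instead of shifting the bounds themselves.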


In [None]:
def clip_by_quantile(df: pd.DataFrame, cols, q_low=0.01, q_high=0.99):
    out = df.copy()
    bounds = {}
    for c in cols:
        if c not in out.columns: 
            continue
        if not pd.api.types.is_numeric_dtype(out[c]):
            continue
        lo = out[c].quantile(q_low)
        hi = out[c].quantile(q_high)
        bounds[c] = {"lo": float(lo) if pd.notna(lo) else np.nan,
                     "hi": float(hi) if pd.notna(hi) else np.nan}
        out[c] = out[c].clip(lower=lo, upper=hi)
    log_step("clip_by_quantile", {"cols": cols, "q_low": float(q_low), "q_high": float(q_high), "bounds": bounds})
    return out

def set_impossible_to_nan(df: pd.DataFrame, rules: dict):
    """rules: {col: ('>=', 0) / ('<=', b) / ('between', (a, b))}; each rule states the valid range, and values outside it become NaN."""
    out = df.copy()
    changed = {}
    for c, rule in rules.items():
        if c not in out.columns: 
            continue
        op = rule[0]
        before_nan = int(out[c].isna().sum())
        if op == ">=":
            thr = rule[1]
            mask = out[c].notna() & (out[c] < thr)
            out.loc[mask, c] = np.nan
        elif op == "<=":
            thr = rule[1]
            mask = out[c].notna() & (out[c] > thr)
            out.loc[mask, c] = np.nan
        elif op == "between":
            a, b = rule[1]
            mask = out[c].notna() & ~out[c].between(a, b)
            out.loc[mask, c] = np.nan
        else:
            raise ValueError(f"Unsupported op: {op}")
        after_nan = int(out[c].isna().sum())
        changed[c] = {"nan_before": before_nan, "nan_after": after_nan, "new_nan": after_nan - before_nan}
    log_step("set_impossible_to_nan", {"rules": rules, "changed": changed})
    return out


In [None]:
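# Example: set clearly impossible values to missing (define by on-site business rules)
IMPOSSIBLE_RULES = {
    # "price": (">=", 0),
    # "volume": (">=", 0),
}
df = set_impossible_to_nan(df, IMPOSSIBLE_RULES)

# Example: quantile clipping for numeric columns (robust to heavy tails)
# Note: the bounds here are computed on the whole frame; for a leakage-free evaluation, compute them on the training rows only.
CLIP_COLS = []  # e.g. ["price", "volume", "ret"]
df = clip_by_quantile(df, CLIP_COLS, q_low=0.005, q_high=0.995)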

df.shape


## 6. Category cleaning (case / whitespace / rare-category merging)
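
As a small illustration of what case/whitespace normalization and rare-category merging do, consider the toy column below. The `currency` values, the frequency threshold of 3, and the `__other__` label are assumptions for the example.

```python
import pandas as pd

# Hypothetical dirty column: the same category spelled several ways, plus one missing value.
raw = pd.Series(["USD", " usd", "Usd ", "EUR", "eur", None], name="currency")

# Strip whitespace and lowercase; missing values stay missing.
clean = raw.astype("string").str.strip().str.lower()

# Count on the normalized values (in a real run: on the training rows only),
# then fold categories below the threshold into one bucket.
counts = clean.value_counts()
rare = counts[counts < 3].index
grouped = clean.where(~clean.isin(rare), "__other__")

print(raw.nunique(), clean.nunique(), grouped.tolist())
```

Normalizing before counting matters: counted on the raw strings, every spelling here would look rare.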


In [None]:
def clean_text_categories(df: pd.DataFrame, cols):
    out = df.copy()
    changed = {}
    for c in cols:
        if c not in out.columns:
            continue
        s0 = out[c]
        s1 = s0.astype("string").str.strip().str.lower()
        out[c] = s1
        changed[c] = {"nunique_before": int(s0.nunique(dropna=True)),
                      "nunique_after": int(s1.nunique(dropna=True))}
    log_step("clean_text_categories", {"cols": cols, "changed": changed})
    return out

def group_rare_categories(df: pd.DataFrame, col: str, min_freq=50, other_label="__other__"):
    out = df.copy()
    vc = out[col].value_counts(dropna=False)
    rare = vc[vc < min_freq].index
    out[col] = out[col].where(~out[col].isin(rare), other_label)
    log_step("group_rare_categories", {"col": col, "min_freq": int(min_freq), "n_rare": int(len(rare))})
    return out


In [None]:
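# Clean the obvious category columns (not time / target / ID); skip columns that were dropped or coerced to numeric earlier
CAT_COLS = [c for c in roles.get("categorical_cols", [])
            if c in df.columns and not pd.api.types.is_numeric_dtype(df[c])]
df = clean_text_categories(df, CAT_COLS)

# Merge the long tail of selected high-cardinality category columns (pick as needed on site)
# df = group_rare_categories(df, "category_col", min_freq=100)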

df.shape


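## 7. Time-series feature engineering (past information only)
Common choices: lag / rolling / ewm, and per-entity group statistics. Every rolling computation applies `shift(1)` first so that information from the current timestamp does not leak into the feature.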
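
A toy check of the `shift(1)` + rolling pattern, computed per entity so that a window never mixes two entities. The frame `df_demo`, its column names, and the helper `past_mean` are hypothetical.

```python
import pandas as pd

df_demo = pd.DataFrame({
    "asset_id": ["a", "a", "a", "a", "b", "b", "b", "b"],
    "t": [1, 2, 3, 4, 1, 2, 3, 4],
    "price": [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0],
}).sort_values(["asset_id", "t"])

def past_mean(s: pd.Series, window: int = 2) -> pd.Series:
    # shift(1) drops the current row from the window, so only earlier rows remain.
    return s.shift(1).rolling(window=window, min_periods=window).mean()

# transform applies the function separately per asset, so windows never span two assets.
df_demo["price_roll2_mean"] = df_demo.groupby("asset_id")["price"].transform(past_mean)
print(df_demo)
```

For asset `a` at `t=3` the feature is the mean of the prices at `t=1` and `t=2` (1.5); the first two rows of each asset are NaN because two past rows are not yet available.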


In [None]:
def add_lag_features(df: pd.DataFrame, group_cols, time_col, value_cols, lags=(1, 2, 5)):
    out = df.copy()
    out = out.sort_values([*group_cols, time_col]) if group_cols else out.sort_values(time_col)
    for c in value_cols:
        for k in lags:
            out[f"{c}_lag{k}"] = out.groupby(group_cols)[c].shift(k) if group_cols else out[c].shift(k)
    log_step("add_lag_features", {"group_cols": group_cols, "time_col": time_col, "value_cols": value_cols, "lags": list(lags)})
    return out

def add_rolling_features(df: pd.DataFrame, group_cols, time_col, value_cols, windows=(5, 20), funcs=("mean", "std")):
    out = df.copy()
    out = out.sort_values([*group_cols, time_col]) if group_cols else out.sort_values(time_col)
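    for c in value_cols:
        for w in windows:
            mp = min(w, max(2, w // 3))

            def past_roll(s, how, w=w, mp=mp):
                # Key step: shift(1) first, so each window only sees strictly earlier rows.
                r = s.shift(1).rolling(window=w, min_periods=mp)
                return r.mean() if how == "mean" else r.std()

            for how in ("mean", "std"):
                if how not in funcs:
                    continue
                # Run shift + rolling inside each group so windows never span two entities.
                out[f"{c}_roll{w}_{how}"] = (
                    out.groupby(group_cols)[c].transform(lambda s: past_roll(s, how))
                    if group_cols else past_roll(out[c], how)
                )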
    log_step("add_rolling_features", {"group_cols": group_cols, "time_col": time_col, "value_cols": value_cols, "windows": list(windows), "funcs": list(funcs)})
    return out


In [None]:
# Choose on site which numeric columns deserve lag/rolling features (usually price, return, volume, etc.)
TS_VALUE_COLS = []  # e.g. ["price", "ret", "volume"]
GROUP_COLS = [c for c in ID_COLS if c]  # e.g. ["asset_id"]

if TIME_COL and TS_VALUE_COLS:
    df = add_lag_features(df, group_cols=GROUP_COLS, time_col=TIME_COL, value_cols=TS_VALUE_COLS, lags=(1,2,5))
    df = add_rolling_features(df, group_cols=GROUP_COLS, time_col=TIME_COL, value_cols=TS_VALUE_COLS, windows=(5,20), funcs=("mean","std"))

df.shape


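## 8. Leakage check (quick version)
Common sources of leakage:
- Features that contain future values (next/future/lead)
- Features that equal the target or a linear transform of it
- Target encoding / standardization / quantiles fitted on the full sample
- Aggregates that use the same timestamp without a shift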
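
The "linear transform of the target" case is easy to catch with a near-perfect correlation flag. The sketch below uses synthetic data; the column names and the 0.999 threshold are assumptions, and a high correlation is only a prompt to inspect the column, not proof of leakage.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
y = pd.Series(rng.normal(size=200), name="y")

df_leak = pd.DataFrame({
    "y": y,
    "honest_feature": rng.normal(size=200),  # unrelated noise
    "scaled_target": 3.0 * y + 1.0,          # linear transform of the target
})

# Absolute Pearson correlation of every feature with the target.
corr = df_leak.drop(columns="y").corrwith(df_leak["y"]).abs()

# Hypothetical threshold: anything this close to 1 deserves a manual look.
flagged = corr[corr > 0.999].index.tolist()
print(flagged)
```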


In [None]:
import re

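def suspicious_feature_names(cols):
    # Match a bare "y" only as a whole token (e.g. "y", "y_next"), not any name that merely ends in "y".
    patt = re.compile(r"(future|next|lead|target|label|(?:^|_)y(?:_|$))", re.IGNORECASE)
    return [c for c in cols if patt.search(str(c))]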

def target_correlation_scan(df: pd.DataFrame, target_col: str, topk=20):
    if target_col not in df.columns:
        return None
    y = df[target_col]
    num_cols = [c for c in df.columns if c != target_col and pd.api.types.is_numeric_dtype(df[c])]
    corr = {}
    for c in num_cols:
        s = df[c]
        m = y.notna() & s.notna()
        if m.sum() < 50:
            continue
        corr[c] = float(np.corrcoef(y[m], s[m])[0,1])
    out = pd.Series(corr).dropna().sort_values(key=lambda x: x.abs(), ascending=False).head(topk)
    return out

sus = suspicious_feature_names(df.columns)
log_step("suspicious_feature_names", {"matches": sus})
sus[:30]


In [None]:
if TARGET_COL:
    corr_top = target_correlation_scan(df, TARGET_COL, topk=30)
    print(corr_top)  # a bare expression inside an if-block is not displayed by the notebook


## 9. Modeling Pipeline (reproducible, leakage-safe)
Numeric: median imputation + RobustScaler
Categorical: most_frequent imputation + OneHotEncoder(handle_unknown='ignore')

Models:
- Regression: Ridge / RandomForestRegressor
- Classification: LogisticRegression / RandomForestClassifier


In [None]:
def build_pipeline(numeric_cols, categorical_cols, task="regression", model_name="ridge"):
    numeric_tf = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median", add_indicator=True)),
        ("scaler", RobustScaler(with_centering=True)),
    ])
    categorical_tf = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=True)),
    ])

    pre = ColumnTransformer(
        transformers=[
            ("num", numeric_tf, numeric_cols),
            ("cat", categorical_tf, categorical_cols),
        ],
        remainder="drop",
        sparse_threshold=0.3,
    )

    if task == "regression":
        if model_name == "ridge":
            model = Ridge(alpha=1.0, random_state=0)
        elif model_name == "rf":
            model = RandomForestRegressor(n_estimators=400, random_state=0, n_jobs=-1)
        else:
            raise ValueError("unknown model")
    else:
        if model_name == "logreg":
            model = LogisticRegression(max_iter=2000)
        elif model_name == "rf":
            model = RandomForestClassifier(n_estimators=400, random_state=0, n_jobs=-1)
        else:
            raise ValueError("unknown model")

    pipe = Pipeline(steps=[("pre", pre), ("model", model)])
    return pipe


In [None]:
# === Train/test construction ===
df_model = df.copy()

# Drop the time column (raw timestamps are usually not fed in directly)
drop_cols = [c for c in [TIME_COL] if c]
df_model = df_model.drop(columns=drop_cols, errors="ignore")

assert TARGET_COL is None or TARGET_COL in df_model.columns, "TARGET_COL is set but not found in df_model"

df_model.shape


In [None]:
# === Simple time split (when TIME_COL is set, the cleaned df is sorted by time and then cut) ===
if TIME_COL and TARGET_COL:
    train_df, test_df = time_train_test_split(df, TIME_COL, test_size=0.2)
    train_df = train_df.drop(columns=[TIME_COL], errors="ignore")
    test_df  = test_df.drop(columns=[TIME_COL], errors="ignore")
else:
    # Fallback: a random split may be inappropriate for time-series tasks; use it only to get things running
    from sklearn.model_selection import train_test_split
    train_df, test_df = train_test_split(df_model, test_size=0.2, random_state=0)

if TARGET_COL:
    X_train = train_df.drop(columns=[TARGET_COL], errors="ignore")
    y_train = train_df[TARGET_COL]
    X_test = test_df.drop(columns=[TARGET_COL], errors="ignore")
    y_test = test_df[TARGET_COL]


In [None]:
if TARGET_COL:
    # Re-identify numeric / categorical columns (based on the training set)
    numeric_cols = [c for c in X_train.columns if pd.api.types.is_numeric_dtype(X_train[c])]
    categorical_cols = [c for c in X_train.columns if c not in numeric_cols]

    pipe = build_pipeline(numeric_cols, categorical_cols, task="regression", model_name="ridge")
    print(pipe)


In [None]:
if TARGET_COL:
    pipe.fit(X_train, y_train)
    pred = pipe.predict(X_test)

    rmse = float(np.sqrt(mean_squared_error(y_test, pred)))
    mae = float(mean_absolute_error(y_test, pred))
    r2 = float(r2_score(y_test, pred))

    metrics = {"rmse": rmse, "mae": mae, "r2": r2}
    log_step("baseline_metrics", metrics)
    print(metrics)  # a bare expression inside an if-block is not displayed by the notebook


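## 10. A/B comparison log
Under the same split, compare:
- baseline (minimal cleaning)
- + clip / + rare merge / + rolling / + stronger model

As long as every change is written to the log, the gain and cost of each step can be explained quickly.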
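
A minimal sketch of such a comparison on synthetic data: one fixed split, two variants, one metric table. The data, the `evaluate` helper, the quantile levels, and the `ab_demo` frame are all hypothetical.

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error

rng = np.random.default_rng(0)
n = 300
x = rng.normal(size=n)
y = 2.0 * x + rng.normal(scale=0.1, size=n)
x_dirty = x.copy()
x_dirty[:5] = 1e4            # a few gross outliers, all inside the training period

cut = int(n * 0.8)           # one split, fixed once and reused by every variant

def evaluate(clip: bool) -> float:
    x_train, x_test = x_dirty[:cut], x_dirty[cut:]
    if clip:
        lo, hi = np.quantile(x_train, [0.05, 0.95])   # bounds from train only
        x_train, x_test = np.clip(x_train, lo, hi), np.clip(x_test, lo, hi)
    model = Ridge(alpha=1.0).fit(x_train.reshape(-1, 1), y[:cut])
    return float(mean_absolute_error(y[cut:], model.predict(x_test.reshape(-1, 1))))

ab_demo = pd.DataFrame([
    {"name": "baseline", "mae": evaluate(clip=False)},
    {"name": "+clip", "mae": evaluate(clip=True)},
])
print(ab_demo)
```

Because the split and the model are identical across rows, any difference in the metric can be attributed to the cleaning step alone.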


In [None]:
ab_results = []

def record_result(name: str, metrics: dict, notes: str = ""):
    rec = {"name": name, **metrics, "notes": notes}
    ab_results.append(rec)
    log_step("ab_record", rec)

# Usage:
# record_result("baseline_ridge", metrics, notes="median+indicator, robust scaler, ohe")


## 11. Export
- Log: `cleaning_log.json`
- Cleaned data: `df_clean.parquet` (if permitted)
- A/B results: `ab_results.csv`


In [None]:
# Save the log
log_path = save_log()
log_path


In [None]:
# Save A/B results
if len(ab_results) > 0:
    ab_path = OUTPUT_DIR / "ab_results.csv"
    pd.DataFrame(ab_results).to_csv(ab_path, index=False)
    print(ab_path)


In [None]:
# Save the cleaned data (enable as needed)
# clean_path = OUTPUT_DIR / "df_clean.parquet"
# df.to_parquet(clean_path, index=False)
# clean_path


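## 12. Interview talking points (kept here to read from)
- Start with the column-level quality table to confirm missing values, constant columns, type errors, duplicate primary keys, and out-of-order timestamps.
- Then lock the split strategy (time split / group split) to avoid any leakage.
- Run minimal viable cleaning first to get a baseline working.
- Handle the largest noise sources specifically: impossible values → missing, heavy tails → clip, dirty categories → normalization / rare-category merging, time-series features → shift(1) + rolling.
- Every step has a log and an A/B record, so gains and risks can be compared.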
