In [None]:
import pandas as pd
import glob
import os

In [None]:
# Load every Douyin-scenario PSI CSV and stack them into a single frame.
# sorted() makes the file order -- and therefore the row order of df and of the
# CSV saved at the end -- deterministic; glob.glob returns matches in arbitrary
# OS-dependent order.
csv_files = sorted(glob.glob("../data/psi_douyin_*.csv"))
if not csv_files:
    # Fail with a clear message instead of pd.concat's opaque
    # "No objects to concatenate" further down.
    raise FileNotFoundError("no files match ../data/psi_douyin_*.csv")

print(f"找到 {len(csv_files)} 个 CSV 文件:")
for f in csv_files:
    print(f"  - {os.path.basename(f)}")

# Merge all files; source_file records each row's origin so later cells can
# group per recording session.
dfs = [pd.read_csv(f).assign(source_file=os.path.basename(f)) for f in csv_files]

df = pd.concat(dfs, ignore_index=True)
print(f"\n总共 {len(df)} 行数据")

In [None]:
# Discard the first sample of every file: its initial delta is not accurate.
# cumcount() numbers the rows of each source_file in order, so 0 marks the first one.
is_first_row = df.groupby('source_file').cumcount().eq(0)
df = df.loc[~is_first_row].reset_index(drop=True)
print(f"去掉首行后: {len(df)} 行")
print(f"columns: {df.columns.tolist()}")

In [None]:
# Remove rows that should not be learned from:
#   * rows whose phase is 'idle' (treated as dirty data), and
#   * rows whose single-sample some_delta is 600,000 us or more (implausible spikes).
# NOTE(review): dropping rows from the middle of a session leaves gaps, but the
# later groupby shift/rolling cells treat the remaining rows of a file as
# consecutive samples, so future_1s_* targets and lag/window features next to a
# gap can span more time than intended. Confirm this is acceptable.
SOME_DELTA_LIMIT_US = 600000

print(f"清理前: {len(df)} 行")
keep_mask = (df['phase'] != 'idle') & (df['some_delta'] < SOME_DELTA_LIMIT_US)
df = df[keep_mask]
print(f"清理异常值后: {len(df)} 行")

# Drop the manual phase label and the timestamp entirely so the model never
# sees the human annotation.
df = df.drop(['phase', 'ts'], axis=1, errors='ignore')
print(f"抛弃 phase 后，当前输入特征: {df.columns.tolist()}")

In [None]:
# Target: total stall time over the next 1 s = sum of the next two samples
# (sampling interval is 500 ms). Shifts are taken within each source_file so a
# target never borrows samples from a different file.
by_file = df.groupby('source_file')
for src_col, target_col in (('some_delta', 'future_1s_some'),
                            ('full_delta', 'future_1s_full')):
    per_file = by_file[src_col]
    df[target_col] = per_file.shift(-1) + per_file.shift(-2)

# The last two rows of each file have no complete look-ahead (NaN target); drop them.
df = df.dropna(subset=['future_1s_some', 'future_1s_full'])

In [None]:
# Unified feature engineering, maintained in one place.
# Every per-series feature is computed inside its source_file group so that
# windows, lags and differences never mix samples from two sessions. All
# features look only at the current and earlier samples of the same file.
g = df.groupby('source_file')


def _per_file(grouped_result):
    """Drop the source_file level that groupby-rolling prepends to the index.

    The remaining level is df's own row index, so the returned Series aligns
    row-for-row when assigned back into df.
    """
    return grouped_result.reset_index(level=0, drop=True)


# ===== Trailing rolling-window features (per session) =====
for w in [2, 6, 10]:
    df[f"some_sum_{w}"]   = _per_file(g["some_delta"].rolling(w).sum())
    df[f"pgscan_sum_{w}"] = _per_file(g["pgscan_direct"].rolling(w).sum())
    df[f"pgmaj_sum_{w}"]  = _per_file(g["pgmajfault"].rolling(w).sum())
    df[f"some_max_{w}"]   = _per_file(g["some_delta"].rolling(w).max())
    df[f"some_min_{w}"]   = _per_file(g["some_delta"].rolling(w).min())

# ===== First-order differences: current minus previous sample (per session) =====
df["diff_some"]   = g["some_delta"].diff()
df["diff_pgscan"] = g["pgscan_direct"].diff()
# Despite its name, mem_drop is negative when available memory falls.
df["mem_drop"]    = g["mem_available"].diff()

# ===== Volatility features (per session) =====
df["some_std_6"]   = _per_file(g["some_delta"].rolling(6).std())
df["pgscan_std_6"] = _per_file(g["pgscan_direct"].rolling(6).std())

# ===== Lag features (shared by the full and some training setups) =====
for lag in [1, 2, 3, 4, 6, 8]:
    df[f"some_lag_{lag}"]   = g["some_delta"].shift(lag)
    df[f"full_lag_{lag}"]   = g["full_delta"].shift(lag)
    df[f"pgscan_lag_{lag}"] = g["pgscan_direct"].shift(lag)

# ===== Exponentially weighted means (per session) =====
# transform() calls the lambda immediately, so each iteration uses its own span.
for span in [3, 6, 10]:
    df[f"some_ewm_{span}"] = g["some_delta"].transform(lambda x: x.ewm(span=span).mean())
    df[f"mem_ewm_{span}"]  = g["mem_available"].transform(lambda x: x.ewm(span=span).mean())
    df[f"pgscan_ewm_{span}"] = g["pgscan_direct"].transform(lambda x: x.ewm(span=span).mean())

# ===== Ratio / interaction features =====
# The +1 in each denominator avoids division by zero.
df["pgscan_per_mem"]     = df["pgscan_direct"] / (df["mem_available"] + 1)
df["alloc_per_mem"]      = df["allocstall"] / (df["mem_available"] + 1)
df["swap_ratio"]         = (df["pswpin"] + df["pswpout"]) / (df["mem_available"] + 1)
df["pgscan_steal_ratio"] = df["pgsteal_direct"] / (df["pgscan_direct"] + 1)
df["swap_net"]           = df["pswpin"] - df["pswpout"]
df["mem_avail_pct_change"] = g["mem_available"].pct_change()

# Drop rows with missing values (the first rows of each session have NaN
# windows/lags/diffs). pct_change yields +/-inf when the previous mem_available
# is 0, and dropna() does not treat inf as missing, so map inf to NaN first to
# keep infinities out of the saved data.
df = df.replace([float("inf"), float("-inf")], float("nan"))
df = df.dropna().reset_index(drop=True)
print(f"特征工程后: {len(df)} 行")
print(f"特征数量: {len(df.columns)} 列")

In [None]:
# Sanity-check the label distribution: share of rows whose next-1 s stall total is non-zero.
full_pos_ratio = df["future_1s_full"].gt(0).mean()
some_pos_ratio = df["future_1s_some"].gt(0).mean()
print("future_1s_full > 0 ratio:", full_pos_ratio)
print("future_1s_some > 0 ratio:", some_pos_ratio)

final_columns = df.columns.tolist()
print(f"\n最终特征列 ({len(df.columns)} 列):")
print(final_columns)

In [None]:
# Build binary classification targets.
# NOTE(review): this whole cell is commented out, so no is_spike_* columns are
# created and none end up in the CSV saved by the next cell. If re-enabled,
# note that THRESHOLD_SOME is a quantile over the entire processed dataset
# (all files, including any rows later used for evaluation); consider deriving
# it from training data only.
# 1. For SOME, use a relative threshold (e.g. the top 10% highest-pressure moments).
# THRESHOLD_SOME = df['future_1s_some'].quantile(0.90)
# print(f"未来 1s [SOME] 压力的 90th 分位数阈值为: {THRESHOLD_SOME:.1f}")
# df['is_spike_some'] = (df['future_1s_some'] > THRESHOLD_SOME).astype(int)

# # 2. For FULL, use an absolute OS-level threshold:
# # more than 100 ms (100,000 us) of FULL stall within 1 s counts as a severe stall.
# THRESHOLD_FULL = 100000 
# print(f"未来 1s [FULL] 压力的物理绝对阈值为: {THRESHOLD_FULL}")
# df['is_spike_full'] = (df['future_1s_full'] > THRESHOLD_FULL).astype(int)

# # 3. Check the positive-sample ratios.
# print(f"\n[SOME] 触发卡顿 (is_spike_some=1) 的样本占比: {df['is_spike_some'].mean():.4f}")
# print(f"[FULL] 触发卡顿 (is_spike_full=1) 的样本占比: {df['is_spike_full'].mean():.4f}")

# print(f"\n最终特征列 ({len(df.columns)} 列):")
# print(df.columns.tolist())

In [None]:
# Persist the processed dataset (features + future_1s_* targets) as CSV.
out_dir = "../processed_data"
out_path = f"{out_dir}/processed_psi_douyin.csv"

os.makedirs(out_dir, exist_ok=True)
df.to_csv(out_path, index=False)
print(f"处理后的数据已保存到 {out_path}")
print(f"共 {len(df)} 行, {len(df.columns)} 列")