# BETH Anomaly Detection — End‑to‑End Notebook
*Generated on:* 2025-10-31 08:02:30

本 Notebook 覆盖完整流程：数据读取 → 预处理 → 训练 → 验证 → 测试 → 生成 submission CSV。
默认模型：`IsolationForest`（无监督）。你可以在配置区修改路径与参数。

## 0. 环境检查

In [1]:

import sys, os, platform, numpy as np, pandas as pd
import sklearn
# Print the interpreter, OS and library versions used for this run so that
# results can be reproduced or compared across environments.
print('Python:', sys.version.split()[0])
print('OS:', platform.platform())
print('NumPy:', np.__version__)
print('pandas:', pd.__version__)
print('scikit-learn:', sklearn.__version__)


Python: 3.10.16
OS: Windows-10-10.0.19041-SP0
NumPy: 2.1.2
pandas: 2.2.3
scikit-learn: 1.7.2


## 1. 配置区（路径与超参数）

In [2]:

# === Path settings (edit as needed) ===
DATA_DIR = "/mnt/data"  # your data directory
TRAIN_CSV = r"D:\Project\MachineLearningAlgorithm\forestMachineLearning\data\processes_train.csv"
VALID_CSV = r"D:\Project\MachineLearningAlgorithm\forestMachineLearning\data\processes_valid.csv"  # if missing, a validation split is taken from the training set
TEST_CSV  = r"D:\Project\MachineLearningAlgorithm\forestMachineLearning\data\processes_test.csv"

# Submission file settings
SUBMIT_ID_COL_CANDIDATES = ["Id", "id", "index"]  # the first candidate present in the test data is used as the ID column
SUBMIT_SCORE_COL = "anomaly_score"               # name of the score column in the submission
SUBMIT_PATH = r"D:\Project\MachineLearningAlgorithm\forestMachineLearning\data\submission_isoforest.csv"

# Preprocessing switches
PROCESS_ARGS = False   # whether to expand args (default False: keep only argsNum)
DROP_RAW_COLS = True   # whether to drop the large raw columns such as args / stackAddresses

# Score normalisation settings
NORM_METHOD = "minmax"   # "minmax" | "rank"

# Training / validation settings
RANDOM_STATE = 42
VALID_RATIO = 0.2        # fraction held out from the training set when there is no separate validation file
USE_LABELS_IF_AVAILABLE = True  # if a target column exists it is used for evaluation only (never for the unsupervised training)

# Model hyperparameters
from sklearn.ensemble import IsolationForest
ISO_PARAMS = dict(
    n_estimators=150,
    contamination=0.04,   # with labels available, the positive rate of the training set is a reasonable approximation
    max_features=1.0,
    max_samples=20000,
    random_state=RANDOM_STATE,
    n_jobs=-1
)

# Evaluation and thresholding
THRESHOLD_STRATEGY = "percentile"  # "percentile" | "contamination"
THRESHOLD_PERCENTILE = 97.0        # only used when the strategy is "percentile"


## 2. 工具函数与特征工程

In [46]:

import numpy as np
import pandas as pd
from typing import Tuple, List, Dict, Optional
from sklearn.preprocessing import LabelEncoder

# 固定特征列（与项目对齐）
# Fixed feature columns (aligned with the project). The order of this list is
# the column order of the matrix handed to the model.
REQ_FEATURES = [
    "timestamp",         # continuous; made relative during preprocessing
    "processId",         # continuous / integer
    "parentProcessId",   # continuous / integer
    "userId",            # continuous / integer
    "mountNamespace",    # continuous / integer / categorical
    "processName",       # categorical
    "hostName",          # categorical
    "eventName",         # categorical
    "argsNum",           # continuous / count
    "returnValue",       # continuous / integer
    "stack_depth"        # derived from stackAddresses
]

# Columns that are label-encoded before being fed to the model.
CATEGORICAL_COLS = ["processName", "hostName", "eventName"]

def compute_stack_depth(df: pd.DataFrame) -> pd.Series:
    """Return the per-row call-stack depth of *df*.

    Resolution order:

    1. An existing ``stack_depth`` column is returned unchanged.
    2. Otherwise the depth is derived from ``stackAddresses``, whose cells may
       be real sequences or their textual form (e.g. ``"[1, 2, 3]"`` as read
       back from CSV).
    3. If neither column exists, a zero-filled ``int64`` series aligned to
       ``df.index`` is returned.

    Args:
        df: Frame that may contain ``stack_depth`` and/or ``stackAddresses``.

    Returns:
        A series aligned with ``df.index`` holding the stack depth per row.
    """
    if "stack_depth" in df.columns:
        return df["stack_depth"]
    if "stackAddresses" not in df.columns:
        # Neither column is available: fall back to a depth of 0 everywhere.
        return pd.Series(0, index=df.index, dtype="int64")

    def _depth(value) -> int:
        """Count the entries of a single ``stackAddresses`` cell."""
        if isinstance(value, (list, tuple)):
            return len(value)
        if not isinstance(value, str):
            # NaN / None / anything else carries no stack information.
            return 0
        start, end = value.find("["), value.rfind("]")
        if start == -1 or end < start:
            # Not a bracketed list (this also covers the empty string).
            return 0
        inner = value[start + 1:end]
        if not inner.strip():
            # "[]" and whitespace-only variants such as "[ ]" are empty.
            return 0
        # Rough count: entries are comma separated inside the brackets.
        return inner.count(",") + 1

    return df["stackAddresses"].apply(_depth).astype("int64")

def make_timestamp_relative(df: pd.DataFrame) -> pd.Series:
    """Express each row's timestamp relative to its process's first event.

    When both ``timestamp`` and ``processId`` are present, the result is
    ``timestamp - min(timestamp)`` computed within each ``processId`` group.
    Otherwise the ``timestamp`` column is returned as is, or a series of
    ``0.0`` aligned to ``df.index`` when there is no ``timestamp`` column.
    """
    can_group = "timestamp" in df.columns and "processId" in df.columns
    if can_group:
        stamps = df["timestamp"].astype("float64")
        first_seen = stamps.groupby(df["processId"]).transform("min")
        return stamps - first_seen
    # Grouping is not possible: hand back what we have (or zeros).
    return df.get("timestamp", pd.Series(0.0, index=df.index))

def basic_clean(df: pd.DataFrame, process_args: bool = False, drop_raw_cols: bool = True) -> pd.DataFrame:
    """Derive the engineered columns and optionally drop the bulky raw ones.

    Works on a copy of *df*:

    * adds ``stack_depth`` (via ``compute_stack_depth``);
    * replaces ``timestamp`` with its per-process relative value
      (via ``make_timestamp_relative``) when the column exists;
    * derives ``argsNum`` from ``args`` when ``argsNum`` is missing;
    * drops ``args`` / ``stackAddresses`` when *drop_raw_cols* is true.

    Args:
        df: Raw event frame.
        process_args: Accepted for interface compatibility; this function
            does not currently act on it.
        drop_raw_cols: Whether to remove the ``args`` and ``stackAddresses``
            columns from the result.

    Returns:
        The cleaned copy of *df*.
    """
    df = df.copy()
    # Derived feature: stack depth.
    df["stack_depth"] = compute_stack_depth(df)
    # Make timestamps relative within each process.
    if "timestamp" in df.columns:
        df["timestamp"] = make_timestamp_relative(df)

    # Make sure argsNum exists whenever it can be derived from args.
    if "argsNum" not in df.columns and "args" in df.columns:
        def _len_args(a) -> int:
            """Count the arguments described by one ``args`` cell."""
            if isinstance(a, (list, tuple)):
                return len(a)
            if isinstance(a, str):
                # CSV round-trips turn the list of dicts into text. Count the
                # 'name' *keys* (either quoting style) rather than the bare
                # substring, so values such as 'pathname' are not counted.
                return a.count("'name':") + a.count('"name":')
            return 0

        df["argsNum"] = df["args"].apply(_len_args)

    # Optional: drop the large raw columns.
    if drop_raw_cols:
        for col in ("args", "stackAddresses"):
            if col in df.columns:
                del df[col]
    return df

def ensure_req_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* restricted to ``REQ_FEATURES`` with normalised types.

    Missing required columns are created (``stack_depth`` as 0, every other
    one as NaN), the numeric columns are coerced with ``pd.to_numeric``
    (unparseable values become NaN) and the categorical columns are cast to
    ``str``. The result has exactly the columns of ``REQ_FEATURES``, in that
    order.
    """
    numeric_cols = (
        "processId", "parentProcessId", "userId", "mountNamespace",
        "argsNum", "returnValue", "stack_depth",
    )
    text_cols = ("processName", "hostName", "eventName")

    frame = df.copy()
    if "stack_depth" not in frame.columns:
        frame["stack_depth"] = 0

    # Create whatever required columns are still absent.
    absent = [name for name in REQ_FEATURES if name not in frame.columns]
    for name in absent:
        frame[name] = np.nan

    # Normalise the column types.
    for name in numeric_cols:
        if name in frame.columns:
            frame[name] = pd.to_numeric(frame[name], errors="coerce")
    for name in text_cols:
        if name in frame.columns:
            frame[name] = frame[name].astype(str)

    return frame[REQ_FEATURES]

def fit_label_encoders(df_list: List[pd.DataFrame], cat_cols: List[str]) -> Dict[str, LabelEncoder]:
    """Fit one ``LabelEncoder`` per categorical column over several frames.

    For every column in *cat_cols* the values of all frames in *df_list* that
    contain the column are stacked and used to fit the encoder, so that a
    value seen only in the validation or test data still has a code.

    Returns:
        Mapping of column name to its fitted encoder.
    """
    fitted = {}
    for col in cat_cols:
        pieces = [frame[col].astype(str) for frame in df_list if col in frame.columns]
        stacked = pd.concat(pieces, axis=0)
        encoder = LabelEncoder()
        encoder.fit(stacked.fillna(""))
        fitted[col] = encoder
    return fitted

def apply_label_encoders(df: pd.DataFrame, encoders: Dict[str, LabelEncoder]) -> pd.DataFrame:
    """Return a copy of *df* with its categorical columns label-encoded.

    Each column named in *encoders* that exists in *df* is cast to ``str``
    and replaced by the output of the matching encoder's ``transform``.
    Columns without an encoder are left untouched.
    """
    encoded = df.copy()
    present = [(col, enc) for col, enc in encoders.items() if col in encoded.columns]
    for col, enc in present:
        as_text = encoded[col].astype(str).fillna("")
        encoded[col] = enc.transform(as_text)
    return encoded

def z01(scores: np.ndarray, method: str = "minmax") -> np.ndarray:
    """Normalise anomaly scores into the ``[0, 1]`` range.

    Args:
        scores: Raw scores (any array-like convertible to float).
        method: ``"minmax"`` rescales linearly between the NaN-ignoring
            minimum and maximum (a constant input maps to all zeros);
            ``"rank"`` maps every score to ``average rank / len(scores)``.

    Returns:
        Array of normalised scores. An empty input yields an empty array.

    Raises:
        ValueError: If *method* is neither ``"minmax"`` nor ``"rank"``.
    """
    if method not in ("minmax", "rank"):
        raise ValueError("Unknown norm method")

    s = np.asarray(scores, dtype=float)
    if s.size == 0:
        # np.nanmin / np.nanmax raise on empty input; nothing to normalise.
        return s.copy()

    if method == "minmax":
        lo, hi = np.nanmin(s), np.nanmax(s)
        if hi - lo < 1e-12:
            # Degenerate range: avoid dividing by (almost) zero.
            return np.zeros_like(s)
        return (s - lo) / (hi - lo)

    # method == "rank": map ranks (ties averaged) onto (0, 1].
    ranks = pd.Series(s).rank(method="average") / len(s)
    return ranks.to_numpy()

def choose_id_column(df: pd.DataFrame, candidates: List[str]) -> Optional[str]:
    """Return the first name in *candidates* that is a column of *df*.

    Args:
        df: Frame whose columns are inspected.
        candidates: Column names in order of preference.

    Returns:
        The first matching column name, or ``None`` when no candidate is
        present. This function builds no fallback itself; the caller decides
        what to use as an identifier in that case.
    """
    return next((name for name in candidates if name in df.columns), None)


## 3. 读取数据

In [None]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split

def load_data(train_csv, valid_csv, test_csv, valid_ratio=0.2, random_state=42):
    """Load the train / validation / test CSV files.

    Args:
        train_csv: Path of the training CSV (required).
        valid_csv: Path of the validation CSV. If it is ``None``, empty or
            does not exist, a validation split of size *valid_ratio* is taken
            from the training data (stratified on ``target`` when that column
            exists).
        test_csv: Path of the test CSV. Optional in the same way; when absent
            the third return value is ``None``.
        valid_ratio: Fraction of the training data used for the fallback
            validation split.
        random_state: Seed for the fallback split.

    Returns:
        ``(train_df, valid_df, test_df)`` with freshly reset indices;
        ``test_df`` is ``None`` when no test file was found.

    Raises:
        FileNotFoundError: If the training CSV does not exist.
    """
    def _read_if_present(path):
        # os.path.exists(None) raises TypeError, so treat None / "" as absent.
        if not path or not os.path.exists(path):
            return None
        return pd.read_csv(path)

    # Check the mandatory file first so that a wrong path fails fast, before
    # the (potentially large) optional files are read.
    train_df = _read_if_present(train_csv)
    if train_df is None:
        raise FileNotFoundError(f"训练集不存在：{train_csv}")

    valid_df = _read_if_present(valid_csv)
    test_df = _read_if_present(test_csv)

    if valid_df is None:
        # Carve the validation set out of the training data; keep the class
        # balance when labels are available.
        stratify = train_df['target'] if 'target' in train_df.columns else None
        train_df, valid_df = train_test_split(
            train_df, test_size=valid_ratio, stratify=stratify, random_state=random_state
        )
        print(f"未发现独立验证集，已从训练集按 {valid_ratio:.0%} 切分。")
    else:
        print("使用独立验证集：", valid_csv)

    if test_df is None:
        print("警告：未发现测试集，将仅进行训练与验证。")

    return (
        train_df.reset_index(drop=True),
        valid_df.reset_index(drop=True),
        None if test_df is None else test_df.reset_index(drop=True),
    )

# Load the three raw datasets using the paths and split settings from the
# configuration cell, then preview the first rows of the training data.
train_df_raw, valid_df_raw, test_df_raw = load_data(TRAIN_CSV, VALID_CSV, TEST_CSV, VALID_RATIO, RANDOM_STATE)
train_df_raw.head(3)


使用独立验证集： D:\Project\MachineLearningAlgorithm\forestMachineLearning\data\processes_valid.csv


Unnamed: 0,index,target,timestamp,processId,threadId,parentProcessId,userId,mountNamespace,processName,hostName,eventId,eventName,stackAddresses,argsNum,returnValue,args
0,0,0,124.439221,381,381,1,101,4026532232,systemd-resolve,0,41,socket,"[139913106282763, 139913103116537, 94901962555...",3,15,"[{'name': 'domain', 'type': 'int', 'value': 'A..."
1,2,0,124.439958,1,1,0,0,4026531840,systemd,0,1005,security_file_open,"[140074839310116, 8103505641674583864]",4,0,"[{'name': 'pathname', 'type': 'const char*', '..."
2,4,0,124.440037,1,1,0,0,4026531840,systemd,0,5,fstat,[140074839307913],2,0,"[{'name': 'fd', 'type': 'int', 'value': 12}, {..."


## 4. 预处理（对齐特征）

In [48]:

# Basic cleaning (derived columns, relative timestamps, optional column drop)
train_df = basic_clean(train_df_raw, process_args=PROCESS_ARGS, drop_raw_cols=DROP_RAW_COLS)
valid_df = basic_clean(valid_df_raw, process_args=PROCESS_ARGS, drop_raw_cols=DROP_RAW_COLS)
test_df  = None if test_df_raw is None else basic_clean(test_df_raw, process_args=PROCESS_ARGS, drop_raw_cols=DROP_RAW_COLS)

# Align every set to the required feature columns
train_df = ensure_req_features(train_df)
valid_df = ensure_req_features(valid_df)
if test_df is not None:
    test_df = ensure_req_features(test_df)

# Categorical encoding (encoders are fitted on the union of train + valid + test
# so that no set contains a category unknown to the encoder)
enc_fit_list = [train_df, valid_df] + ([test_df] if test_df is not None else [])
encoders = fit_label_encoders(enc_fit_list, CATEGORICAL_COLS)
train_enc = apply_label_encoders(train_df, encoders)
valid_enc = apply_label_encoders(valid_df, encoders)
test_enc  = None if test_df is None else apply_label_encoders(test_df, encoders)

# Report the shapes of the encoded feature matrices
print('Train shape:', train_enc.shape)
print('Valid shape:', valid_enc.shape)
if test_enc is not None:
    print('Test shape:', test_enc.shape)

train_enc.head(3)


Train shape: (638720, 11)
Valid shape: (102160, 11)
Test shape: (259293, 11)


Unnamed: 0,timestamp,processId,parentProcessId,userId,mountNamespace,processName,hostName,eventName,argsNum,returnValue,stack_depth
0,0.0,381,1,101,4026532232,91,0,40,3,15,3
1,0.0,1,0,0,4026531840,86,0,32,4,0,2
2,7.9e-05,1,0,0,4026531840,86,0,17,2,0,1


## 5. 训练 IsolationForest

In [49]:

from sklearn.ensemble import IsolationForest

# Build the model from the hyperparameters in the configuration cell.
iso = IsolationForest(**ISO_PARAMS)
iso.fit(train_enc.values)  # unsupervised training: no labels (y) are passed

# Save the model (optional). The block below is disabled; uncomment it to
# pickle the model together with the encoders and the feature list.
import pickle, os
# MODEL_PATH = os.path.join(DATA_DIR, "isoforest_model.pkl")
# with open(MODEL_PATH, "wb") as f:
#     pickle.dump(dict(model=iso, encoders=encoders, req_features=REQ_FEATURES), f)
# print("模型已保存：", MODEL_PATH)


## 6. 验证集打分与评估

In [50]:

import numpy as np
import pandas as pd

# Raw scores: in scikit-learn a larger score_samples value means "more normal",
# so the sign is flipped to obtain an anomaly score (larger = more anomalous).
raw_scores_valid = -iso.score_samples(valid_enc.values)
# Normalise to [0, 1]
z_valid = z01(raw_scores_valid, method=NORM_METHOD)

metrics = {}
if USE_LABELS_IF_AVAILABLE and ('target' in valid_df_raw.columns):
    from sklearn.metrics import average_precision_score, precision_recall_fscore_support

    y_true = valid_df_raw['target'].to_numpy().astype(int)
    # Average precision is threshold-free: it is computed on the scores directly.
    ap = average_precision_score(y_true, z_valid)

    # Threshold strategy
    if THRESHOLD_STRATEGY == "percentile":
        thr = np.percentile(z_valid, THRESHOLD_PERCENTILE)
    elif THRESHOLD_STRATEGY == "contamination":
        # Use the training contamination parameter as the flagged fraction
        frac = ISO_PARAMS.get("contamination", 0.05)
        thr = np.percentile(z_valid, 100 * (1 - frac))
    else:
        # NOTE: an unrecognised strategy silently falls back to the 97th percentile.
        thr = np.percentile(z_valid, 97.0)

    # Rows scoring at or above the threshold are predicted as anomalies (1).
    y_pred = (z_valid >= thr).astype(int)
    P, R, F1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary', zero_division=0)
    metrics = dict(AP=ap, Precision=P, Recall=R, F1=F1, thr=thr)
    print("Validation metrics:", metrics)
else:
    # No labels to evaluate against: only summarise the score distribution.
    print("验证集无标签（target），仅输出分数统计。")
    print(pd.Series(z_valid).describe())

# Show the first few scores
pd.DataFrame({"score": z_valid}).head(5)


Validation metrics: {'AP': 0.9065966655906399, 'Precision': 0.928897586431833, 'Recall': 0.7311938382541721, 'F1': 0.8182732366039362, 'thr': np.float64(0.6859453803171673)}


Unnamed: 0,score
0,0.644011
1,0.352499
2,0.550478
3,0.567437
4,0.618471


## 7. 测试集打分与生成 Submission CSV

In [51]:
import numpy as np
import pandas as pd
from sklearn.metrics import (
    average_precision_score,
    precision_recall_fscore_support,
    accuracy_score,
    confusion_matrix,
)

def validate_isoforest(
    valid_df_raw: pd.DataFrame,
    valid_enc: pd.DataFrame,
    model,
    norm_method: str = "minmax",          # "minmax" or "rank"
    threshold_strategy: str = "percentile",# "percentile" or "contamination"
    threshold_percentile: float = 97.0,
    contamination: float = 0.04,
    positive_label: int = 1,
    print_report: bool = True,
):
    """Evaluate an unsupervised anomaly detector on a labelled validation set.

    Args:
        valid_df_raw: Raw validation frame; must contain a ``target`` column
            (0/1).
        valid_enc: Validation feature matrix, already aligned and encoded the
            same way as the training features.
        model: Fitted model exposing ``score_samples`` (e.g. IsolationForest).
        norm_method: Score normalisation, ``"minmax"`` or ``"rank"``.
        threshold_strategy: ``"percentile"`` cuts at the
            *threshold_percentile*-th percentile of the validation scores;
            ``"contamination"`` cuts at the ``100 * (1 - contamination)``-th
            percentile.
        threshold_percentile: Percentile used by the ``"percentile"`` strategy.
        contamination: Fraction used by the ``"contamination"`` strategy.
        positive_label: Label treated as the positive class when computing
            precision / recall / F1 (anomaly = 1).
        print_report: Whether to print a one-line summary and the confusion
            matrix counts.

    Returns:
        ``(metrics, z)`` where ``metrics`` is a dict with AP, Precision,
        Recall, F1, Accuracy, Threshold, the confusion-matrix counts and the
        settings used, and ``z`` is the array of normalised anomaly scores
        (larger = more anomalous).

    Raises:
        ValueError: If ``target`` is missing from *valid_df_raw*, or if
            *norm_method* / *threshold_strategy* is not a supported value.
    """
    # Validate every argument up front so that a typo fails immediately,
    # before the (potentially expensive) scoring pass.
    if "target" not in valid_df_raw.columns:
        raise ValueError("valid_df_raw 必须包含 'target' 列用于评估。")
    if norm_method not in ("minmax", "rank"):
        raise ValueError("norm_method 仅支持 'minmax' 或 'rank'。")
    if threshold_strategy not in ("percentile", "contamination"):
        raise ValueError("threshold_strategy 仅支持 'percentile' 或 'contamination'。")

    # 1) Raw anomaly score: score_samples is larger for normal points, so negate.
    raw = -model.score_samples(valid_enc.values)

    # 2) Normalise to [0, 1] with the shared helper used elsewhere in this file.
    z = z01(raw, method=norm_method)

    # 3) Average precision (independent of any threshold).
    y_true = valid_df_raw["target"].to_numpy().astype(int)
    ap = average_precision_score(y_true, z)

    # 4) Choose the threshold and binarise.
    if threshold_strategy == "percentile":
        cut = threshold_percentile
    else:
        cut = 100 * (1 - contamination)
    thr = np.percentile(z, cut)
    y_pred = (z >= thr).astype(int)

    # 5) Metrics and confusion matrix.
    P, R, F1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="binary", pos_label=positive_label, zero_division=0
    )
    acc = accuracy_score(y_true, y_pred)
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    metrics = {
        "AP": float(ap),
        "Precision": float(P),
        "Recall": float(R),
        "F1": float(F1),
        "Accuracy": float(acc),
        "Threshold": float(thr),
        "TP": int(tp),
        "FP": int(fp),
        "TN": int(tn),
        "FN": int(fn),
        "Strategy": threshold_strategy,
        "Percentile(%)": threshold_percentile if threshold_strategy == "percentile" else None,
        "Contamination": contamination if threshold_strategy == "contamination" else None,
        "Norm": norm_method,
    }

    if print_report:
        print(
            f"AP={metrics['AP']:.4f} | P={metrics['Precision']:.4f} "
            f"R={metrics['Recall']:.4f} F1={metrics['F1']:.4f} Acc={metrics['Accuracy']:.4f} "
            f"| thr={metrics['Threshold']:.6f} ({metrics['Strategy']})"
        )
        print(f"Confusion Matrix: TP={tp}, FP={fp}, TN={tn}, FN={fn}")

    return metrics, z


In [52]:

import numpy as np
import pandas as pd

if test_enc is None:
    print("未提供测试集，跳过提交文件生成。")
else:
    # Negate score_samples so that larger means more anomalous, then
    # normalise with the configured method.
    raw_scores_test = -iso.score_samples(test_enc.values)
    z_test = z01(raw_scores_test, method=NORM_METHOD)

    # Pick the ID column; fall back to a positional index when the test data
    # has none of the candidate columns.
    id_col = choose_id_column(test_df_raw, SUBMIT_ID_COL_CANDIDATES)
    if id_col is None:
        sub = pd.DataFrame({
            "index": np.arange(len(test_df_raw)),
            SUBMIT_SCORE_COL: z_test
        })
    else:
        sub = pd.DataFrame({
            id_col: test_df_raw[id_col].values,
            SUBMIT_SCORE_COL: z_test
        })
    # Actually write the file: the message below reports it as saved, so the
    # write must not stay commented out.
    sub.to_csv(SUBMIT_PATH, index=False)
    print("已保存提交文件：", SUBMIT_PATH)
    sub.head(10)


# Prerequisites from the earlier cells:
# - valid_df_raw  raw validation set (with a target column)
# - valid_enc     encoded validation features
# - iso           the fitted IsolationForest model
#
# The settings are taken from the configuration cell rather than repeated as
# literals, so this report always agrees with the rest of the notebook.
metrics, z_valid = validate_isoforest(
    valid_df_raw=valid_df_raw,
    valid_enc=valid_enc,
    model=iso,
    norm_method=NORM_METHOD,
    threshold_strategy=THRESHOLD_STRATEGY,  # "percentile" or "contamination"
    threshold_percentile=THRESHOLD_PERCENTILE,
    contamination=ISO_PARAMS.get("contamination", 0.04),
    print_report=True
)

metrics


已保存提交文件： D:\Project\MachineLearningAlgorithm\forestMachineLearning\data\submission_isoforest.csv
AP=0.9066 | P=0.9289 R=0.7312 F1=0.8183 Acc=0.9876 | thr=0.685945 (percentile)
Confusion Matrix: TP=2848, FP=218, TN=98047, FN=1047


{'AP': 0.9065966655906399,
 'Precision': 0.928897586431833,
 'Recall': 0.7311938382541721,
 'F1': 0.8182732366039362,
 'Accuracy': 0.9876174628034455,
 'Threshold': 0.6859453803171673,
 'TP': 2848,
 'FP': 218,
 'TN': 98047,
 'FN': 1047,
 'Strategy': 'percentile',
 'Percentile(%)': 97.0,
 'Contamination': None,
 'Norm': 'minmax'}

## 8. （可选）封装推理函数

In [53]:

def infer_scores(df_raw: pd.DataFrame, model, encoders, req_features=REQ_FEATURES, norm_method="minmax"):
    """Score a raw frame end to end and return normalised anomaly scores.

    Runs the same steps as the training pipeline: ``basic_clean`` ->
    ``ensure_req_features`` -> ``apply_label_encoders`` -> the model's
    ``score_samples`` (negated so that larger means more anomalous) -> ``z01``.

    Args:
        df_raw: Raw event frame.
        model: Fitted model exposing ``score_samples``.
        encoders: Mapping of column name to fitted encoder, as produced by
            ``fit_label_encoders``.
        req_features: Columns (and their order) passed to the model. Must
            match what the model was fitted on and be drawn from
            ``REQ_FEATURES``; a name outside it raises ``KeyError``.
        norm_method: Normalisation method forwarded to ``z01``.

    Returns:
        Array of normalised anomaly scores, one per row of *df_raw*.
    """
    df = basic_clean(df_raw, process_args=PROCESS_ARGS, drop_raw_cols=DROP_RAW_COLS)
    df = ensure_req_features(df)
    # Honour the caller-supplied feature list instead of silently ignoring it;
    # with the default this selects the same columns in the same order.
    df = df[list(req_features)]
    df = apply_label_encoders(df, encoders)
    raw_scores = -model.score_samples(df.values)
    return z01(raw_scores, method=norm_method)

# Example: recompute the validation scores through infer_scores and check that
# they match scores computed directly from the already-encoded features.
_z = infer_scores(valid_df_raw, iso, encoders, req_features=REQ_FEATURES, norm_method=NORM_METHOD)
print("一致性检查：", np.allclose(_z, z01(-iso.score_samples(valid_enc.values), method=NORM_METHOD)))


一致性检查： True
