<a href="https://colab.research.google.com/github/SY-256/anomaly_detection/blob/main/notebook/chapter6_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 多変数のホテリング理論による異常検知

多変数のホテリング理論による異常検知
- A.変数選択
- B.モデルの学習
- C.推論

## A.変数選択

In [None]:
# ヒストグラムによる正常と異常の分離性の確認
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

df = pd.read_csv("https://raw.githubusercontent.com/ghmagazine/python_anomaly_detection_book/refs/heads/main/notebooks/datasets/ch2_dataset_train.csv")
# "label"列を削除（数値型の列のみとする）
df_val = df.drop("label", axis=1)
# 正常データのみ取り出す
df_normal_val = df[df["label"] == "normal"].drop("label", axis=1)
# 異常データのみ取り出す
df_anomaly_val = df[df["label"] == "anomaly"].drop("label", axis=1)
fig, ax = plt.subplots(nrows=1, ncols=5, figsize=(20, 4))

# すべての変数をループで走査
for i, colname in enumerate(df_normal_val.columns):
    # 正常データのヒストグラム描画
    sns.histplot(data=df_normal_val[colname], bins="sturges",
                 color="#bbbbbb", ax=ax[i],
                 stat="density", label="normal")
    # 異常データのヒストグラム描画
    sns.histplot(data=df_anomaly_val[colname], bins="sturges",
                 color="#111111", ax=ax[i],
                 stat="density", label="anomaly")

    # 凡例を追加
    ax[i].legend()
    # 変数名を図のタイトルとして追加
    ax[i].set_title(colname, fontsize=14)

plt.tight_layout()
plt.show()

変数`temp1`、`temp2`、`temp3`において正常と異常の分布がある程度分離しているもの対し、`temp4`と`temp5`は両者の分布に目に見える差がない。よって、本データの異常の検知に寄与する可能性が低く、モデルの入力する必要性は低いと判断できる。

In [None]:
# 散布図（pairplot）による相関の確認
sns.pairplot(
    data=df,
    vars=["temp1", "temp2", "temp3"],
    hue="label",
    palette=["#999999", "#111111"]
)
plt.show()

ホテリング理論は変数間の相関に比較的強いアルゴリズムであるので、弱相関程度であれば変数を削除しなくても良い。（今回は便宜上モデルの判定を可視化し易くするために、入力変数は`temp1`と`temp2`の2変数に絞る。

## B.モデルの学習

学習フェーズでは、最尤推定による多次元正規分布パラメータの推定、および異常度の閾値算出を実施

In [None]:
# 多変数のホテリング理論による異常検知の実装例（学習）
# N >> Mが成り立つ場合の学習の実装（カイ二乗分布）
import pandas as pd
import numpy as np
from scipy import stats

##### 学習データの読み込みと前処理 #####
df = pd.read_csv("https://raw.githubusercontent.com/ghmagazine/python_anomaly_detection_book/refs/heads/main/notebooks/datasets/ch2_dataset_train.csv")
# "temp2"と"temp1"に欠損があるデータを削除
df_dropna = df.dropna(subset=["temp1", "temp2"], axis=0)
# 正常データのみを抽出
df_normal = df_dropna[df_dropna["label"] == "normal"]
# "temp2", "temp1"列のみ取り出してNumpyのndarray化し、学習データとする
X_train = df_normal[["temp2", "temp1"]].to_numpy()

###### 学習ステップ1. 正常のモデルを作成 #####
mu = np.mean(X_train, axis=0) # 標本平均ベクトルμを算出
# 標本分散共分散行列Σを算出（転置と自由度ddofに注意）
Sigma = np.cov(X_train.T, ddof=0)

##### 学習ステップ2. 異常を表す指標（異常度）を定義する #####
# 式のみの定義

#### 学習ステップ3. 異常度に閾値を設ける #####
TARGET_FP_RATE = 0.0027 # ターゲットとする誤報率（正規分布の3σ相当=0.0027）
n_features = X_train.shape[1] # 変数の数M
# 自由度（M）のカイ二乗分布の累積分布関数の逆関数から閾値を算出
a_th = stats.chi2.ppf(1-TARGET_FP_RATE, df=n_features)

##### 学習で求めたパラメータをすべて表示 #####
print(f"mu={mu}")
print(f"Sigma={Sigma}")
print(f"a_th={a_th}")

In [None]:
# N >> Mが成り立たない場合の学習の実装（F分布 自由度(M, N - M)）
##### 学習ステップ1. 正常のモデルを作成する #####
mu = np.mean(X_train, axis=0) # 標本平均ベクトルμを算出
# 標本分散共分散行列Σを算出
Sigma = np.cov(X_train.T, ddof=0)

##### 学習ステップ2. 異常を表す指標（異常度）を算出 #####
# 式を定義するのみ

##### 学習ステップ3. 異常度に閾値を設ける #####
TARGET_FP_RATE = 0.0027 # ターゲットとする誤報率（正規分布の3σ相当=0.0027）
sample_size = len(X_train) # サンプルサイズN
n_features = X_train.shape[1] # 変数の数M
# 自由度（M, N - M）のF分布の累積分布関数の逆関数から閾値を算出
a_th = (sample_size+1)*n_features/(sample_size-n_features) \
* stats.f.ppf(1-TARGET_FP_RATE, dfn=n_features, dfd=sample_size-n_features)

##### 学習で求めたパラメータを表示 #####
print(f"mu={mu}")
print(f"Sigma={Sigma}")
print(f"a_th={a_th}")

カイ二乗分布を使用した場合と比較して、異常度の閾値がやや大きくなっており、誤報率を減らす方向（見逃し寄り）に閾値が設定されてることがわかる（右側より）

## 推論

学習フェーズで求めたパラメータと閾値を用いて、推論データに対する異常度の算出と異常判定を行う。なお、異常度の分布にカイ二乗分布を用いる（$N ≫ M$が成り立つ）場合もF分布を用いる（$N \gg M$が成り立たない）場合も、推論の実装方法に変化はない。

In [None]:
# 多変数のホテリング理論による異常検知の実装例（推論）

##### 学習したパラメータをここに記載 #####
MU = [350.23771153, 400.21552883] # 標本平均ベクトルμ
SIGMA = [[26.04088919, 14.52874949],
         [14.52874949, 24.58401158]] # 標本分散共分散行列Σ
A_TH=11.82900701194368 # 異常度のしきい値

##### 推論データの読み込みと前処理 #####
df_inference = pd.read_csv("https://raw.githubusercontent.com/ghmagazine/python_anomaly_detection_book/refs/heads/main/notebooks/datasets/ch2_dataset_inference.csv")
# "temp2"、"temp1"変数に欠損があるデータを削除
df_inference_dropna = df_inference.dropna(subset=["temp2", "temp1"])
# "temp2"、"temp1"列のみ取り出してNumpyのndarray化し、推論データとする
X_inference = df_inference_dropna[["temp2", "temp1"]].to_numpy()
# 標本分散共分散行列の逆行列
Sigma_inv = np.linalg.inv(np.array(SIGMA))

##### 推論を実行 #####
# 異常度を算出
X_dev = (X_inference-np.array(MU)).T # 推論データと平均ベクトルとの差 (x-μ)
Dev_Sigma = X_dev.T @ Sigma_inv # (x-μ)^T * Σ^-1
anomaly_scores = np.sum(np.multiply(Dev_Sigma, X_dev.T), axis=1)
# 閾値により異常の有無を判定
pred = np.where(anomaly_scores > A_TH, "anomaly", "normal")
# 推論結果を表示
print(pred)

推論結果の決定境界（異常と正常の判定の境界）を可視化

In [None]:
# 推論結果の決定境界を可視化
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns

# 異常度と異常判定をDataFrameに列として追加
df_inference_dropna["anomaly_score"] = anomaly_scores
df_inference_dropna["prediction"] = pred

fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4))

##### 正常と異常の範囲の色分け ######
# (x1,x2)格子点を作成（'temp2'をx1としている）
x1_grid = np.linspace(df_inference_dropna["temp2"].min()-5,
                      df_inference_dropna["temp2"].max()+5, 200)
x2_grid = np.linspace(df_inference_dropna["temp1"].min()-5,
                      df_inference_dropna["temp1"].max()+5, 200)
X1, X2 = np.meshgrid(x1_grid, x2_grid)
X_grid = np.c_[X1.ravel(), X2.ravel()]
# 異常度を算出
X_dev_grid = (X_grid-np.array(MU)).T # 推論データと平均ベクトルとの差（x-μ）
Dev_Sigma_grid = X_dev_grid.T @ Sigma_inv # (x-μ)^T * Σ^-1
anomaly_scores_grid = np.sum(np.multiply(Dev_Sigma_grid, X_dev_grid.T), axis=1)
# 閾値判定
pred_grid = np.where(anomaly_scores_grid > A_TH, 0, 1)
# 正常と異常の境界をプロット
pred_pivot = pred_grid.reshape(X1.shape)
ax.contourf(X1, X2, pred_pivot,
            cmap=cm.gray, alpha=0.5)

##### 各データを散布図としてプロット #####
sns.scatterplot(data=df_inference_dropna, x="temp2", y="temp1",
                hue="label", palette=["#999999", "#111111"], ax=ax)
# 凡例を追加
ax.legend()
plt.show()

In [None]:
# カイ二乗分布のパラメータ推定の実装
# 自由度kの値を学習データの異常度から求める
###### 学習ステップ3: 異常度に閾値を設ける ######
TARGET_FP_RATE = 0.0027 # ターゲットとする誤報率（正規分布の3σ相当=0.0027）
# 学習データから異常度を算出
Sigma_inv = np.linalg.inv(Sigma) # 標本分散共分散行列の逆行列
X_dev = (X_train - mu).T # 推論データと平均ベクトルの差
Dev_Sigma = X_dev.T @ Sigma_inv # (X - μ)^T * Σ^-1
anomaly_scores_train = np.sum(np.multiply(Dev_Sigma, X_dev.T), axis=1)
# カイ二乗分布の自由度dfとスケールパラメータscaleを異常度の分布から最尤推定する
params = stats.chi2.fit(anomaly_scores_train, floc=0)
df = params[0]
scale = params[2]
# 自由度df, スケールパラメータscaleを持つ
# カイ二乗分布の累積分布関数の逆関数から閾値を算出
# スケールパラメータも最尤推定の対象にするのがポイント（前項では固定していた）
a_th = stats.chi2.ppf(1-TARGET_FP_RATE, df=df, scale=scale)

In [None]:
print(a_th)

# マハラノビス・タグチ法による異常検知

- 変数ごとの寄与を評価する仕組みを導入した手法

### マハラノビス・タグチ法による異常検知の概要
1. マハラノビス距離の閾値判定で異常を検知する
2. 変数ごとの異常への寄与度（SN比）を求める

In [None]:
# パラメータ学習
# 多変数のホテリング理論による異常検知の実装例（学習）
import pandas as pd
import numpy as np
from scipy import stats

###### 学習データの読み込みと前処理 ######
df = pd.read_csv("https://raw.githubusercontent.com/ghmagazine/python_anomaly_detection_book/refs/heads/main/notebooks/datasets/ch2_dataset_train.csv")
# "temp2", "temp1"変数に欠損がある行を削除
df_dropna = df.dropna(subset=["temp2", "temp1"])
# 正常データのみ抽出
df_normal = df_dropna[df_dropna["label"] == "normal"]
# "temp2", "temp1"列のみ取り出してNumpyのndarray化し、学習データとする
X_train = df_normal[["temp2", "temp1"]].to_numpy()

###### 学習ステップ1. 正常のモデルを作成する ######
mu = np.mean(X_train, axis=0) # 標本平均ベクトルμを算出
# 標本分散共分散行Σを算出（転置と自由度ddofに注意）
Sigma = np.cov(X_train.T, ddof=0)

###### 学習ステップ2. 異常を表す指標（異常度）を定義する ######
# 式のみで定義

###### 学習ステップ3. 異常度に閾値を設ける ######
TARGET_FP_RATE = 0.0027
n_features = X_train.shape[1] # 変数の数M
# 自由度（M）のカイ二乗分布の累積分布関数の逆関数から閾値算出
a_th = stats.chi2.ppf(1-TARGET_FP_RATE, df=n_features)

###### 学習で求めたパラメータをすべて表示 ######
print(f"mu={mu}")
print(f"Sigma={Sigma}")
print(f"a_th={a_th}")

In [None]:
# 推論
# マハラノビス・タグチ法による異常検知の実装例（推論）

###### 学習したパラメータを記載 ######
MU = [350.23771153, 400.21552883] # 標本平均ベクトルμ
SIGMA = [[26.04088919, 14.52874949],
         [14.52874949, 24.58401158]] # 標本分散共分散行列Σ
A_TH=11.82900701194368 # 異常度のしきい値

###### 推論データの読み込みと前処理 ######
df_inference = pd.read_csv("https://raw.githubusercontent.com/ghmagazine/python_anomaly_detection_book/refs/heads/main/notebooks/datasets/ch2_dataset_inference.csv")
# "temp2", "temp1"に欠損があるデータを削除
df_inference_dropna = df_inference.dropna(subset=["temp2", "temp1"])
# "temp2", "temp1"列のみ取り出してNumpyのndarray化し、推論データとする
X_inference = df_inference_dropna[["temp2", "temp1"]].to_numpy()
# 標本分散共分散行列Σの逆行列
Sigma_inv = np.linalg.inv(np.array(SIGMA))

###### 推論フェーズ1. マハラノビス距離の閾値判定で異常を検知（ホテリング理論と同じ） ######
# 異常度を算出
X_dev = (X_inference-np.array(MU)).T # 推論データと平均ベクトルとの差（x-μ）
Dev_Sigma = X_dev.T @ Sigma_inv # (x-μ)^T * Σ^-1
anomaly_scores = np.sum(np.multiply(Dev_Sigma, X_dev.T), axis=1)
# 閾値により異常の有無を判定
pred = np.where(anomaly_scores > A_TH, "anomaly", "normal")
# 推論結果を表示
print(pred)

###### 推論フェーズ2. 異常データに対して変数ごとのSN比を算出 ######
# 異常データのみを抽出
df_inference_dropna["prediction"] = pred
df_pred_anomaly = df_inference_dropna[df_inference_dropna["prediction"] == "anomaly"]
# 各変数の分散（標本分散共分散行列Σの対角成分）
var = np.diag(np.array(SIGMA))
# 変数ごとにSN比を計算
for j, feature in enumerate(["temp2", "temp1"]):
    sn_ratio = 10 * np.log10((df_pred_anomaly[feature]-MU[j]) ** 2 / var[j])
    df_pred_anomaly[f"sn_{feature}"] = sn_ratio
# 計算したSN比を含む異常データを表示
print(df_pred_anomaly)

異常判定されたデータに対してSN比を算出してみる

In [None]:
# 異常判定されたデータのSN比を可視化
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns

# 異常度と異常判定をDataFrameに列として追加
df_inference_dropna["anomaly_score"] = anomaly_scores
df_inference_dropna["prediction"] = pred
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4))

###### 正常と異常の範囲の色分け #####
# (x1, x2)格子点を作成('temp2'をx1としている点に注意)
x1_grid = np.linspace(df_inference_dropna["temp2"].min()-5,
                      df_inference_dropna["temp2"].max()+5, 100)
x2_grid = np.linspace(df_inference_dropna["temp1"].min()-5,
                      df_inference_dropna["temp1"].max()+5, 100)
X1, X2 = np.meshgrid(x1_grid, x2_grid)
X_grid = np.c_[X1.ravel(), X2.ravel()]
# 異常度を算出
X_dev_grid = (X_grid-np.array(MU)).T # 推論データと平均ベクトルとの差（x-μ）
Dev_Sigma_grid = X_dev_grid.T @ Sigma_inv # (x-μ)^T * Σ^-1
anomaly_score_grid = np.sum(np.multiply(Dev_Sigma_grid, X_dev_grid.T), axis=1)
# 閾値判定
pred_grid = np.where(anomaly_score_grid > A_TH, 0, 1)
# 正常と異常の境界をプロット
pred_pivot = pred_grid.reshape(X1.shape)
ax.contourf(X1, X2, pred_pivot,
            cmap=cm.gray, alpha=0.5)


##### 各データを散布図としてプロット #####
sns.scatterplot(data=df_inference_dropna, x="temp2", y="temp1",
                hue="label", palette=["#999999", "#111111"], ax=ax)

# 異常判定されたデータのみインデックスを表示
for i, row in df_pred_anomaly.iterrows():
    ax.text(row["temp2"], row["temp1"], str(i),
            verticalalignment="bottom", horizontalalignment="center")
# 凡例を表示
ax.legend()
# グラフを表示
plt.show()

###### 異常判定されたデータのSN比を表示 ######
for i, row in df_pred_anomaly.iterrows():
    fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(3, 4))
    # SN比を描画
    sn_names = [f"sn_{feature}" for feature in ["temp2", "temp1"]]
    sn_ratio = [row[sn_name] for sn_name in sn_names]
    axes.bar(sn_names, sn_ratio, color="#888888")
    axes.set_title(f"index={i}")
    plt.xticks(fontsize=12)
    # グラフを表示
    plt.show()

各データについて、どの変数が異常に強く寄与していたかを定量的に把握することができる。
特に、正常ラベルでありながら異常と判定された誤報データ（`index=142`）では、他の異常データとSN比の傾向が異なることが確認できる。これは、両者の特徴空間での方向性が異なるためであり、このようなSN比の傾向を分析することが、誤報や未知の異常パターンの判別に役立つ。
このような特徴空間での方向性は、2変数であらば散布図でも把握可能ですが、3変数以上になると可視化困難になる。そのため、SN比の活用価値は高次元データにおいて特に高まる。