<a href="https://colab.research.google.com/github/hsswkwk/turbo-chainsaw/blob/feature-add-anomaly-detection/notebooks/anomaly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 異常検知（Aanomaly detection）
標準的な状態や想定から逸脱しているデータや事象を特定する技術


## 正規分布に従うデータの異常検知
### 例
製品の温度をセンサーで監視している場合の異常検知

### 手法

#### 3$\sigma$法
データが正規分布に従うと仮定し、平均値から標準偏差の3倍以上離れた値を異常値とみなす方法

#### マハラノビス・タグチ法
データの各次元間の相関を考慮した距離尺度を用いて、平均値から離れた値を異常値とみなす方法

#### ホテリングの$T^2$法
マハラノビス距離を拡張した手法で、データの平均値からのずれを検定統計量として用いて異常値を検出する方法

#### 密度比推定


In [21]:
import numpy as np
from scipy.spatial.distance import mahalanobis
from scipy.stats import f
from sklearn.neighbors import KernelDensity


# 正常データ生成
normal_data = np.random.normal(loc=25, scale=2, size=100)

# 異常値の追加
anomaly_indices = [10, 20, 30]  # 異常値のインデックス
anomaly_values = [35, 15, 32]  # 異常値
normal_data[anomaly_indices] = anomaly_values

# 生成したデータ
temperature_data = normal_data


### 3σ法
def three_sigma_method(data, threshold=3):
  mean = np.mean(data)
  std = np.std(data)
  lower_bound = mean - threshold * std
  upper_bound = mean + threshold * std
  anomaly_indices = np.where((data < lower_bound) | (data > upper_bound))[0]
  return data[anomaly_indices]

### マハラノビス距離
def mahalanobis_distance_method(data, threshold_percentile=99):
  # dataを2次元配列に変換
  data = np.array([data, data])

  mean = np.mean(data, axis=0)
  cov = np.cov(data.T)

  # 共分散行列が特異行列の場合、微小な値を加算して正則化する
  if np.linalg.cond(cov) > 1e15:  # 条件数が大きい場合、特異行列とみなす
      cov += np.eye(cov.shape[0]) * 1e-6  # 単位行列に微小な値を掛けて加算

  inv_cov = np.linalg.inv(cov)
  distances = [mahalanobis(x, mean, inv_cov) for x in data]

  # 異常値とみなす閾値を設定
  threshold = np.percentile(distances, threshold_percentile)

  # 閾値を超えるデータを異常値として検出
  anomalies = [data[i] for i, distance in enumerate(distances) if distance > threshold]
  return anomalies

### ホテリングのT^2法
# def hotelling_t2_method(data):
#   mean = np.mean(data, axis=0)
#   inv_cov = np.linalg.inv(np.cov(data.T))
#   n = data.shape[0]
#   p = data.shape[1]
#   t2_values = [(x - mean).reshape(1, -1) @ inv_cov @ (x - mean).reshape(-1, 1) for x in data]
#   threshold = p * (n - 1) / (n - p) * f.ppf(0.95, p, n - p)  # 異常値とみなす閾値を設定
#   anomalies = [data[i] for i, t2 in enumerate(t2_values) if t2 > threshold]
#   return anomalies

# # 正常データ
# normal_data = np.random.randn(100, 2)

# # 異常検知の実行
# anomalies = hotelling_t2_method(normal_data)

### 密度比推定
# 正常データと異常データ
# normal_data = np.random.randn(100, 2)
# anomaly_data = np.random.rand(10, 2) * 5

# # KLIEPによる密度比推定 (簡易的な実装)
# def kliep(normal_data, anomaly_data, bandwidth=0.1):
#   kde_normal = KernelDensity(bandwidth=bandwidth).fit(normal_data)
#   kde_anomaly = KernelDensity(bandwidth=bandwidth).fit(anomaly_data)
#   density_ratio = np.exp(kde_normal.score_samples(anomaly_data) - kde_anomaly.score_samples(anomaly_data))
#   return density_ratio

# # 密度比推定
# density_ratio = kliep(normal_data, anomaly_data)

# # 異常度の算出
# anomaly_score = -np.log(density_ratio)

# # 閾値の設定 (例: 異常度の95%点)
# threshold = np.percentile(anomaly_score, 95)

# # 異常検知
# predicted_anomalies = anomaly_data[anomaly_score > threshold]

### マハラノビス・タグチ法
# # 正常データ
# normal_data = np.random.randn(100, 2)

# # マハラノビス距離の計算
# mean = np.mean(normal_data, axis=0)
# inv_cov = np.linalg.inv(np.cov(normal_data.T))
# distances = [mahalanobis(x, mean, inv_cov) for x in normal_data]

# # 閾値の設定 (例: 99%点)
# threshold = np.percentile(distances, 99)

# # 異常検知
# def detect_anomaly(x):
#   distance = mahalanobis(x, mean, inv_cov)
#   return distance > threshold

# # テストデータ
# test_data = np.array([[1, 2], [3, 4]])

# # 異常検知の実行
# for x in test_data:
#   if detect_anomaly(x):
#     print(f"Data point {x} is anomalous.")
#   else:
#     print(f"Data point {x} is normal.")


# 異常検知の実行
anomalies_three_sigma_method = three_sigma_method(temperature_data)
anomalies_mahalanobis_distance_method = mahalanobis_distance_method(temperature_data)

# 結果の出力
print("================ 3σ法 ================")
print("異常値:", anomalies_three_sigma_method)
print("\n")

print("===== マハラノビス距離による異常検知 =====")
print("異常値:", anomalies_mahalanobis_distance_method)
print("\n")


異常値: [35. 15.]


===== マハラノビス距離による異常検知 =====
異常値: []




## 非正規データの異常検知
### 例
ウェブサイトへのアクセス数の異常検知


## 不要次元のある次元データの異常検知
### 例
顧客の購買データを用いた異常検知


## 入出力関係のあるデータの異常検知
### 例
製造工程における品質管理


## 時系列データの異常検知
### 例
サーバーのCPU使用率の異常検知


## 変数間に関係があるデータの異常検知
### 例
クレジットカードの不正利用検知
