# LSTM Autoencoder による時系列異常検知

このノートブックでは、**LSTM Autoencoder**を使った時系列データの異常検知を学びます。

## LSTM Autoencoderとは？

**LSTM Autoencoder**は、時系列データの正常なパターンを学習し、そのパターンから外れたデータを異常として検出する深層学習の手法です。

### なぜ時系列異常検知に有効なのか？

1. **時間的な文脈を考慮**: 過去の値の連鎖パターンを学習
2. **複雑なパターンを学習**: 非線形なパターンも捉えられる
3. **教師なし学習**: 正常データのみで学習可能
4. **柔軟性**: 多変量時系列にも対応

### Isolation Forestとの違い

| 特徴 | Isolation Forest | LSTM Autoencoder |
|------|------------------|------------------|
| データ | 静的データ | 時系列データ |
| 学習内容 | 異常を隔離 | 正常パターンを再構成 |
| 時間的依存性 | 考慮しない | **考慮する** |
| 計算速度 | 高速 | やや遅い |
| 複雑なパターン | シンプル | 複雑なパターンも学習 |

### サイバー攻撃検知での応用

- **DDoS攻撃**: トラフィックの急激な増加パターン
- **ポートスキャン**: 短時間での連続アクセスパターン
- **データ漏洩**: 通常と異なるデータ転送パターン
- **異常ログイン**: 時間帯・頻度の異常パターン

## 1. ライブラリのインポート

In [1]:
# データ処理
import numpy as np
import pandas as pd

# 可視化
import matplotlib.pyplot as plt
import seaborn as sns

# Keras (深層学習)
import os
os.environ['KERAS_BACKEND'] = 'numpy'  # NumPyバックエンドを使用（軽量）

import keras
from keras import layers, models, callbacks

# scikit-learn（前処理・評価）
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.ensemble import IsolationForest

# 設定
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
np.random.seed(42)

print(f"Keras version: {keras.__version__}")
print(f"Keras backend: {keras.backend.backend()}")

%matplotlib inline

ModuleNotFoundError: No module named 'jax'

## 2. LSTM Autoencoderの理論

### 2.1 Autoencoderとは？

**Autoencoder**は、入力データを圧縮（エンコード）し、再び元の形に復元（デコード）するニューラルネットワークです。

```
入力データ → [Encoder] → 潜在表現 → [Decoder] → 復元データ
```

#### 学習の目的
- **入力と出力がなるべく同じになるように学習**
- 正常データで学習すると、**正常パターンを再構成できる**
- 異常データは**再構成誤差が大きくなる**

### 2.2 LSTM (Long Short-Term Memory)

**LSTM**は、時系列データの長期的な依存関係を学習できるリカレントニューラルネットワーク（RNN）の一種です。

#### LSTMの特徴
- **記憶セル**: 過去の情報を保持
- **ゲート機構**: 情報の流れを制御
  - **入力ゲート**: 新しい情報をどれだけ記憶するか
  - **忘却ゲート**: 古い情報をどれだけ忘れるか
  - **出力ゲート**: 記憶からどれだけ出力するか

### 2.3 LSTM Autoencoderの構造

```
時系列データ (例: [t1, t2, t3, t4, t5])
    ↓
[LSTM Encoder]
    - 時系列を圧縮して潜在表現を作成
    - 最後の隠れ状態がエンコード結果
    ↓
潜在表現 (圧縮された特徴)
    ↓
[RepeatVector]
    - 潜在表現を時系列の長さ分繰り返す
    ↓
[LSTM Decoder]
    - 潜在表現から元の時系列を復元
    ↓
復元された時系列 (例: [t1', t2', t3', t4', t5'])
```

### 2.4 異常検知の仕組み

1. **正常データで学習**: 正常なパターンを再構成する能力を獲得
2. **再構成誤差を計算**: 入力と出力の差（MSE、MAEなど）
3. **閾値設定**: 再構成誤差がある閾値を超えたら異常と判定

```python
再構成誤差 = MSE(入力, 復元データ)

if 再構成誤差 > 閾値:
    異常
else:
    正常
```

## 3. シンプルな例: 正弦波の異常検知

まず、基礎を理解するためにシンプルな正弦波データで異常検知を実装します。

### データの生成
- **正常データ**: 滑らかな正弦波
- **異常データ**: スパイク（急激な値の変化）を含む波形

In [None]:
# 正弦波データの生成
def generate_sine_wave(n_samples=1000, noise_level=0.1):
    """
    正弦波データを生成
    
    Parameters:
    - n_samples: サンプル数
    - noise_level: ノイズの強度
    """
    x = np.linspace(0, 50, n_samples)
    y = np.sin(x) + np.random.normal(0, noise_level, n_samples)
    return y

# 異常データの生成（スパイクを追加）
def add_anomalies(data, n_anomalies=5, spike_magnitude=3):
    """
    データに異常（スパイク）を追加
    
    Parameters:
    - data: 元のデータ
    - n_anomalies: 異常の数
    - spike_magnitude: スパイクの大きさ
    """
    data_with_anomalies = data.copy()
    anomaly_indices = np.random.choice(len(data), n_anomalies, replace=False)
    
    for idx in anomaly_indices:
        data_with_anomalies[idx] += spike_magnitude * np.random.choice([-1, 1])
    
    return data_with_anomalies, anomaly_indices

# データ生成
normal_data = generate_sine_wave(n_samples=1000, noise_level=0.1)
test_data, anomaly_indices = add_anomalies(normal_data, n_anomalies=10, spike_magnitude=3)

print(f"正常データのサイズ: {normal_data.shape}")
print(f"テストデータのサイズ: {test_data.shape}")
print(f"異常の位置: {sorted(anomaly_indices)}")

In [None]:
# データの可視化
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# 正常データ
axes[0].plot(normal_data, label='Normal Data', color='blue', alpha=0.7)
axes[0].set_title('Normal Sine Wave', fontsize=14)
axes[0].set_xlabel('Time Steps')
axes[0].set_ylabel('Value')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# 異常データ
axes[1].plot(test_data, label='Test Data', color='blue', alpha=0.7)
axes[1].scatter(anomaly_indices, test_data[anomaly_indices], 
                color='red', s=100, label='Anomalies', zorder=5)
axes[1].set_title('Sine Wave with Anomalies (Spikes)', fontsize=14)
axes[1].set_xlabel('Time Steps')
axes[1].set_ylabel('Value')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

### 3.1 時系列データの準備

LSTM Autoencoderに入力するため、時系列データを**ウィンドウ**に分割します。

#### ウィンドウとは？
- 連続したデータポイントの塊
- 例: `window_size=10` の場合、[t1, t2, ..., t10]が1つのサンプル

```
元データ: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
window_size=5

サンプル1: [1, 2, 3, 4, 5]
サンプル2: [2, 3, 4, 5, 6]
サンプル3: [3, 4, 5, 6, 7]
...
```

In [None]:
def create_sequences(data, window_size):
    """
    時系列データをウィンドウに分割
    
    Parameters:
    - data: 1次元の時系列データ
    - window_size: ウィンドウのサイズ
    
    Returns:
    - sequences: (サンプル数, window_size, 1)の3次元配列
    """
    sequences = []
    for i in range(len(data) - window_size + 1):
        seq = data[i:i + window_size]
        sequences.append(seq)
    
    sequences = np.array(sequences)
    # LSTMの入力形式: (サンプル数, タイムステップ, 特徴量数)
    sequences = sequences.reshape((sequences.shape[0], sequences.shape[1], 1))
    return sequences

# ウィンドウサイズの設定
WINDOW_SIZE = 50  # 50ステップを1つのシーケンスとして扱う

# 正常データをウィンドウに分割（訓練用）
X_train = create_sequences(normal_data, WINDOW_SIZE)

# テストデータをウィンドウに分割
X_test = create_sequences(test_data, WINDOW_SIZE)

print(f"訓練データの形状: {X_train.shape}")
print(f"テストデータの形状: {X_test.shape}")
print(f"\n形状の意味: (サンプル数={X_train.shape[0]}, ウィンドウサイズ={X_train.shape[1]}, 特徴量数={X_train.shape[2]})")

In [None]:
# データの正規化（0-1の範囲にスケーリング）
scaler = MinMaxScaler()

# 訓練データでfitしてtransform
X_train_scaled = scaler.fit_transform(X_train
                                      .reshape(-1, 1)).reshape(X_train.shape)

# テストデータはtransformのみ（訓練データの統計量を使用）
X_test_scaled = scaler.transform(X_test.reshape(-1, 1)).reshape(X_test.shape)

print(f"訓練データの範囲: [{X_train_scaled.min():.4f}, {X_train_scaled.max():.4f}]")
print(f"テストデータの範囲: [{X_test_scaled.min():.4f}, {X_test_scaled.max():.4f}]")

### 3.2 LSTM Autoencoderモデルの構築

Kerasを使ってLSTM Autoencoderを実装します。

#### モデルの構造

```
Input: (window_size, 1)
    ↓
LSTM(128) - Encoder
    ↓
LSTM(64) - Encoder
    ↓
LSTM(32) - Encoder (潜在表現)
    ↓
RepeatVector(window_size) - 潜在表現を繰り返す
    ↓
LSTM(32, return_sequences=True) - Decoder
    ↓
LSTM(64, return_sequences=True) - Decoder
    ↓
LSTM(128, return_sequences=True) - Decoder
    ↓
TimeDistributed(Dense(1)) - 各タイムステップで出力
    ↓
Output: (window_size, 1)
```

In [None]:
def build_lstm_autoencoder(window_size, n_features=1):
    """
    LSTM Autoencoderモデルを構築
    
    Parameters:
    - window_size: ウィンドウサイズ（タイムステップ数）
    - n_features: 特徴量の数
    
    Returns:
    - model: Kerasモデル
    """
    model = models.Sequential([
        # Encoder
        layers.Input(shape=(window_size, n_features)),
        layers.LSTM(128, activation='relu', return_sequences=True),
        layers.LSTM(64, activation='relu', return_sequences=True),
        layers.LSTM(32, activation='relu', return_sequences=False),  # 潜在表現
        
        # 潜在表現を繰り返す
        layers.RepeatVector(window_size),
        
        # Decoder
        layers.LSTM(32, activation='relu', return_sequences=True),
        layers.LSTM(64, activation='relu', return_sequences=True),
        layers.LSTM(128, activation='relu', return_sequences=True),
        
        # 出力層（各タイムステップで1つの値を出力）
        layers.TimeDistributed(layers.Dense(n_features))
    ], name='LSTM_Autoencoder')
    
    return model

# モデルの構築
model = build_lstm_autoencoder(window_size=WINDOW_SIZE, n_features=1)

# モデルのコンパイル
model.compile(
    optimizer='adam',
    loss='mse',  # 平均二乗誤差
    metrics=['mae']  # 平均絶対誤差
)

# モデルのサマリーを表示
model.summary()

### 3.3 モデルの訓練

正常データのみを使ってモデルを訓練します。

In [None]:
# Early Stopping（過学習を防ぐ）
early_stopping = callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    restore_best_weights=True,
    verbose=1
)

# モデルの訓練
# 注: 入力と出力が同じ（Autoencoderの特徴）
history = model.fit(
    X_train_scaled, X_train_scaled,  # 入力 = 出力
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    callbacks=[early_stopping],
    verbose=1
)

print("\n訓練完了！")

In [None]:
# 訓練の履歴を可視化
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Loss
axes[0].plot(history.history['loss'], label='Training Loss')
axes[0].plot(history.history['val_loss'], label='Validation Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss (MSE)')
axes[0].set_title('Training and Validation Loss')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# MAE
axes[1].plot(history.history['mae'], label='Training MAE')
axes[1].plot(history.history['val_mae'], label='Validation MAE')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('MAE')
axes[1].set_title('Training and Validation MAE')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

### 3.4 異常検知の実行

訓練したモデルを使ってテストデータの異常を検出します。

In [None]:
# テストデータの再構成
X_test_reconstructed = model.predict(X_test_scaled, verbose=0)

# 再構成誤差を計算（各サンプルごとのMSE）
reconstruction_errors = np.mean(np.square(X_test_scaled - X_test_reconstructed), axis=(1, 2))

print(f"再構成誤差の統計:")
print(f"  平均: {reconstruction_errors.mean():.6f}")
print(f"  標準偏差: {reconstruction_errors.std():.6f}")
print(f"  最小値: {reconstruction_errors.min():.6f}")
print(f"  最大値: {reconstruction_errors.max():.6f}")

In [None]:
# 再構成誤差の分布を可視化
plt.figure(figsize=(12, 5))

plt.hist(reconstruction_errors, bins=50, alpha=0.7, color='skyblue', edgecolor='black')
plt.xlabel('Reconstruction Error (MSE)')
plt.ylabel('Frequency')
plt.title('Distribution of Reconstruction Errors')
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# 閾値の設定（平均 + 3σ）
threshold = reconstruction_errors.mean() + 3 * reconstruction_errors.std()

print(f"異常検知の閾値: {threshold:.6f}")

# 異常の判定
anomalies_detected = reconstruction_errors > threshold

print(f"\n検出された異常数: {anomalies_detected.sum()}")
print(f"異常の割合: {anomalies_detected.sum() / len(anomalies_detected) * 100:.2f}%")

In [None]:
# 結果の可視化
fig, axes = plt.subplots(2, 1, figsize=(14, 10))

# 元のテストデータと真の異常
axes[0].plot(test_data, label='Test Data', color='blue', alpha=0.7)
axes[0].scatter(anomaly_indices, test_data[anomaly_indices], 
                color='red', s=100, label='True Anomalies', zorder=5)
axes[0].set_title('Test Data with True Anomalies', fontsize=14)
axes[0].set_xlabel('Time Steps')
axes[0].set_ylabel('Value')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# 再構成誤差と検出された異常
axes[1].plot(reconstruction_errors, label='Reconstruction Error', color='green', alpha=0.7)
axes[1].axhline(threshold, color='red', linestyle='--', label=f'Threshold ({threshold:.4f})')
axes[1].scatter(np.where(anomalies_detected)[0], 
                reconstruction_errors[anomalies_detected],
                color='red', s=100, label='Detected Anomalies', zorder=5)
axes[1].set_title('Reconstruction Error and Detected Anomalies', fontsize=14)
axes[1].set_xlabel('Window Index')
axes[1].set_ylabel('Reconstruction Error')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# 検出精度の評価
# ウィンドウごとに真の異常が含まれているかをチェック
true_labels = np.zeros(len(X_test), dtype=bool)

for i in range(len(X_test)):
    window_range = range(i, i + WINDOW_SIZE)
    if any(idx in window_range for idx in anomaly_indices):
        true_labels[i] = True

# 混同行列の計算
from sklearn.metrics import confusion_matrix, classification_report

cm = confusion_matrix(true_labels, anomalies_detected)

print("混同行列:")
print(cm)
print("\nClassification Report:")
print(classification_report(true_labels, anomalies_detected, 
                            target_names=['Normal', 'Anomaly']))

### 3.5 再構成の可視化

モデルがどのようにデータを再構成しているかを確認します。

In [None]:
# いくつかのサンプルの再構成を可視化
n_samples_to_plot = 3
sample_indices = [0, len(X_test) // 2, len(X_test) - 1]

fig, axes = plt.subplots(n_samples_to_plot, 1, figsize=(14, 10))

for idx, sample_idx in enumerate(sample_indices):
    original = X_test_scaled[sample_idx].flatten()
    reconstructed = X_test_reconstructed[sample_idx].flatten()
    error = reconstruction_errors[sample_idx]
    is_anomaly = anomalies_detected[sample_idx]
    
    axes[idx].plot(original, label='Original', color='blue', linewidth=2)
    axes[idx].plot(reconstructed, label='Reconstructed', color='orange', linewidth=2, linestyle='--')
    
    title = f'Sample {sample_idx} - Error: {error:.6f}'
    if is_anomaly:
        title += ' [ANOMALY DETECTED]'
        axes[idx].set_facecolor('#ffcccc')
    
    axes[idx].set_title(title, fontsize=12)
    axes[idx].set_xlabel('Time Step')
    axes[idx].set_ylabel('Normalized Value')
    axes[idx].legend()
    axes[idx].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 4. Isolation Forestとの比較

同じデータでIsolation Forestを使った場合と比較してみます。

In [None]:
# Isolation Forestによる異常検知
# ウィンドウデータを2次元に変換（Isolation Forestは時系列を考慮しない）
X_test_flat = X_test_scaled.reshape(X_test_scaled.shape[0], -1)

# モデルの訓練と予測
iso_forest = IsolationForest(
    contamination=0.05,  # 異常の割合
    random_state=42
)

X_train_flat = X_train_scaled.reshape(X_train_scaled.shape[0], -1)
iso_forest.fit(X_train_flat)

# 予測（-1: 異常, 1: 正常）
iso_predictions = iso_forest.predict(X_test_flat)
iso_anomalies = iso_predictions == -1

print(f"Isolation Forestで検出された異常数: {iso_anomalies.sum()}")
print(f"異常の割合: {iso_anomalies.sum() / len(iso_anomalies) * 100:.2f}%")

In [None]:
# 2つの手法の比較
fig, axes = plt.subplots(3, 1, figsize=(14, 12))

# 元のデータと真の異常
axes[0].plot(test_data, label='Test Data', color='blue', alpha=0.7)
axes[0].scatter(anomaly_indices, test_data[anomaly_indices], 
                color='red', s=100, label='True Anomalies', zorder=5)
axes[0].set_title('True Anomalies', fontsize=14)
axes[0].set_xlabel('Time Steps')
axes[0].set_ylabel('Value')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# LSTM Autoencoderの結果
axes[1].plot(test_data, label='Test Data', color='blue', alpha=0.3)
detected_indices_lstm = np.where(anomalies_detected)[0]
for idx in detected_indices_lstm:
    axes[1].axvspan(idx, idx + WINDOW_SIZE, alpha=0.3, color='red')
axes[1].scatter(anomaly_indices, test_data[anomaly_indices], 
                color='red', s=100, label='True Anomalies', zorder=5)
axes[1].set_title('LSTM Autoencoder Detection', fontsize=14)
axes[1].set_xlabel('Time Steps')
axes[1].set_ylabel('Value')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

# Isolation Forestの結果
axes[2].plot(test_data, label='Test Data', color='blue', alpha=0.3)
detected_indices_iso = np.where(iso_anomalies)[0]
for idx in detected_indices_iso:
    axes[2].axvspan(idx, idx + WINDOW_SIZE, alpha=0.3, color='orange')
axes[2].scatter(anomaly_indices, test_data[anomaly_indices], 
                color='red', s=100, label='True Anomalies', zorder=5)
axes[2].set_title('Isolation Forest Detection', fontsize=14)
axes[2].set_xlabel('Time Steps')
axes[2].set_ylabel('Value')
axes[2].legend()
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# 評価指標の比較
from sklearn.metrics import precision_score, recall_score, f1_score

# LSTM Autoencoder
lstm_precision = precision_score(true_labels, anomalies_detected, zero_division=0)
lstm_recall = recall_score(true_labels, anomalies_detected, zero_division=0)
lstm_f1 = f1_score(true_labels, anomalies_detected, zero_division=0)

# Isolation Forest
iso_precision = precision_score(true_labels, iso_anomalies, zero_division=0)
iso_recall = recall_score(true_labels, iso_anomalies, zero_division=0)
iso_f1 = f1_score(true_labels, iso_anomalies, zero_division=0)

# 結果の表示
comparison_df = pd.DataFrame({
    'Method': ['LSTM Autoencoder', 'Isolation Forest'],
    'Precision': [lstm_precision, iso_precision],
    'Recall': [lstm_recall, iso_recall],
    'F1-Score': [lstm_f1, iso_f1]
})

print("\n評価指標の比較:")
print(comparison_df.round(4))

# 可視化
fig, ax = plt.subplots(figsize=(10, 6))
x = np.arange(len(comparison_df))
width = 0.25

ax.bar(x - width, comparison_df['Precision'], width, label='Precision', alpha=0.8)
ax.bar(x, comparison_df['Recall'], width, label='Recall', alpha=0.8)
ax.bar(x + width, comparison_df['F1-Score'], width, label='F1-Score', alpha=0.8)

ax.set_xlabel('Method')
ax.set_ylabel('Score')
ax.set_title('Comparison of LSTM Autoencoder vs Isolation Forest')
ax.set_xticks(x)
ax.set_xticklabels(comparison_df['Method'])
ax.legend()
ax.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

## 5. まとめ

### 学んだこと

#### 1. LSTM Autoencoderの基礎
- **Autoencoder**: 入力を圧縮して復元するニューラルネットワーク
- **LSTM**: 時系列の長期的な依存関係を学習
- **再構成誤差**: 正常パターンから外れたデータは誤差が大きくなる

#### 2. 実装のポイント
- **ウィンドウ分割**: 時系列データをLSTMに入力するための前処理
- **正規化**: データを0-1の範囲にスケーリング
- **閾値設定**: 平均 + 3σなど、統計的な基準で異常を判定

#### 3. Isolation Forestとの比較

| 特徴 | LSTM Autoencoder | Isolation Forest |
|------|------------------|------------------|
| **時系列の考慮** | ✅ あり | ❌ なし |
| **学習内容** | 正常パターンの再構成 | データポイントの隔離 |
| **計算速度** | 遅い（訓練に時間） | 速い |
| **精度** | 時系列パターンに強い | 静的データに強い |
| **解釈性** | やや複雑 | シンプル |
| **適用場面** | 時系列データの異常検知 | 一般的な異常検知 |

### 実務での活用

#### DDoS攻撃検知への応用
```python
時系列特徴量:
- リクエスト数の時系列
- パケットサイズの時系列
- 送信元IPの多様性の時系列
- レスポンス時間の時系列

→ これらをLSTM Autoencoderで学習
→ 正常な通信パターンを学習
→ 異常なトラフィックパターンを検出
```

#### その他の応用例
- **システム監視**: CPUやメモリ使用率の異常検知
- **製造業**: センサーデータからの設備異常検知
- **金融**: 取引パターンの異常検知
- **医療**: 心電図などの生体信号の異常検知

### 次のステップ

1. **実践的なデータセットで試す**
   - KDD Cup 99、NSL-KDD（ネットワーク侵入検知）
   - CIC-IDS2017/2018（最新のサイバー攻撃データ）
   - UNSW-NB15（オーストラリアのネットワークデータ）

2. **モデルの改善**
   - **Bidirectional LSTM**: 双方向LSTM（前後の文脈を考慮）
   - **Variational Autoencoder (VAE)**: 確率的なAutoencoder
   - **Attention機構**: 重要な時間ステップに注目

3. **多変量時系列への拡張**
   - 複数の特徴量を同時に扱う
   - 特徴量間の相関も学習

4. **リアルタイム検知システムの構築**
   - ストリーミングデータへの対応
   - オンライン学習
   - アラートシステムとの統合

### 参考資料

- [Keras公式ドキュメント](https://keras.io/)
- [LSTM Autoencoder for Anomaly Detection](https://arxiv.org/abs/1607.00148)
- [Time Series Anomaly Detection Tutorial](https://keras.io/examples/timeseries/timeseries_anomaly_detection/)