#  Anomaly Detection in Time-Series

### Use autoencoder to detect anomalies in ECG time-series data.

#### 1. Prepare the data

In [1]:
import pandas as pd

df = pd.read_csv("http://storage.googleapis.com/" 
                 + "download.tensorflow.org/data/ecg.csv",
                 header=None)
print(df.shape)
df.head()

(4998, 141)


Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,131,132,133,134,135,136,137,138,139,140
0,-0.112522,-2.827204,-3.773897,-4.349751,-4.376041,-3.474986,-2.181408,-1.818286,-1.250522,-0.477492,...,0.792168,0.933541,0.796958,0.578621,0.25774,0.228077,0.123431,0.925286,0.193137,1.0
1,-1.100878,-3.99684,-4.285843,-4.506579,-4.022377,-3.234368,-1.566126,-0.992258,-0.75468,0.042321,...,0.538356,0.656881,0.78749,0.724046,0.555784,0.476333,0.77382,1.119621,-1.43625,1.0
2,-0.567088,-2.59345,-3.87423,-4.584095,-4.187449,-3.151462,-1.74294,-1.490659,-1.18358,-0.394229,...,0.886073,0.531452,0.311377,-0.021919,-0.713683,-0.532197,0.321097,0.904227,-0.421797,1.0
3,0.490473,-1.914407,-3.616364,-4.318823,-4.268016,-3.88111,-2.99328,-1.671131,-1.333884,-0.965629,...,0.350816,0.499111,0.600345,0.842069,0.952074,0.990133,1.086798,1.403011,-0.383564,1.0
4,0.800232,-0.874252,-2.384761,-3.973292,-4.338224,-3.802422,-2.53451,-1.783423,-1.59445,-0.753199,...,1.148884,0.958434,1.059025,1.371682,1.277392,0.960304,0.97102,1.614392,1.421456,1.0


 The dataset has 140 columns which represents the ECG readings and a labels column which has been encoded to 0 or 1 showing whether the ECG is abnormal or normal.

In [2]:
from src.utils import plotters

plotters.show_traces(df.iloc[0:10, :-1])

Split the data into training and testing sets.

In [3]:
from sklearn.model_selection import train_test_split

# Separate the data and labels
data = df.iloc[:,:-1].values
labels = df.iloc[:,-1].values

# Split the data into training, validation, and test sets
train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21)

# Further split the training data into training and validation sets
train_data, val_data, train_labels, val_labels = train_test_split(
    train_data, train_labels, test_size=0.25, random_state=21)  # 验证集占训练集的 25%

print("Training set size:", len(train_data))
print("Validation set size:", len(val_data))
print("Test set size:", len(test_data))

Training set size: 2998
Validation set size: 1000
Test set size: 1000


In [4]:
# from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

# Create a standard scaler
# scaler = StandardScaler()
scaler = MinMaxScaler(feature_range=(0, 1))  # 将特征缩放到 [0, 1] 范围

# Fit and transform the training data
train_data = scaler.fit_transform(train_data)
val_data = scaler.transform(val_data)
test_data = scaler.transform(test_data)

Separate the data for normal and abnormal ECGs

In [5]:
import numpy as np

# [0 or 1] showing whether the ECG is [abnormal or normal].

#The labels are either 0 or 1, so I will convert them into boolean(true or false) 
train_labels = train_labels.astype(bool)
num_true = np.sum(train_labels)
num_false = len(train_labels) - num_true
print(f"True: {num_true}, False: {num_false}")

val_labels = val_labels.astype(bool)
num_true = np.sum(val_labels)
num_false = len(val_labels) - num_true
print(f"True: {num_true}, False: {num_false}")

test_labels = test_labels.astype(bool)
num_true = np.sum(test_labels)
num_false = len(test_labels) - num_true
print(f"True: {num_true}, False: {num_false}")

# Separate the data for normal ECG from that of abnormal ones
# Normal ECG data
normal_train_data = train_data[train_labels]
normal_val_data = val_data[val_labels]
normal_test_data = test_data[test_labels]

normal_train_labels = train_labels[train_labels]
normal_val_labels = val_labels[val_labels]
normal_test_labels = test_labels[test_labels]

# Abnormal ECG data
abnormal_train_data = train_data[~train_labels]
abnormal_val_data = val_data[~val_labels]
abnormal_test_data = test_data[~test_labels]

# Plot the first 10 normal and abnormal ECGs
plotters.show_traces(pd.DataFrame(normal_train_data).iloc[0:10, :],
                     title="Normal ECGs")
plotters.show_traces(pd.DataFrame(abnormal_train_data).iloc[0:10, :],
                     title="Abnormal ECGs")

True: 1771, False: 1227
True: 588, False: 412
True: 560, False: 440


#### 2. Create autoencoder model

AutoEncoder is an unsupervised Artificial Neural Network that attempts to encode the data by compressing it into the lower dimensions (bottleneck layer or code) and then decoding the data to reconstruct the original input. The bottleneck layer (or code) holds the compressed representation of the input data.

In [6]:
import tensorflow as tf
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model

class detector_with_classifier(Model):
    def __init__(self):
        super(detector_with_classifier, self).__init__()
        self.encoder = tf.keras.Sequential([
            layers.Dense(64, activation='relu'),
            layers.Dropout(0.5),
            layers.Dense(32, activation='relu'),
            layers.Dropout(0.5),
            layers.Dense(16, activation='relu')
        ])
        self.decoder = tf.keras.Sequential([
            layers.Dense(16, activation='relu'),
            layers.Dense(32, activation='relu'),
            layers.Dense(140, activation='tanh')
        ])
        self.classifier = tf.keras.Sequential([
            layers.Dense(16, activation='relu'),
            layers.Dense(1, activation='sigmoid')  # 输出分类概率
        ])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        classification = self.classifier(encoded)  # 使用编码特征进行分类
        return decoded, classification

#### 3. Train the model

In [7]:
import tensorflow as tf
from tensorflow.keras import optimizers
import plotly.graph_objects as go

# 初始化模型
model = detector_with_classifier()

# 定义优化器
optimizer = optimizers.Adam(learning_rate=0.001)

# 定义损失函数
mse_loss_fn = tf.keras.losses.MeanSquaredError()
bce_loss_fn = tf.keras.losses.BinaryCrossentropy()

# 训练超参数
epochs_stage_1 = 40  # 第一阶段的训练轮数
epochs_stage_2 = 20  # 第二阶段的训练轮数
batch_size = 32
alpha_stage_1 = 1.0  # 第一阶段仅重建损失的权重
beta_stage_1 = 0.0   # 第一阶段不优化分类损失

# 数据集
# 第一阶段仅使用正常数据
train_dataset_stage_1 = tf.data.Dataset.from_tensor_slices(
    (normal_train_data, normal_train_labels)
    ).batch(batch_size)
val_dataset_stage_1 = tf.data.Dataset.from_tensor_slices(
    (normal_val_data, normal_val_labels)
    ).batch(batch_size)

# 第二阶段混合正常和异常数据
train_dataset_stage_2 = tf.data.Dataset.from_tensor_slices(
    (train_data, train_labels)
    ).batch(batch_size)
val_dataset_stage_2 = tf.data.Dataset.from_tensor_slices(
    (val_data, val_labels)
    ).batch(batch_size)

# 记录每个 epoch 的训练和验证损失
train_losses = []
val_losses = []

# 第一阶段：仅重建损失
print("Stage 1: Training Autoencoder with Reconstruction Loss")
for epoch in range(epochs_stage_1):
    print(f"Epoch {epoch + 1}/{epochs_stage_1}")
    train_loss = 0.0
    val_loss = 0.0

    # 训练阶段
    for step, (x_batch, _) in enumerate(train_dataset_stage_1):  # 只需要 x_batch，忽略标签
        with tf.GradientTape() as tape:
            decoded, _ = model(x_batch)  # 只使用 autoencoder
            reconstruction_loss = mse_loss_fn(x_batch, decoded)  # 重建损失
            total_loss = alpha_stage_1 * reconstruction_loss

        # 反向传播
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        train_loss += total_loss.numpy()

    # 验证阶段
    for x_batch, _ in val_dataset_stage_1:
        decoded, _ = model(x_batch)
        reconstruction_loss = mse_loss_fn(x_batch, decoded)
        total_loss = alpha_stage_1 * reconstruction_loss

        val_loss += total_loss.numpy()

    # 记录平均损失
    avg_train_loss = train_loss / len(train_dataset_stage_1)
    avg_val_loss = val_loss / len(val_dataset_stage_1)
    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)

    print(f"Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

AttributeError: module 'ml_dtypes' has no attribute 'float8_e3m4'
Stage 1: Training Autoencoder with Reconstruction Loss
Epoch 1/40



Gradients do not exist for variables ['detector_with_classifier/sequential_2/dense_6/kernel', 'detector_with_classifier/sequential_2/dense_6/bias', 'detector_with_classifier/sequential_2/dense_7/kernel', 'detector_with_classifier/sequential_2/dense_7/bias'] when minimizing the loss. If using `model.compile()`, did you forget to provide a `loss` argument?



Train Loss: 0.1572, Val Loss: 0.0115
Epoch 2/40
Train Loss: 0.0062, Val Loss: 0.0055
Epoch 3/40
Train Loss: 0.0052, Val Loss: 0.0054
Epoch 4/40
Train Loss: 0.0052, Val Loss: 0.0054
Epoch 5/40
Train Loss: 0.0052, Val Loss: 0.0054
Epoch 6/40
Train Loss: 0.0052, Val Loss: 0.0054
Epoch 7/40
Train Loss: 0.0052, Val Loss: 0.0054
Epoch 8/40
Train Loss: 0.0051, Val Loss: 0.0053
Epoch 9/40
Train Loss: 0.0051, Val Loss: 0.0052
Epoch 10/40
Train Loss: 0.0050, Val Loss: 0.0051
Epoch 11/40
Train Loss: 0.0047, Val Loss: 0.0047
Epoch 12/40
Train Loss: 0.0042, Val Loss: 0.0039
Epoch 13/40
Train Loss: 0.0037, Val Loss: 0.0036
Epoch 14/40
Train Loss: 0.0035, Val Loss: 0.0036
Epoch 15/40
Train Loss: 0.0035, Val Loss: 0.0035
Epoch 16/40
Train Loss: 0.0034, Val Loss: 0.0035
Epoch 17/40
Train Loss: 0.0034, Val Loss: 0.0034
Epoch 18/40
Train Loss: 0.0033, Val Loss: 0.0033
Epoch 19/40
Train Loss: 0.0032, Val Loss: 0.0032
Epoch 20/40
Train Loss: 0.0031, Val Loss: 0.0031
Epoch 21/40
Train Loss: 0.0030, Val Loss

Calculate hte threshold with the mean value and standard deviation of the reconstruction loss, or use the 95 percentile.

In [8]:
# 使用训练集中正常数据的重建误差计算阈值
normal_train_data = train_data[train_labels == 1]  # 假设标签 0 表示正常样本

# 获取模型的重建输出
train_reconstructed, _ = model(normal_train_data)  # 仅使用 decoder 的输出

# 计算每个样本的重建误差
train_reconstruction_error = tf.reduce_mean(
    tf.square(normal_train_data - train_reconstructed), axis=1
).numpy()

# 方法 1：基于平均值和标准差的阈值
threshold_std = train_reconstruction_error.mean() + 2 * train_reconstruction_error.std()
print(f"Reconstruction Error Threshold (2 std): {threshold_std}")

# # 方法 2：基于百分位数的阈值
# threshold_percentile = np.percentile(train_reconstruction_error, 95)
# print(f"Reconstruction Error Threshold (95th percentile): {threshold_percentile}")

threshold = threshold_std

Reconstruction Error Threshold (2 std): 0.006212711799889803


In [9]:
import numpy as np
import tensorflow as tf
import plotly.graph_objects as go
from src.utils import plotters

# 假设 test_data 和 test_labels 是测试数据和标签
# 标签 1 表示正常样本，0 表示异常样本

# 在 Stage 1 训练完成后调用
plotters.plot_reconstruction_error(
    model, test_data, test_labels, threshold, "Reconstruction Error on Test Data"
)
plotters.plot_reconstruction_error(
    model, train_data, train_labels, threshold, "Reconstruction Error on Train Data"
)

In [10]:
from sklearn.metrics import confusion_matrix

# [0 or 1] showing whether the ECG is [abnormal or normal].
# Test dataset: True: 560, False(anomaly): 440

# Step 1: 获取模型重建误差
# 使用测试数据进行前向传播，获取重建结果
reconstructed_data, _ = model(test_data)

# 计算重建误差
reconstruction_error = tf.reduce_mean(
    tf.square(test_data - reconstructed_data),
    axis=1
).numpy()

# Step 2: 获取正常样本和异常样本的索引
# 假设标签 1 表示正常样本，0 表示异常样本
normal_indices = (test_labels == 1)
anomaly_indices = (test_labels == 0)

# Use threshold to classify anomalies
y_pred = (reconstruction_error > threshold).astype(int)
y_pred = 1 - y_pred  # 0: Anomaly, 1: Normal
y_true = test_labels

# Calculate confusion matrix
cm = confusion_matrix(y_true, y_pred, labels=[0, 1]) # 0: Anomaly, 1: Normal
print("Confusion Matrix:\n", cm)

# Extract confusion matrix values
TN, FP = cm[0, 0], cm[1, 0]  # TN: Anomaly correctly classified, FP: Anomaly misclassified as Normal
FN, TP = cm[0, 1], cm[1, 1]  # FN: Normal misclassified as Anomaly, TP: Normal correctly classified

# Correctly reorder the confusion matrix for visualization
# custom_cm = [[TP, FP], [FN, TN]]
custom_cm = [[FN, TN], [TP, FP]]
print("Custom Confusion Matrix:\n", custom_cm)

# Define labels
labels = ["Normal (Positive)", "Anomaly (Negative)"]

fig = go.Figure(
    data=go.Heatmap(
        z=custom_cm,
        x=labels,
        y=labels,
        colorscale="Blues",
        texttemplate="%{z}",
        textfont={"size": 14}
    )
)
fig.update_layout(
    title="Confusion Matrix",
    xaxis_title="Predicted Labels",
    yaxis_title="True Labels",
    template="plotly_white",
    xaxis=dict(tickmode="array", tickvals=[0, 1], ticktext=labels),
    yaxis=dict(tickmode="array", tickvals=[1, 0], ticktext=labels),
)
fig.show()

Confusion Matrix:
 [[418  22]
 [  9 551]]
Custom Confusion Matrix:
 [[np.int64(22), np.int64(418)], [np.int64(551), np.int64(9)]]


In [11]:
# Accuracy
accuracy = (TP + TN) / (TP + TN + FP + FN)

# Precision
precision = TP / (TP + FP) if (TP + FP) > 0 else 0

# Recall (Sensitivity)
recall = TP / (TP + FN) if (TP + FN) > 0 else 0

# Specificity
specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

# F1 Score
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

# Print the results
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall (Sensitivity): {recall:.2f}")
print(f"Specificity: {specificity:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Accuracy: 0.97
Precision: 0.98
Recall (Sensitivity): 0.96
Specificity: 0.98
F1 Score: 0.97


Seconde stage of training, freeze the decoder weights and add classification weights to help the encoder to learn the features that are important for classification.

In [12]:
# 第二阶段：同时优化重建和分类损失

alpha_stage_2 = 0.1  # 第二阶段同时优化重建和分类损失
beta_stage_2 = 0.9

# 冻结 decoder 的权重
model.encoder.trainable = True
model.decoder.trainable = False # 确保 decoder 的重建能力不会受到分类损失的影响

print("Stage 2: Training Autoencoder with Classification Loss")
for epoch in range(epochs_stage_2):
    print(f"Epoch {epoch + 1}/{epochs_stage_2}")
    train_loss = 0.0
    val_loss = 0.0

    # 训练阶段
    for step, (x_batch, y_batch) in enumerate(train_dataset_stage_2):
        with tf.GradientTape() as tape:
            # 前向传播
            decoded, classification = model(x_batch)
            # 重建损失
            reconstruction_loss = mse_loss_fn(x_batch, decoded)
                         
            # 定义类别权重为张量
            class_weights = tf.constant([3.0, 1.0], dtype=tf.float32)  # 异常样本权重更高
            # 将 y_batch 转换为整型索引
            y_batch_indices = tf.cast(y_batch, dtype=tf.int32)  # 转换为 int 类型
            # 获取对应权重
            weights = tf.gather(class_weights, y_batch_indices)
            # 分类损失
            classification_loss = tf.reduce_mean(
                bce_loss_fn(y_batch, classification) * weights
            )
            total_loss = (alpha_stage_1 * reconstruction_loss + 
                          beta_stage_2 * classification_loss)

        # 反向传播
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        train_loss += total_loss.numpy()
        
        avg_reconstruction_loss = reconstruction_loss / len(train_dataset_stage_2)
        avg_classification_loss = classification_loss / len(train_dataset_stage_2)
        print(f"Reconstruction Loss: {avg_reconstruction_loss:.4f}",
              f"Classification Loss: {avg_classification_loss:.4f}")


    # 验证阶段
    for x_batch, y_batch in val_dataset_stage_2:
        decoded, classification = model(x_batch)
        reconstruction_loss = mse_loss_fn(x_batch, decoded)
        class_weights = tf.constant([3.0, 1.0], dtype=tf.float32)  # 异常样本权重更高
        # 将 y_batch 转换为整型索引
        y_batch_indices = tf.cast(y_batch, dtype=tf.int32)  # 转换为 int 类型
        # 获取对应权重
        weights = tf.gather(class_weights, y_batch_indices)
        classification_loss = tf.reduce_mean(
            bce_loss_fn(y_batch, classification) * weights
        )

        total_loss = (alpha_stage_1 * reconstruction_loss + 
                        beta_stage_2 * classification_loss)

        val_loss += total_loss.numpy()

    # 记录平均损失
    avg_train_loss = train_loss / len(train_dataset_stage_2)
    avg_val_loss = val_loss / len(val_dataset_stage_2)
    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)

    print(f"Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")

Stage 2: Training Autoencoder with Classification Loss
Epoch 1/20
Reconstruction Loss: 0.0001 Classification Loss: 0.0161
Reconstruction Loss: 0.0001 Classification Loss: 0.0135
Reconstruction Loss: 0.0002 Classification Loss: 0.0136
Reconstruction Loss: 0.0003 Classification Loss: 0.0115
Reconstruction Loss: 0.0003 Classification Loss: 0.0126
Reconstruction Loss: 0.0002 Classification Loss: 0.0139
Reconstruction Loss: 0.0003 Classification Loss: 0.0091
Reconstruction Loss: 0.0003 Classification Loss: 0.0115
Reconstruction Loss: 0.0003 Classification Loss: 0.0120
Reconstruction Loss: 0.0003 Classification Loss: 0.0109
Reconstruction Loss: 0.0004 Classification Loss: 0.0091
Reconstruction Loss: 0.0003 Classification Loss: 0.0080
Reconstruction Loss: 0.0004 Classification Loss: 0.0104
Reconstruction Loss: 0.0003 Classification Loss: 0.0050
Reconstruction Loss: 0.0004 Classification Loss: 0.0094
Reconstruction Loss: 0.0004 Classification Loss: 0.0077
Reconstruction Loss: 0.0004 Classifica

Plot training and validation loss using Plotly

In [13]:
import plotly.graph_objects as go

# 假设 Stage 1 和 Stage 2 的损失数据如下：
# Stage 1 的训练和验证损失
train_losses_stage_1 = train_losses[:epochs_stage_1]  # 从已有数据中获取
val_losses_stage_1 = val_losses[:epochs_stage_1]

# Stage 2 的训练和验证损失
train_losses_stage_2 = train_losses[epochs_stage_1:]  # 从已有数据中获取
val_losses_stage_2 = val_losses[epochs_stage_1:]

# 使用 Plotly 绘制损失曲线
fig = go.Figure()

# 添加 Stage 1 的训练损失
fig.add_trace(
    go.Scatter(
        y=train_losses_stage_1,
        mode='lines',
        name='Training Loss (Stage 1)'
    )
)

# 添加 Stage 1 的验证损失
fig.add_trace(
    go.Scatter(
        y=val_losses_stage_1,
        mode='lines',
        name='Validation Loss (Stage 1)'
    )
)

# 添加 Stage 2 的训练损失
fig.add_trace(
    go.Scatter(
        y=train_losses_stage_2,
        mode='lines',
        name='Training Loss (Stage 2)'
    )
)

# 添加 Stage 2 的验证损失
fig.add_trace(
    go.Scatter(
        y=val_losses_stage_2,
        mode='lines',
        name='Validation Loss (Stage 2)'
    )
)

# 更新图表布局
fig.update_layout(
    title='Training and Validation Loss for Stage 1 and Stage 2',
    xaxis_title='Epochs',
    yaxis_title='Loss',
    template='plotly_white',
    legend=dict(title="Legend"),
    xaxis=dict(tickmode='linear')  # 确保 x 轴刻度按线性显示
)

# 显示图表
fig.show()


In [14]:
# 使用训练集中正常数据的重建误差计算阈值
normal_train_data = train_data[train_labels == 1]  # 假设标签 0 表示正常样本

# 获取模型的重建输出
train_reconstructed, _ = model(normal_train_data)  # 仅使用 decoder 的输出

# 计算每个样本的重建误差
train_reconstruction_error = tf.reduce_mean(
    tf.square(normal_train_data - train_reconstructed), axis=1
).numpy()

# 方法 1：基于平均值和标准差的阈值
threshold_std = train_reconstruction_error.mean() + train_reconstruction_error.std()
print(f"Reconstruction Error Threshold (1 std): {threshold_std}")

# # 方法 2：基于百分位数的阈值
# threshold_percentile = np.percentile(train_reconstruction_error, 95)
# print(f"Reconstruction Error Threshold (95th percentile): {threshold_percentile}")

threshold = threshold_std

Reconstruction Error Threshold (1 std): 0.019208043813705444


In [15]:
plotters.plot_reconstruction_error(
    model, train_data, train_labels, threshold, "Reconstruction Error on Train Data"
)
plotters.plot_reconstruction_error(
    model, test_data, test_labels, threshold, "Reconstruction Error on Test Data"
)

In [16]:
from sklearn.metrics import confusion_matrix

# [0 or 1] showing whether the ECG is [abnormal or normal].
# Test dataset: True: 560, False(anomaly): 440

# Step 1: 获取模型重建误差
# 使用测试数据进行前向传播，获取重建结果
reconstructed_data, _ = model(test_data)

# 计算重建误差
reconstruction_error = tf.reduce_mean(
    tf.square(test_data - reconstructed_data),
    axis=1
).numpy()

# Step 2: 获取正常样本和异常样本的索引
# 假设标签 1 表示正常样本，0 表示异常样本
normal_indices = (test_labels == 1)
anomaly_indices = (test_labels == 0)

# Use threshold to classify anomalies
y_pred = (reconstruction_error > threshold).astype(int)
y_pred = 1 - y_pred  # 0: Anomaly, 1: Normal
y_true = test_labels

# Calculate confusion matrix
cm = confusion_matrix(y_true, y_pred, labels=[0, 1]) # 0: Anomaly, 1: Normal
print("Confusion Matrix:\n", cm)

# Extract confusion matrix values
TN, FP = cm[0, 0], cm[1, 0]  # TN: Anomaly correctly classified, FP: Anomaly misclassified as Normal
FN, TP = cm[0, 1], cm[1, 1]  # FN: Normal misclassified as Anomaly, TP: Normal correctly classified

# Correctly reorder the confusion matrix for visualization
# custom_cm = [[TP, FP], [FN, TN]]
custom_cm = [[FN, TN], [TP, FP]]
print("Custom Confusion Matrix:\n", custom_cm)

# Define labels
labels = ["Normal (Positive)", "Anomaly (Negative)"]

fig = go.Figure(
    data=go.Heatmap(
        z=custom_cm,
        x=labels,
        y=labels,
        colorscale="Blues",
        texttemplate="%{z}",
        textfont={"size": 14}
    )
)
fig.update_layout(
    title="Confusion Matrix",
    xaxis_title="Predicted Labels",
    yaxis_title="True Labels",
    template="plotly_white",
    xaxis=dict(tickmode="array", tickvals=[0, 1], ticktext=labels),
    yaxis=dict(tickmode="array", tickvals=[1, 0], ticktext=labels),
)
fig.show()

Confusion Matrix:
 [[433   7]
 [104 456]]
Custom Confusion Matrix:
 [[np.int64(7), np.int64(433)], [np.int64(456), np.int64(104)]]


Model evaluation metrics

In [17]:
# Accuracy
accuracy = (TP + TN) / (TP + TN + FP + FN)

# Precision
precision = TP / (TP + FP) if (TP + FP) > 0 else 0

# Recall (Sensitivity)
recall = TP / (TP + FN) if (TP + FN) > 0 else 0

# Specificity
specificity = TN / (TN + FP) if (TN + FP) > 0 else 0

# F1 Score
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

# Print the results
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall (Sensitivity): {recall:.2f}")
print(f"Specificity: {specificity:.2f}")
print(f"F1 Score: {f1_score:.2f}")

Accuracy: 0.89
Precision: 0.81
Recall (Sensitivity): 0.98
Specificity: 0.81
F1 Score: 0.89


calculate_probabilities

In [18]:
from sklearn.metrics import precision_recall_curve, auc, roc_curve
import plotly.graph_objects as go

def calculate_probabilities(reconstruction_error, threshold):
    return 1 / (1 + np.exp(-(reconstruction_error - threshold)))

probabilities = calculate_probabilities(reconstruction_error, threshold)

# Min-Max Normalization
min_error = np.min(reconstruction_error)
max_error = np.max(reconstruction_error)
normalized_error = (reconstruction_error - min_error) / (max_error - min_error)

# Z-Score Normalization
mean_error = np.mean(reconstruction_error)
std_error = np.std(reconstruction_error)
z_scores = (reconstruction_error - mean_error) / std_error
standarlized_error = 1 / (1 + np.exp(-z_scores))

# # log error
# log_error = np.log1p(reconstruction_error)  # 使用对数扩展

probabilities = standarlized_error

Plot reconstruction loss distribution

In [19]:
import plotly.graph_objects as go
import numpy as np

y_true_binary = np.array(y_true, dtype=int)

# Negative and Positive samples based on y_true_binary
negative_samples = reconstruction_error[y_true_binary == 0]
positive_samples = reconstruction_error[y_true_binary == 1]

# Create histogram for Negative Samples
hist_negative = go.Histogram(
    x=negative_samples,
    nbinsx=50,
    opacity=0.3,
    name="Negative Samples",
    marker_color="blue"
)

# Create histogram for Positive Samples
hist_positive = go.Histogram(
    x=positive_samples,
    nbinsx=50,
    opacity=0.3,
    name="Positive Samples",
    marker_color="red"
)

# Create figure and layout
fig = go.Figure()
fig.add_trace(hist_negative)
fig.add_trace(hist_positive)

# Update layout
fig.update_layout(
    title="Reconstruction Error Distribution",
    xaxis_title="Reconstruction Error",
    yaxis_title="Frequency",
    barmode="overlay",  # Overlay histograms
    template="plotly_white"
)

# Show figure
fig.show()


可视化特征分布：
使用 t-SNE 或 PCA 对 encoder 的特征进行降维，可视化正负样本的分布情况，检查是否存在明显分离。

### PR 曲线和 ROC 曲线的重要性

1. **PR 曲线**（Precision-Recall Curve）：
   - 适用于样本类别不平衡的数据集（异常样本占比很低的场景）。
   - 描述分类器在不同阈值下的精确率和召回率之间的权衡。

2. **ROC 曲线**（Receiver Operating Characteristic Curve）：
   - 描述分类器在不同阈值下的 TPR（True Positive Rate）和 FPR（False Positive Rate）。
   - 更适合类别分布相对均衡的场景。

如果你的任务目标是优化对异常样本的检测（低漏检率和高准确性），PR 曲线的表现更为关键。



In [20]:
y_true_binary = np.array(y_true, dtype=int)

# Calculate precision-recall values
precision_values, recall_values, thresholds_pr  = precision_recall_curve(
    y_true_binary, probabilities
)
# Calculate F1 score values
best_threshold_pr = thresholds_pr[
    np.argmax(2 * precision_values * recall_values / (precision_values + recall_values))
]
print(f"Best Threshold for PR: {best_threshold_pr}")

# Calculate ROC curve values
pr_auc = auc(recall_values, precision_values)

# Calculate ROC curve values
fpr, tpr, thresholds_roc  = roc_curve(y_true_binary, probabilities)
best_threshold_roc = thresholds_roc[np.argmax(tpr - fpr)]
print(f"Best Threshold for ROC: {best_threshold_roc}")
roc_auc = auc(fpr, tpr)

# Plot PR curve
fig_pr = go.Figure()
fig_pr.add_trace(go.Scatter(
    x=recall_values, y=precision_values,
    mode='lines', name='PR Curve',
    line=dict(color='blue', width=2)
))
fig_pr.add_trace(go.Scatter(
    x=[0, 1], y=[1, 0],
    mode='lines', name='Random Classifier',
    line=dict(dash='dash', color='gray')
))
fig_pr.update_layout(
    title=f'Precision-Recall Curve (AUC: {pr_auc:.2f})',
    xaxis_title='Recall',
    yaxis_title='Precision',
    template='plotly_white'
)
fig_pr.show()

# Plot ROC curve
fig_roc = go.Figure()
fig_roc.add_trace(go.Scatter(
    x=fpr, y=tpr,
    mode='lines', name='ROC Curve',
    line=dict(color='red', width=2)
))
fig_roc.add_trace(go.Scatter(
    x=[0, 1], y=[0, 1],
    mode='lines', name='Random Classifier',
    line=dict(dash='dash', color='gray')
))
fig_roc.update_layout(
    title=f'Receiver Operating Characteristic (AUC: {roc_auc:.2f})',
    xaxis_title='False Positive Rate (FPR)',
    yaxis_title='True Positive Rate (TPR)',
    template='plotly_white'
)
fig_roc.show()

# Print AUC values
print(f"PR AUC: {pr_auc:.2f}")
print(f"ROC AUC: {roc_auc:.2f}")


Best Threshold for PR: 0.8573020696640015
Best Threshold for ROC: inf



invalid value encountered in divide



PR AUC: 0.36
ROC AUC: 0.03


#### 4. Evaluate the model on the test set

In [25]:
model.summary()

In [32]:
decoded, _ = model.predict(normal_test_data)
reconstruction_loss = mse_loss_fn(normal_test_data, decoded)
print(f"Reconstruction Loss: {reconstruction_loss.numpy()}")

[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step
Reconstruction Loss: 0.012214087881147861


In [34]:
# 获取模型的分类输出
_, classification_output = model.predict(normal_test_data)

# 定义分类损失函数
bce_loss_fn = tf.keras.losses.BinaryCrossentropy()

# 假设 normal_test_labels 是真实的分类标签
classification_loss = bce_loss_fn(normal_test_labels, classification_output)

print(f"Classification Loss: {classification_loss.numpy()}")


[1m18/18[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step 
Classification Loss: 0.028463060036301613


### **1. 理解 Reconstruction Loss 和 Classification Loss 的含义**

- Reconstruction Loss

    表示模型对输入数据的重建能力，反映了输入数据与其重建数据之间的差异。值越小，说明模型在重建方面的表现越好。

- Classification Loss
    
    反映模型的分类性能，表示模型预测的类别概率与实际标签之间的差异。值越小，说明分类性能越好。

### **2. 选择合适的评估指标**

如果分类任务很重要，您会关注分类性能指标（如 Precision, Recall, F1 Score）。
如果重建任务也重要（如异常检测），需要综合考虑 Reconstruction Loss。

In [23]:
import plotly.graph_objects as go
import numpy as np

enc_img = model.encoder(normal_test_data)
dec_img = model.decoder(enc_img)

input_data = normal_test_data[0]
reconstructed_data = dec_img[0]

# 计算误差
error = np.abs(input_data - reconstructed_data)
# Plot the error between the input and the reconstructed data
plotters.compare_org_reconstructed(input_data, reconstructed_data)


In [24]:
enc_img = model.encoder(abnormal_test_data)
dec_img = model.decoder(enc_img)

input_data = abnormal_test_data[0]
reconstructed_data = dec_img[0]

# 计算误差
error = np.abs(input_data - reconstructed_data)
# Plot the error between the input and the reconstructed data
plotters.compare_org_reconstructed(input_data, reconstructed_data)

In [40]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from sklearn.metrics import confusion_matrix

metrics = ['Reconstruction Loss', 'Classification Loss', 'Accuracy', 'Precision', 'Recall', 'Specificity', 'F1 Score']
values = [reconstruction_loss, classification_loss, accuracy, precision, recall, specificity, f1_score]

# 创建柱状图
fig1 = go.Figure(
    data=[go.Bar(x=metrics, y=values, text=values, textposition='auto')],
    layout_title_text="Comparison of Loss and Classification Metrics"
)

# 展示图形
fig1.show()
