In [None]:
# Enable IPython's autoreload extension; mode 2 reloads imported modules before
# each cell runs, so edits to the local helper modules are picked up without a
# kernel restart.
%load_ext autoreload
%autoreload 2

In [None]:
import glob
import os
import warnings
import numpy as np

from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

from sound_utils import extract_log_mel_windows_LSTM, generate_dataset_from_list_LSTM, load_sound_file
from misc import build_files_list, dump_pickle, load_pickle
from eval_perf import (
    get_prediction,
    plot_confusion_matrix,
    plot_histogram_by_class,
    plot_loss_per_epoch,
    plot_pr_curve,
    plot_roc_curve,
)

# Seed NumPy's global RNG; the file sampling and test-set shuffle below draw from it.
np.random.seed(42)

In [None]:
import tensorflow as tf

from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Input, Dense, BatchNormalization, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.python.client import device_lib

# Seed TensorFlow's global RNG before any model is built or trained.
tf.random.set_seed(42)

In [None]:
# NOTE: kept disabled on purpose. `multi_gpu_model` is not exported by
# `tensorflow.keras.utils` in the installed TensorFlow/Keras, so enabling this
# import raises ImportError. Nothing else in this notebook uses it.
#from tensorflow.keras.utils import multi_gpu_model

ImportError: cannot import name 'multi_gpu_model' from 'tensorflow.keras.utils' (/usr/local/lib/python3.11/dist-packages/keras/_tf_keras/keras/utils/__init__.py)

In [None]:
from bokeh.io import export_svgs, output_notebook, reset_output
from bokeh.models import BoxAnnotation, ColumnDataSource, HoverTool
from bokeh.plotting import figure, show
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    average_precision_score,
    precision_recall_curve,
    roc_auc_score,
    roc_curve,
)

# Configure Bokeh so that show() renders figures inline in the notebook.
output_notebook()

# Processing pipeline


1.   Load data
2.   Split into training, test and validation sets
3.   Extract log-Mel spectrograms
4.   Save the spectrograms



In [None]:
# Filesystem locations used by the rest of the notebook.
# NOTE(review): the "/..." prefix looks like a redacted placeholder; set it to the real location before running.
root_dir = "/.../ToyCar_data"  # passed to build_files_list() to collect the recordings
DATA_PATH = "/.../ToyCar_data/ToyADMOS-anomaly-detection"  # split lists and cached features live in <DATA_PATH>/dataset
MODEL_PATH = "/.../ToyCar_data/ToyADMOS-anomaly-detection"  # the trained model file is saved here

In [None]:
# Build the train/test split: training uses normal recordings only, while the
# test set mixes held-out normal recordings with all sampled abnormal ones.
#
# NOTE: the np.random.* calls below consume the global NumPy RNG (seeded in the
# imports cell), so the exact split depends on running the cells in order from a
# fresh kernel; re-running this cell alone draws a different sample.

# Load full file lists (assuming build_files_list returns two lists)
normal_files, abnormal_files = build_files_list(root_dir)

# Randomly sample 50% of each (without replacement) to keep processing time down
normal_sample_indices = np.random.choice(len(normal_files), size=len(normal_files) // 2, replace=False)
abnormal_sample_indices = np.random.choice(len(abnormal_files), size=len(abnormal_files) // 2, replace=False)

normal_files_sampled = [normal_files[i] for i in normal_sample_indices]
abnormal_files_sampled = [abnormal_files[i] for i in abnormal_sample_indices]

# Create labels for the sampled files: 0 = normal, 1 = abnormal
normal_labels = np.zeros(len(normal_files_sampled))
abnormal_labels = np.ones(len(abnormal_files_sampled))

# Split normal files into train/test (80% / 20%); abnormal files never enter the training set
train_files, test_files, train_labels, test_labels = train_test_split(
    normal_files_sampled, normal_labels, train_size=0.8, random_state=42, shuffle=True
)

# Add abnormal files to test set (test_files / test_labels become NumPy arrays here)
test_files = np.concatenate((test_files, abnormal_files_sampled), axis=0)
test_labels = np.concatenate((test_labels, abnormal_labels), axis=0)

# Shuffle test set, applying one shared permutation so files and labels stay aligned
test_indices = np.arange(len(test_files))
np.random.shuffle(test_indices)

test_files = test_files[test_indices]
test_labels = test_labels[test_indices]

# Print dataset stats (the label sums count abnormal signals because abnormal = 1)
print(
    f"Train set has {train_labels.shape[0]} signals including abnormal {train_labels.sum():.0f} signals, "
    f"but test set has {test_labels.shape[0]} signals including abnormal {test_labels.sum():.0f} signals."
)

Train set has 2160 signals including abnormal 0 signals, but test set has 1069 signals including abnormal 529 signals.


In [None]:
# Persist the split (file paths and labels) as plain-text files, one item per
# line, in <DATA_PATH>/dataset/<key>.txt.
dataset = {
    "train_files": train_files,
    "test_files": test_files,
    "train_labels": train_labels,
    "test_labels": test_labels,
}

# open(..., "w") does not create missing parent directories, so make sure the
# target folder exists before writing (no-op when it is already there).
dataset_dir = os.path.join(DATA_PATH, "dataset")
os.makedirs(dataset_dir, exist_ok=True)

for key, values in dataset.items():
    file_name = os.path.join(dataset_dir, key + ".txt")
    with open(file_name, "w") as f:
        f.writelines(str(item) + "\n" for item in values)

In [None]:
# Log-Mel extraction settings for the training features.
n_fft = 1024
hop_length = 512
n_mels = 80
frames = 5

train_data_path = os.path.join(DATA_PATH, "dataset", "train_data_LSTM" + ".pkl")

# Feature extraction is expensive, so the result is cached on disk: compute and
# store it on the first run, reload the pickle on later runs.
# NOTE(review): the cache is keyed by path only; delete the file after changing
# the settings above or the file list, otherwise stale features are reused.
if not os.path.exists(train_data_path):
    train_data = generate_dataset_from_list_LSTM(
        train_files,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        frames=frames,
    )
    print("Saving train data to disk...")
    dump_pickle(train_data_path, train_data)
    print("Done.")
else:
    print("Train data already exists, loading from file...")
    train_data = load_pickle(train_data_path)

print(f"Train data has a {train_data.shape} shape.")

Extracting features: 100%|██████████| 2160/2160 [32:41<00:00,  1.10it/s]


Saving train data to disk...
Done.
Train data has a (734400, 5, 80) shape.


# Model 3: LSTM Autoencoder

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, RepeatVector, TimeDistributed, Dense, BatchNormalization
from tensorflow.keras.optimizers import Adam

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, RepeatVector, TimeDistributed, Dense, BatchNormalization
from tensorflow.keras.optimizers import Adam

# Model parameters (these define the autoencoder's input shape: (timesteps, features))
timesteps = 5  # frames per input window; same value as `frames` used for feature extraction
features = 80  # values per frame; same value as `n_mels` used for feature extraction
latent_dim = 16  # compressed representation

def LSTM_autoencoder(timesteps, features, latent_dim):
    """Build an uncompiled sequence-to-sequence LSTM autoencoder.

    The encoder stacks three sequence-returning LSTM layers (128, 64, 32 units),
    each followed by batch normalisation, and a final LSTM that collapses the
    sequence into a single `latent_dim`-sized vector. That vector is repeated
    `timesteps` times and decoded by a mirrored stack (32, 64, 128 units); a
    time-distributed dense layer maps every step back to `features` values.

    Args:
        timesteps: Number of frames in each input window.
        features: Number of values per frame.
        latent_dim: Size of the bottleneck vector.

    Returns:
        A Keras `Model` taking and returning tensors of shape
        (batch, timesteps, features). The caller is responsible for compiling it.
    """
    # Input
    input_layer = Input(shape=(timesteps, features), name='encoder_input')

    # Encoder: progressively narrower LSTMs that keep the time axis.
    encoded = input_layer
    for idx, units in enumerate((128, 64, 32), start=1):
        encoded = LSTM(units, activation='relu', return_sequences=True, name=f'encoder_LSTM_{idx}')(encoded)
        encoded = BatchNormalization(name=f'encoder_BN_{idx}')(encoded)

    # Bottleneck: return_sequences=False keeps only the last step, i.e. one
    # latent vector per window. No batch normalisation is applied to it.
    encoded = LSTM(latent_dim, activation='relu', return_sequences=False, name='encoder_bottleneck')(encoded)

    # Repeat the latent vector so the decoder receives one copy per time step.
    repeated = RepeatVector(timesteps, name='repeat_vector')(encoded)

    # Decoder: mirror of the encoder. Batch-norm layers are numbered 5..7 to
    # continue the encoder's numbering, and carry the `decoder_` prefix so layer
    # names in the summary match the part of the network they belong to.
    decoded = repeated
    for idx, units in enumerate((32, 64, 128), start=1):
        decoded = LSTM(units, activation='relu', return_sequences=True, name=f'decoder_LSTM_{idx}')(decoded)
        decoded = BatchNormalization(name=f'decoder_BN_{idx + 4}')(decoded)

    # Output: the same dense projection applied independently at every step.
    output = TimeDistributed(Dense(features), name='decoder_output')(decoded)

    # Define model
    return Model(inputs=input_layer, outputs=output)


# Build, compile, and summarize.
# The bottleneck size comes from the `latent_dim` constant defined above rather
# than a repeated literal, so it is configured in exactly one place.
autoencoder = LSTM_autoencoder(timesteps=timesteps, features=features, latent_dim=latent_dim)
autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
autoencoder.summary()


In [None]:
# Instantiate a fresh autoencoder, attach the Adam optimizer (learning rate
# 0.001) with a mean-squared-error reconstruction loss, then print the layer
# summary.
autoencoder = LSTM_autoencoder(timesteps, features, latent_dim)
autoencoder.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
autoencoder.summary()

In [None]:
# Define global constants to be used in this notebook
%%time

history = autoencoder.fit(train_data,train_data,
                  batch_size=512,
                  epochs=100,
                  callbacks=[EarlyStopping(monitor="val_loss", patience=10)],
                  validation_split=0.2,
                  verbose=1,
                  shuffle=True)

Epoch 1/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m49s[0m 21ms/step - loss: 758.6886 - val_loss: 9.7054
Epoch 2/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 8.7743 - val_loss: 8.1965
Epoch 3/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 7.9880 - val_loss: 8.0068
Epoch 4/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 7.4973 - val_loss: 7.2849
Epoch 5/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 7.3425 - val_loss: 6.9529
Epoch 6/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 7.2131 - val_loss: 7.6479
Epoch 7/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 7.1290 - val_loss: 6.9809
Epoch 8/100
[1m1148/1148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 7ms/step - loss: 7.0795 - val_loss: 7.1016
Epoch 9/100


In [None]:
MODEL_NAME = "Model3_LSTM_AutoEncoder"

# Write the trained autoencoder to <MODEL_PATH>/<MODEL_NAME>.h5.
model_file = os.path.join(MODEL_PATH, MODEL_NAME + ".h5")
autoencoder.save(model_file)



# Performance Evaluation

In [None]:
# Visualise the training run recorded in `history` (helper from eval_perf).
plot_loss_per_epoch(history, model_name=MODEL_NAME)

In [None]:
from tqdm import tqdm
import numpy as np

# Score every test file with its mean reconstruction error; the scores are used
# below for thresholding and for the ROC / PR curves.
recon_errors = []

for file_path in tqdm(test_files, desc="Evaluating test files"):
    # Extract log-mel windows with the same settings used for the training
    # features. The result is stored in `windows`, not `features`, because
    # `features` is the constant that defines the model's input size and must
    # not be overwritten.
    windows = extract_log_mel_windows_LSTM(
        file_path,
        sr=16000,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        frames=frames,
    )

    # Nothing to score for this file: keep a NaN placeholder so that
    # `recon_errors` stays aligned with `test_files` / `test_labels`.
    # NOTE(review): a NaN score will likely be rejected by the sklearn metrics
    # further down; confirm this case never occurs or filter it out there.
    if windows.size == 0:
        recon_errors.append(np.nan)
        continue

    # Reconstruct the windows with the trained autoencoder.
    reconstructions = autoencoder.predict(windows, verbose=0)

    # Mean squared error per window (averaged over axes 1 and 2), then averaged
    # over all windows of the file.
    mse_per_window = np.mean(np.square(windows - reconstructions), axis=(1, 2))  # shape: (num_windows,)
    recon_errors.append(np.mean(mse_per_window))

Evaluating test files: 100%|██████████| 1069/1069 [18:52<00:00,  1.06s/it]


In [None]:
# Two-column table: column 0 is the test-file index, column 1 its reconstruction error.
stack = np.column_stack((range(len(recon_errors)), recon_errors))

# Split the error column by ground-truth label (0 = normal, 1 = abnormal).
normal_mask = test_labels == 0
abnormal_mask = test_labels == 1
score_false = stack[normal_mask, 1]
score_true = stack[abnormal_mask, 1]

plot_histogram_by_class(score_false, score_true, bins=[20, 30], model_name=MODEL_NAME)

In [None]:
# Candidate threshold band to inspect visually against the per-file errors.
THRESHOLD_MIN = 4
THRESHOLD_MAX = 8

p = figure(
    width=600,
    height=400,
    title=f"{MODEL_NAME}: Threshold Range Exploration",
    x_axis_label="Samples",
    y_axis_label="Reconstruction Error",
)

# One scatter series per class, drawn from the (index, error) rows of `stack`
# that carry the matching ground-truth label.
for label, color, legend in (
    (0, "crimson", "Normal Signals"),
    (1, "indigo", "Abnormal Signals"),
):
    class_rows = stack[test_labels == label]
    source = ColumnDataSource(
        dict(index=class_rows[:, 0], error=class_rows[:, 1])
    )
    p.scatter(
        "index",
        "error",
        fill_alpha=0.6,
        fill_color=color,
        line_color=None,
        legend_label=legend,
        source=source,
    )

# Shade the horizontal band between the two candidate thresholds.
box = BoxAnnotation(
    bottom=THRESHOLD_MIN,
    top=THRESHOLD_MAX,
    fill_alpha=0.1,
    fill_color="magenta",
    line_color="darkmagenta",
    line_width=1.0,
)
p.add_layout(box)

p.legend.label_text_font_size = "8pt"
p.legend.location = "top_right"
p.title.align = "center"
p.title.text_font_size = "12pt"

p.add_tools(HoverTool(tooltips=[("index", "@index"), ("error", "@error")]))

show(p)

In [None]:
# Sweep thresholds across the candidate band and count, for each one, how many
# test files are misclassified in each direction.
THRESHOLD_MIN = 4
THRESHOLD_MAX = 8
THRESHOLD_STEP = 0.2

# np.linspace includes both end points by construction; np.arange with a
# fractional step can gain or lose its last element through rounding.
n_thresholds = int(round((THRESHOLD_MAX - THRESHOLD_MIN) / THRESHOLD_STEP)) + 1
thresholds = np.linspace(THRESHOLD_MIN, THRESHOLD_MAX, n_thresholds)
errors = []

for threshold in thresholds:
    predictions = get_prediction(stack[:, 1], threshold=threshold)
    conf_mat = confusion_matrix(test_labels, predictions)
    # Rows are true classes and columns predicted classes, so [1, 0] counts
    # false negatives and [0, 1] counts false positives.
    errors.append([threshold, conf_mat[1, 0], conf_mat[0, 1]])

errors = np.array(errors)

p = figure(
    width=600,
    height=400,
    title=f"{MODEL_NAME}: Best Threshold Exploration",
    # Thresholds are raw reconstruction errors, not percentages.
    x_axis_label="Reconstruction Error Threshold",
    y_axis_label="# Samples",
)

source = ColumnDataSource(
    data=dict(
        threshold=errors[:, 0], false_negative=errors[:, 1], false_positive=errors[:, 2]
    )
)

p.line(
    x="threshold",
    y="false_negative",
    color="crimson",
    legend_label="False Negative",
    source=source,
)

p.line(
    x="threshold",
    y="false_positive",
    color="indigo",
    legend_label="False Positive",
    source=source,
)

p.legend.label_text_font_size = "8pt"
p.legend.location = "top_left"
p.legend.click_policy = "hide"
p.title.align = "center"
p.title.text_font_size = "12pt"

p.add_tools(
    HoverTool(
        tooltips=[
            ("threshold", "@threshold"),
            ("false_negative", "@false_negative"),
            ("false_positive", "@false_positive"),
        ]
    )
)
show(p)


In [None]:
# Operating point used for the final confusion matrix and summary metrics.
# NOTE(review): 10 lies outside the 4-8 band explored in the cells above;
# confirm this value is intentional.
THRESHOLD = 10
predictions = get_prediction(stack[:, 1], threshold=THRESHOLD)

final_conf_mat = confusion_matrix(test_labels, predictions)
plot_confusion_matrix(final_conf_mat, model_name=MODEL_NAME)

# One-line report; each metric is formatted as a percentage with two decimals.
print(
    f"Accuracy: {accuracy_score(test_labels, predictions):.2%}, "
    f"Precision: {precision_score(test_labels, predictions):.2%}, "
    f"Recall: {recall_score(test_labels, predictions):.2%}, "
    f"F1: {f1_score(test_labels, predictions):.2%}"
)

Accuracy: 78.67%, Precision: 100.00%, Recall: 56.90%, F1: 72.53%


In [None]:
# ROC analysis on the per-file reconstruction errors: compute the curve and its
# area first, then hand both to the plotting helper.
roc_points = roc_curve(test_labels, recon_errors)
roc_area = roc_auc_score(test_labels, recon_errors)
plot_roc_curve(roc_points, roc_area, model_name=MODEL_NAME)

In [None]:
# Stored as `roc_auc` rather than `auc`: a later cell imports
# `sklearn.metrics.auc` and calls it as a function, so binding a float to the
# same name would break that code whenever this cell is re-run afterwards.
roc_auc = roc_auc_score(test_labels, recon_errors)

print(f"AUC score: {roc_auc:.4f}")

AUC score: 0.9814


In [None]:
# Precision-recall analysis on the per-file reconstruction errors: compute the
# curve and the average precision first, then hand both to the plotting helper.
pr_points = precision_recall_curve(test_labels, recon_errors)
avg_precision = average_precision_score(test_labels, recon_errors)
plot_pr_curve(pr_points, avg_precision, model_name=MODEL_NAME)



In [None]:
from sklearn.metrics import roc_curve, auc
import numpy as np

def compute_partial_auc(y_true, y_scores, max_fpr=0.1):
    """Return the unnormalised area under the ROC curve for FPR in [0, max_fpr].

    The ROC curve is truncated at `max_fpr`; when no ROC point falls exactly on
    the cut-off, the TPR there is obtained by linear interpolation between the
    two neighbouring points. The area is then computed with the trapezoidal
    rule. A perfect classifier scores `max_fpr`, so dividing the result by
    `max_fpr` gives a value in [0, 1].

    Args:
        y_true: Ground-truth binary labels.
        y_scores: Scores where larger values indicate the positive class.
        max_fpr: Upper false-positive-rate bound of the region, in [0, 1].

    Returns:
        The partial area as a float.

    Raises:
        ValueError: If `max_fpr` is outside [0, 1].
    """
    # Imported locally so that a notebook-level variable named `auc` (this
    # notebook assigns a float to that name in an earlier cell) cannot shadow
    # the function.
    from sklearn.metrics import auc as trapezoid_area

    # Without this check an out-of-range cut-off fails further down with an
    # unhelpful IndexError.
    if not 0 <= max_fpr <= 1:
        raise ValueError(f"max_fpr must be in [0, 1], got {max_fpr!r}")

    fpr, tpr, _ = roc_curve(y_true, y_scores)

    # Keep only points where FPR <= max_fpr
    mask = fpr <= max_fpr
    fpr_partial = fpr[mask]
    tpr_partial = tpr[mask]

    # Interpolate to add (max_fpr, interpolated_tpr) if the curve has no point
    # exactly at the cut-off.
    if fpr_partial[-1] < max_fpr:
        # First ROC point strictly beyond max_fpr, and its left neighbour.
        idx = np.searchsorted(fpr, max_fpr)
        fpr_left, fpr_right = fpr[idx - 1], fpr[idx]
        tpr_left, tpr_right = tpr[idx - 1], tpr[idx]

        # Linear interpolation
        slope = (tpr_right - tpr_left) / (fpr_right - fpr_left)
        tpr_interp = tpr_left + slope * (max_fpr - fpr_left)

        fpr_partial = np.append(fpr_partial, max_fpr)
        tpr_partial = np.append(tpr_partial, tpr_interp)

    return trapezoid_area(fpr_partial, tpr_partial)

# Usage
# Single definition of the FPR cut-off, shared by the computation, the
# normalisation and the printed message so they cannot drift apart.
PAUC_MAX_FPR = 0.1

pauc = compute_partial_auc(test_labels, recon_errors, max_fpr=PAUC_MAX_FPR)

# A perfect classifier reaches an area of PAUC_MAX_FPR over this region, so the
# ratio below expresses the score relative to that optimum.
print(
    f"Unnormalized Partial AUC (FPR ≤ {PAUC_MAX_FPR}): {pauc:.4f} or the model performs "
    f"{(pauc / PAUC_MAX_FPR):.1%} as well as a perfect classifier in the region where FPR ≤ {PAUC_MAX_FPR}."
)

Unnormalized Partial AUC (FPR ≤ 0.1): 0.0974 or the model performs 97.4% as well as a perfect classifier in the region where FPR ≤ 0.1.
