In [None]:
import os
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import IsolationForest
from sklearn.metrics import (roc_auc_score, recall_score, precision_score, f1_score, roc_curve,
                             confusion_matrix, ConfusionMatrixDisplay)
import pandas as pd

# Paths to audio files
anomalous_audio_path = "../Data/raw/11_A_09_experiment7/A_09_10_experiment7_8.wav"
normal_audio_path = "../Data/raw/11_A_09_experiment7/N_09_experiment7.wav"

# Paths for saving frames and datasets
output_anomalous_frames_path = "../Data/frames/anomalous_frames.npy"
output_normal_frames_path = "../Data/frames/normal_frames.npy"
train_frames_path = "../Data/datasets/train_frames.npy"
test_frames_path = "../Data/datasets/test_frames.npy"
test_labels_path = "../Data/datasets/test_labels.npy"

# Ensure necessary directories exist
os.makedirs(os.path.dirname(output_anomalous_frames_path), exist_ok=True)
os.makedirs(os.path.dirname(train_frames_path), exist_ok=True)

# Function to generate a normalized Mel-spectrogram
def generate_mel_spectrogram(audio_path):
    audio, sr = librosa.load(audio_path, sr=None)
    stft = librosa.stft(audio, n_fft=1024, hop_length=512)
    mel = librosa.feature.melspectrogram(S=np.abs(stft)**2, sr=sr, n_mels=128)
    mel_db = librosa.power_to_db(mel, ref=np.max)
    # Normalize to [0, 1]
    mel_db_norm = (mel_db - mel_db.min()) / (mel_db.max() - mel_db.min())
    return mel_db_norm, sr

# Function to generate overlapping frames from a Mel-spectrogram
def generate_frames(mel_spectrogram, frame_size, hop_size):
    num_frames = (mel_spectrogram.shape[1] - frame_size) // hop_size + 1
    frames = np.zeros((num_frames, mel_spectrogram.shape[0], frame_size))
    for i in range(num_frames):
        start = i * hop_size
        frames[i] = mel_spectrogram[:, start:start + frame_size]
    return frames

# Adjustable parameters
time_per_frame = 0.4  # Duration of one frame in seconds
hop_ratio = 0.2       # Overlap ratio between frames

# Generate Mel-spectrograms
mel_db_anomalous, sr_anomalous = generate_mel_spectrogram(anomalous_audio_path)
mel_db_normal, sr_normal = generate_mel_spectrogram(normal_audio_path)

# Ensure sampling rates match
assert sr_anomalous == sr_normal, "Sampling rates do not match!"

# Calculate frame and hop sizes
hop_length = 512
frame_size = int((time_per_frame * sr_anomalous) / hop_length)
hop_size = int(frame_size * hop_ratio)

# Generate frames
anomalous_frames = generate_frames(mel_db_anomalous, frame_size, hop_size)
normal_frames = generate_frames(mel_db_normal, frame_size, hop_size)

# Save frames
np.save(output_anomalous_frames_path, anomalous_frames)
np.save(output_normal_frames_path, normal_frames)

# Load frames
anomalous_frames = np.load(output_anomalous_frames_path)
normal_frames = np.load(output_normal_frames_path)

# Get the number of Mel bands and frame size for reshaping later
num_mels = anomalous_frames.shape[1]

# Split normal frames into training and test sets
normal_train, normal_test = train_test_split(normal_frames, test_size=0.15, random_state=42)

# Combine normal_test and anomalous_frames for testing
test_frames = np.concatenate([normal_test, anomalous_frames], axis=0)
test_labels = np.concatenate([np.zeros(len(normal_test)), np.ones(len(anomalous_frames))])

# Shuffle the test set
indices = np.arange(len(test_frames))
np.random.shuffle(indices)
test_frames = test_frames[indices]
test_labels = test_labels[indices]

# Save datasets
np.save(train_frames_path, normal_train)
np.save(test_frames_path, test_frames)
np.save(test_labels_path, test_labels)

# Load datasets
X_train = np.load(train_frames_path)
X_test = np.load(test_frames_path)
y_test = np.load(test_labels_path)

# Flatten the frames for model input
X_train_flat = X_train.reshape(len(X_train), -1)
X_test_flat = X_test.reshape(len(X_test), -1)

# Adjust labels: 1 for normal, -1 for anomalous
y_test_adjusted = np.where(y_test == 0, 1, -1)

# Define the classify_anomalies function with adjustments
def classify_anomalies(train_data, test_data, test_labels, machine, feat_label):
    scaler = MinMaxScaler(feature_range=(-5, 5))
    
    # LOF without PCA
    lof = LocalOutlierFactor(novelty=True, contamination=0.1)
    lof_pipe = Pipeline([("scaler", scaler), ("classifier", lof)])
    
    # LOF with PCA
    pca_lof = PCA(0.9)
    lof_pca_pipe = Pipeline([("scaler", scaler), ("pca", pca_lof), ("classifier", lof)])
    
    # Isolation Forest without PCA
    isolation_forest = IsolationForest(contamination=0.1, random_state=42)
    isolation_forest_pipe = Pipeline([("scaler", scaler), ("classifier", isolation_forest)])
    
    # Isolation Forest with PCA
    pca_if = PCA(0.9)
    isolation_forest_pca_pipe = Pipeline([("scaler", scaler), ("pca", pca_if), ("classifier", isolation_forest)])
    
    models = {
        "LOF": lof_pipe,
        "LOF + PCA": lof_pca_pipe,
        "Isolation Forest": isolation_forest_pipe,
        "Isolation Forest + PCA": isolation_forest_pca_pipe,
    }
    
    results = {
        "Machine": machine,
        "Features": feat_label,
        "Model": [],
        "AUC": [],
        "Recall": [],
        "Precision": [],
        "Abnormal (-1) F1": [],
    }
    
    model_results = []
    
    fig, axes = plt.subplots(ncols=len(models), nrows=1, figsize=(16, 4))

    for i, (model_name, model) in enumerate(models.items()):
        roc_ax = axes[i]
        model.fit(train_data)
        scores = model.decision_function(test_data)
        auc = roc_auc_score(test_labels, scores)
        fpr, tpr, thresholds = roc_curve(test_labels, scores)
        
        roc_ax.plot(fpr, tpr)
        roc_ax.plot([0, 1], [0, 1], linestyle='--', color='gray', alpha=0.7)
        roc_ax.set_title(f'{model_name}\n ROC Curve\n AUC = {auc:.2f}')
        roc_ax.set_xlabel('False Positive Rate')
        roc_ax.set_ylabel('True Positive Rate')
        roc_ax.legend(['ROC curve', 'Random'], fontsize="9", loc='lower right')

        # Threshold for 85% correctly classified normal data
        sorted_scores = np.sort(scores[test_labels == 1])
        threshold_index = int(0.85 * len(sorted_scores))
        threshold = sorted_scores[threshold_index]
        predictions = np.where(scores >= threshold, 1, -1)

        recall = recall_score(test_labels, predictions, pos_label=-1)
        precision = precision_score(test_labels, predictions, pos_label=-1)
        abf1 = f1_score(test_labels, predictions, pos_label=-1)
        
        # Compute confusion matrix
        conf_matrix = confusion_matrix(test_labels, predictions)

        results["Model"].append(model_name)
        results["AUC"].append(auc)
        results["Recall"].append(recall)
        results["Precision"].append(precision)
        results["Abnormal (-1) F1"].append(abf1)
        
        # Store results for this model
        model_results.append({
            "model_name": model_name,
            "scores": scores,
            "predictions": predictions,
            "conf_matrix": conf_matrix,
            "roc_auc": auc,
            "fpr": fpr,
            "tpr": tpr,
            "thresholds": thresholds,
        })

    results_df = pd.DataFrame(results)
    print(f"{feat_label} features were reduced to {pca_lof.n_components_} components for LOF")
    print(f"{feat_label} features were reduced to {pca_if.n_components_} components for Isolation Forest")
    plt.tight_layout()
    plt.show()
    
    return results_df, model_results

# Run the classification
results_df, model_results = classify_anomalies(X_train_flat, X_test_flat, y_test_adjusted, "Hydropower Machine", "Mel-Spectrogram Frames")
print(results_df)

# Select the best model based on AUC
best_model_index = np.argmax(results_df['AUC'])
best_model_name = results_df['Model'][best_model_index]
print(f"Best model based on AUC: {best_model_name}")

# Get the results for the best model
best_model_results = model_results[best_model_index]

predictions = best_model_results['predictions']
scores = best_model_results['scores']
conf_matrix = best_model_results['conf_matrix']
roc_auc = best_model_results['roc_auc']
fpr = best_model_results['fpr']
tpr = best_model_results['tpr']
thresholds = best_model_results['thresholds']

# Reshape test_frames back to original shape for plotting
frame_size = X_test_flat.shape[1] // num_mels
test_frames_reshaped = X_test_flat.reshape(-1, num_mels, frame_size)

# Identify indices
correctly_classified_anomalies = np.where((y_test_adjusted == -1) & (predictions == -1))[0]
wrongly_classified_anomalies = np.where((y_test_adjusted == -1) & (predictions == 1))[0]
wrongly_classified_normals = np.where((y_test_adjusted == 1) & (predictions == -1))[0]

# Select up to 5 samples from each category
num_samples = 5
correct_anomaly_indices = correctly_classified_anomalies[:num_samples]
wrong_anomaly_indices = wrongly_classified_anomalies[:num_samples]
wrong_normal_indices = wrongly_classified_normals[:num_samples]

# Function to plot specified frames
def plot_specified_frames(frames, indices, title_prefix="Frame"):
    num_frames = len(indices)
    fig, axes = plt.subplots(1, num_frames, figsize=(15, 5))
    for i, idx in enumerate(indices):
        ax = axes[i] if num_frames > 1 else axes
        img = librosa.display.specshow(frames[idx], x_axis=None, y_axis="mel", cmap="viridis", ax=ax)
        ax.set_title(f"{title_prefix} {i+1}")
        ax.axis("off")
    plt.tight_layout()
    plt.show()

# Plot frames
print("Correctly Classified Anomalies:")
if len(correct_anomaly_indices) > 0:
    plot_specified_frames(test_frames_reshaped, correct_anomaly_indices, title_prefix="Correct Anomaly")
else:
    print("No correctly classified anomalies.")

print("Wrongly Classified Anomalies:")
if len(wrong_anomaly_indices) > 0:
    plot_specified_frames(test_frames_reshaped, wrong_anomaly_indices, title_prefix="Wrongly Classified Anomaly")
else:
    print("No wrongly classified anomalies.")

print("Wrongly Classified Normal Frames:")
if len(wrong_normal_indices) > 0:
    plot_specified_frames(test_frames_reshaped, wrong_normal_indices, title_prefix="Wrongly Classified Normal")
else:
    print("No wrongly classified normal frames.")

# Plot Confusion Matrix
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=['Normal', 'Anomalous'])
disp.plot(cmap=plt.cm.Blues)
plt.title(f'Confusion Matrix for {best_model_name}')
plt.show()

# Plot ROC Curve
plt.figure()
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {roc_auc:.2f})')
plt.plot([0,1], [0,1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title(f'Receiver Operating Characteristic for {best_model_name}')
plt.legend(loc='lower right')
plt.show()


found 0 physical cores < 1
  File "c:\Users\s.krummenacher\AppData\Local\Programs\Python\Python311\Lib\site-packages\joblib\externals\loky\backend\context.py", line 282, in _count_physical_cores
    raise ValueError(f"found {cpu_count_physical} physical cores < 1")
