# Unsupervised Audio Anomaly Detection using Gaussian Mixture Model
This notebook demonstrates "Unsupervised Audio Anoamly Detection" using Gaussian Mixture Model and Isotronic Regression.

In [2]:
import scipy
import scipy.fftpack
import numpy as np
from matplotlib import pyplot as plt
import scipy.io.wavfile as wavfile
from sklearn import svm
from sklearn import preprocessing
import multiprocessing
import librosa
from scipy.signal import welch
from multiprocessing import Pool
from tqdm import tqdm_notebook
import time
from sklearn.preprocessing import StandardScaler

SAMPLE_RATE = 32000 #sampling rate of audio
SEGMENT_TIME = 3 # in seconds, duration of each split

In [3]:
# Function to extract features from input signal(in our case Audio). 
# Various features are extracted using Librosa and concatenated as a 1D Array
def extract_feature(X):
    sample_rate = SAMPLE_RATE
    
    stft = np.abs(librosa.stft(X))
    mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T,axis=0)
    chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T,axis=0)
    mel = np.mean(librosa.feature.melspectrogram(X, sr=sample_rate).T,axis=0)
    contrast = np.mean(librosa.feature.spectral_contrast(S=stft, sr=sample_rate).T,axis=0)
    tonnetz = np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(X), sr=sample_rate).T,axis=0)
    return np.hstack([mfccs,chroma,mel,contrast,tonnetz])

In [4]:
# Function to read file, split into segments and retrieve features for each segment
def read_process_file(filename):
    print("Processing file: ", filename)
    
    # Read file into memory
    signal, fs_rate = librosa.core.load(filename, SAMPLE_RATE)
    N = len(signal)
    duration = int(SEGMENT_TIME * fs_rate)

    # Split input signal into segments
    signal = signal[: N - (N % duration)]
    samples = np.reshape(signal, (-1, duration))
    
    # Process each segment to get features
    with Pool(4) as p: # Use 4 threads to accellerate processing
         features = list(tqdm_notebook(p.imap(extract_feature, samples), total=len(samples)))

    return np.array(features).astype('float')

In [5]:
# Build single dataset with size of [N, F], 
# where N is number of segments and F is number of features for each segment
def get_files(dirname, files):
    X = None
    for file in files:
        samples_features = read_process_file(dirname + file)
        if X is None:
            X = samples_features
        else:
            X = np.vstack((X, samples_features))
    return X

In [7]:
# Specify audio files with normal sound recording
# Used for training, testing and validation
normal_file_dir = 'sound_recordings/'
normal_files = [
    'polishing_machine_single.wav',
    'vertical_cutter_machine_single.wav',
    'horizontal_cutter_machine_single.wav',
    'around_all.wav',
    'main_machine_cut_single.wav',
    'main_machine_cut_around.wav',
    'main_machine_idle_single.wav',
    'main_machine_idle_around.wav'
]

# Append label to each segment
X_normal = get_files(normal_file_dir, normal_files)
normal_labels = np.ones((X_normal.shape[0], 1))
X_normal = np.hstack((X_normal, normal_labels))

Processing file:  sound_recordings/polishing_machine_single.wav


HBox(children=(IntProgress(value=0, max=150), HTML(value='')))


Processing file:  sound_recordings/vertical_cutter_machine_single.wav


HBox(children=(IntProgress(value=0, max=89), HTML(value='')))


Processing file:  sound_recordings/horizontal_cutter_machine_single.wav


HBox(children=(IntProgress(value=0, max=117), HTML(value='')))


Processing file:  sound_recordings/around_all.wav


HBox(children=(IntProgress(value=0, max=101), HTML(value='')))


Processing file:  sound_recordings/main_machine_cut_single.wav


HBox(children=(IntProgress(value=0, max=265), HTML(value='')))


Processing file:  sound_recordings/main_machine_cut_around.wav


HBox(children=(IntProgress(value=0, max=297), HTML(value='')))


Processing file:  sound_recordings/main_machine_idle_single.wav


HBox(children=(IntProgress(value=0, max=115), HTML(value='')))


Processing file:  sound_recordings/main_machine_idle_around.wav


HBox(children=(IntProgress(value=0, max=57), HTML(value='')))




In [9]:
# Specify audio files with abnormal sound recording
# Used for testing and validation
abnormal_file_dir = 'sound_recordings/metal/'
abnormal_files = [
    'metal_cut_1.wav',
    'metal_cut_2.wav',
    'metal_cut_3.wav',
    #'metal_cut_4.wav',
    'metal_cut_5.wav',
    'metal_cut_6.wav',
    'metal_cut_7.wav',
    'metal_cut_8.wav',
    'metal_cut_9.wav',
    'metal_cut_10.wav',
    'metal_cut_11.wav',
    'metal_cut_12.wav',
    'metal_cut_13.wav',
    'metal_cut_14.wav',
    'metal_cut_auto_1.wav',
    'metal_cut_auto_2.wav',
    'metal_cut_auto_3.wav',
    'metal_cut_auto_4.wav',
    'metal_cut_auto_5.wav',
    'metal_cut_auto_6.wav',
    'metal_cut_auto_7.wav',
    'metal_cut_auto_8.wav',
    'metal_cut_auto_9.wav'
]

# Append label to each segment
X_abnormal = get_files(abnormal_file_dir, abnormal_files)
abnormal_labels = np.zeros((X_abnormal.shape[0], 1))
X_abnormal = np.hstack((X_abnormal, abnormal_labels))

Processing file:  sound_recordings/metal/metal_cut_1.wav


HBox(children=(IntProgress(value=0, max=10), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_2.wav


HBox(children=(IntProgress(value=0, max=5), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_3.wav


HBox(children=(IntProgress(value=0, max=6), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_5.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_6.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_7.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_8.wav


HBox(children=(IntProgress(value=0, max=2), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_9.wav


HBox(children=(IntProgress(value=0, max=4), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_10.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_11.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_12.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_13.wav


HBox(children=(IntProgress(value=0, max=3), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_14.wav


HBox(children=(IntProgress(value=0, max=7), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_1.wav


HBox(children=(IntProgress(value=0, max=6), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_2.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_3.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_4.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_5.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_6.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_7.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_8.wav


HBox(children=(IntProgress(value=0, max=8), HTML(value='')))


Processing file:  sound_recordings/metal/metal_cut_auto_9.wav


HBox(children=(IntProgress(value=0, max=7), HTML(value='')))




In [10]:
# Show some output about data
print("Normal data shape: ", X_normal.shape)
print("Abnormal data shape: ", X_abnormal.shape)

# Split normal data, get training dataset
train_mask = np.random.choice([False, True], len(X_normal), p=[0.40, 0.60])
X_train = X_normal[train_mask]
np.random.shuffle(X_train)
X_other = X_normal[~train_mask]

# Concatenate remaining normal data (on that's not used for training) and abnormal data
X_other_abnormal = np.vstack((X_abnormal, X_other))

# Split concatenated data for validation and test, also shuffle data
other_abnormal_mask = np.random.choice([False, True], len(X_other_abnormal), p=[0.50, 0.50])
X_val = X_other_abnormal[other_abnormal_mask]
np.random.shuffle(X_val)
X_test = X_other_abnormal[~other_abnormal_mask]
np.random.shuffle(X_test)

# Separate labels from segment features
label_index = 193
Y_train = X_train[:, label_index]
X_train = X_train[:, 0:label_index]

Y_val = X_val[:, label_index]
X_val = X_val[:, 0:label_index]

Y_test = X_test[:, label_index]
X_test = X_test[:, 0:label_index]

# Show some output about data
print("Train data shape: ", X_train.shape)
print("Train labels shape: ", Y_train.shape)
print("Validation data shape: ", X_val.shape)
print("Validation labels shape: ", Y_val.shape)
print("Test data shape: ", X_test.shape)
print("Test labels shape: ", Y_test.shape)

Normal data shape:  (1191, 194)
Abnormal data shape:  (124, 194)
Train data shape:  (728, 193)
Train labels shape:  (728,)
Validation data shape:  (280, 193)
Validation labels shape:  (280,)
Test data shape:  (307, 193)
Test labels shape:  (307,)


In [11]:
# Apply standard scaler to the data
ss = StandardScaler()
ss.fit(X_train)
X_train = ss.transform(X_train)
X_val = ss.transform(X_val)
X_test = ss.transform(X_test)

In [14]:
from sklearn.mixture import GaussianMixture
from sklearn.isotonic import IsotonicRegression

# Build Gaussian Mixture Model and fit to training set
gmm_clf = GaussianMixture(covariance_type='spherical', n_components=10, max_iter=int(1e7))  # Obtained via grid search
gmm_clf.fit(X_train)
log_probs_val = gmm_clf.score_samples(X_val)
# Also build Isotonic Regression for predictions
isotonic_regressor = IsotonicRegression(out_of_bounds='clip')
isotonic_regressor.fit(log_probs_val, Y_val)  # y_val is for labels 0 - not food 1 - food (validation set)

# Obtaining results on the validation set
log_probs_val = gmm_clf.score_samples(X_val)
val_probabilities = isotonic_regressor.predict(log_probs_val)
val_predictions = [1 if prob >= 0.5 else 0 for prob in val_probabilities]

# Calculate accuracy metrics
val_correct_pred = np.equal(Y_val, val_predictions)
val_acc = np.sum(val_correct_pred) / val_correct_pred.shape[0]
print("Validation accuracy: ", val_acc)

# Obtaining results on the test set
log_probs_test = gmm_clf.score_samples(X_test)
test_probabilities = isotonic_regressor.predict(log_probs_test)
test_predictions = [1 if prob >= 0.5 else 0 for prob in test_probabilities]

# Calculate accuracy metrics
test_correct_pred = np.equal(Y_test, test_predictions)
test_acc = np.sum(test_correct_pred) / test_correct_pred.shape[0]
print("Test accuracy: ", test_acc)


Validation accuracy:  0.9964285714285714
Test accuracy:  0.9771986970684039


In [13]:
# Also test our model with a file that was not before inside of any dataset
unseen_data = get_files(abnormal_file_dir, ['metal_cut_4.wav'])  # abnormal audio recording
log_probs_unseen = gmm_clf.score_samples(unseen_data)
unseen_probabilities = isotonic_regressor.predict(log_probs_unseen)
unseen_predictions = [1 if prob >= 0.5 else 0 for prob in unseen_probabilities]
print(unseen_predictions) # all should be 0 (abnormal data)

Processing file:  sound_recordings/metal/metal_cut_4.wav


HBox(children=(IntProgress(value=0, max=4), HTML(value='')))


[0, 0, 0, 0]
