In [1]:
# Import necessary libraries
import sys
sys.path.insert(0, "..\\..\\")  # add the parent directory to path
import os
import numpy as np
import pandas as pd
import random
import json
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import librosa
import librosa.display
from sklearn.svm import OneClassSVM
from sklearn import metrics
import plotly.express as px
import pickle
from sklearn.cluster import KMeans
from sklearn import metrics
from scipy.spatial.distance import cdist

from utils.get_mel_spectrogram import get_mel_spectrogram
from utils.get_melspec_features_from_files import get_melspec_features_from_files
from utils.confusion_metrics import confusion_metrics
from utils.train_test_one_class_svm import train_test_one_class_svm
from utils.get_train_test_data import get_train_test_data
from utils.add_train_test_labels_to_dataframe import add_train_test_labels_to_dataframe

In [2]:
# Directories
parent_dir = os.path.abspath('..\\..\\..\\')  # main directory
# Audio data directory
data_dir = os.path.join(parent_dir, 'Data', 'Source')  # audio data directory
# Features directory
features_dir = os.path.join(parent_dir, 'Data', 'Features')  # audio data directory
# Metada directory
metadata_dir = os.path.join(parent_dir, 'Data', 'Metadata')  # metadata directory

In [3]:
# Load extracted features
machine_type = 'fan'

window = 1
n_mels = 32
overlap = 0.5
feature_type = 'mel_spect_db'
    
# Export the metadata and extracted features DataFrames to csv files
file_name = f"metadata_{machine_type}_all_samples.csv"
df_machine = pd.read_csv(os.path.join(features_dir, file_name), header=0, index_col=0)

file_name = f"features_{machine_type}_{feature_type}_window_{window:.3f}_overlap_ratio_{overlap:.2f}_no_mel_bands_{n_mels:d}.csv"
Xy = pd.read_csv(os.path.join(features_dir, file_name), header=0, index_col=0)

# Load params 
file_name = f"params_{machine_type}_{feature_type}_window_{window:.3f}_overlap_ratio_{overlap:.2f}_no_mel_bands_{n_mels:d}.json"
with open(os.path.join(features_dir, file_name)) as json_file:
    params = json.load(json_file)

In [4]:
# Randomly select a subset of samples for a single machine and a single model
machine = 'fan'
df_machine = df[df.machine=='fan']
df_model_all = df_machine[df_machine.model==0]

df_model = df_model_all.groupby(["anomaly"]).sample(frac=0.8, random_state=13)
df_model_test = df_model_all.drop(df_model.index)
df_model = df_model.reset_index(inplace=False, drop=False)
df_model = df_model.sample(frac=1, random_state=25).reset_index(inplace=False, drop=True)  # shuffle data
df_model_test = df_model_test.reset_index(inplace=False, drop=False)

df_model.groupby(["model", "anomaly"])['file_name'].count().reset_index(name='count')

NameError: name 'df' is not defined

In [None]:
df_machine

In [None]:
# Randomly separate a subset of samples as the final test samples
df_machine_all = df_machine.copy()

df_machine = df_machine.groupby(["anomaly"]).sample(frac=0.8, random_state=13)
df_machine_test = df_machine_all.drop(df_machine.index)

Xy_all = Xy.copy()
Xy = Xy.loc[df_machine.index]
Xy_test = Xy_all.loc[df_machine_test.index]

In [None]:
df_machine = df_machine.reset_index(inplace=False, drop=False)
df_machine_test = df_machine_test.reset_index(inplace=False, drop=False)
Xy = Xy.reset_index(inplace=False, drop=False)
Xy_test = Xy_test.reset_index(inplace=False, drop=False)

df_machine.groupby(["model", "anomaly"])['file_name'].count().reset_index(name='count')

In [None]:
X = Xy.iloc[:, :-1]
X = X.copy()
X

In [None]:
y = Xy.iloc[:, -1]
y = pd.DataFrame(y.copy())
y

In [None]:
xxxxxxxxx

In [None]:
# Feature normalization
minval = X.min()
maxval = X.max()
X = (X-minval)/(maxval-minval)

In [None]:
# Add labels for training and test
df_machine = add_train_test_labels_to_dataframe(df_machine, no_seed=12)

In [None]:
# Get training and test samples
X_train, X_test, y_train, y_test = get_train_test_data(X, y, df_machine)

In [None]:
# Histogram of features
plt.hist(X_train.iloc[:, 0])
plt.show()

In [None]:
# One class svm 
gamma = 0.01
nu = 0.5
prctle = 2
out_class = train_test_one_class_svm(X_train.to_numpy(), X_test.to_numpy(), y_train.to_numpy(), y_test.to_numpy(), 
                                     kernel='rbf', gamma=gamma, nu=nu, prctle=prctle, normalize="min-max")
# return OneClassSVM, report, cm_train, cm_test, out_metrics_train, out_metrics_test, params

In [None]:
cm_plot = out_class[3]
df_cm = pd.DataFrame(cm_plot, range(cm_plot.shape[0]), range(cm_plot.shape[1]))
sn.set(font_scale=1.4) # for label size
sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}) # font size

plt.show()

print(out_class[1])

In [None]:
ran_gamma = [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]
ran_prctl = range(1,15)
nu = 0.5
# Initialize arrays
out_metrics_gn = {}
out_metrics_gn['acc'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_gn['precision']  = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_gn['recall'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_gn['TPR'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_gn['FPR'] = np.zeros((len(ran_prctl), len(ran_gamma)))
for id_g, gamma in enumerate(ran_gamma):
    for id_p, prctle in enumerate(ran_prctl):
        print(gamma, prctle)
        
        out_class = train_test_one_class_svm(X_train, X_test, y_train, y_test, 
                                     kernel='rbf', gamma=gamma, nu=nu, prctle=prctle, normalize="min-max")
        # return OneClassSVM, report, cm_train, cm_test, out_metrics_train, out_metrics_test, params
        
        out_metrics = out_class[5]
        out_metrics_gn['acc'][id_p, id_g] = out_metrics['acc']
        out_metrics_gn['precision'][id_p, id_g] = out_metrics['precision']
        out_metrics_gn['recall'][id_p, id_g] = out_metrics['recall']
        out_metrics_gn['TPR'][id_p, id_g] = out_metrics['TPR']
        out_metrics_gn['FPR'][id_p, id_g] = out_metrics['FPR']
        

In [None]:
len(ran_gamma)

In [None]:
out_metrics_gn['recall'][:, 7]

In [None]:
out_metrics_gn['precision'][:, 7]

In [None]:
out_metrics_gn['recall'].shape

In [None]:
for id_g, gamma in enumerate(ran_gamma):
    if(id_g<8):
        plt.plot(out_metrics_gn['recall'][:, id_g], out_metrics_gn['precision'][:, id_g])
        plt.xlim([0, 1])
        plt.ylim([0, 1])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()

In [None]:
for id_g, gamma in enumerate(ran_gamma):
    if(id_g==5):
        print(gamma)
        plt.plot(out_metrics_gn['recall'][:, id_g], out_metrics_gn['precision'][:, id_g])
        plt.xlim([0, 1])
        plt.ylim([0, 1])
    plt.xlabel('Recall')
    plt.ylabel('Precision')
plt.show()

In [None]:
out_metrics_gn['precision'].shape

In [None]:
id_p = 12
id_g = 5
print(ran_prctl[id_p])
print("precision: ", out_metrics_gn['precision'][id_p, id_g])
print("recall: ", out_metrics_gn['recall'][id_p, id_g])

In [None]:
# Select all abnormal samples as test, equal number of normal also in test, rest in training
df_machine_all = df[df.machine=='fan']

window = 0.5
n_mels = 64
overlap = 0.25

X_all, y_all, params_all = get_melspec_features_from_files(data_dir, df_machine_all, window, n_mels, overlap)

# Add labels for training and test
df_machine_all = add_train_test_labels_to_dataframe(df_machine_all, no_seed=30)

# Get training and test samples
X_train_all, X_test_all, y_train_all, y_test_all = get_train_test_data(X_all, y_all, df_machine_all)

# One class svm 
id_p = 12
id_g = 5
prctl = ran_prctl[id_p]
gamma = ran_gamma[id_g]
nu = 0.5
out_class_selected = train_test_one_class_svm(X_train, X_test, y_train, y_test, 
                                     kernel='rbf', gamma=gamma, nu=nu, prctle=prctle, normalize="min-max")
# return OneClassSVM, report, cm_train, cm_test, out_metrics_train, out_metrics_test, params

In [None]:
id_g_selected = 5
ran_gamma_selected = [ran_gamma[id_g_selected]]
nu = 0.5
# Initialize arrays
out_metrics_all_gn = {}
out_metrics_all_gn['acc'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['precision']  = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['recall'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['TPR'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['FPR'] = np.zeros((len(ran_prctl), len(ran_gamma)))
for id_g, gamma in enumerate(ran_gamma_selected):
    for id_p, prctle in enumerate(ran_prctl):
        
        out_class = train_test_one_class_svm(X_train_all, X_test_all, y_train_all, y_test_all, 
                                     kernel='rbf', gamma=0.001, nu=0.5, prctle=2, normalize="min-max")
        # return OneClassSVM, report, cm_train, cm_test, out_metrics_train, out_metrics_test, params
        
        out_metrics = out_class[5]
        out_metrics_all_gn['acc'][id_p, id_g] = out_metrics['acc']
        out_metrics_all_gn['precision'][id_p, id_g] = out_metrics['precision']
        out_metrics_all_gn['recall'][id_p, id_g] = out_metrics['recall']
        out_metrics_all_gn['TPR'][id_p, id_g] = out_metrics['TPR']
        out_metrics_all_gn['FPR'][id_p, id_g] = out_metrics['FPR']
        

In [None]:
ran_gamma = [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]
ran_prctl_selected = 7 # range(0,15)
nu = 0.5
# Initialize arrays
out_metrics_all_gn = {}
out_metrics_all_gn['acc'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['precision']  = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['recall'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['TPR'] = np.zeros((len(ran_prctl), len(ran_gamma)))
out_metrics_all_gn['FPR'] = np.zeros((len(ran_prctl), len(ran_gamma)))
for id_g, gamma in enumerate(ran_gamma):
    for id_p, prctle in enumerate(ran_prctl_selected):
        
        out_class = train_test_one_class_svm(X_train_all, X_test_all, y_train_all, y_test_all, 
                                     kernel='rbf', gamma=0.001, nu=0.5, prctle=2, normalize="min-max")
        # return OneClassSVM, report, cm_train, cm_test, out_metrics_train, out_metrics_test, params
        
        out_metrics = out_class[5]
        out_metrics_all_gn['acc'][id_p, id_g] = out_metrics['acc']
        out_metrics_all_gn['precision'][id_p, id_g] = out_metrics['precision']
        out_metrics_all_gn['recall'][id_p, id_g] = out_metrics['recall']
        out_metrics_all_gn['TPR'][id_p, id_g] = out_metrics['TPR']
        out_metrics_all_gn['FPR'][id_p, id_g] = out_metrics['FPR']
        

In [None]:
for id_g, gamma in enumerate(ran_gamma):
    for id_p, prctle in enumerate(ran_prctl_selected):
        if(id_g==id_g_selected):
            print(gamma)
            plt.plot(out_metrics_all_gn['recall'][:, id_g], out_metrics_all_gn['precision'][:, id_g])
            plt.xlim([0, 1])
            plt.ylim([0, 1])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()

In [None]:
# Metrics for the previously chosen gamma and prctle
id_p_selected = 3
prctl = ran_prctle[id_p_selected]
gamma = ran_gamma[id_g_selected]
out_class = train_test_one_class_svm(X_train_all, X_test_all, y_train_all, y_test_all, 
                                     kernel='rbf', gamma=gamma, nu=nu, prctle=prctle, normalize="min-max")
# return OneClassSVM, report, cm_train, cm_test, out_metrics_train, out_metrics_test, params

In [None]:
cm_test

In [None]:
out_metrics_test_all

In [None]:
cm_train

In [None]:
out_metrics_train_all