## Imports

In [2]:
import datasets
import numpy as np
import matplotlib.pyplot as plt
import librosa
from IPython.display import Audio
import mir_eval.sonify

import torch
import torch.nn.functional as F
import math
from torch.nn import Parameter
import torchaudio

import sklearn

from sklearn.mixture import GaussianMixture
from torchvision.models import resnet50, ResNet50_Weights
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torchvision
import pandas as pd
import matplotlib.pyplot as plt
import torch.nn.functional as F
from sklearn import metrics
import torchvision.transforms as transforms
import os,sys
import torchvision.models as models

from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

## Load the dataset
they are stored as below:
[audio, audio_fft, ID, label]

In [4]:
import os
path = "dev_data_pump/pump/train"
dir = os.listdir( path )
TRAIN = []
for file in dir:
    audio, sr = librosa.load(path + "/" + file, sr=16000)
    audio_fft = np.abs(librosa.stft(audio[:160000], n_fft=512))
    id = int(file.split("_")[2])
    label = 0 if file.split("_")[0] == "normal" else 1
    TRAIN.append([audio, audio_fft, id, label])

path = "dev_data_pump/pump/test"
dir = os.listdir( path )
TEST = []
for file in dir:
    audio, sr = librosa.load(path + "/" + file, sr=16000)
    audio_fft = np.abs(librosa.stft(audio[:160000], n_fft=512))
    id = int(file.split("_")[2])
    label = 0 if file.split("_")[0] == "normal" else 1
    TEST.append([audio, audio_fft, id, label])

In [5]:
train_pd = pd.DataFrame(TRAIN, columns = ["audio", "audio_fft", "id", "label"])
test_pd = pd.DataFrame(TEST, columns = ["audio", "audio_fft", "id", "label"])

## Convert ID to class number
Eg. ID: [2, 4, 6] --> class number: [0, 1, 2]

In [6]:
machine_id = set(train_pd["id"])
machine_dict = {}
for i, id in enumerate(machine_id):
    machine_dict[id] = i

In [7]:
val_size = int(len(TEST) * 0.2)

## Split validation and testing data

* val: validation set
* test: testing set

In [8]:
np.random.seed = 42
np.random.shuffle(TEST)
val, test = TEST[:val_size], TEST[val_size:]
val = [[audio, audio_fft,machine_dict[type],label, 0] for audio, audio_fft, type, label in val]
test = [[audio, audio_fft,machine_dict[type],label, 0] for audio, audio_fft, type, label in test]

In [34]:
val

[[array([-0.0140686 , -0.01171875,  0.02084351, ...,  0.03231812,
          0.01644897,  0.00357056], dtype=float32),
  array([[0.24568477, 0.277211  , 0.05879305, ..., 0.65732175, 0.5531994 ,
          0.13453136],
         [0.19376574, 0.2730755 , 0.15147308, ..., 0.8691327 , 0.76502246,
          0.28746235],
         [0.07285001, 0.16690996, 0.19792154, ..., 0.72000396, 0.6468803 ,
          0.25124276],
         ...,
         [0.01654259, 0.00749196, 0.05837253, ..., 0.00505285, 0.02096094,
          0.03086548],
         [0.0167755 , 0.00965187, 0.04819172, ..., 0.03108035, 0.04782298,
          0.04613841],
         [0.01977892, 0.01779791, 0.05183254, ..., 0.05557375, 0.07579678,
          0.05398811]], dtype=float32),
  0,
  0,
  0],
 [array([-0.00158691,  0.00238037,  0.00231934, ...,  0.00137329,
         -0.00073242, -0.00054932], dtype=float32),
  array([[0.45986822, 0.39469555, 0.01927609, ..., 0.15277615, 0.00602053,
          0.27411768],
         [0.4678367 , 0.4349417

## Define model

In [9]:
class Autoencoder(nn.Module):
    def __init__(self, input_shape):
        super().__init__()
        
        self.encoder_hidden_layer = nn.Linear(in_features=input_shape, out_features=1024)
        self.encoder_middle = nn.Linear(in_features=1024,out_features=512)
        self.encoder_output_layer = nn.Linear(in_features=512,out_features=256)
        self.decoder_hidden_layer = nn.Linear(in_features=256,out_features=512)
        self.decoder_middle = nn.Linear(in_features=512,out_features=1024)
        self.decoder_output_layer = nn.Linear(in_features=1024,out_features=input_shape)
        
    
    def forward(self,features):
        x = self.encoder_hidden_layer(features)
        x = F.leaky_relu(x)
        x = self.encoder_middle(x)
        activation = F.leaky_relu(x)
        code = F.leaky_relu(self.encoder_output_layer(activation)) 
        activation = F.leaky_relu(self.decoder_middle(F.leaky_relu(self.decoder_hidden_layer(code))))
        reconstructed = torch.sigmoid(self.decoder_output_layer(activation))
        return reconstructed,code

In [10]:
class ResNetAudioClassifier(nn.Module):
    def __init__(self, num_classes):
        super(ResNetAudioClassifier, self).__init__()
        self.resnet = models.resnet18()
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        self.resnet.fc = nn.Linear(in_features=self.resnet.fc.in_features, out_features=num_classes)

    def forward(self, x):
        x = self.resnet(x)
        return x

## Define test function

### autoencoder testing function

In [11]:
def testing_function(net, dataset, batch_size):
    net.eval()
    result = dict()
    result['score'] = list()
    criterion = nn.L1Loss()
    for x, x_stft, id, label, _ in DataLoader(dataset,batch_size=batch_size):
        with torch.no_grad():
            reconstruct, code = net(x_stft.float().to(device))
        
        for idx, r in enumerate(reconstruct):
            reconstruct_loss = criterion(r.to('cpu'), x_stft[idx].float())
            result['score'].append(reconstruct_loss.numpy())
    result['score'] = result['score']
    return result

### classifier testing function

In [12]:
def testing_function_class(net, dataset, batch_size):
    net.eval()
    result = dict()
    result['score'] = list()
    result['label'] = list()
    result['test'] = list()
    result['id'] = list()
    criterion = nn.CrossEntropyLoss()
    for x, x_stft, id, label, code in DataLoader(dataset,batch_size=batch_size):
        with torch.no_grad():
            score = net(code.unsqueeze(1).to(device))
            score = score.softmax(dim=1)
        
        for idx, s in enumerate(score):
            result['id'].append(id[idx].item())
            result['label'].append(label[idx].item())
            result['test'].append(s)
            result['score'].append(1-s[id[idx]].item())

    return result

## Utility function

In [13]:
def normalize(anomaly_score, min_score=None, max_score=None):
    if max_score == None:
        max_score = max(anomaly_score)

    if min_score == None:
        min_score = min(anomaly_score)

    norm_score = [(score- min_score)/(max_score - min_score) for score in anomaly_score]

    return norm_score

In [14]:

def roc_evaluate(y_true, y_score):
    fpr, tpr, _ = roc_curve(y_true, y_score)
    auc = roc_auc_score(y_true, y_score)

    return fpr, tpr, auc

## Load the model


In [15]:
AE = Autoencoder(1251).to(device)
AE.load_state_dict(torch.load("model/autoencoder_pump.pt", map_location='cpu'))

<All keys matched successfully>

In [16]:
Classifier = ResNetAudioClassifier(len(machine_dict.keys())).to(device)
Classifier.load_state_dict(torch.load("model/best_classifier_pump.pt", map_location='cpu'))

<All keys matched successfully>

## [TODO] create database (memory bank)

In [36]:
val_loader = DataLoader(val, batch_size=16)
hidden_layer_dict = {}

for x, x_stft, id, _, _ in val_loader:
    with torch.no_grad():
        _, code = AE(x_stft.float().to(device))
    
    for i, machine_id in enumerate(id):
        if machine_id.item() not in hidden_layer_dict:
            hidden_layer_dict[machine_id.item()] = []
        hidden_layer_dict[machine_id.item()].append(code[i].cpu().numpy())

# 計算每個ID對應的code的mean，即center
center_dict = {}
for machine_id, hidden_layers in hidden_layer_dict.items():
    center_dict[machine_id] = np.mean(hidden_layers, axis=0)


## Create new validation and testing data

*new validation and testing contains the autoencoder's feature: [audio, audio_fft, ID, label, feature]*

* new_val: new validation
* new_test: new testing

In [27]:
new_val = []
for x, x_stft, id, label, _ in DataLoader(val,batch_size=16):
    with torch.no_grad():
        reconstruct, code = AE(x_stft.to(device).float())
    for idx, c in enumerate(code):
        new_val.append([x[idx].cpu().numpy(), x_stft[idx].cpu().numpy(), id[idx].cpu().item(), label[idx].cpu().item(), c.cpu().numpy()])

In [28]:
new_test = []
for x, x_stft, id, label, _ in DataLoader(test,batch_size=16):
    with torch.no_grad():
        reconstruct, code = AE(x_stft.to(device).float())
    for idx, c in enumerate(code):
        new_test.append([x[idx].cpu().numpy(), x_stft[idx].cpu().numpy(), id[idx].cpu().item(), label[idx].cpu().item(), c.cpu().numpy()])

## Test Autoencoder on validation set so that we can know its min and max value
Its min and max value will be used for normalization

In [29]:
test_label = [label for audio, audio_fft, type, label, _ in new_val]
AE_result_val = testing_function(AE, val, 125)
_,_,auc = roc_evaluate(test_label,AE_result_val["score"])


## Autoencoder: predict reconstruction score (testing data)

In [30]:
test_label = [label for audio, audio_fft, type, label, _ in new_test]

AE_result_test = testing_function(AE, test, 125)
_,_,auc = roc_evaluate(test_label,AE_result_test["score"])
print(f"Autoencoder AUC = {auc}")

Autoencoder AUC = 0.7131144729168811


## Classifier: predict anomaly score (testing data)

In [31]:
test_label = [label for audio, audio_fft, type, label, _ in new_test]

Class_result = testing_function_class(Classifier, new_test, 32)
_,_,auc = roc_evaluate(test_label,Class_result["score"])
print(f"Classifier AUC = {auc}")


Classifier AUC = 0.6652945010462763


## Get reconstruction score
use min and max from val to normalize

In [32]:
reconstruction_score = normalize(AE_result_test['score'], np.min(AE_result_val['score']), np.max(AE_result_val['score']))
reconstruction_score = np.array(reconstruction_score)

## Get anomaly score (classifier)

In [33]:
class_anomaly_score = Class_result['score']
class_anomaly_score = np.array(class_anomaly_score)

## [TODO] Get distance from center / neighbour / ...

In [46]:
from sklearn.svm import OneClassSVM

def calculate_distance_to_center(data, center_dict): # 計算每個樣本到中心的距離
    distances = []
    for sample in data:
        # columns=["audio", "audio_fft", "id", "label", "code"])
        sample_id = sample[2]  # sample ID
        sample_code = sample[4]  # sample code
        center = center_dict.get(sample_id)  # 取得對應ID的center
        if center is not None:
            distance = np.linalg.norm(sample_code - center)  # 計算距離
            distances.append(distance)
    return distances

def normalize_distances(distances):
    max_distance = max(distances)
    min_distance = min(distances)
    return [(distance - min_distance) / (max_distance - min_distance) for distance in distances]


val_dist = calculate_distance_to_center(new_val, center_dict)
val_dist_normalized = normalize_distances(val_dist)

# SVDDmodel
svdd_model = OneClassSVM(kernel='rbf', nu=0.5) 
svdd_model.fit(np.array(val_dist_normalized).reshape(-1, 1))

test_dist = calculate_distance_to_center(new_test, center_dict)
test_dist_normalized = normalize_distances(test_dist)
anomaly_scores = -svdd_model.decision_function(np.array(test_dist_normalized).reshape(-1, 1))

auc_distance = roc_auc_score(test_label, anomaly_scores)
print(f"Distance AUC: {auc_distance}")


Distance AUC: 0.5395955541833899


## Calculate final score for each data

In [59]:
final_score = (reconstruction_score + class_anomaly_score)/2

## Final AUC

In [60]:
test_label = [label for audio, audio_fft, type, label, _ in new_test]
_,_,auc = roc_evaluate(test_label,final_score)
print(f"final auc = {auc}")

final auc = 0.7431837266293079
