In [None]:
AUC : 0.77

In [None]:
import os
import pandas as pd
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torch
import numpy as np
from scipy import signal
from sklearn.preprocessing import MinMaxScaler
import utils
import pywt


def wav_to_log_mel(wav_path, sr, n_fft, win_length, hop_length, n_mels, power):
    """Load one CSV recording and build its spectrogram feature tensor.

    Despite the name, this function does not decode WAV audio and applies
    neither a mel filterbank nor log/dB scaling. It reads the first column
    of a CSV file, runs it through a normalized
    ``torchaudio.transforms.Spectrogram`` and returns that spectrogram
    concatenated with a min-max scaled copy of itself.

    Args:
        wav_path: Path of a CSV file readable by ``pandas.read_csv``; the
            first column is used as the signal.
        sr: Unused; kept so existing callers keep working.
        n_fft: Forwarded to ``Spectrogram``.
        win_length: Forwarded to ``Spectrogram``.
        hop_length: Forwarded to ``Spectrogram``.
        n_mels: Unused; kept so existing callers keep working.
        power: Forwarded to ``Spectrogram``.

    Returns:
        A 2-D float tensor: the spectrogram truncated to its first 512
        columns, followed along dim 1 by its ``MinMaxScaler`` output, so
        dim 1 is twice as long as the truncated spectrogram.
    """
    spectrogram = torchaudio.transforms.Spectrogram(
        n_fft=n_fft,
        win_length=win_length,
        hop_length=hop_length,
        power=power,
        normalized=True,
    )

    table = pd.read_csv(wav_path)
    samples = torch.Tensor(table.values[:, 0])

    # Keep at most the first 512 columns of the spectrogram.
    X = torch.Tensor(spectrogram(samples)[:, :512])
    # MinMaxScaler rescales every column of X to [0, 1] independently.
    min_max_x = torch.Tensor(MinMaxScaler().fit_transform(X))

    # NOTE(review): the previous version also computed the spectrogram of the
    # second CSV column (plus a dB-scaled copy of both) but never used any of
    # it; that dead work was removed without changing the returned tensor.
    # If the second channel was meant to be part of the features, confirm the
    # intended layout before wiring it in, since the model input width
    # depends on it.
    return torch.concat((X, min_max_x), dim=1)


def get_train_loader(args):
    """Build the shuffled training ``DataLoader`` over ``args.train_dir``.

    Args:
        args: Object exposing ``train_dir``, ``sr``, ``n_fft``,
            ``win_length``, ``hop_length``, ``n_mels``, ``power``,
            ``batch_size`` and ``n_workers``.

    Returns:
        A ``DataLoader`` over a ``BaselineDataLoader`` dataset built from the
        alphabetically sorted ``.csv`` files found directly in
        ``args.train_dir``.
    """
    train_dir = args.train_dir

    # Filter on the file extension. The earlier substring test
    # (``'csv' in name``) also picked up unrelated entries such as
    # "csv_notes.txt" or a sub-directory called "csv_backup".
    file_list = [
        os.path.join(train_dir, name)
        for name in sorted(os.listdir(train_dir))
        if name.endswith(".csv")
    ]

    train_dataset = BaselineDataLoader(
        file_list,
        args.sr,
        args.n_fft,
        args.win_length,
        args.hop_length,
        args.n_mels,
        args.power,
    )

    return DataLoader(
        train_dataset,
        batch_size=args.batch_size,
        shuffle=True,
        num_workers=args.n_workers,
    )


def get_eval_loader(args):
    """Build the evaluation ``DataLoader`` over ``args.eval_dir``.

    Args:
        args: Object exposing ``eval_dir``, ``sr``, ``n_fft``,
            ``win_length``, ``hop_length``, ``n_mels`` and ``power``.

    Returns:
        A ``(loader, file_list)`` tuple. ``loader`` yields one sample per
        batch, unshuffled and loaded in the main process, so batch ``i``
        corresponds to ``file_list[i]``. ``file_list`` holds the
        alphabetically sorted ``.csv`` paths found directly in
        ``args.eval_dir``.
    """
    eval_dir = args.eval_dir

    # Filter on the file extension. The earlier substring test
    # (``'csv' in name``) also picked up unrelated entries such as
    # "csv_notes.txt" or a sub-directory called "csv_backup".
    file_list = [
        os.path.join(eval_dir, name)
        for name in sorted(os.listdir(eval_dir))
        if name.endswith(".csv")
    ]

    eval_dataset = BaselineDataLoader(
        file_list,
        args.sr,
        args.n_fft,
        args.win_length,
        args.hop_length,
        args.n_mels,
        args.power,
    )

    eval_loader = DataLoader(
        eval_dataset, batch_size=1, shuffle=False, num_workers=0
    )
    return eval_loader, file_list


def get_test_loader(args):
    """Build the test ``DataLoader`` over ``args.test_dir``.

    Args:
        args: Object exposing ``test_dir``, ``sr``, ``n_fft``,
            ``win_length``, ``hop_length``, ``n_mels`` and ``power``.

    Returns:
        A ``(loader, file_list)`` tuple. ``loader`` yields one sample per
        batch, unshuffled and loaded in the main process, so batch ``i``
        corresponds to ``file_list[i]``. ``file_list`` holds the
        alphabetically sorted ``.csv`` paths found directly in
        ``args.test_dir``.
    """
    test_dir = args.test_dir

    # Filter on the file extension. The earlier substring test
    # (``'csv' in name``) also picked up unrelated entries such as
    # "csv_notes.txt" or a sub-directory called "csv_backup".
    file_list = [
        os.path.join(test_dir, name)
        for name in sorted(os.listdir(test_dir))
        if name.endswith(".csv")
    ]

    test_dataset = BaselineDataLoader(
        file_list,
        args.sr,
        args.n_fft,
        args.win_length,
        args.hop_length,
        args.n_mels,
        args.power,
    )

    test_loader = DataLoader(
        test_dataset, batch_size=1, shuffle=False, num_workers=0
    )
    return test_loader, file_list


class BaselineDataLoader(Dataset):
    """Map-style dataset over a list of recording files.

    Despite its name this is a ``Dataset``, not a ``DataLoader``. Each item
    is the feature tensor produced by ``wav_to_log_mel`` for one file,
    paired with the label ``utils.get_fault_label`` derives from that file's
    path.
    """

    def __init__(self, data_path, sr, n_fft, win_length, hop_length, n_mels, power):
        # Indexable collection of file paths, one per sample.
        self.data_path = data_path
        # Feature-extraction settings, handed to wav_to_log_mel unchanged.
        (
            self.sr,
            self.n_fft,
            self.win_length,
            self.hop_length,
            self.n_mels,
            self.power,
        ) = (sr, n_fft, win_length, hop_length, n_mels, power)

    def __len__(self):
        """Return the number of files in the dataset."""
        return len(self.data_path)

    def __getitem__(self, idx):
        """Return ``(features, label)`` for the file at position ``idx``."""
        path = self.data_path[idx]
        features = wav_to_log_mel(
            path,
            sr=self.sr,
            n_fft=self.n_fft,
            win_length=self.win_length,
            hop_length=self.hop_length,
            n_mels=self.n_mels,
            power=self.power,
        )
        return features, utils.get_fault_label(path)

In [None]:
# net.py

import torch
import torch.nn as nn
import torch.nn.functional as F


class Encoder(nn.Module):
    """Three-layer fully connected encoder that exposes every activation.

    The layer widths are ``in_size -> in_size/2 -> in_size/4 -> latent_size``
    (fractions truncated to int). All three ReLU activations are returned so
    a decoder can use the two wider ones as skip connections.

    Args:
        in_size: Width of the last dimension of the input.
        latent_size: Width of the final (narrowest) activation.
    """

    def __init__(self, in_size, latent_size):
        super().__init__()
        self.linear1 = nn.Linear(int(in_size), int(in_size / 2))
        self.linear2 = nn.Linear(int(in_size / 2), int(in_size / 4))
        self.linear3 = nn.Linear(int(in_size / 4), latent_size)
        self.relu = nn.ReLU(True)

    def forward(self, w):
        """Encode ``w`` and return the activations ``(z1, z2, z3)``.

        Each layer is fed the previous layer's ReLU output explicitly. The
        earlier code passed the pre-activation variable to the next layer and
        only worked because the in-place ReLU had already overwritten it;
        with a non-in-place ReLU the encoder would silently lose its
        non-linearities. Results are unchanged under the current setting.
        """
        z1 = self.relu(self.linear1(w))
        z2 = self.relu(self.linear2(z1))
        z3 = self.relu(self.linear3(z2))
        return z1, z2, z3


class Decoder(nn.Module):
    """Three-layer fully connected decoder with additive skip connections.

    The layer widths are ``latent_size -> out_size/4 -> out_size/2 ->
    out_size`` (fractions truncated to int). The output is passed through a
    sigmoid.

    Args:
        latent_size: Width of the narrowest input activation (``z3``).
        out_size: Width of the reconstructed output.
    """

    def __init__(self, latent_size, out_size):
        super().__init__()
        quarter_width = int(out_size/4)
        half_width = int(out_size/2)
        self.linear1 = nn.Linear(latent_size, quarter_width)
        self.linear2 = nn.Linear(quarter_width, half_width)
        self.linear3 = nn.Linear(half_width, out_size)
        self.relu = nn.ReLU(True)
        self.sigmoid = nn.Sigmoid()

    def forward(self, z1, z2, z3):
        """Reconstruct from ``z3``, adding ``z2`` then ``z1`` on the way up.

        ``z2`` is added to the first hidden activation and ``z1`` to the
        second, so they must be broadcastable to ``out_size/4`` and
        ``out_size/2`` wide tensors respectively.
        """
        hidden = self.relu(self.linear1(z3))
        hidden = self.relu(self.linear2(hidden + z2))
        return self.sigmoid(self.linear3(hidden + z1))


class AutoencoderModel(nn.Module):
    """One shared encoder feeding two decoders, trained with paired losses.

    Args:
        w_size: Width of the input and of each reconstruction.
        z_size: Width of the narrowest encoder activation.
    """

    def __init__(self, w_size, z_size):
        super().__init__()
        self.encoder = Encoder(w_size, z_size)
        self.decoder1 = Decoder(z_size, w_size)
        self.decoder2 = Decoder(z_size, w_size)

    def forward(self, batch, n):
        """Return the two training losses for ``batch``.

        Args:
            batch: Input tensor to reconstruct.
            n: Positive weighting term; ``1/n`` weights the direct
                reconstruction errors and ``1 - 1/n`` the error of the
                re-encoded reconstruction. ``n == 0`` raises
                ``ZeroDivisionError``.

        Returns:
            ``(loss1, loss2)``. ``loss1`` adds the re-encoded reconstruction
            error, ``loss2`` subtracts it.
        """

        def recon_error(output):
            # Mean squared error against the original input, over all elements.
            return torch.mean((batch - output) ** 2)

        latents = self.encoder(batch)
        w1 = self.decoder1(*latents)
        w2 = self.decoder2(*latents)
        # Reconstruction of decoder1's output, re-encoded and decoded by decoder2.
        w3 = self.decoder2(*self.encoder(w1))

        loss1 = 1 / n * recon_error(w1) + (1 - 1 / n) * recon_error(w3)
        loss2 = 1 / n * recon_error(w2) - (1 - 1 / n) * recon_error(w3)
        return loss1, loss2


def Autoencoder(W = 202, Z = 32):
    """Create an ``AutoencoderModel`` with input width ``W`` and latent width ``Z``."""
    model = AutoencoderModel(w_size=W, z_size=Z)
    return model


In [None]:
# eval.py

import numpy as np
import os
import torch
import sklearn.metrics as metrics

import train
import net
import dataset
import utils

def testing(model, test_data, alpha=.5, beta=.5):
    with torch.no_grad():
        z1, z2, z3 = model.encoder(test_data)
        w1=model.decoder1(z1, z2, z3)
        Z1, Z2, Z3 = model.encoder(w1)
        w2=model.decoder2(Z1, Z2, Z3)
        A = alpha*torch.mean((test_data-w1)**2, axis = (1,2)).cpu()+beta*torch.mean((test_data-w2)**2,axis = (1,2)).cpu()
        B = torch.mean((test_data-w1)**2, axis = (1,2)).cpu()
        C = beta*torch.mean((test_data-w2)**2,axis = (1,2)).cpu()
    return A, B, C

def eval(args):
    """Evaluate a saved autoencoder on the evaluation set and print three AUCs.

    NOTE: the name shadows the built-in ``eval`` inside this module; it is
    kept because callers refer to it by this name.

    Loads the weights at ``args.model_dir/args.model_path`` into
    ``net.Autoencoder()`` on the GPU, scores every sample returned by
    ``dataset.get_eval_loader(args)`` with ``testing`` and prints the ROC AUC
    of each of its three scores, in the order: combined score, first-pass
    error, weighted second-pass error.

    Args:
        args: Object exposing ``result_dir``, ``model_dir``, ``model_path``
            and whatever ``dataset.get_eval_loader`` reads.

    Raises:
        Whatever ``torch.load``, CUDA initialisation or
        ``sklearn.metrics.roc_auc_score`` raise (the latter e.g. when only
        one class is present in the labels).
    """
    print("Evaluation started...")
    os.makedirs(args.result_dir, exist_ok=True)

    model = net.Autoencoder().cuda()
    model_path = os.path.join(args.model_dir, args.model_path)
    model.load_state_dict(torch.load(model_path))
    model.eval()

    # The file list is only needed for per-file score export, which is
    # currently disabled (see the note at the end of this function).
    dataloader, _ = dataset.get_eval_loader(args)

    y_true = []
    y_pred_a, y_pred_b, y_pred_c = [], [], []

    for features, label in dataloader:
        score_a, score_b, score_c = testing(model, features.cuda())

        # Any positive label counts as anomalous for the binary AUC.
        y_true.append(1 if label.item() > 0 else 0)
        # The eval loader uses batch_size=1, so each score holds exactly one
        # element. Storing plain floats yields 1-D score arrays instead of
        # the (N, 1) arrays built from 1-element tensors.
        y_pred_a.append(score_a.item())
        y_pred_b.append(score_b.item())
        y_pred_c.append(score_c.item())

    y_true = np.array(y_true)
    # Printed in order: combined score, first-pass error, second-pass error.
    for scores in (y_pred_a, y_pred_b, y_pred_c):
        auc = metrics.roc_auc_score(y_true, np.array(scores, dtype=np.float32))
        print("AUC: ", auc)

    # NOTE(review): the original carried commented-out code for saving
    # per-file scores to "eval_score.csv" via utils.save_csv and for
    # per-fault-type AUCs ("normal", "inner", "outer", "ball"). Neither was
    # active, so both are omitted here; restore from history if needed.


# Script entry point: parse the shared training arguments, pick the GPU, and
# run the evaluation.
if __name__ == "__main__":
    # NOTE(review): hard-coded, machine-specific working directory. Any
    # relative path used later is resolved against it; running elsewhere
    # requires editing this line.
    os.chdir('E:/DataChallenge/usad-master/Baseline')
    args = train.get_args()
    # Set before eval() makes its first .cuda() call; presumably this is to
    # restrict which GPU the process sees. TODO confirm.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu)

    eval(args)
