In [1]:
# Cell 0 — Imports & Device
# ============================

import os
import sys
import math
import glob
import torch
import torchaudio
import itertools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from tqdm import tqdm
from torch import nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split

from sklearn import metrics
from sklearn.preprocessing import minmax_scale
from scipy.stats import rankdata
from sklearn.metrics import roc_auc_score, roc_curve, auc
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neighbors import NearestNeighbors, LocalOutlierFactor
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture
from scipy.spatial import distance
from torch.cuda.amp import autocast

alpha = 0.5
m = 0.7
gpu_num = 0
epoch = 300
num_classes = 41
split_ratio = 0.95
batch_size = 64
lr = 0.0001
mode = "noisy_arcmix"

save_path = r"D:\DCASE2020\Noisy-ArcMix\check_points\TASTgram_noisy_arcmix.pth"
root_path = r"D:\DCASE2020\Noisy-ArcMix\datasets"
name_list = ['fan', 'pump', 'slider', 'ToyCar', 'ToyConveyor', 'valve']

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
# Cell 1 — Class Dataset
# ============================

def file_to_log_mel_spectrogram(y, sr, n_mels, n_fft, hop_length, power):
    transform = torchaudio.transforms.MelSpectrogram(sample_rate=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels, power=power, pad_mode='constant', norm='slaney', mel_scale='slaney')
    mel_spectrogram = transform(y)
    log_mel_spectrogram = 20.0 / power * torch.log10(mel_spectrogram + sys.float_info.epsilon)
    return log_mel_spectrogram

class test_dataset(torch.utils.data.Dataset):
    def __init__(self, root_path, test_name, name_list):
        dataset_dir = os.path.join(root_path, test_name, 'test')
        normal_files = sorted(glob.glob('{dir}/normal_*'.format(dir=dataset_dir)))
        anomaly_files = sorted(glob.glob('{dir}/anomaly_*'.format(dir=dataset_dir)))
        
        self.test_files = np.concatenate((normal_files, anomaly_files), axis=0)
        
        normal_labels = np.zeros(len(normal_files))
        anomaly_labels = np.ones(len(anomaly_files))
        self.y_true = torch.LongTensor(np.concatenate((normal_labels, anomaly_labels), axis=0))
        
        target_idx = name_list.index(test_name)
        
        label_init_num = 0
        for i, name in enumerate(name_list):
            if i == target_idx:
                break
            label_init_num+=len(self._get_label_list(name))
            
        self.labels = []
        label_list = self._get_label_list(test_name)
        for file_name in self.test_files:
            for idx, label_idx in enumerate(label_list):
                if label_idx in file_name:
                    self.labels.append(idx + label_init_num)
        
        self.labels = torch.LongTensor(self.labels)
        
        self.y_list = []
        self.y_spec_list = []
        
        for i in tqdm(range(len(self.test_files))):
            y, sr = self._file_load(self.test_files[i])
            y_specgram = file_to_log_mel_spectrogram(y, sr, n_fft=2048, hop_length=512, n_mels=128, power=2)
            self.y_list.append(y)
            self.y_spec_list.append(y_specgram)
    
    def __getitem__(self, idx):
        anomal_label = self.y_true[idx]
        label = self.labels[idx]
        return self.y_list[idx], self.y_spec_list[idx], label, anomal_label 

    def __len__(self):
        return len(self.test_files)
    
    def _file_load(self, file_name):
        try:
            y, sr = torchaudio.load(file_name)
            y = y[..., :sr * 10]
            return y, sr
        except:
            print("file_broken or not exists!! : {}".format(file_name))
    
    def _get_label_list(self, name):
        if name == 'ToyConveyor':
            label_list = ['id_01', 'id_02', 'id_03', 'id_04', 'id_05', 'id_06'] 
    
        elif name == 'ToyCar':
            label_list = ['id_01', 'id_02', 'id_03', 'id_04', 'id_05', 'id_06', 'id_07']
        
        else:
            label_list = ['id_00', 'id_01', 'id_02', 'id_03', 'id_04', 'id_05', 'id_06']
            
        return label_list

class train_dataset(torch.utils.data.Dataset):
    def __init__(self, root_path, name_list): 
        data_path = [os.path.join(root_path, name, 'train') for name in name_list]
        
        files_list= [self._file_list_generator(target_path) for target_path in data_path]
        
        self.labels = []

        maximum = 0
        for i, files in enumerate(files_list):
            label_list = self._get_label_list(name_list[i])
            for file_name in files:
                for idx, label_idx in enumerate(label_list):
                    if label_idx in file_name:
                        self.labels.append(idx + maximum)
            maximum = max(self.labels)+1

        self.unrolled_files_list = list(itertools.chain.from_iterable(files_list))
        self.labels = torch.LongTensor(self.labels)
        
        self.y_list = []
        self.y_spec_list = []
        
        for i in tqdm(range(len(self.unrolled_files_list))):
            y, sr = self._file_load(self.unrolled_files_list[i])
            y_specgram = file_to_log_mel_spectrogram(y, sr, n_fft=2048, hop_length=512, n_mels=128, power=2)
            self.y_list.append(y)
            self.y_spec_list.append(y_specgram)
    
    def __getitem__(self, idx):
        return self.y_list[idx], self.y_spec_list[idx], self.labels[idx]
    
    def __len__(self):
        return len(self.unrolled_files_list)
      
    def _get_label_list(self, name):
        if name == 'ToyConveyor':
            label_list = ['id_01', 'id_02', 'id_03', 'id_04', 'id_05', 'id_06'] 
    
        elif name == 'ToyCar':
            label_list = ['id_01', 'id_02', 'id_03', 'id_04', 'id_05', 'id_06', 'id_07']
        
        else:
            label_list = ['id_00', 'id_01', 'id_02', 'id_03', 'id_04', 'id_05', 'id_06']
            
        return label_list 
    
    def _file_list_generator(self, target_dir):
        training_list_path = os.path.abspath('{dir}/*.wav'.format(dir=target_dir))
        files = sorted(glob.glob(training_list_path))
        if len(files) == 0:
            print('no_wav_file!!')
        return files
    
    def _file_load(self, file_name):
        try:
            y, sr = torchaudio.load(file_name)
            y = y[..., :sr * 10]
            return y, sr
        except:
            print("file_broken or not exists!! : {}".format(file_name))

In [3]:
# Cell 2 — Mode
# ============================

def get_accuracy(logits, y):
    pred_label = torch.argmax(logits, dim=-1)
    return torch.sum(pred_label == y)/len(pred_label)
  
def dataset_split(dataset, split_ratio):
    data_len = len(dataset)
    
    train_len = int(data_len * split_ratio)
    valid_len = data_len - train_len
    
    train_dataset, valid_dataset = random_split(dataset, [train_len, valid_len])

    return train_dataset, valid_dataset

def mixup_data(x_wavs, x_mels, y, device, alpha=0.5):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x_mels.size()[0]
    index = torch.randperm(batch_size).to(device)
    
    mixed_x_wavs = lam * x_wavs + (1 - lam) * x_wavs[index, :]
    mixed_x_mels = lam * x_mels + (1 - lam) * x_mels[index, :]
    y_a, y_b = y, y[index]
    return mixed_x_wavs, mixed_x_mels, y_a, y_b, lam

def noisy_arcmix_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

def arcmix_criterion(criterion, pred, pred2, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred2, y_b)

In [4]:
# Cell 3 — Loss & Network
# ============================

class ASDLoss(nn.Module):
    def __init__(self, reduction=True):
        super(ASDLoss, self).__init__()
        if reduction == True:
            self.ce = nn.CrossEntropyLoss()
        
        else:
            self.ce = nn.CrossEntropyLoss(reduction='none')

    def forward(self, logits, labels):
        loss = self.ce(logits, labels)
        return loss

# ArcFace is referred to https://github.com/ronghuaiyang/arcface-pytorch/blob/master/models/metrics.py
class ArcMarginProduct(nn.Module):
    def __init__(self, in_features=128, out_features=200, s=30.0, m=0.7, sub=1, easy_margin=False):
        super(ArcMarginProduct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.sub = sub
        self.weight = Parameter(torch.Tensor(out_features * sub, in_features))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        
        # make the function cos(theta+m) monotonic decreasing while theta in [0°,180°]
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, x, label):
        cosine = F.linear(F.normalize(x), F.normalize(self.weight))
        
        if self.sub > 1:
            cosine = cosine.view(-1, self.out_features, self.sub)
            cosine, _ = torch.max(cosine, dim=2)
        sine = torch.sqrt(1.0 - torch.pow(cosine, 2))
        phi = cosine * self.cos_m - sine * self.sin_m
        
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where((cosine - self.th) > 0, phi, cosine - self.mm)

        one_hot = torch.zeros(cosine.size(), device=x.device)
        one_hot.scatter_(1, label.view(-1, 1).long(), 1)
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        output = output * self.s
        return output

class Bottleneck(nn.Module):
    def __init__(self, inp, oup, stride, expansion):
        super(Bottleneck, self).__init__()
        self.connect = stride == 1 and inp == oup
        #
        self.conv = nn.Sequential(
            # pw
            nn.Conv2d(inp, inp * expansion, 1, 1, 0, bias=False),
            nn.BatchNorm2d(inp * expansion),
            nn.PReLU(inp * expansion),
            # dw
            nn.Conv2d(inp * expansion, inp * expansion, 3, stride, 1, groups=inp * expansion, bias=False),
            nn.BatchNorm2d(inp * expansion),
            nn.PReLU(inp * expansion),

            # pw-linear
            nn.Conv2d(inp * expansion, oup, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup),
        )

    def forward(self, x):
        if self.connect:
            return x + self.conv(x)
        else:
            return self.conv(x)


class ConvBlock(nn.Module):
    def __init__(self, inp, oup, k, s, p, dw=False, linear=False):
        super(ConvBlock, self).__init__()
        self.linear = linear
        if dw:
            self.conv = nn.Conv2d(inp, oup, k, s, p, groups=inp, bias=False)
        else:
            self.conv = nn.Conv2d(inp, oup, k, s, p, bias=False)
        self.bn = nn.BatchNorm2d(oup)
        if not linear:
            self.prelu = nn.PReLU(oup)

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        if self.linear:
            return x
        else:
            return self.prelu(x)


#https://dcase.community/documents/challenge2022/technical_reports/DCASE2022_Liu_8_t2.pdf
Mobilefacenet_bottleneck_setting = [
    # t, c , n ,s
    [2, 128, 2, 2],
    [4, 128, 2, 2],
    [4, 128, 2, 2],
]


class MobileFaceNet(nn.Module):
    def __init__(self,
                 num_class,
                 bottleneck_setting=Mobilefacenet_bottleneck_setting):
        super(MobileFaceNet, self).__init__()

        self.conv1 = ConvBlock(3, 64, 3, 2, 1)

        self.dw_conv1 = ConvBlock(64, 64, 3, 1, 1, dw=True)

        self.inplanes = 64
        block = Bottleneck
        self.blocks = self._make_layer(block, bottleneck_setting)

        self.conv2 = ConvBlock(bottleneck_setting[-1][1], 512, 1, 1, 0)
        # 20(10), 4(2), 8(4)
        self.linear7 = ConvBlock(512, 512, (8, 20), 1, 0, dw=True, linear=True)
        
        self.linear1 = ConvBlock(512, 128, 1, 1, 0, linear=True)

        self.fc_out = nn.Linear(128, num_class)
        # init
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def _make_layer(self, block, setting):
        layers = []
        for t, c, n, s in setting:
            for i in range(n):
                if i == 0:
                    layers.append(block(self.inplanes, c, s, t))
                else:
                    layers.append(block(self.inplanes, c, 1, t))
                self.inplanes = c
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.dw_conv1(x)
        x = self.blocks(x)
        x = self.conv2(x)
        x = self.linear7(x)
        x = self.linear1(x)
        feature = x.view(x.size(0), -1)
        out = self.fc_out(feature)
        return out, feature


class TgramNet(nn.Module):
    def __init__(self, num_layer=3, mel_bins=128, win_len=1024, hop_len=512):
        super(TgramNet, self).__init__()
        # if "center=True" of stft, padding = win_len / 2
        self.conv_extrctor = nn.Conv1d(1, mel_bins, win_len, hop_len, win_len // 2, bias=False)
        self.conv_encoder = nn.Sequential(
            *[nn.Sequential(
                # 313(10) , 63(2), 126(4)
                nn.LayerNorm(313),
                nn.LeakyReLU(0.2, inplace=True),
                nn.Conv1d(mel_bins, mel_bins, 3, 1, 1, bias=False),
            ) for _ in range(num_layer)])

    def forward(self, x):
        out = self.conv_extrctor(x)
        out = self.conv_encoder(out)
        return out


class TASTgramMFN(nn.Module):
    def __init__(self, num_classes, mode,
                c_dim=128,
                win_len=1024,
                hop_len=512,
                bottleneck_setting=Mobilefacenet_bottleneck_setting,
                use_arcface=True, m=0.7, s=30, sub=1
                ):
        super().__init__()
        
        self.arcface = ArcMarginProduct(in_features=c_dim, out_features=num_classes,
                                        m=m, s=s, sub=sub) if use_arcface else use_arcface
        self.tgramnet = TgramNet(mel_bins=c_dim, win_len=win_len, hop_len=hop_len)
        self.mobilefacenet = MobileFaceNet(num_class=num_classes,
                                          bottleneck_setting=bottleneck_setting)
        self.mode = mode
        
        if mode not in ['arcface', 'arcmix', 'noisy_arcmix']:
            raise ValueError('Choose one of [arcface, arcmix, noisy_arcmix]')
        
        self.temporal_attention = Temporal_Attention(feature_dim=c_dim)
        
    def get_tgram(self, x_wav):
        return self.tgramnet(x_wav)

    def forward(self, x_wav, x_mel, label, train=True):
        x_t = self.tgramnet(x_wav).unsqueeze(1)
        
        x_mel_temp_att = self.temporal_attention(x_mel).unsqueeze(1)
        
        x = torch.cat((x_t, x_mel, x_mel_temp_att), dim=1)
        
        out, feature = self.mobilefacenet(x)
        
        if self.mode == 'arcmix':
            if train:
                out = self.arcface(feature, label[0])
                out_shuffled = self.arcface(feature, label[1])
                return out, out_shuffled, feature
            else:
                out = self.arcface(feature, label)
                return out, feature
        
        else:
            out = self.arcface(feature, label)
            return out, feature
        
        
class Temporal_Attention(nn.Module):
  def __init__(self, feature_dim=128):
    super().__init__()
    
    self.feature_dim = feature_dim
    self.max_pool = nn.AdaptiveMaxPool1d(1)
    self.avg_pool = nn.AdaptiveAvgPool1d(1)
    self.sigmoid = nn.Sigmoid()
    
  def forward(self, x):
    # x: (B, 1, 128, 313)
    x = x.squeeze(1)
    
    x = x.transpose(1,2) # (B, 313, 128)

    x1 = self.max_pool(x) # (B, 313, 1)
    x2 = self.avg_pool(x) # (B, 313, 1)
    
    feats = x1 + x2
    
    feats = feats.repeat(1, 1, self.feature_dim)
    
    refined_feats = self.sigmoid(feats).transpose(1,2) * x.transpose(1,2)
    return refined_feats

In [5]:
# Define Network

net = TASTgramMFN(num_classes=41, m=0.7, mode=mode).to(device)

In [6]:
# Cell 4 — Trainer
# ============================

class Trainer:
    def __init__(self, device, mode, m, alpha, epochs=300, class_num=41, lr=1e-4):
        self.device = device
        self.epochs = epochs
        self.alpha = alpha
        self.net = TASTgramMFN(num_classes=class_num, mode=mode, use_arcface=True, m=m).to(self.device)
        self.optimizer = torch.optim.AdamW(self.net.parameters(), lr=lr)
        
        self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(self.optimizer, T_max=epochs, eta_min=0.1*float(lr))
        self.criterion = ASDLoss().to(self.device)
        self.test_criterion = ASDLoss(reduction=False).to(self.device)
        self.mode = mode
        
        if mode not in ['arcface', 'arcmix', 'noisy_arcmix']:
            raise ValueError('Mode should be one of [arcface, arcmix, noisy_arcmix]')
        
        print(f'{mode} mode has been selected...')
        
    def train(self, train_loader, valid_loader, save_path):
        num_steps = len(train_loader)
        min_val_loss = 1e10
        
        for epoch in tqdm(range(self.epochs), total=self.epochs):
            sum_loss = 0.
            sum_accuracy = 0.
            
            for _, (x_wavs, x_mels, labels) in tqdm(enumerate(train_loader), total=num_steps):
                self.net.train()
                
                x_wavs, x_mels, labels = x_wavs.to(self.device), x_mels.to(self.device), labels.to(self.device)
                
                with autocast():
                    if self.mode == 'arcface':
                        logits, _ = self.net(x_wavs, x_mels, labels)
                        loss = self.criterion(logits, labels)
                    
                    elif self.mode == 'noisy_arcmix':
                        mixed_x_wavs, mixed_x_mels, y_a, y_b, lam = mixup_data(x_wavs, x_mels, labels, self.device, alpha=self.alpha)
                        logits, _ = self.net(mixed_x_wavs, mixed_x_mels, labels)
                        loss = noisy_arcmix_criterion(self.criterion, logits, y_a, y_b, lam)
                    
                    elif self.mode == 'arcmix':
                        mixed_x_wavs, mixed_x_mels, y_a, y_b, lam = mixup_data(x_wavs, x_mels, labels, self.device, alpha=self.alpha)
                        logits, logits_shuffled, _ = self.net(mixed_x_wavs, mixed_x_mels, [y_a, y_b])
                        loss = arcmix_criterion(self.criterion, logits, logits_shuffled, y_a, y_b, lam)
                
                sum_accuracy += get_accuracy(logits, labels)
                
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                
                sum_loss += loss.item()
            self.scheduler.step()
                
            avg_loss = sum_loss / num_steps
            avg_accuracy = sum_accuracy / num_steps
            
            valid_loss, valid_accuracy = self.valid(valid_loader)
            
            if min_val_loss > valid_loss:
                min_val_loss = valid_loss
                lr = self.scheduler.get_last_lr()[0]
                print("model has been saved!")
                print(f'lr: {lr:.7f} | EPOCH: {epoch} | Train_loss: {avg_loss:.5f} | Train_accuracy: {avg_accuracy:.5f} | Valid_loss: {valid_loss:.5f} | Valid_accuracy: {valid_accuracy:.5f}')
                # torch.save(self.net.state_dict(), save_path)
                
    def valid(self, valid_loader):
        self.net.eval()
        
        num_steps = len(valid_loader)
        sum_loss = 0.
        sum_accuracy = 0.
        
        for (x_wavs, x_mels, labels) in valid_loader:
            x_wavs, x_mels, labels = x_wavs.to(self.device), x_mels.to(self.device), labels.to(self.device)
            logits, _ = self.net(x_wavs, x_mels, labels, train=False)
            sum_accuracy += get_accuracy(logits, labels)
            loss = self.criterion(logits, labels)
            sum_loss += loss.item()
            
        avg_loss = sum_loss / num_steps 
        avg_accuracy = sum_accuracy / num_steps 
        return avg_loss, avg_accuracy
    
    def test(self, test_loader):
        self.net.eval()
        
        y_true = []
        y_pred = []
        
        sum_accuracy = 0.
        with torch.no_grad():
            for x_wavs, x_mels, labels, AN_N_labels in test_loader:
                x_wavs, x_mels, labels, AN_N_labels = x_wavs.to(self.device), x_mels.to(self.device), labels.to(self.device), AN_N_labels.to(self.device)
                
                logits, _ = self.net(x_wavs, x_mels, labels, train=False)
                score = self.test_criterion(logits, labels)
                sum_accuracy += get_accuracy(logits, labels)
                
                y_pred.extend(score.tolist())
                y_true.extend(AN_N_labels.tolist())
        auc = metrics.roc_auc_score(y_true, y_pred)
        #pauc = metrics.roc_auc_score(y_true, y_pred, max_fpr=0.1)
        return auc, sum_accuracy / len(test_loader)

In [7]:
# Cell 5 — Evaluator
# ============================

def evaluator(net, test_loader, criterion, device):
    net.eval()
    
    y_true = []
    y_pred = []
    
    with torch.no_grad():
        for x_wavs, x_mels, labels, AN_N_labels in test_loader:
            x_wavs, x_mels, labels, AN_N_labels = x_wavs.to(device), x_mels.to(device), labels.to(device), AN_N_labels.to(device)
            
            logits, _ = net(x_wavs, x_mels, labels, train=False)
            
            score = criterion(logits, labels)

            y_pred.extend(score.tolist())
            y_true.extend(AN_N_labels.tolist())
    
    auc = metrics.roc_auc_score(y_true, y_pred)
    pauc = metrics.roc_auc_score(y_true, y_pred, max_fpr=0.1)
    return auc, pauc                

In [8]:
# Cell 6 — Train Dataset
# ============================

print('training dataset loading...')
train_ds = train_dataset(root_path, name_list)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)

training dataset loading...


100%|██████████| 20119/20119 [04:04<00:00, 82.33it/s]


In [9]:
# Cell 7 — Anomaly Score Function
# ============================

def anomaly_scores(net, arcface, train_loader, test_loader, device):
    net.eval()

    # === Extract train features (normalized) ===
    train_features = []
    with torch.no_grad():
        for x_wavs, x_mels, labels in train_loader:
            x_wavs, x_mels, labels = x_wavs.to(device), x_mels.to(device), labels.to(device)
            _, features = net(x_wavs, x_mels, labels, train=False)
            train_features.append(features.cpu())
    train_features = torch.cat(train_features, dim=0).to(device)
    train_norm = F.normalize(train_features, dim=1)
    train_np = train_norm.cpu().numpy()

    # === Extract test features and ground truth labels ===
    test_features = []
    y_true = []
    logits_list = []
    label_list = []
    with torch.no_grad():
        for x_wavs, x_mels, labels, AN_N_labels in test_loader:
            x_wavs, x_mels, labels, AN_N_labels = x_wavs.to(device), x_mels.to(device), labels.to(device), AN_N_labels.to(device)
            logits, features = net(x_wavs, x_mels, labels, train=False)
            test_features.append(features.cpu())
            logits_list.append(logits.cpu())
            label_list.append(labels.cpu())
            y_true.extend(AN_N_labels.cpu().numpy())

    test_features = torch.cat(test_features, dim=0).to(device)
    test_norm = F.normalize(test_features, dim=1)
    test_np = test_norm.cpu().numpy()
    y_true = np.array(y_true)

    results = {}

    # === 1. Cross Entropy
    logits_all = torch.cat(logits_list, dim=0)
    labels_all = torch.cat(label_list, dim=0)
    ce_criterion = torch.nn.CrossEntropyLoss(reduction='none')
    ce_losses = ce_criterion(logits_all, labels_all)
    ce_scores = ce_losses.detach().cpu().numpy()

    results['CrossEntropy'] = (
        roc_auc_score(y_true, ce_scores),
        roc_auc_score(y_true, ce_scores, max_fpr=0.1)
    )

    # # === 2. Cosine Similarity using ArcFace Centers
    # arc_weights = arcface.weight.to(device)
    # arc_centers = arc_weights.view(arcface.out_features, arcface.sub, -1).mean(dim=1)
    # arc_centers = F.normalize(arc_centers, dim=1)

    # cos_sim = F.cosine_similarity(test_norm.unsqueeze(1), arc_centers.unsqueeze(0), dim=2)
    # best_cos = torch.abs(cos_sim).max(dim=1)[0]
    # cos_scores = -torch.log(best_cos + 1e-8).detach().cpu().numpy()

    # results['Cosine'] = (
    #     roc_auc_score(y_true, cos_scores),
    #     roc_auc_score(y_true, cos_scores, max_fpr=0.1)
    # )

    # === 3. KMeans=60
    km = KMeans(n_clusters=60, random_state=0).fit(train_np)
    km_centers = torch.tensor(km.cluster_centers_, dtype=torch.float32).to(device)
    km_centers = F.normalize(km_centers, dim=1)

    cos_sim_km = F.cosine_similarity(test_norm.unsqueeze(1), km_centers.unsqueeze(0), dim=2)
    best_km = torch.abs(cos_sim_km).max(dim=1)[0]
    kmeans_scores = -torch.log(best_km + 1e-8).cpu().numpy()

    results['KMeans=60'] = (
        roc_auc_score(y_true, kmeans_scores),
        roc_auc_score(y_true, kmeans_scores, max_fpr=0.1)
    )

    # === 3. KMeans=80
    km = KMeans(n_clusters=60, random_state=0).fit(train_np)
    km_centers = torch.tensor(km.cluster_centers_, dtype=torch.float32).to(device)
    km_centers = F.normalize(km_centers, dim=1)

    cos_sim_km = F.cosine_similarity(test_norm.unsqueeze(1), km_centers.unsqueeze(0), dim=2)
    best_km = torch.abs(cos_sim_km).max(dim=1)[0]
    kmeans_scores = -torch.log(best_km + 1e-8).cpu().numpy()

    results['KMeans=80'] = (
        roc_auc_score(y_true, kmeans_scores),
        roc_auc_score(y_true, kmeans_scores, max_fpr=0.1)
    )

    # === 4. GMM=60
    gmm = GaussianMixture(n_components=40, random_state=0, covariance_type='full').fit(train_np)
    gmm_scores = -gmm.score_samples(test_np)
    results['GMM=40'] = (
        roc_auc_score(y_true, gmm_scores),
        roc_auc_score(y_true, gmm_scores, max_fpr=0.1)
    )

    # === 4. GMM=80
    gmm = GaussianMixture(n_components=60, random_state=0, covariance_type='full').fit(train_np)
    gmm_scores = -gmm.score_samples(test_np)
    results['GMM=60'] = (
        roc_auc_score(y_true, gmm_scores),
        roc_auc_score(y_true, gmm_scores, max_fpr=0.1)
    )

    # === 5. LOF=5
    lof = LocalOutlierFactor(n_neighbors=5, novelty=True).fit(train_np)
    lof_scores = -lof.score_samples(test_np)
    results['LOF=5'] = (
        roc_auc_score(y_true, lof_scores),
        roc_auc_score(y_true, lof_scores, max_fpr=0.1)
    )

    # === 5. LOF=10
    lof = LocalOutlierFactor(n_neighbors=10, novelty=True).fit(train_np)
    lof_scores = -lof.score_samples(test_np)
    results['LOF=10'] = (
        roc_auc_score(y_true, lof_scores),
        roc_auc_score(y_true, lof_scores, max_fpr=0.1)
    )

    # === 6. KNN=5
    knn = NearestNeighbors(n_neighbors=5).fit(train_np)
    distances, _ = knn.kneighbors(test_np)
    knn_scores = distances.mean(axis=1)
    results['KNN=5'] = (
        roc_auc_score(y_true, knn_scores),
        roc_auc_score(y_true, knn_scores, max_fpr=0.1)
    )

    # === 6. KNN=3
    knn = NearestNeighbors(n_neighbors=3).fit(train_np)
    distances, _ = knn.kneighbors(test_np)
    knn_scores = distances.mean(axis=1)
    results['KNN=3'] = (
        roc_auc_score(y_true, knn_scores),
        roc_auc_score(y_true, knn_scores, max_fpr=0.1)
    )

    # # === 7. Mahalanobis
    # train_mean = np.mean(train_np, axis=0)
    # train_cov = np.cov(train_np, rowvar=False)
    # try:
    #     inv_cov = np.linalg.inv(train_cov)
    #     maha_scores = [distance.mahalanobis(x, train_mean, inv_cov) for x in test_np]
    #     maha_scores = np.array(maha_scores)
    #     results['Mahalanobis'] = (
    #         roc_auc_score(y_true, maha_scores),
    #         roc_auc_score(y_true, maha_scores, max_fpr=0.1)
    #     )
    # except np.linalg.LinAlgError:
    #     print("Mahalanobis: Covariance matrix is not invertible.")
    #     results['Mahalanobis'] = (0.0, 0.0)

    # === Print
    for method, (auc, pauc) in results.items():
        print(f"{method} → AUC: {auc:.4f}, pAUC: {pauc:.4f}")

    best_method = max(results.items(), key=lambda x: x[1][0])
    print(f"\nBest Method: {best_method[0]} → AUC: {best_method[1][0]:.4f}, pAUC: {best_method[1][1]:.4f}")

    return results

In [None]:
# Cell 8 — Run
# ============================

net.load_state_dict(torch.load(save_path, map_location=torch.device('cpu')))
net.eval()

avg_results = {
    'CrossEntropy': [0.0, 0.0],
    'Cosine': [0.0, 0.0],
    'KMeans=40': [0.0, 0.0],
    'KMeans=60': [0.0, 0.0],
    'GMM=60': [0.0, 0.0],
    'GMM=80': [0.0, 0.0],
    'LOF=5': [0.0, 0.0],
    'LOF=10': [0.0, 0.0],
    'KNN=5': [0.0, 0.0],
    'KNN=3': [0.0, 0.0],
    'Mahalanobis': [0.0, 0.0]
}

best_method_avg_auc = []
best_method_avg_pauc = []

for i in range(len(name_list)):
    print(f"Anomaly Score Calculation on Machine: {name_list[i]}")

    test_ds = test_dataset(root_path, name_list[i], name_list)
    test_loader = DataLoader(test_ds, batch_size=1, shuffle=False)

    results = anomaly_scores(net=net, arcface=net.arcface, train_loader=train_loader, test_loader=test_loader, device=device)

    # Average Results of Each Anomaly Score Methods
    # for method, (auc, pauc) in results.items():
    #     avg_results[method][0] += auc
    #     avg_results[method][1] += pauc

    best_method, (best_auc, best_pauc) = max(results.items(), key=lambda x: [1][0])
    best_method_avg_auc.append(best_auc)
    best_method_avg_pauc.append(best_pauc)

    print("\n" + "=" * 50 + "\n")

# Print Average Results of Each Anomaly Score Methods
# for method, (total_auc, total_pauc) in avg_results.items():
#     avg_auc = total_auc / len(name_list)
#     avg_pauc = total_pauc / len(name_list)
#     print(f"Average {method} - AUC: {avg_auc:.4f}, pAUC: {avg_pauc:.4f}")

# Print Average Results of Max Anomaly Score from Each Machine Type
print("Average of Best Methods:")
print(f"AUC: {np.mean(best_method_avg_auc):.4f}, pAUC: {np.mean(best_method_avg_pauc):.4f}")

Anomaly Score Calculation on Machine: fan


100%|██████████| 1875/1875 [00:23<00:00, 78.31it/s]


CrossEntropy → AUC: 0.9832, pAUC: 0.9534
KMeans=60 → AUC: 0.9788, pAUC: 0.9494
KMeans=80 → AUC: 0.9788, pAUC: 0.9494
GMM=40 → AUC: 0.9740, pAUC: 0.9450
GMM=60 → AUC: 0.9744, pAUC: 0.9449
LOF=5 → AUC: 0.9845, pAUC: 0.9542
LOF=10 → AUC: 0.9840, pAUC: 0.9550
KNN=5 → AUC: 0.9778, pAUC: 0.9483
KNN=3 → AUC: 0.9780, pAUC: 0.9485

Best Method: LOF=5 → AUC: 0.9845, pAUC: 0.9542


Anomaly Score Calculation on Machine: pump


100%|██████████| 856/856 [00:10<00:00, 81.69it/s]


CrossEntropy → AUC: 0.9544, pAUC: 0.8599
KMeans=60 → AUC: 0.9427, pAUC: 0.8465
KMeans=80 → AUC: 0.9427, pAUC: 0.8465
GMM=40 → AUC: 0.9372, pAUC: 0.8493
GMM=60 → AUC: 0.9279, pAUC: 0.8381
LOF=5 → AUC: 0.9433, pAUC: 0.8454
LOF=10 → AUC: 0.9421, pAUC: 0.8453
KNN=5 → AUC: 0.9384, pAUC: 0.8472
KNN=3 → AUC: 0.9385, pAUC: 0.8476

Best Method: CrossEntropy → AUC: 0.9544, pAUC: 0.8599


Anomaly Score Calculation on Machine: slider


100%|██████████| 1290/1290 [00:16<00:00, 77.26it/s]


CrossEntropy → AUC: 0.9953, pAUC: 0.9750
KMeans=60 → AUC: 0.9963, pAUC: 0.9803
KMeans=80 → AUC: 0.9963, pAUC: 0.9803
GMM=40 → AUC: 0.9963, pAUC: 0.9804
GMM=60 → AUC: 0.9962, pAUC: 0.9799
LOF=5 → AUC: 0.9969, pAUC: 0.9836
LOF=10 → AUC: 0.9973, pAUC: 0.9858
KNN=5 → AUC: 0.9965, pAUC: 0.9818
KNN=3 → AUC: 0.9967, pAUC: 0.9828

Best Method: LOF=10 → AUC: 0.9973, pAUC: 0.9858


Anomaly Score Calculation on Machine: ToyCar


100%|██████████| 2459/2459 [00:30<00:00, 79.72it/s]


CrossEntropy → AUC: 0.9676, pAUC: 0.9011
KMeans=60 → AUC: 0.9331, pAUC: 0.8140
KMeans=80 → AUC: 0.9331, pAUC: 0.8140
GMM=40 → AUC: 0.9112, pAUC: 0.7914
GMM=60 → AUC: 0.9030, pAUC: 0.7827
LOF=5 → AUC: 0.9341, pAUC: 0.8274
LOF=10 → AUC: 0.9369, pAUC: 0.8249
KNN=5 → AUC: 0.9203, pAUC: 0.8060
KNN=3 → AUC: 0.9196, pAUC: 0.8063

Best Method: CrossEntropy → AUC: 0.9676, pAUC: 0.9011


Anomaly Score Calculation on Machine: ToyConveyor


100%|██████████| 3509/3509 [00:45<00:00, 77.50it/s]


CrossEntropy → AUC: 0.7790, pAUC: 0.6715
KMeans=60 → AUC: 0.7485, pAUC: 0.6436
KMeans=80 → AUC: 0.7485, pAUC: 0.6436
GMM=40 → AUC: 0.7732, pAUC: 0.6595
GMM=60 → AUC: 0.7731, pAUC: 0.6600
LOF=5 → AUC: 0.7931, pAUC: 0.6696
LOF=10 → AUC: 0.7850, pAUC: 0.6727
KNN=5 → AUC: 0.7531, pAUC: 0.6540
KNN=3 → AUC: 0.7561, pAUC: 0.6553

Best Method: LOF=5 → AUC: 0.7931, pAUC: 0.6696


Anomaly Score Calculation on Machine: valve


100%|██████████| 879/879 [00:11<00:00, 77.96it/s]


CrossEntropy → AUC: 0.9995, pAUC: 0.9974
KMeans=60 → AUC: 0.9998, pAUC: 0.9988
KMeans=80 → AUC: 0.9998, pAUC: 0.9988
GMM=40 → AUC: 0.9992, pAUC: 0.9957
GMM=60 → AUC: 0.9992, pAUC: 0.9960
LOF=5 → AUC: 0.9983, pAUC: 0.9913
LOF=10 → AUC: 0.9986, pAUC: 0.9928
KNN=5 → AUC: 0.9996, pAUC: 0.9980
KNN=3 → AUC: 0.9996, pAUC: 0.9981

Best Method: KMeans=60 → AUC: 0.9998, pAUC: 0.9988


Average of Best Methods:
AUC: 0.9465
pAUC: 0.8931
