# Supervised classification of normal/anomalous sounds.


## Import packages

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
%matplotlib inline
import seaborn as sns

from IPython.display import Image, display

from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve, f1_score, accuracy_score, classification_report
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split


import os
#import cv2


## Show GPU information
If no GPU is listed, change the runtime type in the "Execution" panel.

In [None]:
# Run nvidia-smi through the IPython shell escape; its output lines are joined into one string.
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
# The check relies on the command's output containing 'failed' when no GPU can be reached.
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

Thu Dec  5 09:57:52 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   50C    P8              10W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [None]:
import sys
# Print the interpreter version used for this run.
print("Python version:", sys.version)


Python version: 3.10.12 (main, Nov  6 2024, 20:22:13) [GCC 11.4.0]


## Import the spectrograms from Google Drive

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive (force_remount=True is passed to redo any existing mount).
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## Build selection dataframe

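The spectrograms are stored in the folder `Features/melspec_313_128/`. Each machine has its own folder (`fan/`, `valve/`, etc.). <br />
Each audio sample has its own flattened mel-spectrogram stored as a .npy file, so each file is one row of 313*128 = 40064 features. (Note: the code below reads from `Dataset_Melspectrograms/` and reshapes each file to 128 x 431, so the folder name and dimensions quoted here may be outdated.)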

We first build a dataframe df which gives us all the properties of the files. This will be used to select sounds according to their section, sound_type, etc ...

In [None]:
import os
import pandas as pd
import numpy as np

def build_selection_dataframe(folder_path):
    """Index every ``.npy`` file found under *folder_path*.

    Each file becomes one row. The name of the directory holding the file is
    used for both ``machine`` and ``section``; files inside a directory named
    ``ChildrenPlayTrimmed`` are tagged ``'normal'``, all others ``'anomaly'``.
    ``filepath`` and ``id`` both hold the full path, ``domain_env`` is always
    ``'source'``, and ``suffix`` is whatever follows the sixth underscore of
    the file name (without ``.npy``).

    After indexing, 15% of all rows (sampled with ``random_state=42``, not
    per class) get ``dir == 'test'``; the rest keep ``dir == 'train'``.

    Returns:
        pandas.DataFrame with one row per ``.npy`` file.
    """
    records = []

    for current_dir, _, filenames in os.walk(folder_path):
        # The innermost directory name doubles as the class label
        category = os.path.basename(current_dir)
        if category == 'ChildrenPlayTrimmed':
            kind = 'normal'
        else:
            kind = 'anomaly'

        for filename in filenames:
            # Anything that is not a saved spectrogram is ignored
            if not filename.endswith('.npy'):
                continue

            full_path = os.path.join(current_dir, filename)
            tail = '_'.join(filename.split('_')[6:])
            records.append({
                'filepath': full_path,
                'machine': category,
                'section': category,
                'domain_env': 'source',
                'dir': 'train',
                'sound_type': kind,
                'id': full_path,
                'suffix': tail.split('.npy')[0],
            })

    df = pd.DataFrame(records)

    # Move a reproducible 15% sample of the rows to the test split
    held_out = df.sample(frac=0.15, random_state=42).index
    df.loc[held_out, 'dir'] = 'test'

    return df

# Root folder that is walked for mel-spectrogram .npy files
folder_path = '/content/drive/MyDrive/Colab Notebooks/Dataset_Melspectrograms'

# Index every file and assign the train/test split
df = build_selection_dataframe(folder_path)

# Preview the first rows of the index
print("Combined DataFrame:", df.head(), sep="\n")

# Number of files per (split, sound type) pair
print("\nDistribution of Data:", df.groupby(['dir', 'sound_type']).size(), sep="\n")


Combined DataFrame:
                                            filepath              machine  \
0  /content/drive/MyDrive/Colab Notebooks/Dataset...  ChildrenPlayTrimmed   
1  /content/drive/MyDrive/Colab Notebooks/Dataset...  ChildrenPlayTrimmed   
2  /content/drive/MyDrive/Colab Notebooks/Dataset...  ChildrenPlayTrimmed   
3  /content/drive/MyDrive/Colab Notebooks/Dataset...  ChildrenPlayTrimmed   
4  /content/drive/MyDrive/Colab Notebooks/Dataset...  ChildrenPlayTrimmed   

               section domain_env    dir sound_type  \
0  ChildrenPlayTrimmed     source  train     normal   
1  ChildrenPlayTrimmed     source  train     normal   
2  ChildrenPlayTrimmed     source  train     normal   
3  ChildrenPlayTrimmed     source   test     normal   
4  ChildrenPlayTrimmed     source  train     normal   

                                                  id suffix  
0  /content/drive/MyDrive/Colab Notebooks/Dataset...         
1  /content/drive/MyDrive/Colab Notebooks/Dataset...         


In [None]:
# Rows assigned to the test split, then a peek at the path stored under index label 0
df_test = df.loc[df['dir'] == 'test']
df.loc[0, 'filepath']

'/content/drive/MyDrive/Colab Notebooks/Dataset_Melspectrograms/ChildrenPlayTrimmed/Lz0Oh2cxoTU_28133-28296.npy'

In [None]:
# Encode the class (folder) names as consecutive integers
label_encoder = LabelEncoder()
df['machine_num'] = label_encoder.fit_transform(df['machine'])

# Name -> integer lookup built from the fitted classes instead of a hard-coded
# count, so it stays correct when class folders are added or removed.
dico = {name: num for num, name in enumerate(label_encoder.classes_)}
df_machine_to_num = pd.DataFrame.from_dict(dico, orient = 'index', columns = ['machine_num']).reset_index()
df_machine_to_num

Unnamed: 0,index,machine_num
0,ChildrenPlayTrimmed,0
1,accidents,1
2,car_crash,2
3,conversation,3
4,gun_shot,4
5,scream,5


## Data Generator

Since the datasets are quite big, let's make a data generator.<br/>

In [None]:
import torch
import numpy as np
import random

def data_generator(file_list, label_list, batch_size, num_classes=6, to_fit=True, device='cpu'):
    """Yield batches of spectrograms (and one-hot labels) forever.

    Every pass walks the whole file list in steps of ``batch_size`` and
    includes a final shorter batch when the number of files is not a multiple
    of ``batch_size``. The first pass follows the given order; files and
    labels are reshuffled together (with ``np.random``) before each later
    pass.

    Args:
        file_list: Sequence of paths to ``.npy`` files, each holding
            128 * 431 values.
        label_list: Integer class indices aligned with ``file_list``.
        batch_size: Maximum number of samples per batch; must be positive.
        num_classes: Width of the one-hot label vectors.
        to_fit: When True yield ``(data, labels)``, otherwise only ``data``.
        device: Torch device the yielded tensors are moved to.

    Yields:
        ``data`` as a float32 tensor of shape (batch, 128, 431, 1) and, when
        ``to_fit`` is True, ``labels`` as an int64 tensor of shape
        (batch, num_classes).

    Raises:
        ValueError: On the first ``next()`` call if ``batch_size`` is not
            positive, the two lists differ in length, or the lists are empty
            (an empty list could never produce a batch).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # Arrays allow the fancy indexing used for shuffling, even if lists were passed
    file_list = np.asarray(file_list)
    label_list = np.asarray(label_list)

    n_files = len(file_list)
    if n_files != len(label_list):
        raise ValueError("file_list and label_list must have the same length")
    if n_files == 0:
        raise ValueError("file_list is empty: no batch can be generated")

    # Infinite loop: one iteration is one full pass over the data
    while True:
        for start in range(0, n_files, batch_size):
            file_chunk = file_list[start:start + batch_size]
            label_chunk = label_list[start:start + batch_size]

            # Load each file and restore the (128, 431, 1) layout
            arrays = [np.load(file).reshape(128, 431, 1) for file in file_chunk]
            data = torch.tensor(np.stack(arrays), dtype=torch.float32).to(device)

            if to_fit:
                # One-hot encode the whole chunk at once
                label_tensor = torch.tensor(np.asarray(label_chunk, dtype=np.int64))
                labels = torch.nn.functional.one_hot(label_tensor, num_classes=num_classes).to(device)
                yield data, labels  # Data and labels for training
            else:
                yield data  # Only data for inference

        # Shuffle files and labels together so the next pass sees different batches
        randomize = np.random.permutation(n_files)
        file_list = file_list[randomize]
        label_list = label_list[randomize]


## Deep learning tools

## Tools to analyze results

In [None]:
def build_result_df(pred_machine_proba, model = 'model', source = False, section = False, section_num = 0):
    """Combine class-probability predictions with the test rows of the global ``df``.

    Args:
        pred_machine_proba: Array-like of shape (n_pred, n_classes) with one
            row of class probabilities per test file, in the order of the
            selected test rows. It may hold fewer rows than the test
            selection; the extra test rows are then ignored.
        model: Label stored in the ``model`` column of every row.
        source: When True keep only rows whose ``domain_env`` is ``'source'``.
        section: When True keep only rows whose ``section`` equals
            ``section_num``.
        section_num: Section value used when ``section`` is True.

    Returns:
        pandas.DataFrame indexed like the selected test rows, with columns
        ``anomaly_score`` (logit of 1 - max probability),
        ``true_class`` / ``pred_class`` (integer codes),
        ``true_class_name`` / ``true_machine_name`` / ``pred_class_name``
        (folder names, read by the plotting helpers of this file),
        ``true_soundtype`` (0 = normal, 1 = anomaly),
        ``pred_soundtype_proba``, ``section``, ``domain`` and ``model``.

    Raises:
        ValueError: If there are more predictions than selected test rows.
    """
    # Choose data: test rows, optionally narrowed to the source domain and/or one section
    selection = df['dir'] == 'test'
    if source:
        selection = selection & (df['domain_env'] == 'source')
    if section:
        selection = selection & (df['section'] == section_num)
    test_df = df[selection]

    pred_machine_proba = np.asarray(pred_machine_proba)
    n_pred = len(pred_machine_proba)
    if n_pred > len(test_df):
        raise ValueError(
            f"Got {n_pred} predictions for only {len(test_df)} selected test rows"
        )
    # Truncate every column consistently when fewer predictions than test rows are given
    test_df = test_df.iloc[:n_pred]

    y_pred_machine = np.argmax(pred_machine_proba, axis=1)
    pred_machine_probamax = np.max(pred_machine_proba, axis=1)
    y_pred_soundtype_proba = 1 - pred_machine_probamax   # proba estimate of being an anomalous sound

    # Reciprocal of the sigmoid (logit). Probabilities are clipped to
    # [1e-8, 0.999999] first, which spreads the score from ~ -18.4 to ~ 13.8.
    clipped = np.clip(y_pred_soundtype_proba.astype(np.float64), 0.00000001, 0.999999)
    anomaly_scores = np.log(clipped / (1 - clipped))

    # Integer code -> folder name, used to name the predicted classes
    num_to_name = dict(zip(df['machine_num'], df['machine']))
    pred_names = [num_to_name.get(num, str(num)) for num in y_pred_machine]
    true_names = test_df['machine'].values

    # .map avoids the silent-downcasting FutureWarning raised by .replace
    y_true_soundtype = test_df['sound_type'].map({'normal': 0, 'anomaly': 1}).values

    dico = {'anomaly_score': anomaly_scores,
            'true_class': test_df['machine_num'].values,
            'pred_class': y_pred_machine,
            'true_soundtype': y_true_soundtype,
            'pred_soundtype_proba': y_pred_soundtype_proba,
            'section': test_df['section'].values,
            'domain': test_df['domain_env'].values,
            'model': model,
            'true_class_name': true_names,
            'true_machine_name': true_names,
            'pred_class_name': pred_names}
    data = pd.DataFrame(dico, index=test_df.index)

    return data

In [None]:
def show_evaluations_machine(df_result, source = False, section = False, section_num = 0):
    """Print a title, the classification report and the true-vs-predicted class table.

    The title is 'source domain' (with the section number appended when
    ``section`` is also set) if ``source`` is True, otherwise ``None``; it is
    printed either way. ``df_result`` must provide the ``true_class`` and
    ``pred_class`` columns.
    """
    title = None
    if source:
        title = 'source domain'
        if section:
            title = title + " - section "+str(section_num)
    print(title)

    y_true = df_result['true_class']
    y_pred = df_result['pred_class']

    print(classification_report(y_true, y_pred))
    confusion = pd.crosstab(y_true, y_pred, rownames = ['True'], colnames = ['Pred'])
    display(confusion)
    print('\n')

In [None]:
def plot_anomaly_scores(df_res, source = False, section = False, section_num = 0):
    """Draw two strip plots of the anomaly score per true class.

    The left panel colours the points by ``true_soundtype``, the right panel
    by ``pred_class_name``. ``df_res`` must provide the columns
    ``true_class_name``, ``pred_class_name``, ``anomaly_score`` and
    ``true_soundtype``.
    """
    title = None
    if source:
        title = 'source domain'
        if section:
            title = title + " - section "+str(section_num)

    fig = plt.figure(figsize = (12, 4))

    # Left panel: anomaly score coloured by the true sound type
    left = fig.add_subplot(121)
    left.set_title(title)
    by_soundtype = sns.stripplot(
        data = df_res, x = 'true_class_name', y = 'anomaly_score',
        hue = 'true_soundtype', dodge = True, ax = left,
    )
    by_soundtype.set_xlabel("True class",fontsize=10)
    by_soundtype.set_ylabel("Anomaly score",fontsize=10)
    by_soundtype.tick_params(labelsize=10)
    # Reuse the generated handles but replace the 0/1 legend labels by names
    legend_handles = by_soundtype.get_legend_handles_labels()[0]
    by_soundtype.legend(legend_handles, ['normal', 'anormal'], loc = 'lower right', title = 'sound type')

    # Right panel: anomaly score coloured by the predicted class
    right = fig.add_subplot(122)
    by_prediction = sns.stripplot(
        data = df_res, x = 'true_class_name', y = 'anomaly_score',
        hue = 'pred_class_name', dodge = True, ax = right,
    )
    by_prediction.set_xlabel("True class",fontsize=10)
    by_prediction.set_ylabel("Anomaly score",fontsize=10)
    by_prediction.tick_params(labelsize=10)
    right.set_title(title)
    by_prediction.legend(loc = 'lower right', title = 'pred class')

In [None]:
def _threshold_accuracies(values, true_soundtype, thresholds):
    """Return, per threshold, the share of rows where ``value > threshold`` (as 0/1) equals the label."""
    predicted = (values[np.newaxis, :] > thresholds[:, np.newaxis]).astype(int)
    return (predicted == true_soundtype[np.newaxis, :]).mean(axis=1)


def plot_anomaly_acc_vs_threshold(df_res, source = False, section = False, section_num = 0):
    """Plot sound-type accuracy against the decision threshold, one curve per class.

    The left panel thresholds ``anomaly_score`` (3000 values from -18.4 to 5),
    the right panel thresholds ``pred_soundtype_proba`` (3000 values from 0 to
    1). A row is predicted anomalous (1) when its value is strictly above the
    threshold. Classes without any row in ``df_res`` are skipped.

    ``df_res`` must provide the columns ``true_machine_name``,
    ``anomaly_score``, ``pred_soundtype_proba`` and ``true_soundtype``.
    """
    if source and not section:
        title = 'source domain'
    elif source and section:
        title = 'source domain' + " - section "+str(section_num)
    else:
        title = None

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize = (12, 4))

    # The threshold grids do not depend on the class, so build them once
    score_thresholds = np.linspace(-18.4, 5, 3000)
    proba_thresholds = np.linspace(0, 1, 3000)

    for machine in ['ChildrenPlayTrimmed', 'accidents', 'car_crash', 'conversation', 'gun_shot', 'scream']:
        df_res_mach = df_res[df_res['true_machine_name'] == machine]
        if df_res_mach.empty:
            # No sample of this class: no accuracy to plot
            continue

        true_soundtype = df_res_mach['true_soundtype'].to_numpy()
        scores = df_res_mach['anomaly_score'].to_numpy(dtype=float)
        probas = df_res_mach['pred_soundtype_proba'].to_numpy(dtype=float)

        # All thresholds are evaluated in one broadcast comparison instead of
        # one pandas .apply pass per threshold
        accuracies_1 = _threshold_accuracies(scores, true_soundtype, score_thresholds)
        accuracies_2 = _threshold_accuracies(probas, true_soundtype, proba_thresholds)

        # Plot soundtype accuracies vs threshold for all machines
        ax1.plot(score_thresholds, accuracies_1, label = machine)
        ax2.plot(proba_thresholds, accuracies_2, label = machine)

    ax1.legend()
    ax1.set_xlabel("Anomaly score threshold")
    ax1.set_ylabel("Accuracy")
    ax1.set_title(title)
    ax2.legend()
    ax2.set_xlabel("Probability threshold")
    ax2.set_ylabel("Accuracy")
    ax2.set_title(title)





In [None]:
def compute_AUCs(df_res, source = False, section = False, section_num = 0):
    """Compute and plot the ROC AUC of the anomaly probability for each class.

    For every class name the rows whose ``true_machine_name`` matches are
    scored with ``roc_auc_score(true_soundtype, pred_soundtype_proba)``.
    A class whose rows do not contain both sound types gets ``nan``, since
    the ROC AUC is undefined in that case.

    Args:
        df_res: Result table with the columns ``true_machine_name``,
            ``true_soundtype`` and ``pred_soundtype_proba``.
        source, section, section_num: Only used to build the plot title.

    Returns:
        dict mapping each class name to its AUC (or ``nan``).
    """
    dict_aucs = {}
    for machine in ['ChildrenPlayTrimmed', 'accidents', 'car_crash', 'conversation', 'gun_shot', 'scream']:
        df_res_mach = df_res[df_res['true_machine_name'] == machine]
        if df_res_mach['true_soundtype'].nunique() < 2:
            # Needs both normal and anomalous rows; otherwise the AUC is undefined
            dict_aucs[machine] = np.nan
            continue
        dict_aucs[machine] = roc_auc_score(df_res_mach['true_soundtype'], df_res_mach['pred_soundtype_proba'])

    # Plot all machine AUCs
    if source and not section:
        title = 'source domain'
    elif source and section:
        title = 'source domain' + " - section "+str(section_num)
    else:
        title = None

    # One bar per computed class (the previous fixed range(7) did not match the 6 classes)
    positions = range(len(dict_aucs))

    fig = plt.figure(figsize = (6, 4))
    plt.xticks(positions, list(dict_aucs.keys()))
    plt.ylabel("AUC")
    plt.grid(True, axis = 'y')
    plt.bar(positions, list(dict_aucs.values()))
    plt.title(title)

    return dict_aucs




In [None]:
# Other definition closer to the challenge version:
# the AUC is the fraction of (anomalous, normal) pairs in which the anomalous
# sound has the higher anomaly score, a tie counting for one half.
def compute_AUCs_2(df_res, source = False, section = False, section_num = 0):
    """Compute and plot the pairwise (rank-based) AUC of the anomaly score per class.

    For each class, every anomalous row is compared with every normal row:
    ``AUC = sum(heaviside(score_anomalous - score_normal, 0.5)) / (N_a * N_n)``.
    The earlier version paired the sorted scores element-wise and divided by
    a fixed 50 * 50, which is not this quantity. A class that lacks normal or
    anomalous rows gets ``nan``.

    Args:
        df_res: Result table with the columns ``true_machine_name``,
            ``true_soundtype`` (0 / 1) and ``anomaly_score``.
        source, section: Must both be truthy.
        section_num: Section number shown in the plot title.

    Returns:
        dict mapping each class name to its AUC (or ``nan``).

    Raises:
        ValueError: If ``source`` or ``section`` is not set.
    """
    if not (source and section):
        raise ValueError("The AUC is not (yet) defined in this case")

    dict_aucs = {}
    for machine in ['ChildrenPlayTrimmed', 'accidents', 'car_crash', 'conversation', 'gun_shot', 'scream']:
        df_res_mach = df_res[df_res['true_machine_name'] == machine]
        is_anomalous = df_res_mach['true_soundtype'] == 1
        is_normal = df_res_mach['true_soundtype'] == 0
        scores_anomalous = df_res_mach.loc[is_anomalous, 'anomaly_score'].to_numpy(dtype=float)
        scores_normal = df_res_mach.loc[is_normal, 'anomaly_score'].to_numpy(dtype=float)

        # All N_a x N_n score differences at once
        differences = scores_anomalous[:, np.newaxis] - scores_normal[np.newaxis, :]
        if differences.size == 0:
            # No pair to compare: the AUC is undefined for this class
            dict_aucs[machine] = np.nan
        else:
            dict_aucs[machine] = np.heaviside(differences, 0.5).sum() / differences.size

    # Plot all machine AUCs
    if source and not section:
        title = 'source domain'
    elif source and section:
        title = 'source domain' + " - section "+str(section_num)
    else:
        title = None

    # One bar per computed class (the previous fixed range(7) did not match the 6 classes)
    positions = range(len(dict_aucs))

    plt.xticks(positions, list(dict_aucs.keys()))
    plt.ylabel("AUC (challenge version)")
    plt.grid(True, axis = 'y')
    plt.bar(positions, list(dict_aucs.values()))
    plt.title(title)

    return dict_aucs


## RUN : predict the machine and deduce the sound type

### First model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNN_RegDrop(nn.Module):
    """Two-block CNN classifier for (1, 128, 431) spectrograms.

    Layout: BatchNorm -> Conv 7x7 (16) -> ReLU -> MaxPool -> BatchNorm ->
    Conv 3x3 (32) -> ReLU -> MaxPool -> BatchNorm -> Flatten ->
    Linear(97440, 64) -> ReLU -> Dropout(0.5) -> BatchNorm -> Linear -> softmax.

    The flattened size 97440 = 32 * 29 * 105 follows from the unpadded
    convolutions and the two 2x2 poolings applied to a 128 x 431 input, so
    other input sizes are not supported.

    NOTE: ``forward`` returns softmax *probabilities*, not logits.
    ``nn.CrossEntropyLoss`` applies log-softmax to its input itself, so a loss
    working on probabilities should be used with this model.

    Args:
        num_classes: Number of output classes (default 6).
    """

    def __init__(self, num_classes=6):
        super().__init__()

        # Input: (batch_size, 1, 128, 431)

        # BatchNormalization for input
        self.batch_norm1 = nn.BatchNorm2d(1)

        # First Conv2D layer (16 filters, kernel size 7x7)
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(7, 7), padding=0)
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.batch_norm2 = nn.BatchNorm2d(16)  # BatchNormalization after conv1

        # Second Conv2D layer (32 filters, kernel size 3x3)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3, 3), padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        self.batch_norm3 = nn.BatchNorm2d(32)  # BatchNormalization after conv2

        # Flatten
        self.flatten = nn.Flatten()

        # Fully connected layer with Dropout
        # L2 regularization in PyTorch is done using weight decay in the optimizer (not here)
        self.fc1 = nn.Linear(97440, 64)
        self.dropout = nn.Dropout(0.5)  # 50% dropout
        self.batch_norm4 = nn.BatchNorm1d(64)  # BatchNormalization for fully connected layer

        # Output layer (softmax is applied in forward)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return class probabilities of shape (batch_size, num_classes) for x of shape (batch_size, 1, 128, 431)."""
        # First conv block: BatchNorm -> Conv2d -> ReLU -> MaxPool2d -> BatchNorm
        x = self.batch_norm1(x)
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = self.batch_norm2(x)

        # Second conv block: Conv2d -> ReLU -> MaxPool2d -> BatchNorm
        x = F.relu(self.conv2(x))
        x = self.pool2(x)
        x = self.batch_norm3(x)

        # Flatten the output
        x = self.flatten(x)

        # Fully connected layer with dropout and batch norm
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.batch_norm4(x)

        # Output layer with softmax over the class dimension
        x = F.softmax(self.fc2(x), dim=1)

        return x


In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np

# Map-style PyTorch dataset reading one spectrogram file per item
class CustomDataset(Dataset):
    """Dataset of ``.npy`` spectrogram files with one-hot encoded labels.

    Args:
        filepaths: Indexable collection of paths to ``.npy`` files, each
            holding 128 * 431 values.
        labels: Indexable collection of integer class indices aligned with
            ``filepaths``.
        num_classes: Length of the one-hot label vector.
    """

    def __init__(self, filepaths, labels, num_classes=6):
        self.filepaths, self.labels, self.num_classes = filepaths, labels, num_classes

    def __len__(self):
        """Number of files in the dataset."""
        return len(self.filepaths)

    def __getitem__(self, idx):
        """Return ``(data, one_hot)``: float32 tensors of shape (1, 128, 431) and (num_classes,)."""
        # Load the file and restore the single-channel (1, 128, 431) layout
        raw = np.load(self.filepaths[idx])
        spectrogram = raw.reshape(1, 128, 431).astype(np.float32)

        # Pick the row of the identity matrix matching the class index
        identity = np.eye(self.num_classes)
        target = identity[self.labels[idx]].astype(np.float32)

        return torch.tensor(spectrogram), torch.tensor(target)

# Side-by-side accuracy and loss curves of a training run
def plot_training_diag(train_acc, val_acc, train_loss, val_loss):
    """Plot training/validation accuracy (left) and loss (right) against the epoch number.

    Each argument is a sequence with one value per epoch; epochs are numbered
    from 1 on the x axis.
    """
    plt.figure(figsize=(12, 5))

    panels = (
        (1, 'Accuracy', train_acc, val_acc,
         'Training Accuracy', 'Validation Accuracy', "Training and Validation Accuracy"),
        (2, 'Loss', train_loss, val_loss,
         'Training Loss', 'Validation Loss', "Training and Validation Loss"),
    )

    for position, ylabel, train_curve, val_curve, train_label, val_label, heading in panels:
        plt.subplot(1, 2, position)
        plt.xlabel('Epochs')
        plt.ylabel(ylabel)
        plt.plot(range(1, len(train_curve) + 1), train_curve, label=train_label, color='blue')
        plt.plot(range(1, len(val_curve) + 1), val_curve, label=val_label, color='red')
        plt.legend()
        plt.title(heading)

    plt.show()

def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over *loader*: a training pass when *optimizer* is given, else an evaluation pass.

    Returns ``(mean_loss, accuracy)`` where the loss is weighted by batch size
    and divided by the dataset length, and the accuracy compares the argmax of
    the outputs with the argmax of the one-hot labels.
    """
    training = optimizer is not None
    model.train(training)

    running_loss = 0.0
    correct = 0
    total = 0

    with torch.set_grad_enabled(training):
        for data, labels in loader:
            data, labels = data.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            running_loss += loss.item() * data.size(0)
            correct += (outputs.argmax(dim=1) == labels.argmax(dim=1)).sum().item()
            total += labels.size(0)

    return running_loss / len(loader.dataset), correct / total


# Training function with early stopping and learning rate scheduler
def fit_and_predict_machine(model, criterion, optimizer, df, source=False, section=False, section_num=0, model_name='model'):
    """Train *model* on the train rows of *df* and return its predictions on the test rows.

    The train rows are split 80/20 (stratified on ``machine_num``) into
    training and validation sets. Training runs for at most 100 epochs with
    ``ReduceLROnPlateau`` (patience 5, factor 0.3) and stops early after 10
    epochs without a lower validation loss. The weights with the lowest
    validation loss are saved to
    ``/content/drive/MyDrive/Colab Notebooks/model/<model_name>.pt`` and
    reloaded before predicting.

    All tensors are moved to the device the model's parameters already live
    on, so the function works on CPU as well as on GPU.

    Args:
        model: ``torch.nn.Module`` taking (batch, 1, 128, 431) inputs.
        criterion: Callable ``criterion(outputs, one_hot_labels) -> loss``.
        optimizer: Optimizer bound to the model's parameters.
        df: Selection DataFrame with the columns ``filepath``,
            ``machine_num``, ``dir``, ``domain_env`` and ``section``.
        source: When True keep only rows whose ``domain_env`` is ``'source'``.
        section: When True keep only rows whose ``section`` equals
            ``section_num``.
        section_num: Section value used when ``section`` is True.
        model_name: File stem of the saved checkpoint.

    Returns:
        numpy.ndarray of shape (n_test, n_outputs) with the model outputs for
        the selected test rows, in row order (0 rows when nothing is selected).
    """
    import os

    # Use the model's own device instead of assuming CUDA is available
    device = next(model.parameters()).device

    # Select data from source and/or target domains
    selection = np.ones(len(df), dtype=bool)
    if source:
        selection &= (df['domain_env'] == 'source').to_numpy()
    if section:
        selection &= (df['section'] == section_num).to_numpy()
    subset = df[selection]
    train_df = subset[subset['dir'] == 'train']
    test_df = subset[subset['dir'] == 'test']

    train_filepaths, valid_filepaths, y_train, y_valid = train_test_split(
        train_df['filepath'].tolist(),  # Convert to list
        train_df['machine_num'].tolist(),  # Convert to list
        test_size=0.2,
        stratify=train_df['machine_num']
    )

    # Set up DataLoaders
    batch_size = 32
    num_classes = 6
    train_dataset = CustomDataset(train_filepaths, y_train, num_classes)
    valid_dataset = CustomDataset(valid_filepaths, y_valid, num_classes)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

    # Training settings
    epochs = 100
    scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.3, verbose=True)
    early_stopping_patience = 10
    best_val_loss = np.inf
    early_stopping_counter = 0
    train_acc_history, val_acc_history = [], []
    train_loss_history, val_loss_history = [], []  # To store loss for each epoch

    # Make sure the checkpoint folder exists before the first save
    checkpoint_path = f'/content/drive/MyDrive/Colab Notebooks/model/{model_name}.pt'
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

    # Training and validation loop
    for epoch in range(epochs):
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        train_acc_history.append(train_accuracy)
        train_loss_history.append(train_loss)

        val_loss, val_accuracy = _run_epoch(model, valid_loader, criterion, device)
        val_acc_history.append(val_accuracy)
        val_loss_history.append(val_loss)

        print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, '
              f'Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}')

        # Learning rate scheduler step
        scheduler.step(val_loss)

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= early_stopping_patience:
                print("Early stopping triggered.")
                break

    # Plot training diagnostics
    plot_training_diag(train_acc_history, val_acc_history, train_loss_history, val_loss_history)

    # Load the best weights; skipped when no epoch ever saved a checkpoint
    # (e.g. NaN losses) so a stale file from an earlier run is not loaded
    if best_val_loss < np.inf:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    # Convert to lists to avoid KeyError during indexing
    test_filepaths = test_df['filepath'].tolist()
    test_labels = test_df['machine_num'].tolist()

    test_dataset = CustomDataset(test_filepaths, test_labels, num_classes)
    # The model is in eval mode below, so predictions are made in full batches
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model.eval()
    test_pred = []
    with torch.no_grad():
        for data, _ in test_loader:
            outputs = model(data.to(device))
            test_pred.append(outputs.cpu().numpy())

    if not test_pred:
        # No test row selected: return an empty prediction table instead of failing
        return np.empty((0, num_classes), dtype=np.float32)

    return np.concatenate(test_pred, axis=0)



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

model = CNN_RegDrop().cuda()


def probability_cross_entropy(probs, targets, eps=1e-7):
    """Mean cross-entropy between target distributions and class probabilities.

    ``probs`` and ``targets`` both have shape (batch, num_classes); ``probs``
    must already be normalised (e.g. softmax outputs). Probabilities are
    clamped to at least ``eps`` before the logarithm to avoid ``log(0)``.
    """
    return -(targets * torch.log(probs.clamp_min(eps))).sum(dim=1).mean()


# Define the loss function and optimizer.
# CNN_RegDrop.forward already ends with a softmax, while nn.CrossEntropyLoss
# expects raw logits and applies log-softmax itself; pairing them would
# normalise the outputs twice. The loss below works directly on probabilities.
criterion = probability_cross_entropy
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)  # Includes L2 regularization

# Call the fit_and_predict_machine function
pred_machine = fit_and_predict_machine(model, criterion, optimizer, df, source=False, model_name='CNN_RegDrop')

In [None]:
# Build the per-file result table from the test predictions, labelled 'CNN_RegDrop_New'
df_result = build_result_df(pred_machine, source = False, model = 'CNN_RegDrop_New')

  y_true_soundtype = test_df['sound_type'].replace(['normal', 'anomaly'], [0, 1]).values[:len(y_pred_machine)]


In [None]:
# Independent copy of this model's results (presumably the start of a table collecting several models -- confirm)
df_results_allmodels = df_result.copy()

In [None]:
# Print the classification report and the true-vs-predicted class table for the test rows
show_evaluations_machine(df_result)

None
              precision    recall  f1-score   support

           0       1.00      1.00      1.00        29
           1       0.00      0.00      0.00         1
           2       0.80      1.00      0.89         4
           3       1.00      1.00      1.00         2
           4       0.90      1.00      0.95         9
           5       1.00      0.83      0.91         6

    accuracy                           0.96        51
   macro avg       0.78      0.81      0.79        51
weighted avg       0.95      0.96      0.95        51



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Pred,0,2,3,4,5
True,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,29,0,0,0,0
1,0,1,0,0,0
2,0,4,0,0,0
3,0,0,2,0,0
4,0,0,0,9,0
5,0,0,0,1,5




