In [1]:
import pandas as pd
import numpy as np
import librosa
import os

import torch
import torchinfo
from torch.utils.data import DataLoader

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

from transformerAEclass import *


# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Using device:', device)

Using device: cuda


In [6]:
# Build a Transformer with explicit hyperparameters and display its layer-by-layer summary.
# NOTE(review): `Transformer` is presumably provided by `from transformerAEclass import *` — confirm.
# NOTE(review): this cell's execution count is out of sequence with the cells around it;
# verify the notebook still works after Restart Kernel -> Run All.
model = Transformer(emb_dim=250, heads=5, nb_transformer_blocks=6, seq_length=512)
# input_size is given as (batch_size, channel, rows, cols); the summary is the cell's displayed output.
torchinfo.summary(model, input_size=(2,1, 8000, 1)) #batch_size, channel, rows,cols

torch.Size([2, 512, 250])


Layer (type:depth-idx)                   Output Shape              Param #
Transformer                              [2, 1, 8000, 1]           --
├─AudioEmbedding: 1-1                    [2, 512, 250]             --
│    └─Sequential: 2-1                   [2, 32, 4000, 1]          --
│    │    └─Conv2d: 3-1                  [2, 32, 8000, 1]          128
│    │    └─Tanh: 3-2                    [2, 32, 8000, 1]          --
│    │    └─MaxPool2d: 3-3               [2, 32, 4000, 1]          --
│    └─Sequential: 2-2                   [2, 64, 2000, 1]          --
│    │    └─Conv2d: 3-4                  [2, 64, 4000, 1]          6,208
│    │    └─Tanh: 3-5                    [2, 64, 4000, 1]          --
│    │    └─MaxPool2d: 3-6               [2, 64, 2000, 1]          --
│    └─Sequential: 2-3                   [2, 128, 1000, 1]         --
│    │    └─Conv2d: 3-7                  [2, 128, 2000, 1]         24,704
│    │    └─Tanh: 3-8                    [2, 128, 2000, 1]         --
│    │ 

In [2]:
def load_dataset(dataset_path):
    """Read the CSV at `dataset_path` and return its contents as a pandas DataFrame."""
    dataset = pd.read_csv(dataset_path)
    return dataset

def filter_dataset_by_cases_and_channels(dataset, cases, channels):
    """Keep only the rows that belong to the requested cases and channels.

    Args:
        dataset: DataFrame with at least a 'Case' column (values such as
            'case1') and a 'Channel' column.
        cases: Iterable of case numbers; number ``n`` selects rows whose
            'Case' value equals ``f'case{n}'``.
        channels: Iterable of accepted 'Channel' values.

    Returns:
        A new DataFrame with the matching rows, in their original order and
        with their original index. An empty ``cases`` or ``channels`` yields
        an empty frame that still carries all of the dataset's columns.
    """
    # Build the textual labels once instead of filtering and concatenating per case.
    case_labels = [f'case{case_number}' for case_number in cases]
    # A single vectorized mask avoids growing a DataFrame inside a loop, and it
    # keeps the columns intact when nothing matches (the loop-based version lost
    # them for an empty `cases`, making the 'Channel' lookup fail).
    keep = dataset['Case'].isin(case_labels) & dataset['Channel'].isin(channels)
    # Copy so callers get an independent frame rather than a slice of `dataset`.
    return dataset[keep].copy()

def split_dataset_for_train_val_test(data_pd, random_state=None):
    """Split a labelled dataset into train, validation, and test parts.

    Training data contains only rows labelled 'normal'. Validation and test
    data are drawn from a pool made of all 'abnormal' rows plus the 'normal'
    rows held out from training.

    Args:
        data_pd: DataFrame with a 'norm/ab' column holding 'normal' or
            'abnormal'.
        random_state: Seed forwarded to both ``train_test_split`` calls. The
            default ``None`` keeps the split random on every call; pass an
            int to make the split reproducible.

    Returns:
        Tuple ``(train_data, validation_data, test_data)`` of DataFrames.
    """
    normal_data = data_pd[data_pd['norm/ab'] == 'normal']
    abnormal_data = data_pd[data_pd['norm/ab'] == 'abnormal']

    # 80% of the normal rows are used for training; the remaining 20% are held out.
    train_data, held_out_normal = train_test_split(
        normal_data, test_size=0.2, shuffle=True, random_state=random_state
    )

    # Validation and test need both classes: pool the abnormal rows with the
    # held-out normal rows, then give 20% to validation and 80% to test.
    evaluation_pool = pd.concat([abnormal_data, held_out_normal])
    validation_data, test_data = train_test_split(
        evaluation_pool, test_size=0.8, shuffle=True, random_state=random_state
    )

    return train_data, validation_data, test_data

def train_val_test(dataset_path=None, cases=[], channels=[]):
    """Read the CSV at `dataset_path`, keep the requested cases/channels, and split the result.

    Returns the tuple (train_data, validation_data, test_data).
    """
    # Load and filter in one expression, then hand the selection to the splitter.
    selection = filter_dataset_by_cases_and_channels(
        load_dataset(dataset_path), cases, channels
    )
    train_part, validation_part, test_part = split_dataset_for_train_val_test(selection)
    return train_part, validation_part, test_part


# Location of the dataset index CSV. The directory containing this file is also
# the root that find_path_to_wav searches for recordings.
# NOTE(review): absolute, machine-specific path — adjust it before running on another machine.
datapath = r'C:\Users\brech\THESIS_local\ToyADMOS\ToycarCSV.csv'
# Case numbers to keep; number n selects rows whose 'Case' column equals 'case<n>'.
cases = [1]
# Accepted values of the 'Channel' column.
channels = ['ch1']
# Load, filter, and split the dataset; no seed is passed, so the split differs between runs.
train_data, validation_data, test_data = train_val_test(dataset_path=datapath, cases=cases, channels=channels)


In [22]:
# Report the (rows, columns) shape of each of the three splits.
print("train shape: ", train_data.shape, "\n", "validation shape: ", validation_data.shape, "\n", "test_dataset: ", test_data.shape)

train shape:  (1080, 7) 
 validation shape:  (106, 7) 
 test_dataset:  (428, 7)


In [3]:
#helper functions 
def find_path_to_wav(full_sample_name, search_root=None):
    """Return the absolute path of the first file named `full_sample_name` in a directory tree.

    Args:
        full_sample_name: Exact file name (not a path) to look for.
        search_root: Directory to search recursively. When None (the default),
            the directory containing the module-level `datapath` is used.

    Returns:
        Absolute path of the first match in ``os.walk`` order, or None when no
        file with that name exists under the search root.
    """
    if search_root is None:
        # Default: the recordings are looked up next to the dataset CSV.
        search_root = os.path.dirname(datapath)
    for root, _dirs, files in os.walk(search_root):
        if full_sample_name in files:
            return os.path.abspath(os.path.join(root, full_sample_name))
    # Make the "not found" outcome explicit instead of falling off the end.
    return None


def get_sample_waveform_normalised(full_sample_name, start = 0, stop = 11):
    """Load one recording, cut it to a time window, and normalise it.

    Args:
        full_sample_name: File name of the recording; it is located with
            ``find_path_to_wav``.
        start: Start of the window, in seconds.
        stop: End of the window, in seconds (exclusive).

    Returns:
        The windowed waveform after ``librosa.util.normalize``.

    Raises:
        FileNotFoundError: If no file named `full_sample_name` can be found.
    """
    sample_path = find_path_to_wav(full_sample_name)
    if sample_path is None:
        # Fail with a clear message instead of passing None on to librosa.load.
        raise FileNotFoundError(
            f"Could not find a file named {full_sample_name!r} for this dataset"
        )

    # sr=None keeps the sampling rate stored in the file instead of resampling.
    waveform, sample_rate = librosa.load(sample_path, sr=None)

    # Convert the window bounds from seconds to sample indices.
    first_sample = int(start * sample_rate)
    last_sample = int(stop * sample_rate)

    return librosa.util.normalize(waveform[first_sample:last_sample])

In [4]:
def _load_waveform_batch(sample_names):
    """Stack the normalised waveforms of `sample_names`, each cut to the 4 s - 4.5 s window."""
    return np.array([get_sample_waveform_normalised(name, 4, 4.5) for name in sample_names])


def _encode_labels(label_column):
    """Map 'normal' to 1 and every other label to -1, returned as a column vector."""
    return np.array([1 if label == "normal" else -1 for label in label_column]).reshape(-1, 1)


# File names of the recordings in each split.
X_train_wav = train_data["Full Sample Name"].values
X_test_wav = test_data["Full Sample Name"].values
X_valid_wav = validation_data["Full Sample Name"].values

# Waveform arrays, one row per recording.
batch_train = _load_waveform_batch(X_train_wav)
batch_test = _load_waveform_batch(X_test_wav)
batch_val = _load_waveform_batch(X_valid_wav)

# Batched, unshuffled loaders so batch order matches the label arrays below.
X_train = DataLoader(batch_train, batch_size=64, shuffle=False)
X_test = DataLoader(batch_test, batch_size=64, shuffle=False)
X_val = DataLoader(batch_val, batch_size=64, shuffle=False)

# Labels: 1 for "normal", -1 otherwise.
Y_train = _encode_labels(train_data["norm/ab"])
Y_val = _encode_labels(validation_data["norm/ab"])
Y_test = _encode_labels(test_data["norm/ab"])


In [None]:
# Instantiate the model with its default hyperparameters and move it to the selected device.
model = Transformer().to(device=device)
#torchinfo.summary(model) #batch_size, channel, rows,cols


# NOTE(review): `nn` is not imported explicitly in this notebook; presumably it comes from
# `from transformerAEclass import *` — confirm.
# NOTE(review): BCEWithLogitsLoss expects targets in [0, 1]; if the normalised waveforms are
# used as reconstruction targets this loss may not be appropriate — verify against the training loop.
model_loss = nn.BCEWithLogitsLoss()    # open question: is nn.L1Loss() a better loss for sound? MSE loss seems to result in lower loss
learning_rate = 0.0001  # 0.0001 seems best so far
# Adam optimiser over all model parameters with the learning rate chosen above.
optimizer=torch.optim.Adam(model.parameters(), lr=learning_rate)


The embedding has to be a vector, but the waveform itself can actually be viewed as a vector of shape [8000, 1].
