In [None]:
from pathlib import Path
import os
import glob
from tqdm import tqdm
import librosa
import numpy as np
import sys

In [None]:
# arguments

# --- audio / feature extraction ---
SR = 16000          # sampling rate requested when loading audio
N_FFT = 1024        # FFT size for the mel spectrogram
WIN_LENGTH = 1024   # window length handed to the torchaudio MelSpectrogram transform
HOP_LENGTH = 512    # hop between successive spectrogram frames
N_MELS = 128        # number of mel bands
FRAMES = 313        # consecutive frames concatenated into one feature vector
POWER = 2.0         # spectrogram magnitude exponent (2.0 -> power spectrogram)

# --- labels ---
NUM_CLASS = 41      # not referenced by the visible cells

# --- training ---
BATCH_SIZE = 128    # DataLoader batch size
EPOCHS = 1          # passes over the training loader
LR = 1e-4           # Adam learning rate

In [None]:
def file_to_vector_array(file_name,
                         n_mels=N_MELS,
                         frames=FRAMES,
                         n_fft=N_FFT,
                         hop_length=HOP_LENGTH,
                         power=POWER,
                         sr=SR):
    """Turn one audio file into a matrix of stacked log-mel feature vectors.

    The file is loaded with librosa, converted to a log-mel spectrogram, and
    every run of ``frames`` consecutive spectrogram columns is flattened into
    one row (frame after frame, ``n_mels`` values per frame).

    Args:
        file_name: path of the audio file to load.
        n_mels: number of mel bands.
        frames: number of consecutive frames concatenated per feature vector.
        n_fft: FFT size passed to librosa.
        hop_length: hop length passed to librosa.
        power: exponent of the magnitude spectrogram.
        sr: sampling rate requested from ``librosa.load``.

    Returns:
        A 2-D array of shape ``(n_vectors, n_mels * frames)`` where
        ``n_vectors = n_spectrogram_frames - frames + 1``. When the clip has
        fewer than ``frames`` spectrogram frames, an array with zero rows and
        the same number of columns is returned.
    """
    feature_dim = n_mels * frames

    # Load the audio (channels are not mixed down) and compute the mel spectrogram.
    signal, sr = librosa.load(file_name, sr=sr, mono=False)
    mel = librosa.feature.melspectrogram(
        y=signal,
        sr=sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels,
        power=power,
    )

    # Log-scale the energies; the epsilon keeps log10 away from zero.
    log_mel = 20.0 / power * np.log10(mel + sys.float_info.epsilon)

    # Number of windows of `frames` columns that fit into the spectrogram.
    n_vectors = len(log_mel[0]) - frames + 1
    if n_vectors < 1:
        # Clip is too short to produce even a single feature vector.
        return np.empty((0, feature_dim))

    # Column block k of the output holds the k-th frame of every window.
    vectors = np.zeros((n_vectors, feature_dim))
    for frame_idx in range(frames):
        col_start = n_mels * frame_idx
        window_slice = log_mel[:, frame_idx: frame_idx + n_vectors]
        vectors[:, col_start: col_start + n_mels] = window_slice.T

    return vectors

In [None]:
def list_to_vector_array(file_list,
                         msg="calc...",
                         n_mels=N_MELS,
                         frames=FRAMES,
                         n_fft=N_FFT,
                         hop_length=HOP_LENGTH,
                         power=POWER):
    """Build one float32 feature matrix from a list of audio files.

    Each file is converted with ``file_to_vector_array`` and its rows are
    appended, in file order, to a single array.

    Args:
        file_list: sequence of audio file paths (must support ``len``).
        msg: description shown on the tqdm progress bar.
        n_mels, frames, n_fft, hop_length, power: forwarded unchanged to
            ``file_to_vector_array``.

    Returns:
        ``np.float32`` array of shape ``(total_vectors, n_mels * frames)``.
        An empty ``file_list`` yields an array with zero rows.
    """
    dims = n_mels * frames

    # Start from an empty buffer so an empty file_list still returns an array.
    dataset = np.zeros((0, dims), np.float32)
    filled = 0  # number of rows of `dataset` written so far

    for idx, file_name in enumerate(tqdm(file_list, desc=msg)):
        vector_array = file_to_vector_array(file_name,
                                            n_mels=n_mels,
                                            frames=frames,
                                            n_fft=n_fft,
                                            hop_length=hop_length,
                                            power=power)
        n_new = vector_array.shape[0]

        if idx == 0:
            # Pre-allocate on the guess that every file yields as many vectors
            # as the first one; this is exact for equally long clips.
            dataset = np.zeros((n_new * len(file_list), dims), np.float32)

        needed = filled + n_new
        if needed > dataset.shape[0]:
            # The guess was too small (clips of different lengths): grow
            # geometrically so repeated growth stays cheap.
            grown = np.zeros((max(needed, 2 * dataset.shape[0]), dims), np.float32)
            grown[:filled] = dataset[:filled]
            dataset = grown

        # Write at the running offset rather than `n_new * idx`, which is only
        # correct when all files produce the same number of rows.
        dataset[filled:needed] = vector_array
        filled = needed

    # Drop any pre-allocated rows that were never written.
    return dataset[:filled]

In [None]:
def file_list_generator(target_dir,
                        dir_name="train",
                        ext="wav"):
    """List the files ``<target_dir>/<dir_name>/*.<ext>`` in sorted order.

    Prints the last two components of ``target_dir``, a warning when nothing
    matches, and the number of files found.

    Args:
        target_dir: directory that contains the ``dir_name`` sub-directory.
        dir_name: sub-directory to search.
        ext: file extension to match (without the dot).

    Returns:
        Sorted list of absolute paths matching the pattern (possibly empty).
    """
    tail = str(target_dir).split('/')[-2:]
    print("target_dir : " + '/'.join(tail))

    # Absolute glob pattern for the requested sub-directory and extension.
    pattern = os.path.abspath(f"{target_dir}/{dir_name}/*.{ext}")
    matches = glob.glob(pattern)
    matches.sort()

    if not matches:
        print(f"{pattern} -> no_wav_file!!")

    print(f"# of training samples : {len(matches)}")
    return matches

In [None]:
import torchaudio
import torch 

def transform_sample(x, sr=SR):
    """Convert a NumPy sample into a tensor plus its transformed version.

    The input is cut to at most ``sr * 10`` leading elements, wrapped in a
    tensor, passed through ``mel_transform`` and ``amplitude_to_db``, and
    given an extra leading dimension.

    NOTE(review): ``mel_transform`` and ``amplitude_to_db`` are read as
    module-level names, but no such globals are defined in the visible cells;
    confirm where they are meant to come from before calling this.

    Args:
        x: NumPy array (presumably a waveform; verify against the caller).
        sr: sampling rate; ``sr * 10`` is the maximum number of elements kept.

    Returns:
        Tuple ``(tensor_of_truncated_x, transformed_tensor, 0)``.
    """
    clipped = x[:sr * 10]
    wav_tensor = torch.from_numpy(clipped)

    # Two separate steps: mel transform first, then conversion to dB.
    mel_tensor = mel_transform(wav_tensor)
    mel_tensor = amplitude_to_db(mel_tensor)

    return wav_tensor, mel_tensor.unsqueeze(0), 0


class SliderDataset(torch.utils.data.Dataset):
    """Dataset over the feature matrix produced by ``list_to_vector_array``.

    ``self.X`` holds one feature vector per row. Each item is returned twice:
    once as a 1-D tensor and once with an extra leading dimension.

    NOTE(review): the torchaudio transforms built in ``__init__`` are stored
    but not used by ``__getitem__`` — confirm whether they are still needed.
    """

    def __init__(self, files):
        """Extract features for ``files`` and set up the torchaudio transforms."""
        self.X = list_to_vector_array(files)

        mel_options = dict(sample_rate=SR,
                           win_length=WIN_LENGTH,
                           hop_length=HOP_LENGTH,
                           n_fft=N_FFT,
                           n_mels=N_MELS,
                           power=POWER)
        self.mel_transform = torchaudio.transforms.MelSpectrogram(**mel_options)
        self.amplitude_to_db = torchaudio.transforms.AmplitudeToDB(stype='power')

    def __len__(self):
        """Number of rows in the feature matrix."""
        return len(self.X)

    def __getitem__(self, index, sr=SR):
        """Return ``(vector, vector_with_leading_dim)`` for row ``index``.

        The row is capped at ``sr * 10`` elements before conversion.
        NOTE(review): that cap looks like a leftover from raw-waveform
        handling, since rows of ``self.X`` are feature vectors — confirm.
        """
        row = self.X[index][:sr * 10]
        features = torch.from_numpy(row)
        return features, features.unsqueeze(0)

# Location of the "slider" machine-type data inside the competition input folder.
target_dir = '../input/eurecom-aml-2022-challenge-2/dev_data/dev_data/slider'

# Gather the training files and extract their features into a dataset.
files = file_list_generator(target_dir)
ds = SliderDataset(files)

# Shuffled mini-batches; the last incomplete batch is dropped.
loader = torch.utils.data.DataLoader(
    ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    pin_memory=True,
    num_workers=4,
)



In [None]:
#Define the Convolutional Autoencoder
class ConvAutoencoder(torch.nn.Module):
    """Stack of strided 2-D convolutions split into an encoder and a decoder.

    Expects input of shape ``(batch, 128, height, width)`` and returns a
    tensor with a single channel.

    Channel bookkeeping rules used below:
      * every ``BatchNorm2d`` is sized to the *output* channels of the
        convolution right before it;
      * every convolution's input channels equal the output channels of the
        previous convolution (the decoder starts from the encoder's 512).

    NOTE(review): the decoder still uses strided ``Conv2d`` layers, so it keeps
    shrinking the feature map and the output shape differs from the input
    shape. If the goal is to reconstruct the input, the decoder likely needs
    upsampling layers (e.g. ``ConvTranspose2d``) and the input/output channel
    counts need to agree — confirm the intended input layout.
    """

    def __init__(self):
        super().__init__()
        # Only `torch` is imported by the notebook, so refer to torch.nn explicitly.
        nn = torch.nn

        self.encoder = nn.Sequential(
            nn.Conv2d(128, 256, 3, padding=1, stride=(2, 2)),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.Conv2d(256, 512, 3, padding=1, stride=(2, 2)),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.Conv2d(512, 512, 3, padding=1, stride=(2, 2)),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
        )
        self.decoder = nn.Sequential(
            # Input channels match the 512 channels produced by the encoder.
            nn.Conv2d(512, 128, 3, padding=1, stride=(2, 2)),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.Conv2d(128, 64, 3, padding=1, stride=(2, 2)),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.Conv2d(64, 32, 3, padding=1, stride=(2, 2)),
            nn.BatchNorm2d(32),
            nn.ReLU(True),
            nn.Conv2d(32, 2, 3, padding=1, stride=(1, 2)),
            nn.BatchNorm2d(2),
            nn.ReLU(True),
            # Consumes the 2 channels produced above and emits a single channel.
            nn.Conv2d(2, 1, 3, padding=1, stride=(1, 2)),
        )

    def forward(self, x):
        """Run ``x`` through the encoder and then the decoder."""
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
# Train the autoencoder to reproduce its input and checkpoint it periodically.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# The batches are moved to `device` below, so the model must live there too.
model = ConvAutoencoder().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# The target is the real-valued input itself, so use a regression loss;
# cross-entropy would interpret the channel axis as class scores.
criterion = torch.nn.MSELoss()


for epoch in tqdm(range(EPOCHS)):
    model.train()
    total_loss = 0.0
    pbar = tqdm(loader, total=len(loader))
    for _, melspec in pbar:
        melspec = melspec.float().to(device)

        output = model(melspec)
        if output.shape != melspec.shape:
            # MSELoss would broadcast mismatched shapes and only emit a warning,
            # which yields a meaningless loss; fail loudly instead.
            raise ValueError(
                f"reconstruction shape {tuple(output.shape)} does not match "
                f"input shape {tuple(melspec.shape)}")
        loss = criterion(output, melspec)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # .item() stores a Python float rather than a tensor.
        batch_loss = loss.item()
        total_loss += batch_loss
        pbar.set_description(f'Epoch:{epoch}'
                             f'\tLclf:{batch_loss:.5f}\t')

    # Report the mean batch loss once per epoch (guard against an empty loader).
    print(f"Epoch: {epoch}\tLoss: {total_loss / max(len(loader), 1)}")

    if epoch % 10 == 0:
        # save model checkpoints (once per qualifying epoch, not once per batch)
        state = {
            'epoch': epoch,
            # The model is not wrapped in DataParallel, so there is no `.module`.
            'clf_state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        filename = 'checkpoint.pth.tar'
        torch.save(state, filename)