In [None]:
### Scripts to analyze audio

In [None]:
# Directory that holds the pre-computed mel-spectrogram files.
# The original machine-specific location is kept as the default; set the
# MEL_BASEDIR environment variable to point the notebook at another folder
# without editing this cell.
import os

BASEDIR = os.environ.get("MEL_BASEDIR") or \
    "/usr0/home/amadaan/audio/tacotron_baseline/Tacotron-pytorch/training-adb/"

### Mel-spectrogram classifier

In [None]:
from torch.utils.data import Dataset
import torch
import json
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import glob
import os
from collections import Counter

#### Dataloader/Dataset

In [None]:
def _pad_2d(x, max_len):
    x = np.pad(x, [(0, max_len - len(x)), (0, 0)],
               mode="constant", constant_values=0)
    return x

class MelDataset(Dataset):
    """Dataset of pre-computed mel-spectrograms stored as NumPy files.

    Every file directly inside ``pth`` whose name contains ``mel`` is one
    example. File names are expected to look like ``accent_speaker_*``
    (e.g. ``australian_s02_362...``): the first underscore-separated field is
    the accent, and the first two fields together identify the speaker.

    Attributes:
        mel_files: sorted list of the matching file paths.
        labels: per-file list with the first two underscore-separated fields.
        accent_labels: per-file accent string.
        speaker_labels: per-file ``"accent speaker"`` string.
        accent_label_dict: accent string -> class index (sorted label order).
        speaker_label_dict: speaker string -> class index (sorted label order).
    """

    def __init__(self, pth):
        """Index the mel files found in directory ``pth`` and build label maps."""
        # Search the directory that was passed in (not a notebook global) so
        # the class can be pointed at any folder. Sorting makes the example
        # order independent of the order the filesystem lists the files in.
        self.mel_files = sorted(glob.glob(os.path.join(pth, "*mel*")))
        print(f"{len(self.mel_files)} mel-files found")
        # the files are supposed to be named accent_speaker_*.wav,
        # e.g. australian_s02_362.wav
        self.labels = [os.path.basename(mel_file_pth).split("_")[:2] for mel_file_pth in self.mel_files]

        # get accent labels, make a dict
        self.accent_labels = [l[0] for l in self.labels]
        self.accent_label_dict = {k: i for i, k in enumerate(sorted(set(self.accent_labels)))}
        print(self.accent_label_dict)

        # same processing for the speakers; the accent is part of the key so
        # that equal speaker ids under different accents stay distinct
        self.speaker_labels = [" ".join(l) for l in self.labels]
        self.speaker_label_dict = {k: i for i, k in enumerate(sorted(set(self.speaker_labels)))}
        print(self.speaker_label_dict)

    def __getitem__(self, i):
        """Load and return the array stored in the ``i``-th mel file."""
        return np.load(self.mel_files[i])

    def __len__(self):
        """Number of mel files indexed by this dataset."""
        return len(self.mel_files)

    @staticmethod
    def batchify(dataset, bsz, shuffle=True):
        """Generate padded mini-batches from ``dataset``.

        Args:
            dataset: a ``MelDataset`` (or an object with the same attributes).
            bsz: maximum number of examples per batch; the last batch may be
                smaller.
            shuffle: when true, visit the examples in a random order drawn
                with ``np.random.shuffle``.

        Yields:
            ``(mel_batch, speaker_labels, accent_labels, seq_lengths)`` where
            ``mel_batch`` is a float32 tensor zero-padded along the first
            (length) axis of each example to the longest example in the
            batch, the label tensors are ``LongTensor`` class indices, and
            ``seq_lengths`` holds the un-padded lengths. All four are ordered
            by decreasing length.

        Raises:
            ValueError: if ``bsz`` is smaller than 1 (raised when iteration
                starts, because this is a generator).
        """
        if bsz < 1:
            raise ValueError(f"bsz must be a positive integer, got {bsz}")

        idx = list(range(len(dataset)))
        if shuffle:
            np.random.shuffle(idx)

        for begin in range(0, len(idx), bsz):
            batch_idx = idx[begin:begin + bsz]

            # read all the mels for this batch, find the max length
            mels = [dataset[j] for j in batch_idx]
            seq_lengths = torch.LongTensor([len(mel) for mel in mels])
            max_target_len = seq_lengths.max().item()

            padded = np.array([_pad_2d(mel, max_target_len) for mel in mels],
                              dtype=np.float32)
            mel_batch = torch.from_numpy(padded)
            speaker_labels = torch.LongTensor(
                [dataset.speaker_label_dict[dataset.speaker_labels[j]] for j in batch_idx])
            accent_labels = torch.LongTensor(
                [dataset.accent_label_dict[dataset.accent_labels[j]] for j in batch_idx])

            # longest sequence first; every tensor is permuted the same way
            # so rows stay aligned across the four outputs
            seq_lengths, perm_idx = seq_lengths.sort(0, descending=True)

            yield mel_batch[perm_idx], speaker_labels[perm_idx], accent_labels[perm_idx], seq_lengths
            
            

In [None]:
# dataset sanity checks
# Builds the dataset from BASEDIR; the constructor prints the number of files
# found and both label dictionaries.
dataset = MelDataset(BASEDIR)

# check shape of one mel file
print(dataset[0].shape)

# dataloader check
# batchify returns a generator, so `dataloader` can only be consumed once.
dataloader = MelDataset.batchify(dataset, 32)

# check two batches for correct batchification:
# NOTE: next() raises StopIteration if the dataset yields fewer than two
# batches. The names bound here (mel, speaker, accent, input_lengths) are
# reused by the exploratory cells further down in the notebook.
for _ in range(2):
    mel, speaker, accent, input_lengths = next(dataloader)
    print(mel.shape, speaker.shape, accent.shape)

#### Model

In [None]:
class MelClassifier(nn.Module):
    """Conv + bidirectional-GRU classifier over padded mel-spectrogram batches.

    Pipeline: Conv1d over time -> ELU -> BatchNorm -> 2x2 max-pool -> dropout
    -> packed bidirectional GRU -> linear layer on the concatenated final
    hidden states of every layer and direction.

    Args:
        num_class: number of output classes (size of the logit vector).
        mel_spectogram_dim: number of mel bins per frame (input channels).
        gru_hidden_size: hidden size of each GRU direction.
        gru_num_layers: number of stacked GRU layers.
    """

    def __init__(self,
                 num_class,
                 mel_spectogram_dim: int = 80,
                 gru_hidden_size=32,
                 gru_num_layers=2):
        super(MelClassifier, self).__init__()
        self.num_class = num_class

        conv_channels = 64
        # Kept on the instance because forward() needs them to translate the
        # input frame counts into lengths on the shortened time axis.
        self._conv_kernel_size = 3
        self._pool_size = 2

        self.conv_block_1 = nn.Sequential(
                              nn.Conv1d(in_channels=mel_spectogram_dim, out_channels=conv_channels,
                                        kernel_size=self._conv_kernel_size),
                              nn.ELU(),
                              nn.BatchNorm1d(conv_channels),
                              # The conv output is 3-D (batch, channels, time), which
                              # MaxPool2d accepts as an un-batched (C, H, W) input: it
                              # pools over the last two axes, halving BOTH the channel
                              # axis (64 -> 32) and the time axis.
                              nn.MaxPool2d((self._pool_size, self._pool_size)),
                              nn.Dropout(p=0.1))
        # Feature size seen by the GRU = conv channels after the pooling above.
        gru_input_size = conv_channels // self._pool_size
        self.gru = nn.GRU(input_size=gru_input_size, hidden_size=gru_hidden_size,
                          num_layers=gru_num_layers,
                          bidirectional=True, batch_first=True, dropout=0.3)
        num_directions = 2
        self.mlp = nn.Linear(gru_hidden_size * gru_num_layers * num_directions, self.num_class)

    def forward(self, mel_batch, input_lengths):
        """Compute class logits for a padded batch.

        Args:
            mel_batch: tensor of shape (batch_size, max_time_step, mel_dim).
            input_lengths: un-padded frame count of every sequence in
                ``mel_batch`` (tensor or sequence of ints, any order).

        Returns:
            Tensor of shape (batch_size, num_class) with un-normalised logits.
        """
        batch_size = len(mel_batch)
        # mel_batch -> (batch_size, max_time_step, 80)
        conv_output = self.conv_block_1(mel_batch.permute(0, 2, 1)).permute(0, 2, 1)
        # conv_output -> (batch_size, (max_time_step - 2) // 2, 32)

        # The un-padded convolution drops (kernel_size - 1) frames and the
        # pooling halves what is left, so the original frame counts no longer
        # describe conv_output. Packing with the raw counts produces an
        # inconsistent PackedSequence that the GRU rejects.
        lengths = torch.as_tensor(input_lengths, dtype=torch.long).cpu()
        lengths = (lengths - (self._conv_kernel_size - 1)) // self._pool_size
        # pack_padded_sequence needs every length >= 1 and no length beyond
        # the padded time axis; very short clips are clamped to one frame.
        lengths = lengths.clamp(min=1, max=conv_output.size(1))

        packed = nn.utils.rnn.pack_padded_sequence(
                    conv_output, lengths, batch_first=True, enforce_sorted=False)
        _, h_n = self.gru(packed)
        # h_n -> (num_layers * num_directions, batch_size, gru_hidden_size)

        # Put batch first, then flatten every layer/direction state into one
        # feature vector per example.
        h_n = h_n.permute(1, 0, 2).reshape(batch_size, -1)
        return self.mlp(h_n)
        

#### Training loop

In [None]:
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Accent classifier: one output logit per accent found in the dataset.
num_accent_classes = len(dataset.accent_label_dict)
model = MelClassifier(num_accent_classes)
model = model.to(device)

In [None]:
# AdamW hyper-parameters gathered in one place so they are easy to tune.
adamw_settings = dict(
    lr=1e-3,
    betas=(0.9, 0.99),
    eps=1e-6,
    weight_decay=0.01,
)
optimizer = torch.optim.AdamW(model.parameters(), **adamw_settings)

In [None]:
# Train the accent classifier for a fixed number of epochs, printing the mean
# loss of the batches seen since the previous report every 50 iterations.
num_epochs = 10
dataset = MelDataset(BASEDIR)

# batchify returns a one-shot generator. The epoch loop below therefore
# builds a new one per epoch; this fresh instance is left bound to
# `dataloader` for the interactive inspection cells that call next() on it.
dataloader = MelDataset.batchify(dataset, 32)

loss_func = nn.CrossEntropyLoss()

model.train()  # enable dropout and batch-norm statistic updates
losses = []
for epoch in range(num_epochs):
    # A generator is exhausted after one pass, so reusing a single one would
    # leave every epoch after the first with no batches.
    epoch_batches = MelDataset.batchify(dataset, 32)
    # Each batch is a 4-tuple; the lengths are required by the model's forward.
    for i, (mels, speakers, accents, input_lengths) in enumerate(epoch_batches):
        mels = mels.to(device)
        accents = accents.to(device)
        # `speakers` is not used for accent training, so it is not moved to
        # the device. `input_lengths` stays on the CPU because
        # pack_padded_sequence expects CPU lengths.

        optimizer.zero_grad()

        logits = model(mels, input_lengths)
        # CrossEntropyLoss already averages over the batch (default reduction).
        loss = loss_func(logits, accents)

        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        if i % 50 == 0:
            print(f"Epoch = {epoch} iter = {i} Loss = {round(np.array(losses).mean(), 2)}")
            losses = []

In [None]:
# Display the shape of the padded mel batch currently bound to `mel`.
mel.shape

In [None]:
# Stand-alone copy of the classifier's convolutional front-end, used to
# inspect intermediate tensor shapes outside of the model.
scratch_conv_layers = [
    nn.Conv1d(in_channels=80, out_channels=64, kernel_size=3),
    nn.ELU(),
    nn.BatchNorm1d(64),
    nn.MaxPool2d((2, 2)),
    nn.Dropout(p=0.1),
]
conv_block_1 = nn.Sequential(*scratch_conv_layers)

In [None]:
# Pull the next batch from the generator bound to `dataloader`
# (raises StopIteration once that generator is exhausted).
mel, speaker, accent, input_lengths = next(dataloader)

In [None]:
# Number of sequences in the current batch (one length entry per sequence).
len(input_lengths)

In [None]:
# Conv1d wants (batch, channels, time): swap the last two axes on the way in
# and swap them back on the way out, then display the resulting shape.
mel_channels_first = mel.permute(0, 2, 1)
conv_output = conv_block_1(mel_channels_first).permute(0, 2, 1)
conv_output.shape

In [None]:
# conv_block_1 shortens the time axis: the kernel-3 convolution (no padding)
# drops 2 frames and the 2x2 max-pool halves what is left. The raw frame
# counts in input_lengths therefore overstate the number of valid steps in
# conv_output; packing with them yields batch_sizes that do not match the
# packed data and the GRU call fails. Map the lengths onto the pooled axis
# first (clamped so every sequence keeps at least one step).
conv_lengths = ((input_lengths - 2) // 2).clamp(min=1, max=conv_output.size(1))
packed_conv_output = nn.utils.rnn.pack_padded_sequence(
                    conv_output, conv_lengths, batch_first=True)

In [None]:
# Total number of (sequence, time-step) entries recorded in the packed
# sequence: batch_sizes holds, per time step, how many sequences are active.
packed_conv_output.batch_sizes.sum()

In [None]:
# Total number of un-padded input frames in the batch (sum of the
# per-sequence lengths measured before the convolution block).
np.array(input_lengths).sum()

In [None]:
# Shape of the flattened packed data tensor; for a consistent packed
# sequence its first dimension should equal batch_sizes.sum().
packed_conv_output.data.shape

In [None]:
# Stand-alone GRU with the same configuration as the classifier's recurrent
# layer, for experimenting with packed inputs outside of the model.
scratch_gru_config = dict(
    input_size=32,
    hidden_size=32,
    num_layers=2,
    bidirectional=True,
    batch_first=True,
    dropout=0.3,
)
gru = nn.GRU(**scratch_gru_config)

In [None]:
# Re-check the total number of packed (sequence, time-step) entries before
# feeding the packed sequence to the GRU.
packed_conv_output.batch_sizes.sum()

In [None]:
# Run the stand-alone GRU on the packed batch; `outputs` is the packed
# per-step output and `h_n` the final hidden state of every layer/direction.
outputs, h_n = gru(packed_conv_output)