In [7]:
import torch
import torchaudio
from torch.utils.data import Dataset

import numpy as np
import os # for file path manipulation

import csv # for reading tsv files


# custom dataset class
class SpeechDataset(Dataset):
    def __init__(self, tsvs=[], sample_rate=16000, transform=None, columns=['path']):
        self.tsvs = tsvs
        self.sample_rate = sample_rate
        self.transform = transform
        self.columns = columns
        self.data = []

        # load metadata
        self._load_metadata()


    def split(self, split_ratio=0.8):
        # split data into train and test sets
        # split_ratio is the ratio of training data to test data
        # returns two SpeechDataset objects, one for train and one for test

        # get split index
        split_idx = int(len(self.data) * split_ratio)

        # split data
        train_data = self.data[:split_idx]
        test_data = self.data[split_idx:]

        # create new SpeechDataset objects
        train_dataset = SpeechDataset(sample_rate=self.sample_rate, transform=self.transform, columns=self.columns)
        test_dataset = SpeechDataset(sample_rate=self.sample_rate, transform=self.transform, columns=self.columns)

        # set data
        train_dataset.data = train_data
        test_dataset.data = test_data

        return train_dataset, test_dataset

    def _load_metadata(self):
        self.data = []
        for tsv in self.tsvs:
            dir_path, _ = os.path.split(tsv)
            
            clips = os.path.join(dir_path, 'clips', '')
            
            # read tsv and append to data
            with open(tsv, 'r') as f:
                reader = csv.DictReader(f, delimiter='\t')
                for row in reader:
                    # commonvoice columns:
                    # client_id	path	sentence	up_votes	down_votes	age	gender	accents	variant	locale	segment
                    
                    # get columns
                    data = [row[col] for col in self.columns]
                    if 'path' in self.columns:
                        # convert path to absolute path
                        path_idx = self.columns.index('path')
                        data[path_idx] = clips + data[path_idx]
                    # append to data
                    self.data.append(data)


        # shuffle data
        np.random.shuffle(self.data)

    def get_column_names(self):
        # if path is included, last column is audio data that will be loaded in __getitem__
        if 'path' in self.columns:
            # self.columns + ['audio']
            return self.columns + ['audio']
        else:
            return self.columns
        

    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):

        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        # load data
        sample = self.data[idx]
        # load audio (if path is in sample)
        if 'path' in self.columns:
            # load audio
            #print(sample[self.columns.index('path')])
            audio, sample_rate = torchaudio.load(sample[self.columns.index('path')])
            
            # resample audio if necessary
            if sample_rate != self.sample_rate:
                resampler = torchaudio.transforms.Resample(sample_rate, self.sample_rate)
                audio = resampler(audio)

                # normalize audio
                audio = audio / torch.max(torch.abs(audio))

            
            # add audio to sample
            sample.append(audio)

        # apply transform if necessary
        if self.transform:
            sample = self.transform(sample)

        return sample


In [8]:
dataset = SpeechDataset(tsvs=[
    'commonvoice\\cv-corpus-16.0-delta-2023-12-06\\en\\validated.tsv',
    'commonvoice\\cv-corpus-16.0-delta-2023-12-06\\de\\validated.tsv', 
    'commonvoice\\cv-corpus-16.0-delta-2023-12-06\\ja\\validated.tsv'], columns=['path', 'sentence'])

print('Dataset length:', len(dataset))
print('Dataset columns:', dataset.get_column_names())

import random
# get first sample
sample = dataset[random.randint(0, len(dataset))]
print('Sample:', sample)

# get audio from sample
audio = sample[-1]

# play audio
from IPython.display import Audio


# Play the audio using IPython's Audio widget
audio_widget = Audio(data=audio.numpy(), rate=16000)
display(audio_widget)

Dataset length: 16894
Dataset columns: ['path', 'sentence', 'audio']
Sample: ['commonvoice\\cv-corpus-16.0-delta-2023-12-06\\de\\clips\\common_voice_de_38702976.mp3', 'Ein Schiff mit Namen "Elisabeth" mit zwanzig Auswanderern stand bereit.', tensor([[ 3.5298e-13,  3.9103e-12, -4.2826e-12,  ..., -9.3494e-06,
         -6.1109e-05, -8.1028e-05]])]


In [9]:

# baseline models
import torch.nn as nn

SAMPLE_RATE = 16000

def pad_batch(batch):
    if isinstance(batch[0], list):
        # if batch is list of list, get tensor from last element
        batch = [sample[-1].reshape(-1) for sample in batch]
    # pads batch to longest sequence
    # batch is list of samples
    lengths = [len(sample) for sample in batch]
    max_length = max(lengths)
    max_length = (max_length // (SAMPLE_RATE // 100) + 1) * (SAMPLE_RATE // 100)
    # pad to max length
    padded_batch = [torch.nn.functional.pad(sample, (0, max_length - len(sample))) for sample in batch]
    return torch.stack(padded_batch)

class BaselineEmbedder(nn.Module):
    def __init__(self, sample_rate = SAMPLE_RATE, embedding_dim=32):
        super(BaselineEmbedder, self).__init__()
        self.sample_rate = sample_rate
        self.embedding_dim = embedding_dim

        # lstm layers
        self.lstm = nn.LSTM(input_size=1, hidden_size=embedding_dim, num_layers=3, batch_first=True)

    
    def forward(self, x):
        # x is audio, clips are padded to longest sequence
        # x is (batch_size, samples)

        # reshape to (batch_size, samples, 1)
        x = x.unsqueeze(2)
        x = self.lstm(x)
        # get last hidden state
        x = x[0][:, -1, :]
        x = x.reshape(-1, self.embedding_dim)
        return x
    


In [10]:
baseline = BaselineEmbedder()
print(baseline)

batch = [dataset[random.randint(0, len(dataset))][-1] for _ in range(16)]
batch = [sample[-1] for sample in batch]
batch = pad_batch(batch)

print('Input shape:', batch.shape)

# get embeddings
embeddings = baseline(batch)
print('Embeddings shape:', embeddings.shape)


BaselineEmbedder(
  (lstm): LSTM(1, 32, num_layers=3, batch_first=True)
)
Input shape: torch.Size([16, 159040])
Embeddings shape: torch.Size([16, 32])


In [11]:
# VAE and decoder

class print_shape(nn.Module):
    def __init__(self, message):
        super().__init__()
        self.message = message
    
    def forward(self, x):
        print(self.message, x.shape)
        return x

class VAEBase(nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        self.training = True
        # encoder
        # input: audio waveform 16000 samples
        # latent space: 32 dimensions, 100 samples
        # goal: learn latent space representation of audio that is easier to use in RNNs

        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=16, kernel_size=20, stride=10, padding=5),
            # samples 16,000 -> 1,600
            nn.ReLU(),
            #nn.Linear(in_features=16, out_features=16),
            #nn.ReLU(),
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=8, stride=4, padding=2),
            # samples 1,600 -> 400
            nn.ReLU(),
            #nn.Linear(in_features=32, out_features=32),
            #nn.ReLU(),
            nn.Conv1d(in_channels=32, out_channels=32, kernel_size=4, stride=2, padding=1),
            # samples 400 -> 200
            nn.ReLU(),
            #nn.Linear(in_features=32, out_features=32),
            #nn.ReLU(),
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=4, stride=2, padding=1),
            # samples 200 -> 100
            #nn.ReLU(),
            #nn.Linear(in_features=64, out_features=64),
            nn.Tanh()
        )

        # decoder
        # input: latent space representation
        # output: audio waveform 16000 samples
        # goal: reconstruct original audio waveform from latent space representation

        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(in_channels=32, out_channels=32, kernel_size=4, stride=2, padding=1),
            # samples 100 -> 200
            nn.ReLU(),
            #nn.Linear(in_features=32, out_features=32),
            #nn.ReLU(),
            nn.ConvTranspose1d(in_channels=32, out_channels=32, kernel_size=4, stride=2, padding=1),
            # samples 200 -> 400
            nn.ReLU(),
            #nn.Linear(in_features=32, out_features=32),
            #nn.ReLU(),
            nn.ConvTranspose1d(in_channels=32, out_channels=16, kernel_size=8, stride=4, padding=2),
            # samples 400 -> 1600
            nn.ReLU(),
            #nn.Linear(in_features=16, out_features=16),
            #nn.ReLU(),
            nn.ConvTranspose1d(in_channels=16, out_channels=1, kernel_size=20, stride=10, padding=5),
            # samples 1600 -> 16000
            nn.Tanh()
        )

    def set_training(self, training):
        self.training = training

    def sample(self, mu, log_var):
        # if not self.training:
        #     return mu
        if not self.training:
            return mu
        # reparameterization trick
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def forward(self, x):

        # reshape to (n, 1, length)
        x = x.unsqueeze(1)
        # encode
        x = self.encoder(x)
        # get mu and log_var
        mu = x[:, :32]
        log_var = x[:, 32:]
        # sample from latent space
        z = self.sample(mu, log_var)
        # decode
        x = self.decoder(z)
        if not self.training:
            return x
        
        
        return x, mu, log_var

In [None]:
# VAE v2

class EncoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels, n_conv=2, kernel_size=2, stride=1, padding=1, activation=nn.Tanh):
        super().__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.n_conv = n_conv
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        self.conv_in = nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding)
        self.conv_rest = nn.ModuleList([nn.Conv1d(out_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding) for _ in range(n_conv)])

        self.activation = activation()
        self.pool = nn.AvgPool1d(kernel_size=2, stride=2, padding=0)

        self.l_norm = nn.LayerNorm(out_channels)


    def residual(self, state, block_input):
        # fit residual to to block output shape
        # block_input is (n, in_channels, length)
        # target shape is (n, out_channels, length)

        # repeat channels to match target shape
        block_input = block_input.repeat(1, self.out_channels // self.in_channels, 1)
        # add residual to block output
        state = state + block_input
        state = self.l_norm(state)
        return state
    
    def forward(self, x):
        l = self.conv_in(x)
        l = self.activation(l)
        for conv in self.conv_rest:
            l = conv(l)
            l = self.activation(l)

        # residual connection
        l = self.residual(l, x)

        return l
    
class DecoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels, n_conv=2, kernel_size=2, stride=1, padding=1, activation=nn.Tanh):
        super().__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.n_conv = n_conv
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        self.conv_rest = nn.ModuleList([nn.Conv1d(in_channels, in_channels, kernel_size=kernel_size, stride=stride, padding=padding) for _ in range(n_conv)])
        self.conv_out = nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding)

        self.activation = activation()
        self.upsample = nn.Upsample(scale_factor=2)

        self.l_norm = nn.LayerNorm(out_channels)

    def residual(self, state, upsampled):
        # fit residual to to block output shape
        # upsampled is (n, in_channels, length)
        # target shape is (n, out_channels, length)
        # average channels to get target shape

        # hack, just return first n channels
        state = state + upsampled[:, :self.conv_out.out_channels, :]
        state = self.l_norm(state)
        return state


    
    def forward(self, x):
        # upsample
        l = self.upsample(x)
        res = l
        # convolutions
        for conv in self.conv_rest:
            l = conv(l)
            l = self.activation(l)
        # last convolution
        l = self.conv_out(l)
        # activation
        l = self.activation(l)

        # residual connection
        l = self.residual(l, res)

        return l
    
def get_dimension_count(layer_i, max_dim=64, layer_n=6):
    if layer_i == 0:
        return 1
    elif layer_i >= layer_n:
        return max_dim
    return min(4 ** layer_i, max_dim)

class Encoder(nn.Module):
    def __init__(self, n_layers=6, latent_dim=64):
        super().__init__()

        self.n_layers = n_layers

        self.blocks = nn.ModuleList([EncoderBlock(get_dimension_count(i), get_dimension_count(i+1)) for i in range(n_layers)])

    def forward(self, x):
        for block in self.blocks:
            x = block(x)
        return x
    
class Decoder(nn.Module):
    def __init__(self, n_layers=6, latent_dim=64):
        super().__init__()

        self.n_layers = n_layers

        self.blocks = nn.ModuleList([DecoderBlock(get_dimension_count(n_layers - i), get_dimension_count(n_layers - i - 1)) for i in range(n_layers)])

    def forward(self, x):
        for block in self.blocks:
            x = block(x)
        return x
    
class VAE(nn.Module):
    def __init__(self, n_layers=6, latent_dim=64):
        super().__init__()

        self.n_layers = n_layers
        self.latent_dim = latent_dim

        self.encoder = Encoder(n_layers=n_layers, latent_dim=latent_dim)
        self.decoder = Decoder(n_layers=n_layers, latent_dim=latent_dim)

        self.fc_mu = nn.Linear(get_dimension_count(n_layers), latent_dim)
        self.fc_log_var = nn.Linear(get_dimension_count(n_layers), latent_dim)

    def sample(self, mu, log_var):
        # if not self.training:
        #     return mu
        if not self.training:
            return mu
        # reparameterization trick
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        return mu + eps * std
    
    def forward(self, x):
        # encode
        x = self.encoder(x)
        # get mu and log_var
        #mu = self.fc_mu(x[:, :, 0])
        #log_var = self.fc_log_var(x[:, :, 0])
        # sample from latent space
        #z = self.sample(mu, log_var)
        # decode
        z = x
        x = self.decoder(z)
        if not self.training:
            return x
        
        
        #return x, mu, log_var
        return x



        

In [12]:
vae = VAEBase()
print(vae)

# get batch
batch = [dataset[random.randint(0, len(dataset))][-1] for _ in range(16)]
batch = [sample[-1] for sample in batch]
batch = pad_batch(batch)


print('Input shape:', batch.shape)

# get output
output,_,_ = vae(batch)
print('Output shape:', output.shape)



VAE(
  (encoder): Sequential(
    (0): Conv1d(1, 16, kernel_size=(20,), stride=(10,), padding=(5,))
    (1): ReLU()
    (2): Conv1d(16, 32, kernel_size=(8,), stride=(4,), padding=(2,))
    (3): ReLU()
    (4): Conv1d(32, 32, kernel_size=(4,), stride=(2,), padding=(1,))
    (5): ReLU()
    (6): Conv1d(32, 64, kernel_size=(4,), stride=(2,), padding=(1,))
    (7): Tanh()
  )
  (decoder): Sequential(
    (0): ConvTranspose1d(32, 32, kernel_size=(4,), stride=(2,), padding=(1,))
    (1): ReLU()
    (2): ConvTranspose1d(32, 32, kernel_size=(4,), stride=(2,), padding=(1,))
    (3): ReLU()
    (4): ConvTranspose1d(32, 16, kernel_size=(8,), stride=(4,), padding=(2,))
    (5): ReLU()
    (6): ConvTranspose1d(16, 1, kernel_size=(20,), stride=(10,), padding=(5,))
    (7): Tanh()
  )
)
Input shape: torch.Size([16, 164320])
Output shape: torch.Size([16, 1, 164320])


In [17]:
# train vae
import torch.optim as optim
from torch.utils.data import DataLoader
import tqdm

# hyperparameters
BATCH_SIZE = 64
LEARNING_RATE = 0.001
EPOCHS = 5
kl_beta = 0.1

# create dataloader
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad_batch)

# create model
vae = VAEBase()
vae.train()
# create optimizer
optimizer = optim.Adam(vae.parameters(), lr=LEARNING_RATE)

# train
def l_rate(epoch):
    return 0.001 * 0.5 ** (epoch)

def b_size(epoch):
    return 16 * 2 ** (epoch // 2)

def print_progress(epoch, batch, loss):
    prog = batch / len(dataloader)
    prog = int(prog * 20)
    print(f'Epoch: {epoch} | {"#" * prog}{"-" * (20 - prog)} | Loss: {loss.item()}', end='\r')

for epoch in range(EPOCHS):

    # set learning rate
    for param_group in optimizer.param_groups:
        param_group['lr'] = l_rate(epoch)

    # set batch size
    BATCH_SIZE = b_size(epoch)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=pad_batch)

    # set model to training mode    
    vae.train()

    for i, batch in enumerate(dataloader):
        # zero gradients
        optimizer.zero_grad()

        # forward pass
        output, mu, var = vae(batch)
        output = output.squeeze(1)

        # calculate loss
        reconstruction_loss = torch.nn.functional.mse_loss(output, batch)
        # KL divergence
        kl_divergence = -0.5 * torch.sum(1 + var - mu.pow(2) - var.exp())
        # total loss
        loss = reconstruction_loss

        # backward pass
        loss.backward()

        # update weights
        optimizer.step()

        # print progress
        print_progress(epoch, i, loss)
    print()

# save model
torch.save(vae.state_dict(), 'weights/vae.pth')

Epoch: 0 | ###################- | Loss: 0.0041544688865542415
Epoch: 1 | ###################- | Loss: 0.0031084974762052298
Epoch: 2 | ###################- | Loss: 0.0027430593036115174
Epoch: 3 | ###################- | Loss: 0.0027506006881594664
Epoch: 4 | ###################- | Loss: 0.0028377671260386705


In [25]:
#vae = VAE()
#vae.load_state_dict(torch.load('weights/vae.pth'))

vae.eval()

# test vae
# get sample
sample = dataset[random.randint(0, len(dataset))]
audio = sample[-1]
print(audio[:100])
audio.reshape(1, -1)

# play before
audio_widget = Audio(data=audio.numpy()[0], rate=16000)
display(audio_widget)

# get reconstruction
output = vae(audio)

# play after
output_np = output.detach().numpy()[0]
output_np = output_np.reshape(-1)
print(output_np[:100])
audio_widget = Audio(data=output.detach().numpy()[0], rate=16000)
display(audio_widget)


tensor([[0., 0., 0.,  ..., 0., 0., 0.]])


[-1.0463607e-02 -2.0247752e-02 -3.2424197e-02 -8.7270774e-03
 -7.4947993e-03 -2.0210603e-03 -1.5000205e-03 -8.7042805e-03
 -5.5538588e-03 -1.0269281e-03 -4.2518084e-03  7.2010222e-04
 -1.5773090e-03  2.1968225e-03 -7.1021840e-03 -6.2778960e-03
 -3.2423211e-03 -1.5487062e-03 -2.3569160e-03  2.5814490e-03
 -1.7894562e-03  8.9769787e-04  6.3601620e-03  1.5494811e-03
  5.3663072e-03  9.4767660e-05  6.5692503e-04  1.8041002e-03
  1.7685760e-03  2.7762728e-03  3.2187104e-03  3.3005581e-03
  4.3407776e-03  3.9621680e-03  3.9668246e-03  2.6898691e-03
  2.7465601e-03  3.2765823e-03  3.3999812e-03  3.5200049e-03
  3.6956575e-03  3.0170141e-03  2.9368913e-03  3.0550377e-03
  2.2931767e-03  2.5313443e-03  2.0950704e-03  1.7204267e-03
  2.2827534e-03  1.6975238e-03  2.2156090e-03  1.8099638e-03
  1.3424448e-03  2.3147012e-03  1.9619775e-03  1.8405409e-03
  1.2571022e-03  1.4891130e-03  1.6489275e-03  1.9832451e-03
  2.1995644e-03  1.8945948e-03  2.0093331e-03  2.5089367e-03
  2.4815858e-03  3.94508