## Demo of VAE trained using user uploaded samples

Please sign in to your Google Drive and upload audio (.wav) files to a folder names 'samples' in the same directory as this notebook.

This notebook is meant on train on a dataset of multiple samples so it would be useful to upload atleast 10. 

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Optionally you can use this code to select files and upload them.

In [None]:
'''optional: upload files to drive'''
# from google.colab import files

# uploaded = files.upload()

# for fn in uploaded.keys():
#   print('User uploaded file "{name}" with length {length} bytes'.format(
#       name=fn, length=len(uploaded[fn])/1000000))

In [8]:
'''create dataset of audio files'''

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchaudio
from torchaudio.transforms import Resample
from torch.nn.utils.rnn import pad_sequence
import os


## Load and preprocess the audio waveform ##
%cd '/content/drive/My Drive/Colab Notebooks/'     ## add location of files in google drive or local directory
data_dir = 'samples'
sample_rate = 16000
transform = Resample(new_freq=sample_rate)
files_audio = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.wav')]
waveforms = []

for f in files_audio:
    waveform, _ = torchaudio.load(f)
    waveform = transform(waveform)
    waveform = torch.mean(waveform, dim=0, keepdim=True)
    waveform /= torch.max(torch.abs(waveform))
    waveforms.append(waveform)

# Define the dataset class

class AudioDataset(Dataset):
    def __init__(self, waveforms, max_length):
        self.waveforms = waveforms
        self.max_length = max_length

    def __getitem__(self, index):
        waveform = self.waveforms[index]
        if waveform.size(1) > self.max_length:
            waveform = waveform[:, :self.max_length]
        else:
            waveform = pad_sequence([waveform], batch_first=True)[0]
            waveform = F.pad(waveform, (0, self.max_length - waveform.size(1)), 'constant', 0)
        return waveform

    def __len__(self):
        return len(self.waveforms)


seconds = 20                    ## try different max lengths 
max = sample_rate*seconds       
# Create a PyTorch DataLoader for the audio data
dataset = AudioDataset(waveforms, max_length = max)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True)   ## try different batch sizes

for waveform in waveforms:
    print(f'original size = {waveform.size()}')
for batch in dataloader:
    print('batch sizes =')
    print(batch.size())

/content/drive/My Drive/Colab Notebooks/data_glac
original size = torch.Size([1, 19956953])
original size = torch.Size([1, 17848785])
original size = torch.Size([1, 14087078])
original size = torch.Size([1, 16027648])
original size = torch.Size([1, 12025856])
original size = torch.Size([1, 23990400])
original size = torch.Size([1, 13230000])
original size = torch.Size([1, 13914808])
original size = torch.Size([1, 15468075])
original size = torch.Size([1, 16846200])
batch sizes =
torch.Size([2, 1, 320000])
batch sizes =
torch.Size([2, 1, 320000])
batch sizes =
torch.Size([2, 1, 320000])
batch sizes =
torch.Size([2, 1, 320000])
batch sizes =
torch.Size([2, 1, 320000])


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchaudio
from torchaudio.transforms import Resample
import time as time
import matplotlib.pyplot as plt
import os

size = max
latent_size = 1600                    ## try different latent sizes

# Define the VAE model architecture
class VAE(nn.Module):
    def __init__(self, latent_size=latent_size):
        super(VAE, self).__init__()
        
        # Encoder layers
        self.fc1 = nn.Linear(size, 512)
        self.fc2_mean = nn.Linear(512, latent_size)
        self.fc2_logvar = nn.Linear(512, latent_size)
        
        # Decoder layers
        self.fc3 = nn.Linear(latent_size, 512)
        self.fc4 = nn.Linear(512, size)
        
    def encode(self, x):
        x = F.relu(self.fc1(x))
        mean = self.fc2_mean(x)
        logvar = self.fc2_logvar(x)
        return mean, logvar
    
    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mean + eps * std
        return z
    
    def decode(self, z):
        z = F.relu(self.fc3(z))
        x = torch.tanh(self.fc4(z))
        return x
    
    def forward(self, x):
        mean, logvar = self.encode(x.view(-1, size))
        z = self.reparameterize(mean, logvar)
        x_recon = self.decode(z)
        return x_recon, mean, logvar

# Define the loss function
def vae_loss(x_recon, x, mean, logvar):
    recon_loss = F.mse_loss(x_recon, x.view(-1, size), reduction='sum')
    kl_loss = -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
    return recon_loss + kl_loss

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vae = VAE()
vae.to(device)

learning_rate = 0.0001        # try different learning rates       
optimizer = optim.Adam(vae.parameters(), lr=learning_rate)       

vae.train()
start_time = time.time()

num_epochs = 10000

for epoch in range(num_epochs):
    total_loss = 0
    for batch_idx, batch in enumerate(dataloader):
        batch = batch.to(device)
        optimizer.zero_grad()
        x_recon, mean, logvar = vae(batch)
        loss = vae_loss(x_recon, batch, mean, logvar)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, total_loss/len(dataloader)))
print(f'Loss = {total_loss/len(dataloader)}')
# Save the trained model
model_name = 'simple'
torch.save(vae.state_dict(), f'vae_model_{model_name}.pt')
end_time = time.time()
print('Training Time: {:.2f} s'.format(end_time - start_time))

# from google.colab import files

# with open(f'vae_model_{model_name}.pt', 'w') as f:
#   f.write(f'vae_model_{model_name}.pt')

# files.download(f'vae_model_{model_name}.pt')


In [None]:
model_name ="simple"
vae = VAE()
vae.load_state_dict(torch.load(f'vae_model_{model_name}.pt'))

# Generate new audio waveforms
num_samples = 3

for i in range(num_samples):
    # Generate a random latent vector
    z = torch.randn(1, latent_size)
    
    # Decode the latent vector to generate a new audio sample
    output_audio = vae.decode(z)
    
    # Save the output audio file
    torchaudio.save('output_audio_{}.wav'.format(i), output_audio, 16000)


# Preview the input and output audio files
from IPython.display import Audio
for i in range(num_samples):
  print(f'Output Audio {i}:')
  display(Audio(f'output_audio_{i}.wav'))

Output Audio 0:


Output Audio 1:


Output Audio 2:


## Add RNN layers to VAE

Create dataset with smaller sample length and batch size.

using RNN slows down the training considerably.

In [2]:
'''create dataset of audio files'''

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torchaudio
from torchaudio.transforms import Resample
from torch.nn.utils.rnn import pad_sequence
import os


## Load and preprocess the audio waveform ##
%cd '/content/drive/My Drive/Colab Notebooks/'     ## add location of files in google drive or local directory
data_dir = 'samples'
sample_rate = 16000
transform = Resample(new_freq=sample_rate)
files_audio = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.wav')]
waveforms = []

for f in files_audio:
    waveform, _ = torchaudio.load(f)
    waveform = transform(waveform)
    waveform = torch.mean(waveform, dim=0, keepdim=True)
    waveform /= torch.max(torch.abs(waveform))
    waveforms.append(waveform)

# Define the dataset class

class AudioDataset(Dataset):
    def __init__(self, waveforms, max_length):
        self.waveforms = waveforms
        self.max_length = max_length

    def __getitem__(self, index):
        waveform = self.waveforms[index]
        if waveform.size(1) > self.max_length:
            waveform = waveform[:, :self.max_length]
        else:
            waveform = pad_sequence([waveform], batch_first=True)[0]
            waveform = F.pad(waveform, (0, self.max_length - waveform.size(1)), 'constant', 0)
        return waveform

    def __len__(self):
        return len(self.waveforms)


seconds = 15                    ## try different max lengths 
max_length = sample_rate*seconds       
# Create a PyTorch DataLoader for the audio data
dataset = AudioDataset(waveforms, max_length = max_length)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)   ## try different batch sizes

for waveform in waveforms:
    print(f'original size = {waveform.size()}')
for batch in dataloader:
    print('batch sizes =')
    print(batch.size())

/content/drive/My Drive/Colab Notebooks/data_glac
original size = torch.Size([1, 19956953])
original size = torch.Size([1, 17848785])
original size = torch.Size([1, 14087078])
original size = torch.Size([1, 16027648])
original size = torch.Size([1, 12025856])
original size = torch.Size([1, 23990400])
original size = torch.Size([1, 13230000])
original size = torch.Size([1, 13914808])
original size = torch.Size([1, 15468075])
original size = torch.Size([1, 16846200])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])
batch sizes =
torch.Size([1, 1, 240000])


In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchaudio
from torchaudio.transforms import Resample
import time as time
import matplotlib.pyplot as plt
import os

'''add RNN layers'''

size = max_length
latent_size = 1600                    ## try different latent sizes
class VAE(nn.Module):
    def __init__(self, latent_size=latent_size):
        super(VAE, self).__init__()

        # Encoder layers
        self.rnn = nn.RNN(input_size=size, hidden_size=256, num_layers=1, batch_first=True)
        self.fc1 = nn.Linear(256, 512)
        self.fc2_mean = nn.Linear(512, latent_size)
        self.fc2_logvar = nn.Linear(512, latent_size)

        # Decoder layers
        self.fc3 = nn.Linear(latent_size, 512)
        self.rnn2 = nn.RNN(input_size=512, hidden_size=256, num_layers=1, batch_first=True)
        self.fc4 = nn.Linear(256, size)

    def encode(self, x):
        x, _ = self.rnn(x.view(-1,1 ,size))
        x = F.relu(self.fc1(x[:, -1, :]))
        mean = self.fc2_mean(x)
        logvar = self.fc2_logvar(x)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mean + eps * std
        return z

    def decode(self, z):
        z = F.relu(self.fc3(z))
        z = z.unsqueeze(1).repeat(1, size, 1)
        z, _ = self.rnn2(z)
        x = torch.tanh(self.fc4(z[:, -1, :]))
        return x

    def forward(self, x):
        mean, logvar = self.encode(x.view(-1, size, 1))
        z = self.reparameterize(mean, logvar)
        x_recon = self.decode(z)
        return x_recon, mean, logvar


# Define the loss function
def vae_loss(x_recon, x, mean, logvar):
    recon_loss = F.mse_loss(x_recon, x.view(-1, size), reduction='sum')
    kl_loss = -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
    return recon_loss + kl_loss

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vae = VAE()
vae.to(device)

learning_rate = 0.0001        # try different learning rates       
optimizer = optim.Adam(vae.parameters(), lr=learning_rate)       

vae.train()
start_time = time.time()

num_epochs = 200

for epoch in range(num_epochs):
    total_loss = 0
    for batch_idx, batch in enumerate(dataloader):
        batch = batch.to(device)
        optimizer.zero_grad()
        x_recon, mean, logvar = vae(batch)
        loss = vae_loss(x_recon, batch, mean, logvar)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, total_loss/len(dataloader)))

print(f'Loss = {total_loss/len(dataloader)}')
end_time = time.time()

print('Training Time: {:.2f} s'.format(end_time - start_time))

# Save the trained model
model_name = 'rnn'
torch.save(vae.state_dict(), f'vae_model_{model_name}.pt')

# from google.colab import files

# with open(f'vae_model_{model_name}.pt', 'w') as f:
#   f.write(f'vae_model_{model_name}.pt')

# files.download(f'vae_model_{model_name}.pt')


Epoch [1/200], Loss: 12474.4571
Epoch [2/200], Loss: 11080.0290
Epoch [3/200], Loss: 11436.3223
Epoch [4/200], Loss: 10421.7962
Epoch [5/200], Loss: 10081.4859
Epoch [6/200], Loss: 10001.6838
Epoch [7/200], Loss: 9674.9312
Epoch [8/200], Loss: 8163.4927
Epoch [9/200], Loss: 7921.9513
Epoch [10/200], Loss: 6955.9072
Epoch [11/200], Loss: 6674.4726
Epoch [12/200], Loss: 6456.5896
Epoch [13/200], Loss: 5887.8505
Epoch [14/200], Loss: 5597.0071
Epoch [15/200], Loss: 5500.6664
Epoch [16/200], Loss: 5240.3244
Epoch [17/200], Loss: 5117.3406
Epoch [18/200], Loss: 5055.9841
Epoch [19/200], Loss: 4978.2656
Epoch [20/200], Loss: 4934.7339
Epoch [21/200], Loss: 4956.9266
Epoch [22/200], Loss: 4846.4683
Epoch [23/200], Loss: 4795.8013
Epoch [24/200], Loss: 4749.5477
Epoch [25/200], Loss: 4727.6284
Epoch [26/200], Loss: 4718.7053
Epoch [27/200], Loss: 4698.0096
Epoch [28/200], Loss: 4651.2746
Epoch [29/200], Loss: 4586.4608
Epoch [30/200], Loss: 4620.0353
Epoch [31/200], Loss: 4558.7187
Epoch [32/2

In [5]:
model_name ="rnn"
vae = VAE()
vae.load_state_dict(torch.load(f'vae_model_{model_name}.pt'))

# Generate new audio waveforms
num_samples = 3

for i in range(num_samples):
    # Generate a random latent vector
    z = torch.randn(1, latent_size)
    
    # Decode the latent vector to generate a new audio sample
    output_audio = vae.decode(z)
    
    # Save the output audio file
    torchaudio.save('output_audio_{}.wav'.format(i), output_audio, 16000)


# Preview the input and output audio files
from IPython.display import Audio
for i in range(num_samples):
  print(f'Output Audio {i}:')
  display(Audio(f'output_audio_{i}.wav'))

Output Audio 0:


Output Audio 1:


Output Audio 2:


## Try with CNN

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchaudio
from torchaudio.transforms import Resample
import time as time
import matplotlib.pyplot as plt
import os

'''add CNN layers'''

size = max_length
latent_size = 1600                    ## try different latent sizes

class VAE(nn.Module):
    def __init__(self, latent_size=latent_size):
        super(VAE, self).__init__()

        # Encoder layers
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=16, kernel_size=5, stride=2, padding=2)
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5, stride=2, padding=2)
        self.fc1 = nn.Linear(in_features=32*size // 4, out_features=512)
        self.fc2_mean = nn.Linear(in_features=512, out_features=latent_size)
        self.fc2_logvar = nn.Linear(in_features=512, out_features=latent_size)

        # Decoder layers
        self.fc3 = nn.Linear(in_features=latent_size, out_features=512)
        self.fc4 = nn.Linear(in_features=512, out_features=32*size // 4)
        self.deconv1 = nn.ConvTranspose1d(in_channels=32, out_channels=16, kernel_size=5, stride=2, padding=2, output_padding=1)
        self.deconv2 = nn.ConvTranspose1d(in_channels=16, out_channels=1, kernel_size=5, stride=2, padding=2, output_padding=1)

    def encode(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(-1, 32 * (size // 4))
        x = F.relu(self.fc1(x))
        mean = self.fc2_mean(x)
        logvar = self.fc2_logvar(x)
        return mean, logvar

    def reparameterize(self, mean, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mean + eps * std
        return z

    def decode(self, z):
        z = F.relu(self.fc3(z))
        z = F.relu(self.fc4(z))
        z = z.view(-1, 32, size // 4)
        z = F.relu(self.deconv1(z))
        x = torch.tanh(self.deconv2(z))
        return x

    def forward(self, x):
        mean, logvar = self.encode(x)
        z = self.reparameterize(mean, logvar)
        x_recon = self.decode(z)
        return x_recon, mean, logvar



# Define the loss function
def vae_loss(x_recon, x, mean, logvar):
    recon_loss = F.mse_loss(x_recon, x, reduction='sum')
    kl_loss = -0.5 * torch.sum(1 + logvar - mean.pow(2) - logvar.exp())
    return recon_loss + kl_loss

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vae = VAE()
vae.to(device)

learning_rate = 0.0001        # try different learning rates       
optimizer = optim.Adam(vae.parameters(), lr=learning_rate)       

vae.train()
start_time = time.time()

num_epochs = 10

for epoch in range(num_epochs):
    total_loss = 0
    for batch_idx, batch in enumerate(dataloader):
        batch = batch.to(device)
        optimizer.zero_grad()
        x_recon, mean, logvar = vae(batch)
        loss = vae_loss(x_recon, batch, mean, logvar)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, total_loss/len(dataloader)))

print(f'Loss = {total_loss/len(dataloader)}')
end_time = time.time()

print('Training Time: {:.2f} s'.format(end_time - start_time))

# Save the trained model
model_name = 'cnn'
torch.save(vae.state_dict(), f'vae_model_{model_name}.pt')

# from google.colab import files

# with open(f'vae_model_{model_name}.pt', 'w') as f:
#   f.write(f'vae_model_{model_name}.pt')

# files.download(f'vae_model_{model_name}.pt')


In [None]:
model_name ="cnn"
vae = VAE()
vae.load_state_dict(torch.load(f'vae_model_{model_name}.pt'))

# Generate new audio waveforms
num_samples = 3

for i in range(num_samples):
    # Generate a random latent vector
    z = torch.randn(1, latent_size)
    
    # Decode the latent vector to generate a new audio sample
    output_audio = vae.decode(z)
    
    # Save the output audio file
    torchaudio.save('output_audio_{}.wav'.format(i), output_audio, 16000)


# Preview the input and output audio files
from IPython.display import Audio
for i in range(num_samples):
  print(f'Output Audio {i}:')
  display(Audio(f'output_audio_{i}.wav'))