In [None]:
import math
import os
import numpy as np
import librosa as lb
import soundfile as sf
import pyaudio

import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
dir_path = r"C:\Users\llama\Desktop\cuni\bakalarka\data\test\kicks"
# dir_path = r"C:\Users\llama\Desktop\cuni\bakalarka\data\test\crashes" 
# dir_path = r"C:\Users\llama\Desktop\cuni\bakalarka\data\drums-one_shots\kick\kick_samples"

file_paths = [os.path.join(dir_path, path) for path in os.listdir(dir_path)]

In [None]:
class Wave():
    def __init__(self, array, sr, info = None) -> None:
        self.array = array
        self.sr = sr
        self.info = info

waves = []

for path in file_paths:
    array, sr = lb.load(path)
    waves.append(Wave(array, sr))

In [None]:
def pad_or_trim(mfcc, length = 100):
    if mfcc.shape[1] > length:
        return mfcc[:, :length]
    else:
        last_column = mfcc[:, -1:]
        padding = np.repeat(last_column, length - mfcc.shape[1], axis=1)
        return np.concatenate((mfcc, padding), axis=1)
        # return np.pad(mfcc, ((0,0),(0,length-mfcc.shape[1])), constant_values = mfcc[:,-1])        def pad_or_trim(mfcc, length=100):


In [None]:
mfccs = []

for wave in waves:
    mfcc = lb.feature.mfcc(y=wave.array, sr=wave.sr, n_mfcc=512, n_fft=512, hop_length=256, lifter=0, dct_type=3, n_mels = 256)
    mfcc_pad_or_trim = pad_or_trim(mfcc, 100)

    mfccs.append(mfcc_pad_or_trim)
    # mfccs.append(mfcc)



mfccs_tensor = torch.tensor(mfccs).view(-1, 1, 256, 100)

train_loader = torch.utils.data.DataLoader(mfccs_tensor, batch_size=4, shuffle=True)


### pokus o convolutional VAE

In [None]:
class Encoder(nn.Module):
    def __init__(self, latent_dim):
        super(Encoder, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=8, kernel_size=3, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=2, padding=0)
        self.fc = nn.Linear(16 * 63 * 24, latent_dim)
        self.fc_mu = nn.Linear(latent_dim, latent_dim)
        self.fc_logvar = nn.Linear(latent_dim, latent_dim)

    def forward(self, x):
        # print(f"Encoder input shape: {x.shape}")
        x = F.relu(self.conv1(x))
        # print(f"Encoder shape after conv1: {x.shape}")
        x = F.relu(self.conv2(x))
        # print(f"Encoder shape after conv2: {x.shape}")
        x = nn.Flatten()(x)
        # print(f"Encoder shape after flatten: {x.shape}")
        x = F.relu(self.fc(x))
        # print(f"Encoder shape after fc: {x.shape}")
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

class Decoder(nn.Module):
    def __init__(self, latent_dim):
        super(Decoder, self).__init__()
        self.fc = nn.Linear(latent_dim, 16 * 63 * 24)
        self.deconv1 = nn.ConvTranspose2d(in_channels=16, out_channels=8, kernel_size=3, stride=2, padding=0)
        self.deconv2 = nn.ConvTranspose2d(in_channels=8, out_channels=1, kernel_size=3, stride=2, padding=0, output_padding=1)

    def forward(self, x):
        # print()
        # print(f"Dencoder input shape: {x.shape}")
        x = F.relu(self.fc(x))
        # print(f"Decoder shape after fc: {x.shape}")
        x = x.view(-1, 16, 63, 24)
        # print(f"Decoder shape after view: {x.shape}")
        x = F.relu(self.deconv1(x))
        # print(f"Decoder shape after deconv1: {x.shape}")
        x = self.deconv2(x)
        # print(f"Decoder shape after deconv2: {x.shape}")

        #Mirroring of weights with encoder
        # F.ConvTranspose(weight = endocer layers ...)
        
        return x

class VAE(nn.Module):
    def __init__(self, latent_dim):
        super(VAE, self).__init__()
        self.encoder = Encoder(latent_dim)
        self.decoder = Decoder(latent_dim)

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        reconstructed_x = self.decoder(z)
        return reconstructed_x, mu, logvar

# Define your loss function (e.g., a combination of reconstruction loss and KL divergence)
def loss_function(reconstructed_x, x, mu, logvar, kl_regulation = 0.5):
    # softmax = nn.Softmax()
    # reconstructed_x = softmax(reconstructed_x)
    # x = softmax(x)
    reconstruction_loss = F.mse_loss(reconstructed_x, x, reduction='sum') #mse for simplicity, could change in the future
    kl_divergence = torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return (reconstruction_loss - kl_regulation * kl_divergence) 

### LSTM VAE ?

In [None]:
# class Encoder(nn.Module):
#     def __init__(self, latent_dim):
#         super(Encoder, self).__init__()
#         self.lstm = nn.LSTM(input_size=256, hidden_size=latent_dim, num_layers=1, batch_first=True)

#     def forward(self, x):
#         # print(f"Encoder input shape: {x.shape}")
#         x = F.relu(self.conv1(x))
#         # print(f"Encoder shape after conv1: {x.shape}")
#         x = F.relu(self.conv2(x))
#         # print(f"Encoder shape after conv2: {x.shape}")
#         x = nn.Flatten()(x)
#         # print(f"Encoder shape after flatten: {x.shape}")
#         x = F.relu(self.fc(x))
#         # print(f"Encoder shape after fc: {x.shape}")
#         mu = self.fc_mu(x)
#         logvar = self.fc_logvar(x)
#         return mu, logvar

# class Decoder(nn.Module):
#     def __init__(self, latent_dim):
#         super(Decoder, self).__init__()
#         self.fc = nn.Linear(latent_dim, 16 * 63 * 24)
#         self.deconv1 = nn.ConvTranspose2d(in_channels=16, out_channels=8, kernel_size=3, stride=2, padding=0)
#         self.deconv2 = nn.ConvTranspose2d(in_channels=8, out_channels=1, kernel_size=3, stride=2, padding=0, output_padding=1)

#     def forward(self, x):
#         # print()
#         # print(f"Dencoder input shape: {x.shape}")
#         x = F.relu(self.fc(x))
#         # print(f"Decoder shape after fc: {x.shape}")
#         x = x.view(-1, 16, 63, 24)
#         # print(f"Decoder shape after view: {x.shape}")
#         x = F.relu(self.deconv1(x))
#         # print(f"Decoder shape after deconv1: {x.shape}")
#         x = self.deconv2(x)
#         # print(f"Decoder shape after deconv2: {x.shape}")
#         return x

# class VAE(nn.Module):
#     def __init__(self, latent_dim):
#         super(VAE, self).__init__()
#         self.encoder = Encoder(latent_dim)
#         self.decoder = Decoder(latent_dim)

#     def reparameterize(self, mu, logvar):
#         std = torch.exp(0.5 * logvar)
#         eps = torch.randn_like(std)
#         return mu + eps * std

#     def forward(self, x):
#         mu, logvar = self.encoder(x)
#         z = self.reparameterize(mu, logvar)
#         reconstructed_x = self.decoder(z)
#         return reconstructed_x, mu, logvar

# # Define your loss function (e.g., a combination of reconstruction loss and KL divergence)
# def loss_function(reconstructed_x, x, mu, logvar, kl_regulation = 0.5):
#     # softmax = nn.Softmax()
#     # reconstructed_x = softmax(reconstructed_x)
#     # x = softmax(x)
#     reconstruction_loss = F.mse_loss(reconstructed_x, x, reduction='sum') #mse for simplicity, could change in the future
#     kl_divergence = torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
#     return (reconstruction_loss - kl_regulation * kl_divergence) * (-1)


In [None]:
#train the model
def train(model, train_loader, epochs, device):
    #define the optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    model.train()

    losses = []

    for epoch in range(epochs):
        train_loss = 0
        for batch_idx, x in enumerate(train_loader):
            x = x.to(device)
            optimizer.zero_grad()
            reconstructed_x, mu, logvar = model(x)
            loss = loss_function(reconstructed_x, x, mu, logvar)
            loss.backward()
            train_loss += loss.item()
            optimizer.step()

        average_loss = train_loss / len(train_loader.dataset)
        print('====> Epoch: {} Average loss: {:.4f}'.format(epoch+1, average_loss))
        losses.append(average_loss)
    print('Finished training.') 

    return losses

# Define your VAE model with a specific latent dimension
latent_dim = 32

# Create an instance of your VAE model
model = VAE(latent_dim)
epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

losses = train(model, train_loader, epochs, device)

plt.plot(losses)
    



In [None]:
def play_wave(wave, sr):
    # initialize PyAudio
    p = pyaudio.PyAudio()

    # open a stream
    stream = p.open(format=pyaudio.paFloat32,
                    channels=1,
                    rate=sr,
                    output=True)

    # play audio
    stream.write(wave.tobytes())

    # stop stream and terminate PyAudio
    stream.stop_stream()
    stream.close()
    p.terminate()

In [None]:
def inference(model, mu, logvar):
    model.eval()
    z = model.reparameterize(mu, torch.tensor(logvar*2))
    with torch.no_grad():
        return model.decoder(z)
    
reconstructed_x = inference(model, torch.zeros((32)), torch.ones((32)))

reconstructed_x_np = reconstructed_x[0,0,:,:].numpy()

lb.display.specshow(reconstructed_x_np, sr = 44100)

inverted = lb.feature.inverse.mfcc_to_audio(reconstructed_x_np, sr=44100, n_mels=256, n_fft=512, hop_length=256, lifter=0, dct_type=3)
play_wave(inverted, 44100)


In [None]:
kick = torch.from_numpy(mfccs[0]).view(1, 1, 256,100)


print(kick.shape)

reconstructed_x, mu, logvar = model.forward(kick)

reconstructed_x_np = reconstructed_x[0,0,:,:].detach().numpy()[:,:-2]


lb.display.specshow(reconstructed_x_np, sr = 44100)



inverted_orig = lb.feature.inverse.mfcc_to_audio(mfccs[0], sr=44100, n_mels=256, n_fft=512, hop_length=256, lifter=0, dct_type=3)
inverted = lb.feature.inverse.mfcc_to_audio(reconstructed_x_np, sr=44100, n_mels=256, n_fft=512, hop_length=256, lifter=0, dct_type=3)

play_wave(inverted_orig, 44100)
play_wave(inverted, 44100)

reconstructed_x = inference(model, mu + 1, logvar + 0.5)
reconstructed_x_np = reconstructed_x[0,0,:,:].detach().numpy()[:,:-2]

inverted = lb.feature.inverse.mfcc_to_audio(reconstructed_x_np, sr=44100, n_mels=256, n_fft=512, hop_length=256, lifter=0, dct_type=3)
lb.display.specshow(reconstructed_x_np, sr = 44100)

play_wave(inverted, 44100)

print(np.mean(mu.detach().numpy()))

In [130]:
VIEW_SHAPE = 1, 1, 256, 100
INVERSE_MFCC_PARAMS = {
    'sr': 44100,
    'n_mels': 256,
    'n_fft': 512,
    'hop_length': 256,
    'lifter': 0,
    'dct_type': 3
}

def mfcc_to_wave(mfcc):
    wave = lb.feature.inverse.mfcc_to_audio(mfcc, **INVERSE_MFCC_PARAMS)
    return wave

kick_1_np = mfccs[20]
kick_1_torch = torch.from_numpy(kick_1_np).view(*VIEW_SHAPE)
kick_1_reconstructed, kick_1_mu, kick_1_logvar = model.forward(kick_1_torch)

kick_2_np = mfccs[-13]
kick_2_torch = torch.from_numpy(kick_2_np).view(*VIEW_SHAPE)
kick_2_reconstructed, kick_2_mu, kick_2_logvar = model.forward(kick_2_torch)

interpolated_mu = (kick_1_mu + kick_2_mu) / 2
interpolated_logvar = (kick_1_logvar + kick_2_logvar) / 2

reconstructed_interpolation = inference(model, interpolated_mu, interpolated_logvar)
# The :, :-2 indexing is because of the reconstruction after padding.
reconstructed_interpolation_np = reconstructed_interpolation[0, 0, :, :].detach().numpy()[:, :-2]
reconstructed_interpolation_wave = mfcc_to_wave(reconstructed_interpolation_np)


play_wave(mfcc_to_wave(kick_1_np), INVERSE_MFCC_PARAMS['sr'])
play_wave(mfcc_to_wave(kick_2_np), INVERSE_MFCC_PARAMS['sr']) 
play_wave(reconstructed_interpolation_wave, INVERSE_MFCC_PARAMS['sr'])

  z = model.reparameterize(mu, torch.tensor(logvar*2))
