In [26]:
import numpy 
from scipy.io import wavfile
from pathlib import Path
import librosa
import librosa.display
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import torchaudio
import pandas as pd
import torch
import torch.nn as nn
from torch.nn import init
import torch.nn.functional as F
from torcheval.metrics import R2Score
import dawdreamer as daw
import numpy 
from scipy.io import wavfile
from pathlib import Path
import librosa
import librosa.display
import matplotlib.pyplot as plt
import pytorch_lightning as pl
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision
from pytorch_lightning.callbacks import Callback, LearningRateMonitor, ModelCheckpoint
import os

In [27]:
CHECKPOINT_PATH = os.environ.get("PATH_CHECKPOINT", "C:\\Users\\jayor\\Documents\\repos\\synth-reconstruct\\demo\\autoencoder\\checkpoint")

In [28]:
pl.seed_everything(42)

Seed set to 42


42

In [40]:
class AudioDS(Dataset):
    def __init__(self, presets_csv_path):
        self.presets = pd.read_csv(presets_csv_path)
    
    def __len__(self):
        return len(self.presets)
    
    def __getitem__(self, idx):
        preset = self.presets.iloc[idx]
        audio_path = Path(f'../samples/{preset[0]}.wav')
        y, sr = librosa.load(audio_path)
        spectrogram = librosa.feature.melspectrogram(y=y, sr=sr)
        
        # Save spectrogram image for debugging
        # fig, ax = plt.subplots()
        # img = librosa.display.specshow(librosa.power_to_db(spectrogram, ref=numpy.max), y_axis='mel', x_axis='time', ax=ax)
        # fig.colorbar(img, ax=ax, format='%+2.0f dB')
        # ax.set(title='Mel spectrogram display')
        # # Save to disk
        # # create folder if not exists
        # Path('./sample_images').mkdir(parents=True, exist_ok=True)
        # plt.savefig(f'./sample_images/{preset[0]}.png')

        # Add channel dimension
        spectrogram = numpy.expand_dims(spectrogram, axis=0)
        # Transform to tensors
        spectrogram = torch.tensor(spectrogram)
        preset = torch.tensor(preset[1:-1])

        return spectrogram, preset

In [41]:
dataset = AudioDS('..\\presets.csv')
num_train = int(0.9 * len(dataset))
num_val = len(dataset) - num_train
train_dataset, validation_dataset = torch.utils.data.random_split(dataset, [num_train, num_val])
batch_size = 1

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=True)

In [42]:
# Iterate through first batch to check image dimensions
for batch in train_loader:
    print(batch[0].shape)
    print(batch[1].shape)
    break

torch.Size([1, 1, 128, 87])
torch.Size([1, 88])


  audio_path = Path(f'../samples/{preset[0]}.wav')
  preset = torch.tensor(preset[1:-1])


In [43]:
# Autoencoder model. Adapted from: https://lightning.ai/docs/pytorch/stable/notebooks/course_UvA-DL/08-deep-autoencoders.html
class Encoder(nn.Module):
    def __init__(self, num_input_channels: int, base_channel_size: int, latent_dim: int, act_fn: object = nn.GELU):
        """Encoder.

        Args:
           num_input_channels : Number of input channels of the image.
           base_channel_size : Number of channels we use in the first convolutional layers. Deeper layers might use a duplicate of it.
           latent_dim : Dimensionality of latent representation z
           act_fn : Activation function used throughout the encoder network

        """
        super().__init__()
        c_hid = base_channel_size
        self.net = nn.Sequential(
            nn.Conv2d(num_input_channels, c_hid, kernel_size=3, padding=1, stride=2),  # 32x32 => 16x16
            act_fn(),
            nn.Conv2d(c_hid, c_hid, kernel_size=3, padding=1),
            act_fn(),
            nn.Conv2d(c_hid, 2 * c_hid, kernel_size=3, padding=1, stride=2),  # 16x16 => 8x8
            act_fn(),
            nn.Conv2d(2 * c_hid, 2 * c_hid, kernel_size=3, padding=1),
            act_fn(),
            nn.Conv2d(2 * c_hid, 2 * c_hid, kernel_size=3, padding=1, stride=2),  # 8x8 => 4x4
            act_fn(),
            nn.Flatten(),  # Image grid to single feature vector
            nn.Linear(2 * 16 * c_hid, latent_dim),
        )

    def forward(self, x):
        print(x.shape)
        return self.net(x)

In [44]:
class Decoder(nn.Module):
    def __init__(self, num_input_channels: int, base_channel_size: int, latent_dim: int, act_fn: object = nn.GELU):
        """Decoder.

        Args:
           num_input_channels : Number of channels of the image to reconstruct.
           base_channel_size : Number of channels we use in the last convolutional layers. Early layers might use a duplicate of it.
           latent_dim : Dimensionality of latent representation z
           act_fn : Activation function used throughout the decoder network

        """
        super().__init__()
        c_hid = base_channel_size
        self.linear = nn.Sequential(nn.Linear(latent_dim, 2 * 16 * c_hid), act_fn())
        self.net = nn.Sequential(
            nn.ConvTranspose2d(2 * c_hid, 2 * c_hid, kernel_size=3, output_padding=1, padding=1, stride=2),  # 4x4 => 8x8
            act_fn(),
            nn.Conv2d(2 * c_hid, 2 * c_hid, kernel_size=3, padding=1),
            act_fn(),
            nn.ConvTranspose2d(2 * c_hid, c_hid, kernel_size=3, output_padding=1, padding=1, stride=2),  # 8x8 => 16x16
            act_fn(),
            nn.Conv2d(c_hid, c_hid, kernel_size=3, padding=1),
            act_fn(),
            nn.ConvTranspose2d(c_hid, num_input_channels, kernel_size=3, output_padding=1, padding=1, stride=2),  # 16x16 => 32x32
            nn.Tanh(),  # The input images is scaled between -1 and 1, hence the output has to be bounded as well
        )

    def forward(self, x):
        x = self.linear(x)
        x = x.reshape(x.shape[0], -1, 4, 4)
        x = self.net(x)
        return x

In [45]:
class Autoencoder(pl.LightningModule):
    def __init__(
        self,
        base_channel_size: int,
        latent_dim: int,
        encoder_class: object = Encoder,
        decoder_class: object = Decoder,
        num_input_channels: int = 1,
        width: int = 128,
        height: int = 87,
    ):
        super().__init__()
        # Saving hyperparameters of autoencoder
        self.save_hyperparameters()
        # Creating encoder and decoder
        self.encoder = encoder_class(num_input_channels, base_channel_size, latent_dim)
        self.decoder = decoder_class(num_input_channels, base_channel_size, latent_dim)
        # Example input array needed for visualizing the graph of the network
        self.example_input_array = torch.zeros(2, num_input_channels, width, height)

    def forward(self, x):
        """The forward function takes in an image and returns the reconstructed image."""
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

    def _get_reconstruction_loss(self, batch):
        """Given a batch of images, this function returns the reconstruction loss (MSE in our case)."""
        x, _ = batch  # We do not need the labels
        x_hat = self.forward(x)
        loss = F.mse_loss(x, x_hat, reduction="none")
        loss = loss.sum(dim=[1, 2, 3]).mean(dim=[0])
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        # Using a scheduler is optional but can be helpful.
        # The scheduler reduces the LR if the validation performance hasn't improved for the last N epochs
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.2, patience=20, min_lr=5e-5)
        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}

    def training_step(self, batch, batch_idx):
        loss = self._get_reconstruction_loss(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._get_reconstruction_loss(batch)
        self.log("val_loss", loss)

    def test_step(self, batch, batch_idx):
        loss = self._get_reconstruction_loss(batch)
        self.log("test_loss", loss)

In [46]:
# class GenerateCallback(Callback):
#     def __init__(self, input_imgs, every_n_epochs=1):
#         super().__init__()
#         self.input_imgs = input_imgs  # Images to reconstruct during training
#         # Only save those images every N epochs (otherwise tensorboard gets quite large)
#         self.every_n_epochs = every_n_epochs

#     def on_train_epoch_end(self, trainer, pl_module):
#         if trainer.current_epoch % self.every_n_epochs == 0:
#             # Reconstruct images
#             input_imgs = self.input_imgs.to(pl_module.device)
#             with torch.no_grad():
#                 pl_module.eval()
#                 reconst_imgs = pl_module(input_imgs)
#                 pl_module.train()
#             imgs = torch.stack([input_imgs, reconst_imgs], dim=1).flatten(0, 1)
#             grid = torchvision.utils.make_grid(imgs, nrow=2, normalize=True, value_range=(-1, 1))
#             # Save image to disk. TODO: fix
#             with tempfile.NamedTemporaryFile(suffix=".png") as tmp_file:
#                 save_image(grid, tmp_file.name)

In [47]:
def train(latent_dim):
    # Create a PyTorch Lightning trainer with the generation callback
    trainer = pl.Trainer(
        default_root_dir="C:\\Users\\jayor\\Documents\\repos\\synth-reconstruct\\demo\\autoencoder",
        accelerator="auto",
        devices=1,
        max_epochs=500,
        callbacks=[
            ModelCheckpoint(save_weights_only=True),
            # GenerateCallback(get_train_images(8), every_n_epochs=10),
            LearningRateMonitor("epoch"),
        ],
    )
    # trainer.logger._log_graph = True  # If True, we plot the computation graph in tensorboard
    # trainer.logger._default_hp_metric = None  # Optional logging argument that we don't need

    # Check whether pretrained model exists. If yes, load it and skip training
    pretrained_filename = os.path.join(CHECKPOINT_PATH, "sr_%i.ckpt" % latent_dim)
    if os.path.isfile(pretrained_filename):
        print("Found pretrained model, loading...")
        model = Autoencoder.load_from_checkpoint(pretrained_filename)
    else:
        model = Autoencoder(base_channel_size=32, latent_dim=latent_dim)
        trainer.fit(model, train_loader, val_loader)
    # Test best model on validation and test set
    val_result = trainer.test(model, dataloaders=val_loader, verbose=False)
    result = {"test": test_result, "val": val_result}
    return model, result

In [48]:
model_dict = {}
for latent_dim in [64, 128, 256, 384]:
    model_ld, result_ld = train(latent_dim)
    model_dict[latent_dim] = {"model": model_ld, "result": result_ld}

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


torch.Size([2, 1, 128, 87])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (2x11264 and 1024x64)