# Import and Data Loading

In [2]:
try:
  import pypianoroll as ppr
except ModuleNotFoundError:
  !apt-get install fluidsynth
  !pip install pyfluidsynth
  !pip install pypianoroll
  import pypianoroll as ppr

import torch
from torch import nn
from torch.distributions import Normal
from torch.utils.data import Dataset, DataLoader, TensorDataset
import torch.optim as optim
from torch.nn.utils import clip_grad_norm_
import random
from scipy.sparse import csc_matrix
import scipy.io.wavfile
from IPython.display import Audio
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Mount Google Drive to the filesystem.
from google.colab import drive
drive.mount('/content/drive')

In [16]:
# Definitions of model and training parameters.
NUM_EPOCHS = 200
BATCH_SIZE = 100
SHUFFLE = True
LATENT_DIM = 64
ENC_HIDDEN_SIZE = 144
DEC_HIDDEN_SIZE = 96

# Take advantage of GPUs if available.
GPU = False
cuda = torch.device('cuda')
cpu = torch.device('cpu')
device = cuda if GPU else cpu

# Load Data

In [5]:
def load_data_from_npz(filename_format, num_train=10, num_valid=1):
  """Load and return the training data from several .npz files.
    
  Parameters
  ----------
  filename_format: str
      Format string for the path to the .npz files.  Contains a single
      placeholder for the partition number, e.g. '/path/to/data-{}.npz'
  num_train: int
      The number of partitions to use for training.
  num_valid: int
      The number of partitions to use for validation.

  Returns
  -------
  (x_train, x_valid): tuple(np.array, np.array)
      The training and validation datasets, as Numpy arrays.
  """
  train_data = None
  validation_data = None

  # First `num_train` files are for training.  Final `num_valid` files are for
  # validation.
  for i in range(num_train + num_valid):
    filename = filename_format.format(i)
    with np.load(filename) as f:
      dataset_shape = f['shape']

      # Set array entries to -1 if no note, 1 otherwise.
      data = -np.ones(dataset_shape, np.float32)
      data[[x for x in f['nonzero']]] = 1

      if i < num_train:
        if train_data is None:
          train_data = data
        else:
          # After the second iteration, concatenate the new data.
          train_data = np.concatenate((train_data, data), axis=0)
      else:
        if validation_data is None:
          validation_data = data
        else:
          # After the second iteration, concatenate the new data.
          validation_data = np.concatenate((validation_data, data), axis=0)

  num_train_examples = train_data.shape[0]
  num_valid_examples = validation_data.shape[0]

  # Remove parts of the piano roll matrices that don't contain notes.
  train_data = train_data.reshape(num_train_examples, -1, 128)[:, :, 20:108].reshape(num_train_examples, -1)
  validation_data = validation_data.reshape(num_valid_examples, -1, 128)[:, :, 20:108].reshape(num_valid_examples, -1)

  x_train = torch.from_numpy(train_data).to(device)
  x_valid = torch.from_numpy(validation_data).to(device)
  return x_train, x_valid

# Variational Auto Encoder

In [6]:
class VAE(nn.Module):
  """VAE model.
  """

  def __init__(self, latent_dim, input_size, enc_hidden_size, dec_hidden_size):
    """Initialize this VAE.
    
    Parameters
    ----------
    latent_dim: int
        The dimension of the VAE's latent space.
    input_size: int
        The size of the encoder input/decoder output (should be equal to the
        number of timesteps multiplied by the number of pitches).
    enc_hidden_size: int
        The size of the encoder's hidden layer.
    dec_hidden_size: int
        The size of the decoder's hidden layer.
    """
    super(VAE, self).__init__()
    self.latent_dim = latent_dim

    # The encoder has one hidden layer, batch normalization and a ReLU nonlinearity.
    # It outputs mu and sigma, the parameters for the variational distribution.
    self.encoder = nn.Sequential(
                    nn.Linear(input_size, enc_hidden_size).to(device),
                    nn.BatchNorm1d(enc_hidden_size).to(device),
                    nn.ReLU(),
                    nn.Linear(enc_hidden_size, 2*latent_dim).to(device)
    )
    
    # The decoder has one hidden layer, batch normalization and a ReLU nonlinearity.
    self.decoder = nn.Sequential(
        nn.Linear(latent_dim, dec_hidden_size).to(device),
        nn.BatchNorm1d(dec_hidden_size).to(device),
        nn.ReLU(),
        nn.Linear(dec_hidden_size, input_size).to(device)
    )

  def reparameterize(self, mu, log_sigma):
    """Performs the reparametrization trick.
    
    Parameters
    ----------
    mu: torch.Tensor(latent_dim)
        The mean of the variational distribution.
    log_sigma: torch.tensor(latent_dim)
        Log of the standard deviation of the variational distribution.

    Returns
    -------
    transformed_samples
        Samples from the variational distribution.
    """
    eps = torch.randn(self.latent_dim).to(device)
    return mu + torch.exp(log_sigma) * eps

  def forward(self, batch):
    """Forward pass for the VAE.
    
    Parameters
    ----------
    batch: torch.Tensor(batch_size, input_dim)
        A batch of data.

    Returns
    -------
    decoded: torch.Tensor(batch_size, input_dim)
        The reconstruction of the input batch.
    """
    # Pass the batch through the encoder to obtain the variational distribution
    # parameters, mu and log_sigma.
    encoded = self.encoder(batch)
    params = torch.chunk(input = encoded, chunks=2, dim=1)
    mu, log_sigma = params[0], params[1]

    # Sample from the variational distribution.
    sample = self.reparameterize(mu, log_sigma)

    # Decode the variational distribution sample.
    decoded = self.decoder(sample)

    return decoded
    

# Plotting Utilities

In [7]:
def get_multitrack(pianoroll):
    """Converts a Numpy array pianoroll into a pypianoroll.Multitrack object.

    Parameters
    ----------
    pianoroll: np.array(num_timesteps, 128)
        A Numpy boolean-valued matrix which defines the music.

    Returns
    -------
    multitrack: pypianoroll.Multitrack
        A multitrack containing the specified music.
    """

    tracks = [
            ppr.BinaryTrack(name='track',
            program=1,
            is_drum=False,
            pianoroll=pianoroll
          )
    ]
    multitrack = ppr.Multitrack(
            name='multitrack',
            resolution=24,
            tracks=tracks
          )
    return multitrack

In [8]:
def plot_pianoroll(pianoroll):
    """Plot a pianoroll for visualization.
      
    Parameters
    ----------
    pianoroll: np.array(num_timesteps, 128)
        A Numpy boolean-valued matrix which defines the music.
    """
    multitrack = get_multitrack(pianoroll)
    ppr.plot(multitrack)

In [10]:
def plot_model_reconstruction(model=None, model_filepath='/content/drive/MyDrive/models/model.pth'):
    """Plots the reconstruction of a random validation sample.
      
    Parameters
    ----------
    model: torch.nn.Module
        PyTorch VAE model.
    """
  
    # If no model is specified, load one from a file.
    if model is None:
        model = loadModel(model_filepath, forInference=True)
    else:
        # We are using this model for inference.
        model.eval()
    
    # Randomly select a validation example.
    index = int(random.random() * validation_data.shape[0])
    validation_example = validation_data[index]

    # Plot the validation example as a pianoroll.
    pianoroll = validation_example.reshape((96, -1)).to(cpu).numpy()
    padded_zeroes = np.zeros((96, 20))
    pianoroll = np.concatenate((padded_zeroes, pianoroll, padded_zeroes), axis=1)
    pianoroll = pianoroll > 0
    plot_pianoroll(pianoroll)

    # Plot the reconstruction of the validation example as a pianoroll.
    validation_example = validation_example.reshape(-1, validation_example.shape[0])
    encoder_out = model.encoder(validation_example)
    mean = torch.chunk(input=encoder_out, chunks=2, dim=1)[0]
    out_pianoroll = model.decoder(mean).reshape((96, -1)).to(cpu).detach().numpy()
    out_pianoroll = np.concatenate((padded_zeroes, out_pianoroll, padded_zeroes), axis=1)
    out_pianoroll = out_pianoroll > 0
    plot_pianoroll(out_pianoroll)

# Save and Load Models

In [17]:
def saveModel(path, model):
  """Save a model in its entirety
  
  Parameters
  ----------
  path: str
    The path to the file where the model should be saved
  model: torch.nn.Module
    The model to be saved
  """
  torch.save(model, path)

In [18]:
def loadModel(path, forInference=False):
  """Load a saved RevNet model

  Parameters
  ----------
  path: str
    The path to the file where the model should be read from
  forInference: bool, optional
    Whether the model should be loaded specifically for inference or not
  
  Returns
  -------
  model: torch.nn.Module
    The loaded model
  """
  model = torch.load(path, map_location=device)
  if forInference:
    model.eval()
  return model

# Training

In [19]:
def neg_elbo_loss(model, batch, num_samples=20):
    """Computes and returns the ELBO loss of the specified batch.
      
    Parameters
    ----------
    model: torch.nn.Module
        PyTorch VAE model.
    batch: torch.Tensor(batch_size, input_dim)
        A batch of data.
    num_samples: int
        The number of samples to take for computing the ELBO loss.

    Returns
    -------
    elbo: float
        Negative ELBO value for the current batch.
    """

    # Pass the batch to the encoder to obtain the variational parameters.
    params = model.encoder(batch)
    params_split = torch.chunk(input = params, chunks=2, dim=1)
    mu, log_sigma = params_split[0], params_split[1]

    input_dim = batch.shape[1]
    batch_size, latent_dim = mu.shape
    samples = torch.randn(num_samples, batch_size, latent_dim).to(device)
    q_samples = samples * torch.exp(log_sigma).expand_as(samples) + mu.expand_as(samples)
    # Model posterior.
    p_z = Normal(torch.zeros(mu.shape).to(device), torch.ones(mu.shape).to(device))
    # Approximate variational distribution.
    q_z = Normal(mu, torch.exp(log_sigma))

    # Compute the KL Divergence Term of the ELBO loss.
    kl_term = p_z.log_prob(q_samples) - q_z.log_prob(q_samples)

    # Compute the Reconstruction Term of the ELBO loss.
    x_hat = model.decoder(q_samples.reshape(-1, latent_dim)).reshape(num_samples, batch_size, input_dim)
    p_x_z = Normal(x_hat, torch.ones(x_hat.shape).to(device))
    reconstruction_term = p_x_z.log_prob(batch)

    # Sum over the last dimension of each loss term, then compute the mean of
    # each over the batch size and the number of samples.
    elbo = torch.mean(kl_term.sum(-1)) + torch.mean(reconstruction_term.sum(-1))

    return -elbo

In [27]:
def train(model, training_data, validation_data=None, epochs=20, bs=1, lr=0.02,
              momentum=0.9, num_elbo_samples=20,
              model_filepath='/content/drive/MyDrive/models/model.pth', verbose=False):
    """Train the VAE model.
      
    Parameters
    ----------
    model: torch.nn.Module
        PyTorch VAE model.
    training_data: torch.Tensor(train_size, input_dim)
        The training dataset.
    validation_data: torch.Tensor(validation_size, input_dim)
        The validation dataset.
    epochs: int
        Number of epochs of training.
    bs: int
        Batch size.
    lr: float
        Learning rate.
    momentum: float
        momentum parameter for SGD.
    verbose: bool
        If True, print out extra information during training.
    """
      
    training_dataset = TensorDataset(training_data)
    training_dataloader = DataLoader(training_dataset, batch_size=bs, shuffle=True)
    validation_dataset = TensorDataset(validation_data)
    validation_dataloader = DataLoader(validation_dataset, batch_size=bs, shuffle=False)

    # Use stochastic gradient descent as an optimizer.
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)

    # Use learning rate annealing during training.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

    all_losses = []
    all_validation_losses = []
    
    for epoch in range(epochs):
        running_loss = 0.0
        running_validation_loss = 0.0

        for i, x in enumerate(training_dataloader):

            x = x[0]
            x.requires_grad = True

            # Zero the parameter gradients.
            optimizer.zero_grad()

            # Forward + backward + optimize.
            loss = neg_elbo_loss(model, x, num_samples=num_elbo_samples)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Compute the current validation loss.
        for i, x in enumerate(validation_dataloader):
            x = x[0]
            loss = neg_elbo_loss(model, x, num_samples=20)
            running_validation_loss += loss.item()
        all_validation_losses.append(running_validation_loss)

        # If we have achieved the minimum validation loss seen so far, save the model.
        if running_validation_loss == min(all_validation_losses):
            print('Saving model on Epoch {}'.format(epoch + 1))
            saveModel(model_filepath, model)
        running_validation_loss = 0.0

        # Anneal the learning rate.
        scheduler.step(running_loss)

        # Print loss at end of epoch.
        if (verbose and (epoch+1) % 1 == 0) or epoch == epochs - 1:
            print('[Epoch: %d] loss: %.3f' % (epoch + 1, running_loss * (bs / len(training_data))))
        all_losses.append(running_loss)
        running_loss = 0.0

        # Plot the model's reconstruction of a validation example.
        plot_model_reconstruction(model)

    # Plot training losses.
    plt.plot(list(range(epochs)), all_losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.show()

    # Plot validation losses.
    plt.plot(list(range(epochs)), all_validation_losses)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Validation Loss')
    plt.show()

In [None]:
# Load the training and validation data.
training_data, validation_data = load_data_from_npz('/content/drive/MyDrive/CSC412/data/dataset-{}.npz', num_train=10, num_valid=1)

# Initialize our VAE model.
model = VAE(latent_dim=LATENT_DIM, input_size=training_data.shape[1], enc_hidden_size=ENC_HIDDEN_SIZE, dec_hidden_size=DEC_HIDDEN_SIZE)

# Train the model with the specified hyperparameters.
train(model, training_data, validation_data=validation_data, epochs=NUM_EPOCHS, bs=BATCH_SIZE, lr=0.002, num_elbo_samples=20, verbose=True)

# Music Generation

In [32]:
def plot_training_samples(training_data):
    """Plot a sample training example and create an audio listener for it.
      
    Parameters
    ----------
    training_data: torch.Tensor(train_size, input_dim)
        The training dataset.
    """
    # Choose a random training example and pad it above and below with zeroes.
    sample_index = int(random.random() * training_data.shape[0])
    training_sample = training_data[sample_index]
    padded_zeroes = torch.zeros((96, 20))
    curr_result = training_sample.reshape(96, 88)
    curr_result = torch.cat((padded_zeroes, curr_result, padded_zeroes), dim=1)

    # Convert this example to a boolean array.
    binary_results = curr_result > 0

    # Plot this example as a pianoroll.
    plot_pianoroll(binary_results)

    # Synthesize this MIDI data.
    multitrack = get_multitrack(binary_results)
    pm = multitrack.to_pretty_midi()
    waveform = pm.fluidsynth()

    # Output an audio listener within Colab.
    Audio(waveform, rate=44100)

In [33]:
def write_to_npz_file(result, output_file):
    """Write a numpy array to an output file.
      
    Parameters
    ----------
    result: np.array
        The array to write to a file.
    output_file: str
        The name of the output file.
    """

    # Convert pianorolls to a boolean matrix, if necessary.
    if result.dtype != np.bool:
        result = (result > 0).astype(bool)
      
    assert output_file.endswith(".npz")
    np.savez_compressed(
        output_file,
        data=result
    )

In [46]:
def plot_sample_reconstruction(model_filepath='/content/drive/MyDrive/models/model.pth'):
    """Sample from the latent space, plot it, and create an audio player for it.

    Parameters
    ----------
    model_filepath: str
        Path to the saved VAE model weights.
    """
    num_samples = 1
    padded_zeroes = torch.zeros((96, 20))
    model = loadModel(model_filepath, forInference=True)
    latent_samples = torch.randn(num_samples, LATENT_DIM).to(device)
    results = model.decoder(latent_samples).to(cpu).detach()
    
    curr_result = results.reshape(96, 88)
    curr_result = torch.cat((padded_zeroes, curr_result, padded_zeroes), dim=1)
    binary_results = curr_result > 0
    return binary_results

    # Plot this example as a pianoroll.
    plot_pianoroll(binary_results)

    # Synthesize this MIDI data.
    multitrack = get_multitrack(binary_results)
    pm = multitrack.to_pretty_midi()
    waveform = pm.fluidsynth()

    # Output an audio listener within Colab.
    Audio(waveform, rate=44100)

In [53]:
def find_closest_training_sample(generated_example, training_dataset):
    """Find the smallest distance between this sample and a training sample.

    Parameters
    ----------
    generated_example: torch.Tensor(input_dim)
        A sample generated from the model.
    training_dataset: torch.Tensor(train_size, input_dim)
        The training dataset.

    Returns
    -------
    """
    # Find the distances between this sample and every training example.
    sample_expanded = generated_example.expand(training_data.shape[0], -1)
    dists = torch.linalg.norm(sample_expanded - training_data, ord=None, dim=1)

    # Obtain the minimal distance track.
    padded_zeroes = torch.zeros((96, 20))
    min_dist, min_dist_index = torch.min(dists, dim=0)
    min_dist_track = training_data[min_dist_index]
    min_dist_track = min_dist_track.reshape(-1, 88)
    min_dist_track = torch.cat((padded_zeroes, min_dist_track, padded_zeroes), dim=1)
    binary_min_dist_track = min_dist_track > 0

    return binary_min_dist_track