In [None]:
# Imports
import torch
import random

import numpy as np
import matplotlib.pylab as plt

import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

import os
import glob
import imageio
import random, shutil
import torch
import torch.nn as nn
from tqdm.notebook import tqdm, trange
import torch.nn.functional as F
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import IPython.display as display
import librosa
import librosa.display
import pandas as pd
import seaborn as sns
#import torchaudio

In [None]:
def set_seed(seed=None, seed_torch=True):
  """Seed the Python, NumPy and (optionally) PyTorch random generators.

  Args:
    seed (int, optional): seed value; when None one is drawn with
      `np.random.choice(2 ** 32)`.
    seed_torch (bool): when True, torch (CPU and CUDA) is seeded as well and
      cuDNN is switched to deterministic, non-benchmark mode.
  """
  seed = np.random.choice(2 ** 32) if seed is None else seed

  # stdlib generator first, then NumPy
  for seeder in (random.seed, np.random.seed):
    seeder(seed)

  if seed_torch:
    for torch_seeder in (torch.manual_seed,
                         torch.cuda.manual_seed_all,
                         torch.cuda.manual_seed):
      torch_seeder(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

  print(f'Random seed {seed} has been set.')

set_seed(2021)

Random seed 2021 has been set.


In [None]:
def set_device():
  """Return "cuda" when a CUDA device is available, otherwise "cpu".

  Also prints a short status line: a confirmation when the GPU is usable, or
  a hint on how to enable it when it is not.
  """
  if torch.cuda.is_available():
    print("GPU is enabled in this notebook.")
    return "cuda"

  print("WARNING: For this notebook to perform best, "
        "if possible, in the menu under `Runtime` -> "
        "`Change runtime type.`  select `GPU` ")
  return "cpu"

DEVICE = set_device()

GPU is enabled in this notebook.


From W2D5 Tutorial: 

## Load Data

In [None]:
import requests

fname = "music.zip"
url = "https://osf.io/drjhb/download"

# Download the dataset archive once; nothing happens when the file is already
# on disk, so the cell is cheap to re-run.
if not os.path.isfile(fname):
  try:
    # The timeout applies to connecting and to each read, so a stalled
    # connection cannot hang the cell forever.
    r = requests.get(url, timeout=60)
  except requests.RequestException:
    # RequestException covers connection errors as well as timeouts and
    # redirect problems, which previously escaped as uncaught exceptions.
    print("!!! Failed to download data !!!")
  else:
    if r.status_code != requests.codes.ok:
      print("!!! Failed to download data !!!")
    else:
      with open(fname, "wb") as fid:
        fid.write(r.content)

In [None]:
from zipfile import ZipFile

# Unpack every member of the downloaded archive into the current working
# directory.
with ZipFile(fname, mode='r') as archive:
  archive.extractall()

In [None]:
in_folder_path = "/content/Data/genres_original/"
out_folder_path = "/content/spectrograms_30s/" #change this for your personal drive. 

# exist_ok makes the cell safe to re-run
os.makedirs(out_folder_path, exist_ok=True)

#get list of genre folders
genre_folders = glob.glob(in_folder_path + "*")

#initialize empty list of problem files:
problem_files = []

#loop over genre folders
for genre_folder in genre_folders:
  #get list of individual .wav files
  wav_files = glob.glob(genre_folder + "/*.wav")

  #get output folder path
  genre_name = os.path.basename(genre_folder)
  print(genre_name)
  out_folder = out_folder_path + genre_name + "/"
  #make output folder if it doesn't exist
  os.makedirs(out_folder, exist_ok=True)

  #loop over list of wav files
  for wav_file in wav_files:
    #extract file name without its extension.
    #NOTE: str.strip(".wav") removes any leading/trailing '.', 'w', 'a', 'v'
    #characters rather than the suffix, so splitext is used instead.
    file_name = os.path.splitext(os.path.basename(wav_file))[0]

    try:
      #load wav file
      y, sr = librosa.load(wav_file)

      #normalize audio
      y = (y - y.mean())/ y.std()

      #calculate mel spectrogram (audio passed by keyword: newer librosa
      #releases reject it as a positional argument)
      spectrogram = librosa.feature.melspectrogram(y = y, sr = sr)
      #spectrogram_db = librosa.amplitude_to_db(spectrogram, ref = np.max)

      #global normalization of the spectrogram (not frequency band normalization...)
      #spectrogram_norm = (spectrogram_db - spectrogram_db.mean()) / spectrogram_db.std()

      #save spectrogram (np.save appends the .npy extension)
      np.save(out_folder + file_name, spectrogram)

    except Exception:
      #only catch real errors: a bare except would also swallow
      #KeyboardInterrupt and make the loop impossible to stop
      print("Something is wrong with " + file_name + " so it has been skipped")
      #list.append returns None, so its result must not be assigned back
      problem_files.append(wav_file)

reggae
disco
country
pop
blues
rock
hiphop
metal
jazz




Something is wrong with jazz.00054so it has been skipped
classical


In [None]:
## EDIT HERE TO CHANGE SIZE OF TEST AND VAL DATASETS
test_prop = 0.15
val_prop = 0.15

train_prop = 1 - test_prop - val_prop

# Source folder with the full-length spectrograms and the three destination
# folders, one per split.
spectrograms_dir = "/content/spectrograms_30s/"
folder_names = ['/content/train/',
                '/content/test/',
                '/content/val/']
train_dir, test_dir, val_dir = folder_names

# Start every split folder from scratch: wipe it when present, then create it.
for split_dir in folder_names:
  if os.path.exists(split_dir):
    shutil.rmtree(split_dir)
  os.mkdir(split_dir)

# One pass per genre: shuffle its files with a fixed seed, then slice the
# shuffled list into test / val / train portions.
genres = list(os.listdir(spectrograms_dir))
for g in genres:
  src_file_paths = list(
      glob.glob(os.path.join(spectrograms_dir, f'{g}', "*.npy"), recursive=True))
  random.Random(2021).shuffle(src_file_paths)

  n_files = len(src_file_paths)
  test_idx = int(n_files * test_prop)
  val_idx = test_idx + int(n_files * val_prop)

  test_files = src_file_paths[:test_idx]
  val_files = src_file_paths[test_idx:val_idx]
  train_files = src_file_paths[val_idx:]

  # Make sure each split has a sub-folder for this genre.
  for split_dir in folder_names:
    genre_dir = split_dir + f"{g}"
    if not os.path.exists(genre_dir):
      os.mkdir(genre_dir)

  # Copy the files over, in the order train, test, val.
  for split_dir, split_files in ((train_dir, train_files),
                                 (test_dir, test_files),
                                 (val_dir, val_files)):
    for src in split_files:
      shutil.copy(src, os.path.join(split_dir + f"{g}" + '/', os.path.basename(src)))

In [None]:
def apply_sliding_window(in_dir, out_dir, window_length = 3, hop_length = 1, 
                         save_as_tensor = False):
  """Cut every saved spectrogram into overlapping fixed-length windows.

  For each folder matched by `in_dir + "*"`, every `.npy` file inside it is
  loaded, framed along its last axis, and each frame is written to
  `out_dir/<folder name>/<file name>_<frame index>`.

  IMPORTANT NOTE - this function assumes that the spectrograms were made with
  default librosa nfft_size, hop_length, etc., because the conversion from
  seconds to frames relies on librosa's defaults.

  Args:
    in_dir (str): glob prefix of the genre folders; `"*"` is appended to it.
    out_dir (str): output root, expected to end with a path separator since
      sub-paths are built by string concatenation.
    window_length (float): window duration in seconds.
    hop_length (float): hop between consecutive windows in seconds.
    save_as_tensor (bool): when True each frame is saved with `torch.save`
      as `.pt`; otherwise with `np.save` as `.npy`.
  """
  #window and hop length units are in seconds.
  window_size = librosa.time_to_frames(window_length)
  hop_size = librosa.time_to_frames(hop_length)

  #get list of genre folders
  genre_folders = glob.glob(in_dir + "*")

  #make out_dir if it doesn't exist
  os.makedirs(out_dir, exist_ok=True)

  #loop over genre folders
  for genre_folder in genre_folders:

    #get list of individual spectrogram files
    spec_files = glob.glob(genre_folder + "/*.npy")

    #get output folder path
    genre_name = os.path.basename(genre_folder)
    out_folder = out_dir + genre_name + "/"
    print(out_folder)

    #make output folder if it doesn't exist
    os.makedirs(out_folder, exist_ok=True)

    #loop over spectrogram files
    for spec_file in spec_files:
      #extract file name without the extension. str.strip(".npy") removes any
      #leading/trailing '.', 'n', 'p', 'y' characters (e.g. "pop.00012"
      #became "op.00012"), so splitext is used instead.
      file_name = os.path.splitext(os.path.basename(spec_file))[0]

      #load spectrogram
      spec = np.load(spec_file)

      #apply sliding frame to spectrogram; sizes are passed by keyword because
      #newer librosa releases reject them as positional arguments
      all_frames = librosa.util.frame(spec, frame_length=window_size,
                                      hop_length=hop_size)
      #move the frame index to the front so iteration yields one frame at a time
      all_frames = np.moveaxis(all_frames, 2, 0)

      #loop over individual frames
      for i, frame in enumerate(all_frames):

        #specify out file name
        full_out_path = out_folder + file_name + "_" + str(i)

        #each frame is a strided view into the framed array; copy it into its
        #own contiguous buffer so torch.save stores only this frame's values
        #instead of the larger storage backing the view
        frame = np.ascontiguousarray(frame)

        if save_as_tensor:
          #convert np array to tensor and save it
          torch.save(torch.from_numpy(frame), full_out_path + ".pt")
        else:
          #save spectrogram frame
          np.save(full_out_path, frame)

In [None]:
# Cut Data
full_path = '/content/'

# Window each split in turn (test, train, val), saving the frames as tensors.
for split in ('test', 'train', 'val'):
  apply_sliding_window(full_path + split + '/*',
                       full_path + split + '_cropped/',
                       save_as_tensor=True)

/content/test_cropped/reggae/
/content/test_cropped/disco/


  "on a non-contiguous input. This will result in a copy.".format(axis)


/content/test_cropped/country/
/content/test_cropped/pop/
/content/test_cropped/blues/
/content/test_cropped/rock/
/content/test_cropped/hiphop/
/content/test_cropped/metal/
/content/test_cropped/jazz/
/content/test_cropped/classical/
/content/train_cropped/reggae/
/content/train_cropped/disco/
/content/train_cropped/country/
/content/train_cropped/pop/
/content/train_cropped/blues/
/content/train_cropped/rock/
/content/train_cropped/hiphop/
/content/train_cropped/metal/
/content/train_cropped/jazz/
/content/train_cropped/classical/
/content/val_cropped/reggae/
/content/val_cropped/disco/
/content/val_cropped/country/
/content/val_cropped/pop/
/content/val_cropped/blues/
/content/val_cropped/rock/
/content/val_cropped/hiphop/
/content/val_cropped/metal/
/content/val_cropped/jazz/
/content/val_cropped/classical/


## Train the Conv Auto Encoder

In [None]:
class BiasLayer(nn.Module):
  """Adds a learnable bias tensor of the given shape to its input.

  The bias starts at zero and is registered as a trainable parameter.
  """

  def __init__(self, shape):
    super().__init__()
    self.bias = nn.Parameter(torch.zeros(shape), requires_grad=True)

  def forward(self, x):
    # Broadcasting applies the same bias to every element of the batch.
    shifted = x + self.bias
    return shifted

In [None]:
def cout(x, layer):
  """Unnecessarily complicated but complete way to
  calculate the output depth, height and width size for a Conv2D layer

  Args:
    x (tuple): input size (depth, height, width)
    layer (nn.Conv2d): the Conv2D layer

  returns:
    (tuple): output shape (depth, height, width) as given in [Ref]

  Ref:
    https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
  """
  assert isinstance(layer, nn.Conv2d)

  def as_tuple(value):
    # Conv2d normally stores pairs; wrap a bare scalar just in case.
    return value if isinstance(value, tuple) else (value,)

  in_depth, in_height, in_width = x
  out_depth = layer.out_channels

  if isinstance(layer.padding, str):
    # Conv2d also accepts padding='same' / 'valid' and then stores the string.
    if layer.padding == 'same':
      # 'same' keeps the spatial size (PyTorch only allows it with stride 1).
      return (out_depth, in_height, in_width)
    # 'valid' means no padding at all.
    p = (0, 0)
  else:
    p = as_tuple(layer.padding)

  k = as_tuple(layer.kernel_size)
  d = as_tuple(layer.dilation)
  s = as_tuple(layer.stride)

  # Index 0 is the height entry and index -1 the width entry, so one-element
  # tuples work for both dimensions.
  out_height = 1 + (in_height + 2 * p[0] - (k[0] - 1) * d[0] - 1) // s[0]
  out_width = 1 + (in_width + 2 * p[-1] - (k[-1] - 1) * d[-1] - 1) // s[-1]
  return (out_depth, out_height, out_width)

## ConvVAE

In [None]:
class BiasLayer(nn.Module):
  """Adds a learnable bias tensor of the given shape to its input.

  The bias is initialised to zeros and registered as a trainable parameter.
  """

  def __init__(self, shape):
    super(BiasLayer, self).__init__()
    init_bias = torch.zeros(shape)
    self.bias = nn.Parameter(init_bias, requires_grad=True)

  def forward(self, x):
    # The leftover debugging print of x.shape was removed: it fired on every
    # forward pass and flooded the training output.
    return x + self.bias

In [None]:
def log_p_x(x, mu_xs, sig_x):
  """Pixel-wise Gaussian log probability of x under reconstructions mu_xs.

  x is a [batch, ...] input and mu_xs holds [batch, n, ...] reconstructions.
  The per-pixel terms are summed over pixels and then averaged over both the
  batch and the sample dimensions. sig_x must be a tensor (torch.log is
  applied to it).
  """
  batch, n_samples = mu_xs.size()[:2]

  # Flatten the pixels; the singleton sample axis on x broadcasts against the
  # n reconstructions of each batch element.
  x_flat = x.reshape(batch, 1, -1)
  mu_flat = mu_xs.view(batch, n_samples, -1)
  squared_error = (x_flat - mu_flat)**2 / (2*sig_x**2)

  # [b,n,p] -> sum over pixels [p] -> mean over batch and samples [b,n].
  per_sample = (squared_error + torch.log(sig_x)).sum(dim=2)
  return -per_sample.mean(dim=(0,1))


In [None]:
def kl_q_p(zs, phi):
  """Sample-based estimate of KL(q||p) from [b,n,k] samples zs drawn from q.

  phi must be size [b,k+1]: its first k columns are the mean of q and the
  last column is the log standard deviation.

  The prior uses mu_p = 0 and sigma_p = 1, so the log(p(zs)) term reduces to
  -1/2*(zs**2).
  """
  b, n, k = zs.size()
  mu_q = phi[:, :-1].view(b, 1, k)
  log_sig_q = phi[:, -1]
  sig_q = log_sig_q.exp().view(b, 1, 1)

  log_p = -0.5*(zs**2)
  log_q = -0.5*(zs - mu_q)**2 / sig_q**2 - log_sig_q.view(b, 1, -1)

  # Both terms are [b,n,k]: sum over the latent axis, average over [b,n].
  return (log_q - log_p).sum(dim=2).mean(dim=(0, 1))

In [None]:

K_VAE = 2


class ConvVAE(nn.Module):
  """Convolutional variational autoencoder with a K-dimensional latent.

  The encoder (q) maps an image to phi of size K+1 (K means plus one shared
  log standard deviation); the decoder (p) maps latent samples back to
  flattened images.

  Args:
    K (int): number of latent dimensions.
    num_filters (int): number of channels in the hidden conv layers.
    filter_size (int): kernel size used by every (de)convolution.
    input_shape (tuple, optional): (channels, height, width) of one input.
      When None, the notebook-level `data_shape` variable is used, which is
      the original behaviour.
  """

  def __init__(self, K, num_filters=32, filter_size=5, input_shape=None):
    super(ConvVAE, self).__init__()

    # Fall back to the global data_shape so existing callers keep working.
    shape = tuple(data_shape if input_shape is None else input_shape)

    # With padding=0 (and stride 1, dilation 1) every Conv2d layer shrinks
    # height and width by filter_size - 1, and every ConvTranspose2d layer
    # adds the same amount back. (The previous 2 * (filter_size // 2) formula
    # gives the same value for odd sizes but is off by one for even sizes.)
    filter_reduction = filter_size - 1

    # After passing input through two Conv2d layers, the shape will be
    # 'shape_after_conv'. This is also the shape that will go into the first
    # deconvolution layer in the decoder
    self.shape_after_conv = (num_filters,
                              shape[1]-2*filter_reduction,
                              shape[2]-2*filter_reduction)
    flat_size_after_conv = self.shape_after_conv[0] \
        * self.shape_after_conv[1] \
        * self.shape_after_conv[2]

    # Define the recognition model (encoder or q) part.
    # The kernel size now follows filter_size (it was hard-coded to 5, which
    # made any other filter_size break the fully connected layer sizes).
    self.q_bias = BiasLayer(shape)
    self.q_conv_1 = nn.Conv2d(shape[0], num_filters, filter_size)
    self.q_conv_2 = nn.Conv2d(num_filters, num_filters, filter_size)
    self.q_flatten = nn.Flatten()
    self.q_fc_phi = nn.Linear(flat_size_after_conv, K+1)

    # Define the generative model (decoder or p) part
    self.p_fc_upsample = nn.Linear(K, flat_size_after_conv)
    self.p_unflatten = nn.Unflatten(-1, self.shape_after_conv)
    self.p_deconv_1 = nn.ConvTranspose2d(num_filters, num_filters, filter_size)
    self.p_deconv_2 = nn.ConvTranspose2d(num_filters, shape[0], filter_size)
    self.p_bias = BiasLayer(shape)

    # Define a special extra parameter to learn scalar sig_x for all pixels
    self.log_sig_x = nn.Parameter(torch.zeros(()))

  def infer(self, x):
    """Map (batch of) x to (batch of) phi which can then be passed to
    rsample to get z
    """
    s = self.q_bias(x)
    s = F.relu(self.q_conv_1(s))
    s = F.relu(self.q_conv_2(s))
    flat_s = s.view(s.size()[0], -1)
    phi = self.q_fc_phi(flat_s)
    return phi

  def generate(self, zs):
    """Map [b,n,k] sized samples of z to [b,n,p] sized images
    """
    # Note that for the purposes of passing through the generator, we need
    # to reshape zs to be size [b*n,k]
    b, n, k = zs.size()
    s = zs.view(b*n, -1)
    s = F.relu(self.p_fc_upsample(s)).view((b*n,) + self.shape_after_conv)
    s = F.relu(self.p_deconv_1(s))
    s = self.p_deconv_2(s)
    s = self.p_bias(s)
    mu_xs = s.view(b, n, -1)
    return mu_xs

  def decode(self, zs):
    # Included for compatability with conv-AE code: zs of size [n,k] is
    # treated as a single batch element with n samples.
    return self.generate(zs.unsqueeze(0))

  def forward(self, x):
    # VAE.forward() is not used for training, but we'll treat it like a
    # classic autoencoder by taking a single sample of z ~ q
    phi = self.infer(x)
    zs = rsample(phi, 1)
    return self.generate(zs).view(x.size())

  def elbo(self, x, n=1):
    """Run input end to end through the VAE and compute the ELBO using n
    samples of z
    """
    # The debugging prints that lived here indexed element [1] of each
    # tensor, which raises IndexError for a batch of size one, and they
    # flooded the output on every training step.
    phi = self.infer(x)
    zs = rsample(phi, n)
    mu_xs = self.generate(zs)
    return log_p_x(x, mu_xs, self.log_sig_x.exp()) - kl_q_p(zs, phi)


def expected_z(phi):
  """Return the mean part of phi: every column except the last one."""
  mean_q = phi[:, :-1]
  return mean_q


def rsample(phi, n_samples):
  """Sample z ~ q(z;phi)
  Ouput z is size [b,n_samples,K] given phi with shape [b,K+1]. The first K
  entries of each row of phi are the mean of q, and phi[:,-1] is the log
  standard deviation

  The samples are built as mu + sig * eps with eps ~ N(0, 1), so gradients
  flow back to phi.
  """
  # The debugging prints that lived here indexed element [1] of the batch,
  # which raises IndexError whenever the batch holds a single element.
  b, kplus1 = phi.size()
  k = kplus1-1
  # NOTE(review): sig is exp() of an unbounded network output, so a large
  # log-sigma produces enormous samples; confirm the inputs are normalised.
  mu, sig = phi[:, :-1], phi[:,-1].exp()
  eps = torch.randn(b, n_samples, k, device=phi.device)
  return eps*sig.view(b,1,1) + mu.view(b,1,k)


def train_vae(vae, dataset, epochs=10, n_samples=1000, batch_size=250):
  """Train a VAE by maximising its ELBO with Adam and return the ELBO curve.

  Args:
    vae: model exposing `elbo(batch)`; it is moved to DEVICE for training and
      back to the CPU (in eval mode) before returning.
    dataset: dataset yielding (image, label) pairs; a channel axis is
      inserted at dimension 1 of every image batch before it is passed on.
    epochs (int): number of passes over the dataset.
    n_samples (int): currently unused; kept so existing calls keep working.
    batch_size (int): mini-batch size (defaults to the previously hard-coded
      250).

  Returns:
    list of float: the ELBO value of every training batch, in order.
  """
  opt = torch.optim.Adam(vae.parameters(), lr=1e-3, weight_decay=0)
  elbo_vals = []
  vae.to(DEVICE)
  vae.train()
  loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
  for epoch in trange(epochs, desc='Epochs'):
    # len(loader) also counts the final partial batch, which
    # len(dataset) // batch_size left out of the progress-bar total.
    for im, _ in tqdm(loader, total=len(loader), desc='Batches', leave=False):
      im = im.to(DEVICE)
      # add the channel axis: [b, h, w] -> [b, 1, h, w]
      im = torch.unsqueeze(im, 1)
      opt.zero_grad()
      # maximising the ELBO == minimising its negative
      loss = -vae.elbo(im)
      loss.backward()
      opt.step()

      elbo_vals.append(-loss.item())
  vae.to('cpu')
  vae.eval()
  return elbo_vals


# Build the VAE and train it on the training split.
trained_conv_VarAE = ConvVAE(K=K_VAE)
elbo_vals = train_vae(trained_conv_VarAE, train_dataset, n_samples=10000)

learned_sig_x = torch.exp(trained_conv_VarAE.log_sig_x)
print(f'Learned sigma_x is {learned_sig_x}')

# Training curve of the ELBO, one value per batch.
# ELBO is the loss function used to train VAEs (see lecture!)
fig, ax = plt.subplots()
ax.plot(elbo_vals)
ax.set_xlabel('Batch #')
ax.set_ylabel('ELBO')
plt.show()

Epochs:   0%|          | 0/10 [00:00<?, ?it/s]

Batches:   0%|          | 0/78 [00:00<?, ?it/s]

Im shape torch.Size([250, 1, 128, 129])
In elbo
phi torch.Size([250, 3]) tensor([57.7423, 16.0517, 66.0804], device='cuda:0', grad_fn=<SelectBackward>)
In rsample
b 250 kplus1 3
mu tensor([57.7423, 16.0517], device='cuda:0', grad_fn=<SelectBackward>) sig tensor(4.9929e+28, device='cuda:0', grad_fn=<SelectBackward>)
eps tensor([[-0.8138,  0.3330]], device='cuda:0')
zs torch.Size([250, 1, 2]) tensor([[-4.0633e+28,  1.6626e+28]], device='cuda:0', grad_fn=<SelectBackward>)
mu torch.Size([250, 1, 16512]) tensor([[-5.5260e+26,  7.4099e+26, -1.4658e+27,  ..., -4.5801e+26,
         -1.2067e+27, -9.2506e+26]], device='cuda:0', grad_fn=<SelectBackward>)
Im shape torch.Size([250, 1, 128, 129])
In elbo
phi torch.Size([250, 3]) tensor([nan, nan, nan], device='cuda:0', grad_fn=<SelectBackward>)
In rsample
b 250 kplus1 3
mu tensor([nan, nan], device='cuda:0', grad_fn=<SelectBackward>) sig tensor(nan, device='cuda:0', grad_fn=<SelectBackward>)
eps tensor([[-1.2029,  0.7386]], device='cuda:0')
zs torch

KeyboardInterrupt: ignored

In [None]:
# Sanity check: inserting a channel axis at dim 1 turns [b, h, w] into
# [b, 1, h, w].
test_3d = torch.zeros(250, 128, 129)

test_4d = test_3d.unsqueeze(dim=1)

test_4d.shape

torch.Size([250, 1, 128, 129])