In [None]:
# Importo las librerías necesarias.
import os
import gc
import torch
import torchaudio
import tarfile
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as td
import torchaudio.transforms as tt
from torchaudio.datasets import GTZAN
import matplotlib
import matplotlib.pyplot as plt
import itertools
from hyperopt import fmin, tpe, hp
from google.colab import drive

In [None]:
# Convolution hyperparameters, grouped per layer. The numeric suffix is the
# layer's position in the encoder (the decoder reuses them in reverse order):
#   ic = channels produced by that encoder layer
#   ks = kernel size, s = stride, p = padding
ic1, ks1, s1, p1 = 16, 22, 12, 2
ic2, ks2, s2, p2 = 6, 5, 2, 1
ic3, ks3, s3, p3 = 8, 1, 2, 0

# Training settings: number of passes over the training set and the batch
# size used by the training and validation loaders.
epochs = 40
batch_size = 20

In [None]:
# Encoder definition
class Encoder(nn.Module):
    """Stack of three 1-D convolutions, each followed by tanh, then a flatten.

    The first convolution takes a single input channel; channel counts,
    kernel sizes, strides and paddings come from the module-level
    ``ic*``/``ks*``/``s*``/``p*`` constants.
    """

    def __init__(self):
        super().__init__()
        # (in_channels, out_channels, kernel_size, stride, padding) per layer.
        conv_specs = [
            (1, ic1, ks1, s1, p1),
            (ic1, ic2, ks2, s2, p2),
            (ic2, ic3, ks3, s3, p3),
        ]
        layers = []
        for in_ch, out_ch, kernel, stride, pad in conv_specs:
            layers.append(
                nn.Conv1d(in_ch, out_ch, kernel_size=kernel, stride=stride, padding=pad)
            )
            layers.append(nn.Tanh())
        self.encoder = nn.Sequential(*layers)
        self.latent_space = nn.Flatten()

    def forward(self, x):
        """Return the flattened output of the convolutional stack for ``x``."""
        return self.latent_space(self.encoder(x))

# Decoder definition
class Decoder(nn.Module):
    """Rebuild a sequence from a flat latent vector.

    The flat input is reshaped to ``(batch, ic3, length)`` and passed through
    three transposed 1-D convolutions that reuse the encoder's kernel, stride
    and padding constants in reverse order. The first two are followed by
    tanh; the last one has no activation.
    """

    def __init__(self):
        super().__init__()
        # The channel count follows ic3 and the length is inferred (-1), so the
        # decoder keeps working if ic3 or the input clip length changes. The
        # size of dim 1 of the input must be a multiple of ic3.
        self.unflatten = nn.Unflatten(1, (ic3, -1))
        self.decoder = nn.Sequential(
            nn.ConvTranspose1d(ic3, ic2, kernel_size=ks3, stride=s3, padding=p3),
            nn.Tanh(),
            nn.ConvTranspose1d(ic2, ic1, kernel_size=ks2, stride=s2, padding=p2),
            nn.Tanh(),
            nn.ConvTranspose1d(ic1, 1, kernel_size=ks1, stride=s1, padding=p1),
        )

    def forward(self, x):
        """Return the single-channel reconstruction for the flat latent ``x``."""
        x = self.unflatten(x)
        reconstructed_seq = self.decoder(x)
        return reconstructed_seq

# Convolutional Autoencoder (CAE) definition
class CAE(nn.Module):
    """Autoencoder that chains an ``Encoder`` and a ``Decoder``."""

    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        """Return ``(reconstruction, latent)`` for the input ``x``."""
        latent = self.encoder(x)
        return self.decoder(latent), latent

In [None]:
# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

# Fix the seed for reproducibility (CPU generator, plus CUDA when in use).
SEED = 42
torch.manual_seed(SEED)
if device.type == 'cuda':
    torch.cuda.manual_seed(SEED)

# Mount Google Drive and point data_dir at the folder that holds the audio.
drive.mount('/content/drive')
data_dir = '//content/drive/MyDrive/UTDT/TD6/genres_5sec/'
list_files = os.listdir(data_dir)

# The classes (genres) are the sub-directories of data_dir. os.listdir returns
# entries in arbitrary order, so sort them: otherwise the class order, and
# everything derived from it, can differ between runs despite the fixed seed.
classes = sorted(
    file for file in list_files
    if os.path.isdir(os.path.join(data_dir, file))
)

# Helper constant and functions for the dataset class.
# NOTE(review): presumably the audio sample rate in Hz; it is not referenced
# in the code visible here - confirm before relying on it.
samplerate = 22050


# Extract the genre from a file name.
def parse_genres(fname):
    """Return the text before the first '.' in the last '/'-separated part of ``fname``.

    For example ``'some/dir/blues.00000.wav'`` gives ``'blues'``.
    """
    basename = fname.rsplit('/', 1)[-1]
    genre, _, _ = basename.partition('.')
    return genre

# Audio transformation.
# TODO (author's open question): is applying this to the whole dataset data
# leakage? There are several ways to do it - check clasificador.ipynb.
def transform(audio):
    """Apply a default-configured ``tt.Spectrogram`` to ``audio`` and return the result."""
    to_spectrogram = tt.Spectrogram()
    return to_spectrogram(audio)

# Dataset class
class MusicDataset(td.Dataset):
    """Map-style dataset over the ``.wav`` files found in ``<root>/<genre>/``.

    Each item is ``(audio, spectrogram, class_idx)``: the first element
    returned by ``torchaudio.load`` for the file, the result of applying
    ``tt.Spectrogram()`` to it, and the position of the file's genre in
    ``self.classes``.

    Genre directories, file names and class names are all sorted, so item
    order and class indices are identical on every run and filesystem
    (``os.listdir`` and ``set`` iteration order are not stable).

    Args:
        root: directory whose sub-directories each hold the clips of one genre.
            File names are expected to start with the genre followed by '.',
            because the genre parsed from the name is also used as the
            directory when the file is loaded.
    """

    def __init__(self, root):
        super().__init__()
        self.root = root
        # Discover the genre folders from root itself instead of relying on a
        # module-level list, so the dataset is self-contained.
        genre_dirs = sorted(
            entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        )
        self.files = [
            fname
            for genre in genre_dirs
            for fname in sorted(os.listdir(os.path.join(root, genre)))
            if fname.endswith('.wav')
        ]
        # sorted(set(...)) rather than list(set(...)): set order changes
        # between interpreter runs, which would reshuffle the label mapping.
        self.classes = sorted(set(parse_genres(fname) for fname in self.files))
        self.transform = tt.Spectrogram()

    def __len__(self):
        """Return the number of ``.wav`` files found."""
        return len(self.files)

    def __getitem__(self, i):
        """Load file ``i`` and return ``(audio, spectrogram, class_idx)``."""
        fname = self.files[i]
        genre = parse_genres(fname)
        fpath = os.path.join(self.root, genre, fname)
        class_idx = self.classes.index(genre)
        # torchaudio.load returns a tuple; only its first element is kept.
        audio = torchaudio.load(fpath)[0]
        spectrogram = self.transform(audio)

        return audio, spectrogram, class_idx

dataset = MusicDataset(data_dir)

# Hold out fixed-size validation and test sets; everything else is training.
val_size = 100
test_size = 100
train_size = len(dataset) - (val_size + test_size)

# A dedicated seeded generator drives the split.
generator = torch.Generator().manual_seed(SEED)
train_ds, val_ds, test_ds = td.random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=generator,
)

# Only the training loader shuffles; the test loader yields one item per batch.
train_dl = td.DataLoader(
    train_ds, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True
)
valid_dl = td.DataLoader(
    val_ds, batch_size=batch_size, num_workers=2, pin_memory=True
)
test_dl = td.DataLoader(
    test_ds, batch_size=1, num_workers=2, pin_memory=True
)

In [None]:
class RMSELoss(nn.Module):
    """Root-mean-square error: ``sqrt(MSE(yhat, y) + eps)``.

    Args:
        eps: constant added to the MSE before the square root is taken
            (default ``1e-8``).
    """

    def __init__(self, eps=1e-8):
        super().__init__()
        self.eps = eps
        self.mse = nn.MSELoss()

    def forward(self, yhat, y):
        """Return the scalar RMSE between ``yhat`` and ``y``."""
        mean_squared_error = self.mse(yhat, y)
        return (mean_squared_error + self.eps).sqrt()

In [None]:
def _train_one_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over ``loader``; return the mean batch loss.

    Each batch is a 3-tuple whose first element is the model input; the model
    is trained to reconstruct that input. Returns NaN for an empty loader.
    """
    model.train()
    batch_losses = []
    for inputs, _, _ in loader:
        inputs = inputs.to(device)

        optimizer.zero_grad()
        outputs, _ = model(inputs)
        loss = criterion(outputs, inputs)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
    return float(np.mean(batch_losses)) if batch_losses else float('nan')


def _evaluate(model, loader, criterion):
    """Return the mean reconstruction loss over ``loader`` without gradients.

    Returns NaN for an empty loader.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for inputs, _, _ in loader:
            inputs = inputs.to(device)
            outputs, _ = model(inputs)
            batch_losses.append(criterion(outputs, inputs).item())
    return float(np.mean(batch_losses)) if batch_losses else float('nan')


def train_cae(params):
    """Train a fresh CAE and return its lowest mean validation loss.

    This is the objective handed to hyperopt. It reads the module-level
    ``device``, ``epochs``, ``train_dl`` and ``valid_dl``.

    Args:
        params: mapping with a ``'learning_rate'`` entry used for Adam.

    Returns:
        float: the smallest per-epoch mean validation RMSE that was observed
        (``inf`` if no epoch produced a comparable value).
    """
    print(f"Using lr = {params['learning_rate']}")

    # Build the model.
    model = CAE()
    model.to(device)

    # Reconstruction loss (RMSE) and optimiser.
    criterion = RMSELoss()
    optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'])

    # Release memory left over from a previous trial.
    torch.cuda.empty_cache()
    gc.collect()

    lowest_loss = float('inf')
    best_epoch = -1
    best_model_state_dict = None

    for epoch in range(epochs):
        train_loss = _train_one_epoch(model, train_dl, criterion, optimizer)
        val_loss = _evaluate(model, valid_dl, criterion)

        # Free cached memory once per epoch. The batch tensors are locals of
        # the helpers above, so they are already released at this point; doing
        # this after every batch only slows training down.
        torch.cuda.empty_cache()
        gc.collect()

        # Select on the epoch's mean validation loss (the figure printed
        # below), not on whichever batch happened to come last.
        if val_loss < lowest_loss:
            lowest_loss = val_loss
            best_epoch = epoch
            # state_dict() returns references to the live parameters, so clone
            # them to get a real snapshot of the best weights. It is not yet
            # returned or saved; kept for when checkpointing is added.
            best_model_state_dict = {
                key: value.detach().clone()
                for key, value in model.state_dict().items()
            }

        # Print progress.
        print(f'Epoch [{epoch+1}/{epochs}], Training Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')

    # Report on the console that training has finished.
    print(f"Entrenamiento finalizado, la loss más baja fue {str(lowest_loss)} la mejor epoch:{str(best_epoch)}")

    return lowest_loss

In [None]:
# Search space: the learning rate, drawn uniformly between 0.001 and 0.1.
space = dict(learning_rate=hp.uniform('learning_rate', 0.001, 0.1))

# Minimise the value returned by train_cae using TPE, with 8 evaluations.
best = fmin(train_cae, space, algo=tpe.suggest, max_evals=8)
print("Best learning rate:", best['learning_rate'])