In [1]:
import os
import mido
import utils ##
import random
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.colors as mcolors

import torch
from torch.utils.data import DataLoader

## Importar Datos

In [2]:
# Read the sample table; its index (first CSV column) supplies the sample
# names that are later combined with file suffixes to locate the arrays.
df_audios = pd.read_csv('datos_procesados/valid_samples.csv', index_col=0)
audio_names = df_audios.index.tolist()

## Dataset

- Definición del objeto Dataset para importar los datos

In [3]:
class TarareoMIDIDataset(torch.utils.data.Dataset):
    """Dataset that pairs each sample's saved audio array with its saved label array.

    For a sample name ``s`` the two arrays are read with ``np.load`` from
    ``tarareo_directory/<s + extensions[0]>`` and ``midi_directory/<s + extensions[1]>``.

    :param tarareo_directory: Directory containing the audio arrays.
    :param midi_directory: Directory containing the label arrays.
    :param samples: Sample names (file names without the suffix).
    :param extensions: ``(audio_suffix, label_suffix)``; each suffix includes the file format.
    """

    def __init__(self, tarareo_directory: str, midi_directory: str, samples: list, extensions: tuple[str, str]):
        self._samples = samples  # name of each recording
        self._extensions = extensions  # suffixes appended to the sample name (format included)

        self._tarareo_directory = tarareo_directory
        self._midi_directory = midi_directory

        # If padding is used, it may be convenient to keep a vocabulary, as in an
        # NLP translation task: it helps to order the labels by frequency and to
        # reserve the first slots for the start and end labels.
        # self._vocabulary = vocabulary

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self._samples)

    @staticmethod
    def _load_array(directory, file_name):
        """Load one ``.npy`` array from ``directory``."""
        return np.load(os.path.join(directory, file_name))

    def __getitem__(self, idx):
        """Return ``(audio, labels)`` for sample ``idx`` as float32 and int32 tensors."""
        sample_name = self._samples[idx]

        audio_array = self._load_array(self._tarareo_directory, sample_name + self._extensions[0])
        label_array = self._load_array(self._midi_directory, sample_name + self._extensions[1])

        # Open question from the original author: add <SOS> and <EOS> tokens here?
        audio_tensor = torch.tensor(audio_array, dtype=torch.float32)
        label_tensor = torch.tensor(label_array, dtype=torch.int)
        return audio_tensor, label_tensor

- Definición de `collate_fn` para crear batches y homologar las dimensiones de audios de distinta duración.

In [4]:
def audio_vector_collate_fn(batch: list[tuple[torch.Tensor, torch.Tensor]]) -> tuple[torch.Tensor, torch.Tensor]:
    """Zero-pad the audios and label vectors of a batch to common sizes.

    :param batch: List of ``(audio, labels)`` tuples; each audio is indexed on two
                  axes and each label vector on one.
    :return: A tuple with two tensors:
             1) float32 tensor with every audio stacked, shape [batch_size, L_max, N_max].
             2) int64 tensor with every label vector right-padded with zeros,
                shape [batch_size, K_max].
    """
    audios, labels = zip(*batch)
    n_items = len(audios)

    # Largest extent of each axis across the batch
    L_max = max(audio.size(0) for audio in audios)
    N_max = max(audio.size(1) for audio in audios)
    K_max = max(label.size(0) for label in labels)

    # Zero-filled containers; everything not overwritten below remains padding
    padded_audios = torch.zeros((n_items, L_max, N_max), dtype=torch.float32)
    padded_labels = torch.zeros((n_items, K_max), dtype=torch.int64)

    # Copy each item into the top-left corner of its slot
    for row, (audio, label) in enumerate(zip(audios, labels)):
        padded_audios[row, :audio.size(0), :audio.size(1)] = audio
        padded_labels[row, :label.size(0)] = label

    return padded_audios, padded_labels

## Separación de conjuntos

In [5]:
# Directories holding the preprocessed arrays and the suffix that turns a
# sample name into each file name.
tarareo_directory = "datos_procesados/tarareos/ventanas/"
midi_directory = 'datos_procesados/midis/target_vectors/'
extensions = ('_frames.npy', '_target.npy')

dataset = TarareoMIDIDataset(
    tarareo_directory=tarareo_directory,
    midi_directory=midi_directory,
    samples=audio_names,
    extensions=extensions,
)

In [6]:
# Train / validation / test proportions.
props = [0.7, 0.2, 0.1]
train_size = int(props[0] * len(dataset))
val_size = int(props[1] * len(dataset))
# The test split absorbs the rounding remainder, so the three sizes always
# add up to len(dataset).
test_size = len(dataset) - train_size - val_size

# Seeded generator: the same samples land in each split after every kernel
# restart, which keeps validation/test results comparable across runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=split_generator,
)

In [7]:
batch_size = 32

# Options shared by the three loaders; only the training loader shuffles.
loader_options = dict(batch_size=batch_size, num_workers=0, collate_fn=audio_vector_collate_fn)

train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

## Arquitectura ConvSeq2Seq

In [11]:
def convout_calc(input_dim, kernel, stride, dilation=1):
    """Output length along one axis of an unpadded convolution or pooling window.

    Evaluates ``floor((input_dim - 1 - dilation * (kernel - 1)) / stride + 1)``.

    :param input_dim: Input length along the axis.
    :param kernel: Window size along the axis.
    :param stride: Step between consecutive windows.
    :param dilation: Spacing between window elements (default 1).
    :return: Output length as a Python ``int``.
    """
    numerator = input_dim - 1 - dilation * (kernel - 1)
    window_count = np.floor(numerator / stride + 1)
    return int(window_count)

In [33]:
class Encoder(torch.nn.Module):
    """Convolutional front-end followed by an LSTM.

    A single-channel ``Conv2d`` and a ``MaxPool2d`` reduce the input; the pooled
    map is then read by an LSTM that steps along the pooled width axis and uses
    the pooled height as its feature size.

    :param input_dim: Height (axis ``L``) of the input. It fixes the LSTM input
                      size, so every batch must share this height.
    :param hidden_dim: Hidden size of the LSTM.
    :param conv_kernel: ``(height, width)`` of the convolution kernel. The default
                        ``(86, 4)`` equals the original ``(1378 // 16, 4)``.
    :param pool_kernel: ``(height, width)`` of the pooling window (its stride equals
                        its size). The default ``(21, 2)`` equals the original
                        ``(86 // 4, 2)``.
    :raises ValueError: If ``input_dim`` is too small to leave at least one row
                        after convolution and pooling.
    """

    def __init__(self, input_dim, hidden_dim, conv_kernel=(86, 4), pool_kernel=(21, 2)):
        super().__init__()
        conv_kernel = tuple(conv_kernel)
        pool_kernel = tuple(pool_kernel)

        # Conv2D
        self.conv2d = torch.nn.Conv2d(in_channels=1,
                                      out_channels=1,
                                      kernel_size=conv_kernel,
                                      stride=(1, 1))
        # Height after Conv2D
        self.conv2_H_dim = convout_calc(input_dim, kernel=conv_kernel[0], stride=1)

        # MaxPooling2D (stride defaults to the kernel size)
        self.max_pool2d = torch.nn.MaxPool2d(kernel_size=pool_kernel)
        # Height after MaxPooling2D
        self.pool_H_dim = convout_calc(self.conv2_H_dim, kernel=pool_kernel[0], stride=pool_kernel[0])

        if self.pool_H_dim < 1:
            # Fail here with a clear message instead of an obscure LSTM/conv error later.
            raise ValueError(
                f"input_dim={input_dim} is too small for conv_kernel={conv_kernel} "
                f"and pool_kernel={pool_kernel}"
            )

        # Encoder LSTM: the pooled height is the per-step feature size
        self.encoder_lstm = torch.nn.LSTM(input_size=self.pool_H_dim,
                                          hidden_size=hidden_dim,
                                          batch_first=True)

    def forward(self, x):
        """Encode a batch.

        :param x: ``[batch_size, L, N]`` or ``[batch_size, 1, L, N]``.
        :return: ``(encoder_outputs, (hidden, cell))`` exactly as returned by the LSTM.
        """
        # Conv2d needs an explicit channel axis; add it when the caller passed
        # the documented 3-D [batch_size, L, N] layout.
        if x.dim() == 3:
            x = x.unsqueeze(1)

        # Convolutional stage
        conv_out = self.conv2d(x)
        pool_out = self.max_pool2d(conv_out)

        # Drop the single channel and put the width axis first so the LSTM
        # receives [batch_size, steps, features]
        pool_out = pool_out.squeeze(1)
        pool_out_t = pool_out.transpose(1, 2)

        # Recurrent stage
        encoder_outputs, (hidden, cell) = self.encoder_lstm(pool_out_t)

        return encoder_outputs, (hidden, cell)

In [34]:
class Decoder(torch.nn.Module):
    """Single-step LSTM decoder over integer labels.

    Each call embeds one label per batch element, advances the LSTM by one
    step and projects the result to one logit per label.

    :param labels_dim: Number of distinct labels (embedding rows and output logits).
    :param hidden_dim: Hidden size of the LSTM.
    :param embedding_dim: Size of each label embedding.
    :param dropout: Dropout probability applied to the embeddings.
    """

    def __init__(self, labels_dim, hidden_dim, embedding_dim, dropout=0.25):
        super().__init__()
        # Hyperparameters kept for later inspection (Seq2Seq reads _labels_dim)
        self._labels_dim, self._hidden_dim, self._embedding_dim = labels_dim, hidden_dim, embedding_dim

        # Label embedding feeding the decoder LSTM
        self.embedding = torch.nn.Embedding(labels_dim, embedding_dim)
        self.decoder_lstm = torch.nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.dropout = torch.nn.Dropout(dropout)

        # Linear layer mapping the LSTM output to one logit per class
        self.fc = torch.nn.Linear(hidden_dim, labels_dim)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one step.

        :param input: Label ids, shape [batch_size].
        :param hidden: LSTM hidden state to start from.
        :param cell: LSTM cell state to start from.
        :return: ``(logits, hidden, cell)`` with logits of shape [batch_size, labels_dim].
        """
        # [batch_size] -> [batch_size, 1, embedding_dim]
        step_embedding = self.dropout(self.embedding(input)).unsqueeze(1)

        # One recurrent step: step_output is [batch_size, 1, hidden_dim]
        step_output, (hidden, cell) = self.decoder_lstm(step_embedding, (hidden, cell))

        # Drop the length-1 time axis and project to [batch_size, labels_dim]
        logits = self.fc(step_output.squeeze(1))
        return logits, hidden, cell

In [35]:
class Seq2Seq(torch.nn.Module):
    """Encoder-decoder wrapper that decodes one target position at a time.

    :param encoder: Module returning ``(outputs, (hidden, cell))`` for a source batch.
    :param decoder: Module exposing ``_labels_dim`` and returning
                    ``(logits, hidden, cell)`` for one decoding step.
    :param device: Device on which the inputs and the output buffer are placed.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Run the encoder once and decode positions ``1 .. K-1`` of ``trg``.

        :param src: Source batch, forwarded unchanged to the encoder.
        :param trg: Target label ids, shape [batch_size, K]; column 0 is the
                    first decoder input.
        :param teacher_forcing_ratio: Probability, per step, of feeding the true
                                      label instead of the model's own prediction.
        :return: Logits of shape [batch_size, K, labels_dim]; position 0 stays zero.
        """
        # Inputs may arrive on the CPU (e.g. straight from a DataLoader) while
        # the model lives on self.device; align them before any computation.
        src = src.to(self.device)
        trg = trg.to(self.device)

        batch_size, trg_length = trg.shape[0], trg.shape[1]
        labels_size = self.decoder._labels_dim

        # Output buffer, allocated directly on the target device
        outputs = torch.zeros(batch_size, trg_length, labels_size, device=self.device)

        # Final encoder state initialises the decoder
        _, (hidden, cell) = self.encoder(src)

        # Decoding starts from the first cell of the target vector
        decoder_input = trg[:, 0]  # [batch_size]
        for t in range(1, trg_length):
            # logits: [batch_size, labels_dim]
            logits, hidden, cell = self.decoder(decoder_input, hidden, cell)

            # Store the logits for position t
            outputs[:, t, :] = logits

            # Teacher forcing: feed the ground truth, otherwise the most likely label
            if random.random() < teacher_forcing_ratio:
                decoder_input = trg[:, t]
            else:
                decoder_input = logits.argmax(1)

        return outputs

- TEST

In [19]:
# Smoke test: push one random batch through the model and check the output shape.
L = 1378
batch_size = 20
N = 160
K = 12
hidden_dim = 6
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

encoder = Encoder(input_dim=L, hidden_dim=hidden_dim)
decoder = Decoder(labels_dim=3, embedding_dim=3, hidden_dim=hidden_dim, dropout=0.25)
model = Seq2Seq(encoder, decoder, device).to(device)

# The convolutional encoder needs an explicit channel axis, and the tensors
# must live on the same device as the model.
x_src = torch.rand(batch_size, L, N, device=device).unsqueeze(1)
y_trg = torch.randint(0, 3, (batch_size, K), device=device)

smoke_logits = model(x_src, y_trg)
assert smoke_logits.shape == (batch_size, K, 3)

## Definición de modelo

- Función de evaluación

In [36]:
def evaluate_seq2seq(model, val_loader, criterion):
    """Average the loss of ``model`` over ``val_loader`` without teacher forcing.

    :param model: Module called as ``model(src, trg, teacher_forcing_ratio)`` that
                  returns logits of shape [batch_size, K, n_labels].
    :param val_loader: Iterable of ``(src, trg)`` batches; ``src`` has no channel axis.
    :param criterion: Loss taking ``(logits [M, n_labels], labels [M])``.
    :return: Mean per-batch loss as a float, or ``nan`` when the loader yields no batches.
    """
    model.eval()

    # Batches are moved to wherever the model's parameters live; a model
    # without parameters leaves them where they are.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    total_loss = 0.0
    n_batches = 0
    with torch.no_grad():
        for src, trg in val_loader:
            src = src.unsqueeze(1)  # add the channel axis
            if device is not None:
                src, trg = src.to(device), trg.to(device)

            output = model(src, trg, 0)  # no teacher forcing during evaluation

            # Position 0 is only the first decoder input, so it is excluded from the loss
            output = output[:, 1:].reshape(-1, output.shape[-1])
            trg = trg[:, 1:].reshape(-1)

            total_loss += criterion(output, trg).item()
            n_batches += 1

    # An empty loader has no defined average; report nan instead of dividing by zero
    return total_loss / n_batches if n_batches else float("nan")

- Función de entrenamiento

In [37]:
def train_seq2seq(model, train_loader, val_loader, criterion, optimizer, num_epochs=20):
    """Train ``model`` and print the train/validation loss after every epoch.

    :param model: Module called as ``model(src, trg)`` that returns logits of
                  shape [batch_size, K, n_labels].
    :param train_loader: Iterable of ``(src, trg)`` training batches; ``src`` has no channel axis.
    :param val_loader: Iterable of validation batches, passed to ``evaluate_seq2seq``.
    :param criterion: Loss taking ``(logits [M, n_labels], labels [M])``.
    :param optimizer: Optimizer already bound to the model's parameters.
    :param num_epochs: Number of passes over ``train_loader``.
    """
    # Batches are moved to wherever the model's parameters live; a model
    # without parameters leaves them where they are.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    for epoch in range(num_epochs):
        model.train()  # evaluate_seq2seq switches to eval mode, so reset every epoch
        train_loss = 0.0
        n_batches = 0
        for src, trg in train_loader:
            src = src.unsqueeze(1)  # add the channel axis
            if device is not None:
                src, trg = src.to(device), trg.to(device)

            optimizer.zero_grad()
            output = model(src, trg)

            # Position 0 is only the first decoder input, so it is excluded from the loss
            output = output[:, 1:].reshape(-1, output.shape[-1])
            trg = trg[:, 1:].reshape(-1)

            loss = criterion(output, trg)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            n_batches += 1

        # An empty loader has no defined average; report nan instead of dividing by zero
        train_loss = train_loss / n_batches if n_batches else float("nan")
        val_loss = evaluate_seq2seq(model, val_loader, criterion)
        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

- Configuración

In [38]:
# Hyperparameters for the model that is actually trained.
# NOTE(review): the DataLoaders defined earlier were built with their own
# batch size; reassigning batch_size here does not rebuild them — confirm
# whether this value is still needed.
L = input_dim = 1378
batch_size = 20
hidden_dim = 30
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [39]:
# Assemble the encoder, the decoder and the wrapper that runs them together,
# then move the whole model to the selected device.
encoder = Encoder(input_dim=L, hidden_dim=hidden_dim)
decoder = Decoder(
    labels_dim=3,
    embedding_dim=3,
    hidden_dim=hidden_dim,
    dropout=0.25,
)
model = Seq2Seq(encoder, decoder, device).to(device)

## Entrenamiento

In [None]:
# Loss function and optimizer.
# Label 0 is excluded from the loss because the collate function pads label
# vectors with 0.
# NOTE(review): if 0 is also a real class (other cells mention labels
# [0, 1, 2]), that class is never trained — confirm the label encoding.
criterion = torch.nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Train the model
train_seq2seq(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    num_epochs=20,
)

## Attention TEST

In [76]:
class Encoder(torch.nn.Module):
    """Plain LSTM encoder used by the attention experiment.

    NOTE: this reuses the name ``Encoder``; once this cell runs, the
    convolutional encoder defined earlier in the notebook is shadowed.

    :param input_dim: Feature size of each input step.
    :param hidden_dim: Hidden size of the LSTM.
    :param num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_dim, hidden_dim, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Return ``(outputs, hidden, cell)`` produced by the LSTM for ``x``."""
        outputs, final_state = self.lstm(x)
        hidden, cell = final_state
        return outputs, hidden, cell

In [77]:
class Decoder(torch.nn.Module):
    """Plain LSTM decoder used by the attention experiment.

    NOTE: this reuses the name ``Decoder``; once this cell runs, the
    embedding-based decoder defined earlier in the notebook is shadowed.

    :param output_dim: Feature size of each decoder input step and number of output scores.
    :param hidden_dim: Hidden size of the LSTM.
    :param num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, output_dim, hidden_dim, num_layers=1):
        super().__init__()
        self.lstm = torch.nn.LSTM(
            input_size=output_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell):
        """Advance the LSTM over ``x`` and project its output.

        :return: ``(prediction, hidden, cell)``; the projection is applied after
                 squeezing axis 1 of the LSTM output.
        """
        lstm_out, (hidden, cell) = self.lstm(x, (hidden, cell))
        lstm_out = lstm_out.squeeze(1)
        prediction = self.fc(lstm_out)
        return prediction, hidden, cell

In [78]:
class Seq2SeqWithAttention(torch.nn.Module):
    """Encoder-decoder wrapper that queries an attention module at every step.

    :param encoder: Module returning ``(outputs, hidden, cell)`` for a source batch.
    :param decoder: Module called as ``decoder(x, hidden, cell)`` with ``x`` of shape
                    [batch_size, 1, n_labels]; returns ``(logits, hidden, cell)``.
    :param attention: Callable invoked as ``attention(encoder_outputs, hidden[-1])``
                      that returns a pair whose first element is the context vector.
    :param hidden_dim: Hidden size, stored for reference only.
    """

    def __init__(self, encoder, decoder, attention, hidden_dim):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.attention = attention
        self.hidden_dim = hidden_dim

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Decode positions ``1 .. K-1`` of ``trg``.

        :param src: Source batch, forwarded unchanged to the encoder.
        :param trg: Integer label ids, shape [batch_size, K]; column 0 is the
                    first decoder input.
        :param teacher_forcing_ratio: Probability, per step, of feeding the true
                                      label instead of the model's own prediction.
        :return: Logits of shape [batch_size, K, n_labels]; position 0 stays zero.
        """
        batch_size = src.size(0)
        trg_len = trg.size(1)

        # Number of labels: taken from the decoder's output layer when it has
        # one, otherwise the original default of 3 (labels [0, 1, 2]).
        output_layer = getattr(self.decoder, "fc", None)
        trg_vocab_size = getattr(output_layer, "out_features", 3)

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=src.device)

        encoder_outputs, hidden, cell = self.encoder(src)

        step_labels = trg[:, 0]

        for t in range(1, trg_len):
            # NOTE(review): the context vector is computed but not consumed by the
            # decoder yet — wiring it in needs a decoder that accepts it.
            context_vector, _ = self.attention(encoder_outputs, hidden[-1])

            # The decoder LSTM expects float features of size n_labels per step,
            # so integer ids are one-hot encoded: [batch_size] -> [batch_size, 1, n_labels]
            step_input = torch.nn.functional.one_hot(
                step_labels.long(), num_classes=trg_vocab_size
            ).float().unsqueeze(1)

            output, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[:, t, :] = output

            # Teacher forcing: feed the ground truth, otherwise the most likely label
            teacher_force = np.random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            step_labels = trg[:, t] if teacher_force else top1

        return outputs

In [79]:
# Definición de parámetros
input_dim = 64
hidden_dim = 128
output_dim = 3  # Etiquetas [0, 1, 2]

encoder = Encoder(input_dim, hidden_dim)
attention = Attention(hidden_dim)
decoder = Decoder(output_dim, hidden_dim)
model = Seq2SeqWithAttention(encoder, decoder, attention, hidden_dim)
