<a href="https://colab.research.google.com/github/Kiira6/kiira6.github.io/blob/master/AutoEncoder_RAW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import fnmatch

def find_mp3_files(directory):
    """Collect the paths of every ``.mp3`` file found under *directory*.

    The tree is traversed with ``os.walk``, so files inside nested
    sub-directories are included as well.

    Args:
        directory: Root folder where the search starts.

    Returns:
        A list of paths, each one built by joining the containing folder with
        the matching file name, in the order ``os.walk`` yields them.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        # Keep only the entries whose name matches the mp3 pattern.
        for name in fnmatch.filter(names, '*.mp3')
    ]

In [None]:
# Folder that is searched for mp3 files. A raw string keeps the Windows
# backslashes literal instead of relying on the unrecognised escape sequences
# "\P" and "\m", which newer Python versions warn about.
directory = r"D:\ProyectoLibertad\mp3"

mp3_files = find_mp3_files(directory)

# Last expression of the cell: shows how many files were found.
len(mp3_files)

119355

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    """1-D residual unit: two kernel-3 convolutions plus a 1x1 shortcut.

    Both convolutions use ``padding=1`` and the shortcut uses a kernel of 1,
    so the length of the last axis is preserved; only the channel count
    changes from ``in_channels`` to ``out_channels``.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Number of channels produced by the block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Main branch: convolution + batch norm, applied twice.
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        self.norm1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.norm2 = nn.BatchNorm1d(out_channels)
        # Shortcut branch: 1x1 convolution so the channel counts match when
        # the two branches are added.
        self.skip = nn.Conv1d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        shortcut = self.skip(x)

        hidden = self.conv1(x)
        hidden = self.norm1(hidden)
        hidden = F.relu(hidden)

        hidden = self.conv2(hidden)
        hidden = self.norm2(hidden)

        return F.relu(hidden + shortcut)

class Encoder(nn.Module):
    """Convolutional + recurrent encoder for single-channel 1-D signals.

    Pipeline: kernel-7 convolution (1 -> C channels), ``B`` residual blocks
    that double the channel count each time (C -> C * 2**B), a 2-layer LSTM
    that maps every time step to ``D`` features, and a final kernel-7
    convolution over the ``D`` channels. All convolutions are padded so the
    length of the time axis is unchanged.

    Args:
        C: Number of channels after the first convolution.
        B: Number of residual blocks.
        K: Accepted for signature compatibility; not used by this class.
        S: Accepted for signature compatibility; not used by this class.
        D: Hidden size of the LSTM and channel count of the output.
    """

    def __init__(self, C, B, K, S, D):
        super(Encoder, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=C, kernel_size=7, padding=3)
        self.norm1 = nn.BatchNorm1d(C)
        self.blocks = nn.ModuleList([ResidualBlock(C*(2**i), C*(2**(i+1))) for i in range(B)])
        # batch_first=True: the LSTM is fed (batch, time, features) tensors in
        # forward(). Parameter names and shapes do not depend on this flag.
        self.lstm = nn.LSTM(input_size=C*(2**B), hidden_size=D, num_layers=2, batch_first=True)
        self.conv2 = nn.Conv1d(in_channels=D, out_channels=D, kernel_size=7, padding=3)
        self.norm2 = nn.BatchNorm1d(D)

    def forward(self, x):
        """Encode ``x`` of shape (batch, 1, time) into (batch, D, time)."""
        x = F.relu(self.norm1(self.conv1(x)))
        for block in self.blocks:
            x = block(x)
        # The convolutions work on (batch, channels, time) while the LSTM's
        # feature axis must be the last one. Without this transpose the LSTM
        # would treat the time axis as features and reject the input.
        x, _ = self.lstm(x.transpose(1, 2).contiguous())
        # Back to (batch, channels, time) for the final convolution.
        x = x.transpose(1, 2).contiguous()
        x = F.relu(self.norm2(self.conv2(x)))
        return x

class Decoder(nn.Module):
    """Recurrent + convolutional decoder producing a single-channel signal.

    Pipeline: kernel-7 transposed convolution (D -> C * 2**B channels), ``B``
    residual blocks that halve the channel count each time (C * 2**B -> C),
    a 2-layer LSTM mapping every time step from ``C`` to ``D`` features, and
    a final kernel-7 transposed convolution down to one channel. With stride
    1 and ``padding=3`` the transposed convolutions keep the time length.

    Args:
        C: Channel count after the last residual block.
        B: Number of residual blocks.
        K: Accepted for signature compatibility; not used by this class.
        S: Accepted for signature compatibility; not used by this class.
        D: Channel count of the incoming latent and hidden size of the LSTM.
    """

    def __init__(self, C, B, K, S, D):
        super(Decoder, self).__init__()
        # The first residual block expects C * 2**B channels, so the input
        # projection must emit exactly that many (this equals D only when
        # D == C * 2**B).
        widest = C * (2 ** B)
        self.conv1 = nn.ConvTranspose1d(in_channels=D, out_channels=widest, kernel_size=7, padding=3)
        self.norm1 = nn.BatchNorm1d(widest)
        self.blocks = nn.ModuleList([ResidualBlock(C*(2**(B-i)), C*(2**(B-i-1))) for i in range(B)])
        # batch_first=True: forward() feeds (batch, time, features) tensors.
        self.lstm = nn.LSTM(input_size=C, hidden_size=D, num_layers=2, batch_first=True)
        self.conv2 = nn.ConvTranspose1d(in_channels=D, out_channels=1, kernel_size=7, padding=3)

    def forward(self, x):
        """Decode ``x`` of shape (batch, D, time) into (batch, 1, time)."""
        x = F.relu(self.norm1(self.conv1(x)))
        # The blocks are already stored widest-first; walking them in
        # construction order keeps each block's input width equal to the
        # previous block's output width.
        for block in self.blocks:
            x = block(x)
        # LSTM wants the feature axis last: (batch, time, C).
        x, _ = self.lstm(x.transpose(1, 2).contiguous())
        # Back to (batch, D, time) for the output projection.
        x = self.conv2(x.transpose(1, 2).contiguous())
        return x

class RVQ(nn.Module):
    """Residual vector quantizer built from a stack of embedding codebooks.

    Each stage picks, for every time step, the codebook row closest (squared
    Euclidean distance) to what is left of the input after the previous
    stages, and the selected rows of all stages are summed.

    Args:
        D: Dimensionality of the vectors being quantized.
        num_embeddings: Number of rows in every codebook.
        num_residuals: Number of quantization stages (must be >= 1).

    Raises:
        ValueError: If ``num_residuals`` is smaller than 1.
    """

    def __init__(self, D, num_embeddings, num_residuals):
        super(RVQ, self).__init__()
        if num_residuals < 1:
            # With no stage there is nothing to return from forward().
            raise ValueError("num_residuals must be at least 1")
        self.embeddings = nn.ModuleList([nn.Embedding(num_embeddings, D) for _ in range(num_residuals)])

    def forward(self, x):
        """Quantize ``x`` of shape (batch, D, time).

        Returns:
            A tuple ``(quantized, encodings, encoding_indices)`` where
            ``quantized`` has the shape of ``x``, and ``encodings`` (one-hot,
            shape ``(batch * time, num_embeddings)``) and
            ``encoding_indices`` (shape ``(batch * time, 1)``) describe the
            selection made by the LAST stage only.
        """
        # Work with the feature axis last: (batch, time, D).
        x = x.permute(0, 2, 1).contiguous()
        x_shape = x.shape
        flat_input = x.view(-1, x_shape[-1])
        quantized_sum = torch.zeros_like(x)
        for embedding in self.embeddings:
            codebook = embedding.weight
            # ||a - b||^2 expanded as ||a||^2 + ||b||^2 - 2 a.b for every
            # (input vector, codebook row) pair.
            distances = (torch.sum(flat_input**2, dim=1, keepdim=True)
                        + torch.sum(codebook**2, dim=1)
                        - 2 * torch.matmul(flat_input, codebook.t()))
            encoding_indices = torch.argmin(distances, dim=1).unsqueeze(1)
            # One-hot view of the selected rows, allocated on the input device.
            encodings = torch.zeros(encoding_indices.shape[0], codebook.shape[0], device=x.device)
            encodings.scatter_(1, encoding_indices, 1)
            quantized = embedding(encoding_indices).view(*x_shape)
            quantized_sum = quantized_sum + quantized
            # The next stage quantizes what this stage did not capture.
            flat_input = flat_input - quantized.view(-1, x_shape[-1])
        # argmin is not differentiable, so without this term no gradient would
        # reach whatever produced ``x``. ``x - x.detach()`` is zero in value
        # (the output is unchanged) but passes the output gradient straight
        # through to ``x``; the codebooks still receive their own gradient.
        quantized_sum = quantized_sum + (x - x.detach())
        return quantized_sum.permute(0, 2, 1).contiguous(), encodings, encoding_indices

class EnCodec(nn.Module):
    """Encoder -> residual vector quantizer -> decoder pipeline.

    Args:
        C, B, K, S, D: Forwarded unchanged to both ``Encoder`` and
            ``Decoder``; ``D`` is also the vector size given to ``RVQ``.
        num_embeddings: Codebook size forwarded to ``RVQ``.
        num_residuals: Number of quantizer stages forwarded to ``RVQ``.
    """

    def __init__(self, C, B, K, S, D, num_embeddings, num_residuals):
        super().__init__()
        self.encoder = Encoder(C, B, K, S, D)
        self.rvq = RVQ(D, num_embeddings, num_residuals)
        self.decoder = Decoder(C, B, K, S, D)

    def forward(self, x):
        """Run the full pipeline.

        Returns:
            ``(reconstruction, encodings, encoding_indices)``; the last two
            are passed through untouched from the quantizer.
        """
        latent = self.encoder(x)
        quantized, encodings, encoding_indices = self.rvq(latent)
        reconstruction = self.decoder(quantized)
        return reconstruction, encodings, encoding_indices

    def loss(self, x, target, encodings):
        """Return ``mse(x, target) + mse(encodings, ones)``.

        NOTE(review): the second term compares the encodings with an all-ones
        tensor, which is unusual for a commitment loss — confirm this is the
        intended objective.
        """
        all_ones = torch.ones_like(encodings).to(x.device)
        commitment_term = F.mse_loss(encodings, all_ones)
        reconstruction_term = F.mse_loss(x, target)
        return reconstruction_term + commitment_term


In [None]:
import librosa
import librosa.display
import matplotlib.pyplot as plt

def visualize_audio(path):
    """Load an audio file and plot its waveform and its MFCCs.

    The file is read with ``librosa.load`` and the resulting samples and
    sample rate are handed to ``visualize_sinewave``, which draws the same
    two figures (a 'Waveplot' and an 'MFCC' plot) this function used to
    build with its own copy of the plotting code.

    Args:
        path: Location of the audio file to load.
    """
    # Load the samples together with the sample rate reported by librosa.
    y, sr = librosa.load(path)

    # Reuse the shared plotting routine instead of duplicating it here.
    visualize_sinewave(y, sr=sr)

def visualize_sinewave(sinewave, sr = 22050):
    """Plot a waveform and its MFCCs in two separate figures.

    Args:
        sinewave: Audio samples, passed as-is to ``librosa.display.waveshow``
            and ``librosa.feature.mfcc``.
        sr: Sample rate of ``sinewave``.
    """
    figure_size = (14, 5)

    # Figure 1: the waveform itself.
    plt.figure(figsize=figure_size)
    librosa.display.waveshow(sinewave, sr=sr)
    plt.title('Waveplot')
    plt.show()

    # Figure 2: the MFCCs over time, with a colour bar for the scale.
    coefficients = librosa.feature.mfcc(y=sinewave, sr=sr)
    plt.figure(figsize=figure_size)
    librosa.display.specshow(coefficients, sr=sr, x_axis='time')
    plt.colorbar()
    plt.title('MFCC')
    plt.show()

# Example usage of the function
#visualize_audio('/content/1 - ABBA - Super Trouper.mp3')


In [None]:
import torch
from torch import nn

class AutoEncoder(nn.Module):
    """Symmetric 2-D convolutional autoencoder.

    The encoder stacks five stride-2 ``Conv2d`` layers (channels
    1 -> 16 -> 32 -> 64 -> 128 -> 256), each followed by ReLU. The decoder
    mirrors it with five stride-2 ``ConvTranspose2d`` layers back down to one
    channel; the last one is followed by a sigmoid so the output lies in
    [0, 1]. Every encoder layer roughly halves both spatial sizes and every
    decoder layer doubles them.

    NOTE(review): other cells of this notebook feed this model tensors of
    shape (1, 1, T); confirm that treating the waveform as a 2-D map of
    height 1 is intended.
    """

    def __init__(self):
        super().__init__()
        # Channel widths from the input to the bottleneck.
        widths = [1, 16, 32, 64, 128, 256]

        down_layers = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            down_layers.append(nn.Conv2d(c_in, c_out, kernel_size=3, stride=2, padding=1))
            down_layers.append(nn.ReLU())
        self.encoder = nn.Sequential(*down_layers)

        up_layers = []
        mirrored = widths[::-1]
        for c_in, c_out in zip(mirrored[:-1], mirrored[1:]):
            up_layers.append(
                nn.ConvTranspose2d(c_in, c_out, kernel_size=3, stride=2, padding=1, output_padding=1)
            )
            up_layers.append(nn.ReLU())
        # The output activation is a sigmoid instead of a ReLU, to ensure the
        # output is in the range [0, 1].
        up_layers[-1] = nn.Sigmoid()
        self.decoder = nn.Sequential(*up_layers)

    def encode(self, x):
        """Return the bottleneck representation of ``x``."""
        return self.encoder(x)

    def decode(self, x):
        """Reconstruct a signal from a bottleneck representation."""
        return self.decoder(x)

    def forward(self, x):
        """Encode ``x`` and immediately decode it again."""
        code = self.encoder(x)
        return self.decoder(code)


In [None]:
import pytorch_lightning as pl
import torchaudio

class AutoEncoderModule(pl.LightningModule):
    """Lightning wrapper that trains ``AutoEncoder`` on raw waveforms.

    The objective is the sum of two mean-squared errors between the input
    and the reconstruction: one on their MFCCs and one on the waveforms.
    """

    def __init__(self):
        super(AutoEncoderModule, self).__init__()
        self.autoencoder = AutoEncoder()

    def forward(self, x):
        """Return the autoencoder reconstruction of ``x``."""
        return self.autoencoder(x)

    def save_decoded_audio(self, decoded_waveform, sample_rate, filename):
        """
        Save the decoded waveform as a mono audio file.

        The channels of the decoded waveform are summed into a single channel
        and that mono mix is what gets written to disk.

        Args:
            decoded_waveform (torch.Tensor): The decoded waveform to save.
                Either (n_channels, n_time_steps) or the same with a leading
                batch axis of size 1.
            sample_rate (int): The sample rate for the audio file.
            filename (str): The name of the file to save the audio as.
        """
        mono_tensor = decoded_waveform.detach()
        if mono_tensor.dim() == 3:
            # Drop the leading batch axis; a 2-D tensor is left alone so a
            # (1, n_time_steps) input is not collapsed to a single sample.
            mono_tensor = mono_tensor.squeeze(0)
        mono_tensor = mono_tensor.sum(dim=0, keepdim=True)
        # Save the mono mix computed above (previously the untouched input
        # was written instead); moved to the CPU before it is written out.
        torchaudio.save(filename, mono_tensor.cpu(), sample_rate)

    def _shared_step(self, batch, stage):
        """Compute, log and return the loss for one batch.

        Args:
            batch: Waveform batch produced by the dataloader.
            stage: ``'train'`` or ``'val'``; used as prefix of the logged
                metric name (``train_loss`` / ``val_loss``).
        """
        x = batch

        # MFCC transform, created on the same device as the batch.
        mfcc_transform = torchaudio.transforms.MFCC(sample_rate=22050, n_mfcc=128).to(x.device)
        y_mfcc = mfcc_transform(x)

        # Autoencoder output, cropped on its last axis to the input length
        # (the decoder can return a few more samples than it was given).
        z = self.autoencoder(x)
        z = z[:, :, :x.shape[-1]]
        z_mfcc = mfcc_transform(z)

        # Loss calculation.
        # NOTE(review): z may have more entries than x on axis 1, in which
        # case MSELoss relies on broadcasting — confirm this is intended.
        loss_mfcc = nn.MSELoss()(z_mfcc, y_mfcc)
        loss_waveform = nn.MSELoss()(z, x)
        loss = loss_mfcc + loss_waveform

        self.log(f'{stage}_loss', loss)
        print(f'\n{stage}_loss:', loss)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the training loss for ``batch`` (logged as ``train_loss``)."""
        return self._shared_step(batch, 'train')

    def validation_step(self, batch, batch_idx):
        """Return the validation loss for ``batch`` (logged as ``val_loss``)."""
        return self._shared_step(batch, 'val')

    def configure_optimizers(self):
        """Use Adam with a learning rate of 0.001 over all parameters."""
        return torch.optim.Adam(self.parameters(), lr=0.001)


In [None]:
from torch.utils.data import Dataset
import os
import torchaudio
import torch.nn.functional as F

class AudioDataset(Dataset):
    """Dataset of mono, fixed-length waveforms loaded from audio files.

    Every item is loaded with ``torchaudio.load``, averaged to one channel,
    resampled to ``sample_rate`` and then truncated or zero-padded to exactly
    ``max_length`` samples.

    Args:
        audio_paths: Sequence of audio file paths.
        max_length: Number of samples of every returned waveform, counted at
            ``sample_rate``.
        sample_rate: Sample rate every file is resampled to.
    """

    def __init__(self, audio_paths, max_length=22050*180, sample_rate=22050):
        self.file_list = audio_paths
        self.max_length = max_length
        self.sample_rate = sample_rate

    def __len__(self):
        return len(self.file_list)

    def _source_frames(self, orig_rate):
        """Samples at ``orig_rate`` needed to cover ``max_length`` output samples."""
        # Ceiling division, so the resampled clip is never shorter than
        # max_length merely because of rounding.
        return (self.max_length * orig_rate + self.sample_rate - 1) // self.sample_rate

    def _fit_length(self, waveform):
        """Truncate or right-pad ``waveform`` with zeros to ``max_length``."""
        waveform = waveform[:, :self.max_length]
        missing = self.max_length - waveform.size(1)
        if missing > 0:
            waveform = F.pad(waveform, (0, missing))
        return waveform

    def __getitem__(self, idx):
        """Return the waveform at ``idx`` as a (1, max_length) tensor."""
        if torch.is_tensor(idx):
            idx = idx.tolist()

        audio_name = self.file_list[idx]
        waveform, sample_rate = torchaudio.load(audio_name)
        # Convert to mono by averaging the channels.
        waveform = torch.mean(waveform, dim=0, keepdim=True)
        # Crop before resampling to keep the resampler cheap. The crop is
        # expressed in source-rate samples: cutting at max_length here would
        # keep too short a clip for files recorded above the target rate.
        waveform = waveform[:, :self._source_frames(sample_rate)]

        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=self.sample_rate)
        waveform = resampler(waveform)

        # Pad or truncate the waveform to a fixed length.
        return self._fit_length(waveform)


In [None]:
def collate_fn(batch, label_length=9117):
    """Pad a batch of ``(audio, label)`` pairs into two dense tensors.

    Args:
        batch: Sequence of ``(audio, label)`` pairs, where ``audio`` has shape
            (1, n_samples) and ``label`` can be assigned to a row of
            ``label_length`` values.
        label_length: Number of values in every label.

    Returns:
        ``(audios, labels)``: ``audios`` has shape
        (len(batch), 1, longest_audio) with shorter clips zero-padded on the
        right, and ``labels`` has shape (len(batch), label_length).
    """
    if not batch:
        # Nothing to pad; return empty tensors with the expected rank.
        return torch.zeros(0, 1, 0), torch.zeros(0, label_length)

    # Find the maximum length audio in the batch
    max_length = max(audio.shape[1] for audio, _ in batch)

    # Create tensors to hold the padded audios and labels. They are allocated
    # once, before the loop, so rows written for earlier items are kept.
    audios = torch.zeros(len(batch), 1, max_length)
    labels = torch.zeros(len(batch), label_length)

    # Pad the audios and get the labels
    for i, (audio, label) in enumerate(batch):
        audios[i, :, :audio.shape[1]] = audio
        labels[i, :] = label

    return audios, labels


In [None]:
import random
from torch.utils.data import random_split
from torch.utils.data import DataLoader

n = 1000  # Number of files to draw for this experiment
# random.sample raises ValueError when asked for more items than exist, so the
# request is capped at the number of files that were found.
random_files = random.sample(mp3_files, min(n, len(mp3_files)))

# Build the full dataset
dataset = AudioDataset(random_files)

# Sizes of the training and test subsets
train_size = int(0.8 * len(dataset))  # 80% for training
test_size = len(dataset) - train_size  # remaining 20% for testing

# Split the dataset
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Create the DataLoaders. Only the training data is shuffled: the order of the
# evaluation samples does not need to change between epochs.
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)


In [None]:
from pytorch_lightning.callbacks import ModelCheckpoint

# Checkpointing policy used by the trainer.
checkpoint_callback = ModelCheckpoint(
    # Metric that ranks the checkpoints, and the direction that counts as
    # better ('min' because the metric is a loss).
    monitor='val_loss',
    mode='min',
    # Keep up to this many of the best-ranked checkpoints.
    save_top_k=1000,
    # Write a checkpoint after every epoch.
    every_n_epochs=1,
    # Folder and file-name template of the checkpoint files.
    dirpath='C:\\ProyectoLibertad\\autoencoder\\checkpoints',
    filename='autoencoder-{epoch:02d}-{val_loss:.2f}',
)


In [None]:
from pytorch_lightning import Trainer

torch.set_float32_matmul_precision('medium')

# `model2` is normally created by the checkpoint-loading cell further down the
# notebook. Fall back to a freshly initialised model when that cell has not
# been run yet, so a top-to-bottom execution does not stop with a NameError.
if 'model2' not in globals():
    model2 = AutoEncoderModule()

trainer = Trainer(max_epochs=300, callbacks=[checkpoint_callback])

# Hand both dataloaders to fit(); the second one is used for validation.
trainer.fit(model2, train_dataloader, val_dataloaders=test_dataloader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name        | Type        | Params
--------------------------------------------
0 | autoencoder | AutoEncoder | 784 K 
--------------------------------------------
784 K     Trainable params
0         Non-trainable params
784 K     Total params
3.138     Total estimated model params size (MB)


[1;30;43mSe han truncado las últimas 5000 líneas del flujo de salida.[0m
train_loss: tensor(24.9295, device='cuda:0', grad_fn=<AddBackward0>)
Epoch 2:  28%|████████████████▌                                          | 224/800 [51:53<2:13:25, 13.90s/it, v_num=75]
train_loss: tensor(18.0926, device='cuda:0', grad_fn=<AddBackward0>)
Epoch 2:  28%|████████████████▌                                          | 225/800 [52:07<2:13:12, 13.90s/it, v_num=75]
train_loss: tensor(10.9250, device='cuda:0', grad_fn=<AddBackward0>)
Epoch 2:  28%|████████████████▋                                          | 226/800 [52:21<2:12:58, 13.90s/it, v_num=75]
train_loss: tensor(20.0756, device='cuda:0', grad_fn=<AddBackward0>)
Epoch 2:  28%|████████████████▋                                          | 227/800 [52:35<2:12:45, 13.90s/it, v_num=75]
train_loss: tensor(12.6043, device='cuda:0', grad_fn=<AddBackward0>)
Epoch 2:  28%|████████████████▊                                          | 228/800 [52:49<2:12:31, 1

In [None]:
import torch

def save_tensor(tensor, filepath):
    """Serialize *tensor* to *filepath* with ``torch.save``.

    Args:
        tensor: Object to store.
        filepath: Destination handed to ``torch.save``.
    """
    torch.save(obj=tensor, f=filepath)

In [None]:
# Persist the trained weights of `model2` in the {'state_dict': ...} layout
# that the loading cell reads back through checkpoint['state_dict']. Storing
# only the state dict also avoids pickling the whole module object.
torch.save({'state_dict': model2.state_dict()}, 'C:\\ProyectoLibertad\\autoencoder\\checkpoints\\autoencoder_last.ckpt')

In [None]:
# First create an instance of the same model class.
model2 = AutoEncoderModule()

# Then load the trained parameters from the checkpoint file. map_location
# places the stored tensors on the CPU regardless of the device they were
# saved from, so the file can also be opened on a machine without that
# device; load_state_dict copies the values into the model's own parameters.
# Remember to call model2.eval() before running inference.
checkpoint = torch.load(
    'C:\\ProyectoLibertad\\autoencoder\\checkpoints\\autoencoder-epoch=02-val_loss=14.87.ckpt',
    map_location='cpu',
)
model2.load_state_dict(checkpoint['state_dict'])


<All keys matched successfully>

In [None]:
# Reconstruct the first test sample and store both the original and the
# generated audio. The double-underscore variable names are kept because
# later cells read `__encoded__` and `__result__`.
model2.eval()
__sinewave__ = test_dataset[0]
torchaudio.save('C:\\ProyectoLibertad\\autoencoder\\validations\\6.wav', __sinewave__, 22050)

# Add the leading axis the model was fed during training (batch_size=1).
__sinewave__ = __sinewave__.unsqueeze(0)

# Inference only: no_grad avoids keeping the activations of every layer for
# a backward pass that never happens. The bottleneck is computed once and
# then decoded, which is exactly what the model's forward() does.
with torch.no_grad():
    __encoded__ = model2.autoencoder.encode(__sinewave__)
    __result__ = model2.autoencoder.decode(__encoded__)
print("ENCODED SHAPE: ", __encoded__.shape)

# Drop the leading axis and sum the remaining rows into a single channel.
__result__ = __result__.squeeze(0)
__result__ = __result__.sum(dim=0, keepdim=True)
torchaudio.save('C:\\ProyectoLibertad\\autoencoder\\validations\\6_gen.wav', __result__, 22050)

ENCODED SHAPE:  torch.Size([256, 1, 124032])


In [None]:
# Display the shape of the encoded representation produced by the inference cell.
__encoded__.shape
#torch.save(__encoded__, "C:\\ProyectoLibertad\\autoencoder\\encodeds\\tensor.bbb")

torch.Size([256, 1, 124032])

In [None]:
# Identifier used in the name of the exported file.
bii = 2394

# Sum the generated audio over its first axis, keeping that axis, and keep a
# NumPy copy of the result alongside the tensor.
mono_tensor = __result__.sum(dim=0, keepdim=True)
mono_tensor.shape
mono_tensor_numpy = mono_tensor.detach().cpu().numpy()

# Write the generated audio next to the other validation files.
torchaudio.save('C:\\ProyectoLibertad\\autoencoder\\validations\\{}_gen.wav'.format(bii), mono_tensor, 22050)