In [1]:
import numpy as np
import torch 
from sklearn.preprocessing import OneHotEncoder

import torch
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from transformers import get_linear_schedule_with_warmup, get_polynomial_decay_schedule_with_warmup, get_cosine_schedule_with_warmup
from torch.optim import Adam, AdamW

import math
from tqdm import trange 
import gc
import time

import pandas as pd

import gzip
import pickle

import logging
import copy

from torch.optim.lr_scheduler import StepLR

from collections import OrderedDict

In [2]:
logging.basicConfig(filename='torch_logs.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Testing")

Time       ->  Window Frame Size <br>
1 seconds  ->  32  <br>
3 seconds  ->  94  <br>
6 seconds  ->  188 <br>
9 seconds  ->  282 <br>
12 seconds ->  375 <br>
15 seconds ->  469 <br>

In [3]:
SHOW_PROCCESS = bool(True)
PROCESS_STEP = int(100)

TYPE_METRIC = str("f1")
FINE_TUNNING = bool(True)
SAVE_BEST_MODEL = bool(True)
# WINDOW_FRAME = int(188)#int(94) #15 seconds

# MODEL TRAINING
EPOCHS = int(100)
BATCH_SIZE = int(200)
lr = float(2.3e-6)

model_config = {
    "in_channels_cnn" : int(1),
    "out_channels_cnn" : int(128),
    "num_heads" : int(1),
    "num_layers" : int(1),
    "classes" : int(20)
}


# INIT SCHEDULER FOR TEST DIFFERENT LR

POWER = float(4.0)
NUM_WARMUP_STEPS= float(2) 
NUM_TRAINING_STEPS = EPOCHS

LEARNING_RATE_OR_SCHEDULE = True #schedule

Load files that contains path for songs

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
train_df = pd.read_csv("./dataset/train.csv")
test_df = pd.read_csv("./dataset/test.csv")
validation_df = pd.read_csv("./dataset/validation.csv")

In [6]:
train_df = train_df.sample(frac=1, random_state=54)
test_df = test_df.sample(frac=1, random_state=54)
validation_df = validation_df.sample(frac=1, random_state=54)

In [7]:
train_df.head(5)


Unnamed: 0.1,Unnamed: 0,label,path
19380,19380,radiohead,./dataset/train/radiohead-Karma_Police-3sec-50...
58724,58724,cure,./dataset/train/cure-Give_Me_It-3sec-29.plk.gz
96554,96554,roxette,./dataset/train/roxette-What_s_She_Like_-3sec-...
39551,39551,beatles,./dataset/train/beatles-I_ll_Cry_Instead-3sec-...
7442,7442,prince,./dataset/train/prince-The_Max-3sec-61.plk.gz


Create data for loading in DataLoader

In [8]:
train_spectrogram_path = train_df.path.values 
train_spectrogram_labels = train_df.label.values 

test_spectrogram_path = test_df.path.values 
test_spectrogram_labels = test_df.label.values 

validation_spectrogram_path = validation_df.path.values 
validation_spectrogram_labels = validation_df.label.values 

In [9]:
encoder = OneHotEncoder(sparse_output = False).fit(train_spectrogram_labels.reshape(-1, 1))
encoder.categories_

[array(['aerosmith', 'beatles', 'creedence_clearwater_revival', 'cure',
        'dave_matthews_band', 'depeche_mode', 'fleetwood_mac',
        'garth_brooks', 'green_day', 'led_zeppelin', 'madonna',
        'metallica', 'prince', 'queen', 'radiohead', 'roxette',
        'steely_dan', 'suzanne_vega', 'tori_amos', 'u2'], dtype=object)]

In [10]:
train_labels_encoded = encoder.transform(train_spectrogram_labels.reshape(-1, 1))
test_labels_encoded = encoder.transform(test_spectrogram_labels.reshape(-1, 1))
validation_labels_encoded = encoder.transform(validation_spectrogram_labels.reshape(-1, 1))

#### Define Class for opening file and create DataLoader

In [11]:
class SongDatasetLoader(Dataset):
    """Load songs spectrograms from path"""
    def __init__(self, data: np.ndarray, labels: np.ndarray, transform: bool = True) -> None:
        self.data = data
        self.labels = labels
        self.transform = transform
        self.DTYPE = torch.float
   
    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, idx:int):
        data_path_idx = self.data[idx]
        labels = self.labels[idx]

        with gzip.open(data_path_idx, 'rb') as f:
            # Load the decompressed data
            decompressed_data = pickle.load(f)
            decompressed_data = decompressed_data['spectrogram']

    
        if self.transform:
            # Convert to tensor without specifying dtype
            decompressed_data = torch.tensor(decompressed_data, dtype=self.DTYPE )
            labels = torch.tensor(labels, dtype = self.DTYPE )
           

        return decompressed_data, labels

    

In [12]:
train_load_data = SongDatasetLoader(train_spectrogram_path, train_labels_encoded)
test_load_data = SongDatasetLoader(test_spectrogram_path, test_labels_encoded)
Validation_load_data = SongDatasetLoader(validation_spectrogram_path, validation_labels_encoded)

In [13]:
train_dataLoader = DataLoader(train_load_data, batch_size = BATCH_SIZE, shuffle = True)
test_dataLoader = DataLoader(test_load_data, batch_size = BATCH_SIZE, shuffle = False)
validation_dataLoader = DataLoader(Validation_load_data, batch_size = BATCH_SIZE, shuffle = True)

#### Model Design

In [14]:
from torch.autograd import Variable

class CNN(nn.Module):
    
    def __init__(self, in_channels:int, out_channels:int)-> Tensor:
        super(CNN, self).__init__()
        padding = [(1,1), (0,0)]
        number_filters = [int(64), int(64),  out_channels]

        self.cnn = nn.Sequential(OrderedDict([
            ("batch_norm",nn.BatchNorm2d(1)),
            ("conv_1", nn.Conv2d(in_channels = in_channels, out_channels=number_filters[0], kernel_size=(3,3), padding=padding[0])),
            ("act_1", nn.ELU()),
            ("max_pooling_1",nn.MaxPool2d(kernel_size=(4,4), stride=(4,2), padding = padding[0])),
            ("dropout_1",nn.Dropout(p = 0.2)),

            ("conv_2", nn.Conv2d(in_channels = number_filters[0], out_channels=number_filters[1], kernel_size=(4,4), padding=padding[0])),
            ("act_2", nn.ELU()),
            ("max_pooling_2",nn.MaxPool2d(kernel_size=(4,2), stride=(4,1), padding = padding[0])),
            ("dropout_2",nn.Dropout(p = 0.2)),

            ("conv_3", nn.Conv2d(in_channels = number_filters[1], out_channels=number_filters[2], kernel_size=(4,4), padding=padding[0])),
            ("act_3", nn.ELU()),
            ("max_pooling_3",nn.MaxPool2d(kernel_size=(4,2), stride=(4,1), padding = padding[1])),
            ("dropout_3",nn.Dropout(p = 0.2)),

        ]))


    def forward(self, src:Tensor) -> Tensor:
        
        src = src.unsqueeze(1)
        src =  self.cnn(src)
       
        src = src.squeeze(2)
        # src = src.contiguous().view(src.size(0), 256, 50 * 8)  #torch.Size([batch = 128, 512, 6 * 46])   
        return src 
        


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        x = x + self.encoding[:, :x.size(1), :].to(device)
        return x
    

class TransformerEncoderMusic(nn.Module):
    def __init__(self, max_seq_len: int, 
                 embedding_dim: int, 
                 num_heads: int,  
                 num_layers: int, 
                 dropout: float = 0.2,
                 output: int=20):
        
        super(TransformerEncoderMusic, self).__init__()
        self.model_type = 'Transformer'
        self.embedding_dim = embedding_dim
        self.embedding = nn.Linear(max_seq_len, embedding_dim)
        self.pos_encoder = PositionalEncoding(embedding_dim) 
        self.norm = nn.LayerNorm(embedding_dim, eps = 1e-6)
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, 
                                                    nhead = num_heads,  
                                                    dropout=dropout, 
                                                    dim_feedforward = embedding_dim,
                                                    # activation = "gelu",
                                                    layer_norm_eps = 1e-12)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, 
                                                          num_layers =  num_layers, 
                                                          norm = self.norm
                                                          )

        self.output = nn.Sequential(OrderedDict([
            # ("dense",  nn.Linear(in_features= embedding_dim, out_features=embedding_dim)),
            # ("activation", nn.Tanh()),
            # ("activation", nn.GELU()),
            # ("dropout", nn.Dropout(p=0.2)),
            ("classifier", nn.Linear(in_features=embedding_dim, out_features=output)),
            ("dropout", nn.Dropout(p=0.2)),
        ]))


    def forward(self, src: Tensor) -> Tensor:
        src = self.embedding(src) * math.sqrt(self.embedding_dim)
        # src = print(src.size())
        # src = src.permute(2,0,1)
      
        src = self.norm(src)
        src = self.pos_encoder(src)
        src = src.permute(2,0,1)
        src = self.transformer_encoder(src)
        # src, _ = src.max(dim=0)
        src = src.mean(dim=0)
 
        src = self.output(src)
   
        return src
    


class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 num_heads : int=2,
                 num_layers : int = 1,
                 )-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoderMusic(max_seq_len = 45, 
                                          embedding_dim = out_channels_cnn, 
                                          num_heads = num_heads,  
                                          num_layers = num_layers, 
                                          output=classes)
                
    def forward(self, src: torch.Tensor):
        src = self.cnn(src) 
        src = self.encoder(src)
        return src
       

In [15]:
# model = VisualEncoderModel(**model_config)

# model = nn.DataParallel(model)
# model.to(device)

# for epoch in trange(2, desc="Epoch"):
  
#   model.train()
 
#   for step, batch in enumerate(train_dataLoader):
#     batch = tuple(t.to(device) for t in batch)
#     input_ids, labels = batch
    
    
#     output = model(input_ids)
#     print(output.size())
#     loss = criterion(output, labels)

    
#     # print(output)

#     break
  

INIT MODEL

In [16]:

# model = VisualEncoderModel(**model_config)

# model = nn.DataParallel(model)
# model.to(device)
# logging.info(model.parameters)

# model.parameters

Load Model

In [17]:
model_txt = "./model/47_epoch_2.2956857088918255e-06_lr_1716644414_timestap.pt"
model = torch.load(model_txt)
model = nn.DataParallel(model)
model.to(device)
logging.info(f"Loading model {model_txt}")
logging.info( model.parameters)
model.parameters

<bound method Module.parameters of DataParallel(
  (module): DataParallel(
    (module): VisualEncoderModel(
      (cnn): CNN(
        (cnn): Sequential(
          (batch_norm): BatchNorm2d(1, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv_1): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
          (act_1): ELU(alpha=1.0)
          (max_pooling_1): MaxPool2d(kernel_size=(4, 4), stride=(4, 2), padding=(1, 1), dilation=1, ceil_mode=False)
          (dropout_1): Dropout(p=0.2, inplace=False)
          (conv_2): Conv2d(64, 64, kernel_size=(4, 4), stride=(1, 1), padding=(1, 1))
          (act_2): ELU(alpha=1.0)
          (max_pooling_2): MaxPool2d(kernel_size=(4, 2), stride=(4, 1), padding=(1, 1), dilation=1, ceil_mode=False)
          (dropout_2): Dropout(p=0.2, inplace=False)
          (conv_3): Conv2d(64, 128, kernel_size=(4, 4), stride=(1, 1), padding=(1, 1))
          (act_3): ELU(alpha=1.0)
          (max_pooling_3): MaxPool2d(kernel

In [18]:
lr

2.3e-06

In [19]:
criterion = nn.CrossEntropyLoss()
# optimizer = AdamW(model.parameters(), lr = lr, weight_decay=0.023 )
optimizer = Adam(model.parameters(), lr = lr, weight_decay=0.02 )

total_steps = len(train_load_data) * EPOCHS

# scheduler = get_polynomial_decay_schedule_with_warmup(optimizer,
#                                                       num_warmup_steps=0,
#                                                       num_training_steps=total_steps,
#                                                       power=POWER)



scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = 0, # Default value in run_glue.py
                                            num_training_steps = total_steps)


# scheduler = get_cosine_schedule_with_warmup(optimizer, 
#                                             num_warmup_steps=NUM_WARMUP_STEPS,
#                                             num_training_steps=NUM_TRAINING_STEPS)

# Define the step size and gamma for the scheduler
# step_size = 20
# gamma = 0.1

# # Define the scheduler
# scheduler = StepLR(optimizer, step_size=step_size, gamma=gamma)


gc.collect()
torch.cuda.empty_cache()

Model

In [20]:
def metrics(labels : Tensor , output : Tensor, typeMetric: str = 'f1' ) -> float:
  from sklearn.metrics import f1_score

  y_pred = output.detach().cpu().numpy()
  y_true = labels.detach().cpu().numpy()

  pred_flat = np.argmax(y_pred, axis=1).flatten()
  labels_flat = np.argmax(y_true, axis=1).flatten()

  # print(pred_flat)
  # print(labels_flat)
  # print("---------------------")
 
  if typeMetric == 'f1':
    return f1_score(labels_flat, pred_flat, average='micro')
  if typeMetric == 'acc':
    return np.sum(pred_flat == labels_flat) / len(labels_flat)


In [21]:
def testing() -> None:
    test_loss = float(0)
    metric_evaluation = float(0)
    accuracy_evaluation = float(0)

    start_time = time.time()


    # Evaluate data for one epoch
    model.eval()
    with torch.no_grad(): #  not to compute or store gradients
      for _, batch in enumerate(test_dataLoader):
        batch = tuple(t.to(device) for t in batch)
        input_ids, labels = batch #unpack from Dataloader
        output = model(input_ids)
        loss = criterion(output, labels)
        
        test_loss += loss.item()
        
        metric_evaluation += metrics(labels, output, typeMetric = TYPE_METRIC)
        accuracy_evaluation  += metrics(labels, output, typeMetric = 'acc')
        
    txt = f'Test Evaluation {TYPE_METRIC}: {(metric_evaluation/len(test_dataLoader)):3.5f} |' \
    f'acc: {(accuracy_evaluation/len(test_dataLoader)):3.5f} | loss: {(test_loss/len(test_dataLoader)):3.5} |'\
    f'Execution time: {(time.time() - start_time):5.2f}'
    logging.info(txt)
    print(txt)
      
    # clean memory
    gc.collect()
    torch.cuda.empty_cache()

In [22]:

best_epoch = int(0)
# best_by_metric = float('-inf')
best_by_metric = float(0.3125273417734863)
best_model = copy.deepcopy(model.state_dict())
file_name = str("")
check_if_model_saved = bool(False)

train_loss_set = []

for epoch in trange(EPOCHS, desc="Epoch"):
  print(f'Training Process: epochs {epoch}/{EPOCHS - 1}')
  # Training --------------------------------------------------------------------------------------------------------

  # Set our model to training mode (as opposed to evaluation mode)
  model.train()


  
  training_loss = float(0)
  training_metric_evaluation = float(0)
  training_accuracy_evaluation = float(0)
  log_interval = PROCESS_STEP
  start_time = time.time()

  # Train the data for one epoch
  for step, batch in enumerate(train_dataLoader):
    
    batch = tuple(t.to(device) for t in batch)
    input_ids, labels = batch
    output = model(input_ids)

    loss = criterion(output, labels)

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
    optimizer.step()

    training_loss += loss.item()

    training_metric_evaluation += metrics(labels, output, typeMetric = TYPE_METRIC)
    training_accuracy_evaluation  += metrics(labels, output, typeMetric = 'acc')
    train_loss_set.append(loss.item())

    
    if SHOW_PROCCESS and (step % log_interval == 0 and step > 0):
        lr_show = scheduler.get_last_lr()[0] if LEARNING_RATE_OR_SCHEDULE else lr
        ms_per_batch = (time.time() - start_time) * 1000 / log_interval # time in miliseconds
        txt = f'Training stage: | epoch {epoch:3d}/{EPOCHS - 1} | {step:5d}/{len(train_dataLoader):5d} batches |'\
              f'lr {lr_show:02.9f} | ms/batch {ms_per_batch:5.2f} | loss {loss.item():5.9f} |'\
              f' accuracy {(training_accuracy_evaluation / step):2.4f} | {TYPE_METRIC} {(training_metric_evaluation/step):2.4f}' 

        logging.info(txt)
        print(txt) 
        
    if LEARNING_RATE_OR_SCHEDULE:
      scheduler.step()

  txt = f"Train loss {training_loss/len(train_dataLoader)} | accuracy {training_accuracy_evaluation/len(train_dataLoader)} | {TYPE_METRIC} {training_metric_evaluation/len(train_dataLoader)} |"
  logging.info(txt)
  print(txt)
    
  # clean memory
  gc.collect()
  torch.cuda.empty_cache()
  
  print('-' * 150)
  logging.info('-' * 150)
  # Validation -----------------------------------------------------------------------------------------------------------
  print(f'Validation Process: epochs {epoch}/{EPOCHS - 1}')
  validation_loss = float(0)
  metric_evaluation = float(0)
  accuracy_evaluation = float(0)


  start_time = time.time()
  log_interval = PROCESS_STEP

  # Evaluate data for one epoch
  model.eval()
  with torch.no_grad(): #  not to compute or store gradients
    for step, batch in enumerate(validation_dataLoader):
    
      batch = tuple(t.to(device) for t in batch)
      input_ids, labels = batch #unpack from Dataloader
    
      output = model(input_ids)
      loss = criterion(output, labels)

      validation_loss += loss.item()
      metric_evaluation += metrics(labels, output, typeMetric = TYPE_METRIC)
      accuracy_evaluation  += metrics(labels, output, typeMetric = 'acc')
      
      

      if SHOW_PROCCESS and (step % log_interval == 0 and step > 0):
        lr_show = scheduler.get_last_lr()[0] if LEARNING_RATE_OR_SCHEDULE else lr
        ms_per_batch = (time.time() - start_time) * 1000 / log_interval # time in miliseconds
        txt = f'Validation stage: | epoch {epoch:3d}/{EPOCHS - 1} | {step:5d}/{len(validation_dataLoader):5d} batches |'\
              f'lr {lr_show:02.9f} | ms/batch {ms_per_batch:5.2f} | loss {loss.item():5.9f} |'\
              f'Validation Evaluation {TYPE_METRIC}: {(metric_evaluation/step):3.5f} |'\
              f'acc: {(accuracy_evaluation/step):3.5f} |'
        
        logging.info(txt)
        print(txt)  

    
    if SAVE_BEST_MODEL and ((metric_evaluation/len(validation_dataLoader)) > best_by_metric):
      best_by_metric = metric_evaluation/len(validation_dataLoader)
      best_Model =  copy.deepcopy(model.state_dict())
      best_epoch = epoch

      lr_show = scheduler.get_last_lr()[0] if LEARNING_RATE_OR_SCHEDULE else lr
      file_name = f'./model/{epoch}_epoch_{lr_show}_lr_{int(time.time())}_timestap.pt'
      torch.save(model, file_name)
      logging.info(f'File saved {file_name}')
      print(f'File saved {file_name}') 
      check_if_model_saved = True


    txt = f'Validation Evaluation {TYPE_METRIC}: {(metric_evaluation/len(validation_dataLoader)):3.5f} |' \
    f'acc: {(accuracy_evaluation/len(validation_dataLoader)):3.5f} | loss: {(validation_loss/len(validation_dataLoader)):3.9} |'\
    f'Execution time: {(time.time() - start_time):5.2f}'  

    logging.info(txt)
    print(txt)

  

  # clean memory
  gc.collect()
  torch.cuda.empty_cache()

  if check_if_model_saved:
    txt = f'Best model saved: | {best_epoch} epoch | metric {TYPE_METRIC}: {best_by_metric} | model file saved: {file_name}'
    logging.info(txt)
    print(txt)
    check_if_model_saved = False
  
  
  print('=' * 150)
  logging.info('=' * 150)



Epoch:   0%|          | 0/100 [00:00<?, ?it/s]

Training Process: epochs 0/99


  return F.conv2d(input, weight, bias, self.stride,


Training stage: | epoch   0/99 |   100/  580 batches |lr 0.000002300 | ms/batch 144.64 | loss 2.608300686 | accuracy 0.2373 | f1 0.2373
Training stage: | epoch   0/99 |   200/  580 batches |lr 0.000002300 | ms/batch 279.06 | loss 2.538571358 | accuracy 0.2377 | f1 0.2377
Training stage: | epoch   0/99 |   300/  580 batches |lr 0.000002300 | ms/batch 428.63 | loss 2.622751713 | accuracy 0.2388 | f1 0.2388
Training stage: | epoch   0/99 |   400/  580 batches |lr 0.000002300 | ms/batch 592.96 | loss 2.535528898 | accuracy 0.2376 | f1 0.2376
Training stage: | epoch   0/99 |   500/  580 batches |lr 0.000002300 | ms/batch 765.29 | loss 2.621937752 | accuracy 0.2390 | f1 0.2390


  return F.conv2d(input, weight, bias, self.stride,


Train loss 2.5938690473293438 | accuracy 0.23817684320823013 | f1 0.23817684320823013 |
------------------------------------------------------------------------------------------------------------------------------------------------------
Validation Process: epochs 0/99


  return F.conv2d(input, weight, bias, self.stride,
Epoch:   1%|          | 1/100 [01:34<2:35:32, 94.27s/it]

Validation Evaluation f1: 0.30728 |acc: 0.30728 | loss: 2.44341149 |Execution time:  4.38
Training Process: epochs 1/99
Training stage: | epoch   1/99 |   100/  580 batches |lr 0.000002300 | ms/batch 160.69 | loss 2.591262102 | accuracy 0.2432 | f1 0.2432


Epoch:   1%|          | 1/100 [02:04<3:24:50, 124.14s/it]


KeyboardInterrupt: 

In [None]:
model = torch.load("./model/9_epoch_0.00018352070914127424_lr_1716471410_timestap.pt").to(device)
model.parameters

<bound method Module.parameters of DataParallel(
  (module): VisualEncoderModel(
    (cnn): CNN(
      (cnn): Sequential(
        (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ELU(alpha=1.0)
        (3): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
        (4): Dropout(p=0.2, inplace=False)
        (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (6): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (7): ELU(alpha=1.0)
        (8): MaxPool2d(kernel_size=(4, 1), stride=(4, 1), padding=0, dilation=1, ceil_mode=False)
        (9): Dropout(p=0.2, inplace=False)
        (10): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        (11): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (12): ELU(

In [None]:
torch.save({"cnn":model.cnn.cnn.state_dict()}, "./model_pretrained/cnn.pth")

In [34]:
testing()

Test Evaluation f1: 0.39979 |acc: 0.42514 | loss: 1.9445 |Execution time: 36.26


  return F.conv2d(input, weight, bias, self.stride,


In [None]:
from torch.autograd import Variable

class CNN(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super(CNN, self).__init__()
        filters = [128, 128, 128, out_channels]
        kernel_size = (3,3)
        pool_size = [(2, 2), (4, 1), (3 , 1)] 
        self.cnn = nn.Sequential(
            # nn.BatchNorm2d(in_channels),
            nn.Conv2d(in_channels, out_channels=filters[0], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[0]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[0], stride=pool_size[0]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[0], out_channels=filters[1], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[1]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[1], stride=pool_size[1]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[1], out_channels=filters[2], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[2]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[2], out_channels=filters[3], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[3]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2)
        )

        # self.fc1 = nn.Linear(128 * 1 * 47, 512)  # Adjust the input size based on the input dimensions
        # self.dropout = nn.Dropout(0.5)
        # self.fc2 = nn.Linear(512, 20)

    def forward(self, src:Tensor) -> Tensor:
    
        src = src.unsqueeze(1)
        src =  self.cnn(src)
       
        # src = src.view(-1, 128 * 1 * 47)
        # x = F.relu(self.fc1(src))
        # x = self.dropout(x)
        # x = self.fc2(x)
        src = src.squeeze(2)       
        return src


class Norm(nn.Module):
    def __init__(self, embedding_dim, eps:float= 1e-12):
        super(Norm, self).__init__()
        self.norm = nn.LayerNorm(embedding_dim,eps)

    def forward(self, x):
        return self.norm(x)

    
# # The positional encoding vector, embedding_dim is d_model
# class PositionalEncoder(nn.Module):
#     def __init__(self, embedding_dim, max_seq_length=512, dropout=0.1):
#         super(PositionalEncoder, self).__init__()
#         self.embedding_dim = embedding_dim
#         self.dropout = nn.Dropout(dropout)
#         pe = torch.zeros(max_seq_length, embedding_dim)
#         for pos in range(max_seq_length):
#             for i in range(0, embedding_dim, 2):
#                 pe[pos, i] = math.sin(pos/(10000**(2*i/embedding_dim)))
#                 pe[pos, i+1] = math.cos(pos/(10000**((2*i+1)/embedding_dim)))
#         pe = pe.unsqueeze(0)        
#         self.register_buffer('pe', pe)
    
#     def forward(self, x):
#         x = x*math.sqrt(self.embedding_dim)
#         seq_length = x.size(1)
#         pe = Variable(self.pe[:, :seq_length], requires_grad=False).to(x.device)
#         # Add the positional encoding vector to the embedding vector
#         x = x + pe
#         x = self.dropout(x)
#         return x
    
        
class Pooler(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        # self.dense = nn.Linear(d_model,  d_model)
        self.dense = nn.Linear(d_model,  20)
        # self.activation = nn.Tanh()

    def forward(self, src):
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.
        # first_token_tensor = hidden_states[:, 0]
        # src, _ = src.max(dim=1)
        src = src.mean(dim=0)
        pooled_output = self.dense(src)
        # pooled_output = self.activation(pooled_output)
        return pooled_output
    

# Transformer encoder layer
# class EncoderLayer(nn.Module):
#     def __init__(self, embedding_dim, num_heads, ff_dim=2048, dropout=0.1):
#         super(EncoderLayer, self).__init__()
#         self.self_attention = nn.MultiheadAttention(embedding_dim, num_heads, dropout)
#         self.feed_forward = nn.Sequential(
#             nn.Linear(embedding_dim, ff_dim),
#             nn.ReLU(),
#             nn.Linear(ff_dim, embedding_dim)
#         )
#         self.dropout1 = nn.Dropout(dropout)
#         self.dropout2 = nn.Dropout(dropout)
#         self.norm1 = Norm(embedding_dim)
#         self.norm2 = Norm(embedding_dim)

#     def forward(self, x):
#         x2 = self.norm1(x)
#         # Add and Muti-head attention
#         # x = x + self.dropout1(self.self_attention(x2, x2, x2, mask))
#         x = x + self.dropout1(self.self_attention(x2, x2, x2))
#         x2 = self.norm2(x)
#         x = x + self.dropout2(self.feed_forward(x2))
#         return x

class TransformerEncoderMusic(nn.Module):
    def __init__(self, max_seq_len: int, 
                 embedding_dim: int, 
                 num_heads: int,  
                 num_layers: int, 
                 dropout: float = 0.5):
        
        super(TransformerEncoderMusic, self).__init__()
        self.model_type = 'Transformer'
        self.embedding_dim = embedding_dim
      
        # self.layernorm = nn.LayerNorm(d_model, eps=1e-5)
         
        # self.pos_encoder = PositionalEncoder(embedding_dim, dropout= 0.3, max_seq_length = max_seq_len)  # Max time length
        
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.embedding_dim = embedding_dim
        # self.layers = nn.ModuleList([EncoderLayer(embedding_dim, num_heads, 2048, dropout) for _ in range(num_layers)])
        # self.layers = nn.ModuleList([EncoderLayer(embedding_dim, num_heads, 1024, dropout) for _ in range(num_layers)])
        self.norm = Norm(embedding_dim)
        # self.position_embedding = PositionalEncoder(embedding_dim, max_seq_len, dropout)
        
        
        
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, 
                                                    nhead = num_heads,  
                                                    dropout=dropout, 
                                                    # activation = 'gelu', 
                                                    dim_feedforward = embedding_dim,
                                                    batch_first = False)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, 
                                                          num_layers =  num_layers, 
                                                          norm = self.norm, 
                                                          enable_nested_tensor=True)
        self.Intermediate = nn.Sequential(
            nn.Linear(embedding_dim,1024),
            nn.ELU()
        )

        self.output = nn.Sequential(
            nn.Linear(1024, embedding_dim),
            nn.ELU(), # borrar
            nn.LayerNorm(embedding_dim, eps=1e-12),
            nn.Dropout(p = 0.1)
        )

        self.pooler = Pooler(embedding_dim)
        
        # self.classifier= nn.Sequential(
        #     nn.Linear(embedding_dim, 20),
        #     # nn.Dropout(0.2),
        #     # nn.Linear(128, 64),
        #     # nn.ReLU(),
        #     # nn.Dropout(0.2),
        #     # nn.Linear(64, 20),
        #     # nn.Dropout(0.2),
        # )
        
    def forward(self, src: Tensor) -> Tensor:
        
        src = src.permute(0, 2, 1)
        # src = self.pos_encoder(src)
 
        # src = src * math.sqrt(self.embedding_dim) 
        src = self.transformer_encoder(src)
        src = self.Intermediate(src)
        src = self.output(src)
        # src, _ = src.max(dim=1)
        # sequence_output = encoder_outputs[0]
        src = self.pooler(src)
        # src = self.classifier(src)
        
            
        return src
 

class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 n_head : int=2,
                 n_layers : int = 1,
                 dropout_transformer : float = 0.2,
                 dropout_classifier: float = 0.2)-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoderMusic(max_seq_len = 78, 
                                          embedding_dim = out_channels_cnn, 
                                          num_heads = n_head,  
                                          num_layers = n_layers, 
                                          dropout = dropout_transformer)
    
    def forward(self, src: torch.Tensor):
       
        src = self.cnn(src) 
        
        src = self.encoder(src)
        return src
       

In [None]:
class CNN(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super(CNN, self).__init__()
        filters = [64, 128, 128, out_channels]
        kernel_size = (3,3)
        pool_size = [(2, 2), (4, 1), (3 , 1)] 
        self.cnn = nn.Sequential(
            # nn.BatchNorm2d(in_channels),
            nn.Conv2d(in_channels, out_channels=filters[0], kernel_size=kernel_size, padding=1),
            nn.BatchNorm2d(filters[0]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[0], stride=pool_size[0]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[0], out_channels=filters[1], kernel_size=kernel_size, padding=1),
            nn.BatchNorm2d(filters[1]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[1], stride=pool_size[1]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[1], out_channels=filters[2], kernel_size=kernel_size, padding=1),
            nn.BatchNorm2d(filters[2]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[2], out_channels=filters[3], kernel_size=kernel_size, padding=1),
            nn.BatchNorm2d(filters[3]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2)
        )

 

    def forward(self, src:Tensor) -> Tensor:
    
        src = src.unsqueeze(1)
        src =  self.cnn(src)

        src = src.squeeze(2)       
        return src



class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x: torch.FloatTensor) -> torch.FloatTensor:
        """
        Args:
            x: `embeddings`, shape (batch, max_len, d_model)

        Returns:
            `encoder input`, shape (batch, max_len, d_model)
        """
        x = x + self.pe[:, : x.size(1)]
        return self.dropout(x)
    
        
         
    
class TransformerEncoder(nn.Module):
    def __init__(self, ntoken: int, 
                 d_model: int, 
                 n_head: int,  
                 nlayers: int, 
                 dropout: float = 0.5):
        
        super(TransformerEncoder, self).__init__()
        self.model_type = 'Transformer'
         
        self.pos_encoder = PositionalEncoding(d_model = d_model, dropout= 0.3, max_len=ntoken)  # Max time length
        encoder_layers = nn.TransformerEncoderLayer(d_model, 
                                                    nhead = n_head,  
                                                    dropout=dropout, 
                                               
                                                    dim_feedforward = 1024,
                                                    batch_first = False)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, num_layers =  nlayers)

        self.classifier= nn.Sequential(
     
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 20),

        )
        
    def forward(self, src: Tensor) -> Tensor:
        
        src = src.permute(0, 2, 1)
        
        src = self.pos_encoder(src)
        
        src = self.transformer_encoder(src)
        src, _ = src.max(dim=1)
        
        src = self.classifier(src)
        
            
        return src
    

class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 n_head : int=2,
                 n_layers : int = 1,
                 dropout_transformer : float = 0.2,
                 dropout_classifier: float = 0.2)-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoder(ntoken = 157, 
                                          d_model = out_channels_cnn, 
                                          n_head = n_head,  
                                          nlayers = n_layers, 
                                          dropout = dropout_transformer)
        # self.classifier= nn.Sequential(
        #     nn.Linear(out_channels_cnn, 128),
        #     nn.ELU(),
        #     nn.Dropout(dropout_classifier),
        #     nn.Linear(128, 64),
        #     nn.ELU(),
        #     nn.Dropout(dropout_classifier),
        #     nn.Linear(64, classes),
        #     nn.Dropout(dropout_classifier),
        # )

        # self.classifier= nn.Sequential(
        #     nn.Linear(47, classes),
        #     nn.Dropout(dropout_classifier)
        # )
        # self.initialize_weights()

    # def initialize_weights(self):
    #     for m in self.classifier.modules():
    #         if isinstance(m, nn.Linear):
    #             nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
    #             if m.bias is not None:
    #                 nn.init.constant_(m.bias, 0)

    def forward(self, src: torch.Tensor):
       
        src = self.cnn(src) 
        
        src = self.encoder(src)
        # src = self.classifier(src)
        

        return src
        # batch, filters, _, _ = src.size()
        # src = src.permute(0, 2, 3, 1)  # New shape: [48, 8, 47, 128]
        # src = src.contiguous().view(batch, -1, filters)  # New shape: [48, 8*47, 128]
      
        # src = src.squeeze(2) 
        # src = self.encoder(src)
        # print(src.size())
        # return self.classifier(src)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class CNNTransformerModel(nn.Module):
    def __init__(self, num_classes=20):
        super(CNNTransformerModel, self).__init__()
        # CNN layers
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Positional encoding for transformers
        self.positional_encoding = PositionalEncoding(128)
        
        # Transformer layers
        self.transformer_layer = nn.TransformerEncoderLayer(d_model=128, nhead=8, dim_feedforward=512)
        self.transformer = nn.TransformerEncoder(self.transformer_layer, num_layers=3)
        
        # Fully connected layer for classification
        self.fc = nn.Linear(128 * 47, num_classes)  # 128 channels, reduced size of 47 after pooling
    
    def forward(self, x):
        # x shape: (batch_size, 1, 128, 94)
        
        # Apply CNN layers
        x = self.pool(F.relu(self.conv1(x)))  # shape: (batch_size, 32, 64, 47)
        x = self.pool(F.relu(self.conv2(x)))  # shape: (batch_size, 64, 32, 23)
        x = self.pool(F.relu(self.conv3(x)))  # shape: (batch_size, 128, 16, 11)
        
        # Flatten for transformer input
        x = x.view(x.size(0), 128, -1).permute(2, 0, 1)  # shape: (11*16, batch_size, 128)
        
        # Apply positional encoding
        x = self.positional_encoding(x)
        
        # Apply transformer layers
        x = self.transformer(x)  # shape: (11*16, batch_size, 128)
        
        # Flatten and fully connected layer
        x = x.permute(1, 0, 2).contiguous().view(x.size(1), -1)  # shape: (batch_size, 128 * 47)
        x = self.fc(x)  # shape: (batch_size, num_classes)
        
        return x

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=512):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=0.1)
        
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(1).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# Example usage
model = CNNTransformerModel(num_classes=20)
input_tensor = torch.randn(8, 1, 128, 94)  # batch_size=8
output = model(input_tensor)
print(output.shape)  # Expected output shape: (8, 20)


In [None]:
from torch.autograd import Variable

class CNN(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super(CNN, self).__init__()
        filters = [128, 128, 128, out_channels]
        kernel_size = (3,3)
        pool_size = [(2, 2), (4, 1), (3 , 1)] 
        self.cnn = nn.Sequential(
            # nn.BatchNorm2d(in_channels),
            nn.Conv2d(in_channels, out_channels=filters[0], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[0]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[0], stride=pool_size[0]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[0], out_channels=filters[1], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[1]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[1], stride=pool_size[1]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[1], out_channels=filters[2], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[2]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[2], out_channels=filters[3], kernel_size=kernel_size, padding=1),
            nn.ELU(),
            # nn.BatchNorm2d(filters[3]), nn.ELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2)
        )

        # self.fc1 = nn.Linear(128 * 1 * 47, 512)  # Adjust the input size based on the input dimensions
        # self.dropout = nn.Dropout(0.5)
        # self.fc2 = nn.Linear(512, 20)

    def forward(self, src:Tensor) -> Tensor:
    
        src = src.unsqueeze(1)
        src =  self.cnn(src)
       
        # src = src.view(-1, 128 * 1 * 47)
        # x = F.relu(self.fc1(src))
        # x = self.dropout(x)
        # x = self.fc2(x)
        src = src.squeeze(2)       
        return src


class Norm(nn.Module):
    def __init__(self, embedding_dim, eps:float= 1e-12):
        super(Norm, self).__init__()
        self.norm = nn.LayerNorm(embedding_dim,eps)

    def forward(self, x):
        return self.norm(x)

    
# The positional encoding vector, embedding_dim is d_model
class PositionalEncoder(nn.Module):
    def __init__(self, embedding_dim, max_seq_length=512, dropout=0.1):
        super(PositionalEncoder, self).__init__()
        self.embedding_dim = embedding_dim
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(max_seq_length, embedding_dim)
        for pos in range(max_seq_length):
            for i in range(0, embedding_dim, 2):
                pe[pos, i] = math.sin(pos/(10000**(2*i/embedding_dim)))
                pe[pos, i+1] = math.cos(pos/(10000**((2*i+1)/embedding_dim)))
        pe = pe.unsqueeze(0)        
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x*math.sqrt(self.embedding_dim)
        seq_length = x.size(1)
        pe = Variable(self.pe[:, :seq_length], requires_grad=False).to(x.device)
        # Add the positional encoding vector to the embedding vector
        x = x + pe
        x = self.dropout(x)
        return x
    
        
class Pooler(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        # self.dense = nn.Linear(d_model,  d_model)
        self.dense = nn.Linear(d_model,  20)
        # self.activation = nn.Tanh()

    def forward(self, src):
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.
        # first_token_tensor = hidden_states[:, 0]
        src, _ = src.max(dim=1)
        # src = src.mean(dim=1)
        pooled_output = self.dense(src)
        # pooled_output = self.activation(pooled_output)
        return pooled_output
    

# Transformer encoder layer
# class EncoderLayer(nn.Module):
#     def __init__(self, embedding_dim, num_heads, ff_dim=2048, dropout=0.1):
#         super(EncoderLayer, self).__init__()
#         self.self_attention = nn.MultiheadAttention(embedding_dim, num_heads, dropout)
#         self.feed_forward = nn.Sequential(
#             nn.Linear(embedding_dim, ff_dim),
#             nn.ReLU(),
#             nn.Linear(ff_dim, embedding_dim)
#         )
#         self.dropout1 = nn.Dropout(dropout)
#         self.dropout2 = nn.Dropout(dropout)
#         self.norm1 = Norm(embedding_dim)
#         self.norm2 = Norm(embedding_dim)

#     def forward(self, x):
#         x2 = self.norm1(x)
#         # Add and Muti-head attention
#         # x = x + self.dropout1(self.self_attention(x2, x2, x2, mask))
#         x = x + self.dropout1(self.self_attention(x2, x2, x2))
#         x2 = self.norm2(x)
#         x = x + self.dropout2(self.feed_forward(x2))
#         return x

class TransformerEncoderMusic(nn.Module):
    def __init__(self, max_seq_len: int, 
                 embedding_dim: int, 
                 num_heads: int,  
                 num_layers: int, 
                 dropout: float = 0.5):
        
        super(TransformerEncoderMusic, self).__init__()
        self.model_type = 'Transformer'
        self.embedding_dim = embedding_dim
      
        # self.layernorm = nn.LayerNorm(d_model, eps=1e-5)
         
        self.pos_encoder = PositionalEncoder(embedding_dim, dropout= 0.3, max_seq_length = max_seq_len)  # Max time length
        
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.embedding_dim = embedding_dim
        # self.layers = nn.ModuleList([EncoderLayer(embedding_dim, num_heads, 2048, dropout) for _ in range(num_layers)])
        # self.layers = nn.ModuleList([EncoderLayer(embedding_dim, num_heads, 1024, dropout) for _ in range(num_layers)])
        self.norm = Norm(embedding_dim)
        # self.position_embedding = PositionalEncoder(embedding_dim, max_seq_len, dropout)
        
        
        
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, 
                                                    nhead = num_heads,  
                                                    dropout=dropout, 
                                                    # activation = 'gelu', 
                                                    dim_feedforward = embedding_dim,
                                                    batch_first = False)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, 
                                                          num_layers =  num_layers, 
                                                          norm = self.norm, 
                                                          enable_nested_tensor=True)
        self.Intermediate = nn.Sequential(
            nn.Linear(embedding_dim,1024),
            nn.ELU()
        )

        self.output = nn.Sequential(
            nn.Linear(1024, embedding_dim),
            nn.ELU(), # borrar
            nn.LayerNorm(embedding_dim, eps=1e-12),
            nn.Dropout(p = 0.1)
        )

        self.pooler = Pooler(embedding_dim)
        
        # self.classifier= nn.Sequential(
        #     nn.Linear(embedding_dim, 20),
        #     # nn.Dropout(0.2),
        #     # nn.Linear(128, 64),
        #     # nn.ReLU(),
        #     # nn.Dropout(0.2),
        #     # nn.Linear(64, 20),
        #     # nn.Dropout(0.2),
        # )
        
    def forward(self, src: Tensor) -> Tensor:
        
        src = src.permute(0, 2, 1)
        src = self.pos_encoder(src)
 
        src = src * math.sqrt(self.embedding_dim) 
        src = self.transformer_encoder(src)
        src = self.Intermediate(src)
        src = self.output(src)
        # src, _ = src.max(dim=1)
        # sequence_output = encoder_outputs[0]
        src = self.pooler(src)
        # src = self.classifier(src)
        
            
        return src
 

class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 n_head : int=2,
                 n_layers : int = 1,
                 dropout_transformer : float = 0.2,
                 dropout_classifier: float = 0.2)-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoderMusic(max_seq_len = 78, 
                                          embedding_dim = out_channels_cnn, 
                                          num_heads = n_head,  
                                          num_layers = n_layers, 
                                          dropout = dropout_transformer)
    
    def forward(self, src: torch.Tensor):
       
        src = self.cnn(src) 
        
        src = self.encoder(src)
        return src
       

ORIGINAL

from torch.autograd import Variable

class CNN(nn.Module):
    def __init__(self, in_channels: int, out_channels: int) -> None:
        super(CNN, self).__init__()
        filters = [128, 128, 128, out_channels]
        kernel_size = (3,3)
        pool_size = [(2, 2), (4, 1), (3 , 1)] 
        self.cnn = nn.Sequential(
            
            nn.Conv2d(in_channels, out_channels=filters[0], kernel_size=kernel_size, padding=1),
            nn.GELU(),
            nn.MaxPool2d(kernel_size=pool_size[0], stride=pool_size[0]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[0], out_channels=filters[1], kernel_size=kernel_size, padding=1),
            nn.GELU(),
            nn.MaxPool2d(kernel_size=pool_size[1], stride=pool_size[1]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[1], out_channels=filters[2], kernel_size=kernel_size, padding=1),
            nn.GELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2),

            nn.Conv2d(filters[2], out_channels=filters[3], kernel_size=kernel_size, padding=1),
            nn.GELU(),
            nn.MaxPool2d(kernel_size=pool_size[2], stride=pool_size[2]),
            nn.Dropout(0.2)
        )

        self._initialize_weights()
        
    def forward(self, src:Tensor) -> Tensor:
    
        src = src.unsqueeze(1)
        src =  self.cnn(src)
        src = src.squeeze(2)       
        return src
    
    def _initialize_weights(self) -> None:
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)


    
# The positional encoding vector, embedding_dim is d_model
class PositionalEncoder(nn.Module):
    def __init__(self, embedding_dim, max_seq_length=512, dropout=0.1):
        super(PositionalEncoder, self).__init__()
        self.embedding_dim = embedding_dim
        self.dropout = nn.Dropout(dropout)
        pe = torch.zeros(max_seq_length, embedding_dim)
        for pos in range(max_seq_length):
            for i in range(0, embedding_dim, 2):
                pe[pos, i] = math.sin(pos/(10000**(2*i/embedding_dim)))
                pe[pos, i+1] = math.cos(pos/(10000**((2*i+1)/embedding_dim)))
        pe = pe.unsqueeze(0)        
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x*math.sqrt(self.embedding_dim)
        seq_length = x.size(1)
        pe = Variable(self.pe[:, :seq_length], requires_grad=False).to(x.device)
        # Add the positional encoding vector to the embedding vector
        x = x + pe
        x = self.dropout(x)
        return x
    
        
    

class TransformerEncoderMusic(nn.Module):
    def __init__(self, max_seq_len: int, 
                 embedding_dim: int, 
                 num_heads: int,  
                 num_layers: int, 
                 dropout: float = 0.1,
                 output: int=20):
        
        super(TransformerEncoderMusic, self).__init__()
        self.model_type = 'Transformer'
        self.embedding_dim = embedding_dim
        self.pos_encoder = PositionalEncoder(embedding_dim, max_seq_length = max_seq_len) 
        self.embedding_dim = embedding_dim
        self.norm = nn.LayerNorm(embedding_dim,eps = 1e-12)
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, 
                                                    nhead = num_heads,  
                                                    dropout=dropout, 
                                                    dim_feedforward = embedding_dim,
                                                    # activation = "gelu",
                                                    batch_first = False)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, 
                                                          num_layers =  num_layers, 
                                                          norm = self.norm, 
                                                          enable_nested_tensor=True)
        self.Intermediate = nn.Sequential(
            nn.Linear(embedding_dim,1024),
            nn.GELU(),
            nn.Linear(1024, embedding_dim),
            nn.LayerNorm(embedding_dim, eps=1e-12),
            nn.GELU(), # borrar
            nn.Dropout(p = 0.1)
        )

        self.output = nn.Linear(embedding_dim,  output)

        self._initialize_weights()


    def forward(self, src: Tensor) -> Tensor:
        
        src = src.permute(0, 2, 1)
        src = self.pos_encoder(src)
 
        src = src * math.sqrt(self.embedding_dim) 
        src = self.transformer_encoder(src)
        src = self.Intermediate(src)
        src, _ = src.max(dim=1)
        # src = src.mean(dim=1)
        src = self.output(src)

        return src
    
    def _initialize_weights(self) -> None:
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
 

class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 num_heads : int=2,
                 num_layers : int = 1,
                 )-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoderMusic(max_seq_len = 78, 
                                          embedding_dim = out_channels_cnn, 
                                          num_heads = num_heads,  
                                          num_layers = num_layers, 
                                          output=classes)
                
    def forward(self, src: torch.Tensor):
        src = self.cnn(src) 
        
        src = self.encoder(src)
        return src
       

In [None]:
from torch.autograd import Variable

class CNN(nn.Module):
    
    def __init__(self, in_channels:int, out_channels:int)-> Tensor:
        super(CNN, self).__init__()
        kernels = [(4,4),(4,2),(4,2)]
        strides = [(2,2), (2,1), (2,1)]
        padding = [(1,1)]
        number_filters = [int(32), int(64), int(128),  out_channels]

        self.cnn = nn.Sequential(OrderedDict([
            ("batch_norm",nn.BatchNorm2d(1)),

            ("conv_1", nn.Conv2d(in_channels = in_channels, out_channels=number_filters[0], kernel_size=(3,3), padding=padding[0])),
            ("act_1", nn.ELU()),
            ("batch_norm_1",nn.BatchNorm2d(number_filters[0])),
            ("max_pooling_1",nn.MaxPool2d(kernel_size=kernels[0], stride=strides[0], padding = padding[0])),
            ("dropout_1",nn.Dropout(p = 0.2)),

            ("conv_2", nn.Conv2d(in_channels = number_filters[0], out_channels=number_filters[1], kernel_size=(3,3), padding=padding[0])),
            ("act_2", nn.ELU()),
            ("batch_norm_2",nn.BatchNorm2d(number_filters[1])),
            ("max_pooling_2",nn.MaxPool2d(kernel_size=kernels[1], stride=strides[1], padding = padding[0])),
            ("dropout_2",nn.Dropout(p = 0.2)),

            ("conv_3", nn.Conv2d(in_channels = number_filters[1], out_channels=number_filters[2], kernel_size=(3,3), padding=padding[0])),
            ("act_3", nn.ELU()),
            ("batch_norm_3",nn.BatchNorm2d(number_filters[2])),
            ("max_pooling_3",nn.MaxPool2d(kernel_size=kernels[1], stride=strides[1], padding = padding[0])),
            ("dropout_3",nn.Dropout(p = 0.2)),

            ("conv_4", nn.Conv2d(in_channels = number_filters[2], out_channels=number_filters[3], kernel_size=(3,3), padding=padding[0])),
            ("act_4", nn.ELU()),
            ("batch_norm_4",nn.BatchNorm2d(number_filters[3])),
            ("max_pooling_4",nn.MaxPool2d(kernel_size=kernels[2], stride=strides[2], padding = padding[0])),
            ("dropout_4",nn.Dropout(p = 0.2))
        ]))

        self._initialize_weights()

    def _initialize_weights(self) -> None:
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)


    def forward(self, src:Tensor) -> Tensor:
        
        src = src.unsqueeze(1)
        src =  self.cnn(src)
        
        # src = src.squeeze(2)
        src = src.contiguous().view(src.size(0), 256, 50 * 8)  #torch.Size([batch = 128, 512, 6 * 46])   
        return src #[batch, 512, 231]
        


    
# # The positional encoding vector, embedding_dim is d_model
# class PositionalEncoder(nn.Module):
#     def __init__(self, embedding_dim, max_seq_length=512, dropout=0.1):
#         super(PositionalEncoder, self).__init__()
#         self.embedding_dim = embedding_dim
#         self.dropout = nn.Dropout(dropout)
#         pe = torch.zeros(max_seq_length, embedding_dim)
#         for pos in range(max_seq_length):
#             for i in range(0, embedding_dim, 2):
#                 pe[pos, i] = math.sin(pos/(10000**(2*i/embedding_dim)))
#                 pe[pos, i+1] = math.cos(pos/(10000**((2*i+1)/embedding_dim)))
#         pe = pe.unsqueeze(0)        
#         self.register_buffer('pe', pe)
    
#     def forward(self, x):
#         seq_length = x.size(1)
#         pe = Variable(self.pe[:, :seq_length], requires_grad=False).to(x.device)
#         # Add the positional encoding vector to the embedding vector
#         x = x + pe
#         # x = x * math.sqrt(self.embedding_dim) + pe
#         x = self.dropout(x)
#         return x
    
class PositionalEncoder(nn.Module):

    def __init__(self, embedding_dim: int, max_seq_length: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p = 0.1)
        self.embedding_dim = embedding_dim
        position = torch.arange(max_seq_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2) * (-math.log(10000.0) / embedding_dim))
        pe = torch.zeros(max_seq_length, 1, embedding_dim)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        #[batch_size, seq_len, embedding_dim]
        # print(self.pe[:x.size(0)].size())
        return self.dropout(x * math.sqrt(self.embedding_dim) + self.pe[:x.size(0)] )   
    

class TransformerEncoderMusic(nn.Module):
    def __init__(self, max_seq_len: int, 
                 embedding_dim: int, 
                 num_heads: int,  
                 num_layers: int, 
                 dropout: float = 0.1,
                 output: int=20):
        
        super(TransformerEncoderMusic, self).__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoder(embedding_dim, max_seq_length = max_seq_len) 
        self.norm = nn.LayerNorm(embedding_dim, eps = 1e-5)
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, 
                                                    nhead = num_heads,  
                                                    dropout=dropout, 
                                                    dim_feedforward = embedding_dim,
                                                    # activation = "gelu",
                                                    batch_first = False)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, 
                                                          num_layers =  num_layers, 
                                                          norm = self.norm, 
                                                          enable_nested_tensor=True)

        self.output = nn.Sequential(OrderedDict([
            ("dense",  nn.Linear(in_features= embedding_dim, out_features=128)),
            ("activation", nn.Tanh()),
            # ("activation", nn.GELU()),
            ("dropout", nn.Dropout(p=0.2)),
            ("classifier", nn.Linear(in_features=128, out_features=output)),
        ]))
        

        self._initialize_weights()


    def forward(self, src: Tensor) -> Tensor:
       
        src = src.permute(2,0,1)
       
        src = self.norm(src)
        # src = self.pos_encoder(src)
        src = self.transformer_encoder(src)
        src, _ = src.max(dim=0)
        # src = src.mean(dim=0)
        src = self.output(src)
        return src
    
    def _initialize_weights(self) -> None:
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
 

class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 num_heads : int=2,
                 num_layers : int = 1,
                 )-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoderMusic(max_seq_len = 400, 
                                          embedding_dim = out_channels_cnn, 
                                          num_heads = num_heads,  
                                          num_layers = num_layers, 
                                          output=classes)
                
    def forward(self, src: torch.Tensor):
        src = self.cnn(src) 
        src = self.encoder(src)
        return src
       

BEST


from torch.autograd import Variable

class CNN(nn.Module):
    
    def __init__(self, in_channels:int, out_channels:int)-> Tensor:
        super(CNN, self).__init__()
        padding = [(1,1), (0,0)]
        number_filters = [int(64), int(128),  out_channels]

        self.cnn = nn.Sequential(OrderedDict([
            ("batch_norm",nn.BatchNorm2d(1)),
            ("conv_1", nn.Conv2d(in_channels = in_channels, out_channels=number_filters[0], kernel_size=(3,3), padding=padding[0])),
            ("act_1", nn.ELU()),
            ("max_pooling_1",nn.MaxPool2d(kernel_size=(4,2), stride=(4,1), padding = padding[0])),
            ("dropout_1",nn.Dropout(p = 0.1)),

            ("conv_2", nn.Conv2d(in_channels = number_filters[0], out_channels=number_filters[1], kernel_size=(4,4), padding=padding[0])),
            ("act_2", nn.ELU()),
            ("max_pooling_2",nn.MaxPool2d(kernel_size=(4,2), stride=(4,1), padding = padding[0])),
            ("dropout_2",nn.Dropout(p = 0.1)),

            ("conv_3", nn.Conv2d(in_channels = number_filters[1], out_channels=number_filters[2], kernel_size=(4,4), padding=padding[0])),
            ("act_3", nn.ELU()),
            ("max_pooling_3",nn.MaxPool2d(kernel_size=(4,2), stride=(4,1), padding = padding[1])),
            ("dropout_3",nn.Dropout(p = 0.1)),

        ]))


    def forward(self, src:Tensor) -> Tensor:
        
        src = src.unsqueeze(1)
        src =  self.cnn(src)
        src = src.squeeze(2)
        # src = src.contiguous().view(src.size(0), 256, 50 * 8)  #torch.Size([batch = 128, 512, 6 * 46])   
        return src 
        


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        x = x + self.encoding[:, :x.size(1), :].to(device)
        return x
    

class TransformerEncoderMusic(nn.Module):
    def __init__(self, max_seq_len: int, 
                 embedding_dim: int, 
                 num_heads: int,  
                 num_layers: int, 
                 dropout: float = 0.1,
                 output: int=20):
        
        super(TransformerEncoderMusic, self).__init__()
        self.model_type = 'Transformer'
        self.embedding_dim = embedding_dim
        self.embedding = nn.Linear(max_seq_len, embedding_dim)
        self.pos_encoder = PositionalEncoding(embedding_dim) 
        self.norm = nn.LayerNorm(embedding_dim, eps = 1e-6)
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, 
                                                    nhead = num_heads,  
                                                    dropout=dropout, 
                                                    dim_feedforward = embedding_dim,
                                                    # activation = "gelu",
                                                    batch_first = False)
        
        self.transformer_encoder  = nn.TransformerEncoder(encoder_layers, 
                                                          num_layers =  num_layers, 
                                                          norm = self.norm
                                                          )

        self.output = nn.Sequential(OrderedDict([
            ("dense",  nn.Linear(in_features= embedding_dim, out_features=embedding_dim)),
            ("activation", nn.Tanh()),
            # ("activation", nn.GELU()),
            ("dropout", nn.Dropout(p=0.2)),
            ("classifier", nn.Linear(in_features=embedding_dim, out_features=output)),
        ]))


    def forward(self, src: Tensor) -> Tensor:
        src = self.embedding(src) * math.sqrt(self.embedding_dim)
        # src = print(src.size())
        # src = src.permute(2,0,1)
      
        src = self.norm(src)
        src = self.pos_encoder(src)
        src = src.permute(2,0,1)
        src = self.transformer_encoder(src)
        # src, _ = src.max(dim=0)
        src = src.mean(dim=0)
 
        src = self.output(src)
   
        return src
    


class VisualEncoderModel(nn.Module):
    def __init__(self, 
                 in_channels_cnn: int, 
                 out_channels_cnn:int, 
                 classes:int = 20, 
                 num_heads : int=2,
                 num_layers : int = 1,
                 )-> Tensor:
        
        super(VisualEncoderModel, self).__init__()
        self.cnn = CNN(in_channels_cnn, out_channels_cnn)
        
        self.encoder = TransformerEncoderMusic(max_seq_len = 93, 
                                          embedding_dim = out_channels_cnn, 
                                          num_heads = num_heads,  
                                          num_layers = num_layers, 
                                          output=classes)
                
    def forward(self, src: torch.Tensor):
        src = self.cnn(src) 
        src = self.encoder(src)
        return src
       