<a href="https://colab.research.google.com/github/LuisFerRosas/ia3/blob/nuevo/nuevo__modelo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
import torch
import torch.nn as nn
import numpy as np
from torch import Tensor 
import math

In [18]:
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)


class PositionalEncoding(nn.Module):
  def __init__(self,
                emb_size: int,
                dropout: float,
                maxlen: int = 6000):
      super(PositionalEncoding, self).__init__()
      den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
      pos = torch.arange(0, maxlen).reshape(maxlen, 1)
      pos_embedding = torch.zeros((maxlen, emb_size))
      pos_embedding[:, 0::2] = torch.sin(pos * den)
      pos_embedding[:, 1::2] = torch.cos(pos * den)
      pos_embedding = pos_embedding.unsqueeze(-2)

      self.dropout = nn.Dropout(dropout)
      self.register_buffer('pos_embedding', pos_embedding)

  def forward(self, token_embedding: Tensor):
      return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

In [19]:
class TmusicTrasforms(nn.Module):
  def __init__(self,num_emotions,n_vocabulario_tgt):
    super().__init__() 
    
    ################ TRANSFORMER BLOCK #############################
    # maxpool the input feature map/tensor to the transformer 
    # a rectangular kernel worked better here for the rectangular input spectrogram feature map/tensor
    self.transformer_maxpool = nn.MaxPool2d(kernel_size=[1,4], stride=[1,4])
    
    self.src_tok_emb = TokenEmbedding(2752*2+40, 512)
    self.tgt_tok_emb = TokenEmbedding(n_vocabulario_tgt, 512)
    self.positional_encoding = PositionalEncoding(512, dropout=0.3)
    
    transformer_layer = nn.TransformerEncoderLayer(
        d_model=40, # input feature (frequency) dim after maxpooling 40*282 -> 40*70 (MFC*time)
        nhead=4, # 4 self-attention layers in each multi-head self-attention layer in each encoder block
        dim_feedforward=512, # 2 linear layers in each encoder block's feedforward network: dim 40-->512--->40
        dropout=0.4, 
        activation='relu' # ReLU: avoid saturation/tame gradient/reduce compute time
    )
    
    self.transformer_encoder = nn.TransformerEncoder(transformer_layer, num_layers=4)
    
    transformer_decoder_layer=nn.TransformerDecoderLayer(
        d_model=512,
        nhead=4,
        dim_feedforward=512,
        dropout=0.4,
        activation='relu'
    )
    self.transformer_decoder=nn.TransformerDecoder(transformer_decoder_layer,num_layers=6)

    ############### 1ST PARALLEL 2D CONVOLUTION BLOCK ############
    # 3 sequential conv2D layers: (1,40,282) --> (16, 20, 141) -> (32, 5, 35) -> (64, 1, 8)
    self.conv2Dblock1 = nn.Sequential(
        
        # 1st 2D convolution layer
        nn.Conv2d(
            in_channels=1, # input volume depth == input channel dim == 1
            out_channels=8, # expand output feature map volume's depth to 16
            kernel_size=3, # typical 3*3 stride 1 kernel
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(8), # batch normalize the output feature map before activation
        nn.ReLU(), # feature map --> activation map
        nn.MaxPool2d(kernel_size=2, stride=2), #typical maxpool kernel size
        nn.Dropout(p=0.3), #randomly zero 30% of 1st layer's output feature map in training
        
        # 2nd 2D convolution layer identical to last except output dim, maxpool kernel
        nn.Conv2d(
            in_channels=8, 
            out_channels=16, # expand output feature map volume's depth to 32
            kernel_size=3,
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(16),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=5, stride=5), # increase maxpool kernel for subsequent filters
        nn.Dropout(p=0.3), 

        # 2nd 2D convolution layer identical to last except output dim, maxpool kernel
        nn.Conv2d(
            in_channels=16, 
            out_channels=32, # expand output feature map volume's depth to 32
            kernel_size=3,
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2), # increase maxpool kernel for subsequent filters
        nn.Dropout(p=0.3), 
        
        # 3rd 2D convolution layer identical to last except output dim
        nn.Conv2d(
            in_channels=32,
            out_channels=64, # expand output feature map volume's depth to 64
            kernel_size=3,
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(64),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Dropout(p=0.3),
    )
    ############### 2ND PARALLEL 2D CONVOLUTION BLOCK ############
    # 3 sequential conv2D layers: (1,40,282) --> (16, 20, 141) -> (32, 5, 35) -> (64, 1, 8)
    self.conv2Dblock2 = nn.Sequential(
      # 1st 2D convolution layer
        nn.Conv2d(
            in_channels=1, # input volume depth == input channel dim == 1
            out_channels=8, # expand output feature map volume's depth to 16
            kernel_size=3, # typical 3*3 stride 1 kernel
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(8), # batch normalize the output feature map before activation
        nn.ReLU(), # feature map --> activation map
        nn.MaxPool2d(kernel_size=2, stride=2), #typical maxpool kernel size
        nn.Dropout(p=0.3), #randomly zero 30% of 1st layer's output feature map in training
        
        # 2nd 2D convolution layer identical to last except output dim, maxpool kernel
        nn.Conv2d(
            in_channels=8, 
            out_channels=16, # expand output feature map volume's depth to 32
            kernel_size=3,
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(16),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=5, stride=5), # increase maxpool kernel for subsequent filters
        nn.Dropout(p=0.3), 

        # 2nd 2D convolution layer identical to last except output dim, maxpool kernel
        nn.Conv2d(
            in_channels=16, 
            out_channels=32, # expand output feature map volume's depth to 32
            kernel_size=3,
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(32),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2), # increase maxpool kernel for subsequent filters
        nn.Dropout(p=0.3), 
        
        # 3rd 2D convolution layer identical to last except output dim
        nn.Conv2d(
            in_channels=32,
            out_channels=64, # expand output feature map volume's depth to 64
            kernel_size=3,
            stride=1,
            padding=1
                  ),
        nn.BatchNorm2d(64),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Dropout(p=0.3),
    
    )

    ################# FINAL LINEAR BLOCK ####################
    # Linear softmax layer to take final concatenated embedding tensor 
    #    from parallel 2D convolutional and transformer blocks, output 8 logits 
    # Each full convolution block outputs (64*1*8) embedding flattened to dim 512 1D array 
    # Full transformer block outputs 40*70 feature map, which we time-avg to dim 40 1D array
    # 512*2+40 == 1064 input features --> 8 output emotions 
    self.fc1_linear = nn.Linear(2752*2+40,num_emotions) 
    
    ### Softmax layer for the 8 output logits from final FC linear layer 
    self.softmax_out = nn.Softmax(dim=1) # dim==1 is the freq embedding
    
    # define one complete parallel fwd pass of input feature tensor thru 2*conv+1*transformer blocks
  def forward(self,x,partitura_tok):
    
    ############ 1st parallel Conv2D block: 4 Convolutional layers ############################
    # create final feature embedding from 1st convolutional layer 
    # input features pased through 4 sequential 2D convolutional layers
    conv2d_embedding1 = self.conv2Dblock1(x) # x == N/batch * channel * freq * time
    print("conv2d_embedding1 : "+str(conv2d_embedding1.shape))
    # flatten final 64*1*8 feature map from convolutional layers to length 512 1D array 
    # skip the 1st (N/batch) dimension when flattening
    conv2d_embedding1 = torch.flatten(conv2d_embedding1, start_dim=1) 
    print("conv2d_embedding1 flatten : "+str(conv2d_embedding1.shape))
    ############ 2nd parallel Conv2D block: 4 Convolutional layers #############################
    # create final feature embedding from 2nd convolutional layer 
    # input features pased through 4 sequential 2D convolutional layers
    conv2d_embedding2 = self.conv2Dblock2(x) # x == N/batch * channel * freq * time
    print("conv2d_embedding2 : "+str(conv2d_embedding2.shape))
    # flatten final 64*1*8 feature map from convolutional layers to length 512 1D array 
    # skip the 1st (N/batch) dimension when flattening
    conv2d_embedding2 = torch.flatten(conv2d_embedding2, start_dim=1) 
    print("conv2d_embedding2 flatten : "+str(conv2d_embedding2.shape))
      
    ########## 4-encoder-layer Transformer block w/ 40-->512-->40 feedfwd network ##############
    # maxpool input feature map: 1*40*282 w/ 1*4 kernel --> 1*40*70
    x_maxpool = self.transformer_maxpool(x)
    print("x_maxpool : "+str(x_maxpool.shape))
    # remove channel dim: 1*40*70 --> 40*70
    x_maxpool_reduced = torch.squeeze(x_maxpool,1)
    print("x_maxpool_reduced : "+str(x_maxpool_reduced.shape))
    # convert maxpooled feature map format: batch * freq * time ---> time * batch * freq format
    # because transformer encoder layer requires tensor in format: time * batch * embedding (freq)
    x = x_maxpool_reduced.permute(2,0,1) 
    print("x------>entrada transformer : "+str(x.shape))
    # finally, pass reduced input feature map x into transformer encoder layers
    transformer_output = self.transformer_encoder(x)
    print("salida transformer : "+str(transformer_output.shape))
    
    # create final feature emedding from transformer layer by taking mean in the time dimension (now the 0th dim)
    # transformer outputs 2x40 (MFCC embedding*time) feature map, take mean of columns i.e. take time average
    transformer_embedding = torch.mean(transformer_output, dim=0) # dim 40x70 --> 40
    print("transformer_embedding  : "+str(transformer_embedding.shape))
  
    complete_embedding = torch.cat([conv2d_embedding1, conv2d_embedding2,transformer_embedding], dim=1)  
    print("complete_embedding  : "+str(complete_embedding.shape))
    complete_embedding=complete_embedding.transpose(1,0)
    print("complete_embedding222  : "+str(complete_embedding.shape))
    memory = self.src_tok_emb(complete_embedding)
    print("memory  : "+str(memory.shape))
    # partitura_tok=partitura_tok.transpose(1,0)
    # print("partitura_tok  : "+str(partitura_tok.shape))
    partitura_tok= self.tgt_tok_emb(partitura_tok)
    print("partitura_tok2  : "+str(partitura_tok.shape))
    tgt_emb = self.positional_encoding(partitura_tok)
    print("tgt_emb  : "+str(tgt_emb.shape))

    ouput_decoder=self.transformer_decoder(tgt_emb,memory)
    print("ouput_decoder  : "+str(ouput_decoder.shape))
    ######### final FC linear layer, need logits for loss #########################
    output_logits = self.fc1_linear(complete_embedding)  
    print("output_logits  : "+str(output_logits.shape))
    ######### Final Softmax layer: use logits from FC linear, get softmax for prediction ######
    output_softmax = self.softmax_out(output_logits)
    print("output_softmax  : "+str(output_softmax.shape))
    # need output logits to compute cross entropy loss, need softmax probabilities to predict class
    return output_logits, output_softmax     


In [28]:
import torch
inputt = torch.randn(2,1, 40,1723, requires_grad=True)
partitura = torch.randn(2,133, requires_grad=True)
pert = torch.randn(2,5544, requires_grad=True)
print(inputt.shape)
print(partitura.shape)
print(pert.shape)

torch.Size([2, 1, 40, 1723])
torch.Size([2, 133])
torch.Size([2, 5544])


In [21]:
model=TmusicTrasforms(8,n_vocabulario_tgt=134)
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

model=model.to(DEVICE)
for p in model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

In [22]:
model.train()
ouput=model(inputt,partitura)
ouput

conv2d_embedding1 : torch.Size([2, 64, 1, 43])
conv2d_embedding1 flatten : torch.Size([2, 2752])
conv2d_embedding2 : torch.Size([2, 64, 1, 43])
conv2d_embedding2 flatten : torch.Size([2, 2752])
x_maxpool : torch.Size([2, 1, 40, 430])
x_maxpool_reduced : torch.Size([2, 40, 430])
x------>entrada transformer : torch.Size([430, 2, 40])
salida transformer : torch.Size([430, 2, 40])
transformer_embedding  : torch.Size([2, 40])
complete_embedding  : torch.Size([2, 5544])
complete_embedding222  : torch.Size([5544, 2])


IndexError: ignored

In [None]:
from torchsummary import summary

# need device to instantiate model
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# instantiate model for 8 emotions and move to GPU 
model = TmusicTrasforms(8).to(device)

# include input feature map dims in call to summary()
summary(model, input_size=[(1,40,1723),(1,100)])


In [33]:
pert.long().dtype

torch.int64

In [39]:
emnb =nn.Embedding(6000,512)

salida=emnb(pert.long())
salida.shape

IndexError: ignored