In [67]:
#importar dependencias
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.jit import Final
from torch.cuda.amp import autocast
from functools import partial
import os
os.environ['TORCH_HOME'] = '../../pretrained_models'
import timm
from timm.layers import use_fused_attn
from timm.models.layers import to_2tuple,trunc_normal_
import numpy as np
import wget

# Clases de huggingface
Las siguientes clases fueron extraída del repositorio:

https://github.com/huggingface/pytorch-image-models

In [85]:
class Attention(nn.Module):
    '''
    Clase para representar una cabeza de atención
    '''
    fused_attn: Final[bool]

    def __init__(
            self,
            dim,
            num_heads=8,
            qkv_bias=False,
            qk_norm=False,
            attn_drop=0.,
            proj_drop=0.,
            norm_layer=nn.LayerNorm,
    ):
        super().__init__()
        assert dim % num_heads == 0, 'dim should be divisible by num_heads'
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.scale = self.head_dim ** -0.5
        self.fused_attn = use_fused_attn()

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.q_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.k_norm = norm_layer(self.head_dim) if qk_norm else nn.Identity()
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)
    def forward(self, x):
        B, N, C = x.shape
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)
        q, k = self.q_norm(q), self.k_norm(k)

        if self.fused_attn:
            x = F.scaled_dot_product_attention(
                q, k, v,
                dropout_p=self.attn_drop.p,
            )
        else:
            q = q * self.scale
            attn = q @ k.transpose(-2, -1)
            attn = attn.softmax(dim=-1)
            attn = self.attn_drop(attn)
            x = attn @ v

        x = x.transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x

In [86]:
class LayerScale(nn.Module):
    '''When increasing the depth of the ViT models, they meet with optimization instability and eventually don't
    converge. The residual connections within each Transformer block introduce information bottleneck
    . When there is an increased amount of depth, this bottleneck can quickly explode and deviate the
    optimization pathway for the underlying model..
    
    https://keras.io/examples/vision/cait/
    '''
    def __init__(self, dim, init_values=1e-5, inplace=False):
        super().__init__()
        self.inplace = inplace
        self.gamma = nn.Parameter(init_values * torch.ones(dim))

    def forward(self, x):
        return x.mul_(self.gamma) if self.inplace else x * self.gamma

In [87]:
class DropPath(nn.Module):
    """
    Evita coadapatación mediante la eliminación de paths poe muestra
    """
    def __init__(self, drop_prob: float = 0., scale_by_keep: bool = True):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob
        self.scale_by_keep = scale_by_keep

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training, self.scale_by_keep)

    def extra_repr(self):
        return f'drop_prob={round(self.drop_prob,3):0.3f}'

In [88]:
#QT partial(nn.Conv2d, kernel_size=1)
        


In [89]:
class Mlp(nn.Module):
    """ perceptron multi capa
    """
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, 
                 norm_layer=None, bias=True,drop=0.,use_conv=False):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        bias = to_2tuple(bias)
        drop_probs = to_2tuple(drop)
        linear_layer = partial(nn.Conv2d, kernel_size=1) if use_conv else nn.Linear

        self.fc1 = linear_layer(in_features, hidden_features, bias=bias[0])
        self.act = act_layer()
        self.drop1 = nn.Dropout(drop_probs[0])
        self.norm = norm_layer(hidden_features) if norm_layer is not None else nn.Identity()
        self.fc2 = linear_layer(hidden_features, out_features, bias=bias[1])
        self.drop2 = nn.Dropout(drop_probs[1])

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop1(x)
        x = self.fc2(x)
        x = self.drop2(x)
        return x

In [98]:
class Block(nn.Module):

    def __init__(
            self,
            dim,
            num_heads,
            mlp_ratio=4.,
            qkv_bias=False,
            qk_norm=False,
            proj_drop=0.,
            attn_drop=0.,
            init_values=None,
            drop_path=0.,
            act_layer=nn.GELU,
            norm_layer=nn.LayerNorm,
            mlp_layer=Mlp):
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(dim,num_heads=num_heads,qkv_bias=qkv_bias,qk_norm=qk_norm,attn_drop=attn_drop,proj_drop=proj_drop, norm_layer=norm_layer)
        self.ls1 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path1 = DropPath(drop_path) if drop_path > 0. else nn.Identity()

        self.ls1 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path1 = DropPath(drop_path) if drop_path > 0. else nn.Identity()

        self.norm2 = norm_layer(dim)
        self.mlp = mlp_layer(
            in_features=dim,
            hidden_features=int(dim * mlp_ratio),
            act_layer=act_layer,
            drop=proj_drop,
        )
        self.ls2 = LayerScale(dim, init_values=init_values) if init_values else nn.Identity()
        self.drop_path2 = DropPath(drop_path) if drop_path > 0. else nn.Identity()

    def forward(self, x):
        #se va sumando en un flujo residual
        x = x + self.drop_path1(self.ls1(self.attn(self.norm1(x))))
        x = x + self.drop_path2(self.ls2(self.mlp(self.norm2(x))))
        return x


## Nuestras clases

Si bien sabemos que Pytorch cuenta con sus propias implementaciones de embedding se decidió por implementar la propia, de nuevo con el fin de implementar algo cercano a lo mostrado en el artículo

In [91]:
class PatchEmbedding(nn.Module):
    '''
    Clase para obtener el patch embedding: Dividimos la imagen en 'paches' y obtenemos 
    la información importante de esto.
    '''
    def __init__(self, img_size = 224, patch_size = 16, input_channels = 3, embedding_n = 768):
        super().__init__()

        self.img_size = (img_size, img_size)#QT o_tuple
        self.patch_size = (patch_size, patch_size)
        self.n_patch = (self.img_size[1] // self.patch_size[1]) * (self.img_size[0] // self.patch_size[0])
        self.proj = nn.Conv2d(input_channels, embedding_n, kernel_size = patch_size, stride = patch_size)

    def forward(self, x):
        x = self.proj(x).flatten(2).transpose(1, 2)
        return x

## Modelo

El modelo se basa en la arquitectura de los transformadores. La idea es utilizar los transformadores para el procesamiento de imágenes pero en audio, tal como se explica en el artículo: https://arxiv.org/abs/2104.01778
Para lograr lo anterior se tomaron las siguientes consideraciones: 

* Se trata a los audio como imágenes, esto mediante el espectograma del audio.
* Como nos basamos en la arquitectura de procesamiento de imágenes y con el objetivo de tener un mejor rendimiento se usará un `class_token`para obtener el input embedding (ampliar explicación) 


In [92]:
#QT timm.__version__ == '0.4.5'

In [99]:
class ASTModel(nn.Module):
    """
    The AST model.
    :param class_n: numero de clases. QT 527 for AudioSet
    :param div_f: division del patch en dim frecuencia.QT 16*16 patchs, fstride=16 means no overlap, fstride=10 means overlap of 6
    :param div_t: division del patch en dim tiempo. QT 16*16 patchs, tstride=16 means no overlap, tstride=10 means overlap of 6
    :param input_f: bins de frecuencia en entrada.
    :param input_t: frames de tiempo en entrada. the number of time frames of the input spectrogram base224 and base 384 are same model, but are trained differently during ImageNet pretraining.
    """
    def __init__(self, class_n=527, div_f=10, div_t=10, input_f=128, input_t=1024):

        super(ASTModel, self).__init__()
        
        self.patch_embedding = PatchEmbedding() 
        self.embedding_n = self.patch_embedding.n_patch #QT embed_len self.original_num_patches self.embed_dim
        self.hw = int(self.embedding_n ** 0.5)#QT quitar(?) self.oringal_hw
        
        self.p_e = nn.Parameter(torch.zeros(1, self.embedding_n + 2, self.embedding_n))
                        #QT nn.Parameter(torch.randn(1, self.embedding_n, self.embedding_n) * .02) alternativa
        
        
        self.original_embedding_n = self.embedding_n + 2 #self.p_e.shape[2]
        self.mlp = nn.Sequential(nn.LayerNorm(self.embedding_n), 
                                          nn.Linear(self.embedding_n, class_n))#/us/
        
        #QT automatcially get the intermediate shape #/us/ cambiar nombres
        f_dim, t_dim = self.get_shape(div_f, div_t, input_f, input_t)
        self.patch_embedding.n_patch = f_dim * t_dim
        
        #QT the linear projection layer self.original_embedding_n original_
        self.patch_embedding.proj  = torch.nn.Conv2d(1, self.embedding_n, kernel_size=(16, 16), stride=(div_f, div_t))
       
        
        # inicializamos de manera aleatoria los pesos correspondientes al positional embedding 
        # QT can use sinusoidal positional embedding instead, self.patch_embedding.n_patch + 2(no, pues qquitamos dil) (1),
        # self.original_embedding_n (2)
        self.p_e = nn.Parameter(torch.zeros(1, self.patch_embedding.n_patch + 1, self.embedding_n))
        
        trunc_normal_(self.p_e, std=.02)
        
        # Para calcular el input embedding
        self.class_token = nn.Parameter(torch.zeros(1, 1, self.embedding_n))#QT self.original_embedding_n))
        # QT Modelo base  model_size == 'small224','vit_deit_small_distilled_patch16_224'
        # Parametros elegidos a base de prueba y error 
        print('aaaaaa QT')
        print(self.embedding_n)#QT quitar
        #QT num_heads = 6
        self.blocks = nn.Sequential(*[ Block(dim = self.embedding_n, num_heads = 7, mlp_ratio = 4, qkv_bias = True,
                                             norm_layer = partial(nn.LayerNorm, eps=1e-6))
                                      for i in range(12)])
        #capa de normalización
        self.norm = nn.LayerNorm(self.embedding_n)#norm_layer(self.embedding_n) if not use_fc_norm else nn.Identity()
        #QT cambiar: ayudaaaaaa
        pos_drop_rate = 0.0
        self.pos_drop = nn.Dropout(p=pos_drop_rate)
    def get_shape(self, div_f, div_t, input_f=128, input_t=1024):
        test_input = torch.randn(1, 1, input_f, input_t)
        test_proj = nn.Conv2d(1, self.original_embedding_n, kernel_size=(16, 16), stride=(div_f, div_t))
        test_out = test_proj(test_input)
        return (test_out.shape[2], test_out.shape[3])

    @autocast()
    def forward(self, x):
        """
        QT
        :param x: the input spectrogram, expected shape: (batch_size, time_frame_num, frequency_bins)
        :return: el sample al que se cree que pertenece la canción representada por x
        """
        #QT quitar dependiendo del formato de los datos.
        x = x.unsqueeze(1) 
        x = x.transpose(2, 3)
        
        # obtenemos el input embedding
        B = x.shape[0]
        x = self.patch_embedding(x) 
        print(np.shape(x))
        class_tokens = self.class_token.expand(B, -1, -1)
        x = torch.cat((class_tokens, x), dim=1)
        
        # agregamos codificación posicional
        #QT
        print(np.shape(x))
        print(np.shape(self.p_e))
        x = x + self.p_e
        # flujo residual(?)
        x = self.pos_drop(x) 
        
        # Pasamos nuestro vector por todas las cabezas 
        for b in self.blocks:
            x = b(x)
            
        # Normalizamos la salida
        x = self.norm(x)
        x = (x[:,0] + x[:,1]) / 2
        
        # Perceptron multicapa: ffnn
        x = self.mlp(x)
        
        #QT falta otra normalización(?)
        
        return x


In [102]:


input_tdim = 256
ast_mdl = ASTModel(input_t=input_tdim,class_n=50)
# input a batch of 10 spectrogram, each with 512 time frames and 128 frequency bins
test_input = torch.rand([10, input_tdim, 128])
test_output = ast_mdl(test_input)
# output should be in shape [10, 50], i.e., 10 samples, each with prediction of 50 classes.
print(test_output.shape)

aaaaaa QT
196
torch.Size([10, 300, 196])
torch.Size([10, 301, 196])
torch.Size([1, 301, 196])
torch.Size([10, 50])


In [101]:
input_tdim = 100
ast_mdl = ASTModel(input_t=input_tdim)
# input a batch of 10 spectrogram, each with 100 time frames and 128 frequency bins
test_input = torch.rand([10, input_tdim, 128])
test_output = ast_mdl(test_input)
# output should be in shape [10, 527], i.e., 10 samples, each with prediction of 527 classes.
print(test_output.shape)
# agregar comentarios
# cambiar nombres
# ver si se puede cambiar lógica
# poner explicaciones a las clases hugging
# probar

aaaaaa QT
196
torch.Size([10, 108, 196])
torch.Size([10, 109, 196])
torch.Size([1, 109, 196])
torch.Size([10, 527])
