# **nnfp**

Nesta file consta a arquitetura da rede.

Importação das bibliotecas

In [1]:
# -*- coding: utf-8 -*-
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
""" nnfp.py

'Neural Audio Fingerprint for High-specific Audio Retrieval based on 
Contrastive Learning', https://arxiv.org/abs/2010.11910

USAGE:
    
    Please see test() in the below.
    
"""
import numpy as np
import tensorflow as tf
assert tf.__version__ >= "2.0"

2024-03-26 14:57:44.561224: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


- hidden_ch - número de canais/filtros de saída da convolução.
- strides - tuple com os strides para cada uma das convoluções, a primeira é apra a convolução 1x3 e a segunda é para a convolução 3x1.
- norm - tipo de normalização a ser aplicado
- As duas convoluções:
    - conv2d_1x3 - Convolução 1x3 que opera na direção do eixo temporal.
    - conv2d_3x1 - Convolução 3x1 que opera na direção do eixo frequência.
    
Em ***conv2d_1x3*** é definido o tamanho do *kernel_size*, os *strides[0]* para a primeira convolução.O *padding='SAME'* a ser aplicado à entrada durante a convolução, em que *'SAME'* significa que o preenchimento e aplicado para que a saída tenha a mesma altura e largura que a entrada. *dilation_rate* controla o espaçamento entre os elementos do kernel ao longo da entrada, (1,1) indica que não há espacamento. *kernel_initializer* inicializa os pesos de forma aleatória, seguindo uma distribuição uniforme. *bias_initializer* método de inicialiação do bias, inicializa com zeros.

```python
self.conv2d_1x3 = tf.keras.layers.Conv2D(hidden_ch,
                                            kernel_size=(1, 3),
                                            strides=strides[0],
                                            padding='SAME',
                                            dilation_rate=(1, 1),
                                            kernel_initializer='glorot_uniform',
                                            bias_initializer='zeros')
```
Em ***conv2d_3x1*** é semelhante à ***conv2d_1x3***, mas opera noutra direção, daí definir tamanho do *kernel_size* como (3,1).
```python
self.conv2d_3x1 = tf.keras.layers.Conv2D(hidden_ch,
                                            kernel_size=(3, 1),
                                            strides=strides[1],
                                            padding='SAME',
                                            dilation_rate=(1, 1),
                                            kernel_initializer='glorot_uniform',
                                            bias_initializer='zeros')
```
No método *call* recebe *x* (tensor de espetrograma de áudio) como entrada e passa pelo *forward*. *x* passa pela convolução 1x3, seguido da função de ativação ELU e normalização. Depois *x* passa pela convolução 3x1, seguido da função de ativação ELU e normalização.
```python
 self.forward = tf.keras.Sequential([self.conv2d_1x3,
                                            tf.keras.layers.ELU(),
                                            self.BN_1x3,
                                            self.conv2d_3x1,
                                            tf.keras.layers.ELU(),
                                            self.BN_3x1
                                            ])
```

**ConvLayer** é projetada para extrair as características do áudio bidimensionais, capturandos padrões tanto no eixo temporal como no eixo de frequência do espetrograma de entrada.

In [2]:
class ConvLayer(tf.keras.layers.Layer):
    """
    Separable convolution layer
    
    Arguments
    ---------
    hidden_ch: (int)
    strides: [(int, int), (int, int)]
    norm: 'layer_norm1d' for normalization on Freq axis. (default)
          'layer_norm2d' for normalization on on FxT space 
          'batch_norm' or else, batch-normalization
    
    Input
    -----
    x: (B,F,T,1)
    
    [Conv1x3]>>[ELU]>>[BN]>>[Conv3x1]>>[ELU]>>[BN]
    
    Output
    ------
    x: (B,F,T,C) with {F=F/stride, T=T/stride, C=hidden_ch}
    
    """
    def __init__(self,
                 hidden_ch=128,
                 strides=[(1,1),(1,1)],
                 norm='layer_norm2d'):
        super(ConvLayer, self).__init__()
        self.conv2d_1x3 = tf.keras.layers.Conv2D(hidden_ch,
                                                 kernel_size=(1, 3),
                                                 strides=strides[0],
                                                 padding='SAME',
                                                 dilation_rate=(1, 1),
                                                 kernel_initializer='glorot_uniform',
                                                 bias_initializer='zeros')
        self.conv2d_3x1 = tf.keras.layers.Conv2D(hidden_ch,
                                                 kernel_size=(3, 1),
                                                 strides=strides[1],
                                                 padding='SAME',
                                                 dilation_rate=(1, 1),
                                                 kernel_initializer='glorot_uniform',
                                                 bias_initializer='zeros')
        
        if norm == 'layer_norm1d':
            self.BN_1x3 = tf.keras.layers.LayerNormalization(axis=-1)
            self.BN_3x1 = tf.keras.layers.LayerNormalization(axis=-1)
        elif norm == 'layer_norm2d':
            self.BN_1x3 = tf.keras.layers.LayerNormalization(axis=(1, 2, 3))
            self.BN_3x1 = tf.keras.layers.LayerNormalization(axis=(1, 2, 3))
        else:
            self.BN_1x3 = tf.keras.layers.BatchNormalization(axis=-1) # Fix axis: 2020 Apr20
            self.BN_3x1 = tf.keras.layers.BatchNormalization(axis=-1)
            
        self.forward = tf.keras.Sequential([self.conv2d_1x3,
                                            tf.keras.layers.ELU(),
                                            self.BN_1x3,
                                            self.conv2d_3x1,
                                            tf.keras.layers.ELU(),
                                            self.BN_3x1
                                            ])

       
    def call(self, x):
        return self.forward(x)

- q - número de slices em que a entrada será dividida. O que determina quantas projeções distintas serão calculadas.
- unit_dim - lista que contém o número de unidades (neuŕonios) para cada camada totalmente conectada em cada projeção. A primeira entrada determina o número de unidades para a primeira camada totalmente conectada, e a segunda entrada determina o número de unidades para a segunda camada totalemnte conectada.
- norm - tipo de normalização

<br>O método *call*, *tf.reshape(x, shape=[x.shape[0], self.q, -1])* a entrada *x* é remodelada para que a sua forma que (B,Q,S), onde *'B'* é o tamanho do batch, *'Q'* o número de slices e *'S'* é o comprimento de cada slice. *self._split_encoding(x)*, calcula as projeções de cada slice e concatena-as ao longo da dimensão do batch, retornando um tensor (B,Q) que contém as projeções de todos os slices.
```python
def call(self, x): # x: (B,1,1,2048)
        x = tf.reshape(x, shape=[x.shape[0], self.q, -1]) # (B,Q,S); Q=num_slices; S=slice length; (B,128,8 or 16)
        return self._split_encoding(x)
```

<br>Para neste caso, com d=128, e de acordo com a tabela 1 do artigo:
<br> $g(.):=L2 \triangleleft Concat \triangleleft C^{1 \leftarrow 32}_{1*1} \triangleleft ELU<C^{32 \leftarrow 8}_{1*1} \triangleleft Split^{h/d}(.) $

In [3]:
class DivEncLayer(tf.keras.layers.Layer):
    """
    Multi-head projection a.k.a. 'divide and encode' layer:
        
    • The concept of 'divide and encode' was discovered  in Lai et.al.,
     'Simultaneous Feature Learning and Hash Coding with Deep Neural Networks',
      2015. https://arxiv.org/abs/1504.03410
    • It was also adopted in Gfeller et.al. 'Now Playing: Continuo-
      us low-power music recognition', 2017. https://arxiv.org/abs/1711.10958
    
    Arguments
    ---------
    q: (int) number of slices as 'slice_length = input_dim / q'
    unit_dim: [(int), (int)]
    norm: 'layer_norm1d' or 'layer_norm2d' uses 1D-layer normalization on the feature.
          'batch_norm' or else uses batch normalization. Default is 'layer_norm2d'.

    Input
    -----
    x: (B,1,1,C)
    
    Returns
    -------
    emb: (B,Q)
    
    """
    def __init__(self, q=128, unit_dim=[32, 1], norm='batch_norm'):
        super(DivEncLayer, self).__init__()

        self.q = q
        self.unit_dim = unit_dim
        self.norm = norm
        
        if norm in ['layer_norm1d', 'layer_norm2d']:
            self.BN = [tf.keras.layers.LayerNormalization(axis=-1) for i in range(q)]
        else:
            self.BN = [tf.keras.layers.BatchNormalization(axis=-1) for i in range(q)]
            
        self.split_fc_layers = self._construct_layers() 


    def build(self, input_shape):
        # Prepare output embedding variable for dynamic batch-size 
        self.slice_length = int(input_shape[-1] / self.q)

 
    def _construct_layers(self):
        layers = list()
        for i in range(self.q): # q: num_slices
            layers.append(tf.keras.Sequential([tf.keras.layers.Dense(self.unit_dim[0], activation='elu'),
                                               #self.BN[i],
                                               tf.keras.layers.Dense(self.unit_dim[1])]))
        return layers

 
    @tf.function
    def _split_encoding(self, x_slices):
        """
        Input: (B,Q,S)
        Returns: (B,Q)
        
        """
        out = list()
        for i in range(self.q):
            out.append(self.split_fc_layers[i](x_slices[:, i, :]))
        return tf.concat(out, axis=1)

    
    def call(self, x): # x: (B,1,1,2048)
        x = tf.reshape(x, shape=[x.shape[0], self.q, -1]) # (B,Q,S); Q=num_slices; S=slice length; (B,128,8 or 16)
        return self._split_encoding(x)

Tem como input - x: (B,F,T,1).
<br>Tem como return um embedding (B,Q).
<br>O construtor *\_\_init\_\_* da class:
- Por padrão a entrada dos melEspetrogramas é (256,32,1), o que indica que os espetrogramas tem uma frequência de 256 e tempo 32.
- front_hidden_ch e front_strides definem a arquitetura das camadas convolucionais da frente.
    - *front_hidden_ch* - Lista que definie o número de canais para cada camada convilucional da frente.
    - *front_strides* - Lista de listas que define o stride para cada camada convolucional da frente.
- *emb_sz* - define o tamanho para cada embedding.
- *fc_unit_dim* - é uma lista com as dimensões das camadas totalmente conectadas após as camdas camadas convolucionais, o valor padrão é [32,1].
- *norm* - tipo de normalização, neste caso será *layer_norm2d* on FxT space.
- *use_L2layer* - indica se a cadama L2 deve ser usada ou não (boolean).

<br>*front_conv* é uma sequência de camadas **convolucionais** da frente, definida em *front_hidden_ch* e *front_strides*.
<br>*div_enc* é uma camada de **divisão** e **codificação** de modo a reduzir a dimensão da saída para o tamanho do embedding.
```python
# Front (sep-)conv layers
for i in range(self.n_clayers):
    self.front_conv.add(ConvLayer(hidden_ch=front_hidden_ch[i],
        strides=front_strides[i], norm=norm))
self.front_conv.add(tf.keras.layers.Flatten()) # (B,F',T',C) >> (B,D)
```

O loop percorre todas as camadas convolucionais definidas pela variável *self.n_clayers* (que define o número de camadas convolucionais na frente da rede). De seguida, adiciona uma camada convolucional à sequência *self.front_conv*. *hidden_ch=front_hidden_ch[i]* especifica o número de canais para a atual camada convolucional, onde *i* representa o índice da camada. * strides=front_strides[i]* espeficia o stride para a camada convolucional atual. Após adicionar todas as camadas convolucionais, uma camada de achatamento é adicionar à sequência de moro a trnasformar os tensores tridimensionais resultantes das camadas em vetores unidimenionais (B,F',T',C) >> (B,D). B é o tamanho do batch, e D o número de elementos do tensor resultante.


<br>O método *call* define como os dados fluem no modelo, recebe inputs como entrada (tensor de espetograma de áudio), passa os dados pela sequência de camadas convolucionais da frente (*front_conv*) para **extrair as características**. Em seguida, passa as características pela camada de divisão e codificação (*div_enc*) praa obter os respetivos embeddings. Se *use_L2layer* for verdadeiro, normaliza os embeddings com normalização L2. Por fim retorna o **embeddings**.

In [4]:
class FingerPrinter(tf.keras.Model):
    """
    Fingerprinter: 'Neural Audio Fingerprint for High-specific Audio Retrieval
        based on Contrastive Learning', https://arxiv.org/abs/2010.11910
    
    IN >> [Convlayer]x8 >> [DivEncLayer] >> [L2Normalizer] >> OUT 
    
    Arguments
    ---------
    input_shape: tuple (int), not including the batch size
    front_hidden_ch: (list)
    front_strides: (list)
    emb_sz: (int) default=128
    fc_unit_dim: (list) default=[32,1]
    norm: 'layer_norm1d' for normalization on Freq axis. 
          'layer_norm2d' for normalization on on FxT space (default).
          'batch_norm' or else, batch-normalization.
    use_L2layer: True (default)
    
    • Note: batch-normalization will not work properly with TPUs.
                    
    
    Input
    -----
    x: (B,F,T,1)
    
        
    Returns
    -------
    emb: (B,Q) 
    
    """
    def __init__(self,
                 input_shape=(256,32,1),
                 front_hidden_ch=[128, 128, 256, 256, 512, 512, 1024, 1024],
                 front_strides=[[(1,2), (2,1)], [(1,2), (2,1)],
                                [(1,2), (2,1)], [(1,2), (2,1)],
                                [(1,1), (2,1)], [(1,2), (2,1)],
                                [(1,1), (2,1)], [(1,2), (2,1)]],
                 emb_sz=128, # q
                 fc_unit_dim=[32,1],
                 norm='layer_norm2d',
                 use_L2layer=True):
        super(FingerPrinter, self).__init__()
        self.front_hidden_ch = front_hidden_ch
        self.front_strides = front_strides
        self.emb_sz=emb_sz
        self.norm = norm
        self.use_L2layer = use_L2layer
        
        self.n_clayers = len(front_strides)
        self.front_conv = tf.keras.Sequential(name='ConvLayers')
        if ((front_hidden_ch[-1] % emb_sz) != 0):
            front_hidden_ch[-1] = ((front_hidden_ch[-1]//emb_sz) + 1) * emb_sz                
        
        # Front (sep-)conv layers
        for i in range(self.n_clayers):
            self.front_conv.add(ConvLayer(hidden_ch=front_hidden_ch[i],
                strides=front_strides[i], norm=norm))
        self.front_conv.add(tf.keras.layers.Flatten()) # (B,F',T',C) >> (B,D)
            
        # Divide & Encoder layer
        self.div_enc = DivEncLayer(q=emb_sz, unit_dim=fc_unit_dim, norm=norm)

        
    @tf.function
    def call(self, inputs):
        x = self.front_conv(inputs) # (B,D) with D = (T/2^4) x last_hidden_ch
        x = self.div_enc(x) # (B,Q)
        if self.use_L2layer:
            return tf.math.l2_normalize(x, axis=1) 
        else:
            return x

Esta função é chamada no ficheiro *trainer.py*. Tem o input_shape definido, emb_sz e o model norm (*layer_norm2d*, pre-definido) vai buscar à config file.
<br>O formato de entrada do modelo é definido como (256,32,1), ou seja, de 3 dimensões.
- 256 - número de frequênicas (bandas) no espetrograma de áudio.
- 32 - número de frames de tempo no espetrograma.
- 1 - número de canais de cor.

In [5]:
def get_fingerprinter(cfg, trainable=False):
    """
    Input length : 1s or 2s
    
    Arguements
    ----------
    cfg : (dict)
        created from the '.yaml' located in /config dicrectory

    Returns
    -------
    <tf.keras.Model> FingerPrinter object
    
    """
    input_shape = (256, 32, 1) 
    emb_sz = cfg['MODEL']['EMB_SZ']
    norm = cfg['MODEL']['BN']
    fc_unit_dim = [32, 1]
    
    m = FingerPrinter(input_shape=input_shape,
                      emb_sz=emb_sz,
                      fc_unit_dim=fc_unit_dim,
                      norm=norm)
    m.trainable = trainable
    return m

## Teste

In [6]:
def test():
    input_1s = tf.constant(np.random.randn(3,256,32,1), dtype=tf.float32) # BxFxTx1
    fprinter = FingerPrinter(emb_sz=128, fc_unit_dim=[32, 1], norm='layer_norm2d')
    emb_1s = fprinter(input_1s) # BxD
    
    input_2s = tf.constant(np.random.randn(3,256,63,1), dtype=tf.float32) # BxFxTx1
    fprinter = FingerPrinter(emb_sz=128, fc_unit_dim=[32, 1], norm='layer_norm2d')
    emb_2s = fprinter(input_2s)
    #%timeit -n 10 fprinter(_input) # 27.9ms
"""
Total params: 19,224,576
Trainable params: 19,224,576
Non-trainable params: 0

"""

'\nTotal params: 19,224,576\nTrainable params: 19,224,576\nNon-trainable params: 0\n\n'