# Transformer Sign2text

## Imports

In [None]:
!pip install tensorflow-text==2.15.0
#!pip install tensorflow-text

Collecting tensorflow-text==2.15.0
  Downloading tensorflow_text-2.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.2/5.2 MB[0m [31m38.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-text
Successfully installed tensorflow-text-2.15.0


In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import os
import math
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import tensorflow_text as text
import string
import re
import zipfile
from tensorflow_text.tools.wordpiece_vocab import bert_vocab_from_dataset as bert_vocab
import pathlib
from tqdm.notebook import tqdm
from google.colab import drive


# Only show TensorFlow log messages at ERROR level.
tf.get_logger().setLevel('ERROR')
# Working directory at kernel start (not referenced again in the visible cells).
pwd = pathlib.Path.cwd()

In [None]:
# Mount Google Drive inside the Colab VM; the dataset archive is read from it below.
drive.mount('/content/drive')
# Dataset folder name (not referenced again in the visible cells).
DIR = "H2S2/"

Mounted at /content/drive


In [None]:
# Path to the dataset archive on the mounted Google Drive.
path_to_zip = "/content/drive/MyDrive/H2S2.zip"

# Zip extraction with a byte-based progress bar.
def extract_with_progress(zip_path, output_path):
    """Extract ``zip_path`` into ``output_path`` while showing a progress bar.

    If at least one archive member already exists under ``output_path`` the
    whole extraction is skipped and a message is printed instead.

    Args:
        zip_path: Path of the zip archive to read.
        output_path: Directory the members are extracted into.

    Returns:
        None.
    """
    with zipfile.ZipFile(zip_path, 'r') as archive:
        members = archive.filelist

        # Bail out as soon as one member is found on disk.
        for member in members:
            if os.path.exists(os.path.join(output_path, member.filename)):
                print("Alguns arquivos já existem no diretório de destino. Extração pulada.")
                return

        # The bar counts uncompressed bytes, so its total is the sum of member sizes.
        total_bytes = 0
        for member in members:
            total_bytes += member.file_size

        with tqdm(total=total_bytes, unit='B', unit_scale=True, desc="Extraindo") as progress:
            for member in members:
                archive.extract(member, output_path)
                progress.update(member.file_size)

extract_with_progress(path_to_zip, "/content/")  # Extract into the Colab VM's local /content/ folder
print("Processo concluído.")

Extraindo:   0%|          | 0.00/2.68G [00:00<?, ?B/s]

Processo concluído.


In [None]:
# Directory of the exported tokenizer SavedModel (loaded in the tokenizer cell).
WORKSPACE = "SENTENCE"
# Path prefix of the train/dev/test TFRecord files.
tfrecord = 'tfrecord/'

# Number of examples per batch in every tf.data pipeline.
BATCH_SIZE = 60

# Number of training epochs passed to model.fit.
N_EPOCHS = 120
# Every keypoint sequence is padded or truncated to this many frames.
FRAME_LEN = 180
# Every tokenized caption is padded or truncated to this many tokens.
MAX_TOKENS = 30

# NOTE(review): lr and wd are not referenced by the optimizer built in get_model,
# which hard-codes learning_rate=4e-4 and weight_decay=1e-4 — confirm which values are intended.
lr = 1e-4
wd = 1e-4

## Data

In [None]:
# Parser applied to every record read from the TFRecord files.
def _parse_function(proto):
    """Decode one serialized ``tf.train.Example`` into ``(frames, caption)``.

    Args:
        proto: Scalar string tensor holding the serialized example.

    Returns:
        Tuple ``(frames, caption)``: ``frames`` is the float32 ``keypoints``
        feature reshaped to ``[-1, 59, 3]`` and ``caption`` is the scalar
        string ``caption`` feature.
    """
    schema = {
        'keypoints': tf.io.VarLenFeature(tf.float32),
        'caption': tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(proto, schema)
    # The keypoints are stored as one flat variable-length list; densify it and
    # restore the (frames, 59, 3) layout.
    flat_keypoints = tf.sparse.to_dense(example['keypoints'])
    return tf.reshape(flat_keypoints, [-1, 59, 3]), example['caption']

In [None]:
def load_dataset(tfrecord_path):
    """Build a dataset of parsed ``(frames, caption)`` pairs from a TFRecord file.

    Args:
        tfrecord_path: Path of the TFRecord file to read.

    Returns:
        A ``tf.data.Dataset`` whose elements are produced by ``_parse_function``.
    """
    return tf.data.TFRecordDataset(tfrecord_path).map(_parse_function)

# Load the three splits.
train_raw = load_dataset(f'{tfrecord}train.tfrecord')
dev_raw = load_dataset(f'{tfrecord}dev.tfrecord')
test_raw = load_dataset(f'{tfrecord}test.tfrecord')

# Keep dev/test out of the training stream by default: the validation loss and
# the BLEU scores computed later are only meaningful on data the model never
# trained on. Set the flag to True only for a final model whose dev/test
# scores will not be reported.
TRAIN_ON_ALL_SPLITS = False
if TRAIN_ON_ALL_SPLITS:
    train_raw = train_raw.concatenate(dev_raw).concatenate(test_raw)

# Peek at one example to check the parsed shapes.
for frames, caption in train_raw.take(1):
    print(frames.shape, caption.numpy())

# Count the training examples by streaming, without holding every example in memory.
sum(1 for _ in train_raw)

(142, 59, 3) b'This is all the you know, take off on the idea of the acanthus leaf'


30159

## Tokenizer

### Import tokenizer

In [None]:
# The tokenizer is a SavedModel stored in the WORKSPACE directory.
model_name = WORKSPACE
tokenizers = tf.saved_model.load(model_name)
# Shorthand for the English tokenizer exposed by the SavedModel.
tokenizer = tokenizers.en
# Token id treated as padding; the CTC loss also uses it as the blank index.
pad_token_idx = 0

@tf.function
def tokenize_caption(caption):
    """Tokenize one caption and force it to exactly ``MAX_TOKENS`` ids.

    Longer sequences are truncated; shorter ones are right-padded with
    ``pad_token_idx`` so the padding matches what the CTC loss counts as
    padding.

    Args:
        caption: Scalar string tensor with the caption text.

    Returns:
        1-D tensor of ``MAX_TOKENS`` token ids.
    """
    # The tokenizer works on batches: wrap the caption and take the single result.
    token_ids = tokenizers.en.tokenize([caption])[0]
    token_ids = token_ids[:MAX_TOKENS]
    missing = MAX_TOKENS - tf.shape(token_ids)[0]
    return tf.pad(token_ids, paddings=[[0, missing]], constant_values=pad_token_idx)

@tf.function
def tokenize_function(path, caption):
    """Replace the caption of a dataset element by its padded token ids.

    Args:
        path: First element of the dataset pair, returned untouched. In the
            pipelines of this notebook it carries the keypoint frames.
        caption: Scalar string tensor with the caption text.

    Returns:
        Tuple ``(path, token_ids)`` with ``token_ids`` from ``tokenize_caption``.
    """
    return path, tokenize_caption(caption)

In [None]:
# Vocabulary size of the loaded tokenizer; get_model uses the same value as its default vocab_size.
tokenizers.en.get_vocab_size().numpy()

4446

## Landmarks Preprocessing

In [None]:
@tf.function
def spatial_random_affine(keypoints,
                          scale_range = (0.8, 1.2),
                          rotation_range = (-30, 30),
                          shear_range = (-0.15, 0.15),
                          translation_range = (-0.1, 0.1)):
    """Apply a random scale, shear, rotation and translation to keypoints.

    Each transform is skipped when its range is ``None``. One set of random
    parameters is drawn per call and applied to every keypoint passed in.

    Args:
        keypoints: Keypoint coordinates whose last axis is (x, y, z).
            Presumably (frames, 59, 3) as produced by the parser — TODO confirm.
        scale_range: (min, max) of the uniform scale factor applied to all coordinates.
        rotation_range: (min, max) rotation angle in degrees, applied to x/y only.
        shear_range: (min, max) of the shear factor, applied to x/y only.
        translation_range: (min, max) of the per-axis offset added to x, y and z.

    Returns:
        float32 tensor with the same shape as ``keypoints``.
    """

    keypoints = tf.convert_to_tensor(keypoints, dtype=tf.float32)

    # Scale: a single factor multiplies every coordinate (scaling about the origin).
    if scale_range is not None:
        scale_factor = tf.random.uniform([], scale_range[0], scale_range[1])
        keypoints *= scale_factor

    # Shear: only x/y are affected; z is carried through unchanged.
    if shear_range is not None:
        xy = keypoints[..., :2]
        z = keypoints[..., 2:]
        shear_factor_x, shear_factor_y = tf.random.uniform([], shear_range[0], shear_range[1]),tf.random.uniform([], shear_range[0], shear_range[1])
        # Shear along exactly one axis per call: one of the two factors is zeroed at random.
        if tf.random.uniform([]) < 0.5:
            shear_factor_x = 0.
        else:
            shear_factor_y = 0.
        shear_matrix = tf.convert_to_tensor([
            [1, shear_factor_x],
            [shear_factor_y, 1]
        ], dtype=tf.float32)
        xy = tf.matmul(xy, shear_matrix)
        keypoints = tf.concat([xy, z], axis=-1)

    # Rotation: x/y are rotated about the point (0.5, 0.5).
    # NOTE(review): a 0.5 centre suggests coordinates normalised to [0, 1] — confirm.
    if rotation_range is not None:
        xy = keypoints[..., :2]
        z = keypoints[..., 2:]
        center = tf.constant([0.5, 0.5], dtype=tf.float32)
        xy -= center
        angle = tf.random.uniform([], rotation_range[0], rotation_range[1])
        theta = angle * np.pi / 180  # Degrees to radians
        c = tf.cos(theta)
        s = tf.sin(theta)
        rotation_matrix = tf.convert_to_tensor([
            [c, -s],
            [s, c]
        ], dtype=tf.float32)
        # The matrix is right-multiplied, i.e. the points are treated as row vectors.
        xy = tf.matmul(xy, rotation_matrix)
        xy += center
        keypoints = tf.concat([xy, z], axis=-1)

    # Translation: one random (x, y, z) offset, broadcast over every keypoint.
    if translation_range is not None:
        translation = tf.random.uniform([1, 3], translation_range[0], translation_range[1])
        keypoints += translation

    return keypoints

@tf.function
def interp1d(x, target_len, method='random'):
    """Resize ``x`` along its first axis to ``target_len`` with ``tf.image.resize``.

    ``x`` is handed to ``tf.image.resize`` as an image whose height is the
    first axis; the second axis keeps its size.

    Args:
        x: Tensor to resample. Presumably (frames, keypoints, channels) — TODO confirm.
        target_len: Requested length of the first axis; values below 1 are raised to 1.
        method: A ``tf.image.resize`` method name, or ``'random'`` to pick
            bilinear with probability 0.33 and otherwise bicubic or nearest
            with equal probability.

    Returns:
        The resized tensor.
    """
    target_len = tf.maximum(1, target_len)
    # Only the first axis changes; the second keeps its current size.
    size = (target_len, tf.shape(x)[1])
    if method == 'random':
        if tf.random.uniform(()) < 0.33:
            x = tf.image.resize(x, size, 'bilinear')
        else:
            if tf.random.uniform(()) < 0.5:
                x = tf.image.resize(x, size, 'bicubic')
            else:
                x = tf.image.resize(x, size, 'nearest')
    else:
        x = tf.image.resize(x, size, method)
    return x

@tf.function
def resample(x, rate=(0.8,1.2)):
    """Randomly stretch or compress ``x`` along its first axis.

    Args:
        x: Tensor to resample; its first axis is the one being resized.
        rate: (min, max) of the uniform factor applied to the current length.

    Returns:
        ``x`` resized by ``interp1d`` to ``int(factor * length)`` entries
        along the first axis.
    """
    low, high = rate[0], rate[1]
    factor = tf.random.uniform((), low, high)
    current_len = tf.cast(tf.shape(x)[0], tf.float32)
    target_len = tf.cast(factor * current_len, tf.int32)
    return interp1d(x, target_len)

### Process

In [None]:
@tf.function
def resize_pad(keypoints):
    """
    Force a keypoint sequence to exactly FRAME_LEN frames.

    Everything after the first axis is flattened into a single feature axis.
    Sequences shorter than FRAME_LEN are zero-padded at the end; longer ones
    are truncated to their first FRAME_LEN frames.

    Args:
        keypoints: Tensor whose first axis indexes frames.

    Returns:
        2-D tensor with FRAME_LEN rows and one column per flattened feature.
    """
    num_frames = tf.shape(keypoints)[0]

    # Collapse every per-frame dimension into one feature axis.
    keypoints = tf.reshape(keypoints, [num_frames, -1])

    if num_frames < FRAME_LEN:
        padding = [[0, FRAME_LEN - num_frames], [0, 0]]
        keypoints = tf.pad(keypoints, padding)
    else:
        keypoints = keypoints[:FRAME_LEN, :]

    return keypoints

@tf.function
def normalize(keypoints):
    """Flatten each frame and standardise it to zero mean and unit variance.

    The statistics are computed per frame, over all flattened keypoint
    coordinates of that frame.

    Args:
        keypoints: 3-D tensor whose last two dimensions are statically known.

    Returns:
        2-D tensor with one standardised feature vector per frame.
    """
    features_per_frame = keypoints.shape[1] * keypoints.shape[2]
    flat = tf.reshape(keypoints, [-1, features_per_frame])
    frame_mean, frame_variance = tf.nn.moments(flat, [-1], keepdims=True)
    return tf.nn.batch_normalization(
        flat,
        frame_mean,
        frame_variance,
        offset=None,
        scale=None,
        variance_epsilon=1e-6)

@tf.function
def process_data(keypoints, caption):
    """Bring the keypoints to the fixed frame length; the caption passes through."""
    return resize_pad(keypoints), caption

@tf.function
def tf_load_and_normalize_keypoints_train(keypoints, caption):
    """Training-time keypoint transform: random affine augmentation, then normalisation.

    Temporal resampling (``resample``) is currently disabled in this pipeline.

    Returns:
        Tuple ``(normalized_keypoints, caption)``; the caption is unchanged.
    """
    augmented = spatial_random_affine(keypoints)
    return normalize(augmented), caption

@tf.function
def tf_load_and_normalize_keypoints_non_train(keypoints, caption):
    """Evaluation-time keypoint transform: normalisation only, no augmentation.

    Returns:
        Tuple ``(normalized_keypoints, caption)``; the caption is unchanged.
    """
    return normalize(keypoints), caption


In [None]:
def _build_pipeline(raw, keypoint_fn, shuffle=False, shuffle_buffer=1024):
    """Turn a raw ``(frames, caption)`` dataset into batched model inputs.

    Args:
        raw: Dataset yielding ``(frames, caption)`` pairs.
        keypoint_fn: Function mapping ``(keypoints, tokens)`` to the normalised
            (and, for training, augmented) pair.
        shuffle: Whether to shuffle examples; the order is redrawn on every epoch.
        shuffle_buffer: Number of raw examples held in the shuffle buffer.

    Returns:
        A batched, prefetched ``tf.data.Dataset``.
    """
    ds = raw
    if shuffle:
        # Shuffle before the maps so the buffer holds raw, un-padded examples.
        ds = ds.shuffle(shuffle_buffer)
    return (ds.map(tokenize_function, num_parallel_calls=tf.data.AUTOTUNE)
              .map(keypoint_fn, num_parallel_calls=tf.data.AUTOTUNE)
              .map(process_data, num_parallel_calls=tf.data.AUTOTUNE)
              .batch(BATCH_SIZE)
              .prefetch(tf.data.AUTOTUNE))


# Training pipeline: shuffled, with data augmentation.
train_dataset = _build_pipeline(train_raw, tf_load_and_normalize_keypoints_train, shuffle=True)

# Validation and test pipelines: fixed order, no data augmentation.
dev_dataset = _build_pipeline(dev_raw, tf_load_and_normalize_keypoints_non_train)
test_dataset = _build_pipeline(test_raw, tf_load_and_normalize_keypoints_non_train)


In [None]:
def inspect_dataset_shape(dataset):
    """Print and return the shapes of the first batch of ``dataset``.

    Args:
        dataset: Object with a ``take(n)`` method yielding ``(keypoints, captions)`` pairs.

    Returns:
        Tuple ``(keypoints_shape, captions_shape)`` of the first batch, or
        ``None`` when the dataset yields nothing.
    """
    first_batch = dataset.take(1)
    for keypoints, captions in first_batch:
        shapes = (keypoints.shape, captions.shape)
        print("Keypoints Shape:", shapes[0])
        print("Captions Shape:", shapes[1])
        return shapes

# Fetch the first batch of the training pipeline.
batch = next(iter(train_dataset))

# Per-example input shape (batch axis dropped); get_model uses it as the model input shape.
INPUT_SHAPE = batch[0].shape[1:]

print("Train Dataset:")
train_shape = inspect_dataset_shape(train_dataset)
print("\nDev Dataset:")
dev_shape = inspect_dataset_shape(dev_dataset)
print("\nTest Dataset:")
test_shape = inspect_dataset_shape(test_dataset)

Train Dataset:
Keypoints Shape: (60, 180, 177)
Captions Shape: (60, 30)

Dev Dataset:
Keypoints Shape: (60, 180, 177)
Captions Shape: (60, 30)

Test Dataset:
Keypoints Shape: (60, 180, 177)
Captions Shape: (60, 30)


## Model

In [None]:
class ECA(tf.keras.layers.Layer):
    """Efficient Channel Attention over a (batch, time, channels) tensor.

    The input is averaged over time (respecting the mask), a 1-D convolution is
    run across the channel axis of that summary, and the sigmoid of the result
    rescales every channel of the input.

    Args:
        kernel_size (int): Width of the convolution run across channels.
    """

    def __init__(self, kernel_size=5, **kwargs):
        super().__init__(**kwargs)
        self.supports_masking = True
        self.kernel_size = kernel_size
        # Sub-layers are created once here instead of on every forward pass.
        self.pool = tf.keras.layers.GlobalAveragePooling1D()
        self.conv = tf.keras.layers.Conv1D(1, kernel_size=kernel_size, strides=1, padding="same", use_bias=False)

    def call(self, inputs, mask=None):
        """
        Rescale the channels of ``inputs`` with the learned attention weights.

        Args:
            inputs (tf.Tensor): Input tensor of shape (batch, time, channels).
            mask (tf.Tensor, optional): Mask of valid time steps, used by the pooling.

        Returns:
            tf.Tensor: ``inputs`` multiplied by one weight in (0, 1) per channel.
        """
        nn = self.pool(inputs, mask=mask)      # (batch, channels)
        nn = tf.expand_dims(nn, -1)            # channels become the conv "sequence" axis
        nn = self.conv(nn)
        nn = tf.squeeze(nn, -1)
        nn = tf.nn.sigmoid(nn)
        nn = nn[:, None, :]                    # broadcast over time
        return inputs * nn

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created when a saved model is loaded."""
        config = super().get_config()
        config.update({"kernel_size": self.kernel_size})
        return config

In [None]:
class CausalDWConv1D(tf.keras.layers.Layer):
    """Depthwise 1-D convolution that only sees current and past time steps.

    The input is left-padded with ``dilation_rate * (kernel_size - 1)`` zeros and
    then convolved with ``padding='valid'``, so the output keeps the input length.

    Args:
        kernel_size (int): Width of the depthwise kernel.
        dilation_rate (int): Dilation of the depthwise kernel.
        use_bias (bool): Whether the convolution adds a bias.
        depthwise_initializer: Initializer for the depthwise kernel.
        name (str): Layer name; also the prefix of the sub-layer names.
    """

    def __init__(self,
                 kernel_size=17,
                 dilation_rate=1,
                 use_bias=False,
                 depthwise_initializer='glorot_uniform',
                 name='',
                 **kwargs):
        super().__init__(name=name, **kwargs)
        # Kept on the instance so get_config can report them.
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.use_bias = use_bias
        self.depthwise_initializer = depthwise_initializer
        # Causal padding: zeros are added on the left (past) side only.
        self.causal_pad = tf.keras.layers.ZeroPadding1D((dilation_rate * (kernel_size - 1), 0), name=name + '_pad')
        # Depthwise convolution over the padded sequence.
        self.dw_conv = tf.keras.layers.DepthwiseConv1D(
            kernel_size,
            strides=1,
            dilation_rate=dilation_rate,
            padding='valid',
            use_bias=use_bias,
            depthwise_initializer=depthwise_initializer,
            name=name + '_dwconv')
        self.supports_masking = True

    def call(self, inputs):
        """Left-pad ``inputs`` and apply the depthwise convolution."""
        x = self.causal_pad(inputs)
        x = self.dw_conv(x)
        return x

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created when a saved model is loaded."""
        config = super().get_config()
        config.update({
            "kernel_size": self.kernel_size,
            "dilation_rate": self.dilation_rate,
            "use_bias": self.use_bias,
            "depthwise_initializer": tf.keras.initializers.serialize(
                tf.keras.initializers.get(self.depthwise_initializer)),
        })
        return config

In [None]:
def Conv1DBlock(channel_size,
                kernel_size,
                dilation_rate=1,
                drop_rate=0.0,
                expand_ratio=2,
                se_ratio=0.25,
                activation='swish',
                name=None):
    '''
    Build an expand / causal depthwise conv / channel attention / project block.

    Args:
        channel_size (int): Number of output channels.
        kernel_size (int): Kernel size of the causal depthwise convolution.
        dilation_rate (int, optional): Dilation of the causal convolution. Defaults to 1.
        drop_rate (float, optional): Dropout rate; no dropout layer is added when 0. Defaults to 0.0.
        expand_ratio (int, optional): Channel expansion factor of the first dense layer. Defaults to 2.
        se_ratio (float, optional): Accepted but not used by this function. Defaults to 0.25.
        activation (str, optional): Activation of the expansion layer. Defaults to 'swish'.
        name (str, optional): Prefix of the layer names; auto-generated when None.

    Returns:
        Callable: Function applying the block to a Keras tensor.
    '''
    block_name = name if name is not None else str(tf.keras.backend.get_uid("mbblock"))

    def apply(inputs):
        in_channels = tf.keras.backend.int_shape(inputs)[-1]

        # Expansion: widen the channel axis.
        expand = tf.keras.layers.Dense(
            in_channels * expand_ratio,
            use_bias=True,
            activation=activation,
            name=block_name + '_expand_conv')
        hidden = expand(inputs)

        # Causal depthwise convolution over time.
        depthwise = CausalDWConv1D(kernel_size,
                                   dilation_rate=dilation_rate,
                                   use_bias=False,
                                   name=block_name + '_dwconv')
        hidden = depthwise(hidden)

        batch_norm = tf.keras.layers.BatchNormalization(momentum=0.95, name=block_name + '_bn')
        hidden = batch_norm(hidden)

        # Channel attention.
        hidden = ECA()(hidden)

        # Projection back to the requested number of channels.
        project = tf.keras.layers.Dense(
            channel_size,
            use_bias=True,
            name=block_name + '_project_conv')
        hidden = project(hidden)

        if drop_rate > 0:
            # noise_shape (None, 1, 1): one keep/drop decision per example.
            dropout = tf.keras.layers.Dropout(drop_rate, noise_shape=(None, 1, 1), name=block_name + '_drop')
            hidden = dropout(hidden)

        # The residual connection is only possible when the channel count is unchanged.
        if in_channels != channel_size:
            return hidden
        return tf.keras.layers.add([hidden, inputs], name=block_name + '_add')

    return apply

In [None]:
class MultiHeadSelfAttention(tf.keras.layers.Layer):
    """Multi-head self-attention over a (batch, time, dim) tensor.

    Args:
        dim (int): Model width; also the width of the output projection.
        num_heads (int): Number of attention heads.
        dropout (float): Dropout rate applied to the attention weights.
    """

    def __init__(self, dim=256, num_heads=4, dropout=0, **kwargs):
        super().__init__(**kwargs)
        self.dim = dim
        # NOTE(review): the logits are scaled by dim ** -0.5 (full model width),
        # not by the per-head width — confirm this is intended.
        self.scale = self.dim ** -0.5
        self.num_heads = num_heads
        self.dropout = dropout
        # A single dense layer produces queries, keys and values together.
        self.qkv = tf.keras.layers.Dense(3 * dim, use_bias=False)
        self.drop1 = tf.keras.layers.Dropout(dropout)
        self.proj = tf.keras.layers.Dense(dim, use_bias=False)
        # Weight-free helper layers, created once instead of on every forward pass.
        self.split_heads = tf.keras.layers.Reshape((-1, num_heads, dim * 3 // num_heads))
        self.to_heads_first = tf.keras.layers.Permute((2, 1, 3))
        self.softmax = tf.keras.layers.Softmax(axis=-1)
        self.to_time_first = tf.keras.layers.Permute((2, 1, 3))
        self.merge_heads = tf.keras.layers.Reshape((-1, dim))
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Attend over the time axis of ``inputs``.

        Args:
            inputs (tf.Tensor): Tensor of shape (batch, time, features).
            mask (tf.Tensor, optional): (batch, time) mask of valid time steps.

        Returns:
            tf.Tensor: Tensor of shape (batch, time, dim).
        """
        # (batch, time, 3*dim) -> (batch, heads, time, 3*dim/heads)
        qkv = self.qkv(inputs)
        qkv = self.to_heads_first(self.split_heads(qkv))
        q, k, v = tf.split(qkv, [self.dim // self.num_heads] * 3, axis=-1)

        # Scaled dot-product attention logits.
        attn = tf.matmul(q, k, transpose_b=True) * self.scale

        if mask is not None:
            # Broadcast the key mask over heads and query positions.
            mask = mask[:, None, None, :]

        attn = self.softmax(attn, mask=mask)
        attn = self.drop1(attn)

        # Weighted sum of the values, then back to (batch, time, dim).
        x = attn @ v
        x = self.merge_heads(self.to_time_first(x))
        x = self.proj(x)
        return x

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created when a saved model is loaded."""
        config = super().get_config()
        config.update({
            "dim": self.dim,
            "num_heads": self.num_heads,
            "dropout": self.dropout,
        })
        return config

In [None]:
def TransformerBlock(dim=192, num_heads=6, expand=4, attn_dropout=0.2, drop_rate=0.2, activation='swish'):
    """
    Build a pre-norm Transformer block (self-attention followed by a feed-forward part).

    Args:
        dim (int): Width of the feature space inside the block.
        num_heads (int): Number of attention heads.
        expand (int): Expansion factor of the hidden feed-forward layer.
        attn_dropout (float): Dropout rate on the attention weights.
        drop_rate (float): Dropout rate on each residual branch.
        activation (str): Activation of the hidden feed-forward layer.

    Returns:
        Callable: Function applying the block to a Keras tensor.
    """
    def apply(inputs):
        # The input is always linearly projected to `dim` first.
        projected = tf.keras.layers.Dense(dim, use_bias=False)(inputs)

        # Attention branch: norm -> attention -> dropout, added to the projected input.
        branch = tf.keras.layers.LayerNormalization(epsilon=1e-6)(projected)
        branch = MultiHeadSelfAttention(dim=dim, num_heads=num_heads, dropout=attn_dropout)(branch)
        branch = tf.keras.layers.Dropout(drop_rate, noise_shape=(None, 1, 1))(branch)
        after_attention = tf.keras.layers.Add()([projected, branch])

        # Feed-forward branch: norm -> expand -> project -> dropout, added to the attention output.
        branch = tf.keras.layers.LayerNormalization(epsilon=1e-6)(after_attention)
        branch = tf.keras.layers.Dense(dim * expand, use_bias=False, activation=activation)(branch)
        branch = tf.keras.layers.Dense(dim, use_bias=False)(branch)
        branch = tf.keras.layers.Dropout(drop_rate, noise_shape=(None, 1, 1))(branch)
        return tf.keras.layers.Add()([after_attention, branch])

    return apply

In [None]:
def positional_encoding(maxlen, num_hid):
    """
    Build a sinusoidal positional encoding table.

    Args:
        maxlen (int): Number of positions (rows of the table).
        num_hid (int): Width of the encoding; half the columns hold sines, half cosines.

    Returns:
        tf.Tensor: float32 tensor with one row per position; the sine values
        come first along the last axis, followed by the cosine values.
    """
    half_dim = num_hid / 2
    position_column = tf.range(maxlen, dtype=tf.float32)[:, tf.newaxis]
    exponent_row = tf.range(half_dim, dtype=tf.float32)[tf.newaxis, :] / half_dim
    inverse_frequency = tf.math.divide(1, tf.math.pow(tf.cast(10000, tf.float32), exponent_row))
    # Outer product: one angle per (position, frequency) pair.
    angles = tf.linalg.matmul(position_column, inverse_frequency)

    return tf.concat([tf.math.sin(angles), tf.math.cos(angles)], axis=-1)

In [None]:
def CTCLoss(labels, logits):
    """
    Mean CTC (Connectionist Temporal Classification) loss over a batch.

    Args:
        labels (tf.Tensor): Padded label ids, one row per example; entries equal
            to ``pad_token_idx`` are treated as padding.
        logits (tf.Tensor): Batch-major model outputs (batch, time, classes).

    Returns:
        tf.Tensor: Scalar loss averaged over the batch.
    """
    # True label length = number of non-padding entries in each row.
    is_token = tf.cast(labels != pad_token_idx, tf.int32)
    label_length = tf.reduce_sum(is_token, axis=-1)

    # Every example is given the full output length.
    logits_shape = tf.shape(logits)
    batch_size, num_steps = logits_shape[0], logits_shape[1]
    logit_length = tf.fill([batch_size], num_steps)

    per_example_loss = tf.nn.ctc_loss(
        labels=labels,
        logits=logits,
        label_length=label_length,
        logit_length=logit_length,
        blank_index=pad_token_idx,
        logits_time_major=False  # logits are (batch, time, classes)
    )

    return tf.reduce_mean(per_example_loss)

In [None]:
def get_model(dim=384, vocab_size=tokenizers.en.get_vocab_size().numpy()):
    """
    Build and compile the keypoints-to-text model.

    The default ``vocab_size`` is read from the tokenizer once, when this
    function is defined.

    Args:
        dim (int): Width of the model's hidden layers.
        vocab_size (int): Number of output classes of the final layer.

    Returns:
        tf.keras.Model: The model, compiled with the CTC loss and an AdamW optimizer.
    """
    inp = tf.keras.Input(INPUT_SHAPE)
    x = tf.keras.layers.Masking(mask_value=0.0)(inp)  # Mask time steps whose features are all 0.0 (the zero padding).
    x = tf.keras.layers.Dense(dim, use_bias=False, name='stem_conv')(x)  # Initial projection to `dim`.
    pe = tf.cast(positional_encoding(INPUT_SHAPE[0], dim), dtype=x.dtype)  # Positional encoding table.
    x = x + pe  # Add the positional encoding to the projected inputs.
    x = tf.keras.layers.BatchNormalization(momentum=0.95, name='stem_bn')(x)  # Batch normalization.

    num_blocks = 6  # Number of (conv, conv, transformer) stages.
    drop_rate = 0.2  # Dropout rate inside the conv blocks.

    for i in range(num_blocks):
        # Two causal conv blocks with different kernel sizes.
        x = Conv1DBlock(dim, 11, drop_rate=drop_rate)(x)
        x = Conv1DBlock(dim, 5, drop_rate=drop_rate)(x)
        #x = Conv1DBlock(dim, 3, drop_rate=drop_rate)(x)
        # Transformer block.
        x = TransformerBlock(dim, expand=2)(x)

    x = tf.keras.layers.Dense(dim * 2, activation='relu', name='top_conv')(x)  # Intermediate dense layer.
    x = tf.keras.layers.Dropout(0.2)(x)  # Dropout.
    x = tf.keras.layers.Dense(vocab_size, name='classifier')(x)  # Per-frame logits over the vocabulary.

    model = tf.keras.Model(inp, x)  # Assemble the model.

    loss = CTCLoss  # CTC loss function.

    # AdamW optimizer.
    # NOTE(review): the learning rate and weight decay are hard-coded here; the
    # module-level `lr` / `wd` constants are not used — confirm which values are intended.
    optimizer = tf.optimizers.AdamW(learning_rate=4e-4, weight_decay=1e-4)

    model.compile(loss=loss, optimizer=optimizer)  # Compile the model.

    return model

In [None]:
# Reset Keras' global state before building the model.
tf.keras.backend.clear_session()
model = get_model()
# One forward pass on the batch fetched earlier, as a smoke test before printing the summary.
model(batch[0])
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 180, 177)]           0         []                            
                                                                                                  
 masking (Masking)           (None, 180, 177)             0         ['input_1[0][0]']             
                                                                                                  
 stem_conv (Dense)           (None, 180, 384)             67968     ['masking[0][0]']             
                                                                                                  
 tf.__operators__.add (TFOp  (None, 180, 384)             0         ['stem_conv[0][0]']           
 Lambda)                                                                                      

In [None]:
def decode_phrase(pred):
    """
    Greedy CTC decoding of one prediction into token ids.

    The most likely class is taken at every time step, runs of identical
    consecutive ids are collapsed to a single id, and the blank/padding id
    (``pad_token_idx``) is removed.

    Args:
        pred (tf.Tensor): Per-time-step scores of shape (time, vocab).

    Returns:
        tf.Tensor: 1-D tensor of decoded token ids.
    """
    # Most likely class at every time step.
    x = tf.argmax(pred, axis=1)
    # Keep the first step of every run of identical ids. The first step always
    # starts a run, so the final run is kept too (comparing x[:-1] with x[1:]
    # and gathering on that would silently drop it).
    is_run_start = tf.concat(
        [tf.ones_like(x[:1], dtype=tf.bool), tf.not_equal(x[1:], x[:-1])],
        axis=0)
    x = tf.boolean_mask(x, is_run_start, axis=0)
    # Drop the blank/padding id.
    mask = x != pad_token_idx
    x = tf.boolean_mask(x, mask, axis=0)
    return x

def decode_batch_predictions(pred, tokenizer):
    """
    Decode a batch of model outputs into sentences.

    Args:
        pred: Iterable of per-example prediction tensors.
        tokenizer: Tokenizer whose ``detokenize`` turns token ids back into text.

    Returns:
        List[str]: One decoded sentence per example.
    """
    def _to_sentence(sample_pred):
        token_ids = decode_phrase(sample_pred).numpy()
        detokenized = tokenizer.detokenize([token_ids])
        return detokenized.numpy()[0].decode('utf-8')

    return [_to_sentence(sample_pred) for sample_pred in pred]

In [None]:
class CallbackEval(tf.keras.callbacks.Callback):
    """
    Callback that prints a few target/prediction pairs at the end of every epoch.

    Args:
        dataset (tf.data.Dataset): Batched dataset of (inputs, label ids) used for the preview.
        tokenizer (Tokenizer): Tokenizer used to turn token ids back into text.
        num_samples (int): Maximum number of examples printed per epoch.
    """

    def __init__(self, dataset, tokenizer, num_samples=15):
        super().__init__()
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.num_samples = num_samples

    def on_epoch_end(self, epoch: int, logs=None):
        """
        Decode the first ``num_samples`` examples of the dataset and print them.

        Args:
            epoch (int): Index of the epoch that just finished.
            logs: Metrics dictionary supplied by Keras (unused).
        """
        predictions = []  # Decoded model outputs
        targets = []  # Decoded reference captions

        for X, y in self.dataset:
            batch_predictions = self.model(X, training=False)
            predictions.extend(decode_batch_predictions(batch_predictions, self.tokenizer))
            for label in y:
                decoded_label = self.tokenizer.detokenize([label])
                targets.append(decoded_label.numpy()[0].decode('utf-8'))
            # Only the first examples are shown, so stop once enough are decoded.
            if len(predictions) >= self.num_samples:
                break

        print("-" * 100)

        # Slicing (instead of a fixed range) also copes with datasets smaller than num_samples.
        shown = list(zip(targets, predictions))[:self.num_samples]
        for target, prediction in shown:
            print(f"Target    : {target}")
            print(f"Prediction: {prediction}, len: {len(prediction.split())}")
            print("-" * 100)

## Training

In [None]:
# Folder where trained models are stored. It must live under the mount point
# passed to drive.mount ('/content/drive'), otherwise nothing reaches Google Drive.
save_dir = '/content/drive/MyDrive/h2s'
os.makedirs(save_dir, exist_ok=True)

# Train the model. To print sample transcriptions after every epoch, pass
# callbacks=[CallbackEval(dev_dataset, tokenizer)].
history = model.fit(
    train_dataset,
    validation_data=dev_dataset,
    epochs=N_EPOCHS,
)

Epoch 1/120
Epoch 2/120
Epoch 3/120
Epoch 4/120
Epoch 5/120
Epoch 6/120
Epoch 7/120
Epoch 8/120
Epoch 9/120
Epoch 10/120
Epoch 11/120
Epoch 12/120
Epoch 13/120
Epoch 14/120
Epoch 15/120
Epoch 16/120
Epoch 17/120
Epoch 18/120
Epoch 19/120
Epoch 20/120
Epoch 21/120
Epoch 22/120
Epoch 23/120
Epoch 24/120
Epoch 25/120
Epoch 26/120
Epoch 27/120
Epoch 28/120
Epoch 29/120
Epoch 30/120

In [None]:
import datetime

# Timestamped file name so successive runs never overwrite each other.
current_time = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
model_filename = f'H2S_{current_time}.keras'

# Make sure the target folder exists before saving.
os.makedirs(save_dir, exist_ok=True)

# Save the model
model.save(f'{save_dir}/{model_filename}')

In [None]:
# Training vs. validation loss per epoch.
fig, ax = plt.subplots()
ax.plot(history.history['loss'], label='training loss')
ax.plot(history.history['val_loss'], label='val_loss')
ax.set(title='CTC loss per epoch', xlabel='Epoch', ylabel='Loss')
ax.legend();

In [None]:
def predict_transcripts(dataset):
    """Run the trained model over ``dataset`` and decode predictions and labels.

    Args:
        dataset: Batched dataset of (inputs, label ids).

    Returns:
        Tuple ``(predictions, targets)``: two equally long lists of sentences,
        the decoded model outputs and the decoded reference captions.
    """
    predictions, targets = [], []
    for X, y in dataset:
        batch_predictions = model(X, training=False)
        predictions.extend(decode_batch_predictions(batch_predictions, tokenizer))
        for label in y:
            targets.append(tokenizer.detokenize([label]).numpy()[0].decode('utf-8'))
    return predictions, targets


# Predictions for both evaluation splits; the BLEU cells below need the
# dev and the test lists.
predictions_dev, targets_dev = predict_transcripts(dev_dataset)
predictions_test, targets_test = predict_transcripts(test_dataset)

# Show predicted vs. reference transcriptions for the dev split.
for target, prediction in zip(targets_dev, predictions_dev):
    print(f"Target    : {target}")
    print(f"Prediction: {prediction}, len: {len(prediction.split())}")
    print("-" * 100)

## Computar o BLEU

In [None]:
import collections
import math


def _get_ngrams(segment, max_order):
    """Extracts all n-grams upto a given maximum order from an input segment.

    Args:
      segment: text segment from which n-grams will be extracted.
      max_order: maximum length in tokens of the n-grams returned by this
          methods.

    Returns:
      The Counter containing all n-grams upto max_order in segment
      with a count of how many times each n-gram occurred.
    """
    ngram_counts = collections.Counter()
    for order in range(1, max_order + 1):
        for i in range(0, len(segment) - order + 1):
            ngram = tuple(segment[i:i + order])
            ngram_counts[ngram] += 1
    return ngram_counts


def compute_bleu(reference_corpus, translation_corpus, max_order=4,
                 smooth=False):
    """Computes BLEU score of translated segments against one or more references.

    Args:
      reference_corpus: list of lists of references for each translation. Each
          reference should be tokenized into a list of tokens.
      translation_corpus: list of translations to score. Each translation
          should be tokenized into a list of tokens.
      max_order: Maximum n-gram order to use when computing BLEU score.
      smooth: Whether or not to apply Lin et al. 2004 smoothing.

    Returns:
      3-Tuple with the BLEU score, n-gram precisions, geometric mean of n-gram
      precisions and brevity penalty.
    """
    matches_by_order = [0] * max_order
    possible_matches_by_order = [0] * max_order
    reference_length = 0
    translation_length = 0
    for (references, translation) in zip(reference_corpus,
                                         translation_corpus):
        reference_length += min(len(r) for r in references)
        translation_length += len(translation)

        merged_ref_ngram_counts = collections.Counter()
        for reference in references:
            merged_ref_ngram_counts |= _get_ngrams(reference, max_order)
        translation_ngram_counts = _get_ngrams(translation, max_order)
        overlap = translation_ngram_counts & merged_ref_ngram_counts
        for ngram in overlap:
            matches_by_order[len(ngram) - 1] += overlap[ngram]
        for order in range(1, max_order + 1):
            possible_matches = len(translation) - order + 1
            if possible_matches > 0:
                possible_matches_by_order[order - 1] += possible_matches

    precisions = [0] * max_order
    for i in range(0, max_order):
        if smooth:
            precisions[i] = ((matches_by_order[i] + 1.) /
                             (possible_matches_by_order[i] + 1.))
        else:
            if possible_matches_by_order[i] > 0:
                precisions[i] = (float(matches_by_order[i]) /
                                 possible_matches_by_order[i])
            else:
                precisions[i] = 0.0

    if min(precisions) > 0:
        p_log_sum = sum((1. / max_order) * math.log(p) for p in precisions)
        geo_mean = math.exp(p_log_sum)
    else:
        geo_mean = 0

    ratio = float(translation_length) / reference_length

    if ratio > 1.0:
        bp = 1.
    else:
        bp = math.exp(1 - 1. / ratio)

    bleu = geo_mean * bp

    return bleu, precisions, bp, ratio, translation_length, reference_length

In [None]:
def remove_padding(tokens_list):
    """
    Strip leading and trailing whitespace from every sequence in ``tokens_list``.

    Returns a new list; the input list is not modified.
    """
    stripped = []
    for seq in tokens_list:
        stripped.append(seq.strip())
    return stripped


def calculate_bleu_with_padding_removal(predictions, targets, max_order=4):
    """
    Compute smoothed corpus BLEU between predicted and target sentences.

    Both lists are whitespace-stripped and split on whitespace; every target
    becomes the single reference of the prediction at the same index.

    Args:
        predictions: List of predicted sentences (strings).
        targets: List of reference sentences (strings), aligned with ``predictions``.
        max_order: Maximum n-gram order (``n`` of BLEU-n).

    Returns:
        The 6-tuple returned by ``compute_bleu`` with ``smooth=True``:
        ``(bleu, precisions, bp, ratio, translation_length, reference_length)``.
    """
    hypothesis_tokens = [sentence.split() for sentence in remove_padding(predictions)]
    # compute_bleu expects a list of references per translation; here each has exactly one.
    reference_tokens = [[sentence.split()] for sentence in remove_padding(targets)]

    return compute_bleu(reference_tokens, hypothesis_tokens, max_order=max_order, smooth=True)

In [None]:
# Compute BLEU-1 to BLEU-4 for the dev and test splits.
# predictions_test / targets_test must have been produced by an earlier cell,
# in the same format as predictions_dev / targets_dev (lists of sentences).
bleu_results = {}
for dataset_name, (predictions, targets) in [('dev', (predictions_dev, targets_dev)), ('test', (predictions_test, targets_test))]:
    bleu_results[dataset_name] = {}
    for n in range(1, 5):  # For BLEU-1 to BLEU-4
        # Only the score is stored; the other returned statistics are discarded.
        bleu_score, precisions, bp, ratio, translation_length, reference_length = calculate_bleu_with_padding_removal(predictions, targets, max_order=n)
        bleu_results[dataset_name][f'BLEU-{n}'] = bleu_score
# Display the nested {split: {metric: score}} dictionary.
bleu_results

In [None]:
# Report the headline BLEU-4 scores, to Weights & Biases when a run is active.
bleu_4_dev = bleu_results['dev']['BLEU-4']
bleu_4_test = bleu_results['test']['BLEU-4']
bleu_4_summary = {
    "BLEU-4/dev": bleu_4_dev,
    "BLEU-4/test": bleu_4_test,
}

# wandb is neither imported nor initialised by the cells of this notebook, so
# only log when it is present in the namespace with an active run; otherwise
# print the scores instead of failing with a NameError.
_wandb = globals().get("wandb")
if _wandb is not None and getattr(_wandb, "run", None) is not None:
    _wandb.log(bleu_4_summary)
else:
    print("wandb run not active; BLEU-4 summary:", bleu_4_summary)