In [1]:
# # Prepare environment
# %cd ../input/sign-language-components-1/Kaggle-folder
# !pip install keras_preprocessing


import numpy as np
import tensorflow as tf
import pickle
import random
import os
import cv2

from tensorflow import data, GradientTape
from keras import layers, models
from keras.losses import sparse_categorical_crossentropy
from keras.optimizers import Adam
from keras_preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from dataset import Dataset, pre_processing_1
from time import time

In [None]:
# For Kaggle
# Path to the pickled annotation file (relative to the working directory).
dataset_path = "Dataset/phoenix14t.pami0.dev.annotations_only/phoenix14t.pami0.dev.dontsync.annotations_only"
class Dataset:
    """
    Loader for the annotation pickle at `dataset_path` and the matching videos.

    Attributes
    --
    annotations : list
        Objects read from the pickle. According to the original notes each
        entry holds:
        - name: path (train/...)
        - signer: signer name
        - gloss: JETZT ...
        - text: ...
    """
    def __init__(self) -> None:
        # NOTE(review): pickle.load can execute arbitrary code while
        # unpickling; only point `dataset_path` at a trusted file.
        with open(dataset_path, 'rb') as f:
            self.annotations = pickle.load(f)

    def load(self, path="Dataset/videos_phoenix/videos", size: int=10) -> list:
        """
        Decode up to `size` randomly chosen videos into frame arrays.

        Parameters
        --
        path : str
            Directory that contains the videos; each file is looked up as
            `<path>/<annotation name>.mp4`.
        size : int
            Maximum number of samples to return.

        Return
        --
        data : list
            List of dicts with keys `path`, `frames`, `gloss` and `text`.
            Videos that yield no frames (missing or unreadable) are skipped.
        """
        # Shuffle all annotations (in place) so every call draws a new sample.
        random.shuffle(self.annotations)
        data = []

        for obj in self.annotations:
            if len(data) >= size:
                break

            vid_path = os.path.join(path, obj["name"]) + ".mp4"
            cap = cv2.VideoCapture(vid_path)
            frames = []

            try:
                # Read until the capture reports that no frame is left.
                while True:
                    ret, img = cap.read()
                    if not ret:
                        break
                    frames.append(img)
            finally:
                # Always free the decoder handle, even if reading fails.
                cap.release()

            # No frame decoded: the video does not exist or cannot be read.
            if not frames:
                continue

            data.append({'path': vid_path, 'frames': np.array(frames), 'gloss': obj["gloss"], "text": obj["text"]})

        return data

**GLOBAL VARIABLE**

In [2]:
# --- Model size ---
d_model = 128          # embedding width and output units of every sub-layer
heads = 4              # number of attention heads
d_k = 64               # units of the query/key projections (split across heads)
d_v = 64               # units of the value projection (split across heads)
d_ff = 64              # hidden units of the feed-forward sub-layer
rate = 0.2             # dropout rate

# --- Sequence lengths ---
encoder_seq_len = 200  # sequence length handed to the encoder
decoder_seq_len = 20   # sequence length handed to the decoder

# --- Training ---
batch_size = 2
epochs = 2


In [3]:
# Load dataset
dataset = Dataset()
sample_data = dataset.load(size=20)
tokenizer, sample_encoder_input, sample_decoder_input, _ = pre_processing_1(sample_data, encoder_seq_len, decoder_seq_len)
# Token ids index the Embedding table and the softmax output, and id 0 is
# treated as padding by loss_fcn/accuracy_fcn. Assuming `word_index` is 1-based
# (as with the Keras Tokenizer — confirm against pre_processing_1), the largest
# id equals len(word_index), so one extra slot is needed to keep it in range.
vocab_size = len(tokenizer.word_index) + 1

# Hold out 20% of the samples (X_test / y_test are not used further in this notebook).
X_train, X_test, y_train, y_test = train_test_split(sample_encoder_input, sample_decoder_input, test_size=0.2)

train_dataset = data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.batch(batch_size)

# Define model
# NOTE: TransformerModel is defined in a later cell of this notebook; that cell
# (and the layer cells it depends on) must be executed before this line runs.
training_model = TransformerModel(encoder_seq_len, vocab_size, decoder_seq_len, heads, d_k, d_v, d_ff, d_model, rate)

### Spatial Embedding

In [None]:
class SpatialEmbedding(layers.Layer):
    """
    Convert sequence of frames into sequence of vectors

    Every frame goes through the same Conv2D -> MaxPooling2D -> Conv2D ->
    Flatten -> Dense stack, then a sinusoidal positional encoding is added.

    Parameters
    --
    x : array_like
        Array with shape (batch_size, seq_len, height, width, channels)

    Return
    --
    output : ndarray
        Tensor with shape (batch_size, seq_len, embedding_dim)
    """
    def __init__(self, seq_len, embedding_dim: int = 64, **kwargs):
        super().__init__(**kwargs)
        self.seq_len = seq_len

        self.conv_2d_1 = layers.Conv2D(16, 3, activation='relu')
        self.max_pooling_1 = layers.MaxPooling2D()
        self.conv_2d_2 = layers.Conv2D(32, 3, activation='relu')
        self.W_1 = layers.Dense(embedding_dim)
        self.flatten = layers.Flatten()

        # position encoding, shape (seq_len, embedding_dim)
        self.positional_encoding = self.get_positional_encoding(seq_len, embedding_dim)

    def get_positional_encoding(self, seq_len, d, n=10000):
        """
        Build the sinusoidal positional-encoding table.

        P[k, 2i] = sin(k / n^(2i/d)) and P[k, 2i+1] = cos(k / n^(2i/d)).
        When `d` is odd the last column stays zero.

        Return
        --
        P : ndarray of float64 with shape (seq_len, d)
        """
        half = d // 2
        positions = np.arange(seq_len, dtype=np.float64)[:, np.newaxis]
        denominators = np.power(float(n), 2 * np.arange(half) / d)
        angles = positions / denominators
        # angles shape: (seq_len, half)

        P = np.zeros((seq_len, d))
        P[:, 0:2 * half:2] = np.sin(angles)
        P[:, 1:2 * half:2] = np.cos(angles)

        return P

    def call(self, x):
        # x: (batch_size, seq_len, height, width, channels)
        processed_frames = []

        # The same layers (shared weights) are applied to every time step.
        for i in range(self.seq_len):
            frame = x[:, i, :, :, :]
            # Frame shape: (batch_size, height, width, channels)

            # Applying Conv2D
            x1 = self.conv_2d_1(frame)
            x1 = self.max_pooling_1(x1)
            x1 = self.conv_2d_2(x1)

            # Flatten and set final dim = embedding_dim
            x1 = self.flatten(x1)
            x1 = self.W_1(x1)

            processed_frames.append(x1)

        output = tf.stack(processed_frames, axis=1)
        # Output shape: (batch_size, seq_len, embedding_dim)

        # The table is float64 NumPy data; cast it explicitly so the addition
        # never mixes dtypes with the layer activations.
        output = output + tf.cast(self.positional_encoding, output.dtype)

        return output

### Word Embedding

In [None]:
class WordEmbedding(layers.Layer):
    """
    Convert sequence of tokens into sequence of vector

    Tokens are looked up in a trainable embedding table and a sinusoidal
    positional encoding is added.

    Parameters
    --
    x : array_like
        List of token. Shape : (batch_size, seq_len)

    Return
    --
    output : shape (batch_size, seq_len, embedding_dim)
    """
    def __init__(self, seq_len, vocab_size, embedding_dim, **kwargs):
        super().__init__(**kwargs)
        self.embedding = layers.Embedding(vocab_size, embedding_dim)
        # Table of shape (seq_len, embedding_dim); seq_len is the longest
        # sequence this layer can encode.
        self.positional_encoding = self.get_positional_encoding(seq_len, embedding_dim)

    def get_positional_encoding(self, seq_len, d, n=10000):
        """
        Build the sinusoidal positional-encoding table.

        P[k, 2i] = sin(k / n^(2i/d)) and P[k, 2i+1] = cos(k / n^(2i/d)).
        When `d` is odd the last column stays zero.

        Return
        --
        P : ndarray of float64 with shape (seq_len, d)
        """
        half = d // 2
        positions = np.arange(seq_len, dtype=np.float64)[:, np.newaxis]
        denominators = np.power(float(n), 2 * np.arange(half) / d)
        angles = positions / denominators
        # angles shape: (seq_len, half)

        P = np.zeros((seq_len, d))
        P[:, 0:2 * half:2] = np.sin(angles)
        P[:, 1:2 * half:2] = np.cos(angles)

        return P

    def call(self, x):
        x = self.embedding(x)
        # x shape: (batch_size, seq_len, embedding_dim)

        # Cast the float64 table to the activation dtype and keep only the rows
        # for the positions actually present, so inputs shorter than the
        # configured seq_len are accepted as well.
        positional_encoding = tf.cast(self.positional_encoding, x.dtype)
        positional_encoding = positional_encoding[:tf.shape(x)[1]]

        return x + positional_encoding

### Dot Product Attention

In [None]:
class DotProductAttention(layers.Layer):
    """
    Dot-product Attention layer base on `Attention is all you need`
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, queries, keys, values, d_k : int, mask=None):
        """
        Calculate Dot-product Attention

        Computes softmax(queries @ keys^T / sqrt(d_k) + (-1e9 * mask)) @ values.

        Parameters
        -
        queries : array_like
        keys : array_like
        values : array_like
        d_k: int
            Value whose square root scales the scores.
        mask: array_like | None
            Broadcastable to the score matrix; positions holding 1 are
            suppressed, positions holding 0 are kept.
        """

        # A plain Python float keeps the division in the dtype of the scores.
        scale = float(np.sqrt(d_k))
        scores = tf.matmul(queries, keys, transpose_b=True) / scale

        if mask is not None:
            # Masks may arrive as float64 (e.g. built from NumPy); TensorFlow
            # does not add tensors of different dtypes, so align it first.
            scores += -1e9 * tf.cast(mask, scores.dtype)

        # Normalise over the key axis.
        weights = tf.math.softmax(scores, axis=-1)

        return tf.matmul(weights, values)

### Multi Head Attention

In [None]:
class MultiHeadAttention(layers.Layer):
    """
    Multi-Head Attention layers

    Queries, keys and values are projected with dense layers, split into
    `heads` slices, attended independently, concatenated again and projected
    to `d_model` units.

    Parameters
    -
    heads : int
        Number of heads. `d_k` and `d_v` must be divisible by `heads`
    d_k : int
        Units of the query/key projections (all heads together)
    d_v : int
        Units of the value projection (all heads together)
    d_model : int
        Final layer unit value

    """
    def __init__(self, heads, d_k, d_v, d_model, **kwargs):
        super().__init__(**kwargs)
        self.heads = heads
        self.d_k = d_k
        self.attention = DotProductAttention()
        self.W_q = layers.Dense(d_k)
        self.W_k = layers.Dense(d_k)
        self.W_v = layers.Dense(d_v)
        self.W_o = layers.Dense(d_model)

    def reshape_tensor(self, x, heads, flag):
        """
        Convert tensor shape into multi-head array and reverse

        flag=True : (batch, seq_len, dim) -> (batch, heads, seq_len, dim / heads)
        flag=False: (batch, heads, seq_len, depth) -> (batch, seq_len, heads * depth)

        Batch and sequence sizes are read with tf.shape so the method also
        works when they are not statically known; the feature size is taken
        from the static shape whenever it is available.
        """
        if flag:
            depth = -1 if x.shape[-1] is None else x.shape[-1] // heads
            x = tf.reshape(x, shape=(tf.shape(x)[0], tf.shape(x)[1], heads, depth))
            x = tf.transpose(x, perm=(0, 2, 1, 3))
        else:
            x = tf.transpose(x, perm=(0, 2, 1, 3))
            if x.shape[2] is None or x.shape[3] is None:
                merged = -1
            else:
                merged = x.shape[2] * x.shape[3]
            x = tf.reshape(x, shape=(tf.shape(x)[0], tf.shape(x)[1], merged))

        return x


    def call(self, queries, keys, values, mask=None):
        # Split queries, keys, values into multihead
        queries = self.reshape_tensor(self.W_q(queries), self.heads, True)
        # Output shape: (batch_size, heads, seq_len, -1)

        keys = self.reshape_tensor(self.W_k(keys), self.heads, True)
        # Output shape: (batch_size, heads, seq_len, -1)

        values = self.reshape_tensor(self.W_v(values), self.heads, True)
        # Output shape: (batch_size, heads, seq_len, -1)

        # Each head's dot product runs over d_k / heads features, so that is
        # the dimension the scores have to be scaled by.
        head_depth = self.d_k // self.heads
        output = self.attention(queries, keys, values, d_k=head_depth, mask=mask)
        # Output shape: (batch_size, heads, seq_len, -1)

        # Concat output heads
        output = self.reshape_tensor(output, self.heads, False)

        return self.W_o(output)

### Model Layer

In [None]:
class AddNormalization(layers.Layer):
    """
    Residual connection followed by layer normalization.
    """
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.layer_norm = layers.LayerNormalization()

    def call(self, x_1, x_2):
        # Sum both branches, then normalize the result.
        residual_sum = x_1 + x_2
        return self.layer_norm(residual_sum)
    

class FeedForward(layers.Layer):
    """
    Two stacked dense layers: `d_ff` units with ReLU, then `d_model` units
    without activation.
    """
    def __init__(self, d_ff, d_model, **kwargs):
        super().__init__(**kwargs)
        self.W_1 = layers.Dense(d_ff, activation='relu')
        self.W_2 = layers.Dense(d_model)

    def call(self, x):
        # Expand with ReLU, then project to d_model units.
        return self.W_2(self.W_1(x))

### Encoder

In [None]:
class EncoderLayer(layers.Layer):
    """
    One encoder block: multi-head self-attention and a feed-forward network,
    each followed by dropout and a residual add & layer normalization.
    """
    def __init__(self, heads, d_k, d_v, d_ff, d_model, rate, **kwargs):
        super().__init__(**kwargs)
        # Self-attention sub-layer
        self.multi_head_attention = MultiHeadAttention(heads, d_k, d_v, d_model)
        self.dropout_1 = layers.Dropout(rate)
        self.add_norm_1 = AddNormalization()
        # Feed-forward sub-layer
        self.feed_forward = FeedForward(d_ff, d_model)
        self.dropout_2 = layers.Dropout(rate)
        self.add_norm_2 = AddNormalization()

    def call(self, x, padding_mask, training):
        # Self-attention: queries, keys and values are all `x`.
        attended = self.dropout_1(
            self.multi_head_attention(x, x, x, mask=padding_mask),
            training=training,
        )
        attended = self.add_norm_1(x, attended)
        # Shape: (batch_size, seq_len, d_model)

        transformed = self.dropout_2(self.feed_forward(attended), training=training)
        # Shape: (batch_size, seq_len, d_model)

        return self.add_norm_2(attended, transformed)

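# Possible Keras error when calling these layers:
#   "Only input tensors may be passed as positional arguments.
#    The following argument value should be passed as a keyword argument"
# If it appears, pass non-tensor arguments such as `training` by keyword.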
class Encoder(layers.Layer):
    """
    Spatial embedding of the input frames followed by a stack of `N`
    encoder layers.
    """
    def __init__(self, seq_len, heads, d_k, d_v, d_ff, d_model, rate, N=6, **kwargs):
        super().__init__(**kwargs)
        self.spatial_embedding = SpatialEmbedding(seq_len, d_model)
        self.encoder_layers = [EncoderLayer(heads, d_k, d_v, d_ff, d_model, rate) for _ in range(N)]

    def call(self, x, padding_mask, training):
        hidden = self.spatial_embedding(x)

        # Feed the embedded sequence through every encoder layer in order.
        for encoder_layer in self.encoder_layers:
            hidden = encoder_layer(hidden, padding_mask=padding_mask, training=training)

        return hidden

### Decoder

In [None]:
class DecoderLayer(layers.Layer):
    """
    One decoder block: masked self-attention, attention over the encoder
    output and a feed-forward network. Each sub-layer is followed by dropout
    and a residual add & layer normalization.
    """
    def __init__(self, heads, d_k, d_v, d_ff, d_model, rate, **kwargs):
        super().__init__(**kwargs)
        # Masked multi head attention
        self.multi_head_attention_1 = MultiHeadAttention(heads, d_k, d_v, d_model)
        self.dropout_1 = layers.Dropout(rate)
        self.add_norm_1 = AddNormalization()

        # Attention over the encoder output
        self.multi_head_attention_2 = MultiHeadAttention(heads, d_k, d_v, d_model)
        self.dropout_2 = layers.Dropout(rate)
        self.add_norm_2 = AddNormalization()

        self.feed_forward = FeedForward(d_ff, d_model)
        self.dropout_3 = layers.Dropout(rate)
        self.add_norm_3 = AddNormalization()

    def call(self, x, encoder_output, lookahead_mask, padding_mask, training):
        multihead_output_1 = self.multi_head_attention_1(x, x, x, mask=lookahead_mask)
        # Output shape: (batch_size, seq_len, d_model)
        multihead_output_1 = self.dropout_1(multihead_output_1, training=training)
        multihead_output_1 = self.add_norm_1(multihead_output_1, x)
        # Output shape: (batch_size, seq_len, d_model)

        # Queries come from the decoder, keys and values from the encoder.
        multihead_output_2 = self.multi_head_attention_2(multihead_output_1, encoder_output, encoder_output, mask=padding_mask)
        # Output shape: (batch_size, seq_len, d_model)
        # Pass `training` explicitly, like dropout_1, instead of relying on
        # Keras to infer it from the calling context.
        multihead_output_2 = self.dropout_2(multihead_output_2, training=training)
        multihead_output_2 = self.add_norm_2(multihead_output_2, multihead_output_1)
        # Output shape: (batch_size, seq_len, d_model)

        feed_forward_output = self.feed_forward(multihead_output_2)
        feed_forward_output = self.dropout_3(feed_forward_output, training=training)
        feed_forward_output = self.add_norm_3(feed_forward_output, multihead_output_2)
        # Output shape: (batch_size, seq_len, d_model)

        return feed_forward_output
    

class Decoder(layers.Layer):
    """
    Word embedding of the target tokens followed by a stack of `N`
    decoder layers.
    """
    def __init__(self, vocab_size, seq_len, heads, d_k, d_v, d_ff, d_model, rate, N=6, **kwargs):
        super().__init__(**kwargs)
        self.word_embedding = WordEmbedding(seq_len, vocab_size, d_model)
        self.decoder_layers = [DecoderLayer(heads, d_k, d_v, d_ff, d_model, rate) for _ in range(N)]

    def call(self, output_target, encoder_output, lookahead_mask, padding_mask, training):
        hidden = self.word_embedding(output_target)

        # Every decoder layer receives the same encoder output and masks.
        for decoder_layer in self.decoder_layers:
            hidden = decoder_layer(
                hidden,
                encoder_output,
                lookahead_mask=lookahead_mask,
                padding_mask=padding_mask,
                training=training,
            )

        return hidden

### Transformer

In [None]:
class TransformerModel(models.Model):
    """
    Encoder-decoder Transformer: frames go through the encoder, target tokens
    through the decoder, and a softmax dense layer produces a distribution
    over `vocab_size` tokens for every decoder position.
    """
    def __init__ (self, encoder_seq_len, vocab_size, decoder_seq_len, heads, d_k, d_v, d_ff, d_model, rate, **kwargs):
        super().__init__(**kwargs)

        self.encoder = Encoder(encoder_seq_len, heads, d_k, d_v, d_ff, d_model, rate)
        self.decoder = Decoder(vocab_size, decoder_seq_len, heads, d_k, d_v, d_ff, d_model, rate)

        self.linear = layers.Dense(vocab_size, activation='softmax')

    def encoder_padding_mask(self, input):
        """
        Mark padded time steps: a step counts as padding when every value of
        it equals 0.

        Return
        --
        mask : float32 tensor of shape (batch_size, 1, seq_len), 1 = padding
        """
        mask = tf.math.equal(input, 0)

        # Mask shape: (batch_size, seq_len, height, width, channels)
        # Collapse everything after the sequence axis; tf.shape keeps this
        # valid when the batch size is not statically known.
        mask = tf.reshape(mask, (tf.shape(mask)[0], tf.shape(mask)[1], -1))
        mask = tf.reduce_all(mask, -1)
        mask = tf.expand_dims(mask, axis=1)
        mask = tf.cast(mask, tf.float32)
        # Output shape (batch_size, 1, seq_len)
        return mask

    def lookahead_mask(self, shape):
        """
        Return a (shape, shape) float32 matrix with 1 above the diagonal
        (positions that must not be attended to) and 0 elsewhere.
        """
        # tf.ones yields float32; a NumPy float64 matrix here would not match
        # the float32 attention scores it is later added to.
        mask = 1 - tf.linalg.band_part(tf.ones((shape, shape)), -1, 0)
        return mask

    def call(self, encoder_input, decoder_input, training):
        encoder_padding_mask = self.encoder_padding_mask(encoder_input)
        decoder_lookahead_mask = self.lookahead_mask(decoder_input.shape[-1])

        # Attention scores have shape (batch_size, heads, query_len, key_len).
        # Padding must hide padded *keys*, so the mask needs shape
        # (batch_size, 1, 1, encoder_seq_len) and broadcasts over heads and
        # queries. A mask along the query axis only shifts whole softmax rows
        # and therefore hides nothing.
        key_padding_mask = tf.expand_dims(encoder_padding_mask, axis=1)

        # Encoder self-attention: keys are the encoder positions.
        encoder_output = self.encoder(encoder_input, key_padding_mask, training=training)

        # Decoder cross-attention: scores are
        # (batch_size, heads, decoder_seq_len, encoder_seq_len), same key mask.
        decoder_output = self.decoder(decoder_input, encoder_output, decoder_lookahead_mask, key_padding_mask, training=training)

        model_output = self.linear(decoder_output)

        return model_output

### Train Model

In [4]:
# Adam optimizer created with its default arguments; used by train_step to update the model weights.
optimizer = Adam()

In [5]:
def loss_fcn(target, prediction):
    """
    Sparse categorical cross-entropy averaged over non-padding positions.

    Positions where `target` equals 0 are treated as padding and excluded
    from both the sum and the count.
    """
    valid = tf.cast(tf.math.not_equal(target, 0), tf.float32)

    per_token_loss = sparse_categorical_crossentropy(target, prediction)
    masked_total = tf.reduce_sum(per_token_loss * valid)

    return masked_total / tf.reduce_sum(valid)

In [6]:
def accuracy_fcn(target, prediction):
    """
    Fraction of non-padding positions whose most probable token equals the
    target token. Positions where `target` equals 0 are treated as padding.
    """
    # tf.argmax returns int64 by default; cast the target to the same dtype so
    # the comparison does not depend on how the caller encoded it.
    target = tf.cast(target, tf.int64)
    mask = tf.math.not_equal(target, 0)

    predicted = tf.argmax(prediction, axis=-1)
    accuracy = tf.equal(target, predicted)
    # A padded position never counts as correct.
    accuracy = tf.logical_and(accuracy, mask)

    mask = tf.cast(mask, tf.float32)
    accuracy = tf.cast(accuracy, tf.float32)

    return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)

In [7]:
def train_step(encoder_input, decoder_input, decoder_output):
    """
    Run one optimisation step on a single batch.

    Uses the module-level `training_model` and `optimizer`.

    Parameters
    --
    encoder_input : batch passed to the model as encoder input
    decoder_input : batch passed to the model as decoder input
    decoder_output : target tokens the prediction is compared with

    Return
    --
    (loss, accuracy) for this batch
    """
    with GradientTape() as tape:
        prediction = training_model(encoder_input, decoder_input, training=True)

        loss = loss_fcn(decoder_output, prediction)

    # The accuracy is only a metric, so it is computed outside the tape.
    accuracy = accuracy_fcn(decoder_output, prediction)

    gradient = tape.gradient(loss, training_model.trainable_weights)

    # The optimizer method is `apply_gradients` (plural).
    optimizer.apply_gradients(zip(gradient, training_model.trainable_weights))

    return loss, accuracy

In [None]:
# Custom training loop: one pass over train_dataset per epoch.
for epoch in range(epochs):
    train_loss = []
    train_accuracy = []

    print(f"\nStart of epoch: {epoch + 1} \n")

    start = time()

    for step, (train_batchX, train_batchY) in enumerate(train_dataset):
        # Target = decoder input shifted one token to the left. Dropping the
        # first token and padding at the END keeps the length at
        # decoder_seq_len. The default padding='pre' would re-insert the zero
        # at the front and make the target identical to the input.
        decoder_output = pad_sequences(train_batchY[:, 1:], maxlen=decoder_seq_len, padding='post')

        loss, accuracy = train_step(train_batchX, train_batchY, decoder_output)

        train_loss.append(loss)
        train_accuracy.append(accuracy)

        # Report the running averages every 5 steps.
        if step % 5 == 0:
            print(f"\nEpoch: {epoch + 1} - Step: {step + 1}" +
                  f"\n\t Training loss: {tf.reduce_mean(train_loss)}" +
                  f"\n\t Training accuracy: {tf.reduce_mean(train_accuracy)}")

    print(f"\nEpoch: {epoch + 1}" +
          f"\n\t Training loss: {tf.reduce_mean(train_loss)}" +
          f"\n\t Training accuracy: {tf.reduce_mean(train_accuracy)}")

    print(f"\nTime taken: {time() - start}")