# Transformers

In [None]:
import tensorflow as tf
import keras
from keras import layers

class TransformerEncoder(layers.Layerayer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs = attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)
    
    def get_config(self):
        config = super().get_config()
        config.update((
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim
        ))
        return config


SyntaxError: invalid syntax (1026288560.py, line 30)

In [None]:
import os, pathlib, shutil, random
base_dir = pathlib.Path("../Data/aclImdb")
val_dir = base_dir / "val"
train_dir = base_dir / "train"
batch_size=32

train_ds = keras.utils.text_dataset_from_directory(train_dir, batch_size=batch_size)
val_ds = keras.utils.text_dataset_from_directory(val_dir, batch_size=batch_size)
test_ds = keras.utils.text_dataset_from_directory(base_dir / "test", batch_size=batch_size)

In [None]:
text_vectorization = layers.TextVectorization(max_tokens=20000, output_mode="multi_hot")
text_only_train_ds = train_ds.map(lambda x, _: x)
text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(lambda x, y: (text_vectorization(x), y))
int_1gram_val_ds = val_ds.map(lambda x, y: (text_vectorization(x), y))
int_1gram_test_ds = test_ds.map(lambda x, y: (text_vectorization(x), y))

In [None]:
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None, ), dtype="int64")
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop", loss="binary_crossentropy", metrics=["accuracy"])
model.summary()

In [None]:
callbacks = [
    keras.callbacks.ModelCheckpoint("transform_encoder,keras", save_best_only=True)
]

model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callbacks)

In [None]:
model = keras.models.load_model("transformer_encoder.keras", custom_objects=("TransformerEncoder": TransformerEncoder))
print(f"Test acc: {model.evaluate()}")

In [None]:
class MyLayer(keras.Layer):
    def call(self, x):
        return tf.math.not_equal()

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_emmbeddings = layers.Embedding(input_dim=input_dim, output_ddim=output_dim)
        self.positional_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.positional_embeddings(positions)
        return embedded_tokens + embedded_positions
    
    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)
    
    def get_config(self):
        config = super().get_config()
        config.update((
            "output_dim": self.output_dim,
            "input_dim": self.input_dim,
            "sequence_length": self.sequenceLength
        ))
        return config

In [None]:
vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length=sequence_length, input_dim=vocab_size, output_dim=embed_dim)(inputs)
x = TransformerEncoder(embed_dim=embed_dim, dense_dim=dense_dim, num_heads=num_heads)(x)
x = layers.GlobalMaxPool1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="relu")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop", loss="binary_crossentropy", metrics=["accuracy"])
model.summary()

In [None]:
callbacks = [
    keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras", save_best_only=True)
]
model.fit(int_train_ds, validation_data=int_1gram_val_ds, epochs=20, callbacks=callbacks)
model = keras.models.load_model("full_transformer_encoder.keras", custom_objects=("TransformerEncoder": TransformerEncoder, "PositionalEmbedding": PositionalEmbedding))
print(f"Test acc: {model.evaluate(int_1gram_test_ds)[1]:.3f}")