In [None]:
# pipenv install --python 3.10

# TODO:
# using pretrain embedding with transformer on smaller dataset
# using depth-wise-separable 1D convolution based model

In [36]:
# !pipenv install keras-nlp

%pip install -q --upgrade keras-nlp
%pip install -q --upgrade keras  # Upgrade to Keras 3.

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


**Setup**

In [1]:
import numpy as np 
import keras
from keras import layers
import keras_nlp
from keras_nlp import layers as nlp_layers
from pathlib import Path

**Download the data**

In [2]:
keras.utils.get_file(
    origin="https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",
    cache_dir="./",
    extract=True
)

imdb_dir = Path("./datasets/aclImdb")

Downloading data from https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
[1m84125825/84125825[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 0us/step


In [2]:
!tree -d datasets/aclImdb/

[01;34mdatasets/aclImdb/[0m
├── [01;34mtest[0m
│   ├── [01;34mneg[0m
│   └── [01;34mpos[0m
├── [01;34mtrain[0m
│   ├── [01;34mneg[0m
│   └── [01;34mpos[0m
└── [01;34mvalidation[0m
    ├── [01;34mneg[0m
    └── [01;34mpos[0m

10 directories


remove unsupervised training data, we don't need that here

In [16]:
# !rm -r datasets/aclImdb/train/unsup

In [3]:
!tree -d datasets/aclImdb/

[01;34mdatasets/aclImdb/[0m
├── [01;34mtest[0m
│   ├── [01;34mneg[0m
│   └── [01;34mpos[0m
├── [01;34mtrain[0m
│   ├── [01;34mneg[0m
│   └── [01;34mpos[0m
└── [01;34mvalidation[0m
    ├── [01;34mneg[0m
    └── [01;34mpos[0m

10 directories


quick look at one review

In [4]:
!cat datasets/aclImdb/train/neg/21_4.txt

What was with all the Turkish actors? No offense but I thought it was all for nothing for all these actors. The film had no script to test any actors acting skill or ability. It demanded next to nothing I bought this film to see Michael Madsen. He is one of my favorite actors but this film was another failure for him. The script was so bad. Their was just nothing to sink your teeth into and all the characters were two dimensional. Madsen tried to act like a hard ass but the script and direction didn't even allow him to do enough with his character to make it more interesting or 3 dimensional.<br /><br />Even the sound effects of the gunfight at the beginning of the film sounded like the noise of paint ball guns when they are fired in a skirmish. It was really weird and they didn't sound like real guns. A video game had better sound effects than this film. There was also a really annoying bloke at the beginning of the film who was a member of the robbery gang. He had this American whini

prepare validation set

In [19]:
import os, shutil, random

validation_dir = imdb_dir / "validation"
validation_dir.mkdir()
train_dir = imdb_dir / "train"
for category in ("neg", "pos"):
    (validation_dir / category).mkdir()
    files = os.listdir(train_dir / category)
    random.Random(1234).shuffle(files)  # use seed to ensure same dataset through different runs
    num_validation_samples = int(0.2 * len(files))
    validation_files = files[-num_validation_samples:]
    for file in validation_files:
        shutil.move(train_dir / category / file,
                    validation_dir / category / file)


In [5]:
batch_size = 32

# 0 for negative, 1 for positive
train_dataset = keras.utils.text_dataset_from_directory(
    "datasets/aclImdb/train", batch_size=batch_size)
validation_dataset = keras.utils.text_dataset_from_directory(
    "datasets/aclImdb/validation", batch_size=batch_size)
test_dataset = keras.utils.text_dataset_from_directory(
    "datasets/aclImdb/test", batch_size=batch_size)

Found 20000 files belonging to 2 classes.
Found 5000 files belonging to 2 classes.
Found 25000 files belonging to 2 classes.


take a look at the batch data

In [6]:
for inputs, targets in train_dataset:
    print("inputs.shape: ", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("targets.shape:", targets.shape)
    print("targets.dtype:", targets.dtype)
    print("inputs[0]:", inputs[0])
    print("targets[0]:", targets[0])
    break


inputs.shape:  (32,)
inputs.dtype: <dtype: 'string'>
targets.shape: (32,)
targets.dtype: <dtype: 'int32'>
inputs[0]: tf.Tensor(b'An old family story told to two young girls by their grandfather is brought to life 16 years later as he foretold.<br /><br />People are getting murdered and blood is being spilled and rats are scampering all over and naked bodies are being enjoyed.<br /><br />Kitty (Barbara Bouchet) is the suspect, but we know she is not the killer. Is it Franziska (Marina Malfatti)? Is it Evelyn back from death for revenge? Is it a plot to steal an inheritance? The color is superb in this thriller from Emilio Miraglia, who only did one other Giallo, as far as I know.<br /><br />The only thing that spoiled the film was the appearance that several frames were cut out. Someone calls the police, and suddenly they are there trying to save Kitty.', shape=(), dtype=string)
targets[0]: tf.Tensor(1, shape=(), dtype=int32)


prepare a text vectorization layer

In [7]:
text_only_train_dataset = train_dataset.map(lambda x, y: x)  # do not need labels to train the text vectorization layer

max_length = 600
max_tokens = 20_000
text_vectorization = layers.TextVectorization(
    max_tokens=max_tokens,
    output_mode="int",
    output_sequence_length=max_length)

text_vectorization.adapt(text_only_train_dataset)

2024-03-22 17:16:56.918696: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


prepare integer sequence datasets 

In [8]:
int_train_dataset = train_dataset.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_validation_dataset = validation_dataset.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)
int_test_dataset = test_dataset.map(
    lambda x, y: (text_vectorization(x), y),
    num_parallel_calls=4)

use keras built-in transformer encoder for text classification

In [None]:
# !pipenv update keras_nlp
# !pipenv update keras 

In [34]:
encoder = nlp_layers.T

AttributeError: module 'keras_nlp.layers' has no attribute 'TransformerDecoder'

transformer encoder implemented as a subclassed Layer

In [5]:
import tensorflow as tf
from keras import layers


class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_projection = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim)])
        self.layer_normalization_1 = layers.LayerNormalization()
        self.layer_normalization_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[: tf.newaxis, :]  # FIXME: use keras ops
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        projection_input = self.layer_normalization_1(inputs + attention_output)
        projection_output = self.dense_projection(projection_input)
        return self.layer_normalization_2(projection_input + projection_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim
        })
        return config


use the transformer encoder for text classification

In [6]:
vocabulary_size = 20_000
embed_dimension = 256
num_heads = 2
dense_layer_dimension = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = layers.Embedding(input_dim=vocabulary_size,
                     output_dim=embed_dimension
                     )(inputs)
x = TransformerEncoder(embed_dim=embed_dimension,
                       dense_dim=dense_layer_dimension,
                       num_heads=num_heads
                       )(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

train and evaluate the transformer encoder model

In [None]:
callbacks = [
    keras.callbacks.ModelCheckpoint(
        "trained_models/transformer_encoder.keras",
        save_best_only=True)]

model.fit(int_train_dataset,
          validation_data=int_validation_dataset,
          epochs=20,
          callbacks=callbacks)

model = keras.models.load_model(
    "trained_models/transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder})

print(f"Test accuracy: {model.evaluate(int_test_dataset)[1]:.3f}")

implement position and token embedding as a subclassed layer

In [14]:
from keras import ops

class PositionAndTokenEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim, mask_zero=True)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim, mask_zero=True)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
    
    def call(self, inputs):
        length = ops.shape(inputs)[-1]
        positions = ops.arange(start=0, stop=length, step=1)

        # positions = tf.range(start=0, limit=length, delta=1)

        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        # return embedded_tokens + embedded_positions
        return ops.add(embedded_tokens, embedded_positions)

    # def compute_mask(self, inputs, mask=None):
    #     # keras.ops.not_equal(x1, x2)
    #     # return tf.math.not_equal(inputs, 0)
    #     return ops.not_equal(inputs, 0)

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config
    

combine token-position embedding with transformer

In [15]:
vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionAndTokenEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop",
              loss="binary_crossentropy",
              metrics=["accuracy"])
model.summary()

In [None]:
callbacks = [
    keras.callbacks.ModelCheckpoint(
        "trained_models/full_transformer_encoder.keras",
        save_best_only=True)]

model.fit(int_train_dataset, 
          validation_data=int_validation_dataset,
          epochs=20,
          callbacks=callbacks)

model = keras.models.load_model(
    "trained_models/full_transformer_encoder.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder,
                    "PositionAndTokenEmbedding": PositionAndTokenEmbedding})

print(f"full transformer test accuracy: {model.evaluate(int_test_dataset)[1]:.3f}")