<a href="https://colab.research.google.com/github/KCL-Health-NLP/nlp_examples/blob/master/ann/transformer_classification_with_keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Adapted from a [Keras team example](https://keras.io/examples/nlp/text_classification_with_transformer/)

In [None]:
# Basics
import tensorflow as tf
from tensorflow import keras

# Keras package to handle directories of text
from tensorflow.keras.utils import text_dataset_from_directory

# Model layers - we need these!
from tensorflow.keras import layers

# We use these next two when pre-processing string
import string
import re

# For plotting
import matplotlib.pyplot as plt

In [None]:
# --- Data pipeline settings ---

# Number of documents per batch
batch_size = 32

# Every text sequence is truncated or padded to this many tokens
sequence_length = 200

# Vocabulary size: the maximum number of distinct tokens
# (features) in our text vector space
max_features = 20000

# --- Model / training settings ---

# Length of each token embedding vector
embedding_dim = 32

# How many passes to make over the training data
epochs = 2

# Prediction threshold, above which an output probability
# would indicate class 1 (currently disabled).
#pred_threshold = 0.5

## Build a transformer block

In [None]:
class TransformerBlock(layers.Layer):
    """A single transformer encoder block.

    Applies multi-head self-attention and then a two-layer feed-forward
    network. Each of the two sub-layers has dropout applied to its output,
    is added back to its input (residual connection) and is then layer
    normalised.

    Args:
        embed_dim: Used as the attention ``key_dim`` and as the output width
            of the feed-forward network. Because the feed-forward output is
            added to the block input, this should match the size of the
            input's last dimension.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden (ReLU) layer in the feed-forward network.
        rate: Dropout rate used after attention and after the feed-forward
            network.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Tensor to transform; used as both query and value for
                the self-attention layer.
            training: Forwarded to the two dropout layers. Defaults to
                ``None`` so the block can be called without an explicit
                value (e.g. ``block(x)`` while building a functional model);
                without a default that call fails on Keras versions that
                only pass ``training`` when it has a concrete value.

        Returns:
            The layer-normalised output of the feed-forward sub-layer.
        """
        # Self-attention sub-layer: query and value are the same tensor
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)

        # Feed-forward sub-layer, again with dropout + residual + layer norm
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

## Build an embedding layer

This will contain two separate embedding layers

* Token embedding
* Token position embedding

In [None]:
class TokenAndPositionEmbedding(layers.Layer):
    """Embed tokens and their positions, and add the two embeddings.

    Args:
        maxlen: Number of rows in the position embedding table.
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Output size of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        # Lookup table indexed by token id
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        # Lookup table indexed by position within the sequence
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Return token embeddings of ``x`` plus position embeddings."""
        # Positions 0 .. (length of the last axis of x) - 1
        seq_len = tf.shape(x)[-1]
        position_vectors = self.pos_emb(tf.range(start=0, limit=seq_len, delta=1))
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

## Build the model

In [None]:
# Number of attention heads
num_heads = 2
# Hidden layer size in feed forward network inside transformer
ff_dim = 32

# Input: one sequence of `sequence_length` token ids per document
inputs = layers.Input(shape=(sequence_length,))

# Token + position embeddings
embedding_layer = TokenAndPositionEmbedding(sequence_length, max_features, embedding_dim)
x = embedding_layer(inputs)

# A single transformer block
transformer_block = TransformerBlock(embedding_dim, num_heads, ff_dim)
x = transformer_block(x)

# Classification head: pool over the sequence, then a small dense
# layer, with dropout before and after it
for head_layer in (
    layers.GlobalAveragePooling1D(),
    layers.Dropout(0.1),
    layers.Dense(20, activation="relu"),
    layers.Dropout(0.1),
):
    x = head_layer(x)

# Two-way softmax: one probability per class
outputs = layers.Dense(2, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

## Dataset

**For the exercise at the end of this notebook, you will need to comment out the below cell**

In [None]:
#(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=max_features)
#print(len(x_train), "Training sequences")
#print(len(x_val), "Validation sequences")
#x_train = keras.utils.pad_sequences(x_train, maxlen=sequence_length)
#x_val = keras.utils.pad_sequences(x_val, maxlen=sequence_length)

## Train

**For the exercise at the end of this notebook, you will need to comment out the below cell**

In [None]:
#model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
#history = model.fit(
#    x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_val, y_val)
#)

# Exercise

* Comment out the above two cells (Dataset and Train).
* Write new code to get the IMDb text dataset from [https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz](https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz)
* Read it into Keras datasets, one each for training, validation and held out testing.
* Preprocess the text, vectorize it, and use to train the model.
* Evaluate the model against the held out test set.

## Get the text

In [None]:
# Download the IMDb review archive into the current directory
# (-O saves it under its remote file name)
!curl -O https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
# Unpack the archive; the cells below read from the aclImdb/ directory
!tar -xf aclImdb_v1.tar.gz

We don't need the `unsup` directory of unsupervised training data

In [None]:
# unix command to remove directory recursively
# check it has worked!
!rm -r aclImdb/train/unsup

## Read in the text

In [None]:
# Arguments shared by the training and validation datasets. Both are
# carved out of the same directory, so they must use the same split
# proportion and the same seed to avoid overlap between them.
split_options = dict(
    batch_size=batch_size,     # batches for use in future processing
    validation_split=0.2,      # proportion of data to put in dev set
    seed=1337,
)

# Training data, 80% of the train directory
train_raw = text_dataset_from_directory(
    "aclImdb/train", subset="training", **split_options
)

# Validation / dev data - the remaining 20%
val_raw = text_dataset_from_directory(
    "aclImdb/train", subset="validation", **split_options
)

# Held-out test data, all of it
test_raw = text_dataset_from_directory("aclImdb/test", batch_size=batch_size)

## Preprocess

In [None]:
# Standardise raw text before it is tokenised
def preprocess_text(input_data):
    """Lowercase the text, drop HTML line breaks and strip punctuation.

    Steps, in order:
      1. lowercase everything
      2. replace each "<br />" with a single space
      3. delete every character found in ``string.punctuation``

    Returns the cleaned string tensor.
    """
    # Character class matching any one punctuation character
    punctuation_pattern = f"[{re.escape(string.punctuation)}]"

    text = tf.strings.lower(input_data)
    text = tf.strings.regex_replace(text, "<br />", " ")
    return tf.strings.regex_replace(text, punctuation_pattern, "")

## Vectorization layer

In [None]:
# Build a TextVectorization layer around our preprocess_text function.
#   standardize            - how raw strings are cleaned before tokenising
#   output_mode="int"      - each unique token is mapped to an integer index
#   max_tokens             - upper bound on the vocabulary size; only the
#                            most frequent tokens are kept
#   output_sequence_length - truncate / pad every output to this length
vectorize_layer = layers.TextVectorization(
    standardize=preprocess_text,
    output_mode="int",
    max_tokens=max_features,
    output_sequence_length=sequence_length,
)

# The layer has to be "adapted" to our corpus before use: this fits
# the vocabulary and computes the token -> integer mapping. It does
# not vectorize anything yet.

# adapt() wants text only, so drop the labels first. Dataset.map
# applies a function to every element; here a small lambda that
# keeps the text and discards the label.
train_texts = train_raw.map(lambda text, label: text)

# Fit the vocabulary on the training text only
vectorize_layer.adapt(train_texts)


## Vectorize

In [None]:
# function to use our vectorize_layer to vectorize a text tensor
# and return it alongside the unchanged label tensor
def vectorize_text(text, label):
    """Return ``(vectorize_layer(text), label)`` for one dataset element."""
    # add an innermost (right hand) dimension to the text
    expanded_text = tf.expand_dims(text, axis=-1)
    token_ids = vectorize_layer(expanded_text)
    return token_ids, label


# Vectorize all three datasets with the same function, so that
# train, validation and test data share one vocabulary mapping.
train_clean, val_clean, test_clean = (
    raw_dataset.map(vectorize_text)
    for raw_dataset in (train_raw, val_raw, test_raw)
)

## Improve performance

In [None]:
# Do async prefetching / buffering of the data for best performance on GPU.
def _cache_and_prefetch(dataset):
    """Cache the dataset, then prefetch with a buffer of 10."""
    return dataset.cache().prefetch(buffer_size=10)


train_clean = _cache_and_prefetch(train_clean)
val_clean = _cache_and_prefetch(val_clean)
test_clean = _cache_and_prefetch(test_clean)

## Train

In [None]:
# Configure the model for training
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Fit to the training data, validating against the validation data
# at the end of every epoch. The returned History object holds the
# per-epoch metrics.
history = model.fit(
    train_clean,
    validation_data=val_clean,
    epochs=epochs,
)

## Plot

In [None]:
def plot_history_metric(history, metric):
    """Plot one metric per epoch for the training and validation data.

    Args:
        history: The History object returned by ``model.fit``.
        metric: Key in ``history.history`` to plot, e.g. "accuracy" or
            "loss". The matching validation curve is read from
            ``"val_" + metric``.
    """
    fig, ax = plt.subplots()
    ax.plot(history.history[metric])
    ax.plot(history.history[f"val_{metric}"])
    ax.set_title(f"model {metric}")
    ax.set_ylabel(metric)
    ax.set_xlabel("epoch")
    # The second curve is the validation split, not the held-out test set
    ax.legend(["train", "validation"], loc="upper left")
    plt.show()


# summarize history for accuracy
plot_history_metric(history, "accuracy")

# summarize history for loss
plot_history_metric(history, "loss")

## Evaluate

In [None]:
# Evaluate on the held-out test set. With the model compiled as above,
# score holds the loss followed by the accuracy.
score = model.evaluate(test_clean)
test_loss, test_accuracy = score[0], score[1]

print(f"{'Test loss:':16}{test_loss:.2f}")
print(f"{'Test accuracy:':16}{test_accuracy:.2f}")