# IMDB Movie Review Sentiment Analysis
This model will predict sentiments from IMDB movie reviews.
- data is taken from https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz and extracted
- training data is placed in aclImdb; subdivided into train/neg, train/pos, test/neg, and test/pos

In [None]:
# imports
import tensorflow as tf
from tensorflow import keras
from keras import layers
import os, pathlib, shutil, random
import numpy as np

In [None]:
# dataset root and the per-split sub-directories (train / val / test)
base_dir = pathlib.Path("aclImdb")
train_dir, val_dir, test_dir = (
    base_dir / split_name for split_name in ("train", "val", "test")
)

### ONLY RUN ONCE - Split 20% of the training data into validation data

In [None]:
# fraction of each category's training files that is moved into the validation set
rat = 0.2

# handle the negative and positive reviews separately so both keep the same ratio
for category in ("neg", "pos"):
    src_dir = train_dir / category
    dst_dir = val_dir / category

    # Guard against running this cell twice: a second run would move yet another
    # 20% of the remaining training files (and os.makedirs used to crash here).
    if dst_dir.is_dir() and any(dst_dir.iterdir()):
        print(f"{dst_dir} is already populated - skipping the split for '{category}'")
        continue
    os.makedirs(dst_dir, exist_ok=True)

    # os.listdir returns entries in arbitrary order, so sort first; otherwise the
    # fixed shuffle seed would not give the same split on every machine
    files = sorted(os.listdir(src_dir))
    random.Random(1337).shuffle(files)
    num_val = int(rat * len(files))

    # move the first [num_val] shuffled files into the validation set
    for file_name in files[:num_val]:
        shutil.move(src_dir / file_name, dst_dir / file_name)

### Data Preprocessing

In [None]:
# number of reviews per batch, shared by every split
batch_size = 32

# build one dataset per split directory (created in the order train, test, val)
train_ds, test_ds, val_ds = (
    keras.utils.text_dataset_from_directory(split_dir, batch_size=batch_size)
    for split_dir in (train_dir, test_dir, val_dir)
)

# training data with the second (target) component dropped, leaving only the text
text_only_train_ds = train_ds.map(lambda text, label: text)

In [None]:
def display_batch(dataset: tf.data.Dataset):
    """Print shape, dtype and first element of the inputs and targets of the first batch.

    Nothing is printed when the dataset yields no batches.
    """
    # only the first batch is inspected
    first_batch = next(iter(dataset), None)
    if first_batch is None:
        return

    inputs, targets = first_batch
    print(f"Input Shape: {inputs.shape}")
    print(f"Input Type: {inputs.dtype}")
    print(f"Target Shape: {targets.shape}")
    print(f"Target Type: {targets.dtype}")
    print(f"Inputs[0]: {inputs[0]}")
    print(f"Targets[0]: {targets[0]}")


# preview the first batch of the training dataset
display_batch(train_ds)

## Bag-of-words approach

Arguably the easiest approach: we disregard word order and only look at the "existence" of words

### We start with a 1-gram bag-of-words approach

In [None]:
# vocabulary cap used by the vectorization layer (and as get_model's default input size)
vocab_size = 20000

# multi-hot encoder; its vocabulary is built from the text of the training dataset
text_vectorization = layers.TextVectorization(max_tokens=vocab_size, output_mode="multi_hot")
text_vectorization.adapt(text_only_train_ds)


def _encode_1gram(text, label):
    """Vectorize the text of a batch and pass the targets through unchanged."""
    return text_vectorization(text), label


# apply the encoding to every split
binary_1gram_train_ds = train_ds.map(_encode_1gram, num_parallel_calls=tf.data.AUTOTUNE)
binary_1gram_val_ds = val_ds.map(_encode_1gram, num_parallel_calls=tf.data.AUTOTUNE)
binary_1gram_test_ds = test_ds.map(_encode_1gram, num_parallel_calls=tf.data.AUTOTUNE)

display_batch(binary_1gram_train_ds)

In [None]:
def get_model(max_tokens=vocab_size, hidden_dim=16):
    """Build and compile a small dense binary classifier.

    Args:
        max_tokens: length of each input vector (defaults to vocab_size).
        hidden_dim: number of units in the hidden Dense layer.

    Returns:
        A compiled keras.Model: Dense(relu) -> Dropout(0.5) -> Dense(1, sigmoid),
        trained with rmsprop on binary cross-entropy and reporting accuracy.
    """
    features = keras.Input(shape=(max_tokens,))
    hidden = layers.Dense(hidden_dim, activation="relu")(features)
    regularized = layers.Dropout(0.5)(hidden)
    probability = layers.Dense(1, activation="sigmoid")(regularized)

    classifier = keras.Model(inputs=features, outputs=probability)
    classifier.compile(
        optimizer="rmsprop",
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return classifier

In [None]:
# train a fresh classifier on the 1-gram features; save_best_only keeps only the best checkpoint on disk
model = get_model()
best_1gram_checkpoint = keras.callbacks.ModelCheckpoint(filepath="binary_1gram.keras", save_best_only=True)
history = model.fit(
    binary_1gram_train_ds,
    validation_data=binary_1gram_val_ds.cache(),
    epochs=10,
    callbacks=[best_1gram_checkpoint],
)

# reload the saved checkpoint and score it on the test split
model = keras.models.load_model("binary_1gram.keras")
test_metrics_1gram = model.evaluate(binary_1gram_test_ds)
print(test_metrics_1gram)

### Bi (2) gram Text Vectorization

In [None]:
# replace the vectorizer with a multi-hot encoder that uses ngrams=2
text_vectorization = layers.TextVectorization(output_mode="multi_hot", ngrams=2, max_tokens=vocab_size)
text_vectorization.adapt(text_only_train_ds)


def _encode_2gram(text, label):
    """Vectorize the text of a batch and pass the targets through unchanged."""
    return text_vectorization(text), label


# re-encode every split with the new vectorizer
binary_2gram_train_ds = train_ds.map(_encode_2gram, num_parallel_calls=tf.data.AUTOTUNE)
binary_2gram_val_ds = val_ds.map(_encode_2gram, num_parallel_calls=tf.data.AUTOTUNE)
binary_2gram_test_ds = test_ds.map(_encode_2gram, num_parallel_calls=tf.data.AUTOTUNE)

display_batch(binary_2gram_train_ds)

In [None]:
# train a fresh classifier on the 2-gram features, keeping only the best checkpoint
model = get_model()
best_2gram_checkpoint = keras.callbacks.ModelCheckpoint(filepath="binary_2gram.keras", save_best_only=True)
callbacks = [best_2gram_checkpoint]
model.fit(
    binary_2gram_train_ds,
    epochs=10,
    callbacks=callbacks,
    validation_data=binary_2gram_val_ds,
)

# reload the saved checkpoint and score it on the test split
model = keras.models.load_model("binary_2gram.keras")
test_metrics_2gram = model.evaluate(binary_2gram_test_ds)
print(test_metrics_2gram)

### Bigram Count Approach
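This time, not only will we take 2-grams of the words, but we will also weight each one by its TF-IDF score: how often it occurs in the review, scaled down for n-grams that are common across all reviews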

In [None]:
# replace the vectorizer with a tf_idf encoder that uses ngrams=2
text_vectorization = layers.TextVectorization(output_mode="tf_idf", ngrams=2, max_tokens=vocab_size)
text_vectorization.adapt(text_only_train_ds)


def _encode_tfidf(text, label):
    """Vectorize the text of a batch and pass the targets through unchanged."""
    return text_vectorization(text), label


# re-encode every split with the new vectorizer
tfidf_2gram_train_ds = train_ds.map(_encode_tfidf, num_parallel_calls=16)
tfidf_2gram_val_ds = val_ds.map(_encode_tfidf, num_parallel_calls=16)
tfidf_2gram_test_ds = test_ds.map(_encode_tfidf, num_parallel_calls=16)

display_batch(tfidf_2gram_train_ds)

In [None]:
# train and evaluate the models
model = get_model()
model.fit(tfidf_2gram_train_ds, epochs=10, validation_data=tfidf_2gram_val_ds,
          callbacks=[keras.callbacks.ModelCheckpoint(filepath="tfidf_2gram.keras", save_best_only=True)])
print(model.evaluate(tfidf_2gram_test_ds))

### Running Inference with a Model

In a production environment, it is best to include the preprocessing in the model itself, so that the model can be fed raw strings directly

In [None]:
# chain the current vectorizer and the trained classifier behind a string input
inputs = keras.Input(dtype="string", shape=(1,))
processed_input = text_vectorization(inputs)
outputs = model(processed_input)

prod_model = keras.Model(inputs=inputs, outputs=outputs)

# two hand-written reviews, one per row, to try the combined model on
sample_reviews = [
    ["That was an excellent movie! I loved it"],
    ["It was a horrible movie. I hated it"],
]
raw_text_data = tf.convert_to_tensor(sample_reviews)
print(raw_text_data)

predictions = prod_model(raw_text_data)
print(predictions)

## Sequential Models
This time, instead of doing feature engineering, we let a recurrent model learn the features itself

In [None]:
# prepare integer sequences for each word; we will still use data variables from the first attempt
vocab_size = 20000
max_length = 600
text_vectorization = layers.TextVectorization(max_tokens=vocab_size, output_mode="int", output_sequence_length=max_length)
text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=16)
int_val_ds = val_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=16)
int_test_ds = test_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=16)

display_batch(int_train_ds)

### Let's start with a one-hot bidirectional lstm layer

In [None]:
# integer token ids in, one-hot vectors of depth vocab_size fed to a bidirectional LSTM
inputs = keras.Input(shape=(None,), dtype="int64")
embedded = tf.one_hot(inputs, depth=vocab_size)

sequence_summary = layers.Bidirectional(layers.LSTM(32))(embedded)
regularized = layers.Dropout(0.5)(sequence_summary)
outputs = layers.Dense(1, activation="sigmoid")(regularized)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer="rmsprop",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary()

In [None]:
# train on the integer-encoded sequences, keeping only the best checkpoint
model.fit(int_train_ds, epochs=10, validation_data=int_val_ds,
          callbacks=[keras.callbacks.ModelCheckpoint(filepath="one_hot_bidir_lstm.keras", save_best_only=True)])
model = keras.models.load_model("one_hot_bidir_lstm.keras")

# evaluate on the integer-encoded test split: the model takes int64 token ids,
# so the raw-string test_ds cannot be fed to it
print(model.evaluate(int_test_ds))

### Adding Embeddings
Embeddings convert our words into a vector - rather than a one_hot, it compresses the data into a much smaller vector
- Semantic relationships become geometric - "closer" words are more similar, different types of relationships can often be represented as a vector translation
- These relationships are learned
- the mask_zero parameter adds a mask
    - Without it, the run of zeros (padding) at the end of each sequence will dilute the information that the forward recurrent layer can extract from the actually significant parts of the sentence
    - It propagates through the entire model; each layer has access to the mask
    - With the mask, an LSTM skips the padded steps, so the result it returns corresponds to the last nonzero (real) word; other layers that receive a full sequence likewise use the mask to take only the nonzero (important) words into account

In [None]:
# The inputs are integer token ids, so declare them as int64 like the other
# sequence models instead of relying on keras.Input's default float dtype.
inputs = keras.Input((max_length,), dtype="int64")

# learned 256-dimensional word vectors; mask_zero=True adds a mask for the zero entries
embedded = layers.Embedding(input_dim=vocab_size, output_dim=256, mask_zero=True)(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)

model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop", loss="binary_crossentropy", metrics=["accuracy"])
model.summary()

In [None]:
# train the embedding model, keeping only the best checkpoint
embedding_checkpoint = keras.callbacks.ModelCheckpoint(filepath="embeddings_bidir_gru.keras", save_best_only=True)
model.fit(
    int_train_ds,
    epochs=10,
    validation_data=int_val_ds,
    callbacks=[embedding_checkpoint],
)

# reload the saved checkpoint and score it on the integer-encoded test split
model = keras.models.load_model("embeddings_bidir_gru.keras")
embedding_test_metrics = model.evaluate(int_test_ds)
print(embedding_test_metrics)

### Pretrained Word Embeddings
- Using the GloVe embeddings (taken from https://nlp.stanford.edu/data/glove.6B.zip)

In [None]:
# parse the GloVe text file into a {word: coefficient vector} lookup
path_to_glove = pathlib.Path("glove.6B/glove.6B.100d.txt")
word_to_embedding = {}
with path_to_glove.open("rt", encoding="utf8") as glove_file:
    for line in glove_file:
        # the first whitespace-separated field is the word, the rest are its coefficients
        word, raw_coefs = line.split(maxsplit=1)
        vector = np.fromstring(raw_coefs, sep=" ")
        # every vector in this file is expected to have 100 coefficients
        assert len(vector) == 100
        word_to_embedding[word] = vector

print(f"Found {len(word_to_embedding)} words")

In [None]:
# width of the pretrained vectors loaded into word_to_embedding
embedding_dim = 100

# token -> integer id, as assigned by the current text_vectorization layer
vocabulary = text_vectorization.get_vocabulary()
word_index = {token: index for index, token in enumerate(vocabulary)}

# One row per possible token id. The matrix is sized by vocab_size (not
# len(vocabulary)) so it always matches the Embedding layer's input_dim below,
# even when the adapted vocabulary has fewer than vocab_size entries.
embedding_matrix = np.zeros((vocab_size, embedding_dim))

# copy the pretrained vector of every known word; unknown words keep an all-zero row
for word, i in word_index.items():
    if i >= vocab_size:
        continue
    pretrained_vector = word_to_embedding.get(word)
    if pretrained_vector is not None:
        embedding_matrix[i] = pretrained_vector

# frozen embedding layer initialised from the matrix above
embedding_layer = layers.Embedding(
    input_dim=vocab_size,
    output_dim=embedding_dim,
    mask_zero=True,
    embeddings_initializer=keras.initializers.Constant(embedding_matrix),
    trainable=False,
)

In [None]:
# integer token ids -> frozen pretrained embeddings -> bidirectional LSTM -> sigmoid
inputs = keras.Input((None,), dtype="int64")
embedded = embedding_layer(inputs)
x = layers.Bidirectional(layers.LSTM(32))(embedded)
# the Dropout layer must be applied to x; assigning the bare layer object to x
# would pass a layer (not a tensor) to the Dense layer below
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)
model.compile(optimizer="rmsprop", loss="binary_crossentropy", metrics=["accuracy"])
model.summary()

In [None]:
# train the GloVe-based model, keeping only the best checkpoint
glove_checkpoint = keras.callbacks.ModelCheckpoint(
    filepath="glove_embeddings_sequence_model.keras",
    save_best_only=True,
)
model.fit(
    int_train_ds,
    epochs=10,
    validation_data=int_val_ds,
    callbacks=[glove_checkpoint],
)

# reload the saved checkpoint and score it on the integer-encoded test split
model = keras.models.load_model("glove_embeddings_sequence_model.keras")
glove_test_metrics = model.evaluate(int_test_ds)
print(glove_test_metrics)

### Transformer Encoder
- A layer with a MultiHeadAttention layer, Dense projections afterward, normalization, and residual connections
- MultiHeadAttention layers consist of many heads
    - each head stores one kind of "relationship" between words, and everything is concatenated together

In [None]:
class TransformerEncoder(layers.Layer):
    """Self-attention followed by a two-layer dense projection.

    Each of the two stages adds its input back to its output and passes the
    sum through its own LayerNormalization layer.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        """Create the sub-layers.

        Args:
            embed_dim: the dimension of each word; also used as the attention
                key_dim and as the output width of the dense projection.
            dense_dim: the dimension of the intermediate Dense layer.
            num_heads: the number of heads in the MultiHeadAttention layer.
            **kwargs: forwarded to the base layer constructor.
        """
        super().__init__(**kwargs)

        # remembered so that get_config() can report the constructor arguments
        self.embed_dim, self.dense_dim, self.num_heads = embed_dim, dense_dim, num_heads

        self.attention = layers.MultiHeadAttention(key_dim=embed_dim, num_heads=num_heads)
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(dense_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Apply the encoder; inputs.shape = (batch size, sequence length, embedding size)."""
        # when a mask is supplied, give it an extra axis after the batch axis
        # before handing it to the attention layer
        attention_mask = None if mask is None else mask[:, tf.newaxis, :]

        # self-attention: the inputs serve as both query and value
        attended = self.attention(inputs, inputs, attention_mask=attention_mask)
        normed_attention = self.layernorm_1(inputs + attended)

        # Dense layers act on the last dimension, i.e. on each word independently
        projected = self.dense_proj(normed_attention)
        return self.layernorm_2(normed_attention + projected)

    def get_config(self):
        """Return the base config plus the constructor arguments, so a saved model can rebuild this layer."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }

In [None]:
# hyper-parameters of the transformer classifier
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

# token ids -> word embeddings -> transformer encoder -> max over the sequence -> sigmoid
inputs = keras.Input(shape=(None,), dtype="int64")
embedded = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)(inputs)
encoded = TransformerEncoder(embed_dim=embed_dim, dense_dim=dense_dim, num_heads=num_heads)(embedded)
pooled = layers.GlobalMaxPool1D()(encoded)
regularized = layers.Dropout(0.5)(pooled)
outputs = layers.Dense(1, activation="sigmoid")(regularized)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer="rmsprop",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary()

# train for 20 epochs, keeping only the best checkpoint
transformer_checkpoint = keras.callbacks.ModelCheckpoint(filepath="transformer_encoder.keras", save_best_only=True)
model.fit(
    int_train_ds,
    epochs=20,
    validation_data=int_val_ds,
    callbacks=[transformer_checkpoint],
)

In [None]:
# the custom_objects is required so that they know which class the Custom Layer comes from
model = keras.models.load_model("transformer_encoder.keras", custom_objects={"TransformerEncoder": TransformerEncoder})
print(f"Test Accuracy: {model.evaluate(int_test_ds)[1]:.3f}")

OK, let's add a positional embedding: we alter the embedding step so that, on top of the word embedding, it adds an embedding of each token's position in the sequence

In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a word embedding and an embedding of each position in the sequence."""

    def __init__(self, sequence_length, input_dim, embed_dim, **kwargs):
        """Create the two embedding tables.

        Args:
            sequence_length: number of entries in the position embedding table.
            input_dim: number of entries in the word embedding table.
            embed_dim: output width of both embeddings.
            **kwargs: forwarded to the base layer constructor.
        """
        super().__init__(**kwargs)

        # remembered so that get_config() can report the constructor arguments
        self.sequence_length, self.input_dim, self.embed_dim = sequence_length, input_dim, embed_dim

        self.word_embedding = layers.Embedding(input_dim=input_dim, output_dim=embed_dim)
        self.position_embedding = layers.Embedding(input_dim=sequence_length, output_dim=embed_dim)

    def call(self, inputs, *args, **kwargs):
        """Embed the inputs and add the embedding of positions 0..length-1 along the last axis."""
        seq_len = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=seq_len, delta=1)
        word_vectors = self.word_embedding(inputs)
        position_vectors = self.position_embedding(positions)
        return word_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is True wherever the input is not 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base config plus the constructor arguments."""
        return {
            **super().get_config(),
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
            "embed_dim": self.embed_dim,
        }

In [None]:
# size of the position embedding table
sequence_length = 600

# token ids -> word + position embeddings -> transformer encoder -> max over the sequence -> sigmoid
inputs = keras.Input(shape=(None,), dtype="int64")
embedded = PositionalEmbedding(sequence_length=sequence_length, input_dim=vocab_size, embed_dim=embed_dim)(inputs)
encoded = TransformerEncoder(embed_dim=embed_dim, dense_dim=dense_dim, num_heads=num_heads)(embedded)
pooled = layers.GlobalMaxPool1D()(encoded)
regularized = layers.Dropout(0.5)(pooled)
outputs = layers.Dense(1, activation="sigmoid")(regularized)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
    optimizer="rmsprop",
    loss="binary_crossentropy",
    metrics=["accuracy"],
)
model.summary()

# train for 20 epochs, keeping only the best checkpoint
positional_checkpoint = keras.callbacks.ModelCheckpoint(
    filepath="transformer_positional_encoder.keras",
    save_best_only=True,
)
model.fit(
    int_train_ds,
    epochs=20,
    validation_data=int_val_ds,
    callbacks=[positional_checkpoint],
)

In [None]:
# both custom layer classes must be supplied so the saved model can be rebuilt
positional_custom_layers = {
    "TransformerEncoder": TransformerEncoder,
    "PositionalEmbedding": PositionalEmbedding,
}
model = keras.models.load_model("transformer_positional_encoder.keras", custom_objects=positional_custom_layers)

# index 1 of the evaluate() result is reported as the accuracy
positional_test_metrics = model.evaluate(int_test_ds)
print(f"Test Accuracy: {positional_test_metrics[1]:.3f}")