The code below is adapted from the Keras example "Text classification with Transformer":
https://keras.io/examples/nlp/text_classification_with_transformer/

It has been modified to follow the Introduction to AI (CSI4106) lecture.

In [8]:
import numpy as np
import pandas as pd
from collections import Counter
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization

from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.preprocessing import LabelEncoder

In [9]:
class TransformerLayer(layers.Layer):
    """Transformer encoder block: self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by dropout, a residual connection
    and layer normalization.

    Args:
        embed_dim: Output width of the feed-forward block and ``key_dim`` of
            the attention layer. It has to be compatible with the last axis
            of the input because both residual additions add to the input.
        num_heads: Number of attention heads.
        ff_dim: Units of the hidden (ReLU) Dense layer in the feed-forward block.
        rate: Dropout rate used after the attention and feed-forward sub-layers.
        **kwargs: Standard Keras layer arguments such as ``name`` or ``dtype``.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)

        # Kept so that get_config() can report the constructor arguments.
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)

        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        Args:
            inputs: Tensor fed to the attention layer as both query and value.
            training: Forwarded to the two dropout layers. Defaults to
                ``None`` so the layer can also be called without it.

        Returns:
            The layer-normalized sum of the first normalized residual and the
            feed-forward output.
        """
        # Self-attention: the same tensor is used as query and value.
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)

        # inputs + attention -> residual connection, then normalization
        out1 = self.layernorm1(inputs + attn_output)

        # further processing through the dense layers (ReLU, then linear)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)

        # second residual connection, normalized again
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments merged into the base layer config."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "num_heads": self.num_heads,
                "ff_dim": self.ff_dim,
                "rate": self.rate,
            }
        )
        return config

In [10]:
class TokenAndPositionEmbedding(layers.Layer):
    """Adds a learned position embedding to a learned token embedding.

    Args:
        maxlen: Number of rows in the position embedding table.
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Output dimension of both embedding tables.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()

        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Return token embeddings of ``x`` plus embeddings of positions 0..len-1."""
        # Length of the last axis of x, read at run time.
        seq_len = tf.shape(x)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        position_vectors = self.pos_emb(position_ids)
        token_vectors = self.token_emb(x)
        return token_vectors + position_vectors

In [11]:
# Load the cleaned reviews and make a single stratified 70/30 train/test split
# on the 'stars' column.
df = pd.read_csv('./data/yelp_cleaned.csv')
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.3, random_state=42)

# split() yields positional row indices, so rows are selected with .iloc
# (label-based .loc only matches while df keeps its default RangeIndex).
# n_splits=1 means there is exactly one (train, test) pair to take.
train_index, test_index = next(sss.split(df, df['stars']))
df_train = df.iloc[train_index]
df_test = df.iloc[test_index]

# Sanity check: sizes and class proportions of both partitions.
print(df_train.shape)
print(df_test.shape)
print(df_train.stars.value_counts()/len(df_train))
print(df_test.stars.value_counts()/len(df_test))

# Features are the cleaned review texts, targets are the star ratings.
train_x = df_train.clean_text.to_numpy()
train_y = df_train.stars.to_numpy()
test_x = df_test.clean_text.to_numpy()
test_y = df_test.stars.to_numpy()

print(Counter(train_y))
print(Counter(test_y))

# Re-encode the star ratings as integer class ids; the encoder is fitted on
# the training labels only and then reused for the test labels.
lbl_encoder = LabelEncoder()
train_y = lbl_encoder.fit_transform(train_y)
test_y = lbl_encoder.transform(test_y)

print(Counter(train_y))
print(Counter(test_y))

(6524, 4)
(2797, 4)
4    0.356683
5    0.325414
3    0.148835
2    0.093961
1    0.075107
Name: stars, dtype: float64
4    0.356811
5    0.325349
3    0.148731
2    0.094029
1    0.075080
Name: stars, dtype: float64
Counter({4: 2327, 5: 2123, 3: 971, 2: 613, 1: 490})
Counter({4: 998, 5: 910, 3: 416, 2: 263, 1: 210})
Counter({3: 2327, 4: 2123, 2: 971, 1: 613, 0: 490})
Counter({3: 998, 4: 910, 2: 416, 1: 263, 0: 210})


In [12]:
# Hyperparameters shared by the tokenization and model cells.
# vocabulary size (Tokenizer num_words and Embedding input_dim)
vocab_size = 20000
# token embedding size (Embedding output_dim; also passed to TransformerLayer)
embed_dim = 256
# number of attention heads
num_heads = 2
# hidden layer size of the feed-forward block inside TransformerLayer
ff_dim = 32
# maximum length for text: sequences are padded/truncated to this many tokens
maxlen = 450

In [13]:
# Fit the word-level tokenizer on the training texts only; words it cannot
# map are replaced by the '[UNK]' token.
tokenizer = Tokenizer(num_words=vocab_size, oov_token='[UNK]')
tokenizer.fit_on_texts(train_x)

# word_index lists every distinct word seen while fitting, so its size can be
# larger than vocab_size.
word_index = tokenizer.word_index
print(len(word_index))

# Step 1: texts -> lists of integer word ids (train and test share the tokenizer).
train_x_seq = tokenizer.texts_to_sequences(train_x)
test_x_seq = tokenizer.texts_to_sequences(test_x)

# Step 2: pad or cut every sequence at its end so all rows have length maxlen.
train_x_pad = pad_sequences(
    train_x_seq, maxlen=maxlen, padding="post", truncating="post"
)
test_x_pad = pad_sequences(
    test_x_seq, maxlen=maxlen, padding="post", truncating="post"
)

print(train_x_pad.shape)
print(train_y.shape)

23718
(6524, 450)
(6524,)


In [14]:
# Size of the output layer, read from the fitted label encoder instead of a
# hard-coded 5 so it always matches the encoded targets.
num_classes = len(lbl_encoder.classes_)

inputs = layers.Input(shape=(maxlen,))
# Plain token embedding: the TokenAndPositionEmbedding layer defined in an
# earlier cell is not used here, so no explicit position information is added.
# To include it, replace the next line with
#   x = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)(inputs)
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerLayer(embed_dim, num_heads, ff_dim)(x)
# Average over the sequence axis to get one vector per review.
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)
x = layers.Dense(20, activation="relu")(x)
x = layers.Dropout(0.1)(x)
# One softmax probability per class.
outputs = layers.Dense(num_classes, activation="softmax")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

In [15]:
# Targets are integer class ids (not one-hot vectors), hence the sparse loss.
model.compile(
    optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
# Keep the returned History object so the per-epoch metrics stay available,
# instead of letting its repr become the cell output.
# validation_split=0.3 holds out 30% of the training arrays for validation.
history = model.fit(
    train_x_pad, train_y, batch_size=32, epochs=2, validation_split=0.3
)

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1fc6b328490>