In [1]:
# import libraries
import numpy as np
import tensorflow as tf

In [2]:
# load text and covert to lowercase
filename = "alice_in_wonderland.txt"
raw_text = open(filename, 'r', encoding='utf-8').read()
raw_text = raw_text.lower()
raw_text = raw_text[:3000]

In [3]:
# create tokenizer to convert from string to integers
tokenizer = tf.keras.preprocessing.text.Tokenizer()
tokenizer.fit_on_texts([raw_text]) 

# convert text to sequences
sequences = tokenizer.texts_to_sequences([raw_text])[0]

# define parameters
vocab_size = len(tokenizer.word_index) + 1  
seq_length = 20

In [4]:
# prepare the dataset of input to output pairs encoded as integers
input = []
output = []

for i in range(seq_length, len(sequences)):
    input.append(sequences[i-seq_length:i])
    output.append(sequences[i])

In [5]:
# convert to arrays
X = np.array(input)
y = np.array(output)

In [6]:
embed_dim = 256 # embedding dimension
num_heads = 4 # number of attention heads
num_layers = 4  # number of transformer layers

In [13]:
# positions 
def positional_encoding(position, embed_dim):
    angle_rads = np.arange(position)[:, np.newaxis] / np.power(10000, (2 * (np.arange(embed_dim) // 2)) / np.float32(embed_dim))
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
    return tf.cast(angle_rads[np.newaxis, ...], dtype=tf.float32)

In [55]:
class MultiHeadSelfAttention(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim # model's dimension
        self.num_heads = num_heads # number of attention heads
        self.dept = embed_dim // num_heads # dimension of each head's vectors
        
        # layers for query, key, and value vectors
        self.wq = tf.keras.layers.Dense(embed_dim)
        self.wk = tf.keras.layers.Dense(embed_dim)
        self.wv = tf.keras.layers.Dense(embed_dim)
        self.dense = tf.keras.layers.Dense(embed_dim) # final classification layer

    # reshape input to have num_heads for multi-head attention
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.dept))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    # finding attention scores
    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]

        # create vectors from input sequences
        q = self.wq(inputs)
        k = self.wk(inputs)
        v = self.wv(inputs)

        # split into multiple heads
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        # calculate attention scores from vectors above
        scaled_attention_logits = tf.matmul(q, k, transpose_b=True) / tf.sqrt(tf.cast(self.dept, tf.float32)) # scaling
        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) # normalize

        # attention weights x values
        output = tf.matmul(attention_weights, v)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.embed_dim))
        return self.dense(output) # return classification layer

class TransformerBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim):
        super(TransformerBlock, self).__init__()

        # attention layer and feed forward network
        self.attention = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = tf.keras.Sequential([
            tf.keras.layers.Dense(ff_dim, activation='relu'),
            tf.keras.layers.Dense(embed_dim)
        ])

        # normalize and dropout layers
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(0.1)
        self.dropout2 = tf.keras.layers.Dropout(0.1)

    # get outputs for attention and feed forward layers
    def call(self, inputs, training):
        attn_output = self.attention(inputs)
        out1 = self.layernorm1(inputs + self.dropout1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        return self.layernorm2(out1 + self.dropout2(ffn_output, training=training))

class TransformerModel(tf.keras.Model):
    def __init__(self, input_dim, embed_dim, num_heads, num_layers, output_dim):
        super(TransformerModel, self).__init__()
        self.embedding = tf.keras.layers.Embedding(input_dim, embed_dim) # embedding layer
        self.transformer_blocks = [TransformerBlock(embed_dim, num_heads, embed_dim * 4) for _ in range(num_layers)] # transformer blocks
        self.fc = tf.keras.layers.Dense(output_dim) # output layer

    def call(self, x, training=False):
        embedded = self.embedding(x) # convert input to embeddings
        for transformer in self.transformer_blocks:
            embedded = transformer(embedded, training=training) # pass through blocks
        output = tf.reduce_mean(embedded, axis=1) # pooling over sequence length
        return self.fc(output) # final output

In [57]:
# model
model = TransformerModel(vocab_size, embed_dim, num_heads, num_layers, vocab_size)

In [59]:
# compile model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [61]:
# train model
model.fit(X, y, epochs=100, batch_size=32, validation_split=0.2)

Epoch 1/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 117ms/step - accuracy: 0.0371 - loss: 11.5119 - val_accuracy: 0.0467 - val_loss: 14.7607
Epoch 2/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 106ms/step - accuracy: 0.0447 - loss: 11.5693 - val_accuracy: 0.0000e+00 - val_loss: 14.7265
Epoch 3/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 110ms/step - accuracy: 0.0074 - loss: 12.4116 - val_accuracy: 0.0000e+00 - val_loss: 14.9643
Epoch 4/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 113ms/step - accuracy: 0.0075 - loss: 12.0519 - val_accuracy: 0.0000e+00 - val_loss: 15.0955
Epoch 5/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 104ms/step - accuracy: 0.0036 - loss: 13.0864 - val_accuracy: 0.0000e+00 - val_loss: 15.0647
Epoch 6/100
[1m14/14[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 117ms/step - accuracy: 0.0029 - loss: 12.6786 - val_accuracy: 0.0000e+00 - val_loss

<keras.src.callbacks.history.History at 0x3036d0bc0>

In [62]:
# example prediction
input_example = np.array(X[110:111])
predicted = model.predict(input_example)
predicted_word_index = np.argmax(predicted, axis=-1)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 132ms/step


In [63]:
# get the actual
predicted_word = tokenizer.index_word[predicted_word_index[0]]
input_indices = input_example[0]
actual_next_index = y[110] 
actual_next_word = tokenizer.index_word[actual_next_index]

# Show results
print("Input sequence:", [tokenizer.index_word.get(index) for index in input_indices if index > 0])
print("Predicted word:", predicted_word)
print("Actual next word:", actual_next_word)

Input sequence: ['a', 'daisy', 'chain', 'would', 'be', 'worth', 'the', 'trouble', 'of', 'getting', 'up', 'and', 'picking', 'the', 'daisies', 'when', 'suddenly', 'a', 'white', 'rabbit']
Predicted word: dear
Actual next word: with
