In [1]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import pad_sequences
from tensorflow.keras.layers import Layer, Embedding, Dense, LayerNormalization, Dropout
import numpy as np

def load_data(path):
    """Return the full contents of the text file at ``path`` as one string.

    The file is decoded as UTF-8. Bytes that cannot be decoded are replaced
    with the Unicode replacement character instead of raising an error
    (``errors='replace'``).
    """
    with open(path, mode='r', encoding='utf-8', errors='replace') as source:
        return source.read()

path = "Book1.txt"
text = load_data(path).lower()

# Fit a word-level vocabulary on the lower-cased corpus; words not seen
# during fitting map to the '<OOV>' token.
tokenizer = Tokenizer(oov_token='<OOV>')
tokenizer.fit_on_texts([text])
# One more than the number of known words (presumably to leave room for the
# padding index 0 -- confirm against the Keras Tokenizer documentation).
total_words = len(tokenizer.word_index) + 1

# Encode the whole corpus as a single list of integer token ids.
seq_length = 50
tokens = tokenizer.texts_to_sequences([text])[0]

# Slide a window of (seq_length + 1) tokens over the corpus:
#   - the first seq_length tokens of each window are the model input,
#   - the final token is the label the model learns to predict.
input_sequences = [
    tokens[end - seq_length:end + 1]
    for end in range(seq_length, len(tokens))
]

# Stack the windows into a 2-D array, then separate inputs from labels:
# X holds every column except the last, y holds the last column.
input_sequences = np.array(
    pad_sequences(input_sequences, maxlen=seq_length + 1, padding='pre')
)
X = input_sequences[:, :-1]
y = input_sequences[:, -1]

# One-hot encode the labels over the vocabulary so they match the
# categorical cross-entropy loss used when the model is compiled.
y = tf.keras.utils.to_categorical(y, num_classes=total_words)

#Transformer Model

class MultiHeadSelfAttention(Layer):
    """Multi-head scaled dot-product self-attention.

    The input is projected to queries, keys and values with three Dense
    layers, split into ``num_heads`` heads of size ``embed_dim // num_heads``,
    attended per head, re-joined and passed through a final Dense layer.

    Args:
        embed_dim: Size of the last axis of the input and of the output.
        num_heads: Number of attention heads; must divide ``embed_dim``.
        **kwargs: Standard Keras ``Layer`` arguments (``name``, ``dtype``, ...).

    Raises:
        ValueError: If ``num_heads`` is not positive or does not divide
            ``embed_dim`` evenly.
    """

    def __init__(self, embed_dim, num_heads, **kwargs):
        # Fail early with a clear message; otherwise the mismatch only shows
        # up later as a reshape error inside split_heads.
        if num_heads <= 0 or embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by "
                f"num_heads ({num_heads})"
            )
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.projection_dim = embed_dim // num_heads

        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)
        self.combine_heads = Dense(embed_dim)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({"embed_dim": self.embed_dim, "num_heads": self.num_heads})
        return config

    def split_heads(self, x):
        """Reshape (batch, seq, embed_dim) to (batch, num_heads, seq, projection_dim)."""
        batch_size = tf.shape(x)[0]
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def attention(self, query, key, value, mask=None):
        """Apply scaled dot-product attention.

        Args:
            query, key, value: Per-head tensors as produced by ``split_heads``.
            mask: Optional tensor broadcastable to the score matrix; non-zero
                entries mark positions that must NOT be attended to.

        Returns:
            The attention-weighted sum of ``value``.
        """
        scores = tf.matmul(query, key, transpose_b=True)
        # Scale in the scores' own dtype so the division does not hit a
        # dtype mismatch when the layer is not running in float32.
        scores = scores / tf.sqrt(tf.cast(self.projection_dim, scores.dtype))

        if mask is not None:
            # Cast so boolean or integer masks are accepted; masked positions
            # receive a large negative score and vanish after the softmax.
            scores += tf.cast(mask, scores.dtype) * -1e9

        weights = tf.nn.softmax(scores, axis=-1)
        return tf.matmul(weights, value)

    def call(self, inputs, mask=None):
        """Run self-attention: ``inputs`` serves as query, key and value."""
        batch_size = tf.shape(inputs)[0]

        query = self.split_heads(self.query_dense(inputs))
        key = self.split_heads(self.key_dense(inputs))
        value = self.split_heads(self.value_dense(inputs))

        attention_output = self.attention(query, key, value, mask)
        # Move the head axis back next to the feature axis and merge them.
        attention_output = tf.transpose(attention_output, perm=[0, 2, 1, 3])
        concatenated = tf.reshape(attention_output, (batch_size, -1, self.embed_dim))
        return self.combine_heads(concatenated)
    
class TransformerBlock(Layer):
    """One transformer encoder-style block.

    Self-attention followed by a two-layer feed-forward network; each
    sub-layer output goes through dropout, is added to its input (residual
    connection) and is then layer-normalised.

    Args:
        embed_dim: Feature size of the block's input and output.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden feed-forward layer.
        rate: Dropout rate applied after each sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.attention = MultiHeadSelfAttention(embed_dim, num_heads)
        # Position-wise feed-forward network: expand to ff_dim, project back.
        hidden_layer = Dense(ff_dim, activation='relu')
        output_layer = Dense(embed_dim)
        self.ffn = tf.keras.Sequential([hidden_layer, output_layer])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=None, mask=None):
        """Apply the block to ``inputs``.

        ``mask`` is forwarded to the attention layer; ``training`` is
        forwarded to both dropout layers.
        """
        # Attention sub-layer with residual connection.
        attended = self.dropout1(self.attention(inputs, mask=mask), training=training)
        normed_attention = self.layernorm1(inputs + attended)

        # Feed-forward sub-layer with residual connection.
        transformed = self.dropout2(self.ffn(normed_attention), training=training)
        return self.layernorm2(normed_attention + transformed)
    
class TokenAndPositionEmbedding(Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        maxlen: Number of distinct positions the position table holds.
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Size of each embedding vector.
    """

    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_embedding = Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.position_embedding = Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, inputs):
        """Embed ``inputs`` (token ids) and add the embedding of each position index."""
        # Position indices 0 .. (length of axis 1) - 1, looked up once and
        # broadcast across the batch when added to the token embeddings.
        num_positions = tf.shape(inputs)[1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        position_vectors = self.position_embedding(position_ids)
        token_vectors = self.token_embedding(inputs)
        return token_vectors + position_vectors

def create_causal_mask(seq_length):
    """Build a (seq_length, seq_length) mask that hides future positions.

    Entry [i, j] is 1 when j > i (position j lies after position i) and 0
    otherwise, so a 1 marks a position that must not be attended to.
    """
    # band_part(..., -1, 0) keeps the lower triangle including the diagonal;
    # subtracting it from 1 leaves ones strictly above the diagonal.
    all_ones = tf.ones((seq_length, seq_length))
    visible = tf.linalg.band_part(all_ones, -1, 0)
    return 1 - visible

# Model Parameters
embed_dim = 128  # Embedding size
num_heads = 4    # Number of attention heads
ff_dim = 512     # Feed-forward layer size
maxlen = seq_length  # Model input length; same as the training window size

# Build the model
inputs = tf.keras.Input(shape=(maxlen,))
embedding_layer = TokenAndPositionEmbedding(maxlen, total_words, embed_dim)
x = embedding_layer(inputs)
print(f"After embedding: {x.shape}")

# Create causal mask for the sequence
causal_mask = create_causal_mask(maxlen)

transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
# Do not pass `training=True` here: hard-coding it bakes the flag into the
# graph, which keeps dropout active during predict()/evaluate() as well.
# Leaving it unset lets Keras supply the right value (True in fit(), False
# at inference time).
x = transformer_block(x, mask=causal_mask)
print(f"After transformer block: {x.shape}")

# Take the last token's output for next word prediction
x = x[:, -1, :]
print(f"After taking last token: {x.shape}")

# Project onto the vocabulary to get a probability for every word.
x = Dense(total_words, activation="softmax")(x)
print(f"Final output shape: {x.shape}")

model = tf.keras.Model(inputs=inputs, outputs=x)

# Compile the model (labels are one-hot, hence categorical cross-entropy)
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

model.summary()

# Optional: Add a text generation function
def generate_text(seed_text, next_words, model, tokenizer, seq_length):
    """Greedily extend ``seed_text`` using the trained model.

    Each step tokenizes the current text, pads it on the left to
    ``seq_length`` ids, asks the model for next-word probabilities and
    appends the most probable word.

    Args:
        seed_text: Starting text to continue.
        next_words: Maximum number of words to append.
        model: Object whose ``predict`` returns next-word probabilities for a
            batch of id sequences.
        tokenizer: Fitted tokenizer exposing ``texts_to_sequences`` and
            ``word_index``.
        seq_length: Input length the model expects.

    Returns:
        ``seed_text`` followed by the generated words, separated by spaces.
        Generation stops early if the predicted id has no word in the
        vocabulary (e.g. the padding id).
    """
    # Build the id -> word lookup once; scanning word_index for every
    # generated word costs a full vocabulary pass per step.
    index_to_word = {index: word for word, index in tokenizer.word_index.items()}

    for _ in range(next_words):
        token_list = tokenizer.texts_to_sequences([seed_text])[0]
        token_list = pad_sequences([token_list], maxlen=seq_length, padding='pre')

        predicted_probs = model.predict(token_list, verbose=0)
        predicted_id = int(np.argmax(predicted_probs, axis=-1)[0])

        output_word = index_to_word.get(predicted_id)
        if not output_word:
            # No word for this id: appending nothing would leave the input
            # unchanged and only add stray spaces, so stop here.
            break

        seed_text += " " + output_word

    return seed_text

# Example usage after training:
# generated_text = generate_text("harry potter", 50, model, tokenizer, seq_length)
# print(generated_text)


After embedding: (None, 50, 128)
After transformer block: (None, 50, 128)
After taking last token: (None, 128)
Final output shape: (None, 6663)


