# Project 4
## Students:
 > Austin Houston,
 > Alexander Krneta
 
 

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split


print(tf.__version__)# you may want to upgrade to 2.10.0 

2.9.0



## Task 1

In [30]:
class TransformerModel(keras.Model):
    def __init__(self, vocab_size, embed_dim=256, num_heads=2, num_blocks=1, ff_dim=256, maxlen=80, dropout_rate=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.maxlen = maxlen
        self.dropout_rate = dropout_rate
        self.num_blocks = num_blocks
        self.embeddings = None
        self.outputs = None

        self.inputs = keras.Input(shape=(self.maxlen, self.embed_dim))


    def EmbeddingLayer(self):
        # Initialize embeddings
        self.token_embedding = layers.Embedding(input_dim=self.vocab_size, output_dim=self.embed_dim, input_length=self.maxlen)
        self.positional_embedding = layers.Embedding(input_dim=self.maxlen, output_dim=self.embed_dim, input_length=self.maxlen, embeddings_initializer=keras.initializers.RandomUniform())
        self.dropout = layers.Dropout(self.dropout_rate)

        position_ids = tf.range(start=0, limit=tf.shape(self.inputs)[-1], delta=1, dtype=tf.int32)
        position_embedding = self.positional_embedding(position_ids)
        token_embedding = self.token_embedding(self.inputs)
        self.embeddings = token_embedding + position_embedding


    def TransformerBlock(self):
        # Multi-Head Attention layer 
        # Sums the input to the block and the output from the first dropout
        attention = layers.MultiHeadAttention(num_heads=self.num_heads, key_dim=self.embed_dim)(self.embeddings, self.embeddings)
        attention = layers.Dropout(rate=self.dropout_rate)(attention)
        attention = layers.LayerNormalization(epsilon=1e-6)(layers.Add()([self.embeddings, attention]))
        
        # Feed-Forward Dense layer
        # Sums the output of the first LayerNormalization and second dropout
        dense = layers.Dense(units=self.ff_dim, activation='relu')(attention)
        dense = layers.Dropout(rate=self.dropout_rate)(dense)
        dense = layers.Dense(units=self.embed_dim)(dense)
        dense = layers.Dropout(rate=self.dropout_rate)(dense)
        dense = layers.LayerNormalization(epsilon=1e-6)(layers.Add()([attention, dense]))

        self.outputs = layers.Dense(units=self.embed_dim)(dense)

    def create_model(self,vocab_size, embed_dim, num_heads, num_blocks, ff_dim, maxlen, dropout_rate):
        
        self.EmbeddingLayer()
        self.TransformerBlock()

        model = tf.keras.models.Model(inputs = self.inputs, outputs=self.outputs)

        # Compile the model with sparse categorical crossentropy loss and Adam optimizer
        model.compile(
            loss='sparse_categorical_crossentropy',
            optimizer=keras.optimizers.Adam(),
            metrics=['accuracy']
        )
        return model

## Task 2

In [9]:
class Dataset():
    def __init__(self, filepath):
        # Object Attributes
        self.text = None
        self.vocab = None
        self.reverse_vocab = None

        # Initialize variable(s)
        with open(filepath, 'r') as f:
            self.text = f.read()


    def prep_text(self):
        self.text = self.text.lower()
        self.text = ''.join([c for c in self.text if c.isalnum() or c.isspace()])
    
    def tokenize_text(self):
        # Turn the text into a list of integers
        self.text = self.text.split()
        unique_words = np.unique(self.text)

        # Create vocab dictionaries
        self.vocab = {w: i+1 for i, w in enumerate(unique_words)}

        # Create reverse vocab dictionary
        self.reverse_vocab = {i+1: w for i, w in enumerate(unique_words)}

        # Convert text to list of integers
        self.text = [self.vocab[w] for w in self.text]
  
    def create_dataset(self):
        self.prep_text()
        self.tokenize_text()

        x = []
        y = []
        for i in range(0, len(self.text) - 1):
            x.append(self.text[i])
            y.append(self.text[i+1])
        
        return x, y, self.vocab, self.reverse_vocab

## Task 3

In [11]:
class GenerateText:
    def __init__(self, model, vocab):
        self.model = model
        self.vocab = vocab
        self.tokenizer = keras.preprocessing.text.Tokenizer(num_words=len(vocab), char_level=True, oov_token='[UNK]')
        self.tokenizer.fit_on_texts(self.vocab)

    def generate_text(self, start_string, num_generate=100, temperature=1.0):
        #generate text using the model and vocab, start with the start_string and generate num_generate words
        # Convert input text to numerical sequence
        input_sequence = self.tokenizer.texts_to_sequences([start_string])[0]

        # Pad sequence to desired length
        input_sequence = keras.preprocessing.sequence.pad_sequences([input_sequence], maxlen=num_generate, truncating='pre')

        # Generate output sequence using the model
        output_sequence = self.model.predict(input_sequence)[0]

        # Apply temperature scaling to the output sequence
        output_sequence = output_sequence / temperature
        output_sequence = output_sequence ** 2
        output_sequence = output_sequence / tf.reduce_sum(output_sequence)

        # Sample the next token from the output distribution
        sampled_token_index = tf.random.categorical(output_sequence, num_samples=1)[-1,0].numpy()

        # Convert the sampled token to its corresponding character
        sampled_char = self.tokenizer.index_word.get(sampled_token_index, '[UNK]')

        # Append the sampled character to the input text and repeat
        output_text = start_string + sampled_char
        while sampled_char != '[UNK]' and len(output_text) < num_generate:
            input_sequence = keras.preprocessing.sequence.pad_sequences([input_sequence], maxlen=num_generate, truncating='pre')
            output_sequence = self.model.predict(input_sequence)[0]
            output_sequence = output_sequence / temperature
            output_sequence = output_sequence ** 2
            output_sequence = output_sequence / tf.reduce_sum(output_sequence)
            sampled_token_index = tf.random.categorical(output_sequence, num_samples=1)[-1,0].numpy()
            sampled_char = self.tokenizer.index_word.get(sampled_token_index, '[UNK]')
            output_text += sampled_char

        return output_text

    def generate_random_text(self, num_generate=100, temperature=1.0):
        return self.generate_text('', num_generate=num_generate, temperature=temperature)


## Task 4: Model Traning and Testing

In [22]:
#Train the model while periodically generating text to show progress
def train_model(model, vocab, x, y, epochs = 10):
    
    # Create a dictionary to map the token IDs to words in the vocabulary
    reverse_vocab = dict((i, word) for word, i in vocab.items())

    # Train the model for the specified number of epochs
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        # Shuffle the training data for each epoch
        
        # Train the model for each training example
        for i in range(len(x)):
            # Get the input and output sequences for the current example
            input_seq = x[i:i+1]
            target_seq = y[i:i+1]
            
            # Convert the input and target sequences to TensorFlow tensors
            input_tensor = tf.convert_to_tensor(input_seq)
            target_tensor = tf.convert_to_tensor(target_seq)
            
            # Generate predictions for the target sequence using the model
            with tf.GradientTape() as tape:
                predictions = model(input_tensor)
                loss = model.compiled_loss(target_tensor, predictions)

            # Compute gradients and update model weights
            gradients = tape.gradient(loss, model.trainable_variables)
            model.optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            
            # Print the loss every 100 steps
            if i % 100 == 0:
                print(f"Step {i}: loss={loss.numpy():.4f}")
            
            # Generate a sample output sequence every 1000 steps
            if i % 1000 == 0:
                output_seq = model.generate_sequence(input_seq)
                output_text = " ".join([reverse_vocab[tok] for tok in output_seq[0]])
                print(f"Sample output: {output_text}\n")


In [31]:
data = Dataset('beatles.txt')
x, y, vocab, reverse_vocab = data.create_dataset()

model = TransformerModel(vocab_size = 100)
model = model.create_model(vocab_size = 100, embed_dim=256, num_heads=2, num_blocks=1, ff_dim=256, maxlen=80, dropout_rate=0.1)
print(model.summary())

train_model(model, vocab, x, y, epochs = 1)

Model: "model_6"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_9 (InputLayer)           [(None, 80, 256)]    0           []                               
                                                                                                  
 tf.compat.v1.shape_6 (TFOpLamb  (3,)                0           ['input_9[0][0]']                
 da)                                                                                              
                                                                                                  
 tf.__operators__.getitem_6 (Sl  ()                  0           ['tf.compat.v1.shape_6[0][0]']   
 icingOpLambda)                                                                                   
                                                                                            

2023-05-06 22:31:45.434793: W tensorflow/core/framework/op_kernel.cc:1745] OP_REQUIRES failed at einsum_op_impl.h:502 : INVALID_ARGUMENT: Expected input 0 to have rank 4 but got: 2


InvalidArgumentError: Exception encountered when calling layer "query" (type EinsumDense).

Expected input 0 to have rank 4 but got: 2 [Op:Einsum]

Call arguments received by layer "query" (type EinsumDense):
  • inputs=tf.Tensor(shape=(1, 256), dtype=float32)


# Report

## Introduction

## Results

## Conclusion

## How to Run Code

Please include any special libraries and list your tf version here.