# Project 4
## Students:
 - Bryson Gullett
 - Robert Schaffer
 - Matthew Dixson
 

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow import optimizers
from tensorflow.keras import layers
import numpy as np
import re
import random as rand

In [2]:
print(tf.__version__)# you may want to upgrade to 2.10.0

2.12.0


### Please Use Markdown
> for markdown, see here: https://www.ibm.com/docs/en/watson-studio-local/1.2.3?topic=notebooks-markdown-jupyter-cheatsheet

## Task 1

In [3]:
class TransformerModel():
    def __init__(self, vocab_size, embed_dim=256, num_heads=2, num_blocks=1, ff_dim=256, maxlen=80, rate=0.1):
        #initailize variables
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.num_blocks = num_blocks
        self.ff_dim = ff_dim
        self.maxlen = maxlen
        self.rate = rate

    def TransformerBlock(self, inputs):
        #create the transformer block as discribed in the writeup, use the Keras functional API (https://keras.io/guides/functional_api/)
        #MultiHeadAttention layer, specifiy 'use_causal_mask=True' (https://keras.io/api/layers/attention_layers/multi_head_attention/)
        #LayerNormalization layer, specifiy 'epsilon=1e-6' (https://keras.io/api/layers/normalization_layers/layer_normalization/)
        #Use the rate variable for the dropout layers and remember to use two dense layers
        #See assignment and its figures for more details.
        multihead_atten = layers.MultiHeadAttention(num_heads=self.num_heads, key_dim=self.num_heads, dropout=self.rate)(inputs, inputs, use_causal_mask=True)
        add1 = layers.Add()([multihead_atten, inputs])
        layer_norm1 = layers.LayerNormalization(epsilon=1e-6)(add1)
        dense1 = layers.Dense(self.ff_dim, activation='relu')(layer_norm1)
        dense2 = layers.Dense(self.ff_dim, activation='relu')(dense1)
        dropout1 = layers.Dropout(self.rate)(dense2)
        add2 = layers.Add()([dropout1, layer_norm1])
        layer_norm2 = layers.LayerNormalization(epsilon=1e-6)(add2)

        return layer_norm2
    
    def EmbeddingLayer(self, inputs):
        #create the embedding layer
        #create (1) an embedding for the tokens and (2) an embedding for the positions
        #you can use https://keras.io/api/layers/core_layers/embedding/ Embedding class
        #you can use tf.range to enocde positions
        #add (1) and (2) and return the layer
        positions = layers.Lambda(lambda x: tf.multiply(tf.ones_like(x, dtype='int32'), tf.range(self.maxlen)))(inputs)
        token_embeddings = layers.Embedding(self.vocab_size, self.embed_dim, input_length=self.maxlen)(inputs)
        position_embeddings = layers.Embedding(self.maxlen, self.embed_dim, input_length=self.maxlen)(positions)

        add = layers.Add()([token_embeddings, position_embeddings])
        return add
    
    def create_model(self):
        #combine the EmbeddingLayer and num_blocks TransformerBlocks to create the model, use the Keras functional API (https://keras.io/guides/functional_api/)
        #use the SparseCategoricalCrossentropy loss function (https://keras.io/api/losses/probabilistic_losses/#sparsecategoricalcrossentropy-class)
        inputs = keras.Input(shape=(self.maxlen,))
        embedding_layer = self.EmbeddingLayer(inputs)
        prev_layer = embedding_layer
        for _ in range(self.num_blocks):
            transformer_block = self.TransformerBlock(prev_layer)
            prev_layer = transformer_block
        final_dense = layers.Dense(self.vocab_size, activation='softmax')(prev_layer)
        model = keras.Model(inputs=inputs, outputs=final_dense)
        self.opt = optimizers.Adam(learning_rate=self.rate)
        self.loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        model.compile(loss=keras.losses.SparseCategoricalCrossentropy, optimizer=self.opt)
        return model
    
my_model = TransformerModel(1000)
print(my_model.create_model().summary())

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 80)]         0           []                               
                                                                                                  
 lambda (Lambda)                (None, 80)           0           ['input_1[0][0]']                
                                                                                                  
 embedding (Embedding)          (None, 80, 256)      256000      ['input_1[0][0]']                
                                                                                                  
 embedding_1 (Embedding)        (None, 80, 256)      20480       ['lambda[0][0]']                 
                                                                                              

## Task 2

In [4]:
class DataSet():
    def __init__(self, filename, len):
        #load the text from the file
        self.text = ''
        self.len = len
        with open(filename) as fin:
            for line in fin:
                self.text += line

    def prep_text(self):
        #remove all punctuation, set to lowercase, remove duplicate spaces and other whitespace (keep newlines)
        self.text = re.sub(r'[^\w\s]', ' ', self.text)
        self.text = re.sub(r' +', ' ', self.text)
        self.text = re.sub(r'â', '', self.text)
        
        
        
        
    def tokenize_text(self):
        #seperate into words, create a vocab and convert the text to a list of numbers using the vocab such that each unique word is represented by its own number number
        self.text = self.text.split()
        self.vocab = np.unique(self.text)
        self.vocab = np.append(self.vocab, ['PAD'])
        self.vocab_nums = [i for i in range(len(self.vocab))]
        self.vocab_dict = dict(zip(self.vocab, self.vocab_nums))
        

    def create_dataset(self):
        #split the tokenized data into sequences of length len, return the sequences and vocab
        self.prep_text()
        self.tokenize_text()
        X = []
        Y = []
        for i in range(int(len(self.text)/self.len)):
            sequence_X = []
            sequence_Y = []
            for j in range(self.len):
                if i*self.len+j < len(self.text):
                    sequence_X.append(self.vocab_dict[self.text[i*self.len+j]])
                else:
                    pass
                    #sequence_X.append(self.vocab_dict['PAD'])
                if i*self.len+j+1 < len(self.text):
                    #y = np.zeros(len(self.vocab))
                    #y[self.vocab_dict[self.text[i*self.len+j+1]]] = 1
                    #y = [0 if self.vocab_dict[self.text[i*self.len+j+1]] != k else 1 for k in range(len(self.vocab))]
                    sequence_Y.append(self.vocab_dict[self.text[i*self.len+j+1]])
                else:
                    #y = np.zeros(len(self.vocab))
                    #y[self.vocab_dict['PAD']] = 1
                    #y = [0 if self.vocab_dict['PAD'] != k else 1 for k in range(len(self.vocab))]
                    #sequence_Y.append([self.vocab_dict['PAD']])
                    pass
            X.append(sequence_X)
            Y.append(sequence_Y)
        return X, Y, self.vocab

## Task 3

In [5]:
class GenerateText():
    def __init__(self, model, vocab, vocab_dict):
        self.model = model
        self.vocab = vocab
        self.vocab_dict = vocab_dict
    
    def generate_text(self, start_string, num_generate=100, seq_len=80):
        #generate text using the model and vocab, start with the start_string and generate num_generate words
        model_input = [self.vocab_dict['PAD'] for _ in range(seq_len)]
        
        #Text is the output of generate_text()
        text = start_string
        #Tokenize string to encode into integers
        start_string = start_string.split()
        #Get starting index for first word generated
        next_index = len(start_string)
        #Generate model input
        for i in range(len(start_string)):
            model_input[i] = self.vocab_dict[start_string[i]]
        
        #Generate text
        for i in range(num_generate):
            prediction = self.model.predict(np.array([model_input]), verbose=0)
            # print(len(prediction))
            # print(len(prediction[0]))
            # print(len(prediction[0][0]))
            word_index = np.argmax(prediction[0][next_index])
            next_index += 1
            if word_index >= len(self.vocab):
                print('Invalid word index: ', word_index, "Valid max word index = ", len(self.vocab))
                word_index = self.vocab_dict['PAD']
            if next_index < seq_len:
                model_input[next_index] = word_index
            else:
                model_input.pop(0)
                model_input.append(word_index)
                next_index -= 1
          
            next_word = self.vocab[word_index]
            text += ' ' + next_word
            # print(model_input)
        return text
    
    def generate_random_text(self, num_generate=100):
        text = ''
        for _ in range(num_generate):
            word = self.vocab[rand.randrange(len(self.vocab))]
            text += word + ' '
            if word == 'PAD':
                break
        return text   

In [6]:
data = DataSet('beatles.txt', 80)
_, _, vocab = data.create_dataset()
transformer = TransformerModel(len(vocab))
model = transformer.create_model()
text = GenerateText(model, vocab, data.vocab_dict)
out = text.generate_text("i read a crowd of people", num_generate=5)
print(out)
out = text.generate_random_text(num_generate=5)
print(out)
# print(len(out.split()))

i read a crowd of people edgar anymore eagle carry you
holiday most playin jay friend 


## Task 4: Model Traning and Testing

In [7]:
#Train the model while periodically generating text to show progress
# Used Keras' example on how to write a training loop
# https://keras.io/guides/writing_a_training_loop_from_scratch
def train_model(model, vocab, X, Y, epochs=50):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    for i in range(epochs):
        
        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(X, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(Y, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)
       
        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        model.optimizer.apply_gradients(zip(grads, model.trainable_weights))



        # model.fit(x,y)
        text = GenerateText(model, vocab, data.vocab_dict)
        out = text.generate_text('hello world', 50)
        #out = text.generate_random_text(num_generate=10)
        print(f"Epoch {i}\n\tloss: {loss_value}\n\tText: {out}")

    return model

## Setup input data for training

In [8]:
data = DataSet('beatles.txt', 80)
x, y, vocab = data.create_dataset()
X = []
for e in x:
    X.append(np.asarray(e).astype(np.int64))
X = np.asarray(X)

# Some elements of y are lists of size 1, convert to int
for i in range(451):
    for j in range(80):
        if isinstance(y[i][j], list):
            y[i][j] = y[i][j][0]

Y = np.asarray(y)


## Train models

In [9]:
transformer = TransformerModel(len(vocab), rate=.001)
model1 = transformer.create_model()
train_model(model1, vocab, X, Y, epochs=1)

  output, from_logits = _get_logits(


Epoch 0
	loss: 7.928467750549316
	Text: hello world tear cares seven spinnin fly needs questo sits aaaaaahhhhhh finds wine cup almost strange king lady vanish come takes ears spinnin yard k fighting heavy asleep flat southampton lotta come buried darn bags king breakfast discreetly sung where alerted newspapers cola discreetly getting fill blind letter hog comfort dinner as


<keras.engine.functional.Functional at 0x1339b45e4d0>

In [None]:
transformer = TransformerModel(len(vocab), rate=.001)
model50 = transformer.create_model()
train_model(model50, vocab, X, Y, epochs=50)

Epoch 0
	loss: 7.935318946838379
	Text: hello world starched helping such kept ya joan mon drive worked resign saw faces discreetly higher you said forgotten arms peter kind out styes you bin ich change sail mundo parted could surely girlfriend u licks deeper who silent here reply lear wings mother lips spinal talked hab butterflies noticed six carve
Epoch 1
	loss: 7.726151943206787
	Text: hello world back nose song obladi pies swim views thrill bundle upset peanuts faces from of you said forgotten arms peter round out styes you help to oh to oh speaking public i a kiss still l who i my road you still arriving knives morning tall you re voices mojo leisure
Epoch 2
	loss: 7.513859748840332
	Text: hello world back in i saw when i at becomes looked up na waves kiss yes you yes you still look so something i a our einer change i a problems i i could does people hung i i my you i a PAD back had a paperback other will i m
Epoch 3
	loss: 7.288165092468262
	Text: hello world back in i m i i m y

In [None]:
transformer = TransformerModel(len(vocab), rate=.001)
model100 = transformer.create_model()
train_model(model100, vocab, X, Y, epochs=100)

## Overtrain

In [None]:
transformer = TransformerModel(len(vocab))
model500 = transformer.create_model()
train_model(model500, vocab, X, Y, epochs=500)

## Test with different starting phrases

In [None]:

phrases = [
    "a day in the life",
    "hello world",
    # "I",
    "la la la"
]

for phrase in phrases:
    text1 = GenerateText(model1, vocab, data.vocab_dict)
    out1 = text1.generate_random_text(num_generate=10)
    
    text50 = GenerateText(model50, vocab, data.vocab_dict)
    out50 = text50.generate_random_text(num_generate=10)
    
    text100 = GenerateText(model50, vocab, data.vocab_dict)
    out100 = text100.generate_random_text(num_generate=10)    
    print(f"Phrase: {phrase}")
    print(f"\t1 Epoch: {out1}")
    print(f"\t50 Epochs: {out50}")
    print(f"\t100 Epochs: {out100}")



# Report

## Introduction

## Results

## Conclusion

## How to Run Code

Please include any special libraries and list your tf version here.