<a href="https://colab.research.google.com/github/bridg3r/myclasses/blob/main/PythonMachineLearningProjects/SelfCnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf

import numpy as np
import os
import time

In [None]:
#get text to use for training 
path_to_fileYe = tf.keras.utils.get_file('KanyeWestLyrics.txt', 'https://raw.githubusercontent.com/bridg3r/Ds350test/main/KanyeWestLyrics.txt')
path_to_filetaylorSwift = tf.keras.utils.get_file('all_tswift_lyrics.txt', 'https://raw.githubusercontent.com/irenetrampoline/taylor-swift-lyrics/master/all_tswift_lyrics.txt')
path_to_fileAusten = tf.keras.utils.get_file('austen.txt', 'https://raw.githubusercontent.com/byui-cse/cse450-course/master/data/austen/austen.txt')

# load text
text = open(path_to_fileAusten, 'rb').read().decode(encoding='utf-8')
vocab = sorted(set(text))

In [None]:
#processing the text
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

def text_from_ids(ids):
  joinedTensor = tf.strings.reduce_join(chars_from_ids(ids), axis=-1)
  return joinedTensor.numpy().decode("utf-8")

In [None]:
#Constructing our data
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
seq_length = 100
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

#create input and label pairs (Where input and label are sequences)
def split_input_target(sequence):
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

dataset = sequences.map(split_input_target)

In [None]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

In [None]:
#building the model architecture

#states variable msut be differernt
#lstm outputs two states that must be combined into one array to pass back in 
# (cell state and hidden state)
# sparse embedding by adding an embedding layer Ex of Keras embedding layer

#open AI = gpt3, googles = deep mind, <- leading 

class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__(self)
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(rnn_units, return_sequences=True, return_state=True)
    self.lstm = tf.keras.layers.LSTM(rnn_units, return_sequences=True, return_state=True)
    self.lstm2 = tf.keras.layers.LSTM(rnn_units, return_sequences=True, return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, states2=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)

    if states is None:
      states = self.lstm.get_initial_state(x)
    
    if states2 is None:
      states2 = self.lstm.get_initial_state(x)
    
    x, state_h, state_c = self.lstm(x, initial_state=states, training=training)
    states = [state_h, state_c]

    x, state_h2, state_c2 = self.lstm2(x, initial_state=states2, training=training)
    states2 = [state_h2, state_c2]

    x = self.dense(x, training=training)

    if return_state:
      return x, states, states2
    else:
      return x

In [None]:
#model parameters

# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

In [None]:
# instantiate the model
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [None]:
# just to see if the model is working well
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 87) # (batch_size, sequence_length, vocab_size)


In [None]:
model.summary()

Model: "my_model_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_12 (Embedding)    multiple                  22272     
                                                                 
 gru_2 (GRU)                 multiple                  0 (unused)
                                                                 
 lstm_14 (LSTM)              multiple                  5246976   
                                                                 
 lstm_15 (LSTM)              multiple                  8392704   
                                                                 
 dense_11 (Dense)            multiple                  89175     
                                                                 
Total params: 13,751,127
Trainable params: 13,751,127
Non-trainable params: 0
_________________________________________________________________


In [None]:
# compile the model using custom training/ gradient tape
class CustomTraining(MyModel):
  @tf.function
  def train_step(self, inputs):
      inputs, labels = inputs
      with tf.GradientTape() as tape:
          predictions = self(inputs, training=True)
          loss = self.loss(labels, predictions)
      grads = tape.gradient(loss, model.trainable_variables)
      self.optimizer.apply_gradients(zip(grads, model.trainable_variables))

      return {'loss': loss}

model = CustomTraining(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

model.compile(optimizer = tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

In [None]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [None]:
EPOCHS = 1
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])



In [None]:
# something about making a single-step prediction
class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [None]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [None]:
# generating text

# Create an instance of the character generator
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

# Now, let's generate a 1000 character chapter by giving our model "Chapter 1"
# as its starting text
states = None
states2 = None
next_char = tf.constant(['Chorus']) # ['ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:']
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)

# Print the results formatted.
print(result[0].numpy().decode('utf-8'))

ValueError: ignored

In [None]:
# 