In [None]:
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from keras import models
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive
import pickle
path_set = '/gdrive/MyDrive/Colab Notebooks/project3/DS_5_train_input'
path_next_word = '/gdrive/MyDrive/Colab Notebooks/project3/DS_5_train_input_nextWord'
path_prefix = '/gdrive/MyDrive/Colab Notebooks/project3/DS_5_train_input_prefixList'
dataset = pickle.load(open(path_set, 'rb'))
data_prefix = pickle.load(open(path_prefix, 'rb'))
data_nextword = pickle.load(open(path_next_word, 'rb'))
# Each entry of data_prefix / data_nextword is an iterable of strings;
# collapse every entry into one space-separated string.
datap = [' '.join(tokens) for tokens in data_prefix]
datan = [' '.join(tokens) for tokens in data_nextword]

# Prefixes and next-word targets are paired by position, so both lists must
# be cut at the same index. Fail early if they are not the same length.
if len(datap) != len(datan):
    raise ValueError(
        "prefix and next-word lists differ in length: "
        f"{len(datap)} != {len(datan)}"
    )

# The last 15% of the samples (rounded down) are held out for testing.
num_test_samples = int(0.15 * len(datap))
num_train_samples = len(datap) - num_test_samples

train_p, test_p = datap[:num_train_samples], datap[num_train_samples:]
train_n, test_n = datan[:num_train_samples], datan[num_train_samples:]

from tensorflow.keras.layers import TextVectorization

# Vectorizer settings: every text becomes `sequence_length` integer ids drawn
# from a vocabulary capped at `vocab_size` entries.
vocab_size = 1500
sequence_length = 35
text_vectorization = TextVectorization(
    output_sequence_length=sequence_length,
    output_mode="int",
    max_tokens=vocab_size,
)
# The vocabulary is learned from the full `dataset` object.
text_vectorization.adapt(dataset)

# Replace the training strings with their integer-id tensors.
train_p, train_n = text_vectorization(train_p), text_vectorization(train_n)

# (prefix ids, next-word ids) pairs, grouped into fixed-size batches.
batch_size = 256
train_dataset = tf.data.Dataset.from_tensor_slices((train_p, train_n)).batch(batch_size)

import tensorflow as tf
from tensorflow.keras import layers

class PositionalEmbedding(layers.Layer):
    """Adds a learned position embedding to a learned token embedding.

    Args:
        sequence_length: Number of distinct positions the position table holds.
        input_dim: Number of rows in the token embedding table.
        output_dim: Width of both embedding tables.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        # Kept on the instance so get_config() can report them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        # Token table first, position table second.
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Return token embeddings plus the embeddings of positions 0..length-1."""
        # Positions are taken from the size of the last axis of `inputs`.
        position_ids = tf.range(tf.shape(inputs)[-1])
        token_vectors = self.token_embeddings(inputs)
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Mark every entry of `inputs` that is not 0 as valid."""
        return tf.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with this layer's constructor arguments."""
        config = super().get_config()
        config.update(
            output_dim=self.output_dim,
            sequence_length=self.sequence_length,
            input_dim=self.input_dim,
        )
        return config

class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    The block applies causal self-attention, then attention over
    ``encoder_outputs``, then a two-layer feed-forward projection. Each step
    is followed by a residual addition and layer normalization.

    Args:
        embed_dim: Used as ``key_dim`` of both attention layers and as the
            output width of the feed-forward projection. It must match the
            last axis of the inputs for the residual additions to line up.
        dense_dim: Width of the hidden ReLU layer in the feed-forward projection.
        num_heads: Number of heads in both attention layers.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
          num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
          num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super(TransformerDecoder, self).get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 mask of shape (batch, seq, seq) for causal attention.

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise, so position
        ``i`` may only attend to positions up to and including itself.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: Tensor the block decodes; also the query of the first attention.
            encoder_outputs: Tensor used as key and value of the second attention.
            mask: Optional padding mask for ``inputs``. When given, it is
                combined with the causal mask for the second attention.

        Returns:
            The layer-normalized output of the feed-forward stage.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            # No padding information: treat every position as valid, which
            # reduces the combined mask to the causal mask. Previously this
            # path left `padding_mask` unassigned and raised at its first use.
            padding_mask = causal_mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

from tensorflow.keras import layers
# Model hyperparameters.
embed_dim = 256
latent_dim = 2048
num_heads = 2

# Integer token ids of any length go in; a softmax over the vocabulary comes
# out for every position.
inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(
    sequence_length=sequence_length,
    input_dim=vocab_size,
    output_dim=embed_dim,
)(inputs)
# The same tensor is passed as both the decoder input and `encoder_outputs`.
x = TransformerDecoder(
    embed_dim=embed_dim,
    dense_dim=latent_dim,
    num_heads=num_heads,
)(x, x)
outputs = layers.Dense(vocab_size, activation="softmax")(x)
model = keras.Model(inputs, outputs)
model.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

import numpy as np

# Lookup table from integer token id to vocabulary string.
tokens_index = {
    token_id: token
    for token_id, token in enumerate(text_vectorization.get_vocabulary())
}

def sample_next(predictions, temperature=1.0):
    """Draw one index at random from a temperature-rescaled distribution.

    Args:
        predictions: Array-like of probabilities, one per candidate index.
        temperature: Divisor applied to the log-probabilities before they are
            renormalized.

    Returns:
        The index selected by a single multinomial draw.
    """
    probabilities = np.asarray(predictions).astype("float64")
    # Rescale in log space, then exponentiate and renormalize to sum to 1.
    weights = np.exp(np.log(probabilities) / temperature)
    distribution = weights / np.sum(weights)
    # One trial, one experiment: a one-hot row whose hot position is the pick.
    one_hot_draw = np.random.multinomial(1, distribution, 1)
    return np.argmax(one_hot_draw)

class TextGenerator(keras.callbacks.Callback):
    """Callback that prints sampled continuations of a fixed prompt while training.

    Args:
        prompt: Text every generated sample starts from.
        generate_length: Number of tokens sampled and appended to the prompt.
        model_input_length: Stored on the instance; not read by this class.
        temperatures: Sampling temperatures; one sample is printed per value.
        print_freq: Generate only when the 1-based epoch number is a multiple
            of this value.
    """

    def __init__(self,
                 prompt,
                 generate_length,
                 model_input_length,
                 temperatures=(1.,),
                 print_freq=1):
        # Let the base callback set up its own state.
        super().__init__()
        self.prompt = prompt
        self.generate_length = generate_length
        self.model_input_length = model_input_length
        self.temperatures = temperatures
        self.print_freq = print_freq

    def on_epoch_end(self, epoch, logs=None):
        """Print one generated sample per configured temperature."""
        if (epoch + 1) % self.print_freq != 0:
            return
        for temperature in self.temperatures:
            print("== Generating with temperature", temperature)
            sentence = self.prompt
            for i in range(self.generate_length):
                tokenized_sentence = text_vectorization([sentence])
                predictions = self.model(tokenized_sentence)
                # NOTE(review): `i` counts generated tokens from 0, not from
                # the end of the prompt; confirm this matches how the
                # training targets are laid out.
                # Pass the loop's temperature through; previously every
                # sample was drawn at the default of 1.0.
                next_token = sample_next(predictions[0, i, :], temperature)
                sampled_token = tokens_index[next_token]
                sentence += " " + sampled_token
            print(sentence)

# Fixed prompt that the text-generation callback extends after each epoch.
prompt = "a g b f a f a e a k a j c f b f c d a k a k c e b g a h a k b d b f b f b d c"
text_gen_callback = TextGenerator(
    prompt=prompt,
    generate_length=1,
    model_input_length=sequence_length,
    temperatures=(0.2, 0.5, 0.7, 1., 1.5),
)

# Save the model after every epoch, whether or not it improved.
callbacks2 = [
    keras.callbacks.ModelCheckpoint(
        filepath="/gdrive/MyDrive/Colab Notebooks/project3/project3model1.keras",
        save_best_only=False,
    )
]

In [None]:
import numpy as np
# Lookup table from integer token id to vocabulary string.
tokens_index = {
    token_id: token
    for token_id, token in enumerate(text_vectorization.get_vocabulary())
}

In [None]:
# `callbacks2` is itself a list, so unpack it to pass one flat list of
# callback instances instead of a list nested inside the list.
model.fit(train_dataset, epochs=300, callbacks=[text_gen_callback, *callbacks2])

In [None]:
# Restore a saved model; the custom layer classes are supplied by name so
# they can be rebuilt on load.
# NOTE(review): the checkpoint callback in this notebook writes
# "project3model1.keras" but this loads "AAA3.keras" - confirm the file.
mymodel = keras.models.load_model(
    "/gdrive/MyDrive/Colab Notebooks/project3/AAA3.keras",
    custom_objects=dict(
        TransformerDecoder=TransformerDecoder,
        PositionalEmbedding=PositionalEmbedding,
    ),
)

import numpy as np
# Number of tokens decode_sequence() samples per input.
max_predicted_words = 1
# Lookup table from integer token id to vocabulary string.
tokens_index = {
    token_id: token
    for token_id, token in enumerate(text_vectorization.get_vocabulary())
}

def decode_sequence(input_sentence):
    """Sample `max_predicted_words` tokens from `mymodel` for one input string.

    Args:
        input_sentence: Text to vectorize and feed to the model.

    Returns:
        The sampled tokens joined by single spaces. With one predicted word
        this is just that word.
    """
    tokenized_input_sentence = text_vectorization([input_sentence])
    # The model input does not change between steps, so run it once.
    predictions = mymodel(tokenized_input_sentence)
    sampled_tokens = [
        tokens_index[sample_next(predictions[0, i, :])]
        for i in range(max_predicted_words)
    ]
    # Space-join so multi-word outputs use the same format as the
    # space-joined reference strings they are compared against.
    return " ".join(sampled_tokens)

# One sampled prediction per held-out prefix.
mylist = [decode_sequence(input_sentence) for input_sentence in test_p]

import math
# Count positions where the prediction equals the reference string exactly.
k = sum(expected == mylist[idx] for idx, expected in enumerate(test_n))
# Accuracy as a whole-number percentage, rounded up; 0 when the test set is
# empty so the division cannot fail.
y = math.ceil(k / len(test_n) * 100) if test_n else 0