In [1]:
import tensorflow_datasets as tfds
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf
from gensim.utils import simple_preprocess
from collections import defaultdict
from tensorflow.keras.callbacks import Callback
from gensim.models import Word2Vec
from gensim.utils import simple_preprocess
from itertools import chain
from tensorflow.keras.layers import Embedding
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, Lambda, Bidirectional, Dropout, TimeDistributed, Reshape, RepeatVector, Activation
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow.keras.backend as K

latent_dim = 128
embedding_dim = 150
epochs = 50
min_count_words = 3
max_sequence_len=25
batch_size=256

def split_corpus_by_len(corpus, max_sequence_len=max_sequence_len):
    new_corpus = [corpus[i:i+max_sequence_len] for i in range(0, len(corpus), max_sequence_len)]
    new_corpus.pop()
    return new_corpus

def build_vocab(corpus):
    global min_count_words
    word_counts = defaultdict(int)
    for sequence in corpus:
      for word in sequence:
        word_counts[word] += 1
    print(word_counts)
    # Step 2: Filter out low-frequency words
    vocab = [word for word, count in word_counts.items() if count >= min_count_words]
    return vocab

def load_and_preprocess_data():
    dataset, _ = tfds.load('tiny_shakespeare', with_info=True, as_supervised=False)
    corpus = [simple_preprocess(data['text'].numpy().decode('utf-8')) for data in dataset['train']][0]
    corpus = split_corpus_by_len(corpus)

    vocab = build_vocab(corpus)
    vocab_size = len(vocab)
    print(f"Vocab Size: {vocab_size}")


    # Tokenize the corpus
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(vocab)
    sequences = tokenizer.texts_to_sequences(corpus)

    tokenizer.word_index = {word: index for word, index in tokenizer.word_index.items() if word in vocab}
    tokenizer.index_word = {index: word for word, index in tokenizer.index_word.items() if word in vocab}


    max_sequence_len = max([len(x) for x in sequences])
    sequences = np.array(pad_sequences(sequences, maxlen=max_sequence_len, padding='post'))

    return corpus, sequences, tokenizer

corpus, sequences, tokenizer = load_and_preprocess_data()


Vocab Size: 4443


In [2]:
sequences.shape

(7064, 25)

In [3]:
print(f"Total Sequences: {len(sequences)}")

Total Sequences: 7064


In [4]:
train_sequences, test_sequences = train_test_split(sequences, test_size=0.1)
print(f"Training Sequences: {len(train_sequences)}")
print(f"Test Sequences: {len(test_sequences)}")


Training Sequences: 6357
Test Sequences: 707


In [5]:
def prepare_dataset(sequences, batch_size=batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((sequences, sequences))
    return dataset.shuffle(10000).batch(batch_size)

train_dataset = prepare_dataset(train_sequences).prefetch(tf.data.AUTOTUNE)
test_dataset = prepare_dataset(test_sequences).prefetch(tf.data.AUTOTUNE)


In [6]:

def create_word_embeddings(corpus):
    global min_count_words
    word2vec_model = Word2Vec(corpus, vector_size=embedding_dim, window=5, min_count=min_count_words, workers=4, epochs=100)
    return word2vec_model

word2vec_model = create_word_embeddings(corpus)


In [7]:
word2vec_model.wv.vectors.shape

(4443, 150)

In [8]:
import numpy as np

def create_embedding_matrix(word2vec_model, tokenizer, embedding_dim):
    vocab_size = len(tokenizer.word_index) + 1  # Adding 1 because of reserved 0 index
    embedding_matrix = np.zeros((vocab_size, embedding_dim))

    for word, i in tokenizer.word_index.items():
        if word in word2vec_model.wv:
            embedding_vector = word2vec_model.wv[word]
            embedding_matrix[i] = embedding_vector

    return embedding_matrix

embedding_matrix = create_embedding_matrix(word2vec_model, tokenizer, embedding_dim)


In [9]:
embedding_matrix.shape

(4444, 150)

In [10]:
vocab_size = embedding_matrix.shape[0]

def build_vae(embedding_matrix, max_sequence_len=max_sequence_len, latent_dim=latent_dim):
    vocab_size = embedding_matrix.shape[0]
    embedding_dim = embedding_matrix.shape[1]
    # Embedding layer
    embedding_layer = Embedding(embedding_matrix.shape[0],
                                embedding_matrix.shape[1],
                                weights=[embedding_matrix],
                                input_length=max_sequence_len,
                                trainable=False)

    # Encoder
    encoder_inputs = Input(shape=(max_sequence_len,))
    x = embedding_layer(encoder_inputs)
    x = Bidirectional(LSTM(256, return_sequences=True))(x)
    x = Dropout(0.15)(x)
    x = Bidirectional(LSTM(128))(x)
    z_mean = Dense(latent_dim)(x)
    z_log_var = Dense(latent_dim)(x)

    # Sampling function
    def sampling(args):
        z_mean, z_log_var = args
        epsilon = K.random_normal(shape=(K.shape(z_mean)[0], latent_dim))
        return z_mean + K.exp(0.5 * z_log_var) * epsilon

    z = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])

    decoder_inputs = Input(shape=(latent_dim,))
    x = Dense(128, activation='relu')(decoder_inputs)
    x = RepeatVector(max_sequence_len)(x)
    x = LSTM(64, return_sequences=True)(x)
    x = Dense(max_sequence_len * vocab_size, activation='relu')(x)  # Adjust this line
    decoder_outputs = TimeDistributed(Dense(vocab_size, activation='softmax'))(x)


    # VAE model
    encoder = Model(encoder_inputs, [z_mean, z_log_var, z])
    decoder = Model(decoder_inputs, decoder_outputs)

    vae = Model(encoder_inputs, decoder(encoder(encoder_inputs)[2]))
    return vae, encoder, decoder

vae, encoder, decoder = build_vae(embedding_matrix=embedding_matrix)

class VAELoss(tf.keras.losses.Loss):
    def __init__(self, encoder, vocab_size, **kwargs):
        super(VAELoss, self).__init__(**kwargs)
        self.encoder = encoder
        self.vocab_size = vocab_size

    def call(self, y_true, y_pred):
        z_mean, z_log_var, _ = self.encoder(y_true)

        # Ensure y_true is one-hot encoded
        y_true_one_hot = tf.one_hot(tf.cast(tf.squeeze(y_true), tf.int32), depth=self.vocab_size)

        # Compute the binary cross-entropy
        reconstruction_loss = tf.reduce_mean(
            tf.keras.losses.categorical_crossentropy(y_true_one_hot, y_pred))

        # Compute the KL divergence
        kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)

        return reconstruction_loss + kl_loss


loss_function = VAELoss(encoder, vocab_size=vocab_size)

# Compile and train the VAE
vae.compile(optimizer=Adam(0.01), loss=loss_function)
vae.summary()


Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 25)]              0         
                                                                 
 model (Functional)          [(None, 128),             2222312   
                              (None, 128),                       
                              (None, 128)]                       
                                                                 
 model_1 (Functional)        (None, 25, 4444)          501020264 
                                                                 
Total params: 503242576 (1.87 GB)
Trainable params: 502575976 (1.87 GB)
Non-trainable params: 666600 (2.54 MB)
_________________________________________________________________


In [11]:
print(f"Train data shape: {train_dataset.element_spec}")
print(f"Test data shape: {test_dataset.element_spec}")


Train data shape: (TensorSpec(shape=(None, 25), dtype=tf.int32, name=None), TensorSpec(shape=(None, 25), dtype=tf.int32, name=None))
Test data shape: (TensorSpec(shape=(None, 25), dtype=tf.int32, name=None), TensorSpec(shape=(None, 25), dtype=tf.int32, name=None))


In [12]:


early_stopping = EarlyStopping(monitor='loss', patience=5)

class AdjustLRCallback(Callback):
    def __init__(self, factor, every_epochs):
        super(AdjustLRCallback, self).__init__()
        self.factor = factor
        self.every_epochs = every_epochs

    def on_epoch_end(self, epoch, logs=None):
        if (epoch + 1) % self.every_epochs == 0:
            old_lr = self.model.optimizer.lr.read_value()
            new_lr = old_lr * self.factor
            self.model.optimizer.lr.assign(new_lr)
            print(f"\nEpoch {epoch+1}: Reducing learning rate to {new_lr}.")

adjust_lr_callback = AdjustLRCallback(factor=0.05, every_epochs=10)
vae.fit(train_dataset, validation_data=test_dataset, epochs=epochs, callbacks=[early_stopping, adjust_lr_callback])

Epoch 1/50



Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 10: Reducing learning rate to 0.0005000000237487257.
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 20: Reducing learning rate to 2.5000001187436283e-05.
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 30: Reducing learning rate to 1.2500000821091817e-06.
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 40: Reducing learning rate to 6.250000694763003e-08.


<keras.src.callbacks.History at 0x7b05a8193790>

In [13]:
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences

def sample(predictions, temperature=1.0):
    # Apply temperature to predictions and sample an index
    predictions = np.asarray(predictions).astype('float64').flatten()
    predictions = np.log(predictions + 1e-8) / temperature
    exp_predictions = np.exp(predictions)
    predictions = exp_predictions / np.sum(exp_predictions)
    probas = np.random.multinomial(1, predictions, 1)
    return np.argmax(probas)

def generate_new_text(decoder, tokenizer, latent_dim, max_sequence_len, num_words, temperature=1.0):
    # Sample from the latent space
    latent_sample = np.random.normal(size=(1, latent_dim))

    # Decode the sample to a sequence of tokens
    predicted_sequence = decoder.predict(latent_sample)[0]
    generated_text = ''
    for i in range(min(num_words, max_sequence_len)):
        # Apply temperature sampling to the predictions
        next_index = sample(predicted_sequence[i], temperature)
        next_word = word2vec_model.wv.index_to_key[next_index]
        # Append the new word to the generated text
        generated_text += " " + next_word
    return generated_text.strip()

# Example usage
num_words_to_generate = 125  # Number of words you want to generate
temperature = 1.5  # Temperature for sampling

# Generate new text
new_text = generate_new_text(decoder, word2vec_model, latent_dim, max_sequence_len, num_words_to_generate, temperature)
print(new_text)


yesternight mine deserve council less polixenes return virgilia confession thou speak power powerful ask benvolio guilty kin wolf receive kin gentlemen slight back scarce show
