Imports

In [2]:
import tensorflow as tf
import numpy as np

1. Data collection and pre-processing

In [3]:
batch_size = 128
epochs = 100
latent_dim = 100
num_tokens = 1000
max_sequence_length = 20
embedding_dim = 64
lstm_units = 128
learning_rate = 0.001

In [4]:
file = open('usable_data.txt', 'r', encoding='utf-8')
text = file.read()

In [5]:
text = text.lower()
lines = text.split('\n')
line_words = [line.split() for line in lines]

vocab = [word for line in line_words for word in line]
vocab = sorted(set(vocab))

In [6]:
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=num_tokens, oov_token='<OOV>')
# tokenizer = tf_text.WhitespaceTokenizer()
tokenizer.fit_on_texts(line_words)

vocab_size = len(tokenizer.word_index) + 1
print('Vocabulary Size: %d' % vocab_size)

Vocabulary Size: 40460


In [7]:
sequences = tokenizer.texts_to_sequences(line_words)
sequences = [seq for seq in sequences if len(seq) <= max_sequence_length]

# sequences = np.array(sequences, dtype=np.float64)

# _sequences = tokenizer.texts_to_sequences([text])[0]
# print(sequences)
# print(len(sequences))

ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=vocab, mask_token=None)

all_ids = ids_from_chars(tf.constant(vocab))

In [8]:
dataset = tf.data.Dataset.from_tensor_slices(all_ids)
dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.batch(batch_size, drop_remainder=True)


In [9]:
# Determine the number of sequences in the dataset
num_sequences = len(all_ids)
# Calculate the number of batches
num_batches = num_sequences // batch_size

print('Number of sequences:', num_sequences)
print('Number of batches:', num_batches)

Number of sequences: 40458
Number of batches: 316


Define generator model

In [10]:
generator = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(latent_dim,)),
    tf.keras.layers.Dense(embedding_dim),
    tf.keras.layers.Dense(max_sequence_length * lstm_units),
    tf.keras.layers.Reshape((max_sequence_length, lstm_units)),
    tf.keras.layers.LSTM(lstm_units, return_sequences=True),
    tf.keras.layers.LSTM(lstm_units, return_sequences=True),
    tf.keras.layers.TimeDistributed(
        tf.keras.layers.Dense(num_tokens, activation='softmax'))
])


Define discriminator model

In [11]:
discriminator = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(max_sequence_length, num_tokens)),
    tf.keras.layers.LSTM(lstm_units, return_sequences=True),
    tf.keras.layers.LSTM(lstm_units),
    tf.keras.layers.Dense(1, activation='sigmoid')
])


Define loss functions and optimizers

In [12]:
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
generator_optimizer = tf.keras.optimizers.Adam(learning_rate)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate)


In [13]:
gen_trainable_variables = generator.trainable_variables
disc_trainable_variables = discriminator.trainable_variables

generator_optimizer.build(gen_trainable_variables)
discriminator_optimizer.build(disc_trainable_variables)

Define training loop

In [14]:
@tf.function
def train_step(real_sequences):
    # Generate random noise vectors
    noise = tf.random.normal([batch_size, latent_dim])

    # Generate fake sequences using generator
    with tf.GradientTape() as gen_tape:
        generated_sequences = generator(noise, training=True)

        # Evaluate discriminator on real and fake sequences
        real_scores = discriminator(real_sequences, training=True)
        fake_scores = discriminator(generated_sequences, training=True)

        # Compute generator loss and gradients
        gen_loss = cross_entropy(tf.ones_like(fake_scores), fake_scores)
        gen_gradients = gen_tape.gradient(
            gen_loss, generator.trainable_variables)

    # Update generator
    generator_optimizer.apply_gradients(
        zip(gen_gradients, generator.trainable_variables))

    # Train discriminator on real sequences
    with tf.GradientTape() as disc_tape:
        real_scores = discriminator(real_sequences, training=True)
        real_loss = cross_entropy(tf.ones_like(real_scores), real_scores)
        real_gradients = disc_tape.gradient(
            real_loss, discriminator.trainable_variables)

    # Train discriminator on fake sequences
    with tf.GradientTape() as disc_tape:
        fake_scores = discriminator(generated_sequences, training=True)
        fake_loss = cross_entropy(tf.zeros_like(fake_scores), fake_scores)
        fake_gradients = disc_tape.gradient(
            fake_loss, discriminator.trainable_variables)

    # Update discriminator
    discriminator_gradients = [real_grad + fake_grad for real_grad,
                               fake_grad in zip(real_gradients, fake_gradients)]
    discriminator_optimizer.apply_gradients(
        zip(discriminator_gradients, discriminator.trainable_variables))
    
    print(
        f"Generator loss: {gen_loss:.4f} | Discriminator loss: [real_loss: {real_loss:.4f}, fake_loss: {fake_loss:.4f}]")


In [15]:
def generator_loss(fake_scores):
    return cross_entropy(tf.ones_like(fake_scores), fake_scores)


In [16]:
def train_generator(noise, target_batch):
    with tf.GradientTape() as gen_tape:
        # Generate fake text from noise input
        generated_text = generator(noise, training=True)
        # Get the discriminator's decision on the generated text
        disc_decision = discriminator(generated_text, training=False)
        # Calculate generator's loss based on the discriminator's decision
        gen_loss = generator_loss(disc_decision)
    # Get the generator's trainable variables
    gen_variables = generator.trainable_variables
    # Calculate the gradients of generator's loss with respect to its variables
    gen_gradients = gen_tape.gradient(gen_loss, gen_variables)
    # Use optimizer to apply the gradients to the generator's variables
    generator_optimizer.apply_gradients(zip(gen_gradients, gen_variables))
    return gen_loss


In [17]:
def discriminator_loss(real_decision, fake_decision):
    real_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        labels=tf.ones_like(real_decision), logits=real_decision))
    fake_loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(
        labels=tf.zeros_like(fake_decision), logits=fake_decision))
    total_loss = real_loss + fake_loss
    return total_loss

In [18]:
def train_discriminator(input_batch, target_batch, noise):
    with tf.GradientTape() as disc_tape:
        # Generate fake text from noise input using generator
        generated_text = generator(noise, training=True)
        # Concatenate real input and target text
        combined_input = tf.concat([input_batch, target_batch], axis=-1)
        # Concatenate fake input and generated text
        combined_generated = tf.concat([input_batch, generated_text], axis=-1)
        # Get the discriminator's decision on the real and fake inputs
        real_decision = discriminator(combined_input, training=True)
        fake_decision = discriminator(combined_generated, training=True)
        # Calculate discriminator's loss based on real and fake decisions
        disc_loss = discriminator_loss(real_decision, fake_decision)
    # Get the discriminator's trainable variables
    disc_variables = discriminator.trainable_variables
    # Calculate the gradients of discriminator's loss with respect to its variables
    disc_gradients = disc_tape.gradient(disc_loss, disc_variables)
    # Use optimizer to apply the gradients to the discriminator's variables
    discriminator_optimizer.apply_gradients(
        zip(disc_gradients, disc_variables))
    return disc_loss


Training

In [19]:
buffer_size = 1000

In [20]:
for epoch in range(epochs):
    print("Epoch:", epoch+1)
    for batch in range(num_batches):
        # Get a random batch of input sequences
        # idx = np.random.randint(0, num_sequences-batch_size)
        # get the input and target batch from dataset
        dataset = dataset.shuffle(buffer_size=buffer_size)
        input_batch = dataset.batch(batch_size - 1, drop_remainder=True)

        # ! input_batch = sequences[idx:idx+batch_size, :-1]
        # input_batch = all_ids[idx:idx+batch_size, :-1]
        # ! target_batch = np.expand_dims(sequences[-1], axis=-1)
        # target_batch = np.expand_dims(all_ids[-1], axis=-1)
        target_batch = np.expand_dims(dataset.batch(1, drop_remainder=True), axis=-1)

        # Generate noise for the generator
        noise = tf.random.normal(shape=(batch_size, latent_dim), dtype=tf.float32).numpy()

        # noise = np.random.normal(    0, 1, size=(batch_size, latent_dim))

        noise = tf.constant(noise)

        # noise = tf.Tensor(noise, noise.shape, noise.dtype)

        print("Noise:", noise)
        # generator.summary()
        


        # Train the generator to fool the discriminator
        gan_loss, gen_loss = train_generator(noise, target_batch)

        # Train the discriminator on real and generated data
        dis_loss = train_discriminator(input_batch, target_batch, noise)

        print(f"Generator loss: {gen_loss:.4f} | Discriminator loss: {dis_loss:.4f}")


Epoch: 1
Noise: tf.Tensor(
[[-4.1729024e-01 -6.2729679e-02  1.0819298e+00 ... -1.7141645e-01
  -1.1548320e+00  5.2107024e-01]
 [ 1.9947000e-01 -1.2159010e+00  4.6054047e-01 ... -1.2564840e+00
  -7.5919509e-02 -1.0968996e+00]
 [-1.3574277e+00 -3.1679302e-01 -2.7606523e-01 ...  2.8225920e-01
   1.1259772e+00 -1.6625422e-01]
 ...
 [-2.1120114e-03  6.0322400e-02  2.4923999e+00 ... -7.9645211e-01
   7.6475954e-01  1.3280016e+00]
 [-9.5229769e-01  5.2533656e-01 -1.1828583e+00 ...  7.0421040e-01
   9.1505182e-01 -1.3617281e+00]
 [-6.6880065e-01 -2.0706030e-02  2.1592143e+00 ... -1.0033648e+00
  -3.5230283e-02 -1.1656549e+00]], shape=(128, 100), dtype=float32)


  output, from_logits = _get_logits(


TypeError: Cannot iterate over a scalar tensor.