# **Computational Creativity**

---

### **Generative Adversarial Recurrent Neural Networks for Text Generation**


When testing trained models, scroll to the end to see results.

In [None]:
# Imports and GPU setup

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tqdm.autonotebook import tqdm
from google.colab import drive

# Report the accelerator in use. tf.test.gpu_device_name() returns an empty
# string when no GPU is visible, so handle that case explicitly instead of
# printing "Found GPU at: .".
gpu_name = tf.test.gpu_device_name()
if gpu_name:
    print(f"Found GPU at: {gpu_name}.")
else:
    print("No GPU found; training will run on the CPU and be much slower.")
print(f"TensorFlow version: {tf.__version__}")

Found GPU at: /device:GPU:0.
TensorFlow version: 2.8.0


# Dataset

## Load text

Choose a book from [Project Gutenberg](https://www.gutenberg.org/). Every book page on the website links to a plain-text version of the book. Copy that link into `book_choice`.

The text will be converted into lower case.

In [None]:
book_choice = "https://www.gutenberg.org/files/1342/1342-0.txt"  # @param {type: "string"}

# Cache the download under a name derived from the URL. get_file() reuses an
# existing cached file with the same name, so a fixed name such as "Book" would
# keep returning the first book downloaded even after `book_choice` changes.
book_filename = book_choice.rstrip("/").rsplit("/", 1)[-1] or "Book"
path_to_file = tf.keras.utils.get_file(book_filename, book_choice)

# Read the raw bytes and decode them as UTF-8; the context manager closes the
# file handle as soon as the content has been read.
with open(path_to_file, 'rb') as book_file:
    text = book_file.read().decode(encoding='utf-8')

# convert the text into lower case
text = text.lower()

# Optional (disabled): save a copy of the text. `path` is only assigned in the
# Google Drive cell further down, so run that cell first before enabling this.
# f = open(path + "datasets/text.txt", 'w')
# f.write(text)
# f.close()

# The length of text is the number of characters in it
print("The dataset has", len(text), "characters in total.")

The dataset has 789417 characters in total.


In [None]:
# Disabled alternative: reload a previously saved copy of the text instead of
# downloading it. The code is kept as an inert string literal, so nothing here
# runs. `path` is only assigned in the Google Drive cell later in the notebook.
'''
f = open(path + "datasets/text.txt", 'r')
text = f.read()
f.close()
'''

## Data pre-processing

### Vectorize the text

Map the characters to vectors so that the neural network can process them. In this case, we work at character level (i.e. we generate sequences of characters).

Store all unique characters in `vocabulary`, then create two more data structures and one helper function:
  * `char2idx`: a dictionary mapping from each unique character to its index in the list
  * `idx2char`: a numpy array of all unique characters in the text
  * `one_hot(.)`: a function to convert a character into its one-hot encoding

In [None]:
# Collect every distinct character of the text. '\r' is added explicitly
# because saving/re-reading the text through Python's file API discards it.
text_set = set(text) | {'\r'}

# Sorted list of characters; a character's position in it is its index
vocabulary = sorted(text_set)

VOCAB_SIZE = len(vocabulary)  # length of the vocabulary in chars

# Lookup tables in both directions: character -> index and index -> character
char2idx = dict(zip(vocabulary, range(VOCAB_SIZE)))
idx2char = np.array(vocabulary)

def one_hot(idx):
    """Return a vector of VOCAB_SIZE zeros with a 1 at position `idx`."""
    encoding = np.zeros(VOCAB_SIZE)
    encoding[idx] = 1
    return encoding

print("Vocabulary size:", VOCAB_SIZE)

Vocabulary size: 68


### Hyperparameters

In [None]:
# Buffer size used when shuffling the datasets (default: 10000)
BUFFER_SIZE = 10000  # @param {type: "integer"}
# Number of sequences per batch (default: 64)
BATCH_SIZE = 64  # @param {type: "integer"}
# Number of units in the generator's recurrent layer (default: 1024)
N_RNN_UNITS = 1024  # @param {type: "integer"}
# Length, in characters, of each chunk of text cut from the dataset; this is
# also the sequence length the discriminator expects
MAX_LENGTH = 100  # @param {type: "integer"}
# Size of the embedding layer (default: 256)
EMBEDDING_DIM = 256  # @param {type: "integer"}

### Create training data
Simply slice the data into chunks of length `MAX_LENGTH` and then create batches from the dataset.

In [None]:
# Containers filled chunk by chunk
vector_text = []    # character indices of every chunk
matrix_text = []    # one-hot encoding of every chunk
next_text = []      # chunk shifted by one character (generator's pretraining targets)
permuted_text = []  # shuffled chunk followed by the original one (discriminator's pretraining)
disc_label = []     # 0 for a shuffled chunk, 1 for an original one

for start in range(0, len(text) - MAX_LENGTH, MAX_LENGTH):
    stop = start + MAX_LENGTH
    chunk = text[start:stop]
    indices = [char2idx[ch] for ch in chunk]
    matrices = [one_hot(index) for index in indices]
    vector_text.append(indices)
    matrix_text.append(matrices)

    # generator's pretraining: the target is the same window moved one character ahead
    next_chunk = text[start + 1 : stop + 1]
    next_text.append([char2idx[ch] for ch in next_chunk])

    # discriminator's pretraining: a randomly shuffled copy (label 0)
    # followed by the untouched chunk (label 1)
    permuted = list(chunk)
    np.random.shuffle(permuted)
    permuted_text.append([one_hot(char2idx[ch]) for ch in permuted])
    permuted_text.append(matrices)
    disc_label.extend([0, 1])

for collected in (vector_text, matrix_text, permuted_text, disc_label):
    print(np.array(collected).shape)

# adversarial training: (indices, one-hot) pairs, shuffled and batched
dataset = (
    tf.data.Dataset.from_tensor_slices((vector_text, matrix_text))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
)

N_TRAIN_BATCHES = len(dataset)

# generator's pretraining: (indices, next-character indices) pairs
gen_dataset = (
    tf.data.Dataset.from_tensor_slices((vector_text, next_text))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
)

# discriminator's pretraining: (one-hot chunk, label) pairs
disc_dataset = (
    tf.data.Dataset.from_tensor_slices((permuted_text, disc_label))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
)

for built in (dataset, gen_dataset, disc_dataset):
    print(built)

# Generative Adversarial Recurrent Neural Network (GARNN)

## Generator Network
This network takes as input a chunk of text and produces a probability distribution over the vocabulary predicting the next character.

In [None]:
# Define input and output around the RNN (GRU)
def Generator(batch_size=BATCH_SIZE, vocab_size=VOCAB_SIZE, embedding_dim=EMBEDDING_DIM, n_rnn_units=N_RNN_UNITS):
    """
    Build the generator: Embedding -> stateful GRU -> softmax Dense.

    Args:
        batch_size: fixed batch size of the model input (the GRU is stateful).
        vocab_size: number of distinct characters; sets both the embedding's
            input size and the width of the output layer.
        embedding_dim: size of the embedding vectors.
        n_rnn_units: number of GRU units.

    Returns:
        An uncompiled tf.keras.Sequential model whose input has shape
        [batch_size, None] and whose output has `vocab_size` softmax values
        per time step.
    """
    embedding = tf.keras.layers.Embedding(
        vocab_size,
        embedding_dim,
        batch_input_shape=[batch_size, None]
    )
    recurrent = tf.keras.layers.GRU(
        n_rnn_units,
        return_sequences=True,  # one output per time step, not only the last one
        stateful=True,
        recurrent_activation='sigmoid',
        recurrent_initializer='glorot_uniform'
    )
    to_vocabulary = tf.keras.layers.Dense(vocab_size, activation="softmax")

    return tf.keras.Sequential([embedding, recurrent, to_vocabulary])

# Loss used when the generator is trained on its own
def gen_loss_function(labels, pred):
    """Sparse categorical cross-entropy between the labels and the predictions `pred`."""
    return tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=pred)

gen = Generator()

# Define the optimiser
# default: 0.001
learning_rate = 0.001  #@param{type:"raw"}
# default: 0.5
beta = 0.5 #@param{type:"raw"}
gen_optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=beta)

# Build the network. The generator's first layer is an Embedding declared with
# batch_input_shape=[batch_size, None]: it consumes sequences of character
# indices, so the input shape is (batch, time) with no vocabulary axis.
gen.build((BATCH_SIZE, None))

gen.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_1 (Embedding)     (64, None, 256)           17408     
                                                                 
 gru_1 (GRU)                 (64, None, 1024)          3938304   
                                                                 
 dense_4 (Dense)             (64, None, 68)            69700     
                                                                 
Total params: 4,025,412
Trainable params: 4,025,412
Non-trainable params: 0
_________________________________________________________________


## Discriminator Network
This network takes a chunk of text as input and guesses whether it is real or was produced by the generator network.

In [None]:
# Define input and output
def Discriminator(vocab_size=VOCAB_SIZE, batch_size=BATCH_SIZE, embedding_dim=EMBEDDING_DIM):
    """
    Build the convolutional discriminator.

    The input is a batch of MAX_LENGTH-step sequences with `vocab_size` values
    per step. Each step is projected to `embedding_dim` values by a bias-free
    Dense layer. Six parallel Conv2D branches (32 filters each, kernel heights
    3, 7, 11, 15, 19 and 23, kernel width `embedding_dim`) scan the projected
    sequence, and each branch is max-pooled over all of its positions. The
    pooled features are concatenated and passed through Dense(128) and
    Dense(1).

    Args:
        vocab_size: number of values per time step of the input.
        batch_size: fixed batch size of the model input.
        embedding_dim: width of the per-step projection.

    Returns:
        An uncompiled tf.keras.Model producing one raw (pre-sigmoid) score per
        sequence, with shape (batch_size,).
    """
    kernel_heights = (3, 7, 11, 15, 19, 23)
    n_filters = 32

    inp = tf.keras.Input((MAX_LENGTH, vocab_size), batch_size=batch_size)
    x = tf.keras.layers.Dense(embedding_dim, use_bias=False)(inp)
    # Add a trailing channel axis so the sequence can be fed to Conv2D
    sentence = tf.expand_dims(x, -1)

    branches = []
    for height in kernel_heights:
        branch = tf.keras.layers.Conv2D(n_filters, (height, embedding_dim), activation="relu")(sentence)
        # The convolution leaves MAX_LENGTH - height + 1 positions; pooling
        # over all of them keeps a single value per filter
        branch = tf.keras.layers.MaxPooling2D(pool_size=(MAX_LENGTH - height + 1, 1))(branch)
        branches.append(branch)

    conc = tf.keras.layers.Concatenate()(branches)

    # Drop only the two singleton spatial axes. A bare tf.squeeze() would also
    # remove the batch axis whenever batch_size == 1.
    x = tf.squeeze(conc, axis=[1, 2])

    x = tf.keras.layers.Dense(128)(x)
    y = tf.keras.layers.Dense(1)(x)
    # (batch, 1) -> (batch,), again leaving the batch axis untouched
    y = tf.squeeze(y, axis=-1)

    model = tf.keras.Model(inputs=inp, outputs=y)

    return model

# Loss used when the discriminator is trained on its own
def disc_loss_function(labels, logits):
    """Binary cross-entropy between `labels` and raw (pre-sigmoid) `logits`."""
    return tf.keras.losses.binary_crossentropy(y_true=labels, y_pred=logits, from_logits=True)

disc = Discriminator()

# Define the optimiser
# default: 0.001
learning_rate = 0.001  #@param{type:"raw"}
# default: 0.5
beta = 0.5 #@param{type:"raw"}
disc_optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=beta)

# Build the network. The discriminator's Input layer fixes the sequence length
# to MAX_LENGTH, so state the same shape here rather than a variable length.
disc.build((BATCH_SIZE, MAX_LENGTH, VOCAB_SIZE))

disc.summary()

Model: "model_7"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_9 (InputLayer)           [(64, 100, 68)]      0           []                               
                                                                                                  
 dense_24 (Dense)               (64, 100, 256)       17408       ['input_9[0][0]']                
                                                                                                  
 tf.expand_dims_8 (TFOpLambda)  (64, 100, 256, 1)    0           ['dense_24[0][0]']               
                                                                                                  
 conv2d_44 (Conv2D)             (64, 98, 1, 32)      24608       ['tf.expand_dims_8[0][0]']       
                                                                                            

## Putting them together: Generative Adversarial Network

In [None]:
# A GAN class as an extension of the tf.keras.Model object

# Define the loss function (BCE)
def gan_loss(logits, is_real=True):
    """
    Sigmoid cross-entropy between `logits` and a constant target.

    The target is a tensor of ones when `is_real` is true and a tensor of
    zeros otherwise. No reduction is applied to the result here.
    """
    make_labels = tf.ones_like if is_real else tf.zeros_like
    return tf.nn.sigmoid_cross_entropy_with_logits(labels=make_labels(logits), logits=logits)

class GARNN(tf.keras.Model):
    """
    Adversarial model pairing a generator with a discriminator. Extends tf.keras.Model.

    Every keyword argument given to the constructor becomes an attribute of
    the instance. The methods below read `gen`, `disc`, `gen_optimizer` and
    `disc_optimizer`.
    """

    def __init__(self, **kwargs):
        super().__init__()
        # Copy the keyword arguments straight into the instance dictionary
        # (this does not go through __setattr__).
        vars(self).update(kwargs)

        # Assign the two networks again through regular attribute assignment.
        # NOTE(review): presumably so that Keras registers them as sub-models - confirm.
        self.gen = self.gen
        self.disc = self.disc

    def call(self, x):
        """Forward pass of the model: run `x` through the generator."""
        return self.gen(x)

    def generate(self, z):
        """
        Feed z to the generator and return what it produces (the fake data).
        """
        return self.gen(z)

    def discriminate(self, x):
        """
        Feed x to the discriminator and return its output.
        """
        return self.disc(x)

    def compute_loss(self, x):
        """
        Run one batch through both networks and return (disc_loss, gen_loss).

        `x[0]` is given to the generator and `x[1]` is given to the
        discriminator as the real data.
        """
        generator_input = x[0]
        real_data = x[1]

        # Clear the generator's recurrent state, then produce the fake data
        self.gen.reset_states()
        fake_data = self.generate(generator_input)

        # Score the real data first, then the generated data
        real_logits = self.discriminate(real_data)
        fake_logits = self.discriminate(fake_data)

        # Discriminator: real data should be labelled "1", fake data "0"
        loss_on_reals = gan_loss(logits=real_logits, is_real=True)
        loss_on_fakes = gan_loss(logits=fake_logits, is_real=False)
        disc_loss = loss_on_fakes + loss_on_reals

        # Generator: it wants its fake data to be labelled "1"
        gen_loss = gan_loss(logits=fake_logits, is_real=True)

        return disc_loss, gen_loss

    def compute_gradients(self, x):
        """
        Return (gen_gradients, disc_gradients) for the batch x.
        """
        # Two tapes record the same forward pass, one per network
        with tf.GradientTape() as gen_tape:
            with tf.GradientTape() as disc_tape:
                disc_loss, gen_loss = self.compute_loss(x)

        gen_gradients = gen_tape.gradient(gen_loss, self.gen.trainable_variables)
        disc_gradients = disc_tape.gradient(disc_loss, self.disc.trainable_variables)

        return gen_gradients, disc_gradients

    def apply_gradients(self, gen_gradients, disc_gradients):
        """
        Update each network with its own optimizer and gradients.
        """
        gen_updates = zip(gen_gradients, self.gen.trainable_variables)
        disc_updates = zip(disc_gradients, self.disc.trainable_variables)
        self.gen_optimizer.apply_gradients(gen_updates)
        self.disc_optimizer.apply_gradients(disc_updates)

    @tf.function
    def train(self, train_x):
        """
        One training step on the batch train_x: compute the gradients of both
        networks, then apply them.
        """
        gen_gradients, disc_gradients = self.compute_gradients(train_x)
        self.apply_gradients(gen_gradients, disc_gradients)

# Training
While the GAN model setup takes care of a single training iteration, we need to repeat it for several epochs and observe how the quality of the generated text improves over time. At each epoch the training runs over the batches created earlier, and the `tqdm` library displays the progress.

## Pretraining
Train the generator and the discriminator before the adversarial game.

### Generator
Train it as an auto-regressive RNN that predicts the next character in the sequence.

In [None]:
gen.compile(gen_optimizer, gen_loss_function)

# default: 50
n_epochs =  50 # @param{type: "integer"} 

# The generator's GRU is stateful, but consecutive batches of the shuffled
# dataset hold unrelated chunks of text. Clear the recurrent state before every
# batch so that it is not carried over from one batch into the next; this
# matches GARNN.compute_loss, which resets the state before each generator pass.
reset_gen_states = tf.keras.callbacks.LambdaCallback(
    on_batch_begin=lambda batch, logs: gen.reset_states()
)

history = gen.fit(gen_dataset, epochs=n_epochs, callbacks=[reset_gen_states])

### Discriminator
Train it to tell shuffled sentences apart from the original ones.

In [None]:
# Compile the model. The discriminator outputs raw logits (the loss is computed
# with from_logits=True), so the accuracy metric must threshold at 0.0: a
# positive logit corresponds to a probability above 0.5. The default
# 'binary_accuracy' thresholds at 0.5 and would misreport the accuracy.
disc.compile(
    disc_optimizer,
    loss=disc_loss_function,
    metrics=[tf.keras.metrics.BinaryAccuracy(name='binary_accuracy', threshold=0.0)]
)

# default: 50
n_epochs =  5 # @param{type: "integer"} 
history = disc.fit(disc_dataset, epochs=n_epochs)

## Adversarial training

In [None]:
# Wire the two networks and their optimisers into the adversarial model
model = GARNN(gen=gen, disc=disc, gen_optimizer=gen_optimizer, disc_optimizer=disc_optimizer)

# default: 50
n_epochs =  5 #@param{type: "integer"} 

# losses = pd.DataFrame(columns = ['disc_loss', 'gen_loss'])

for epoch in range(1, n_epochs + 1):

    print(f"Epoch: {epoch}")

    # One adversarial update per batch, with a progress bar over the batches
    for train_x in tqdm(dataset, total=N_TRAIN_BATCHES):
        model.train(train_x)

Save the model. Make sure the path exists.

In [None]:
# Mount Google Drive inside the Colab runtime
drive.mount('/content/gdrive')
# Base folder of the project on the mounted drive; the cells that save and
# load the networks append "models/..." to it
path = "/content/gdrive/My Drive/Colab Notebooks/computational creativity/GARNN/"

In [None]:
# Save trained models
# Both networks are written as HDF5 files inside the "models" folder under `path`
gen.save(path + "models/gen.h5")
disc.save(path + "models/disc.h5")

# Testing

## Set up text generation function:

In [None]:
def generate_text(model, input_text, n_characters_output=1000):
    """
    Continue `input_text` with `n_characters_output` characters predicted by `model`.

    The whole prompt is fed to the model once; afterwards every predicted
    character is fed back as the next input. The most likely character
    (argmax) is picked at each step.

    Args:
        model: network that is called on a (1, time) batch of character
            indices and provides `reset_states()`.
        input_text: the prompt. Every character must be a key of `char2idx`.
        n_characters_output: number of characters to generate.

    Returns:
        `input_text` followed by the generated characters.

    Raises:
        ValueError: if characters are requested but `input_text` is empty.
        KeyError: if `input_text` contains a character missing from `char2idx`.
    """
    # Without a prompt there is nothing to feed the model at the first step
    if not input_text and n_characters_output > 0:
        raise ValueError("input_text must contain at least one character to seed the generator.")

    # Report every unsupported character at once instead of a bare KeyError
    unknown = sorted({s for s in input_text if s not in char2idx})
    if unknown:
        raise KeyError(f"input_text contains characters that are not in the vocabulary: {unknown}")

    # First, vectorize the input text as before and add the batch dimension
    input_eval = [char2idx[s] for s in input_text]
    input_eval = tf.expand_dims(input_eval, 0)

    # We'll store results in this variable
    text_generated = []

    # Generate the number of characters desired
    model.reset_states()
    for _ in range(n_characters_output):
        # Run input through model
        predictions = model(input_eval)

        # Remove the batch dimension and keep the prediction for the last position
        predictions = predictions[0][-1]

        # Using argmax to predict the character returned by the model
        predicted_id = np.argmax(predictions)

        # Pass the predicted character as the next input to the model
        input_eval = tf.expand_dims([predicted_id], 0)

        # Add the predicted character to the output
        text_generated.append(idx2char[predicted_id])

    # Return output
    return input_text + ''.join(text_generated)

## Generate text

Load the model.

In [None]:
# Disabled alternative to the Drive-based cell below: download a shared folder
# with gdown, rebuild the generator with batch_size=1 and load the downloaded
# generator weights into it. The code is kept as an inert string literal, so
# none of it runs.
'''
import gdown, os

url = "https://drive.google.com/drive/folders/1R3qslZrX4lE0jRi3lf3Bik_rlLdDjMht"

download_successful = None # A workaround to make sure that gdown downloads the whole folder successfully, see https://github.com/wkentaro/gdown/issues/43
while download_successful == None:
  download_successful = gdown.download_folder(url, quiet=True, use_cookies=False)
  os.system('rm ~/.cache/gdown/cookies.json')

gen = Generator(batch_size=1)
gen.load_weights("/content/GARNN/models/gen.h5")
# disc.load_weights("/content/GARNN/models/disc.h5")
'''

In [None]:
# Re-load trained networks

# Text generation feeds the generator a single sequence at a time, whereas the
# training generator was built with a stateful GRU and a fixed batch size of
# BATCH_SIZE. Rebuild the generator for a batch of one before loading the
# trained weights (the weights themselves do not depend on the batch size).
gen = Generator(batch_size=1)
gen.load_weights(path + "models/gen.h5")
disc.load_weights(path + "models/disc.h5")

Give some input text, the AI will continue writing on it...

In [None]:
# Prompt to continue; lower-cased because the text the vocabulary was built
# from was lower-cased as well
input_text = "to be or not to be, that is the question." # @param {type:"string"}
input_text = input_text.lower()

# Number of characters to append to the prompt
n_output_characters = 404 #@param {type:"integer"}

output_text = generate_text(gen, input_text, n_characters_output=n_output_characters)

print(f"Generated text:\n{output_text}")

Generated text:
to be or not to be, that is the question. it was
      more than civil; it was really attentive; and then i must
      speak plainly. if you, my dear father, i congratulate you.”

      “and this is also different from your affection, you make
      elizabeth treated with the rest of the house, as soon as mr. bennet were
      to make a small inconvenience to herself and her daughter, to whom i have
      related the affair of making 
