**GAN based random text generator**

GAN is well known for it's non-discrete data generation abilities.<br>
In this notebook I'll adapt and train GAN for text generation(based on [LaText GAN](https://arxiv.org/pdf/1810.06640.pdf)). 

First, let's create data loader with tokenizer.

In [2]:
import re
from tensorflow.keras.utils import to_categorical
import numpy as np
import pandas as pd
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer


class DataLoader:

    def __init__(self, src_csv="", src_column='', sentence_len=100, num_words=10000):
        self.__src_csv = src_csv  # source csv file with test data
        self.__src_column = src_column # column name in source file
        self.__sentence_len = sentence_len  # max sentence length
        self.__num_words = num_words  # number of most frequent words to keep
        self.__tokenizer = None # tokenizer
        self.sentences = []  # text data
        self.sequences = np.array([])  # indexed data from tokenizer

    def __load_file(self):
        df = pd.read_csv(self.__src_csv, keep_default_na=False) 
        self.sentences = [re.sub(r"<.*?>", " ", x) for x in df['review']]  # remove html tags
        self.sentences = [s.replace('.', ' .') for s in self.sentences] # let's keep period
        # rest of data cleaning is done by tokenizer

    def __init_tokenizer(self):
        # Tokenization
        # splitted by space
        # filters = default - period sign
        # unknown word set to <OOV>
        # lower case
        # saves only __num_words most frequent words
        self.tokenizer = Tokenizer(num_words=self.__num_words, lower=True, split=' ', oov_token='<OOV>', filters='!"#$%&()*+,-/:;<=>?@[\\]^_`{|}~\t\n')
        self.tokenizer.fit_on_texts(self.sentences) # fit on text data
        
    def __init_sequences(self):
        if self.sequences.size < 1:
            self.__load_file()
            self.__init_tokenizer()
            self.sequences = self.tokenizer.texts_to_sequences(self.sentences) # text to tokenizer indexes
            self.sequences = pad_sequences(self.sequences, maxlen=self.__sentence_len, dtype='float32', padding='post') # padding 
            

    def get(self):
        self.__init_sequences()
        return self.sequences

Define all parameters and paths.

In [3]:
SOURCE_FILE = "drive/MyDrive/Colab Notebooks/IMDB Dataset.csv" # test data file path (here IMDB reviews)
AE_MODEL_PATH="drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce" # autoencoder path
PLOT_PATH="drive/MyDrive/Colab Notebooks/plots/{}.png" # plots path
HISTORY_PATH="drive/MyDrive/Colab Notebooks/history/{}.csv" # learning history path
EMBEDDING_PATH="model.txt" # path to prepared word embedding
EMBEDDING_MATRIX_PATH="embedding_matrix.npy" # path to embedding matrix

MAX_WORD_NUM = 2500 # max number of most frequent words to save
MAX_SEQUENCE_LEN = 20 # max sentence length
ENCODED_VECTOR_LEN = 300 # enoded sentence length
EMBEDDING_SIZE = 300 # embedding vector length

Load our data.

In [4]:
dataLoader = DataLoader(SOURCE_FILE, sentence_len=MAX_SEQUENCE_LEN, num_words=MAX_WORD_NUM)
data = dataLoader.get()

np.random.seed(2137)
np.random.shuffle(data) 

data.shape

(50000, 20)

Now, we need to create embedding matrix (saved in EMBEDDING_MATRIX_PATH). To do this already trained word embedding model is necessary. Embedding matrix will keep vectors for words from vocabulary/tokenizer.



Word embedding model is meant to represent complex relationships between words (i.e. 'female', 'girl', 'woman' - similar concept, but completly different spelling) by placing each word in some N dimensional space.

Here I use 300 dimensional model only for english.


In [None]:
# read word embedding model
# each line consist of word and X normalized values
embeddings_index = {}
f = open(EMBEDDING_PATH, 'r')
for line in f:
    values = line.split()
    word = values[0]
    try:
        coefs = np.asarray(values[1:], dtype='float32')
    except ValueError:
        continue
    embeddings_index[word] = coefs
f.close()

# build embedding matrix
num_words = min(MAX_WORD_NUM, len(dataLoader.tokenizer.word_index))
embedding_matrix = np.zeros((num_words, EMBEDDING_SIZE))
for word, i in dataLoader.tokenizer.word_index.items():
    if i >= MAX_WORD_NUM:
        continue
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector.astype('float32')   
    else:
        embedding_matrix[i] = np.random.normal(-0.25, 0.25, EMBEDDING_SIZE)

np.save(EMBEDDING_MATRIX_PATH, embedding_matrix) # save matrix
embedding_matrix.shape

In case of hardware limitations You might want to use this simple data generator.

Autoencoder(explained below) produces data in one-hot encoding, that's why we need to convert y_train.

Standard encoding:<br>
[0, 3]

one-hot:<br>
[<br>
     &ensp;  [1, 0, 0, 0],<br>
    &ensp;     [0, 0, 0, 1]<br>
]



In [5]:
from tensorflow.keras.utils import Sequence
from tensorflow.keras.utils import to_categorical

# if you need to convert sequence to word embedding form
embedding_matrix = np.load(EMBEDDING_MATRIX_PATH) 
def get_embedding(seq):
    return np.array([embedding_matrix[int(w)] for w in seq])

class Mygenerator(Sequence):
    def __init__(self, data, batch_size):
        self.data = data # in/out data
        self.batch_size = batch_size
    
    def on_epoch_end(self): 
        # shuffling each epoch
        np.random.shuffle(self.data) 

    def __len__(self):
        return int(np.ceil(len(self.data) / float(self.batch_size))) # number of batches

    def __getitem__(self, idx):
        batch_x = self.data[idx * self.batch_size:(idx + 1) * self.batch_size] # batch
        x = batch_x
        y = np.array([to_categorical(seq, num_classes=MAX_WORD_NUM) for seq in batch_x]) # y data in one-hot
        return x, y

**AUTOENCODER**

Autoencoder is a pair of two networks - encoder and decoder - connected with 'bottleneck'. Encoder is meant to represent each sequence in compressed form(i.e. 20 seq with 300D embedding requires 6000 values, this encoder will compress it to 300), while decoder decompresses it.

This autoencoder takes whole sequence and outputs whole sequence at once!

<br>
Encoder: <br>
&ensp; input - sequences of indexes from tokenizer<br>
&ensp; output - compressed vector 

<br>
Decoder: <br>
&ensp; input - compressed vector<br>
&ensp; output - sequences of indexes in one-hot encoding



In [6]:
import keras.backend as K
from keras.layers import *
from keras.models import *
from keras.callbacks import ModelCheckpoint

# ENCODER
# 1D input of indexes from tokenizer
inputs = Input(shape=(MAX_SEQUENCE_LEN,), dtype='int32')

# this layer converts indexes to embedding form using prepared embedding_matrix
e = Embedding(MAX_WORD_NUM, EMBEDDING_SIZE, weights=[embedding_matrix], input_length=MAX_SEQUENCE_LEN, trainable=False)(inputs)

# stacked LSTM layers
# depending on your parametrs you might want to stack more LSTMs
encoded = Bidirectional(LSTM(ENCODED_VECTOR_LEN, return_sequences=True, name="encoder_bi_lstm"), name="encoder_bi")(e)
# encoder output in form of compressed 1D vector
encoded = LSTM(ENCODED_VECTOR_LEN, name="encoder")(encoded)


# DECODER
# decoder input repeats compressed vector to decode whole sequence
decode_input = RepeatVector(MAX_SEQUENCE_LEN, name="decoder_input")(encoded)

# stacked LSTM layers
decode_input = LSTM(EMBEDDING_SIZE, return_sequences=True, name="decoder_lstm")(decode_input)
decoded = Bidirectional(LSTM(EMBEDDING_SIZE, return_sequences=True, name="decoder_bi_lstm"), name="decoder_bi")(decode_input)

# one-hot encoding mapping
# TimeDistributed allows to apply Dense to each word in sequence
decoded = TimeDistributed(Dense(MAX_WORD_NUM, activation='softmax'), name="decoder")(decoded)

autoencoder = Model(inputs, decoded) 

# categorical crossentropy is great to use with one-hot encoding
autoencoder.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])  
autoencoder.summary() 

autoencoder.save(AE_MODEL_PATH) # save untrained model

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 20)]              0         
                                                                 
 embedding (Embedding)       (None, 20, 300)           750000    
                                                                 
 encoder_bi (Bidirectional)  (None, 20, 600)           1442400   
                                                                 
 encoder (LSTM)              (None, 300)               1081200   
                                                                 
 decoder_input (RepeatVector  (None, 20, 300)          0         
 )                                                               
                                                                 
 decoder_lstm (LSTM)         (None, 20, 300)           721200    
                                                             



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


In [None]:
from keras.models import load_model
from keras.callbacks import ModelCheckpoint, CSVLogger

# split data
# if you just want to create autoencoder there's no need for validation set
# here I take small batch for validation to see how well autoencoder responds to unseen data, which will be produced by GAN
split_ind = int(len(data) * 0.96) 
data_train = data[:split_ind,:]
data_val = data[split_ind:,:]
print(data_val.shape)

autoencoder = load_model(AE_MODEL_PATH) # load model
autoencoder.summary()

csv_logger = CSVLogger(HISTORY_PATH.format("AE"), append=True) # log history to file
checkpoint = ModelCheckpoint(filepath=AE_MODEL_PATH, save_best_only=True) # save best model for unseen data

# 64 might be optimal
batch_size = 32

# 50-70 epochs is enough for current parametrs
# depending on vocab size, sentence length etc. I achieved
# >99% on training set, 92-95 % on unseen data from same set
history = autoencoder.fit(x=Mygenerator(data_train, batch_size), epochs=50, validation_data=Mygenerator(data_val, batch_size), callbacks=[checkpoint, csv_logger])

(2000, 20)
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 20)]              0         
                                                                 
 embedding (Embedding)       (None, 20, 300)           750000    
                                                                 
 encoder_bi (Bidirectional)  (None, 20, 600)           1442400   
                                                                 
 encoder (LSTM)              (None, 300)               1081200   
                                                                 
 decoder_input (RepeatVector  (None, 20, 300)          0         
 )                                                               
                                                                 
 decoder_lstm (LSTM)         (None, 20, 300)           721200    
                                                  



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 2/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 3/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 4/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 5/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 6/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 7/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 8/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 9/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 10/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 11/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 12/50



INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


INFO:tensorflow:Assets written to: drive/MyDrive/Colab Notebooks/AEmodel_lstm_2500_cce/assets


Epoch 13/50

In [None]:
from matplotlib import pyplot as plt

# plot loss and accuracy
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.savefig(PLOT_PATH.format('AE_accuracy'))
plt.show()


plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.savefig(PLOT_PATH.format('AE_loss'))
plt.show()

**Autoencoder test**

In [None]:
from keras import *
import keras.backend as K
from keras.layers import *
from keras.models import load_model

# load model
def load_autoencoder(path):
      model = load_model(path)

      # split into encoder and decoder
      encoder = Model(model.input, model.get_layer('encoder').output)
      decoder = Model(model.get_layer('decoder_input').input, model.get_layer('decoder').output)

      # add Argmax layer to decoder to convert results to real one-hot encoding
      one_hot_to_index = Lambda(lambda x: K.cast(K.argmax(x), dtype='float32'))(decoder.output)
      decoder = Model(decoder.input, one_hot_to_index)
      return (encoder, decoder, model)

(enc, dec, model) = load_autoencoder(AE_MODEL_PATH)

In [None]:
from tensorflow.keras.utils import to_categorical

# prints input and predicted sentence
def print_test(seq):
    seq = seq.reshape(1, 20)
    predicted = model.predict(seq)
    predicted = np.argmax(predicted, axis=2) # convert one-hot to standard encoding
    
    print("IN SENTENCE {}".format(' '.join(dataLoader.tokenizer.sequences_to_texts(seq))))
    print("PREDICTED SENTENCE {}".format(' '.join(dataLoader.tokenizer.sequences_to_texts(predicted))))
    print("----------------------------")

print_test(data[0])
print_test(data[15])
print_test(data[20])
print_test(data[100])
print_test(data[40000])
print_test(data[40070])
print_test(data[40050])