<a href="https://colab.research.google.com/github/AlexSbb/Text-Generation-with-RNN-and-GAN/blob/main/CS13ColabNotebooks/RNN_Autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Import all necessary libraries:

In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
import time
import math
import random
print('TensorFlow version (should be 2.4.1):  ',tf.__version__)

TensorFlow version (should be 2.4.1):   2.4.1


# Open, read and process the text file
In our case the we use the log file from an Elevator Group Control System simulation - "ElevatorLogFile.txt"

In [3]:
PATH_TO_FILE = "/content/drive/MyDrive/Colab Notebooks/CS13ColabNotebooks/ElevatorLogFile.txt"

# Open and read the whole txt file
def open_and_read_test_file (path :str):
    with open(path) as text_file:
        text = text_file.read()
    print('Number of characters in the log file:', len(text))
    print('Number of words in the log file (with whitespace as a splitter):', len(text.split()))
    return text

file_text = open_and_read_test_file(PATH_TO_FILE)

Number of characters in the log file: 73037
Number of words in the log file (with whitespace as a splitter): 9010


Creating a dictionary from a text file. 
We will use a character-level dictionary. To do this, you need to count the number of unique characters in the text file. Each character is then assigned a unique sequence number. As a result, we will get two dictionaries, one for converting characters to numbers, and the second for reverse conversion.

In [5]:
def dictionary_from_text(text:str):
  set_of_uniq_ch = set(text) # set of uniq characters from text
  print('Number of uniq characters in the log file:', len(set_of_uniq_ch))
  char_to_index = {char:index for index, char in enumerate(set_of_uniq_ch)}
  index_to_char = {index:char for char,index in char_to_index.items()}
  return char_to_index, index_to_char

char_to_index, index_to_char =  dictionary_from_text(file_text)

Number of uniq characters in the log file: 54


In order to make it convenient to use dictionaries, I will create two auxiliary functions, for converting text to numbers and for reverse conversion.

In [6]:
def text_to_numbers(dictionary: dict, text: str):
  numbers = [dictionary[char] for char in text]
  return numbers

def numbers_to_text(dictionary: dict, numbers: list):
  text =""
  for num in numbers:
    # text += str(dictionary[num])
    text += dictionary[num]
  return text

Now we can easily convert our entire text file to numbers

In [7]:
whole_text_as_numbers = text_to_numbers(char_to_index, file_text)

# Test:
first_64_numbers = whole_text_as_numbers[0:64]
print("First 64 numberst from 'whole_text_as_numbers':")
print(first_64_numbers)
# test reverse conversion
reverse_conversion = numbers_to_text(index_to_char, first_64_numbers)
print("Conversin from numbers to text:")
print(reverse_conversion)

First 64 numberst from 'whole_text_as_numbers':
[42, 0, 30, 25, 37, 1, 40, 36, 37, 15, 15, 17, 36, 33, 42, 0, 30, 25, 37, 1, 40, 36, 37, 15, 15, 17, 36, 30, 36, 17, 16, 41, 49, 38, 38, 38, 29, 14, 0, 30, 43, 16, 35, 35, 17, 1, 6, 17, 36, 33, 42, 0, 30, 44, 17, 1, 17, 36, 16, 40, 26, 1, 6, 30]
Conversin from numbers to text:
0: Controller_0: Controller ready...
1: Passenger_0: Generating 


In [15]:
#Split the input sequence into x and y, where x and y have the same shape
def split_to_x_y(sequence: list, x_length: int = 128, step: int = 3):
    x = []
    y = []
    for i in range(0, len(sequence) - x_length, step):
        x.append(np.array(sequence[i: i + x_length]) )
        y.append(np.array(sequence[i+1:i+1 + x_length]))
    print("Number of sequences:", len(x))
    x = np.array(x)
    y = np.array(y)   
    print('x.shape=', x.shape)
    print('y.shape=', y.shape)
    return x , y

# Left one last element in a sequence
def take_last_element(original_y: np.array):
  y_1 = [y[-1] for y in original_y]
  y_1 = np.array(y_1) 
  print('y_1.shape=', y_1.shape)
  return y_1

sequence_length = 32
x, y = split_to_x_y(whole_text_as_numbers, x_length = sequence_length, step = 3)
# y_1 = take_last_element(y)

Number of sequences: 24335
x.shape= (24335, 32)
y.shape= (24335, 32)


# One-hot encoding

In [9]:
dictionary_lenth = len(index_to_char)
one_hot_matrix = np.eye(dictionary_lenth)
print('one_hot_matrix.shape=', one_hot_matrix.shape)

def array_of_seq_to_one_hot(array_of_seq:np.array, matrix: np.array):
  one_hot_seq = np.array(
    [np.array([matrix[:,ind] for ind in seq]) for seq in array_of_seq]
    )
  return one_hot_seq

def array_of_int_to_one_hot(array_of_int:np.array, matrix: np.array):
  one_hot_seq = np.array(
    np.array([matrix[:,ind] for ind in array_of_int])
    )
  return one_hot_seq

one_hot_x = array_of_seq_to_one_hot(x,one_hot_matrix)
one_hot_y = array_of_seq_to_one_hot(y,one_hot_matrix)

# one_hot_y_1 = array_of_int_to_one_hot(y_1,one_hot_matrix)

print('one_hot_x.shape:', one_hot_x.shape, "# (batch_size, sequence_length, vocab_size)")
print('one_hot_y.shape:', one_hot_y.shape)
# print('one_hot_y_1.shape:', one_hot_y_1.shape)

one_hot_matrix.shape= (54, 54)
one_hot_x.shape: (24335, 32, 54) # (batch_size, sequence_length, vocab_size)
one_hot_y.shape: (24335, 32, 54)


# RNN-Autoencoder

## Encoder

In [10]:
class Encoder(tf.keras.Model):
  def __init__(self,):
    super(Encoder, self).__init__()

    self.RNN_1 = tf.keras.layers.GRU(128, return_sequences=True)
    self.RNN_2 = tf.keras.layers.GRU(64, return_sequences=True)
    self.RNN_3 = tf.keras.layers.GRU(16, return_state=True)

  def call(self, x):
    x = self.RNN_1(x)
    x = self.RNN_2(x)
    whole_seq_output, final_state = self.RNN_3(x)
    z = tf.concat([whole_seq_output, final_state],1) # Output and state fron last time step
    return z, final_state

### Test Encoder shapes (dimentions)

In [11]:
text_encoder = Encoder()

In [14]:
test_encoder_input = one_hot_x[1:2]
print('test_encoder_input spahe:',test_encoder_input.shape)
test_encoder_output_z, test_encoder_output_final_state = text_encoder(test_encoder_input)
print('test_encoder_output_z shape:', test_encoder_output_z.shape)
print('test_encoder_output_final_state shape:', test_encoder_output_final_state.shape)

test_encoder_input spahe: (1, 32, 54)
test_encoder_output_z shape: (1, 32)
test_encoder_output_final_state shape: (1, 16)


In [16]:
decoder_input = tf.reshape(test_encoder_output_z ,[test_encoder_output_z .shape[0],1,test_encoder_output_z.shape[1]])
print('decoder_input shape v1:', decoder_input.shape)

decoder_input = tf.repeat(decoder_input,[sequence_length],axis=1)
decoder_input.shape

decoder_input shape v1: (1, 1, 32)


TensorShape([1, 32, 32])

## Decoder

In [47]:
class Decoder(tf.keras.Model):
  def __init__(self,dictionary_lenth=54):
    super(Decoder, self).__init__()

    self.RNN_1 = tf.keras.layers.GRU(16, return_sequences=True)
    self.RNN_2 = tf.keras.layers.GRU(64, return_sequences=True)
    self.RNN_3 = tf.keras.layers.GRU(128, return_sequences=True)
    self.Dense = tf.keras.layers.Dense(dictionary_lenth, activation="softmax") 

  def call(self, x, inital_state = None):
    decoder_input = tf.reshape(x ,[tf.shape(x)[0], 1, tf.shape(x)[1] ])
    decoder_input = tf.repeat(decoder_input, x.shape[1]  ,axis=1)
    x = self.RNN_1(decoder_input, inital_state)
    x = self.RNN_2(x)
    x = self.RNN_3(x)
    z= self.Dense(x)
    return z

### Test Decoder shapes (dimentions)

In [48]:
test_decoder = Decoder(dictionary_lenth=dictionary_lenth)
test_decoder_output = test_decoder(test_encoder_output_z)
test_decoder_output.shape

TensorShape([1, 32, 54])

# Autoencoder

In [49]:
class Autoencoder(tf.keras.Model):
  def __init__(self,):
    super(Autoencoder, self).__init__() 
    self.encoder = Encoder()
    self.decoder = Decoder()

  def call(self, x):
    encoded,state = self.encoder(x)
    decoded = self.decoder(encoded,state)
    return decoded

# tf.keras.layers.Dropout(0.2),

In [50]:
autoencoder = Autoencoder()



### Test Autoencoder shapes (dimentions)

In [52]:
z,s = autoencoder.encoder(one_hot_x[0:16])
print('z shape:', z.shape)

z shape: (16, 32)


In [55]:
autoencoder_output = autoencoder.decoder(z,s)
autoencoder_output.shape

TensorShape([16, 32, 54])

### Fit Autoencoder

In [76]:
autoencoder.compile(optimizer='adam', loss=tf.losses.categorical_crossentropy)
autoencoder.fit(x = one_hot_x,y = one_hot_x, epochs=15)



<tensorflow.python.keras.callbacks.History at 0x7fbbc4285ed0>

### Test Autoencoder

#### Encode a text to a laten vector and cell state

In [77]:
sample_text = file_text[0:32]
converted_sample_text = text_to_numbers(char_to_index, sample_text)
one_hot_sample_text = np.array([array_of_int_to_one_hot(converted_sample_text, one_hot_matrix)])
encoder_input = one_hot_sample_text
print("encoder_input shape", encoder_input.shape)

encoder_output,state = autoencoder.encoder(one_hot_sample_text)
print("encoder output shape", encoder_output.shape)

encoder_input shape (1, 32, 54)
encoder output shape (1, 32)


#### Decode text from the the laten vector

In [78]:
decoder_output = autoencoder.decoder(encoder_output,state)
print('Decoder output shape', decoder_output.shape)

Decoder output shape (1, 32, 54)


#### Convert softmax output to numbers and numbers to text

In [79]:
pred_array = tf.squeeze(decoder_output).numpy()
temp_texp = '' 
for vec in pred_array:
  ind = np.array(np.argmax(vec), ndmin=1 )
  char = numbers_to_text(index_to_char, ind)
  # print(char)
  temp_texp +=char

print('Original text:', sample_text)
print('Decoded text:', temp_texp)

Original text: 0: Controller_0: Controller read
Decoded text: :: Controller_0: Clreeol errrraa
