Building three RNN models for sentiment analysis, one per recurrent cell type: a traditional tanh RNN, and two gated architectures, LSTM and GRU.

In [1]:
# importing the required libraries

import pandas as pd
import tensorflow as tf
from tensorflow.python.keras.layers.embeddings import Embedding
import re
import matplotlib.pyplot as plt
import tensorflow as tf
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import json

nltk.download('punkt')
nltk.download('wordnet')

2023-11-16 10:00:02.781115: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2023-11-16 10:00:02.983047: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2023-11-16 10:00:02.983093: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2023-11-16 10:00:02.984566: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2023-11-16 10:00:03.014436: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2023-11-16 10:00:03.015080: I tensorflow/core/platform/cpu_feature_guard.cc:1

True

In [2]:
# model parameters

units = 128                # hidden-state width passed to each recurrent cell
embedding_size = 128       # output dimension of the Embedding layer
vocab_size = 10000         # forwarded to the Keras Tokenizer as num_words
max_length = 256           # pad/truncate length; also the number of unrolled time steps
learning_rate = 0.01       # learning rate given to the RMSprop optimizer
optimizer = 'rmsprop'      # NOTE: this name is rebound to an RMSprop instance in the loss/optimizer cell
test_size = 0.2            # validation fraction for train_test_split
batch_size = 32            # batch size applied to the tf.data pipelines
buffer_size = 128          # shuffle buffer size of the tf.data pipelines
epochs = 10                # upper bound on epochs passed to fit()
num_classes = 2            # width of the final softmax layer
data_path = 'IMDB_Dataset.csv'                    # CSV read with pandas.read_csv
data_classes = {'negative': 0, 'positive': 1}    # label string -> class index
data_name = 'review'       # CSV column holding the text
label_name = 'sentiment'   # CSV column holding the label

# constants

PADDING='post'   # pad_sequences: pad at the end of each sequence
TRUNC='post'     # pad_sequences: truncate at the end of each sequence
OOV='<OOV>'      # Tokenizer out-of-vocabulary token

In [3]:
# GRU Model

class GRU(tf.keras.layers.Layer):
  '''
    Single-step GRU cell built from raw weight matrices (no bias terms).

    Arguments:
      units (int): hidden dimension 
      inp_shape (int): Embedding dimension 
    Output:
      h_t (Tensor): 
        Current hidden state
        shape=(None, units) 
  '''

  def __init__(self, units, inp_shape):
    super(GRU, self).__init__()
    self.units = units
    self.inp_shape = inp_shape
    # `name` is passed by keyword: its positional slot differs between Keras
    # versions (Keras 3 takes `shape` first).
    # W[k] maps the input (inp_shape -> units); k = 0 update, 1 reset, 2 candidate.
    self.W = self.add_weight(name="W", shape=(3, self.units, self.inp_shape))
    # U[k] maps the previous hidden state (units -> units). It must be square in
    # `units`; sizing it with inp_shape only works when units == inp_shape.
    self.U = self.add_weight(name="U", shape=(3, self.units, self.units))
    
  def call(self, pre_h, x):
    '''
      Advance the cell by one time step.

      Arguments:
        pre_h (Tensor): previous hidden state, shape=(None, units)
        x (Tensor): current input, shape=(None, inp_shape)
      Returns:
        h_t (Tensor): new hidden state, shape=(None, units)
    '''

    # Update gate: Decide how much the unit updates its activation, or content
    z_t = tf.nn.sigmoid(
        tf.matmul(x, tf.transpose(self.W[0])) + tf.matmul(pre_h, tf.transpose(self.U[0])))

    # Reset gate: Decide how much of the previous state feeds the candidate
    r_t = tf.nn.sigmoid(
        tf.matmul(x, tf.transpose(self.W[1])) + tf.matmul(pre_h, tf.transpose(self.U[1])))

    # Current memory content (candidate state)
    h_proposal = tf.nn.tanh(
        tf.matmul(x, tf.transpose(self.W[2])) + tf.matmul(tf.multiply(r_t, pre_h), tf.transpose(self.U[2])))

    # Current hidden state: blend of the old state and the candidate
    h_t = tf.multiply((1 - z_t), pre_h) + tf.multiply(z_t, h_proposal)
    
    return h_t

# Define GRU model
class GRU_RNN(tf.keras.Model):
  """
    Sentence classifier: Embedding -> GRU cell unrolled over time -> Dense head.
  """
  def __init__(self, units, embedding_size, vocab_size, input_length, num_class):
    super(GRU_RNN, self).__init__()
    self.input_length = input_length
    self.units = units
    self.num_class = num_class

    # Token ids -> dense vectors
    self.embedding = tf.keras.layers.Embedding(
      vocab_size, embedding_size, input_length=input_length)

    # Recurrent cell, applied once per time step in call()
    self.model = GRU(units, embedding_size)

    # Classification head applied to the final hidden state only
    head_layers = [
        tf.keras.layers.Dense(32, input_shape=(units,), activation="relu"),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(num_class, activation='softmax'),
    ]
    self.classification_layer = tf.keras.models.Sequential(head_layers)

  def call(self, sentence):
    """
      parameters: batch of token-id sequences
        type: Tensor
        shape: (batch_size, input_length)

      return: class probabilities computed from the last hidden state
        type: Tensor
        shape: (batch_size, num_class)
    """
    n_rows = tf.shape(sentence)[0]

    # The recurrence starts from an all-zero hidden state
    state = tf.zeros([n_rows, self.units])

    # (batch_size, input_length, embedding_size)
    embedded = self.embedding(sentence)

    # Feed one time step at a time, carrying the hidden state forward
    for step in range(self.input_length):
      state = self.model(state, embedded[:, step, :])

    return self.classification_layer(state)

In [4]:
# LSTM Model

class LSTM(tf.keras.layers.Layer):
    """
        Single-step LSTM cell written directly from the LSTM equations (no bias terms).

        Arguments:
            units (int): hidden / cell state dimension
            inp_shape (int): input (embedding) dimension
    """
    def __init__(self, units, inp_shape):
        super(LSTM, self).__init__()
        self.units = units
        self.inp_shape = inp_shape
        # `name` is passed by keyword: its positional slot differs between Keras
        # versions (Keras 3 takes `shape` first).
        # Index k along the first axis: 0 input gate, 1 forget gate, 2 output gate, 3 candidate.
        self.W = self.add_weight(name="W", shape=(4, self.units, self.inp_shape))
        self.U = self.add_weight(name="U", shape=(4, self.units, self.units))

    def call(self, pre_layer, x):
        """
            Advance the cell by one time step.

            Arguments:
                pre_layer (Tensor): stacked [hidden, cell] state, shape=(2, None, units)
                x (Tensor): current input, shape=(None, inp_shape)
            Returns:
                Tensor: stacked [new hidden, new cell] state, shape=(2, None, units)
        """
        pre_h, pre_c = tf.unstack(pre_layer)

        # Input gate: how much of the candidate is written to the cell state
        i_t = tf.nn.sigmoid(tf.matmul(x, tf.transpose(self.W[0])) + tf.matmul(pre_h, tf.transpose(self.U[0])))

        # Forget gate: how much of the previous cell state is kept
        f_t = tf.nn.sigmoid(tf.matmul(x, tf.transpose(self.W[1])) + tf.matmul(pre_h, tf.transpose(self.U[1])))

        # Output gate: how much of the cell state is exposed as hidden state
        o_t = tf.nn.sigmoid(tf.matmul(x, tf.transpose(self.W[2])) + tf.matmul(pre_h, tf.transpose(self.U[2])))

        # Candidate cell content
        n_c_t = tf.nn.tanh(tf.matmul(x, tf.transpose(self.W[3])) + tf.matmul(pre_h, tf.transpose(self.U[3])))

        # New cell state: kept memory plus gated candidate
        c = tf.multiply(f_t, pre_c) + tf.multiply(i_t, n_c_t)

        # New hidden state: gated, squashed cell state
        h = tf.multiply(o_t, tf.nn.tanh(c))

        return tf.stack([h, c])
         

class LSTM_RNN(tf.keras.Model):
    """
        Sentence classifier: Embedding -> LSTM cell unrolled over time -> Dense head.
    """

    def __init__(self, units, embedding_size, vocab_size, input_length, num_class):
        super(LSTM_RNN, self).__init__()
        self.input_length = input_length
        self.units = units

        # Token ids -> dense vectors
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, embedding_size, input_length=input_length)

        # Recurrent cell, applied once per time step in call()
        self.LSTM = LSTM(units, embedding_size)

        # Classification head applied to the final hidden state only
        head_layers = [
            tf.keras.layers.Dense(32, input_shape=(units,), activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(num_class, activation='softmax'),
        ]
        self.classification_layer = tf.keras.models.Sequential(head_layers)

    def call(self, sentence):
        """
        param: batch of token-id sequences
            type: Tensor
            shape: (batch_size, input_length)

        return: class probabilities computed from the last hidden state
            type: Tensor
            shape: (batch_size, num_class)
        """
        n_rows = tf.shape(sentence)[0]

        # Hidden state and cell state both start at zero, stacked as [h, c]
        zero_state = tf.zeros([n_rows, self.units])
        state = tf.stack([zero_state, tf.zeros([n_rows, self.units])])

        embedded = self.embedding(sentence)

        # Feed one time step at a time, carrying [h, c] forward
        for step in range(self.input_length):
            state = self.LSTM(state, embedded[:, step, :])

        # Only the hidden state is used for classification; the cell state is dropped
        last_hidden, _ = tf.unstack(state)
        return self.classification_layer(last_hidden)

In [5]:
# tanh Model (Traditional RNN)

class Tanh(tf.keras.layers.Layer):
    """
        Single-step traditional (Elman-style) RNN cell with a tanh activation (no bias term).

        Arguments:
            units (int): hidden state dimension
            inp_shape (int): input (embedding) dimension
    """
    def __init__(self, units, inp_shape):
        super(Tanh, self).__init__()
        self.units = units
        self.inp_shape = inp_shape
        # `name` is passed by keyword: its positional slot differs between Keras
        # versions (Keras 3 takes `shape` first).
        # The leading axis of size 1 keeps the weight layout parallel to the gated cells.
        self.W = self.add_weight(name="W", shape=(1, self.units, self.inp_shape))
        self.U = self.add_weight(name="U", shape=(1, self.units, self.units))

    def call(self, pre_layer, x):
        """
            Advance the cell by one time step.

            Arguments:
                pre_layer (Tensor): previous hidden state, shape=(None, units)
                x (Tensor): current input, shape=(None, inp_shape)
            Returns:
                Tensor: new hidden state, shape=(None, units)
        """
        h = tf.nn.tanh(tf.matmul(x, tf.transpose(self.W[0])) + tf.matmul(pre_layer, tf.transpose(self.U[0])))
        return h
    
class Tanh_RNN(tf.keras.Model):
    """
        Sentence classifier: Embedding -> tanh RNN cell unrolled over time -> Dense head.
    """

    def __init__(self, units, embedding_size, vocab_size, input_length, num_class):
        super(Tanh_RNN, self).__init__()
        self.input_length = input_length
        self.units = units

        # Token ids -> dense vectors
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, embedding_size, input_length=input_length)

        # Recurrent cell, applied once per time step in call()
        self.model = Tanh(units, embedding_size)

        # Classification head applied to the final hidden state only
        head_layers = [
            tf.keras.layers.Dense(32, input_shape=(units,), activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(num_class, activation='softmax'),
        ]
        self.classification_layer = tf.keras.models.Sequential(head_layers)

    def call(self, sentence):
        """
        param: batch of token-id sequences
            type: Tensor
            shape: (batch_size, input_length)

        return: class probabilities computed from the last hidden state
            type: Tensor
            shape: (batch_size, num_class)
        """
        n_rows = tf.shape(sentence)[0]

        # The recurrence starts from an all-zero hidden state
        state = tf.zeros([n_rows, self.units])

        embedded = self.embedding(sentence)

        # Feed one time step at a time, carrying the hidden state forward
        for step in range(self.input_length):
            state = self.model(state, embedded[:, step, :])

        return self.classification_layer(state)

In [6]:
# Data Class

class Dataset:
  '''Load a labelled text CSV, clean and tokenize it, and build tf.data pipelines.

  Arguments:
    data_path: path of the CSV file read by pandas.
    vocab_size: passed to the Keras Tokenizer as num_words.
    data_classes (dict): maps raw label values to integer class indices.

  Attributes:
    sentences_tokenizer: fitted Keras Tokenizer (None until load_dataset runs).
    label_dict: copy of data_classes written to label.json (None until load_dataset runs).
  '''

  def __init__(self, data_path, vocab_size, data_classes):
    self.data_path = data_path
    self.vocab_size = vocab_size
    self.data_classes = data_classes
    self.sentences_tokenizer = None
    self.label_dict = None

  def labels_encode(self, labels, data_classes):
    '''Encode labels to categorical (one-hot) without modifying `labels`.

    Values found in data_classes are replaced by their class index; any other
    value is passed through unchanged, so already-numeric labels still work.
    '''
    # Element-wise lookup instead of replace(..., inplace=True): the caller's
    # Series (a column of the loaded DataFrame) is left untouched.
    encoded = labels.map(lambda value: data_classes.get(value, value))

    return tf.keras.utils.to_categorical(encoded.values)

  def removeHTML(self, text):
    '''Remove html tags from a string'''
    clean = re.compile('<.*?>')
    return re.sub(clean, '', text)

  def removePunc(self, text):
    '''Remove punctuation (anything that is neither a word character nor whitespace)'''
    return re.sub(r'[^\w\s]', '', text)

  def removeURLs(self, text):
    '''Remove http(s) URLs wherever they occur in the text.

    Only the URL itself (up to the next whitespace) is removed; the rest of the
    line is kept.
    '''
    return re.sub(r'https?://\S+', '', text)

  def removeEmoji(self, data):
    '''Remove characters that fall in the listed emoji / symbol code-point ranges'''
    cleanr = re.compile("["
                        u"\U0001F600-\U0001F64F"
                        u"\U0001F300-\U0001F5FF"
                        u"\U0001F680-\U0001F6FF"
                        u"\U0001F1E0-\U0001F1FF"
                        u"\U00002702-\U000027B0"
                        u"\U000024C2-\U0001F251"
                        "]+", flags=re.UNICODE)
    return re.sub(cleanr, '', data)

  def sentence_cleaning(self, sentence):
    '''Clean every text and return a list of lemmatized, lower-cased token lists'''
    # One lemmatizer is enough; it does not need to be rebuilt per text.
    lemmatizer = WordNetLemmatizer()

    out_sentence = []
    for line in tqdm(sentence):
      line = self.removeHTML(line)
      # URLs must be removed before punctuation: once "://" has been stripped
      # a URL can no longer be recognised.
      line = self.removeURLs(line)
      line = self.removePunc(line)
      line = self.removeEmoji(line)
      # Anything that is not an ASCII letter becomes a separator
      text = re.sub("[^a-zA-Z]", " ", line)
      tokens = word_tokenize(text.lower())

      out_sentence.append([lemmatizer.lemmatize(token) for token in tokens])
    return out_sentence

  def data_processing(self, sentences, labels):
    '''Preprocessing both text and labels'''
    print("|--data_processing ...")
    sentences = self.sentence_cleaning(sentences)
    labels = self.labels_encode(labels, data_classes=self.data_classes)

    return sentences, labels

  def build_tokenizer(self, sentences, vocab_size, char_level=False):
    '''Fit a Keras Tokenizer (with an OOV token) on the cleaned sentences'''
    print("|--build_tokenizer ...")
    tokenizer = tf.keras.preprocessing.text.Tokenizer(
        num_words=vocab_size, oov_token=OOV, char_level=char_level)
    tokenizer.fit_on_texts(sentences)

    return tokenizer

  def tokenize(self, tokenizer, sentences, max_length):
    '''Convert sentences to id sequences padded/truncated to max_length'''
    print("|--tokenize ...")
    sentences = tokenizer.texts_to_sequences(sentences)
    sentences = tf.keras.preprocessing.sequence.pad_sequences(sentences, maxlen=max_length,
                                                              padding=PADDING, truncating=TRUNC)

    return sentences

  def load_dataset(self, max_length, data_name, label_name):
    '''Read the CSV, clean and tokenize the text, and one-hot encode the labels.

    Side effects: sets self.sentences_tokenizer and self.label_dict, and writes
    the label mapping to label.json in the working directory.

    Returns:
      (tensor, labels): padded id sequences and one-hot labels.
    '''
    print(" ")
    print("Load dataset ... ")
    datastore = pd.read_csv(self.data_path)
    print(datastore.head())
    sentences = datastore[data_name]
    labels = datastore[label_name]

    # Cleaning
    sentences, labels = self.data_processing(sentences, labels)

    # Tokenizing
    self.sentences_tokenizer = self.build_tokenizer(sentences, self.vocab_size)
    tensor = self.tokenize(
        self.sentences_tokenizer, sentences, max_length)

    print("Done! Next to ... ")
    print(" ")

    # Saving label dict: record the mapping actually used for encoding, so the
    # file holds the class mapping instead of null.
    self.label_dict = dict(self.data_classes)
    with open('label.json', 'w') as f:
        json.dump(self.label_dict, f)

    return tensor, labels

  def build_dataset(self, max_length=128, test_size=0.2, buffer_size=128, batch_size=128, data_name='review', label_name='sentiment'):
    '''Load the data and return shuffled, batched (train, validation) tf.data datasets.

    The split is stratified on the labels and uses a fixed random_state.
    '''
    sentences, labels = self.load_dataset(
        max_length, data_name, label_name)

    X_train, X_val, y_train, y_val = train_test_split(
        sentences, labels, test_size=test_size, stratify=labels, random_state=42)

    # Convert to tensor
    train_dataset = tf.data.Dataset.from_tensor_slices((tf.convert_to_tensor(
        X_train, dtype=tf.int64), tf.convert_to_tensor(y_train, dtype=tf.int64)))
    train_dataset = train_dataset.shuffle(buffer_size).batch(batch_size)

    val_dataset = tf.data.Dataset.from_tensor_slices((tf.convert_to_tensor(
        X_val, dtype=tf.int64), tf.convert_to_tensor(y_val, dtype=tf.int64)))
    val_dataset = val_dataset.shuffle(buffer_size).batch(batch_size)

    return train_dataset, val_dataset

In [7]:
# prepare the data

dataset = Dataset(data_path, vocab_size, data_classes=data_classes)

train_ds, val_ds = dataset.build_dataset(max_length, test_size, buffer_size, batch_size, data_name, label_name)
sentences_tokenizer = dataset.sentences_tokenizer

# Number of rows the Embedding layers need. The tokenizer only emits ids below
# num_words (= vocab_size); rarer words map to the OOV id, so sizing the
# embedding by the full corpus vocabulary wastes parameters. When the corpus
# vocabulary is smaller than the cap, the largest id is len(word_index)
# (id 0 is padding, id 1 is the OOV token), hence the "+ 1".
full_vocab_size = len(sentences_tokenizer.word_index) + 1
sentences_tokenizer_size = min(vocab_size, full_vocab_size) if vocab_size else full_vocab_size

# Initializing variables
input_length = max_length

 
Load dataset ... 
                                              review sentiment
0  One of the other reviewers has mentioned that ...  positive
1  A wonderful little production. <br /><br />The...  positive
2  I thought this was a wonderful way to spend ti...  positive
3  Basically there's a family where a little boy ...  negative
4  Petter Mattei's "Love in the Time of Money" is...  positive
|--data_processing ...


100%|██████████| 50000/50000 [01:16<00:00, 649.82it/s]


|--build_tokenizer ...
|--tokenize ...
Done! Next to ... 
 


In [8]:
# initializing the models

# All three architectures share the same hyper-parameters so their results are comparable.
model_args = (units, embedding_size, sentences_tokenizer_size, input_length, num_classes)

gru_model = GRU_RNN(*model_args)
lstm_model = LSTM_RNN(*model_args)
tanh_model = Tanh_RNN(*model_args)

In [9]:
# set up the loss function and optimizer

# Labels are one-hot encoded, hence categorical (not sparse) cross-entropy.
loss_object = tf.keras.losses.CategoricalCrossentropy(
    name='categrical_crossentropy',
)
# Rebinds `optimizer` (a plain string in the parameter cell) to an RMSprop instance.
optimizer = tf.keras.optimizers.RMSprop(
    name='rmsprop',
    learning_rate=learning_rate,
)

In [10]:
# Compile optimizer and loss function into model

metrics = ['accuracy', 'mse']


def _clone_optimizer(template):
    """Return a new optimizer with the same configuration as `template` but no shared state."""
    return template.__class__.from_config(template.get_config())


# Each model gets its own optimizer instance: an optimizer keeps per-variable
# state (and an iteration counter) for the variables it was first used with,
# so one instance must not be reused to train a different model.
for _model in (gru_model, lstm_model, tanh_model):
    _model.compile(optimizer=_clone_optimizer(optimizer), loss=loss_object, metrics=metrics)

In [11]:
# Callbacks: Early stopping

checkpoint_path = "cp.ckpt"

# Stop once validation loss has failed to improve for two consecutive epochs.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=2, monitor="val_loss")

# Weights-only checkpoint written to checkpoint_path.
# NOTE(review): `period` is a deprecated ModelCheckpoint argument; `save_freq` is its replacement.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    period=5,
    verbose=1,
)



In [12]:
# Train the models with a reduced number of steps per epoch so the notebook
# finishes quickly: each epoch sees only STEPS_PER_EPOCH batches of train_ds.
STEPS_PER_EPOCH = 10
CHECKPOINT_EVERY_EPOCHS = 5


def train_model(model, run_name):
    """Fit `model` on train_ds/val_ds and return its Keras History.

    Every run gets its own checkpoint file and TensorBoard log directory
    (both derived from `run_name`) so the three models do not overwrite one
    another's artefacts. The shared `early_stopping` callback is safe to reuse
    because it is only read, not tied to a file or directory.
    """
    run_callbacks = [
        early_stopping,
        tf.keras.callbacks.ModelCheckpoint(
            filepath=f"{run_name}_{checkpoint_path}",
            verbose=1,
            save_weights_only=True,
            # save_freq counts batches; this saves every CHECKPOINT_EVERY_EPOCHS epochs
            save_freq=CHECKPOINT_EVERY_EPOCHS * STEPS_PER_EPOCH,
        ),
        tf.keras.callbacks.TensorBoard(log_dir=f"./logs/{run_name}", histogram_freq=1),
    ]
    # No batch_size here: train_ds is a tf.data.Dataset that is already batched.
    return model.fit(
        train_ds,
        epochs=epochs,
        validation_data=val_ds,
        callbacks=run_callbacks,
        verbose=1,
        steps_per_epoch=STEPS_PER_EPOCH,
    )


gru_history = train_model(gru_model, 'gru')
lstm_history = train_model(lstm_model, 'lstm')
tanh_history = train_model(tanh_model, 'tanh')

histories = {'GRU': gru_history, 'LSTM': lstm_history, 'Tanh': tanh_history}

fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 6))

# Plot losses
for label, run in histories.items():
    loss_ax.plot(run.history['loss'], label=f'{label} Training Loss')
    loss_ax.plot(run.history['val_loss'], label=f'{label} Validation Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.legend()

# Plot accuracies
for label, run in histories.items():
    acc_ax.plot(run.history['accuracy'], label=f'{label} Training Accuracy')
    acc_ax.plot(run.history['val_accuracy'], label=f'{label} Validation Accuracy')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy')
acc_ax.legend()

plt.show()

Epoch 1/10




: 