<a href="https://colab.research.google.com/github/HuyenNguyenHelen/LING-5412/blob/main/Assignment5_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import tensorflow_datasets as tfds
import tensorflow as tf
import torch
import sklearn
from sklearn.metrics import accuracy_score, classification_report
tfds.disable_progress_bar()
from keras import backend as K


In [2]:
# Setting hypermeters
batch_size = 32
units = 64
max_length = 120
n_epochs = 5

In [3]:
# Get the GPU device name.
device_name = tf.test.gpu_device_name()

# The device name should look like the following:
if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

# If there's a GPU available...
if torch.cuda.is_available():    

    # Tell PyTorch to use the GPU.    
    device = torch.device("cuda")

    print('There are %d GPU(s) available.' % torch.cuda.device_count())

    print('We will use the GPU:', torch.cuda.get_device_name(0))

# If not...
else:
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")

Found GPU at: /device:GPU:0
There are 1 GPU(s) available.
We will use the GPU: Tesla K80


# Loading the dataset

In [4]:
dataset, info = tfds.load('imdb_reviews', with_info=True, as_supervised=True)
train_dataset, test_dataset = dataset['train'], dataset['test']

train_dataset.element_spec

[1mDownloading and preparing dataset imdb_reviews/plain_text/1.0.0 (download: 80.23 MiB, generated: Unknown size, total: 80.23 MiB) to /root/tensorflow_datasets/imdb_reviews/plain_text/1.0.0...[0m
Shuffling and writing examples to /root/tensorflow_datasets/imdb_reviews/plain_text/1.0.0.incompleteP1C28C/imdb_reviews-train.tfrecord
Shuffling and writing examples to /root/tensorflow_datasets/imdb_reviews/plain_text/1.0.0.incompleteP1C28C/imdb_reviews-test.tfrecord
Shuffling and writing examples to /root/tensorflow_datasets/imdb_reviews/plain_text/1.0.0.incompleteP1C28C/imdb_reviews-unsupervised.tfrecord




[1mDataset imdb_reviews downloaded and prepared to /root/tensorflow_datasets/imdb_reviews/plain_text/1.0.0. Subsequent calls will reuse this data.[0m


(TensorSpec(shape=(), dtype=tf.string, name=None),
 TensorSpec(shape=(), dtype=tf.int64, name=None))

In [5]:
# Shuffling the dataset
buffer_size = 10000
train_dataset = train_dataset.shuffle(buffer_size).batch(batch_size) #.prefetch (tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size) #.prefetch(tf.data.AUTOTUNE)

# Building the model

### Representing the text

In [6]:
## Representing the  text
vocab_size = 10000
encoder = tf.keras.layers.TextVectorization(max_tokens=vocab_size)
encoder.adapt(train_dataset.map(lambda x,y: x))

# Store vocabulary
vocab = np.array(encoder.get_vocabulary())
vocab[:20]

# # Defining a function for fitting vectorizer function/layer to vectorize text (review)
# def fitting_vectorizer (text, label):
#   text = tf.expand_dims(text, -1)
#   return encoder (text), label

# # storing text batch and label batch
# text_batch, label_batch = next(iter(train_dataset))

# # ## print an instance with vectorized review and label for observing
# # print ('REVIEW:', text_batch[0])
# # print('LABEL:', raw_train.class_names[label_batch[0]] )


array(['', '[UNK]', 'the', 'and', 'a', 'of', 'to', 'is', 'in', 'it', 'i',
       'this', 'that', 'br', 'was', 'as', 'for', 'with', 'movie', 'but'],
      dtype='<U17')

## Vanilla Bidirectional LSTM 

In [7]:
# Defining an evaluation metric function
def printing_eval_scores (y_true, y_pred, report=''):
  accuracy = sklearn.metrics.accuracy_score(y_true, y_pred)
  precision = sklearn.metrics.precision_score(y_true, y_pred, average='binary')
  recall = sklearn.metrics.recall_score(y_true, y_pred, average='binary')
  f1 = sklearn.metrics.f1_score(y_true, y_pred , average='binary')
  print('accuracy score: {:.3f}'.format(accuracy))
  print('precision score: {:.3f}'.format(precision))
  print('recall score: {:.3f}'.format(recall))
  print('F1 score: {:.3f}'.format(f1))
  if report is True:
    print(classification_report(y_true, y_pred))
  else:
    pass
  return accuracy, precision, recall, f1

### With different embedding sizes

In [9]:
# Creating the model
embedding_sizes = [32,64,128]
for size in embedding_sizes:
  print ("\n========= embedding vectors'size= %s ============" %size)
  model = tf.keras.Sequential([encoder,
                              tf.keras.layers.Embedding(
                                  input_dim = len(encoder.get_vocabulary()),
                                  output_dim = size,
                                  mask_zero = True),
                              tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units)),
                              tf.keras.layers.Dense(64, activation = 'relu'),
                              tf.keras.layers.Dense(1, activation = 'sigmoid')]) 
  print(model.summary())
  # Compile the model for training
  model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
                optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4),
                metrics = ['accuracy'])
  # Training the model
  history = model.fit (train_dataset,
                      epochs = n_epochs, 
                      validation_data = test_dataset,
                      validation_steps = 30)
  # testing the model
  ### pred_label = tf.argmax(model.predict(test),1)
  pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
  true_label = np.concatenate([y for x, y in test_dataset], axis=0)

  test_loss, test_acc = model.evaluate (test_dataset)
  # print('Test loss: ', test_loss)
  # print('Test acurracy: ', test_acc)
  print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
  printing_eval_scores (true_label, pred_label, report=True)




Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 text_vectorization (TextVec  (None, None)             0         
 torization)                                                     
                                                                 
 embedding_1 (Embedding)     (None, None, 32)          320000    
                                                                 
 bidirectional_1 (Bidirectio  (None, 128)              49664     
 nal)                                                            
                                                                 
 dense_2 (Dense)             (None, 64)                8256      
                                                                 
 dense_3 (Dense)             (None, 1)                 65        
                                                                 
Total params: 377,985
Trainable params: 377,985
Non-t

### With different vocabulary size

In [10]:
vocab_sizes = [5000, 7000, 10000]
for size in vocab_sizes:
  print ("\n========= vocabulary size = %s ============" %size)
  encoder = tf.keras.layers.TextVectorization(max_tokens=size)
  encoder.adapt(train_dataset.map(lambda x,y: x))
  # Store vocabulary
  vocab = np.array(encoder.get_vocabulary())

  model = tf.keras.Sequential([encoder,
                              tf.keras.layers.Embedding(
                                  input_dim = len(encoder.get_vocabulary()),
                                  output_dim = 64,
                                  mask_zero = True),
                              tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units)),
                              tf.keras.layers.Dense(32, activation = 'relu'),
                              tf.keras.layers.Dense(1, activation = 'sigmoid')]) 
  print(model.summary())
  # Compile the model for training
  model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
                optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4),
                metrics = ['accuracy'])
  # Training the model
  history = model.fit (train_dataset,
                      epochs = n_epochs, 
                      validation_data = test_dataset,
                      validation_steps = 30)
  # testing the model
  ### pred_label = tf.argmax(model.predict(test),1)
  pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
  true_label = np.concatenate([y for x, y in test_dataset], axis=0)

  test_loss, test_acc = model.evaluate (test_dataset)
  # print('Test loss: ', test_loss)
  print('Test acurracy: ', test_acc)
  print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
  printing_eval_scores (true_label, pred_label, report=True)


Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 text_vectorization_1 (TextV  (None, None)             0         
 ectorization)                                                   
                                                                 
 embedding_4 (Embedding)     (None, None, 64)          320000    
                                                                 
 bidirectional_4 (Bidirectio  (None, 128)              66048     
 nal)                                                            
                                                                 
 dense_8 (Dense)             (None, 32)                4128      
                                                                 
 dense_9 (Dense)             (None, 1)                 33        
                                                                 
Total params: 390,209
Trainable params: 390,209
Non-t

### With different optimizers

In [None]:
# configure the model uisng optimizer and loss function
print(model.summary())

optimizers = ['adagrad', 'rmsprop', 'adam']
for opt in optimizers:
  print( '\n========== optimizer = %s' %opt)
  # Compile the model for training
  model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
                optimizer = opt,
                metrics = ['accuracy'])
  # Training the model
  history = model.fit (train_dataset,
                      epochs = n_epochs, 
                      validation_data = test_dataset,
                      validation_steps = 30)
  # testing the model
  pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
  true_label = np.concatenate([y for x, y in test_dataset], axis=0)

  test_loss, test_acc = model.evaluate (test_dataset)
  # print('Test loss: ', test_loss)
  print('Test acurracy: ', test_acc)
  print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
  printing_eval_scores (true_label, pred_label, report=True)

Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 text_vectorization_3 (TextV  (None, None)             0         
 ectorization)                                                   
                                                                 
 embedding_6 (Embedding)     (None, None, 64)          640000    
                                                                 
 bidirectional_6 (Bidirectio  (None, 128)              66048     
 nal)                                                            
                                                                 
 dense_12 (Dense)            (None, 32)                4128      
                                                                 
 dense_13 (Dense)            (None, 1)                 33        
                                                                 
Total params: 710,209
Trainable params: 710,209
Non-tr

### Replacing with LSTM with GRU 

In [None]:
model = tf.keras.Sequential([encoder,
                              tf.keras.layers.Embedding(
                                  input_dim = len(encoder.get_vocabulary()),
                                  output_dim = 32,
                                  mask_zero = True),
                              tf.keras.layers.GRU(64,
                                                  activation = 'tanh',
                                                  recurrent_activation = 'sigmoid',
                                                  recurrent_dropout = 0.0,
                                                  use_bias = True),
                              tf.keras.layers.Dense(32, activation = 'relu'),
                              tf.keras.layers.Dense(1, activation = 'sigmoid')]) 
print(model.summary())

# Compile the model for training
print( '\nTraining GRU model...')
model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
              optimizer = 'adagrad',
              metrics = ['accuracy'])
# Training the model
history = model.fit (train_dataset,
                    epochs = n_epochs, 
                    validation_data = test_dataset,
                    validation_steps = 30)
# testing the model
### pred_label = tf.argmax(model.predict(test),1)
pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
true_label = np.concatenate([y for x, y in test_dataset], axis=0)

test_loss, test_acc = model.evaluate (test_dataset)
print('Test acurracy: ', test_acc)
print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
printing_eval_scores (true_label, pred_label, report=True)

### With an average of all hidden states to fully connected layer

In [None]:
#### Creating the model ########
input_dim = len(encoder.get_vocabulary())
_input = tf.keras.Input(shape = (1,), dtype=tf.string)

## Block 1
x = encoder (_input) 
x = tf.keras.layers.Embedding(input_dim = input_dim,output_dim = 64, mask_zero = True) (x)
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units,  return_sequences = True)) (x)
x = tf.keras.layers.Dense(32, activation = 'relu') (x)
x = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1)) (x)
x = tf.keras.layers.GlobalAveragePooling1D() (x)

## output layer
output = tf.keras.layers.Dense(1, activation = 'sigmoid') (x)

## combine in one
model = tf.keras.Model(_input,output)
print(model.summary())

## Compile the model for training
model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
              optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4),
              metrics = ['accuracy'])

# Training the model
history = model.fit (train_dataset,
                    epochs = n_epochs, 
                    validation_data = test_dataset,
                    validation_steps = 30)
# testing the model
### pred_label = tf.argmax(model.predict(test),1)
pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
true_label = np.concatenate([y for x, y in test_dataset], axis=0)

test_loss, test_acc = model.evaluate (test_dataset)
print('Test acurracy: ', test_acc)
print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
printing_eval_scores (true_label, pred_label, report=True)


## Stacked bidirectional LSTM

In [None]:
####   Creating the model  ####

input_dim = len(encoder.get_vocabulary())
_input = tf.keras.Input(shape = (1,), dtype=tf.string)

## Block 1
x = encoder (_input) 
x = tf.keras.layers.Embedding(input_dim = input_dim,output_dim = 64, mask_zero = True) (x)
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units,  return_sequences = True)) (x)
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units)) (x)   #return_sequences = True
x = tf.keras.layers.Dense(64, activation = 'relu') (x)

## output layer
output = tf.keras.layers.Dense(1, activation = 'sigmoid') (x)

## combine in one
model = tf.keras.Model(_input,output)

### Compile the model for training
model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
              optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4),
              metrics = ['accuracy'])

#### Training the model
history = model.fit (train_dataset,
                    epochs = n_epochs, 
                    validation_data = test_dataset,
                    validation_steps = 30)
##### testing the model
### pred_label = tf.argmax(model.predict(test),1)
pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
true_label = np.concatenate([y for x, y in test_dataset], axis=0)

test_loss, test_acc = model.evaluate (test_dataset)
print('Test acurracy: ', test_acc)
print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
printing_eval_scores (true_label, pred_label, report=True)

## BiLSTM with attention layers

In [None]:
#### Creating the model ####
input_dim = len(encoder.get_vocabulary())
_input = tf.keras.Input(shape = (1,), dtype=tf.string)
vectorizer = encoder (_input) 
embeddings = tf.keras.layers.Embedding(input_dim = input_dim,output_dim = 64, input_length=max_length, mask_zero = True) (vectorizer)

lstm_hs = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units,  return_sequences = True)) (embeddings)
# attention layer
attention = tf.keras.layers.Dense(1, activation = 'tanh') (lstm_hs)
attention = tf.keras.layers.Flatten()(attention)
attention = tf.keras.layers.Activation('softmax') (attention)
attention = tf.keras.layers.RepeatVector (units*2) (attention)
attention = tf.keras.layers.Permute ([2,1]) (attention)

attention_weight = tf.keras.layers.Multiply()([lstm_hs, attention])
attention_weight = tf.keras.layers.Lambda(lambda x: K.sum(x, axis = 1)) (attention_weight)

## output layer
output = tf.keras.layers.Dense(1, activation = 'sigmoid') (attention_weight)

## combine in one
model = tf.keras.Model(_input,output)
print(model.summary())


#### Compile the model for training
model.compile(loss = tf.keras.losses.BinaryCrossentropy(from_logits = False),
              optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4),
              metrics = ['accuracy'])

#### Training the model
history = model.fit (train_dataset,
                    epochs = n_epochs, 
                    validation_data = test_dataset)
#### testing the model
### pred_label = tf.argmax(model.predict(test),1)
pred_label = (model.predict(test_dataset) > 0.5).astype("int32")
true_label = np.concatenate([y for x, y in test_dataset], axis=0)

test_loss, test_acc = model.evaluate (test_dataset)
print('Test acurracy: ', test_acc)
print('\nTesting performance:\n Loss: {:.3f} - Accuracy: {:.3f}'. format(test_loss, test_acc))
printing_eval_scores (true_label, pred_label, report=True)