https://www.tensorflow.org/tutorials/text/text_classification_rnn

In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow.keras import losses
from keras import regularizers
from keras.models import load_model
import os
import tensorflow_datasets as tfds
from sklearn.model_selection import train_test_split

In [2]:
# Record the TensorFlow version this notebook was run with.
print(tf.__version__)

2.4.1


In [3]:
import matplotlib.pyplot as plt

def plot_graphs(history, metric):
    """Draw the training and validation curves of ``metric`` on the current axes.

    Args:
        history: object whose ``history`` attribute maps metric names to
            per-epoch value sequences (as returned by ``model.fit``).
        metric: name of the training metric; the validation series is looked
            up under ``'val_' + metric``.

    Raises:
        KeyError: if either series is missing from ``history.history``.
    """
    val_metric = 'val_' + metric
    per_epoch = history.history
    plt.plot(per_epoch[metric])
    plt.plot(per_epoch[val_metric], '')
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend([metric, val_metric])

In [4]:
import pandas as pd

In [5]:
# Load the labelled posts; the path is relative to the notebook's working directory.
csv_path = '3 classes/facebook_health_cases (all).csv'
texts = pd.read_csv(csv_path)
df = pd.DataFrame(texts)
df

FileNotFoundError: [Errno 2] File 3 classes/facebook_health_cases (all).csv does not exist: '3 classes/facebook_health_cases (all).csv'

In [None]:
# Remove rows with missing values, treating an empty-string `text` as missing.
print("before cleaned: ", df.shape)
# Assign the result back instead of calling replace(..., inplace=True) on the
# `df['text']` selection: that chained in-place call operates on an
# intermediate object and does not update `df` under pandas copy-on-write.
df['text'] = df['text'].replace('', np.nan)
# dropna() without `subset` drops a row when ANY column is NaN, not only `text`.
df = df.dropna().reset_index(drop=True)
print("After: ", df.shape)

In [None]:
# Replace each sentiment label by its integer category code.
sentiment_categories = df['sentiment'].astype('category')
df['sentiment'] = sentiment_categories.cat.codes

In [None]:
# Display the frame to inspect the integer-coded `sentiment` column.
df
# Author's code mapping: 0:negative 1:neutral 2:positive
# NOTE(review): presumably relies on the category order of the raw labels — confirm against the CSV.

In [None]:
# Two-stage split with a fixed seed: 90% train / 10% hold-out, then the
# hold-out is divided 25% validation / 75% test.
features, labels = df['check_stop'], df['sentiment']
x_train, x_holdout, y_train, y_holdout = train_test_split(
    features, labels, test_size=0.10, random_state=42)
x_val, x_test, y_val, y_test = train_test_split(
    x_holdout, y_holdout, test_size=0.75, random_state=42)

In [None]:
# Training split as (text, one-hot label over 3 classes) pairs.
train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=3)
dataset = tf.data.Dataset.from_tensor_slices((x_train.values, train_onehot))

In [None]:
# Validation split as (text, one-hot label over 3 classes) pairs.
val_onehot = tf.keras.utils.to_categorical(y_val, num_classes=3)
dataset_val = tf.data.Dataset.from_tensor_slices((x_val.values, val_onehot))

In [None]:
# Test split as (text, one-hot label over 3 classes) pairs.
test_onehot = tf.keras.utils.to_categorical(y_test, num_classes=3)
dataset_test = tf.data.Dataset.from_tensor_slices((x_test.values, test_onehot))

In [None]:
# Shuffle-buffer size handed to Dataset.shuffle() when the input pipelines are built.
BUFFER_SIZE = 10000
# Number of examples per batch handed to Dataset.batch().
BATCH_SIZE = 64

In [None]:
# Number of examples in each split.
train_ds_size = len(x_train)
val_ds_size = len(x_val)
test_ds_size = len(x_test)

# take(n) keeps at most the first n elements of each dataset.
train_data, val_data, test_data = (
    dataset.take(train_ds_size),
    dataset_val.take(val_ds_size),
    dataset_test.take(test_ds_size),
)

In [None]:
# Sanity check: count the elements of each split by iterating it once.
for split_data in (train_data, val_data, test_data):
    print(sum(1 for _ in split_data.as_numpy_iterator()))

In [None]:
def _shuffle_batch_prefetch(ds):
    """Shuffle ``ds``, group it into BATCH_SIZE batches and prefetch them."""
    return ds.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)


#all_dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
train_dataset = _shuffle_batch_prefetch(train_data)
val_dataset = _shuffle_batch_prefetch(val_data)
test_dataset = _shuffle_batch_prefetch(test_data)

In [None]:
# Show the first three texts and labels of one validation batch.
for example, label in val_dataset.take(1):
    print('texts: ', example.numpy()[:3], end='\n\n')
    print('labels: ', label.numpy()[:3])

In [None]:
# Show the first three texts and labels of one test batch.
# `example` is reused further down for the encoder round-trip demo.
for example, label in test_dataset.take(1):
    print('texts: ', example.numpy()[:3], end='\n\n')
    print('labels: ', label.numpy()[:3])

### Create Text Encoder

In [None]:
# Upper bound on the vocabulary kept by the text vectorizer.
VOCAB_SIZE = 10155
encoder = tf.keras.layers.experimental.preprocessing.TextVectorization(
    max_tokens=VOCAB_SIZE,
)
# Fit the vocabulary on the training texts only (labels are dropped).
train_texts_only = train_dataset.map(lambda text, label: text)
encoder.adapt(train_texts_only)

In [None]:
# Number of tokens in the adapted vocabulary; the Embedding layer's input_dim is computed the same way.
len(encoder.get_vocabulary())

The .adapt method sets the layer's vocabulary. Here are the first 20 tokens. After the padding and unknown tokens they're sorted by frequency:

In [None]:
# Keep the vocabulary as an array so token ids can index it directly.
vocabulary_tokens = encoder.get_vocabulary()
vocab = np.array(vocabulary_tokens)
vocab[:20]

Once the vocabulary is set, the layer can encode text into indices. The tensors of indices are 0-padded to the longest sequence in the batch (unless you set a fixed output_sequence_length):

In [None]:
# Encode the whole batch first, then keep the first three rows.
encoded_batch = encoder(example)
encoded_example = encoded_batch[:3].numpy()
encoded_example

With the default settings, the process is not completely reversible. There are two main reasons for that:

1. The default value for preprocessing.TextVectorization's standardize argument is "lower_and_strip_punctuation".
2. The limited vocabulary size and lack of character-based fallback results in some unknown tokens.

In [None]:
# Compare each of the first three texts with the text rebuilt from its token ids.
for idx in range(3):
    print("Original: ", example[idx].numpy())
    rebuilt = " ".join(vocab[encoded_example[idx]])
    print("Round-trip: ", rebuilt, end="\n\n")

1. This model can be built as a tf.keras.Sequential.

2. The **first layer** is the *encoder*, which converts the text to a sequence of token indices.

3. After the encoder is an **embedding layer**. An embedding layer stores one vector per word. When called, it converts the sequences of word indices to sequences of vectors. These vectors are trainable. After training (on enough data), words with similar meanings often have similar vectors.

4. This index-lookup is much more efficient than the equivalent operation of passing a one-hot encoded vector through a tf.keras.layers.Dense layer.

5. A **recurrent neural network (RNN)** processes sequence input by iterating through the elements. RNNs pass the outputs from one timestep to their input on the next timestep.

6. The **tf.keras.layers.Bidirectional** wrapper can also be used with an RNN layer. This propagates the input *forward and backwards* through the RNN layer and then *concatenates the final output*.

- The main advantage to a bidirectional RNN is that the signal from the beginning of the input doesn't need to be processed all the way through every timestep to affect the output.

- The main disadvantage of a bidirectional RNN is that you can't efficiently stream predictions as words are being added to the end.

7. After the RNN has converted the sequence to a single vector, the two layers.Dense do some final processing and *convert from this vector representation to three logits* (one per sentiment class) as the classification output.


<img src="bidirectional.png">

In [None]:
# One-layer bidirectional LSTM classifier:
# raw text -> token ids -> embeddings -> BiLSTM vector -> hidden Dense -> 3 logits.
model = tf.keras.Sequential([
    encoder,
    tf.keras.layers.Embedding(
        input_dim=len(encoder.get_vocabulary()),
        output_dim=64,
        #embeddings_regularizer = tf.keras.regularizers.L2(0.01),
        # Use masking to handle the variable sequence lengths
        mask_zero=True),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64)),
    # Hidden layer uses ReLU (as in the TensorFlow tutorial linked at the top of
    # this notebook). A softmax here would force the 64 hidden activations to
    # sum to 1 and squeeze the signal passed to the output layer.
    tf.keras.layers.Dense(64, activation='relu'),
    # No activation: the model emits raw logits, matching from_logits=True in the loss.
    tf.keras.layers.Dense(3)
])

Please note that Keras sequential model is used here since all the layers in the model only have single input and produce single output. In case you want to use stateful RNN layer, you might want to build your model with Keras functional API or model subclassing so that you can retrieve and reuse the RNN layer states. Please check Keras RNN guide for more details.

The embedding layer uses masking to handle the varying sequence-lengths. All the layers after the Embedding support masking:


The embedding layer uses masking to handle the varying sequence-lengths. All the layers after the Embedding support masking:

In [None]:
# One flag per layer, in model order.
masking_flags = [lyr.supports_masking for lyr in model.layers]
print(masking_flags)

To confirm that this works as expected, evaluate a sentence twice. First, alone so there's no padding to mask:

In [None]:
# Predict on a single sentence by itself, so the batch needs no padding.
sample_text = 'alhamdulillah kes aktif menurun'
single_batch = np.array([sample_text])
predictions = model.predict(single_batch)
print(predictions[0])

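reference: [class 0 (**negative**), class 1 (**neutral**), class 2 (**positive**)]. The printed values are raw logits, not probabilities, because the final Dense layer has no activation.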

Now, evaluate it again in a batch with a longer sentence. The result should be identical:



In [None]:
# Predict on the same sentence batched with a much longer one, which forces
# the short sentence to be padded.
padding = "the " * 2000
padded_batch = np.array([sample_text, padding])
predictions = model.predict(padded_batch)
print(predictions[0])

Compile the Keras model to configure the training process:

In [None]:
# Track accuracy plus micro- and macro-averaged F1 over the 3 classes.
f1_metrics = [
    tfa.metrics.F1Score(num_classes=3, name=f'{averaging}_f1_score', average=averaging)
    for averaging in ('micro', 'macro')
]
model.compile(
    # The model outputs logits, hence from_logits=True.
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(0.001),
    metrics=['accuracy', *f1_metrics],
)

In [None]:
# All callbacks watch validation macro-F1. Higher is better, so every callback
# gets mode='max' explicitly: the default mode='auto' infers the direction
# from the metric name and does not reliably treat an F1 score as
# higher-is-better, which would stop training / cut the learning rate while
# the score is still improving.
metric = 'val_macro_f1_score'
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor=metric,
                                                 mode='max',
                                                 factor=0.1,
                                                 patience=10,
                                                 verbose=1,
                                                 min_lr=0.00001)

# Only the best weights (by `metric`) are written to this path.
checkpoint_path = "training_1/one_layer_lstm_emoji.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
model_chkpt = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 monitor=metric,
                                                 save_weights_only=True,
                                                 verbose=1, save_best_only=True, mode='max')
early_stopping_callback = tf.keras.callbacks.EarlyStopping(monitor=metric,
                                                           mode='max',
                                                           patience=15)

### Train the Model

In [None]:
# train_dataset is a tf.data pipeline that is already shuffled and batched
# (BATCH_SIZE), so `shuffle` and `batch_size` are not passed: Keras does not
# apply them to Dataset inputs, and batch_size=32 contradicted the real batch size.
history = model.fit(train_dataset,
                    epochs=100,
                    validation_data=val_dataset,
                    callbacks=[early_stopping_callback, reduce_lr, model_chkpt])

In [None]:
# Side-by-side training/validation curves: accuracy (left) and loss (right).
plt.figure(figsize=(16, 8))
plt.subplot(121)
plot_graphs(history, 'accuracy')
plt.ylim(top=1)
plt.subplot(122)
plot_graphs(history, 'loss')
plt.ylim(bottom=0)

In [None]:
# Side-by-side training/validation curves: micro-F1 (left) and macro-F1 (right).
plt.figure(figsize=(16, 8))
plt.subplot(121)
plot_graphs(history, 'micro_f1_score')
plt.ylim(top=1)
plt.subplot(122)
plot_graphs(history, 'macro_f1_score')
plt.ylim(bottom=0)

In [None]:
# Evaluate on the test split with the weights as they are after training.
# test_dataset is already batched, so no batch_size is passed (Keras does not
# apply batch_size to Dataset inputs).
loss, acc, micro_f1, macro_f1 = model.evaluate(test_dataset, verbose=2)
print("lss: %.4f" % (loss))
print("acc: %.4f" % (acc))
print("micro f1: %.4f" % (micro_f1))
print("macro f1: %.4f" % (macro_f1))

In [None]:
# Restore the weights saved by the ModelCheckpoint callback, then evaluate on
# the test split. test_dataset is already batched, so no batch_size is passed
# (Keras does not apply batch_size to Dataset inputs).
model.load_weights(checkpoint_path)
loss, acc, micro_f1, macro_f1 = model.evaluate(test_dataset, verbose=2)
print("lss: %.4f" % (loss))
print("acc: %.4f" % (acc))
print("micro f1: %.4f" % (micro_f1))
print("macro f1: %.4f" % (macro_f1))

### Stack two or more LSTM layers

Keras recurrent layers have two available modes that are controlled by the return_sequences constructor argument:

- If False it returns only the last output for each input sequence (a 2D tensor of shape (batch_size, output_features)). This is the default, used in the previous model.

- If True the full sequences of successive outputs for each timestep is returned (a 3D tensor of shape (batch_size, timesteps, output_features)).

Here is what the flow of information looks like with return_sequences=True:


<img src="image/layered_bidirectional.png">

The interesting thing about using an RNN with return_sequences=True is that the output still has 3-axes, like the input, so it can be passed to another RNN layer, like this:

In [None]:
# Two stacked bidirectional LSTMs: the first returns the full sequence so the
# second can consume it; the second returns only its final output.
model = tf.keras.Sequential([
    encoder,
    tf.keras.layers.Embedding(len(encoder.get_vocabulary()), 64, mask_zero=True),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64,  return_sequences=True)),
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(32)),
    # Hidden layer uses ReLU (as in the TensorFlow tutorial linked at the top of
    # this notebook). A softmax here would force the 64 hidden activations to
    # sum to 1 and squeeze the signal passed to the output layer.
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dropout(0.5),
    # No activation: the model emits raw logits for the 3 classes.
    tf.keras.layers.Dense(3)
])

In [None]:
# Track accuracy plus micro- and macro-averaged F1 over the 3 classes.
f1_metrics = [
    tfa.metrics.F1Score(num_classes=3, name=f'{averaging}_f1_score', average=averaging)
    for averaging in ('micro', 'macro')
]
model.compile(
    # The model outputs logits, hence from_logits=True.
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.Adam(0.001),
    metrics=['accuracy', *f1_metrics],
)

In [None]:
# All callbacks watch validation macro-F1. Higher is better, so every callback
# gets mode='max' explicitly: the default mode='auto' infers the direction
# from the metric name and does not reliably treat an F1 score as
# higher-is-better, which would stop training / cut the learning rate while
# the score is still improving.
metric = 'val_macro_f1_score'
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor=metric,
                                                 mode='max',
                                                 factor=0.1,
                                                 patience=10,
                                                 verbose=1,
                                                 min_lr=0.00001)

# The stacked model gets its own checkpoint location so it does not overwrite
# the best weights saved for the one-layer model.
checkpoint_path = "training_2/two_layer_lstm_emoji.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
model_chkpt = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 monitor=metric,
                                                 save_weights_only=True,
                                                 verbose=1, save_best_only=True, mode='max')
early_stopping_callback = tf.keras.callbacks.EarlyStopping(monitor=metric,
                                                           mode='max',
                                                           patience=15)

If the training accuracy (acc) keeps increasing while the validation accuracy (val_acc) decreases, the model is fitting the training set better but is losing its ability to predict on new data. This indicates that the model is starting to fit noise and is beginning to overfit.

In [None]:
# train_dataset is a tf.data pipeline that is already shuffled and batched
# (BATCH_SIZE), so `batch_size` is not passed: Keras does not apply it to
# Dataset inputs, and batch_size=32 contradicted the real batch size.
history = model.fit(train_dataset,
                    epochs=100,
                    validation_data=val_dataset,
                    callbacks=[early_stopping_callback, reduce_lr, model_chkpt])

In [None]:
# Evaluate on the test split with the weights as they are after training.
# test_dataset is already batched, so no batch_size is passed (Keras does not
# apply batch_size to Dataset inputs).
loss, acc, micro_f1, macro_f1 = model.evaluate(test_dataset, verbose=2)
print("lss: %.4f" % (loss))
print("acc: %.4f" % (acc))
print("micro f1: %.4f" % (micro_f1))
print("macro f1: %.4f" % (macro_f1))

In [None]:
# Restore the weights saved by the ModelCheckpoint callback, then evaluate on
# the test split. test_dataset is already batched, so no batch_size is passed
# (Keras does not apply batch_size to Dataset inputs).
model.load_weights(checkpoint_path)
loss, acc, micro_f1, macro_f1 = model.evaluate(test_dataset, verbose=2)
print("lss: %.4f" % (loss))
print("acc: %.4f" % (acc))
print("micro f1: %.4f" % (micro_f1))
print("macro f1: %.4f" % (macro_f1))

In [None]:
# Side-by-side training/validation curves: accuracy (left) and loss (right).
plt.figure(figsize=(16, 8))
plt.subplot(121)
plot_graphs(history, 'accuracy')
plt.ylim(top=1)
plt.subplot(122)
plot_graphs(history, 'loss')
plt.ylim(bottom=0)

In [None]:
# Side-by-side training/validation curves: micro-F1 (left) and macro-F1 (right).
plt.figure(figsize=(16, 8))
plt.subplot(121)
plot_graphs(history, 'micro_f1_score')
plt.ylim(top=1)
plt.subplot(122)
plot_graphs(history, 'macro_f1_score')
plt.ylim(bottom=0)

In [None]:
# Predict on a single sentence by itself, so the batch needs no padding.
sample_text = 'negeri angka'
single_batch = np.array([sample_text])
predictions = model.predict(single_batch)
print(predictions[0])

In [None]:
# Print every test batch: the raw texts followed by their one-hot labels.
for text, label in test_dataset:
    text_batch, label_batch = text.numpy(), label.numpy()
    print("text = ", text_batch)
    print("label = ", label_batch)