## Assignment 2: text classification.
In this notebook assignment, you will use **TensorFlow 2** to build a deep neural network for a simple text classification task. Specifically, you will
- build a simple LSTM network with basic **numpy** functions;
- train a sentiment analysis model that classifies movie reviews from the [IMDB](https://ai.stanford.edu/%7Eamaas/data/sentiment/) dataset as positive or negative, based on the text of the review. You will implement deep neural networks that take advantage of pre-trained word embeddings and a pre-trained BERT model.

## Important!!
- Please finish Problems 1 to 4. Fill in your code between **START CODE HERE** and **END CODE HERE**.
- Make sure you pass the tester functions before your submission. Feel free to add cells to debug your code.
- **Please rename your submitted Jupyter notebook with your student ID**. Otherwise, your work cannot be scored.
- You can add cells when you are preparing your assignment. DO **delete** them when you are ready to submit.

In [1]:
# Install necessary packages
!pip3 install -q tensorflow
!pip3 install -q tensorflow-hub
!pip3 install -q tensorflow-text
!pip3 install -q tensorflow_datasets
!pip3 install -q matplotlib

In [1]:
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_text as text
import tensorflow_datasets as tfds
from matplotlib import pyplot as plt
import IPython
assert IPython.version_info[0] >= 3, "Your version of IPython is too old, please update it."
assert tf.__version__.split(".")[0] == "2", "Your version of Tensorflow is not version 2."

### Problem 1: Activation functions (10 points)

Implement the **tanh** and **sigmoid** activation functions with **numpy**.

In [2]:
def tanh(x):
    """
    Arguments:
    x: a numpy array
    Return:
    s : a numpy array
    """
    ## START CODE HERE ## (~ 1 line of code)
    # YOUR CODE HERE
    s = np.tanh(x)
    ## END CODE HERE ##
    return s

In [3]:
def sigmoid(x):
    """
    Arguments:
    x: a numpy array
    Return:
    s : a numpy array
    """
    ## START CODE HERE ## (~ 1 line of code)
    # YOUR CODE HERE
    s = 1.0 / (1.0 + np.exp(-x))
    ## END CODE HERE ##
    return s

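The one-line sigmoid above is correct, but for large negative inputs `np.exp(-x)` overflows to `inf` and NumPy emits a `RuntimeWarning` (the result still rounds to 0). A minimal sketch of a numerically stable variant, assuming a float NumPy array as input, only ever evaluates `np.exp` on non-positive arguments. The name `stable_sigmoid` is hypothetical and not part of the assignment; SciPy's `scipy.special.expit` computes the same function.

```python
import numpy as np


def stable_sigmoid(x):
    """Logistic sigmoid that never calls np.exp on a large positive argument."""
    x = np.asarray(x, dtype=np.float64)
    out = np.empty_like(x)
    pos = x >= 0
    # For x >= 0, exp(-x) lies in (0, 1], so 1 / (1 + exp(-x)) cannot overflow.
    out[pos] = 1.0 / (1.0 + np.exp(-x[pos]))
    # For x < 0, use the equivalent form exp(x) / (1 + exp(x)); exp(x) lies in (0, 1).
    ex = np.exp(x[~pos])
    out[~pos] = ex / (1.0 + ex)
    return out


x = np.array([-1000.0, -1.0, 0.0, 1.0, 1000.0])
print(stable_sigmoid(x))  # no overflow warning; the endpoints are exactly 0.0 and 1.0
```

The two branches are algebraically identical, so for moderate inputs the result matches `1.0 / (1.0 + np.exp(-x))` up to rounding.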
In [4]:
def test_tanh(x):
    s = tanh(x)
    assert isinstance(s, np.ndarray), "tanh(x) should be of type numpy.ndarray"
    assert np.allclose(s, np.tanh(x)), "tanh(x) returns incorrect values"
    print("Pass")
    return

x = np.array([0.5], dtype=np.float64)
test_tanh(x)

Pass


In [5]:
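def test_sigmoid(x):
    s = sigmoid(x)
    assert isinstance(s, np.ndarray), "sigmoid(x) should be of type numpy.ndarray"
    assert np.allclose(s, 1.0 / (1.0 + np.exp(-x))), "sigmoid(x) returns incorrect values"
    print("Pass")
    return

x = np.array([0.5], dtype=np.float64)
test_sigmoid(x)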

Pass


### Problem 2: LSTM cell (40 points)
An LSTM cell computes the cell state $c$ and the hidden state $h$. $c$ acts as the long-term memory, and $h$ as the short-term memory. The input $x$ and the previous hidden state $h$ are used to update the long-term memory: some features of $c$ are cleared by the forget gate $f$, and new candidate features $g$ are added through the input gate $i$.

Here is the update rule:
$$c_t = \sigma(f_t) \odot c_{t-1} + \sigma(i_t) \odot \tanh(g_t)$$

$$h_t = \sigma(o_t) \odot \tanh(c_t)$$

$\odot$ stands for element-wise multiplication, and $\sigma$ is the sigmoid function.

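To see how the gates control memory, plug scalar pre-activations into the update rule above. With a large forget pre-activation ($\sigma(f_t) \approx 1$) and a very negative input pre-activation ($\sigma(i_t) \approx 0$), the old cell state is carried over almost unchanged; swapping the two overwrites it with $\tanh(g_t)$. The values below are arbitrary illustrative numbers, not taken from the assignment, and `cell_update` is a hypothetical helper.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def cell_update(f, i, g, c_prev):
    # c_t = sigma(f_t) * c_{t-1} + sigma(i_t) * tanh(g_t), element-wise
    return sigmoid(f) * c_prev + sigmoid(i) * np.tanh(g)


c_prev = 0.8
keep = cell_update(f=10.0, i=-10.0, g=2.0, c_prev=c_prev)       # forget gate open, input gate closed
overwrite = cell_update(f=-10.0, i=10.0, g=2.0, c_prev=c_prev)  # forget gate closed, input gate open
print(keep, overwrite)
```

The first call returns roughly 0.8 (the old memory), and the second roughly 0.964, which is $\tanh(2)$.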
The gate pre-activations are computed as affine transformations of the current input $x_t$ and the previous hidden state $h_{t-1}$:

$$f_t = W_{xf} x_t + W_{hf} h_{t-1} + b_f$$

$$i_t = W_{xi} x_t + W_{hi} h_{t-1} + b_i$$

$$g_t = W_{xg} x_t + W_{hg} h_{t-1} + b_g$$

$$o_t = W_{xo} x_t + W_{ho} h_{t-1} + b_o$$

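The four affine maps above share the same inputs, so they can also be computed with a single matrix product: stack the four $W_x$ matrices into one matrix of shape $(4H, D)$ and the four $W_h$ matrices into one of shape $(4H, H)$, then split the result into four blocks. Keras stores its LSTM weights in this stacked form with the gate order input, forget, candidate ($g$), output, which is the layout `test_lstm_layer` assumes when it splits the transposed kernel into four parts. A minimal NumPy sketch with random weights; the names `W_x`, `W_h` and `b` are illustrative assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)
D, H = 2, 3  # input size and hidden size (arbitrary for this sketch)

# Stacked parameters: rows [0:H] -> i, [H:2H] -> f, [2H:3H] -> g, [3H:4H] -> o
W_x = rng.standard_normal((4 * H, D))
W_h = rng.standard_normal((4 * H, H))
b = rng.standard_normal((4 * H, 1))

x = rng.standard_normal((D, 1))
h = rng.standard_normal((H, 1))

# One matrix product for all four gates, then split into four (H, 1) blocks
z = W_x @ x + W_h @ h + b
i_t, f_t, g_t, o_t = np.split(z, 4, axis=0)

# Same result as computing a gate separately, e.g. the forget gate
W_xf, W_hf, b_f = W_x[H:2 * H], W_h[H:2 * H], b[H:2 * H]
f_separate = W_xf @ x + W_hf @ h + b_f
print(np.allclose(f_t, f_separate))
```

Fusing the gates replaces eight small matrix products with two larger ones, which is usually faster in practice; the math is unchanged.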
In [6]:
def lstm_cell(x, h, c,
              weight_xi, weight_hi, bias_i,
              weight_xf, weight_hf, bias_f,
              weight_xg, weight_hg, bias_g,
              weight_xo, weight_ho, bias_o):
    """
    Arguments:
      x: a numpy array, input state
      h: a numpy array, hidden state
      c: a numpy array, context memory
      weight_xi: a numpy array, weight for input gate
      weight_hi: a numpy array, weight for input gate
      bias_i: a numpy array, bias for input gate
      weight_xf: a numpy array, weight for forget gate
      weight_hf: a numpy array, weight for forget gate
      bias_f: a numpy array, bias for forget gate
      weight_xg: a numpy array, weight for g gate
      weight_hg: a numpy array, weight for g gate
      bias_g: a numpy array, bias for g gate
      weight_xo: a numpy array, weight for output gate
      weight_ho: a numpy array, weight for output gate
      bias_o: a numpy array, bias for output gate
    Return:
      h : a numpy array, hidden state
      c: a numpy array, context memory
    """

    # Compute f_t, i_t, g_t, o_t as defined above
    ## START CODE HERE ## (~ 4 lines of code)
    # YOUR CODE HERE
    f_t = np.dot(weight_xf, x) + np.dot(weight_hf, h) + bias_f
    i_t = np.dot(weight_xi, x) + np.dot(weight_hi, h) + bias_i
    g_t = np.dot(weight_xg, x) + np.dot(weight_hg, h) + bias_g
    o_t = np.dot(weight_xo, x) + np.dot(weight_ho, h) + bias_o
    ## END CODE HERE ##

    # Compute c_t
    ## START CODE HERE ## (1 ~ 3 lines of code)
    # YOUR CODE HERE
    c_t = np.multiply(sigmoid(f_t), c) + np.multiply(sigmoid(i_t), tanh(g_t))
    ## END CODE HERE ##

    # Compute h_t
    ## START CODE HERE ## (1 ~ 2 lines of code)
    # YOUR CODE HERE
    h_t = np.multiply(sigmoid(o_t), tanh(c_t))
    ## END CODE HERE ##

    return h_t, c_t

In [7]:
def lstm_layer(inputs, h, c,
               weight_xi, weight_hi, bias_i,
               weight_xf, weight_hf, bias_f,
               weight_xg, weight_hg, bias_g,
               weight_xo, weight_ho, bias_o):
    """
    Arguments:
      inputs: a numpy array, a sequence of inputs: x_1, x_2, ...
      h: a numpy array, initialized hidden state
      c: a numpy array, initialized context memory
      weight_xi: a numpy array, weight for input gate
      weight_hi: a numpy array, weight for input gate
      bias_i: a numpy array, bias for input gate
      weight_xf: a numpy array, weight for forget gate
      weight_hf: a numpy array, weight for forget gate
      bias_f: a numpy array, bias for forget gate
      weight_xg: a numpy array, weight for g gate
      weight_hg: a numpy array, weight for g gate
      bias_g: a numpy array, bias for g gate
      weight_xo: a numpy array, weight for output gate
      weight_ho: a numpy array, weight for output gate
      bias_o: a numpy array, bias for output gate
    Return:
      h : a numpy array, final hidden state which is the vector representation of the inputs
    """

    # Run the cell over the sequence, carrying (h, c) from one step to the next
    for x in inputs:
        h, c = lstm_cell(x, h, c,
                         weight_xi, weight_hi, bias_i,
                         weight_xf, weight_hf, bias_f,
                         weight_xg, weight_hg, bias_g,
                         weight_xo, weight_ho, bias_o)
    return h

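Because `lstm_cell` treats $x$, $h$ and $c$ as column vectors and the biases have shape `(hidden_dim, 1)`, the same equations also process a mini-batch: place $B$ examples side by side as columns, so $x$ has shape `(input_size, B)` and $h$, $c$ have shape `(hidden_dim, B)`, and NumPy broadcasting adds the bias to every column. The self-contained sketch below re-implements one step compactly (the helper `step` and its stacked parameter layout are illustrative assumptions, not the notebook's API) and checks that a batched step matches running single examples on their own.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def step(x, h, c, W_x, W_h, b):
    """One LSTM step; each column of x, h, c is an independent example."""
    z = W_x @ x + W_h @ h + b  # b has shape (4H, 1) and broadcasts over columns
    i, f, g, o = np.split(z, 4, axis=0)
    c_new = sigmoid(f) * c + sigmoid(i) * np.tanh(g)
    h_new = sigmoid(o) * np.tanh(c_new)
    return h_new, c_new


rng = np.random.default_rng(1)
D, H, B = 2, 3, 5
W_x = rng.standard_normal((4 * H, D))
W_h = rng.standard_normal((4 * H, H))
b = rng.standard_normal((4 * H, 1))

x = rng.standard_normal((D, B))
h = np.zeros((H, B))
c = np.zeros((H, B))

h_batch, c_batch = step(x, h, c, W_x, W_h, b)
# Run example 0 alone and compare with column 0 of the batched result
h_0, c_0 = step(x[:, :1], h[:, :1], c[:, :1], W_x, W_h, b)
print(np.allclose(h_batch[:, :1], h_0), h_batch.shape)
```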
In [8]:
def test_lstm_cell():
    hidden_dim = 3
    output_size = 1
    input_size = 2
    dtype = np.float64

    h = np.zeros((hidden_dim, output_size), dtype=dtype)
    c = np.zeros((hidden_dim, output_size), dtype=dtype)

    inputs = np.array([[[1], [1]], [[2], [2]], [[3], [3]]], dtype=dtype)
    x = np.array([[1.0], [1.0]], dtype=dtype)

    weight_xf = np.random.randn(hidden_dim, input_size)
    weight_xi = np.random.randn(hidden_dim, input_size)
    weight_xg = np.random.randn(hidden_dim, input_size)
    weight_xo = np.random.randn(hidden_dim, input_size)

    weight_hf = np.random.randn(hidden_dim, hidden_dim)
    weight_hi = np.random.randn(hidden_dim, hidden_dim)
    weight_hg = np.random.randn(hidden_dim, hidden_dim)
    weight_ho = np.random.randn(hidden_dim, hidden_dim)

    bias_f = np.random.randn(hidden_dim, output_size)
    bias_i = np.random.randn(hidden_dim, output_size)
    bias_g = np.random.randn(hidden_dim, output_size)
    bias_o = np.random.randn(hidden_dim, output_size)

    cell_h, cell_c = lstm_cell(x, h, c,
                            weight_xi, weight_hi, bias_i,
                            weight_xf, weight_hf, bias_f,
                            weight_xg, weight_hg, bias_g,
                            weight_xo, weight_ho, bias_o)

    output_h = lstm_layer(inputs, h, c,
                        weight_xi, weight_hi, bias_i,
                        weight_xf, weight_hf, bias_f,
                        weight_xg, weight_hg, bias_g,
                        weight_xo, weight_ho, bias_o)

    assert cell_h.shape == (hidden_dim, output_size)
    assert cell_c.shape == (hidden_dim, output_size)
    assert output_h.shape == (hidden_dim, output_size)
    print("Pass")
    return

test_lstm_cell()

Pass


In [9]:
def test_lstm_layer():
    """
    Compare the output of the TensorFlow LSTM layer with yours. The outputs should be
    the same as long as your code is correct.
    """
    hidden_dim = 3
    output_size = 1
    input_size = 2
    dtype = np.float64

    # Define inputs
    inputs = np.array([[[1], [1]], [[2], [2]], [[3], [3]], [[4], [4]]], dtype=dtype)
    inputs = tf.constant(inputs)
    inputs = tf.transpose(inputs, perm=[2, 0, 1])
    # LSTM cell from Tensorflow
    lstm = tf.keras.layers.LSTM(units=hidden_dim)
    tf_lstm_output = lstm(inputs)

    # Get weights and bias from LSTM.
    # They will be fed into your LSTM layer, so both networks will output the same value
    kernel, recurrent_kernel, bias = lstm.get_weights()
    kernel = tf.transpose(kernel)
    recurrent_kernel = tf.transpose(recurrent_kernel)

    weight_xi, weight_xf, weight_xg, weight_xo = tf.split(kernel, 4, axis=0)
    weight_hi, weight_hf, weight_hg, weight_ho = tf.split(recurrent_kernel, 4, axis=0)
    bias_i, bias_f, bias_g, bias_o = tf.split(bias, 4, axis=0)
    bias_i = np.expand_dims(bias_i, -1)
    bias_f = np.expand_dims(bias_f, -1)
    bias_g = np.expand_dims(bias_g, -1)
    bias_o = np.expand_dims(bias_o, -1)

    # Calculate the output of your LSTM layer
    inputs = np.array([[[1], [1]], [[2], [2]], [[3], [3]], [[4], [4]]], dtype=dtype)
    h = np.zeros((hidden_dim, output_size), dtype=dtype)
    c = np.zeros((hidden_dim, output_size), dtype=dtype)
    np_lstm_output = lstm_layer(inputs, h, c,
                  weight_xi, weight_hi, bias_i,
                  weight_xf, weight_hf, bias_f,
                  weight_xg, weight_hg, bias_g,
                  weight_xo, weight_ho, bias_o)

    # Compare the results
    np_lstm_output = np.reshape(np_lstm_output, tf_lstm_output.shape)
    assert np.allclose(tf_lstm_output.numpy(), np_lstm_output), "The outputs from Tensorflow LSTM and yours are not equal."
    print("Pass")
    return

test_lstm_layer()

Pass


### Problem 3: Implement text classification with simple TensorFlow dense layers (30 points)

In [10]:
# Load IMDB data from tensorflow dataset
train_data, validation_data, test_data = tfds.load(
    name="imdb_reviews", 
    split=("train[:60%]", "train[60%:]", "test"),
    as_supervised=True)

Downloading and preparing dataset 80.23 MiB (download: 80.23 MiB, generated: Unknown size, total: 80.23 MiB) to /userhome/cs/adrianxu/tensorflow_datasets/imdb_reviews/plain_text/1.0.0...


Dataset imdb_reviews downloaded and prepared to /userhome/cs/adrianxu/tensorflow_datasets/imdb_reviews/plain_text/1.0.0. Subsequent calls will reuse this data.


In [12]:
# Load pre-trained word embeddings
# If you encounter a certificate verification error, please refer to https://stackoverflow.com/questions/50236117/scraping-ssl-certificate-verify-failed-error-for-http-en-wikipedia-org
embedding_url = "https://tfhub.dev/google/nnlm-en-dim50/2"
embedding_layer = hub.KerasLayer(embedding_url, input_shape=[],
                                 dtype=tf.string, trainable=True)

PermissionDeniedError: /tmp/tfhub_modules/74a841d6eb84e8d93d913d716fb5440d020cc291.lock.tmp5d678066668c469da1b6b1b9cbae3137; Permission denied

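The `PermissionDeniedError` above most likely means that the shared cache directory `/tmp/tfhub_modules` was created by another user on the same machine, so the current user cannot write lock files into it. `tensorflow_hub` reads the `TFHUB_CACHE_DIR` environment variable to decide where to cache downloaded modules, so one possible workaround, sketched below under that assumption, is to point it at a private, writable directory and then re-run the cell that creates `embedding_layer`.

```python
import os
import tempfile

# Assumption: the error comes from a /tmp/tfhub_modules directory owned by
# another user. tempfile.mkdtemp creates a fresh directory readable and
# writable only by the current user; TF Hub is told to cache modules there.
# Modules are downloaded again whenever a new directory is used.
cache_dir = tempfile.mkdtemp(prefix="tfhub_modules_")
os.environ["TFHUB_CACHE_DIR"] = cache_dir
print("TF Hub cache directory:", cache_dir)
```

For a cache that persists across sessions, a fixed directory in your home folder works the same way.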
In [None]:
class TextClassifier:
    """
    Text classifier with a pre-trained embedding layer and dense layers
    """
    def __init__(self, embedding_layer, hidden_size=16, learning_rate=1e-2):
        """
        Arguments:
            embedding_layer: pre-trained word embedding layer from hub.KerasLayer
            hidden_size: hidden units of dense layer
            learning_rate: learning rate of optimizer
        """
        self.embedding_layer = embedding_layer
        self.hidden_size = hidden_size
        self.learning_rate = learning_rate
        self.create_model()
        self.compile_model()

    def create_model(self):

        self.model = tf.keras.Sequential()
        # Add pre-trained embedding layer
        ## START CODE HERE ## (~1 line of code)
        # YOUR CODE HERE
        raise NotImplementedError()
        ## END CODE HERE

        # Add a dense layer with hidden size as self.hidden_size
        # and relu activation function
        ## START CODE HERE ## (~1 line of code)
        # YOUR CODE HERE
        raise NotImplementedError()
        ## END CODE HERE

        # Add a dense output layer with 1 unit that projects the
        # hidden layer to a single output value
        ## START CODE HERE ## (~1 line of code)
        # YOUR CODE HERE
        raise NotImplementedError()
        ## END CODE HERE

    def compile_model(self):
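        # Compile your model with the Adam optimizer, whose learning rate is
        # self.learning_rate; use BinaryCrossentropy as the loss and "accuracy"
        # as the metric. If the output layer has no activation, its output is a
        # logit, so the loss needs from_logits=True.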
        ## START CODE HERE (Hint: self.model.compile(...))
        # YOUR CODE HERE
        raise NotImplementedError()
        ## END CODE HERE

    def train(self, train_data, validation_data,
            batch_size=512, train_epochs=3):
        history = self.model.fit(train_data.shuffle(10000).batch(batch_size),
                        epochs=train_epochs,
                        validation_data=validation_data.batch(batch_size),
                        verbose=1)
        return history

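`tf.keras.losses.BinaryCrossentropy(from_logits=True)` (used by the BERT classifier in Problem 4) expects the raw output of a final `Dense` layer with no activation and applies the sigmoid internally; with the default `from_logits=False`, the loss expects probabilities, so the last layer would need a sigmoid activation. The NumPy sketch below, with made-up labels and logits, shows that both formulations give the same loss, using the standard stable form $\max(z, 0) - zy + \log(1 + e^{-|z|})$ for the logit version. It illustrates the math only and is not Keras' implementation.

```python
import numpy as np


def bce_from_probs(y, p):
    # Mean of -[y log p + (1 - y) log(1 - p)]
    return np.mean(-(y * np.log(p) + (1.0 - y) * np.log(1.0 - p)))


def bce_from_logits(y, z):
    # Algebraically equal to bce_from_probs(y, sigmoid(z)), but never
    # evaluates log(0) or exp of a large positive number.
    return np.mean(np.maximum(z, 0.0) - z * y + np.log1p(np.exp(-np.abs(z))))


y = np.array([1.0, 0.0, 1.0, 0.0])    # hypothetical labels
z = np.array([2.0, -1.0, -0.5, 3.0])  # hypothetical logits from a Dense(1) layer
p = 1.0 / (1.0 + np.exp(-z))          # sigmoid -> probabilities

print(bce_from_probs(y, p), bce_from_logits(y, z))
```

Whichever convention you choose, the output layer and the loss must agree: a sigmoid output with `from_logits=True` (or a linear output with `from_logits=False`) silently computes the wrong loss.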
In [None]:
def test_text_classifier_1():
    classifier = TextClassifier(embedding_layer)
    model = classifier.model
    assert len(model.layers) == 3, "There should be 3 layers"
    layer_1_config = model.layers[1].get_config()
    assert model.layers[1].units == classifier.hidden_size, "Hidden units for second layer should be {}".format(classifier.hidden_size)
    assert model.layers[1].activation == tf.keras.activations.relu, "Activation for second layer should be relu"
    assert model.layers[-1].units == 1, "Hidden units for last layer should be 1"
    print("Pass")


test_text_classifier_1()

In [None]:
def test_text_classifier_2():
    classifier = TextClassifier(embedding_layer)
    model = classifier.model
    assert isinstance(model.optimizer, tf.keras.optimizers.Adam), "The optimizer should be Adam"
    assert isinstance(model.loss, tf.keras.losses.BinaryCrossentropy) or model.loss == "binary_crossentropy"
    print("Pass")


test_text_classifier_2()

In [None]:
# Summarize the parameters of the model
# Note: You can try different learning rates to train the model
# and find out how they affect the model performance.
classifier = TextClassifier(embedding_layer)
classifier.model.summary()

In [None]:
# To train the model, you can copy the following code and paste it into
# another cell. Please delete the added cell when you are ready to submit
# your assignment.

# history = classifier.train(train_data, validation_data)
# plt.plot(history.history["accuracy"])
# plt.plot(history.history["val_accuracy"])
# plt.title("Model accuracy")
# plt.ylabel("accuracy")
# plt.xlabel("epoch")
# plt.legend(["train", "val"], loc="lower right")
# plt.show()

### Problem 4: Implement text classification with BERT (20 points)

In [None]:
# Load text processor and pre-trained weights for BERT
tfhub_handle_preprocess = "https://tfhub.dev/tensorflow/bert_en_uncased_preprocess/3"
tfhub_handle_encoder = "https://tfhub.dev/tensorflow/small_bert/bert_en_uncased_L-4_H-512_A-8/1"
bert_preprocess_model = hub.KerasLayer(tfhub_handle_preprocess)
bert_model = hub.KerasLayer(tfhub_handle_encoder)

In [None]:
class BertTextClassifier():
    """
    Text classifier with BERT pre-trained model
    """
    def __init__(self, dropout_rate=0.1, learning_rate=1e-4):
        self.dropout_rate = dropout_rate
        self.learning_rate = learning_rate
        self.create_model()
        self.compile_model()

    def create_model(self):
        # Define the model input as string
        text_input = tf.keras.layers.Input(shape=(), dtype=tf.string, name="text")
        # BERT tokenization
        preprocessing_layer = hub.KerasLayer(tfhub_handle_preprocess, name="preprocessing")
        encoder_inputs = preprocessing_layer(text_input)
        # BERT encodes the text into hidden values
        encoder = hub.KerasLayer(tfhub_handle_encoder, trainable=True, name="bert_encoder")
        outputs = encoder(encoder_inputs)
        # Get pooled_output from outputs
        out = outputs["pooled_output"]

        # Dropout layer with dropout rate self.dropout_rate.
        # Hint: out = dropout_layer(out)
        ## START CODE HERE ## (~1 line of code)
        # YOUR CODE HERE
        raise NotImplementedError()
        ## END CODE HERE

        # Fully-connected output layer with 1 unit. The loss in compile_model
        # uses from_logits=True, so this layer should output a raw logit.
        # Hint: out = fully_connected(out)
        ## START CODE HERE ## (~1 line of code)
        # YOUR CODE HERE
        raise NotImplementedError()
        ## END CODE HERE

        # Build the Keras model that maps the text input to the output
        self.model = tf.keras.Model(text_input, out)

    def compile_model(self):
        self.model.compile(optimizer=tf.keras.optimizers.Adam(self.learning_rate),
                           loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                           metrics=["accuracy"])

    def train(self, train_ds, validation_ds, epochs=3):
        history = self.model.fit(x=train_ds.shuffle(1000).batch(16),
                                 validation_data=validation_ds.batch(16),
                                 epochs=epochs)
        return history

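During training, `tf.keras.layers.Dropout(rate)` zeroes each unit with probability `rate` and scales the surviving units by `1 / (1 - rate)`, so the expected activation is unchanged; at inference it passes its input through unchanged. A NumPy sketch of this "inverted dropout" scheme, to illustrate what the dropout layer does to BERT's `pooled_output` (the function `inverted_dropout` and the all-ones stand-in activations are hypothetical):

```python
import numpy as np


def inverted_dropout(x, rate, training, rng):
    """Zero each element with probability `rate`; rescale survivors by 1/(1-rate)."""
    if not training or rate == 0.0:
        return x  # inference: identity
    keep = rng.random(x.shape) >= rate  # True with probability 1 - rate
    return np.where(keep, x / (1.0 - rate), 0.0)


rng = np.random.default_rng(0)
x = np.ones((1000, 512))  # stand-in activations
y = inverted_dropout(x, rate=0.1, training=True, rng=rng)

print((y == 0).mean())  # fraction dropped, close to 0.1
print(y.mean())         # close to 1.0: the expectation is preserved
```

Scaling at training time rather than at inference is what lets the trained model be used for prediction without any change to its weights.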
In [None]:
bert_classifier = BertTextClassifier()
def test_bert_text_classifer():
    model = bert_classifier.model
    assert len(model.layers) == 5, "The model should have 5 layers"
    assert isinstance(model.layers[3], tf.keras.layers.Dropout), "There is no dropout layer"
    assert isinstance(model.layers[4], tf.keras.layers.Dense), "There is no dense layer"
    assert model.layers[4].units == 1
    print("Pass")

test_bert_text_classifer()

In [None]:
# Train the BERT model
# You may need GPUs for this training.
# Follow the guidelines stated in Assignment 1.
# Run the following line in another cell when needed.
# Please be reminded that the added cells should be
# removed when you submit your assignment.

# history = bert_classifier.train(train_data, validation_data)