##Step 1:Import Dependencies

In [None]:
import numpy as np
import math
import re
import pandas as pd
from bs4 import BeautifulSoup
import random

from google.colab import drive

In [None]:
!pip install bert-for-tf2
!pip install sentencepiece

In [None]:
try:
    %tensorflow_version 2.x
except Exception:
    pass
import tensorflow as tf

import tensorflow_hub as hub

from tensorflow.keras import layers
import bert

##Step 2:Data Preprocessing

###Loading Files

In [None]:
# Mount Google Drive under /content/drive; the dataset and checkpoint paths used below live there.
drive.mount("/content/drive")

In [None]:
# The CSV is read with header=None, so the column names are supplied explicitly here.
clos = ["sentiment", "id", "date", "query", "user", "text"]
# Load the training tweets from Drive; the file is decoded as latin1.
data = pd.read_csv(
    "/content/drive/MyDrive/BERT/train.csv",
    header=None,
    names=clos,
    engine="python",
    encoding="latin1"
)

In [None]:
# Only the label and the tweet text are used downstream; discard the other columns in place.
data.drop(columns=["id", "date", "query", "user"], inplace=True)

In [None]:
# Preview the first five rows (head() defaults to n=5).
data.head()

###Cleaning

In [None]:
def clean_tweet(tweet):
    """Return a cleaned copy of a raw tweet.

    Steps, in order:
      1. parse the text as HTML and keep only the text content,
      2. replace @mentions with a space,
      3. replace http/https URLs with a space,
      4. replace every character that is NOT a letter or one of . ! ? '
         with a space,
      5. collapse runs of spaces into a single space.

    Args:
        tweet: raw tweet text.

    Returns:
        The cleaned tweet as a string.
    """
    tweet = BeautifulSoup(tweet, "lxml").get_text()
    tweet = re.sub(r"@[A-Za-z0-9]+", ' ', tweet)
    tweet = re.sub(r"https?://[A-Za-z0-9./]+", ' ', tweet)
    # The leading ^ negates the class: keep letters and basic punctuation,
    # blank out everything else. Without it every letter would be erased.
    tweet = re.sub(r"[^a-zA-Z.!?']", ' ', tweet)
    tweet = re.sub(r" +", ' ', tweet)
    return tweet

In [None]:
# Clean every tweet in the text column, preserving row order.
data_clean = list(map(clean_tweet, data.text))

In [None]:
# Work on a copy of the label array: the remapping below then neither writes
# through into `data` nor fails if pandas hands back a read-only view.
data_labels = data.sentiment.values.copy()
data_labels[data_labels == 4] = 1  # The dataset uses 0 and 4 as sentiment values; replace 4 with 1

###Tokenization

We need to create a BERT layer to have access to the metadata required by the tokenizer (the vocab file and the lower-casing flag).

In [None]:
# Build the tokenizer from the assets shipped with the TF-Hub BERT layer.
FullTokenizer = bert.bert_tokenization.FullTokenizer
bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
                            trainable=False) # BERT base (uncased) layer, loaded here only to reach its tokenizer assets
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()  # Path of the vocab file for the tokenizer
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy() # Flag stored with the model, passed to the tokenizer as do_lower_case
tokenizer = FullTokenizer(vocab_file, do_lower_case)

Each BERT input here is a single sentence, so we add the [CLS] token at the beginning and the [SEP] token at the end of each sentence.

In [None]:
def encode_sentence(sent):
    """Tokenize `sent` and wrap the tokens in "[CLS]" ... "[SEP]" markers.

    Returns a list of token strings starting with "[CLS]" and ending with "[SEP]".
    """
    pieces = tokenizer.tokenize(sent)
    return ["[CLS]", *pieces, "[SEP]"]

In [None]:
# Tokenize every cleaned tweet, preserving order.
data_inputs = list(map(encode_sentence, data_clean))

###Dataset Creation

We need to create the 3 different inputs for each sentence: the token ids, the input mask, and the segment ids.

In [None]:
def get_ids(tokens):
    """Convert a sequence of token strings to ids using the shared tokenizer."""
    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    return token_ids

def get_mask(tokens):
    """Return an integer mask for `tokens`: 0 where the token is "[PAD]", 1 elsewhere."""
    is_not_padding = np.char.not_equal(tokens, "[PAD]")
    return is_not_padding.astype(int)

def get_segments(tokens):
    """Return the segment id of every token.

    The segment id starts at 0 and flips between 0 and 1 after each "[SEP]"
    token; the "[SEP]" token itself keeps the id of the segment it closes.

    Args:
        tokens: sequence of token strings.

    Returns:
        A list of ints (0 or 1) with the same length as `tokens`.
    """
    seg_ids = []
    current_seg_id = 0
    for tok in tokens:
        seg_ids.append(current_seg_id)
        if tok == "[SEP]":
            # Tokens after a separator belong to the other segment.
            current_seg_id = 1 - current_seg_id
    return seg_ids

We will create padded batches (padding the sentences of each batch independently), so that we add as few padding tokens as possible. For that, we sort the sentences by length, apply `padded_batch`, and then shuffle the batches.

In [None]:
# Pair every tokenized sentence with its label and its length in tokens.
# `data_labels` is an array, so it must be indexed with [], not called.
data_with_len = [[sent, data_labels[i], len(sent)]
                 for i, sent in enumerate(data_inputs)]

# Shuffle first: the sort below is stable, so sentences of equal length
# end up in random relative order.
random.shuffle(data_with_len)

# Sort by length so that each batch groups sentences of similar size.
data_with_len.sort(key=lambda x: x[2])

# Build ([ids, mask, segments], label) pairs, keeping only sentences with
# more than 7 tokens (the count includes [CLS] and [SEP]).
sorted_all = [([get_ids(sent_lab[0]),
                get_mask(sent_lab[0]),
                get_segments(sent_lab[0])],
               sent_lab[1])
              for sent_lab in data_with_len if sent_lab[2] > 7]

In [None]:
# Expose the pre-built list through tf.data; both the inputs and the label of each element are yielded as int32.
all_dataset = tf.data.Dataset.from_generator(lambda: sorted_all,
                                             output_types=(tf.int32, tf.int32))

In [None]:
BATCH_SIZE = 32
# Each element's inputs are a stack of 3 rows (ids, mask, segments) of equal
# length, i.e. a rank-2 tensor, so the padded shape must be (3, None): the
# 3 rows are fixed and only the sequence axis is padded per batch.
# The label is a scalar, hence ().
all_batched = all_dataset.padded_batch(BATCH_SIZE, padded_shapes=((3, None), ()))

In [None]:
NB_BATCHES = math.ceil(len(sorted_all) / BATCH_SIZE)
NB_BATCHES_TEST = NB_BATCHES // 10  # hold out one tenth of the batches for testing
# Dataset.shuffle returns a NEW dataset, so its result must be kept; calling it
# without using the return value leaves the batches in length-sorted order.
# reshuffle_each_iteration=False fixes the order after the first shuffle, so
# take() and skip() always see the same sequence and the two splits stay disjoint.
all_shuffled = all_batched.shuffle(NB_BATCHES, reshuffle_each_iteration=False)
test_dataset = all_shuffled.take(NB_BATCHES_TEST)
train_dataset = all_shuffled.skip(NB_BATCHES_TEST)

##Step 3:Model Building

In [None]:
class DCNNBERTEmbedding(tf.keras.Model):
    """Convolutional classifier on top of frozen BERT embeddings.

    The BERT layer (loaded with trainable=False) produces one vector per
    token. Three parallel 1-D convolutions (kernel sizes 2, 3 and 4) run over
    these vectors, each followed by global max pooling. The pooled features
    are concatenated and passed through a dense layer, dropout and the output
    layer.

    Args:
        nb_filters: number of filters of each convolution.
        FFN_units: number of units of the hidden dense layer.
        nb_classes: number of classes; 2 gives a single sigmoid unit,
            anything else gives a softmax over `nb_classes` units.
        dropout_rate: dropout rate applied after the hidden dense layer.
        training: not used by the constructor; kept so existing callers
            that pass it keep working.
        name: Keras model name.
    """

    def __init__(self,
                 nb_filters=50,
                 FFN_units=512,
                 nb_classes=2,
                 dropout_rate=0.1,
                 training=False,
                 name="dcnn"):
        super(DCNNBERTEmbedding, self).__init__(name=name)

        self.bert_layer = hub.KerasLayer(
            "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
            trainable=False)

        self.bigram = layers.Conv1D(filters=nb_filters,  # Convolution with kernel size 2
                                    kernel_size=2,
                                    padding="valid",
                                    activation="relu")
        self.trigram = layers.Conv1D(filters=nb_filters,  # Convolution with kernel size 3
                                     kernel_size=3,
                                     padding="valid",
                                     activation="relu")
        self.fourgram = layers.Conv1D(filters=nb_filters,  # Convolution with kernel size 4
                                      kernel_size=4,
                                      padding="valid",
                                      activation="relu")
        self.pool = layers.GlobalMaxPool1D()  # Max pooling over the sequence axis

        self.dense_1 = layers.Dense(units=FFN_units,
                                    activation="relu")
        self.dropout = layers.Dropout(rate=dropout_rate)
        if nb_classes == 2:
            self.last_dense = layers.Dense(units=1,
                                           activation="sigmoid")
        else:
            self.last_dense = layers.Dense(units=nb_classes,
                                           activation="softmax")

    def embed_with_bert(self, all_tokens):
        """Run the BERT layer and return its per-token output.

        Args:
            all_tokens: tensor indexed as [batch, k, position], where
                k = 0, 1, 2 selects the ids, the mask and the segment ids.

        Returns:
            The second output of the BERT layer (one vector per token); the
            first output (one vector for the whole sequence) is discarded.
        """
        _, embs = self.bert_layer([all_tokens[:, 0, :],
                                   all_tokens[:, 1, :],
                                   all_tokens[:, 2, :]])
        return embs

    def call(self, inputs, training=None):
        """Compute the class scores for a batch.

        Args:
            inputs: batch of stacked (ids, mask, segments) inputs, see
                `embed_with_bert`.
            training: forwarded to the dropout layer. Defaults to None so the
                model can also be called without the argument.

        Returns:
            The output of the last dense layer.
        """
        x = self.embed_with_bert(inputs)

        x_1 = self.bigram(x)
        x_1 = self.pool(x_1)
        x_2 = self.trigram(x)
        x_2 = self.pool(x_2)
        x_3 = self.fourgram(x)
        x_3 = self.pool(x_3) # (batch_size, nb_filters)

        merged = tf.concat([x_1, x_2, x_3], axis=-1) # (batch_size, 3*nb_filters)
        merged = self.dense_1(merged)
        merged = self.dropout(merged, training=training)
        output = self.last_dense(merged)

        return output


##Step 4:Training

In [None]:
# Model hyper-parameters, passed to the DCNNBERTEmbedding constructor below.
NB_FILTERS = 100
FNN_UNITS = 256
NB_CLASSES = 2

DROPOUT_RATE = 0.2

# Training settings. BATCH_SIZE repeats the value already used when the batches were built.
BATCH_SIZE = 32
NB_EPOCHS = 5

In [None]:
# Instantiate the classifier with the hyper-parameters defined above.
Dcnn = DCNNBERTEmbedding(nb_filters=NB_FILTERS,
                         FFN_units=FNN_UNITS,
                         nb_classes=NB_CLASSES,
                         dropout_rate=DROPOUT_RATE)

In [None]:
# Pick the loss/metric pair that matches the output layer: binary
# cross-entropy for the two-class case, sparse categorical otherwise.
if NB_CLASSES == 2:
    loss_name = "binary_crossentropy"
    metric_names = ["accuracy"]
else:
    loss_name = "sparse_categorical_crossentropy"
    metric_names = ["sparse_categorical_accuracy"]

Dcnn.compile(loss=loss_name,
             optimizer="adam",
             metrics=metric_names)

In [None]:
# Directory on Drive where checkpoints are written.
checkpoint_path = "/content/drive/MyDrive/BERT/checkpoints"

# Track the model in a checkpoint object.
ckpt = tf.train.Checkpoint(Dcnn=Dcnn)

# max_to_keep=1: only the most recent checkpoint is kept.
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=1)

# Resume from the latest checkpoint if one exists.
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print("Latest checkpoint restored!!")

In [None]:
class MyCustomCallback(tf.keras.callbacks.Callback):
    """Keras callback that saves a checkpoint at the end of every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Save through the module-level checkpoint manager and report the directory."""
        ckpt_manager.save()
        message = "Checkpoint saved at {}.".format(checkpoint_path)
        print(message)

###Result

In [None]:
# Train on the training batches; the callback saves a checkpoint after each epoch.
Dcnn.fit(train_dataset,
         epochs=NB_EPOCHS,
         callbacks=[MyCustomCallback()])

##Step 5:Evaluation

In [None]:
# Evaluate on the held-out batches and print the values returned by evaluate().
results = Dcnn.evaluate(test_dataset)
print(results)

Acc on train set = 88.5

Acc on test set = 84.6

In [None]:
def get_prediction(sentence):
    """Print the model output and the predicted sentiment for one sentence.

    The sentence is tokenized and converted to the same three inputs used for
    training (token ids, mask, segment ids), stacked and given a batch
    dimension before being passed to the model.

    This assumes the binary configuration of the model (a single sigmoid
    output): a score below 0.5 is reported as negative, otherwise positive.

    Args:
        sentence: the text to classify.
    """
    tokens = encode_sentence(sentence)

    # The model indexes its input as [batch, k, position] with k = ids, mask,
    # segments, so raw token strings cannot be fed to it directly.
    features = tf.stack(
        [tf.cast(get_ids(tokens), dtype=tf.int32),
         tf.cast(get_mask(tokens), dtype=tf.int32),
         tf.cast(get_segments(tokens), dtype=tf.int32)],
        axis=0)
    inputs = tf.expand_dims(features, 0)

    output = Dcnn(inputs, training=False)

    # Extract the single score as a Python float; thresholding at 0.5 also
    # covers a score of exactly 1.0, which floor(score * 2) would map to 2.
    score = float(output[0][0])

    if score < 0.5:
        print("Output of the model: {}\nPredicted sentiment: negative.".format(output))
    else:
        print("Output of the model: {}\nPredicted sentiment: Positive.".format(output))

In [None]:
# Try the trained model on a sample sentence.
get_prediction("This movie was pretty interesting")