In [None]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# further down in this notebook live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install numpy==1.19.4
!pip install pandas==1.1.5
!pip install tensorflow==2.4.0
!pip install tensorflow-hub==0.10.0
!pip install bert-for-tf2==0.14.7
!pip install sentencepiece==0.1.94

In [None]:
import numpy as np
import pandas as pd

# Best-effort request for TensorFlow 2.x through the `%tensorflow_version`
# line magic. Any failure is deliberately ignored so the import cell keeps going.
# NOTE(review): presumably this magic only exists on Colab runtimes - confirm.
try:
    %tensorflow_version 2.x
except Exception:
    pass

import bert
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers

In [None]:
# Load the frozen BERT encoder once; the hub module also carries the vocabulary
# file and the lower-casing flag the tokenizer has to be built with.
bert_layer = hub.KerasLayer(
    "https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1",
    trainable=False,
)
vocab_file = bert_layer.resolved_object.vocab_file.asset_path.numpy()
do_lower_case = bert_layer.resolved_object.do_lower_case.numpy()

# Build the tokenizer from the assets shipped with the encoder above.
FullTokenizer = bert.bert_tokenization.FullTokenizer
tokenizer = FullTokenizer(vocab_file, do_lower_case)

In [None]:
# Read the cleaned training set; `text` is parsed as str and `sentiment` as int.
dataset_path = r'/content/drive/MyDrive/Text-Classification/Datasets/cleaned_train.csv'
data = pd.read_csv(
    dataset_path,
    dtype={'text': str, 'sentiment': int},
)
# Labels as a plain NumPy array, positionally aligned with the rows of `data`.
data_labels = data['sentiment'].values

In [None]:
inputs = data['text'].apply(encode_sentence)

In [None]:
# def encode_sentence(sentence):
#     return tokenizer.convert_tokens_to_ids(tokenizer.tokenize(sentence))

def encode_sentence(sentence):
    return ['[CLS]']+tokenizer.tokenize(sentence)+['[SEP]']

In [None]:
def get_ids(tokens):
    """Look up the vocabulary id of every token via the notebook-level ``tokenizer``.

    Args:
        tokens: Sequence of token strings.

    Returns:
        Whatever ``tokenizer.convert_tokens_to_ids`` returns for ``tokens``.
    """
    token_ids = tokenizer.convert_tokens_to_ids(tokens)
    return token_ids

def get_mask(tokens):
    """Build the attention mask for ``tokens``.

    Args:
        tokens: Sequence of token strings (may be empty).

    Returns:
        numpy.ndarray of int with the same shape as ``tokens``: 0 where the
        token is ``'[PAD]'``, 1 everywhere else.
    """
    # Coerce to a string array first: NumPy turns an empty list into a float64
    # array, which the np.char comparison functions refuse to compare.
    token_array = np.asarray(tokens, dtype=str)
    return np.char.not_equal(token_array, '[PAD]').astype(int)

def get_segments(tokens):
    """Assign a segment id (0 or 1) to every token.

    The id starts at 0 and flips after each ``'[SEP]'``; the separator itself
    is labelled with the id of the segment it closes.

    Args:
        tokens: Iterable of token strings.

    Returns:
        list of int: one segment id per token, in order.
    """
    segment_ids = []
    segment = 0
    for tok in tokens:
        segment_ids.append(segment)
        if tok == '[SEP]':
            # Toggle between 0 and 1 for whatever follows the separator.
            segment ^= 1
    return segment_ids

In [None]:
# Pair each tokenized sentence with its label and length, keeping only
# sentences longer than 7 tokens.
random_data_with_len = [
    [tokens, label, len(tokens)]
    for tokens, label in zip(inputs, data_labels)
    if len(tokens) > 7
]
# Shuffle first, then sort by length: list.sort is stable, so sentences of equal
# length end up in random relative order while similar lengths stay adjacent.
np.random.shuffle(random_data_with_len)
random_data_with_len.sort(key=lambda row: row[2])
# Drop the helper length column: (tokens, label) pairs in ascending length.
sorted_data = [(row[0], row[1]) for row in random_data_with_len]

In [None]:
# Smoke test: push one sentence through the frozen BERT layer. Each feature
# (ids, mask, segments) is cast to int32 and given a leading batch axis of 1.
sent = encode_sentence('The sunrise was beautiful this morning.')
bert_layer([
    tf.expand_dims(tf.cast(make_feature(sent), tf.int32), 0)
    for make_feature in (get_ids, get_mask, get_segments)
])

In [None]:
dataset = tf.data.Dataset.from_generator(lambda: sorted_data, output_types=(tf.int32, tf.int32))
# next(iter(dataset))

In [None]:
BATCH_SIZE = 32
batched_dataset = dataset.padded_batch(BATCH_SIZE, padded_shapes=((None, ), ()))
# next(iter(batched_dataset))

In [None]:
# Number of batches as a real int: buffer_size / take / skip expect integer counts,
# and np.ceil alone returns a float.
NB_BATCHES = int(np.ceil(len(sorted_data) / BATCH_SIZE))
# Hold out roughly one batch in ten for evaluation.
NB_BATCHES_TEST = NB_BATCHES // 10
# Dataset.shuffle returns a NEW dataset, so its result has to be kept.
# reshuffle_each_iteration=False freezes the batch order; otherwise every pass
# would reshuffle and the take/skip split below would mix train and test batches.
shuffled_dataset = batched_dataset.shuffle(
    buffer_size=max(NB_BATCHES, 1),  # shuffle rejects a buffer_size of 0
    reshuffle_each_iteration=False,
)
train_dataset, test_dataset = shuffled_dataset.skip(NB_BATCHES_TEST), shuffled_dataset.take(NB_BATCHES_TEST)

In [None]:
# class DCNN(tf.keras.Model):

#     def __init__(self, vocab_size, emb_dim=200, nb_filters=100, FFN_units=256, dropout_rate=0.2, training=False, name='dcnn'):
#         super(DCNN, self).__init__(name=name)
#         self.embedding = layers.Embedding(vocab_size, emb_dim)
#         self.bigram = layers.Conv1D(filters=nb_filters, kernel_size=2, padding='valid', activation='relu')
#         self.trigram = layers.Conv1D(filters=nb_filters, kernel_size=3, padding='valid', activation='relu')
#         self.quadgram = layers.Conv1D(filters=nb_filters, kernel_size=4, padding='valid', activation='relu')
#         self.pool = layers.GlobalMaxPooling1D()
#         self.dense1 = layers.Dense(units=FFN_units, activation='relu')
#         self.dropout = layers.Dropout(rate=dropout_rate)
#         self.densel = layers.Dense(units=1, activation='sigmoid') # nb_classes=2
#         # self.densel = layers.Dense(units=nb_classes, activation='softmax')

#     def call(self, inputs, training):
#         x = self.embedding(inputs)
#         x2 = self.bigram(x)
#         x2 = self.pool(x2)
#         x3 = self.trigram(x)
#         x3 = self.pool(x3)
#         x4 = self.quadgram(x)
#         x4 = self.pool(x4) # (batch_size, nb_filters)

#         merged = tf.concat([x2, x3, x4], axis=1) # (batch_size, 3*nb_filters)
#         merged = self.dense1(merged)
#         merged = self.dropout(merged, training)
#         output = self.densel(merged)

#         return output

In [None]:
class DCNN(tf.keras.Model):
    """Binary text classifier: frozen BERT embeddings followed by n-gram convolutions.

    Three Conv1D branches (kernel sizes 2, 3 and 4) run over the BERT output,
    each is global-max-pooled, and the concatenated features go through a
    dense layer, dropout and a single sigmoid unit.

    Args:
        nb_filters: Number of filters in each convolution branch.
        FFN_units: Width of the hidden dense layer.
        dropout_rate: Dropout rate applied after the hidden dense layer.
        training: Unused; kept so existing call sites keep working.
        name: Keras model name.
    """

    def __init__(self, nb_filters=100, FFN_units=256, dropout_rate=0.2, training=False, name='dcnn'):
        super().__init__(name=name)
        # Frozen encoder: trainable=False keeps the BERT weights fixed.
        self.bert_layer = hub.KerasLayer("https://tfhub.dev/tensorflow/bert_en_uncased_L-12_H-768_A-12/1", trainable=False)
        self.bigram = layers.Conv1D(filters=nb_filters, kernel_size=2, padding='valid', activation='relu')
        self.trigram = layers.Conv1D(filters=nb_filters, kernel_size=3, padding='valid', activation='relu')
        self.quadgram = layers.Conv1D(filters=nb_filters, kernel_size=4, padding='valid', activation='relu')
        self.pool = layers.GlobalMaxPooling1D()
        self.dense1 = layers.Dense(units=FFN_units, activation='relu')
        self.dropout = layers.Dropout(rate=dropout_rate)
        # Output layer: one sigmoid unit for the two-class task. The attribute
        # is spelled with a lower-case "L" (densel), not the digit 1; it is not
        # renamed because object-based checkpoints match variables by attribute name.
        self.densel = layers.Dense(units=1, activation='sigmoid')

    def bert_embedding(self, inputs):
        """Run the BERT layer on a stacked input and return its second output.

        Args:
            inputs: Rank-3 integer tensor; ``inputs[:, 0, :]``, ``inputs[:, 1, :]``
                and ``inputs[:, 2, :]`` are passed to BERT as its three inputs
                (token ids, mask, segment ids, in that order).

        Returns:
            The second tensor returned by the hub layer.
            NOTE(review): presumably the per-token sequence output of shape
            (batch, seq_len, 768) - confirm against the hub module documentation.
        """
        _, embeds = self.bert_layer([inputs[:, 0, :], inputs[:, 1, :], inputs[:, 2, :]])
        return embeds

    def call(self, inputs, training=None):
        """Compute the sigmoid score for a batch.

        Args:
            inputs: Stacked BERT inputs, see ``bert_embedding``. With
                ``padding='valid'`` the kernel_size=4 branch needs sequences
                of at least 4 positions.
            training: Forwarded to dropout. Defaults to ``None`` so the model
                can also be called directly without the argument.

        Returns:
            Tensor of shape (batch_size, 1) holding the sigmoid outputs.
        """
        x = self.bert_embedding(inputs)
        # One (batch_size, nb_filters) feature vector per n-gram branch.
        pooled = [self.pool(conv(x)) for conv in (self.bigram, self.trigram, self.quadgram)]

        merged = tf.concat(pooled, axis=1)  # (batch_size, 3*nb_filters)
        merged = self.dense1(merged)
        merged = self.dropout(merged, training=training)
        return self.densel(merged)

In [None]:
# DCNN has no vocabulary-size parameter: its first positional argument is
# nb_filters, so passing len(tokenizer.vocab) silently requested a vocabulary-sized
# filter count per convolution. Build the model with its intended defaults instead.
# NOTE(review): checkpoints written while the model was built with the accidental
# filter count will not match these layer shapes.
dcnn = DCNN(nb_filters=100, FFN_units=256, dropout_rate=0.2)
dcnn.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
# dcnn.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['sparse_categorical_accuracy'])

In [None]:
# Directory on Drive where training checkpoints are kept.
cppath = '/content/drive/MyDrive/Text-Classification-v1/Checkpoint/'

# Track the model and keep only the most recent checkpoint on disk.
checkpoint = tf.train.Checkpoint(dcnn)
cpmanager = tf.train.CheckpointManager(
    checkpoint,
    directory=cppath,
    max_to_keep=1,
)

# Resume from the newest checkpoint when the directory already has one.
if cpmanager.latest_checkpoint:
    checkpoint.restore(cpmanager.latest_checkpoint)
    print('latest checkpoint has been restored')

latest checkpoint has been restored


In [None]:
class saveCheckpointCallback(tf.keras.callbacks.Callback):
    """Keras callback that saves a checkpoint through ``cpmanager`` after every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Save via the notebook-level ``cpmanager`` and report the checkpoint directory.

        Args:
            epoch: Index of the epoch that just finished (unused).
            logs: Metric dictionary supplied by Keras (unused).
        """
        cpmanager.save()
        message = 'checkpoint has been created at {}'.format(cppath)
        print(message)

In [None]:
# dcnn.fit(train_dataset, epochs=5, callbacks=[saveCheckpointCallback()])

In [None]:
# results = dcnn.evaluate(test_dataset)
# results

In [None]:
# dcnn.save('/content/drive/MyDrive/Text-Classification-v1/model')
# model = tf.keras.models.load_model('/content/drive/MyDrive/Text-Classification-v1/model')