In [None]:
import tensorflow as tf
import os
print(f"Tensorflow version: {tf.__version__}")

import pandas as pd
from sklearn import preprocessing
from sklearn.model_selection import train_test_split

import numpy as np
from sklearn.model_selection import train_test_split

from transformers import (TFBertForSequenceClassification, 
                          BertTokenizer)

from tqdm import tqdm

import tensorflow as tf
from tensorflow.keras import layers

from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score

In [None]:
pad_token=0
pad_token_segment_id=0
max_length=128

def convert_to_input(reviews):
  input_ids,attention_masks,token_type_ids=[],[],[]
  
  for x in tqdm(reviews,position=0, leave=True):
    inputs = bert_tokenizer.encode_plus(x,add_special_tokens=True, max_length=max_length)
    
    i, t = inputs["input_ids"], inputs["token_type_ids"]
    m = [1] * len(i)

    padding_length = max_length - len(i)

    i = i + ([pad_token] * padding_length)
    m = m + ([0] * padding_length)
    t = t + ([pad_token_segment_id] * padding_length)
    
    input_ids.append(i)
    attention_masks.append(m)
    token_type_ids.append(t)
  
  return [np.asarray(input_ids), 
            np.asarray(attention_masks), 
            np.asarray(token_type_ids)]

def example_to_features(input_ids,attention_masks,token_type_ids,y):
  return {"input_ids": input_ids,
          "attention_mask": attention_masks,
          "token_type_ids": token_type_ids},y

def get_token_ids(texts):
    return bert_tokenizer.batch_encode_plus(texts, 
                                            add_special_tokens=True, 
                                            max_length = 128, 
                                            pad_to_max_length = True)["input_ids"]

In [None]:
class CustomIMDBModel(tf.keras.Model):
    
    def __init__(self,
                 vocabulary_size,
                 embedding_dimensions=128,
                 cnn_filters=50,
                 dnn_units=512,
                 model_output_classes=2,
                 dropout_rate=0.1,
                 training=False,
                 name="custom_imdb_model"):
        super(CustomIMDBModel, self).__init__(name=name)
        
        self.embedding = layers.Embedding(vocabulary_size,
                                          embedding_dimensions)
        self.cnn_layer1 = layers.Conv1D(filters=cnn_filters,
                                        kernel_size=2,
                                        padding="valid",
                                        activation="relu")
        self.cnn_layer2 = layers.Conv1D(filters=cnn_filters,
                                        kernel_size=3,
                                        padding="valid",
                                        activation="relu")
        self.cnn_layer3 = layers.Conv1D(filters=cnn_filters,
                                        kernel_size=4,
                                        padding="valid",
                                        activation="relu")
        self.pool = layers.GlobalMaxPool1D()
        
        self.dense_1 = layers.Dense(units=dnn_units, activation="relu")
        self.dropout = layers.Dropout(rate=dropout_rate)
        if model_output_classes == 2:
            self.last_dense = layers.Dense(units=1,
                                           activation="sigmoid")
        else:
            self.last_dense = layers.Dense(units=model_output_classes,
                                           activation="softmax")
    
    def call(self, inputs, training):
        l = self.embedding(inputs)
        l_1 = self.cnn_layer1(l) 
        l_1 = self.pool(l_1) 
        l_2 = self.cnn_layer2(l) 
        l_2 = self.pool(l_2)
        l_3 = self.cnn_layer3(l)
        l_3 = self.pool(l_3) 
        
        concatenated = tf.concat([l_1, l_2, l_3], axis=-1) # (batch_size, 3 * cnn_filters)
        concatenated = self.dense_1(concatenated)
        concatenated = self.dropout(concatenated, training)
        model_output = self.last_dense(concatenated)
        
        return model_output

In [None]:
bert_tokenizer = BertTokenizer.from_pretrained("bert-base-cased")


In [None]:
data = pd.read_csv("dataset.csv")

# Transform positive/negative values to 1/0s
label_encoder = preprocessing.LabelEncoder()
data['sentiment'] = label_encoder.fit_transform(data['sentiment'])
X = (np.array(data['review']))
y = (np.array(data['sentiment']))

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=13)
X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=0.5, random_state=42)

X_test_input=convert_to_input(X_test)
X_train_input=convert_to_input(X_train)
X_val_input=convert_to_input(X_val)

train_token_ids = get_token_ids(X_train)
test_token_ids = get_token_ids(X_test)

train_data = tf.data.Dataset.from_tensor_slices((tf.constant(train_token_ids), tf.constant(y_train))).batch(12)
test_data = tf.data.Dataset.from_tensor_slices((tf.constant(test_token_ids), tf.constant(y_test))).batch(12)

In [None]:
results_true = test_ds.unbatch()
results_true = np.asarray([element[1].numpy() for element in results_true])

In [None]:
VOCAB_LENGTH = len(bert_tokenizer.vocab)
EMB_DIM = 200
CNN_FILTERS = 100
DNN_UNITS = 256
OUTPUT_CLASSES = 2
DROPOUT_RATE = 0.2
NB_EPOCHS = 5

custom_model = CustomIMDBModel(vocabulary_size=VOCAB_LENGTH,
                        embedding_dimensions=EMB_DIM,
                        cnn_filters=CNN_FILTERS,
                        dnn_units=DNN_UNITS,
                        model_output_classes=OUTPUT_CLASSES,
                        dropout_rate=DROPOUT_RATE)

if OUTPUT_CLASSES == 2:
    custom_model.compile(loss="binary_crossentropy",
                       optimizer="adam",
                       metrics=["accuracy"])
else:
    custom_model.compile(loss="sparse_categorical_crossentropy",
                       optimizer="adam",
                       metrics=["sparse_categorical_accuracy"])

In [None]:
custom_model.fit(train_data, epochs=NB_EPOCHS)
results_predicted = [1 if x>=0.5 else 0 for x in custom_model.predict(test_data) ]
results_true = np.array(y_test)

print(f"F1 score: {f1_score(results_true, results_predicted)}")
print(f"Accuracy score: {accuracy_score(results_true, results_predicted)}")