<a href="https://colab.research.google.com/github/AsmaTidafi/Bert-keras-implementation/blob/main/Bert_for_Question_Paragraph_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (Colab only). The trained weights are
# written under this mount point at the end of the notebook.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install the third-party packages imported below. No versions are pinned, so
# every run gets whatever pip resolves at that moment.
!pip install tensorflow
!pip install tokenizers
!pip install transformers

import os
import re
import string

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tokenizers import BertWordPieceTokenizer
from transformers import BertTokenizer, TFBertModel
from transformers import BertConfig

## Load the data

In [None]:
# Column names imposed on both TSV files. Because `names=` is given without
# `header=`, the first line of each file is read as an ordinary data row.
tsv_columns = ['index', 'question', 'sentence', 'label']

# Training split, keyed by the file's own 'index' column.
df_train = pd.read_csv('train.tsv', delimiter='\t', names=tsv_columns).set_index('index')

# The dev split is used as the held-out test set, keyed the same way.
df_test = pd.read_csv('dev.tsv', delimiter='\t', names=tsv_columns).set_index('index')

In [None]:
# Remove the row(s) carrying the first index label of each frame -- presumably
# the TSV header line that was read in as data. The drop is label-based.
first_train_label = df_train.index[0]
first_test_label = df_test.index[0]
df_train = df_train.drop(index=[first_train_label])
df_test = df_test.drop(index=[first_test_label])

In [None]:
# Keep only rows whose label text contains 'entailment' (this also matches
# 'not_entailment'). `na=False` treats a missing label as "no match"; without
# it the mask contains NaN and boolean indexing raises a ValueError.
df_test = df_test[df_test.label.str.contains('entailment', na=False)]
df_train = df_train[df_train.label.str.contains('entailment', na=False)]

In [None]:
# Distinct label values found in the training frame, in order of first appearance.
possible_labels = df_train['label'].unique()

In [None]:
# Map each distinct label to its position in `possible_labels`.
label_dict = {label: position for position, label in enumerate(possible_labels)}

In [None]:
# Display the label -> integer id mapping.
label_dict

In [None]:
# Swap the label strings for their integer ids in both frames, using the
# mapping derived from the training labels.
for frame in (df_train, df_test):
    frame['label'] = frame['label'].replace(label_dict)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the training rows for validation; the fixed seed keeps the
# split reproducible.
train, val = train_test_split(df_train, random_state=0, test_size=0.2)
df_train, df_val = pd.DataFrame(train), pd.DataFrame(val)

In [None]:
# Number of rows left for training after the validation hold-out.
len(df_train)

In [None]:
# Number of rows held out for validation.
len(df_val)

## Set up tokenizer

In [None]:
# WordPiece tokenizer for the uncased BERT base checkpoint, lower-casing its
# input. SquadExample.preprocess reads this module-level object.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

In [None]:
# Fixed length (in tokens) of every encoded sequence fed to the model.
max_len = 300


class SquadExample:
    """One (question, sentence) pair plus its fixed-length BERT encoding.

    Attributes set by ``__init__``:
        question: raw question text.
        context: raw sentence/paragraph text.
        is_impossible: the example's label, stored as given (the attribute
            name is historical; it holds the classification target).

    Attributes set by ``preprocess``:
        input_ids: token ids, ``context`` segment first, then ``question``.
        token_type_ids: 0 for context positions and padding, 1 for question
            positions.
        attention_mask: 1 for real tokens, 0 for padding.
    """

    def __init__(self, question, context, is_impossible):
        self.question = question
        self.context = context
        self.is_impossible = is_impossible

    def preprocess(self, text_tokenizer=None, max_length=None):
        """Encode the pair into exactly ``max_length`` ids, masks and type ids.

        The context is encoded in full (including the special tokens the
        tokenizer adds at both ends). The question is encoded the same way and
        its leading special token is dropped before concatenation.

        If the pair is too long, the *body* of the context is trimmed first
        and its closing separator token is kept, so truncated and
        non-truncated examples share the same segment layout. If the question
        alone still does not fit, its body is trimmed too (closing token
        kept). Shorter pairs are right-padded with zeros.

        Args:
            text_tokenizer: object exposing ``encode(str) -> list[int]``.
                Defaults to the module-level ``tokenizer``.
            max_length: target sequence length. Defaults to the module-level
                ``max_len``.

        Raises:
            ValueError: if the target length is below 3, which cannot hold the
                context's two boundary tokens plus the question's closing one.
        """
        encode = (tokenizer if text_tokenizer is None else text_tokenizer).encode
        limit = max_len if max_length is None else max_length
        if limit < 3:
            raise ValueError(f"max_length must be at least 3, got {limit}")

        # Collapse every run of whitespace to a single space.
        context = " ".join(str(self.context).split())
        question = " ".join(str(self.question).split())

        context_ids = list(encode(context))
        # Drop the question's leading special token; the context already opens
        # the sequence.
        question_ids = list(encode(question))[1:]

        overflow = len(context_ids) + len(question_ids) - limit
        if overflow > 0:
            # Trim the context body, always retaining its first and last token.
            keep = max(len(context_ids) - overflow, min(2, len(context_ids)))
            context_ids = context_ids[:keep - 1] + context_ids[-1:]

            overflow = len(context_ids) + len(question_ids) - limit
            if overflow > 0:
                # The question alone is too long: trim its body, keep its
                # closing token.
                keep = len(question_ids) - overflow
                question_ids = question_ids[:keep - 1] + question_ids[-1:]

        input_ids = context_ids + question_ids
        token_type_ids = [0] * len(context_ids) + [1] * len(question_ids)
        attention_mask = [1] * len(input_ids)

        # Right-pad to the fixed length; a no-op when the pair was truncated.
        padding_length = limit - len(input_ids)
        self.input_ids = input_ids + [0] * padding_length
        self.token_type_ids = token_type_ids + [0] * padding_length
        self.attention_mask = attention_mask + [0] * padding_length


def create_squad_examples(df):
    """Turn every row of ``df`` into a preprocessed ``SquadExample``.

    Args:
        df: frame with ``question``, ``sentence`` and ``label`` columns.

    Returns:
        List of ``SquadExample`` objects, one per row in row order, each with
        ``preprocess()`` already applied.
    """

    def _encode_row(row):
        # Text fields are coerced to str; the label is passed through as-is.
        example = SquadExample(str(row['question']), str(row['sentence']), row['label'])
        example.preprocess()
        return example

    return [_encode_row(row) for _, row in df.iterrows()]


def create_inputs_targets(squad_examples):
    """Stack per-example attributes into model inputs and targets.

    Args:
        squad_examples: iterable of objects exposing ``input_ids``,
            ``token_type_ids``, ``attention_mask`` and ``is_impossible``.

    Returns:
        ``(x, y)`` where ``x`` is the list
        ``[input_ids, token_type_ids, attention_mask]`` of NumPy arrays and
        ``y`` is a one-element list holding the ``is_impossible`` array.
    """
    feature_keys = ("input_ids", "token_type_ids", "attention_mask")
    target_key = "is_impossible"

    # One array per attribute, stacking the examples along the first axis.
    stacked = {
        key: np.array([getattr(example, key) for example in squad_examples])
        for key in feature_keys + (target_key,)
    }

    x = [stacked[key] for key in feature_keys]
    y = [stacked[target_key]]
    return x, y

## Tokenize data

In [None]:
# Encode each split (train / validation / test) into lists of preprocessed
# SquadExample objects.
train_squad_examples = create_squad_examples(df_train)
val_squad_examples = create_squad_examples(df_val)
test_squad_examples = create_squad_examples(df_test)

In [None]:
# Stack each split into (inputs, targets). x_* is the list
# [input_ids, token_type_ids, attention_mask]; y_* is a one-element list
# holding the label array.
x_train, y_train = create_inputs_targets(train_squad_examples)
print(f"{len(train_squad_examples)} training points created.")

x_val, y_val = create_inputs_targets(val_squad_examples)
print(f"{len(val_squad_examples)} val points created.")

x_test, y_test = create_inputs_targets(test_squad_examples)
print(f"{len(test_squad_examples)} test points created.")

## Define the model-building function

In [None]:
def create_model(dropout_value):
    """Build and compile a BERT encoder with a single-unit sigmoid classifier.

    The model takes three ``(max_len,)`` int32 inputs, in the order
    ``[input_ids, token_type_ids, attention_mask]``, and outputs one
    probability per example.

    Args:
        dropout_value: value passed to ``BertConfig`` as
            ``hidden_dropout_prob``.

    Returns:
        A compiled ``keras.Model`` using Adam (learning rate 1e-5), binary
        cross-entropy loss, and accuracy plus TP/TN/FP/FN counts as metrics.
    """
    config = BertConfig(hidden_dropout_prob=dropout_value)
    # Pretrained BERT encoder, loaded with the dropout override above.
    encoder = TFBertModel.from_pretrained("bert-base-uncased", config=config)

    # Classification model
    input_ids = layers.Input(shape=(max_len,), dtype=tf.int32)
    token_type_ids = layers.Input(shape=(max_len,), dtype=tf.int32)
    attention_mask = layers.Input(shape=(max_len,), dtype=tf.int32)
    # Element 1 of the encoder output is used as the sequence-level
    # representation (presumably the pooled output -- confirm against the
    # installed transformers version).
    pooled = encoder(
        input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask
    )[1]

    logits = layers.Dense(1, use_bias=True)(pooled)
    output = layers.Activation(keras.activations.sigmoid)(logits)

    model = keras.Model(
        inputs=[input_ids, token_type_ids, attention_mask],
        outputs=output,
    )

    # Adam with a constant learning rate (no schedule).
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
    # The head is one sigmoid unit, so the metric and loss must be the binary
    # variants; the sparse-categorical ones expect one output per class.
    metrics = [
        tf.keras.metrics.BinaryAccuracy('accuracy', dtype=tf.float32),
        tf.keras.metrics.TruePositives(),
        tf.keras.metrics.TrueNegatives(),
        tf.keras.metrics.FalsePositives(),
        tf.keras.metrics.FalseNegatives(),
    ]
    # from_logits=False because the sigmoid is already applied above.
    loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)

    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=metrics)

    return model

## Use TPU

In [None]:
# Create the distribution strategy used to build and train the model.
# The calls are order-dependent: resolve the TPU, connect to it, initialise
# it, then wrap it in a strategy.
# NOTE(review): `tf.distribute.experimental.TPUStrategy` is the experimental
# spelling; newer TensorFlow releases presumably expose it as
# `tf.distribute.TPUStrategy` -- confirm against the installed version.
tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
strategy = tf.distribute.experimental.TPUStrategy(tpu)

## Run training

In [None]:
# Stop once validation loss has failed to improve for 3 consecutive epochs,
# and roll the model back to the best epoch. Without the rollback, the weights
# that get saved and evaluated afterwards come from 3 epochs past the optimum.
callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', patience=3, restore_best_weights=True
)

# Build and compile the model inside the strategy scope so its variables are
# created under the distribution strategy.
with strategy.scope():
    model = create_model(0.2)

model.summary()

# `epochs` is only an upper bound; early stopping normally ends training sooner.
# `callbacks` is documented as a list, so the single callback is wrapped in one.
output = model.fit(
    x_train,
    y_train,
    epochs=100,
    verbose=1,
    batch_size=128,
    validation_data=(x_val, y_val),
    callbacks=[callback],
)

## Plot training curves

In [None]:
# Training vs. validation loss per epoch, read from the History object that
# model.fit returned. Uses the explicit `plt` namespace: the bare pylab names
# (plot, ylabel, ...) are not defined anywhere in this notebook.
loss_epochs = range(1, len(output.history['val_loss']) + 1)

fig, ax = plt.subplots()
ax.plot(loss_epochs, output.history['val_loss'], label="Validation set", color="g")
ax.plot(loss_epochs, output.history['loss'], label="Training set", color="r")
ax.set_title('Loss per epoch')
ax.set_ylabel('Loss')
ax.set_xlabel('Epochs')
ax.legend(loc='best')
plt.show()

In [None]:
# Training vs. validation accuracy per epoch, read from the History object
# that model.fit returned. Uses the explicit `plt` namespace: the bare pylab
# names (plot, ylabel, ...) are not defined anywhere in this notebook.
accuracy_epochs = range(1, len(output.history['val_loss']) + 1)

fig, ax = plt.subplots()
ax.plot(accuracy_epochs, output.history['val_accuracy'], label="Validation set", color="g")
ax.plot(accuracy_epochs, output.history['accuracy'], label="Training set", color="r")
ax.set_title('Accuracy per epoch')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epochs')
ax.legend(loc='best')
plt.show()

In [None]:
# Save the trained weights (weights only, not the full model) to the mounted
# Google Drive.
model.save_weights('/content/drive/My Drive/Model.h5')

## Test the model

In [None]:
# Run the trained model on the encoded test inputs.
y_pred = model.predict(x_test)

In [None]:
# Turn each model output into a hard label: below 0.5 -> 0, otherwise -> 1.
y = [0 if score < 0.5 else 1 for score in y_pred]

In [None]:
# Accuracy on the test split. y_test is the one-element list returned by
# create_inputs_targets, so [0] selects the label array.
sklearn.metrics.accuracy_score(y_test[0], y)