<a href="https://colab.research.google.com/github/cheeseleeeeena/text_classification_bert/blob/main/multiclass_text_classification_BERT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparation - Google Colab

In [None]:
# Mount Google Drive at /content/drive; the Colab dataset paths defined below live under it.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
!nvidia-smi

In [None]:
!pip install transformers

In [None]:
# Locations of the train/test CSV files inside the mounted Google Drive.
train_path = "/content/drive/MyDrive/Colab Notebooks/dataset/train.csv"
test_path = "/content/drive/MyDrive/Colab Notebooks/dataset/test.csv"

# Preparation - Local

In [None]:
# Local dataset locations, relative to the notebook's working directory.
# Only applied outside Colab, so that "Run All" on Colab keeps the Drive paths
# assigned in the Colab preparation cell instead of silently overwriting them.
import sys

if "google.colab" not in sys.modules:
    train_path = "dataset/train.csv"
    test_path = "dataset/test.csv"

In [19]:
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
import tensorflow as tf
from transformers import BertTokenizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

# Load dataset

In [None]:
# Read both CSV splits into DataFrames (train first, then test).
train_df, test_df = (pd.read_csv(csv_path) for csv_path in (train_path, test_path))

In [None]:
train_df.head()

In [None]:
train_df.info()

In [None]:
train_df["Class Index"].value_counts()

In [None]:
"""
The class labels are:

idx  cls
0    1: World
1    2: Sports
2    3: Business
3    4: Sci/Tech
"""

In [None]:
# Map the dataset's 1-based class ids (1..4) onto 0-based indices (0..3).
labeling = {class_id: class_id - 1 for class_id in range(1, 5)}

In [None]:
# Replace the raw class ids with their 0-based indices in both splits.
# NOTE: not idempotent -- `labeling` only has keys 1..4, so re-running this cell
# after the column already holds 0..3 raises KeyError on 0.
for split_df in (train_df, test_df):
    split_df['Class Index'] = split_df['Class Index'].apply(lambda raw_id: labeling[raw_id])

In [None]:
test_df.head()

In [None]:
sns.countplot(x ='Class Index', data = train_df)

# Data Preprocessing

In [None]:
# Checkpoint identifier handed to BertTokenizer.from_pretrained below.
pre_trained_model_name = 'bert-base-cased'

# Tokenizer used for every encode / encode_plus call in the rest of the notebook.
tokenizer = BertTokenizer.from_pretrained(pre_trained_model_name)

In [None]:
# Choosing Sequence Length
# Merge title and description into a single text column, then drop the originals.
train_df['content'] = train_df['Title'] + ' ' + train_df['Description']
train_df.drop(columns=['Title', 'Description'], inplace=True)

# Number of tokens per training text (truncated to at most 512), used to pick
# the padded sequence length.
token_lens = [
    len(tokenizer.encode(txt, max_length=512, truncation=True))
    for txt in train_df.content
]

In [None]:
# Distribution of token counts per training text (x-axis limited to 0..256).
# `sns.distplot` is deprecated; `histplot` with a KDE overlay and density
# normalisation is its documented replacement and gives the same kind of figure.
sns.histplot(token_lens, kde=True, stat="density")
plt.xlim([0, 256])
plt.xlabel('Token Count')


In [None]:
# Pre-allocate two independent zero-filled arrays with one 256-slot row per
# training example: one for token ids, one for attention masks.
X_input_ids, X_attn_masks = (np.zeros((len(train_df), 256)) for _ in range(2))

In [None]:
X_input_ids.shape

In [None]:
# MAX_LEN = 256

def generate_training_data(df, ids, masks, tokenizer):
    """Tokenize ``df['content']`` row by row into pre-allocated arrays.

    Args:
        df: DataFrame with a ``content`` column holding the texts.
        ids: 2-D array with one row per text; row ``i`` receives the token ids
            of text ``i``. Its width is used as the padded sequence length.
        masks: array expected to have the same shape as ``ids``; row ``i``
            receives the attention mask of text ``i``.
        tokenizer: object exposing ``encode_plus`` (a ``BertTokenizer`` in
            this notebook).

    Returns:
        The same ``ids`` and ``masks`` objects, filled in place.
    """
    # Pad/truncate to the buffer width instead of a hard-coded 256, so the row
    # assignments below always fit the arrays the caller allocated.
    max_length = ids.shape[1]
    # Wrap the column itself (not the enumerate object, which has no length)
    # so tqdm knows the total and can show real progress.
    for i, text in enumerate(tqdm(df['content'])):
        tokenized_text = tokenizer.encode_plus(
            text,
            max_length=max_length,
            add_special_tokens=True,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_tensors='tf'
        )
        ids[i, :] = tokenized_text.input_ids
        masks[i, :] = tokenized_text.attention_mask
    return ids, masks

In [None]:
X_input_ids, X_attn_masks = generate_training_data(train_df, X_input_ids, X_attn_masks, tokenizer)

In [None]:
X_input_ids

In [None]:
# Zero-filled target array: one row per training example, four columns.
labels = np.zeros((len(train_df), 4))
labels.shape

In [None]:
# One-hot encode the targets: in every row, set the column given by that row's
# 'Class Index' value to 1.
row_positions = np.arange(len(train_df))
class_columns = train_df['Class Index'].values
labels[row_positions, class_columns] = 1

In [None]:
labels

In [None]:
# Build a tf.data pipeline that slices the three arrays per example into
# (input_ids, attention_mask, one-hot label) triples.
dataset = tf.data.Dataset.from_tensor_slices((X_input_ids, X_attn_masks, labels))
dataset.take(1) # dataset limited to one element; NOTE(review): displaying it presumably shows the dataset's repr, not the sample's values

In [None]:
def DatasetMapFunction(input_ids, attn_masks, labels):
    """Repack one ``(input_ids, attn_masks, labels)`` triple as ``(features, labels)``.

    ``features`` is a dict with the keys ``'input_ids'`` and ``'attention_mask'``;
    ``labels`` is passed through unchanged.
    """
    features = dict(input_ids=input_ids, attention_mask=attn_masks)
    return features, labels

In [None]:
dataset = dataset.map(DatasetMapFunction) # converting to required format for tensorflow dataset

In [None]:
dataset.take(1)

In [None]:
# Shuffle with a 10000-element buffer, then group into batches of 16, dropping
# the final incomplete batch.
# `reshuffle_each_iteration=False` keeps the shuffled order identical on every
# pass. The train/validation split is taken from this dataset with
# take()/skip(); with the default per-iteration reshuffle, each pass would draw
# a different order and validation batches could contain training examples.
dataset = dataset.shuffle(10000, reshuffle_each_iteration=False).batch(16, drop_remainder=True)

In [None]:
dataset.take(1)

In [None]:
# Share of the batches that goes to training; the rest is used for validation.
p = 0.8
# With batches of 16 there are len(train_df) // 16 full batches; keep 80% of them.
total_batches = len(train_df) // 16
train_size = int(total_batches * p)

print(train_size)

In [None]:
# The first `train_size` batches form the training set; every batch after
# them forms the validation set.
train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size)

In [None]:
len(train_dataset), len(val_dataset)

# Model

In [None]:
from transformers import TFBertModel

In [None]:
# BERT base model with pretrained weights. Reuses the checkpoint name chosen
# for the tokenizer so that vocabulary and weights cannot drift apart.
model = TFBertModel.from_pretrained(pre_trained_model_name)

In [None]:
# defining 2 input layers for input_ids and attn_masks
# Each takes 256 int32 values per example; the layer names equal the
# feature-dict keys ('input_ids', 'attention_mask') produced by DatasetMapFunction.
input_ids = tf.keras.layers.Input(shape=(256,), name='input_ids', dtype='int32')
attn_masks = tf.keras.layers.Input(shape=(256,), name='attention_mask', dtype='int32')

# Classification head on top of BERT: pooled output -> Dense(512, relu) -> Dense(4, softmax).
bert_embds = model.bert(input_ids, attention_mask=attn_masks)[1] # 0 -> activation layer (3D), 1 -> pooled output layer (2D)
intermediate_layer = tf.keras.layers.Dense(512, activation='relu', name='intermediate_layer')(bert_embds)
output_layer = tf.keras.layers.Dense(4, activation='softmax', name='output_layer')(intermediate_layer) # softmax -> calcs probs of classes

# Wrap inputs and head into one Keras model and print its layer summary.
my_model = tf.keras.Model(inputs=[input_ids, attn_masks], outputs=output_layer)
my_model.summary()

In [None]:
# The string literal below holds a disabled alternative (Adam driven by an
# exponential learning-rate decay schedule); as a bare string it is not executed.
"""
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.01,
    decay_steps=10000,
    decay_rate=0.9)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
"""

# Training configuration actually used: legacy Adam (lr 1e-5, decay 1e-6),
# categorical cross-entropy on the one-hot targets, and categorical accuracy.
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=1e-5, decay=1e-6)
loss_function = tf.keras.losses.CategoricalCrossentropy()
accuracy = tf.keras.metrics.CategoricalAccuracy('accuracy')

In [None]:
my_model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])

In [None]:
hist = my_model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=1
)

In [None]:
my_model.save('my_multiclass_bert')

# Prediction

In [None]:
test_df.shape

In [None]:
test_df.info()

In [None]:
test_df.head(10)

In [None]:
# Build the combined text column for the test split (title, a space, then the
# description) and drop the two source columns.
test_df['content'] = test_df['Title'] + ' ' + test_df['Description']
test_df.drop(columns=['Title', 'Description'], inplace=True)

In [None]:
test_df.head(5)

In [None]:
# Reload the classifier from the 'my_multiclass_bert' directory written by my_model.save.
my_multiclass_bert = tf.keras.models.load_model('my_multiclass_bert')

# Re-create the tokenizer for the same 'bert-base-cased' checkpoint used earlier.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

## Another way: batched prediction

In [None]:
# Buffers for the test set: one 256-slot row per *test* example. They are
# filled from test_df and passed to model.predict, so they must be sized from
# test_df; sizing them from train_df left many all-zero rows that were
# predicted as well and produced a prediction list longer than test_df.
input_ids = np.zeros((len(test_df), 256))
attn_masks = np.zeros((len(test_df), 256))

def prepare_data2(df, ids, masks, tokenizer):
    """Tokenize ``df['content']`` into ``ids``/``masks`` and return them as model inputs.

    Args:
        df: DataFrame with a ``content`` column holding the texts.
        ids: 2-D array with one row per text; row ``i`` receives the token ids
            of text ``i``. Its width is used as the padded sequence length.
        masks: array expected to have the same shape as ``ids``; row ``i``
            receives the attention mask of text ``i``.
        tokenizer: object exposing ``encode_plus`` (a ``BertTokenizer`` in
            this notebook).

    Returns:
        Dict with the keys ``'input_ids'`` and ``'attention_mask'`` that maps to
        the same ``ids`` and ``masks`` objects, filled in place.
    """
    # Pad/truncate to the buffer width instead of a hard-coded 256, so the row
    # assignments below always fit the arrays the caller allocated.
    max_length = ids.shape[1]
    # Wrap the column itself (not the enumerate object, which has no length)
    # so tqdm knows the total and can show real progress.
    for i, text in enumerate(tqdm(df['content'])):
        tokenized_text = tokenizer.encode_plus(
            text,
            max_length=max_length,
            add_special_tokens=True,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_tensors='tf'
        )
        ids[i, :] = tokenized_text.input_ids
        masks[i, :] = tokenized_text.attention_mask
    return {
        'input_ids': ids,
        'attention_mask': masks
        }

def make_prediction2(model, processed_data,
                    class_index=['1', '2', '3', '4']):
    """Predict a class label for every row of ``processed_data``.

    Runs ``model.predict`` with a batch size of 20 and, for each row of the
    result, picks the entry of ``class_index`` at the position of the row's
    largest value.

    Returns:
        List with one ``class_index`` entry per result row.
    """
    row_scores = model.predict(processed_data, batch_size=20)
    return [class_index[np.argmax(scores)] for scores in row_scores]

In [None]:
# Tokenize the whole test split into the pre-allocated buffers, then predict
# one class label per row in a single batched model.predict call.
processed_data = prepare_data2(test_df, input_ids, attn_masks, tokenizer)
pred_class_index = make_prediction2(my_multiclass_bert, processed_data=processed_data)

## Brute Force

In [None]:
def prepare_data(input_text, tokenizer):
    """Tokenize one text into the feature dict passed to ``model.predict``.

    The text is encoded with special tokens and padded/truncated to 256 tokens.
    The resulting ids and attention mask are cast to ``tf.float64``.

    Returns:
        Dict with the keys ``'input_ids'`` and ``'attention_mask'``.
    """
    encoded = tokenizer.encode_plus(
        input_text,
        add_special_tokens=True,
        max_length=256,
        padding='max_length',
        truncation=True,
        return_tensors='tf',
    )
    ids_as_float = tf.cast(encoded.input_ids, tf.float64)
    mask_as_float = tf.cast(encoded.attention_mask, tf.float64)
    return {'input_ids': ids_as_float, 'attention_mask': mask_as_float}

def make_prediction(model, processed_data,
                    class_index=['1', '2', '3', '4'],
                    class_names=['World','Sports','Business', 'Sci/Tech']):
    """Predict the class of a single example.

    Takes the first row returned by ``model.predict(processed_data)``, finds
    the position of its largest value, and looks that position up in both
    ``class_index`` and ``class_names``.

    Returns:
        Tuple ``(class_idx, class_name)``.
    """
    first_row_scores = model.predict(processed_data)[0]
    winner = np.argmax(first_row_scores)
    return class_index[winner], class_names[winner]

In [None]:
# exec time: 27min
# Classify the test texts one at a time: each text is tokenized on its own and
# sent through a separate make_prediction call.
per_text_predictions = [
    make_prediction(my_multiclass_bert, processed_data=prepare_data(txt, tokenizer))
    for txt in test_df.content
]
pred_class_index = [class_idx for class_idx, _ in per_text_predictions]
class_names = [class_name for _, class_name in per_text_predictions]

# Store the predicted class index and class name next to each test row.
test_df['Predicted Class'] = pred_class_index
test_df['Predicted Class Names'] = class_names

# Evaluation



In [None]:
test_df.head(5)

In [None]:
# Map the 0-based indices (0..3) back to the 1-based class ids (1..4).
reverse_labeling = {zero_based: zero_based + 1 for zero_based in range(4)}

# Restore the 1-based ids in the test split's 'Class Index' column.
test_df['Class Index'] = test_df['Class Index'].apply(lambda zero_based: reverse_labeling[zero_based])

In [None]:
test_df = test_df.astype({"Class Index": int, "Predicted Class": int})

In [None]:
# True and predicted class ids as plain lists, in test_df row order.
y_test, y_pred = (list(test_df[column]) for column in ("Class Index", "Predicted Class"))

In [None]:
# Pin the label set explicitly so the four target names always line up with
# class ids 1..4, even if a class never appears in y_test / y_pred (without
# `labels`, the number of inferred labels would then not match `target_names`).
print(classification_report(y_test, y_pred, labels=[1, 2, 3, 4], target_names=['1', '2', '3', '4']))

In [None]:
def show_confusion_matrix(confusion_matrix):
    """Draw an annotated blue heatmap of a confusion matrix.

    Args:
        confusion_matrix: matrix of counts to plot (cells are annotated with
            the integer format "d"). Note that inside this function the
            parameter shadows any outer name ``confusion_matrix``.
    """
    hmap = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
    # Keep the y tick labels horizontal and tilt the x tick labels by 30 degrees.
    for axis, angle in ((hmap.yaxis, 0), (hmap.xaxis, 30)):
        axis.set_ticklabels(axis.get_ticklabels(), rotation=angle, ha='right')
    plt.ylabel('True class')
    plt.xlabel('Predicted class')

# Fix the label set so the matrix is always 4x4 (class ids 1..4). Without
# `labels`, a class missing from both y_test and y_pred would shrink the matrix
# and the DataFrame construction with four row/column names would fail.
cm = confusion_matrix(y_test, y_pred, labels=[1, 2, 3, 4])
df_cm = pd.DataFrame(cm, index=['1', '2', '3', '4'], columns=['1', '2', '3', '4'])
show_confusion_matrix(df_cm)

In [None]:
# Raw string on purpose: the sample text contains literal backslashes. In a
# normal string the "\t" in "second\team" would silently turn into a TAB
# character, and "\p" / "\l" are invalid escape sequences.
input_text = r"""
The Race is On: Second Private Team Sets Launch Date for Human Spaceflight (SPACE.com) SPACE.com - TORONTO, Canada -- A second\team of rocketeers competing for the #36;10 million Ansari X Prize, a contest for\privately funded suborbital space flight, has officially announced the first\launch date for its manned rocket.
"""

# Classify the single sample and show the text with its predicted index and class name.
processed_data = prepare_data(input_text, tokenizer)
index, name = make_prediction(my_multiclass_bert, processed_data=processed_data)
print(f'Raw text: {input_text}')
print(f'Predicted index: {index}')
print(f'Predicted class: {name}')