## Importing the relevant libraries

In [1]:
!pip install wget

[0m[31mERROR: Could not find a version that satisfies the requirement wget (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for wget[0m[31m
[0m

In [None]:
!pip install keras-tuner

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import warnings, wget
import os
warnings.filterwarnings('ignore')
# Download the helper module only when it is missing: wget.download() never
# overwrites an existing file (it saves a "name (1).py" copy instead), so an
# unconditional call on every run only piles up stray duplicates.
# NOTE(review): this fetches code from a mutable `main` branch and imports it
# below; consider pinning the URL to a specific commit.
if not os.path.exists("helper_prabowo_ml.py"):
    wget.download("https://raw.githubusercontent.com/yogawicaksana/helper_prabowo/main/helper_prabowo_ml.py",out="helper_prabowo_ml.py")
import tensorflow as tf
import keras_tuner as kt
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from wordcloud import WordCloud
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, Flatten, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical, plot_model
from sklearn.metrics import classification_report, accuracy_score, precision_score, f1_score, recall_score
from tqdm.notebook import tqdm
tqdm.pandas()  # registers `progress_apply` on pandas objects (used by text_preprocess)
from helper_prabowo_ml import clean_html, remove_links, non_ascii, lower, email_address, removeStopWords, punct, remove_, remove_special_characters, remove_digits

In [None]:
# Default size for every figure drawn in this notebook.
plt.rc('figure', figsize=(12, 8))

## Loading the dataset

In [None]:
# Read the three provided CSV splits in train / val / test order.
train, val, test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/kaggle/input/emotion-classification-nlp/emotion-labels-train.csv",
        "/kaggle/input/emotion-classification-nlp/emotion-labels-val.csv",
        "/kaggle/input/emotion-classification-nlp/emotion-labels-test.csv",
    )
)
train.head()

In [None]:
# (rows, columns) of each provided split, in train / val / test order.
tuple(split.shape for split in (train, val, test))

In [None]:
# Merge the three provided splits into a single frame.
df = pd.concat([train,val,test],axis=0)
# Shuffle with a fixed seed so the row order, and everything derived from it,
# is identical on every run.
df = df.sample(frac=1,random_state=101).reset_index(drop=True)
df.head()

## Data Exploration, Exploratory Data Analysis (EDA) and Feature Engineering

In [None]:
df.shape  # (rows, columns) of the combined frame

In [None]:
# Number of rows per emotion label.
df['label'].value_counts()

In [None]:
df.info()  # column dtypes and non-null counts

In [None]:
# Missing values per column.
df.isna().sum()

In [None]:
df.duplicated().sum()  # number of rows that repeat an earlier row in every column

In [None]:
## Checking for empty or whitespace-only strings within the text column
# `str.isspace()` is False for a zero-length string, so it would miss truly
# empty entries; stripping and comparing with '' catches both cases.
df.text.str.strip().eq('').sum()

In [None]:
# Build one word cloud from every text joined into a single string.
corpus = ' '.join(df.text)
wc = WordCloud(width=600, height=300, random_state=101).generate(corpus)

fig, ax = plt.subplots()
ax.imshow(wc)
ax.set_title('Word Cloud of text', fontsize=32, fontweight='bold', color='crimson', pad=20);

In [None]:
# Character count of each text, stored next to it.
df['text_len'] = df['text'].map(len)
df.head()

In [None]:
# `sns.distplot` is deprecated; a density-scaled `histplot` with a KDE overlay
# is the replacement seaborn documents for it.
sns.histplot(df.text_len,kde=True,stat='density',kde_kws=dict(cut=3))
plt.title('Distribution Plot of text length',fontsize=32,color='magenta',fontweight='bold',pad=20);

In [None]:
df.describe()  # summary statistics of the numeric columns (includes text_len)

In [None]:
# Fixed sequence length used below as the tokenizer's `max_length` and as the
# shape of the model's Input layers.
# NOTE(review): the original note says this is the average text length, but
# `text_len` counts characters while this value is applied to tokens — confirm.
max_text_len = 90

## Encoding the target labels

In [None]:
# Map every label name to an integer id. Sorting the names first makes the
# mapping independent of row order, so the class ids stay the same between runs
# (and stay valid for any model weights saved from an earlier run).
labels_dict = {label: idx for idx, label in enumerate(sorted(df.label.unique()))}

labels_dict

In [None]:
# Bar chart of the number of rows per label.
sns.countplot(data=df, x='label');

There are 4 emotions in total: fear, sadness, joy and anger. As is evident from the above graph, the class "fear" has the most instances, followed by "anger", "joy" and "sadness", with "sadness" having the fewest.

In [None]:
# Replace each label name with its integer id from `labels_dict`.
df['label'] = df['label'].map(lambda label_name: labels_dict[label_name])

## Preprocessing the text data

In [None]:
def text_preprocess(data,col):
    """Return a cleaned copy of ``data`` with the text in column ``col`` normalised.

    The cleaning helpers imported from ``helper_prabowo_ml`` are applied to every
    value of ``col`` one after another; each pass shows a tqdm progress bar.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame holding the raw text. It is left unmodified.
    col : str
        Name of the text column to clean.

    Returns
    -------
    pandas.DataFrame
        A copy of ``data`` whose ``col`` column holds the cleaned text.
    """
    # Applied in this exact order; each step sees the previous step's output.
    # NOTE(review): punctuation is stripped before links and e-mail addresses
    # are removed, which may leave those later steps with nothing to match —
    # confirm against the helper implementations.
    steps = (
        clean_html,                 # Eliminates HTML tags and other HTML syntax
        punct,                      # Removes punctuation characters
        remove_,                    # Presumably removes underscore characters
        remove_digits,              # Removes digits
        remove_links,               # Eliminates links and URLs
        remove_special_characters,  # Removes special characters
        removeStopWords,            # Removes stopwords
        lower,                      # Converts text to lowercase
        non_ascii,                  # Removes non-ASCII characters
        email_address,              # Removes email addresses
    )
    # Work on a copy: the caller's frame keeps its raw text, and re-running the
    # calling cell does not preprocess already-cleaned text a second time.
    data = data.copy()
    for step in steps:
        data[col] = data[col].progress_apply(func=step)
    return data

In [None]:
# Clean the 'text' column and preview the result.
preprocessed_df = text_preprocess(data=df, col='text')
preprocessed_df.head()

## Downloading the pretrained tokenizer and BERT transformer model from the Hugging Face platform

In [None]:
# The tokenizer and the TF sequence-classification model come from the same Hub checkpoint.
pretrained_name = 'LawalAfeez/emotion_detection'
tokenizer = AutoTokenizer.from_pretrained(pretrained_name)
bert_model = TFAutoModelForSequenceClassification.from_pretrained(pretrained_name)

## Splitting the preprocessed dataset into train and test sets

In [None]:
# 70/30 split, stratified on the encoded label, with a fixed seed.
train_df, test_df = train_test_split(
    preprocessed_df,
    stratify=preprocessed_df['label'],
    test_size=0.3,
    shuffle=True,
    random_state=101,
)

## Performing tokenization of text data

In [None]:
def encode_texts(texts):
    """Tokenize ``texts`` into fixed-length TensorFlow tensors.

    Every sequence is padded to ``max_text_len`` and, if longer, truncated to it.
    Truncation is required because the model's Input layers have the fixed shape
    ``(max_text_len,)``: without it a single over-long text either cannot be
    stacked into one tensor or no longer matches the expected input shape.

    Parameters
    ----------
    texts : list of str
        The texts to encode.

    Returns
    -------
    The tokenizer's encoding, holding 'input_ids' and 'attention_mask' tensors.
    """
    return tokenizer(text=texts,
                     add_special_tokens=True,
                     padding="max_length",
                     truncation=True,
                     max_length=max_text_len,
                     return_tensors='tf',
                     return_token_type_ids=False,
                     return_attention_mask=True,
                     verbose=1)

X_train = encode_texts(train_df.text.tolist())
X_test = encode_texts(test_df.text.tolist())

## Defining the model architecture

In [None]:
# Two int32 inputs of length `max_text_len`, named after the tokenizer outputs they receive.
input_ids, attention_mask = (
    Input(shape=(max_text_len,), dtype=tf.int32, name=tensor_name)
    for tensor_name in ('input_ids', 'attention_mask')
)

In [None]:
# NOTE(review): `bert_model` was loaded with TFAutoModelForSequenceClassification,
# whose first output is presumably the classification logits rather than a hidden
# state — confirm this is the tensor the dense head is meant to consume.
word_embeddings = bert_model(input_ids, attention_mask=attention_mask)[0]

# (units, dropout rate) for each Dense -> BatchNormalization -> Dropout stage of the head.
hidden_stages = ((2048, 0.3), (1024, 0.25), (1024, 0.2), (512, 0.2))

output = Flatten()(word_embeddings)
for stage_units, stage_dropout in hidden_stages:
    output = Dense(units=stage_units, activation='relu')(output)
    output = BatchNormalization()(output)
    output = Dropout(stage_dropout)(output)
output = Dense(units=128, activation='relu')(output)
# Four-way softmax: one probability per emotion class.
output = Dense(units=4, activation='softmax')(output)

model = Model(inputs=[input_ids, attention_mask], outputs=output)
model.layers

In [None]:
# Layer 2 comes right after the two Input layers, i.e. it is the pretrained
# transformer; mark it trainable so it is fine-tuned on our custom dataset.
pretrained_layer = model.layers[2]
pretrained_layer.trainable = True

## Visualizing the model architecture

In [None]:
model.summary()  # layer-by-layer output shapes and parameter counts

In [None]:
# Render the layer graph (with tensor shapes) to a PNG file and display it.
plot_model(
    model,
    show_shapes=True,
    dpi=100,
    to_file='fine-tuned_transformer_model.png',
)

## Compiling the model

In [None]:
# Adam with a small learning rate, weight decay and gradient-norm clipping.
adam = Adam(learning_rate=5e-5,
           epsilon=2e-8,
           weight_decay=1e-2,
           clipnorm=1.0)

# `metrics` must be a list (or tuple/dict): newer Keras releases reject a bare string.
# The metric name is unchanged, so history keys stay 'categorical_accuracy' /
# 'val_categorical_accuracy'.
model.compile(loss='categorical_crossentropy',optimizer=adam,metrics=['categorical_accuracy'])

## Training the fine-tuned BERT transformer model

In [None]:
# Both callbacks must monitor a metric that is actually logged. The model is
# compiled with 'categorical_accuracy', so the validation key is
# 'val_categorical_accuracy'; the previous 'val_balanced_accuracy' never exists,
# which meant early stopping never triggered and no checkpoint was ever saved.
# NOTE(review): patience=150 with start_from_epoch=50 and epochs=200 leaves early
# stopping practically no room to trigger — confirm these values are intended.
es = EarlyStopping(monitor='val_categorical_accuracy',patience=150,mode='max',verbose=1,restore_best_weights=True,start_from_epoch=50)
mc = ModelCheckpoint('emotion_detector.h5',monitor='val_categorical_accuracy',verbose=1,save_best_only=True,mode='max')

# NOTE(review): the held-out test split doubles as validation data here, so the
# test metrics reported later are not from unseen data.
r = model.fit(x={'input_ids': X_train['input_ids'], 'attention_mask': X_train['attention_mask']},
             y=to_categorical(train_df.label),
             epochs=200,
             batch_size=64,
             callbacks=[es,mc],
             validation_data=({'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']},to_categorical(test_df.label))
             )

## Visualizing the model performance during training

In [None]:
# Training vs. validation loss per epoch.
fig, ax = plt.subplots()
for history_key, line_style, legend_label in (
    ('loss', 'r', 'train loss'),
    ('val_loss', 'b', 'validation loss'),
):
    ax.plot(r.history[history_key], line_style, label=legend_label)
ax.set_xlabel('Number of Epochs')
ax.set_ylabel('Categorical Crossentropy Loss')
ax.set_title('Loss Graph')
ax.legend();

In [None]:
# Training vs. validation categorical accuracy per epoch.
fig, ax = plt.subplots()
for history_key, line_style, legend_label in (
    ('categorical_accuracy', 'r', 'train accuracy'),
    ('val_categorical_accuracy', 'b', 'validation accuracy'),
):
    ax.plot(r.history[history_key], line_style, label=legend_label)
ax.set_xlabel('Number of Epochs')
ax.set_ylabel('Categorical Accuracy')
ax.set_title('Categorical Accuracy Graph')
ax.legend();

## Tuning the hyperparameters of the fine-tuned BERT transformer model

In [None]:
def build_model(hp):
    """Build and compile one candidate classifier for the KerasTuner search.

    The searched hyperparameters are: the number of hidden stages ('num_layers',
    1-10), the width ('units<i>') and dropout rate ('dropout<i>') of each stage,
    one activation shared by all stages ('activation') and the optimizer
    ('optimizer').

    Parameters
    ----------
    hp : keras_tuner.HyperParameters
        Hyperparameter container supplied by the tuner.

    Returns
    -------
    A compiled Keras ``Model`` taking 'input_ids' and 'attention_mask' and
    producing a 4-way softmax.
    """
    input_ids = Input(shape=(max_text_len,),dtype=tf.int32,name='input_ids')
    attention_mask = Input(shape=(max_text_len,),dtype=tf.int32,name='attention_mask')
    # NOTE(review): `bert_model` is the shared notebook-level instance, so any
    # fine-tuning of its weights carries over from one trial to the next —
    # confirm this is intended when comparing trials.
    word_embeddings = bert_model(input_ids,attention_mask=attention_mask)[0] # first output of the pretrained model
    output = Flatten()(word_embeddings)

    # One Dense -> BatchNormalization -> Dropout stage per searched layer.
    for i in range(hp.Int('num_layers',min_value=1,max_value=10)):
        output = Dense(units=hp.Int('units'+str(i),min_value=32,max_value=1024,step=32),
                       activation=hp.Choice("activation",["relu","tanh","sigmoid"]))(output)
        output = BatchNormalization()(output)
        output = Dropout(hp.Choice('dropout'+str(i),values=[0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9]))(output)

    output = Dense(units=4,activation='softmax')(output)
    model = Model(inputs=[input_ids,attention_mask],outputs=output)
    # Layer 2 follows the two Input layers, i.e. it is the pretrained transformer.
    model.layers[2].trainable = True
    optimizer = hp.Choice('optimizer',values=['adam','rmsprop','sgd','adadelta','nadam'])
    # `metrics` must be a list (or tuple/dict): newer Keras releases reject a bare
    # string. The metric name is unchanged, so the tuner objective
    # 'val_categorical_accuracy' still matches.
    model.compile(loss='categorical_crossentropy',optimizer=optimizer,metrics=['categorical_accuracy'])
    return model

In [None]:
# Random search that maximises validation categorical accuracy. A fixed seed makes
# the sampled hyperparameter combinations repeatable between runs.
tuner = kt.RandomSearch(build_model,
                        objective=kt.Objective(name='val_categorical_accuracy',direction='max'),
                        seed=101,
                        project_name='hyperparameter-tuning',
                        directory='my_dir')

In [None]:
# Run the search: every trial trains for up to 50 epochs and is scored on the test split.
feature_keys = ('input_ids', 'attention_mask')
search_inputs = {key: X_train[key] for key in feature_keys}
holdout_inputs = {key: X_test[key] for key in feature_keys}

tuner.search(
    search_inputs,
    to_categorical(train_df.label),
    epochs=50,
    batch_size=64,
    validation_data=(holdout_inputs, to_categorical(test_df.label)),
)

In [None]:
# Hyperparameter values of the best trial found by the search.
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]
best_hyperparameters.values

In [None]:
# Fetch the best performing model. A failure is allowed to propagate: the
# evaluation cells need `final_model`, so swallowing the error here would only
# resurface later as a less helpful NameError.
final_model = tuner.get_best_models(num_models=1)[0]
print(final_model)

## Evaluating the performance of the fine-tuned DistilBERT transformer model on test data

### Baseline fine-tuned DistilBERT model evaluation

In [None]:
# The model was compiled with categorical cross-entropy loss and the
# categorical-accuracy metric, so the printed labels name exactly those.
loss, acc = model.evaluate({'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']},to_categorical(test_df.label))
print("Test Categorical Crossentropy Loss:",loss)
print("Test Categorical Accuracy:",acc)

In [None]:
# Class probabilities from the baseline model, reduced to the most likely class id per row.
test_probabilities = model.predict(
    {key: X_test[key] for key in ('input_ids', 'attention_mask')}
)
test_predictions = test_probabilities.argmax(axis=1)

print("Dictionary of encoded labels:", labels_dict)
print("Classification Report:")
print(classification_report(test_df.label, test_predictions))

In [None]:
# Accuracy, then precision / recall / F1 under micro, macro and weighted averaging.
print("Accuracy Score:", accuracy_score(test_df.label, test_predictions))
for metric_name, metric_fn in (("Precision", precision_score), ("Recall", recall_score), ("F1", f1_score)):
    for prefix, averaging in (("Micro-Averaged", 'micro'), ("Macro-Averaged", 'macro'), ("Weighted", 'weighted')):
        print(f"{prefix} {metric_name} Score:", metric_fn(test_df.label, test_predictions, average=averaging))

### Model evaluation after tuning the hyperparameters of fine-tuned DistilBERT model

In [None]:
# The tuned model is compiled with categorical cross-entropy loss and the
# categorical-accuracy metric, so the printed labels name exactly those.
loss, acc = final_model.evaluate({'input_ids': X_test['input_ids'], 'attention_mask': X_test['attention_mask']},to_categorical(test_df.label))
print("Test Categorical Crossentropy Loss:",loss)
print("Test Categorical Accuracy:",acc)

In [None]:
# Class probabilities from the tuned model, reduced to the most likely class id per row.
test_probabilities = final_model.predict(
    {key: X_test[key] for key in ('input_ids', 'attention_mask')}
)
test_predictions = test_probabilities.argmax(axis=1)

print("Dictionary of encoded labels:", labels_dict)
print("Classification Report:")
print(classification_report(test_df.label, test_predictions))

In [None]:
# Accuracy, then precision / recall / F1 under micro, macro and weighted averaging.
print("Accuracy Score:", accuracy_score(test_df.label, test_predictions))
for metric_name, metric_fn in (("Precision", precision_score), ("Recall", recall_score), ("F1", f1_score)):
    for prefix, averaging in (("Micro-Averaged", 'micro'), ("Macro-Averaged", 'macro'), ("Weighted", 'weighted')):
        print(f"{prefix} {metric_name} Score:", metric_fn(test_df.label, test_predictions, average=averaging))