In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# project folder used by the following cells is reachable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Work from the project folder on Drive; later cells use relative paths
# (e.g. 'emotion_data_alt.csv', the GloVe files and the *.pkl outputs).
%cd /content/drive/MyDrive/CSE_497/Midterm_Project/

Importing Dependencies

In [None]:
import re
import string
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")

import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer, WordNetLemmatizer
nltk.download("stopwords")
nltk.download('omw-1.4')
nltk.download('wordnet')
stop_words = set(stopwords.words("english"))

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split,KFold, GridSearchCV
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, f1_score

import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.optimizers import Adamax, Adam
from tensorflow.keras.metrics import Precision, Recall
from tensorflow.keras.layers import BatchNormalization, Concatenate
from tensorflow.keras.layers import Conv1D, GlobalMaxPooling1D, Dropout, SpatialDropout1D, MaxPooling1D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, LSTM, Embedding, Bidirectional, ReLU, SimpleRNN

Load dataset

In [None]:
# Read the emotion dataset (relative to the current working directory) and
# display its row count as the cell output.
df = pd.read_csv('emotion_data_alt.csv')
df.shape[0]

Data preprocessing

In [None]:
# Shared WordNet lemmatizer instance used by preprocess_text.
lemmatizer= WordNetLemmatizer()
# Lookup table mapping English contractions to their expanded form.
# Keys are matched literally by the regex built in _get_contractions; several
# entries exist in both "I..." and "i..." spellings.
contraction_dict = {"ain't": "is not", "aren't": "are not","can't": "cannot", "coz":"because", "'cause": "because", "could've": "could have", "couldn't": "could not", "didn't": "did not",  "doesn't": "does not", "don't": "do not", "hadn't": "had not", "hasn't": "has not", "haven't": "have not", "he'd": "he would","he'll": "he will", "he's": "he is", "how'd": "how did", "how'd'y": "how do you", "how'll": "how will", "how's": "how is",  "I'd": "I would", "I'd've": "I would have", "I'll": "I will", "I'll've": "I will have","I'm": "I am", "I've": "I have", "i'd": "i would", "i'd've": "i would have", "i'll": "i will",  "i'll've": "i will have","i'm": "i am", "i've": "i have", "isn't": "is not", "it'd": "it would", "it'd've": "it would have", "it'll": "it will", "it'll've": "it will have","it's": "it is", "let's": "let us", "ma'am": "madam", "mayn't": "may not", "might've": "might have","mightn't": "might not","mightn't've": "might not have", "must've": "must have", "mustn't": "must not", "mustn't've": "must not have", "needn't": "need not", "needn't've": "need not have","o'clock": "of the clock", "oughtn't": "ought not", "oughtn't've": "ought not have", "shan't": "shall not", "sha'n't": "shall not", "shan't've": "shall not have", "she'd": "she would", "she'd've": "she would have", "she'll": "she will", "she'll've": "she will have", "she's": "she is", "should've": "should have", "shouldn't": "should not", "shouldn't've": "should not have", "so've": "so have","so's": "so as", "this's": "this is","that'd": "that would", "that'd've": "that would have", "that's": "that is", "there'd": "there would", "there'd've": "there would have", "there's": "there is", "here's": "here is","they'd": "they would", "they'd've": "they would have", "they'll": "they will", "they'll've": "they will have", "they're": "they are", "they've": "they have", "to've": "to have", "wasn't": "was not", "we'd": "we would", "we'd've": "we would have", "we'll": "we will", "we'll've": "we will have", "we're": 
"we are", "we've": "we have", "weren't": "were not", "what'll": "what will", "what'll've": "what will have", "what're": "what are",  "what's": "what is", "what've": "what have", "when's": "when is", "when've": "when have", "where'd": "where did", "where's": "where is", "where've": "where have", "who'll": "who will", "who'll've": "who will have", "who's": "who is", "who've": "who have", "why's": "why is", "why've": "why have", "will've": "will have", "won't": "will not", "won't've": "will not have", "would've": "would have", "wouldn't": "would not", "wouldn't've": "would not have", "y'all": "you all", "y'all'd": "you all would","y'all'd've": "you all would have","y'all're": "you all are","y'all've": "you all have","you'd": "you would", "you'd've": "you would have", "you'll": "you will", "you'll've": "you will have", "you're": "you are", "you've": "you have"}

def _get_contractions(contraction_dict):
    contraction_re = re.compile('(%s)' % '|'.join(contraction_dict.keys()))
    return contraction_dict, contraction_re

# Module-level lookup table and compiled pattern shared by replace_contractions.
contractions, contractions_re = _get_contractions(contraction_dict)

def replace_contractions(text):
    """Expand every contraction found in ``text``.

    Each substring matched by the module-level ``contractions_re`` is replaced
    with its entry in the module-level ``contractions`` mapping.
    """
    return contractions_re.sub(lambda found: contractions[found.group(0)], text)

def preprocess_text(text):
  """Clean one raw sentence into a space-separated string of lemmas.

  Steps, in order: lowercase, strip URLs, expand contractions, replace
  punctuation with spaces, drop stop words and pure-digit tokens, lemmatize.

  Args:
    text: the raw sentence. Non-string values (e.g. NaN produced by
      ``pd.read_csv`` for an empty cell) are treated as empty text.

  Returns:
    The cleaned sentence; an empty string when nothing survives.
  """
  if not isinstance(text, str):
    return ""

  # Lowercase first so capitalised contractions ("Can't") are expanded exactly
  # like their lowercase form instead of being split at the apostrophe.
  text = text.lower()

  text = re.sub(r'https?://\S+|www\.\S+', '', text) # Remove URLs

  text = replace_contractions(text)

  # Raw string: the punctuation list contains a backslash, which is an invalid
  # escape sequence in a normal string literal.
  text = re.sub('[%s]' % re.escape(r"""!"#$%&'()*+,،-./:;<=>؟?@[\]^_`{|}~"""), ' ', text) # Remove punctuation
  text = text.replace('؛', "")

  lemmatized = []
  # str.split() with no argument already collapses any run of whitespace.
  for word in text.split():
    if word in stop_words or word.isdigit(): # Remove stopwords and numbers
      continue
    lemmatized.append(lemmatizer.lemmatize(word)) # Lemmatize the words

  return " ".join(lemmatized)


def normalize_sentence(sentence):
    """Return ``sentence`` after running it through ``preprocess_text``."""
    return preprocess_text(sentence)


Apply the preprocessing to the dataset

In [None]:
# Overwrite the Text column with its normalized form (one call per row).
df["Text"] = df["Text"].apply(normalize_sentence)

Splitting the dataset into training, test, and validation sets

In [None]:
# Features are the cleaned sentences, targets are the emotion labels.
X, y = df['Text'], df['Emotion']

# First hold out 30% of the rows, then carve that hold-out into the test set
# and the validation set (33% of the hold-out goes to validation).
X_train, X_holdout, y_train, y_holdout = train_test_split(
    X, y, test_size=0.3, random_state=42
)
X_test, X_val, y_test, y_val = train_test_split(
    X_holdout, y_holdout, test_size=0.33, random_state=42
)

Encoding the labels using LabelEncoder

In [None]:
# Map the emotion strings to integer ids; the encoder is fitted on the
# training labels only and reused for the other splits.
le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_test = le.transform(y_test)
y_val = le.transform(y_val)

# One-hot encode with an explicit width. Without num_classes, to_categorical
# infers the width from the largest id present in each array, so a split that
# happens to lack the highest class id would get a narrower matrix than the
# training labels.
num_classes = len(le.classes_)
y_train = to_categorical(y_train, num_classes=num_classes)
y_test = to_categorical(y_test, num_classes=num_classes)
y_val = to_categorical(y_val, num_classes=num_classes)

Tokenizing the data

In [None]:
# Words that are not in the fitted vocabulary are mapped to the 'UNK' token.
tokenizer = Tokenizer(oov_token='UNK')

# The vocabulary is built from all three splits stacked together.
# NOTE(review): this includes the test and validation text — confirm that is intended.
all_texts = pd.concat([X_train, X_test, X_val], axis=0)
tokenizer.fit_on_texts(all_texts)

Converting text to sequences

In [None]:
# Turn each split's sentences into lists of integer word ids.
sequences_train, sequences_test, sequences_val = (
    tokenizer.texts_to_sequences(split) for split in (X_train, X_test, X_val)
)

In [None]:
# Longest cleaned sentence (in whitespace-separated tokens) across the whole
# dataset; used as the padded sequence length.
maxlen = max(len(sentence.split()) for sentence in df['Text'])
maxlen

Padding the sequences to a uniform length

In [None]:
# Pad every split to `maxlen` ids per sentence. The X_* names are rebound
# from text to integer matrices here.
X_train, X_test, X_val = (
    pad_sequences(seqs, maxlen=maxlen, truncating='post')
    for seqs in (sequences_train, sequences_test, sequences_val)
)

# One extra slot because the tokenizer's word ids start at 1.
vocabSize = 1 + len(tokenizer.index_word)
print(f"Vocabulary size = {vocabSize}")

Loading the GloVe embeddings

In [None]:
!wget http://nlp.stanford.edu/data/glove.6B.zip
!unzip glove*.zip

In [None]:
num_tokens = vocabSize
embedding_dim = 200
embeddings_index = {}

# Derive the file name from embedding_dim so the two cannot drift apart, and
# state the encoding explicitly instead of relying on the platform default.
glove_path = f'./glove.6B.{embedding_dim}d.txt'
with open(glove_path, encoding='utf8') as f:
    for line in f:
        # Each line is "<word> <v1> <v2> ...": split off the word, parse the rest.
        word, coefs = line.split(maxsplit=1)
        coefs = np.fromstring(coefs, "f", sep=" ")
        embeddings_index[word] = coefs
print("Found %s word vectors." % len(embeddings_index))

# Row i holds the GloVe vector of the word with tokenizer id i; words without
# a GloVe entry keep the all-zero row.
embedding_matrix = np.zeros((num_tokens, embedding_dim))
hits = 0
for word, i in tokenizer.word_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector
        hits += 1
print("Matched %d of %d vocabulary words to a GloVe vector." % (hits, len(tokenizer.word_index)))

Early Stopping callback configuration

In [None]:
# Stop training once validation loss has not improved for 4 epochs and roll
# the model back to the weights of its best epoch. Shared by all fit() calls.
callback = EarlyStopping(
    monitor="val_loss",
    patience=4,
    restore_best_weights=True,
)

Defining the CNN model

In [None]:
# CNN model: frozen GloVe embedding -> two Conv1D + BatchNorm stages ->
# global max pooling -> two dense stages with dropout -> 6-way softmax.
CNNmodel = Sequential([
    Embedding(vocabSize, 200, weights=[embedding_matrix], input_length=maxlen, trainable=False, input_shape = X_train[0].shape),

    # Convolutional feature extractor
    Conv1D(256, 5, activation='leaky_relu'),
    BatchNormalization(),
    Conv1D(256, 5, activation='relu'),
    BatchNormalization(),

    # Collapse the sequence axis
    GlobalMaxPooling1D(),

    # Dense classifier head
    Dense(256, activation='relu'),
    BatchNormalization(),
    Dropout(0.5),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.3),

    Dense(6, activation='softmax'),
])

CNNmodel.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
CNNmodel.summary()

Training the CNN model

In [None]:
# Train the CNN on the first GPU for at most 30 epochs, validating on the
# validation split; the shared early-stopping callback may end training sooner.
with tf.device('/device:GPU:0'):
  performance = CNNmodel.fit(
      x=X_train,
      y=y_train,
      batch_size=128,
      epochs=30,
      verbose=1,
      callbacks=[callback],
      validation_data=(X_val, y_val),
  )

Evaluating the CNN model

In [None]:
# Score the CNN on the test split: take the arg-max class of the predicted
# probabilities and of the one-hot labels, then print per-class metrics.
predicted = CNNmodel.predict(X_test)
y_pred = np.argmax(predicted, axis=-1)
y_true = np.argmax(y_test, axis=-1)

print(classification_report(y_true, y_pred))

Saving the CNN model

In [None]:
import pickle

# Serialize the trained CNN to 'CNNmodel_alt.pkl' in the working directory.
# NOTE(review): Keras documents model.save() as its persistence API; whether
# pickling works depends on the installed Keras version — confirm.
with open('CNNmodel_alt.pkl', 'wb') as model_file:
    pickle.dump(CNNmodel, model_file)

Defining the SimpleRNN model

In [None]:
# SimpleRNN model: frozen GloVe embedding -> two stacked SimpleRNN layers ->
# 6-way softmax. SimpleRNN comes from tensorflow.keras.layers and is imported
# with the other Keras layers at the top of the notebook.
adam = Adam(learning_rate=0.005)

simpleRNN = Sequential()
# embedding_dim (rather than a literal 200) keeps the layer width tied to the
# width of embedding_matrix.
simpleRNN.add(Embedding(vocabSize, embedding_dim, input_length=X_train.shape[1], weights=[embedding_matrix], trainable=False, input_shape=X_train[0].shape))

# The first recurrent layer returns the full sequence so the second one
# receives one vector per time step.
simpleRNN.add(SimpleRNN(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=True))
simpleRNN.add(SimpleRNN(128, dropout=0.2, recurrent_dropout=0.2))

simpleRNN.add(Dense(6, activation='softmax'))

simpleRNN.compile(loss='categorical_crossentropy', optimizer=adam, metrics=['accuracy'])
simpleRNN.summary()

Training the SimpleRNN model

In [None]:
# Train the SimpleRNN on the first GPU for at most 5 epochs, validating on the
# validation split with the shared early-stopping callback.
with tf.device('/device:GPU:0'):
  history = simpleRNN.fit(
      x=X_train,
      y=y_train,
      batch_size=128,
      epochs=5,
      verbose=1,
      callbacks=[callback],
      validation_data=(X_val, y_val),
  )

Evaluating the SimpleRNN model

In [None]:
# Score the SimpleRNN on the test split: arg-max of predictions vs. arg-max
# of the one-hot labels, reported per class.
predicted = simpleRNN.predict(X_test)
y_pred = np.argmax(predicted, axis=-1)
y_true = np.argmax(y_test, axis=-1)

print(classification_report(y_true, y_pred))

Saving the SimpleRNN model

In [None]:
import pickle

# Serialize the trained SimpleRNN to 'SimpleRNN_alt.pkl' in the working directory.
# NOTE(review): Keras documents model.save() as its persistence API; whether
# pickling works depends on the installed Keras version — confirm.
with open('SimpleRNN_alt.pkl', 'wb') as model_file:
    pickle.dump(simpleRNN, model_file)

Defining the BiLSTM model

In [None]:
# BiLSTM model: frozen GloVe embedding -> three stacked bidirectional LSTM
# layers -> 6-way softmax.
adam = Adam(learning_rate=0.005)

modelRNN = Sequential()
modelRNN.add(Embedding(vocabSize, 200, input_length=X_train.shape[1], weights=[embedding_matrix], trainable=False, input_shape=X_train[0].shape))

# All but the last recurrent layer return the full sequence so the next layer
# receives one vector per time step.
modelRNN.add(Bidirectional(LSTM(256, dropout=0.2, recurrent_dropout=0.2, return_sequences=True)))
modelRNN.add(Bidirectional(LSTM(128, dropout=0.2, recurrent_dropout=0.2, return_sequences=True)))
modelRNN.add(Bidirectional(LSTM(128, dropout=0.2, recurrent_dropout=0.2)))

modelRNN.add(Dense(6, activation='softmax'))

# The labels were one-hot encoded with to_categorical, so the loss must be
# 'categorical_crossentropy'; the sparse variant expects integer class ids.
modelRNN.compile(loss='categorical_crossentropy', optimizer=adam, metrics=['accuracy'])
modelRNN.summary()

Training the BiLSTM model

In [None]:
# Train the BiLSTM on the first GPU for at most 5 epochs, validating on the
# validation split with the shared early-stopping callback.
with tf.device('/device:GPU:0'):
  history = modelRNN.fit(
      x=X_train,
      y=y_train,
      batch_size=128,
      epochs=5,
      verbose=1,
      callbacks=[callback],
      validation_data=(X_val, y_val),
  )

Evaluating the BiLSTM model

In [None]:
# Score the BiLSTM on the test split: arg-max of predictions vs. arg-max of
# the one-hot labels, reported per class.
predicted = modelRNN.predict(X_test)
y_pred = np.argmax(predicted, axis=-1)
y_true = np.argmax(y_test, axis=-1)

print(classification_report(y_true, y_pred))

Plotting the confusion matrix

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

# Confusion matrix for the most recent predictions in y_pred (the BiLSTM's).
# The result is stored under its own name so it does not shadow sklearn's
# confusion_matrix function for later cells.
class_ids = np.arange(len(le.classes_))
# Passing labels= keeps one row/column per known class even when a class is
# absent from the test split, so the matrix always lines up with display_labels.
cm = confusion_matrix(y_test.argmax(axis=-1), y_pred, labels=class_ids)
cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)

cm_display.plot()
cm_display.ax_.set_title("BiLSTM - test set confusion matrix")
plt.show()

Saving the BiLSTM model

In [None]:
import pickle

# Serialize the trained BiLSTM to 'RNNmodel_alt.pkl' in the working directory.
# NOTE(review): Keras documents model.save() as its persistence API; whether
# pickling works depends on the installed Keras version — confirm.
with open('RNNmodel_alt.pkl', 'wb') as model_file:
    pickle.dump(modelRNN, model_file)

Defining the Combined (CNN + BiLSTM) model

In [None]:
# Combined model: frozen GloVe embedding -> spatial dropout -> two Conv1D
# stages with 'same' padding -> max pooling (halves the sequence length) ->
# two bidirectional LSTM layers -> dense head -> 6-way softmax.
CombinedModel = Sequential([
    Embedding(vocabSize,
              embedding_dim,
              weights=[embedding_matrix],
              input_length=maxlen,
              trainable=False, input_shape = X_train[0].shape),
    SpatialDropout1D(0.2),

    # CNN layers
    Conv1D(256, 5, activation='relu', padding='same'),
    BatchNormalization(),
    Conv1D(256, 5, activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),

    # RNN layers
    Bidirectional(LSTM(256, return_sequences=True, dropout=0.2, recurrent_dropout=0.2)),
    Bidirectional(LSTM(128, dropout=0.2, recurrent_dropout=0.2)),
    Dense(128, activation='relu'),
    BatchNormalization(),
    Dropout(0.3),

    # Output layer
    Dense(6, activation='softmax'),
])

CombinedModel.summary()
CombinedModel.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

Training the combined model

In [None]:
# Train the combined model on the first GPU for at most 15 epochs, validating
# on the validation split with the shared early-stopping callback.
with tf.device('/device:GPU:0'):
  history = CombinedModel.fit(
      x=X_train,
      y=y_train,
      batch_size=128,
      epochs=15,
      verbose=1,
      callbacks=[callback],
      validation_data=(X_val, y_val),
  )

Evaluating the combined model

In [None]:
# Score the combined model on the test split: arg-max of predictions vs.
# arg-max of the one-hot labels, reported per class.
predicted = CombinedModel.predict(X_test)
y_pred = np.argmax(predicted, axis=-1)
y_true = np.argmax(y_test, axis=-1)

print(classification_report(y_true, y_pred))

Saving the combined model

In [None]:
import pickle

# Serialize the trained combined model to 'CombinedModel_alt.pkl' in the
# working directory.
# NOTE(review): Keras documents model.save() as its persistence API; whether
# pickling works depends on the installed Keras version — confirm.
with open('CombinedModel_alt.pkl', 'wb') as model_file:
    pickle.dump(CombinedModel, model_file)

Displaying the confusion matrix

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

# Confusion matrix for the most recent predictions in y_pred (the combined
# model's). The result is stored under its own name so it does not shadow
# sklearn's confusion_matrix function.
class_ids = np.arange(len(le.classes_))
# Passing labels= keeps one row/column per known class even when a class is
# absent from the test split, so the matrix always lines up with display_labels.
cm = confusion_matrix(y_test.argmax(axis=-1), y_pred, labels=class_ids)
cm_display = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)

cm_display.plot()
cm_display.ax_.set_title("Combined (CNN + BiLSTM) - test set confusion matrix")
plt.show()

Reloading the saved combined model

In [None]:
import pickle
# Read 'CombinedModel_alt.pkl' back and rebind CombinedModel to the result.
# NOTE(review): pickle.load executes arbitrary code embedded in the file; only
# load pickles this notebook wrote itself, never files from an untrusted source.
# NOTE(review): the in-memory CombinedModel was pickled to this same path
# earlier in the notebook, so on a straight top-to-bottom run this reload looks
# redundant — confirm it is only meant for resuming in a fresh session.
with open('CombinedModel_alt.pkl', 'rb') as f:
    CombinedModel = pickle.load(f)


Now, fine-tuning the combined model to find the best hyperparameters

In [None]:
!pip install scikeras scikit-learn==1.3.1

Implementing GridSearchCV with a parameter grid to find optimal hyper-parameters

In [None]:
from sklearn.model_selection import GridSearchCV
from scikeras.wrappers import KerasClassifier

# Define a function to create the model (required for KerasClassifier)
def create_model(filters=128, kernel_size=5, dropout_rate=0.5, learning_rate=0.001, act='relu'):
    """Build and compile the combined CNN + BiLSTM classifier for the grid search.

    Args:
        filters: number of filters of the second Conv1D layer and number of
            units of each LSTM layer.
        kernel_size: kernel width of both Conv1D layers.
        dropout_rate: rate used for the spatial dropout, the LSTM input
            dropout and the dropout after the dense layer.
        learning_rate: learning rate passed to the Adam optimizer.
        act: activation of the Conv1D layers and of the dense layer.

    Returns:
        A compiled Sequential model with a 6-way softmax output.
    """
    net = Sequential()
    net.add(Embedding(vocabSize,
                      embedding_dim,
                      weights=[embedding_matrix],
                      input_length=maxlen,
                      trainable=False, input_shape = X_train[0].shape))

    # Spatial Dropout
    net.add(SpatialDropout1D(dropout_rate))

    # CNN layers (the first convolution keeps a fixed width of 256 filters)
    net.add(Conv1D(256, kernel_size, activation=act, padding='same'))
    net.add(BatchNormalization())
    net.add(Conv1D(filters, kernel_size, activation=act, padding='same'))

    net.add(BatchNormalization())
    net.add(MaxPooling1D(pool_size=2))

    # No GlobalMaxPooling1D here: it would collapse the time axis and hand the
    # LSTM a 2-D tensor, while recurrent layers require
    # (batch, timesteps, features) input.

    # RNN layers
    net.add(Bidirectional(LSTM(filters, dropout=dropout_rate, return_sequences=True, recurrent_dropout=0.2)))
    net.add(Bidirectional(LSTM(filters, dropout=dropout_rate, recurrent_dropout=0.2)))
    net.add(Dense(128, activation=act))
    net.add(BatchNormalization())
    net.add(Dropout(dropout_rate))

    # Output layer
    net.add(Dense(6, activation='softmax'))

    # summary() is intentionally not called: this factory runs once per grid
    # candidate and fold, and the repeated tables would flood the cell output.
    net.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return net

# Expose the model factory through the scikit-learn estimator interface;
# every candidate is trained for 5 epochs with a batch size of 128.
model = KerasClassifier(create_model, epochs=5, batch_size=128)

# Candidate values; the "model__" prefix routes each one to create_model.
param_grid = {
    'model__filters': [128, 256],
    'model__kernel_size': [3, 5],
    'model__dropout_rate': [0.3, 0.5],
    'model__act': ['relu', 'leaky_relu']
}

# Exhaustive search over the grid with 3-fold cross-validation, scored by accuracy.
grid = GridSearchCV(
    estimator=model,
    param_grid=param_grid,
    scoring='accuracy',
    cv=3,
    verbose=1,
)
grid_result = grid.fit(X_train, y_train)

# Report the winner, then mean/std test score of every candidate.
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
cv_results = grid_result.cv_results_
means = cv_results['mean_test_score']
stds = cv_results['std_test_score']
params = cv_results['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))