<a href="https://colab.research.google.com/github/MarufRayhan/bangla-sentiment-analysis/blob/main/bangla_emotion_bert.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from keras.preprocessing import text, sequence
from sklearn.preprocessing import LabelEncoder
import numpy as np
from sklearn.utils import shuffle

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
dataset = pd.read_csv("/content/drive/My Drive/Bangla_NLP/finalpuredataset.csv")
# dataset = pd.read_csv("finalbangladataset.csv")

In [None]:
dataset.head()

In [None]:
# Shuffle the rows before the positional train/test split below. A fixed
# random_state keeps the split (and the metrics derived from it) repeatable
# across kernel restarts.
dataset = shuffle(dataset, random_state=42)

In [None]:
train_size = int(len(dataset) * 0.8)

In [None]:
# Positional 80/20 split: the first `train_size` rows are used for training,
# everything after them for testing.
text_column, label_column = dataset['Text'], dataset['Label']
train_text, test_text = text_column[:train_size], text_column[train_size:]
train_label, test_label = label_column[:train_size], label_column[train_size:]

In [None]:
token = text.Tokenizer(num_words=15000,char_level=False)
token.fit_on_texts(train_text)
X_train = token.texts_to_matrix(train_text)

In [None]:
# token = text.Tokenizer(num_words=15000,char_level=False)
# token.fit_on_texts(train_label)
# train_label_matrix = token.texts_to_matrix(train_label)

In [None]:
# Vectorise the test split with the tokenizer that was fitted on the training
# text. Fitting a second tokenizer on the test text assigns different word
# indices, so the columns of X_test would not correspond to those of X_train.
X_test = token.texts_to_matrix(test_text)

In [None]:
# token = text.Tokenizer(num_words=15000,char_level=False)
# token.fit_on_texts(test_label)
# test_label_matrix = token.texts_to_matrix(test_label)

In [None]:
print(X_train.shape)
print(X_test.shape)

In [None]:
# Learn the label -> integer mapping from the training labels only and reuse
# it for the test labels. Refitting on the test split could map a class to a
# different integer whenever the two splits do not contain the same label set.
# Note: transform() raises ValueError for a label never seen during fit.
encoder = LabelEncoder()
Y_train = encoder.fit_transform(train_label)
Y_test = encoder.transform(test_label)

In [None]:
num_classes = np.max(Y_train) + 1

In [None]:
print(num_classes)

In [None]:
print(Y_test)
print(Y_test.shape)
print(Y_train)
print(Y_train.shape)

In [None]:
from keras import regularizers
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Activation, Dense

# Bag-of-words baseline: two hidden ReLU layers on top of the 15000-wide
# tokenizer matrix, followed by a softmax over `num_classes`.
model = Sequential()
model.add(Dense(512, input_shape=(15000,), activation='relu'))
model.add(Dense(256, activation='relu'))
# model.add(Dense(128, activation='relu'))
model.add(Dense(num_classes))
model.add(Activation('softmax'))
opt = keras.optimizers.Adam(learning_rate=0.0001)
# Y_train holds one integer class id per sample (LabelEncoder output) and the
# network ends in a multi-class softmax, so the matching loss is sparse
# categorical cross-entropy. binary_crossentropy expects targets shaped like
# the output and is meant for independent 0/1 outputs.
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=opt,
              metrics=['accuracy'])

In [None]:
history = model.fit(X_train,Y_train, batch_size=2, validation_split=0.33,epochs=50)

In [None]:
# evaluate() returns [loss, accuracy] because the model was compiled with
# metrics=['accuracy']; report the two values separately instead of printing
# the whole list under the "accuracy" label.
score = model.evaluate(X_test, Y_test,
                       batch_size=2, verbose=1)
test_loss, test_accuracy = score
print('Test loss:', test_loss)
print('Test accuracy:', test_accuracy)

Using **BERT**

In [None]:

!pip install transformers

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
import torch.nn.functional as F
from transformers import BertTokenizer, BertConfig,AdamW, BertForSequenceClassification,get_linear_schedule_with_warmup


import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix,classification_report
# Import and evaluate each test batch using Matthew's correlation coefficient
from sklearn.metrics import accuracy_score,matthews_corrcoef

from tqdm import tqdm, trange,tnrange,tqdm_notebook
import random
import os
import io
% matplotlib inline

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU; the
# training loop later moves the data onto this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
n_gpu = torch.cuda.device_count()
# get_device_name() raises when no CUDA device is present, so only query it
# when we are actually running on a GPU.
if device.type == "cuda":
    print(torch.cuda.get_device_name(0))

# Seed every random number generator in use so runs are repeatable.
SEED = 19

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if device.type == "cuda":
    torch.cuda.manual_seed_all(SEED)

In [None]:
# Keep the CPU fallback: forcing "cuda" makes every later .to(device) call
# fail on a runtime without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
print(device)

In [None]:
dataset['Label'].unique()

In [None]:
# from sklearn.preprocessing import LabelEncoder
# labelencoder = LabelEncoder()
# dataset['label_enc'] = labelencoder.fit_transform(dataset['Label'])

In [None]:
# print(dataset['label_enc'])

In [None]:
# dataset[['Label','label_enc']].drop_duplicates(keep='first')
# dataset.rename(columns={'Label':'label_desc'},inplace=True)
# dataset.rename(columns={'label_enc':'Label'},inplace=True)

In [None]:
dataset

In [None]:
## create label and sentence list
sentences = dataset.Text.values

In [None]:
print(sentences)

In [None]:
#check distribution of data based on labels
print("Distribution of data based on labels: ",dataset.Label.value_counts())

In [None]:
MAX_LEN = 512

In [None]:
## Load the pretrained BERT tokenizer that converts raw text into the token ids the BERT model consumes.
# NOTE(review): 'bert-base-uncased' appears to be the English checkpoint while the sentences here
# are Bangla -- confirm its vocabulary covers the text (a multilingual checkpoint may be needed).
# The model cell loads the same checkpoint name, so the two must be changed together.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased',do_lower_case=False)
# input_ids = [tokenizer.encode(sent, add_special_tokens=True,max_length=MAX_LEN,pad_to_max_length=True) for sent in sentences]

In [None]:
print(tokenizer)

In [None]:
# Encode every sentence into exactly 100 token ids (special tokens included):
# longer inputs are truncated, shorter ones are padded.
# padding='max_length' is the replacement for the deprecated
# pad_to_max_length=True flag.
input_ids = [
    tokenizer.encode(sent, add_special_tokens=True, truncation=True,
                     padding='max_length', max_length=100)
    for sent in sentences
]

In [None]:
# Preview only the first few encoded sentences; printing the full nested list
# writes one 100-id row per dataset sentence into the notebook output.
print(input_ids[:3])

In [None]:
labels = dataset.Label.values

In [None]:
print(labels)

In [None]:
print("Actual sentence before tokenization: ",sentences[2])
print("Encoded Input from dataset: ",input_ids[2])

In [None]:
## Build the attention masks: 1.0 for every token id greater than 0, 0.0 for the zero-valued padding ids
attention_masks = [
    [1.0 if token_id > 0 else 0.0 for token_id in encoded_sentence]
    for encoded_sentence in input_ids
]
print(attention_masks[2])

In [None]:

# Hold out 10% of the samples for validation. Passing ids, masks and labels to
# a single call splits all three with the same shuffled indices.
(train_inputs, validation_inputs,
 train_masks, validation_masks,
 train_labels, validation_labels) = train_test_split(
    input_ids, attention_masks, labels, random_state=41, test_size=0.1)

In [None]:
# Preview a few training rows and the training-set size; printing the whole
# list of id sequences floods the notebook output.
print(train_inputs[:3])
print(len(train_inputs))

In [None]:
# convert all our data into torch tensors, required data type for our model
train_inputs = torch.tensor(train_inputs)
validation_inputs = torch.tensor(validation_inputs)
train_labels = torch.tensor(train_labels)
validation_labels = torch.tensor(validation_labels)
train_masks = torch.tensor(train_masks)
validation_masks = torch.tensor(validation_masks)

In [None]:
print(len(train_labels))
print(len(train_inputs))
print(len(train_masks))

In [None]:
# Select a batch size for training. For fine-tuning BERT on a specific task, the authors recommend a batch size of 16 or 32
batch_size = 32

# Wrap the tensors in DataLoaders so the training loop can iterate over
# mini-batches of (input ids, attention mask, label).
# Training batches are drawn in random order.
train_data = TensorDataset(train_inputs,train_masks,train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data,sampler=train_sampler,batch_size=batch_size)

# Validation gains nothing from shuffling; a sequential sampler keeps the
# batch composition (and the per-batch metrics averaged later) identical
# from one epoch and one run to the next.
validation_data = TensorDataset(validation_inputs,validation_masks,validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data,sampler=validation_sampler,batch_size=batch_size)

In [None]:
train_data[0]

In [None]:
print(len(train_dataloader))

In [None]:
# Load BertForSequenceClassification, the pretrained BERT model with a single linear classification layer on top.
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=7)

# Parameters:
lr = 2e-5
adam_epsilon = 1e-8

# Number of training epochs (authors recommend between 2 and 4)
epochs = 2

num_warmup_steps = 0
# The scheduler is stepped once per training batch, so the linear decay must
# span every batch of every epoch. A smaller total (the previous 5*epochs)
# drives the learning rate to zero after only a handful of batches.
num_training_steps = len(train_dataloader)*epochs

### In Transformers, the optimizer and the schedule are separate objects and are instantiated like this:
optimizer = AdamW(model.parameters(), lr=lr,eps=adam_epsilon,correct_bias=False)  # To reproduce BertAdam specific behavior set correct_bias=False
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=num_warmup_steps, num_training_steps=num_training_steps)  # PyTorch scheduler

In [None]:
# print((b_labels))

In [None]:
## Per-epoch average training loss and learning rate, kept for plotting
train_loss_set = []
learning_rate = []

# The model and every batch must live on the same device. Module.to() moves
# the parameters in place, so the optimizer created earlier keeps pointing at
# them.
model.to(device)

# Gradients accumulate by default, so start from a clean state
model.zero_grad()

# tnrange is a tqdm wrapper around the normal python range
for epoch in tnrange(1,epochs+1,desc='Epoch'):
  print("<" + "="*22 + F" Epoch {epoch} "+ "="*22 + ">")
  # Running sum of the batch losses for this epoch
  batch_loss = 0

  # Training mode (as opposed to evaluation mode); validation below switches
  # to eval mode, so this has to be re-enabled at the start of every epoch.
  model.train()

  for step, batch in enumerate(train_dataloader):
    # Move the batch to the same device as the model
    batch = tuple(t.to(device) for t in batch)
    # Unpack the inputs from our dataloader
    b_input_ids, b_input_mask, b_labels = batch

    # Forward pass; passing labels makes the model return the loss first
    outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask, labels=b_labels)
    loss = outputs[0]

    # Backward pass
    loss.backward()

    # Clip the norm of the gradients to 1.0
    # Gradient clipping is not in AdamW anymore
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

    # Update parameters and take a step using the computed gradient
    optimizer.step()

    # Update learning rate schedule
    scheduler.step()

    # Clear the previous accumulated gradients
    optimizer.zero_grad()

    # Update tracking variables
    batch_loss += loss.item()

  # Calculate the average loss over the training data.
  avg_train_loss = batch_loss / len(train_dataloader)

  # Store the current learning rate
  for param_group in optimizer.param_groups:
    print("\n\tCurrent Learning rate: ",param_group['lr'])
    learning_rate.append(param_group['lr'])

  train_loss_set.append(avg_train_loss)
  print(F'\n\tAverage Training loss: {avg_train_loss}')

  # Validation

  # Put model in evaluation mode to evaluate on the validation set
  model.eval()

  # Tracking variables (sums of per-batch scores and the batch count)
  eval_accuracy,eval_mcc_accuracy,nb_eval_steps = 0, 0, 0

  # Evaluate data for one epoch
  for batch in validation_dataloader:
    # Move the batch to the same device as the model
    batch = tuple(t.to(device) for t in batch)
    # Unpack the inputs from our dataloader
    b_input_ids, b_input_mask, b_labels = batch
    # Telling the model not to compute or store gradients, saving memory and speeding up validation
    with torch.no_grad():
      # Forward pass, calculate logit predictions
      logits = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)

    # Move logits and labels to CPU
    logits = logits[0].to('cpu').numpy()
    label_ids = b_labels.to('cpu').numpy()

    pred_flat = np.argmax(logits, axis=1).flatten()
    labels_flat = label_ids.flatten()

    # Tag the rows with the epoch being evaluated (not the total epoch count)
    df_metrics=pd.DataFrame({'Epoch':epoch,'Actual_class':labels_flat,'Predicted_class':pred_flat})

    tmp_eval_accuracy = accuracy_score(labels_flat,pred_flat)
    tmp_eval_mcc_accuracy = matthews_corrcoef(labels_flat, pred_flat)

    eval_accuracy += tmp_eval_accuracy
    eval_mcc_accuracy += tmp_eval_mcc_accuracy
    nb_eval_steps += 1
    print("eval : ", tmp_eval_accuracy)

  # Averages over the validation batches
  print(F'\n\tValidation Accuracy: {eval_accuracy/nb_eval_steps}')
  print(F'\n\tValidation MCC Accuracy: {eval_mcc_accuracy/nb_eval_steps}')

In [None]:
!nvidia-smi

In [None]:
import torch
torch.cuda.is_available()

In [None]:
import tensorflow as tf
tf.test.gpu_device_name()

***USING GRU***

In [None]:

from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import pandas as pd
import seaborn as sns
import re,nltk,json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.preprocessing.sequence import pad_sequences
# from keras import models
# from keras import layers
from tensorflow.keras.layers import LSTM,GRU
from tensorflow.keras.models import load_model
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,roc_auc_score
from sklearn.metrics import average_precision_score,roc_auc_score, roc_curve, precision_recall_curve
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.text import Tokenizer
np.random.seed(42)
class color:
    """ANSI escape sequences used to style text printed by this notebook."""

    # Reset: ends any styling started by the codes below
    END = '\033[0m'

    # Text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Foreground colours
    DARKCYAN = '\033[36m'
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
# dataset path
path = '/content/drive/My Drive/Bangla_NLP/'

In [None]:
# txt_file = open(path+"train.txt","r")
# data = pd.read_csv(path+'finalpuredataset.csv',encoding='utf-8')
# f = txt_file.readlines()
# print(f)
# import csv
# with open(path+'finalpuredataset_29_8.csv', 'w', newline='') as file:
#   writer = csv.writer(file)
#   writer.writerow(["Text", "Label"])
#   for line in f:
#     splitted_data = line.split(' ',1)
#     if(splitted_data[0] == "surprise"):
#       print(splitted_data)
#       writer.writerow([splitted_data[1], 6])

#   for ind in data.index:
#     #  print(data['Text'][ind], data['Label'][ind])
#      writer.writerow([data['Text'][ind], data['Label'][ind]])


In [None]:
data = pd.read_csv(path+'finalpuredataset.csv',encoding='utf-8')
# data = pd.read_csv('finalbangladataset.csv',encoding='utf-8')
data = data.sample(frac=1)
# data = data.drop(data[data.Label == 5].index)
print(f'Total number of Emotion: {len(data)}')
sns.set(font_scale=1.4)
data['Label'].value_counts().plot(kind='barh', figsize=(6, 4))
plt.xlabel("Number of Sentence", labelpad=12)
plt.ylabel("Label", labelpad=12)
plt.yticks(rotation = 45)
plt.title("Dataset Distribution", y=1.02);

In [None]:

data.columns

In [None]:

# Cleaning Data [Remove unncessary symbols]
def cleaning_data(row):
    """
    Replace every character outside the Bengali Unicode block with a space.

    Args:
        row: the raw text; it is converted with str() first, so non-string
             values are accepted.

    Returns:
        str: text of the same length in which only characters in the range
             U+0980-U+09FF are kept (Latin letters, digits, punctuation and
             whitespace all become single spaces).
    """
    non_bengali = re.compile('[^\u0980-\u09FF]')
    return non_bengali.sub(' ', str(row))
# Add a 'cleaned' column holding the cleaned version of every 'Text' entry
data['cleaned'] = data['Text'].apply(cleaning_data)

# Print a few original/cleaned pairs with their labels as a sanity check.
# sample_data is also read later by label_encoding().
# NOTE(review): data.Text[i] looks rows up by index label, not by position -- this
# presumably relies on the shuffled frame still carrying these labels; confirm.
sample_data = [200,500,1000,1500,3000,3500]
for i in sample_data:
  print('Original: ',data.Text[i],'\nCleaned:',
           data.cleaned[i],'\n','Lebel:-- ',data.Label[i],'\n')

In [None]:
# Number of whitespace-separated words in each cleaned text
data['length'] = data['cleaned'].apply(lambda x:len(x.split()))
# Keep only the texts with more than two words and renumber the rows from 0
dataset = data.loc[data.length>2]
dataset = dataset.reset_index(drop = True)
# Report how many rows were dropped and how many remain
print("After Cleaning:","\nRemoved {} Small Emotion Data".format(len(data)-len(dataset)),
      "\nTotal Emotion Data:",len(dataset))

In [None]:
def data_summary(dataset):
    """
    Print and return per-class document and word statistics.

    For every class (ordered as returned by ``dataset.Label.value_counts()``)
    the function prints the number of documents, the number of words, the
    number of unique words and the ten most frequent words. It finishes by
    printing the number of unique words in the whole dataset.

    Args:
        dataset: DataFrame with a ``cleaned`` text column and a ``Label`` column

    Returns:
        documents: list with the number of documents per class
        words: list with the number of words per class
        u_words: list with the number of unique words per class
        class_label: list of class labels, in the same order as the lists above
    """
    def _tokens(texts):
        # Lower-cased, whitespace-separated words of all the given texts
        return [piece.strip().lower() for entry in texts for piece in entry.strip().split()]

    documents, words, u_words = [], [], []
    class_label = list(dataset.Label.value_counts().to_dict())

    for label in class_label:
        class_texts = list(dataset[dataset.Label==label].cleaned)
        tokens = _tokens(class_texts)

        # Word -> number of occurrences within this class
        frequency = {}
        for token in tokens:
            frequency[token] = frequency.get(token, 0)+1
        # Most frequent first; ties keep their first-seen order
        ranked = sorted(frequency.items(), key=lambda pair: pair[1], reverse=True)

        n_documents = len(class_texts)
        n_words = len(tokens)
        n_unique = len(np.unique(tokens))
        documents.append(n_documents)
        words.append(n_words)
        u_words.append(n_unique)

        print("\nClass Name : ",label)
        print("Number of Documents:{}".format(n_documents))
        print("Number of Words:{}".format(n_words))
        print("Number of Unique Words:{}".format(n_unique))
        print("Most Frequent Words:\n")
        for word, count in ranked[:10]:
            print("{}\t{}".format(word,count))

    all_tokens = _tokens(list(dataset.cleaned))
    print("Total Number of Unique Words:{}".format(len(np.unique(all_tokens))))

    return documents,words,u_words,class_label

#call the fucntion
documents,words,u_words,class_names = data_summary(dataset)

In [None]:
# Collect the per-class statistics returned by data_summary() in one frame
data_matrix = pd.DataFrame({'Total Documents':documents,
                            'Total Words':words,
                            'Unique Words':u_words,
                            'Class Names':class_names})
# Reshape to long format (one row per class and statistic) for a grouped bar plot
df = pd.melt(data_matrix, id_vars="Class Names", var_name="Label", value_name="Values")
plt.figure(figsize=(8, 6))
ax = plt.subplot()

# One group of bars per class, one bar per statistic
sns.barplot(data=df,x='Class Names', y='Values' ,hue='Label')
ax.set_xlabel('Class Names')
ax.set_title('Data Statistics')

# Trailing ';' suppresses the textual repr of the returned tick labels
ax.xaxis.set_ticklabels(class_names, rotation=45);

In [None]:
# Number of words in each cleaned text
dataset['TextLength'] = dataset.cleaned.apply(lambda x:len(x.split()))
# Text length -> number of texts with that length
frequency = dict()
for i in dataset.TextLength:
    frequency[i] = frequency.get(i, 0)+1

plt.bar(frequency.keys(), frequency.values(), color ="b")
# Only lengths from 1 to 60 words are shown on the x axis
plt.xlim(1, 60)
# Author's note: the bar colour was reported as not taking effect in this notebook.
plt.xlabel('Length of the Text')
plt.ylabel('Frequency')
plt.title('Length-Frequency Distribution')
plt.show()
print(f"Maximum Length of a Text: {max(dataset.TextLength)}")
print(f"Minimum Length of a Text: {min(dataset.TextLength)}")
print(f"Average Length of a Text: {round(np.mean(dataset.TextLength),0)}")

In [None]:
#==================================================
                                       ################# Label Encoding Function #########
                                       #==================================================

def label_encoding(Label,bool):
    """
    Encode class labels as integers with a freshly fitted LabelEncoder.

    Args:
        Label: series of class labels
        bool: when equal to True, print the encoder's classes and, for every
              entry of the module-level ``sample_data`` list, the original
              label next to its encoded value

    Returns:
        labels: numpy array of encoded labels
    """
    encoder = LabelEncoder()
    labels = np.array(encoder.fit_transform(Label)) # encoded labels as a numpy array
    if bool == True:
        print("\n\t\t\t===== Label Encoding =====","\nClass Names:-->",encoder.classes_)
        for position in sample_data:
            print(Label[position],' ', labels[position],'\n')

    return labels



                           #===========================================================
                           ################# Dataset Splitting Function ###############
                           #===========================================================

def dataset_split(Text,Label):
    """
    Split the data into training, validation and test sets and print their sizes.

    10% of the samples are held out for testing. The remaining 90% is split
    80/20 into training and validation, i.e. roughly 72% / 18% / 10% of the
    full data. Both splits use random_state=0, so the partition is repeatable.

    Args:
        Text: the texts (feature values) to split
        Label: encoded labels (array), aligned with Text

    Returns:
        X_train: training data
        X_valid: validation data
        X_test : testing data
        y_train: training encoded labels (array)
        y_valid: validation encoded labels (array)
        y_test : testing encoded labels (array)
    """

    # Step 1: carve off the 10% test set
    X,X_test,y,y_test = train_test_split(Text,Label,train_size = 0.9,
                                                  test_size = 0.1,random_state =0)
    # Step 2: split what is left 80/20 into training and validation
    X_train,X_valid,y_train,y_valid = train_test_split(X,y,train_size = 0.8,
                                                  test_size = 0.2,random_state =0)
    print(color.BOLD+"\nDataset Distribution:\n"+color.END)
    print("\tSet Name","\t\tSize")
    print("\t========\t\t======")

    print("\tFull\t\t\t",len(Text),
        "\n\tTraining\t\t",len(X_train),
        "\n\tTest\t\t\t",len(X_test),
        "\n\tValidation\t\t",len(X_valid))

    return X_train,X_valid,X_test,y_train,y_valid,y_test

In [None]:
labels = label_encoding(dataset.Label,True)

In [None]:
print(labels)

In [None]:
X_train,X_valid,X_test,y_train,y_valid,y_test = dataset_split(dataset.Text,labels)

In [None]:
vocab_size = 57000
embedding_dim = 64
max_length = 59
trunc_type='post'
padding_type='post'
oov_tok = "<OOV>"

def padded_headlines(original,encoded,padded):
  """
  Print one sample text next to its encoded sequence and its padded sequence.

  Args:
      original: the raw text
      encoded: the token-id sequence produced for it
      padded: the same sequence after padding
  """
  encoded_banner = color.BOLD+"\n\t\t\t====== Encoded Sequences ======"+color.END
  padded_banner = color.BOLD+"\n\t\t\t====== Paded Sequences ======\n"+color.END
  print(encoded_banner,"\n")
  print(original,"\n",encoded)
  print(padded_banner,original,"\n",padded)

In [None]:
# Train Data Tokenization: fit the word index on the training texts only, then
# convert them to id sequences padded to max_length.
# Note: this rebinds `tokenizer`, which the BERT section above also uses for its own tokenizer.
tokenizer = Tokenizer(num_words = vocab_size, oov_token=oov_tok)
tokenizer.fit_on_texts(X_train)
word_index = tokenizer.word_index
train_sequences = tokenizer.texts_to_sequences(X_train)
# NOTE(review): trunc_type is defined in the config cell but not passed here, so
# pad_sequences uses its own default truncation side -- confirm this is intended.
train_padded = pad_sequences(train_sequences, padding=padding_type, maxlen=max_length)

In [None]:
#============================== Tokenizer Info =================================
(word_counts,word_docs,word_index,document_count) = (tokenizer.word_counts,
                                                       tokenizer.word_docs,
                                                       tokenizer.word_index,
                                                       tokenizer.document_count)
def tokenizer_info(mylist,bool):
  """
  Print the first ten entries of a mapping after sorting it by value.

  Args:
      mylist: dict-like mapping (e.g. word -> count)
      bool: passed to sorted() as ``reverse``; True lists the largest values first
  """
  first_ten = sorted(mylist.items(), key=lambda pair: pair[1], reverse=bool)[:10]
  for entry_key, entry_value in first_ten:
    print(entry_key,"\t",entry_value)
  #=============================== Print all the information =========================
print(color.BOLD+"\t\t\t====== Tokenizer Info ======"+color.END)
print("Words --> Counts:")
tokenizer_info(word_counts,bool =True )
print("\nWords --> Documents:")
tokenizer_info(word_docs,bool =True )
print("\nWords --> Index:")
tokenizer_info(word_index,bool =True )
print("\nTotal Documents -->",document_count)
print(f"Found {len(word_index)} unique tokens")

In [None]:
# Show the training text at position 10 together with ITS encoded and padded
# sequences. X_train keeps the original dataframe index labels, so X_train[398]
# is a label lookup that is unrelated to position 10 and may raise KeyError.
padded_headlines(X_train.iloc[10],train_sequences[10],train_padded[10])

In [None]:
print(X_test[354:364])

In [None]:
# Validation Data Tokenization (reuses the tokenizer fitted on the training texts)
validation_sequences = tokenizer.texts_to_sequences(X_valid)
validation_padded = pad_sequences(validation_sequences, padding=padding_type , maxlen=max_length)
# Show the validation text at position 1 with its own sequences; X_valid keeps
# the original index labels, so a label lookup such as X_valid[5608] does not
# correspond to position 1 and may raise KeyError.
padded_headlines(X_valid.iloc[1],validation_sequences[1],validation_padded[1])

In [None]:
# Test Data Tokenization (reuses the tokenizer fitted on the training texts)
test_sequences = tokenizer.texts_to_sequences(X_test)
test_padded = pad_sequences(test_sequences, padding=padding_type , maxlen=max_length)
# Show the test text at position 100 with its own sequences; X_test keeps the
# original index labels, so a label lookup such as X_test[444] does not
# correspond to position 100 and may raise KeyError.
padded_headlines(X_test.iloc[100],test_sequences[100],test_padded[100])

In [None]:
print(X_test)

In [None]:
# Labels Tokenization
#label_tokenizer = Tokenizer()
#label_tokenizer.fit_on_texts(dataset.category)

train_label_seq = y_train
valid_label_seq = y_valid
testing_label_seq = y_test

#print(train_label_seq.shape)
#print(valid_label_seq.shape)
#print(testing_label_seq.shape)

In [None]:
# Take the model building blocks from tensorflow.keras so that they come from
# the same Keras implementation as the LSTM/GRU layers, `keras` and
# load_model imported at the top of this section; mixing the standalone
# `keras` package with tensorflow.keras objects in one model can fail.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, GRU, Bidirectional, Embedding, Dropout, Conv1D, MaxPooling1D
keras.backend.clear_session()
# Training stops early once training accuracy exceeds this value
accuracy_threshold = 0.97
# Embedding / input configuration (same values as the tokenisation config cell)
vocab_size = 57000
embedding_dim = 64
max_length = 59
# Number of emotion classes
num_category = 6

class myCallback(keras.callbacks.Callback):
  """Stop training as soon as the training accuracy exceeds `accuracy_threshold`."""

  def on_epoch_end(self, epoch, logs=None):
      """
      Check the accuracy reported for the finished epoch.

      Args:
          epoch: index of the epoch that just ended (unused)
          logs: dict of metric values for the epoch; may be None
      """
      # Avoid a shared mutable default and a None > float comparison when the
      # 'accuracy' metric is missing from the logs.
      accuracy = (logs or {}).get('accuracy')
      if accuracy is not None and accuracy > accuracy_threshold:
        print("\nReached %2.2f%% accuracy so we will stop trianing" % (accuracy_threshold*100))
        self.model.stop_training = True

acc_callback = myCallback()
# Save the best model (highest validation accuracy) seen during training
filepath = path+"Model.h5"
checkpoint = keras.callbacks.ModelCheckpoint(filepath, monitor='val_accuracy', verbose=2, save_best_only=True,
                                             save_weights_only=False, mode='max')
callback_list = [acc_callback, checkpoint]
# model = tf.keras.Sequential([
#     tf.keras.layers.Embedding(vocab_size, embedding_dim, input_length=max_length),
#     tf.keras.layers.Bidirectional(GRU(64,dropout=0.2)),
#     tf.keras.layers.Dense(24, activation='relu'),
#     tf.keras.layers.Flatten(),
#     tf.keras.layers.Dense(num_category, activation='softmax')
# ])

# Embedding -> Conv1D/MaxPooling -> bidirectional LSTM -> softmax classifier
model = Sequential()
model.add(Embedding(vocab_size, embedding_dim, input_length=max_length))
model.add(Dropout(.2))

model.add(Conv1D(128, 5, activation="relu"))
model.add(MaxPooling1D(pool_size=4))

model.add(Bidirectional(LSTM(128)))
model.add(Dropout(.5))

# One output unit per emotion class. The previous hard-coded 66 units let the
# model predict class ids that do not exist in the encoded labels.
model.add(Dense(num_category, activation="softmax"))
# opt = keras.optimizers.Adam(learning_rate=0.0001)
# Labels are integer class ids, hence the sparse categorical loss
model.compile(loss='sparse_categorical_crossentropy',optimizer="adam",metrics=['accuracy'])
model.summary()

In [None]:
# !pip3 install --upgrade tensorflow

In [None]:
# from keras.models import Sequential
# from keras.layers import Dense, GRU, Bidirectional, Embedding, Dropout, Conv1D, MaxPooling1D

# keras.backend.clear_session()
# accuracy_threshold = 0.97
# vocab_size = 4809
# embedding_dim = 64
# max_length = 59
# num_category = 5

# class myCallback(keras.callbacks.Callback):
#   def on_epoch_end(self, epoch, logs={}):
#       if(logs.get('accuracy')>accuracy_threshold):
#         print("\nReached %2.2f%% accuracy so we will stop trianing" % (accuracy_threshold*100))
#         self.model.stop_training = True

# acc_callback = myCallback()
# # Saved the Best Model
# filepath = path+"Model.h5"
# checkpoint = keras.callbacks.ModelCheckpoint(filepath, monitor='val_accuracy', verbose=2, save_best_only=True,
#                                              save_weights_only=False, mode='max')
# callback_list = [acc_callback, checkpoint]

# model = Sequential()
# model.add(Embedding(vocab_size, embedding_dim, input_length=max_length,
#                             trainable=False))
# model.add(Dropout(0.5))
# model.add(Conv1D(128, 5, activation='relu'))
# model.add(MaxPooling1D(pool_size=4))

# model.add(Bidirectional(GRU(100)))
# model.add(Dense(32, activation="relu"))
# model.add(Dropout(0.5))
# model.add(Dense(66, activation="softmax"))
# model.compile(loss='sparse_categorical_crossentropy',optimizer='adam',metrics=['accuracy'])
# model.summary()

In [None]:
# Train for up to num_epochs epochs; the callbacks may stop training early
# (accuracy threshold) and save the best model by validation accuracy.
num_epochs = 30
# Mini-batch size passed to fit()
batch = 8
history = model.fit(train_padded, train_label_seq,
                    epochs=num_epochs,
                    batch_size = batch,
                    validation_data=(validation_padded, valid_label_seq),
                    verbose=1,
                    callbacks = callback_list)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
# Load the saved (best) model from the drive directory
# model = load_model(path+"LSTM_Model.h5")
model = load_model(path+"Model.h5")
predictions = model.predict(test_padded)
# Predicted class id = index of the highest softmax score
y_pred = np.argmax(predictions, axis=1)

# Class names in encoded-label order (0..5)
emotion_names = ['Joy' ,'Sadness' ,'Fear', 'Anger', 'Love', 'Surprise']

# Pin the matrix to exactly these six class ids so that its shape always
# matches the row/column names, even when a class is absent from the test
# set or the model predicts an id outside the known range.
cm = confusion_matrix(testing_label_seq, y_pred, labels=list(range(len(emotion_names))))

# Transform to df for easier plotting
cm_df = pd.DataFrame(cm,
                     index = emotion_names,
                     columns = emotion_names)

plt.figure(figsize=(8,6))
sns.heatmap(cm_df, annot=True,cmap="YlGnBu", fmt='g')
# plt.title('LSTM \n Test Accuracy: {0:.2f}'.format(accuracy_score(testing_label_seq, y_pred)*100))
plt.title('Confusion Matrix')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.xticks(rotation = 45)
plt.yticks(rotation = 45)
plt.show()

In [None]:
# 1 => 'আনন্দ'
# 2 => 'বিষণ্ণতা'
# 3 => 'ভয়'
# 4 => 'রাগ'
# 5 => 'ভালবাসা'
# 6 => 'আশ্চর্য'
# ['Joy' ,'Sadness' ,'Fear', 'Anger', 'Love', 'Surprise']

In [None]:
report = pd.DataFrame(classification_report(y_true = testing_label_seq, y_pred = y_pred, output_dict=True)).transpose()
report = report.rename(index={'0': 'Joy','1':'Sadness','2':'Fear','3':'Anger','4':'Love','5':'Surprise'})
report[['precision','recall','f1-score']]=report[['precision','recall','f1-score']].apply(lambda x: round(x*100,2))
report

In [None]:
import matplotlib.pyplot as plt
print(history.history.keys())
# Training vs. validation accuracy per epoch
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
# The second curve comes from the validation data passed to fit(), not from the test set
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
# Training vs. validation loss per epoch
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
# The second curve comes from the validation data passed to fit(), not from the test set
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

In [None]:
# Alternative sample sentences; uncomment one to try it.
# new_complaint = ['কাজীরাঙ্গা জাতীয় উদ্যানে আমি বাঘ দেখে ভয়ে দৌড়ে পালালাম ']
# new_complaint = ['আমি প্রচুর অসুস্থ মানুষ রয়েছি তা শুনে আমি হতবাক ও দুঃখ বোধ করি']
# new_complaint = ['ভাই হেসে তো পেটে খিল ধরে গেছে।অসাধারণ']
# new_complaint = ['আজবতো প্রশ্ন হবে ফাঁস আর সাধারণ জনগণ খাবে বাঁশ? প্রশ্ন ফাঁস নিয়ে চরম একটি']
# new_complaint = ['আমি বরং পচা বোধ করছি তাই এখনই খুব উচ্চাকাঙ্ক্ষী নই']
# new_complaint = ['পুরাই অস্থির']
# new_complaint = ['ভিডিওটা দিয়ে কি বুঝাতে চেয়েছে কেউ একটু বলবেন']
# new_complaint = ['আমি এখনই খুব অসন্তুষ্ট বোধ করছি']
new_complaint = ['আমি এমন কিছু সম্পর্কে উত্তেজিত বোধ করি যা কেবলমাত্র আমার পক্ষে এটির ভিডিও এখানে']

seq = tokenizer.texts_to_sequences(new_complaint)
# Pad on the same side as the training data (padding_type); the pad_sequences
# default pads at the front, which would feed the model inputs laid out
# differently from what it was trained on.
padded = pad_sequences(seq, padding=padding_type, maxlen=max_length)
pred = model.predict(padded)
# Class names in encoded-label order; kept under its own name so the encoded
# `labels` array from the label-encoding cell is not overwritten.
emotion_names = ['Joy' ,'Sadness' ,'Fear', 'Anger', 'Love', 'Surprise']
print(pred, emotion_names[np.argmax(pred)])

In [None]:
print(model)

In [None]:
# X_test keeps the original dataframe index labels, so X_test[394] is a label
# lookup that can raise KeyError; use positional access like the positional
# slices of test_padded used in the following cells.
print(X_test.iloc[394])
print(test_padded)

In [None]:
model = load_model(path+"Model.h5")
# model = load_model("Model.h5")
predictions = model.predict(test_padded[354:364])
y_pred = np.argmax(predictions, axis=1)
print(y_pred)
temp_y_pred = y_pred

In [None]:
print(testing_label_seq[354:364])
temp_testing_label_seq = testing_label_seq[354:364]

In [None]:
print("test data", " ", "Predicted Label", "True Label")
# Class names in encoded-label order
index = ['Joy' ,'Sadness' ,'Fear', 'Anger', 'Love', 'Surprise']
# Sentence, predicted class name and true class name for test rows 354..363
test_result = {
    "test_sentence": list(X_test[354:364]),
    "p_label": [index[predicted_id] for predicted_id in temp_y_pred],
    "T_label": [index[true_id] for true_id in temp_testing_label_seq],
}
pd.DataFrame.from_dict(test_result).to_csv(path+"bigru_test_result.csv", index= False)

In [None]:
print(test_result)

In [None]:
temp_x_test = ['আমি তার মৃত্যু দেখে মন খারাপ করেছিলাম',
               'আমি পরীক্ষায় ফেল করার পরে মন খারাপ করেছিলাম',
               'আমি বাঘ দেখে ভয় পেয়েছি',
               'আমি তার কাজ দেখে অবাক হয়েছি',
               'মানবতার কাজে নিয়োজিত ব্যাক্তিদের আমি মন থেকে ভালবাসি',
               'আমি তোমায় ভালোবাসি',
               'চিড়িয়াখানায় গিয়ে আমি খুশি হয়েছিলাম',
               'আমি এই মুহুর্তে নিরাপত্তাহীন বোধ করছি',
               'সে জোরে হাসছিল কার্টুনটি দেখে',
               'আমি আজ একটু স্বাচ্ছন্দ্য বোধ করছি',
               'করোনায় পরিস্থিতিতে মানুষের আকস্মিক  দুদর্শা দেখে আমি বেশ ব্যথিত',
               'আমি সাপ দেখে ভয় পেয়েছি',
               'আমি তার খারাপ ব্যবহারের জন্য রেগে গেছি',
               'মেসির ফ্রিকিক গোল দেখে আমি অবাক হয়ে গেলাম',
               'মীরের অভিনয়ে দেখে আমি প্রায়শই আট্ট হাসি দেই',
               'আমি তোমায় দেখে মুগ্ধ হয়েছি']
temp_testing_label_seq = [1, 1, 2, 5, 4, 4, 0, 2, 0, 0, 1, 2, 3, 5, 0,5]

In [None]:
# Test Data Tokenization
temp_test_sequences = tokenizer.texts_to_sequences(temp_x_test)
temp_test_padded = pad_sequences(temp_test_sequences, padding=padding_type , maxlen=59)
# temp_padded_headlines(X_test[444],temp_test_sequences[100],test_padded[100])

In [None]:
model = load_model(path+"Model.h5")
# model = load_model("Model.h5")
predictions = model.predict(temp_test_padded)
temp_y_pred = np.argmax(predictions, axis=1)

In [None]:
# Class names in encoded-label order
index = ['Joy' ,'Sadness' ,'Fear', 'Anger', 'Love', 'Surprise']
test_result = {"test_sentence": list(temp_x_test), "p_label": [], "T_label": []}
# Number of mismatches seen so far; also used to tag mismatching rows
error = 0
for position, predicted_id in enumerate(temp_y_pred):
  true_id = temp_testing_label_seq[position]
  true_name, predicted_name = index[true_id], index[predicted_id]
  if predicted_id != true_id:
    # Wrong prediction: append the running mismatch number to both names
    true_name = str(true_name)+" "+str(error)
    predicted_name = str(predicted_name) + " "+str(error)
    error += 1
  test_result["T_label"].append(true_name)
  test_result["p_label"].append(predicted_name)
df = pd.DataFrame.from_dict(test_result)
print(df)
print(error)
df.to_csv(path+"lstm_test_result.csv", index= False)

In [None]:
f1_score(temp_testing_label_seq, temp_y_pred, average='macro')

In [None]:
f1_score(temp_testing_label_seq, temp_y_pred, average='weighted')

In [None]:
f1_score(temp_testing_label_seq, temp_y_pred, average=None)