In [None]:
!pip install farasapy
import pandas as pd
import numpy as np
from time import process_time
import pickle

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

from custom_functions import *
from custom_models import *

import tensorflow as tf
from tensorflow.keras import Sequential, Model
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import Input, Embedding, Dense, GlobalMaxPooling1D, MaxPooling1D, Conv1D, Dropout, GlobalAveragePooling1D,LSTM, Bidirectional, TimeDistributed, Flatten
from tensorflow.keras.layers import concatenate
from tensorflow.keras.utils import plot_model

from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# For Windows users: if plot_model() raises an exception, uncomment the two lines below to add Graphviz to PATH
# import os
# os.environ["PATH"] += os.pathsep + 'C:/Program Files/Graphviz/bin'

# Data prep

3. Split into train and validation sets

In [None]:
# The train/validation split is read from pre-computed CSV files rather than
# being produced here with train_test_split.
# For each split: drop rows containing any missing value, then renumber the
# rows with reset_index() (no drop=True, so the previous index is kept as a column).
D_train = (
    pd.read_csv("../Arabic Sentiment Identification/dataset/experiments/stratified_train_set.csv")
    .dropna()
    .reset_index()
)
D_val = (
    pd.read_csv("../Arabic Sentiment Identification/dataset/experiments/stratified_val_set.csv")
    .dropna()
    .reset_index()
)

4. Encode target variables (labels) to integers

In [None]:
# Name of the column that holds each tweet's sentiment class
class_label = "sentiment"

# Extract the raw labels of each split as a plain list and encode them.
# NOTE(review): the two splits are encoded by independent calls; this presumably
# relies on get_label_encoding producing the same mapping for both - confirm.
y_train = get_label_encoding(D_train[class_label].values.tolist())
y_val = get_label_encoding(D_val[class_label].values.tolist())

print(y_train.shape, y_val.shape)

Mapping:
{'NEG': 0, 'NEU': 1, 'POS': 2}
Mapping:
{'NEG': 0, 'NEU': 1, 'POS': 2}
(10029, 1) (2508, 1)


# Load Word Embeddings

1. Tokenize Tweets

In [None]:
# Name of the column that holds the preprocessed tweet text
text_label = "tweet_preprocessed"

# Tokenize Tweets
x_train = D_train[text_label].values.tolist()
x_val = D_val[text_label].values.tolist()

# The tokenizer corpus is the concatenation of both splits loaded in this notebook.
# It must use x_val: no `x_test` variable is defined anywhere above, so referencing
# it raises NameError on a fresh kernel.
corpus = x_train + x_val

tokenizer, x_train_tokenized, x_val_tokenized = tokenize_text(corpus,
                                                               x_train, x_val)

print('x_train_tokenized:',len(x_train_tokenized),'\nx_val_tokenized:',len(x_val_tokenized),
      "\nTotal Vocab: ",len(tokenizer.word_counts))

x_train_tokenized: 10029 
x_val_tokenized: 2508 
Total Vocab:  48543


2. Pad tweets

In [None]:
# Padding configuration: every tokenized tweet is brought to a fixed length,
# with both the padding mode and the truncation mode set to "post".
maximum_text_length = 50
padding_type = "post"
truncating_type = "post"

# Apply the same padding settings to the training and validation sequences
x_train_padded, x_val_padded = (
    pad_text_sequence(sequences, maximum_text_length, padding_type, truncating_type)
    for sequences in (x_train_tokenized, x_val_tokenized)
)

print('x_train_padded:',x_train_padded.shape,'\nx_val_padded',x_val_padded.shape)

x_train_padded: (10029, 50) 
x_val_padded (2508, 50)


3. Generate embedding matrix

In [None]:
# Pre-trained embeddings (pickled) and their dimensionality
embedding_dict_file = "../Arabic Sentiment Identification/Word Embeddings/mazajak_pretrained_300.pkl"
embedding_dimension = 300

# Word -> integer id mapping produced by the fitted tokenizer
vocab = tokenizer.word_index

# Build the embedding matrix for the tokenizer vocabulary
embedding_matrix = get_embedding_matrix(
    vocab,
    embedding_dict_file = embedding_dict_file,
    embedding_dimension = embedding_dimension,
)

# Sanity check: compare the vocabulary size with the matrix row count minus one
print("\nTotal Vocab:", len(vocab), "\nEmbeddings:", embedding_matrix.shape[0] - 1)

Loading embeddings from:  ../Arabic Sentiment Identification/word embeddings/fasttext_pretrained_300.pkl

Total words processed: 34722
Words not found:  13823

Total Vocab: 48544 
Embeddings: 48544


# Load Sentiment Features 

In [None]:
def get_sentiment_matrix(df):
    """Stack the per-tweet sentiment features of ``df`` into a float64 matrix.

    The columns of the result are, in order: ``NEU_WEIGHTS``, ``POSNEG_WEIGHTS``,
    ``Pos_P``, ``Neg_P`` and ``Neu_P``.

    Rows are read positionally, so the result does not depend on ``df`` having a
    default ``0..n-1`` index (label-based ``df[col][i]`` lookups fail with
    KeyError on every row of a filtered / non-reset frame and leave the whole
    matrix at zero).

    Args:
        df: DataFrame containing the five feature columns listed above.

    Returns:
        np.ndarray of shape ``(len(df), 5)`` and dtype float64. A row whose
        values cannot be converted to float is reported with ``print`` and left
        as zeros (best-effort, as before).

    Raises:
        KeyError: if any of the five feature columns is missing from ``df``.
    """
    feature_columns = ['NEU_WEIGHTS', 'POSNEG_WEIGHTS', 'Pos_P', 'Neg_P', 'Neu_P']
    sentiment_matrix = np.zeros((len(df), len(feature_columns)), dtype='float64')

    # Selecting the columns up front fails once (KeyError) when one is missing,
    # instead of printing the same error for every row and returning zeros.
    feature_rows = df[feature_columns].itertuples(index=False, name=None)

    for row_position, row_values in enumerate(feature_rows):
        try:
            sentiment_matrix[row_position] = np.array(row_values, dtype='float64')
        except (TypeError, ValueError) as e:
            # Non-numeric value in this row: keep the zero row and report it.
            print(e)
    return sentiment_matrix

def get_tweet_weights(corpus, weights_dict):
  """Sum the per-word weights of every tweet in ``corpus``.

  Each tweet is converted with ``str()`` and split on whitespace. For every
  word present in ``weights_dict``, element 0 of its entry is added to the
  tweet's first total and element 1 to its second total; words that are not
  in the dictionary contribute nothing.

  Returns:
    A pair of lists ``(neu_weights, posneg_weights)``, one total per tweet,
    in corpus order.
  """
  neu_weights, posneg_weights = [], []
  for document in corpus:
    neu_total = 0
    posneg_total = 0
    for token in str(document).split():
      if token not in weights_dict:
        continue
      entry = weights_dict[token]
      neu_total += entry[0]
      posneg_total += entry[1]
    neu_weights.append(neu_total)
    posneg_weights.append(posneg_total)
  return neu_weights, posneg_weights

def get_tweet_probabilities(corpus, word_probabilities):
  """Sum the per-word class values of every tweet in ``corpus``.

  ``word_probabilities`` is looked up with ``(word, label)`` tuples, where
  label is one of 'POS', 'NEG' or 'NEU'. Each tweet is converted with
  ``str()`` and split on whitespace; for each word, every label whose key is
  present adds its value to that label's running total for the tweet.

  Returns:
    A triple of lists ``(Pos_P, Neg_P, Neu_P)``, one total per tweet, in
    corpus order.
  """
  labels = ('POS', 'NEG', 'NEU')
  totals_by_label = {label: [] for label in labels}

  for document in corpus:
    running = dict.fromkeys(labels, 0)
    for token in str(document).split():
      for label in labels:
        value = word_probabilities.get((token, label))
        if value is not None:
          running[label] += value
    for label in labels:
      totals_by_label[label].append(running[label])

  return totals_by_label['POS'], totals_by_label['NEG'], totals_by_label['NEU']



In [None]:
# Load the two word-level lookup tables.
# Each variable is loaded from the pickle that matches its name:
#   - weights_dict is indexed by word and read as entry[0] / entry[1] in get_tweet_weights
#   - word_probabilities is indexed by (word, label) tuples in get_tweet_probabilities
# With the two files swapped, every lookup misses and all five features silently become 0.
# NOTE(review): this assumes the pickle files are named after their contents - confirm.
weights_dict = load_from_pickle("../Arabic Sentiment Identification/dataset/experiments/word_weights.pkl")
word_probabilities = load_from_pickle("../Arabic Sentiment Identification/dataset/experiments/word_probabilities.pkl")

# Getting x-train features
neu_weights, posneg_weights = get_tweet_weights(x_train, weights_dict)
Pos_P, Neg_P, Neu_P = get_tweet_probabilities(x_train, word_probabilities)

# Store the features as columns so get_sentiment_matrix can read them back
D_train["NEU_WEIGHTS"] = neu_weights
D_train['POSNEG_WEIGHTS'] = posneg_weights
D_train['Neg_P'] = Neg_P 
D_train['Pos_P'] = Pos_P
D_train['Neu_P'] = Neu_P

x_train_sentiment_matrix = get_sentiment_matrix(D_train)
print("Sentiment Matrix (Training): ",x_train_sentiment_matrix.shape)

# Getting x-val features
neu_weights, posneg_weights = get_tweet_weights(x_val, weights_dict)
Pos_P, Neg_P, Neu_P = get_tweet_probabilities(x_val, word_probabilities)

D_val["NEU_WEIGHTS"] = neu_weights
D_val['POSNEG_WEIGHTS'] = posneg_weights
D_val['Neg_P'] = Neg_P 
D_val['Pos_P'] = Pos_P
D_val['Neu_P'] = Neu_P

x_val_sentiment_matrix = get_sentiment_matrix(D_val)

print("Sentiment Matrix (Val): ",x_val_sentiment_matrix.shape)

Sentiment matrix (Train): (10029, 4)
Sentiment matrix (Test): (2508, 4)


# CNN baseline

In [None]:
# Define callback
# NOTE(review): presumably stops training once "acc" reaches the threshold -
# confirm against myCallbacks in custom_models / custom_functions.
custom_callback = myCallbacks(metrics="acc", threshold = 0.95)

# Define model hyperparameters (all derived from the padded inputs, the
# embedding matrix and the encoded labels built above)
input_length = maximum_text_length
embedding_vocab = embedding_matrix.shape[0]
embedding_dimension = embedding_matrix.shape[1]
output_dimension = len(np.unique(y_train))

print('Shape of each Input Sentence: ',input_length,"x",embedding_dimension)
print('Shape of Input layer: ',len(x_train),"x",embedding_dimension)
print("Output classes: ",output_dimension)

# Load model (embedding layer is initialised from embedding_matrix and kept frozen)
CNN_model = CNN(input_length = input_length, input_dimension = embedding_vocab, 
                  embedding_dimension = embedding_dimension, output_dimension = output_dimension,
                  embedding_matrix = embedding_matrix, num_layers = 1, trainable = False,
                   kernel_size = 5, dropout_rate = 0.25)

# View model summary
# print("\n\nModel Summary:")
# CNN_model.summary()

# Train model
print("\n\nTraining Model:")
model_history = CNN_model.fit(x = np.asarray(x_train_padded), y = np.asarray(y_train),
                               validation_data = (np.asarray(x_val_padded),np.asarray(y_val)),
                               epochs = 10, callbacks = [custom_callback])

# plot results
plot_results(model_history)

# print classification report
# Predict with the model trained in this cell. The bare name `model` is only
# defined by the later CNN-BiLSTM cell, so using it here raises NameError on a
# fresh kernel (or silently evaluates a different model after out-of-order runs).
y_pred = np.argmax(CNN_model.predict([x_val_padded]),axis=-1)
print("\nClassification Report:\n\n",classification_report(y_true=y_val, y_pred = y_pred, labels = np.unique(y_val)))

# BiLSTM baseline

In [None]:
# Define callback
# NOTE(review): presumably stops training once "acc" reaches the threshold -
# confirm against myCallbacks in custom_models / custom_functions.
custom_callback = myCallbacks(metrics="acc", threshold = 0.95)

# Define model hyperparameters (all derived from the padded inputs, the
# embedding matrix and the encoded labels built above)
input_length = maximum_text_length
embedding_vocab = embedding_matrix.shape[0]
embedding_dimension = embedding_matrix.shape[1]
output_dimension = len(np.unique(y_train))

print('Shape of each Input Sentence: ',input_length,"x",embedding_dimension)
print('Shape of Input layer: ',len(x_train),"x",embedding_dimension)
print("Output classes: ",output_dimension)

# Load model (embedding layer is initialised from embedding_matrix and kept frozen)
BILSTM_model = BILSTM(input_length = input_length, input_dimension = embedding_vocab, 
                   embedding_dimension = embedding_dimension, output_dimension = output_dimension,
                   embedding_matrix = embedding_matrix, layer1 = 64, layer2 = 64, trainable = False,
                   dropout_rate = 0.25)

# View model summary
# print("\n\nModel Summary:")
# BILSTM_model.summary()

# Train model
print("\n\nTraining Model:")
model_history = BILSTM_model.fit(x = np.asarray(x_train_padded), y = np.asarray(y_train),
                               validation_data = (np.asarray(x_val_padded),np.asarray(y_val)),
                               epochs = 10, batch_size = 32, callbacks = [custom_callback])

# plot results
plot_results(model_history)

# print classification report
# Predict with the model trained in this cell. The bare name `model` is only
# defined by the later CNN-BiLSTM cell, so using it here raises NameError on a
# fresh kernel (or silently evaluates a different model after out-of-order runs).
y_pred = np.argmax(BILSTM_model.predict([x_val_padded]),axis=-1)
print("\nClassification Report:\n\n",classification_report(y_true=y_val, y_pred = y_pred, labels = np.unique(y_val)))

# CNN - BiLSTM baseline

In [None]:
# Builds, trains and evaluates a single-input CNN -> BiLSTM classifier with the
# Keras functional API, using the embedding matrix and padded sequences built above.

# Input: embedding weights, sequence length and embedding matrix dimensions
input1 = embedding_matrix
input_len1 = maximum_text_length
embedding_vocab1 = embedding_matrix.shape[0]
embedding_dimension1 = embedding_matrix.shape[1]

# Output: one unit per distinct encoded label in y_train
output_dim = len(np.unique(y_train))

#--------------------------------------------------------------------------------------------------------------------#

# Input Channel 1
i1 = Input(shape = (input_len1, ))
# Embedding layer initialised from embedding_matrix and frozen (trainable = False)
e1 = Embedding(input_length=input_len1, input_dim=embedding_vocab1, output_dim = embedding_dimension1,
               weights = [input1], trainable = False)(i1)
# Convolution (256 filters, kernel size 3, "same" padding) -> dropout -> max pooling (pool size 3)
c1 = Conv1D(filters = 256, kernel_size=3, padding = "same", activation="relu")(e1)
c1_do = Dropout(0.3)(c1)
max_pool1 = MaxPooling1D(pool_size=3)(c1_do)

# Bidirectional LSTM over the pooled feature sequence; return_sequences = True keeps
# the per-step outputs so they can be reduced by global max pooling
b1 = Bidirectional(LSTM(128, dropout = 0.3, return_sequences = True, ))(max_pool1)
gmp1 = GlobalMaxPooling1D()(b1)

# Fully connected layer before the classifier
d1 = Dense(64, activation = "relu")(gmp1)

# Output layer: softmax over the output_dim classes
output = Dense(output_dim, activation = "softmax")(d1)

#---------------------------------------------------------------------------------------------------------------------#

# Compile with integer (sparse) labels and train for 10 epochs, validating on the val split
model = Model(inputs = [i1], outputs = output)
model.compile(optimizer = "adam", loss = "sparse_categorical_crossentropy", metrics = ["acc"])
model_history = model.fit([x_train_padded], y_train, epochs=10, verbose = 1, batch_size=64,
                         validation_data=([x_val_padded], y_val))

# # plot results
# plot_results(model_history)

# print classification report (predicted class = argmax over the last axis of the model output)
y_pred = np.argmax(model.predict([x_val_padded]),axis=-1)
print("\nClassification Report:\n\n",classification_report(y_true=y_val, y_pred = y_pred, labels = np.unique(y_val)))


# Experimental models