In [None]:
# Mount Google Drive at /content/gdrive; every dataset path defined in the
# following cells lives under this mount point. Requires the google.colab package.
from google.colab import drive
drive.mount('/content/gdrive')


In [None]:
import pandas as pd
import numpy as np
import string
import re
from keras.preprocessing.text import Tokenizer
from keras.utils import pad_sequences
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
from collections import Counter
from sklearn.metrics import precision_score, recall_score, f1_score






In [None]:
# Paths of the HAUSA train / test / dev TSV files on the mounted drive.
# These unsuffixed names are the HAUSA ones; `_1` is IGBO and `_2` is PIDGIN.
train_set_path = '/content/gdrive/My Drive/Colab Notebooks/HAUSA/train.tsv'
test_set_path = '/content/gdrive/My Drive/Colab Notebooks/HAUSA/test.tsv'
validation_set_path = '/content/gdrive/My Drive/Colab Notebooks/HAUSA/dev.tsv'

In [None]:
# Paths of the IGBO train / test / dev TSV files on the mounted drive.
# The `_1` suffix marks IGBO variables throughout the notebook.
train_set_path_1 = '/content/gdrive/My Drive/Colab Notebooks/IGBO/train.tsv'
test_set_path_1 = '/content/gdrive/My Drive/Colab Notebooks/IGBO/test.tsv'
validation_set_path_1 = '/content/gdrive/My Drive/Colab Notebooks/IGBO/dev.tsv'

In [None]:
# Paths of the NIGERIAN PIDGIN train / test / dev TSV files on the mounted drive.
# The `_2` suffix marks PIDGIN variables throughout the notebook.
train_set_path_2 = '/content/gdrive/My Drive/Colab Notebooks/PIDGIN/train.tsv'
test_set_path_2 = '/content/gdrive/My Drive/Colab Notebooks/PIDGIN/test.tsv'
validation_set_path_2 = '/content/gdrive/My Drive/Colab Notebooks/PIDGIN/dev.tsv'

In [None]:
# Load the three HAUSA splits; every file is tab-separated.
train_set, test_set, validation_set = (
    pd.read_csv(tsv_path, sep='\t')
    for tsv_path in (train_set_path, test_set_path, validation_set_path)
)


In [None]:
# Load the three IGBO splits; every file is tab-separated.
train_set_1, test_set_1, validation_set_1 = (
    pd.read_csv(tsv_path, sep='\t')
    for tsv_path in (train_set_path_1, test_set_path_1, validation_set_path_1)
)

In [None]:
# Load the three NIGERIAN PIDGIN splits; every file is tab-separated.
train_set_2, test_set_2, validation_set_2 = (
    pd.read_csv(tsv_path, sep='\t')
    for tsv_path in (train_set_path_2, test_set_path_2, validation_set_path_2)
)

In [None]:
# preparing data for pre-processing
def _tweets_and_labels(frame):
    """Return the ``tweet`` and ``label`` columns of ``frame`` as two NumPy arrays."""
    return np.array(frame["tweet"]), np.array(frame["label"])


# HAUSA
train_set_data, train_labels = _tweets_and_labels(train_set)
test_set_data, test_labels = _tweets_and_labels(test_set)
validation_set_data, val_labels = _tweets_and_labels(validation_set)

# IGBO
train_set_data_1, train_labels_1 = _tweets_and_labels(train_set_1)
test_set_data_1, test_labels_1 = _tweets_and_labels(test_set_1)
validation_set_data_1, val_labels_1 = _tweets_and_labels(validation_set_1)

# NIGERIAN PIDGIN
train_set_data_2, train_labels_2 = _tweets_and_labels(train_set_2)
test_set_data_2, test_labels_2 = _tweets_and_labels(test_set_2)
validation_set_data_2, val_labels_2 = _tweets_and_labels(validation_set_2)

# Sanity preview: show only the first few IGBO training tweets rather than
# printing the entire array.
print(train_set_data_1[:5])

In [None]:
# removing emojis from sentences
# Compiled once at import time instead of on every call.
# NOTE(review): two of these ranges are very broad -- U+24C2..U+1F251 spans most of
# the Basic Multilingual Plane above U+24C2 (CJK included) and U+10000..U+10FFFF is
# every supplementary-plane code point -- so far more than emoji is stripped.
# The matched set is kept exactly as before; confirm this breadth is intended.
_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"
    "\U0001F300-\U0001F5FF"
    "\U0001F680-\U0001F6FF"
    "\U0001F1E0-\U0001F1FF"
    "\U00002500-\U00002BEF"
    "\U00002702-\U000027B0"  # was listed twice; one copy is enough
    "\U000024C2-\U0001F251"
    "\U0001f926-\U0001f937"
    "\U00010000-\U0010ffff"
    "\u2640-\u2642"
    "\u2600-\u2B55"
    "\u200d"
    "\u23cf"
    "\u23e9"
    "\u231a"
    "\ufe0f"
    "\u3030"
    "]+",
    re.UNICODE,
)


# removing emojis from sentences
def remove_emojis(sentence):
    """Return ``sentence`` with every character matched by ``_EMOJI_PATTERN`` removed.

    Args:
        sentence: text to clean (a ``str``).

    Returns:
        A new string; characters outside the listed ranges are left untouched,
        including the whitespace that surrounded a removed character.
    """
    return _EMOJI_PATTERN.sub('', sentence)

In [None]:
# Deletes ASCII digits and ASCII punctuation in a single pass; built once.
_DIGITS_AND_PUNCTUATION = str.maketrans('', '', string.digits + string.punctuation)


def _is_content_word(word):
    """True for a non-empty token that is not a @mention, #hashtag or URL and has a letter."""
    return (
        len(word) != 0
        and word[0] not in "@#"
        and "http" not in word
        and any(c.isalpha() for c in word)
    )


def data_clean_up(dataset):
    """Normalise raw tweets into lower-cased, space-separated plain words.

    For every entry the function drops tokens that start with ``@`` or ``#``,
    contain ``http`` or hold no alphabetic character, then removes ASCII digits
    and punctuation, strips emojis via ``remove_emojis``, lower-cases the text
    and trims trailing whitespace.

    Args:
        dataset: iterable of tweets. Entries that are not strings (for example
            the float NaN pandas produces for a missing cell) are cleaned to an
            empty string instead of raising, so the output stays aligned
            index-for-index with the labels.

    Returns:
        A list of cleaned strings, one per input entry.
    """
    new_dataset = []
    for line in dataset:
        if not isinstance(line, str):
            # Keep a placeholder so positions still match the label array.
            new_dataset.append("")
            continue
        # Split on single spaces only (as before); empty tokens are filtered out.
        kept_words = [word for word in line.split(" ") if _is_content_word(word)]
        text = " ".join(kept_words).translate(_DIGITS_AND_PUNCTUATION)
        text = remove_emojis(text).lower()
        new_dataset.append(text.rstrip())
    return new_dataset
  


In [None]:
# Clean every split of every language with the same routine.
# HAUSA
train_data, test_data, val_data = (
    data_clean_up(raw_split)
    for raw_split in (train_set_data, test_set_data, validation_set_data)
)

# IGBO
train_data_1, test_data_1, val_data_1 = (
    data_clean_up(raw_split)
    for raw_split in (train_set_data_1, test_set_data_1, validation_set_data_1)
)

# NIGERIAN PIDGIN
train_data_2, test_data_2, val_data_2 = (
    data_clean_up(raw_split)
    for raw_split in (train_set_data_2, test_set_data_2, validation_set_data_2)
)


In [None]:
def get_vocab(dataset):
    """Build a word -> index mapping ranked by descending frequency.

    The texts in ``dataset`` are joined with spaces and split on whitespace.
    The most frequent word gets index 1, the next index 2, and so on; index 0
    is never assigned. Words with equal counts keep first-seen order.
    """
    frequency = Counter(' '.join(dataset).split())
    ranked = frequency.most_common()
    return {word: rank for rank, (word, _) in enumerate(ranked, start=1)}

In [None]:
def encode(dataset, vocab, oov_index=None):
    """Turn each text in ``dataset`` into a list of vocabulary indices.

    Args:
        dataset: iterable of strings; each is split on whitespace.
        vocab: mapping from word to integer index.
        oov_index: index used for words that are missing from ``vocab``. With
            the default ``None`` an unknown word raises ``KeyError`` (the
            original behaviour). Pass an integer to encode held-out text
            against a vocabulary built from other data.

    Returns:
        A list with one list of integers per input text.

    Raises:
        KeyError: a word is not in ``vocab`` and ``oov_index`` is ``None``.
    """
    words_int = []
    for tweet in dataset:
        tokens = tweet.split()
        if oov_index is None:
            words_int.append([vocab[w] for w in tokens])
        else:
            words_int.append([vocab.get(w, oov_index) for w in tokens])
    return words_int


In [None]:
def encode_labels(labels):
    """Map sentiment strings to integer class ids and return them as a NumPy array.

    ``'positive'`` becomes 1, ``'negative'`` becomes 0 and every other value
    becomes 2.
    """
    codes = [
        1 if label == 'positive' else 0 if label == 'negative' else 2
        for label in labels
    ]
    return np.array(codes)
  

In [None]:

def _aligned_vocab(split_data, reference_vocab, oov_index=0):
    """Map every word of ``split_data`` to its index in ``reference_vocab``.

    Words the reference vocabulary does not contain get ``oov_index``. The
    default 0 is never produced by ``get_vocab``, whose indices start at 1.
    """
    return {
        word: reference_vocab.get(word, oov_index)
        for word in get_vocab(split_data)
    }


# A model is trained on indices from the training vocabulary, so the test and
# validation splits must use those same indices. Building a separate
# frequency-ranked vocabulary per split (as before) gives the same word a
# different id in each split.

# HAUSA
train_vocab = get_vocab(train_data)
test_vocab = _aligned_vocab(test_data, train_vocab)
val_vocab = _aligned_vocab(val_data, train_vocab)

# IGBO
train_vocab_1 = get_vocab(train_data_1)
test_vocab_1 = _aligned_vocab(test_data_1, train_vocab_1)
val_vocab_1 = _aligned_vocab(val_data_1, train_vocab_1)

# NIGERIAN PIDGIN
train_vocab_2 = get_vocab(train_data_2)
test_vocab_2 = _aligned_vocab(test_data_2, train_vocab_2)
val_vocab_2 = _aligned_vocab(val_data_2, train_vocab_2)


In [None]:
# Encode each split with the vocabulary that was prepared for it.
# HAUSA
train_encoded, test_encoded, val_encoded = (
    encode(tweets, vocab)
    for tweets, vocab in (
        (train_data, train_vocab),
        (test_data, test_vocab),
        (val_data, val_vocab),
    )
)

# IGBO
train_encoded_1, test_encoded_1, val_encoded_1 = (
    encode(tweets, vocab)
    for tweets, vocab in (
        (train_data_1, train_vocab_1),
        (test_data_1, test_vocab_1),
        (val_data_1, val_vocab_1),
    )
)

# NIGERIAN PIDGIN
train_encoded_2, test_encoded_2, val_encoded_2 = (
    encode(tweets, vocab)
    for tweets, vocab in (
        (train_data_2, train_vocab_2),
        (test_data_2, test_vocab_2),
        (val_data_2, val_vocab_2),
    )
)




In [None]:
# Convert the sentiment strings of every split into integer class ids.
# HAUSA
train_encoded_labels, test_encoded_labels, val_encoded_labels = (
    encode_labels(raw_labels)
    for raw_labels in (train_labels, test_labels, val_labels)
)

# IGBO
train_encoded_labels_1, test_encoded_labels_1, val_encoded_labels_1 = (
    encode_labels(raw_labels)
    for raw_labels in (train_labels_1, test_labels_1, val_labels_1)
)

# NIGERIAN PIDGIN
train_encoded_labels_2, test_encoded_labels_2, val_encoded_labels_2 = (
    encode_labels(raw_labels)
    for raw_labels in (train_labels_2, test_labels_2, val_labels_2)
)



In [None]:
def _pad_post(encoded, maxlen):
    """Pad or cut ``encoded`` at the end so every sequence has length ``maxlen``."""
    return pad_sequences(encoded, maxlen=maxlen, padding='post', truncating='post')


# The target length of each language is the longest *training* tweet, in words;
# the test and validation splits are padded or truncated to that same length.

# HAUSA
seq_len = max(len(s.split()) for s in train_data)
train_padded, test_padded, val_padded = (
    _pad_post(encoded, seq_len)
    for encoded in (train_encoded, test_encoded, val_encoded)
)

# IGBO
seq_len_1 = max(len(s.split()) for s in train_data_1)
train_padded_1, test_padded_1, val_padded_1 = (
    _pad_post(encoded, seq_len_1)
    for encoded in (train_encoded_1, test_encoded_1, val_encoded_1)
)

# NIGERIAN PIDGIN
seq_len_2 = max(len(s.split()) for s in train_data_2)
train_padded_2, test_padded_2, val_padded_2 = (
    _pad_post(encoded, seq_len_2)
    for encoded in (train_encoded_2, test_encoded_2, val_encoded_2)
)


In [None]:
def dataloaders(train_padded, train_encoded_labels, test_padded, test_encoded_labels,
                val_padded, val_encoded_labels, batch_size=32):
    """Wrap the padded sequences and labels of the three splits in DataLoaders.

    Args:
        train_padded, test_padded, val_padded: array-likes of padded index
            sequences, one row per sample.
        train_encoded_labels, test_encoded_labels, val_encoded_labels:
            array-likes of integer class ids, one per sample.
        batch_size: samples per batch for all three loaders (default 32).

    Returns:
        ``(train_loader, test_loader, val_loader)``. Features and labels are
        float32 tensors, as ``torch.Tensor(...)`` produces; consumers cast the
        labels with ``.long()``.

        Only the training loader shuffles and drops the last incomplete batch.
        The test and validation loaders keep every sample in its original
        order, so evaluation covers the whole split and is repeatable.
    """
    def _as_dataset(padded, encoded_labels):
        # torch.Tensor(...) converts the integer arrays to float32 tensors.
        return TensorDataset(torch.Tensor(padded), torch.Tensor(encoded_labels))

    train_dataset = _as_dataset(train_padded, train_encoded_labels)
    test_dataset = _as_dataset(test_padded, test_encoded_labels)
    val_dataset = _as_dataset(val_padded, val_encoded_labels)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    # Evaluation splits: no shuffling and no dropped tail batch.
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=False)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

    return train_loader, test_loader, val_loader



In [None]:
# Build the train / test / validation loaders for each language.
# HAUSA
train_loader, test_loader, val_loader = dataloaders(
    train_padded, train_encoded_labels,
    test_padded, test_encoded_labels,
    val_padded, val_encoded_labels,
)

# IGBO
train_loader_1, test_loader_1, val_loader_1 = dataloaders(
    train_padded_1, train_encoded_labels_1,
    test_padded_1, test_encoded_labels_1,
    val_padded_1, val_encoded_labels_1,
)

# NIGERIAN PIDGIN
train_loader_2, test_loader_2, val_loader_2 = dataloaders(
    train_padded_2, train_encoded_labels_2,
    test_padded_2, test_encoded_labels_2,
    val_padded_2, val_encoded_labels_2,
)

In [None]:
import torch.nn as nn

class RNN(nn.Module):
    """Single recurrent cell built from two linear layers.

    Each step concatenates the input with the previous hidden state. One linear
    layer (``i2h``) maps that to the next hidden state, which then passes
    through dropout. The other (``i2o``) maps it to class scores, which are
    turned into log-probabilities. No non-linearity is applied to the hidden
    state.
    """

    def __init__(self, input_size, hidden_size, output_size, drop_prob=0.5):
        super().__init__()
        self.hidden_size = hidden_size

        # Both layers read the concatenated [input, hidden] vector.
        joined_width = input_size + hidden_size
        self.i2h = nn.Linear(joined_width, hidden_size)   # input -> hidden
        self.i2o = nn.Linear(joined_width, output_size)   # input -> output

        self.dropout = nn.Dropout(drop_prob)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        """Run one step and return ``(log_probabilities, next_hidden)``."""
        joined = torch.cat([input, hidden], dim=1)
        next_hidden = self.dropout(self.i2h(joined))
        log_probs = self.softmax(self.i2o(joined))
        return log_probs, next_hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, hidden_size)``."""
        return torch.zeros(1, self.hidden_size)

In [None]:
def _last_step_output(rnn, inputs):
    """Feed the rows of ``inputs`` to ``rnn`` in order and return the final output.

    The hidden state starts at ``rnn.initHidden()`` and is passed from one row
    to the next. Returns ``None`` when ``inputs`` has no rows.
    """
    hidden = rnn.initHidden()
    output = None
    for row in inputs:
        output, hidden = rnn(row.view(1, -1), hidden)
    return output


def _mean_loss(total, batches):
    """Average ``total`` over ``batches``; NaN when no batch was processed."""
    return total / batches if batches else float('nan')


def get_loss(input_size, hidden_size, output_size, train_loader, val_loader,
             num_epochs=50, lr=0.001, drop_prob=0.5, clip_norm=5):
    """Train an ``RNN`` with SGD and record per-epoch training and validation loss.

    Args:
        input_size, hidden_size, output_size: forwarded to ``RNN``.
        train_loader, val_loader: iterables of ``(inputs, labels)`` batches.
        num_epochs: number of passes over ``train_loader`` (default 50).
        lr: SGD learning rate (default 0.001).
        drop_prob: dropout probability forwarded to ``RNN`` (default 0.5).
        clip_norm: maximum gradient norm before each optimiser step (default 5).

    Returns:
        ``(val_losses, all_losses, rnn)``: the validation-loss list, the
        training-loss list (one float per epoch each) and the trained model,
        which is left in eval mode.

    Validation runs with ``rnn.eval()`` so dropout is disabled. Previously the
    model stayed in train mode and the validation loss was computed with
    dropout active.

    NOTE(review): within a batch the rows are fed to the cell one after another
    with the hidden state carried across them, and the loss uses only the last
    row's output and label. So the recurrence runs over the samples of a batch,
    not over the tokens of a tweet, and one label per batch is supervised. This
    structure is kept unchanged here; confirm whether it is intended.
    """
    rnn = RNN(input_size=input_size, hidden_size=hidden_size,
              output_size=output_size, drop_prob=drop_prob)

    # The model already emits log-probabilities; CrossEntropyLoss applies
    # log-softmax again, which leaves normalised log-probabilities unchanged.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(rnn.parameters(), lr=lr)

    all_losses = []
    val_losses = []
    for epoch in range(1, num_epochs + 1):
        # ---- training pass (dropout on) ----
        rnn.train()
        train_total, train_batches = 0.0, 0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            output = _last_step_output(rnn, inputs)
            if output is None:  # empty batch: nothing to learn from
                continue

            loss = criterion(output, labels[-1].view(1).long())
            loss.backward()
            torch.nn.utils.clip_grad_norm_(rnn.parameters(), clip_norm)
            optimizer.step()

            train_total += loss.item()
            train_batches += 1

        train_loss = _mean_loss(train_total, train_batches)
        all_losses.append(train_loss)

        # ---- validation pass (dropout off, no gradients) ----
        rnn.eval()
        val_total, val_batches = 0.0, 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                output = _last_step_output(rnn, inputs)
                if output is None:
                    continue
                val_total += criterion(output, labels[-1].view(1).long()).item()
                val_batches += 1

        val_loss = _mean_loss(val_total, val_batches)
        val_losses.append(val_loss)

        print(f"Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
    return val_losses, all_losses, rnn
        
  

In [None]:
# Train the HAUSA model: input and hidden width both equal the padded tweet
# length, with three output classes.
val_losses, train_losses, model = get_loss(
    input_size=seq_len,
    hidden_size=seq_len,
    output_size=3,
    train_loader=train_loader,
    val_loader=val_loader,
)


In [None]:
# Train the IGBO model. encode_labels() emits three class ids (0, 1 and 2), so
# the classifier needs three outputs: with only two, CrossEntropyLoss rejects
# any target equal to 2 as out of range.
val_losses_1, train_losses_1, model_1 = get_loss(seq_len_1, seq_len_1, 3, train_loader_1, val_loader_1)

In [None]:
# Train the NIGERIAN PIDGIN model. encode_labels() emits three class ids
# (0, 1 and 2), so the classifier needs three outputs: with only two,
# CrossEntropyLoss rejects any target equal to 2 as out of range.
val_losses_2, train_losses_2, model_2 = get_loss(seq_len_2, seq_len_2, 3, train_loader_2, val_loader_2)

In [None]:
import matplotlib.pyplot as plt

# x axis: one point per recorded epoch, starting at 1 (HAUSA run)
epochs = range(1, len(train_losses) + 1)

# training loss as blue dots, validation loss as a blue line
for _curve, _fmt, _legend in (
    (train_losses, 'bo', 'Training loss'),
    (val_losses, 'b', 'Validation loss'),
):
    plt.plot(epochs, _curve, _fmt, label=_legend)

# title, axis labels and legend
plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

# render the figure
plt.show()


In [None]:
def evaluate(model, dataloader, positive_class=1):
    """Compute precision, recall and F1 of ``model`` for one class.

    For every batch the rows are fed through the model in order with the hidden
    state carried along (the same scheme used during training). The prediction
    taken from the last row's output is compared with the last row's label.

    Args:
        model: object exposing ``eval()``, ``initHidden()`` and a call that
            returns ``(output, hidden)``.
        dataloader: iterable of ``(inputs, labels)`` batches.
        positive_class: class id scored one-vs-rest (default 1).

    Returns:
        ``(precision, recall, f1)`` as floats. A metric whose denominator is
        zero is reported as 0.0 instead of raising ``ZeroDivisionError``.

    A false positive is any prediction of ``positive_class`` whose true label
    differs. A false negative is any true ``positive_class`` that was predicted
    as something else. Previously only class 0 counted on the other side, so
    samples involving class 2 were silently ignored. For data with labels 0 and
    1 only, the result is unchanged.

    NOTE(review): only one prediction per batch (the last row) is scored, so
    the metrics are based on a fraction of the split -- confirm this is intended.
    """
    model.eval()
    total_tp = 0
    total_fp = 0
    total_fn = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            if inputs.shape[0] == 0:
                continue  # nothing to score in an empty batch

            hidden = model.initHidden()
            for i in range(inputs.shape[0]):
                output, hidden = model(inputs[i].view(1, -1), hidden)

            # Highest-scoring class of the final step vs. that step's label.
            predicted = torch.argmax(output).item()
            actual = int(round(float(labels[-1])))

            if predicted == positive_class and actual == positive_class:
                total_tp += 1
            elif predicted == positive_class:
                total_fp += 1
            elif actual == positive_class:
                total_fn += 1

    predicted_positive = total_tp + total_fp
    actual_positive = total_tp + total_fn
    precision = total_tp / predicted_positive if predicted_positive else 0.0
    recall = total_tp / actual_positive if actual_positive else 0.0
    # Local name `f1` avoids shadowing sklearn's f1_score inside this function.
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

    return precision, recall, f1


In [None]:
# Score the HAUSA model on its test loader and print each metric to 4 decimals.
# NOTE(review): binding the result to `f1_score` rebinds the name imported from
# sklearn.metrics at the top of the notebook; confirm no later cell needs the
# sklearn function.
precision, recall, f1_score = evaluate(model, test_loader)
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1_score:.4f}")

In [None]:
# Score the IGBO model on its test loader and print each metric to 4 decimals.
# The `precision`, `recall` and `f1_score` globals are overwritten by each
# reporting cell, and `f1_score` also rebinds the sklearn.metrics import.
precision, recall, f1_score = evaluate(model_1, test_loader_1)
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1_score:.4f}")

In [None]:
# Score the NIGERIAN PIDGIN model on its test loader and print each metric to
# 4 decimals. The `precision`, `recall` and `f1_score` globals are overwritten
# by each reporting cell, and `f1_score` also rebinds the sklearn.metrics import.
precision, recall, f1_score = evaluate(model_2, test_loader_2)
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1_score:.4f}")

In [None]:
#for inputs, labels in test_loader:
#    labels = labels.numpy()
#    for i in range(inputs.shape[0]):
#      output, hidden = model(inputs[i].view(1, -1), hidden)
#    print(output)