In [1]:
# Colab only: mount Google Drive at /content/drive so the project files are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Work from the project folder: the relative paths used below (./Dataset, ./SavedModels, ./BaseLineModels, ./Results) are resolved from here.
%cd ./drive/MyDrive/Colab\ Notebooks/NLP_Project/

/content/drive/.shortcut-targets-by-id/1zPjf1cHfdKqObemkPReffGbQHU_wotr2/NLP_Project


In [3]:
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
import torch
from torch import nn
import random as rnd
from torch.optim.lr_scheduler import StepLR
import gc

# Constants

In [5]:
# Run-configuration constants.
# NOTE(review): none of these names is referenced by the cells shown in this notebook.
# The classifier below is instantiated with its own defaults (num_layers=3,
# embedding_dim=512), so NUM_LAYERS / EMBEDDING_SIZE do not describe the model that is
# actually evaluated — confirm and reconcile.
MODEL = "BI_LSTM"
NUM_LAYERS = 2
EMBEDDING_SIZE = 300

In [6]:
def get_vocab(vocab_path, tags_path):
    """Build the token->index and tag->index lookup tables from two text files.

    Each file is read as one entry per line, and an entry's index is its
    0-based line number (if a line is repeated, the last occurrence wins).
    After the vocabulary file is read, the special token '<PAD>' is added with
    index ``len(vocab)``; this assumes the vocabulary lines are unique, so that
    the index does not collide with an existing token.

    Both files are opened as UTF-8 explicitly. The vocabulary holds non-ASCII
    characters, so relying on the platform default encoding would make the
    resulting indices depend on the machine the notebook runs on.

    Args:
        vocab_path: path of the vocabulary file (one token per line).
        tags_path: path of the label file (one tag per line).

    Returns:
        A ``(vocab, tag_map)`` tuple of dictionaries mapping token -> index and
        tag -> index respectively.
    """
    with open(vocab_path, encoding='utf-8') as f:
        vocab = {token: index for index, token in enumerate(f.read().splitlines())}
    # Padding token used for the input sequences.
    vocab['<PAD>'] = len(vocab)

    # Tags are needed to map labels to their class indices.
    with open(tags_path, encoding='utf-8') as f:
        tag_map = {tag: index for index, tag in enumerate(f.read().splitlines())}

    return vocab, tag_map

def get_params(vocab, tag_map, sentences_file, labels_file):
    """Convert a sentences file and a labels file into lists of index lists.

    Both files are read as UTF-8, one sentence per line, with items separated
    by a single space.

    * Sentence tokens are looked up in ``vocab``; a token that is not a key of
      ``vocab`` is replaced by ``vocab['UNK']``. Empty strings produced by
      consecutive spaces are NOT dropped here: they are looked up like any
      other token.
    * Labels are looked up in ``tag_map`` after empty strings have been
      dropped. An unknown label raises ``KeyError``.

    NOTE(review): because only the label side drops empty strings, a line with
    consecutive spaces can yield different lengths for a sentence and its
    labels — confirm the data files never contain such lines.

    Args:
        vocab: mapping token -> index; must contain 'UNK' if any token is unknown.
        tag_map: mapping label -> index.
        sentences_file: path of the file holding the tokenised sentences.
        labels_file: path of the file holding the matching label sequences.

    Returns:
        ``(sentences, labels, size)`` where ``sentences`` and ``labels`` are
        lists of lists of indices and ``size`` is ``len(sentences)``.
    """
    with open(sentences_file, encoding='utf-8') as f:
        sentences = [
            # vocab['UNK'] is only evaluated for tokens missing from the vocab.
            [vocab[token] if token in vocab else vocab['UNK']
             for token in line.split(' ')]
            for line in f.read().splitlines()
        ]

    with open(labels_file, encoding='utf-8') as f:
        labels = [
            [tag_map[label] for label in line.split(' ') if label]
            for line in f.read().splitlines()
        ]

    return sentences, labels, len(sentences)


# Importing and discovering the data

In [7]:

# Build the lookup tables, then index the training, validation and first test split.
vocab, tag_map = get_vocab(
    './Dataset/new_new_characters/unique_chars.txt',
    './Dataset/new_new_characters/unique_labels.txt',
)
t_sentences, t_labels, t_size = get_params(
    vocab,
    tag_map,
    './Dataset/new_new_characters/t_chars.txt',
    './Dataset/new_new_characters/t_labels.txt',
)
v_sentences, v_labels, v_size = get_params(
    vocab,
    tag_map,
    './Dataset/new_new_characters/v_chars.txt',
    './Dataset/new_new_characters/v_labels.txt',
)
test_sentences1, test_labels1, test_size1 = get_params(
    vocab,
    tag_map,
    './Dataset/new_new_characters/test_chars.txt',
    './Dataset/new_new_characters/test_labels.txt',
)

In [8]:
# Two further test splits: the "no diacritics" files and the "test2" files.
test_sentences2, test_labels2, test_size2 = get_params(
    vocab,
    tag_map,
    './Dataset/new_new_characters/test_no_diacritics_chars.txt',
    './Dataset/new_new_characters/test_no_diacritics_labels.txt',
)
test_sentences3, test_labels3, test_size3 = get_params(
    vocab,
    tag_map,
    './Dataset/new_new_characters/test2_chars.txt',
    './Dataset/new_new_characters/test2_labels.txt',
)

In [9]:
# # NOTE: to increase the size of the dataset
# t_sentences  = t_sentences + v_sentences
# t_labels = t_labels + v_labels
# t_size = t_size + v_size

# NERDataset
The class that implements the dataset for NER (inputs and labels padded to a common length).

In [10]:
class NERDataset(torch.utils.data.Dataset):
  """Dataset of (input indices, label indices) pairs padded to a common length.

  All sequences are padded once, at construction time, to the length of the
  longest sequence, so every item has the same shape.
  """

  def __init__(self, x, y, pad, label_pad=None):
    """Pad and store the inputs and labels.

    Args:
      x: list of input index sequences (one list of ints per sentence).
      y: list of label index sequences, parallel to ``x``.
      pad: value used to pad the input sequences.
      label_pad: value used to pad the label sequences. When omitted, the
        notebook-level ``tag_map["pad"]`` is used, which is what this class
        always did before the parameter existed.
    """
    if label_pad is None:
      # Backward-compatible default: depends on the global tag_map being defined.
      label_pad = tag_map["pad"]
    self.x = nn.utils.rnn.pad_sequence([torch.tensor(i) for i in x], padding_value=pad, batch_first=True)
    self.y = nn.utils.rnn.pad_sequence([torch.tensor(i) for i in y], padding_value=label_pad, batch_first=True)
    print('The max length of the sentences is', self.x.shape[1])
    print('The max length of the labels is', self.y.shape[1])

  def __len__(self):
    """Number of sentences in the dataset."""
    return len(self.x)

  def __getitem__(self, idx):
    """Return the padded (input, label) tensors of sentence ``idx``."""
    return self.x[idx], self.y[idx]

# Classifiers
The class that implements the PyTorch model for Arabic diacritic classification.

In [11]:
class ArabicDiacriticsClassifier(nn.Module):
  """Embedding -> multi-layer bidirectional LSTM -> linear layer.

  The linear layer is applied to the LSTM output at every position, so the
  model returns one score vector of size ``n_classes`` per input position.

  NOTE(review): the default ``vocab_size`` is computed from sentence counts
  (with ``v_sentences`` counted twice), not from the vocabulary size. It
  determines the embedding table shape, so checkpoints saved with this
  default presumably depend on it — confirm before changing it.
  """

  def __init__(self, vocab_size=len(t_sentences) + len(v_sentences) + len(v_sentences), num_layers = 3, embedding_dim = 512, hidden_size=256, n_classes=len(tag_map)):
    super().__init__()
    self.num_layers = num_layers
    self.hidden_size = hidden_size

    # Token index -> dense vector.
    self.embedding = nn.Embedding(vocab_size, embedding_dim)

    # Bidirectional LSTM over batch-first input.
    self.lstm = nn.LSTM(
        input_size=embedding_dim,
        hidden_size=hidden_size,
        num_layers=num_layers,
        batch_first=True,
        bidirectional=True,
    )

    # Forward and backward states are concatenated, hence 2 * hidden_size inputs.
    self.linear = nn.Linear(2 * hidden_size, n_classes)

  def forward(self, sentences):
    """Return the per-position class scores for a batch of index sequences."""
    embedded = self.embedding(sentences)
    # The final hidden/cell states are not needed; only the per-position outputs.
    sequence_features, _ = self.lstm(embedded)
    return self.linear(sequence_features)

In [12]:
# Build the classifier with its default hyper-parameters and print its layer summary.
model = ArabicDiacriticsClassifier()
print(model)

ArabicDiacriticsClassifier(
  (embedding): Embedding(128137, 512)
  (lstm): LSTM(512, 256, num_layers=3, batch_first=True, bidirectional=True)
  (linear): Linear(in_features=512, out_features=16, bias=True)
)


In [24]:
# Choose which loaded split the rest of the notebook evaluates (here: the first test split).
test_sentences = test_sentences1
test_labels = test_labels1
# NOTE(review): this prints the size of test_sentences3, not of the split selected above — confirm this is intended.
print(len(test_sentences3))

5770


In [25]:
# Pad the selected split into dense tensors; inputs are padded with the '<PAD>' vocabulary index.
test_dataset = NERDataset(test_sentences, test_labels, vocab['<PAD>'])

The max length of the sentences is 1936
The max length of the labels is 1936


In [26]:
def _load_state_dict_from(model, directory, model_name):
  """Load the checkpoint ``directory/model_name`` into ``model`` and return ``model``."""
  # map_location='cpu' lets a checkpoint that was saved from a GPU be read on a
  # CPU-only runtime; load_state_dict then copies the values into the model's
  # existing parameters.
  state_dict = torch.load(f'{directory}/{model_name}', map_location='cpu')
  model.load_state_dict(state_dict)
  return model

def load_model(model,model_name):
  """Load the weights stored in ./SavedModels/<model_name> into ``model`` and return it."""
  return _load_state_dict_from(model, './SavedModels', model_name)
def load_baseline_epoch_model(model,model_name):
  """Load the weights stored in ./BaseLineModels/<model_name> into ``model`` and return it."""
  return _load_state_dict_from(model, './BaseLineModels', model_name)

In [34]:
# Checkpoint file name; load_baseline_epoch_model resolves it under ./BaseLineModels/.
model_name = "model_baseline_EPOCH6"
# model_name = "model_3_baseLine_batch256_lr0.001_embedding_512_epoch1"
# Replace the weights of the model built earlier with the saved ones.
model = load_baseline_epoch_model(model, model_name)

# Evaluation

In [35]:
# Module-level accumulators filled by evaluate() and read by the scoring and
# export cells. Each holds one entry per (sentence, position) of the evaluated
# dataset, padding positions included, flattened in batch order.
diacritic_results = []  # predicted class index
gold_results = []       # gold label index
test_input_list = []    # input token index

def evaluate(model, test_dataset, batch_size=64):
  """Run the model over ``test_dataset`` and record the per-position results.

  The three module-level lists ``diacritic_results``, ``gold_results`` and
  ``test_input_list`` are emptied and then refilled, so calling this function
  again replaces the previous results instead of appending a second copy.

  Args:
    model: module mapping a batch of index sequences to scores whose class
      axis is dimension 2 (the prediction is ``output.argmax(2)``).
    test_dataset: dataset yielding ``(input, label)`` pairs that the default
      DataLoader collation can batch.
    batch_size: number of sentences per forward pass.

  Returns:
    None. Results are left in the module-level lists.

  Side effects:
    The model is moved to the GPU when one is available and is switched to
    evaluation mode; it is left in that mode afterwards.
  """
  # Start from empty accumulators (in place, so existing references stay valid).
  for accumulator in (diacritic_results, gold_results, test_input_list):
    accumulator.clear()

  # (1) create the test data loader
  test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

  # GPU configuration
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model = model.to(device)

  # Inference mode for layers that behave differently while training.
  model.eval()

  # (2) disable gradients
  with torch.no_grad():

    for test_input, test_label in tqdm(test_dataloader):
      # (3) move the test input and the test label to the device
      test_input = test_input.to(device)
      test_label = test_label.to(device)

      # (4) do the forward pass and pick the highest-scoring class per position
      output = model(test_input)
      prediction = output.argmax(2)

      # (5) flatten the batch and append it to the accumulators
      diacritic_results.extend(prediction.cpu().numpy().ravel())
      gold_results.extend(test_label.cpu().numpy().ravel())
      test_input_list.extend(test_input.cpu().numpy().ravel())

### Evaluation and Save

In [36]:
def DER():
  """Print the diacritic error rate (DER) and the accuracy, both in percent.

  Reads the module-level lists ``test_input_list``, ``diacritic_results`` and
  ``gold_results``. Positions whose input index equals ``vocab['<PAD>']`` are
  ignored; among the remaining positions, DER is the share of positions where
  the prediction differs from the gold label.

  Raises:
    ValueError: if there is no non-padding position to score (for example
      because the result lists are still empty), since the rate is undefined.
  """
  pad_index = vocab['<PAD>']
  errors = 0
  total_size = 0
  for token, predicted, gold in zip(test_input_list, diacritic_results, gold_results):
    if token == pad_index:
      # Do not include padding in the DER calculation.
      continue
    total_size += 1
    if predicted != gold:
      # Misclassification.
      errors += 1

  if total_size == 0:
    raise ValueError("DER is undefined: no non-padding positions were collected (run evaluate() first).")

  # Same operation order as before (divide, then scale) so the printed digits do not change.
  der = errors / total_size
  der *= 100
  print("DER = ",der,"%")
  print("Accuracy = ",100 - der,"%")

In [37]:
filtered_diacritic_results = [] # predicted class indices at non-padding positions
filtered_inputs = [] # input token indices at non-padding positions

# Diacritic class names, sorted as mentioned by the TA.
# Presumably entry i names class index i (earlier code indexed this list with the
# predicted class indices) — confirm against the label file. The list has 15
# entries, so a predicted index of 15 or more has no name here.
LIST_OF_DIACRITICS = [
    "FATHA",
    "FATHATAN",
    "DAMMA",
    "DAMMATAN",
    "KASRA",
    "KASRATAN",
    "SUKUN",
    "SHADDA",
    "SHADDA_FATHA",
    "SHADDA_FATHATAN",
    "SHADDA_DAMMA",
    "SHADDA_DAMMATAN",
    "SHADDA_KASRA",
    "SHADDA_KASRATAN",
    "_"
]

def PerpareForExportToCSV():
  """Collect the predictions and inputs of all non-padding positions.

  Reads the module-level ``test_input_list`` and ``diacritic_results`` and
  refills ``filtered_diacritic_results`` / ``filtered_inputs`` with the entries
  whose input index differs from ``vocab['<PAD>']``.

  Both output lists are emptied first, so calling the function again rebuilds
  them instead of appending a second copy. Predicted indices are stored as-is;
  they are not decoded to names, so an index without an entry in
  ``LIST_OF_DIACRITICS`` cannot make this step fail.
  """
  pad_index = vocab['<PAD>']

  # Clear in place so that other references to these lists stay valid.
  filtered_diacritic_results.clear()
  filtered_inputs.clear()

  for token, predicted in zip(test_input_list, diacritic_results):
    if token != pad_index:
      filtered_diacritic_results.append(predicted)
      filtered_inputs.append(token)

In [38]:
def ExportToCSV(model_name, expected_length=417359):
  """Write the filtered predictions to ./Results/result_<model_name>.csv.

  The file has two columns: ``ID`` (0-based row number) and ``label`` (the
  entries of the module-level ``filtered_diacritic_results``, in order).

  Args:
    model_name: suffix used in the output file name.
    expected_length: number of predictions the export must contain. The
      default is the length this notebook has always required; pass ``None``
      to skip the check (e.g. when exporting a different split).

  Raises:
    AssertionError: if ``expected_length`` is given and does not match the
      number of filtered predictions.
  """
  data_length = len(filtered_diacritic_results)
  # Raised explicitly (rather than with an assert statement) so the check still runs under -O.
  if expected_length is not None and data_length != expected_length:
    raise AssertionError(f"Expected data length to be {expected_length}, but got {data_length}.")

  df = pd.DataFrame(
      {
      'ID': range(data_length),
      'label': filtered_diacritic_results,
      })

  # Create the output folder on first use instead of failing when it is missing.
  os.makedirs('./Results', exist_ok=True)
  df.to_csv(f'./Results/result_{model_name}.csv', index=False)

In [39]:
# Run inference over the selected split; this fills the module-level result lists read by DER() and the CSV export.
evaluate(model, test_dataset)

100%|██████████| 96/96 [00:48<00:00,  1.97it/s]


In [40]:
# Decide between exporting and scoring by comparing the number of sentences in the
# selected split with that of test_sentences2.
# NOTE(review): equal length is only a proxy for "test_sentences2 is the selected
# split"; another split with the same sentence count would take the export branch too.
if len(test_sentences) == len(test_sentences2):
  # Selected split matches test_sentences2 in size: write the predictions to a CSV file.
  print("Exporting to CSV ...")
  model_name = "model_baseline"
  PerpareForExportToCSV()
  ExportToCSV(model_name)
  print("Exported Successfully")
else:
  # One of our own test sets: report the error rate.
  print("Calculating DER for our test set")
  DER()

Calculating DER for our test set
DER =  3.018976760327245 %
Accuracy =  96.98102323967275 %
