In [1]:
import torch

import numpy as np

from tqdm import tqdm

from torch import nn

# Encodes categorical labels into numerical format (used for label preprocessing)
from sklearn.preprocessing import LabelEncoder

# Calculates the accuracy of a classification model (used for model evaluation)
from sklearn.metrics import accuracy_score

# Defines a custom dataset class for PyTorch (used for handling data)
from torch.utils.data import Dataset

# Creates a DataLoader for efficient batch processing in PyTorch (used for data loading)
from torch.utils.data import DataLoader

# Splits a dataset into training and validation sets (used for data splitting)
from torch.utils.data import random_split

# Represents a multi-dimensional matrix in PyTorch (used for tensor manipulation)
from torch import Tensor

# Implements a linear layer in a neural network (used for defining neural network architecture)
from torch.nn import Linear

# Applies rectified linear unit (ReLU) activation function (used for introducing non-linearity)
from torch.nn import ReLU

# Applies sigmoid activation function (used for binary classification output)
from torch.nn import Sigmoid

# Base class for all neural network modules in PyTorch (used for creating custom models)
from torch.nn import Module

# Stochastic Gradient Descent optimizer (used for model optimization during training)
from torch.optim import SGD

# Binary Cross Entropy Loss function (used for binary classification problems)
from torch.nn import BCELoss

# Initializes weights using Kaiming uniform initialization (used for weight initialization)
from torch.nn.init import kaiming_uniform_

# Initializes weights using Xavier (Glorot) uniform initialization (used for weight initialization)
from torch.nn.init import xavier_uniform_

In [2]:
import pickle
with open('./pickles/word_sequences.pkl', 'rb') as file:
    word_sequences = pickle.load(file)

with open('./pickles/char_sequences_without_tashkeel.pkl', 'rb') as file:
    char_sequences = pickle.load(file)

with open('./pickles/tashkeel_sequences.pkl', 'rb') as file:
    labels = pickle.load(file)

with open('./pickles/val_word_sequences.pkl', 'rb') as file:
    val_word_sequences = pickle.load(file)

with open('./pickles/val_char_sequences_without_tashkeel.pkl', 'rb') as file:
    val_char_sequences = pickle.load(file)

with open('./pickles/val_tashkeel_sequences.pkl', 'rb') as file:
    val_labels = pickle.load(file)


In [3]:
print(len(word_sequences))
print(len(char_sequences))
print(len(labels))

50000
50000
50000


In [4]:
# dataset definition
# A custom Dataset class must implement three functions: __init__, __len__, and __getitem__.
class Dataset(Dataset):
    # load the dataset
    # The __init__ function is run once when instantiating the Dataset object
    def __init__(self, char_sequences, labels, word_sequences):
        
        self.x = torch.tensor(char_sequences)
        self.y = torch.tensor(labels)
        self.z = torch.tensor(word_sequences)
        print(self.z.shape)
    # number of rows in the dataset
    # The __len__ function returns the number of samples in our dataset.
    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx], self.z[idx]

    # get indexes for train and test rows
    def get_splits(self, n_test=0.33):
        # determine sizes
        test_size = round(n_test * len(self.x))
        train_size = len(self.x) - test_size
        # calculate the split
        return random_split(self, [train_size, test_size])


# # prepare the dataset
# def prepare_data():
#     # load the dataset
#     dataset = CSVDataset()
#     # calculate split
#     train, test = dataset.get_splits()
#     # prepare data loaders
#     # The Dataset retrieves our dataset’s features and labels one sample at a time.
#     # While training a model, we typically want to pass samples in “minibatches”,
#     # reshuffle the data at every epoch to reduce model overfitting,
#     train_dl = DataLoader(train, batch_size=32, shuffle=True)
#     test_dl = DataLoader(test, batch_size=1024, shuffle=False)
#     return dataset.encoding_mapping, train_dl, test_dl

In [5]:
#convert labels to numpy array
print(len(labels[3]))
print(len(char_sequences[3]))
train_ds = Dataset(char_sequences, labels, word_sequences)
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=64, shuffle=True)

dg = iter(train_dl)
X1, Y1, z1 = next(dg)
X2, Y2, z2 = next(dg)
print(Y1.shape, X1.shape, z1.shape, Y2.shape, X2.shape, z2.shape)
print(X1[0][:], "\n", Y1[0][:])

7183
7183


torch.Size([50000, 1419])
torch.Size([64, 7183]) torch.Size([64, 7183]) torch.Size([64, 1419]) torch.Size([64, 7183]) torch.Size([64, 7183]) torch.Size([64, 1419])
tensor([14,  6,  2,  ...,  0,  0,  0], dtype=torch.int32) 
 tensor([3, 5, 6,  ..., 0, 0, 0], dtype=torch.int32)


# Utility functions

In [6]:
def concatenate_characters(characters):
    # Get the shape of the input tensor
    batch_size, sequence_length, feature_size = characters.shape

    # Create a tensor of zeros with the same shape as the last subsequence
    zeros_tensor = torch.zeros_like(characters[:, 0:1, :])

    # Concatenate it to the original tensor along the second dimension
    padded_x = torch.cat((characters, zeros_tensor), dim=1)

    # Now, padded_x will have zeros padded to the last subsequence

    temp1 = padded_x[:, :-1, :]
    temp2 = padded_x[:, 1:, :]

    # Concatenate along the last dimension
    concatenated_characters = torch.cat((temp1, temp2), dim=-1)

    return concatenated_characters


MODEL

In [7]:
class Char_model(nn.Module):
  def __init__(self, vocab_size=42, embedding_dim=50, hidden_size=50, n_classes=17):
    """
    The constructor of our NER model
    Inputs:
    - vacab_size: the number of unique words
    - embedding_dim: the embedding dimension
    - n_classes: the number of final classes (tags)

    embedding_dim here: 50 for char embedding + 50 for following char embedding = 100
    """

    super(Char_model, self).__init__()
    ####################### TODO: Create the layers of your model #######################################
    # (1) Create the embedding layer
    self.embedding = nn.Embedding(vocab_size, embedding_dim)

    # (2) Create an LSTM layer with hidden size = hidden_size and batch_first = True
    self.lstm = nn.LSTM(embedding_dim*2, hidden_size, batch_first=True)
    # batch_first makes the input and output tensors to be of shape (batch_size, seq_length, hidden_size)

    # (3) Create a linear layer
    self.linear = nn.Linear(hidden_size, n_classes)

    #####################################################################################################

  def forward(self, sentences, h_0=None, c_0=None):
    """
    This function does the forward pass of our model
    Inputs:
    - sentences: tensor of shape (batch_size, max_length)

    Returns:
    - final_output: tensor of shape (batch_size, max_length, n_classes)
    """

    final_output = None
    #############################################################
    sentences_embedded = self.embedding(sentences) 
    
    sentences_embedded = concatenate_characters(sentences_embedded)

    #check if h_0 and c_0 are provided or not
    if h_0 is None or c_0 is None:
      final_output, (h_0, c_0) = self.lstm(sentences_embedded)
    else:
      final_output, _ = self.lstm(sentences_embedded, (h_0, c_0)) 
      
    final_output = self.linear(final_output)  


    ############################################################
    return final_output

In [8]:
class Word_model(nn.Module):
  def __init__(self, vocab_size=2093761, embedding_dim=50, hidden_size=50, n_classes=17):
    """
    The constructor of our NER model
    Inputs:
    - vacab_size: the number of unique words
    - embedding_dim: the embedding dimension
    - n_classes: the number of final classes (tags)
    """
    super(Word_model, self).__init__()
    ####################### TODO: Create the layers of your model #######################################
    # (1) Create the embedding layer
    self.embedding = nn.Embedding(vocab_size, embedding_dim)

    # (2) Create an LSTM layer with hidden size = hidden_size and batch_first = True
    self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
    # batch_first makes the input and output tensors to be of shape (batch_size, seq_length, hidden_size)

    #####################################################################################################

  def forward(self, sentences):
    """
    This function does the forward pass of our model
    Inputs:
    - sentences: tensor of shape (batch_size, max_length)

    Returns:
    - final_output: tensor of shape (batch_size, max_length, n_classes)
    """

    final_output = None
    ######################### TODO: implement the forward pass ####################################
    final_output = self.embedding(sentences) 
    final_output, h = self.rnn(final_output)   

    ###############################################################################################
    return final_output, h

In [9]:
lstm_model = Char_model()
word_model = Word_model()
print(lstm_model)
print(word_model)

Char_model(
  (embedding): Embedding(42, 50)
  (lstm): LSTM(100, 50, batch_first=True)
  (linear): Linear(in_features=50, out_features=17, bias=True)
)
Word_model(
  (embedding): Embedding(2093761, 50)
  (rnn): RNN(50, 50, batch_first=True)
)


# Training

In [10]:
def train(lstm_model, context_model, train_dataset, batch_size=128, epochs=5, learning_rate=0.01):
  """
  This function implements the training logic
  Inputs:
  - model: the model ot be trained
  - train_dataset: the training set of type NERDataset
  - batch_size: integer represents the number of examples per step
  - epochs: integer represents the total number of epochs (full training pass)
  - learning_rate: the learning rate to be used by the optimizer
  """

  # (1) create the dataloader of the training set (make the shuffle=True)
  train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

  # (2) make the criterion cross entropy loss
  criterion = nn.CrossEntropyLoss()

  # (3) create the optimizer (Adam)
  optimizer = torch.optim.Adam(list(lstm_model.parameters()) + list(context_model.parameters()), lr=learning_rate)
  # GPU configuration
  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")
  if use_cuda:
    lstm_model = lstm_model.cuda()
    context_model = context_model.cuda()
    criterion = criterion.cuda()
  # device="cpu"
  for epoch_num in range(epochs):
    total_acc_train = 0
    total_loss_train = 0

    for train_input, train_label, train_context in tqdm(train_dataloader):
      

      # (4) move the train input to the device
      train_label = train_label.long().to(device)

      # (5) move the train label to the device
      train_input = train_input.long().to(device)

      train_context = train_context.long().to(device)

      # (6) do the forward pass
      # context, h_0 = context_model(train_context)
      # c_0 = torch.zeros(context.shape[0], 1, context.shape[2])
      # h_0 = torch.transpose(h_0, 0, 1)
      #h_0 = h_0.permute(1, 0, 2)
      #print(h_0.shape)
      output = lstm_model(train_input)
      
      # (7) loss calculation (you need to think in this part how to calculate the loss correctly)
      batch_loss = criterion(output.reshape(-1, 17), train_label.reshape(-1))
  
      # (8) append the batch loss to the total_loss_train
      total_loss_train += batch_loss.item()
      
      # (9) calculate the batch accuracy (just add the number of correct predictions)
      acc = (output.argmax(dim=2) == train_label).sum().item()
      total_acc_train += acc

      # (10) zero your gradients
      optimizer.zero_grad()
      
      # (11) do the backward pass
      batch_loss.backward()

      # (12) update the weights with your optimizer
      optimizer.step()
      
    # epoch loss
    epoch_loss = total_loss_train / len(train_dataset)

    # (13) calculate the accuracy
    epoch_acc = total_acc_train / (len(train_dataset) * len(train_dataset[0][0]))
    # ba2sem 3la 3adad el kalemat fy kol el gomal 
    # kol gomla asln fyha 104 kelma, fa badrab dh fy 3adad el gomal bs

    print(
        f'Epochs: {epoch_num + 1} | Train Loss: {epoch_loss} \
        | Train Accuracy: {epoch_acc}\n')

  ##############################################################################################################

In [11]:
train_dataset = Dataset(char_sequences, labels, word_sequences)
train(lstm_model, word_model, train_dataset)

torch.Size([50000, 1419])


100%|██████████| 391/391 [01:54<00:00,  3.42it/s]


Epochs: 1 | Train Loss: 0.00033241892525926234         | Train Accuracy: 0.9890531699846861



100%|██████████| 391/391 [01:46<00:00,  3.66it/s]


Epochs: 2 | Train Loss: 0.00010750667227432132         | Train Accuracy: 0.9954053292496171



100%|██████████| 391/391 [01:45<00:00,  3.70it/s]


Epochs: 3 | Train Loss: 8.765207145363093e-05         | Train Accuracy: 0.9961880941110957



100%|██████████| 391/391 [01:45<00:00,  3.71it/s]


Epochs: 4 | Train Loss: 7.698457059450447e-05         | Train Accuracy: 0.9966300877070862



100%|██████████| 391/391 [01:45<00:00,  3.70it/s]

Epochs: 5 | Train Loss: 7.091707359999418e-05         | Train Accuracy: 0.9968693053041905






In [12]:

x = torch.tensor([
    [[1, 2, 3], [4, 5, 6], [66, 55, 77]], 
    [[7, 8, 9], [10, 11, 12], [13, 14, 15]]
])

# Get the shape of the input tensor
batch_size, sequence_length, feature_size = x.shape

# Create a tensor of zeros with the same shape as the last subsequence
zeros_tensor = torch.zeros_like(x[:, 0:1, :])

# Concatenate it to the original tensor along the second dimension
padded_x = torch.cat((x, zeros_tensor), dim=1)

# Now, padded_x will have zeros padded to the last subsequence
print(padded_x)

tensor([[[ 1,  2,  3],
         [ 4,  5,  6],
         [66, 55, 77],
         [ 0,  0,  0]],

        [[ 7,  8,  9],
         [10, 11, 12],
         [13, 14, 15],
         [ 0,  0,  0]]])


In [13]:
print(x.shape)
y = padded_x[:,:-1,:]
z = padded_x[:,1:,:]

print(y.shape)
print(z.shape)

k = torch.cat((y, z), dim=-1)


print(k.shape)
print(k)


# tensor([[[ 1,  2,  3,  4,  5,  6],
#          [ 4,  5,  6, 66, 55, 77]
#          [66, 55, 77,  0,  0,  0]],

#         [[ 7,  8,  9, 10, 11, 12],
#          [10, 11, 12, 13, 14, 15]
#          [13, 14, 15,  0,  0,  0]])

torch.Size([2, 3, 3])
torch.Size([2, 3, 3])
torch.Size([2, 3, 3])
torch.Size([2, 3, 6])
tensor([[[ 1,  2,  3,  4,  5,  6],
         [ 4,  5,  6, 66, 55, 77],
         [66, 55, 77,  0,  0,  0]],

        [[ 7,  8,  9, 10, 11, 12],
         [10, 11, 12, 13, 14, 15],
         [13, 14, 15,  0,  0,  0]]])


# Evaluation

In [14]:
def evaluate(model, test_dataset, batch_size=256):
  """
  This function takes a NER model and evaluates its performance (accuracy) on a test data
  Inputs:
  - model: a NER model
  - test_dataset: dataset of type NERDataset
  """
  ########################### TODO: Replace the Nones in the following code ##########################

  # (1) create the test data loader
  test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

  # GPU Configuration
  use_cuda = torch.cuda.is_available()
  device = torch.device("cuda" if use_cuda else "cpu")
  if use_cuda:
    model = model.cuda()

  total_acc_test = 0
  
  # (2) disable gradients
  with torch.no_grad():
    # 3mlna disable 3lshan e7na bn-predict (aw evaluate y3ny) b2a dlw2ty, msh bn-train

    for test_input, test_label, test_context in tqdm(test_dataloader):
      # (3) move the test input to the device
      test_label = test_label.to(device)

      # (4) move the test label to the device
      test_input = test_input.to(device)
      # brdo the comments should be reversed 
      test_context = test_context.long().to(device)
      # (5) do the forward pass
      output = model(test_input)
      print(test_input.shape)
      print(test_label.shape)
      print(output.shape)
      # accuracy calculation (just add the correct predicted items to total_acc_test)
      acc = (output.argmax(dim=2) == test_label).sum().item()
      total_acc_test += acc
    
    # (6) calculate the over all accuracy
    total_acc_test /= (len(test_dataset) * len(test_dataset[0][0]))
  ##################################################################################################

  
  print(f'\nTest Accuracy: {total_acc_test}')

In [15]:
test_dataset = Dataset(val_char_sequences, val_labels, val_word_sequences)
evaluate(lstm_model, test_dataset)

torch.Size([2500, 242])


100%|██████████| 10/10 [00:00<00:00, 87.72it/s]

torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([256, 1212])
torch.Size([256, 1212])
torch.Size([256, 1212, 17])
torch.Size([196, 1212])
torch.Size([196, 1212])
torch.Size([196, 1212, 17])

Test Accuracy: 0.9817353135313531



