### Imports

In [2]:
import time
import torch
import numpy as np
from typing import List
from torch.utils.data import DataLoader, TensorDataset, RandomSampler, SequentialSampler
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence

%run preprocessing.ipynb

### Constants

In [3]:
# --- Model architecture ---
EMBEDDING_DIM = 300  # width of each character embedding vector
HIDDEN_SIZE = 512    # LSTM hidden units (per direction in the BiLSTM)
NUM_LAYERS = 2       # number of stacked LSTM layers

# --- Optimisation ---
NUM_EPOCHS = 20
LEARNING_RATE = 0.001
BATCH_SIZE = 256

# --- Vocabulary / label space ---
# `basic_arabic_letters` and `DIACRITICS` are not defined in this notebook;
# presumably they come from the `%run preprocessing.ipynb` above.
PAD = 0  # fill value used when padding the label sequences
VOCAB_SIZE = 1 + len(basic_arabic_letters)  # presumably the extra slot is the padding index -- TODO confirm
LABELS_SIZE = len(DIACRITICS)

# --- Dataset files ---
TRAIN_PATH = "./dataset/train.txt"
VAL_PATH = "./dataset/val.txt"
TEST_PATH = "./dataset/test.txt"

# --- Checkpoint files ---
LSTM_PATH = "./models/lstm.pth"
RNN_PATH = "./models/rnn.pth"
CNN_PATH = "./models/cnn.pth"
CRF_PATH = "./models/crf.pth"

### GPU

In [4]:
# Choose the compute device once; the rest of the notebook moves models and
# batches onto `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
    print(f"There are {torch.cuda.device_count()} GPU(s) available.")
    print("Device name:", torch.cuda.get_device_name(0))
else:
    print("No GPU available, using the CPU instead.")

No GPU available, using the CPU instead.


### Model

In [5]:
class CNN(nn.Module):
    """Per-character tagger: embedding -> 1D convolution -> BiLSTM -> linear.

    Args:
        vocab_size: number of rows in the embedding table.
        num_classes: size of the output (logit) dimension.
        embedding_dim: width of each embedding vector.
        hidden_size: LSTM hidden size per direction.
        num_layers: number of stacked LSTM layers.
        conv_channels: number of output channels of the convolution
            (previously hard-coded to 256).
        kernel_size: width of the convolution kernel (previously hard-coded
            to 3). Must be a positive odd number so that the "same" padding
            of ``kernel_size // 2`` keeps the sequence length unchanged.

    Raises:
        ValueError: if ``kernel_size`` is not a positive odd integer.
    """

    def __init__(
        self,
        vocab_size=VOCAB_SIZE,
        num_classes=LABELS_SIZE,
        embedding_dim=EMBEDDING_DIM,
        hidden_size=HIDDEN_SIZE,
        num_layers=NUM_LAYERS,
        conv_channels=256,
        kernel_size=3,
    ):
        super().__init__()

        # An even kernel with padding = k // 2 would lengthen the sequence by
        # one step, so the logits would no longer line up with the labels.
        if kernel_size < 1 or kernel_size % 2 == 0:
            raise ValueError(
                f"kernel_size must be a positive odd integer, got {kernel_size}"
            )

        # NOTE: attribute names and creation order are kept as-is so that
        # saved state dicts still load and parameter initialisation draws
        # from the RNG in the same order.

        # Embedding Layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Convolutional Layer (length-preserving thanks to the padding)
        self.conv1d = nn.Conv1d(
            embedding_dim,
            conv_channels,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
        )

        # LSTM Layer
        self.lstm = nn.LSTM(
            conv_channels,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )

        # Linear Layer; the BiLSTM concatenates both directions, hence 2x.
        self.linear = nn.Linear(2 * hidden_size, num_classes)

    def forward(self, sentences):
        """Return per-position logits for a batch of index sequences.

        Args:
            sentences: integer tensor of shape (batch, seq_len).

        Returns:
            Tensor of shape (batch, seq_len, num_classes).
        """
        embeddings = self.embedding(sentences)  # (batch, seq_len, embedding_dim)

        # Conv1d expects channels first, so swap the last two axes around it.
        conv_out = F.relu(self.conv1d(embeddings.transpose(1, 2)))

        # Back to (batch, seq_len, channels) for the batch_first LSTM.
        lstm_out, _ = self.lstm(conv_out.transpose(1, 2))

        return self.linear(lstm_out)

### Train

In [6]:
def data_loader(
    train_inputs, val_inputs, train_labels, val_labels, batch_size=BATCH_SIZE
):
    """Wrap the training and validation tensors in batched DataLoaders.

    The training loader draws samples in random order; the validation loader
    iterates sequentially.

    Returns:
        (train_dataloader, val_dataloader)
    """
    train_set = TensorDataset(train_inputs, train_labels)
    val_set = TensorDataset(val_inputs, val_labels)

    return (
        DataLoader(train_set, batch_size=batch_size, sampler=RandomSampler(train_set)),
        DataLoader(val_set, batch_size=batch_size, sampler=SequentialSampler(val_set)),
    )


# Specify loss function.
# Plain nn.CrossEntropyLoss with default arguments (no custom ignore_index or
# class weights), so every position handed to it contributes to the loss --
# including label positions that pad_sequence filled with PAD.
# NOTE(review): if PAD (0) is not also a real diacritic class, consider
# passing ignore_index -- confirm against diacritic_to_index first.
loss_fn = nn.CrossEntropyLoss()


def train(
    path, model, optimizer, train_dataloader, val_dataloader=None, epochs=NUM_EPOCHS
):
    """Train `model` and checkpoint its weights to `path`.

    Args:
        path: destination for `torch.save(model.state_dict(), path)`.
        model: the network to optimise; batches are moved to the global `device`.
        optimizer: optimizer stepping the model's parameters.
        train_dataloader: yields `(inputs, labels)` batches.
        val_dataloader: optional loader passed to `evaluate` after each epoch.
            When given, the weights are saved every time validation accuracy
            improves. When omitted, the final weights are saved once training
            finishes (previously nothing was ever saved in that case).
        epochs: number of passes over `train_dataloader`.
    """
    import os  # local import: only needed for the checkpoint-directory check

    # Create the checkpoint directory up front so a missing folder does not
    # make torch.save fail after a full epoch of work.
    if isinstance(path, (str, os.PathLike)):
        checkpoint_dir = os.path.dirname(os.fspath(path))
        if checkpoint_dir:
            os.makedirs(checkpoint_dir, exist_ok=True)

    has_validation = val_dataloader is not None

    # Tracking best validation accuracy
    best_accuracy = 0

    # Start training loop
    print("Start training...\n")
    print(
        f"{'Epoch':^7} | {'Train Loss':^12} | {'Val Loss':^10} | {'Val Acc':^9} | {'Elapsed':^9}"
    )
    print("-" * 60)

    for epoch_i in range(epochs):
        # =======================================
        #               Training
        # =======================================

        # Tracking time and loss
        t0_epoch = time.time()
        total_loss = 0
        num_batches = 0

        # Put the model into the training mode (evaluate() switches it to eval)
        model.train()

        for batch in train_dataloader:
            # Load batch to the selected device
            b_input_ids, b_labels = tuple(t.to(device) for t in batch)

            # Zero out any previously calculated gradients
            model.zero_grad()

            # Perform a forward pass
            output = model(b_input_ids)

            # Flatten (batch, seq, classes) logits and (batch, seq) labels so
            # the loss is computed per position.
            loss = loss_fn(output.view(-1, output.shape[-1]), b_labels.view(-1))
            total_loss += loss.item()
            num_batches += 1

            # Perform a backward pass to calculate gradients
            loss.backward()

            # Update parameters
            optimizer.step()

        # Average loss per batch; NaN instead of ZeroDivisionError when the
        # loader yielded nothing.
        avg_train_loss = total_loss / num_batches if num_batches else float("nan")

        # =======================================
        #               Evaluation
        # =======================================
        if has_validation:
            # After the completion of each training epoch, measure the model's
            # performance on our validation set.
            val_loss, val_accuracy = evaluate(model, val_dataloader)

            # Keep only the best-scoring weights on disk
            if val_accuracy > best_accuracy:
                best_accuracy = val_accuracy
                torch.save(model.state_dict(), path)

            time_elapsed = time.time() - t0_epoch
            print(
                f"{epoch_i + 1:^7} | {avg_train_loss:^12.6f} | {val_loss:^10.6f} | {val_accuracy:^9.2f} | {time_elapsed:^9.2f}"
            )
        else:
            # No validation data: still report training progress for the epoch.
            time_elapsed = time.time() - t0_epoch
            print(
                f"{epoch_i + 1:^7} | {avg_train_loss:^12.6f} | {'-':^10} | {'-':^9} | {time_elapsed:^9.2f}"
            )

    print("\n")
    if has_validation:
        print(f"Training complete! Best accuracy: {best_accuracy:.2f}%.")
    elif epochs > 0:
        # Without validation there is no "best" epoch, so persist the final
        # weights instead of silently discarding the training run.
        torch.save(model.state_dict(), path)
        print("Training complete! No validation set given; saved the final weights.")
    else:
        print("Training complete! Nothing was trained (epochs <= 0).")


def evaluate(model, val_dataloader, pad_index=None):
    """Measure the model's loss and accuracy over a data loader.

    The model is switched to evaluation mode and left in it.

    Args:
        model: network returning logits of shape (batch, seq_len, classes).
        val_dataloader: yields `(inputs, labels)` batches.
        pad_index: optional input index that marks padding. When given,
            positions whose *input* equals `pad_index` are excluded from the
            accuracy. The default (None) counts every position, as before.
            The loss is always computed by the global `loss_fn` over all
            positions.

    Returns:
        (val_loss, val_accuracy): mean loss per position and accuracy in
        percent, both aggregated over all positions of all batches rather
        than as an unweighted mean of per-batch means (which over-weights a
        smaller final batch). Both are NaN if nothing was evaluated.
    """
    # Put the model into the evaluation mode.
    model.eval()

    loss_sum = 0.0   # sum over batches of (batch mean loss * positions)
    positions = 0    # number of positions behind loss_sum
    correct = 0      # correctly predicted positions that were counted
    counted = 0      # positions that entered the accuracy

    # No gradients are needed for the forward pass or the loss.
    with torch.no_grad():
        for batch in val_dataloader:
            # Load batch to the selected device
            b_input_ids, b_labels = tuple(t.to(device) for t in batch)

            output = model(b_input_ids)

            # Assumes loss_fn returns the mean over the batch's positions
            # (true for a default nn.CrossEntropyLoss); re-weight it by the
            # number of positions so batches of different size combine fairly.
            loss = loss_fn(output.view(-1, output.shape[-1]), b_labels.view(-1))
            n_positions = b_labels.numel()
            loss_sum += loss.item() * n_positions
            positions += n_positions

            # Most likely class per position
            hits = output.argmax(dim=2) == b_labels

            if pad_index is None:
                correct += hits.sum().item()
                counted += hits.numel()
            else:
                keep = b_input_ids != pad_index
                correct += (hits & keep).sum().item()
                counted += keep.sum().item()

    val_loss = loss_sum / positions if positions else float("nan")
    val_accuracy = 100.0 * correct / counted if counted else float("nan")

    return val_loss, val_accuracy

### Prepare Data

In [17]:
def _collect_sequences(corpus):
    """Split every sentence of `corpus` into character and diacritic sequences.

    Returns two parallel lists: one entry per item returned by
    `separate_words_and_diacritics` for each stripped sentence.
    """
    chars, diacritics = [], []
    for sentence in corpus:
        # Get the char list for each word in the sentence and its corresponding diacritics
        char_list, diacritics_list = separate_words_and_diacritics(sentence.strip())

        for i in range(len(char_list)):
            chars.append(char_list[i])
            diacritics.append(diacritics_list[i])
    return chars, diacritics


def _to_padded_indices(sequences, index_map):
    """Map each symbol through `index_map` and right-pad with PAD to equal length.

    dtype is forced to long: an empty sequence would otherwise become a float
    tensor, and pad_sequence takes the output dtype from its first element.
    """
    encoded = [
        torch.tensor([index_map[symbol] for symbol in sequence], dtype=torch.long)
        for sequence in sequences
    ]
    return pad_sequence(encoded, batch_first=True, padding_value=PAD)


train_corpus = readFile(TRAIN_PATH)
val_corpus = readFile(VAL_PATH)

# Training split
X_train, Y_train = _collect_sequences(train_corpus)
X_train_padded = _to_padded_indices(X_train, char_to_index)
y_train_padded = _to_padded_indices(Y_train, diacritic_to_index)

# Validation split (padded to its own maximum length, independent of train)
X_val, Y_val = _collect_sequences(val_corpus)
x_val_padded = _to_padded_indices(X_val, char_to_index)
y_val_padded = _to_padded_indices(Y_val, diacritic_to_index)

# Optional split of the validation data into val/test halves (disabled):
# val_inputs, test_inputs, val_labels, test_labels = train_test_split(
#     x_val_padded, y_val_padded, test_size=0.5, random_state=42
# )

torch.Size([106, 477])


### Initialize Model

In [88]:
def init_model(learning_rate=LEARNING_RATE):
    """Build a fresh CNN on `device` together with an Adam optimizer for it.

    Returns:
        (path, model, optimizer), where `path` is the CNN checkpoint location.
    """
    # nn.Module.to returns the module itself, so this both moves and binds it.
    model = CNN().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    return CNN_PATH, model, optimizer

### Execute

In [None]:
path, model, optimizer = init_model()

# Use the validation tensors that are actually built above. The previous
# names `val_inputs` / `val_labels` are not assigned by any active code, so a
# fresh kernel raised NameError here.
train_dataloader, val_dataloader = data_loader(
    X_train_padded, x_val_padded, y_train_padded, y_val_padded
)
train(path, model, optimizer, train_dataloader, val_dataloader)

### Test

In [None]:
test_corpus = readFile(TEST_PATH)

x_test = []
y_test = []

for sentence in test_corpus:
    # Character sequences and their matching diacritic sequences
    char_list, diacritics_list = separate_words_and_diacritics(sentence.strip())

    for i in range(len(char_list)):
        x_test.append(char_list[i])
        y_test.append(diacritics_list[i])

# Encode to index tensors (explicit long dtype so an empty sequence cannot
# turn the padded batch into floats) and right-pad with PAD.
x_test_padded = [
    torch.tensor([char_to_index[char] for char in sentence], dtype=torch.long)
    for sentence in x_test
]
x_test_padded = pad_sequence(x_test_padded, batch_first=True, padding_value=PAD)

y_test_padded = [
    torch.tensor([diacritic_to_index[char] for char in sentence], dtype=torch.long)
    for sentence in y_test
]
y_test_padded = pad_sequence(y_test_padded, batch_first=True, padding_value=PAD)

test_data = TensorDataset(x_test_padded, y_test_padded)
test_sampler = SequentialSampler(test_data)
test_dataloader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)

# Load the saved weights onto whichever device was selected. Hard-coding
# 'cuda' here made the load fail on CPU-only machines.
model = CNN()
model.load_state_dict(torch.load(CNN_PATH, map_location=device))
model.to(device)

loss, acc = evaluate(model, test_dataloader)

# `acc` is a percentage; DER is reported as the complementary fraction.
print(f'Accuracy: {acc} | DER: {1 - (acc / 100)}\n')