In [1]:
import torch
from torch.utils.data import TensorDataset, DataLoader  # these are needed for the training data
import numpy as np
from constants import *


In [2]:
characters :list[str] = ARABIC_LETTERS + [' '] + ARABIC_PUNCTUATIONS + ['.'] + ENGLISH_PUNCTUATIONS + ['\u200f'] + ['']
char2idx = {u:i for i, u in enumerate(characters)}
idx2char = np.array(characters)
# create one hot encoding for each character
characters_hot_encoding = torch.eye(len(characters), dtype=torch.int32)
print(characters_hot_encoding)
print(characters_hot_encoding.shape)


tensor([[1, 0, 0,  ..., 0, 0, 0],
        [0, 1, 0,  ..., 0, 0, 0],
        [0, 0, 1,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 1, 0, 0],
        [0, 0, 0,  ..., 0, 1, 0],
        [0, 0, 0,  ..., 0, 0, 1]], dtype=torch.int32)
torch.Size([95, 95])


In [3]:

NON_HARAKA = '$'

all_harakat = [
    FATHA,
    DAMMA,
    KASRA,
    SUKUN,
    FATHATAN,
    DAMMATAN,
    KASRATAN,
    SHADDA,
    SHADDA + FATHA,
    SHADDA + DAMMA,
    SHADDA + KASRA,
    SHADDA + FATHATAN,
    SHADDA + DAMMATAN,
    SHADDA + KASRATAN,
    NON_HARAKA
]

# harakat to index and vice versa
harakat2idx = {u:i for i, u in enumerate(all_harakat)}
idx2harakat = np.array(all_harakat)

print(harakat2idx)
print(idx2harakat)
print(len(all_harakat))

{'َ': 0, 'ُ': 1, 'ِ': 2, 'ْ': 3, 'ً': 4, 'ٌ': 5, 'ٍ': 6, 'ّ': 7, 'َّ': 8, 'ُّ': 9, 'ِّ': 10, 'ًّ': 11, 'ٌّ': 12, 'ٍّ': 13, '$': 14}
['َ' 'ُ' 'ِ' 'ْ' 'ً' 'ٌ' 'ٍ' 'ّ' 'َّ' 'ُّ' 'ِّ' 'ًّ' 'ٌّ' 'ٍّ' '$']
15


In [4]:
XTRAIN_PATH = f'clean_out/X.csv'
YTRAIN_PATH = f'clean_out/y.csv'


def read_data(input_path,output_path,verbose=False):
    X = []
    Y = []
    # read csv files
    with open(input_path, 'r', encoding="utf8") as f:
        X = f.readlines()
    with open(output_path, 'r', encoding="utf8") as f:
        Y = f.readlines()

    # remove the \n from the end of each line
    # make sure the number of lines in the two files are the same
    assert len(X) == len(Y)

    # split each line into a list of characters
    X = [x.strip().split('s') for x in X]
    Y = [y.strip().split('s') for y in Y]

    X = [item for sublist in X for item in sublist] + ['']*8
    Y = [item for sublist in Y for item in sublist] + ['$']*8

    if verbose:
        print('Data read successfully.')
        print(f'Number of lines: {len(X)}')
        print(f'X: {X[0]}')
        print(f'Y: {Y[0]}')

    return X, Y

In [5]:
X, Y = read_data(XTRAIN_PATH, YTRAIN_PATH, verbose=False)

x_new = [char2idx[x] for x in X]
y_new = [harakat2idx[y] for y in Y]

data_len = len(x_new)

# get the largest num that len of x_new can be divided by
# so that we can convert it to a tensor
num = 1
l = 1
r = 1001
for i in range(r, l, -1):
    if data_len % i == 0:
        num = i
        break

# convert data to 1000 sentences each and convert it to tensors
x_new = torch.tensor(x_new).view(-1, num)
y_new = torch.tensor(y_new).view(-1, num)

# convert to hot encoding
x_train = x_new

y_train = y_new

print(x_train.shape)
print(y_train.shape)

torch.Size([16636, 650])
torch.Size([16636, 650])


In [6]:
import lightning as pl

In [7]:
import torch # torch will allow us to create tensors.
import torch.nn as nn # torch.nn allows us to create a neural network.
from torch.optim import Adam # optim contains many optimizers. This time we're using Adam
from torchmetrics import Accuracy


# batch size, sequence length, input size

class Decoder(pl.LightningModule):
    def __init__(self, input_size, embedding_size, hidden_size, output_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.embedding_size = embedding_size
        self.embedding = nn.Embedding(input_size, embedding_size)
        self.rnn = nn.LSTM(embedding_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.loss = nn.CrossEntropyLoss()
        print("from decoder init")
        print("adham")

    def forward(self, x, h0, c0):
        # print("from decoder forward")
        # print(x.shape)
        embeddings = self.embedding(x).cuda()
        # print("from decoder forward after embedding")
        # print(embeddings.shape)
        h, (hn, cn) = self.rnn(embeddings, (h0, c0))
        # h is the output of the RNN
        # hn is the hidden state of the last timestep
        # cn is the cell state of the last timestep
        out = self.fc(h)
        return out

    def training_step(self, batch, batch_idx):
        x, y = batch
        batch_size = x.shape[0]
        # print("from decoder training step")
        # print(x.shape)
        h0 = torch.zeros(1, batch_size, self.hidden_size).cuda()
        c0 = torch.zeros(1, batch_size, self.hidden_size).cuda()
        y_hat = self.forward(x, h0, c0).cuda()
        # y_hat is the output of the model of shape (batch_size, sequence_length, output_size)
        # y is the target of shape (batch_size, sequence_length)
        # y contains the index of the correct word in the vocabulary
        loss = self.loss(y_hat.view(-1, self.output_size), y.view(-1)).cuda()
        self.log('train_loss', loss)
        return loss

    def configure_optimizers(self):
        return Adam(self.parameters(), lr=0.1)
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        batch_size = x.shape[0]
        h0 = torch.zeros(1, batch_size, self.hidden_size).cuda()
        c0 = torch.zeros(1, batch_size, self.hidden_size).cuda()
        y_hat = self.forward(x, h0, c0).cuda()
        loss = self.loss(y_hat.view(-1, self.output_size), y.view(-1)).cuda()
        accuracy = Accuracy().cuda()
        acc = accuracy(y_hat.view(-1, self.output_size), y.view(-1)).cuda()
        self.log('val_acc', acc,on_epoch=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        batch_size = x.shape[0]
        h0 = torch.zeros(1, batch_size, self.hidden_size).cuda()
        c0 = torch.zeros(1, batch_size, self.hidden_size).cuda()
        y_hat = self.forward(x, h0, c0).cuda()
        loss = self.loss(y_hat.view(-1, self.output_size), y.view(-1)).cuda()
        accuracy = Accuracy().cuda()
        acc = accuracy(y_hat.view(-1, self.output_size), y.view(-1)).cuda()
        self.log('test_acc', acc,on_epoch=True)
        return loss


In [16]:
# create a tensor dataset
dataset_tensor = TensorDataset(x_train, y_train)

# create a data loader
data_loader = DataLoader(dataset_tensor, batch_size=100, shuffle=True)

In [21]:
# create a decoder
model = Decoder(
    input_size=len(characters),
    hidden_size=256,
    output_size=len(all_harakat),
    embedding_size=512,
)
if torch.cuda.is_available():
   print("cuda is available")
   model = model.cuda()
   x_train = x_train.cuda() 
   y_train = y_train.cuda()

from decoder init
adham
cuda is available


In [18]:
# load X_test and y_test
X_TEST_TRAIN_PATH = f'clean_out/X_val.csv'
Y_TEST_TRAIN_PATH = f'clean_out/y_val.csv'

In [19]:
X_test, Y_test = read_data(X_TEST_TRAIN_PATH, Y_TEST_TRAIN_PATH, verbose=False)

x_test_new = [char2idx[x] for x in X_test]
y_test_new = [harakat2idx[y] for y in Y_test]

num = 650

# padding the data to be divisible by num
x_test_new += [0] * (num - len(x_test_new) % num)
y_test_new += [0] * (num - len(y_test_new) % num)

x_test = torch.tensor(x_test_new).view(-1, num).cuda()
y_test = torch.tensor(y_test_new).view(-1, num).cuda()

print(x_test.shape)
print(y_test.shape)


torch.Size([840, 650])
torch.Size([840, 650])


In [20]:
test_dataset_tensor = TensorDataset(x_test, y_test)

# create a data loader
test_data_loader = DataLoader(test_dataset_tensor, batch_size=10, shuffle=True)

In [22]:
trainer = pl.Trainer(max_epochs=30, accelerator="auto", devices="auto",log_every_n_steps=10)
# trainer = pl.Trainer(max_epochs=3)
trainer.fit(model, train_dataloaders=data_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
c:\Users\adham ali\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\configuration_validator.py:74: You defined a `validation_step` but have no `val_dataloader`. Skipping val loop.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type             | Params
-----------------------------------------------
0 | embedding | Embedding        | 48.6 K
1 | rnn       | LSTM             | 788 K 
2 | fc        | Linear           | 3.9 K 
3 | loss      | CrossEntropyLoss | 0     
-----------------------------------------------
840 K     Trainable params
0         Non-trainable params
840 K     Total params
3.364     Total estimated model params size (MB)


Training: |          | 0/? [00:00<?, ?it/s]

In [14]:
# print training loss
print(trainer.logged_metrics)

{'train_loss': tensor(0.6528)}


In [15]:
trainer.test(model, test_data_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
c:\Users\adham ali\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:492: Your `test_dataloader`'s sampler has shuffling enabled, it is strongly recommended that you turn shuffling off for val/test dataloaders.
c:\Users\adham ali\AppData\Local\Programs\Python\Python310\lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'test_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=11` in the `DataLoader` to improve performance.


Testing: |          | 0/? [00:00<?, ?it/s]

[{'test_acc': 0.7661538124084473}]