In [1]:
from util import *

In [2]:
# True: re-extract features from the raw text and rewrite the pickled caches.
# False: load the previously cached arrays instead.
# NOTE(review): the name is misspelled ("FEATRURES"); it is kept as-is because a later cell reads it.
EXTRACT_FEATRURES = False

In [3]:
# Locations of the cleaned text splits.
# NOTE: despite the `_dir` suffix these are file paths, not directories.
train_dir = 'Dataset/train_clean.txt'
val_dir = 'Dataset/val_clean.txt'
test_dir = 'Dataset/test_clean.txt'

In [4]:
def _read_utf8(path):
    """Return the entire contents of the UTF-8 text file at `path`."""
    with open(path, 'r', encoding='utf8') as handle:
        return handle.read()


# Each raw split is held in memory as one string.
train = _read_utf8(train_dir)
val = _read_utf8(val_dir)
test = _read_utf8(test_dir)

In [5]:
# Sequence-length limit passed to extract_features and to diacritize_string.
max_len = 600

In [6]:
# Cache files for the extracted inputs (X_*) and labels (y_*).
# NOTE: the files use a .txt extension but contain binary pickle data.
X_train_dir = 'pkl_dir/X_train.txt'
y_train_dir = 'pkl_dir/y_train.txt'
X_val_dir = 'pkl_dir/X_val.txt'
y_val_dir = 'pkl_dir/y_val.txt'
X_test_dir = 'pkl_dir/X_test.txt'
y_test_dir = 'pkl_dir/y_test.txt'

if EXTRACT_FEATRURES:
    # Rebuild the arrays from the raw text ...
    X_train, y_train = extract_features(train, max_len)
    X_val, y_val = extract_features(val, max_len)
    X_test, y_test = extract_features(test, max_len)

    # ... and refresh every cache file, in the same order as the paths above.
    for _cache_path, _array in (
        (X_train_dir, X_train),
        (y_train_dir, y_train),
        (X_val_dir, X_val),
        (y_val_dir, y_val),
        (X_test_dir, X_test),
        (y_test_dir, y_test),
    ):
        with open(_cache_path, 'wb') as _cache_file:
            pickle.dump(_array, _cache_file)
else:
    # Reuse the cached arrays. pickle.load executes arbitrary code from the file,
    # so these caches must come from a trusted source.
    _cached = []
    for _cache_path in (
        X_train_dir, y_train_dir, X_val_dir, y_val_dir, X_test_dir, y_test_dir
    ):
        with open(_cache_path, 'rb') as _cache_file:
            _cached.append(pickle.load(_cache_file))
    X_train, y_train, X_val, y_val, X_test, y_test = _cached
    del _cached

In [7]:
# Spot-check one position: print the character stored at [j][i] of the training
# inputs and the diacritic that its label id maps to.
j, i = 0, 20
sample_char_code = X_train[j][i]
sample_label_id = y_train[j][i]
print(chr(sample_char_code))
print(id2diacritic[sample_label_id])

و
َّ


In [8]:
# Release the raw text strings; the cells below only use the extracted arrays.
# NOTE: re-running the feature-extraction cell afterwards requires re-reading the files first.
del train, val, test

In [9]:
# Evaluate on a reproducible, shuffled subset drawn from the first `test_len` rows
# of the test split. The size is capped at the number of rows actually available,
# so a test split smaller than 50000 no longer raises IndexError.
test_len = min(50000, len(X_test))
np.random.seed(42)  # fixed seed -> same permutation on every run
indices = np.arange(test_len)
np.random.shuffle(indices)
X_test = X_test[indices]
y_test = y_test[indices]

In [10]:
def _encode_chars(encoder, char_codes):
    """Replace every value of `char_codes` by its encoder id, keeping the shape, as int16."""
    flat_ids = encoder.transform(char_codes.flatten())
    return flat_ids.reshape(char_codes.shape).astype(np.int16)


# The vocabulary is fitted on the training inputs only; the validation and test
# inputs are mapped with that same vocabulary.
sentence_encoder = LabelEncoder().fit(X_train.flatten())
X_train = _encode_chars(sentence_encoder, X_train)
X_val = _encode_chars(sentence_encoder, X_val)
X_test = _encode_chars(sentence_encoder, X_test)

In [11]:
from typing import List, Optional

from torch import nn
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


class BatchNormConv1d(nn.Module):
    """Bias-free ``nn.Conv1d`` -> optional activation -> ``nn.BatchNorm1d``.

    Args:
        in_dim: number of input channels.
        out_dim: number of output channels (also the batch-norm feature count).
        kernel_size: convolution kernel width.
        stride: convolution stride.
        padding: zero padding added to both ends of the sequence.
        activation: optional callable applied between the convolution and the
            batch normalisation; ``None`` skips it.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        kernel_size: int,
        stride: int,
        padding: int,
        activation = None,
    ):
        super().__init__()
        conv_options = dict(
            kernel_size=kernel_size, stride=stride, padding=padding, bias=False
        )
        self.conv1d = nn.Conv1d(in_dim, out_dim, **conv_options)
        self.bn = nn.BatchNorm1d(out_dim)
        self.activation = activation

    def forward(self, x):
        """Apply convolution, the optional activation, then batch normalisation."""
        features = self.conv1d(x)
        if self.activation is None:
            return self.bn(features)
        return self.bn(self.activation(features))


class Prenet(nn.Module):
    """Stack of ``Linear -> ReLU -> Dropout`` layers.

    Args:
        in_dim (int): size of the last dimension of the input.
        prenet_depth (sequence of int): output size of each linear layer, in
            order. Any sequence (list or tuple) is accepted.
        dropout (float): dropout probability applied after every layer.
    """

    def __init__(
        self, in_dim: int, prenet_depth: List[int] = [256, 128], dropout: float = 0.5
    ):
        """ Initializing the prenet module """
        super().__init__()
        # Work on a private list copy: the shared default is never mutated, and
        # tuples work as well (list + tuple concatenation would raise TypeError).
        layer_sizes = list(prenet_depth)
        in_sizes = [in_dim] + layer_sizes[:-1]
        self.layers = nn.ModuleList(
            [
                nn.Linear(in_size, out_size)
                for (in_size, out_size) in zip(in_sizes, layer_sizes)
            ]
        )
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs: torch.Tensor):
        """Run the inputs through every layer in turn.

        Args:
            inputs (Tensor): tensor whose last dimension has size ``in_dim``.
        Returns:
            Tensor: same leading dimensions as ``inputs``; the last dimension has
            the size of the final entry of ``prenet_depth``.
        """
        for linear in self.layers:
            inputs = self.dropout(self.relu(linear(inputs)))
        return inputs

class Highway(nn.Module):
    """One highway layer (Srivastava et al., 2015, https://arxiv.org/abs/1507.06228).

    Computes ``relu(H(x)) * t + x * (1 - t)`` with the gate ``t = sigmoid(T(x))``.
    Because the input is mixed with the transformed value element-wise,
    ``out_size`` has to match ``in_size``.

    Args:
        in_size (int): the input size
        out_size (int): the output size
    """

    def __init__(self, in_size, out_size):
        """Create the transform (H) and gate (T) projections."""
        super().__init__()
        self.H = nn.Linear(in_size, out_size)
        nn.init.zeros_(self.H.bias)
        self.T = nn.Linear(in_size, out_size)
        # A gate bias of -1 makes sigmoid(T) start below 0.5, favouring the carry path.
        nn.init.constant_(self.T.bias, -1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs: torch.Tensor):
        """Blend the transformed input with the untouched input.

        Args:
            inputs (Tensor): tensor whose last dimension has size ``in_size``.
        """
        gate = self.sigmoid(self.T(inputs))
        candidate = self.relu(self.H(inputs))
        return candidate * gate + inputs * (1.0 - gate)


class CBHG(nn.Module):
    """The CBHG module (1-D Convolution Bank + Highway network + Bidirectional GRU)
    was proposed by (Lee et al., 2017, https://www.aclweb.org/anthology/Q17-1026)
    for a character-level NMT model.
    It was adapted by (Wang et al., 2017) for building the Tacotron.

    Pipeline: a bank of K convolutions (kernel widths 1..K) -> max-pool ->
    projection convolutions -> residual connection -> highway layers ->
    bidirectional GRU.
    """

    def __init__(
        self,
        in_dim: int,
        out_dim: int,
        K: int,
        projections: List[int],
        highway_layers: int = 4,
    ):
        """Initializing the CBHG module
        Args:
        in_dim (int): feature size of the input (last dimension)
        out_dim (int): hidden size of the GRU; the output has ``2 * out_dim`` features
        K (int): number of convolutions in the bank; the k-th uses kernel width k
        projections (List[int]): output channels of each projection convolution
        highway_layers (int): number of highway layers
        """
        super().__init__()

        self.in_dim = in_dim
        self.out_dim = out_dim
        self.relu = nn.ReLU()
        self.conv1d_banks = nn.ModuleList(
            [
                BatchNormConv1d(
                    in_dim,
                    in_dim,
                    kernel_size=k,
                    stride=1,
                    padding=k // 2,
                    activation=self.relu,
                )
                for k in range(1, K + 1)
            ]
        )
        self.max_pool1d = nn.MaxPool1d(kernel_size=2, stride=1, padding=1)

        in_sizes = [K * in_dim] + projections[:-1]
        # Every projection but the last is followed by ReLU.
        activations = [self.relu] * (len(projections) - 1) + [None]
        self.conv1d_projections = nn.ModuleList(
            [
                BatchNormConv1d(
                    in_size, out_size, kernel_size=3, stride=1, padding=1, activation=ac
                )
                for (in_size, out_size, ac) in zip(in_sizes, projections, activations)
            ]
        )

        # Only applied in forward() when the last projection width differs from in_dim.
        self.pre_highway = nn.Linear(projections[-1], in_dim, bias=False)
        # Honour `highway_layers` (previously ignored in favour of a hard-coded 4).
        self.highways = nn.ModuleList(
            [Highway(in_dim, in_dim) for _ in range(highway_layers)]
        )

        self.gru = nn.GRU(in_dim, out_dim, 1, batch_first=True, bidirectional=True)

    def forward(self, inputs, input_lengths=None):
        """Run the CBHG stack.

        Args:
            inputs: tensor of shape (B, T_in, in_dim).
            input_lengths: optional per-sequence lengths; when given, the GRU runs
                on a packed sequence. Lengths need not be sorted.
        Returns:
            Tensor of shape (B, T, 2 * out_dim). Without ``input_lengths`` T is
            T_in; with them, the time dimension is whatever pad_packed_sequence
            returns (NOTE(review): it may be shorter than T_in).
        """
        # (B, T_in, in_dim) -> (B, in_dim, T_in) for the convolutions
        x = inputs
        x = x.transpose(1, 2)
        T = x.size(-1)

        # (B, in_dim*K, T_in)
        # Concat conv1d bank outputs; even kernel widths yield T + 1 steps, so trim to T.
        x = torch.cat([conv1d(x)[:, :, :T] for conv1d in self.conv1d_banks], dim=1)
        assert x.size(1) == self.in_dim * len(self.conv1d_banks)
        x = self.max_pool1d(x)[:, :, :T]

        for conv1d in self.conv1d_projections:
            x = conv1d(x)

        # (B, T_in, in_dim)
        # Back to the original shape
        x = x.transpose(1, 2)

        if x.size(-1) != self.in_dim:
            x = self.pre_highway(x)

        # Residual connection (out-of-place: x may be a view of an autograd output)
        x = x + inputs
        for highway in self.highways:
            x = highway(x)

        if input_lengths is not None:
            # enforce_sorted=False accepts batches that are not sorted by length;
            # pad_packed_sequence restores the original batch order afterwards.
            x = nn.utils.rnn.pack_padded_sequence(
                x, input_lengths, batch_first=True, enforce_sorted=False
            )

        # (B, T_in, out_dim*2)
        self.gru.flatten_parameters()
        outputs, _ = self.gru(x)

        if input_lengths is not None:
            outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)

        return outputs

class CBHGModel(nn.Module):
    """CBHG model implementation as described in the paper:
     https://ieeexplore.ieee.org/document/9274427

    Pipeline: embedding -> prenet -> CBHG -> bidirectional LSTM layers (each
    optionally followed by batch normalisation) -> linear projection.

    Args:
    inp_vocab_size (int): the number of the input symbols
    targ_vocab_size (int): the number of the target symbols (diacritics)
    embedding_dim (int): the embedding  size
    prenet_sizes (List[int]): the sizes of the prenet networks
    cbhg_gru_units (int): the number of units of the CBHG GRU, which is the last
    layer of the CBHG Model.
    cbhg_filters (int): number of filters used in the CBHG module
    cbhg_projections: projections used in the CBHG module
    post_cbhg_layers_units (List[int]): hidden size of each bidirectional LSTM
    placed after the CBHG module. The final state of one LSTM is used as the
    initial state of the next, so the sizes must be equal.
    post_cbhg_use_batch_norm (bool): add a BatchNorm1d after each of those LSTMs

    Returns:
    Tensor: per-position scores of shape [batch_size, src_len, targ_vocab_size]
    """

    def __init__(
        self,
        inp_vocab_size: int,
        targ_vocab_size: int,
        embedding_dim: int = 512,
        prenet_sizes: List[int] = [512, 256],
        cbhg_gru_units: int = 256,
        cbhg_filters: int = 16,
        cbhg_projections: List[int] = [128, 256],
        post_cbhg_layers_units: List[int] = [256, 256],
        post_cbhg_use_batch_norm: bool = True
    ):
        super().__init__()
        self.embedding = nn.Embedding(inp_vocab_size, embedding_dim)

        self.prenet = Prenet(embedding_dim, prenet_depth=prenet_sizes)

        self.cbhg = CBHG(
            prenet_sizes[-1],
            cbhg_gru_units,
            K=cbhg_filters,
            projections=cbhg_projections,
        )

        layers = []
        # Prepend the CBHG width so entry i-1 is always the input width of LSTM i
        # (builds a new list; the default argument is not mutated).
        post_cbhg_layers_units = [cbhg_gru_units] + post_cbhg_layers_units

        for i in range(1, len(post_cbhg_layers_units)):
            layers.append(
                nn.LSTM(
                    # "* 2": the previous layer is bidirectional
                    post_cbhg_layers_units[i - 1] * 2,
                    post_cbhg_layers_units[i],
                    bidirectional=True,
                    batch_first=True,
                )
            )
            if post_cbhg_use_batch_norm:
                layers.append(nn.BatchNorm1d(post_cbhg_layers_units[i] * 2))

        self.post_cbhg_layers = nn.ModuleList(layers)
        self.projections = nn.Linear(post_cbhg_layers_units[-1] * 2, targ_vocab_size)
        self.post_cbhg_layers_units = post_cbhg_layers_units
        self.post_cbhg_use_batch_norm = post_cbhg_use_batch_norm


    def forward(
        self,
        src: torch.Tensor,
        lengths: Optional[torch.Tensor] = None,
    ):
        """Compute forward propagation

        Args:
            src: integer symbol ids, [batch_size, src_len]
            lengths: optional sequence lengths, [batch_size], forwarded to the CBHG module
        Returns:
            Tensor: scores of shape [batch_size, src_len, targ_vocab_size]
        """

        embedding_out = self.embedding(src)
        # embedding_out; [batch_size, src_len, embedding_dim]

        cbhg_input = self.prenet(embedding_out)

        # cbhg_input = [batch_size, src_len, prenet_sizes[-1]]

        outputs = self.cbhg(cbhg_input, lengths)

        # LSTM state threaded through the stack. None makes the first LSTM start
        # from zeros, exactly as calling it without a state; no placeholder
        # tensors need to be allocated on every call.
        state = None

        for layer in self.post_cbhg_layers:
            if isinstance(layer, nn.BatchNorm1d):
                # BatchNorm1d expects (batch, channels, time)
                outputs = layer(outputs.permute(0, 2, 1))
                outputs = outputs.permute(0, 2, 1)
                continue
            # Each LSTM after the first starts from the previous LSTM's final state.
            outputs, state = layer(outputs, state)

        predictions = self.projections(outputs)

        return predictions
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

CHECKPOINT_PATH = 'model_cbhg.ckpt'
if os.path.exists(CHECKPOINT_PATH):
    # Resume from the saved model. The checkpoint is a whole pickled module, so
    # only load files you trust (unpickling can execute arbitrary code).
    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine;
    # .to(device) guarantees the weights end up where the batches are sent.
    # NOTE(review): recent PyTorch releases may additionally require
    # weights_only=False to load a full module -- confirm for the installed version.
    model = torch.load(CHECKPOINT_PATH, map_location=device).to(device)
else:
    # No checkpoint yet (fresh run): start from a newly initialised model.
    model = CBHGModel(len(sentence_encoder.classes_), np.unique(y_train).shape[0]).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-7)

  from .autonotebook import tqdm as notebook_tqdm


In [12]:
# Number of trainable parameters only (those with requires_grad set).
trainable_param_count = sum(
    param.numel() for param in model.parameters() if param.requires_grad
)
print(trainable_param_count)

15560719


In [13]:
# Mini-batch size used by both the training DataLoader and batch_eval.
batch_size = 64

In [14]:
def batch_eval(x_np, y_np):
    """Predict a label id for every position of `x_np`, one mini-batch at a time.

    Relies on the notebook-level `model`, `device` and `batch_size`. `y_np` only
    determines the shape of the result. The model is switched to eval mode and
    left there.

    Returns:
        int32 numpy array shaped like `y_np`, holding the index of the
        highest-scoring class per position.
    """
    model.eval()
    features = torch.tensor(x_np, dtype=torch.int32)
    predictions = torch.zeros_like(torch.tensor(y_np, dtype=torch.int32))

    with torch.no_grad():
        for start in range(0, len(features), batch_size):
            stop = start + batch_size
            scores = model(features[start:stop].to(device))
            # torch.max over dim 2 returns (values, indices); keep the indices.
            predictions[start:stop] = torch.max(scores.data, 2)[1]
            del scores

    return predictions.cpu().numpy()

In [15]:
def compute_DER(X, y):
    """Diacritic error rate of the notebook-level model on (X, y).

    Positions whose input symbol is punctuation, a space, or one of the
    PAD/SOS/EOS markers are excluded from the score. The rate is computed with
    an explicit mask over the remaining positions, so a wrong prediction at an
    excluded position cannot distort the result.

    Args:
        X: encoded input symbols (ids produced by `sentence_encoder`).
        y: reference label ids, same shape as X.
    Returns:
        1 - accuracy over the scored positions; NaN when nothing is scored.
    """
    predictions = batch_eval(X, y)
    ignore = {'!', '«', ']', '[', '}', ':', '"', '-', '»', '؛', ')', '،', '؟', '(', '{', '/', ' ', PAD, SOS, EOS}

    ignore_codes = np.array([ord(itm) for itm in ignore])
    # X was produced by this encoder, so a symbol missing from its vocabulary
    # cannot occur in X; skipping it also avoids transform()'s ValueError.
    known_codes = ignore_codes[np.isin(ignore_codes, sentence_encoder.classes_)]
    if known_codes.size:
        ignore_ids = sentence_encoder.transform(known_codes)
    else:
        ignore_ids = np.array([], dtype=np.int64)

    scored = ~np.isin(X, ignore_ids)
    n_scored = np.sum(scored)
    if n_scored == 0:
        return float('nan')
    n_correct = np.sum((predictions == y) & scored)
    return 1 - n_correct / n_scored

In [16]:
num_epochs = 20

# Build the dataset once and keep it on the CPU; each batch is moved to the
# device when it is consumed. This avoids re-converting and re-uploading the
# whole training set every epoch and keeps device memory free for validation.
X_train_tensor = torch.tensor(X_train, dtype=torch.int64)
y_train_tensor = torch.tensor(y_train, dtype=torch.int64)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

for epoch in range(num_epochs):
    # compute_DER (via batch_eval) leaves the model in eval mode, so switch back.
    model.train()

    accuracy = 0      # running count of correctly predicted positions
    loss_cum_sum = 0  # running sum of per-batch losses
    len_ = 0          # running count of positions seen
    for i, (x, y) in enumerate(train_loader):
        x = x.to(device)
        y = y.to(device)
        optimizer.zero_grad()
        outputs = model(x)
        # The class count is the last dimension of the scores; reading it from
        # the tensor avoids an np.unique pass over all training labels per step.
        loss = criterion(outputs.reshape(-1, outputs.size(-1)), y.reshape(-1))
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs.data, 2)
        accuracy += (predicted == y).sum().item()
        loss_cum_sum += loss.item()
        len_ += y.size(0) * y.size(1)
        print('Step [{}/{}], Epoch [{}/{}], Loss: {:.4f}, Accuracy: {:.4f}'.format(i + 1, len(train_loader), epoch + 1, num_epochs, loss_cum_sum / (i + 1), accuracy / len_), end='\r')
    print()
    print('DER: {:.5f}'.format(compute_DER(X_val, y_val)))

Step [625/625], Epoch [1/20], Loss: 0.0073, Accuracy: 0.9974
DER: 0.02925
Step [625/625], Epoch [2/20], Loss: 0.0063, Accuracy: 0.9977
DER: 0.02992
Step [625/625], Epoch [3/20], Loss: 0.0066, Accuracy: 0.9977
DER: 0.02958
Step [41/625], Epoch [4/20], Loss: 0.0053, Accuracy: 0.9982

KeyboardInterrupt: 

In [17]:
# Persist the whole module object (not just a state_dict). The model-setup cell
# reads this same file back with torch.load, so the two formats must stay in sync.
torch.save(model, 'model_cbhg.ckpt')

In [None]:
# Diacritic error rate on the shuffled test subset selected earlier.
print('DER: {:.5f}'.format(compute_DER(X_test, y_test)))

DER: 0.02850


In [None]:
def diacritize_string(sentence_test_str, model, sentence_encoder, max_len):
    """Return `sentence_test_str` (wrapped in SOS/EOS) with predicted diacritics.

    The text is wrapped in SOS/EOS and passed through extract_data_single,
    clamp_sentence and encode_sentences before being encoded and fed to `model`.
    Each character of the stripped sentence is then followed by the diacritic
    predicted for it.

    Inference runs in eval mode under no_grad, so dropout is disabled and
    batch-norm statistics are not updated; the model's previous train/eval mode
    is restored afterwards. The input is placed on the model's own device.

    Args:
        sentence_test_str: text to diacritize.
        model: trained model mapping symbol ids to per-position class scores.
        sentence_encoder: fitted encoder mapping character codes to symbol ids.
        max_len: length limit passed to clamp_sentence.
    Returns:
        str: the rebuilt sentence, including the SOS/EOS characters. Characters
        beyond the last available prediction are kept without a diacritic
        instead of raising IndexError.
    """
    sentence = SOS + sentence_test_str + EOS
    sentence_no_diacritics, labels = extract_data_single(sentence)
    sentence_no_diac_clamped, labels_clamped = clamp_sentence(sentence_no_diacritics, labels, max_len)
    encoded, _ = encode_sentences(sentence_no_diac_clamped, labels_clamped)
    encoded = sentence_encoder.transform(encoded.reshape(-1)).reshape(1, -1)

    model_device = next(model.parameters()).device
    model_input = torch.tensor(encoded, dtype=torch.int32).to(model_device)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model(model_input)
    finally:
        model.train(was_training)

    _, pred = torch.max(outputs, 2)
    pred = pred.cpu().numpy().reshape(-1)

    pieces = []
    for i in range(len(sentence_no_diacritics)):
        pieces.append(sentence_no_diacritics[i])
        if i < len(pred):
            pieces.append(id2diacritic[pred[i]])

    return ''.join(pieces)

In [None]:
sentence_test_str = 'كَانَ مَعَهُ أَلْفٌ فَقَالَ هِيَ مُضَارَبَةٌ لِفُلَانٍ بِالنِّصْفِ وَقَدْ رَبِحَ أَلْفًا فَقَالَ فُلَانٌ هِيَ بِضَاعَةٌ فَالْقَوْلُ قَوْلُ رَبِّ الْمَالِ'
# Print the reference sentence, then the model's output for it.
# [1:-1] drops the first and last characters of the result -- presumably the
# SOS/EOS markers added inside diacritize_string (TODO confirm).
print(sentence_test_str)
print(diacritize_string(sentence_test_str, model, sentence_encoder, max_len)[1:-1])

كَانَ مَعَهُ أَلْفٌ فَقَالَ هِيَ مُضَارَبَةٌ لِفُلَانٍ بِالنِّصْفِ وَقَدْ رَبِحَ أَلْفًا فَقَالَ فُلَانٌ هِيَ بِضَاعَةٌ فَالْقَوْلُ قَوْلُ رَبِّ الْمَالِ
كَانَ مَعَهُ أَلْفٌ فَقَالَ هِيَ مُضَارَبَةٌ لِفُلَانٍ بِالنِّصْفِ وَقَدْ رَبِحَ أَلْفًا فَقَالَ فُلَانٌ هِيَ بِضَاعَةٌ فَالْقَوْلُ قَوْلُ رَبِّ الْمَالِ
