In [14]:
import re
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [15]:
def read_cleaned_train(file_path='./data/cleaned_train.txt'):
    """Read text from cleaned_train.txt file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        print(f"Successfully read {len(text)} characters from {file_path}")
        return text
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"Error reading file: {e}")
        return None

# Read the file
text = read_cleaned_train()
if text:
    print(f"First 500 characters:\n{text[:500]}")
text = text[:180920]

Successfully read 18092077 characters from ./data/cleaned_train.txt
First 500 characters:
وَلَوْ جَمَعَ ثُمَّ عَلِمَ تَرْكَ رُكْنٍ مِنْ الْأُولَى بَطَلَتَا وَيُعِيدُهُمَا جَامِعًا ، أَوْ مِنْ الثَّانِيَةِ ، فَإِنْ لَمْ يَطُلْ تَدَارَكَ ، وَإِلَّا فَبَاطِلَةٌ وَلَا جَمَعَ ، وَلَوْ جَهِلَ أَعَادَهُمَا لِوَقْتَيْهِمَا
قَالَ أَبُو زَيْدٍ أَهْلُ تِهَامَةَ يُؤَنِّثُونَ الْعَضُدَ وَبَنُو تَمِيمٍ يُذَكِّرُونَ ، وَالْجَمْعُ أَعْضُدٌ وَأَعْضَادٌ مِثْلُ أَفْلُسٍ وَأَقْفَالٍ
بِمَنْزِلَةِ أَهْلِ الذِّمَّةِ إذَا دَخَلُوا قَرْيَةً مِنْ قُرَى أَهْلِ الْحَرْبِ ثُمَّ ظَفِرَ الْمُسْلِمُونَ بِهَا فَهُمْ


## Diacritic and Letter Mapper

In [16]:
class DiacriticMapper:
    def __init__(self):
        self.unicode_diacrits = self._build_diacritics()
        self.unicode_letters = self._build_letters()
        
        self.diacrits_to_index = {char: idx for idx, char in enumerate(self.unicode_diacrits)}
        self.index_to_diacrits = {idx: char for idx, char in enumerate(self.unicode_diacrits)}
        
        self.letters_to_index = {char: idx for idx, char in enumerate(self.unicode_letters)}
        self.index_to_letter = {idx: char for idx, char in enumerate(self.unicode_letters)}
    
    def _build_diacritics(self):
        diacritics = [chr(code) for code in range(0x064B, 0x0652 + 1)]
        # Add Shadda combinations (Shadda first)
        diacritics.extend([
            chr(0x0651) + chr(0x064E),  # Shadda + Fatha
            chr(0x0651) + chr(0x064F),  # Shadda + Damma
            chr(0x0651) + chr(0x0650),  # Shadda + Kasra
            chr(0x0651) + chr(0x064B),  # Shadda + Tanween Fath
            chr(0x0651) + chr(0x064C),  # Shadda + Tanween Damm
            chr(0x0651) + chr(0x064D),  # Shadda + Tanween Kasr
        ])
        # Add reverse order combinations (Vowel first)
        diacritics.extend([
            chr(0x064E) + chr(0x0651),  # Fatha + Shadda
            chr(0x064F) + chr(0x0651),  # Damma + Shadda
            chr(0x0650) + chr(0x0651),  # Kasra + Shadda
            chr(0x064B) + chr(0x0651),  # Tanween Fath + Shadda
            chr(0x064C) + chr(0x0651),  # Tanween Damm + Shadda
            chr(0x064D) + chr(0x0651),  # Tanween Kasr + Shadda
        ])
        diacritics.append("")  # No diacritic
        return diacritics
    
    def _build_letters(self):
        return [chr(code) for code in range(0x0620, 0x064A + 1)]
    
    @property
    def vocab_size(self):
        return len(self.unicode_letters)
    
    @property
    def num_classes(self):
        return len(self.unicode_diacrits)

mapper = DiacriticMapper()
print(f"Number of diacritics: {mapper.num_classes}")
print(f"Number of letters: {mapper.vocab_size}")

Number of diacritics: 21
Number of letters: 43


## Text Processor

In [17]:
class TextProcessor:
    def __init__(self, mapper):
        self.mapper = mapper
        self.pattern = r'[\u0620-\u064A][\u064B-\u0652]*'
    
    def extract_features(self, text):
        matches = re.findall(self.pattern, text)
        inputs = []
        targets = []
        
        for match in matches:
            inputs.append(self.mapper.letters_to_index[match[0]])
            
            if len(match) == 1:
                targets.append(self.mapper.diacrits_to_index[''])
            elif len(match) == 2:
                targets.append(self.mapper.diacrits_to_index[match[1]])
            elif len(match) == 3:
                targets.append(self.mapper.diacrits_to_index[match[1] + match[2]])
        
        return inputs, targets
    
    def remove_diacritics(self, text):
        return re.sub(r'[\u064B-\u0652]', "", text)
    
    def apply_diacritics(self, text, predictions):
        pattern = r'[\u0620-\u064A]'
        matches = list(re.finditer(pattern, text))
        result = ""
        last_pos = 0
        
        for i, match in enumerate(matches):
            result += text[last_pos:match.start()]
            result += match.group()
            
            pred_idx = predictions[i].item() if torch.is_tensor(predictions[i]) else int(predictions[i])
            result += self.mapper.index_to_diacrits[pred_idx]
            
            last_pos = match.end()
        
        result += text[last_pos:]
        return result

processor = TextProcessor(mapper)
input_data, target_data = processor.extract_features(text)
print(f"Input length: {len(input_data)}, Target length: {len(target_data)}")

Input length: 83584, Target length: 83584


## Neural Network Model

In [18]:
class SingleMaskModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, mask_classes):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, mask_classes)
    
    def forward(self, x):
        x = self.embedding(x)
        outputs, _ = self.rnn(x)
        logits = self.fc(outputs)
        return logits

class DiacritizationModel:
    def __init__(self, mapper, embedding_dim=64, hidden_dim=64):
        self.mapper = mapper
        self.model = SingleMaskModel(
            vocab_size=mapper.vocab_size,
            embedding_dim=embedding_dim,
            hidden_dim=hidden_dim,
            mask_classes=mapper.num_classes
        )
    
    def predict(self, inputs):
        self.model.eval()
        with torch.no_grad():
            logits = self.model(inputs)
            predictions = logits.argmax(dim=-1)
            return predictions.numpy().reshape(-1,)

diac_model = DiacritizationModel(mapper)

## Model Trainer

In [19]:
class ModelTrainer:
    def __init__(self, model, learning_rate=0.001):
        self.model = model
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    def train_step(self, input_tensor, target_tensor):
        self.model.train()
        self.optimizer.zero_grad()
        
        logits = self.model(input_tensor)
        logits_reshaped = logits.view(-1, logits.size(-1))
        target_reshaped = target_tensor.view(-1)
        
        loss = self.criterion(logits_reshaped, target_reshaped)
        loss.backward()
        self.optimizer.step()
        
        return loss.item()
    
    def train_epoch(self, input_data, target_data, batch_size=512, seq_length=128):
        """Train for one epoch with batching"""
        total_loss = 0
        num_batches = 0
        
        # Create batches
        for i in range(0, len(input_data) - seq_length, seq_length):
            batch_inputs = []
            batch_targets = []
            
            for j in range(0, batch_size):
                start_idx = i + j * seq_length
                if start_idx + seq_length >= len(input_data):
                    break
                batch_inputs.append(input_data[start_idx:start_idx + seq_length])
                batch_targets.append(target_data[start_idx:start_idx + seq_length])
            
            if not batch_inputs:
                break
                
            input_tensor = torch.tensor(batch_inputs)
            target_tensor = torch.tensor(batch_targets)
            
            loss = self.train_step(input_tensor, target_tensor)
            total_loss += loss
            num_batches += 1
            
            if num_batches % 100 == 0:
                print(f"  Batch {num_batches}, Loss: {loss:.4f}")
        
        return total_loss / num_batches if num_batches > 0 else 0

trainer = ModelTrainer(diac_model.model)

print(f"Training samples: {len(input_data)}")

# Train for multiple epochs
num_epochs = 2
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch + 1}/{num_epochs}")
    avg_loss = trainer.train_epoch(input_data, target_data)
    print(f"Average Loss: {avg_loss:.4f}")

Training samples: 83584

Epoch 1/2
  Batch 100, Loss: 1.2515
  Batch 100, Loss: 1.2515
  Batch 200, Loss: 1.0325
  Batch 200, Loss: 1.0325
  Batch 300, Loss: 0.8728
  Batch 300, Loss: 0.8728
  Batch 400, Loss: 0.7465
  Batch 400, Loss: 0.7465
  Batch 500, Loss: 0.6022
  Batch 500, Loss: 0.6022
  Batch 600, Loss: 0.3495
  Batch 600, Loss: 0.3495
Average Loss: 0.8962

Epoch 2/2
Average Loss: 0.8962

Epoch 2/2
  Batch 100, Loss: 0.6112
  Batch 100, Loss: 0.6112
  Batch 200, Loss: 0.5314
  Batch 200, Loss: 0.5314
  Batch 300, Loss: 0.4507
  Batch 300, Loss: 0.4507
  Batch 400, Loss: 0.3517
  Batch 400, Loss: 0.3517
  Batch 500, Loss: 0.2168
  Batch 500, Loss: 0.2168
  Batch 600, Loss: 0.0824
  Batch 600, Loss: 0.0824
Average Loss: 0.4043
Average Loss: 0.4043


## Evaluator

In [20]:
class Evaluator:
    @staticmethod
    def calculate_accuracy(target, predictions):
        correct = sum(1 for t, p in zip(target, predictions) if t == p)
        total = len(target)
        acc = (correct / total) * 100 if total > 0 else 0
        
        print(f"Total diacritics: {total}")
        print(f"Correct predictions: {correct}")
        print(f"Incorrect predictions: {total - correct}")
        print(f"Diacritic Accuracy: {acc:.2f}%")
        
        return acc

evaluator = Evaluator()

## Prediction and Evaluation

In [21]:
train_input_tensor = torch.tensor([input_data])
predictions = diac_model.predict(train_input_tensor)
print(f"Predictions length: {len(predictions)}")

nonDictText = processor.remove_diacritics(text)
final_text = processor.apply_diacritics(nonDictText, predictions)

print("\nOriginal text:")
print(text[:500])
print("\nPredicted text:")
print(final_text[:500])

accuracy = evaluator.calculate_accuracy(target_data, predictions)
print(f"\nTraining Accuracy: {accuracy:.2f}%")

Predictions length: 83584

Original text:
وَلَوْ جَمَعَ ثُمَّ عَلِمَ تَرْكَ رُكْنٍ مِنْ الْأُولَى بَطَلَتَا وَيُعِيدُهُمَا جَامِعًا ، أَوْ مِنْ الثَّانِيَةِ ، فَإِنْ لَمْ يَطُلْ تَدَارَكَ ، وَإِلَّا فَبَاطِلَةٌ وَلَا جَمَعَ ، وَلَوْ جَهِلَ أَعَادَهُمَا لِوَقْتَيْهِمَا
قَالَ أَبُو زَيْدٍ أَهْلُ تِهَامَةَ يُؤَنِّثُونَ الْعَضُدَ وَبَنُو تَمِيمٍ يُذَكِّرُونَ ، وَالْجَمْعُ أَعْضُدٌ وَأَعْضَادٌ مِثْلُ أَفْلُسٍ وَأَقْفَالٍ
بِمَنْزِلَةِ أَهْلِ الذِّمَّةِ إذَا دَخَلُوا قَرْيَةً مِنْ قُرَى أَهْلِ الْحَرْبِ ثُمَّ ظَفِرَ الْمُسْلِمُونَ بِهَا فَهُمْ

Predicted text:
وَلَوْ جَمَعْ ثُمَّ عُلُمْ تُرَكٍ رَكْنّ مِنْ الْأَوْلَى بَطَلَتَا وَيَعِيَدُهُمَا جَامَعًا ، أَو مِنْ الثَّانِيَةِ ، فَإِنْ لَمْ يُطُلُ تُدَارَكَ ، وْإلَا فَبَاطَلَةٍ وَلَا جَمْعَ ، وَلَوْ جَهُلَ أَعَادُهُمَا لَوَقَتَيْهِمَا
قِالَ أَبُو زَيْدُ أَهْلِ تْهَامَةَ يُؤَنّثُونَ الَعَضْدُ وَبَنُوَ تَمِيمِ يذَكَرُونٌ ، وَالْجَمْعِ أَعْضَدُ وَأَعْضَادَ مِثُلُ أَفْلَسَ وَأَقَفَالَ
بَمْنْزِلَةُ أَهْلَ الذَّمَةِ إذَا دَخَلَوَا قُرِّيَةُ مِنْ قَرَى

In [22]:
# Read validation data
def read_cleaned_val(file_path='./data/cleaned_val.txt'):
    """Read text from cleaned_val.txt file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            val_text = f.read()
        print(f"Successfully read {len(val_text)} characters from {file_path}")
        return val_text
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"Error reading file: {e}")
        return None

val_text = read_cleaned_val()
if val_text:
    print(f"First 500 characters of validation:\n{val_text[:500]}")
    
    # Extract features from validation data
    val_input_data, val_target_data = processor.extract_features(val_text)
    print(f"\nValidation - Input length: {len(val_input_data)}, Target length: {len(val_target_data)}")
    
    # Predict on validation data
    val_input_tensor = torch.tensor([val_input_data])
    val_predictions = diac_model.predict(val_input_tensor)
    print(f"Validation predictions length: {len(val_predictions)}")
    
    # Apply diacritics to validation text
    val_nonDictText = processor.remove_diacritics(val_text)
    val_final_text = processor.apply_diacritics(val_nonDictText, val_predictions)
    
    print("\n" + "="*50)
    print("VALIDATION RESULTS")
    print("="*50)
    print("\nOriginal validation text:")
    print(val_text[:500])
    print("\nPredicted validation text:")
    print(val_final_text[:500])
    
    print("\n" + "="*50)
    val_accuracy = evaluator.calculate_accuracy(val_target_data, val_predictions)
    print(f"\nValidation Accuracy: {val_accuracy:.2f}%")
    print("="*50)

Successfully read 882191 characters from ./data/cleaned_val.txt
First 500 characters of validation:
الشَّهَادَةِ ظَاهِرَةً ، وَبِحَقٍّ بَيِّنٍ تَضْعُفُ التُّهْمَةُ ، وَهُوَ الْفَرْقُ بَيْنَهُ وَبَيْنَ الشَّهَادَةِ ، وَعَنْ أَصْبَغَ الْجَوَازُ فِي الْوَلَدِ وَالزَّوْجَةِ وَالْأَخِ وَالْمُكَاتَبِ وَالْمُدَبَّرِ وَالْمِدْيَانِ إنْ كَانَ مِنْ أَهْلِ الْقِيَامِ بِالْحَقِّ ، وَصَحَّ الْحُكْمُ ، وَقَدْ يَحْكُمُ لِلْخَلِيفَةِ ، وَهُوَ فَوْقَهُ ، وَتُهْمَتُهُ أَقْوَى ، وَلَا يَنْبَغِي لَهُ الْقَضَاءُ بَيْنَ أَحَدٍ مِنْ عَشِيرَتِهِ وَخَصْمِهِ ، وَإِنْ رَضِيَ الْخَصْمُ بِخِلَافِ رَجُلَيْنِ رَضِيَا بِحُكْ

Validation - Input length: 407434, Target length: 407434
Validation predictions length: 407434
Validation predictions length: 407434

VALIDATION RESULTS

Original validation text:
الشَّهَادَةِ ظَاهِرَةً ، وَبِحَقٍّ بَيِّنٍ تَضْعُفُ التُّهْمَةُ ، وَهُوَ الْفَرْقُ بَيْنَهُ وَبَيْنَ الشَّهَادَةِ ، وَعَنْ أَصْبَغَ الْجَوَازُ فِي الْوَلَدِ وَالزَّوْجَةِ وَالْأَخِ وَالْمُكَاتَبِ وَالْمُدَبَّرِ وَالْمِ