In [None]:
# pip installs

In [None]:
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

from sklearn.model_selection import train_test_split
import statistics
from tqdm import tqdm

from zipfile import ZipFile

from tokenizers import Tokenizer, models, pre_tokenizers, trainers

# Select the compute device: CUDA when a GPU is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)


# Data preprocessing

### Here we need to load the data and extract only the texts that have vowel punctuation (nikud)

In [None]:
import requests
import os
import json
# Path to the JSON file that indexes the dataset
data_json_path = 'data/books.json'

# Root directory where the downloaded files will be saved
texts_path = 'data/texts'


# Create the directory if it does not exist. exist_ok=True replaces the
# separate exists() check, so the cell is safe to re-run and there is no
# window between the check and the creation.
os.makedirs(texts_path, exist_ok=True)


# Load the JSON dataset index. The variable name `jason_data` is kept as-is
# because the download cell iterates over it under that name.
with open(data_json_path, 'r', encoding='utf-8') as f:
    jason_data = json.load(f)

# download the files and save them in a folder

#### Remove/add the comment as needed

In [None]:
# Seconds to wait on a single HTTP request before giving up, so one stalled
# download cannot hang the whole cell.
DOWNLOAD_TIMEOUT_SECONDS = 60

# Loop through the json dataset and download the files
for entry in tqdm(jason_data):
    try:
        # Skip entries whose archive was already extracted on a previous run.
        # A direct existence check avoids re-listing the whole directory on
        # every iteration.
        nikud_meteg_dir = os.path.join(texts_path, entry['fileName'] + '__nikud_meteg')
        if os.path.exists(nikud_meteg_dir):
            continue

        # Download the Nikud Meteg file
        nikud_meteg_url = entry['nikudMetegFileURL']
        nikud_meteg_local_path = nikud_meteg_dir + '.zip'
        nikud_meteg_response = requests.get(nikud_meteg_url, timeout=DOWNLOAD_TIMEOUT_SECONDS)
        # Fail on HTTP errors instead of saving an error page as a ".zip"
        nikud_meteg_response.raise_for_status()
        with open(nikud_meteg_local_path, 'wb') as f:
            f.write(nikud_meteg_response.content)

        # Unzip the Nikud Meteg file only after the write handle is closed:
        # while it is still open, buffered bytes may not have reached disk and
        # ZipFile could see a truncated archive.
        with ZipFile(nikud_meteg_local_path, 'r') as zipObj:
            zipObj.extractall(nikud_meteg_dir)
    except Exception as e:
        # Best effort: report the failing entry and carry on with the rest
        print(f"Error reading file {entry['fileName']}: {e}")
        continue


# Iterate through the texts folder and delete the downloaded zip archives
# (including archives left behind by entries that failed to extract)
for file in tqdm(os.listdir(texts_path)):
    if file.endswith(".zip"):
        os.remove(os.path.join(texts_path, file))

            


# Author files

### Create a dictionary whose keys are authors and whose values are lists containing all of their files

In [None]:
# Define a method to create the author files dictionary
def create_author_files_dict(author_files, root_dir=None):
    """
    Map each folder name to the list of ``.txt`` file names it contains.

    Args:
        author_files: Iterable of entry names located directly under the
            texts root directory.
        root_dir: Directory that contains those entries. When omitted, the
            module-level ``texts_path`` is used.

    Returns:
        dict: ``{folder_name: [txt_file_name, ...]}``. Only names ending in
        ``.txt`` are kept, sorted alphabetically so the result does not
        depend on the order the operating system lists files in. Entries
        that are not directories are skipped.
    """
    if root_dir is None:
        root_dir = texts_path

    author_files_dict = {}
    for file in author_files:
        folder_path = os.path.join(root_dir, file)
        # A stray file under the root (e.g. a leftover .zip) is not a folder
        # of texts; calling os.listdir on it would raise NotADirectoryError.
        if not os.path.isdir(folder_path):
            continue
        author_files_dict[file] = sorted(
            text_file_name
            for text_file_name in os.listdir(folder_path)
            if text_file_name.endswith('.txt')
        )
    return author_files_dict

# List every entry directly under the texts directory, then build the
# {entry name: [.txt file names]} mapping from that listing.
author_files = os.listdir(texts_path)
author_files_dict = create_author_files_dict(author_files)

# Functions to clean the data

In [None]:
# Read a txt file from the author files dictionary
def read_txt_file(file_path):
    """
    Return the full contents of the UTF-8 text file at ``file_path`` as one string.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return handle.read()

def remove_nikud(string):
    """Return ``string`` with every character in U+05B0..U+05C2 deleted."""
    # Nikud unicode range (https://en.wikipedia.org/wiki/Unicode_and_HTML_for_the_Hebrew_alphabet)
    # NOTE(review): this range also covers U+05BE (maqaf) and U+05C0 (paseq),
    # which are punctuation rather than vowel points - confirm that stripping
    # them is intended.
    return re.sub(r'[\u05B0-\u05C2]', "", string)

def get_nikud(word):
    """
    Collect the nikud marks that follow each base character of ``word``.

    The first character is never inspected (it is taken to be a base
    character). Every later character in U+05B0..U+05C2 is appended to the
    marks of the current base character; any other character closes the
    current group and starts a new one. The last group is always emitted, so
    an empty ``word`` yields ``['']``.

    Returns:
        list[str]: one string of marks (possibly empty) per group.
    """
    # Nikud unicode range (https://en.wikipedia.org/wiki/Unicode_and_HTML_for_the_Hebrew_alphabet)
    marks_per_letter = []
    pending_marks = ''
    for char in word[1:]:
        if re.match(r'[\u05B0-\u05C2]', char):
            pending_marks += char
            continue
        # A non-nikud character ends the marks of the previous base character
        marks_per_letter.append(pending_marks)
        pending_marks = ''
    marks_per_letter.append(pending_marks)
    return marks_per_letter

def add_nikud(word, nikud):
    """
    Interleave ``word`` with ``nikud``: each character of ``word`` is followed
    by the entry of ``nikud`` at the same index.

    ``nikud`` must have at least ``len(word)`` entries; extra entries are
    ignored and a shorter ``nikud`` raises ``IndexError``.
    """
    return ''.join(word[position] + nikud[position] for position in range(len(word)))

# create DataSet class

In [None]:
# Define the dataset class; __getitem__ returns the text and the nikud_meteg contents
class NikudMetegDataset(Dataset):
    """
    Dataset over pairs of file paths.

    Each entry of ``files_list`` is indexed with ``[0]`` for the text file and
    ``[1]`` for the nikud_meteg file; both are read as UTF-8 on access.
    """

    def __init__(self, files_list):
        self.files_list = files_list

    def __len__(self):
        return len(self.files_list)

    def __getitem__(self, idx):
        """Return ``(text, nikud_meteg)``: the contents of the two files at ``idx``."""
        path_pair = self.files_list[idx]
        contents = []
        # Read the text file first, then the nikud_meteg file
        for position in (0, 1):
            with open(path_pair[position], 'r', encoding='utf-8') as handle:
                contents.append(handle.read())
        text, nikud_meteg = contents
        return text, nikud_meteg



# Split the data into train, validation and test sets.
# NOTE(review): `files_tuple_list` is not defined in any cell visible here, so
# this cell fails with NameError on a fresh kernel unless it is built elsewhere.
# Presumably it is a list of (text path, nikud_meteg path) pairs, since
# NikudMetegDataset opens element [0] and element [1] of each item - TODO confirm.
# The first split holds out 10% for test; the second takes 20% of the remaining
# 90% for validation, i.e. roughly 72% / 18% / 10% overall. The fixed
# random_state keeps the split reproducible for a given input order.
train_files_list, test_files_list = train_test_split(files_tuple_list, test_size=0.1, random_state=42)
train_files_list, val_files_list = train_test_split(train_files_list, test_size=0.2, random_state=42)

# Create one dataset object per split
train_dataset = NikudMetegDataset(train_files_list)
val_dataset = NikudMetegDataset(val_files_list)
test_dataset = NikudMetegDataset(test_files_list)

# Report the size of each split
print(f"Number of train files: {len(train_dataset)}")
print(f"Number of validation files: {len(val_dataset)}")
print(f"Number of test files: {len(test_dataset)}")


# Perform EDA on the data

In [None]:
def perform_eda(dataset, title):
    """
    Print length statistics and plot length distributions for a dataset.

    Args:
        dataset: Object supporting ``len()`` and integer indexing whose items
            unpack into ``(text, nikud_meteg)``. Only the length of each
            element is used.
        title: Prefix for the titles of the generated plots.

    Returns:
        None. Statistics are printed; two histograms and two boxplots are
        shown with matplotlib. For an empty dataset only the sample count is
        printed. For a single sample the standard deviation is reported as
        ``nan`` because it is undefined.
    """
    # Only the lengths are needed, so do not keep every full text in memory
    text_lengths = []
    nikud_meteg_lengths = []
    for i in tqdm(range(len(dataset))):
        text, nikud_meteg = dataset[i]
        text_lengths.append(len(text))
        nikud_meteg_lengths.append(len(nikud_meteg))

    print(f"Number of samples in the dataset: {len(dataset)}")
    if not text_lengths:
        # No mean, deviation or distribution exists for zero samples
        print("Dataset is empty - nothing to analyse.")
        return

    # Calculate the mean and standard deviation of the text and nikud_meteg lengths
    text_mean_length, text_std_length = _length_mean_and_std(text_lengths)
    nikud_meteg_mean_length, nikud_meteg_std_length = _length_mean_and_std(nikud_meteg_lengths)

    # Print the results
    print(f"Mean text length: {text_mean_length:.2f} (std: {text_std_length:.2f})")
    print(f"Mean nikud_meteg length: {nikud_meteg_mean_length:.2f} (std: {nikud_meteg_std_length:.2f})")

    # Histograms first, then boxplots, each for text and nikud_meteg lengths
    _plot_length_histogram(text_lengths, f"{title} - Text Lengths")
    _plot_length_histogram(nikud_meteg_lengths, f"{title} - Nikud Meteg Lengths")
    _plot_length_boxplot(text_lengths, f"{title} - Text Lengths")
    _plot_length_boxplot(nikud_meteg_lengths, f"{title} - Nikud Meteg Lengths")


def _length_mean_and_std(lengths):
    """Return ``(mean, sample std)`` of a non-empty list; std is nan for one value."""
    mean_length = sum(lengths) / len(lengths)
    # statistics.stdev needs at least two data points
    std_length = statistics.stdev(lengths) if len(lengths) > 1 else float('nan')
    return mean_length, std_length


def _plot_length_histogram(lengths, plot_title):
    """Show a 50-bin histogram of ``lengths`` titled ``plot_title``."""
    plt.hist(lengths, bins=50)
    plt.title(plot_title)
    plt.xlabel("Length")
    plt.ylabel("Frequency")
    plt.show()


def _plot_length_boxplot(lengths, plot_title):
    """Show a boxplot of ``lengths`` titled ``plot_title``."""
    plt.boxplot(lengths)
    plt.title(plot_title)
    plt.ylabel("Length")
    plt.show()


# Run the same EDA on each split, announcing the split before its report
for split_title, split_dataset in (
    ("Train Dataset", train_dataset),
    ("Validation Dataset", val_dataset),
    ("Test Dataset", test_dataset),
):
    print(f"{split_title} EDA:")
    perform_eda(split_dataset, split_title)


# dataloader and tokenizer

In [None]:
# Dataloader parameters
batch_size = 1

# One dataloader per split; train and validation are shuffled, test is not
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_dataloader = DataLoader(val_dataset, shuffle=True, batch_size=batch_size)
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)

# Smoke-test the dataloader: print the first sample of the first training
# batch, if the loader yields any batch at all
first_batch = next(iter(train_dataloader), None)
if first_batch is not None:
    text, nikud_meteg = first_batch
    print((text[0]))
    print((nikud_meteg[0]))


# Define the two models (one with look-ahead, one without)

### train the two models

### Evaluate the models

In [None]:
class PunctuationPredictionModelWithLookahead(nn.Module):
    """
    Per-token classifier whose self-attention sees the whole sequence.

    Token ids are embedded, passed through an unmasked Transformer encoder
    (every position can attend to past and future positions), and projected
    to ``num_punctuations`` logits per position.

    Args:
        num_words: Size of the input vocabulary (number of embedding rows).
        num_punctuations: Number of output classes per position.
        embedding_dim: Embedding / model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        num_layers: Number of encoder layers.

    NOTE(review): no positional encoding is added to the embeddings, so the
    encoder has no explicit notion of token order - confirm whether that is
    intended.
    """

    def __init__(self, num_words, num_punctuations, embedding_dim=256, num_heads=8, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_words, embedding_dim)
        # Encoder-only stack. nn.Transformer is an encoder-decoder: its
        # forward() requires a `tgt` sequence, so calling it with the source
        # alone raises TypeError, and its third positional argument only sets
        # the number of encoder layers (the decoder keeps its default of 6).
        encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embedding_dim, num_punctuations)

    def forward(self, x):
        """
        Args:
            x: Integer tensor of token ids laid out ``(seq_len, batch)``; the
                encoder is built with the default ``batch_first=False``.

        Returns:
            Tensor of logits with shape ``(seq_len, batch, num_punctuations)``.
        """
        x = self.embedding(x)
        x = self.transformer(x)
        x = self.fc(x)
        return x


class PunctuationPredictionModelWithoutLookahead(nn.Module):
    """
    Per-token classifier whose self-attention cannot see future positions.

    Token ids are embedded, passed through a Transformer encoder with a
    causal mask (position ``i`` attends only to positions ``<= i``), and
    projected to ``num_punctuations`` logits per position.

    Args:
        num_words: Size of the input vocabulary (number of embedding rows).
        num_punctuations: Number of output classes per position.
        embedding_dim: Embedding / model width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        num_layers: Number of encoder layers.

    NOTE(review): no positional encoding is added to the embeddings - confirm
    whether that is intended.
    """

    def __init__(self, num_words, num_punctuations, embedding_dim=256, num_heads=8, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_words, embedding_dim)
        # Encoder-only stack. nn.Transformer is an encoder-decoder: its
        # forward() requires a `tgt` sequence, so calling it with only the
        # source and `src_mask` raises TypeError, and its third positional
        # argument only sets the number of encoder layers.
        encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=num_heads)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embedding_dim, num_punctuations)

    def forward(self, x):
        """
        Args:
            x: Integer tensor of token ids laid out ``(seq_len, batch)``;
                dimension 0 is used as the sequence length for the mask.

        Returns:
            Tensor of logits with shape ``(seq_len, batch, num_punctuations)``.
        """
        x = self.embedding(x)
        mask = self._generate_future_mask(x.size(0)).to(x.device)
        x = self.transformer(x, mask=mask)
        x = self.fc(x)
        return x

    def _generate_future_mask(self, size):
        """
        Build the additive ``(size, size)`` attention mask: ``0.0`` on and
        below the diagonal, ``-inf`` strictly above it, so row ``i`` blocks
        every column ``j > i``.
        """
        # triu(diagonal=1) keeps -inf above the diagonal and zeroes the rest
        return torch.triu(torch.full((size, size), float('-inf')), diagonal=1)


# Define the dual model class; it is composed of the two models.
#### Whenever the two models disagree, the dual model adds nikud using the lookahead model

## Evaluation of the dual model