In [None]:
!pip install botok
!pip install spacy

##### config.py

In [None]:

import json
from pathlib import Path
from typing import Dict, Optional


def _mkdir(path):
    if path.is_dir():
        return path
    path.mkdir(exist_ok=True, parents=True)
    return path


# Working directories live under ~/.mt_files; each one is created at import
# time by _mkdir if it does not exist yet.
BASE_PATH = _mkdir(Path.home() / ".mt_files")
# Destination folders for the cloned Tibetan (BO) and English (EN) repositories.
BO_FILES_PATH = _mkdir(BASE_PATH / "tibetan_files")
EN_FILES_PATH = _mkdir(BASE_PATH / "english_files")

"""Path to the folder where the tokenized files(both english and tibetan files) will be stored"""
TOKENIZED_FILES_PATH = _mkdir(BASE_PATH / "tokenized_files")


# JSON file recording per-ID progress; read by load_checkpoint and rewritten
# by save_checkpoint. Only the path is built here, the file is created lazily.
CHECKPOINT_FILE = BASE_PATH / "checkpoint.json"


def load_checkpoint():
    """Load the checkpoint mapping, creating an empty checkpoint file if needed.

    :return: The decoded content of ``CHECKPOINT_FILE``. An empty ``dict`` is
        returned when the file did not exist yet (it is created empty), or
        when its content is not valid JSON (e.g. a freshly created file).
    """
    if not CHECKPOINT_FILE.exists():
        CHECKPOINT_FILE.touch()  # Create the file if it doesn't exist
        # Must be a dict, not a list: callers index the result by ID and
        # assign ``checkpoints[id_] = {...}``.
        return {}

    with CHECKPOINT_FILE.open("r") as file:
        try:
            return json.load(file)
        except json.JSONDecodeError:
            return {}


def save_checkpoint(id_, stage: str, version: Optional[str] = None):
    """
    Save a checkpoint for a specific ID and stage.

    The whole checkpoint file is read, updated in memory and written back.

    :param id_: The ID to save the checkpoint for.
    :param stage: The stage (e.g., 'Tokenization', 'Alignment') of the process.
        The special value 're_alignment' appends ``version`` to the ID's
        're_alignment_versions' list instead of setting a flag.
    :param version: The re-alignment version to record; only used when
        ``stage`` is 're_alignment'.
    """
    checkpoints = load_checkpoint()
    # Defensive: anything other than a dict (e.g. an empty list) cannot be
    # indexed by ID, so start from an empty mapping in that case.
    if not isinstance(checkpoints, dict):
        checkpoints = {}

    entry = checkpoints.setdefault(
        id_,
        {
            "Tokenization": False,
            "Alignment": False,
            "re_alignment_versions": [],
        },
    )

    # Save the checkpoint for the ID and stage.
    if stage == "re_alignment":
        # setdefault keeps entries written without this key usable.
        entry.setdefault("re_alignment_versions", []).append(version)
    else:
        entry[stage] = True

    # NOTE(review): this read-modify-write cycle has no locking; concurrent
    # callers can overwrite each other's updates.
    with CHECKPOINT_FILE.open("w") as file:
        json.dump(checkpoints, file, indent=4)


def is_id_already_aligned(id_: str, id_checkpoints: Dict):
    """Tell whether ``id_`` has a checkpoint entry whose "Alignment" flag is truthy."""
    return bool(id_ in id_checkpoints and id_checkpoints[id_]["Alignment"])


def is_id_already_tokenized(id_: str, id_checkpoints: Dict):
    """Tell whether ``id_`` has a checkpoint entry whose "Tokenization" flag is truthy."""
    return bool(id_ in id_checkpoints and id_checkpoints[id_]["Tokenization"])


def is_id_already_realigned(id_: str, version: Optional[str], id_checkpoints: Dict):
    """Tell whether ``version`` is recorded in the "re_alignment_versions" of ``id_``."""
    if id_ not in id_checkpoints:
        return False
    recorded_versions = id_checkpoints[id_]["re_alignment_versions"]
    return bool(version in recorded_versions)




##### utility.py

In [None]:
import time
from functools import wraps


def execution_time(custom_name=None):
    """Build a decorator that prints how long each call of a function took.

    :param custom_name: Label used in the printed message; when it is falsy
        the wrapped function's ``__name__`` is used instead.
    :return: A decorator. The decorated function returns whatever the
        original returns; the message is printed only when the call returns
        without raising.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            started_at = time.time()
            outcome = func(*args, **kwargs)
            elapsed = time.time() - started_at
            label = custom_name or func.__name__
            print(f"Total time taken for {label}: {elapsed} seconds.")
            return outcome

        return wrapper

    return decorator

##### tokenizers.py

In [None]:
import re
from typing import List

import botok
from spacy.lang.en import English

# Shared botok tokenizer instance; stays None until get_bo_word_tokenizer()
# builds it on first use.
bo_word_tokenizer = None
# English spaCy pipeline with only the "sentencizer" component added, used for
# both sentence and word tokenization below.
en_nlp = English()
en_nlp.add_pipe("sentencizer")
# Raise the pipeline's max_length so long documents can be processed
# (presumably counted in characters -- see the spaCy docs).
en_nlp.max_length = 5000000

# Types
SENT_PER_LINE_STR = str  # sentence per line string
IS_AFFIX_PART = bool


def get_bo_word_tokenizer():
    """Return the shared ``botok.WordTokenizer``, building it on the first call."""
    global bo_word_tokenizer
    if bo_word_tokenizer is not None:
        return bo_word_tokenizer
    bo_word_tokenizer = botok.WordTokenizer()
    return bo_word_tokenizer


def join_sentences(sentences):
    """Combine the given sentences into one string, one sentence per line."""
    line_separator = "\n"
    return line_separator.join(sentences)


def en_preprocess(text: str) -> str:
    """Flatten English text onto a single line.

    Line breaks become spaces, runs of two or more whitespace characters are
    collapsed into one space, and remaining tabs are turned into spaces. The
    substitutions are applied in exactly this order.
    """
    text = re.sub(r"\r\n", " ", text)
    text = re.sub(r"\n", " ", text)
    text = re.sub(r"\s{2,}", " ", text)
    text = re.sub(r"\t", " ", text)
    return text


def en_sent_tokenizer(text: SENT_PER_LINE_STR) -> SENT_PER_LINE_STR:
    """Split English text into sentences and return them one per line.

    The text is first normalised with ``en_preprocess`` and then segmented by
    the module-level ``en_nlp`` pipeline.
    """
    print("[INFO] Tokenizing English text...")
    parsed = en_nlp(en_preprocess(text))
    return join_sentences([sentence.text for sentence in parsed.sents])


def en_word_tokenizer(text: str) -> List[str]:
    """Return the text of every token the ``en_nlp`` pipeline finds in ``text``."""
    return [token.text for token in en_nlp(text)]


def bo_preprocess(text: str) -> str:
    """Strip every carriage return and line feed from ``text``."""
    for line_break in ("\r", "\n"):
        text = text.replace(line_break, "")
    return text


def bo_sent_tokenizer(text: str) -> SENT_PER_LINE_STR:
    """Tokenize a Tibetan text into sentences, one sentence per line.

    The text is stripped of line breaks, word-tokenized with botok, and
    re-assembled token by token; a newline is inserted after every token that
    contains a closing punctuation mark. A few regex clean-ups are applied to
    the assembled string before it is returned.

    :param text: Raw Tibetan text.
    :return: The re-assembled text with a newline after each closing
        punctuation token.
    """
    print("[INFO] Tokenizing Tibetan text...")

    def get_token_text(token):
        # Prefer botok's cleaned text when the token has a non-empty one,
        # otherwise fall back to the raw token text.
        if hasattr(token, "text_cleaned") and token.text_cleaned:
            return token.text_cleaned
        else:
            return token.text

    # fmt: off
    opening_puncts = ['༁', '༂', '༃', '༄', '༅', '༆', '༇', '༈', '༉', '༊', '༑', '༒', '༺', '༼', '༿', '࿐', '࿑', '࿓', '࿔', '࿙']  # noqa: E501
    closing_puncts = ['།', '༎', '༏', '༐', '༔', '༴', '༻', '༽', '༾', '࿚']  # noqa: E501
    # Tokens whose chunk type is CJK or LATIN are dropped from the output.
    skip_chunk_types = [botok.vars.CharMarkers.CJK.name, botok.vars.CharMarkers.LATIN.name]
    # fmt: on

    # Regex to improve the chunking of shunits, this will be replaced by a better sentence segmentation in botok
    # Each (pattern, replacement) pair is applied in order to the assembled text.
    # NOTE(review): the first pattern requires two opening brackets (༼༼) while
    # the example in its comment shows only one -- confirm which is intended.
    r_replace = [
        (r"༼༼[༠-༩]+[བན]༽", r""),  # delete source image numbers `ས་༼༤བ༽མེད་བ`
        (
            r"([^ང])་([༔།])",
            r"\1\2",
        ),  # delete spurious spaces added by botok in the cleantext values
        (
            r"([།གཤ]{1,2})\s+(།{1,2})",
            r"\1\2 ",
        ),  # Samdong Rinpoche style double shad. This needs to be applied on inference input
        # (r"", r""),
    ]

    text = bo_preprocess(text)
    sents_words = []
    tokenizer = get_bo_word_tokenizer()
    tokens = tokenizer.tokenize(text, split_affixes=False)
    for token in tokens:
        if token.chunk_type in skip_chunk_types:
            continue
        token_text = get_token_text(token)
        # Opening punctuation is checked first, so a token containing both an
        # opening and a closing mark does not end the current line.
        if any(punct in token_text for punct in opening_puncts):
            sents_words.append(token_text.strip())
        elif any(punct in token_text for punct in closing_puncts):
            # Closing punctuation ends the sentence: emit it, then a line break.
            sents_words.append(token_text.strip())
            sents_words.append("\n")
        else:
            sents_words.append(token_text)

    sents_text = "".join(sents_words)

    for fr, to in r_replace:
        sents_text = re.sub(fr, to, sents_text)

    return sents_text


def remove_emojis(text):
    """Delete the keycap emojis for one, two and three from ``text``."""
    for keycap in ("1️⃣", "2️⃣", "3️⃣"):
        text = text.replace(keycap, "")
    return text


def sent_tokenize(text, lang) -> SENT_PER_LINE_STR:
    """Split ``text`` into sentences, one per line, for the given language.

    :param text: Text to segment; the emojis handled by ``remove_emojis`` are
        stripped first.
    :param lang: ``"en"`` for English or ``"bo"`` for Tibetan.
    :raises NotImplementedError: For any other language code.
    """
    cleaned = remove_emojis(text)

    if lang == "en":
        return en_sent_tokenizer(cleaned)
    if lang == "bo":
        return bo_sent_tokenizer(cleaned)
    raise NotImplementedError

##### download.py

In [None]:
import subprocess
from pathlib import Path
from typing import Optional


ORG = "MonlamAI"  # default GitHub organization used to build clone URLs


def clone_github_repo(
    repository,
    destination_folder: Path,
    organization: str = ORG,
):
    """Clone ``organization/repository`` from GitHub over SSH into ``destination_folder``.

    Nothing is done when ``destination_folder`` already exists.

    :param repository: Name of the repository to clone.
    :param destination_folder: Folder the repository is cloned into.
    :param organization: GitHub organization (or user) owning the repository.
    :raises Exception: When the clone command fails or any other error occurs;
        the original error is attached as ``__cause__``.
    """
    # Built before the try block so the error handlers below can always name
    # the repository, even when the failure happens before git is invoked.
    repo_url = f"git@github.com:{organization}/{repository}.git"
    try:
        if destination_folder.exists():
            return

        # Only the parent is created here. git creates the destination itself,
        # so a failed clone should not leave behind an empty folder that makes
        # every later run skip the clone.
        destination_folder.parent.mkdir(parents=True, exist_ok=True)

        # NOTE(review): --no-checkout leaves the working tree empty; confirm
        # that callers looking for files in the clone expect this.
        command = [
            "git",
            "clone",
            "--no-checkout",
            repo_url,
            str(destination_folder),
        ]
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"Failed to clone repository {repo_url}: {e}") from e
    except Exception as e:
        raise Exception(
            f"An error occurred while cloning repository {repo_url}: {e}"
        ) from e


def find_first_txt_file(folder_path: Path) -> Optional[Path]:
    """Return the first regular ``*.txt`` file found anywhere below ``folder_path``.

    The search is recursive and follows the order in which ``Path.rglob``
    yields matches.

    :raises FileNotFoundError: When no ``.txt`` file exists under the folder.
    """
    candidates = (
        entry for entry in Path(folder_path).rglob("*.txt") if entry.is_file()
    )
    first_match = next(candidates, None)
    if first_match is None:
        raise FileNotFoundError(f"No .txt file found in folder {folder_path}")
    return first_match

##### pipeline.py

In [None]:
import argparse
import logging
import multiprocessing
from pathlib import Path
from typing import Optional

from tqdm import tqdm



# File receiving the formatted error messages emitted through ``logging``.
log_fn = "errors.log"
# File receiving one failed ID per line (written by log_error_with_id).
error_id_log_fn = "error_ids.log"


# Configure the root logger: only ERROR and above are recorded, each entry
# prefixed with a timestamp and the level name.
logging.basicConfig(
    filename=str(log_fn),
    level=logging.ERROR,
    format="%(asctime)s - %(levelname)s - %(message)s",
)


def log_error_with_id(id_: str):
    """Append ``id_`` on its own line to the error-ID log file."""
    failed_id_line = f"{id_}\n"
    with open(error_id_log_fn, "a") as handle:
        handle.write(failed_id_line)


def get_file_content_by_lines(file_path):
    """
    Read a file and return its non-blank lines, stripped of surrounding whitespace.

    :param file_path: Path to the file to be read.
    :return: List of stripped lines; lines that are empty after stripping are dropped.
    :raises FileNotFoundError: If the path does not exist or is not a regular file.
    """
    file_path = Path(file_path)
    if not file_path.exists() or not file_path.is_file():
        raise FileNotFoundError(f"No file found at {file_path}")

    with file_path.open("r") as file:
        stripped_lines = (raw_line.strip() for raw_line in file)
        return [content for content in stripped_lines if content]


def _run_alignment_task(task):
    """Run one alignment request and report its outcome instead of raising.

    Kept at module level so the multiprocessing pool can pickle it. The error
    is returned as text because arbitrary exception objects are not always
    picklable.

    :param task: Tuple ``(id_, tokenized_bo_path, tokenized_en_path, version)``.
    :return: ``(id_, None)`` on success, ``(id_, error_message)`` on failure.
    """
    id_ = task[0]
    try:
        send_aligner_api_request(*task)
    except Exception as e:
        return id_, str(e)
    return id_, None


def pipeline(
    file_path: Path,
    re_align: bool = False,
    alignment_version: Optional[str] = "v1",
    num_processes: int = 10,
):
    """
    Tokenize (when needed) and align every ID listed in ``file_path``.

    file_path: a file containing ids of the repositories to be aligned
                ,ids should be separated by new lines
    re_align: if True, realign the ids with the specific version
    alignment_version: version you want to name for realign
                (ignored unless re_align is True)
    num_processes: maximum number of worker processes used to send the
                alignment requests

    Failures are not raised: each one is written to the error log and its ID
    is appended to the error-ID file.
    """

    ids = get_file_content_by_lines(file_path)

    # Load progress once; the checks below work on this snapshot.
    id_checkpoints = load_checkpoint()
    # A version is only forwarded to the aligner when re-aligning.
    version = alignment_version if re_align else None
    files_tobe_aligned = []

    for id_ in tqdm(ids, desc="Processing IDs"):
        try:
            bo_id, en_id = f"BO{id_}", f"EN{id_}"

            if re_align:
                # Already re-aligned with this specific version: skip.
                if is_id_already_realigned(id_, alignment_version, id_checkpoints):
                    continue
            elif is_id_already_aligned(id_, id_checkpoints):
                # Already tokenized and aligned: skip.
                continue

            if not is_id_already_tokenized(id_, id_checkpoints):
                bo_file_path = BO_FILES_PATH / bo_id
                en_file_path = EN_FILES_PATH / en_id

                clone_github_repo(repository=bo_id, destination_folder=bo_file_path)
                clone_github_repo(repository=en_id, destination_folder=en_file_path)

                bo_file = find_first_txt_file(bo_file_path)
                en_file = find_first_txt_file(en_file_path)
                if bo_file and en_file:
                    tokenize_files(id_, bo_file, en_file)
                    # Record that tokenization is done for this ID.
                    save_checkpoint(id_, "Tokenization")

            files_tobe_aligned.append(
                (
                    id_,
                    TOKENIZED_FILES_PATH / f"tokenized_{bo_id}.txt",
                    TOKENIZED_FILES_PATH / f"tokenized_{en_id}.txt",
                    version,
                )
            )

        except Exception as e:
            logging.error(f"{id_}: {e}")
            log_error_with_id(id_)
            continue

    # Nothing to align: do not spin up a process pool at all.
    if not files_tobe_aligned:
        return

    # NOTE(review): the workers call save_checkpoint concurrently, and it
    # rewrites one shared JSON file without locking -- updates may be lost.
    worker_count = max(1, min(num_processes, len(files_tobe_aligned)))
    try:
        with multiprocessing.Pool(worker_count) as pool:
            outcomes = pool.map(_run_alignment_task, files_tobe_aligned)
    except Exception as e:
        # Failure of the pool itself, not attributable to a single ID.
        logging.error(f"Alignment Failed: {e}")
        return

    # Report every failed ID under its own name.
    for failed_id, error in outcomes:
        if error is not None:
            logging.error(f"Alignment Failed {failed_id}: {error}")
            log_error_with_id(failed_id)


def tokenize_files(id_: str, bo_file: Path, en_file: Path):
    """Sentence-tokenize one Tibetan/English file pair and store the results.

    The outputs are written to ``TOKENIZED_FILES_PATH`` as
    ``tokenized_BO<id>.txt`` and ``tokenized_EN<id>.txt``.

    :param id_: ID shared by the two files (without the BO/EN prefix).
    :param bo_file: Path to the Tibetan source text.
    :param en_file: Path to the English source text.
    :return: Tuple ``(tokenized_bo_file_path, tokenized_en_file_path)``.
    """
    bo_id, en_id = f"BO{id_}", f"EN{id_}"

    # Read and write explicitly as UTF-8: the platform default encoding is
    # not guaranteed to be able to represent Tibetan text.
    tokenized_bo_text = sent_tokenize(bo_file.read_text(encoding="utf-8"), lang="bo")
    tokenized_en_text = sent_tokenize(en_file.read_text(encoding="utf-8"), lang="en")

    # Write both tokenized texts to files in TOKENIZED_FILES_PATH
    tokenized_bo_file_path = TOKENIZED_FILES_PATH / f"tokenized_{bo_id}.txt"
    tokenized_en_file_path = TOKENIZED_FILES_PATH / f"tokenized_{en_id}.txt"

    tokenized_bo_file_path.write_text(tokenized_bo_text, encoding="utf-8")
    tokenized_en_file_path.write_text(tokenized_en_text, encoding="utf-8")

    return tokenized_bo_file_path, tokenized_en_file_path


@execution_time(custom_name="sending api request")
def send_aligner_api_request(
    id_: str,
    tokenized_bo_file_path: Path,
    tokenized_en_file_path: Path,
    alignment_version: Optional[str] = None,
):
    """Ask the aligner to align one tokenized file pair and record the success.

    The request is delegated to ``send_api_request_to_aligner``. On success
    the "Alignment" checkpoint is saved for ``id_``; when a truthy
    ``alignment_version`` is given, that version is also recorded as a
    re-alignment.

    :raises Exception: When the response is a dict containing an "error" key;
        the exception carries that error value.
    """
    print(f"Sending request to aligner for {id_}")

    # NOTE(review): send_api_request_to_aligner is not defined in the visible
    # part of this file -- confirm it is imported or defined elsewhere.
    api_response = send_api_request_to_aligner(
        id_, tokenized_bo_file_path, tokenized_en_file_path, alignment_version
    )
    request_failed = isinstance(api_response, dict) and "error" in api_response
    if request_failed:
        raise Exception(api_response["error"])

    print(f"Alignment successful for {id_}")
    # Mark the ID as aligned in the checkpoint file.
    save_checkpoint(id_, "Alignment")

    if not alignment_version:
        return
    # Additionally remember which re-alignment version was produced.
    save_checkpoint(id_, "re_alignment", alignment_version)


    