In [None]:
#Full Tokenizer By Isaac Angulo Gomez

In [6]:
pip install regex

Collecting regex
  Using cached regex-2025.10.23-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
Using cached regex-2025.10.23-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (803 kB)
Installing collected packages: regex
Successfully installed regex-2025.10.23
Note: you may need to restart the kernel to use updated packages.


In [7]:
!pip install transformers

Collecting transformers
  Using cached transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
Collecting filelock (from transformers)
  Using cached filelock-3.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Using cached huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting numpy>=1.17 (from transformers)
  Using cached numpy-2.3.4-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (62 kB)
Collecting tokenizers<=0.23.0,>=0.22.0 (from transformers)
  Using cached tokenizers-0.22.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.8 kB)
Collecting safetensors>=0.4.3 (from transformers)
  Using cached safetensors-0.6.2-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting fsspec>=2023.5.0 (from huggingface-hub<1.0,>=0.34.0->transformers)
  Using cached fsspec-2025.10.0-py3-none-any.whl.metadata (10 kB)
Collecting hf-xet<2.0.0,>=1.1.3 (from huggingface-hub

In [9]:
#Used to detect whitespace, or accents
import unicodedata

#Used to support Unicode property classes
import regex as re

#Huggingface transformers BertTokenizer
from transformers import BertTokenizer

#For Dictionary
import collections

In [10]:
#Loads the pretrained "bert-base-uncased" tokenizer (and with it the vocabulary) via Hugging Face.
#NOTE(review): the name `tok` is rebound to a FullTokenizer in a later cell, so this object
#is only reachable until that cell runs.
tok = BertTokenizer.from_pretrained("bert-base-uncased")

#vocabulary size (the recorded run printed 30522)
print(tok.vocab_size) 

30522


In [11]:
#special tokens, each spelled exactly once
UNK = "[UNK]"
CLS = "[CLS]"
SEP = "[SEP]"
PAD = "[PAD]"
MASK = "[MASK]"

#tokens that must never be lowercased, split on punctuation or broken into wordpieces;
#built from the constants above so the set cannot drift from them
NEVER_SPLIT = {UNK, CLS, SEP, PAD, MASK}

#regex pattern matching one character of the Unicode punctuation category (\p{P});
#`re` here is the third-party `regex` module, which is what provides \p{...} classes.
#The capture group makes _PUNC_RE.split() keep every punctuation mark as its own piece.
#NOTE(review): ASCII symbols such as "$", "+", "=" or "~" are Unicode category S, not P,
#so they are not split off by this pattern - TODO confirm whether that is intended.
_PUNC_RE = re.compile(r"([\p{P}])")

#Cleans the text

#determines if character is a whitespace
def _is_whitespace(ch):
    return ch in (" ", "\t", "\n", "\r") or unicodedata.category(ch) == "Zs"

#strips accents and returns string without accents
def _strip_accents(text):
    text = unicodedata.normalize("NFD", text)
    return "".join(ch for ch in text if unicodedata.category(ch) != "Mn")

#checks whether a character is a control character (any Unicode "C*" category); tab, newline and carriage return are excluded because they count as whitespace
def _is_control(ch):
    cat = unicodedata.category(ch)
    return (cat.startswith("C") and ch not in ("\t", "\n", "\r"))

#rebuilds the text string by dropping null and control characters and replacing every whitespace character with a plain space, then returns it.
def _clean_text(text):
    """Return ``text`` without null/control characters and with all whitespace mapped to " ".

    Characters are processed one by one: a null character or anything
    ``_is_control`` accepts is dropped, anything ``_is_whitespace`` accepts
    becomes a single space, everything else is kept unchanged.
    """
    #first pass: filter out what must disappear entirely
    kept = (ch for ch in text if ch != "\u0000" and not _is_control(ch))
    #second pass: normalize the surviving whitespace
    return "".join(" " if _is_whitespace(ch) else ch for ch in kept)


In [12]:
#returns an ordered vocab dictionary
def load_vocab(vocab_file):
    """Read a one-token-per-line vocabulary file into an ordered token -> index mapping.

    The index of a token is its zero-based line number. Only the trailing
    newline is removed from each line, so other whitespace stays part of
    the token.
    """
    with open(vocab_file, "r", encoding="utf-8") as handle:
        entries = [line.rstrip("\n") for line in handle]
    #pairs are inserted in file order, so a repeated token keeps its first position
    #but ends up with the index of its last occurrence
    return collections.OrderedDict((entry, index) for index, entry in enumerate(entries))

In [13]:
#Tokenizes a single word into WordPiece sub-tokens using the given vocabulary
class WordpieceTokenizer:
    """Greedy longest-match-first splitter of one word into vocabulary pieces."""

    def __init__(self, vocab, unk_token = UNK, max_input_chars_per_word=100):
        """Store the vocabulary, the fallback token and the maximum accepted word length."""
        self.vocab = vocab
        self.unk_token = unk_token
        self.max_input_chars_per_word = max_input_chars_per_word

    def tokenize(self,token):
        """Split ``token`` into pieces found in the vocabulary.

        Every piece after the first is looked up with a "##" prefix. Returns
        ``[unk_token]`` when the word is longer than ``max_input_chars_per_word``
        or when some position has no matching piece; returns ``[]`` for an
        empty word.
        """
        length = len(token)
        if length > self.max_input_chars_per_word:
            return [self.unk_token]

        pieces = []
        pos = 0
        while pos < length:
            #try the longest remaining span first and shrink it one character at a time
            for stop in range(length, pos, -1):
                piece = token[pos:stop]
                if pos > 0:
                    piece = "##" + piece
                if piece in self.vocab:
                    break
            else:
                #no span starting at pos is in the vocabulary: the whole word is unknown
                return [self.unk_token]

            pieces.append(piece)
            #continue right after the piece that was just matched
            pos = stop

        return pieces
            

In [14]:
class FullTokenizer:
    """End-to-end tokenizer: text cleanup, basic splitting, then WordPiece.

    Attributes:
        vocab: ordered token -> id mapping loaded from ``vocab_file``.
        inv_vocab: id -> token mapping, the inverse of ``vocab``.
        do_lower_case: whether words are lowercased and stripped of accents.
        never_split: tokens that are passed through untouched.
        wordpiece_tokenizer: the WordpieceTokenizer applied to every ordinary word.
    """

    def __init__(self, vocab_file, do_lower_case = True, never_split = None):
        """Load the vocabulary and set up the WordPiece stage.

        Args:
            vocab_file: path of a one-token-per-line vocabulary file.
            do_lower_case: lowercase and strip accents before splitting.
            never_split: iterable of protected tokens; defaults to NEVER_SPLIT.
        """
        self.vocab = load_vocab(vocab_file)
        self.inv_vocab = {v:k for k, v in self.vocab.items()}
        self.do_lower_case = do_lower_case
        self.never_split = set(NEVER_SPLIT if never_split is None else never_split)
        self.wordpiece_tokenizer = WordpieceTokenizer(self.vocab, unk_token= UNK)


    def tokenize(self, text):
        """Turn raw ``text`` into a list of WordPiece tokens.

        Returns an empty list for empty or ``None`` input.
        """
        if not text:
            return []

        text = _clean_text(text)

        #split on whitespace and punctuation, keeping punctuation as token
        tokens = []
        for word in text.strip().split():
            #protected tokens are recognised BEFORE lowercasing; lowercasing the whole
            #text first would turn "[CLS]" into "[cls]", which no longer matches and
            #would then be split on its brackets
            if word in self.never_split:
                tokens.append(word)
                continue

            #make lower case and remove accents
            if self.do_lower_case:
                word = _strip_accents(word.lower())

            tokens.extend(p for p in _PUNC_RE.split(word) if p and not p.isspace())

        #wordpiece token list
        wp_tokens = []
        for t in tokens:
            #this second check also protects entries of never_split that only match
            #after normalization (e.g. a lowercase custom entry)
            if t in self.never_split:
                wp_tokens.append(t)
            else:
                wp_tokens.extend(self.wordpiece_tokenizer.tokenize(t))

        return wp_tokens

    def convert_tokens_to_ids(self, tokens):
        """Map tokens to vocabulary ids; unknown tokens get the id of UNK (``None`` if UNK is not in the vocabulary)."""
        unk_id = self.vocab.get(UNK)

        return [self.vocab.get(t, unk_id) for t in tokens]

    def convert_ids_to_tokens(self, ids):
        """Map vocabulary ids back to tokens. Raises KeyError for an id that is not in the vocabulary."""
        return [self.inv_vocab[i] for i in ids]
    

In [15]:
def build_inputs_from_tokens(tokens_a, tokens_b = None, max_len = 512, pad_to_max = True, pad_token = PAD):
    """Assemble ``[CLS] A [SEP]`` or ``[CLS] A [SEP] B [SEP]`` with segment ids and attention mask.

    Args:
        tokens_a: tokens of the first segment.
        tokens_b: optional tokens of the second segment; ``None`` or an empty
            list produces a single-segment input.
        max_len: maximum length of the returned sequences.
        pad_to_max: pad the result up to ``max_len`` when it is shorter.
        pad_token: token used for padding.

    Returns:
        ``(tokens, token_type_ids, attention_mask)``, three lists of equal length.
        Segment ids are 0 for ``[CLS] A [SEP]`` and padding, 1 for ``B [SEP]``;
        the attention mask is 1 for real tokens and 0 for padding.

    Inputs that fit into ``max_len`` give the same result as before. Inputs that
    are too long are now trimmed BEFORE the special tokens are added (longer
    segment first, from its end), so the closing [SEP] is always kept and the
    second segment is not silently cut away. The input lists are not modified.
    """
    #work on copies so the callers' lists are never mutated by the trimming below
    tokens_a = list(tokens_a)
    tokens_b = list(tokens_b) if tokens_b else []

    #room left for real tokens once [CLS] and the [SEP] markers are reserved
    num_special = 3 if tokens_b else 2
    budget = max(0, max_len - num_special)
    while len(tokens_a) + len(tokens_b) > budget:
        longer = tokens_a if len(tokens_a) >= len(tokens_b) else tokens_b
        longer.pop()

    tokens = [CLS] + tokens_a + [SEP]

    token_type_ids = [0] * len(tokens)

    if tokens_b:
        tokens += tokens_b + [SEP]
        token_type_ids += [1] * (len(tokens_b) + 1)

    #only reachable when max_len cannot even hold the special tokens; kept so that
    #such a call still returns at most max_len items instead of raising
    if len(tokens) > max_len:
        tokens = tokens[:max_len]
        token_type_ids = token_type_ids[:max_len]

    attention_mask = [1] * len(tokens)

    if pad_to_max and len(tokens) < max_len:
        pad_len = max_len - len(tokens)
        tokens += [pad_token] * pad_len
        token_type_ids += [0] * pad_len
        attention_mask += [0] * pad_len

    return tokens, token_type_ids, attention_mask

def build_inputs_from_texts(tokenizer, text_a, text_b = None, max_len = 512):
    """Tokenize one or two texts and build the model inputs for them.

    ``tokenizer`` must provide ``tokenize`` and ``convert_tokens_to_ids``.
    Returns a dict with the keys ``input_ids``, ``token_type_ids``,
    ``attention_mask`` and ``tokens``.
    """
    first = tokenizer.tokenize(text_a)
    #a missing second text stays None so that a single-segment input is built
    second = None if text_b is None else tokenizer.tokenize(text_b)

    tokens, token_type_ids, attention_mask = build_inputs_from_tokens(first, second, max_len=max_len)

    return {
        "input_ids": tokenizer.convert_tokens_to_ids(tokens),
        "token_type_ids": token_type_ids,
        "attention_mask": attention_mask,
        "tokens": tokens,
    }

In [16]:
#Smoke test of the tokenization pipeline using a small fake vocabulary

#token -> id mapping; the ids equal the line numbers the tokens get in the file written below
fake_vocab = collections.OrderedDict({
    PAD:0, UNK:1, CLS:2, SEP:3, MASK:4,
    "un":5, "##believable":6, "!":7,
    "cats":8, "are":9, "##n":10, "'":11, "t":12,
    "dogs":13, ".":14
})

# Write the fake vocab to fake_vocab.txt in the working directory, one token per line
# (the file is not removed afterwards)
with open("fake_vocab.txt", "w", encoding="utf-8") as f:
    for tok in fake_vocab.keys():
        f.write(tok + "\n")

#Create the tokenizer and test it with a string
#NOTE(review): `tok` was the loop variable above and the Hugging Face tokenizer in an
#earlier cell; from here on it refers to the FullTokenizer.
tok = FullTokenizer("fake_vocab.txt", do_lower_case=True)
print(tok.tokenize("Unbelievable! Cats aren't Dogs."))
print(build_inputs_from_texts(tok, "Unbelievable! Cats aren't Dogs.", max_len=20))

['un', '##believable', '!', 'cats', 'are', '##n', "'", 't', 'dogs', '.']
{'input_ids': [2, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 3, 0, 0, 0, 0, 0, 0, 0, 0], 'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0], 'tokens': ['[CLS]', 'un', '##believable', '!', 'cats', 'are', '##n', "'", 't', 'dogs', '.', '[SEP]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]', '[PAD]']}


In [17]:
# Full Pre-Training Builder By Isaac Angulo Gomez
!pip install torch

Collecting torch
  Using cached torch-2.9.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collecting sympy>=1.13.3 (from torch)
  Using cached sympy-1.14.0-py3-none-any.whl.metadata (12 kB)
Collecting networkx>=2.5.1 (from torch)
  Using cached networkx-3.5-py3-none-any.whl.metadata (6.3 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.8.93 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.8.93-py3-none-manylinux2010_x86_64.manylinux_2_12_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cuda-runtime-cu12==12.8.90 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.8.90-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cuda-cupti-cu12==12.8.90 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.8.90-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.7 kB)
Collecting nvidia-cudnn-cu12==9.10.2.21 (from torch)
  Using cached nvidia_cudnn_cu12-9.10.2.21-py3-none-manylinux_2_27_x86_64.whl.metadata (1.8 kB)
Collecting nvidia

In [18]:
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
import random
import numpy as np
import torch

In [19]:
@dataclass
class Instance:
    """One packed pre-training example with its MLM and NSP targets."""
    #token ids of the packed sequence (after masking when built by build_pretraining_instance)
    input_ids: List[int]
    #segment id per position
    token_type_ids: List[int]
    #1 for every real position
    attention_mask: List[int]
    #original token id at masked positions, -100 everywhere else
    mlm_labels: List[int]
    #next-sentence label: 1 for an IsNext pair, 0 for a NotNext pair.
    #The field used to be misspelled "nsp_lavel" while the dataset wrapper reads "nsp_label".
    nsp_label: int

    @property
    def nsp_lavel(self) -> int:
        """Read-only alias kept for code that still uses the old misspelled name."""
        return self.nsp_label

In [20]:
#Shortens a sequence pair by removing tokens from a or b until it fits the max length
def _truncate_seq_pair(tokens_a: List[int], tokens_b: List[int], max_len: int) -> Tuple[List[int], List[int]]:
    while len(token_a) + len(token_b) > max_len:
        if len(token_a) > len(token_b):
            if random.random() < 0.5:
                tokens_a.pop(0)
            else:
                token_a.pop()
        else:
            if random.random() < 0.5:
                token_b.pop(0)
            else:
                token_b.pop()
    return tokens_a, tokens_b

In [21]:
#Masks 15% of tokens
def _create_masked_lm(tokens, mask_token_in, pad_token_id, special_token_ids, vocab_size, mask_prob=.015):
    #get positions of tokesn that are not special tokens or pad tokens
    cand_positions = [i for i, t in enumerate(tokens) 
                      if t not in special_tokens_ids and t != pad_token_id]
    
    num_to_mask = max(1, int(round(len(cand_positions) * mask_prob))) if cand_positions else 0
    
    masked_positions = set(random.sample(cand_positions, num_to_mask)) if num_to_mask else set()

    mlm_labels = [-100] * len(tokens)
    new_tokens = list(tokens)

    for i in range(len(tokens)):
        for i in masked_positions:
            mlm_labels[i] = tokens[i]
            r = random.random()
            if r < 0.8:
                new_toknes[i] = mask_token_id
            elif r < 0.9:
                #Replace with random token
                for _ in range(10):
                    cand = random.randint(0, vocab_size - 1)
                    if cand not in special_token_ids and cand != pad_token_id:
                        new_tokens[i] = cand
                        break
                else:
                    new_tokens[i] = tokens[i]
            else:
                #Keep original
                new_tokens[i] = tokens[i]
        return new_tokens, mlm_labels

In [22]:
#Adds [CLS] and [SEP] and creates token_type_ids
def _pack(tokens_a, tokens_b, special_ids):
    cls_id, sep_id = special_ids["cls"], special["sep"]
    input_ids = [cls_id] + tokens_a + [sep_id] + tokens_b + [sep_id]
    token_type_ids = [0] * (len(tokens_a) + 2) + [1] * (len(tokens_b) + 1)
    return input_ids, token_type_ids

In [23]:
#Builds IsNext pair from document
def _make_isnext_pair(doc_sents, max_seq_len_no_special, shor_seq_prob):
    
    #Split artificially if len = 1
    if len(doc_sents) == 1:
        s = doc_sents[0]
        mid = max(1, len(s)//2)
        
        return s[:mid], s[mid:], 1

    #randomly split within document
    start = random.randrange(0,len(doc_sents)-1)
    token_a = list(doc_sents[start])
    i = start + 1
    while i < len(doc_sents)-1 and len(tones_a) < max_seq_len_no_special// 2 and random.random() < 0.5:
        tokens_a += doc_sents[i]
        i += 1

    tokens_b = list(doc-sents[i])
    i += i
    while i < len(doc_sents) and (len(tokens_a) + len(tokens_b)) < max_seq_len_no_special and random.random() < 0.7:
        tokens_b += doc_sents[i]
        i += 1

    if random.randm() < short_seq_prob:
        tarket = random.randint(2, max(2, max_seq_len_no_special//2))
        tokesn_a = tokens_a[:target//2]
        tokens_b = tokens_b[:target - len(tokens_a)]

    return tokens_a, tokens_b, 1

In [24]:
#Builds a pair of segments taken from two different documents, so that B does not follow A
def _make_notnext_pair(all_docs, cur_doc_idx, max_seq_len_no_special, short_seq_prob):
    doc_a = all_docs[cur_doc_idx]
    start = random.randrange(0, len(doc_a))
    tokens_a = list(doc_a[start])
    i = start + 1

    while i < len(doc_a) and len(toknes) < max_seq_len_no_special//2 and random.random() < 0.5:
        tokens_a += doc_a[i]
        i += 1

    #pick pair from another document
    other_idx = cur_doc_idx

    if len(all_docs) > 1:
        while other_idx == cur_doc_idx: 
            other_idx = random.randrange(0, len(all_docs))

    doc_b = all_docs[other_idx]

    start_b = random.randrange(0, len(doc_b))
    tokens_b = list(doc_b[start_b])

    j = start_b + 1

    while j < len(doc_b) and (len(tokens_a) + len(token_b)) < max_seq_len_no_special and random.random() < 0.7:
        tokens_b += doc_b[j]
        j+= 1

    if random.random() < short_seq_prob:
        target = random.randint(2, max(2, max_seq_len_no_sepcial//2))
        token_a = tokens_a[:target//2]
        target_b = tokens_b[:target - len(tokens_a)]

    return tokens_a, tokens_b, 0

In [25]:
#Builds MLM + NSP instances for all documents
def build_pretraining_instance(tokenized_documents, special_ids, vocab_size, max_seq_len=128, short_seq_prob = 0.1, nsp_prob=0.5, mask_prob=0.15, seed=42):
    """Build MLM + NSP training instances for every document.

    For each non-empty document, as many instances as it has sentences are
    created. Each instance is an IsNext pair with probability ``nsp_prob`` and
    a NotNext pair otherwise; the pair is truncated, packed with special
    tokens and then masked.

    Args:
        tokenized_documents: list of documents, each a list of sentences,
            each sentence a list of token ids.
        special_ids: mapping with the ids under the keys "cls", "sep", "pad" and "mask".
        vocab_size: size of the vocabulary, used for random token replacement.
        max_seq_len: maximum length of a packed sequence including the three special tokens.
        short_seq_prob: probability of building a deliberately short pair.
        nsp_prob: probability that a pair is an IsNext pair.
        mask_prob: share of maskable tokens selected for the MLM task.
        seed: seed for the global ``random`` module during the build.

    Returns:
        List of ``Instance`` objects.

    The global ``random`` state is saved on entry and restored on exit, also
    when an exception is raised.
    """
    rng_state = random.getstate()
    random.seed(seed)
    try:
        instances = []
        #three positions are reserved for [CLS] and the two [SEP] tokens
        max_seq_len_no_special = max_seq_len - 3
        special_token_ids = {special_ids["cls"], special_ids["sep"], special_ids["pad"], special_ids["mask"]}

        for d_idx, doc in enumerate(tokenized_documents):
            if not doc:
                continue

            for _ in range(max(1, len(doc))):
                is_next = random.random() < nsp_prob

                if is_next:
                    a, b, nsp = _make_isnext_pair(doc, max_seq_len_no_special, short_seq_prob)
                else:
                    a, b, nsp = _make_notnext_pair(tokenized_documents, d_idx, max_seq_len_no_special, short_seq_prob)

                #truncation pops in place, so hand it fresh lists
                a, b = _truncate_seq_pair(list(a), list(b), max_seq_len_no_special)
                input_ids, token_type_ids = _pack(a, b, special_ids)
                attention_mask = [1] * len(input_ids)

                #positional call: (tokens, mask id, pad id, special ids, vocab size, mask prob)
                masked_ids, mlm_labels = _create_masked_lm(input_ids, special_ids["mask"], special_ids["pad"], special_token_ids, vocab_size, mask_prob)

                instances.append(Instance(masked_ids, token_type_ids, attention_mask, mlm_labels, nsp))

        #returned only after ALL documents are processed (the return used to sit inside the loop)
        return instances
    finally:
        random.setstate(rng_state)
            

In [27]:
#Dataset wrapper for pretraining instances
class BertPretrainDatase(torch.utils.data.Dataset):
    """Dataset that serves pre-training instances as dicts of ``torch.long`` tensors.

    Every instance must provide the attributes ``input_ids``, ``token_type_ids``,
    ``attention_mask``, ``mlm_labels`` and ``nsp_label``. Items keep their own
    length; no padding is applied here.
    """

    def __init__(self, instances, pad_token_id, max_seq_len):
        """Store the instances together with the padding id and maximum sequence length.

        The last two values are only stored (as ``pad`` and ``max_len``); this
        class does not use them itself.
        """
        self.instances = instances
        self.pad = pad_token_id
        self.max_len = max_seq_len

    def __len__(self):
        """Number of stored instances."""
        return len(self.instances)

    def __getitem__(self, idx):
        """Return instance ``idx`` as a dict of tensors.

        The keys are "input_ids", "token_type_ids", "attention_mask",
        "mlm_labels" and "nsp_label"; "input_ids" is the key that
        bert_collate_fn reads (it used to be emitted as "inputs_ids").
        """
        inst = self.instances[idx]
        return {
            "input_ids": torch.tensor(inst.input_ids, dtype = torch.long),
            "token_type_ids": torch.tensor(inst.token_type_ids, dtype = torch.long),
            "attention_mask": torch.tensor(inst.attention_mask, dtype=torch.long),
            "mlm_labels": torch.tensor(inst.mlm_labels, dtype=torch.long),
            "nsp_label": torch.tensor(inst.nsp_label, dtype=torch.long),
        }


#correctly spelled alias; the original class name is kept for existing code
BertPretrainDataset = BertPretrainDatase

In [33]:
#Pad batch of variable length examples into uniform tensors
def bert_collate_fn(batch, pad_token_id, max_seq_len):
    """Pad a batch of variable-length examples into uniform tensors.

    Args:
        batch: list of dicts with 1-D tensors under "input_ids",
            "token_type_ids", "attention_mask" and "mlm_labels", plus a
            scalar tensor under "nsp_label".
        pad_token_id: fill value for "input_ids".
        max_seq_len: width of the padded tensors; longer examples are cut to it.

    Returns:
        Dict with ``(len(batch), max_seq_len)`` long tensors for the four
        sequence fields and a ``(len(batch),)`` tensor for "nsp_label".
        "mlm_labels" is padded with -100 and "token_type_ids" with 0.
        "attention_mask" is finally recomputed from "input_ids" as
        ``input_ids != pad_token_id``.
    """
    bsz = len(batch)
    #fill value for the padded tail of every sequence field
    fill_values = {
        "input_ids": pad_token_id,
        "token_type_ids": 0,
        "attention_mask": 0,
        "mlm_labels": -100,
    }
    out = {}

    for k, fill in fill_values.items():
        padded = torch.full((bsz, max_seq_len), fill, dtype=torch.long)

        for i, item in enumerate(batch):
            x = item[k]
            #copy at most max_seq_len entries; the rest of the row keeps the fill value
            L = min(len(x), max_seq_len)
            padded[i, :L] = x[:L]

        out[k] = padded

    out["nsp_label"] = torch.stack([item["nsp_label"] for item in batch]).view(-1)
    #derived from the padded ids, so every padded position is masked out
    out["attention_mask"] = (out["input_ids"] != pad_token_id).long()

    return out
            