In [None]:
#default_exp tokenizer
%reload_ext autoreload
%autoreload 2

# Tokenizer
> A simple tokenizer for concepts using Gensim

Tokenize words using Gensim. We wanted to avoid sub-word tokenization so that we can understand how the model lumps concepts together.

In [None]:
#export
from pathlib import Path
from gensim.corpora import Dictionary
from gensim.utils import simple_preprocess
from gensim.models.phrases import Phrases, Phraser
import os
import regex as re
import string
from cached_property import cached_property
import numpy as np
from typing import *
from fastcore.test import *

## Preprocessing functions

We want tokens to deal with simple concepts, so we will enforce lowercase ASCII and predominantly split on spaces.

Our tokenization will work with "lines" -- that is, a sequence of text that can contain multiple sentences, paragraphs, and newlines. For cohesiveness, we want to split these to the sentence and word level.

In [None]:
line = """
Various prior work has demonstrated 100 weaknesses in these models — even highly accurate ones — including reliance on non-salient regions 
 or on background information only. Explanation methods help identify these pitfalls by providing explanations for model predictions, enabling humans to identify the features on which a model decision is based. However, these methods provide explanations on the image level making it challenging to understand global model behavior or dataset limitations."""

We first need to check that the line contains actual content and is not one of the null-byte-delimited binary strings that act as identifiers in most files.

In [None]:
#export
def is_good_line(line):
    """Check whether `line` holds real text worth tokenizing.

    A line is accepted when it is longer than one character (so an empty
    string or a bare newline is rejected) and contains no NUL byte.

    Args:
        line: One line of text as read from a file.

    Returns:
        True if the line should be processed, False otherwise.
    """
    # Exported because the exported `file2tokens` and `GensimTokenizer` call it.
    return (len(line) > 1) and ("\x00" not in line)

In [None]:
is_good_line(line)
assert is_good_line(line)
assert not is_good_line("\x00\x0033-thegreatdivide.txt\x00")
assert not is_good_line("")

Split a text by sentence according to the following regex pattern

In [None]:
#export
# Split on a whitespace character that follows "." or "?", except after a dotted
# abbreviation such as "i.e." or a capitalised two-letter title such as "Dr.".
spattern = re.compile(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s")

def line2sentences(line):
    """Convert a line into a list of lowercase sentences.

    Newlines are replaced by spaces and surrounding whitespace is stripped
    before the line is split with `spattern`.

    Args:
        line: Text that may span several sentences and newlines.

    Returns:
        A list of lowercase sentence strings (a single empty string for empty input).
    """
    line = line.replace('\n', ' ').strip()
    # Split BEFORE lowercasing: the "[A-Z][a-z]\." guard in `spattern` is
    # case-sensitive and can never match once the text has been lowercased.
    return [sentence.lower() for sentence in spattern.split(line)]

In [None]:
sentences = line2sentences(line); sentences

['various prior work has demonstrated 100 weaknesses in these models — even highly accurate ones — including reliance on non-salient regions   or on background information only.',
 'explanation methods help identify these pitfalls by providing explanations for model predictions, enabling humans to identify the features on which a model decision is based.',
 'however, these methods provide explanations on the image level making it challenging to understand global model behavior or dataset limitations.']

Once we have a sentence, we strip all punctuation and every non-ASCII (unicode) character.

In [None]:
#export
def strip_punc_unicode(line):
    """Remove every ASCII punctuation mark and every non-ASCII character from `line`.

    Args:
        line: The text to clean.

    Returns:
        The text with `string.punctuation` characters and non-ASCII characters dropped.
    """
    # Round-tripping through ASCII with errors ignored discards non-ASCII characters.
    ascii_only = line.encode("ascii", errors="ignore").decode("ascii")
    # A translation table mapping a code point to None deletes that character.
    punctuation_table = dict.fromkeys(map(ord, string.punctuation))
    return ascii_only.translate(punctuation_table)

In [None]:
proc_sentences = [strip_punc_unicode(s) for s in sentences]; proc_sentences

['various prior work has demonstrated 100 weaknesses in these models  even highly accurate ones  including reliance on nonsalient regions   or on background information only',
 'explanation methods help identify these pitfalls by providing explanations for model predictions enabling humans to identify the features on which a model decision is based',
 'however these methods provide explanations on the image level making it challenging to understand global model behavior or dataset limitations']

Then we collapse every run of whitespace into a single space.

In [None]:
#export
# Raw string: a backslash-s inside a plain string literal is an invalid escape
# sequence, which Python reports as a warning and will eventually reject.
space_pat = re.compile(r"\s+")

def remove_multiple_spaces(sentence):
    """Collapse each run of whitespace in `sentence` into a single space.

    Args:
        sentence: The text to normalise.

    Returns:
        The text with every whitespace run replaced by one space. Leading and
        trailing whitespace is collapsed too, not removed.
    """
    return space_pat.sub(" ", sentence)

In [None]:
proc_sentences = [remove_multiple_spaces(s) for s in proc_sentences]; proc_sentences

['various prior work has demonstrated 100 weaknesses in these models even highly accurate ones including reliance on nonsalient regions or on background information only',
 'explanation methods help identify these pitfalls by providing explanations for model predictions enabling humans to identify the features on which a model decision is based',
 'however these methods provide explanations on the image level making it challenging to understand global model behavior or dataset limitations']

Before we have our tokens, we will define the concept of 'number' as any ASCII token that contains a digit

In [None]:
#export
def isnum(token):
    """Return True when at least one character of `token` is a digit."""
    for char in token:
        if char.isdigit():
            return True
    return False

Compiling all these steps into a single function

In [None]:
#export
def process_line(line):
    """Turn a line of text into one list of tokens per sentence.

    Each sentence from `line2sentences` is stripped of punctuation and
    non-ASCII characters, whitespace-normalised and split on whitespace.
    Any token for which `isnum` is true is replaced by "<NUM>".

    Args:
        line: Text that may contain several sentences.

    Returns:
        A list with one token list per sentence.
    """
    def _sentence_tokens(sentence):
        cleaned = remove_multiple_spaces(strip_punc_unicode(sentence))
        return ["<NUM>" if isnum(word) else word for word in cleaned.split()]

    return [_sentence_tokens(sentence) for sentence in line2sentences(line)]

In [None]:
tokens = process_line(line); print(tokens[0])

['various', 'prior', 'work', 'has', 'demonstrated', '<NUM>', 'weaknesses', 'in', 'these', 'models', 'even', 'highly', 'accurate', 'ones', 'including', 'reliance', 'on', 'nonsalient', 'regions', 'or', 'on', 'background', 'information', 'only']


In [None]:
def process_tok(x, num_tok="xxNUMxx", stop_tok="xxSTOPxx", stopwords=()):
    """Process a token by replacing numbers and stop tokens with the desired special tokens

    Args:
        x: The raw token; surrounding whitespace is ignored.
        num_tok: Replacement returned when the token contains a digit.
        stop_tok: Replacement returned when the token is in `stopwords`.
        stopwords: Collection of tokens to replace with `stop_tok`.

    Returns:
        `num_tok`, `stop_tok`, or the whitespace-stripped token.
    """
    # Strip first so a padded token ("  the ") is matched against `stopwords`
    # in the same form in which it would otherwise be returned.
    x = x.strip()
    if isnum(x):
        return num_tok
    if x in stopwords:
        return stop_tok
    return x

In [None]:
test_eq(process_tok(" "), "")
test_eq(process_tok("abc88"), "xxNUMxx")
test_eq(process_tok("993"), "xxNUMxx")
test_eq(process_tok("the", stopwords=["the", "a", "but"]), "xxSTOPxx")
test_eq(process_tok("   lotsofspace "), "lotsofspace")

In [None]:
[process_tok(t, stopwords=["the", "in", "on", "or", "has"]) for t in tokens[0]]

['various',
 'prior',
 'work',
 'xxSTOPxx',
 'demonstrated',
 '<NUM>',
 'weaknesses',
 'xxSTOPxx',
 'these',
 'models',
 'even',
 'highly',
 'accurate',
 'ones',
 'including',
 'reliance',
 'xxSTOPxx',
 'nonsalient',
 'regions',
 'xxSTOPxx',
 'xxSTOPxx',
 'background',
 'information',
 'only']

And now we can convert an entire file to tokens (naively loading everything into memory)

In [None]:
#export
def file2tokens(fname):
    """Convert a file of text into tokenized sentences

    Args:
        fname: Path of a UTF-8 text file.

    Returns:
        A flat list of token lists, one per sentence, covering every line
        accepted by `is_good_line`.
    """
    tokenized = []
    with open(fname, 'r', encoding='utf8') as fp:
        # Iterate the handle directly: lines are consumed one at a time, so
        # there is no need to materialise the raw file with `readlines()`.
        for line in fp:
            if is_good_line(line):
                tokenized.extend(process_line(line))
    return tokenized


# The Tokenizer
> Collecting all the helper functions underneath a single class

In [None]:
#hide
#export
# Special tokens and the ids reserved for them; this is the default
# `patch_dict` of `GensimTokenizer`.
PATCH_DICT = {
    "<UNK>": 0,  # id that `GensimTokenizer.token2id` falls back to for unknown tokens
    "<NUM>": 1,  # placeholder that `process_line` emits for any token containing a digit
}

In [None]:
#export
class GensimTokenizer:
    """Word-level tokenizer built on a gensim `Dictionary` and a phrase detector.

    Text is split with `process_line`, passed through `phraser`, and mapped to
    integer ids through `dictionary`.
    """

    def __init__(self, dictionary, phraser=None, patch_dict=None):
        """Wrap a Gensim Dictionary, phrase detector, and special tokens for creating tokenization from OWT

        Args:
            dictionary: The gensim dictionary mapping vocabulary to IDs and back
            phraser: If provided, use gensim's phrase detector to lump common concepts together.
                Defaults to an empty `Phrases` model.
            patch_dict: Special tokens (token -> id) applied by `patch`.
                Defaults to a copy of `PATCH_DICT`.
        """
        self.dictionary = dictionary
        self.phraser = Phrases([[]]) if phraser is None else phraser
        # Copy the module-level default so instances never share one mutable dict.
        self.patch_dict = dict(PATCH_DICT) if patch_dict is None else patch_dict
        # Must exist before `encode_and_save_for_mp` inspects it; filled in by `set_outdir`.
        self.outdir = None

    @classmethod
    def from_file(cls, dict_fname, phraser_fname=None):
        """Load tokenizer information from a dictionary file (generated by gensim dictionary.save) and a phraser file."""
        d = Dictionary.load(str(dict_fname))
        if phraser_fname is not None:
            # Stringify like `dict_fname` so `pathlib.Path` arguments are handled the same way.
            p = Phraser.load(str(phraser_fname))
        else:
            print("No phraser specified. Proceeding without phrases")
            p = Phraser(Phrases([[]]))

        return cls(d, p)

    def _invalidate_vocab(self):
        """Drop the cached `vocab` so it is recomputed after the dictionary changes."""
        self.__dict__.pop("vocab", None)

    def add_document_from_fname(self, fname):
        """For training, add the contents of a text file to the dictionary"""
        print(f"Adding {fname}")
        tokens = self.phraser[file2tokens(fname)]
        self.dictionary.add_documents(tokens)
        self._invalidate_vocab()

    def add_to_phraser_from_fname(self, fname):
        """Detect common phrases from fname for bigramming purposes"""
        print(f"Adding {fname} to phraser")
        tokens = file2tokens(fname)
        self.phraser.add_vocab(tokens)

    def get_dictionary(self):
        """Return the wrapped dictionary object."""
        return self.dictionary

    def token2id(self, word):
        """Convert a token into an id, converting to UNK ID as necessary

        Raises:
            KeyError: If the token is unknown and the dictionary has no "<UNK>" entry.
        """
        d = self.dictionary
        return d.token2id.get(word, d.token2id["<UNK>"])

    def tokens2ids(self, tokens):
        """Convert a list of tokens into ids, converting to UNK as necessary"""
        return [self.token2id(tok) for tok in tokens]

    def tokenize(self, s:str):
        """Convert a sentence into its tokens

        Only the first sentence that `process_line` finds in `s` is used.
        """
        return self.phraser[process_line(s)[0]]

    def tokenize_batch(self, lines:List[str]):
        """Convert a batch of lines into their tokens

        Only the first sentence that `process_line` finds in each line is used.
        """
        return self.phraser[[process_line(line)[0] for line in lines]]

    def encode(self, s):
        """Encode a single sentence into IDs"""
        sent_tokens = self.tokenize(s)
        return self.tokens2ids(sent_tokens)

    def decode(self, ids):
        """Alias for `ids2tokens`"""
        return self.ids2tokens(ids)

    def id2token(self, id):
        """Convert an id to a token

        The id -1 maps to "<STOPWRD>"; any other id is looked up in the dictionary.
        """
        d = self.dictionary
        if id == -1: return "<STOPWRD>" # Account for post processing
        return d[id] # Add error handling if bad id

    def ids2tokens(self, ids):
        """Convert iterable of ids to tokens"""
        return [self.id2token(id) for id in ids]

    def set_outdir(self, outdir):
        """Useful when training in parallel. If set, will save contents to outdir"""
        self.outdir = Path(outdir)

    def patch(self, vocab_size, new_vocab, no_below=15, no_above=0.8):
        """Patch the tokenizer with a manually specified list of tokens, after training

        Args:
            vocab_size: Passed to the dictionary's `filter_extremes` as `keep_n`.
            new_vocab: Lines of text whose tokens are added to the dictionary after filtering.
            no_below: Passed through to `filter_extremes`.
            no_above: Passed through to `filter_extremes`.

        Returns:
            The tokenized form of `new_vocab` that was added to the dictionary.
        """
        print("Patching with special tokens...")
        self.dictionary.patch_with_special_tokens(self.patch_dict)
        print("Filtering vocabulary...")
        self.dictionary.filter_extremes(no_below=no_below, no_above=no_above, keep_n=vocab_size)

        print(f"Adding {len(new_vocab)} new words to dictionary...")
        new_vocab = self.tokenize_batch(new_vocab)
        self.dictionary.add_documents(new_vocab)
        # The dictionary changed, so any previously cached `vocab` is stale.
        self._invalidate_vocab()
        print(f"Done patching. New vocab size = {self.n_vocab()}")
        return new_vocab

    def save(self, outfile):
        """Save the wrapped dictionary to `outfile`."""
        self.dictionary.save(outfile)

    def n_vocab(self):
        """Return the current number of entries in the dictionary."""
        # Ask the dictionary directly instead of the cached `vocab`, which may
        # predate later additions to the dictionary.
        return len(self.dictionary)

    @cached_property
    def vocab(self):
        """The dictionary's keys, cached on first access."""
        return self.dictionary.keys()

    def __len__(self):
        return self.n_vocab()

    def encode_sentences_from_fname(self, fname):
        """Tokenize all the sentences from a text file

        Returns:
            A pair `(ids, offsets)`: an int32 array with the ids of every
            sentence concatenated, and a uint64 array holding the cumulative
            end position of each sentence within `ids`.
        """
        outlist = []
        ind_offsets = []
        new_start = 0

        # Same explicit encoding as `file2tokens`, so both readers agree
        # regardless of the platform's default encoding.
        with open(fname, 'r', encoding='utf8') as fp:
            for line in fp:
                if not is_good_line(line):
                    continue
                for sent in self.phraser[process_line(line)]:
                    ids = self.tokens2ids(sent)
                    outlist.extend(ids)
                    new_start += len(ids)
                    ind_offsets.append(new_start)

        return np.asarray(outlist, dtype=np.int32), np.asarray(ind_offsets, dtype=np.uint64)

    def encode_and_save_for_mp(self, fname):
        """Save sentences from fname. Needed because a local function can't be used with the MP module

        Writes `<stem>.npy` (ids) and `<stem>_offsets.npy` (offsets) into `outdir`.

        Raises:
            ValueError: If `set_outdir` has not been called.
        """
        if getattr(self, "outdir", None) is None: raise ValueError("Please `set_outdir` first")

        fname = Path(fname)

        idarr_outfile = self.outdir / (fname.stem + '.npy')
        ind_offsets_outfile = self.outdir / (fname.stem + '_offsets.npy')
        idarr, ind_offsets = self.encode_sentences_from_fname(fname)
        np.save(idarr_outfile, idarr)
        np.save(ind_offsets_outfile, ind_offsets)

The `GensimTokenizer` is a simple wrapper around gensim's `Dictionary` and `Phraser` classes that aligns them with our simple tokenization rules. Assuming you have a saved Gensim dictionary (and, optionally, a saved phraser), you can use the tokenizer as follows:

In [None]:
vocab = "../data/tokenizer/gensim1_patched.dict"
tok = GensimTokenizer.from_file(vocab)
tokens = ["apple", "pie", "is", "delicious"]
ids = tok.tokens2ids(tokens); ids

No phraser specified. Proceeding without phrases


[2563, 17862, 17, 8073]

In [None]:
tok.ids2tokens(ids)

['apple', 'pie', 'is', 'delicious']

# Export -

In [None]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted 01_Tokenizer.ipynb.
Converted index.ipynb.
