In [106]:
import tensorflow as tf
# Record the TensorFlow version this notebook was run with.
print(tf.__version__)

In [181]:
import collections
import itertools
import os
import re

from functools import partial
from typing import Callable, List, Tuple, Union, NewType, OrderedDict, Counter, Iterator

import matplotlib.pyplot as plt
import nltk
import numpy as np
import sklearn
import sklearn.feature_extraction.text
import sklearn.metrics
import sklearn.model_selection as model_selection  # TODO: remove and use predefined splits
import tqdm

from tensorflow import keras

import ocr_input

# Fetch the NLTK stop-word corpus; the English list is read when STOP_WORD_S is built.
nltk.download('stopwords')

# Render figures inline and set the default figure size for every plot in the notebook.
%matplotlib inline
plt.rcParams["figure.figsize"] = (12, 9)


In [258]:
# Type vocabulary used by the annotations in this notebook.
# NOTE(review): these are declared with NewType, yet the functions below return plain
# lists / tuples under these annotations — plain type aliases may have been intended; confirm.
InputText = NewType('InputText', Union[str, List[str]])
Label = NewType('Label', int)
DocumentRecord = NewType('DocumentRecord', Tuple[InputText, Label])
Dataset = NewType('Dataset', List[DocumentRecord])

Token = NewType('Token', str)
Vocabulary = NewType('Vocabulary', OrderedDict[Token, int])

# Class name -> integer class id.
# NOTE(review): presumably the same ids as the integer labels read from the label file — confirm.
class_names = {'invoice': 0, 'form': 1, 'email': 2, 'handwritten': 3, 'advertisement': 4}
NUM_CLASSES = len(class_names)

# English stop words dropped by the stop-word-aware tokenizers.
STOP_WORD_S = set(nltk.corpus.stopwords.words('english'))
# Default seed of split_dataset.
random_seed = 42

# Load the dataset

In [109]:
def get_dataset(dataset_path: str = os.path.join(".", "dataset", "ocr"),
                label_path: str = os.path.join(".", "dataset", "label.txt")) -> Dataset:
    """Load every OCR document and pair it with its label.

    Args:
        dataset_path: directory holding one OCR file per document. Built with
            ``os.path.join`` so the default works on every platform.
        label_path: text file with one ``<file name without extension>,<integer label>``
            record per line.

    Returns:
        One ``(parsed document, label)`` record per entry of the label file, in
        label-file order. The parsed document is whatever ``ocr_input.parse_xml``
        returns for the matching file.

    Raises:
        KeyError: if the label file names a document that has no file in ``dataset_path``.
        ValueError: if a label line is not ``name,label`` or the label is not an integer.
    """
    all_files = os.listdir(dataset_path)

    # Key each parsed document by its file name without extension: that is the
    # key the label records are looked up with below.
    doc_ocr_d = {}
    for file_name in tqdm.tqdm(all_files, total=len(all_files)):
        stem = os.path.splitext(file_name)[0]
        doc_ocr_d[stem] = ocr_input.parse_xml(os.path.join(dataset_path, file_name))

    label_d = {}
    with open(label_path, "r") as fp:
        for line in fp:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at the end of the file).
                continue
            file, label = line.split(",")
            label_d[file.strip()] = int(label)

    return [(doc_ocr_d[file], label) for file, label in label_d.items()]

# TODO: remove and use predefined splits
def split_dataset(dataset: Dataset, test_size: float = 0.2, random_seed: int = random_seed) -> Tuple[List[InputText], List[InputText], List[Label], List[Label]]:
    """Randomly split the records into train and test parts.

    Args:
        dataset: records to split, each one a (text, label) pair.
        test_size: forwarded to ``train_test_split`` as ``test_size``.
        random_seed: forwarded to ``train_test_split`` as ``random_state``.

    Returns:
        What ``train_test_split`` returns for the transposed records; the caller
        unpacks it as ``x_train, x_test, y_train, y_test``.
    """
    # Transpose the records into one sequence per field (texts, labels).
    columns = zip(*dataset)
    return model_selection.train_test_split(
        *columns,
        test_size=test_size,
        random_state=random_seed,
    )
    

# Load the whole corpus, then hold out a test split using split_dataset's defaults
# (test_size=0.2, seeded with random_seed).
dataset = get_dataset()
x_train, x_test, y_train, y_test = split_dataset(dataset)

In [110]:
# Sanity check: total number of records, then the train / test split sizes.
print(len(dataset))
print(len(x_train), len(x_test))


# Study the vocabulary

In [230]:
# Naive vocabulary counting: splitting on space character

# Conventions:
# Index 0 is reserved for unknown tokens, which are mapped to `__UNK__`.
# Other special tokens come right after it (e.g. `__NUM__` for numbers).
# Regular tokens are then inserted in order, so that the dictionary can be reversed.
# Placeholder substituted for every token that is not in the vocabulary.
__UNK__ = '__UNK__'

# Always put __UNK__ first when redefining the list of special tokens.
DEFAULT_SPECIALS = [__UNK__]

def unknown_wrapped(f):
    """Decorate a tokenizer so it can replace out-of-vocabulary tokens.

    The decorated tokenizer takes ``(text, vocabulary=None)`` and is a generator:
    - without a vocabulary it yields the tokens of ``f(text)`` unchanged;
    - with a vocabulary it yields ``__UNK__`` in place of every token that is
      not contained in it.
    """
    def wrapped(text, vocabulary=None):
        tokens = f(text)

        if vocabulary is not None:
            # Lazily swap unknown tokens for the placeholder.
            tokens = (tok if tok in vocabulary else __UNK__ for tok in tokens)

        yield from tokens

    return wrapped


@unknown_wrapped
def basic_tokenizer(text: str) -> Iterator[Token]:
    """Naive tokenizer: yield the pieces of ``text`` separated by a single space character."""
    for piece in text.split(" "):
        yield piece


def no_preprocess(text: str) -> str:
    """Identity preprocessing: return ``text`` unchanged (default ``preprocess_f``)."""
    return text


def compute_vocabulary(input_text: List[InputText],
                       max_size=1000,
                       tokenize_f=basic_tokenizer,
                       specials=DEFAULT_SPECIALS,
                       preprocess_f=no_preprocess) -> Tuple[Vocabulary, Callable[..., Iterator[Token]], Counter[Token]]:
    """Build a token -> index vocabulary from the most frequent tokens of ``input_text``.

    Args:
        input_text: documents to count tokens in.
        max_size: maximum number of vocabulary entries, special tokens included.
        tokenize_f: tokenizer built with ``unknown_wrapped``; it is called without a
            vocabulary for counting and bound to the new vocabulary for the return value.
        specials: special tokens that must always be part of the vocabulary.
        preprocess_f: applied to each document before tokenization.

    Returns:
        ``(vocabulary, word_tokenizer_f, word_occurences_d)``:
        - ``vocabulary``: ordered token -> index mapping. Special tokens that did not
          earn a place on frequency come first (indices 0, 1, ... in ``specials``
          order), followed by the most frequent tokens.
        - ``word_tokenizer_f``: ``tokenize_f`` bound to ``vocabulary``.
        - ``word_occurences_d``: occurrence counter of every non-empty token.

    Also prints the share of token occurrences covered by the vocabulary.
    """
    # Count the tokens of the documents that were passed in, ignoring empty strings.
    word_occurences_d = collections.Counter(
        token
        for text in input_text
        for token in tokenize_f(preprocess_f(text))
        if token
    )

    ranked = word_occurences_d.most_common()

    # Reserve a leading slot for every special token that would otherwise be absent:
    # first those never seen in the corpus, then (iteratively, because each reserved
    # slot shrinks the frequency-ranked part) those too rare to make the cut.
    reserved = [sp for sp in specials if sp not in word_occurences_d]
    while True:
        kept = ranked[:max(0, max_size - len(reserved))]
        kept_words = {word for word, _ in kept}
        squeezed_out = [sp for sp in specials if sp not in kept_words and sp not in reserved]
        if not squeezed_out:
            break
        reserved = [sp for sp in specials if sp in reserved or sp in squeezed_out]

    vocabulary = collections.OrderedDict((sp, index) for index, sp in enumerate(reserved))
    vocabulary.update((word, index) for index, (word, _) in enumerate(kept, len(reserved)))

    word_tokenizer_f = partial(tokenize_f, vocabulary=vocabulary)

    total_occurences = sum(word_occurences_d.values())
    covered_occurences = sum(word_occurences_d[t] for t in vocabulary)
    # An empty corpus covers nothing; avoid dividing by zero.
    coverage = covered_occurences / total_occurences * 100 if total_occurences else 0.0
    print(f"With a vocabulary of size {max_size}, you cover {coverage:0.2f}%")

    return vocabulary, word_tokenizer_f, word_occurences_d

In [229]:
def plot_vocabulary(vocabulary, n=1000):
    """Plot the occurrence count of the most frequent tokens, by frequency rank.

    Args:
        vocabulary: token counter exposing ``most_common`` (e.g. ``collections.Counter``).
        n: maximum number of tokens to plot.
    """
    counts = [count for _, count in vocabulary.most_common(n)]
    # The counter may hold fewer than n tokens: size the x axis from what was
    # actually returned so that x and y always have the same length.
    plt.plot(list(range(len(counts))), counts)

    plt.title(f"Évolution du nombre d'occurrence des {n} tokens les plus fréquents")
    plt.show()

def plot_accumulated_vocabulary(vocabulary, n=1000):
    """Plot the cumulative share (in %) of all token occurrences covered by the most frequent tokens.

    Args:
        vocabulary: token counter exposing ``most_common`` and ``values``
            (e.g. ``collections.Counter``).
        n: maximum number of tokens to plot.
    """
    # Dividing the total by 100 up front makes each ratio below a percentage.
    total_tokens = sum(vocabulary.values()) / 100
    cumulative = list(itertools.accumulate(count / total_tokens for _, count in vocabulary.most_common(n)))
    # The counter may hold fewer than n tokens: size the x axis from the data.
    plt.plot(list(range(len(cumulative))), cumulative)

    plt.title(f"Évolution du nombre d'occurrences cumulé des {n} tokens les plus fréquents rapporté au nombre total de tokens")
    plt.show()

In [231]:
# Baseline: default space-split tokenizer, vocabulary capped at 10**4 entries.
vocabulary, _, word_count = compute_vocabulary(x_train, max_size=10**4)
plot_vocabulary(word_count, n=10000)
plot_accumulated_vocabulary(word_count, n=10000)
# Number of distinct tokens that were counted.
print(len(word_count))

# Peek at vocabulary entries 101 to 299 as (token, index) pairs.
print(list(vocabulary.items())[101:300])

## We must reduce vocabulary size

In [232]:
def regex_preprocess(text: str) -> str:
    """Lower-case ``text``, isolate punctuation and blank out unsupported characters.

    Each of ``. ? , ! :`` gets a space on both sides so it splits off as its own
    token; every character outside the allowed set (space, ASCII letters, digits
    and a few symbols) is then replaced by a space.
    """
    lowered = text.lower()
    spaced = re.sub(r"([.?,!:])", r" \1 ", lowered)
    cleaned = re.sub(r"[^ a-zA-Z0-9.?,!:$£€@#/\-\+\\\*]", " ", spaced)
    return cleaned


@unknown_wrapped
def stop_word_tokenizer(text: str) -> Iterator[Token]:
    """Split ``text`` on single spaces and yield every word that is not a stop word.

    Empty strings produced by consecutive spaces are skipped: they are not tokens,
    and would otherwise be turned into ``__UNK__`` once a vocabulary is attached.
    """
    for word in text.split(" "):
        if word and word not in STOP_WORD_S:
            yield word

# Character classes used to sort words into alphabetic / numeric / punctuation buckets.
ALPHA = set("azertyuiopqsdfghjklmwxcvbnAZERTYUIOPQSDFGHJKLMWXCVBN")
DIGIT = set("1234567890")
PUNCT = set(r".?,!:$£€@#/-+\*")


# Placeholder tokens emitted by special_tokenizer for digit-only, punctuation-only
# and mixed-character words respectively.
__NUM__ = "__NUM__"
__PUN__ = "__PUN__"
__MIX__ = "__MIX__"
# Special tokens of special_tokenizer; __UNK__ stays first.
MY_SPECIALS = [__UNK__, __NUM__, __PUN__, __MIX__]


@unknown_wrapped
def special_tokenizer(text: str) -> Iterator[Token]:
    """Split ``text`` on single spaces, drop stop words and bucket non-alphabetic words.

    For every remaining word, yields:
    - the word itself when it only contains characters of ``ALPHA``;
    - ``__NUM__`` when it only contains characters of ``DIGIT``;
    - ``__PUN__`` when it only contains characters of ``PUNCT``;
    - ``__MIX__`` otherwise.

    Empty strings produced by consecutive spaces are skipped. They would otherwise
    pass the alphabetic test vacuously, be yielded as empty tokens and end up
    counted as ``__UNK__`` once a vocabulary is attached.
    """
    for word in text.split(" "):
        if not word or word in STOP_WORD_S:
            continue

        if all(c in ALPHA for c in word):
            yield word
        elif all(c in DIGIT for c in word):
            yield __NUM__
        elif all(c in PUNCT for c in word):
            yield __PUN__
        else:
            yield __MIX__


# Same study with regex clean-up and token bucketing, keeping only 10**2 vocabulary entries.
vocabulary, _, word_count = compute_vocabulary(x_train, max_size=10**2, tokenize_f=special_tokenizer, preprocess_f=regex_preprocess, specials=MY_SPECIALS)
plot_vocabulary(word_count, n=10000)
plot_accumulated_vocabulary(word_count, n=10000)
# Number of distinct tokens left after preprocessing and bucketing.
print(len(word_count))

# Basic Model: Bag of Words


In [238]:
# Number of bag-of-words features; also used as the input width of the dense model.
VOCABULARY_SIZE = 10**3

# Keep the returned tokenizer this time: it is bound to the vocabulary and maps
# out-of-vocabulary tokens to __UNK__.
vocabulary, tokenizer_f, _ = compute_vocabulary(x_train, max_size=VOCABULARY_SIZE, tokenize_f=special_tokenizer, preprocess_f=regex_preprocess, specials=MY_SPECIALS)

In [239]:
print(vocabulary)
# Count vectorizer restricted to the fixed vocabulary, with the same preprocessing
# function and the vocabulary-bound tokenizer returned by compute_vocabulary.
vectorizer = sklearn.feature_extraction.text.CountVectorizer(vocabulary=vocabulary, tokenizer=tokenizer_f, preprocessor=regex_preprocess, analyzer="word")

# NOTE(review): get_feature_names may be unavailable in recent scikit-learn releases
# (replaced by get_feature_names_out) — confirm against the installed version.
print(vectorizer.get_feature_names())
# Show the first training document after preprocessing, then its count vector.
print(regex_preprocess(x_train[0]))
print(vectorizer.fit_transform(x_train[:1]))

In [255]:
# Feed-forward classifier over bag-of-words counts: VOCABULARY_SIZE input features,
# three hidden dense layers, one softmax output per document class.
model = keras.models.Sequential([
    keras.layers.Dense(input_dim=VOCABULARY_SIZE, units=128, activation="relu"),
    keras.layers.Dense(units=1024, activation="tanh"),
    keras.layers.Dense(units=1024, activation="tanh"),
    keras.layers.Dense(units=NUM_CLASSES, activation="softmax"),
])

optimizer = keras.optimizers.Adam(learning_rate=0.001)

# Labels are integer class ids, hence the sparse loss and metric.
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["sparse_categorical_accuracy"])

model.summary()

In [256]:
# Number of trailing training documents held out for validation.
VALIDATION_SIZE = 100

# The vectorizer has a fixed vocabulary, so `transform` is all that is needed: nothing
# is (re)fitted, in particular not on the held-out documents.
# Labels are converted to arrays, the target format documented for `Model.fit`.
hist = model.fit(
    vectorizer.transform(x_train[:-VALIDATION_SIZE]),
    np.asarray(y_train[:-VALIDATION_SIZE]),
    epochs=30,
    batch_size=64,
    validation_data=(
        vectorizer.transform(x_train[-VALIDATION_SIZE:]),
        np.asarray(y_train[-VALIDATION_SIZE:]),
    ),
    verbose=2,
)

In [257]:
# Vectorize the test set once, with `transform`: the vocabulary is fixed and nothing
# must be fitted on test data.
x_test_counts = vectorizer.transform(x_test)

model.evaluate(x_test_counts, np.asarray(y_test), verbose=2)

# `Sequential.predict_classes` is not available in recent Keras releases; taking the
# argmax of the softmax output gives the same class ids on every version.
y_pred = np.argmax(model.predict(x_test_counts), axis=-1)

# Confusion matrix: rows are true classes, columns are predicted classes.
print(class_names.keys())
print(sklearn.metrics.confusion_matrix(y_test, y_pred))

# A bit more complex: Recurrent Neural Networks and Long Short-Term Memory

In [269]:
VOCABULARY_SIZE = 10**3
EMBEDDING_SIZE = 64
MAX_SEQ_LEN = 10**3

# The vocabulary computed for the bag-of-words model is reused as is.
# NOTE: this rebinds `vectorizer`, which referred to the CountVectorizer until now.
def vectorizer(text, vocabulary=vocabulary):
    """Lazily map ``text`` to the vocabulary indices of its tokens.

    The text goes through ``regex_preprocess`` first, like the documents the
    vocabulary was computed from, so that tokens are looked up in the same form
    they were counted in. Out-of-vocabulary tokens map to the index of ``__UNK__``.

    Args:
        text: raw document text.
        vocabulary: token -> index mapping; defaults to the vocabulary that is
            current when this cell runs.

    Returns:
        An iterator over integer token indices.
    """
    tokens = special_tokenizer(regex_preprocess(text), vocabulary)
    return (vocabulary[token] for token in tokens)


print(*vectorizer(x_test[0]), sep="\n")

In [261]:
# Sequence classifier: token ids -> embeddings -> bidirectional LSTM -> softmax over the classes.
# NOTE(review): presumably expects integer id sequences of length MAX_SEQ_LEN with ids
# below VOCABULARY_SIZE — confirm that the input given to fit matches.
model = keras.models.Sequential([
    keras.layers.Embedding(VOCABULARY_SIZE, EMBEDDING_SIZE, input_length=MAX_SEQ_LEN),
    keras.layers.Bidirectional(keras.layers.LSTM(128)),
    keras.layers.Dense(units=NUM_CLASSES, activation="softmax")
])

optimizer = keras.optimizers.Adam(learning_rate=0.001)

# Labels are integer class ids, hence the sparse loss and metric.
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["sparse_categorical_accuracy"])

model.summary()

In [262]:
def to_padded_ids(texts):
    """Encode documents as fixed-length sequences of vocabulary indices.

    Each text is preprocessed and tokenized the same way the vocabulary was built,
    mapped to token indices, then padded / truncated to ``MAX_SEQ_LEN`` with the
    ``pad_sequences`` defaults. This is the input the Embedding layer expects,
    as opposed to bag-of-words counts.

    Args:
        texts: sequence of raw document texts.

    Returns:
        Integer array of shape ``(len(texts), MAX_SEQ_LEN)``.
    """
    id_sequences = [
        [vocabulary[token] for token in special_tokenizer(regex_preprocess(text), vocabulary)]
        for text in texts
    ]
    return keras.preprocessing.sequence.pad_sequences(id_sequences, maxlen=MAX_SEQ_LEN)


# The last 100 training documents are held out for validation.
# Labels are converted to arrays, the target format documented for `Model.fit`.
hist = model.fit(
    to_padded_ids(x_train[:-100]),
    np.asarray(y_train[:-100]),
    epochs=30,
    batch_size=64,
    validation_data=(to_padded_ids(x_train[-100:]), np.asarray(y_train[-100:])),
    verbose=2,
)