In [None]:
from Levenshtein import editops
import pandas as pd
import numpy as np
from itertools import groupby
from operator import itemgetter
from collections import Counter
import unicodedata
import random
import operator

### Load token-level alignments

Load token-level alignments between OCR'd tokens and their corresponding human corrections. The file `ocrTokens.tsv` is the output of running `extract_ocr_alignments.ipynb`.

In [None]:
# Read the aligned (OCR token, human-corrected token) pairs; the file has no
# header row, so the column names are supplied here.
# Tokens are arbitrary strings: parse both columns as text and switch off
# pandas' default NA sentinels. With the defaults, literal tokens such as
# "NA", "null", "None" or "nan" are silently converted to NaN and would later
# be turned into the string 'nan' by the `str(...)` calls downstream.
# With these options an empty field is read as the empty string.
ocrdf = pd.read_csv(
    "../../../resources/ocrTokens.tsv",
    sep="\t",
    names=["ocrtext", "humtext"],
    dtype=str,
    keep_default_na=False,
)
# Keep a single copy of every distinct (OCR, human) pair.
ocrdf = ocrdf.drop_duplicates(subset=['ocrtext', 'humtext'])

In [None]:
# Preview the first rows of the deduplicated (ocrtext, humtext) pairs.
ocrdf.head()

In [None]:
# (number of distinct token pairs, number of columns)
ocrdf.shape

### Capture transformations

Given an OCR'd token and its correction, capture the edit operations needed to go from one to the other.

In [None]:
def find_consecutive_ranges(ltrans):
    """Split a sequence of integers into runs of consecutive, increasing values.

    Two neighbouring values belong to the same run when the second one is
    exactly one greater than the first, e.g. ``[1, 2, 3, 7, 8]`` becomes
    ``[[1, 2, 3], [7, 8]]``.

    :param ltrans: iterable of integers (the caller passes character indices).
    :return: list of lists, one inner list per run, in input order.
    """
    def run_key(indexed):
        # Within a run of consecutive values, position minus value is constant.
        position, value = indexed
        return position - value

    return [
        [value for _, value in run]
        for _, run in groupby(enumerate(ltrans), key=run_key)
    ]

def capture_transformations(humtext, ocrtext, transformations):
    """Turn a list of edit operations into character-level substitution pairs.

    :param humtext: human-corrected token; indexed with the second field of
        each operation.
    :param ocrtext: OCR'd token; indexed with the third field of each operation.
    :param transformations: list of ``(kind, hum_index, ocr_index)`` tuples,
        where ``kind`` is compared against 'replace', 'delete' and 'insert'
        (the notebook passes ``editops(humtext, ocrtext)``).
    :return: list of distinct ``(str, str)`` pairs, in arbitrary order (the
        result is de-duplicated through a set).

    Behaviour by case:

    * only 'replace' operations: consecutive replaced positions are merged and
      one ``(human chars, OCR chars)`` pair is emitted per run;
    * otherwise: a pair is emitted only where several operations share one
      position and one of them is a 'delete' (shared OCR position) or an
      'insert' (shared human position). Positions that fall outside either
      string are skipped silently.

    NOTE(review): in the 'insert' pass the multi-character string is read from
    ``ocrtext`` and the single character from ``humtext``, so those pairs come
    out as (OCR chars, human char) -- the reverse of the other two cases.
    Kept as-is; confirm whether this is intended.
    """
    only_replacements = all(op[0] == 'replace' for op in transformations)

    # Index the operations both ways:
    #   human position -> [(OCR position, kind), ...]
    #   OCR position   -> [(human position, kind), ...]
    by_hum_pos = {}
    by_ocr_pos = {}
    for op in transformations:
        kind, hum_pos, ocr_pos = op[0], op[1], op[2]
        by_hum_pos.setdefault(hum_pos, []).append((ocr_pos, kind))
        by_ocr_pos.setdefault(ocr_pos, []).append((hum_pos, kind))

    found = []

    if only_replacements:
        # Merge neighbouring replaced positions into multi-character chunks.
        hum_runs = find_consecutive_ranges([op[1] for op in transformations])
        ocr_runs = find_consecutive_ranges([op[2] for op in transformations])
        for run_no, hum_run in enumerate(hum_runs):
            hum_piece = ""
            ocr_piece = ""
            for offset, hum_pos in enumerate(hum_run):
                hum_piece += humtext[hum_pos]
                ocr_piece += ocrtext[ocr_runs[run_no][offset]]
            if hum_piece or ocr_piece:
                found.append((hum_piece, ocr_piece))
        return list(set(found))

    def merged_pairs(positions, single_text, multi_text, marker):
        """Pairs (merged chars from multi_text, one char from single_text)."""
        pairs = []
        for pos, partners in positions.items():
            try:
                single = single_text[pos]
                merged = ""
                if len(partners) > 1 and any(kind == marker for _, kind in partners):
                    merged = "".join(multi_text[other] for other, _ in partners)
                if merged and single:
                    pairs.append((merged, single))
            except IndexError:
                # Operation points past the end of one of the strings: skip it.
                continue
        return pairs

    # A 'delete' is captured from the OCR-position index ...
    found.extend(merged_pairs(by_ocr_pos, ocrtext, humtext, 'delete'))
    # ... and an 'insert' from the human-position index.
    found.extend(merged_pairs(by_hum_pos, humtext, ocrtext, 'insert'))

    return list(set(found))

Two main resulting structures:

* `dTransformations`: dictionary where the keys are characters that have been transformed (from human to OCR token) at least once. The value is a list of characters representing all its corresponding transformations in the OCR'd tokens:

```
'b': ['u','t','t','r','h','o','o','h','h','t','h','h','n','h','h','h','l','h','h','h','h','h','h','L','h',...]
```

* `keepTransformedTokens`: dictionary of dictionaries where the outer keys are characters that have been transformed (from human to OCR token) at least once. The inner keys are all characters into which the outer keys have been transformed, and each inner value is a list of pairs (tuples) in which the first element is the human-corrected token and the second element is its OCR'd version:

```
{'b': {
    {'u': [('Public', 'Puulic'),
      ('Erebendary', 'Ereuendary'),
      ('Abigail', 'Auigail'),
      ('Cutbush', 'Cutuush'),
      ('Glebe', 'Gleue'),
      ('Venb', 'Venu'),
      ('Gibbs', 'Giubs'),
      ('Elbra', 'Elura'),
      ('Baber', 'Bauer')],
     't': [('Labour', 'Latour'),
      ('Rugby', 'Rugty'),
      ('Cumberland', 'Cumterland'),
      ('Webb', 'Webt'),
      ('Grubb', 'Grubt'),
      ('Barbour', 'Bartour'),
      ('Robert', 'Rotert'),
      ('Liberal', 'Literal')],
     ...},
...}
```

We take into consideration multiple character transformations, e.g.:
```
{'rn':
    {'m': [('Amsberg', 'Arnsberg'),
      ('Commonwealth', 'Cornmonweillh'),
      ('Harney', 'Hamey'),
      ('Palembang', 'Palernbang'),
      ('Duma', 'Durna'),
      ...},
    {'ni': [('Purnell', 'Puniell'),
      ('Whitbourne', 'Whitbounie'),
      ('Murnin', 'Muniin'),
      ('Pangborn', 'Pangboni'),
      ('Clibborn', 'Clibboni'),
      ...},
 ...}
```

In [None]:
# dTransformations:      first element of the captured pair -> list of second
#                        elements (one entry per token pair).
# keepTransformedTokens: first element -> {second element -> [(humtext, ocrtext), ...]}
dTransformations = {}
keepTransformedTokens = {}
for _, row in ocrdf.iterrows():
    ocrtext = str(row['ocrtext']).strip()
    humtext = str(row['humtext']).strip()
    captured_transformations = capture_transformations(
        humtext, ocrtext, editops(humtext, ocrtext)
    )
    # we keep only pairs where we detect one type of transformation
    if len(captured_transformations) != 1:
        continue
    src_chars, dst_chars = captured_transformations[0]
    keepTransformedTokens.setdefault(src_chars, {}).setdefault(dst_chars, []).append((humtext, ocrtext))
    dTransformations.setdefault(src_chars, []).append(dst_chars)

### Create true variations pairs

We create a true variation pair if the transformation it contains appears more than `threshold` times in the aligned tokens (`threshold` is set to 1, i.e. the transformation must be observed at least twice).

The result is `true_variations`, a dictionary in which keys are human-corrected tokens and their values are lists of their OCR'd tokens with errors:

```
{'Customline': ['Customlinc'],
 'Caspers': ['Caspcrs', 'Caspeis', 'Csspers', 'Cnspers', 'Caspera', 'Cappers', 'Gaspers'],
 'Porteous': ['Portcous', 'Portoous', "I'ortcous"],
 'Jagelman': ['Jagclman', 'Jagelmau', 'Jngelman', 'Jagelmnn'],
 ...}
```

These are all transformations that occur in the data.

In [None]:
# A transformation must be observed in more than `threshold` token pairs to be kept.
threshold = 1

# For every transformed string, the list of targets seen often enough.
common_charpairs = {}
for src_chars, targets in keepTransformedTokens.items():
    frequent_targets = [
        dst_chars for dst_chars, token_pairs in targets.items()
        if len(token_pairs) > threshold
    ]
    if frequent_targets:
        common_charpairs[src_chars] = frequent_targets

# Human-corrected token -> list of its OCR'd variants, restricted to the
# transformations retained above.
true_variations = {}
for src_chars, frequent_targets in common_charpairs.items():
    for dst_chars in frequent_targets:
        for token_pair in keepTransformedTokens[src_chars][dst_chars]:
            true_variations.setdefault(token_pair[0], []).append(token_pair[1])

### Create false variations pairs

For each distinct human-corrected token, we artificially create as many false variation pairs as there are true variations. We create a negative pair by replacing a character 1-, 2-, or 3-gram in the human-corrected token with characters that have not been observed as a possible variation of that n-gram in our original dataset.

In [None]:
def create_false_variations(numTransf, hum_token, dTransformations, allTransfKeys, allTransfValues, seenTuples, maxTransfLen=3):
    """Build one artificial (negative) variation of ``hum_token``.

    The token is cut into chunks of ``numTransf`` characters; one chunk is
    replaced by a string taken from ``allTransfValues`` such that the
    (chunk, replacement) pair has never been observed in either direction.

    :param numTransf: chunk length, or the string "random" to draw it from
        {1, 2, 3}.
    :param hum_token: human-corrected token to corrupt.
    :param dTransformations: unused here; kept for call compatibility.
    :param allTransfKeys: unused here; kept for call compatibility.
    :param allTransfValues: candidate replacement strings. The list is NOT
        modified (a shuffled copy is used).
    :param seenTuples: collection of observed (string, string) pairs; a
        candidate is rejected if the pair occurs in it in either order.
    :param maxTransfLen: longest replacement string allowed (default 3).
    :return: the corrupted token, or "" when no admissible replacement exists.
    """
    if numTransf == "random":
        subst_len = random.choice([1, 2, 3])
    else:
        # Fixed: the original read an undefined name (`numTransfs`) here.
        subst_len = numTransf

    parts = [hum_token[i : i + subst_len] for i in range(0, len(hum_token), subst_len)]
    random.shuffle(parts)

    # Shuffle a filtered copy so the caller's list keeps its order.
    candidates = [tv for tv in allTransfValues if len(tv) <= maxTransfLen]
    random.shuffle(candidates)

    for part in parts:
        for candidate in candidates:
            if candidate == part:
                # Replacing a chunk by itself would return the token unchanged,
                # i.e. a "false" variation identical to the correct form.
                continue
            if (part, candidate) in seenTuples or (candidate, part) in seenTuples:
                continue
            false_variation = hum_token.replace(part, candidate, 1)
            if false_variation.strip() != "":
                # First admissible replacement wins; no need to look further.
                return false_variation

    return ""

In [None]:
# maximum length of the transformed string, by default is set to 3
max_len_transformations = 3

# Transformed strings short enough to be considered, and the distinct strings
# they have been transformed into.
transformations_keys = [transf for transf in dTransformations if len(transf) <= max_len_transformations]
transformations_values = list({
    transf
    for observed in dTransformations.values()
    for transf in observed
})

# Every observed transformation, stored in both directions, so that artificial
# negatives never reuse a substitution that really occurs in the data.
seenTuples = set()
for k, observed in dTransformations.items():
    for v in observed:
        seenTuples.add((k, v))
        seenTuples.add((v, k))

# Write the file as UTF-8 explicitly: without `encoding`, `open` uses the
# platform's locale encoding, which may fail on (or mis-encode) non-ASCII tokens.
with open('ocr_posneg.tsv', mode='w', encoding='utf-8') as fw:
    for hum_token, variants in true_variations.items():
        # One artificial negative per true variation of the token.
        for truevar in variants:
            falsevar = create_false_variations("random", hum_token, dTransformations, transformations_keys, transformations_values, seenTuples).strip()
            if not falsevar:
                # Retry with single-character substitutions.
                falsevar = create_false_variations(1, hum_token, dTransformations, transformations_keys, transformations_values, seenTuples).strip()
            if truevar and falsevar:
                fw.write(hum_token.strip() + "\t" + truevar + "\tTRUE\n")
                fw.write(hum_token.strip() + "\t" + falsevar + "\tFALSE\n")