# Form Data-Schema


In [None]:
import os
import re
import cv2
import json
import pandas as pd
import numpy as np

from elasticsearch import Elasticsearch
from PIL import Image, ImageOps
from matplotlib import pyplot as plt
from matplotlib import patches
from matplotlib import colormaps
from pathlib import Path

In [None]:
from scripts.search import *

In [None]:
# Load ./data/forms.csv.gz and derive one taxonomy id per row by concatenating
# its 'type' and 'sub' columns (missing values are replaced with '' first).
docs = pd.read_csv('./data/forms.csv.gz')
taxonomy = docs.fillna('').apply(lambda r:f"{r['type']}{r['sub']}", axis=1).to_list()

In [None]:
# Medians of the numeric columns of ./data/pages.csv.gz; the medians of
# 'word-width', 'word-height' and 'space', rounded to 4 decimals, become
# the notebook-wide constants DW, DH and DS.
pages = pd.read_csv('./data/pages.csv.gz')
median = pages.median(numeric_only=True)
DW, DH, DS = np.round(median.loc[['word-width','word-height','space']], 4)
DW, DH, DS

In [None]:
# Bounding-box column names, used below to select the box columns of a frame.
BOX = ['left','top','right','bottom']

In [None]:
# Elasticsearch client shared by all query helpers in this notebook.
# The endpoint and the password of the 'elastic' user are read from the
# environment (a KeyError is raised if either variable is unset).
# NOTE(review): verify_certs=False disables TLS certificate verification --
# presumably only meant for a local/dev cluster; confirm before reuse elsewhere.
eclient = Elasticsearch(
    hosts=[os.environ['ELASTIC_URI']],
    basic_auth=('elastic', os.environ['ELASTIC_PASSWORD']),
    verify_certs=False
)

In [None]:
#eclient.search(index=INDEX, query={'match_all': {}})['hits']['hits']

In [None]:
def get_docs(taxonomy, size=1000):
    """
    retrieve the docs of the given taxonomy type which contain input blocks

    Parameters
    ----------
    taxonomy : value matched against the `taxonomy_id` field
    size : maximum number of distinct `doc_id` buckets to return; a terms
        aggregation returns only the top 10 buckets unless a size is given,
        which silently truncated the result before

    Returns
    -------
    list of terms-aggregation buckets, one per `doc_id` (the id is stored
    under the bucket's 'key')
    """
    query = {'bool': {'must': [{'match': {'taxonomy_id': taxonomy}}, {'match': {'block_type': 'input'}}]}}
    aggs = {'docs': {'terms': {'field': 'doc_id', 'size': size}}}
    # only the aggregation is consumed, so do not fetch any hits (size=0)
    response = eclient.search(index=INDEX, query=query, aggs=aggs, size=0)
    return response['aggregations']['docs']['buckets']


In [None]:
def get_docs(taxonomy, size=1000):
    """
    retrieve the docs of the given taxonomy type which contain input blocks

    NOTE: this cell redefines `get_docs`; being the later definition, it is
    the one in effect for the rest of the notebook.

    Parameters
    ----------
    taxonomy : value matched against the `taxonomy_id` field
    size : maximum number of distinct `doc_id` buckets to return; a terms
        aggregation returns only the top 10 buckets unless a size is given,
        which silently truncated the result before

    Returns
    -------
    list of terms-aggregation buckets, one per `doc_id` (the id is stored
    under the bucket's 'key')
    """
    query = {'bool': {'must': [{'match': {'taxonomy_id': taxonomy}}, {'match': {'block_type': 'input'}}]}}
    aggs = {'docs': {'terms': {'field': 'doc_id', 'size': size}}}
    # only the aggregation is consumed, so do not fetch any hits (size=0)
    response = eclient.search(index=INDEX, query=query, aggs=aggs, size=0)
    return response['aggregations']['docs']['buckets']


In [None]:
# pick one taxonomy id at random to explore
tx = np.random.choice(taxonomy)
tx

In [None]:
# NOTE: rebinds `docs` -- until here the DataFrame read from forms.csv.gz --
# to the list of aggregation buckets returned for the chosen taxonomy id.
docs = get_docs(tx)
docs

In [None]:
def find_inputs(doc, input_type=None, size=100):
    """
    fetch the indexed 'input' blocks of a single document

    Parameters
    ----------
    doc : value matched against the `doc_id` field
    input_type : optional phrase; when given, `content` must also match it
        as a phrase (slop 5)
    size : maximum number of hits requested

    Returns
    -------
    list with the `_source` of every hit, sorted by page_id, then top, then left
    """
    conditions = [
        {'match': {'doc_id': doc}},
        {'match': {'block_type': 'input'}},
    ]
    if input_type is not None:
        conditions.append({'match_phrase': {'content': {'query': input_type, 'slop': 5 }}})
    ordering = [{field: {'order': 'asc'}} for field in ('page_id', 'top', 'left')]
    response = eclient.search(index=INDEX, query={'bool': {'must': conditions}}, sort=ordering, size=size)
    return [hit['_source'] for hit in response['hits']['hits']]


In [None]:
# pick one of the returned documents at random (each bucket's 'key' is read as the doc id)
doc = np.random.choice([d['key'] for d in docs])
print(doc)
# list up to 100 input blocks of that document together with their page
inputs = find_inputs(doc, size=100)
for hit in inputs:
    print(f"Page: {hit['page_id']:<3} {hit['content']}")

In [None]:
# pick a page at random among the pages of the inputs found above
# (a page with more inputs appears more often in the list, so it is more likely to be drawn)
page = np.random.choice([hit['page_id'] for hit in inputs])
page

In [None]:
def get_page_content(doc, page, size=1000):
    """
    fetch everything indexed for one page of a document

    Parameters
    ----------
    doc : value matched against the `doc_id` field
    page : value matched against the `page_id` field
    size : maximum number of hits requested

    Returns
    -------
    list with the `_source` of every hit, sorted by top, then left
    """
    same_page = [{'match': {'doc_id': doc }}, {'match': {'page_id': page }}]
    reading_order = [{'top': {'order': 'asc'}}, {'left': {'order': 'asc'}}]
    response = eclient.search(index=INDEX, query={'bool': {'must': same_page}}, sort=reading_order, size=size)
    return [hit['_source'] for hit in response['hits']['hits']]


    # doc, page = 'irs-fw3', 1
    # doc, page = 'irs-f2439', 6
    # doc, page = 'que-TP-930-V.en', 2 # nested inputs
    # doc, page = 'cnd-l600-a.fr', 1 # input filter example
    # doc, page = 'cnd-t2sch125.en', 0 # missing logo at BR corner


In [None]:
# pin a fixed example page (replaces the random doc/page picked in the cells before)
doc, page = 'que-TP-930-V.en', 2

In [None]:
def show_bbox_overlay(source, data, ax):
    """
    draw the page image in grayscale and outline every record's bounding box

    Parameters
    ----------
    source : image name; the file read is data/images/{source}.png
    data : iterable of dicts with 'left', 'top', 'right', 'bottom' and 'block_type'
    ax : matplotlib axes to draw on

    Box coordinates are multiplied by the shorter side of the image to get
    pixels. The outline color depends on the block type; 'block' outlines
    are grown by 5 px on every side.
    """
    palette = {'input':'red','word':'orange','line':'gold','block':'yellow','image':'cyan'}
    picture = ImageOps.grayscale(Image.open(f'data/images/{source}.png'))
    scale = min(picture.size)
    ax.imshow(np.array(picture), 'gray')
    for item in data:
        w, h = (item['right'] - item['left']) * scale, (item['bottom'] - item['top']) * scale
        x, y = item['left'] * scale, item['top'] * scale
        edge = palette[item['block_type']]
        # only 'block' rectangles get a margin around them
        pad = 5 if item['block_type'] == 'block' else 0
        outline = patches.Rectangle((x - pad, y - pad), w + 2 * pad, h + 2 * pad,
                                    linewidth=1, edgecolor=edge, facecolor='none')
        ax.add_patch(outline)
        

In [None]:
# show every indexed box of the selected page on top of its image
data = get_page_content(doc, page, size=10000)
fig, ax = plt.subplots(figsize=(11, 11))
show_bbox_overlay(f'{doc}-{page}', data, ax)
plt.axis('off')
plt.show()

In [None]:
# Inputs now come from the per-page CSV instead of the index; the indexed
# content is kept for everything except its own 'input' records.
# NOTE: `inputs`, `data`, `image` and `scale` are reused by the following cells.
inputs = pd.read_csv(f'./data/inputs/{doc}-{page}.csv.gz')
data = get_page_content(doc, page, size=10000)
data = pd.DataFrame.from_dict(data)
data = data.loc[data['block_type']!='input']
image = ImageOps.grayscale(Image.open(f'data/images/{doc}-{page}.png'))
# box coordinates are multiplied by the shorter image side to get pixels
scale = min(image.size)
image = np.array(image)

fig, ax = plt.subplots(figsize=(11, 11))
ax.imshow(image, 'gray')
# words are drawn as filled rectangles
for d in data.to_dict('records'):
    w, h = (d['right'] - d['left']) * scale, (d['bottom'] - d['top']) * scale
    x, y = d['left'] * scale, d['top'] * scale
    # NOTE(review): `color` is looked up but not used below
    color = {'input':'red','word':'darkorange','line':'gold','block':'yellow','image':'cyan'}[d['block_type']]
    if d['block_type'] == 'word':
        ax.add_patch(patches.Rectangle((x, y), w, h, linewidth=0, edgecolor='none', facecolor='gold'))        
# inputs are drawn as outlines shrunk by 3 px on every side
for d in inputs.to_dict('records'):
    w, h = (d['right'] - d['left']) * scale, (d['bottom'] - d['top']) * scale
    x, y = d['left'] * scale, d['top'] * scale
    ax.add_patch(patches.Rectangle((x + 3, y + 3), w - 6, h - 6,
                                   linewidth=1, edgecolor='crimson', facecolor='none'))        
plt.axis('off')
plt.show()

In [None]:
    # Filter the page inputs in three steps -- buttons, "hidden" text fields and
    # inputs that overlap a word -- printing the remaining count after each stage.
    # Relies on `inputs` and `data` left by the previous cell and rebinds both.
    print(len(inputs))
    inputs = inputs.loc[inputs['field_type_string']!='Button']
    # text fields narrower than DH (the median word height) are treated as hidden
    hidden = inputs.loc[(inputs['field_type_string']=='Text')&(inputs['right'] - inputs['left'] < DH)]
    inputs = inputs.loc[~inputs.index.isin(hidden.index)]
    #if len(inputs) == 0:
    #    return
    print(len(inputs))
    # get content from page (already indexed)
    #data = get_page_content(doc['file'], inputs.iloc[0]['page'], size=10000)
    #if len(data) == 0:
    #    return
    data = pd.DataFrame.from_dict(data)
    print(len(data))
    data = data.loc[data['block_type']!='input']
    print(len(data))
    
    #columns = ['content','block_type','left','top','right','bottom']
    data = pd.DataFrame.from_dict(data)[['content','block_type'] + BOX]

    # build low-res word-presence map to detect overlap easily
    # (box coordinates are multiplied by 100 and rounded to integer cells)
    M = np.round(data[data['block_type']=='word'].loc[:,BOX] * 100).astype(int)
    #if len(M) == 0:
    #    return
    W, H = M[['right','bottom']].max().astype(int)
    matrix = np.zeros((H, W))
    # only cells strictly after the rounded top/left edge are marked
    # NOTE(review): presumably so that boxes which merely touch do not count as overlapping -- confirm
    for d in M.to_dict('records'):
        matrix[int(d['top']) + 1:int(d['bottom']), int(d['left']) + 1:int(d['right'])] = 1
    nested = []
    test = np.round(inputs.loc[:,BOX] * 100).astype(int)
    for i in inputs.index: # if input space is already occupied -- it's nested
        l, t, r, b = test.loc[i,:].values
        if np.any(matrix[int(t) + 1:int(b), int(l) + 1:int(r)]):
            nested.append(i)
    # filter-out nested
    inputs = inputs.loc[~inputs.index.isin(nested)]
    print(len(inputs))
    

In [None]:
# redraw the page with the inputs that survived the filtering above
# (uses `image`, `scale`, `data` and `inputs` left by the previous cells)
fig, ax = plt.subplots(figsize=(11, 11))
ax.imshow(image, 'gray')
# words are drawn as filled rectangles
for d in data.to_dict('records'):
    w, h = (d['right'] - d['left']) * scale, (d['bottom'] - d['top']) * scale
    x, y = d['left'] * scale, d['top'] * scale
    # NOTE(review): `color` is looked up but not used below
    color = {'input':'red','word':'darkorange','line':'gold','block':'yellow','image':'cyan'}[d['block_type']]
    if d['block_type'] == 'word':
        ax.add_patch(patches.Rectangle((x, y), w, h, linewidth=0, edgecolor='none', facecolor='orange'))        
# inputs are drawn as outlines shrunk by 3 px on every side
for d in inputs.to_dict('records'):
    w, h = (d['right'] - d['left']) * scale, (d['bottom'] - d['top']) * scale
    x, y = d['left'] * scale, d['top'] * scale
    ax.add_patch(patches.Rectangle((x + 3, y + 3), w - 6, h - 6,
                                   linewidth=1, edgecolor='crimson', facecolor='none'))        
plt.axis('off')
plt.show()

Observation: query works but needs correction (may miss some words, especially if the label sits on the right side) -- best strategy depends on the input-type.

### Algorithmic approach
This is based on retrieving the text from areas that "look like" they might be labels.

In [None]:
def build_map(data, value, scale=100):
    """
    rasterize bounding boxes into a low-resolution 2D map

    Parameters
    ----------
    data : DataFrame with 'block_type', 'left', 'top', 'right', 'bottom'
    value : dict mapping a block type to the number painted into its cells;
        records whose block type is not in `value` are ignored
    scale : factor applied to the box coordinates before they are truncated
        to integer cell indices

    Returns
    -------
    (matrix, (h, w)) : the map cropped at the top-left corner of the content,
        and the (row, column) offset which was cropped away; an empty
        (0, 0) map with offset (0, 0) when there is nothing to draw

    Boxes are painted in the order of the rows of `data`, so a later record
    overwrites an earlier one wherever they overlap.
    """
    box = ['left', 'top', 'right', 'bottom']
    # block types without an assigned value used to raise a KeyError mid-way
    D = data.loc[data['block_type'].isin(list(value))]
    if D.empty:
        return np.zeros((0, 0)), (0, 0)
    cells = (D[box] * scale).astype(int)
    w, h = (int(x) for x in cells[['left', 'top']].min())
    W, H = (int(x) for x in cells[['right', 'bottom']].max())
    # the far edge is painted inclusively, so at least one extra row/column is
    # needed; with a zero top/left margin the last row/column was clipped before
    matrix = np.zeros((H + max(h, 1), W + max(w, 1)))
    for kind, (left, top, right, bottom) in zip(D['block_type'], cells.to_numpy()):
        matrix[top:bottom + 1, left:right + 1] = value[kind]
    return matrix[h:, w:], (h, w)
        

In [None]:
# draw a random record from the todo list whose 'missing' value lies strictly between 0 and 1
todo = pd.read_csv('data/todo.csv.gz')
d = np.random.choice(todo[(todo['missing'] > 0)&(todo['missing'] < 1)].to_dict('records'))
doc, page = d['file'], int(d['page'])
print(d)
# NOTE: the random pick is overridden right away with a fixed example page
doc, page = 'que-TP-930-V.en', 2

In [None]:
# Compare the page image (with box outlines) against its low-resolution map.
# get_page_content already returns the `_source` dicts, so they go into the
# frame as they are; size matches the other page queries of this notebook.
data = get_page_content(doc, page, size=10000)
columns = ['content','block_type','left','top','right','bottom']
data = pd.DataFrame.from_dict(data).loc[:,columns]

value = {'block':0.33, 'word':0.66, 'input':0.99}
# only block types with an assigned value can be put on the map
mapped = data.loc[data['block_type'].isin(list(value))]
matrix, (h, v) = build_map(mapped, value, scale=100)

fig, ax = plt.subplots(1, 2, figsize=(10,10))
# show_bbox_overlay takes the image name and an iterable of record dicts
show_bbox_overlay(f'{doc}-{page}', data.to_dict('records'), ax[0])
ax[0].set_title('Original HD image')
ax[0].set_axis_off()
cmap = colormaps['Oranges']
# dummy markers whose only purpose is one legend entry per block type;
# the loop variables are named so that the offsets `h`, `v` stay intact
for kind, level in value.items():
    ax[1].scatter([-10], [10], color=cmap(level), marker='s', s=30, label=kind)
ax[1].imshow(matrix, 'Oranges')
ax[1].set_title('Matrix: low-res. map')
ax[1].legend(bbox_to_anchor=(1.3, 1), frameon=False)
plt.show()

In [None]:
    def get_sequence(words, h, w, space=0):
        """
        return the connected run of words around the word located at (h, w)

        Parameters
        ----------
        words : DataFrame with 'content', 'left', 'top', 'right', 'bottom'
        h, w : vertical and horizontal coordinate of the probe point
        space : largest horizontal gap between two neighbours which still
            counts as connected; with the default 0 words must touch or overlap

        Returns
        -------
        the contents of the connected words joined by single spaces, in
        left-to-right order, or None when no word contains the point

        Only words with exactly the same 'top' as the located word are
        considered to be on its line.
        """
        # locate the word which satisfies condition
        start = words.loc[(words['top'] <= h)&(words['bottom'] >= h)&(words['left'] <= w)&(words['right'] >= w)]
        if len(start) == 0:
            return None
        # make sure words are in the proper order
        words = words.sort_values(['top','left'])
        # get line and in-line position
        i, d = start.index[0], start.to_dict('records')[0]
        line = words[words['top'] == d['top']].index
        position = line.get_loc(i)
        # walk to the left while linked; the edges are taken from the record `d`
        # (scalars) -- comparing against a column of `start` is ambiguous in an `if`
        left, head = d['left'], []
        for x in range(position - 1, -1, -1):
            neighbour = words.loc[line[x]]
            if left - neighbour['right'] > space:
                break
            left = neighbour['left']
            head.append(neighbour['content'])
        # words were collected right-to-left
        head.reverse()
        # walk to the right while linked
        right, tail = d['right'], []
        for x in range(position + 1, len(line)):
            neighbour = words.loc[line[x]]
            if neighbour['left'] - right > space:
                break
            right = neighbour['right']
            tail.append(neighbour['content'])
        # connected sequence
        return ' '.join(head + [d['content']] + tail)
    

In [None]:
def check_inputs(data):
    """
    split the content of the 'input' records into type, name and label

    The content is read as '<type> NAME: <name> LABEL: <label>'.

    Parameters
    ----------
    data : DataFrame with 'block_type' and 'content' columns

    Returns
    -------
    a copy of the input rows with three extra columns, or None when `data`
    has no input rows:
      type  -- the text before ' NAME: '
      name  -- the text between ' NAME: ' and the first ' LABEL: ' (None if
               there is no ' NAME: ' marker)
      label -- the text after the first ' LABEL: '; None when it is missing,
               empty or the string 'nan'
    """
    # work on a copy so that the new columns are never written through a slice of `data`
    inputs = data.loc[data['block_type']=='input'].copy()
    if inputs.empty:
        return None
    inputs['type'] = inputs['content'].apply(lambda x:x.split(' NAME: ')[0])
    inputs['name'] = None
    inputs['label'] = None
    for i, content in inputs['content'].items():
        # partition splits on the first marker only, so a label which itself
        # contains ' LABEL: ' (or a content without markers) no longer breaks the parse
        head, _, label = content.partition(' LABEL: ')
        _, found, name = head.partition(' NAME: ')
        inputs.loc[i,'name'] = name if found else None
        if label not in ('', 'nan'):
            inputs.loc[i,'label'] = label
        # TODO: when no label is provided, derive one from the field name or
        # from the words located near the input
    return inputs


In [None]:
# parse type / name / label for the inputs of the current page content
res = check_inputs(data)
res