In [None]:
import os
import cv2
from pytesseract import image_to_data, Output
import matplotlib.pyplot as plt 
from more_itertools import windowed, flatten
from itertools import groupby, chain
from PIL import ImageDraw, Image, ImageFont
import numpy as np
from copy import deepcopy
import os
from base64 import b64encode, b64decode
from pathlib import Path
import json
import shapely
from shapely.geometry import box as shapely_box

In [None]:
# Directory that holds the input images.
INPUT_DATA = "../input_data/"
# Directory where generated outputs are written (passed as `output_path` further down).
OUTPUT_DATA = "../out_data/"

## Creación de data items

### Levantado de la imagen

In [None]:
def load_image(img_path):
    """
    Build the data item (a plain dict) for a single ``.tif`` image.

    Parameters
    ----------
    img_path : str
        Path of the image to load. Only paths ending in ``.tif`` are processed.

    Returns
    -------
    dict or None
        ``None`` when ``img_path`` does not end in ``.tif``. Otherwise a dict with:
        ``file_path`` (the given path), ``img_bitmap`` (the array returned by
        ``cv2.imread``) and ``image_shape`` (``image_height`` / ``image_width``
        taken from the first two array dimensions).

    Raises
    ------
    OSError
        If OpenCV could not read the file.
    """
    if not img_path.endswith(".tif"):
        # Unsupported extension: keep the historical "no data item" result, but explicitly.
        return None

    imagen_cv = cv2.imread(img_path)
    if imagen_cv is None:
        # cv2.imread signals failure by returning None instead of raising; fail
        # here with a clear message rather than with an AttributeError on `.shape`.
        raise OSError(f"cv2.imread could not read image: {img_path}")

    image_height, image_width = imagen_cv.shape[:2]
    return {
        "file_path": img_path,
        "img_bitmap": imagen_cv,
        "image_shape": {
            "image_height": image_height,
            "image_width": image_width,
        },
    }

In [None]:
# Build the path from the configured INPUT_DATA directory instead of repeating it inline.
image_name = "Ambito financiero 1987-12-04 Reacción en cadena por el caso Astiz.tif"
data_item = load_image(os.path.join(INPUT_DATA, image_name))

In [None]:
# Inspect which fields the data item holds after loading the image.
data_item.keys()

### Levantado de los json

In [None]:
class Caja(object):
    """Labelled segment (bounding box) taken from an annotation file.

    The box is stored as two corners: ``(x_1, y_1)`` is the annotation's ``x``/``y``
    and ``(x_2, y_2)`` is that point offset by ``w``/``h``.
    """
    def __init__(self, archivo, bounding_box, etiqueta, contenido):
        """
        Parameters
        ----------
        archivo : file the segment belongs to (stored as ``file``).
        bounding_box : mapping with the keys ``'x'``, ``'y'``, ``'w'`` and ``'h'``.
        etiqueta : label of the segment (stored as ``label``).
        contenido : content of the segment (stored as ``content``).
        """
        self.file = archivo
        self.x_1 = bounding_box['x']
        self.y_1 = bounding_box['y']
        self.x_2 = bounding_box['x'] + bounding_box['w']
        self.y_2 = bounding_box['y'] + bounding_box['h']
        self.content = contenido
        self.label = etiqueta

    def to_json(self):
        """Return the attributes as a new dict, so edits to it do not alter this object."""
        return dict(self.__dict__)

    def to_pd_series(self):
        """Return the attributes as a ``pandas.Series`` indexed by attribute name."""
        # pandas is not part of the notebook's import cell, so import it here;
        # without this the method failed with NameError on `pd`.
        import pandas as pd

        return pd.Series(self.__dict__)

In [None]:
def get_segments_from_annotations(data_item):
    """
    Read the annotation JSON that belongs to an image and attach its segments.

    The JSON path is the image path with its trailing ``.tif`` swapped for ``.json``.
    The file is expected to contain ``'Diario'`` and ``'Fecha'`` entries (each with
    ``'bounding_box'`` and ``'texto'``) and a ``'Notas'`` list whose items map a
    segment label to a list of such entries.

    Parameters
    ----------
    data_item : dict
        Must contain ``'file_path'``. It is modified in place: ``'segments'`` is
        reset to a list with one dict per segment (see ``Caja.to_json``).

    Returns
    -------
    dict
        The same ``data_item`` object. ``'segments'`` stays empty when the path
        ends in neither ``.tif`` nor ``.json``.

    Notes
    -----
    Malformed entries (missing keys, wrong types) are reported with ``print`` and
    skipped. A missing ``'Notas'`` key is treated as "no notes".
    """
    file_path = data_item['file_path']
    data_item['segments'] = []

    # Swap only the trailing extension; str.replace would also rewrite a '.tif'
    # that appears earlier in the path (e.g. in a directory name).
    if file_path.endswith('.tif'):
        image_path = file_path
        json_path = file_path[:-len('.tif')] + '.json'
    elif file_path.endswith('.json'):
        json_path = file_path
        image_path = file_path[:-len('.json')] + '.tif'
    else:
        return data_item

    # JSON is UTF-8 by specification; do not depend on the platform default encoding.
    with open(json_path, "r", encoding="utf-8") as json_file:
        datos = json.load(json_file)

    def add_segment(etiqueta, detalle):
        """Append one segment, or report it when the entry is malformed."""
        try:
            caja = Caja(image_path, detalle['bounding_box'], etiqueta, detalle['texto'])
        except (KeyError, TypeError):
            # KeyError: a required key is missing; TypeError: the entry (or its
            # bounding box) is not the expected mapping / numeric data.
            print(f'Archivo: {json_path} | Segmento: {etiqueta}  | Estado ERROR!')
            return
        data_item['segments'].append(caja.to_json())

    for etiqueta in ('Diario', 'Fecha'):
        # .get() yields None for an absent entry, which add_segment reports as an error.
        add_segment(etiqueta, datos.get(etiqueta))

    for nota in datos.get('Notas', []):
        for segmento in nota:
            for detalle in nota[segmento]:
                add_segment(segmento, detalle)

    return data_item

In [None]:
# Attach the annotated segments, read from the .json file that matches the image path.
data_item = get_segments_from_annotations(data_item)

In [None]:
# Inspect the data item's fields now that the segments were attached.
data_item.keys()

In [None]:
# Inspect the fields stored for the first extracted segment.
data_item['segments'][0].keys()

## Obtener token boxes OCR

In [None]:
# Named presets of Tesseract command-line options. The key is what callers pass
# as `tesseract_config`; the value is forwarded to pytesseract as `config`.
# `--psm N` selects Tesseract's page segmentation mode (see the Tesseract
# documentation for the meaning of each N).
tess_configs = {
    "default": "--psm 11",
    "psm3": "--psm 3",
    "psm4": "--psm 4",
    "psm5": "--psm 5",
    "psm6": "--psm 6",
    "psm12": "--psm 12",
}

# OCR language and preset name.
# NOTE(review): these mirror the defaults of apply_tesseract — confirm whether
# they are meant to be passed to it explicitly.
TESSERACT_LANG = "spa"
TESSERACT_CONFIG = "default"

In [None]:
def get_token_boxes(image, tesseract_langs: str, tesseract_config: str ) -> list[dict]:
    """
    Run Tesseract on ``image`` and describe every token that has non-empty text.

    Parameters
    ----------
    image : image object accepted by ``pytesseract.image_to_data``.
    tesseract_langs : language code(s) handed to Tesseract as ``lang``.
    tesseract_config : key into ``tess_configs``; an unknown key means no extra options.

    Returns
    -------
    list[dict]
        One dict per token with ``text``, ``confidence`` (reported value / 100),
        ``top``, ``left``, ``box`` as ``(x_left, y_top, x_right, y_bottom)``,
        ``box_area``, ``box_height``, ``x_position`` and ``y_position``.
    """
    extra_options = tess_configs.get(tesseract_config, "")
    ocr = image_to_data(
        image,
        lang=tesseract_langs,
        config=extra_options,
        output_type=Output.DICT,
    )

    rows = zip(
        ocr["text"],
        ocr["conf"],
        ocr["left"],
        ocr["top"],
        ocr["width"],
        ocr["height"],
    )

    found = []
    for text, conf, left, top, width, height in rows:
        # The dict is built for every row (including the float conversion)
        # before rows without text are discarded.
        token = {
            "text": text,
            "confidence": float(conf) / 100,
            "top": top,
            "left": left,
            "box": (left, top, left + width, top + height),
            "box_area": width * height,
            "box_height": height,
            "x_position": left,
            "y_position": top,
        }
        if token["text"]:
            found.append(token)

    return found


# A token starts a new line when its vertical centre is at least this fraction
# of its own height away from the previous token's centre.
MIN_NEW_LINE_OVERLAP = 0.5

def set_line_number(token_boxes: list[dict]) -> list[dict]:
    """
    Assign a 1-based ``n_line`` to every token, ordering tokens by vertical centre.

    Parameters
    ----------
    token_boxes : list[dict]
        Tokens with a ``box`` ``(x_left, y_top, x_right, y_bottom)`` and a
        ``box_height``. The dicts are updated in place with ``n_line``.

    Returns
    -------
    list[dict]
        A new list holding the same dicts, sorted by vertical centre. An empty
        input returns an empty list.
    """
    def center_y(token: dict) -> float:
        _, top, _, bottom = token["box"]
        return (bottom + top) / 2

    ordered = sorted(token_boxes, key=center_y)
    if not ordered:
        # Nothing to number; previously this raised IndexError.
        return ordered

    line = 1
    ordered[0]["n_line"] = line
    # zip() over adjacent items yields no pair for a single token, whereas
    # windowed(seq, 2) pads the lone window with None and broke the loop body.
    for prev_token, token in zip(ordered, ordered[1:]):
        prev_box = prev_token["box"]
        box = token["box"]

        # A token lying strictly inside the previous token's vertical span
        # always stays on the same line.
        is_nested = box[1] > prev_box[1] and box[3] < prev_box[3]
        if not is_nested:
            diff = abs(center_y(token) - center_y(prev_token))
            if diff >= token["box_height"] * MIN_NEW_LINE_OVERLAP:
                line += 1

        token["n_line"] = line

    return ordered


def get_line_groups(token_boxes: list[dict]):
    """
    Lazily yield the tokens of each text line, ordered left to right.

    Consecutive tokens sharing the same ``n_line`` form one group (only adjacent
    items are merged, so the input must already be ordered by line). Each group
    is returned as a list sorted by the left edge ``box[0]``.
    """
    runs_by_line = groupby(token_boxes, key=lambda token: token["n_line"])
    return (
        sorted(members, key=lambda token: token["box"][0])
        for _line_number, members in runs_by_line
    )


def set_token_box_ids(
    token_boxes: list[dict[str]],
    image_id: str,
) -> list[dict[str]]:
    """
    Number the tokens in reading order and give each one an identifier.

    Tokens are taken line by line (see ``get_line_groups``) and numbered from 1.
    Every returned dict is a copy of the token extended with ``id`` (the
    base64-encoded string ``"<image_id>-<n>"``) and ``n_token`` (the number ``n``).
    """
    in_reading_order = flatten(get_line_groups(token_boxes))
    return [
        {
            "id": b64_encoder(f"{image_id}-{position}"),
            **token,
            "n_token": position,
        }
        for position, token in enumerate(in_reading_order, start=1)
    ]


def b64_encoder(x: str) -> str:
    """Return the standard base64 text of the UTF-8 bytes of ``x``."""
    utf8_bytes = x.encode("utf-8")
    # Base64 output is pure ASCII, so decoding it as ASCII loses nothing.
    return b64encode(utf8_bytes).decode("ascii")


def save_json(json_data, file_path: str):
    """
    Write ``json_data`` to ``file_path`` as indented, human-readable JSON.

    Non-ASCII characters are written as-is (``ensure_ascii=False``), so the file
    is always encoded as UTF-8 instead of relying on the platform default.
    """
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=4, ensure_ascii=False)
        

def cv2pil(cv_image: np.ndarray) -> Image.Image:
    """
    Convert an OpenCV image array into a PIL image.

    The channels are reordered with ``cv2.COLOR_BGR2RGB`` before the array is
    wrapped with ``Image.fromarray``.
    """
    rgb_pixels = cv2.cvtColor(cv_image, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb_pixels)


def apply_tesseract(
    data_item,
    tesseract_langs: str = "spa",
    tesseract_config: str = "default",
    output_path: str = "",
):
    """
    Run OCR on a data item and attach the resulting token boxes.

    Parameters
    ----------
    data_item : dict
        Must contain ``file_path`` and ``img_bitmap`` (an OpenCV image array).
        It is deep-copied; the caller's object is left untouched.
    tesseract_langs : language code(s) passed on to Tesseract.
    tesseract_config : key into ``tess_configs``.
    output_path : directory where the token boxes are also saved as JSON.
        An empty string (the default) skips saving.

    Returns
    -------
    dict or None
        The copied data item with a new ``token_boxes`` entry, or ``None`` (after
        logging a warning) when OCR finds no tokens.
    """
    # Local import: the notebook's import cell provides no logger, and the
    # previously referenced `logger` name was never defined.
    import logging

    data_item = deepcopy(data_item)
    image_path = data_item["file_path"]
    image = cv2pil(data_item["img_bitmap"])
    image_id = os.path.basename(image_path)

    token_boxes = get_token_boxes(image, tesseract_langs, tesseract_config)
    if not token_boxes:
        # Check before any post-processing so an empty OCR result is reported
        # here instead of failing inside the line-numbering step.
        logging.getLogger(__name__).warning(f"WARNING no boxes for image => {image_path}")
        return None

    token_boxes = set_line_number(token_boxes)
    token_boxes = set_token_box_ids(token_boxes, image_id)
    data_item["token_boxes"] = token_boxes

    if output_path:
        out_dir = Path(output_path)
        out_dir.mkdir(parents=True, exist_ok=True)
        # Standard base64 may contain '/', which would be read as a directory
        # separator; '_' never occurs in that alphabet, so the swap cannot collide.
        file_hash = b64_encoder(image_path).replace("/", "_")
        save_json(token_boxes, str(out_dir / f"{file_hash}.json"))

    return data_item

In [None]:
# Run OCR on the image and also save the token boxes as JSON under OUTPUT_DATA.
# Note: apply_tesseract has a branch that returns None instead of a dict when no tokens are found.
data_item = apply_tesseract(data_item, output_path=OUTPUT_DATA)

In [None]:
# Inspect the data item's fields after OCR.
data_item.keys()