# Create a JSON for a doc to import into Label Studio

In [50]:
import json
from typing import Any, Dict, List, Optional, Union

import cv2
import fitz  # PyMuPDF
import numpy as np
import pandas as pd
from pathlib import Path
from PIL import Image

# PDF text extraction utility functions

In [13]:
# copied from well gas project wellgas/features/extract_text.py
def extract_pdf_data_from_page(page: fitz.Page) -> dict[str, pd.DataFrame]:
    """Extract the word, image and page-level tables for a single PDF page.

    Args:
        page (fitz.Page): the page to parse.

    Returns:
        dict[str, pd.DataFrame]: the ``pdf_text`` and ``image`` frames returned by
            ``_parse_page_contents`` plus a one-row ``page`` frame of page
            metadata. Every non-empty content frame gets a ``page_num`` column.
    """
    contents = _parse_page_contents(page)
    image_df = contents["image"]
    text_df = contents["pdf_text"]

    page_width = page.rect[2] - page.rect[0]
    page_height = page.rect[3] - page.rect[1]

    # Summed bbox area of all images on the page, in squared PDF units.
    if image_df.empty:
        img_area = 0
    else:
        per_image_area = image_df.eval(
            "((bottom_right_x_pdf - top_left_x_pdf)"
            " * (bottom_right_y_pdf - top_left_y_pdf))"
        )
        img_area = per_image_area.sum()
    total_area = page_width * page_height

    column_dtypes = {
        "rotation_degrees": np.int16,
        "origin_x_pdf_coord": np.float32,
        "origin_y_pdf_coord": np.float32,
        "width_pdf_coord": np.float32,
        "height_pdf_coord": np.float32,
        "has_images": "boolean",
        "has_text": "boolean",
        "page_num": np.int16,
        "image_area_frac": np.float32,
    }
    meta_df = pd.DataFrame(
        {
            "rotation_degrees": [page.rotation],
            "origin_x_pdf_coord": [page.rect[0]],
            "origin_y_pdf_coord": [page.rect[1]],
            "width_pdf_coord": [page_width],
            "height_pdf_coord": [page_height],
            "has_images": [not image_df.empty],
            "has_text": [not text_df.empty],
            "page_num": [page.number],
            "image_area_frac": [np.float32(img_area / total_area)],
        }
    ).astype(column_dtypes)

    # Tag the content rows with their page so they can be joined to the page table.
    for frame in contents.values():
        if frame.empty:
            continue
        frame["page_num"] = np.int16(page.number)

    return {**contents, "page": meta_df}


def _parse_page_contents(page: fitz.Page) -> dict[str, pd.DataFrame]:
    """Split a page into a word table and an image table via fitz.TextPage.

    Args:
        page (fitz.Page): the page to parse.

    Returns:
        dict[str, pd.DataFrame]: ``pdf_text`` holds one row per word and ``image``
            one row per image block. Either is an empty DataFrame when the page
            has nothing of that kind.

    Raises:
        ValueError: if a block has a type other than 0 (text) or 1 (image).
    """
    textpage = page.get_textpage(flags=fitz.TEXTFLAGS_DICT)
    page_dict = textpage.extractDICT()
    word_tuples = textpage.extractWORDS()

    image_frames = []
    for block in page_dict["blocks"]:
        block_type = block["type"]
        if block_type == 0:
            # Text blocks are ignored here; text is taken word by word below.
            continue
        if block_type != 1:
            raise ValueError(f"Unknown block type: {block['type']}")
        image_frames.append(_parse_image_block(block))

    word_frames = []
    for word_tuple in word_tuples:
        word_frame = _parse_word_block(word_tuple)
        if word_frame.empty:
            continue
        word_frames.append(word_frame)

    text = (
        pd.concat(word_frames, axis=0, ignore_index=True)
        if word_frames
        else pd.DataFrame()
    )
    images = (
        pd.concat(image_frames, axis=0, ignore_index=True)
        if image_frames
        else pd.DataFrame()
    )
    return {"pdf_text": text, "image": images}


def _parse_image_block(img_block: dict[str, Any]) -> pd.DataFrame:
    """Parse an image block from a fitz.TextPage.extractDICT() output."""
    top_left_x_pdf, top_left_y_pdf, bottom_right_x_pdf, bottom_right_y_pdf = img_block[
        "bbox"
    ]
    dpi = min(
        img_block["xres"], img_block["yres"]
    )  # should be equal; min() just in case
    out = pd.DataFrame(
        {
            "img_num": [img_block["number"]],
            "dpi": [dpi],
            "top_left_x_pdf": [top_left_x_pdf],
            "top_left_y_pdf": [top_left_y_pdf],
            "bottom_right_x_pdf": [bottom_right_x_pdf],
            "bottom_right_y_pdf": [bottom_right_y_pdf],
        }
    ).astype(
        {
            "img_num": np.int16,
            "dpi": np.int16,
            "top_left_x_pdf": np.float32,
            "top_left_y_pdf": np.float32,
            "bottom_right_x_pdf": np.float32,
            "bottom_right_y_pdf": np.float32,
        }
    )
    return out

def _parse_word_block(word_block: tuple) -> pd.DataFrame:
    """Parse a word block from a fitz.TextPage.extractWORDS() output."""
    out = {
        "top_left_x_pdf": [word_block[0]],
        "top_left_y_pdf": [word_block[1]],
        "bottom_right_x_pdf": [word_block[2]],
        "bottom_right_y_pdf": [word_block[3]],
        "text": [word_block[4]],
        "block_num": [word_block[5]],
        "line_num": [word_block[6]],
        "word_num": [word_block[7]]
    }
    out = pd.DataFrame(out).astype(
        {
            "block_num": np.int16,
            "line_num": np.int16,
            "word_num": np.int16,
            "text": "string",
            "top_left_x_pdf": np.float32,
            "top_left_y_pdf": np.float32,
            "bottom_right_x_pdf": np.float32,
            "bottom_right_y_pdf": np.float32,
        }
    )
    return out

def _frac_normal_ascii(text: Union[str, bytes]) -> float:
    """Fraction of characters that are normal ASCII characters."""
    # normal characters, from space to tilde, plus whitespace
    # see https://www.asciitable.com/
    sum_ = 0
    if isinstance(text, bytes):
        text = text.decode("utf-8")
    for char in text:
        if (32 <= ord(char) <= 126) or char in "\t\n":
            sum_ += 1
    return sum_ / len(text)


In [16]:

def _render_page(
    pg: fitz.Page, dpi=150, clip: Optional[fitz.Rect] = None
) -> Image.Image:
    """Rasterize a PDF page (or a region of it) into a PIL.Image.

    Args:
        pg (fitz.Page): a page of a PDF
        dpi (int, optional): image resolution in pixels per inch. Defaults to 150.
            Tesseract recommends 300 dpi; PaddleOCR seems to do fine with half
            that.
        clip (Optional[fitz.Rect], optional): region of the page to render, in PDF
            coordinates. Defaults to None, which renders the full page.

    Returns:
        Image.Image: the rendered page as a PIL.Image object
    """
    pixmap: fitz.Pixmap = pg.get_pixmap(dpi=dpi, clip=clip)  # type: ignore
    return _pil_img_from_pixmap(pixmap)


def _pil_img_from_pixmap(pix: fitz.Pixmap) -> Image.Image:
    """Build a PIL.Image from a pyMuPDF (fitz) Pixmap.

    The mode selection is adapted from the Pixmap.pil_save method in pyMuPDF's
    fitz.py (python3.9/site-packages/fitz/fitz.py), with "self" replaced by
    "pix". It was copied because the original author found no pyMuPDF function
    that returns a PIL object directly.

    Args:
        pix (fitz.Pixmap): a rendered Pixmap

    Returns:
        Image: a PIL.Image object
    """
    # (mode without alpha, mode with alpha), keyed by colorspace component count.
    modes_by_components = {1: ("L", "LA"), 3: ("RGB", "RGBA")}

    colorspace = pix.colorspace
    if colorspace is None:
        mode = "L"
    else:
        candidates = modes_by_components.get(colorspace.n)
        if candidates is None:
            # Any other component count is treated as CMYK.
            mode = "CMYK"
        else:
            opaque_mode, alpha_mode = candidates
            mode = opaque_mode if pix.alpha == 0 else alpha_mode

    size = (pix.width, pix.height)
    return Image.frombytes(mode, size, pix.samples)

In [17]:
# PDF coordinate units per inch; used to scale PDF coordinates to pixels at a given dpi.
PDF_POINTS_PER_INCH = 72  # I believe this is standard for all PDFs

def pil_to_cv2(image: Image.Image) -> np.ndarray:  # noqa: C901
    """Convert a PIL Image to an OpenCV image (numpy array).

    Adapted from https://gist.github.com/panzi/1ceac1cb30bb6b3450aa5227c02eedd3
    This covers the common modes and is not exhaustive.

    Args:
        image (Image.Image): the PIL image to convert.

    Returns:
        np.ndarray: uint8 pixel array. Single-channel for modes "1" and "L", BGRA
            for modes with alpha, BGR otherwise.

    Raises:
        ValueError: if the image mode is not handled.
    """
    mode = image.mode
    new_image: np.ndarray
    if mode == "1":
        # Bilevel pixels come out as 0/1; stretch them to 0/255.
        new_image = np.array(image, dtype=np.uint8)
        new_image *= 255
    elif mode == "L":
        new_image = np.array(image, dtype=np.uint8)
    elif mode in ("LA", "La", "PA", "Pa"):
        new_image = np.array(image.convert("RGBA"), dtype=np.uint8)
        new_image = cv2.cvtColor(new_image, cv2.COLOR_RGBA2BGRA)
    elif mode == "RGB":
        new_image = np.array(image, dtype=np.uint8)
        new_image = cv2.cvtColor(new_image, cv2.COLOR_RGB2BGR)
    elif mode == "RGBA":
        new_image = np.array(image, dtype=np.uint8)
        new_image = cv2.cvtColor(new_image, cv2.COLOR_RGBA2BGRA)
    elif mode == "LAB":
        # NOTE(review): Pillow and OpenCV may not share the same 8-bit LAB
        # encoding; left unchanged, confirm before relying on this branch.
        new_image = np.array(image, dtype=np.uint8)
        new_image = cv2.cvtColor(new_image, cv2.COLOR_LAB2BGR)
    elif mode in ("P", "CMYK", "HSV", "YCbCr"):
        # Let Pillow do the colour-space conversion. Feeding its raw HSV or YCbCr
        # bytes to OpenCV is wrong: Pillow scales hue to 0-255 while OpenCV's
        # 8-bit HSV expects 0-179, and Pillow orders channels Y, Cb, Cr while
        # OpenCV's YCrCb expects Y, Cr, Cb.
        new_image = np.array(image.convert("RGB"), dtype=np.uint8)
        new_image = cv2.cvtColor(new_image, cv2.COLOR_RGB2BGR)
    else:
        raise ValueError(f"unhandled image color mode: {mode}")

    return new_image


def cv2_to_pil(img: np.ndarray) -> Image.Image:
    """Create a PIL Image from an OpenCV-style pixel array.

    Args:
        img (np.ndarray): a 2-D grayscale array, or an array in BGR channel order.

    Returns:
        Image.Image: the grayscale array wrapped as-is, or the BGR array converted
            to RGB first.
    """
    is_grayscale = len(img.shape) == 2  # single channel
    if not is_grayscale:
        # Only BGR input is handled for now.
        rgb_pixels = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return Image.fromarray(rgb_pixels)
    return Image.fromarray(img)


def display_img_array(img: np.ndarray, figsize=(5, 5), **kwargs):
    """Plot an image array in a new figure, for jupyter sessions.

    Args:
        img (np.ndarray): 2-D grayscale array, or an array in BGR channel order.
        figsize (tuple, optional): figure size passed to ``plt.figure``. Defaults
            to (5, 5).
        **kwargs: forwarded to ``plt.imshow``.

    Returns:
        Whatever ``plt.imshow`` returns.
    """
    # NOTE(review): `plt` is not imported in the visible part of this file;
    # presumably matplotlib.pyplot, confirm it is in scope before calling.
    plt.figure(figsize=figsize)
    is_grayscale = len(img.shape) == 2
    if not is_grayscale:
        rgb_pixels = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return plt.imshow(rgb_pixels, **kwargs)
    return plt.imshow(img, cmap="gray", vmin=0, vmax=255, **kwargs)


def overlay_bboxes(
    img: np.ndarray, bboxes: np.ndarray, color=(255, 0, 0)
) -> np.ndarray:
    """Draw 1-pixel-thick rectangles for N x 4 boxes (x0, y0, x1, y1) on a copy.

    Args:
        img (np.ndarray): image to annotate; it is copied, not modified.
        bboxes (np.ndarray): N x 4 array of box corners in pixel coordinates.
        color (tuple, optional): rectangle color passed to ``cv2.rectangle``.
            Defaults to (255, 0, 0).

    Returns:
        np.ndarray: the annotated copy of ``img``.
    """
    annotated = img.copy()
    # cv2.rectangle is given integer corners, so round and cast the boxes first.
    int_boxes = np.round(bboxes, 0).astype(np.int32)
    for x0, y0, x1, y1 in int_boxes:
        cv2.rectangle(annotated, (x0, y0), (x1, y1), color=color, thickness=1)
    return annotated


def pdf_coords_to_pixel_coords(coords: np.ndarray, dpi: int) -> np.ndarray:
    """Scale PDF coordinates to pixel coordinates at the given resolution.

    No origin offset is subtracted. For arbitrary PDFs you would need to subtract
    the origin in PDF coordinates, but since you create these PDFs, you know the
    origin is (0, 0).

    Args:
        coords (np.ndarray): coordinates in PDF units.
        dpi (int): resolution of the rendered image in pixels per inch.

    Returns:
        np.ndarray: ``coords`` multiplied by ``dpi / PDF_POINTS_PER_INCH``.
    """
    return coords * dpi / PDF_POINTS_PER_INCH

# Read in a doc and create a JSON

In [98]:
# UPDATE THIS
pdf_filename = "wisconsin_electric.pdf"

In [99]:
# Resolve the PDF path and stop early if the file is missing.
src_path = Path(pdf_filename)
assert src_path.exists()

In [100]:
# from file
doc = fitz.Document(str(src_path))
doc.is_pdf

True

In [101]:
# from bytes
# Alternative to opening by path: read the raw bytes and open them as an
# in-memory stream. This rebinds `doc`.
_bytes = src_path.read_bytes()
from io import BytesIO
doc = fitz.open(stream=BytesIO(_bytes), filetype="pdf")
doc.is_pdf

True

### Extract Text Bboxes

In [102]:
# Only the first page of the document is processed.
pg = doc[0]
extracted = extract_pdf_data_from_page(pg)
extracted.keys()

dict_keys(['pdf_text', 'image', 'page'])

In [103]:
# Unpack the three tables: one row per word, one row per image, one row of page metadata.
txt = extracted['pdf_text']
img_info = extracted['image']
pg_meta = extracted['page']
txt.shape, img_info.shape, pg_meta.shape

((106, 9), (0, 0), (1, 9))

In [104]:
txt

Unnamed: 0,top_left_x_pdf,top_left_y_pdf,bottom_right_x_pdf,bottom_right_y_pdf,text,block_num,line_num,word_num,page_num
0,504.791168,83.474442,541.465210,98.621445,Exhibit,0,0,0,0
1,544.523193,83.474442,565.929260,98.621445,21.1,0,0,1,0
2,184.882263,127.074478,248.440277,142.221481,WISCONSIN,1,0,0,0
3,251.498276,127.074478,306.498260,142.221481,ELECTRIC,1,0,1,0
4,309.556274,127.074478,351.114288,142.221481,POWER,1,0,2,0
...,...,...,...,...,...,...,...,...,...
101,167.466446,432.646484,212.476440,446.386475,subsidiary,10,2,5,0
102,215.256439,432.646484,223.596436,446.386475,of,10,2,6,0
103,226.376434,432.646484,249.706451,446.386475,WEC,10,2,7,0
104,252.486450,432.646484,284.166443,446.386475,Energy,10,2,8,0


In [105]:
# Render the whole page as a PIL image at _render_page's default resolution (150 dpi).
full_pg_img = _render_page(pg)

In [106]:
image_filename = "wisconsin_electric.jpg"

In [107]:
full_pg_img.save(image_filename)

## Define page variables and JSON dict

In [35]:
# Pixel dimensions of the rendered page. PIL's Image.size is (width, height), so
# there is no need to convert the image to an OpenCV array (twice) to read its shape.
original_width, original_height = full_pg_img.size

In [37]:
# Scale factors that turn PDF coordinates into percentages (0-100) of the page
# width and height; presumably the units Label Studio expects (confirm).
x_norm = 100/pg_meta.width_pdf_coord.iloc[0]
y_norm = 100/pg_meta.height_pdf_coord.iloc[0]
x_norm, y_norm

(0.16798942273629794, 0.11878039560889525)

In [120]:
# Skeleton of the task to import: the image location plus one (initially empty)
# prediction whose "result" list is filled in below.
annotation_json = {
    "data": {
        # Reuse the name the rendered page was saved under, so the URI always
        # matches the image file written earlier.
        "ocr": f"gs://labeled-ex21-filings/{image_filename}"
    },
    "annotations": [],
    "predictions": [{"model_version": "v1.0", "result": []}],
}

## Create a bounding box result entry for each word

In [121]:
def get_bbox_dicts(bbox: pd.Series, ind) -> List[Dict]:
    """Build the two Label Studio result entries for one extracted word.

    Reads the module-level ``x_norm`` / ``y_norm`` scale factors and the
    ``original_width`` / ``original_height`` image dimensions.

    Args:
        bbox (pd.Series): one row of the word table, with ``top_left_x_pdf``,
            ``top_left_y_pdf``, ``bottom_right_x_pdf``, ``bottom_right_y_pdf``
            and ``text`` entries.
        ind: value used to build the region id ``bbox_{ind}``.

    Returns:
        List[Dict]: ``[rectangle_entry, textarea_entry]``. Both share the same id
            and geometry; the textarea value additionally carries the word under
            ``text``.
    """
    # Depending on the NumPy version, these products can be NumPy scalars (e.g.
    # float32) that json.dump refuses to serialize, so coerce to builtin floats.
    geometry = {
        "x": float(bbox["top_left_x_pdf"] * x_norm),
        "y": float(bbox["top_left_y_pdf"] * y_norm),
        "width": float((bbox["bottom_right_x_pdf"] - bbox["top_left_x_pdf"]) * x_norm),
        "height": float(
            (bbox["bottom_right_y_pdf"] - bbox["top_left_y_pdf"]) * y_norm
        ),
        "rotation": 0,
    }
    bbox_id = f"bbox_{ind}"

    def _entry(from_name: str, result_type: str, value: Dict) -> Dict:
        """Wrap a value in the fields common to both result entries."""
        return {
            "original_width": original_width,
            "original_height": original_height,
            "image_rotation": 0,
            "value": value,
            "id": bbox_id,
            "from_name": from_name,
            "to_name": "image",
            "type": result_type,
            "origin": "manual",
        }

    box_dict = _entry("bbox", "rectangle", dict(geometry))
    word_dict = _entry(
        "transcription", "textarea", {**geometry, "text": [bbox["text"]]}
    )
    return [box_dict, word_dict]

In [122]:
# Flatten the per-word [rectangle, textarea] pairs into one list of result entries.
# change to using an apply?
result = []
for i, row in txt.iterrows():
    result.extend(get_bbox_dicts(row, i))

In [123]:
annotation_json["predictions"][0]["result"] = result

In [124]:
json_filename = "wisconsin_electric_full.json"

In [125]:
# Serialize the task and write it out in one go.
with open(json_filename, mode='w') as fp:
    fp.write(json.dumps(annotation_json))