In [52]:
%load_ext autoreload
%autoreload 2

import json
import re
from typing import List, Tuple, Set
from collections import defaultdict

from extract.document import TextBlock, Page, Document
from extract.extract import DocumentTextExtractor

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
# Load the Adobe API JSON output. The encoding is given explicitly because
# open() otherwise falls back to the platform's locale encoding, which can
# mis-decode non-ASCII text on systems whose default is not UTF-8.
with open("temp-adobe.json", "r", encoding="utf-8") as f:
    data = json.load(f)


In [3]:
# Structure path of every element, in the order the elements appear in the JSON.
[element["Path"] for element in data["elements"]]

['//Document/Figure',
 '//Document/P',
 '//Document/Figure[2]',
 '//Document/Aside/P',
 '//Document/Aside/P[2]',
 '//Document/Aside/P[3]',
 '//Document/Aside/P[4]',
 '//Document/Aside/P[5]',
 '//Document/P[2]',
 '//Document/P[3]',
 '//Document/P[4]',
 '//Document/P[5]',
 '//Document/P[6]',
 '//Document/P[7]',
 '//Document/Aside[2]/P',
 '//Document/Aside[2]/P[2]',
 '//Document/Aside[2]/P[3]',
 '//Document/Aside[2]/P[4]',
 '//Document/Figure[3]',
 '//Document/Aside[3]/P',
 '//Document/Aside[3]/P[2]',
 '//Document/H1',
 '//Document/P[8]',
 '//Document/P[9]',
 '//Document/P[10]',
 '//Document/P[11]',
 '//Document/P[12]',
 '//Document/P[13]',
 '//Document/P[14]',
 '//Document/Figure[4]',
 '//Document/P[15]',
 '//Document/H1[2]',
 '//Document/P[16]',
 '//Document/P[17]',
 '//Document/H1[3]',
 '//Document/P[18]',
 '//Document/P[19]',
 '//Document/P[20]',
 '//Document/P[21]',
 '//Document/P[22]',
 '//Document/P[23]',
 '//Document/P[24]',
 '//Document/H1[4]',
 '//Document/Figure[5]',
 '//Docume

In [158]:
class AdobeAPIExtractor(DocumentTextExtractor):
    """Convert the JSON output of the Adobe API into a `Document`.

    Elements whose structure path contains one of the excluded types, elements
    rotated by more than `_max_rotation_degrees`, and superscript elements are
    dropped. Every remaining element that has text becomes a `TextBlock`, with
    its text split into lines using the per-character bounding boxes.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # An element is skipped if any component of its structure path
        # (with the numeric suffixes removed) is one of these types.
        self._elements_exclude = [
            "Aside",
            "Figure",
            "Footnote",
            "Reference",
            "TOC",
            "Watermark",
            "Table",
        ]
        # Maximum clockwise or anti-clockwise rotation a text element can have,
        # otherwise it's excluded from the parsing results.
        self._max_rotation_degrees = 20

    @staticmethod
    def _flatten_data(data: dict) -> dict:
        """Flatten out 'Kids' elements which refer to PDF structure.

        Each element that has a "Kids" list is replaced, in place in the
        element order, by one element per kid. The input is not modified.
        Only one level of nesting is flattened: a kid that itself has "Kids"
        keeps that key.

        Args:
            data: JSON data output by Adobe API. Must contain an "elements" list.

        Returns:
            A new dict with the same top-level keys as `data` and a flattened
            "elements" list.
        """
        new_data = {k: v for k, v in data.items() if k != "elements"}
        new_data["elements"] = []

        for el in data["elements"]:
            if "Kids" in el:
                # We take all the properties of the parent and pass them
                # to each kid, but the kid can overwrite any properties
                # passed to it by the parent (e.g. bounding boxes).
                # This enables propagating page numbers, language prediction
                # and other properties to the kids.
                parent = {k: v for k, v in el.items() if k != "Kids"}
                for kid in el["Kids"]:
                    new_kid = parent.copy()
                    new_kid.update(kid)
                    # Append the merged element; appending the raw kid would
                    # discard every property inherited from the parent.
                    new_data["elements"].append(new_kid)
            else:
                new_data["elements"].append(el)

        return new_data

    @staticmethod
    def _get_lines(char_bounds) -> List[List[float]]:
        """Get the vertical extents of the text lines covered by `char_bounds`.

        Args:
            char_bounds: sequence of character bounding boxes. Index 1 and
                index 3 of each box are used as its ymin and ymax.

        Returns:
            List of [ymin, ymax] intervals sorted by ymin, in which overlapping
            or touching intervals have been merged, so the returned intervals
            are pairwise disjoint. Empty if `char_bounds` is empty.
        """
        if not char_bounds:
            return []

        # Get lines as ymin and ymax coordinates of each character bounds.
        # Intervals are lists (not tuples) because merging mutates them.
        lines = [list(interval) for interval in sorted({(b[1], b[3]) for b in char_bounds})]

        # Merge overlapping lines
        merged = [lines[0]]
        for current in lines[1:]:
            previous = merged[-1]
            if current[0] <= previous[1]:
                previous[1] = max(previous[1], current[1])
            else:
                merged.append(current)

        return merged

    @staticmethod
    def _get_line_number_of_char_bound(char_bound, lines) -> int:
        """Return the index of the line that vertically contains `char_bound`.

        Args:
            char_bound: a character bounding box; index 1 is its ymin and
                index 3 its ymax.
            lines: [ymin, ymax] intervals, e.g. as returned by `_get_lines`.

        Raises:
            ValueError: if the bound is not contained in exactly one line.
        """
        line_number_list = [
            idx
            for idx, line in enumerate(lines)
            if char_bound[1] >= line[0] and char_bound[3] <= line[1]
        ]

        if len(line_number_list) != 1:
            raise ValueError(
                f"Expected character bound {char_bound} to fall within exactly one line, "
                f"but it matched {len(line_number_list)}."
            )

        return line_number_list[0]

    def _element_to_text_block(self, el: dict, block_id: str) -> TextBlock:
        """Build a `TextBlock` from a single flattened element.

        The element's text is split into lines wherever consecutive characters
        fall on different merged lines of the element's character bounds.

        Args:
            el: element with "Text", "Bounds", "Page" and "Path" keys, and
                optionally "CharBounds".
            block_id: identifier to give the text block.
        """
        # Without character bounds there is nothing to split on, so the whole
        # text is kept as a single line.
        char_bounds = el.get("CharBounds") or []
        merged_lines = self._get_lines(char_bounds)
        chars_in_lines_idxs = [
            self._get_line_number_of_char_bound(char_bound, merged_lines)
            for char_bound in char_bounds
        ]

        # Character offsets at which a new line starts, plus the end of the text.
        # NOTE(review): this assumes the i-th char bound belongs to the i-th
        # character of the text - confirm against the API output.
        line_change_idxs = (
            [0]
            + [
                i
                for i in range(1, len(chars_in_lines_idxs))
                if chars_in_lines_idxs[i] != chars_in_lines_idxs[i - 1]
            ]
            + [len(el["Text"])]
        )
        text_by_line = [
            el["Text"][start:end].strip()
            for start, end in zip(line_change_idxs[:-1], line_change_idxs[1:])
        ]

        return TextBlock(
            text=text_by_line,
            text_block_id=block_id,
            coords=self._convert_coordinate_axis(el["Bounds"], el["Page"]),
            type=self._structure_path(el["Path"], remove_numbers=True)[-1],
            path=self._structure_path(el["Path"], remove_numbers=False),
        )

    def _convert_coordinate_axis(self, coords: List[float], page_number: int) -> List[float]:
        """Convert coordinates so that the origin is at top left, rather than bottom left output by Adobe.

        The page height is read from `self._current_data`, which is set by
        `_convert_data`.

        Args:
            coords: list of coordinates output by Adobe: [x0, y0, x1, y1] with origin at bottom left.
            page_number: number of page output by Adobe. Indexed at 0.

        Returns:
            [x0, y0, x1, y1] measured from the top left of the page.
        """
        page_height = self._current_data["pages"][page_number]["height"]

        # To reverse the coordinate system we subtract y0 and y1 from the page height and swap
        # them.
        return [coords[0], page_height - coords[3], coords[2], page_height - coords[1]]

    @staticmethod
    def _structure_path(path: str, remove_numbers: bool = True) -> List[str]:
        """
        Convert a PDF path into a list.
        E.g. '//Document/Aside[3]/P[2]' becomes ['Document', 'Aside', 'P'],
        or ['Document', 'Aside[3]', 'P[2]'] if `remove_numbers` is False.
        """
        # Drop the leading '//' before splitting into components.
        path_split = path[2:].split("/")

        if not remove_numbers:
            return path_split

        return [re.sub(r"\[\d+\]", "", i) for i in path_split]

    @staticmethod
    def _index_of(val, in_list):
        """Return the index of the first occurrence of `val` in `in_list`, or None if absent."""
        try:
            return in_list.index(val)
        except ValueError:
            return None

    def _convert_data(self, data: dict, filename: str) -> Document:
        """Convert JSON data output by Adobe API into a `Document`.

        Stores the flattened data on `self._current_data` as a side effect.

        Args:
            data: JSON data output by Adobe API, with "elements" and "pages" keys.
            filename: filename to record on the returned document.

        Returns:
            A `Document` containing one `Page` per page that has at least one
            text block, in order of first appearance.
        """
        text_blocks_by_page = defaultdict(list)
        # Block numbering starts at 1 on every page. The counter is keyed on the
        # element's own page number so that block IDs stay correct even when a
        # page contributes no elements.
        block_counter_by_page = defaultdict(lambda: 1)
        self._current_data = self._flatten_data(data)

        for el in self._current_data["elements"]:
            # Ignore rotated text elements
            element_rotation = el.get("Rotation", 0)
            if self._max_rotation_degrees < element_rotation < 360 - self._max_rotation_degrees:
                continue

            # Ignore superscript
            if el.get("attributes", {}).get("TextPosition") == "Sup":
                continue

            # TODO: handle subscript

            page_id = el["Page"]

            path_types = self._structure_path(el["Path"])
            if any(excluded in path_types for excluded in self._elements_exclude):
                continue

            block_id = f"p{page_id}_b{block_counter_by_page[page_id]}"
            # Blocks without text still consume a block number.
            block_counter_by_page[page_id] += 1

            # Ignore blocks without any text which haven't already been excluded by type
            if "Text" in el:
                text_blocks_by_page[page_id].append(
                    self._element_to_text_block(el, block_id)
                )

        pages = [
            Page(
                text_blocks=page_text_blocks,
                page_id=page_id,
                dimensions=(data["pages"][page_id]["width"], data["pages"][page_id]["height"]),
            )
            for page_id, page_text_blocks in text_blocks_by_page.items()
        ]

        return Document(
            pages=pages,
            filename=filename,
        )

# Convert the loaded JSON with a default-configured extractor.
extractor = AdobeAPIExtractor()
doc = extractor._convert_data(data=data, filename="test")
