# Adobe Extract API parser

In [1]:
# !pip install pdfservices-sdk

In [1]:
from adobe.pdfservices.operation.auth.credentials import Credentials
from adobe.pdfservices.operation.execution_context import ExecutionContext
from adobe.pdfservices.operation.io.file_ref import FileRef
from adobe.pdfservices.operation.pdfops.extract_pdf_operation import ExtractPDFOperation
from adobe.pdfservices.operation.pdfops.options.extractpdf.extract_pdf_options import (
    ExtractPDFOptions,
)
from adobe.pdfservices.operation.pdfops.options.extractpdf.table_structure_type import (
    TableStructureType,
)
from adobe.pdfservices.operation.pdfops.options.extractpdf.extract_element_type import (
    ExtractElementType,
)
from adobe.pdfservices.operation.exception.exceptions import ServiceApiException, ServiceUsageException

import os.path
import zipfile
import json

import dotenv

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from enum import StrEnum
from dataclasses import dataclass
import weakref
import re
from pathlib import Path

In [2]:
# Get environment variables
dotenv.load_dotenv()

client_id = os.getenv("ADOBE_CLIENT_ID")
client_secret = os.getenv("ADOBE_CLIENT_SECRET")

# Some constants
# Input PDF file
input_pdf = "UK_23.pdf"

# Input PDF directory
input_pdf_dir = "/Users/dvdblk/Downloads/pdf_files_complete/"
input_pdf_path = os.path.join(input_pdf_dir, input_pdf)
# Output directory
out_dir = "../../data/interim/000-adobe-extract/"
# Output filename will be the same as input but extension will be .zip

In [3]:
@dataclass
class AdobeExtractedPDF:
    json_data: Dict[str, Any]
    file_path: str
    csv_tables: Dict[str, List[str]]

def get_extracted_pdf(input_file_path: str) -> AdobeExtractedPDF:
    input_basename = Path(input_pdf).stem
    # Check if .zip file exists
    zip_file = os.path.join(out_dir, f"{input_basename}.zip")

    if os.path.isfile(zip_file):
        print(f"File '{zip_file}' already exists, skipping download.")

        # Open and read zip file
        archive = zipfile.ZipFile(zip_file, "r")
        json_data = archive.open("structuredData.json").read()
        json_data = json.loads(json_data)

        # Extract csv tables
        csv_tables = {}
        table_files = [file_name for file_name in archive.namelist() if file_name.startswith("tables/")]
        for table_name in table_files:
            csv_tables[table_name] = archive.open(table_name).read().decode("utf-8-sig").rstrip("\n")

        return AdobeExtractedPDF(json_data, zip_file, csv_tables)

    # Initial setup, create credentials instance.
    credentials = (
        Credentials.service_principal_credentials_builder()
        .with_client_id(client_id)
        .with_client_secret(client_secret)
        .build()
    )

    # Create an ExecutionContext using credentials and create a new operation instance.
    execution_context = ExecutionContext.create(credentials)
    extract_pdf_operation = ExtractPDFOperation.create_new()

    # Set operation input from a source file.
    source = FileRef.create_from_local_file(input_file_path)
    extract_pdf_operation.set_input(source)

    # Build ExtractPDF options and set them into the operation
    extract_pdf_options: ExtractPDFOptions = (
        ExtractPDFOptions.builder().with_elements_to_extract([ExtractElementType.TEXT, ExtractElementType.TABLES]).with_table_structure_format(TableStructureType.CSV).build()
    )
    extract_pdf_operation.set_options(extract_pdf_options)

    # Execute the operation.
    result: FileRef = extract_pdf_operation.execute(execution_context)

    # Save the result to the specified location.
    result.save_as(zip_file)

    # Extract json file and save to out_dir
    archive = zipfile.ZipFile(zip_file, "r")
    json_data = archive.open("structuredData.json").read()
    json_data = json.loads(json_data)

    # Extract csv tables
    csv_tables = {}
    table_files = [file_name for file_name in archive.namelist() if file_name.startswith('tables/')]
    for table_name in table_files:
        csv_tables[table_name] = archive.open(table_name).read().decode("utf-8")

    return AdobeExtractedPDF(json_data, zip_file, csv_tables)


In [4]:
extracted_pdf = get_extracted_pdf(input_file_path=input_pdf_path)

result = extracted_pdf.json_data

File '../../data/interim/000-adobe-extract/UK_23.zip' already exists, skipping download.


In [5]:
print(extracted_pdf.csv_tables)




In [6]:
# Print path, page, text and table file paths for each element in the result document
for element in result["elements"]:
    print("Path: " + element["Path"])
    if page := element.get("Page"):
        print("Page: " + str(element["Page"]))
    if text := element.get("Text"):
        print("Text: " + element["Text"])
    if file_paths := element.get("filePaths"):
        print("File paths: " + str(file_paths))
    print()

Path: //Document/Figure

Path: //Document/P
Text: The Futures Toolkit 

Path: //Document/Figure[2]

Path: //Document/P[2]
Text: Tools for Futures Thinking and Foresight Across UK Government 

Path: //Document/P[3]
Text: Edition 1.0 

Path: //Document/P[4]
Text: November 2017 

Path: //Document/Figure[3]

Path: //Document/H1
Page: 1
Text: Contents 

Path: //Document/TOC/TOCI/Reference/Lbl
Page: 1
Text: 1. 

Path: //Document/TOC/TOCI/Reference/LBody
Page: 1
Text: Introduction .............................................................................................................................. 1 

Path: //Document/TOC/TOCI[2]/Reference/Lbl
Page: 1
Text: 2. 

Path: //Document/TOC/TOCI[2]/Reference/LBody
Page: 1
Text: Introduction to futures thinking.............................................................................................. 2 

Path: //Document/TOC/TOCI[3]/Reference/Lbl
Page: 1
Text: 3. 

Path: //Document/TOC/TOCI[3]/Reference/LBody
Page: 1
Text: Futures process d

In [11]:
# data structures for the entire PDF that include structural metadata along with text

@dataclass
class InterimElement:
    """Represent an element in the document that is about to be processed"""

    raw: Dict[str, Any]
    """JSON object from the Adobe Extract API"""
    is_aside: bool
    """Whether the element is an aside (elements that are next to a table or a figure)"""
    path: str
    """The path of the element in the document without the //Document prefix and other useless parts"""

    @property
    def text(self) -> Optional[str]:
        """The text of the element"""
        # Preprocess
        if result := self.raw.get("Text"):
            # Remove 3x or more consecutive underscores or dots
            result = re.sub(r"(\_|\.){3,}", "", result)
            # Remove trailing whitespace
            result = result.rstrip()
            # Check if is numeric or is digit and return None if so
            if result.isnumeric() or result.isdigit():
                return None
            # Remove empty paragraphs
            if result == "" or result == "\n":
                return None

            return result
        else:
            return None

    @property
    def page(self) -> Optional[int]:
        """The page of the element"""
        return self.raw.get("Page")

    @property
    def full_path(self) -> Optional[str]:
        """The full path of the element in the document as provided by Adobe Extract API"""
        return self.raw.get("Path")

class TextOrigin(StrEnum):
    """Enumeration to describe the origin of a text"""

    TOC = "TOC"
    """Table of contents"""
    PARAGRAPH = "PARAGRAPH"
    LIST = "LIST"
    TABLE = "TABLE"
    FIGURE = "FIGURE"

class Paragraph:
    """Describe a paragraph of text"""

    def __init__(self, text: str, origin: TextOrigin, aside: bool = False) -> None:
        """
        Args:
            text (str): The text of the paragraph
            origin (TextOrigin): The origin of the text in this paragraph
            aside (bool, optional): Whether the paragraph is an aside (elements that are next to a table or a figure).
                                     Defaults to `False`.
        """
        self.text = text
        self.origin = origin
        self.aside = aside

    def __repr__(self) -> str:
        return f"<Paragraph ({self.origin}) text={self.text}>"

class Section:
    """Describe a section of the document. Separated and detected by Adobe Extract API by headers (H1, H2, ..., Hn)."""

    def __init__(self, title: Optional[str] = None, pages: Optional[Set[int]] = None, section_type: Optional[str] = None, paragraphs: Optional[List[str]] = None, subsections: Optional[List["Section"]] = None, parent: Optional["Section"] = None) -> None:
        self.title = title
        self.pages = pages if pages else set()
        self.section_type = section_type
        self.paragraphs = paragraphs if paragraphs else []
        self.parent: Optional[weakref.ReferenceType[Section]] = weakref.ref(parent) if parent else None
        self.subsections: List[Section] = subsections if subsections else []

    def __repr__(self) -> str:
        return f"<Section ({self.section_type}) title={self.title}>"

class Document(Section):
    """A tree of Sections"""

    def __init__(self, title: str | None = None, pages: Set[int] = set(), paragraphs: List[str] = [], subsections: List[Section] = [], parent: Section | None = None) -> None:
        super().__init__(title, pages, "document", paragraphs, subsections, parent)


def adobe_extracted_pdf_to_document(extracted_pdf: AdobeExtractedPDF) -> Document:
    # header_to_section_type = {
    #     "H1": "section",
    #     "H2": "subsection",
    #     "H3": "subsubsection",
    # }
    document = Document()

    curr_section = document
    section_to_insert_header = {}
    elements_iterator = iter(extracted_pdf.json_data["elements"])
    elements_stack: List[Tuple[dict, str]] = []

    def get_next_elem() -> InterimElement:
        """Uses an iterator (and a stack) to get the next element from the structured_json"""
        if elements_stack:
            return elements_stack.pop()
        else:
            element = next(elements_iterator)
            path = element.get("Path")
            if path is None:
                raise ValueError("Path is None")

            # Verify that xpath starts with //Document
            if not path.startswith("//Document"):
                raise ValueError(f"Path does not start with //Document: {path}")

            # Remove //Document
            path = path.replace("//Document", "", 1)

            # Check if it is an Aside element
            is_aside = False
            new_path, n_occurences = re.subn(r"^\/Aside(\[\d+\])?", "", path)
            if n_occurences == 1:
                # remove Aside prefix
                path = new_path
                is_aside = True

            # Remove /Reference occurences
            new_path, n_occurences = re.subn(r"\/Reference(\[\d+\])?(\/Sub(\[\d+\])?)?", "", path)
            if n_occurences > 0:
                path = new_path

            # Remove /ParagraphSpan occurences
            new_path, n_occurences = re.subn(r"\/ParagraphSpan(\[\d+\])?", "", path)
            if n_occurences > 0:
                path = new_path

            # Remove /StyleSpan occurences
            new_path, n_occurences = re.subn(r"\/StyleSpan(\[\d+\])?", "", path)
            if n_occurences > 0:
                path = new_path

        return InterimElement(element, is_aside, path)

    def add_paragraph(paragraph: Paragraph, pages: Optional[Union[Set[int], int]]):
        if paragraph.text is not None:
            curr_section.paragraphs.append(paragraph)
            if pages:
                if isinstance(pages, int):
                    curr_section.pages.add(pages)
                elif isinstance(pages, set):
                    curr_section.pages.update(pages)

    try:
        while True:
            element = get_next_elem()

            if re.match(r".*\/(ExtraCharSpan|DirectEntrySpan)(\[\d+\])?$", element.path):
                # Skip extra characters
                continue
            elif re.match(r"^\/Title$", element.path):
                # Title
                document.title = element.text
            elif re.match(r"^(\/P(\[\d+\])?)?\/Figure(\[\d+\])?$", element.path):
                # Figure
                add_paragraph(Paragraph(element.text, TextOrigin.FIGURE), pages=element.page)
            elif re.match(r"^\/P(\[\d+\])?(\/(Sub|ParagraphSpan)(\[\d+\])?)?$", element.path):
                # Paragraph
                add_paragraph(Paragraph(element.text, TextOrigin.PARAGRAPH), pages=element.page)
            elif re.match(r"^\/Table(\[\d+\])?.*$", element.path):
                # Table
                if csv_paths := element.raw.get("filePaths"):
                    # Table with data
                    # load the csvs and add them as paragraphs
                    for csv_path in csv_paths:
                        csv_text = extracted_pdf.csv_tables[csv_path]
                        add_paragraph(Paragraph(csv_text, TextOrigin.TABLE), pages=element.page)

            elif match := re.match(r"^\/(H\d)(\[\d+\])?(\/Sub(\[\d+\])?)?$", element.path):
                # Header
                title = element.text
                section_type = match.group(1)

                # Verify that this header is not the same as the previous one
                # Check if page difference is less than 2
                # Check if title is the same
                if curr_section.title == title and element.page - max(curr_section.pages) < 2:
                    # Skip this header
                    continue

                # Verify next element is a paragraph and not a followup header
                while True:
                    next_element = get_next_elem()
                    reinsert_back = True
                    if next_match := re.match(r"^\/(H\d)(\[\d+\])?(\/Sub(\[\d+\])?)?$", next_element.path):
                        if next_match.group(1) == section_type:
                            next_title = next_element.text or ""
                            if not title.endswith(" ") and not next_title.startswith(" "):
                                title += " "
                            title += next_title
                            reinsert_back = False

                    if reinsert_back:
                        # Next element is not a header, push it to the stack
                        elements_stack.append(next_element)
                        break

                # Create new section
                new_section = Section(
                    title=title,
                    section_type=section_type,
                )

                # Check if new section is a subsection
                if curr_section.section_type == "document":
                    # New section is a top level (H1) section
                    # Add it to the document
                    curr_section.subsections.append(new_section)
                    # Set parent of new section to document
                    new_section.parent = curr_section
                    # Every new H1 has to be inserted into document subsections
                    section_to_insert_header[section_type] = curr_section
                else:
                    if section_type < curr_section.section_type:
                        # H2 -> H1 or H3 -> H2 or H3 -> H1
                        if parent_section := section_to_insert_header.get(section_type):
                            parent_section.subsections.append(new_section)
                            new_section.parent = parent_section
                        else:
                            # TODO: find closest parent of curr_section with section_type < new_section.section_type
                            raise ValueError("Could not find parent section")

                    elif section_type > curr_section.section_type:
                        # H1 -> H2 or H2 -> H3 or H1 -> H3
                        # curr_section is the parent
                        curr_section.subsections.append(new_section)
                        new_section.parent = curr_section
                        section_to_insert_header[section_type] = curr_section
                    else:
                        # H1 = H1 or H2 = H2 or H3 = H3
                        # curr_section.parent is the parent as they are on the same level
                        curr_section.parent.subsections.append(new_section)
                        new_section.parent = curr_section.parent

                curr_section = new_section
            elif match := re.match(r"^\/L(\[\d+\])?\/LI(\[\d+\])?\/(Lbl|LBody).*$", element.path):
                # List
                list_item_type = match.group(3)
                if list_item_type == "Lbl":
                    # Check if we can join it with a LBody element that should follow this one
                    next_element = get_next_elem()
                    if next_element.path == element.path.replace("Lbl", "LBody"):
                        # Join the two elements
                        joined_text = element.text or ""
                        if next_element.text:
                            if not next_element.text.startswith(" "):
                                joined_text += " "
                            joined_text += next_element.text
                        add_paragraph(Paragraph(joined_text, TextOrigin.LIST), pages=set([element.page, next_element.page]))
                    else:
                        # Push next element to stack
                        elements_stack.append(next_element)
                        # Add current element as a paragraph
                        add_paragraph(Paragraph(element.text, TextOrigin.LIST), pages=element.page)
                elif list_item_type == "LBody":
                    add_paragraph(Paragraph(element.text, TextOrigin.LIST), pages=element.page)
            elif match := re.match(r"^\/TOC(\[\d+\])?\/TOCI(\[\d+\])?\/(Span|Lbl|LBody)(\[\d+\])?$", element.path):
                # Table of contents
                toc_elem_type = match.group(3)
                if toc_elem_type == "Span":
                    if text := element.text:
                        # Check if the next element is another span that should be joined
                        next_element = get_next_elem()
                        if next_element.path == element.path + "[2]":
                            # Join the two elements
                            joined_text = text
                            if next_element.text:
                                if not next_element.text.startswith(" "):
                                    joined_text += " "
                                joined_text += next_element.text
                            add_paragraph(Paragraph(joined_text, TextOrigin.TOC), pages=set([element.page, next_element.page]))
                        else:
                            # Push next element to stack
                            elements_stack.append(next_element)
                            # Add current element as a paragraph
                            add_paragraph(Paragraph(text, TextOrigin.TOC), pages=element.page)
                elif toc_elem_type == "Lbl":
                    # Check if we can join it with a LBody element that should follow this one
                    next_element = get_next_elem()
                    if next_element.path == element.path.replace("Lbl", "LBody"):
                        # Join the two elements
                        joined_text = element.text or ""
                        if next_element.text:
                            if not next_element.text.startswith(" "):
                                joined_text += " "
                            joined_text += next_element.text
                        add_paragraph(Paragraph(joined_text, TextOrigin.TOC), pages=set([element.page, next_element.page]))
                    else:
                        # Push next element to stack
                        elements_stack.append(next_element)
                        # Add current element as a paragraph
                        add_paragraph(Paragraph(element.text, TextOrigin.TOC), pages=element.page)

            elif re.match(r"^\/Footnote(\[\d+\])?$", element.path):
                # Footnote
                # Ignore for now
                continue
            else:
                add_paragraph(Paragraph(element.text, TextOrigin.PARAGRAPH), pages=element.page)
                print(f"Unknown element path: {element.path} in {extracted_pdf.file_path}")
                #raise ValueError(f"Unknown element path: {element.path}")
    except StopIteration:
        pass

    return document

In [12]:
document = adobe_extracted_pdf_to_document(extracted_pdf)

In [13]:
# Sanity check
# Count number of sections per section type
counts = {}
def count_sections(section: Section):
    if section.section_type not in counts:
        counts[section.section_type] = 0
    counts[section.section_type] += 1
    for subsection in section.subsections:
        count_sections(subsection)
count_sections(document)
# Remove document count (always '1')
del counts["document"]

# Count number of sections in the JSON
json_section_counts = {}
for element in result["elements"]:
    if path := element.get("Path"):
        if match := re.match(r"^\/\/Document\/(H\d)(\[\d+\])?$", path):
            section_type = match.group(1)
            if section_type not in json_section_counts:
                json_section_counts[section_type] = 0
            json_section_counts[section_type] += 1

# Should be roughly the same
# (some headers might get merged into one header so the counts are not exactly the same)
# but this always holds: for each section: counts <= json counts
print(counts, json_section_counts)

{'H1': 21, 'H2': 98, 'H3': 47, 'H4': 18} {'H1': 21, 'H2': 107, 'H3': 56, 'H4': 18}


In [14]:
document.subsections

[<Section (H1) title=Contents>,
 <Section (H1) title=1. Introduction>,
 <Section (H1) title=2. Introduction to futures thinking>,
 <Section (H1) title=3. Futures process design>,
 <Section (H1) title=4. Using the toolkit>,
 <Section (H1) title=5. Pathways designed to meet specific business needs>,
 <Section (H1) title=6. Tools for gathering intelligence about the future>,
 <Section (H1) title=7. Tools for exploring the dynamics of change>,
 <Section (H1) title=8. Tools for describing what the future might be like>,
 <Section (H1) title=9. Tools for developing and testing policy and strategy>,
 <Section (H1) title=10 minutes>,
 <Section (H1) title=Annex 1: Sample futures and foresight material>,
 <Section (H1) title=Two horizon scanning formats>,
 <Section (H1) title=An extract from an issues paper>,
 <Section (H1) title=A sample scenario (one from a group of four)>,
 <Section (H1) title=Annex 2: Glossary of futures and foresight terms>,
 <Section (H1) title=Annex 3: Frequently Asked Qu

In [15]:
# Create a simple linear text representation of the document
def document_to_text(document: Document) -> str:
    text = ""
    for paragraph in document.paragraphs:
        text += paragraph.text + "\n"
    for subsection in document.subsections:
        text += subsection.title + "\n"
        text += document_to_text(subsection)
    return text

document_text = document_to_text(document)

In [16]:
# write document_text to processed dir
output_basename = Path(input_pdf).stem + ".txt"
output_dir = "../../data/processed/adobe-extract/"

# create dir if not exists
os.makedirs(output_dir, exist_ok=True)

with open(os.path.join(output_dir, output_basename), "w") as f:
    f.write(document_text)

print(f"Processed file: {input_pdf}")

Processed file: UK_23.pdf


In [26]:
document.subsections[4].subsections[0].paragraphs

[<Paragraph (PARAGRAPH) text=The Toolkit contains 12 tools, organized into four categories and described in detail in the relevant chapter.>,
 <Paragraph (PARAGRAPH) text=There are four tools for gathering intelligence about the future ➲ Chapter 6>,
 <Paragraph (LIST) text=• Horizon Scanning>,
 <Paragraph (LIST) text=• 7 Questions>,
 <Paragraph (LIST) text=• The Issues Paper>,
 <Paragraph (LIST) text=• Delphi>,
 <Paragraph (TABLE) text=There are two tools for exploring the dynamics of change  • Driver Mapping   • Axes of Uncertainty    ,➲  Chapter 7  >,
 <Paragraph (PARAGRAPH) text=There are three tools for describing what the future might be like>,
 <Paragraph (LIST) text=• Scenarios>,
 <Paragraph (LIST) text=• Visioning>,
 <Paragraph (LIST) text=• SWOT Analysis>,
 <Paragraph (PARAGRAPH) text=There are three tools for developing and testing policy and strategy>,
 <Paragraph (LIST) text=• Policy Stress-testing>,
 <Paragraph (LIST) text=• Backcasting>,
 <Paragraph (LIST) text=• Roadmapp

In [18]:
input_pdfs = [f"UK_{i:02}.pdf" for i in range(26, 45)]


for input_pdf in input_pdfs:
    try:
        # Get AdobeExtractedPDF
        extracted_pdf = get_extracted_pdf(input_file_path=os.path.join(input_pdf_dir, input_pdf))
        # Convert to document
        document = adobe_extracted_pdf_to_document(extracted_pdf)
        # Convert to text
        document_text = document_to_text(document)

        # write document_text to processed dir
        output_basename = Path(input_pdf).stem + ".txt"
        output_dir = "../../data/processed/adobe-extract/"

        # create dir if not exists
        os.makedirs(output_dir, exist_ok=True)

        with open(os.path.join(output_dir, output_basename), "w") as f:
            f.write(document_text)

        print(f"Processed file: {input_pdf}")
    except ServiceUsageException as e:
        print("Service usage error: " + input_pdf)
        print(e)
        continue
    except ServiceApiException as e:
        print("Bad input file: " + input_pdf)
        print(e)
        continue
    except Exception as e:
        print("Failed to process file: " + input_pdf)
        raise e

KeyboardInterrupt: 