In [1]:
import os
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import re

from typing import Optional

import requests
from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import documentai
from google.cloud import storage

from datetime import datetime
from dotenv import load_dotenv
from google.cloud import documentai
from google.cloud.documentai_toolbox import gcs_utilities

In [2]:
cred_file = "odjfs-extraction-a3f45668cd7e.json" 

In [3]:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = cred_file

In [4]:
load_dotenv()
DOC_PROCESSOR_KEY = os.getenv('DOC_PROCESSOR_KEY')
DOC_PROCESSOR_ID = os.getenv('DOC_PROCESSOR_ID')
PROJECT_ID = os.getenv('PROJECT_ID')

In [6]:
def create_gcs_bucket(bucket_name, delete=False):
    """Deletes the bucket if it exists, then creates a new bucket in Google Cloud Storage."""
    storage_client = storage.Client()

    # Check if the bucket already exists
    bucket = storage_client.lookup_bucket(bucket_name)
    if bucket and delete:
        print(f"Bucket {bucket_name} already exists. Deleting it...")
        bucket.delete(force=True)
        print(f"Bucket {bucket_name} deleted.")

        # create a new bucket
        new_bucket = storage_client.create_bucket(bucket_name)
        print(f"Bucket {new_bucket.name} created.")

    return storage_client.bucket(bucket_name)


In [7]:
bucket = create_gcs_bucket("odjfs-test")

In [8]:
import fitz
import io

def download_and_upload_pdf(bucket, blob_name, pdf_url, page_count=1):
    """
    Download a PDF from a URL and upload it to GCS.
    
    :param bucket: GCS bucket object
    :param blob_name: name of the PDF file (e.g., the PDF's ID)
    :param pdf_url: URL of the PDF file
    :param page_count: number of pages to download
    
    """
    try:
        response = requests.get(pdf_url)
        response.raise_for_status()
        pdf_content = response.content

        # retrieve the first page_count pages 
        input_pdf = fitz.open(stream=pdf_content, filetype='pdf')
        output_pdf = fitz.open()
        output_pdf.insert_pdf(input_pdf, from_page=0, to_page=page_count - 1)

        # convert to temporary "file" for upload 
        pdf_buffer = io.BytesIO()
        output_pdf.save(pdf_buffer, incremental=False)
        pdf_buffer.seek(0)

        # upload to GCS
        blob = bucket.blob(blob_name)
        blob.upload_from_file(pdf_buffer, content_type='application/pdf')
        print(f"Uploaded {blob_name} to GCS.")
    except Exception as e:
        print(f"Failed to process {pdf_url}: {e}")


In [9]:
pdf_links = pd.read_csv("test/pdf_links_test.csv", index_col=0)
pdf_links.head()

Unnamed: 0_level_0,program_name,pdf,Address,City,Zip
program_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2190020840,"""BECOMING ME"" SOCIAL AND EMOTIONAL LEARNING EN...",https://childcaresearch.ohio.gov//pdf/00219002...,2627 PARK AVE,CINCINNATI,45206
2190020063,1ST CHOICE CHILD CARE,https://childcaresearch.ohio.gov//pdf/00219002...,4303 CLEVELAND AVE,COLUMBUS,43224
300864,1ST FRIENDS LEARNING ACADEMY,https://childcaresearch.ohio.gov//pdf/00000030...,1930 PEARL RD,BRUNSWICK,44212
2190019999,3MB AFTERSCHOOL,https://childcaresearch.ohio.gov//pdf/00219001...,18316 ST. CLAIR AVENUE,CLEVELAND,44110
201048,3T LEARNING ACADEMY LLC 11,https://childcaresearch.ohio.gov//pdf/00000020...,7523 READING ROAD,CINCINNATI,45237


In [9]:
len(pd.read_csv("data/pdf_links_final.csv",index_col=0)['pdf'])

3746

In [11]:
# from main import upload_files_to_gcs
# upload_files_to_gcs(
#     bucket, 
#     pd.read_csv("data/pdf_links_final.csv", index_col=0)['pdf'].iloc[0:50],
#     batch_size=10,
#     num_workers=5,
#     num_pages=2
# )

Uploaded input/107193.pdf to GCS.
Uploaded input/2180016960.pdf to GCS.
Uploaded input/2190020297.pdf to GCS.
Uploaded input/200644.pdf to GCS.
Uploaded input/2190020063.pdf to GCS.
Uploaded input/207607.pdf to GCS.
Uploaded input/2220026701.pdf to GCS.
Uploaded input/2180017710.pdf to GCS.
Uploaded input/300864.pdf to GCS.
Uploaded input/107064.pdf to GCS.
Uploaded input/300001.pdf to GCS.
Uploaded input/400468.pdf to GCS.
Uploaded input/2200022316.pdf to GCS.
Uploaded input/2210025267.pdf to GCS.
Uploaded input/401180.pdf to GCS.
Uploaded input/2190019999.pdf to GCS.
Uploaded input/306543.pdf to GCS.
Uploaded input/2210024741.pdf to GCS.
Uploaded input/2180018327.pdf to GCS.
Uploaded input/2220026936.pdf to GCS.
Uploaded input/2190019520.pdf to GCS.
Uploaded input/201048.pdf to GCS.
Uploaded input/400610.pdf to GCS.
Uploaded input/307578.pdf to GCS.
Uploaded input/104554.pdf to GCS.
Uploaded input/2210025027.pdf to GCS.
Uploaded input/207853.pdf to GCS.
Uploaded input/2220025784.pdf 

In [12]:
test_link = pdf_links.iloc[0]['pdf']
test_link

'https://childcaresearch.ohio.gov//pdf/002190020840_2023-10-25_ANNUAL.pdf'

In [19]:
# download_and_upload_pdf(bucket, "input/002190020840.pdf", test_link)

Uploaded input/002190020840.pdf to GCS.


In [90]:
def extract_from_pdfs(
        gcs_bucket_name: str,
        gcs_prefix: str,
        batch_size: int = 50,
        location='us',
        field_mask: Optional[str] = None,
        timeout: int = 400,
        verbose=True
):

    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")

    client = documentai.DocumentProcessorServiceClient(client_options=opts)
    
    # Cloud Storage URI for the Output Directory
    gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
        gcs_uri="gs://odjfs-test/output/", field_mask=field_mask
    )

    # Where to write results
    output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

    batches = gcs_utilities.create_batches(
        gcs_bucket_name=gcs_bucket_name, 
        gcs_prefix=gcs_prefix, 
        batch_size=batch_size
    )

    proc_name = client.processor_path(PROJECT_ID, location="us", processor=DOC_PROCESSOR_ID)

    if verbose:
        print(f"{len(batches)} batch(es) created.")
    
    processed_docs = [] 
    
    for i, batch in enumerate(batches):
        # print(f"{len(batch.gcs_documents.documents)} files in batch.")
        # print(batch.gcs_documents.documents)

        request = documentai.BatchProcessRequest(
            name=proc_name, 
            input_documents=batch,
            document_output_config=output_config
        )
        
        processed_docs.append(process_batch(client, request, timeout, verbose)) 
        
    return pd.concat(processed_docs)

In [94]:
def process_batch(client: documentai.DocumentProcessorServiceClient, request, timeout, verbose=False):
    
    operation = client.batch_process_documents(request)
    
    # wait until the batch process is complete
    try:
        if verbose:
            print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    # Catch exception when operation doesn't finish before timeout
    except (RetryError, InternalServerError) as e:
        print(e.message)
            
    # NOTE: Can also use callbacks for asynchronous processing
    #
    # def my_callback(future):
    #   result = future.result()
    #
    # operation.add_done_callback(my_callback)

    # After the operation is complete,
    # get output document information from operation metadata
    metadata = documentai.BatchProcessMetadata(operation.metadata)

    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    storage_client = storage.Client()
    
    documents = []

    if verbose:
        print("Output files:")
    # One process per Input Document
    for process in list(metadata.individual_process_statuses):
        # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
        # The Cloud Storage API requires the bucket name and URI prefix separately
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            print(
                "Could not parse output GCS destination:",
                process.output_gcs_destination,
            )
            continue

        output_bucket, output_prefix = matches.groups()

        # Get List of Document Objects from the Output Bucket
        output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)

        # Document AI may output multiple JSON files per source file
        current_doc_fields = {}
        doc_ID = None
        for blob in output_blobs:
            # Document AI should only output JSON files to GCS
            if blob.content_type != "application/json":
                print(
                    f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                )
                continue
                
            if doc_ID is None:
                doc_ID = blob.name.split("/")[-1].split(".")[0].split("-")[0]

            # Download JSON File as bytes object and convert to Document Object
            if verbose:
                print(f"Fetching {blob.name}")
            document = documentai.Document.from_json(
                blob.download_as_bytes(), ignore_unknown_fields=True
            )
            
            fields = get_fields(document)
            current_doc_fields.update(fields)
            
        documents.append(pd.DataFrame(current_doc_fields, index=[doc_ID]))

    return pd.concat(documents)

In [97]:
processed = extract_from_pdfs(
        gcs_bucket_name="odjfs-test",
        gcs_prefix="test/quarantine",
        batch_size=1,
        field_mask="entities"
)
processed

1 batch(es) created.
Waiting for operation projects/406890213804/locations/us/operations/7742598543856842495 to complete...
Output files:
Fetching output/7742598543856842495/0/200979-0.json


Unnamed: 0,Begin-Time,End-Time
200979,9:00 AM,11:30 AM


In [93]:
processed

Unnamed: 0,End-Time,Moderate-Risk,Program-Name,Rules-Verified-Non-Compliance,Serious-Risk,Full-Time-Infant,Older-Toddler-Full-Time,Older-Toddler-Part-Time,Older-Toddler-Total,Part-Time-Infant,...,Inspection-Notice,Fire-Inspection-Approval-Date,Inspection-Scope,Maximum-Under-2,Inspection-Date,Food-Service-Risk-Level,Reviewer,Begin-Time,Rules-Verified,Building-Approval-Date
100361-0,1:58 PM,0.0,"A 2 Z PRESCHOOL, INC.",2.0,0.0,11.0,10.0,0.0,10.0,0.0,...,Unannounced,03/19/2024,Full,26,12/10/2024,Level II,JENNIFER COPE,10:20 AM,58.0,
104554-0,6:14 PM,0.0,A PLACE TO LEARN AND GROW\nINCORPORATED,7.0,0.0,9.0,14.0,0.0,14.0,0.0,...,Unannounced,12/29/2022,Full,40,10/11/2023,Level III,MICHELE FAKAN,11:35 AM,58.0,08/15/2001
107064-0,3:00 PM,0.0,A PLACE FOR EVERYONE LLC,12.0,0.0,5.0,3.0,0.0,3.0,0.0,...,Unannounced,12/21/2023,Full,½,12/10/2024,Exempt,SARAH HEIL-HINTON,11:10 AM,58.0,10/02/2009
107193-0,3:45 PM,1.0,A BEAUTIFUL CHILD'S PRESCHOOL &\nDAYCARE,5.0,0.0,1.0,0.0,0.0,0.0,0.0,...,Unannounced,11/23/2016,Full,0,12/05/2024,Level III,BETH RAGLE,9:45 AM,58.0,10/05/2015
200644-0,11:45 AM,0.0,A GREAT START PRESCHOOL INC,0.0,0.0,0.0,0.0,30.0,30.0,0.0,...,Unannounced,08/28/2024,Full,½,11/21/2024,Exempt,BRENDA MEYER,10:00 AM,58.0,07/15/2024
200979-0,11:30 AM,,,,,,,,,,...,,,,,,,,9:00 AM,,


In [46]:
def norm_text(text: str): 
    """ Removes spaces and newline characters. """ 
    return (
        text
        .strip()
        .replace("\n", " ")
        .lower()
    )

In [57]:
processed[1]

IndexError: list index out of range

In [51]:
document = processed[0]

In [77]:
def get_fields(document):
    fields = {}
    
    for ent in document.entities:

        # process the table (listed as grouped fields/entities 
        if ent.properties:
            fields.update({prop.type_: prop.mention_text for prop in ent.properties}) 
        else:
            fields[ent.type_] = ent.mention_text
            
    return fields

# get_fields(processed)

In [44]:
document.entities[1].properties

[]

In [42]:
document.entities[0].properties

[text_anchor {
  text_segments {
    start_index: 1442
    end_index: 1444
  }
}
type_: "Full-Time-Infant"
mention_text: "11"
confidence: 1
page_anchor {
  page_refs {
    bounding_poly {
      normalized_vertices {
        x: 0.452218443
        y: 0.738901079
      }
      normalized_vertices {
        x: 0.473265082
        y: 0.738901079
      }
      normalized_vertices {
        x: 0.473265082
        y: 0.747252762
      }
      normalized_vertices {
        x: 0.452218443
        y: 0.747252762
      }
    }
  }
}
id: "1"
normalized_value {
  text: "11"
}
, text_anchor {
  text_segments {
    start_index: 1519
    end_index: 1521
  }
}
type_: "Older-Toddler-Full-Time"
mention_text: "10"
confidence: 1
page_anchor {
  page_refs {
    bounding_poly {
      normalized_vertices {
        x: 0.452218443
        y: 0.792967
      }
      normalized_vertices {
        x: 0.473833889
        y: 0.792967
      }
      normalized_vertices {
        x: 0.473833889
        y: 0.800879121
  

In [39]:
pd.DataFrame({ent.type_: ent.mention_text for ent in document.entities if ent.mention_text})

ValueError: If using all scalar values, you must pass an index

In [37]:
document.entities[1].type_

'Food-Service-Risk-Level'

In [35]:
document.entities[1].mention_text

'Level II'

In [193]:
document.pages[0].tables[0].header_rows[0].cells[0].layout.text_anchor

text_segments {
  start_index: 1349
  end_index: 1359
}

In [204]:
print(len(document.text))

9561


In [260]:
def table_to_dataframe(document: documentai.Document, page_index: int, table_index: int) -> pd.DataFrame:
    """
    Convert the specified table (page_index, table_index) of a Document
    into a pandas DataFrame. Attempts to use the first header row as columns.
    """
    # Safety checks
    if page_index >= len(document.pages):
        raise IndexError(f"Page index {page_index} is out of range.")
    page = document.pages[page_index]

    if table_index >= len(page.tables):
        raise IndexError(f"Table index {table_index} is out of range for page {page_index}.")
    table = page.tables[table_index]

    # Helper to extract text from a cell using text anchors
    def get_cell_text(cell: documentai.Document.Page.Table.TableCell) -> str:
        segments = cell.layout.text_anchor.text_segments
        # If multiple segments, concatenate them
        text_parts = []
        print(segments[0])
        for seg in segments:
            print(document.text[seg.start_index:seg.end_index].strip())
            print()
            text_parts.append(document.text[seg.start_index:seg.end_index])
        return "".join(text_parts).strip()

    # Extract column headers from the first header row (if it exists)
    if table.header_rows:
        header_row = table.header_rows[0]
        columns = [get_cell_text(cell) for cell in header_row.cells]
        print(columns)
    else:
        # Fallback: If no header rows, create generic column names 
        # based on the first body row (if available)
        if not table.body_rows or not table.body_rows[0].cells:
            # Empty table or no cells at all
            return pd.DataFrame()
        num_cols = len(table.body_rows[0].cells)
        columns = [f"col_{i+1}" for i in range(num_cols)]

    print("process rows")

    # Extract body rows
    rows_data = []
    for row in table.body_rows:
        row_values = [get_cell_text(cell) for cell in row.cells]
        rows_data.append(row_values)

    # Build the DataFrame
    df = pd.DataFrame(rows_data, columns=columns)
    return df


def extract_first_4_pages_tables(document: documentai.Document) -> list[pd.DataFrame]:
    """
    For each page in the first 4 pages of a Document, extract every table
    as a pandas DataFrame. Return a list of all DataFrames found.
    """
    dataframes = []
    # Only go up to 4 or the total number of pages, whichever is smaller
    max_pages = min(1, len(document.pages))

    for page_idx in range(max_pages):
        page = document.pages[page_idx]
        print(f"Processing Page {page_idx + 1}/{len(document.pages)}: found {len(page.tables)} table(s).")
        
        for table_idx, _ in enumerate(page.tables):
            df = table_to_dataframe(document, page_idx, 1)
            dataframes.append(df)
            break

    return dataframes

stop = 3
tables = extract_first_4_pages_tables(document)
tables[0]

Processing Page 1/5: found 5 table(s).
start_index: 1035
end_index: 1218

Inspection Type
Inspection Information
Inspection Scope
Full
Inspection Notice
Unannounced
Annual
Begin Time 2:40 PM
End Time 5:00 PM
Inspection Date
10/25/2023
Reviewer:
SULYN ROMER

Summary of Findings

['Inspection Type\nInspection Information\nInspection Scope\nFull\nInspection Notice\nUnannounced\nAnnual\nBegin Time 2:40 PM\nEnd Time 5:00 PM\nInspection Date\n10/25/2023\nReviewer:\nSULYN ROMER\nSummary of Findings']
process rows


Unnamed: 0,Inspection Type\nInspection Information\nInspection Scope\nFull\nInspection Notice\nUnannounced\nAnnual\nBegin Time 2:40 PM\nEnd Time 5:00 PM\nInspection Date\n10/25/2023\nReviewer:\nSULYN ROMER\nSummary of Findings


Processing Page 1/5: found 5 table(s).
Processing Page 2/5: found 1 table(s).
Processing Page 3/5: found 0 table(s).
Processing Page 4/5: found 0 table(s).


In [226]:
tables[2]

Unnamed: 0,No. Rules Verified\nNo. Rules with Non-compliances No. Serious Risk\n11\n0\nNo. Moderate Risk\nNo. Low Risk\n58\n1\n11


In [263]:
pd.read_csv("data/pdf_links_final.csv")['pdf']

0       https://childcaresearch.ohio.gov//pdf/00219002...
1       https://childcaresearch.ohio.gov//pdf/00219002...
2       https://childcaresearch.ohio.gov//pdf/00000030...
3       https://childcaresearch.ohio.gov//pdf/00219001...
4       https://childcaresearch.ohio.gov//pdf/00000020...
                              ...                        
3741    https://childcaresearch.ohio.gov//pdf/00000050...
3742    https://childcaresearch.ohio.gov//pdf/00000020...
3743    https://childcaresearch.ohio.gov//pdf/00000020...
3744    https://childcaresearch.ohio.gov//pdf/00000020...
3745    https://childcaresearch.ohio.gov//pdf/00000020...
Name: pdf, Length: 3746, dtype: object