# Document AI OCR (async)

This notebook shows you how to do OCR on documents using the Google Cloud Document AI API asynchronously. For an asynchronous request, a list of GCS URIs or a GCS URI prefix covering one or more documents is sent and an operation ID is returned. The operation status is checked until the result is available. The response is then visualized, showing the preprocessed (e.g. rotated) image together with bounding boxes for block, paragraph, line and token.

## Set your Processor Variables 

In [None]:
# --- Processor -------------------------------------------------------------
PROJECT_ID = "YOUR_GCP_PROJECT_ID"
# Region of the processor; the format is 'us' or 'eu'.
LOCATION = "eu"
# Create an OCR processor in the Cloud Console and paste its ID here.
PROCESSOR_ID = "YOUR_DOCAI_PROCESSOR_ID"

# File types accepted by the OCR processor, see
# https://cloud.google.com/document-ai/docs/processors-list#processor_doc-ocr
SUPPORTED_FILE_TYPES = [
    "PDF",
    "TIF",
    "TIFF",
    "GIF",
    "JPG",
    "JPEG",
    "PNG",
    "BMP",
    "WEBP",
]

# --- Input -----------------------------------------------------------------
# Sample invoices are stored in gs://cloud-samples-data/documentai/async_invoices/.
# Note that the prefix below is broader than that folder: it matches every
# object whose name starts with 'documentai'.
GCS_INPUT_BUCKET = "cloud-samples-data"
GCS_INPUT_PREFIX = "documentai"

# --- Output ----------------------------------------------------------------
# The output bucket will be created if it does not exist.
GCS_OUTPUT_BUCKET = f"{PROJECT_ID}-dai-temp"
GCS_OUTPUT_PREFIX = "output"

# Timeout (in seconds) used when waiting for the batch operation to finish.
TIMEOUT = 300

## Setup

In [None]:
# Install necessary Python libraries and restart your kernel after.
!pip install --quiet -r ../requirements.txt

In [None]:
from google.cloud import documentai_v1 as documentai
from google.cloud import storage

from PIL import Image, ImageDraw

from typing import List
import mimetypes
import io
import os
import re
import numpy as np
from enum import Enum
import pandas as pd

In [None]:
class FeatureType(Enum):
    """Granularity of the page layout elements whose bounding boxes are drawn."""

    # Ordered from the coarsest to the finest layout element.
    PAGE = 1   # the page as a whole
    BLOCK = 2  # blocks of a page
    PARA = 3   # paragraphs of a page
    LINE = 4   # lines of a page
    TOKEN = 5  # tokens of a page

In [None]:
def _build_batch_input_config(gcs_input_uris, gcs_input_uri_prefix):
    """Validate the input arguments and build the matching BatchDocumentsInputConfig.

    An explicit list of URIs takes precedence over the prefix.

    Raises:
        ValueError: If a URI lacks the 'gs://' scheme, if the MIME type of a
            listed file cannot be guessed, or if no input was given at all.
    """
    if gcs_input_uris:
        documents = []
        for gcs_input_uri in gcs_input_uris:
            if not gcs_input_uri.startswith("gs://"):
                raise ValueError(f"gcs_input_uri {gcs_input_uri} missing gs:// prefix.")

            # Explicitly listed documents must carry their MIME type.
            mime_type = mimetypes.guess_type(gcs_input_uri)[0]
            if not mime_type:
                raise ValueError(f"MIME type of gcs_input_uri {gcs_input_uri} could not be guessed from file extension.")

            documents.append({"gcs_uri": gcs_input_uri, "mime_type": mime_type})

        gcs_documents = documentai.GcsDocuments(documents=documents)
        return documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)

    if gcs_input_uri_prefix:
        if not gcs_input_uri_prefix.startswith("gs://"):
            raise ValueError(f"gcs_input_uri_prefix {gcs_input_uri_prefix} missing gs:// prefix.")
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri_prefix)
        return documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    raise ValueError("Neither gcs_input_uris nor gcs_input_uri_prefix specified.")


# NOTE: the list default is kept for interface compatibility; it is never mutated.
def batch_process(gcs_output_uri: str, gcs_input_uris: List[str] = [], gcs_input_uri_prefix: str = "", skip_human_review: bool = False) -> "google.api_core.operation.Operation":
    """Asynchronously (batch) process documents with the Document AI client library.

    Processes documents stored on Google Cloud Storage (GCS), selected either by a
    list of GCS URIs or by a GCS URI prefix, and returns the long-running operation.
    The bucket named in `gcs_output_uri` is created in LOCATION if it does not exist.
    See details at
    https://cloud.google.com/document-ai/docs/reference/rest/v1/projects.locations.processors/batchProcess

    Args:
        gcs_output_uri: GCS URI to save output JSON to (e.g. 'gs://bucket/output').
        gcs_input_uris: List of GCS URIs (e.g. ['gs://bucket1/file1.jpg','gs://bucket2/file2.pdf']).
            Takes precedence over gcs_input_uri_prefix when not empty.
        gcs_input_uri_prefix: GCS URI Prefix (e.g. 'gs://bucket/prefix') to be checked for supported files.
        skip_human_review: Optional; Whether Human Review feature should be skipped for this request. Default to false.

    Returns:
        The operation object returned by the client's batch_process_documents call,
        reflecting the status of the batch process operation.
        See details at
        https://cloud.google.com/document-ai/docs/reference/rest/v1/projects.locations.operations#Operation

    Raises:
        ValueError: If one of the URIs is malformed, a MIME type cannot be guessed,
            or neither gcs_input_uris nor gcs_input_uri_prefix is given.
    """
    # Validate everything up front so that bad arguments fail before any client
    # is created or any bucket is touched.
    if not gcs_output_uri.startswith("gs://"):
        raise ValueError(f"gcs_output_uri {gcs_output_uri} missing gs:// prefix.")
    output_bucket_name = gcs_output_uri[len("gs://"):].split("/", 1)[0]
    if not output_bucket_name:
        raise ValueError(f"gcs_output_uri {gcs_output_uri} does not contain a bucket name.")

    input_config = _build_batch_input_config(gcs_input_uris, gcs_input_uri_prefix)

    # Instantiate a Document AI client for the regional endpoint
    client_options = {"api_endpoint": f"{LOCATION}-documentai.googleapis.com"}
    client = documentai.DocumentProcessorServiceClient(client_options=client_options)

    # Instantiate a Storage Client and make sure the output bucket exists
    storage_client = storage.Client()
    output_bucket = storage.Bucket(client=storage_client, name=output_bucket_name)
    if not output_bucket.exists():
        print(f"Bucket {output_bucket_name} does not exist, creating it in location {LOCATION}.")
        output_bucket.create(location=LOCATION)

    # The full resource name of the processor, e.g.:
    # projects/project-id/locations/location/processors/processor-id
    name = f"projects/{PROJECT_ID}/locations/{LOCATION}/processors/{PROCESSOR_ID}"

    # Where to write results; normalised to end with exactly one slash
    destination_uri = gcs_output_uri.rstrip("/") + "/"
    output_config = documentai.DocumentOutputConfig(
        gcs_output_config={"gcs_uri": destination_uri}
    )

    # Batch Process document
    batch_process_request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config,
        skip_human_review=skip_human_review,
    )

    return client.batch_process_documents(request=batch_process_request)

In [None]:
def get_page_bounds(page, feature):
    """Return the bounding polygons of all `feature` elements on an OCR output page.

    When a polygon has no absolute `vertices`, they are computed from its
    `normalized_vertices` and the page image size and stored on the polygon,
    i.e. `page` is modified in place.

    Args:
        page: OCR output page providing `blocks`, `paragraphs`, `lines`,
            `tokens` and `image.width` / `image.height`.
        feature: The FeatureType whose elements should be collected.

    Returns:
        A list with the `bounding_poly` of every matching element. The list is
        empty for a feature without a page collection (e.g. FeatureType.PAGE).
    """
    # Name of the page attribute that holds the elements of each feature.
    elements_attr = {
        FeatureType.BLOCK: "blocks",
        FeatureType.PARA: "paragraphs",
        FeatureType.LINE: "lines",
        FeatureType.TOKEN: "tokens",
    }.get(feature)
    if elements_attr is None:
        return []

    bounds = []
    for element in getattr(page, elements_attr):
        bounding_poly = element.layout.bounding_poly
        if not bounding_poly.vertices:
            # Only normalized coordinates are available: scale them to pixel
            # coordinates of the page image (truncating to whole pixels).
            bounding_poly.vertices = [
                documentai.Vertex(
                    x=int(normalized_vertex.x * page.image.width),
                    y=int(normalized_vertex.y * page.image.height),
                )
                for normalized_vertex in bounding_poly.normalized_vertices
            ]
        bounds.append(bounding_poly)

    return bounds

In [None]:
def draw_boxes(image, bounds, color, width):
    """Draw the outline of every bounding polygon in `bounds` onto `image`.

    Args:
        image: The PIL image to draw on; it is modified in place.
        bounds: Iterable of polygons exposing `vertices` with `x` / `y` attributes.
        color: Outline color passed to the drawing call.
        width: Outline width passed to the drawing call.

    Returns:
        The same `image` object, for convenience.
    """
    draw = ImageDraw.Draw(image)

    for bound in bounds:
        corners = [(vertex.x, vertex.y) for vertex in bound.vertices]
        if len(corners) < 2:
            # Nothing to outline; previously this raised an IndexError.
            continue
        # Repeat the first corner at the end to close the outline.
        draw.line(corners + corners[:1], fill=color, width=width, joint='curve')
    return image

In [None]:
def render_doc_text(page):
    """Show the page image with block, paragraph, line and token boxes drawn on it."""
    image = Image.open(io.BytesIO(page.image.content))

    # (feature, outline color, outline width), drawn in this order so that the
    # thinner boxes of the finer features end up on top of the thicker ones.
    overlays = (
        (FeatureType.BLOCK, 'blue', 8),
        (FeatureType.PARA, 'red', 6),
        (FeatureType.LINE, 'yellow', 4),
        (FeatureType.TOKEN, 'green', 2),
    )
    for feature, color, width in overlays:
        draw_boxes(image, get_page_bounds(page, feature), color=color, width=width)

    image.show()

    # To keep a local copy of the image with bounding boxes, add an
    # image.save(<path>) call here.

In [None]:
# transforming the image should not be necessary and requires opencv which has a lot of dependencies

def transform_image(page):
    # only install depedencies when necessary
    !sudo apt -qq install -y python3-opencv
    !pip install --quiet opencv-python 

    import cv2

    img_stream = io.BytesIO(page.image.content)
    img = cv2.imdecode(np.frombuffer(img_stream.read(), np.uint8), 1)

    matrix = None    
    data = page.transforms[0].data
    rows = page.transforms[0].rows
    cols = page.transforms[0].cols
    
    if page.transforms[0].type_ == 0:
        matrix = np.reshape(np.frombuffer(data, np.uint8), (rows, cols))
    elif page.transforms[0].type_ == 1:
        matrix = np.reshape(np.frombuffer(data, np.int8), (rows, cols))
    elif page.transforms[0].type_ == 2:
        matrix = np.reshape(np.frombuffer(data, np.uint16), (rows, cols))
    elif page.transforms[0].type_ == 3:
        matrix = np.reshape(np.frombuffer(data, np.int16), (rows, cols))
    elif page.transforms[0].type_ == 4:
        matrix = np.reshape(np.frombuffer(data, np.int32), (rows, cols))
    elif page.transforms[0].type_ == 5:
        matrix = np.reshape(np.frombuffer(data, np.float32), (rows, cols))
    elif page.transforms[0].type_ == 6:
        matrix = np.reshape(np.frombuffer(data, np.float64), (rows, cols))
    elif page.transforms[0].type_ == 7:
        matrix = np.reshape(np.frombuffer(data, np.float16), (rows, cols))
    
    # TODO: check rows and cols and implement warpPerspective for 3x3, throw error if not 2x3 or 3x3
    
    # the scale factor is required as the transformed image will be larger than the source image
    scale_factor = 3
    
    if rows == 2 and cols == 3:
        transformed_img = cv2.warpAffine(img, matrix, (page.image.width * scale_factor, page.image.height * scale_factor))
    elif rows == 3 and cols == 3:
        transformed_img = cv2.warpPerspective(img, matrix, (page.image.width * scale_factor, page.image.height * scale_factor))
        
    # trim image and remove black border
    gray = cv2.cvtColor(transformed_img,cv2.COLOR_BGR2GRAY)
    _,thresh = cv2.threshold(gray,1,255,cv2.THRESH_BINARY)
    contours,hierarchy = cv2.findContours(thresh,cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_SIMPLE)
    cnt = contours[0]
    x,y,w,h = cv2.boundingRect(cnt)
    transformed_img = transformed_img[y:y+h,x:x+w]
    
    content = cv2.imencode('.jpg', transformed_img)[1].tobytes()
    height, width, _ = transformed_img.shape
    page.image = documentai.Document.Page.Image(content=content, mime_type=page.image.mime_type, width=width, height=height)
    return page

### Process documents asynchronously

In [None]:
def batch_process_gcs_samples():
    """Batch process the configured GCS input and render every resulting page.

    Starts a batch operation for all documents below
    gs://GCS_INPUT_BUCKET/GCS_INPUT_PREFIX, waits up to TIMEOUT for it to
    finish, then downloads each output JSON file of this operation from the
    output bucket, renders its pages with bounding boxes and deletes the file.
    """
    gcs_input_uri_prefix = f"gs://{GCS_INPUT_BUCKET}/{GCS_INPUT_PREFIX}"
    gcs_output_uri = f"gs://{GCS_OUTPUT_BUCKET}/{GCS_OUTPUT_PREFIX}"
    print(f"Processing all documents at {gcs_input_uri_prefix}")
    operation = batch_process(gcs_input_uri_prefix=gcs_input_uri_prefix, gcs_output_uri=gcs_output_uri)

    # The operation name ends with the operation ID, which is also the name of
    # the folder the results are listed from below.
    operation_id = operation.operation.name.split('/')[-1]

    print(f"Operation ID: {operation_id}")

    # Wait for the operation to finish
    operation.result(timeout=TIMEOUT)

    # Instantiate a Storage Client
    storage_client = storage.Client()

    bucket = storage.Bucket(client=storage_client, name=GCS_OUTPUT_BUCKET)
    # The trailing slash keeps the listing from matching another operation
    # whose ID merely starts with this one. The listing is materialised first
    # because blobs are deleted while iterating.
    blob_list = list(bucket.list_blobs(prefix=f"{GCS_OUTPUT_PREFIX}/{operation_id}/"))

    for blob in blob_list:
        # Only JSON files contain a serialized Document
        if not blob.name.endswith(".json"):
            continue

        blob_as_bytes = blob.download_as_bytes()

        # Ignore unknown fields so that output written by a newer API version
        # than the installed client library can still be parsed.
        document = documentai.Document.from_json(blob_as_bytes, ignore_unknown_fields=True)

        # For a full list of Document object attributes, please reference this page:
        # https://cloud.google.com/document-ai/docs/reference/rpc/google.cloud.documentai.v1#document

        for page in document.pages:
            # TODO: remove once b/196544985 is fixed
            if page.transforms:
                page = transform_image(page)

            print(f"Rendering file {blob.name} - Page {page.page_number}/{len(document.pages)}")
            render_doc_text(page=page)

        # remove blob, comment out the deletion if you want to inspect the output json
        print(f"Deleting object {blob.name}")
        blob.delete()

In [None]:
# Run the sample end to end: start the batch operation, wait for it, render the
# resulting pages and delete the output JSON files afterwards.
batch_process_gcs_samples()