# DocAI Document Processing Pipeline

* Author: docai-incubator@google.com

## Disclaimer

This tool is not supported by the Google engineering team or product team. It is provided and supported on a best-effort basis by the **DocAI COS Team**. No guarantees of performance are implied.

## Objective

This tool helps to:
- Automate Document AI processing of PDF documents, routing each document to online or batch processing based on its page count.
- Implement a queue for concurrent batch processing using a first-in, first-out (FIFO) approach.
- Monitor batches for completion and trigger new batches as needed.
- Handle quota-exceeded errors in online processing by automatically switching the affected documents to batch processing.
- Manage failed documents separately, ensuring they are processed successfully or flagged for manual review.
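The queueing objectives above (FIFO order, a cap on concurrent batches, and starting new batches as earlier ones finish) can be illustrated with a small in-memory model. This is a sketch, not the pipeline's implementation, which keeps its queue in Firestore; the `BatchScheduler` class and its method names are hypothetical, and each batch holds one document, matching the note in step 6.

```python
from collections import deque


class BatchScheduler:
    """Hypothetical in-memory model of a FIFO queue with a concurrency cap."""

    def __init__(self, max_concurrent_batches: int):
        self.max_concurrent_batches = max_concurrent_batches
        self.pending = deque()  # documents waiting, oldest first
        self.running = set()    # documents whose batch is in flight

    def enqueue(self, doc_uri: str) -> None:
        self.pending.append(doc_uri)

    def start_available(self) -> list[str]:
        """Start one-document batches until the concurrency cap is reached."""
        started = []
        while self.pending and len(self.running) < self.max_concurrent_batches:
            doc = self.pending.popleft()  # FIFO: oldest document first
            self.running.add(doc)
            started.append(doc)
        return started

    def complete(self, doc_uri: str) -> list[str]:
        """Mark a batch as finished, then start whatever now fits under the cap."""
        self.running.discard(doc_uri)
        return self.start_available()


scheduler = BatchScheduler(max_concurrent_batches=2)
for name in ["a.pdf", "b.pdf", "c.pdf"]:
    scheduler.enqueue(name)
print(scheduler.start_available())  # ['a.pdf', 'b.pdf']
print(scheduler.complete("a.pdf"))  # ['c.pdf']
```

Completing a batch is what frees a slot, which mirrors how the pipeline triggers new batch processing when an earlier batch finishes.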

## Prerequisites
* GCP project with billing enabled
* Document AI processor ID
* Cloud Storage (GCS)
* Cloud Firestore
* Cloud Functions
* Cloud Pub/Sub
* Service account with the following IAM roles:
    * Cloud Datastore User
    * Cloud Run Invoker
    * Document AI API User
    * Pub/Sub Editor
    * Storage Object User
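The roles above can be granted with `gcloud projects add-iam-policy-binding`. The sketch below only builds the command lines; the mapping from the listed role names to predefined role IDs is an assumption to confirm in the IAM documentation, and the `iam_binding_commands` helper is hypothetical.

```python
import shlex

# Assumed mapping of the role names listed above to predefined IAM role IDs.
REQUIRED_ROLES = {
    "Cloud Datastore User": "roles/datastore.user",
    "Cloud Run Invoker": "roles/run.invoker",
    "Document AI API User": "roles/documentai.apiUser",
    "Pub/Sub Editor": "roles/pubsub.editor",
    "Storage Object User": "roles/storage.objectUser",
}


def iam_binding_commands(project_id: str, service_account: str) -> list[str]:
    """Build one add-iam-policy-binding command per required role (hypothetical helper)."""
    commands = []
    for role_id in REQUIRED_ROLES.values():
        args = [
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            f"--member=serviceAccount:{service_account}",
            f"--role={role_id}",
        ]
        commands.append(shlex.join(args))  # quote arguments safely for a shell
    return commands


for cmd in iam_binding_commands("your-project-id", "sa@your-project-id.iam.gserviceaccount.com"):
    print(cmd)
```

Each printed line can be run in a terminal, or in a notebook cell prefixed with `!`.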


## Overview

* This tool provides a Document AI processing pipeline that supports both synchronous (online) and asynchronous (batch) processing of documents.
* The pipeline begins with a setup.sh script that initializes the required GCP resources, including Firestore indexes and a Pub/Sub topic.
* Documents are loaded into the queue_collection Firestore collection through the HTTP-triggered load_queue function. Documents with 15 or fewer pages are processed synchronously. Larger documents, and documents whose synchronous requests hit quota limits, are marked for batch processing.
* Batch processing is triggered by publishing a message to the document-processing-trigger topic, either manually or automatically when an asynchronous (batch) operation completes.
* The process_batch Cloud Function then processes queued documents in FIFO order, running at most MAX_CONCURRENT_BATCHES batches at a time, and updates each document's status in the queue.
* Both the sync and batch paths handle failures by copying failed documents to a separate folder for retry, and both update queue_collection to reflect the processing status.
* Small documents get an immediate online response, while large documents and quota overflow are handled by batch processing, so the pipeline scales with the workload.
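The routing rule described above can be summarised as a small decision function. This is a sketch, not the pipeline's code: the function name and the `quota_exceeded` flag (standing for "the online request returned a quota error") are assumptions, and only the 15-page threshold comes from the description.

```python
SYNC_PAGE_LIMIT = 15  # documents with this many pages or fewer go to online (sync) processing


def choose_processing_mode(page_count: int, quota_exceeded: bool = False) -> str:
    """Return "sync" or "batch" for a document (hypothetical helper)."""
    if page_count <= 0:
        raise ValueError("page_count must be positive")
    if page_count <= SYNC_PAGE_LIMIT and not quota_exceeded:
        return "sync"
    # Larger documents, and small ones that hit the online quota, fall back to batch.
    return "batch"


print(choose_processing_mode(3))                       # sync
print(choose_processing_mode(40))                      # batch
print(choose_processing_mode(3, quota_exceeded=True))  # batch
```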


### Flowchart

<img src="images/docai-processing-pipeline-flowchart.png" alt="docai-processing-pipeline-flowchart.png" width="50%">

### Step-by-Step Procedure

### 1. Install dependencies

In [None]:
!pip install requests

### 2. Create required variables

##### Configuration Variables for Document AI Processing Pipeline

* <span style="color: green">PROJECT_ID: </span>
       The Google Cloud Project ID where resources will be deployed.
* <span style="color: green">SERVICE_ACCOUNT: </span>
       Email address of the service account. It must have the IAM roles listed under Prerequisites (Firestore, Cloud Run, Document AI, Cloud Storage, and Pub/Sub).
* <span style="color: green">FIRESTORE_COLLECTION: </span>
       Name of the Firestore collection used for queue management. This collection will store document processing status and metadata.
* <span style="color: green">PUBSUB_TOPIC_PROCESS: </span>
       Name of the Pub/Sub topic for triggering batch processing. It is created in step 4; if the topic already exists, that step reports an error and can be skipped.
* <span style="color: green">REGION: </span>
       The GCP region where Cloud Functions will be deployed. Must be a valid GCP region with Cloud Functions support.
* <span style="color: green">PROCESSOR_LOCATION: </span>
       Location of the Document AI processor. Valid values are "us" or "eu".
* <span style="color: green">PROCESSOR_ID: </span>
       The ID of your Document AI processor. Can be found in the Document AI console.
* <span style="color: green">MAX_CONCURRENT_BATCHES: </span>
       Maximum number of batch processes that can run simultaneously. Controls pipeline throughput and resource usage.
* <span style="color: green">INPUT_MIME_TYPE: </span>
       MIME type of the input documents. Must match the document type supported by your Document AI processor.
* <span style="color: green">GCS_OUTPUT_BUCKET: </span>
       Name of the GCS bucket for processed outputs. Do not include "gs://" prefix.
* <span style="color: green">GCS_OUTPUT_PREFIX: </span>
       Folder path prefix within the output bucket. Do not include bucket name or "gs://" prefix.
* <span style="color: green">GCS_FAILED_FILES_BUCKET: </span>
       Name of the GCS bucket for failed documents. Do not include "gs://" prefix.
* <span style="color: green">GCS_FAILED_FILES_PREFIX: </span>
       Folder path prefix for failed files. Do not include bucket name or "gs://" prefix.

### Notes
* All bucket names must be globally unique across Cloud Storage.
* The service account must have the IAM roles listed under Prerequisites.
* The folder prefixes (GCS_OUTPUT_PREFIX, GCS_FAILED_FILES_PREFIX) should end with a forward slash (/).
* Review and update all values before deployment.

In [2]:
# TO-DO: Update these values before running the next cells
PROJECT_ID = "your-project-id"
SERVICE_ACCOUNT = "your-service-account@your-project-id.iam.gserviceaccount.com"
FIRESTORE_COLLECTION = "queue_collection"
PUBSUB_TOPIC_PROCESS = "document-processing-trigger"
REGION = "us-central1"
PROCESSOR_LOCATION = "us"
PROCESSOR_ID = "your-processor-id"
MAX_CONCURRENT_BATCHES = 5
INPUT_MIME_TYPE = "application/pdf"
GCS_OUTPUT_BUCKET = "output-bucket-name"
GCS_OUTPUT_PREFIX = "some/folder/"
GCS_FAILED_FILES_BUCKET = "bucket-name"
GCS_FAILED_FILES_PREFIX = "some/folder/failed/"
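Before deploying, the rules stated in the notes above can be checked programmatically. The helper below is a hypothetical sketch that takes the configuration as a dictionary and checks only those stated constraints: no gs:// prefix on bucket or prefix values, prefixes ending in /, a processor location of "us" or "eu", and a positive batch limit.

```python
def validate_config(cfg: dict) -> list[str]:
    """Return a list of problems found in the pipeline configuration (empty if none)."""
    problems = []
    for key in ("GCS_OUTPUT_BUCKET", "GCS_FAILED_FILES_BUCKET",
                "GCS_OUTPUT_PREFIX", "GCS_FAILED_FILES_PREFIX"):
        if str(cfg.get(key, "")).startswith("gs://"):
            problems.append(f"{key} must not include the gs:// prefix")
    for key in ("GCS_OUTPUT_PREFIX", "GCS_FAILED_FILES_PREFIX"):
        if not str(cfg.get(key, "")).endswith("/"):
            problems.append(f"{key} should end with '/'")
    if cfg.get("PROCESSOR_LOCATION") not in ("us", "eu"):
        problems.append("PROCESSOR_LOCATION must be 'us' or 'eu'")
    max_batches = cfg.get("MAX_CONCURRENT_BATCHES")
    if not isinstance(max_batches, int) or max_batches < 1:
        problems.append("MAX_CONCURRENT_BATCHES must be a positive integer")
    return problems


example_config = {
    "GCS_OUTPUT_BUCKET": "output-bucket-name",
    "GCS_OUTPUT_PREFIX": "some/folder/",
    "GCS_FAILED_FILES_BUCKET": "bucket-name",
    "GCS_FAILED_FILES_PREFIX": "some/folder/failed/",
    "PROCESSOR_LOCATION": "us",
    "MAX_CONCURRENT_BATCHES": 5,
}
print(validate_config(example_config))  # []
```

In the notebook, `validate_config(globals())` runs the same checks against the variables defined in the previous cell.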

### 3. Create indexes for Firestore

In [None]:
!gcloud firestore indexes composite create --file "src/firestore.indexes.json"
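The gcloud reference for `gcloud firestore indexes composite create` describes `--collection-group` and repeated `--field-config` flags rather than a file argument. If the `--file` form above is rejected by your gcloud version, a sketch like the one below can translate an index file into one command per index. It assumes src/firestore.indexes.json uses the Firebase CLI layout (`{"indexes": [{"collectionGroup": ..., "queryScope": ..., "fields": [...]}]}`); the helper name and the field names in the sample are hypothetical.

```python
import json
import shlex


def index_commands(index_file_text: str, project_id: str) -> list[str]:
    """Translate a Firebase-style index file into gcloud commands (hypothetical helper)."""
    spec = json.loads(index_file_text)
    commands = []
    for index in spec.get("indexes", []):
        # COLLECTION / COLLECTION_GROUP -> collection / collection-group
        scope = index.get("queryScope", "COLLECTION").lower().replace("_", "-")
        args = [
            "gcloud", "firestore", "indexes", "composite", "create",
            f"--project={project_id}",
            f"--collection-group={index['collectionGroup']}",
            f"--query-scope={scope}",
        ]
        for field in index["fields"]:
            config = f"field-path={field['fieldPath']}"
            if "order" in field:
                config += f",order={field['order'].lower()}"
            elif "arrayConfig" in field:
                config += f",array-config={field['arrayConfig'].lower()}"
            args.append(f"--field-config={config}")
        commands.append(shlex.join(args))
    return commands


# Hypothetical sample; the real field names live in src/firestore.indexes.json.
sample = json.dumps({"indexes": [{
    "collectionGroup": "queue_collection",
    "queryScope": "COLLECTION",
    "fields": [{"fieldPath": "status", "order": "ASCENDING"},
               {"fieldPath": "created_at", "order": "ASCENDING"}],
}]})
for cmd in index_commands(sample, "your-project-id"):
    print(cmd)
```

In the notebook, `index_commands(open("src/firestore.indexes.json").read(), PROJECT_ID)` would produce the commands for the real file.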

### 4. Create the Pub/Sub topic

In [None]:
!gcloud pubsub topics create {PUBSUB_TOPIC_PROCESS} --project={PROJECT_ID}
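Once the topic exists, a batch run can be started by hand, as described in the overview, by publishing a message with `gcloud pubsub topics publish`. The message body that process_batch expects is not documented here, so the empty JSON payload below is an assumption; check the function source before relying on it. The helper name is hypothetical.

```python
import json
import shlex
from typing import Optional


def publish_trigger_command(topic: str, project_id: str, message: Optional[dict] = None) -> str:
    """Build a gcloud command that publishes one trigger message (hypothetical helper)."""
    body = json.dumps(message or {})  # assumed payload; process_batch may expect specific fields
    args = ["gcloud", "pubsub", "topics", "publish", topic,
            f"--project={project_id}", f"--message={body}"]
    return shlex.join(args)


print(publish_trigger_command("document-processing-trigger", "your-project-id"))
```

In the notebook, `cmd = publish_trigger_command(PUBSUB_TOPIC_PROCESS, PROJECT_ID)` followed by `!{cmd}` runs it.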

### 5. Deploy Cloud Functions

### Deploy load_queue cloud function

In [None]:
!gcloud functions deploy load_queue \
    --gen2 \
    --runtime python311 \
    --trigger-http \
    --allow-unauthenticated \
    --region {REGION} \
    --source src/load_queue_cf \
    --set-env-vars PROJECT_ID={PROJECT_ID},PROCESSOR_LOCATION={PROCESSOR_LOCATION},PROCESSOR_ID={PROCESSOR_ID},FIRESTORE_COLLECTION={FIRESTORE_COLLECTION},PUBSUB_TOPIC_PROCESS={PUBSUB_TOPIC_PROCESS},INPUT_MIME_TYPE={INPUT_MIME_TYPE},GCS_OUTPUT_BUCKET={GCS_OUTPUT_BUCKET},GCS_OUTPUT_PREFIX={GCS_OUTPUT_PREFIX},GCS_FAILED_FILES_BUCKET={GCS_FAILED_FILES_BUCKET},GCS_FAILED_FILES_PREFIX={GCS_FAILED_FILES_PREFIX} \
    --service-account {SERVICE_ACCOUNT} \
    --run-service-account {SERVICE_ACCOUNT} \
    --entry-point load_queue \
    --memory 512MB
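The load_queue deploy command above and the process_batch command below share the same long `--set-env-vars` list. A small helper can assemble it from the configuration variables so the two commands stay in sync. This is a sketch with a hypothetical function name; it rejects values containing commas because `--set-env-vars` uses commas as separators (gcloud's alternate-delimiter syntax, described in `gcloud topic escaping`, is the other way around that).

```python
def env_vars_flag(values: dict) -> str:
    """Join KEY=VALUE pairs for gcloud's --set-env-vars (hypothetical helper)."""
    pairs = []
    for key, value in values.items():
        text = str(value)
        if "," in text:
            # A comma would be read as the start of the next KEY=VALUE pair.
            raise ValueError(f"{key} contains a comma; use gcloud's alternate delimiter syntax")
        pairs.append(f"{key}={text}")
    return ",".join(pairs)


# Keys shared by both Cloud Functions; process_batch also needs MAX_CONCURRENT_BATCHES.
COMMON_KEYS = ["PROJECT_ID", "PROCESSOR_LOCATION", "PROCESSOR_ID", "FIRESTORE_COLLECTION",
               "PUBSUB_TOPIC_PROCESS", "INPUT_MIME_TYPE", "GCS_OUTPUT_BUCKET",
               "GCS_OUTPUT_PREFIX", "GCS_FAILED_FILES_BUCKET", "GCS_FAILED_FILES_PREFIX"]

sample_values = {"PROJECT_ID": "your-project-id", "GCS_OUTPUT_PREFIX": "some/folder/"}
print(env_vars_flag(sample_values))  # PROJECT_ID=your-project-id,GCS_OUTPUT_PREFIX=some/folder/
```

In the notebook, `ENV_VARS = env_vars_flag({k: globals()[k] for k in COMMON_KEYS})` can then be passed as `--set-env-vars {ENV_VARS}` in the deploy cell.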

### Deploy process_batch cloud function

In [None]:
!gcloud functions deploy process_batch \
    --gen2 \
    --runtime python311 \
    --trigger-topic {PUBSUB_TOPIC_PROCESS} \
    --entry-point process_batch_documents \
    --set-env-vars MAX_CONCURRENT_BATCHES={MAX_CONCURRENT_BATCHES},PROJECT_ID={PROJECT_ID},PROCESSOR_LOCATION={PROCESSOR_LOCATION},PROCESSOR_ID={PROCESSOR_ID},FIRESTORE_COLLECTION={FIRESTORE_COLLECTION},PUBSUB_TOPIC_PROCESS={PUBSUB_TOPIC_PROCESS},INPUT_MIME_TYPE={INPUT_MIME_TYPE},GCS_OUTPUT_BUCKET={GCS_OUTPUT_BUCKET},GCS_OUTPUT_PREFIX={GCS_OUTPUT_PREFIX},GCS_FAILED_FILES_BUCKET={GCS_FAILED_FILES_BUCKET},GCS_FAILED_FILES_PREFIX={GCS_FAILED_FILES_PREFIX} \
    --region {REGION} \
    --source src/process_batch_cf \
    --service-account {SERVICE_ACCOUNT} \
    --run-service-account {SERVICE_ACCOUNT} \
    --memory 512MB

### 6. Trigger document processing

In [None]:
# NOTE: The current code triggers one batch per file, for every file in the provided file/folder paths.

# TO-DO: Update "file_paths" in the payload below with the GCS files/folders to process. Each entry must start with gs://

# First get the auth token
auth_token = !gcloud auth print-identity-token
token = auth_token[0]  # Get the actual token string

# Construct the URL
url = f"https://{REGION}-{PROJECT_ID}.cloudfunctions.net/load_queue"

# Define the payload
payload = {
    "file_paths": [
        "gs://example-test-bucket/folder/subfolder/",
        "gs://example-bucket/folder/test-file.pdf",
    ]
}
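Each entry in file_paths must be a gs:// URI, and the example payload suggests that a trailing / marks a folder while anything else is a single file. A quick check before sending the request can catch typos. This is a hypothetical helper; how load_queue itself expands folders is not shown in this notebook, and the trailing-slash convention is an assumption.

```python
def check_file_paths(file_paths: list[str]) -> dict[str, list[str]]:
    """Split GCS paths into folders and files, rejecting non-gs:// entries (hypothetical helper)."""
    folders, files = [], []
    for path in file_paths:
        if not path.startswith("gs://") or len(path) <= len("gs://"):
            raise ValueError(f"not a gs:// URI: {path!r}")
        # Assumed convention from the example payload: a trailing slash means a folder.
        (folders if path.endswith("/") else files).append(path)
    return {"folders": folders, "files": files}


print(check_file_paths([
    "gs://example-test-bucket/folder/subfolder/",
    "gs://example-bucket/folder/test-file.pdf",
]))
```

In the notebook, `check_file_paths(payload["file_paths"])` can be run right before the request cell.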

In [None]:
# Using Python's requests library is more reliable than curl in Jupyter

import requests
import json

headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

response = requests.post(url, headers=headers, json=payload, timeout=70)

print(f"Status Code: {response.status_code}")
print("Response:")
print(
    json.dumps(response.json(), indent=2)
    if response.status_code == 200
    else response.text
)

### 7. Output

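The processed document JSON files are stored under <span style="color: green">GCS_OUTPUT_PREFIX</span> in the <span style="color: green">GCS_OUTPUT_BUCKET</span> bucket. Documents that fail processing are copied under <span style="color: green">GCS_FAILED_FILES_PREFIX</span> in the <span style="color: green">GCS_FAILED_FILES_BUCKET</span> bucket.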
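To see what the pipeline wrote, the output prefix can be listed with the google-cloud-storage client (`Client.list_blobs` with a `prefix` argument). This is a sketch assuming the client library is installed and application-default credentials are available; the helper names are hypothetical, and filtering on the .json suffix follows from the outputs being JSON files.

```python
def json_outputs(blob_names, prefix: str) -> list[str]:
    """Keep only .json objects under the given prefix, sorted by name."""
    return sorted(n for n in blob_names if n.startswith(prefix) and n.endswith(".json"))


def list_output_json(bucket: str, prefix: str) -> list[str]:
    """List processed JSON files in GCS (requires google-cloud-storage and credentials)."""
    from google.cloud import storage  # imported lazily so the filter above works offline

    client = storage.Client()
    names = (blob.name for blob in client.list_blobs(bucket, prefix=prefix))
    return json_outputs(names, prefix)


# Offline example of the filtering step:
print(json_outputs(["some/folder/a/doc-0.json", "some/folder/log.txt"], "some/folder/"))
# ['some/folder/a/doc-0.json']
```

In the notebook, `list_output_json(GCS_OUTPUT_BUCKET, GCS_OUTPUT_PREFIX)` lists the results once processing has finished.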