## Setup

In [None]:
import sys
import os
from pathlib import Path
# Base directory four levels above the current working directory; the .env file and the
# samples/ paths used throughout this notebook are resolved relative to it.
wdir = os.path.abspath('../../../../')
sys.path.append(os.path.join(wdir, 'samples/python/modules/')) # Import local modules

from IPython.display import display
import os
from dotenv import dotenv_values
import base64
import io
import json
from openai import AzureOpenAI
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from concurrent.futures import ThreadPoolExecutor
from pdf2image import convert_from_bytes

from samples.app_settings import AppSettings
from samples.utils.stopwatch import Stopwatch

from samples.models.classification import Classifications
from samples.utils.document_intelligence_result_parser import parse_document_fields
from samples.utils.custom_json_encoder import CustomJsonEncoder

In [None]:
# Application settings are read from the .env file stored directly under `wdir`.
env_fpath = Path(wdir) / '.env'
app_settings = AppSettings(dotenv_values(env_fpath))

# Identifier of this scenario and the directory that holds its files.
scenario_name = "us-tax-1040-extraction"
scenario_path = Path(wdir) / 'samples/python/scenarios/us_tax/'

# Configure the default credential for accessing Azure services using Azure CLI credentials.
# Every credential source named by the flags below is switched off; the Azure CLI credential
# is not among the exclusions, so it remains available to DefaultAzureCredential.
credential = DefaultAzureCredential(
    exclude_workload_identity_credential=True,
    exclude_developer_cli_credential=True,
    exclude_environment_credential=True,
    exclude_managed_identity_credential=True,
    exclude_powershell_credential=True,
    exclude_shared_token_cache_credential=True,
    exclude_interactive_browser_credential=True
)

# Token provider for the Cognitive Services scope, built on the credential above.
# It is handed to the Azure OpenAI client as its Azure AD token provider.
openai_token_provider = get_bearer_token_provider(credential, 'https://cognitiveservices.azure.com/.default')

# Azure OpenAI client authenticated through the Azure AD token provider (no API key).
# Endpoint and API version both come from `app_settings`; the API version previously
# referenced an undefined `settings` name, which raised NameError when the cell ran.
openai_client = AzureOpenAI(
    azure_endpoint=app_settings.azure_openai_endpoint,
    azure_ad_token_provider=openai_token_provider,
    api_version=app_settings.azure_openai_api_version
)

# Azure AI Document Intelligence client; it authenticates with the same `credential`
# object that backs the Azure OpenAI token provider.
document_intelligence_client = DocumentIntelligenceClient(
    endpoint=app_settings.azure_ai_services_endpoint,
    credential=credential
)

In [None]:
# Directory that holds the sample PDF and its metadata file
inputs_path = Path(os.path.join(wdir, 'samples/assets/us_tax/'))
metadata_fname = "us-tax-1040.json" # Change this to the file you want to evaluate
metadata_fpath = inputs_path / metadata_fname

# Read the JSON as UTF-8 explicitly so decoding does not depend on the platform's default encoding
with open(metadata_fpath, 'r', encoding='utf-8') as f:
    metadata = json.load(f)

# The metadata's 'fname' entry names the PDF to process, located next to the metadata file
pdf_fname = metadata['fname']
pdf_fpath = inputs_path / pdf_fname

In [None]:
# Encoded image content parts; filled by the page-preparation step that follows
page_images = []

def encode_page(page):
    """Encode a page image as a base64 PNG data URL wrapped in an `image_url` content part.

    Args:
        page: Image object exposing `save(file_obj, format=...)`.

    Returns:
        dict: `{"type": "image_url", "image_url": {"url": "data:image/png;base64,<data>"}}`.
    """
    # Serialize the page to PNG entirely in memory
    with io.BytesIO() as png_buffer:
        page.save(png_buffer, format='PNG')
        png_bytes = png_buffer.getvalue()

    encoded_png = base64.b64encode(png_bytes).decode('utf-8')
    data_url = f"data:image/png;base64,{encoded_png}"
    return {"type": "image_url", "image_url": {"url": data_url}}

# Page preparation runs inside a Stopwatch context (exposed as `image_stopwatch`):
# read the PDF, convert it to page images, then encode every page.
with Stopwatch() as image_stopwatch:
    with open(pdf_fpath, "rb") as f:
        document_bytes = f.read()

    # Convert the PDF bytes into a list of page images
    pages = convert_from_bytes(document_bytes)
    
    # Encode the pages concurrently on a thread pool (ThreadPoolExecutor uses threads, not processes).
    # executor.map returns results in the order of `pages`, so page order is preserved.
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(encode_page, pages))
        page_images.extend(results)

## Classify

In [None]:
# (classification label, description) pairs, kept in their original order.
_classification_entries = [
    (
        "US Tax Form 1040",
        "A US individual income tax return form, used to report annual income tax return filed by citizens or residents of the United States.",
    ),
    (
        "US Tax Form 1040 - Schedule 1",
        "A form used to report additional income and adjustments to income.",
    ),
    (
        "US Tax Form 1040 - Schedule 2",
        "A form used to report additional taxes.",
    ),
    (
        "US Tax Form 1040 - Schedule 3",
        "A form used to report nonrefundable credits.",
    ),
    (
        "US Tax Form 1040 - Schedule A",
        "A form used to report itemized deductions for individual income tax returns.",
    ),
    (
        "US Tax Form 1040 - Schedule B",
        "A form used to report interest and ordinary dividends.",
    ),
    (
        "US Tax Form 1040 - Schedule C",
        "A form used to report income or loss from a business operated or a profession practiced as a sole proprietor.",
    ),
    (
        "US Tax Form 1040 - Schedule D",
        "A form used to report capital gains and losses.",
    ),
    (
        "US Tax Form 1040 - Schedule E",
        "A form used to report income or loss from rental real estate, royalties, partnerships, S corporations, estates, trusts, etc.",
    ),
    (
        "US Tax Form 1040 - Schedule EIC",
        "A form used to claim the Earned Income Credit (EIC).",
    ),
    (
        "US Tax Form 1040 - Schedule F",
        "A form used to report profit or loss from farming.",
    ),
    (
        "US Tax Form 1040 - Schedule H",
        "A form used to report household employment taxes.",
    ),
    (
        "US Tax Form 1040 - Schedule J",
        "A form used to figure the tax on a qualified farmer's or fisherman's income.",
    ),
    (
        "US Tax Form 1040 - Schedule R",
        "A form used to claim the Credit for the Elderly or the Disabled.",
    ),
    (
        "US Tax Form 1040 - Schedule SE",
        "A form used to calculate self-employment tax.",
    ),
    (
        "US Tax Form 1040 - Schedule 8812",
        "A form used to claim the Child Tax Credit and Credit for Other Dependents.",
    ),
]

# One dict per supported form, each with a "classification" label and a "description"
classifications = [
    {"classification": label, "description": description}
    for label, description in _classification_entries
]

In [None]:
# Every schedule follows the same pattern on both sides of the mapping:
#   "US Tax Form 1040 - Schedule <X>" -> "prebuilt-tax.us.1040Schedule<X>"
_schedule_suffixes = ("1", "2", "3", "A", "B", "C", "D", "E", "EIC", "F", "H", "J", "R", "SE", "8812")

# Maps each classification label to the model ID used to analyze documents of that type.
# The base form comes first, followed by the schedules in the order listed above.
classification_model_map = {"US Tax Form 1040": "prebuilt-tax.us.1040"}
classification_model_map.update(
    (f"US Tax Form 1040 - Schedule {suffix}", f"prebuilt-tax.us.1040Schedule{suffix}")
    for suffix in _schedule_suffixes
)

In [None]:
# System prompt for the classification request: describes the task and embeds the
# `classifications` list, serialized as JSON, under a "## Classifications" heading.
classify_system_prompt = f"""You are an AI assistant that helps detect the boundaries of sub-section or sub-documents using the provided classifications.

## Classifications

{json.dumps(classifications)}
"""

In [None]:
# Content parts (text and images) that make up the user message of the classification request
classify_user_content = []

In [None]:
# Instructions placed ahead of the page images in the user message.
# Plain string: the text has no placeholders, so no f-string is needed.
classify_user_prompt = """Classify documents that are a US Tax Form in the provided page images.
- A single classification may span multiple page images.
- A single page image may contain multiple classifications.
- If a page image does not contain a classification, ignore it."""

# Build the content list from scratch in this cell so that re-running it never appends a
# second copy of the prompt and page images to a list left over from an earlier run.
classify_user_content = [{
    "type": "text",
    "text": classify_user_prompt
}]

# For each page image, add a text part with the 1-based page number, followed by the image part
for i, page_image in enumerate(page_images):
    classify_user_content.append({
        "type": "text",
        "text": f"Page {i + 1}:"
    })
    classify_user_content.append(page_image)

In [None]:
with Stopwatch() as classify_stopwatch:
    # Conversation: the system instructions, then the multi-part user content
    classify_messages = [
        {"role": "system", "content": classify_system_prompt},
        {"role": "user", "content": classify_user_content},
    ]

    # Structured-output request; `Classifications` is passed as the response format
    classify_completion = openai_client.beta.chat.completions.parse(
        model=app_settings.azure_openai_chat_deployment,
        messages=classify_messages,
        response_format=Classifications,
        max_tokens=4096,
        temperature=0.1,
        top_p=0.1,
        logprobs=True # Enabled to determine the confidence of the response.
    )

In [None]:
# Structured result of the classification request (first choice).
# `parsed` can be None when the reply could not be parsed into the response format
# (for example when the model refuses), so fail here with a clear message instead of
# an AttributeError in a later cell.
classify_message = classify_completion.choices[0].message
document_classifications = classify_message.parsed

if document_classifications is None:
    raise ValueError(
        "The classification request returned no parsed result "
        f"(refusal: {getattr(classify_message, 'refusal', None)!r})."
    )

In [None]:
# DEBUG: Show the document classifications as indented JSON
classifications_json = document_classifications.model_dump_json(indent=2)
print(classifications_json)

In [None]:
# For each document classification, group the page images together.
# Entry i of both lists holds the pages of document_classifications.classifications[i],
# so the lists stay index-aligned with the classifications.
classified_images = []
classified_images_pil = []
page_count = len(page_images)

for classification in document_classifications.classifications:
    # The slices below treat the range as 1-based and inclusive
    image_range_start = classification.image_range_start
    image_range_end = classification.image_range_end

    # The range comes from the model's response, so check it before slicing: a start below 1
    # becomes a negative slice index and silently selects pages from the end of the document,
    # and an empty range would only fail later when the pages are assembled into a PDF.
    if not (1 <= image_range_start <= image_range_end and image_range_start <= page_count):
        raise ValueError(
            f"Invalid page range {image_range_start}-{image_range_end} for "
            f"'{classification.classification}' (document has {page_count} pages)."
        )

    # An end beyond the last page is tolerated: slicing simply stops at the last page
    classified_images.append(page_images[image_range_start - 1:image_range_end])
    classified_images_pil.append(pages[image_range_start - 1:image_range_end])

## Extract

In [None]:
# Analyze every classified sub-document with the Document Intelligence model mapped to its
# classification. `classified_images_pil` is index-aligned with the classifications, so each
# group of page images is paired directly with its classification.
for i, (classification, images_pil) in enumerate(
    zip(document_classifications.classifications, classified_images_pil)
):
    display(f"Classification {i} - {classification.classification}")

    # Assemble the sub-document's pages into a single in-memory PDF
    pdf_bytes = io.BytesIO()
    images_pil[0].save(pdf_bytes, format='PDF', resolution=100.0, save_all=True, append_images=images_pil[1:])

    # DEBUG: Save the PDF to a temporary file.
    # A dedicated variable is used so the notebook-level `pdf_fpath` (the input PDF) is not
    # overwritten; otherwise re-running the page-image cell would read this output file instead.
    classified_pdf_fpath = Path(os.path.join(scenario_path, f"{pdf_fname}_{classification.classification}.pdf"))
    with open(classified_pdf_fpath, "wb") as f:
        f.write(pdf_bytes.getvalue())

    # Rewind so the analyze call reads the stream from the beginning
    pdf_bytes.seek(0)

    # Use the classification of the document to determine the Azure AI Document Intelligence model to use
    model_id = classification_model_map[classification.classification]

    with Stopwatch() as di_stopwatch:
        poller = document_intelligence_client.begin_analyze_document(
            model_id=model_id,
            body=pdf_bytes,
            content_type="application/pdf"
        )

        result: AnalyzeResult = poller.result()

        # Only the first analyzed document's fields are used
        doc_result = result.documents[0].fields

    # Parse the fields once and reuse the result for both the printed output and the saved file
    parsed_doc_result = parse_document_fields(doc_result)

    # DEBUG: Display the document intelligence content
    doc_result_json = json.dumps(parsed_doc_result, indent=2)
    print(doc_result_json)

    # DEBUG: Save both the original doc_result and the parsed doc_result to a file
    di_result_fpath = Path(os.path.join(scenario_path, f"{pdf_fname}_{classification.classification}_original.json"))
    with open(di_result_fpath, "w") as f:
        json.dump(doc_result, f, indent=2, cls=CustomJsonEncoder)

    di_result_parsed_fpath = Path(os.path.join(scenario_path, f"{pdf_fname}_{classification.classification}_parsed.json"))
    with open(di_result_parsed_fpath, "w") as f:
        json.dump(parsed_doc_result, f, indent=2)