In [None]:
%pip install azure-ai-formrecognizer --quiet

import os
import re
from datetime import datetime
from pathlib import Path

from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential

from pyspark.sql import SparkSession
# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()


In [None]:
# The endpoint and key are read from the environment so that real secrets never
# have to be saved inside the notebook. The placeholder strings are used only
# when the corresponding variable is not set.
DOC_INTEL_ENDPOINT = os.environ.get(
    "DOC_INTEL_ENDPOINT",
    "https://<your-docint-name>.cognitiveservices.azure.com/",
)
DOC_INTEL_KEY = os.environ.get("DOC_INTEL_KEY", "<your-key>")

# Single client shared by the classification helper and the extraction loop.
client = DocumentAnalysisClient(
    endpoint=DOC_INTEL_ENDPOINT,
    credential=AzureKeyCredential(DOC_INTEL_KEY)
)

# Folder scanned for documents to classify and extract.
INPUT_FOLDER = Path("/lakehouse/default/Files/incoming_documents")

In [None]:
# Helper: classify a document from its OCR text to choose the extraction model.

def classify_document(file_path, analysis_client=None):
    """Classify a document from its OCR text and pick an extraction model.

    The file is sent to the ``prebuilt-read`` model, every recognised line is
    joined into one lower-cased string, and that string is checked against
    keyword rules in this order: Form 1040, 1099, driver's license.

    Args:
        file_path: Path of the document to classify; opened in binary mode.
        analysis_client: Optional object exposing ``begin_analyze_document``.
            When omitted, the notebook-level ``client`` is used.

    Returns:
        A ``(doc_type, model_id)`` tuple. ``("unknown", "prebuilt-read")`` is
        returned when no rule matches.
    """
    if analysis_client is None:
        analysis_client = client

    with open(file_path, "rb") as f:
        poller = analysis_client.begin_analyze_document(
            model_id="prebuilt-read",
            document=f
        )
    result = poller.result()

    # Join once instead of growing a string one line at a time.
    text_lower = " ".join(
        line.content for page in result.pages for line in page.lines
    ).lower()

    if "form 1040" in text_lower:
        return "1040", "prebuilt-tax.us.1040"

    # "1099" must stand alone as a number. A plain substring test also fires on
    # digits inside longer numbers (licence numbers, phone numbers, invoice
    # ids) and would route those documents to the tax model.
    if re.search(r"(?<!\d)1099(?!\d)", text_lower):
        return "1099", "prebuilt-tax.us.1099"

    # Accept "driver license", "drivers license", "driver's license" with a
    # straight or typographic apostrophe, and the "licence" spelling.
    if re.search(r"driver(?:['\u2019]?s)?\s+licen[sc]e", text_lower):
        return "license", "prebuilt-idDocument"

    return "unknown", "prebuilt-read"


In [None]:
# Extraction loop: classify each supported file in INPUT_FOLDER, analyse it with
# the model chosen for its type, and flatten every extracted field into one
# record (a dict) appended to `records`.

records = []

# Sorted so files are processed, and rows appended, in a repeatable order.
for file in sorted(INPUT_FOLDER.iterdir()):

    # Skip sub-directories and anything that is not a supported image/PDF.
    if not file.is_file():
        continue
    if file.suffix.lower() not in (".png", ".jpg", ".jpeg", ".pdf"):
        continue

    document_id = file.stem

    print(f"Processing: {file.name}")

    doc_type, model_id = classify_document(file)

    print(f"Detected type: {doc_type} -> {model_id}")

    if doc_type == "unknown":
        # classify_document has already run prebuilt-read on this file. That
        # model produces no `documents`, so a second call would add no rows.
        continue

    with open(file, "rb") as f:
        poller = client.begin_analyze_document(
            model_id=model_id,
            document=f
        )
    result = poller.result()

    # One timestamp per document, shared by all of its field rows.
    extracted_at = datetime.utcnow().isoformat()

    for doc in result.documents:

        for field_name, field in doc.fields.items():

            # Prefer the parsed value; fall back to the raw text content.
            if field.value is not None:
                value = str(field.value)
            else:
                value = field.content

            # Confidence is optional in the SDK; float(None) would raise.
            confidence = field.confidence
            if confidence is not None:
                confidence = float(confidence)

            records.append({
                "document_id": document_id,
                "document_type": doc_type,
                "field_name": field_name,
                "field_value": value,
                "field_type": str(field.value_type),
                "confidence_score": confidence,
                "extracted_at": extracted_at
            })


In [None]:
# Build the field-level DataFrame with an explicit schema. Schema inference
# fails when `records` is empty or when a column holds only None values.
# Columns are listed alphabetically, matching the order inference produces for
# dict rows, so the table layout is unchanged.
df = spark.createDataFrame(
    records,
    schema=(
        "confidence_score double, document_id string, document_type string, "
        "extracted_at string, field_name string, field_type string, "
        "field_value string"
    ),
)
df.show(truncate=False)

In [None]:
# Persist the extracted fields as a Delta table, replacing any previous contents.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("document_fields")
)

In [None]:
# Registry table: one row per (document_id, document_type) pair found in the
# extracted fields, written as a Delta table that replaces any previous contents.
docs = df.select(["document_id", "document_type"]).dropDuplicates()
(
    docs.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("document_registry")
)
