# Document classifier

> Based on the following documentation:
> https://learn.microsoft.com/en-us/python/api/overview/azure/ai-documentintelligence-readme?view=azure-python
> https://github.com/szetinglau/CustomClassifier
> https://techcommunity.microsoft.com/blog/azure-ai-services-blog/building-a-document-intelligence-custom-classification-model-with-the-python-sdk/4104233

## Setup
1. Update the inputs as needed
1. Create a new Resource Group (tried in US Gov Virginia and US East)
1. In the resource group
    1. Create a new Multi-Service AI Account
    1. Create a new Storage Account (Hot access tier, LRS, no purge protection) - for demo only
1. Assign the following roles
    * Cognitive Services Data Contributor - User
    * Cognitive Services User - User
    * Storage Blob Data Owner - both the User and the Document Intelligence resource
    * Owner on the storage account - User
1. Log in to Azure using `az login`
1. Prepare the local documents training data as specified in [this link](https://techcommunity.microsoft.com/blog/azure-ai-services-blog/building-a-document-intelligence-custom-classification-model-with-the-python-sdk/4104233)

> For Azure Government, run `az cloud set --name AzureUSGovernment`. To return to the commercial cloud (MAC), run `az cloud set --name AzureCloud`.
 

In [None]:
# Get classifier ID -- RUN ONCE ONLY

# Generates a fresh random ID for the classifier. Nothing is created in the
# service by this cell; the ID is only consumed later by the build and
# classify cells. Save the printed value: re-running this cell produces a
# different ID.

import uuid
CLASSIFIER_ID = str(uuid.uuid4())
# Passed as base_classifier_id in the build request; None here.
BASE_CLASSIFIER_ID = None
print(f"CLASSIFIER_ID: {CLASSIFIER_ID}")

# Set inputs

In [61]:
from azure.identity import DefaultAzureCredential, AzureAuthorityHosts, InteractiveBrowserCredential
from azure.core.credentials import AzureKeyCredential
# --- User inputs: fill these in before running the remaining cells ---
container_name = "trainingdata"  # blob container that receives the training data
storage_account_name = ""  # account name only; the blob URL is built from it plus storage_postfix
doc_intel_service_name = ""  # resource name only; the endpoint is built from it plus doc_intel_service_postfix
local_directory = "" # relative directory with no ending slash
CLASSIFIER_DESCRIPTION = ""  # description sent with the classifier build request
TESTING_DOCUMENTS=  ""  # directory whose entries are classified in the last cell

# --- Cloud-specific settings (defaults target the public Azure cloud) ---
doc_intel_key = None # STATIC -- Do not change
authority = AzureAuthorityHosts.AZURE_PUBLIC_CLOUD # STATIC -- Do not change
storage_postfix = "core.windows.net" # STATIC -- Do not change
doc_intel_service_postfix = "cognitiveservices.azure.com" # STATIC -- Do not change
# NOTE(review): API_TYPE and API_VERSION are not referenced by the cells shown
# here; presumably kept for direct REST calls -- confirm before removing.
API_TYPE = "documentClassifiers" # STATIC -- Do not change
API_VERSION = "2024-11-30" # STATIC -- Do not change
# Uncomment the lines below for Gov Cloud.
# A Document Intelligence key is only needed for Gov Cloud.
#doc_intel_key = "" 

#storage_postfix = "core.usgovcloudapi.net" # STATIC
#authority = AzureAuthorityHosts.AZURE_GOVERNMENT # STATIC
#doc_intel_service_postfix = "cognitiveservices.azure.us" # STATIC
#doc_intel_cred = AzureKeyCredential(doc_intel_key) # STATIC -- Do not change
# End of the Gov Cloud section.

In [None]:
# Import Required Libraries
import logging, json, os, time
from requests import post, get
from datetime import datetime, timezone, timedelta
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, ContainerSasPermissions, generate_container_sas
from azure.ai.documentintelligence import DocumentIntelligenceClient, DocumentIntelligenceAdministrationClient
from azure.ai.documentintelligence.models import (
                AzureBlobFileListContentSource,
                ClassifierDocumentTypeDetails,
                BuildDocumentClassifierRequest,
            )

from azure.ai.documentintelligence.models import AnalyzeResult
from azure.core.exceptions import HttpResponseError


# Build the Document Intelligence endpoint from the resource name and the
# cloud-specific DNS suffix chosen in the inputs cell.
doc_intel_endpoint = f"https://{doc_intel_service_name}.{doc_intel_service_postfix}/"
print(f"doc_intel_endpoint: {doc_intel_endpoint}")

# Create the credential object
# Requires the Cognitive Services Data Reader/Contributor and Cognitive Services User roles
credential = DefaultAzureCredential(authority=authority)

# Document Intelligence authenticates with the Entra ID credential unless an
# API key was supplied (Gov Cloud). Deriving the key credential here keeps the
# clients working even when the commented-out `doc_intel_cred = ...` input line
# was not uncommented together with the key.
if doc_intel_key is None:
    doc_intel_cred = credential
else:
    doc_intel_cred = AzureKeyCredential(doc_intel_key)

# Verify that sign-in works by requesting a token. The token itself is a bearer
# secret and must never be printed into notebook output, so only its expiry is
# reported.
# NOTE(review): the scope below is the public-cloud management endpoint;
# confirm it is also valid when `authority` targets Azure Government.
user_info = credential.get_token("https://management.azure.com/.default")
token_expiry = datetime.fromtimestamp(user_info.expires_on, timezone.utc)
print(f"Authenticated with Azure; access token expires at {token_expiry.isoformat()}")

# Analyze Files

This step can take a while when there are many documents.

In [None]:
def analyze_layout():
    """Run the prebuilt layout model over every local training document.

    Every sub-directory found while walking ``local_directory`` is scanned, and
    each regular file in it that is not a ``.json``/``.jsonl`` file is sent to
    the ``prebuilt-layout`` model. The raw service response is written next to
    the document as ``<document>.ocr.json`` by ``create_ocr_json``.

    A document whose analysis raises ``HttpResponseError`` (either when the
    request is submitted or while waiting for the result) is recorded and
    skipped; the remaining documents are still processed. The skipped files
    are listed at the end.
    """
# [START analyze_layout]
    document_intelligence_client = DocumentIntelligenceClient(
        endpoint=doc_intel_endpoint, credential=doc_intel_cred, audience=f"https://{doc_intel_service_postfix}"
    )
    # Documents the service rejected
    incompatible_files = []
    # Iterate through files in the local directory and analyze each document
    for root, dirs, _files in os.walk(local_directory):
        for dir_name in dirs:
            dir_path = os.path.join(root, dir_name)
            for file_name in os.listdir(dir_path):
                document_file_path = os.path.join(dir_path, file_name)
                # Nested folders are reached by os.walk in their own turn and
                # cannot be opened as documents.
                if not os.path.isfile(document_file_path):
                    continue
                # Generated OCR output and file lists are not training documents.
                if file_name.endswith((".json", ".jsonl")):
                    continue
                print(f"Analyzing document in {document_file_path}")
                ocr_json_file_path = document_file_path + ".ocr.json"
                try:
                    with open(document_file_path, "rb") as f:
                        # Use begin_analyze_document to start the analysis process, and use a
                        # callback in order to receive the raw response
                        poller = document_intelligence_client.begin_analyze_document(
                            "prebuilt-layout",
                            body=f,
                            content_type="application/octet-stream",
                            cls=lambda raw_response, _, headers: create_ocr_json(ocr_json_file_path, raw_response),
                        )
                    # Wait inside the try block so that failures reported while
                    # polling are handled like submission failures.
                    poller.result()
                except HttpResponseError as error:
                    print(f"Analysis of {file_name} failed: {error.error}\n\nSkipping to next file...")
                    incompatible_files.append(document_file_path)
                    # Move on to the next file; one bad document must not stop
                    # the rest of the folder from being analyzed.
                    continue
# [END analyze_layout]

    # Print the list of incompatible files
    if incompatible_files:
        print("\nThe following files were skipped as they are corrupted or the format is unsupported:")
        for skipped_path in incompatible_files:
            print(f"\t{skipped_path}")
        print("Please visit the following link for more information on supported file types and sizes. \nhttps://learn.microsoft.com/en-us/azure/ai-services/document-intelligence/concept-custom-classifier?view=doc-intel-4.0.0#input-requirements")

    print("Batch layout analysis completed!")

def create_ocr_json(ocr_json_file_path, raw_response):
    """Save the body of a raw service response as a UTF-8 text file.

    Args:
        ocr_json_file_path: Destination path; an existing file is overwritten.
        raw_response: Object exposing ``http_response.body()`` that returns the
            response payload as bytes.
    """
# [START create_ocr_json]
    with open(ocr_json_file_path, mode="w", encoding="utf-8") as ocr_file:
        payload = raw_response.http_response.body()
        ocr_file.write(payload.decode("utf-8"))
        print(f"\tOutput saved to {ocr_json_file_path}")
# [END create_ocr_json]

# Run the layout analysis over local_directory.
# NOTE: analyze_layout() already prints "Batch layout analysis completed!"
# itself, so that message shows up twice in the cell output.
print("Batch layout analysis started...")
analyze_layout()
print("Batch layout analysis completed!")

# Create the Container if it doesn't exist

In [None]:


print("Starting to create container")

# Blob service endpoint of the storage account. Authentication uses the shared
# Azure credential object, not a connection string.
storage_account_url = f"https://{storage_account_name}.blob.{storage_postfix}"
blob_service_client = BlobServiceClient(account_url=storage_account_url, credential=credential)

# Create the training-data container only when it is missing
container_client = blob_service_client.get_container_client(container_name)
container_missing = not container_client.exists()
if container_missing:
    container_client.create_container()
    print(f"Created container {container_name}")

print("Done Creating container")

# Upload Data to Storage Account
Python cell to upload the data into a storage account.

In [None]:
# Progress marker printed before the upload helpers are defined and run.
print("Start Upload Blob")

def upload_documents():
    """Upload analyzed training documents and one ``.jsonl`` file list per folder.

    Every sub-directory found while walking ``local_directory`` is treated as a
    document type. A file is uploaded (together with its ``.ocr.json``) when it
    has a supported extension and the matching ``<file>.ocr.json`` exists.
    Other files, except generated ``.ocr.json``/``.jsonl`` files, are collected
    and reported at the end. Nested folders are not reported as files; they are
    visited by ``os.walk`` on their own.

    When a folder yields at least five uploaded documents, a
    ``<folder>.jsonl`` file list is written to ``local_directory`` and
    uploaded; otherwise a message explains why no file list was produced.
    """
# [START upload_documents]
    supported_extensions = (".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".heif", ".pdf", ".docx", ".xlsx", ".pptx")
    generated_suffixes = (".ocr.json", ".jsonl")
    min_training_files = 5

    # Files that could not be uploaded
    incompatible_files = []

    # List all files in the local directory
    for root, dirs, _files in os.walk(local_directory):
        for dir_name in dirs:
            jsonl_data = []
            dir_path = os.path.join(root, dir_name)
            for file_name in os.listdir(dir_path):
                local_file_path = os.path.join(dir_path, file_name)
                # A nested folder is not an unsupported file; skip it here.
                if not os.path.isfile(local_file_path):
                    continue
                # Compare case-insensitively for both checks so "X.OCR.JSON"
                # is treated like "x.ocr.json".
                lowered_name = file_name.lower()
                ocr_json_file_path = local_file_path + ".ocr.json"
                if lowered_name.endswith(supported_extensions) and os.path.isfile(ocr_json_file_path):
                    upload_file_to_blob(local_file_path, jsonl_data)
                    upload_file_to_blob(ocr_json_file_path)
                elif not lowered_name.endswith(generated_suffixes):
                    incompatible_files.append(local_file_path)

            # Write the .jsonl file as long as there are at least 5 training files per document type
            if len(jsonl_data) >= min_training_files:
                jsonl_file_path = os.path.join(local_directory, f"{dir_name}.jsonl")
                print(f"Getting {jsonl_file_path}")
                with open(jsonl_file_path, "w", encoding="utf-8") as f:
                    for item in jsonl_data:
                        f.write(json.dumps(item) + "\n")

                upload_file_to_blob(jsonl_file_path)
            else:
                # Without this message the document type would silently be
                # missing from the classifier build.
                print(
                    f"No file list written for {dir_name}: {len(jsonl_data)} document(s) uploaded, "
                    f"at least {min_training_files} are required."
                )

    # Print the list of incompatible files
    if incompatible_files:
        print("\nThe following files are not of a supported file type, missing a corresponding .ocr.json file, or both:")
        for local_file_path in incompatible_files:
            print(f"\t{local_file_path}")
        print("Please ensure you run analyze_layout.py to create .ocr.json files before uploading documents. \nVisit the following link for more information on supported file types and sizes. \nhttps://learn.microsoft.com/en-us/azure/ai-services/document-intelligence/concept-custom-classifier?view=doc-intel-4.0.0#input-requirements")

    print("Batch upload completed!")
# [END upload_documents]
    
def upload_file_to_blob(local_file_path, jsonl_data=None):
    """Upload one local file to the training container.

    The blob name is the file's path relative to ``local_directory`` with
    backslashes turned into forward slashes. An existing blob with the same
    name is overwritten.

    Args:
        local_file_path: Path of the file to upload.
        jsonl_data: Optional list; when given, ``{"file": <blob name>}`` is
            appended to it before the upload starts.
    """
# [START upload_file_to_blob]
    relative_path = os.path.relpath(local_file_path, local_directory)
    blob_name = relative_path.replace("\\", "/")

    if jsonl_data is not None:
        file_list_entry = {"file": blob_name}
        jsonl_data.append(file_list_entry)

    destination = container_client.get_blob_client(blob_name)
    # Upload the file to Azure Blob Storage
    with open(local_file_path, "rb") as source:
        destination.upload_blob(source, overwrite=True)
    print(f"Uploaded {local_file_path} to {blob_name} in container {container_name}")
# [END upload_file_to_blob]

# Upload the training data, then print the end-of-cell progress marker.
upload_documents()
print("End Upload Blob")

# Build Classifier

Build the document classifier

In [None]:
def build_classifier():
    """Build the custom classifier from the uploaded training data.

    Submits a build request using ``CLASSIFIER_ID``, ``BASE_CLASSIFIER_ID`` and
    ``CLASSIFIER_DESCRIPTION``, with one document type per ``.jsonl`` file list
    found in the container, waits for the build to finish and prints a summary
    of the resulting classifier.
    """
# [START build_classifier]
    parent_classifier_id = BASE_CLASSIFIER_ID
    description_text = CLASSIFIER_DESCRIPTION
    admin_client = create_clients()
    training_container_url = create_container_sas_url(container_client)

    build_request = BuildDocumentClassifierRequest(
        classifier_id=CLASSIFIER_ID,
        base_classifier_id=parent_classifier_id,
        description=description_text,
        doc_types=get_doctypes(container_client, training_container_url),
    )
    build_poller = admin_client.begin_build_classifier(build_request)

    # Blocks until the service reports the outcome of the build
    classifier_details = build_poller.result()
    print_classifier_results(classifier_details)
# [END build_classifier]

def create_clients():
    """Return an administration client for the configured Document Intelligence endpoint.

    The client is created with ``doc_intel_endpoint``, ``doc_intel_cred`` and
    an audience derived from ``doc_intel_service_postfix``.
    """
# [START create_clients]
    return DocumentIntelligenceAdministrationClient(
        endpoint=doc_intel_endpoint,
        credential=doc_intel_cred,
        audience=f"https://{doc_intel_service_postfix}",
    )
# [END create_clients]


def create_container_sas_url(container_client):
    """Return the container URL that is handed to the classifier build.

    Despite the name, no SAS token is appended: the value is the plain
    ``container_client.url``.

    NOTE(review): presumably the Document Intelligence resource reads the
    container through its own role assignment on the storage account (see the
    setup notes) -- confirm. Appending a SAS token again would need an account
    key or a user delegation key for ``generate_container_sas``; the container
    client in this notebook is built from a token credential instead.

    Args:
        container_client: Object exposing the container address as ``url``.

    Returns:
        The container URL as a string, without any query string added.
    """
# [START create_container_sas_url]
    return str(container_client.url)
# [END create_container_sas_url]

def get_doctypes(container_client, container_sas_url):
    """Map each document type to the file list that describes its training data.

    A document type is derived from every entry returned by
    ``container_client.walk_blobs()`` whose name ends in ``.jsonl``; the type
    name is the blob name without that extension.

    Args:
        container_client: Container client used to enumerate the blobs.
        container_sas_url: Container URL stored in every content source.

    Returns:
        A dict from document type name to ``ClassifierDocumentTypeDetails``.
    """
# [START get_doctypes]
    type_names = [
        os.path.splitext(entry.name)[0]
        for entry in container_client.walk_blobs()
        if entry.name.endswith(".jsonl")
    ]

    return {
        type_name: ClassifierDocumentTypeDetails(
            azure_blob_file_list_source=AzureBlobFileListContentSource(
                container_url=container_sas_url,
                file_list=f"{type_name}.jsonl",
            )
        )
        for type_name in type_names
    }
# [END get_doctypes]

def print_classifier_results(result):
    """Print a short summary of a built classifier.

    Args:
        result: Object exposing ``classifier_id``, ``api_version``,
            ``description`` and ``doc_types`` (a mapping keyed by document
            type name, or None).
    """
# [START print_classifier_results]
    print(f"Classifier ID: {result.classifier_id}")
    print(f"API version used to build the classifier model: {result.api_version}")
    print(f"Classifier description: {result.description}")
    print("Document classes used for training the model:")
    # Iterate over the mapping's keys so that only the type name is shown,
    # not a (name, details) tuple.
    for doc_type in result.doc_types or {}:
        print(f"Document type: {doc_type}")
# [END print_classifier_results]

# Start the build; build_classifier() waits on the poller's result and then
# prints the classifier summary.
build_classifier()

# Classify a local document

In [None]:
def classify_document(classifier_id, doc_path):
    """Classify one local file with a custom classifier and print what was found.

    The file is sent as a generic binary stream, so it does not have to be a
    PDF.

    Args:
        classifier_id: ID of the classifier to use.
        doc_path: Path of the local file to classify.

    Returns:
        The ``AnalyzeResult`` returned by the service, so callers can inspect
        the classification beyond what is printed.

    Raises:
        HttpResponseError: Propagated from the service call.
    """
    # [START classify_document]
    document_intelligence_client = DocumentIntelligenceClient(
        endpoint=doc_intel_endpoint,
        credential=doc_intel_cred,
        audience=f"https://{doc_intel_service_postfix}",
    )
    with open(doc_path, "rb") as f:
        # "application/octet-stream" matches the layout-analysis call and
        # covers every file type, not only PDF.
        poller = document_intelligence_client.begin_classify_document(
            classifier_id, body=f, content_type="application/octet-stream"
        )
    result: AnalyzeResult = poller.result()

    print("----Classified documents----")
    if not result.documents:
        # Make an empty outcome visible instead of printing only the header.
        print("No documents were identified.")
    for doc in result.documents or []:
        if doc.bounding_regions:
            print(
                f"Found document of type '{doc.doc_type or 'N/A'}' with a confidence of {doc.confidence} contained on "
                f"the following pages: {[region.page_number for region in doc.bounding_regions]}"
            )
    return result
    # [END classify_document]

# Classify every file found directly in TESTING_DOCUMENTS. A service error
# stops the loop; details are printed and the error is re-raised.
try:
    for document in os.listdir(TESTING_DOCUMENTS):
        doc_path = os.path.join(TESTING_DOCUMENTS, document)
        # Only regular files can be opened and sent; skip sub-directories.
        if not os.path.isfile(doc_path):
            continue
        print(f"Classifying document {document}...")
        classify_document(CLASSIFIER_ID, doc_path)

except HttpResponseError as error:
    # Examples of how to check an HttpResponseError
    # Check by error code:
    if error.error is not None:
        if error.error.code == "InvalidImage":
            print(f"Received an invalid image error: {error.error}")
        if error.error.code == "InvalidRequest":
            print(f"Received an invalid request error: {error.error}")
        # Raise the error again after printing it
        raise
    # If the inner error is None, the message can still be checked for more information:
    if "Invalid request".casefold() in error.message.casefold():
        print(f"Uh-oh! Seems there was an invalid request: {error}")
    # Raise the error again
    raise