In [None]:
import config
import os
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest, ContentFormat, AnalyzeResult
from azure.storage.blob import BlobServiceClient
from config import AZURE_STORAGE_CONNECTION_STRING, AZURE_DOC_INTEL_ENDPOINT, AZURE_DOC_INTEL_KEY
import base64
from datetime import datetime
from config import AZURE_OPENAI_KEY, AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_DEPLOYMENT
from openai import AzureOpenAI
import json

# --- Notebook-wide configuration ---------------------------------------------
# Document Intelligence model id that the OCR loop passes to analyze_document().
prebuilt_layout_model = "prebuilt-layout"

# Chat-completion settings used by extract_key_fields_with_gpt().
AZURE_OPENAI_TEMP = 0
AZURE_OPENAI_MAX_TOKENS = 2500

# Name of the output folder; it is joined onto the working directory wherever
# result files are written.
RESULTS_DIR = "results"

# Shared Azure OpenAI client; the endpoint and key are imported from config.
client = AzureOpenAI(
    api_version="2024-08-01-preview",
    api_key=AZURE_OPENAI_KEY,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
)


In [None]:
def analyze_document(local_file_path, prebuilt_model="prebuilt-layout"):
    """Run an Azure Document Intelligence model on a local file and flatten the result to text.

    Args:
        local_file_path: Path of the file to analyze. It is opened in binary
            mode and sent to the service as ``application/octet-stream``.
        prebuilt_model: Model id passed to ``begin_analyze_document``.

    Returns:
        A newline-joined string containing one ``...Line # i: '<text>'`` entry
        per recognised line (``i`` restarts at 0 on every page), then for each
        table a ``Table # t has R rows and C columns`` header followed by one
        entry per cell, and finally a dashed separator line.

        If opening the file or the analysis call raises, the exception is NOT
        propagated: the string ``"Failed to analyze document: <error>"`` is
        returned instead, so callers that must distinguish success from
        failure have to inspect the returned text.
    """
    # Use the client as a context manager so its underlying connection
    # resources are released when the call finishes, instead of leaving one
    # open client behind per analyzed file.
    with DocumentIntelligenceClient(
        endpoint=AZURE_DOC_INTEL_ENDPOINT,
        credential=AzureKeyCredential(AZURE_DOC_INTEL_KEY),
    ) as document_intelligence_client:
        try:
            with open(local_file_path, 'rb') as f:
                poller = document_intelligence_client.begin_analyze_document(
                    prebuilt_model, analyze_request=f, content_type="application/octet-stream"
                )
                # Block until the long-running analysis has finished.
                result = poller.result()
        except Exception as e:
            return f"Failed to analyze document: {e}"

    # Diagnostic output only: report whether the service returned style
    # information (one message per style entry, none for an empty list).
    if result.styles is None:
        print("Result.styles is None")
    else:
        for _ in result.styles:
            print("Result.styles is not None")

    markdown_lines = []

    for page in result.pages or []:
        # ``lines`` is optional on a page; treat a missing list as "no lines"
        # rather than raising TypeError outside the try block above.
        for line_idx, line in enumerate(page.lines or []):
            markdown_lines.append(
                f"...Line # {line_idx}: '{line.content}'"
            )

    # ``tables`` is None when the document contains no tables.
    for table_idx, table in enumerate(result.tables or []):
        markdown_lines.append(
            f"Table # {table_idx} has {table.row_count} rows and {table.column_count} columns"
        )
        for cell in table.cells:
            markdown_lines.append(
                f"...Cell[{cell.row_index}][{cell.column_index}] has content '{cell.content}'"
            )

    markdown_lines.append("----------------------------------------")
    return "\n".join(markdown_lines)

In [None]:
# Resolve the input folder relative to the directory the notebook runs from.
current_dir = os.getcwd()
data_folder = os.path.join(current_dir, 'data', 'new_data')

# os.listdir() returns entries in arbitrary order; sort them so every run
# processes (and reports) the images in the same sequence.
files = sorted(os.listdir(data_folder))

# Keep only JPG/PNG images. The extension is compared case-insensitively so
# files such as "scan.JPG" or "scan.PNG" are not silently skipped.
image_files = [f for f in files if f.lower().endswith(('.jpg', '.png'))]

# Show which images will be processed.
print(f"Image files: {image_files}")

In [None]:
check_results = []

# The output folder (current_dir/RESULTS_DIR/ocr_results) is the same for every
# image, so resolve and create it once instead of on each loop iteration.
results_dir = os.path.join(current_dir, RESULTS_DIR, "ocr_results")
os.makedirs(results_dir, exist_ok=True)

for image_file in image_files:
    image_path = os.path.join(data_folder, image_file)
    check_result = analyze_document(image_path, prebuilt_layout_model)
    check_results.append({
        "image_url": image_path,
        "result": check_result
    })

    # Persist the OCR output for this image. Note: the payload is the plain
    # text returned by analyze_document(), not a JSON document, even though
    # the file name ends in ".json".
    result_file_path = os.path.join(results_dir, f"{os.path.splitext(image_file)[0]}_ocr_result.json")
    # Write UTF-8 explicitly: OCR text may contain characters that the
    # platform's default encoding (e.g. cp1252 on Windows) cannot represent.
    with open(result_file_path, 'w', encoding='utf-8') as result_file:
        result_file.write(check_result)

    print(f"Results for {image_file}:\n{check_result}\n")

# Output the array of check results
print(check_results)

In [None]:
from logprobs_handler_custom import LogprobsHandler

# Create the single LogprobsHandler instance (imported from the project-local
# logprobs_handler_custom module) that extract_key_fields_with_gpt() uses to
# post-process the model's logprobs via format_logprobs(),
# calculate_words_probas() and calculate_confidence_scores().
logprobs_handler = LogprobsHandler()

def encode_image(image_path):
    """Return the contents of the file at ``image_path`` as a base64 text string."""
    with open(image_path, "rb") as source:
        raw_bytes = source.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
    
def extract_key_fields_with_gpt(image_path: str, doc_intel_result: str):
    """Ask the Azure OpenAI deployment to extract key fields from a check image.

    The image is sent inline as a base64 data URL together with the OCR text,
    and the completion is requested with ``logprobs=True``. The token
    log-probabilities of the answer are then passed through the module-level
    ``logprobs_handler``.

    Args:
        image_path: Path of the check image on disk.
        doc_intel_result: OCR text for the same image; it is embedded verbatim
            in the prompt.

    Returns:
        The value produced by ``logprobs_handler.calculate_confidence_scores``.
        The raw message text of the completion is not returned. The caller in
        this notebook treats the result as a dict - confirm against
        LogprobsHandler if the shape matters.
    """
    import mimetypes  # local import: only needed to label the data URL

    base64_image = encode_image(image_path)

    # Label the data URL with the file's actual media type (the notebook feeds
    # both .jpg and .png files). Anything unknown or non-image keeps the
    # previous "image/jpeg" label.
    mime_type = mimetypes.guess_type(image_path)[0] or "image/jpeg"
    if not mime_type.startswith("image/"):
        mime_type = "image/jpeg"

    # First prompt to extract key fields
    response1 = client.chat.completions.create(
        model=AZURE_OPENAI_DEPLOYMENT,
        response_format={"type": "json_object"},
        messages=[
            {"role": "system", "content": "You are an assistant responsible for extracting key fields from the image of a check, with the assistance of an OCR tool. Return the output in the specified JSON format."},
            {"role": "user", "content": [
                {"type": "text", "text": 
                 f"""Extract the following fields from the image of the check: \
                 - CheckDate: Date the check was written. Date should be returned in the format MM/DD/YYYY. \
                 - Payee: Name of the person or entity the check is made out to \
                 - Amount: Amount check was paid out for, return as a 2-decimal number (verify using both numberAmount and wordAmount) \
                 - MICR (4 digits): Check number, usually at the bottom of the check \
                 
                 In order to extract the fields reliably, follow the following steps: \
                 1. Understand the OCR output and extract the key fields from the OCR markdown output. \
                 2. Verify the extracted OCR fields against the image to ensure accuracy. \
                    Note: Sometimes the image contains additional detail alongside the check, so please ensure you only look at the check. \
                 3. Fix any inconsistencies in the field values, and double check with the OCR output. \
                 4. Return the extracted fields in the specified JSON format. \
                 
                 OCR Markdown Output: \
                 {doc_intel_result} \
                                  
                 Output JSON Schema: \
                 file_name: Name of the file \
                 fields: array of objects containing the following fields: \
                    - field_name: Name of the field \
                    - field_value: Value of the field \
                
                 Return result below: \
                 -------------------------------------------"""},
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}
            ]}
        ],
        temperature=AZURE_OPENAI_TEMP,
        max_tokens=AZURE_OPENAI_MAX_TOKENS,
        logprobs=True
    )

    choice = response1.choices[0]

    # A hasattr() check is not enough here: ``logprobs`` can be present on the
    # choice but set to None, and ``None.content`` would raise AttributeError.
    # Fall back to an empty list when no logprobs (or no content) came back.
    choice_logprobs = getattr(choice, "logprobs", None)
    response_logprobs = (choice_logprobs.content or []) if choice_logprobs is not None else []

    # Post-process the logprobs; the final handler call yields the return value.
    logprobs_formatted = logprobs_handler.format_logprobs(response_logprobs)
    paired_probs = logprobs_handler.calculate_words_probas(logprobs_formatted)
    return logprobs_handler.calculate_confidence_scores(paired_probs)

In [None]:
# Run the GPT extraction for every OCR'd check and store each result on disk.
for check in check_results:
    image_path, doc_intel_result = check["image_url"], check["result"]
    image_name = os.path.basename(image_path)

    gpt_result_dict = extract_key_fields_with_gpt(image_path, doc_intel_result)
    print(f"gpt_result for {image_path}: {gpt_result_dict}")

    # Tag the extraction with its source file name, then serialise it.
    gpt_result_dict["file_name"] = image_name
    gpt_result = json.dumps(gpt_result_dict)

    # Target: current_dir/RESULTS_DIR/gpt_results/<image stem>_gpt_result.json
    # (the folder is created on demand).
    results_dir = os.path.join(current_dir, RESULTS_DIR, 'gpt_results')
    os.makedirs(results_dir, exist_ok=True)
    image_stem = os.path.splitext(image_name)[0]
    result_file_path = os.path.join(results_dir, f"{image_stem}_gpt_result.json")

    with open(result_file_path, 'w') as result_file:
        result_file.write(gpt_result)

    print(f"GPT result for {image_path}:\n{gpt_result}\n")

In [None]:
import pandas as pd
import os
import json

# Root results directory (current_dir/RESULTS_DIR). This rebinds `results_dir`,
# which the earlier processing loops pointed at the "ocr_results" and
# "gpt_results" subdirectories.
results_dir = os.path.join(current_dir, RESULTS_DIR)

# Accumulates one flattened record (dict) per GPT result file; it is turned
# into a DataFrame further down in this cell.
data = []

# Function to extract data from JSON files
def extract_data_from_json(file_path, result_type):
    """Flatten one result JSON file into a single record.

    The file must contain a ``"file_name"`` entry and a ``"fields"`` list whose
    items carry ``"field_name"`` and ``"field_value"`` (and optionally
    ``"field_confidence"``).

    Returns a dict holding, in this order: one ``<field_name>`` key per field,
    one ``<field_name>_confidence`` key per field (``None`` when the field has
    no confidence), then ``"file_name"`` and ``"result_type"``.
    """
    with open(file_path, 'r') as handle:
        payload = json.load(handle)

    entries = payload["fields"]
    row = {}

    # First pass: the field values themselves.
    for entry in entries:
        name = entry["field_name"]
        row[name] = entry["field_value"]

    # Second pass: the matching confidence columns.
    for entry in entries:
        row[f"{entry['field_name']}_confidence"] = entry.get("field_confidence", None)

    row["file_name"] = payload["file_name"]
    row["result_type"] = result_type
    return row

# Load every GPT result file. os.listdir() order is arbitrary, so sort the
# names to keep the row order of the summary stable between runs.
gpt_results_dir = os.path.join(results_dir, "gpt_results")
for json_file in sorted(os.listdir(gpt_results_dir)):
    if json_file.endswith(".json"):
        file_path = os.path.join(gpt_results_dir, json_file)
        data.append(extract_data_from_json(file_path, "gpt_result"))

# Create a DataFrame from the data
df = pd.DataFrame(data)

# Define the desired columns
columns = ["file_name", "CheckDate", "CheckDate_confidence", "Payee", "Payee_confidence", "Amount", "Amount_confidence", "MICR", "MICR_confidence", "result_type"]

# Select and order the summary columns. Unlike df[columns], reindex() does not
# raise KeyError when a result lacks one of the expected fields or when no
# result files were found; such columns are emitted empty (NaN) instead.
df = df.reindex(columns=columns)

# Save the DataFrame to a CSV file
csv_file_path = os.path.join(results_dir, "results_summary.csv")
df.to_csv(csv_file_path, index=False)

print(f"CSV file generated at: {csv_file_path}")
