# Challenge 03: Data Modelling: From Retrieval to Upload

In this step, we will structure the data retrieved from Azure Document Intelligence (ADI) into the right format to be read by our systems in subsequent steps. 

ADI returns the data as a JSON file, and it is our job to process and organize it. Some of the data will be structured into tables, while the rest will be kept as text. This step ensures that the extracted information is organized in a meaningful way for further analysis and use.

As stated before, we need to make sure that our Function will know how to process:
- **Loan Forms:** Extract relevant details such as borrower information, loan amounts, and terms.
- **Loan Contract:** Identify and parse key contract elements like clauses, signatures, and dates.
- **Pay Stubs:** Retrieve data such as employee details, earnings, deductions, and net pay.

Not all customers will have provided every type of document, and during this challenge we will only process one file. In the next challenge we will add a trigger, which will also process a single document at a time.

Because each document type needs its own handling, this challenge is split into three parts, one per document type.
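One way to picture this split is a small dispatch table keyed on the blob's top-level folder. This is only a sketch under assumptions: the `houseloan` folder name appears later in this challenge, while `loanform`, `paystub`, the mapping of `houseloan` to the contract handler, and the three handler functions are hypothetical placeholders.

```python
from typing import Callable, Dict


# Hypothetical handlers; each would hold the logic for one document type.
def process_loan_form(blob_name: str) -> str:
    return f"loan form: {blob_name}"


def process_loan_contract(blob_name: str) -> str:
    return f"loan contract: {blob_name}"


def process_pay_stub(blob_name: str) -> str:
    return f"pay stub: {blob_name}"


# Assumed folder names; only "houseloan" is taken from this challenge.
HANDLERS: Dict[str, Callable[[str], str]] = {
    "loanform": process_loan_form,
    "houseloan": process_loan_contract,
    "paystub": process_pay_stub,
}


def route_document(blob_name: str) -> str:
    """Pick a handler from the first path segment of the blob name."""
    folder = blob_name.split("/", 1)[0].lower()
    handler = HANDLERS.get(folder)
    if handler is None:
        raise ValueError(f"No handler registered for folder '{folder}'")
    return handler(blob_name)
```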

## House Loan

In [1]:
import os
from dotenv import find_dotenv, load_dotenv
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
from datetime import datetime, timedelta
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult, AnalyzeDocumentRequest
from azure.ai.formrecognizer import DocumentAnalysisClient as OldDocumentIntelligenceClient, AnalyzeResult as OldAnalyzeResult
import httpx
import json

In [4]:
from azure.core.exceptions import ResourceExistsError
data_folder = "data_houseloan"
container_name = "data"
connection_string = os.getenv("STORAGE_CONNECTION_STRING")

# Ensure the connection string, data folder, and container name are set
if connection_string is None:
    raise ValueError("The STORAGE_CONNECTION_STRING environment variable is not set.")
if data_folder is None:
    raise ValueError("The data folder is not set.")
if container_name is None:
    raise ValueError("The container name is not set.")

# Ensure the data folder exists
if not os.path.isdir(data_folder):
    raise FileNotFoundError(f"The specified data folder does not exist: {data_folder}")

# Create a BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Check if the container exists, and create it if it does not
container_client = blob_service_client.get_container_client(container_name)
try:
    container_client.create_container()
    print(f"Container '{container_name}' created.")
except ResourceExistsError:
    print(f"Container '{container_name}' already exists.")

# Upload files in the data folder and its subdirectories to the blob container
for root, dirs, files in os.walk(data_folder):
    for filename in files:
        file_path = os.path.join(root, filename)
        if os.path.isfile(file_path):
            # Create a blob path that maintains the directory structure
            blob_path = os.path.relpath(file_path, data_folder).replace("\\", "/")
            blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_path)
            with open(file_path, "rb") as data:
                blob_client.upload_blob(data, overwrite=True)
            print(f"Uploaded {blob_path} to blob storage.")

Container 'data' already exists.
Uploaded houseloan/houseloan.pdf to blob storage.
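The blob path computed in the upload loop is the file path relative to `data_folder`, written with forward slashes so the folder structure is the same regardless of the operating system. A minimal sketch of the same idea with `pathlib`; `to_blob_path` is a hypothetical helper, not part of the challenge code.

```python
from pathlib import PurePath, PurePosixPath, PureWindowsPath


def to_blob_path(file_path: PurePath, data_folder: PurePath) -> str:
    """Return file_path relative to data_folder, using '/' as the separator."""
    return file_path.relative_to(data_folder).as_posix()


# The same relative blob path results from POSIX-style and Windows-style inputs.
posix_result = to_blob_path(
    PurePosixPath("data_houseloan/houseloan/houseloan.pdf"),
    PurePosixPath("data_houseloan"),
)
windows_result = to_blob_path(
    PureWindowsPath(r"data_houseloan\houseloan\houseloan.pdf"),
    PureWindowsPath("data_houseloan"),
)
print(posix_result)    # houseloan/houseloan.pdf
print(windows_result)  # houseloan/houseloan.pdf
```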


In [5]:
def generate_sas_url(blob_service_client, container_name, blob_name, expiry_hours=1):
    """
    Generate a SAS URL for a blob in Azure Blob Storage.

    :param blob_service_client: BlobServiceClient instance
    :param container_name: Name of the container
    :param blob_name: Name of the blob
    :param expiry_hours: Expiry time in hours for the SAS token
    :return: SAS URL for the blob
    """
    sas_token = generate_blob_sas(
        account_name=blob_service_client.account_name,
        container_name=container_name,
        blob_name=blob_name,
        account_key=blob_service_client.credential.account_key,
        permission=BlobSasPermissions(read=True),
        expiry=datetime.utcnow() + timedelta(hours=expiry_hours)
    )

    sas_url = f"https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"
    return sas_url
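The f-string in `generate_sas_url` assumes the blob name is already URL-safe. If blob names may contain spaces or other reserved characters, the path can be percent-encoded before the token is appended. A minimal sketch using only the standard library; `build_blob_url` and the sample account, blob, and token values are hypothetical.

```python
from urllib.parse import quote


def build_blob_url(account_name: str, container_name: str, blob_name: str, sas_token: str) -> str:
    # Keep '/' so virtual folders survive; percent-encode other unsafe characters.
    encoded_blob = quote(blob_name, safe="/")
    return (
        f"https://{account_name}.blob.core.windows.net/"
        f"{container_name}/{encoded_blob}?{sas_token}"
    )


# Placeholder values for illustration only; this is not a real SAS token.
url = build_blob_url("contoso", "data", "houseloan/house loan.pdf", "sv=placeholder")
print(url)
# https://contoso.blob.core.windows.net/data/houseloan/house%20loan.pdf?sv=placeholder
```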

In [6]:
def get_words(page, line):
    result = []
    for word in page.words:
        if _in_span(word, line.spans):
            result.append(word)
    return result

def _in_span(word, spans):
    for span in spans:
        if word.span.offset >= span.offset and (word.span.offset + word.span.length) <= (span.offset + span.length):
            return True
    return False
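The two helpers above match words to a line by checking that each word's character span falls entirely inside one of the line's spans. The sketch below reproduces that check on plain dataclasses so it can be run without calling the service; `Span`, `Word`, and the sample offsets are stand-ins made up for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Span:
    offset: int
    length: int


@dataclass
class Word:
    content: str
    span: Span


def in_span(word: Word, spans: List[Span]) -> bool:
    """True when the word's span lies fully inside any of the given spans."""
    start = word.span.offset
    end = start + word.span.length
    return any(s.offset <= start and end <= s.offset + s.length for s in spans)


# A line covering characters 0..12 ("Contoso Bank") and three candidate words.
line_spans = [Span(offset=0, length=12)]
words = [
    Word("Contoso", Span(0, 7)),
    Word("Bank", Span(8, 4)),
    Word("House", Span(15, 5)),
]
matched = [w.content for w in words if in_span(w, line_spans)]
print(matched)  # ['Contoso', 'Bank']
```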

In [8]:
load_dotenv()

endpoint = os.getenv("DOC_AI_ENDPOINT")
api_key = os.getenv("DOC_AI_KEY")
   
if not endpoint or not isinstance(endpoint, str):
    raise ValueError("The DOC_AI_ENDPOINT environment variable is not set or is not a string.")
if not api_key or not isinstance(api_key, str):
    raise ValueError("The DOC_AI_KEY environment variable is not set or is not a string.")

def analyze_layout(sas_url):
    document_intelligence_client = OldDocumentIntelligenceClient(
        endpoint=endpoint, credential=AzureKeyCredential(api_key)
    )

    # Download the document through its SAS URL and send the bytes for layout analysis
    poller = document_intelligence_client.begin_analyze_document(
        "prebuilt-layout", httpx.get(sas_url).content
    )

    result: OldAnalyzeResult = poller.result()

    analysis_result = {
        "handwritten": any([style.is_handwritten for style in result.styles]) if result.styles else False,
        "pages": [],
        "tables": []
    }

    for page in result.pages:
        page_info = {
            "page_number": page.page_number,
            "width": page.width,
            "height": page.height,
            "unit": page.unit,
            "lines": [],
            "selection_marks": []
        }

        if page.lines:
            for line in page.lines:
                line_info = {
                    "text": line.content,
                    "polygon": line.polygon,
                    "words": [{"content": word.content, "confidence": word.confidence} for word in get_words(page, line)]
                }
                page_info["lines"].append(line_info)

        if page.selection_marks:
            for selection_mark in page.selection_marks:
                selection_mark_info = {
                    "state": selection_mark.state,
                    "polygon": selection_mark.polygon,
                    "confidence": selection_mark.confidence
                }
                page_info["selection_marks"].append(selection_mark_info)

        analysis_result["pages"].append(page_info)

    if result.tables:
        for table in result.tables:
            table_info = {
                "row_count": table.row_count,
                "column_count": table.column_count,
                "bounding_regions": [{"page_number": region.page_number, "polygon": region.polygon} for region in table.bounding_regions] if table.bounding_regions else [],
                "cells": [{"row_index": cell.row_index, "column_index": cell.column_index, "content": cell.content, "bounding_regions": [{"page_number": region.page_number, "polygon": region.polygon} for region in cell.bounding_regions] if cell.bounding_regions else []} for cell in table.cells]
            }
            analysis_result["tables"].append(table_info)

    return analysis_result
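Each entry in `analysis_result["tables"]` stores its cells as a flat list with row and column indices. To work with a table as rows, the cells can be placed back into a grid. A minimal sketch, assuming the dictionary shape built by `analyze_layout` above; `table_to_grid` is a hypothetical helper and the sample table content is made up.

```python
from typing import Any, Dict, List


def table_to_grid(table_info: Dict[str, Any]) -> List[List[str]]:
    """Rebuild a row-by-column grid from the flat list of cells."""
    grid = [[""] * table_info["column_count"] for _ in range(table_info["row_count"])]
    for cell in table_info["cells"]:
        grid[cell["row_index"]][cell["column_index"]] = cell["content"]
    return grid


# Made-up table in the same shape as one entry of analysis_result["tables"].
sample_table = {
    "row_count": 2,
    "column_count": 2,
    "cells": [
        {"row_index": 0, "column_index": 0, "content": "Term"},
        {"row_index": 0, "column_index": 1, "content": "Rate"},
        {"row_index": 1, "column_index": 0, "content": "30 years"},
        {"row_index": 1, "column_index": 1, "content": "Fixed"},
    ],
}
grid = table_to_grid(sample_table)
print(grid)  # [['Term', 'Rate'], ['30 years', 'Fixed']]
```

Cells missing from the list stay as empty strings, so the grid always has `row_count` rows of `column_count` entries.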

In [9]:
def save_analysis_results(blob_service_client, container_name, blob_name, analysis_results):
    if analysis_results is None:
        print(f"No analysis results for {blob_name}. Skipping save.")
        return

    # Define the name for the results file
    results_blob_name = f"{blob_name}_results.json"

    # Convert the analysis results to JSON
    results_json = json.dumps(analysis_results, indent=4)

    # Upload the results to the blob
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=results_blob_name)
    blob_client.upload_blob(results_json, overwrite=True)

    print(f"Saved analysis results to {results_blob_name}")

In [None]:
load_dotenv()

if __name__ == "__main__":
    # Retrieve the connection string and container name from the environment variables
    connection_string = os.getenv('STORAGE_CONNECTION_STRING')
    container_name = "data"

    # Ensure the connection string is not None
    if connection_string is None:
        raise ValueError("The connection string environment variable is not set.")

    # Create a BlobServiceClient
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # List all blobs in the container
    blob_list = blob_service_client.get_container_client(container_name).list_blobs()

    # Iterate over each blob
    for blob in blob_list:
        blob_name = blob.name
        print(f"Processing blob: {blob_name}")

        # Ensure the file format is supported
        supported_formats = ['.pdf', '.jpeg', '.jpg', '.png', '.tiff']
        if not any(blob_name.lower().endswith(ext) for ext in supported_formats):
            print(f"Skipping unsupported file format: {blob_name}")
            continue

        # Generate the SAS URL
        sas_url = generate_sas_url(blob_service_client, container_name, blob_name)
        print(f"Generated SAS URL: {sas_url}")

        # Call the analyze_layout function with the SAS URL
        analysis_results = analyze_layout(sas_url)

        # Save the analysis results
        save_analysis_results(blob_service_client, container_name, blob_name, analysis_results)
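The extension filter in the loop also keeps the `_results.json` blobs written by `save_analysis_results` from being analyzed again on a later run. A small sketch of the same check based on the file extension; `is_supported` is a hypothetical helper and the extension list is copied from the loop above.

```python
import os

SUPPORTED_FORMATS = {".pdf", ".jpeg", ".jpg", ".png", ".tiff"}


def is_supported(blob_name: str) -> bool:
    """Compare the final extension of the blob name, ignoring case."""
    _, ext = os.path.splitext(blob_name.lower())
    return ext in SUPPORTED_FORMATS


print(is_supported("houseloan/houseloan.pdf"))               # True
print(is_supported("houseloan/houseloan.pdf_results.json"))  # False
```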

### Start Analysis from JSON

In [10]:
import os
import json
import pandas as pd
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv
import re
# Load environment variables from .env file
load_dotenv()

def read_json_files_from_blob(folder_path):
    # Retrieve the connection string from the environment variables
    connection_string = os.getenv('STORAGE_CONNECTION_STRING')

    # Ensure the connection string is not None
    if connection_string is None:
        raise ValueError("The connection string environment variable is not set.")

    # Create a BlobServiceClient
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)

    # Get the container client
    container_client = blob_service_client.get_container_client("data")

    # List all blobs in the specified folder
    blob_list = container_client.list_blobs(name_starts_with=folder_path)

    # Return the parsed contents of the first JSON blob found under the folder
    for blob in blob_list:
        if blob.name.endswith('.json'):
            blob_client = container_client.get_blob_client(blob.name)
            blob_data = blob_client.download_blob().readall()
            return json.loads(blob_data)

    # No JSON blob was found under this prefix
    return None

In [12]:
houseloan = read_json_files_from_blob("houseloan")  # TODO: remove so participants work out what is being done here

In [24]:
houseloan.keys()  # call the method to list the top-level keys

In [42]:
def clean_json_data(json_data):
    # Extract relevant text content from the JSON
    content = []

    # Extract text from paragraphs
    paragraphs = json_data.get("paragraphs", [])
    for paragraph in paragraphs:
        content.append(paragraph.get("content", "").strip())

    # Extract text from pages and lines
    pages = json_data.get("pages", [])
    for page in pages:
        for line in page.get("lines", []):
            content.append(line.get("content", "").strip())

    # Join all text content into a single string with spaces between components
    plain_text_content = " ".join(content)
    print(plain_text_content)
   
    return plain_text_content
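`clean_json_data` appends both the paragraph text and the line text. Since paragraphs and lines may cover the same content, the joined string can contain repeated passages. If that is not wanted, one option is to prefer paragraphs and fall back to lines only when no paragraphs are present. A minimal sketch of that variant; `extract_text` is a hypothetical helper and the sample input is made up.

```python
from typing import Any, Dict


def extract_text(analyze_result: Dict[str, Any]) -> str:
    """Join paragraph text, or line text when there are no paragraphs."""
    paragraphs = [p.get("content", "").strip() for p in analyze_result.get("paragraphs", [])]
    if any(paragraphs):
        parts = paragraphs
    else:
        parts = [
            line.get("content", "").strip()
            for page in analyze_result.get("pages", [])
            for line in page.get("lines", [])
        ]
    # Skip empty entries so the result has no doubled spaces.
    return " ".join(part for part in parts if part)


# Made-up input where the paragraph and the line carry the same text.
sample = {
    "paragraphs": [{"content": "1. Introduction"}],
    "pages": [{"lines": [{"content": "1. Introduction"}]}],
}
print(extract_text(sample))  # 1. Introduction
```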

In [None]:
houseloan["analyzeResult"]["paragraphs"]

In [43]:
houseloan_unstructured = clean_json_data(houseloan["analyzeResult"])
houseloan_unstructured

Contoso Bank - House Loan Terms and Conditions 1. Introduction These terms and conditions govern the house loans provided by Contoso Bank (referred to as "the Bank") to customers (referred to as "Borrower"). By applying for and accepting a house loan, the Borrower agrees to the terms and conditions outlined herein. 2. Loan Amount and Purpose · The loan is granted exclusively for the purpose of purchasing a residential property, refinancing an existing mortgage, or for approved home improvement projects. . The maximum loan amount will be determined by the Bank based on the Borrower's financial profile, creditworthiness, and property value. 3. Interest Rates · Fixed Rate: The interest rate remains constant throughout the loan term. · Variable Rate: The interest rate may fluctuate based on market conditions and will be tied to a publicly available index. Changes in the interest rate will affect the Borrower's monthly payments. · Interest rates are disclosed at the time of loan approval an

'Contoso Bank - House Loan Terms and Conditions 1. Introduction These terms and conditions govern the house loans provided by Contoso Bank (referred to as "the Bank") to customers (referred to as "Borrower"). By applying for and accepting a house loan, the Borrower agrees to the terms and conditions outlined herein. 2. Loan Amount and Purpose · The loan is granted exclusively for the purpose of purchasing a residential property, refinancing an existing mortgage, or for approved home improvement projects. . The maximum loan amount will be determined by the Bank based on the Borrower\'s financial profile, creditworthiness, and property value. 3. Interest Rates · Fixed Rate: The interest rate remains constant throughout the loan term. · Variable Rate: The interest rate may fluctuate based on market conditions and will be tied to a publicly available index. Changes in the interest rate will affect the Borrower\'s monthly payments. · Interest rates are disclosed at the time of loan approval

In [60]:
from pydantic import BaseModel
from openai import AzureOpenAI

client = AzureOpenAI(
  azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"),
  api_key=os.getenv("AZURE_OPENAI_KEY"),
  api_version= "2024-08-01-preview"
)

class Houseloan(BaseModel):
    introduction: str
    loan_amount_and_purpose: str
    interest_rates: str
    loan_tenure: str
    late_payments: str

completion = client.beta.chat.completions.parse(
    model="gpt-4o", # replace with the model deployment name of your gpt-4o 2024-08-06 deployment
    messages=[
        {"role": "system", "content": "Extract the information about this textual document with sections and content."},
        {"role": "user", "content": houseloan_unstructured},
    ],
    response_format=Houseloan,
)

finaljsonstr = completion.model_dump_json(indent=2)

In [66]:
finaljsonstr


'{\n  "id": "chatcmpl-Ae0F60HU9CbGUoLt1frJgwSy4XDym",\n  "choices": [\n    {\n      "finish_reason": "stop",\n      "index": 0,\n      "logprobs": null,\n      "message": {\n        "content": "{\\"introduction\\":\\"These terms and conditions govern the house loans provided by Contoso Bank to customers. By applying for and accepting a house loan, the Borrower agrees to the terms and conditions outlined herein.\\",\\"loan_amount_and_purpose\\":\\"The loan is granted exclusively for purchasing a residential property, refinancing an existing mortgage, or for approved home improvement projects. The maximum loan amount is determined by the Bank based on the Borrower\'s financial profile, creditworthiness, and property value.\\",\\"interest_rates\\":\\"There are two types: Fixed Rate, which remains constant throughout the loan term; and Variable Rate, which may fluctuate based on market conditions and is tied to a publicly available index. Interest rates are disclosed at the time of loan ap

In [70]:
result = json.loads(json.loads(finaljsonstr)['choices'][0]['message']['content'])

In [71]:
result

{'introduction': 'These terms and conditions govern the house loans provided by Contoso Bank to customers. By applying for and accepting a house loan, the Borrower agrees to the terms and conditions outlined herein.',
 'loan_amount_and_purpose': "The loan is granted exclusively for purchasing a residential property, refinancing an existing mortgage, or for approved home improvement projects. The maximum loan amount is determined by the Bank based on the Borrower's financial profile, creditworthiness, and property value.",
 'interest_rates': 'There are two types: Fixed Rate, which remains constant throughout the loan term; and Variable Rate, which may fluctuate based on market conditions and is tied to a publicly available index. Interest rates are disclosed at the time of loan approval and may change based on prevailing conditions until the loan agreement is signed.',
 'loan_tenure': "Loan terms range from 5 to 30 years, depending on the loan product and Borrower's preference. Early re
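The double `json.loads` above is needed because the model's answer is itself a JSON string stored inside the serialized completion. The sketch below shows the two decoding steps on a made-up stand-in for `finaljsonstr` that contains only the fields used here. Depending on the SDK version, the validated object may also be available directly as `completion.choices[0].message.parsed`; treat that as an assumption to verify against your installed `openai` package.

```python
import json

# Made-up stand-in for completion.model_dump_json(); real output has more fields.
finaljsonstr = json.dumps({
    "choices": [
        {"message": {"content": json.dumps({"introduction": "sample text"})}}
    ]
})

# First decode: the completion envelope.
outer = json.loads(finaljsonstr)
# The message content is still a JSON-encoded string at this point.
content = outer["choices"][0]["message"]["content"]
# Second decode: the extracted fields as a dictionary.
result = json.loads(content)
print(result["introduction"])  # sample text
```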

### Upload to Cosmos DB

In [73]:
from azure.cosmos import CosmosClient, exceptions, PartitionKey
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Cosmos DB connection details from environment variables
endpoint = os.getenv("COSMOS_ENDPOINT")
key = os.getenv("COSMOS_KEY")

def upload_text_to_cosmos_db(text_content, container_name):
    # Check if the text is empty
    if not text_content:
        print("The text content is empty. No data to upload.")
        return
    
    # Initialize the Cosmos client
    client = CosmosClient(endpoint, key)
    
    try:
        # Create or get the database
        database = client.create_database_if_not_exists(id="ContosoDB")
        
        # Create or get the container
        container = database.create_container_if_not_exists(
            id=container_name,
            partition_key=PartitionKey(path="/id"),
            offer_throughput=400
        )
    except exceptions.CosmosHttpResponseError as e:
        print(f"An error occurred while creating the database or container: {e.message}")
        return
    
    # Create a document with a fixed ID and the extracted content
    document = {
        'id': "houseloan_1",  # Fixed ID; use a different value for each document
        'content': text_content,  # Store the extracted content under 'content'
    }
    
    # Upload the document to the container
    try:
        container.create_item(body=document)
        print(f"Text content uploaded successfully with ID '{document['id']}' in Cosmos DB.")
    except exceptions.CosmosHttpResponseError as e:
        print(f"An error occurred while uploading the document: {e.message}")
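The document built in `upload_text_to_cosmos_db` uses a fixed `id`, so a second upload with `create_item` would conflict with the item already stored. One option is to generate the ID for each document. A minimal sketch using the standard library; `build_document` and the `houseloan` prefix are hypothetical and not part of the challenge code.

```python
import uuid
from typing import Any, Dict


def build_document(content: Any, prefix: str = "houseloan") -> Dict[str, Any]:
    """Wrap the content in a document with a freshly generated unique ID."""
    return {"id": f"{prefix}_{uuid.uuid4().hex}", "content": content}


first = build_document({"introduction": "sample text"})
second = build_document({"introduction": "sample text"})
print(first["id"] != second["id"])  # True
```

If the same document should instead be overwritten on each run, keeping a stable ID and calling the container's `upsert_item` method is an alternative worth considering.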

### Upload the House Loan

In [74]:
upload_text_to_cosmos_db(result, "houseloan")

Text content uploaded successfully with ID 'houseloan_1' in Cosmos DB.
