# **Challenge 1: Explore Document Parsing and Chunking**

Chunking helps limit the amount of information we pass into the model. Only the most relevant chunks from the overall data are passed through. There are many considerations that come into play when chunking. For example, you need to figure out the best chunk size. If the chunks are too small, you may lose important context. If the chunks are too big, they may contain unnecessary information.

## Let's start the challenge

In [1]:
# Uncomment below line if running the notebook in Azure AI studio or Azure ML studio
#%pip install -r requirements.txt

import os
import re
import html
import requests
import time
import json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from azure.storage.blob import generate_blob_sas, BlobSasPermissions, BlobServiceClient
from dotenv import load_dotenv, find_dotenv
from pathlib import Path

In [2]:
# Load the variables declared in credential.env into the process environment.
load_dotenv(find_dotenv('credential.env'), override=True)

# Azure storage account credentials.
# Direct indexing of os.environ raises KeyError when a variable is missing.
connection_string, account_name, account_key, container_name = (
    os.environ['AZURE_BLOB_STORAGE_CONNECTION_STRING'],
    os.environ['AZURE_BLOB_STORAGE_ACCOUNT_NAME'],
    os.environ['AZURE_BLOB_STORAGE_KEY'],
    os.environ['AZURE_BLOB_CONTAINER_NAME'],
)

# Azure Document Intelligence credentials.
endpoint, key = (
    os.environ['AZURE_DOCUMENT_INTELLIGENCE_ENDPOINT'],
    os.environ['AZURE_DOCUMENT_INTELLIGENCE_KEY'],
)

In [3]:
# Declare useful method
def check_and_create_folder(folder_name):
    """Create `folder_name` (with any missing parents) unless it exists, and print the outcome."""
    if os.path.exists(folder_name):
        print(f"The folder '{folder_name}' already exists.")
        return

    os.makedirs(folder_name)
    print(f"The folder '{folder_name}' has been created.")

def print_error_message(message, prefix_message='Error: '):
    """Print `message` preceded by `prefix_message` wrapped in the ANSI bold-red escape sequence."""
    bold_red = "\033[1;31m"
    reset = "\033[0m"
    print("{}{}{}{}".format(bold_red, prefix_message, reset, message))

def print_warning_message(message, prefix_message='Warning: '):
    """Print `message` preceded by `prefix_message` wrapped in the ANSI bold-yellow escape sequence."""
    bold_yellow = "\033[1;33m"
    reset = "\033[0m"
    print("{}{}{}{}".format(bold_yellow, prefix_message, reset, message))
    
def print_success_message(message, prefix_message='Success: '):
    """Print `message` preceded by `prefix_message` wrapped in the ANSI bold-green escape sequence."""
    bold_green = "\033[1;32m"
    reset = "\033[0m"
    print("{}{}{}{}".format(bold_green, prefix_message, reset, message))

### [Step1] Upload documents to Azure storage account and generate SAS URL

The following functions use the Azure Storage SDK to upload documents and generate shared access signature (SAS) URLs to be used as input to Azure Document Intelligence. As a security best practice, each SAS is only valid for 1 hour.

In [4]:
def upload_pdf_to_blobs():
    """Upload every ``pdf_document/*.pdf`` file to the configured blob container.

    Reads the module-level ``connection_string`` and ``container_name``. The
    container is created when it does not exist yet, and blobs with the same
    name are overwritten. The glob is relative to the current working directory.

    Returns:
        The list of uploaded file names (without directory), in glob order.
    """
    service = BlobServiceClient.from_connection_string(connection_string)
    container = service.get_container_client(container_name)
    if not container.exists():
        container.create_container()

    uploaded = []
    for pdf_path in Path().glob("pdf_document/*.pdf"):
        blob_name = pdf_path.name
        container.upload_blob(blob_name, pdf_path.read_bytes(), overwrite=True)
        uploaded.append(blob_name)
    return uploaded

def get_blob_sas(account_name, account_key, container_name, blob_name):
    """Return a read-only SAS token for one blob, expiring one hour after the call (UTC).

    The arguments are forwarded unchanged to ``generate_blob_sas``.
    """
    read_only = BlobSasPermissions(read=True)
    expires_at = datetime.now(ZoneInfo('UTC')) + timedelta(hours=1)
    return generate_blob_sas(
        account_name=account_name,
        container_name=container_name,
        blob_name=blob_name,
        account_key=account_key,
        permission=read_only,
        expiry=expires_at,
    )

def get_pdf_file_names_from_blob(container_name, connection_string):
    """Return the names of all blobs in `container_name` whose name ends with '.pdf'.

    The suffix comparison is case-sensitive; names are returned in listing order.
    """
    service = BlobServiceClient.from_connection_string(connection_string)
    container = service.get_container_client(container_name)
    return [
        blob.name
        for blob in container.list_blobs()
        if blob.name.endswith('.pdf')
    ]

In [None]:
from urllib.parse import quote

pdf_names = []
url_list = []  # reset so that re-running the cell does not accumulate duplicate URLs

print_warning_message("Upload documents to Azure storage account and generate SAS URL", ">>>[Step1] ")

#Upload PDF file from local folder "pdf_document" folder to Azure storage account
local_pdf_file_names = upload_pdf_to_blobs()
for pdf in local_pdf_file_names:
    print_success_message(f"{pdf} PDF files uploaded to blob storage {container_name}")

#Get a list of file name in specific blob container in Azure storage account
pdf_names = get_pdf_file_names_from_blob(container_name, connection_string)

#Generate SAS URLs of pdf files in Azure storage account.
for pdf in pdf_names:
    blob_sas = get_blob_sas(account_name, account_key, container_name, pdf)
    # Percent-encode the blob name: names containing spaces, '#', '?' or '%' would
    # otherwise produce an invalid or mis-parsed URL. quote() keeps '/' so blobs in
    # virtual folders still resolve.
    url = f"https://{account_name}.blob.core.windows.net/{container_name}/{quote(pdf)}?{blob_sas}"
    url_list.append(url)

#Print URL list with SAS of each PDF document in blob storage
# NOTE: the displayed URLs embed SAS tokens that grant read access for one hour;
# clear this cell's output before sharing the notebook.
print_success_message("Generate SAS URLs for all files in "+container_name+" blob container")
url_list

### [Step2] Parsing PDF document using Azure Document Intelligence. 

There are many prebuilt models which we can use to parse different document formats. We will explore the **prebuilt-read** model in this challenge. It supports various file types including PDF, Microsoft Office (DOCX, PPTX, XLSX), HTML and images (JPG, PNG, BMP, TIFF, HEIF).

In [None]:
model_id = 'prebuilt-read'
api_version = '2023-10-31-preview'

# Set the local folder name for document intelligence output
folder_name = "document_intelligence_output"

# Check if the folder exists
check_and_create_folder(folder_name)

print_warning_message("Parsing PDF document using Azure Document Intelligence using " + model_id + " model. Please wait...", ">>>[Step2] ")

# Build the analyze URL once. Stripping and re-adding the slash makes the request
# work whether or not the configured endpoint ends with '/'.
analyze_url = f"{endpoint.rstrip('/')}/documentintelligence/documentModels/{model_id}:analyze?api-version={api_version}"
headers = {
    'Ocp-Apim-Subscription-Key': key
}

# The output file is named after the document's position in pdf_names; Step 3
# relies on that same index to pair each JSON file with its PDF name.
for index, (name, url) in enumerate(zip(pdf_names, url_list)):
    print(f"Processing document: {name}")
    payload = {
        "urlSource": url
    }

    #Send a request to document intelligence endpoint with API keys and version
    response = requests.post(url=analyze_url, headers=headers, json=payload)
    response.raise_for_status()

    # The analysis runs asynchronously; its result is polled at this URL.
    operation_url = response.headers['Operation-Location']

    time.sleep(5)

    # Poll with increasing delays; the for/else reports a timeout when no
    # terminal status was reached after the last delay.
    for sleep_time in [20,40,60,120,240,960]:
        poll_response = requests.get(operation_url, headers=headers)
        # Surface HTTP errors directly instead of a KeyError on a missing 'status'.
        poll_response.raise_for_status()
        rst = poll_response.json()
        status = rst.get('status')

        if status == 'succeeded':
            Path(f"{folder_name}/{index}.json").write_text(json.dumps(rst))
            print_success_message("", "Success!")
            break
        if status == 'failed':
            # A failed operation never succeeds later, so stop polling right away.
            print_error_message(rst.get('error', 'no error details returned'), f"Analysis failed for '{name}': ")
            break
        time.sleep(sleep_time)
    else:
        print_error_message("Failed time out")

#### [Check Point] Take a moment to view the output of document parsing under folder "document_intelligence_output"

### [Step3] Break down JSON output into multiple 1,000-character document chunks

There are multiple chunking techniques we can use. In this challenge, we will explore chunking by length of 1,000 characters with 10 percent overlap. You may need to explore which chunking technique works best with your data. Other chunking techniques you may consider include:
- Chunking by length
- Chunking by page
- Chunking by semantic
- Chunking by sentences
- Chunking recursively


In [7]:
# Target chunk length in characters; split_text may go slightly past it while
# looking for a sentence ending.
MAX_SECTION_LENGTH = 1000
# Upper bound on how many extra characters split_text scans beyond the target
# length when searching for a sentence ending.
SENTENCE_SEARCH_LIMIT = 100
# Nominal number of characters by which the next chunk starts before the end of
# the previous one (split_text then moves the start back to a sentence/word boundary).
SECTION_OVERLAP = 100

#Convert table to HTML format
def table_to_html(table):
    """Render one table of the analysis result as an HTML ``<table>`` string.

    Args:
        table: table dict from the REST JSON output, with ``rowCount`` and
            ``cells``; each cell has ``rowIndex``, ``columnIndex`` and
            ``content``. ``kind``, ``columnSpan`` and ``rowSpan`` are treated
            as optional (defaults: plain cell, span of 1).

    Returns:
        The HTML markup, with cell content HTML-escaped. Header cells
        (``columnHeader`` / ``rowHeader``) use ``<th>``, all others ``<td>``.
    """
    parts = ["<table>"]
    for row_index in range(table['rowCount']):
        # Cells are not assumed to be ordered, so sort each row by column.
        row_cells = sorted(
            (cell for cell in table['cells'] if cell['rowIndex'] == row_index),
            key=lambda cell: cell['columnIndex'],
        )
        parts.append("<tr>")
        for cell in row_cells:
            tag = "th" if cell.get('kind') in ("columnHeader", "rowHeader") else "td"
            cell_spans = ""
            column_span = cell.get('columnSpan', 1)
            row_span = cell.get('rowSpan', 1)
            if column_span > 1:
                cell_spans += f" colSpan={column_span}"
            if row_span > 1:
                cell_spans += f" rowSpan={row_span}"
            parts.append(f"<{tag}{cell_spans}>{html.escape(cell['content'])}</{tag}>")
        parts.append("</tr>")
    parts.append("</table>")
    return "".join(parts)


def get_document_text(form_recognizer_results):
    """Build the per-page text of an analysis result, inlining tables as HTML.

    The input is the plain dict loaded from the REST JSON, so every field is
    read by key. Earlier attribute-style access raised an exception that a bare
    ``except`` swallowed, which silently dropped every table.

    Args:
        form_recognizer_results: the ``analyzeResult`` dict, with ``content``
            and ``pages``; ``tables`` is optional (absent means no tables).

    Returns:
        A list of ``(page_num, offset, page_text)`` tuples where ``page_num`` is
        0-based, ``offset`` is the start of the page in the concatenation of all
        page texts, and ``page_text`` ends with one extra space as separator.
    """
    content = form_recognizer_results['content']
    all_tables = form_recognizer_results.get('tables') or []

    offset = 0
    page_map = []
    for page_num, page in enumerate(form_recognizer_results['pages']):
        # A table is assigned to the page of its first bounding region.
        tables_on_page = [
            table for table in all_tables
            if table.get('boundingRegions')
            and table['boundingRegions'][0]['pageNumber'] == page_num + 1
        ]

        # mark all positions of the table spans in the page
        # (only the first span of the page is considered)
        page_offset = page['spans'][0]['offset']
        page_length = page['spans'][0]['length']
        table_chars = [-1] * page_length
        for table_id, table in enumerate(tables_on_page):
            for span in table['spans']:
                # Clamp the span to the page so out-of-page parts are ignored.
                first = max(span['offset'] - page_offset, 0)
                last = min(span['offset'] - page_offset + span['length'], page_length)
                for idx in range(first, last):
                    table_chars[idx] = table_id

        # build page text by replacing characters in table spans with table html
        page_parts = []
        added_tables = set()
        for idx, table_id in enumerate(table_chars):
            if table_id == -1:
                page_parts.append(content[page_offset + idx])
            elif table_id not in added_tables:
                # Emit the HTML once, at the first character covered by the table.
                page_parts.append(table_to_html(tables_on_page[table_id]))
                added_tables.add(table_id)

        page_parts.append(" ")
        page_text = "".join(page_parts)
        page_map.append((page_num, offset, page_text))
        offset += len(page_text)

    return page_map

#Break down text according to defined length with overlapping in each chunk
def split_text(page_map):
    """Yield overlapping chunks of the document text together with their page index.

    The page texts are concatenated and cut into sections of roughly
    MAX_SECTION_LENGTH characters. Each cut is moved to a nearby sentence ending
    (or, failing that, a word break), and the next section starts about
    SECTION_OVERLAP characters before the end of the previous one.

    Args:
        page_map: list of ``(page_num, offset, page_text)`` tuples, where
            ``offset`` is the start of the page in the concatenated text.

    Yields:
        ``(section_text, page_index)`` where ``page_index`` is the position in
        `page_map` of the page containing the first character of the section.
    """
    SENTENCE_ENDINGS = [".", "!", "?"]
    WORDS_BREAKS = [",", ";", ":", " ", "(", ")", "[", "]", "{", "}", "\t", "\n"]

    def find_page(offset):
        # Index of the page whose offset range contains `offset`; the last page otherwise.
        l = len(page_map)
        for i in range(l - 1):
            if offset >= page_map[i][1] and offset < page_map[i + 1][1]:
                return i
        return l - 1

    all_text = "".join(p[2] for p in page_map)
    length = len(all_text)

    # Text no longer than SECTION_OVERLAP never enters the loop below, so a very
    # short document used to produce no chunk at all. Emit it as a single chunk
    # unless it is only whitespace.
    if length <= SECTION_OVERLAP:
        if all_text.strip():
            yield (all_text, find_page(0))
        return

    start = 0
    end = length
    while start + SECTION_OVERLAP < length:
        last_word = -1
        end = start + MAX_SECTION_LENGTH

        if end > length:
            end = length
        else:
            # Try to find the end of the sentence
            while end < length and (end - start - MAX_SECTION_LENGTH) < SENTENCE_SEARCH_LIMIT and all_text[end] not in SENTENCE_ENDINGS:
                if all_text[end] in WORDS_BREAKS:
                    last_word = end
                end += 1
            if end < length and all_text[end] not in SENTENCE_ENDINGS and last_word > 0:
                end = last_word # Fall back to at least keeping a whole word
        if end < length:
            end += 1  # include the sentence ending / word break itself

        # Try to find the start of the sentence or at least a whole word boundary
        last_word = -1
        while start > 0 and start > end - MAX_SECTION_LENGTH - 2 * SENTENCE_SEARCH_LIMIT and all_text[start] not in SENTENCE_ENDINGS:
            if all_text[start] in WORDS_BREAKS:
                last_word = start
            start -= 1
        if all_text[start] not in SENTENCE_ENDINGS and last_word > 0:
            start = last_word
        if start > 0:
            start += 1  # begin right after the boundary character

        section_text = all_text[start:end]
        yield (section_text, find_page(start))

        last_table_start = section_text.rfind("<table")
        if (last_table_start > 2 * SENTENCE_SEARCH_LIMIT and last_table_start > section_text.rfind("</table")):
            # If the section ends with an unclosed table, we need to start the next section with the table.
            # If table starts inside SENTENCE_SEARCH_LIMIT, we ignore it, as that will cause an infinite loop for tables longer than MAX_SECTION_LENGTH
            # If last table starts inside SECTION_OVERLAP, keep overlapping
            start = min(end - SECTION_OVERLAP, start + last_table_start)
        else:
            start = end - SECTION_OVERLAP

    # No trailing yield is needed: every pass sets start <= end - SECTION_OVERLAP
    # with end <= length, so when the loop exits the remaining text has already
    # been emitted as part of the last section.

#Extend the PDF file name with page number
def blob_name_from_file_page(filename, page = 0):
    """Return the base name of `filename`, with ``-<page>`` inserted before the extension for PDFs.

    The extension check is case-insensitive and the result always uses a
    lowercase ``.pdf`` suffix. Non-PDF names are returned as their base name,
    unchanged.
    """
    base_name = os.path.basename(filename)
    stem, extension = os.path.splitext(base_name)
    if extension.lower() != ".pdf":
        return base_name
    return f"{stem}-{page}.pdf"

In [None]:
print_warning_message("Break down JSON output to mulitple "+str(MAX_SECTION_LENGTH)+" characters document chunks", ">>>[Step3] ")

# Check if the folder exists
check_and_create_folder("chunked_document")

# Analysis results are stored as <index>.json, where index is the position of the
# PDF in pdf_names.
for index, file_name in enumerate(pdf_names):
    source_path = f'document_intelligence_output/{index}.json'
    try:
        with open(source_path, encoding='utf-8') as json_file:
            raw_json = json.load(json_file)
    except (OSError, ValueError) as error:
        # Missing or unreadable result (e.g. the analysis failed or timed out):
        # report it and move on instead of skipping silently.
        print_warning_message(f"Skipping '{file_name}': cannot load {source_path} ({error})")
        continue

    page_map = get_document_text(raw_json['analyzeResult'])

    #Format the JSON chunked to match with Azure AI search index profile
    for i, (section, pagenum) in enumerate(split_text(page_map)):
        try:
            doc = {
                # Replace every character outside [0-9a-zA-Z_-] so the id is also a safe file name.
                "id": re.sub(r"[^0-9a-zA-Z_-]","_",f"{file_name}-{i}"),
                "content": section,
                #"state":  file_name.split('/')[1] if file_name.split('/')[0] == 'States' else '',
                "category": 'Not Available',
                "sourcepage": blob_name_from_file_page(file_name, pagenum),
                "sourcefile": file_name
            }
            with open(f'chunked_document/{doc["id"]}.json', 'w', encoding='utf-8') as file:
                json.dump(doc, file)
            print_success_message("Chunked document #" + str(i) + " is created in 'chunked_document' folder")
        except Exception as error:
            # Keep going with the remaining chunks; one bad chunk should not stop the run.
            print_error_message(error, f"Error on '{file_name}' | chunk#{i}: ")

#### [Check Point] Review chunked document JSON output under "chunked_document" folder