# Document Layout-Aware Processing and Retrieval Augmented Generation

This notebook was tested on a SageMaker Studio Notebook `Data Science 3.0` kernel and an `ml.t3.xlarge` instance.

---

## Contents

1. [Objective](#Objective)
1. [Background](#Background-(Problem-Description-and-Approach))
1. [Document Extraction](#Document-Extraction)
1. [Document Processing](#Document-Processing)
1. [Document Chunking](#Document-Chunking)
1. [Indexing](#Indexing)
1. [RAG](#RAG)
1. [CleanUp](#CleanUp)
1. [Conclusion](#Conclusion)

---

## Objective

This example notebook guides you through using the Amazon Textract Layout feature, which extracts content from a document while preserving its layout and reading order. The Layout feature can detect the following elements:
- Titles
- Headers
- Sub-headers
- Text
- Tables
- Figures
- Lists
- Footers
- Page Numbers
- Key-Value pairs

The following image shows the Textract Layout feature applied to a page of the Amazon Sustainability Report in the Textract console:
<img src="../images/amazonsus2022.jpg" width="1000"/>

The [Amazon Textract Textractor Library](https://aws-samples.github.io/amazon-textract-textractor/index.html) works with Textract features to simplify document processing. You can start by checking out the [examples in the documentation](https://aws-samples.github.io/amazon-textract-textractor/notebooks/layout_analysis_for_text_linearization.html).
This notebook uses the Textractor library to interact with Amazon Textract and interpret its response. It enriches the extracted document text with XML tags that delineate sections, enabling layout-aware chunking and indexing into a vector database (DB). The goal is to improve Retrieval Augmented Generation (RAG) performance.

---

## Background (Problem Description and Approach)

- **Problem statement**: 
RAG is a technique for improving the effectiveness of Large Language Models (LLMs) on long textual content. Although widely adopted, RAG requires an initial processing step that extracts text and segments it into meaningful chunks, which is especially challenging for complex assets such as PDFs. Many document parsing approaches ignore layout semantics or use simplistic methods such as fixed-size windows, with no awareness of document structure or elements. This can break contextual continuity and degrade the performance of RAG systems. An ideal RAG input pipeline divides PDF text into vectorized segments aligned with layout and content semantics, preserving informational integrity for the LLM. In short, a context-aware parsing phase is key to getting the most out of RAG, particularly for long or complex documents.

- **Our approach**: 

<img src="../images/txt-layout-Page-2.jpg" width="800"/>

1. Upload the multi-page document to Amazon S3.
2. Call the Amazon Textract `StartDocumentAnalysis` API to extract the document text, including layout and tables. Through the Textractor library, the response provides structured text aligned with the original document formatting, plus a pandas DataFrame for each table detected in the document.
3. Enrich the extracted text with XML tags indicating semantic sections, adding contextual metadata through the Textractor library.
4. The Textractor library linearizes tables as plain text that maintains their original layout. For easier processing and manipulation, this notebook replaces those plain-text tables with CSV versions obtained from Textract's Tables feature.
5. The extracted text is first segmented by document title sections, the highest level of the document hierarchy. Each subsection within a title section is then chunked according to a maximum word threshold. Subsection elements are chunked as follows:

    - **Tables:** Tables are chunked row by row until the maximum number of alphanumeric words is reached. For each table chunk, the column headers are added to the table along with the table header, typically the sentence or paragraph preceding the table in the document. This ensures that the information of the table is retained in each chunk.
    
    <img src="../images/table-chunkers.png" width="800" height=700/>
    
    To handle tables with merged cells, this solution first unmerges any merged cell ranges, then duplicates the original merged cell value into each of the corresponding individual cells.
    
    <img src="../images/complex-tables.png" width="800" height=700/>
    
    - **List:** Chunking lists can be challenging. Naive chunking methods often split list items by sentence or newline characters, but then only the first list chunk typically contains the list title, which provides essential context for the list items. Subsequent list chunks therefore lose that context. In this notebook, lists are chunked by their individual list items, and the list header is prepended to each list chunk so that every chunk retains the list's context.
    <img src="../images/list-chunker.png" width="800" height=700/>
    
    - **Section and subsection:** The structure of a document can generally be categorized into titles, sections, and paragraphs. A paragraph is typically the smallest unit of a document that conveys information independently, particularly within the context of a section or subsection header. In this method, text sections are chunked based on paragraphs, and the section header is added to each paragraph chunk (as well as tables and lists) within that section of the document.
    <img src="../images/text-chunks.png" width="800" height=700/>
    
6. Metadata is attached to each chunk during indexing, including:
    - The complete CSV tables detected within the chunk.
    - The ID of the section header associated with the chunk.
    - The ID of the section title associated with the chunk.
    
    When a passage is retrieved with hybrid search (combining semantic and text matching), you can control how much content is forwarded to the LLM. Some queries need additional context, so depending on the use case you can send the chunk's parent subsection or its entire title section instead.

\*Some chunks may exceed the fixed word-count threshold because paragraphs are kept intact and complex tables need special handling.
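To make the table strategy in step 5 concrete, here is a minimal, standard-library sketch. It assumes the table is already a `|`-delimited string whose first row holds the column names (the format this notebook produces), counts words with the same `\w+` rule the notebook uses, and counts the repeated header lines toward the word budget. `chunk_table` and `count_words` are illustrative names, not part of Textractor, and the notebook's own chunking cell differs in detail.

```python
import csv
import io
import re

def count_words(text):
    # Same word-count rule the notebook uses: runs of \w characters
    return len(re.findall(r"\w+", text))

def chunk_table(csv_text, table_header="", max_words=200, sep="|"):
    """Split a delimited table into row groups. Every chunk repeats the
    table header (the sentence before the table) and the column-name row."""
    rows = [row for row in csv.reader(io.StringIO(csv_text), delimiter=sep) if row]
    columns, body = rows[0], rows[1:]
    prefix = ([table_header] if table_header else []) + [sep.join(columns)]
    prefix_words = sum(count_words(p) for p in prefix)

    chunks, current, words = [], [], prefix_words
    for row in body:
        line = sep.join(row)
        row_words = count_words(line)
        # Start a new chunk when this row would exceed the budget, but always
        # keep at least one data row per chunk (so oversized rows still fit)
        if current and words + row_words > max_words:
            chunks.append("\n".join(prefix + current))
            current, words = [], prefix_words
        current.append(line)
        words += row_words
    if current:
        chunks.append("\n".join(prefix + current))
    return chunks

# Illustrative values only
table_csv = "Year|Revenue\n2022|100\n2023|110\n2024|120\n"
for chunk in chunk_table(table_csv, table_header="Revenue by year", max_words=10):
    print(chunk, end="\n---\n")
```

Because the header and column names are repeated in every chunk, each chunk can be read on its own, at the cost of a few extra words per chunk.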

**Prerequisite:**
- [Amazon Bedrock model access](https://docs.aws.amazon.com/bedrock/latest/userguide/model-access.html)

## Step 1: Setup

Install required packages

In [None]:
!pip install --force-reinstall amazon-textract-textractor==1.7.11
!pip install inflect
!pip install requests-aws4auth
!pip install opensearch-py
!pip install anthropic
!pip install openpyxl 

> ⚠️ Restart the Kernel \
> 
> Click **kernel** on the top bar and **Restart Kernel**. Continue with the cells below.

In [4]:
import os
import io
import re
import json
import time
import uuid
import base64  # used to encode images for Claude
import random  # used for retry jitter
from collections import OrderedDict

import boto3
import inflect
import openpyxl
import pandas as pd
import sagemaker
from PIL import Image
from botocore.config import Config
from botocore.exceptions import ClientError  # caught by the Bedrock retry helper
from openpyxl.cell import Cell
from openpyxl.worksheet.cell_range import CellRange
from textractor import Textractor
from textractor.visualizers.entitylist import EntityList
from textractor.data.constants import TextractFeatures
from anthropic import Anthropic

s3 = boto3.client("s3")
# Long read timeout and extra retries for Bedrock streaming calls
config = Config(
    read_timeout=600,
    retries=dict(max_attempts=5),
)
client = Anthropic()
bedrock_runtime = boto3.client(service_name='bedrock-runtime', region_name='us-west-2', config=config)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


## Create OpenSearch Serverless Collection

In [5]:
import boto3
import time
import json
import os
vector_store_name = 'idp-workshop'
index_name = "idp-workshop-rag"
encryption_policy_name = "idp-workshop-rag"
network_policy_name = "idp-workshop-rag"
access_policy_name = 'idp-workshop-rag'
identity = boto3.client('sts').get_caller_identity()['Arn']

aoss_client = boto3.client('opensearchserverless')

security_policy = aoss_client.create_security_policy(
    name = encryption_policy_name,
    policy = json.dumps(
        {
            'Rules': [{'Resource': ['collection/' + vector_store_name],
            'ResourceType': 'collection'}],
            'AWSOwnedKey': True
        }),
    type = 'encryption'
)

network_policy = aoss_client.create_security_policy(
    name = network_policy_name,
    policy = json.dumps(
        [
            {'Rules': [{'Resource': ['collection/' + vector_store_name],
            'ResourceType': 'collection'}],
            'AllowFromPublic': True}
        ]),
    type = 'network'
)

collection = aoss_client.create_collection(name=vector_store_name,type='VECTORSEARCH')

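# Wait until the collection has finished creating
while True:
    status = aoss_client.list_collections(collectionFilters={'name':vector_store_name})['collectionSummaries'][0]['status']
    if status in ('ACTIVE', 'FAILED'): break
    time.sleep(10)
if status == 'FAILED':
    raise RuntimeError(f"OpenSearch Serverless collection '{vector_store_name}' failed to create")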

access_policy = aoss_client.create_access_policy(
    name = access_policy_name,
    policy = json.dumps(
        [
            {
                'Rules': [
                    {
                        'Resource': ['collection/' + vector_store_name],
                        'Permission': [
                            'aoss:CreateCollectionItems',
                            'aoss:DeleteCollectionItems',
                            'aoss:UpdateCollectionItems',
                            'aoss:DescribeCollectionItems'],
                        'ResourceType': 'collection'
                    },
                    {
                        'Resource': ['index/' + vector_store_name + '/*'],
                        'Permission': [
                            'aoss:*'],
                        'ResourceType': 'index'
                    }],
                'Principal': [identity],
                'Description': 'Easy data policy'}
        ]),
    type = 'data'
)

# Build the collection endpoint from the region the OpenSearch Serverless client uses
region = aoss_client.meta.region_name
host = collection['createCollectionDetail']['id'] + '.' + region + '.aoss.amazonaws.com'

Utility function that generates text embeddings with the Amazon Titan Text Embeddings V2 model.

In [6]:
def _get_emb_(passage):
    """
    This function takes a passage of text and returns the corresponding text embedding using the Amazon Titan V2 Embedding model.
    """
    response = bedrock_runtime.invoke_model(body=json.dumps({"inputText":passage,"dimensions":1024,"normalize":False}),
                                modelId="amazon.titan-embed-text-v2:0", 
                                accept="application/json", 
                                contentType="application/json")

    response_body = json.loads(response.get('body').read())
    embedding=response_body['embedding']    
    return embedding

Utility functions to invoke Anthropic Claude models on Amazon Bedrock, with response streaming and retries.
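For reference, the streaming helper in the next cell consumes events shaped roughly like the ones below. This offline sketch replays hand-written events modeled on the Anthropic Messages streaming format that Bedrock wraps in `{"chunk": {"bytes": ...}}`; real events carry additional fields, and `fake_event` and `collect_stream` are hypothetical names used only for illustration.

```python
import json

def fake_event(payload):
    # Wrap a decoded payload the way the Bedrock response stream delivers it
    return {"chunk": {"bytes": json.dumps(payload).encode()}}

def collect_stream(events):
    """Accumulate text deltas and token counts from Claude stream events."""
    answer, input_tokens, output_tokens = "", 0, 0
    for event in events:
        chunk = event.get("chunk")
        if not chunk:
            continue
        obj = json.loads(chunk["bytes"].decode())
        # content_block_delta events carry the generated text;
        # message_delta events also have a "delta" key but no text
        text = obj.get("delta", {}).get("text")
        if text:
            answer += text
        # The final event carries Bedrock's invocation metrics
        metrics = obj.get("amazon-bedrock-invocationMetrics")
        if metrics:
            input_tokens = metrics["inputTokenCount"]
            output_tokens = metrics["outputTokenCount"]
    return answer, input_tokens, output_tokens

events = [
    fake_event({"type": "message_start", "message": {}}),
    fake_event({"type": "content_block_delta", "index": 0,
                "delta": {"type": "text_delta", "text": "Hello"}}),
    fake_event({"type": "content_block_delta", "index": 0,
                "delta": {"type": "text_delta", "text": ", world"}}),
    fake_event({"type": "message_delta", "delta": {"stop_reason": "end_turn"}}),
    fake_event({"type": "message_stop",
                "amazon-bedrock-invocationMetrics": {"inputTokenCount": 12,
                                                     "outputTokenCount": 3}}),
]
print(collect_stream(events))
```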

In [7]:
def bedrock_streemer(response):
    """Print and accumulate streamed Claude text; return (answer, input_tokens, output_tokens)."""
    stream = response.get('body')
    answer = ""
    # Defaults in case the stream ends without invocation metrics
    input_tokens, output_tokens = 0, 0
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if chunk:
                chunk_obj = json.loads(chunk.get('bytes').decode())
                if "delta" in chunk_obj:
                    delta = chunk_obj['delta']
                    if "text" in delta:
                        text = delta['text']
                        print(text, end="")
                        answer += str(text)
                if "amazon-bedrock-invocationMetrics" in chunk_obj:
                    metrics = chunk_obj['amazon-bedrock-invocationMetrics']
                    input_tokens = metrics['inputTokenCount']
                    output_tokens = metrics['outputTokenCount']
                    print(f"\nInput Tokens: {input_tokens}\nOutput Tokens: {output_tokens}")
    return answer, input_tokens, output_tokens

def bedrock_claude_(chat_history,system_message, prompt,model_id,image_path=None):
    content=[]
    if image_path:
        if not isinstance(image_path, list):
            image_path=[image_path]
        s3 = boto3.client('s3')
        for img in image_path:
            match = re.match("s3://(.+?)/(.+)", img)
            image_name=os.path.basename(img)
            _,ext=os.path.splitext(image_name)
            ext=ext.lower()
            if ext==".jpg": ext=".jpeg"  # Claude expects image/jpeg, not image/jpg
            if match:
                # Image stored in S3
                bucket_name = match.group(1)
                key = match.group(2)
                obj = s3.get_object(Bucket=bucket_name, Key=key)
                image_bytes = obj['Body'].read()
            else:
                # Local image file
                with open(img, "rb") as f:
                    image_bytes = f.read()
            base64_string = base64.b64encode(image_bytes).decode('utf-8')
            content.extend([{"type":"text","text":image_name},{
              "type": "image",
              "source": {
                "type": "base64",
                "media_type": f"image/{ext.replace('.','')}",
                "data": base64_string
              }
            }])
    
    content.append({
        "type": "text",
        "text": prompt
            })
    chat_history.append({"role": "user",
            "content": content})
    prompt = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1500,
        "temperature": 0.1,
        "system":system_message,
        "messages": chat_history
    }
    answer = ""
    prompt = json.dumps(prompt)
    response = bedrock_runtime.invoke_model_with_response_stream(body=prompt, modelId=model_id, accept="application/json", contentType="application/json")
    answer,input_tokens,output_tokens=bedrock_streemer(response) 
    return answer, input_tokens, output_tokens

def _invoke_bedrock_with_retries(current_chat, chat_template, question, model_id, image_path):
    max_retries = 5
    backoff_base = 2
    max_backoff = 3  # Maximum backoff time in seconds
    retries = 0
    # Transient errors worth retrying with backoff
    retryable_codes = ('ThrottlingException', 'ModelStreamErrorException')

    while True:
        history_len = len(current_chat)
        try:
            response,input_tokens,output_tokens = bedrock_claude_(current_chat, chat_template, question, model_id, image_path)
            return response,input_tokens,output_tokens
        except ClientError as e:
            # bedrock_claude_ appends the user turn before calling the API;
            # drop it so a retry does not add a duplicate user message
            del current_chat[history_len:]
            if e.response['Error']['Code'] in retryable_codes and retries < max_retries:
                # Exponential backoff with jitter, capped at max_backoff seconds
                sleep_time = min(max_backoff, backoff_base ** retries + random.uniform(0, 1))
                time.sleep(sleep_time)
                retries += 1
            else:
                # Non-retryable error, or retries exhausted
                raise
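The retry helper above uses capped exponential backoff with jitter. This small sketch reproduces only its sleep-time formula so you can see how quickly the cap is reached with the default settings; `backoff_schedule` is a hypothetical name, and the seeded `random.Random` is there only to make the jitter repeatable.

```python
import random

def backoff_schedule(max_retries=5, backoff_base=2, max_backoff=3, seed=None):
    """Return the sleep time for each retry attempt, using the same formula as
    _invoke_bedrock_with_retries: base**attempt plus up to 1 s of jitter,
    capped at max_backoff seconds."""
    rng = random.Random(seed)  # seeded generator so the jitter is repeatable
    return [min(max_backoff, backoff_base ** attempt + rng.uniform(0, 1))
            for attempt in range(max_retries)]

schedule = backoff_schedule(seed=0)
print([round(t, 2) for t in schedule])
```

With `backoff_base=2` and `max_backoff=3`, every attempt from the third onward sleeps the full 3 seconds, so `max_backoff` is the setting to raise if throttling persists.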
                

## Document Extraction
We use Amazon's 2024 10-K report as an example document. Using the Textractor library, we call the Amazon Textract `StartDocumentAnalysis` API, which starts an asynchronous job that extracts the document text and identifies additional elements such as the document layout and tables.

In [50]:
BUCKET= sagemaker.Session().default_bucket()
extractor = Textractor(region_name="us-west-2")
file="amazon-2024-10k.pdf"
doc_id= os.path.basename(file)
file_name, ext = os.path.splitext(file)

document = extractor.start_document_analysis(
    file_source=f'../samples/{file}',
    features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],
    # client_request_token=doc_id,
    save_image=False,
    s3_upload_path=f"s3://{BUCKET}",
    s3_output_path=f"s3://{BUCKET}/textract-output/{file_name}/"
)

Using Textractor's text linearization, we enrich the extracted content with XML tags and hide page elements that add noise, such as page headers, footers, and page numbers. Figure layout is kept, since `hide_figure_layout=False`.

We tag tables, lists, title sections, and sub-sections so that these document elements can be identified, handled appropriately, and chunked efficiently in the steps that follow.
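The sketch below shows how these tags can be read back. It uses a hand-written toy string (not real Textract output) built with the same prefixes and suffixes as the config in the next cell. The double-angle markers (`<<title>>`, `<</list>>`, and so on) act as split points, and the single-angle tag left on each segment identifies the element type. `element_type` is a hypothetical helper for illustration.

```python
import re

# Hand-written toy text using the same prefixes/suffixes as the linearization config
linearized = (
    "<titles><<title>><title>Item 1. Business</title><</title>>\n"
    "<headers><<header>><header>Competition </header><</header>>\n"
    "Our businesses encompass a large variety of product types.\n"
    "<<list>><list>Retailers\nPublishers</list><</list>>"
)

# <titles> and <headers> only mark section boundaries (the notebook splits
# on them separately), so drop them before looking at individual elements
body = linearized.replace("<titles>", "").replace("<headers>", "")

# The double-angle markers act as split points, leaving each element
# wrapped only in its single-angle tag
segments = [s for s in re.split(r"<<[^>]+>>", body) if s.strip()]

def element_type(segment):
    """Classify a segment by the single-angle tag it carries (hypothetical helper)."""
    for tag in ("title", "header", "list", "table"):
        if f"<{tag}>" in segment:
            return tag
    return "text"

elements = []
for seg in segments:
    kind = element_type(seg)
    if kind == "text":
        elements.append((kind, seg.strip()))
    else:
        # re.DOTALL lets a list or table body span several lines
        inner = re.search(rf"<{kind}>(.*?)</{kind}>", seg, re.DOTALL).group(1)
        elements.append((kind, inner.strip()))

for kind, value in elements:
    print(f"{kind:>6}: {value!r}")
```

Splitting on the double-angle markers first matters: `<<title>>` itself contains the substring `<title>`, so searching for `<title>...</title>` on the raw text would capture part of the marker.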

In [54]:
from textractor.data.text_linearization_config import TextLinearizationConfig

config = TextLinearizationConfig(
    hide_figure_layout=False,
    title_prefix="<titles><<title>><title>",
    title_suffix="</title><</title>>",
    hide_header_layout=True,
    section_header_prefix="<headers><<header>><header>",
    section_header_suffix="</header><</header>>",
    table_prefix="<tables><table>",
    table_suffix="</table>",
    list_layout_prefix="<<list>><list>",
    list_layout_suffix="</list><</list>>",
    hide_footer_layout=True,
    hide_page_num_layout=True,
)

print(document.pages[3].get_text(config=config))



<headers><<header>><header>Competition </header><</header>>

Our businesses encompass a large variety of product types, service offerings, and delivery channels. The worldwide marketplace in which we compete is evolving rapidly and intensely competitive, and we face a broad array of competitors from many different industry sectors around the world. Our current and potential competitors include: (1) physical, e-commerce, and omnichannel retailers, publishers, vendors, distributors, manufacturers, and producers of the products we offer and sell to consumers and businesses; (2) publishers, producers, and distributors of physical, digital, and interactive media of all types and all distribution channels; (3) web search engines, comparison shopping websites, social networks, web portals, and other online and app-based means of discovering, using, or acquiring goods and services, either directly or in collaboration with other retailers; (4) companies that provide e-commerce services, inclu

## Document Processing

The next two cells convert the tables on each page into CSV format while preserving the document's structure of text, lists, and tables.

The function `split_list_items_` takes a string that may contain list blocks delimited by the `<<list>><list>` and `</list><</list>>` tags set in the linearization config. It keeps each list block intact as a single item, splits the text outside the list blocks by newline, and returns the resulting items as a list.

The processing script that follows the helper functions iterates over each page of the document. It identifies tables, converts them to CSV format, and wraps them in XML tags for identification. If a page contains lists, the script uses `split_list_items_` to handle them. The processed content of each page is stored in the `document_holder` dictionary, and each page's CSV tables are stored in `table_page`.

The `layout_table_to_excel` function writes a Textract table to an in-memory Excel workbook so that spanned rows and columns in complex tables are preserved. It then unmerges each merged range, copies the merged value into every cell of that range, and returns the table as a pandas DataFrame, which helps keep complex tables intact.

Together, these steps keep tables properly formatted while preserving the surrounding text and lists.
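The unmerge-and-duplicate step can be illustrated without a workbook. This sketch mirrors what `layout_table_to_excel` does with openpyxl's `merged_cells.ranges` and `unmerge_cells`, but on a plain list of lists; the `fill_merged_cells` name, the 0-based inclusive range tuples, and the table values are assumptions made for illustration.

```python
def fill_merged_cells(grid, merged_ranges):
    """Copy each merged range's top-left value into every cell it spans.

    grid: list of rows; merged_ranges: (first_row, first_col, last_row,
    last_col) tuples, 0-based and inclusive (a convention of this sketch).
    """
    filled = [list(row) for row in grid]  # copy so the input stays untouched
    for r1, c1, r2, c2 in merged_ranges:
        value = filled[r1][c1]
        for r in range(r1, r2 + 1):
            for c in range(c1, c2 + 1):
                filled[r][c] = value
    return filled

# Illustrative table: "Net sales" spans two columns, so after export only the
# top-left cell of the merged range holds the value
grid = [
    ["Segment", "Net sales", None],
    ["", "Year 1", "Year 2"],
    ["Region A", "100", "120"],
]
filled = fill_merged_cells(grid, [(0, 1, 0, 2)])
for row in filled:
    print(row)
```

After filling, both "Year 1" and "Year 2" sit under a "Net sales" header cell, so a row-by-row chunk no longer loses that context.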

In [10]:
import numpy as np
def strip_newline(cell):
    """
    A utility function to strip surrounding whitespace (including newlines) from a cell.
    Parameters:
    cell: The cell value (None for empty Excel cells).
    Returns:
    str: The stripped cell value, or "" for empty cells (instead of the string "None").
    """
    return "" if cell is None else str(cell).strip()

def layout_table_to_excel(document, ids,csv_seperator):    
    """
    Converts a Textract table from a document to a pandas DataFrame via an
    in-memory Excel workbook, duplicating merged-cell values across the cells they span.

    Args:
        document: Textractor Document containing the table
        ids: Index of the table in document.tables
        csv_seperator: Separator used for the intermediate CSV string (header detection)

    Returns: 
        pandas DataFrame representation of the table
    """
    # save the table in excel format to preserve the structure of any merged cells
    buffer = io.BytesIO()    
    document.tables[ids].to_excel(buffer)
    buffer.seek(0)
    # Load workbook, get active worksheet
    wb = openpyxl.load_workbook(buffer)
    worksheet = wb.active
    # Unmerge cells, duplicate merged values to individual cells
    all_merged_cell_ranges: list[CellRange] = list(
            worksheet.merged_cells.ranges
        )
    for merged_cell_range in all_merged_cell_ranges:
        merged_cell: Cell = merged_cell_range.start_cell
        worksheet.unmerge_cells(range_string=merged_cell_range.coord)
        for row_index, col_index in merged_cell_range.cells:
            cell: Cell = worksheet.cell(row=row_index, column=col_index)
            cell.value = merged_cell.value
    # determine table header index
    df = pd.DataFrame(worksheet.values)
    df=df.map(strip_newline)
    df0=df.to_csv(sep=csv_seperator,index=False, header=None)
    row_count=len([x for x in df0.split("\n") if x])
    if row_count>1:
        if not all(value.strip() == '' for value in df0.split("\n")[0].split(csv_seperator)): 
            row_count=1
    # attach table column names
    column_row=0 if row_count==1 else 1
    df.columns = df.iloc[column_row] 
    df = df[column_row+1:]
    return df

def split_list_items_(items):
    """
    Splits the given string into items, keeping each tagged list block whole.

    Parameters:
    items (str): The input string containing text and list blocks wrapped in
        <<list>><list> ... </list><</list>> tags.

    Returns:
    list: Each list block (with its tags) as a single item, plus the surrounding text split by newline.
    """
    parts = re.split("(<<list>><list>|</list><</list>>)", items)  
    output = []

    inside_list = False
    list_item = ""

    for p in parts:
        if p == "<<list>><list>":
            inside_list = True    
            list_item=p
        elif p == "</list><</list>>":
            inside_list = False
            list_item += p
            output.append(list_item)
            list_item = "" 
        elif inside_list:
            list_item += p.strip()
        else:
            output.extend(p.split('\n'))
    return output

In [11]:
import io
"""
This script processes a document containing tables and text. It converts the tables into CSV format 
and wraps them with XML tags for easy identification. The document structure with text and tables is maintained.
"""
csv_seperator="|" 
document_holder={}
table_page={}
count=0
# Whether to handle merged cells by duplicating merged value across corresponding individual cells
unmerge_span_cells=True 
# Loop through each page in the document
for ids,page in enumerate(document.pages):
    # Count the tables on this page using the "<tables><table>" prefix set in the linearization config
    table_count=len([word for word in page.get_text(config=config).split() if "<tables><table>" in word])
    # Sanity check: layout tables must match the tables returned by the Textract TABLES feature
    assert table_count==len(page.tables)
    content=page.get_text(config=config).split("<tables>")
    document_holder[ids]=[]    
    for idx,item in enumerate(content):
        if "<table>" in item:           
            if unmerge_span_cells:
                df=layout_table_to_excel(document, count,csv_seperator)
            else:
                df0=  document.tables[count].to_pandas(use_columns=False).to_csv(header=False, index=None,sep=csv_seperator)
                # Count non-empty rows: a one-row table must use row 0 as its header; otherwise the header may be row 0 or row 1
                row_count=len([x for x in df0.split("\n") if x])
                # Use row 1 as the header only when the first row is entirely empty
                if row_count>1:
                    if not all(value.strip() == '' for value in df0.split("\n")[0].split(csv_seperator)): 
                        row_count=1
                df=pd.read_csv(io.StringIO(df0), sep=csv_seperator, 
                               header=0 if row_count==1 else 1, keep_default_na=False) # read table with the appropriate header row
                df.rename(columns=lambda x: '' if str(x).startswith('Unnamed:') else x, inplace=True) 
            table=df.to_csv(index=None, sep=csv_seperator)

            if ids in table_page:
                table_page[ids].append(table)
            else:
                table_page[ids]=[table]
            # Extract table data and remaining content
            pattern = re.compile(r'<table>(.*?)(</table>)', re.DOTALL) 
            data=item
            table_match = re.search(pattern, data)
            table_data = table_match.group(1) if table_match else '' 
            remaining_content = data[table_match.end():] if table_match else data            
            content[idx]=f"<<table>><table>{table}</table><</table>>" ## attach xml tags to differentiate table from other text
            count+=1
            # Check for list items in remaining content
            if "<<list>>" in remaining_content:
                output=split_list_items_(remaining_content)
                output=[x.strip() for x in output if x.strip()]
                document_holder[ids].extend([content[idx]]+output)           
            else:
                document_holder[ids].extend([content[idx]]+[x.strip() for x in remaining_content.split('\n') if x.strip()]) # split other text by new line to be independent items in the python list.
        else:   
            # Check for list items and tables in remaining content
            if "<<list>>" in item and "<table>" not in item:   
                output=split_list_items_(item)
                output=[x.strip() for x in output if x.strip()]
                document_holder[ids].extend(output)
            else:
                document_holder[ids].extend([x.strip() for x in item.split("\n") if x.strip()])

Next, we flatten the per-page lists in `document_holder` into a single list and join its elements with newline characters. The resulting string is split on the `<titles>` tag into title sections, the top level of the hierarchy. We then define `sub_header_content_splitta`, which splits a string on the double-angle-bracket tags (for example `<<header>>` or `<</table>>`). Segments that contain a `<header>`, `<list>`, or `<table>` tag are kept whole; all other segments are split further into individual non-empty lines. The function returns the resulting segments as a list.
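The two-level split (title sections, then header sections within each title) can be sketched on a toy `document_holder`. The page contents below are illustrative placeholders, and `tagged_text` and `outline` are hypothetical names; the sketch only recovers the title-to-header hierarchy that the chunking step relies on.

```python
import re

# Toy stand-in for document_holder (page index -> linearized lines);
# the titles, headers, and sentences are illustrative placeholders
document_holder = {
    0: ["<titles><<title>><title>Item 1. Business</title><</title>>",
        "<headers><<header>><header>General</header><</header>>",
        "Intro paragraph.",
        "<headers><<header>><header>Competition</header><</header>>",
        "Competition paragraph."],
    1: ["<titles><<title>><title>Item 1A. Risk Factors</title><</title>>",
        "Risk paragraph."],
}

# Flatten the pages and join them, as the notebook cell does
flattened = [line for lines in document_holder.values() for line in lines]
text = "\n".join(flattened)

# Level 1: title sections (the first split holds any text before the first title)
title_sections = text.split("<titles>")

def tagged_text(segment, tag):
    """Return the text inside <tag>...</tag>, ignoring the <<...>> markers."""
    # Drop the double-angle markers first: "<<title>>" itself contains "<title>"
    cleaned = re.sub(r"<<[^>]+>>", "", segment)
    match = re.search(rf"<{tag}>(.*?)</{tag}>", cleaned)
    return match.group(1).strip() if match else None

# Level 2: header sections inside each title section
outline = {}
for section in title_sections:
    parts = section.split("<headers>")
    title = tagged_text(parts[0], "title")
    if title is None:
        continue  # text before the first title has no title to map to
    outline[title] = [h for h in (tagged_text(p, "header") for p in parts[1:]) if h]

print(outline)
```

Note that the first element of `title_sections` is an empty string because the document starts with a title; the notebook's `header_split` keeps that element too.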

In [12]:
# Flatten the nested lists in document_holder into a single list and join it with "\n"
flattened_list = [item for sublist in document_holder.values() for item in sublist]
result = "\n".join( flattened_list)
header_split=result.split("<titles>")

def sub_header_content_splitta(string):   
    """
    Splits the input string on <<...>> tags and returns the resulting text segments.
    Segments containing "<header>", "<list>", or "<table>" are kept whole; all other
    segments are split into individual non-empty lines.

    Parameters:
    string (str): The input string to be processed.

    Returns:
    list: A list containing the segments of text extracted from the input string.
    """ 
    pattern = re.compile(r'<<[^>]+>>')
    segments = re.split(pattern, string)
    result = []
    for segment in segments:
        if segment.strip():
            if "<header>" not in segment and "<list>" not in segment and  "<table>" not in segment:
                segment=[x.strip() for x in segment.split('\n') if x.strip()]
                result.extend(segment)
            else:
                result.append(segment)
    return result


## Document Chunking

Example page showing the hierarchy of elements in a document. The hierarchy flows down from **Section Title** -> **Section Header** -> **Paragraphs**, where the section title is a superset of the section header and its paragraphs, tables, lists, and so on.

<img src="images/layout-hierarchy.jpg" width="1000"/>

This cell iterates through the document one title section at a time and chunks the content within each subsection as follows:
- It uses the number of words as the chunking threshold.
- It looks for the XML tags that identify the different document elements: `section titles`, `section headers`, `tables`, `lists`, and `paragraphs`.
    - It iterates through the section headers within a section title, identified by the **header** tags, and chunks only the content within each section header. Chunks therefore never spill over into a different section header, even if the word threshold has not been reached. This produces a hierarchical mapping of each chunk to its parent entities (section header and section title).
    - If a table tag is found, it checks whether there is a sentence immediately before the table (the heuristic is that this sentence is usually the table header) and uses it as the table header. It then splits the table by rows until the desired chunk size is reached and prepends the corresponding section header and table column names to each table chunk.
    - If a list is found, it splits the list by items until the desired chunk size is reached. Using the same heuristic, it prepends the list header to every list chunk.
    - For other text, it chunks by paragraph and prepends the section header to each chunk.
- A dictionary capturing the complete hierarchical relationship of each chunk to its parent entities is stored as a JSON object for hierarchical retrieval, also known as **Small-to-Big** retrieval.
- The complete tables and lists found in each chunk are also stored as metadata.
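The paragraph and list rules above share one idea: pack whole units into a chunk until the word budget is reached, and repeat the relevant headers at the top of every chunk. The following is a minimal sketch of that idea under stated assumptions: `pack_units` and `count_words` are hypothetical names, the prefix lines count toward the budget, and units are never split, which is one way a chunk can end up over the threshold.

```python
import re

def count_words(text):
    # Same \w+ word count the notebook uses for max_words
    return len(re.findall(r"\w+", text))

def pack_units(units, max_words, prefix_lines=()):
    """Greedily group whole units (paragraphs or list items) into chunks.

    Every chunk starts with prefix_lines (e.g. section header, list header),
    and the prefix counts toward max_words. Units are never split, so a single
    oversized unit still becomes its own chunk.
    """
    prefix = list(prefix_lines)
    prefix_words = sum(count_words(p) for p in prefix)
    chunks, current, words = [], [], prefix_words
    for unit in units:
        n = count_words(unit)
        # Close the current chunk if this unit would push it over the budget
        if current and words + n > max_words:
            chunks.append("\n".join(prefix + current))
            current, words = [], prefix_words
        current.append(unit)
        words += n
    if current:
        chunks.append("\n".join(prefix + current))
    return chunks

# Paragraphs under one section header
paragraph_chunks = pack_units(
    ["one two three four", "five six seven", "eight nine"],
    max_words=8, prefix_lines=["Competition"])

# List items: the section header and the list header go on every chunk
list_chunks = pack_units(
    ["Retailers and distributors", "Publishers of media", "Web search engines"],
    max_words=7, prefix_lines=["Competition", "Our competitors include:"])

print(paragraph_chunks)
print(list_chunks)
```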

In [13]:
import re
import pandas as pd
from io import StringIO

max_words = 200
chunks = {}
table_header_dict={} 
chunk_header_mapping={}
list_header_dict={}

# iterate through each title section
for title_ids, items in enumerate(header_split):
    title_chunks = []
    current_chunk = []
    num_words = 0   
    table_header_dict[title_ids]={}
    chunk_header_mapping[title_ids]={}
    list_header_dict[title_ids]={}
    chunk_counter=0
    for item_ids,item in enumerate(items.split('<headers>')):     
        lines=sub_header_content_splitta(item)             
        SECTION_HEADER=None 
        TITLES=None
        num_words = 0  
        for ids_line,line in enumerate(lines):
            
            if line.strip():
                if "<title>" in line:   
                    TITLES=re.findall(r'<title>(.*?)</title>', line)[0].strip()
                    line=TITLES 
                    if re.sub(r'<[^>]+>', '', "".join(lines)).strip()==TITLES:
                        chunk_header_mapping[title_ids][chunk_counter]=lines
                        chunk_counter+=1
                if "<header>" in line:   
                    SECTION_HEADER=re.findall(r'<header>(.*?)</header>', line)[0].strip()
                    line=SECTION_HEADER    
                    first_header_portion=True
                next_num_words = num_words + len(re.findall(r'\w+', line))  

                if  "<table>" not in line and "<list>" not in line:
                    if next_num_words > max_words and "".join(current_chunk).strip()!=SECTION_HEADER and current_chunk and "".join(current_chunk).strip()!=TITLES:
                
                        if SECTION_HEADER :
                            if first_header_portion:
                                first_header_portion=False                                            
                            else:
                                current_chunk.insert(0, SECTION_HEADER.strip())                       
                        
                        title_chunks.append(current_chunk)                  
                        chunk_header_mapping[title_ids][chunk_counter]=lines
               
                        current_chunk = []
                        num_words = 0 
                        chunk_counter+=1
             
                    current_chunk.append(line)    
                    num_words += len(re.findall(r'\w+', line))

                """
                Goal: segment table items into chunks intelligently.
                We chunk the table by rows and prepend the table column headers (and the table header, if any)
                to each table chunk. This preserves the table context across chunks and helps semantic search
                return all chunks of a table within the top-k results, giving the LLM complete information on the table.
                """

                if "<table>" in line:
                    # The table header is usually the line right before the table in the document
                    line_index=ids_line
                    if line_index!=0 and "<table>" not in lines[line_index-1] and "<list>" not in lines[line_index-1]: # No header if the table is the first item (the header may then be inside the table) or if the previous item is another table or list
                        header=lines[line_index-1].replace("<header>","").replace("</header>","")
                    else:
                        header=""                   
              
                    table = line.split("<table>")[-1].split("</table>")[0] # get table from demarcators              
                    df=pd.read_csv(StringIO(table), sep=csv_seperator, keep_default_na=False, header=None)
                    df.columns = df.iloc[0]
                    df = df[1:]
                    df.rename(columns=lambda x: '' if str(x).startswith('Unnamed:') else x, inplace=True)                    
                    table_chunks = []
                    curr_chunk = [df.columns.to_list()] #start current chunk with table column names    
                    words=len(re.findall(r'\w+', str(current_chunk)+" "+str(curr_chunk)))  
                    # Iterate through the rows in the table
                    for row in df.itertuples(index=False):
                        curr_chunk.append(row)         
                        words+=len(re.findall(r'\w+', str(row)))
                        if words > max_words:                        
                            table_header_dict[title_ids].setdefault(chunk_counter, []).extend([header, table])
                            tab_chunk="\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]]) # join the chunk rows together to form a CSV
                            table_chunks.append(tab_chunk)
                            words = len(re.findall(r'\w+', str(curr_chunk[0]))) # set word count to word length of column header names
                            if header: #If header  attach header to table                         
                                if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): # drop the header if it is already the last line of the chunk, to avoid duplicating it
                                    current_chunk.pop(-1)
                                # Append section header to table
                                if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())                             
                                current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[tab_chunk]) #enrich table header with ':'
                                title_chunks.append(current_chunk)                           
                        
                            else:
                                if SECTION_HEADER:
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())                                
                                current_chunk.extend([tab_chunk])
                                title_chunks.append(current_chunk)                        
                            chunk_header_mapping[title_ids][chunk_counter]=lines
                            chunk_counter+=1
                            num_words=0
                            current_chunk=[]
                            curr_chunk = [curr_chunk[0]]
                    
                    if curr_chunk != [df.columns.to_list()] and ids_line == len(lines)-1: # table rows remain and the table is the last item in the section: emit them as the last chunk
                        tab_chunk="\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]])
                        table_chunks.append(tab_chunk)
                        table_header_dict[title_ids].setdefault(chunk_counter, []).extend([header, table])
                        
                        if header: 
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): # drop the header if it is already the last line of the chunk, to avoid duplicating it
                                current_chunk.pop(-1) 
                            if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                          
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[tab_chunk])
                            title_chunks.append(current_chunk)                   
                        else:
                            if SECTION_HEADER:
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                            
                            current_chunk.extend([tab_chunk])
                            title_chunks.append(current_chunk)             
                        chunk_header_mapping[title_ids][chunk_counter]=lines
                        chunk_counter+=1
                        num_words=0
                        current_chunk=[]
                    elif curr_chunk != [df.columns.to_list()] and ids_line != len(lines)-1: # table is not the last item in the section and the word budget was not reached: carry the rows over to the next item
                        tab_chunk="\n".join([csv_seperator.join(str(x) for x in curr_chunk[0])] + [csv_seperator.join(str(x) for x in r) for r in curr_chunk[1:]])
                        table_chunks.append(tab_chunk)
                        table_header_dict[title_ids].setdefault(chunk_counter, []).extend([header, table])
                        if header:
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): # drop the header if it is already the last line of the chunk, to avoid duplicating it
                                current_chunk.pop(-1) 
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[tab_chunk])
                        else:
                            current_chunk.extend([tab_chunk])                  
                        num_words=words
                     

                """
                Goal: segment list items into chunks intelligently.
                We chunk each list by its items and prepend the list header to each list chunk,
                preserving the list context across chunks. This helps retrieval return all chunks
                of a list within the top-k results for a question about that list.
                """

                if "<list>" in line:
                    # Get list header which is usually line before list in document
                    line_index=ids_line
                    if line_index!=0 and "<table>" not in lines[line_index-1] and "<list>" not in lines[line_index-1]: #Check if table or list is the previous item on the page, then they wont be a header
                        header=lines[line_index-1].replace("<header>","").replace("</header>","")
                    else:
                        header=""           
                    list_pattern = re.compile(r'<list>(.*?)(?:</list>|$)', re.DOTALL)   ## Grab all list contents within the list xml tags        
                    list_match = re.search(list_pattern, line)
                    list_ = list_match.group(1)
                    list_lines=list_.split("\n")                

                    curr_chunk = []  
                    words=len(re.findall(r'\w+', str(current_chunk)))  #start word count from any existing chunk
                    # Iterate through the items in the list
                    for lyst_item in list_lines:
                        curr_chunk.append(lyst_item)         
                        words+=len(re.findall(r'\w+', lyst_item)) 
                        if words >= max_words: # word budget reached: close the current chunk
                            list_header_dict[title_ids].setdefault(chunk_counter, []).extend([header, list_])
                            words=0     
                            list_chunk="\n".join(curr_chunk)
                            if header: # attach list header                       
                                if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): # drop the header if it is already the last line of the chunk, to avoid duplicating it
                                    current_chunk.pop(-1)  
                                # Append section content header to list
                                if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())
                                    
                                current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[list_chunk]) 
                                title_chunks.append(current_chunk)                          
                         
                            else:
                                if SECTION_HEADER:
                                    if first_header_portion:
                                        first_header_portion=False
                                    else:
                                        current_chunk.insert(0, SECTION_HEADER.strip())
                                    
                                current_chunk.extend([list_chunk])
                                title_chunks.append(current_chunk)                            
                            chunk_header_mapping[title_ids][chunk_counter]=lines
                            chunk_counter+=1
                            num_words=0
                            current_chunk=[]
                            curr_chunk = []
                    if curr_chunk and ids_line == len(lines)-1: # list items remain and the list is the last item in the section: emit them as the last chunk
                        list_chunk="\n".join(curr_chunk)
                        list_header_dict[title_ids].setdefault(chunk_counter, []).extend([header, list_])
                        if header:
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): # drop the header if it is already the last line of the chunk, to avoid duplicating it
                                current_chunk.pop(-1)                            
                            if SECTION_HEADER and SECTION_HEADER.lower().strip() != header.lower().strip():
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                   
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[list_chunk])
                            title_chunks.append(current_chunk)                        
                        else:
                            if SECTION_HEADER:
                                if first_header_portion:
                                    first_header_portion=False
                                else:
                                    current_chunk.insert(0, SECTION_HEADER.strip())                   
                            current_chunk.extend([list_chunk])
                            title_chunks.append(current_chunk)                     
                        chunk_header_mapping[title_ids][chunk_counter]=lines
                        chunk_counter+=1
                        num_words=0
                        current_chunk=[]
                    elif curr_chunk and ids_line != len(lines)-1: # list is not the last item in the section and the word budget was not reached: carry it over to the next item
                        list_chunk="\n".join(curr_chunk)
                        list_header_dict[title_ids].setdefault(chunk_counter, []).extend([header, list_])
                        if header:
                            if current_chunk and current_chunk[-1].strip().lower()==header.strip().lower(): # drop the header if it is already the last line of the chunk, to avoid duplicating it
                                current_chunk.pop(-1) 
                            current_chunk.extend([header.strip()+':' if not header.strip().endswith(':') else header.strip() ]+[list_chunk])
                        else:
                            current_chunk.extend([list_chunk])                  
                        num_words=words


        if current_chunk and "".join(current_chunk).strip()!=SECTION_HEADER and "".join(current_chunk).strip()!=TITLES:
    
            if SECTION_HEADER:
                if first_header_portion:
                    first_header_portion=False
                else:
                    current_chunk.insert(0, SECTION_HEADER.strip())         
            title_chunks.append(current_chunk)
            chunk_header_mapping[title_ids][chunk_counter]=lines
            current_chunk=[]
            chunk_counter+=1
    if current_chunk:
  
        title_chunks.append(current_chunk) 
        chunk_header_mapping[title_ids][chunk_counter]=lines
    chunks[title_ids] = title_chunks
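The table and list branches of the cell above share one idea: accumulate rows (or list items) until a word budget is reached, and repeat the header, plus the column names for tables, in every chunk so each chunk stands on its own. Below is a condensed sketch of that idea, detached from the notebook's bookkeeping (`table_header_dict`, `chunk_header_mapping`, section headers). The helpers `count_words` and `chunk_items` are hypothetical, the rows are made up, and unlike the cell above (which checks the budget after adding a row) this version closes a chunk just before it would overflow.

```python
import re
from typing import List, Optional


def count_words(text: str) -> int:
    """Approximate word count using the same \\w+ regex as the chunking cell."""
    return len(re.findall(r"\w+", text))


def chunk_items(items: List[str], max_words: int, header: str = "",
                column_row: Optional[str] = None) -> List[str]:
    """Group table rows or list items into chunks of at most ~max_words words.

    Every chunk starts with `header` (the table or list header) and, for tables,
    with `column_row` (the column names), so each chunk can be read on its own.
    """
    prefix = [p for p in (header, column_row) if p]
    prefix_words = sum(count_words(p) for p in prefix)
    chunks: List[str] = []
    current: List[str] = []
    words = prefix_words
    for item in items:
        item_words = count_words(item)
        # Close the current chunk before adding an item that would exceed the budget
        if current and words + item_words > max_words:
            chunks.append("\n".join(prefix + current))
            current, words = [], prefix_words
        current.append(item)
        words += item_words
    if current:  # emit the remaining rows/items as the last chunk
        chunks.append("\n".join(prefix + current))
    return chunks


# Usage on made-up table rows
rows = ["North America|10|12", "International|4|5", "AWS|6|7"]
example_chunks = chunk_items(rows, max_words=14, header="Net sales by segment:",
                             column_row="Segment|Q1|Q2")
for chunk in example_chunks:
    print(chunk, end="\n---\n")
```

Here the first two rows fit the 14-word budget together with the repeated header and column row, and the third row starts a new chunk. An item larger than the whole budget still becomes its own chunk, which is one reason some chunks can exceed `max_words`.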

In [14]:
# Print the first 10 chunks of title section 8
for i, chunk in enumerate(chunks[8][:10], start=1):
    print(f'Chunk {i}:')
    for item in chunk:
        print(item)
    print('\n')

Chunk 1:
AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF CASH FLOWS (in millions)
<title>AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF CASH FLOWS (in millions) </title>:
|Year Ended December 31,|Year Ended December 31,|Year Ended December 31,
|2021|2022|2023
CASH, CASH EQUIVALENTS, AND RESTRICTED CASH, BEGINNING OF PERIOD|$ 42,377|$ 36,477|$ 54,253
OPERATING ACTIVITIES:|OPERATING ACTIVITIES:|OPERATING ACTIVITIES:|OPERATING ACTIVITIES:
Net income (loss)|33,364|(2,722)|30,425
Adjustments to reconcile net income (loss) to net cash from operating activities:|Adjustments to reconcile net income (loss) to net cash from operating activities:|Adjustments to reconcile net income (loss) to net cash from operating activities:|Adjustments to reconcile net income (loss) to net cash from operating activities:
Depreciation and amortization of property and equipment and capitalized content costs, operating lease assets, and other|34,433|41,921|48,663
Stock-based compensation|12,757|19,621|24,023
Non-operati

In [15]:
# Print the names of the title sections the document was split into
title_pattern = re.compile(r'<title>(.*?)(?:</title>|$)', re.DOTALL)
for x in chunk_header_mapping:
    if chunk_header_mapping[x]:
        try:
            first_line = chunk_header_mapping[x][0][0]
        except (KeyError, IndexError):
            continue  # no chunk 0 in this title section, or its first chunk is empty
        title_match = re.search(title_pattern, first_line)
        title_ = title_match.group(1) if title_match else ""
        print(title_)


FORM 10-K 
FORM 10-K For the Fiscal Year Ended December 31, 2023 
PART II 
Item 7. Management's Discussion and Analysis of Financial Condition and Results of Operations 
Table of Contents 
INDEX TO CONSOLIDATED FINANCIAL STATEMENTS 
Report of Independent Registered Public Accounting Firm 
AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF CASH FLOWS (in millions) 
CONSOLIDATED STATEMENTS OF OPERATIONS 
AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF COMPREHENSIVE INCOME (LOSS) (in millions) 
CONSOLIDATED BALANCE SHEETS 
AMAZON.COM, INC. CONSOLIDATED STATEMENTS OF STOCKHOLDERS' EQUITY (in millions) 
AMAZON.COM, INC. NOTES TO CONSOLIDATED FINANCIAL STATEMENTS 
Note 2 - FINANCIAL INSTRUMENTS 
Report of Independent Registered Public Accounting Firm 
PART III 
PART IV 
SIGNATURES 
AMAZON.COM, INC. GLOBAL RESTRICTED STOCK UNIT AWARD AGREEMENT 
ACCEPTANCE AND ACKNOWLEDGMENT 
GLOBAL RESTRICTED STOCK UNIT AWARD AGREEMENT 
LIST OF SIGNIFICANT SUBSIDIARIES 
Consent of Independent Registered Public Accounti

Upload the section contents (the title and header sections of each chunk) to Amazon S3:

In [16]:
# json.dump converts the integer title/chunk keys to strings
with open(f"{doc_id}.json", "w") as f:
    json.dump(chunk_header_mapping, f)
s3.upload_file(f"{doc_id}.json", BUCKET, f"{doc_id}.json")

## Indexing


Here's a sample script for indexing the document chunks into an [Amazon OpenSearch Serverless](https://aws.amazon.com/blogs/big-data/introducing-the-vector-engine-for-amazon-opensearch-serverless-now-in-preview/) collection.

The code block below creates an index in an Amazon OpenSearch Serverless collection and then indexes the document chunks. (To target a provisioned Amazon OpenSearch Service domain instead, sign the requests for the `es` service rather than `aoss`.) The index mapping includes metadata fields such as:
- document name
- complete chunk tables
- header section IDs
- complete chunk lists
- title section IDs

However, you can decide which metadata is useful to keep as part of the index.

The section IDs map each passage (chunk) to the higher-level content it belongs to: the complete header section and the complete title section that contain it. This is an advanced retrieval technique called **Small-to-Big**, where child chunks (passages) are used to retrieve their parent chunks (header or title sections) when more context is needed and the flow of information must be preserved. The parent chunks can be stored in the same OpenSearch domain or collection, or in a different storage system; this implementation stores them in Amazon S3.
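At query time, Small-to-Big amounts to looking up a retrieved chunk's `section_title_ids` and `section_header_ids` in the section mapping uploaded to S3 earlier. Below is a minimal sketch, assuming the JSON has already been downloaded (for example with `s3.get_object`) and parsed; `expand_to_parent` is a hypothetical helper, not part of Textractor or OpenSearch, and the mapping in the usage lines is made up. One detail worth noting: `json.dump` turns the integer IDs of `chunk_header_mapping` into strings, so lookups must use string keys.

```python
import json
from typing import Dict, List

# Shape after a JSON round trip: {title_id: {chunk_id: [section lines]}}, keys are strings
SectionMapping = Dict[str, Dict[str, List[str]]]


def expand_to_parent(mapping: SectionMapping, title_id: int, header_id: int,
                     level: str = "header") -> str:
    """Return the parent context of a retrieved chunk (hypothetical helper).

    level="header": the header section the chunk was cut from.
    level="title":  all header sections under the chunk's title section.
    """
    # json.dump stored the integer ids as strings, so look them up as strings
    title_sections = mapping.get(str(title_id), {})
    if level == "header":
        return "\n".join(title_sections.get(str(header_id), []))
    if level != "title":
        raise ValueError("level must be 'header' or 'title'")
    lines: List[str] = []
    previous = None
    for key in sorted(title_sections, key=int):
        section = title_sections[key]
        # Consecutive chunks of one header section map to the same lines; keep them once
        if section != previous:
            lines.extend(section)
        previous = section
    return "\n".join(lines)


# Usage with a made-up mapping, serialized the same way the notebook uploads it
raw = json.dumps({4: {26: ["<header>Net Sales</header>", "Net sales include ..."],
                      27: ["<header>Net Sales</header>", "Net sales include ..."],
                      28: ["<header>Operating Income</header>", "Operating income ..."]}})
mapping = json.loads(raw)
print(expand_to_parent(mapping, title_id=4, header_id=27))
print(expand_to_parent(mapping, title_id=4, header_id=27, level="title"))
```

Header-level expansion is the cheaper default; title-level expansion can return much more text, so it fits best when the LLM's context window has room for it.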

The complete tables and lists are also indexed alongside each chunk. This adds flexibility at retrieval time: when a retrieved passage contains only part of a table or list, the full element can be passed to the LLM as additional context.

We use an embedding model to generate the embeddings that are then indexed. This implementation uses Amazon Titan embeddings with an output vector size of 1024. Amazon Titan Text Embeddings V2 also supports 256- and 512-dimensional outputs, which reduce the size of your OpenSearch index with a minimal reduction in accuracy.
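The notebook's `_get_emb_` helper comes from an earlier cell and is not shown here. If you want the smaller vectors, a Titan Text Embeddings V2 call through Amazon Bedrock could look like the sketch below. The model ID and the `inputText`/`dimensions`/`normalize` request fields follow the Titan V2 request format; the helper names are hypothetical, and the model must be enabled in your account and region. Whatever size you pick must match the `dimension` of the `knn_vector` field in the index mapping.

```python
import json
from typing import List

# Assumption: Titan Text Embeddings V2 is enabled in your account and region
TITAN_V2_MODEL_ID = "amazon.titan-embed-text-v2:0"
SUPPORTED_DIMENSIONS = (256, 512, 1024)


def titan_v2_request_body(text: str, dimensions: int = 512, normalize: bool = True) -> str:
    """Build the JSON request body for a Titan Text Embeddings V2 call."""
    if dimensions not in SUPPORTED_DIMENSIONS:
        raise ValueError(f"dimensions must be one of {SUPPORTED_DIMENSIONS}")
    return json.dumps({"inputText": text, "dimensions": dimensions, "normalize": normalize})


def embed_with_titan_v2(bedrock_runtime, text: str, dimensions: int = 512) -> List[float]:
    """Return the embedding; `bedrock_runtime` is a boto3 "bedrock-runtime" client."""
    response = bedrock_runtime.invoke_model(
        modelId=TITAN_V2_MODEL_ID,
        body=titan_v2_request_body(text, dimensions),
        contentType="application/json",
        accept="application/json",
    )
    # The response body is a stream containing JSON with an "embedding" list
    return json.loads(response["body"].read())["embedding"]
```

Usage, assuming AWS credentials are configured: `embed_with_titan_v2(boto3.client("bedrock-runtime"), passage_chunk, dimensions=512)`, together with `'dimension': 512` in the index mapping.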

**Note:** Some chunks may exceed the word threshold set in the previous cells, because tables are chunked by whole rows and a single long paragraph is never split. This can trigger a token-limit error with some embedding models.
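Before embedding, you can list the chunks that overshoot the budget so you can split or truncate them. Below is a sketch with `find_oversized_chunks` as a hypothetical helper that walks a dict shaped like `chunks` above. A word count is only a rough proxy for the model's token count, so choose the threshold with some margin.

```python
import re
from typing import Dict, List, Tuple


def find_oversized_chunks(chunks: Dict[int, List[List[str]]],
                          max_words: int) -> List[Tuple[int, int, int]]:
    """Return (title_id, chunk_index, word_count) for every chunk above max_words."""
    oversized = []
    for title_id, title_chunks in chunks.items():
        for chunk_index, chunk in enumerate(title_chunks):
            # Same \w+ word count as the chunking cell; token counts are usually higher
            n_words = len(re.findall(r"\w+", "\n".join(chunk)))
            if n_words > max_words:
                oversized.append((title_id, chunk_index, n_words))
    return oversized


# Usage on a toy dict with the same shape as `chunks`
toy_chunks = {0: [["short passage"], ["a much longer passage", "that spills over the budget"]]}
print(find_oversized_chunks(toy_chunks, max_words=5))  # [(0, 1, 9)]
```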

The **domain_endpoint** variable below is set from the **host** variable created earlier, so it points to your OpenSearch Service domain or Serverless collection endpoint. If you did not follow those steps, set it to your own endpoint.

In [None]:
import logging

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from opensearchpy.exceptions import RequestError
from requests_aws4auth import AWS4Auth

"""
This script demonstrates indexing documents into an Amazon OpenSearch Serverless collection using AWS Identity and Access Management (IAM) for authentication.
"""
service = 'aoss'
# Using host will use your OpenSearch Service domain/Serverless endpoint id
domain_endpoint = host 

credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, "us-west-2", service)  # set to the region of your collection
os_ = OpenSearch(
    hosts = [{'host': domain_endpoint, 'port': 443}],
    http_auth = awsauth,
    use_ssl = True,
    verify_certs = True,
    timeout=120,        
    # http_compress = True, # enables gzip compression for request bodies
    connection_class = RequestsHttpConnection
)

# Sample OpenSearch index mapping
mapping = {
    'settings': {
        'index': {
            'knn': True,
            'knn.algo_param.ef_search': 100,
        }
    },
    'mappings': {
        'properties': {
            'embedding': {
                'type': 'knn_vector',
                'dimension': 1024,  # must match the output vector size of the embedding model
                'method': {
                    'name': 'hnsw',
                    'space_type': 'cosinesimil',
                    'engine': 'nmslib',
                    'parameters': {
                        'ef_construction': 256,
                        'm': 48
                    }
                }
            },
            'passage': {'type': 'text'},
            'doc_id': {'type': 'keyword'},
            'table': {'type': 'text'},
            'list': {'type': 'text'},
            'section_header_ids': {'type': 'text'},
            'section_title_ids': {'type': 'text'},
        }
    }
}

domain_index = "test-index"  # index name

if not os_.indices.exists(index=domain_index):        
    os_.indices.create(index=domain_index, body=mapping)
    # Verify that the index has been created
    if os_.indices.exists(index=domain_index):
        print(f"Index {domain_index} created successfully.")
    else:
        print(f"Failed to create index '{domain_index}'.")
else:
    print(f'{domain_index} Index already exists!')

i = 1
SAGEMAKER=boto3.client('sagemaker-runtime')
for ids, chunkks in chunks.items(): # iterate through the title-section chunks
    if not chunkks:
        continue  # nothing to index (also avoids a modulo by zero below)
    index_adjuster=len(chunk_header_mapping[ids])%len(chunkks)
    for chunk_ids,chunk in enumerate(chunkks): # iterate through the header-section chunks
        chunk_ids+=index_adjuster
        passage_chunk="\n".join(chunk).replace("<title>","").replace("</title>","")
        if passage_chunk.strip():
            embedding=_get_emb_(passage_chunk)
            # Attach the complete table(s) and list(s) found in this chunk, if any
            table=[]
            if chunk_ids in table_header_dict.get(ids, {}):
                table="\n".join(table_header_dict[ids][chunk_ids])
            lists=[]
            if chunk_ids in list_header_dict.get(ids, {}):
                lists="\n".join(list_header_dict[ids][chunk_ids])
            documentt = { 
                'doc_id':doc_id, #doc name   
                'passage': passage_chunk,
                'embedding': embedding,
                'table':table,
                "list":lists,    
                "section_header_ids":chunk_ids, #Store id of the header section
                "section_title_ids":ids #Store id of the title section
            }

            try:
                response = os_.index(index=domain_index, body=documentt)
                i += 1
                # Check the response to see if the indexing was successful
                if response["result"] == "created":
                    print(f"Document indexed successfully with ID: {response['_id']}")
                else:
                    print("Failed to index document.")
            except RequestError as e:
                logging.error(f"Error indexing document to index '{domain_index}': {e}")
        else:
            continue        
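The cell above sends one HTTP request per chunk, which is simple but slow for large documents. A sketch of the same indexing with opensearch-py's `helpers.bulk` is shown below; `bulk_actions` and `bulk_index` are hypothetical helper names. No `_id` is set, so OpenSearch assigns one, as it does for the per-document requests above.

```python
from typing import Dict, Iterable, Iterator


def bulk_actions(index_name: str, documents: Iterable[Dict]) -> Iterator[Dict]:
    """Wrap each chunk document in a bulk "index" action (no _id: OpenSearch assigns one)."""
    for doc in documents:
        yield {"_op_type": "index", "_index": index_name, "_source": doc}


def bulk_index(client, index_name: str, documents: Iterable[Dict]):
    """Index all documents with opensearch-py's bulk helper."""
    # Imported here so bulk_actions can be used (and tested) without opensearch-py installed
    from opensearchpy import helpers
    # With raise_on_error=False this returns (number of successes, list of errors)
    return helpers.bulk(client, bulk_actions(index_name, documents), raise_on_error=False)
```

To use it, collect the `documentt` dicts in a list inside the loop instead of calling `os_.index`, then call `bulk_index(os_, domain_index, docs)` once.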

## RAG

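Amazon OpenSearch Serverless does not natively integrate hybrid search (combining lexical/keyword and semantic/vector search), so this section uses a custom approach: both searches run in a single `_msearch` request and their results are merged on the client side.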
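For reference, the functions in the next cell combine the two result lists in one of two ways. With score interpolation, each list's scores $s_i$ are first normalized (min-max shown below; the code also offers L2 normalization, $\hat{s}_i = s_i / \lVert s \rVert_2$), and a document found by both searches gets a weighted sum controlled by $\alpha$ (`interpolation_weight`); a document found by only one search keeps that search's normalized score. Reciprocal Rank Fusion, as usually defined, ignores raw scores and uses only the 1-based rank $r_q(d)$ of document $d$ in each result list $q$ that contains it, with $k$ (`rrf_k`) flattening the gap between top and lower ranks:

$$
\hat{s}_i = \frac{s_i - \min_j s_j}{\max_j s_j - \min_j s_j},
\qquad
s(d) = \alpha\,\hat{s}_{\text{lex}}(d) + (1-\alpha)\,\hat{s}_{\text{sem}}(d)
$$

$$
\mathrm{RRF}(d) = \sum_{q \in \{\text{lex},\,\text{sem}\}} \frac{1}{k + r_q(d)}
$$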

In [18]:
import numpy as np

def normalize_scores_(scores,normalizer):
    """
    Normalize scores using L2 or min-max normalization.
    :param scores: The list of scores to normalize.
    :param normalizer: The normalization technique, either "minmax" or "l2".
    :return: The normalized scores.
    """
    scores = np.array(scores, dtype=float)
    if scores.size == 0:
        return scores
    if "minmax" in normalizer:
        score_range = np.max(scores) - np.min(scores)
        if score_range == 0:
            # All hits scored the same (e.g. a single hit): avoid division by zero
            return np.ones_like(scores)
        return (scores - np.min(scores)) / score_range
    elif "l2" in normalizer:
        return scores / np.linalg.norm(scores)
    else:
        raise ValueError("normalizer must be either 'minmax' or 'l2'")
        
def interpolate_scores(lexical_score, semantic_score, alpha=0.5):
    """
    Interpolate lexical and semantic scores using a weighted sum.
    :param lexical_score: The normalized score from the lexical search.
    :param semantic_score: The normalized score from the semantic search.
    :param alpha: The interpolation weight (default: 0.5).
    :return: The interpolated score.
    """
    return alpha * lexical_score + (1 - alpha) * semantic_score

def reciprocal_rank_fusion(lexical_results, semantic_results, k=60):
    """
    Combine lexical and semantic search results using Reciprocal Rank Fusion (RRF).
    Each result list contributes 1 / (k + rank) per document, where rank is the
    1-based position of the hit in that list (the raw scores are not used).
    :param lexical_results: The results from the lexical search.
    :param semantic_results: The results from the semantic search.
    :param k: The parameter for RRF (default: 60).
    :return: The combined search results.
    """
    combined_results = {}

    # OpenSearch returns hits sorted by descending score, so list position = rank
    for results in (lexical_results, semantic_results):
        for rank, hit in enumerate(results['hits']['hits'], start=1):
            doc_id = hit['_id']
            if doc_id not in combined_results:
                combined_results[doc_id] = {'_id': doc_id, '_source': hit['_source'], '_score': 0}
            combined_results[doc_id]['_score'] += 1 / (k + rank)

    combined_results = sorted(combined_results.values(), key=lambda x: x['_score'], reverse=True)

    return {'hits': {'hits': combined_results}}

def hybrid_search(top_K_results,lexical_results, semantic_results, interpolation_weight=0.5, normalizer="minmax",use_rrf=False, rrf_k=60):
    """
    Perform hybrid search by combining lexical and semantic search results.
    :param top_K_results: The number of combined results to return.
    :param lexical_results: The results from the lexical search.
    :param semantic_results: The results from the semantic search.
    :param interpolation_weight: The weight (alpha) of the lexical score; the semantic score gets 1 - alpha.
    :param normalizer: The normalization function, "minmax" (default) or "l2".
    :param use_rrf: If True, combine the results with Reciprocal Rank Fusion instead of score interpolation.
    :param rrf_k: The k parameter for RRF.
    :return: The combined search results.
    """
    
    if use_rrf:
        rrf_results = reciprocal_rank_fusion(lexical_results, semantic_results, k=rrf_k)
        return {'hits': {'hits': rrf_results['hits']['hits'][:top_K_results]}}
    
    combined_results = []

    # Normalize the scores from lexical and semantic searches
    lexical_scores = [hit['_score'] for hit in lexical_results['hits']['hits']]
    semantic_scores = [hit['_score'] for hit in semantic_results['hits']['hits']]
    normalized_lexical_scores = normalize_scores_(lexical_scores,normalizer)
    normalized_semantic_scores = normalize_scores_(semantic_scores,normalizer)

    # Combine the results based on document IDs
    lexical_docs = {hit['_id']: (hit, score) for hit, score in zip(lexical_results['hits']['hits'], normalized_lexical_scores)}
    semantic_docs = {hit['_id']: (hit, score) for hit, score in zip(semantic_results['hits']['hits'], normalized_semantic_scores)}

    for doc_id in set(lexical_docs.keys()) | set(semantic_docs.keys()):
        lexical_hit, lexical_score = lexical_docs.get(doc_id, (None, 0))
        semantic_hit, semantic_score = semantic_docs.get(doc_id, (None, 0))

        if lexical_hit and semantic_hit:
            # Interpolate scores if both lexical and semantic results are available
            interpolated_score = interpolate_scores(lexical_score, semantic_score, interpolation_weight)      
            combined_hit = {
                '_id': doc_id,
                '_source': {**lexical_hit['_source']},
                '_score': interpolated_score,     
            }
        elif lexical_hit:
            # Use lexical hit if only lexical result is available
            combined_hit = {
                '_id': doc_id,
                '_source': lexical_hit['_source'],
                '_score': lexical_score
            }
        else:
            # Use semantic hit if only semantic result is available
            combined_hit = {
                '_id': doc_id,
                '_source': semantic_hit['_source'],
                '_score': semantic_score
            }
        combined_results.append(combined_hit)
    # Sort the combined results by the blended score
    combined_results = sorted(combined_results, key=lambda hit: hit['_score'], reverse=True)
    return {'hits': {'hits': combined_results[:top_K_results]}}
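Lexical (BM25) and k-NN scores live on different scales, which is why RRF works on rank positions only. Below is a small self-contained example on two made-up ranked lists of document IDs; `rrf_fuse` is a hypothetical stand-alone helper, independent of the OpenSearch response format used above.

```python
from typing import Dict, List


def rrf_fuse(ranked_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Standard RRF: each list adds 1 / (k + rank) per document, with 1-based ranks."""
    fused: Dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked, start=1):
            fused[doc_id] = fused.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return dict(sorted(fused.items(), key=lambda kv: kv[1], reverse=True))


lexical_ids = ["a", "b", "c"]   # made-up lexical ranking
semantic_ids = ["c", "a", "d"]  # made-up semantic ranking
fused = rrf_fuse([lexical_ids, semantic_ids], k=60)
# a: 1/61 + 1/62, c: 1/63 + 1/61, b: 1/62, d: 1/63
print(list(fused))  # ['a', 'c', 'b', 'd']
```

Document `c`, ranked first by the semantic search but only third lexically, ends up second overall, ahead of `b`, which only one search found.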

#### HYBRID SEARCH AND FILTER

In [75]:
import json

import boto3
from opensearchpy import RequestsHttpConnection, Transport
from requests_aws4auth import AWS4Auth

# Sign requests with the session credentials; `service`, `domain_endpoint` and `domain_index` are defined earlier in the notebook.
# Change "us-west-2" if your collection lives in another Region.
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, "us-west-2", service, session_token=credentials.token)
# A low-level Transport lets us send a raw NDJSON body to the _msearch endpoint
transport = Transport(
    hosts=[{'host': domain_endpoint, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    timeout=120,
    # http_compress=True,  # enables gzip compression for request bodies
    connection_class=RequestsHttpConnection,
)
question = "Amazon net sales by geographical location and products in 2023?"
embedding = _get_emb_(question)
# Number of results to return
top_K_results = 10

# Define the lexical (full-text match) and semantic (k-NN vector) queries; the embedding field is excluded from the returned source
search_requests = [
    ({}, {"query": {"match": {"passage": question}}, "size": top_K_results, "_source": {"exclude": ["embedding"]}}),
    ({}, {"query": {"knn": {"embedding": {"vector": embedding, "k": 3}}}, "size": top_K_results, "_source": {"exclude": ["embedding"]}})
]

# Convert the search requests to NDJSON: one header line and one body line per query
data = ""
for metadata, request in search_requests:
    data += f"{json.dumps(metadata)}\n{json.dumps(request)}\n"
response = transport.perform_request("GET", f"/{domain_index}/_msearch", body=data)
# Separate the results (responses are returned in request order)
lexical_search_results = response['responses'][0]
semantic_search_results = response['responses'][1]
# Blend both result lists with the custom hybrid search function
hybrid_results = hybrid_search(top_K_results, lexical_search_results, semantic_search_results,
                               interpolation_weight=0.5, normalizer="minmax", use_rrf=False, rrf_k=100)

# Use the blended results; to rely on a single retriever, assign lexical_search_results or semantic_search_results instead
response = hybrid_results
response['hits']['hits']

[{'_id': '1%3A0%3AQDNbHJABQBU2LhBAgZnN',
  '_source': {'section_header_ids': 27,
   'section_title_ids': 4,
   'passage': 'Net Sales\nNet sales include product and service sales. Product sales represent revenue from the sale of products and related shipping fees and digital media content where we record revenue gross. Service sales primarily represent third-party seller fees, which includes commissions and any related fulfillment and shipping fees, AWS sales, advertising services, Amazon Prime membership fees, and certain digital media content subscriptions. Net sales information is as follows (in millions):\n|Year Ended December 31,|Year Ended December 31,\nNorth America|13 %|12%\nInternational|4|11\nAWS|29|13\nConsolidated|13|12\nNet Sales Mix:|Net Sales Mix:|Net Sales Mix:\nNorth America|61 %|61 %\nInternational|23|23\nAWS|16|16\nConsolidated|100 %|100 %\nSales increased 12% in 2023, compared to the prior year. Changes in foreign exchange rates reduced net sales by $71 million in 20
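The call above passes `use_rrf=False` and `rrf_k=100`, so results are merged by score interpolation. The alternative, reciprocal rank fusion (RRF), ignores raw scores and uses only ranks: each document scores the sum of 1 / (k + rank) over the result lists it appears in, with ranks starting at 1. The sketch below implements that textbook formulation. The RRF branch inside the notebook's `hybrid_search` is defined earlier and may differ in detail; the helper name `reciprocal_rank_fusion` and the sample hits are hypothetical.

```python
from typing import Dict, List


def reciprocal_rank_fusion(result_lists: List[List[dict]], k: int = 60, top_k: int = 10) -> List[dict]:
    """Fuse ranked OpenSearch-style hit lists with standard RRF.

    Each document scores sum(1 / (k + rank)) over the lists it appears in.
    Raw retriever scores are ignored, so no normalization step is needed.
    """
    fused: Dict[str, dict] = {}
    for hits in result_lists:
        for rank, hit in enumerate(hits, start=1):
            entry = fused.setdefault(hit["_id"], {"_id": hit["_id"], "_source": hit["_source"], "_score": 0.0})
            entry["_score"] += 1.0 / (k + rank)
    # Highest fused score first, truncated to top_k
    return sorted(fused.values(), key=lambda h: h["_score"], reverse=True)[:top_k]


lexical = [{"_id": "a", "_source": {}, "_score": 12.0}, {"_id": "b", "_source": {}, "_score": 9.5}]
semantic = [{"_id": "b", "_source": {}, "_score": 0.91}, {"_id": "c", "_source": {}, "_score": 0.88}]
fused = reciprocal_rank_fusion([lexical, semantic], k=100)
print([(h["_id"], round(h["_score"], 5)) for h in fused])
```

Document `b` is found by both retrievers, so it ranks first even though it is never the top lexical hit. A larger `k` flattens the difference between ranks.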

Because each retrieved chunk stores the IDs of its section header and section title, you can choose how much hierarchical context to pass to the LLM: the matched passage chunk alone, the full section it belongs to, or the entire titled section. This helps with queries that need information spread across a whole section rather than a single chunk.

To support this, the hierarchical sections and titles of each document are stored as a JSON file in an S3 bucket. When more context than the indexed passage chunk is needed, the matching section or title is fetched from S3 using the `section_title_ids` and `section_header_ids` stored with the chunk.
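To make the lookup concrete, here is a minimal in-memory sketch of the JSON layout that `read_file_from_s3` expects: a mapping from title ID to section header ID to a list of text strings. S3 access is replaced by a plain dict, the helper name `lookup_section` is hypothetical, and the sample content is placeholder text except for the "Net Sales" fragment taken from the retrieved passage; the IDs 4 and 27 match the sample hit shown earlier.

```python
import json
from typing import Dict, List, Optional

# Hypothetical per-document hierarchy file: {title_id: {section_header_id: [text, ...]}}
hierarchy: Dict[str, Dict[str, List[str]]] = json.loads("""
{
  "4": {
    "26": ["Placeholder heading", "Placeholder text for another section."],
    "27": ["Net Sales", "Net sales include product and service sales."]
  }
}
""")


def lookup_section(doc: Dict[str, Dict[str, List[str]]], section_content: str,
                   title_id, section_id=None) -> Optional[str]:
    """Return one section header's text or a whole title's text, mirroring read_file_from_s3."""
    if section_content == "section_header":
        # JSON object keys are strings, so integer IDs from the index are converted with str()
        return "\n".join(doc[str(title_id)][str(section_id)])
    if section_content == "section_title":
        sections = ["\n".join(lines) for lines in doc[str(title_id)].values()]
        # dict.fromkeys drops duplicate sections while keeping their order
        return "\n".join(dict.fromkeys(sections))
    return None


print(lookup_section(hierarchy, "section_header", 4, 27))
```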

In [76]:
import json
from collections import OrderedDict

import inflect


def read_file_from_s3(bucket_name, key, section_content, title_id=None, section_id=None):
    """
    Read a document's hierarchy JSON from an S3 bucket and extract a section or a whole title.

    Parameters:
        bucket_name (str): The name of the S3 bucket.
        key (str): The key (path) of the file in the S3 bucket.
        section_content (str): The type of section content to extract.
            Possible values: "section_header" or "section_title".
        title_id (str or int, optional): The ID of the title. Required for both "section_header" and "section_title".
        section_id (str or int, optional): The ID of the section header. Required if `section_content` is "section_header".

    Returns:
        str or None: The extracted section content as a string, or None if an error occurs.
    """
    try:
        response = s3.get_object(Bucket=bucket_name, Key=key)
        file_content = json.loads(response['Body'].read().decode('utf-8'))
        if section_content == "section_header":
            # A single section header: a list of strings
            passage = file_content[str(title_id)][str(section_id)]
        elif section_content == "section_title":
            # Join each section's strings into one block of text
            strings = ["\n".join(sublist) for sublist in file_content[str(title_id)].values()]
            # Remove duplicate sections while keeping their original order
            passage = OrderedDict.fromkeys(strings)
        else:
            raise ValueError(f"Unsupported section_content '{section_content}'")
        return "\n".join(passage)
    except Exception as e:
        print(f"Error reading {key} from S3 bucket {bucket_name}:", e)
        return None

class InvalidContentError(Exception):
    pass

# Extract relevant information from the search response
def content_extraction_os_(response: dict, table: bool, lyst: bool, section_content: str):
    """
    Extracts content from the OpenSearch response based on specified parameters.

    Parameters:
    response (dict): The response from OpenSearch containing search results.
    table (bool): Whether to include table content.
    lyst (bool): Whether to include list content.
    section_content (str): The type of content to extract. Allowed values are 'passage', 'section_header', or 'section_title'.

    Returns:
    tuple: (passages, tab, lst, passage, tables, lists). The first three are tagged strings for the
    prompt template; the last three are the raw retrieved passages, tables and lists.
    """
    allowed_values = {"passage", "section_header", "section_title"}
    if section_content not in allowed_values:
        raise InvalidContentError(f"Invalid content type '{section_content}'. Allowed values are {', '.join(allowed_values)}.")

    res = response['hits']['hits']
    score = [str(x['_score']) for x in res]  # retrieval scores
    doc_name = [x['_source']['doc_id'] for x in res]  # document names
    header_ids = [x['_source']['section_header_ids'] for x in res]  # section header IDs
    title_ids = [x['_source']['section_title_ids'] for x in res]  # section title IDs
    tables = ""
    lists = ""

    if section_content == "passage":
        # Use the matched chunk itself; pass "section_header" or "section_title" to retrieve the full section instead
        passage = [x['_source']["passage"] for x in res]
        tables = [x['_source']['table'] for x in res]  # tables in the corresponding chunks
        lists = [x['_source']['list'] for x in res]  # lists in the corresponding chunks
    else:
        passage = [read_file_from_s3(BUCKET, f"{doc_name[i]}.json", section_content, title_ids[i], header_ids[i])
                   for i in range(len(title_ids))]
        # Drop failed reads and duplicate sections while keeping retrieval order (a set would lose the ranking)
        passage = [text for text in dict.fromkeys(passage) if text]
    p = inflect.engine()
    # Wrap passages, tables and lists in ordinal XML tags for the prompt template
    passages = ""
    tab = ""
    lst = ""
    for ids, text in enumerate(passage):
        passages += f"<{p.ordinal(ids+1)}_passage>\n{text}\n</{p.ordinal(ids+1)}_passage>\n"
    if table and tables:
        for ids, text in enumerate(tables):
            # Tables can be coupled with passage chunks to provide more information
            tab += f"<{p.ordinal(ids+1)}_passage_table>\n{text}\n</{p.ordinal(ids+1)}_passage_table>\n"
    if lyst and lists:
        for ids, text in enumerate(lists):
            lst += f"<{p.ordinal(ids+1)}_passage_lists>\n{text}\n</{p.ordinal(ids+1)}_passage_lists>\n"
    return passages, tab, lst, passage, tables, lists
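`content_extraction_os_` uses `inflect` only to produce ordinal tag names such as `<1st_passage>`. If you would rather avoid that dependency, a small helper like the hypothetical `ordinal` below yields the same English ordinal suffixes, and the hypothetical `wrap_in_tags` builds the tagged string the prompt template expects. Neither function is part of the notebook; this is a sketch of the same tagging scheme.

```python
from typing import Iterable


def ordinal(n: int) -> str:
    """English ordinal for a positive integer: 1st, 2nd, 3rd, 4th, 11th, 12th, 13th, 21st, ..."""
    if 10 <= n % 100 <= 20:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
    return f"{n}{suffix}"


def wrap_in_tags(texts: Iterable[str], tag: str = "passage") -> str:
    """Wrap each text in numbered XML-style tags, e.g. <1st_passage>...</1st_passage>."""
    out = ""
    for i, text in enumerate(texts, start=1):
        name = f"{ordinal(i)}_{tag}"
        out += f"<{name}>\n{text}\n</{name}>\n"
    return out


print(wrap_in_tags(["first chunk", "second chunk"]))
```

Numbered tags give the model distinct names to cite, which is what the `<source>` instruction in the prompt template relies on.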

In [77]:
passages,tab,lyst,passage,tables,lists=content_extraction_os_(response, False,False, "passage")
print(passages)

<1st_passage>
Net Sales
Net sales include product and service sales. Product sales represent revenue from the sale of products and related shipping fees and digital media content where we record revenue gross. Service sales primarily represent third-party seller fees, which includes commissions and any related fulfillment and shipping fees, AWS sales, advertising services, Amazon Prime membership fees, and certain digital media content subscriptions. Net sales information is as follows (in millions):
|Year Ended December 31,|Year Ended December 31,
North America|13 %|12%
International|4|11
AWS|29|13
Consolidated|13|12
Net Sales Mix:|Net Sales Mix:|Net Sales Mix:
North America|61 %|61 %
International|23|23
AWS|16|16
Consolidated|100 %|100 %
Sales increased 12% in 2023, compared to the prior year. Changes in foreign exchange rates reduced net sales by $71 million in 2023. For a discussion of the effect of foreign exchange rates on sales growth, see "Effect of Foreign Exchange Rates" belo

## Bedrock Anthropic LLM Inference

Use a prompt template with placeholders for the retrieved passages (`passages`, wrapped in **document** tags) and for any standalone tables found within the retrieved passages (`tab`, wrapped in **additional_information** tags).\
Set the `csv_seperator` variable to the separator used during chunking; the default is the pipe character (`|`).\
An Anthropic Claude model (Claude 3 or Claude 2) generates the response to the user question.
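`_invoke_bedrock_with_retries` is defined earlier in the notebook. For reference, the sketch below shows one way to call a Claude 3 model through the Bedrock Runtime `invoke_model` API using the Anthropic Messages request format, then read back the generated text and token usage. It takes an already-created `bedrock-runtime` client so the request and response handling can be followed without AWS credentials. The function names, the `max_tokens` and `temperature` defaults, and the absence of retries are assumptions of this sketch, not the notebook's implementation.

```python
import json
from typing import Any, Dict, Tuple


def build_claude_request(prompt: str, max_tokens: int = 1024, temperature: float = 0.0) -> str:
    """Serialize a single-turn Anthropic Messages API request body for Bedrock."""
    body: Dict[str, Any] = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "temperature": temperature,
        "messages": [{"role": "user", "content": [{"type": "text", "text": prompt}]}],
    }
    return json.dumps(body)


def invoke_claude(client, model_id: str, prompt: str, max_tokens: int = 1024) -> Tuple[str, int, int]:
    """Call invoke_model on a bedrock-runtime client and return (text, input_tokens, output_tokens)."""
    response = client.invoke_model(
        modelId=model_id,
        body=build_claude_request(prompt, max_tokens),
        contentType="application/json",
        accept="application/json",
    )
    # The response body is a stream: read it once and parse the JSON payload
    result = json.loads(response["body"].read())
    text = "".join(block["text"] for block in result["content"] if block["type"] == "text")
    usage = result["usage"]
    return text, usage["input_tokens"], usage["output_tokens"]
```

With a real client this would be called as `invoke_claude(boto3.client("bedrock-runtime"), "anthropic.claude-3-haiku-20240307-v1:0", prompt_template)`; wrap the call in your own retry logic for throttling errors.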

In [None]:
csv_seperator="|"
prompt_template=f"""You are a helpful, obedient and truthful financial assistant.

<document>
{passages}
</document>          

<additional_information>
{tab}
</additional_information>

<instructions>
When providing your response based on the document:
1. Understand the question to know what is being asked of you.
2. Review the entire document provided and check if it contains relevant information to answer the question. Only pay attention to passages with relevant information.
3. Any tables provided within the document or additional information are delimited by {csv_seperator} character.
4. If the document is sufficient to answer the question, provide a comprehensive answer ENTIRELY based on the document provided. DO NOT make up answers not present in the document.
5. If the answer is not available in the document, say so.
</instructions>

Question: {question}
if able to answer:
    Include in your response before your answer:    
    <source>document or additional info tag(s) containing the relevant info</source>"""


The ground truth for this question is on page 69 of the Amazon 2024 10-K document.

In [85]:
model_id = "anthropic.claude-3-haiku-20240307-v1:0"  # alternatives: "anthropic.claude-3-sonnet-20240229-v1:0", "anthropic.claude-v2"
model_response, input_tokens, output_tokens = _invoke_bedrock_with_retries([], "", prompt_template, model_id, [])

<source>3rd_passage, 5th_passage, 7th_passage</source>

Based on the information provided in the document, here are the key details on Amazon's net sales by geographical location and products in 2023:

Net sales by geographical location in 2023:
- United States: $395,637 million
- Germany: $37,588 million
- United Kingdom: $33,591 million 
- Japan: $26,002 million
- Rest of world: $81,967 million

Net sales by product and service categories in 2023:
- Online stores: $231,872 million
- Third-party seller services: $140,053 million
- AWS: $90,757 million
- Advertising services: $46,906 million
- Subscription services: $40,209 million
- Physical stores: $20,030 million
- Other: $4,958 million
Input Tokens: 3076
Output Tokens: 213


# Conclusion

This notebook showcases the extraction of content from a document while maintaining its layout structure. Additionally, we processed and chunked the document, ensuring the integrity of the information was preserved. Furthermore, we indexed these chunks and associated hierarchical metadata information, offering flexibility in information retrieval. 

Finally, we conducted a RAG query and generated contextual answers.

# Delete Resources

In [None]:
from botocore.exceptions import ClientError

def delete_aoss_collection(client, collection_id):
    """Deletes an OpenSearch Serverless collection. Deletion can take several minutes to complete."""
    try:
        response = client.delete_collection(
            id=collection_id
        )
        print('Sending collection deletion request...')
        print(response)

    except ClientError as error:
        if error.response['Error']['Code'] == 'ResourceNotFoundException':
            print('Collection not found. Please check the collection ID.')
        else:
            raise

# Look up the collection ID from its name, then delete the collection
collection_id = aoss.batch_get_collection(
        names=['idp-workshop-aoss'])['collectionDetails'][0]['id']
delete_aoss_collection(aoss, collection_id)
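Collection deletion runs asynchronously. If you want the notebook to block until it finishes, a polling loop like the hypothetical `wait_for_collection_deletion` below can be used; pass it the `aoss` client and the collection ID looked up in the cell above. It assumes that `batch_get_collection(ids=[...])` stops listing the collection under `collectionDetails` once deletion completes; the timeout and poll interval are arbitrary choices.

```python
import time


def wait_for_collection_deletion(client, collection_id: str, timeout_s: float = 600, poll_s: float = 15) -> bool:
    """Poll an OpenSearch Serverless client until the collection is gone.

    Returns True once the collection no longer appears in collectionDetails,
    or False if it is still listed (for example with status DELETING) when the timeout expires.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        details = client.batch_get_collection(ids=[collection_id]).get("collectionDetails", [])
        if not details:
            return True
        if time.monotonic() >= deadline:
            return False
        print(f"Collection status: {details[0].get('status')}, waiting...")
        time.sleep(poll_s)
```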