This notebook was tested on an `ml.t3.medium` instance with the SageMaker `Data Science 3` image in a Studio notebook.

<img src="images/chatbot4.png" width="800"/>

This sample notebook implements a general-purpose chatbot.
Key functionalities include:
1. Saving conversation history in DynamoDB
2. Handling document uploads in the supported formats (PDF, JPG, CSV, EXCEL, PNG, TXT, JSON) by passing a local or S3 path to the document.
3. Using prompt templates stored locally (they can also be stored in S3)

Install required packages

In [4]:
%%sh
sudo apt-get update
sudo apt-get install tesseract-ocr-all -y

Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:4 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:5 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1271 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates/multiverse amd64 Packages [75.9 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 Packages [3544 kB]
Get:8 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [5174 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [5361 kB]
Get:10 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [3233 kB]
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 Packages [1575 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-backports/universe amd64 Packages [35.2 kB]
Fetched 20.7 MB in 2

debconf: delaying package configuration, since apt-utils is not installed


Fetched 297 MB in 18s (16.8 MB/s)
Selecting previously unselected package libfribidi0:amd64.
(Reading database ... 14526 files and directories currently installed.)
Preparing to unpack .../000-libfribidi0_1.0.8-2ubuntu3.1_amd64.deb ...
Unpacking libfribidi0:amd64 (1.0.8-2ubuntu3.1) ...
Selecting previously unselected package libglib2.0-0:amd64.
Preparing to unpack .../001-libglib2.0-0_2.72.4-0ubuntu2.5_amd64.deb ...
Unpacking libglib2.0-0:amd64 (2.72.4-0ubuntu2.5) ...
Selecting previously unselected package libglib2.0-data.
Preparing to unpack .../002-libglib2.0-data_2.72.4-0ubuntu2.5_all.deb ...
Unpacking libglib2.0-data (2.72.4-0ubuntu2.5) ...
Selecting previously unselected package shared-mime-info.
Preparing to unpack .../003-shared-mime-info_2.1-2_amd64.deb ...
Unpacking shared-mime-info (2.1-2) ...
Selecting previously unselected package ucf.
Preparing to unpack .../004-ucf_3.0043_all.deb ...
Moving old data out of the way
Unpacking ucf (3.0043) ...
Selecting previously unselecte

You can ignore the following error if it appears during installation: `ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.`

In [5]:
%pip install s3fs -U
%pip install pandas -U
%pip install --force-reinstall amazon-textract-textractor
%pip install python-calamine openpyxl
%pip install pypdf2 pytesseract python-pptx python-docx pillow

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting amazon-textract-textractor
  Downloading amazon_textract_textractor-1.9.2-py3-none-any.whl.metadata (9.6 kB)
Collecting Pillow (from amazon-textract-textractor)
  Using cached pillow-11.3.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (9.0 kB)
Collecting XlsxWriter<4,>=3.0 (from amazon-textract-textractor)
  Using cached xlsxwriter-3.2.5-py3-none-any.whl.metadata (2.7 kB)
Collecting amazon-textract-caller<1,>=0.2.4 (from amazon-textract-textractor)
  Using cached amazon_textract_caller-0.2.4-py2.py3-none-any.whl.metadata (7.2 kB)
Collecting editdistance<0.9,>=0.6.2 (from amazon-textract-textractor)
  Downloading editdistance-0.8.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.9 kB)
Collecting tabulate<0.10,>=0.9 (from amazon-textract-textractor)
  Downloading tabulate-0.9.0-py3-none-any.whl.metada

**As a best practice, restart the kernel from the menu bar (Kernel > Restart Kernel) before continuing.**

In [27]:
import boto3
from botocore.config import Config
import shutil
import os
import pandas as pd
import time
import json
import base64
import io
from python_calamine import CalamineWorkbook
import re
import numpy as np
import openpyxl
from openpyxl.cell import Cell
from openpyxl.worksheet.cell_range import CellRange
import uuid
from pptx import Presentation
from botocore.exceptions import ClientError
from textractor import Textractor
from textractor.visualizers.entitylist import EntityList
from textractor.data.constants import TextractFeatures
from textractor.data.text_linearization_config import TextLinearizationConfig
import pytesseract
from PIL import Image
import PyPDF2
import chardet
from datetime import datetime    
from docx import Document as DocxDocument
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
from docx.document import Document
from docx.table import _Cell, Table
from docx.text.paragraph import Paragraph
from docx.table import Table as DocxTable
import concurrent.futures
from functools import partial
import csv
from sagemaker import Session

### Configurable:
- `DYNAMODB_TABLE`: The name of the DynamoDB table used for storing chat history. Set it to an empty string to store chat history locally.
- `DYNAMODB_USER`: The user ID for the application.
- `BUCKET`: The name of the S3 bucket used for caching documents and extracted text.
- `CHAT_HISTORY_LENGTH`: The number of recent chat messages to load from the DynamoDB table.
- `REGION`: The AWS region used when interacting with AWS services.
- `LOAD_DOC_IN_ALL_CHAT_CONVO`: A boolean flag indicating whether the text extracted from documents is loaded into the chat history. If set to `False`, only the questions and answers are loaded. If set to `True`, the questions and answers are loaded together with the extracted text of all associated documents.
- `S3_DOC_CACHE_PATH`: S3 path where documents attached from the local system are stored.
- `TEXTRACT_RESULT_CACHE_PATH`: S3 path used to cache the text extracted from PDFs and images.
- `USE_TEXTRACT`: A boolean flag controlling whether OCR uses Amazon Textract or Python libraries. Set it to `False` if you do not have access to Amazon Textract. Amazon Textract is recommended where possible for better-quality document OCR and processing.
- `LOCAL_CHAT_FILE_NAME`: The name of the file used to store chat history locally when DynamoDB is not used.

In [48]:
session = Session()
DYNAMODB_TABLE="" # Leave empty if not using DynamoDB for chat history; otherwise pass the name of a DynamoDB table
DYNAMODB_USER= "user id" # Change to your preferred user name if using DynamoDB for chat history storage
SESSIONID=str(time.time())
REGION=os.getenv("AWS_REGION") if os.getenv("AWS_REGION") else "us-east-1"
print(f"AWS Region set to {REGION}")
chat_hist=[]
BUCKET = session.default_bucket()
print(f"Bucket Name: {BUCKET}")
S3_DOC_CACHE_PATH='chatbot_uploads'
TEXTRACT_RESULT_CACHE_PATH="textract_output"
LOAD_DOC_IN_ALL_CHAT_CONVO=True
CHAT_HISTORY_LENGTH=5
USE_TEXTRACT=False #Change to True to use Amazon Textract
LOCAL_CHAT_FILE_NAME = "chat-history.json" # Name of file to store chat history locally if not using DynamoDB


DYNAMODB  = boto3.resource('dynamodb', region_name=REGION)
dynamo=boto3.client('dynamodb', region_name=REGION)
S3=boto3.client('s3',region_name=REGION)

AWS Region set to us-west-2


#### Initialize Bedrock Runtime

In [30]:
# Create the bedrock runtime to invoke LLM
from botocore.config import Config
config = Config(
    read_timeout=600, # Read timeout parameter
    retries = dict(
        max_attempts = 10 ## Handle retries
    )
)
import boto3
bedrock_runtime = boto3.client(service_name='bedrock-runtime',region_name=REGION,config=config)

#### Create DynamoDB Table
A DynamoDB table is created with the user ID as the partition key and the session ID as the sort key.
This makes it possible to save the history of multiple chat sessions under the same user ID.\
Also provide a bucket name (`BUCKET`) to be used for caching Amazon Textract results for document OCR.

In [31]:
if DYNAMODB_TABLE:
    try:
        table = DYNAMODB.create_table(
            TableName=DYNAMODB_TABLE,
            KeySchema=[
                {
                    'AttributeName': 'UserId',  # Partition key
                    'KeyType': 'HASH'  
                },
                {
                    'AttributeName': 'SessionId',   # Sort key
                    'KeyType': 'RANGE'
                }
            ],
            AttributeDefinitions=[
                {
                    'AttributeName': 'UserId',
                    'AttributeType': 'S'   # String data type
                },
                {
                    'AttributeName': 'SessionId',
                    'AttributeType': 'S'
                },
            ],
            BillingMode='PAY_PER_REQUEST'  # On-demand billing
        )

        print("Table status:", table.table_status)

        # Wait until the table exists.
        table.meta.client.get_waiter("table_exists").wait(TableName=DYNAMODB_TABLE)
        print(table.item_count)
    except dynamo.exceptions.ResourceInUseException as e:
        print(e.response['Error']['Message'])

## Utility Functions

These functions read an Excel file from S3 using the provided S3 URI.
The `table_parser_utills` function first loads the workbook with openpyxl (`table_parser_openpyxl`) and falls back to python-calamine (`calamaine_excel_engine`) if openpyxl fails or returns nothing, since some Excel files that openpyxl cannot read can still be read with calamine.
For every worksheet, `table_parser_openpyxl` unmerges any merged cells and copies the merged value into each individual cell. The worksheet data is then converted to a pandas DataFrame, and the `strip_newline` function is applied to each cell value to strip leading and trailing whitespace, including newlines.
Finally, the DataFrame is converted to a CSV string with pipe (|) as the delimiter, wrapped in a tag named after the sheet, and returned.

NOTE: The `calamaine_excel_engine` does not handle merged cells.
   

In [32]:
def strip_newline(cell):
    return str(cell).strip()

def table_parser_openpyxl(file):
    # Read from S3
    s3 = boto3.client('s3', region_name=REGION)
    match = re.match("s3://(.+?)/(.+)", file)
    if match:
        bucket_name = match.group(1)
        key = match.group(2)
        obj = s3.get_object(Bucket=bucket_name, Key=key)    
        # Read Excel file from S3 into a buffer
        xlsx_buffer = io.BytesIO(obj['Body'].read())
        xlsx_buffer.seek(0)    
        # Load workbook
        wb = openpyxl.load_workbook(xlsx_buffer)    
        all_sheets_string=""
        # Iterate over each sheet in the workbook
        for sheet_name in wb.sheetnames:
            # all_sheets_name.append(sheet_name)
            worksheet = wb[sheet_name]

            all_merged_cell_ranges: list[CellRange] = list(
                worksheet.merged_cells.ranges
            )
            for merged_cell_range in all_merged_cell_ranges:
                merged_cell: Cell = merged_cell_range.start_cell
                worksheet.unmerge_cells(range_string=merged_cell_range.coord)
                for row_index, col_index in merged_cell_range.cells:
                    cell: Cell = worksheet.cell(row=row_index, column=col_index)
                    cell.value = merged_cell.value        
            # Convert sheet data to a DataFrame
            df = pd.DataFrame(worksheet.values)
            df = df.map(strip_newline)
            # Convert to string and tag by sheet name
            all_sheets_string+=f'<{sheet_name}>\n{df.to_csv(sep="|", index=False, header=False)}\n</{sheet_name}>\n'
        return all_sheets_string
    else:
        raise Exception(f"{file} not formatted as an S3 path")

def calamaine_excel_engine(file):
    # # Read from S3
    s3 = boto3.client('s3',region_name=REGION)
    match = re.match("s3://(.+?)/(.+)", file)
    if match:
        bucket_name = match.group(1)
        key = match.group(2)
        obj = s3.get_object(Bucket=bucket_name, Key=key)    
        # Read Excel file from S3 into a buffer
        xlsx_buffer = io.BytesIO(obj['Body'].read())
        xlsx_buffer.seek(0)    
        all_sheets_string=""
        # Load the Excel file
        workbook = CalamineWorkbook.from_filelike(xlsx_buffer)
        # Iterate over each sheet in the workbook
        for sheet_name in workbook.sheet_names:
            # Get the sheet by name
            sheet = workbook.get_sheet_by_name(sheet_name)
            df = pd.DataFrame(sheet.to_python(skip_empty_area=False))
            df = df.map(strip_newline)
            all_sheets_string+=f'<{sheet_name}>\n{df.to_csv(sep="|", index=False, header=False)}\n</{sheet_name}>\n'
        return all_sheets_string
    else:
        raise Exception(f"{file} not formatted as an S3 path")

def table_parser_utills(file):
    """Parse an Excel file with openpyxl, falling back to calamine if it fails or returns nothing."""
    try:
        response = table_parser_openpyxl(file)
        if response:
            return response
    except Exception:
        # openpyxl could not read the file; fall back to calamine below
        pass
    return calamaine_excel_engine(file)
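
To make the output format concrete, here is a minimal standard-library sketch of the sheet-tagged, pipe-delimited string that both parsers build. The `sheets_to_tagged_string` helper and the sample rows are hypothetical and only illustrate the layout; the notebook itself produces this string with pandas `to_csv` after reading the workbook from S3.

```python
import csv
import io


def sheets_to_tagged_string(sheets):
    """Render {sheet_name: rows} as pipe-delimited text wrapped in <sheet_name> tags.

    Hypothetical helper: it mimics the layout produced by the parsers above
    without needing pandas, openpyxl, or S3 access.
    """
    parts = []
    for sheet_name, rows in sheets.items():
        buffer = io.StringIO()
        writer = csv.writer(buffer, delimiter="|", lineterminator="\n")
        for row in rows:
            # Mirror strip_newline: stringify each cell and trim surrounding whitespace
            writer.writerow([str(cell).strip() for cell in row])
        parts.append(f"<{sheet_name}>\n{buffer.getvalue()}\n</{sheet_name}>\n")
    return "".join(parts)


# Made-up sample data standing in for one worksheet
sample = {"Sales": [["Region", "Total"], [" East\n", 42]]}
tagged = sheets_to_tagged_string(sample)
print(tagged)
```

Tagging each sheet by name lets the LLM tell worksheets apart when a workbook contains several of them.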

1. `get_s3_keys(prefix)`: Retrieves the list of object keys in the S3 bucket that match the specified prefix, with the prefix stripped from each key. It returns an empty list if no objects are found.
2. `get_object_with_retry(bucket, key)`: Retrieves an object from an S3 bucket with retry functionality. It attempts to get the object and handles the "DecryptionFailureException" error by retrying with exponential backoff. If the maximum number of retries is exceeded, it raises an exception.

In [33]:
def get_s3_keys(prefix):
    """list all keys in an s3 path"""
    s3 = boto3.client('s3',region_name=REGION)
    keys = []
    next_token = None

    while True:
        if next_token:
            response = s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix, ContinuationToken=next_token)
        else:
            response = s3.list_objects_v2(Bucket=BUCKET, Prefix=prefix)

        if "Contents" in response:
            for obj in response['Contents']:
                key = obj['Key']
                name = key[len(prefix):]
                keys.append(name)

        if "NextContinuationToken" in response:
            next_token = response["NextContinuationToken"]
        else:
            break

    return keys

def get_object_with_retry(bucket, key):
    """Get object from s3 with error handling and retries"""
    max_retries=5
    initial_backoff=1
    retries = 0
    backoff = initial_backoff
    s3 = boto3.client('s3',region_name=REGION)

    while retries < max_retries:
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            return response
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'DecryptionFailureException':
                print(f"Decryption failed, retrying in {backoff} seconds...")
                time.sleep(backoff)
                backoff *= 2  # Exponential backoff
                retries += 1
            else:
                raise e
    # If we reach this point, it means the maximum number of retries has been exceeded
    raise Exception(f"Failed to get object {key} from bucket {bucket} after {max_retries} retries.")
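
The retry loop above follows a general exponential-backoff pattern. The sketch below shows the same pattern in isolation, with no AWS dependency. `call_with_backoff`, `is_retryable`, and the injectable `sleep` parameter are hypothetical names introduced for illustration and are not part of the notebook.

```python
import time


def call_with_backoff(operation, is_retryable, max_retries=5, initial_backoff=1.0, sleep=time.sleep):
    """Call operation(), retrying retryable failures with exponential backoff."""
    backoff = initial_backoff
    for _ in range(max_retries):
        try:
            return operation()
        except Exception as exc:
            if not is_retryable(exc):
                raise  # non-retryable errors propagate immediately
            sleep(backoff)
            backoff *= 2  # double the wait before the next attempt
    raise RuntimeError(f"Operation failed after {max_retries} retries.")


# Usage: a made-up operation that fails twice before succeeding
attempts = {"count": 0}
delays = []


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("transient failure")
    return "ok"


# Passing delays.append as `sleep` records the waits instead of actually sleeping
result = call_with_backoff(flaky, lambda exc: isinstance(exc, TimeoutError), sleep=delays.append)
print(result, delays)
```

Injecting the sleep function keeps the example fast to run and makes the backoff schedule easy to inspect.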

Extracts text from a PDF or image file using Amazon Textract or Python libraries (PyPDF2 or pytesseract).

If the `USE_TEXTRACT` flag is set:

- It first checks whether the extracted document content is already cached in S3, based on the file name. If found, it retrieves the cached text using the `get_object_with_retry` function.
- Otherwise, for PDF files, it uses the `Textractor` class from the textractor library to perform an asynchronous document analysis, extracting text, layout, and tables from the PDF.
- For other file types, it uses the `Textractor` class to perform a synchronous document analysis.
- The extracted content is then uploaded to S3 for caching.

If the `USE_TEXTRACT` flag is not set:

- For PDF files, it downloads the file from S3 using `s3.Bucket(bucket_name).download_fileobj`, reads the PDF using PyPDF2, and extracts text from each page.
- For image files, it downloads the file from S3, opens it using `Image.open`, and uses pytesseract to extract text from the image.

In [34]:
def exract_pdf_text_aws(file):    
    file_base_name=os.path.basename(file)
    dir_name, ext = os.path.splitext(file)
    # Checking if extracted doc content is in S3
    if USE_TEXTRACT:        
        if [x for x in get_s3_keys(f"{TEXTRACT_RESULT_CACHE_PATH}/") if file_base_name in x]:    
            response = get_object_with_retry(BUCKET, f"{TEXTRACT_RESULT_CACHE_PATH}/{file_base_name}.txt")#S3.get_object(Bucket=BUCKET, Key=f"{TEXTRACT_RESULT_CACHE_PATH}/{file_base_name}.txt")
            text = response['Body'].read().decode()
            return text
        else:
            
            extractor = Textractor(region_name=REGION)
            # Asynchronous call, you will experience some wait time. Try caching results for better experience
            if "pdf" in ext:
                print("Asynchronous call, you may experience some wait time.")
                document = extractor.start_document_analysis(
                file_source=file,
                features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],       
                save_image=False,   
                s3_output_path=f"s3://{BUCKET}/textract_output/"
            )
            #Synchronous call
            else:
                document = extractor.analyze_document(
                file_source=file,
                features=[TextractFeatures.LAYOUT,TextractFeatures.TABLES],  
                save_image=False,
            )
            config = TextLinearizationConfig(
            hide_figure_layout=False,   
            hide_header_layout=False,    
            table_prefix="<table>",
            table_suffix="</table>",
            )
            # Upload extracted content to s3
            S3.put_object(Body=document.get_text(config=config), Bucket=BUCKET, Key=f"{TEXTRACT_RESULT_CACHE_PATH}/{file_base_name}.txt") 
            return document.get_text(config=config)
    else:
        s3=boto3.resource("s3",region_name=REGION)
        match = re.match("s3://(.+?)/(.+)", file)
        if not match:
            raise ValueError(f"{file} not formatted as an S3 path")
        bucket_name = match.group(1)
        key = match.group(2)
   
        if "pdf" in ext:            
            pdf_bytes = io.BytesIO()
            
            s3.Bucket(bucket_name).download_fileobj(key, pdf_bytes)
            # Read the PDF from the BytesIO object
            pdf_bytes.seek(0)                      
            # Create a PDF reader object
            pdf_reader = PyPDF2.PdfReader(pdf_bytes)
            # Get the number of pages in the PDF
            num_pages = len(pdf_reader.pages)
            # Extract text from each page
            text = ''
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                text += page.extract_text()
        else:
            img_bytes = io.BytesIO()
            s3.Bucket(bucket_name).download_fileobj(key, img_bytes)
            img_bytes.seek(0)
            image = Image.open(img_bytes)
            text = pytesseract.image_to_string(image)
        return text
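
The caching step in the Textract branch is a check-then-extract-then-store pattern. Here is a minimal sketch of that pattern, assuming a plain dictionary in place of the S3 cache prefix and a fake extractor in place of Textract. `extract_with_cache` and `fake_ocr` are hypothetical names used only for illustration.

```python
def extract_with_cache(name, cache, extract):
    """Return the cached text for `name`, or run `extract` and store its result."""
    key = f"{name}.txt"  # same naming idea as the cached .txt objects above
    if key in cache:
        return cache[key]
    text = extract(name)
    cache[key] = text
    return text


calls = []


def fake_ocr(name):
    """Stand-in for an expensive OCR call; records how often it is invoked."""
    calls.append(name)
    return f"text of {name}"


cache = {}  # an in-memory dict standing in for the S3 cache location
first = extract_with_cache("report.pdf", cache, fake_ocr)
second = extract_with_cache("report.pdf", cache, fake_ocr)
print(first == second, len(calls))
```

The second call is served from the cache, so the expensive extraction runs only once per document.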

In [35]:
def get_s3_obj_from_bucket_(file):
    """Retrieves an object from an S3 bucket given its S3 URI.
    Args:
       file (str): The S3 URI of the object to retrieve, in the format "s3://{bucket_name}/{key}".
    Returns:
       dict: The S3 `get_object` response; the object content is the StreamingBody under the "Body" key.
    Raises:
       ValueError: If `file` is not formatted as an S3 URI.
    """
    s3 = boto3.client('s3',region_name=REGION)
    match = re.match("s3://(.+?)/(.+)", file)
    if not match:
        raise ValueError(f"{file} not formatted as an S3 path")
    bucket_name = match.group(1)
    key = match.group(2)
    return s3.get_object(Bucket=bucket_name, Key=key)

def put_obj_in_s3_bucket_(docs):
    """Uploads a file to an S3 bucket and returns the S3 URI of the uploaded object.
    Args:
       docs (str): The local file path of the file to upload to S3.
   Returns:
       str: The S3 URI of the uploaded object, in the format "s3://{bucket_name}/{file_path}".
    """
    file_name=os.path.basename(docs)
    file_path=f"{S3_DOC_CACHE_PATH}/{file_name}"
    S3.upload_file(docs, BUCKET, file_path)
    return f"s3://{BUCKET}/{file_path}"
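
Several helpers in this notebook split an S3 URI into a bucket and a key with the same regular expression. A small sketch of that step as a reusable function is shown below. `split_s3_uri` is a hypothetical helper and the bucket name is made up; the pattern itself is the one used throughout the notebook.

```python
import re

# Same pattern the notebook uses: lazy bucket match, then everything after the first slash
S3_URI_PATTERN = re.compile(r"s3://(.+?)/(.+)")


def split_s3_uri(uri):
    """Split "s3://bucket/key" into (bucket, key), raising on malformed input."""
    match = S3_URI_PATTERN.match(uri)
    if not match:
        raise ValueError(f"{uri} is not formatted as an S3 path")
    return match.group(1), match.group(2)


bucket, key = split_s3_uri("s3://my-bucket/chatbot_uploads/report.pdf")
print(bucket, key)
```

Raising an explicit error for malformed input avoids a later, harder-to-read failure on an undefined bucket or key.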

The `process_files` function processes multiple attached files concurrently using a process pool executor. It takes a list of files, submits tasks to the executor to process each file using the `handle_doc_upload_or_s3` function, and collects the results and errors. It returns a tuple containing the processed results, errors, and a formatted result string. The function leverages concurrent processing to improve efficiency when handling a large number of files.

In [36]:
def process_files(files):
    results = []
    result_string=""
    errors = []
    future_proxy_mapping = {} 
    futures = []

    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Partial function to pass the handle_doc_upload_or_s3 function
        func = partial(handle_doc_upload_or_s3)   
        for file in files:
            future = executor.submit(func, file)
            future_proxy_mapping[future] = file
            futures.append(future)

        # Collect the results and handle exceptions
        for future in concurrent.futures.as_completed(futures):        
            file_url= future_proxy_mapping[future]
            try:
                result = future.result()
                results.append(result)
                doc_name=os.path.basename(file_url)
                
                result_string+=f"<{doc_name}>\n{result}\n</{doc_name}>\n"
            except Exception as e:
                # Get the original function arguments from the Future object
                error = {'file': file_url, 'error': str(e)}
                errors.append(error)

    return results, errors, result_string
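
The submit-then-collect pattern used by `process_files` can be tried without AWS access. The sketch below deliberately swaps in a `ThreadPoolExecutor` (a common choice for I/O-bound work such as S3 downloads) for the notebook's `ProcessPoolExecutor`, and uses a hypothetical `fake_handler` in place of `handle_doc_upload_or_s3`. The file paths are made up.

```python
import concurrent.futures
import os


def fake_handler(path):
    """Hypothetical stand-in for handle_doc_upload_or_s3."""
    if path.endswith(".bin"):
        raise ValueError("Input file format not supported")
    return f"contents of {os.path.basename(path)}"


def process_paths(paths, handler):
    """Run handler on every path concurrently, collecting results and errors."""
    results, errors, result_string = [], [], ""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Map each future back to its input so failures can be reported per file
        future_to_path = {executor.submit(handler, path): path for path in paths}
        for future in concurrent.futures.as_completed(future_to_path):
            path = future_to_path[future]
            try:
                result = future.result()
            except Exception as exc:
                errors.append({"file": path, "error": str(exc)})
                continue
            results.append(result)
            name = os.path.basename(path)
            result_string += f"<{name}>\n{result}\n</{name}>\n"
    return results, errors, result_string


results, errors, result_string = process_paths(
    ["s3://my-bucket/a.txt", "s3://my-bucket/b.bin"], fake_handler
)
print(results, errors)
```

Because results are collected with `as_completed`, their order follows completion time rather than input order when several files succeed.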

- `extract_text_and_tables(docx_path)`: Extracts text and tables from a Word document (docx) file. It uses the python-docx library to iterate over the block-level items in the document. Based on paragraph styles, it tags `Heading 1` and `Heading 3` headings, list paragraphs, table-of-contents entries, and tables in the extracted content. It also handles nested tables by recursively parsing them.
- `extract_text_from_pptx_s3(pptx_buffer)`: Extracts text from a PowerPoint presentation (pptx) file stored in S3. It takes a BytesIO buffer containing the pptx file content and uses the python-pptx library to extract text from each slide. It returns the extracted text as a single string.
- `parse_csv_from_s3(s3_uri)`: Parses a CSV file stored in S3 using pandas. It detects the file encoding using `detect_encoding()`, sniffs the delimiter, and reads the CSV file into a pandas DataFrame. If an error occurs during parsing, it raises an `InvalidContentError` exception.
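
Delimiter sniffing can be illustrated with the standard library alone. pandas can detect the separator when `sep=None` is combined with the Python engine; the sketch below shows the underlying idea with `csv.Sniffer` on an in-memory string. `sniff_and_parse` and the sample text are hypothetical and not part of the notebook.

```python
import csv
import io


def sniff_and_parse(text, candidates=",;|\t"):
    """Guess the delimiter of CSV text, then parse it into a list of rows."""
    dialect = csv.Sniffer().sniff(text, delimiters=candidates)
    rows = list(csv.reader(io.StringIO(text), dialect))
    return dialect.delimiter, rows


# Made-up semicolon-separated sample
delimiter, rows = sniff_and_parse("name;score\nalice;10\nbob;12\n")
print(delimiter, rows)
```

Sniffing is a heuristic, so restricting the candidate delimiters as done here reduces the chance of a wrong guess on small or irregular files.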

In [37]:
class InvalidContentError(Exception):
    pass

def detect_encoding(s3_uri):
    """detect csv encoding"""
    s3 = boto3.client('s3',region_name=REGION)
    match = re.match("s3://(.+?)/(.+)", s3_uri)
    if not match:
        raise ValueError(f"{s3_uri} not formatted as an S3 path")
    bucket_name = match.group(1)
    key = match.group(2)
    response = s3.get_object(Bucket=bucket_name, Key=key)
    content = response['Body'].read()
    result = chardet.detect(content)
    return result['encoding']

def parse_csv_from_s3(s3_uri):
    """read csv files"""
    try:
        # Detect the file encoding using chardet
        encoding = detect_encoding(s3_uri)        
        # Let the Python engine sniff the delimiter (sep=None) and read the CSV file
        df = pd.read_csv(s3_uri, sep=None, engine='python', encoding=encoding)
        return df.to_csv(index=False, sep="|")
    except Exception as e:
        raise InvalidContentError(f"Error: {e}")
    
def iter_block_items(parent):
    if isinstance(parent, Document):
        parent_elm = parent.element.body
    elif isinstance(parent, _Cell):
        parent_elm = parent._tc
    else:
        raise ValueError("something's not right")

    for child in parent_elm.iterchildren():
        if isinstance(child, CT_P):
            yield Paragraph(child, parent)
        elif isinstance(child, CT_Tbl):
            yield DocxTable(child, parent)

def extract_text_and_tables(docx_path):
    """ Extract text from docx files"""
    document = DocxDocument(docx_path)
    content = ""
    current_section = ""
    section_type = None
    for block in iter_block_items(document):
        if isinstance(block, Paragraph):
            if block.text:
                if block.style.name == 'Heading 1':
                    # Close the current section if it exists
                    if current_section:
                        content += f"{current_section}</{section_type}>\n"
                        current_section = ""
                        section_type = None  
                    section_type ="h1"
                    content += f"<{section_type}>{block.text}</{section_type}>\n"
                elif block.style.name== 'Heading 3':
                    # Close the current section if it exists
                    if current_section:
                        content += f"{current_section}</{section_type}>\n"
                        current_section = ""
                    section_type = "h3"  
                    content += f"<{section_type}>{block.text}</{section_type}>\n"
                
                elif block.style.name == 'List Paragraph':
                    # Add to the current list section
                    if section_type != "list":
                        # Close the current section if it exists
                        if current_section:
                            content += f"{current_section}</{section_type}>\n"
                        section_type = "list"
                        current_section = "<list>"
                    current_section += f"{block.text}\n"
                elif block.style.name.startswith('toc'):
                    # Add to the current toc section
                    if section_type != "toc":
                        # Close the current section if it exists
                        if current_section:
                            content += f"{current_section}</{section_type}>\n"
                        section_type = "toc"
                        current_section = "<toc>"
                    current_section += f"{block.text}\n"
                else:
                    # Close the current section if it exists
                    if current_section:
                        content += f"{current_section}</{section_type}>\n"
                        current_section = ""
                        section_type = None
                    
                    # Append the passage text without tagging
                    content += f"{block.text}\n"
        
        elif isinstance(block, DocxTable):
            # Add the current section before the table
            if current_section:
                content += f"{current_section}</{section_type}>\n"
                current_section = ""
                section_type = None

            content += "<table>\n"
            for row in block.rows:
                row_content = []
                for cell in row.cells:
                    cell_content = []
                    for nested_block in iter_block_items(cell):
                        if isinstance(nested_block, Paragraph):
                            cell_content.append(nested_block.text)
                        elif isinstance(nested_block, DocxTable):
                            nested_table_content = parse_nested_table(nested_block)
                            cell_content.append(nested_table_content)
                    row_content.append("|".join(cell_content))
                content += "|".join(row_content) + "\n"
            content += "</table>\n"

    # Add the final section
    if current_section:
        content += f"{current_section}</{section_type}>\n"

    return content

def parse_nested_table(table):
    nested_table_content = "<table>\n"
    for row in table.rows:
        row_content = []
        for cell in row.cells:
            cell_content = []
            for nested_block in iter_block_items(cell):
                if isinstance(nested_block, Paragraph):
                    cell_content.append(nested_block.text)
                elif isinstance(nested_block, DocxTable):
                    nested_table_content += parse_nested_table(nested_block)
            row_content.append("|".join(cell_content))
        nested_table_content += "|".join(row_content) + "\n"
    nested_table_content += "</table>"
    return nested_table_content



def extract_text_from_pptx_s3(pptx_buffer):
    """ Extract Text from pptx files"""
    presentation = Presentation(pptx_buffer)    
    text_content = []
    for slide in presentation.slides:
        slide_text = []
        for shape in slide.shapes:
            if hasattr(shape, 'text'):
                slide_text.append(shape.text)
        text_content.append('\n'.join(slide_text))    
    return '\n\n'.join(text_content)

The `handle_doc_upload_or_s3` function is the main handler for extracting content from the supported file formats. It takes a `file` parameter (an S3 URI; local files can be uploaded first with `put_obj_in_s3_bucket_`) and returns the extracted content based on the file extension. The extracted content is then passed as context to the LLM.

- .pdf, .png, .jpg, .jpeg, .tif: It calls the `exract_pdf_text_aws` function to extract text from the file using Amazon Textract or Python libraries (PyPDF2 or pytesseract).
- .csv: It calls the `parse_csv_from_s3` function to parse the CSV file using pandas, detecting the file encoding and sniffing the delimiter.
- .xlsx, .xls: It calls the `table_parser_utills` function to extract content from Excel files.
- .json: It retrieves the file from S3 using the `get_s3_obj_from_bucket_` function, reads the file content, and loads it as a JSON object using `json.loads`.
- .txt, .py, .md: It retrieves the file from S3 using the `get_s3_obj_from_bucket_` function and reads the file content as plain text.
- .docx: It retrieves the file from S3 using the `get_s3_obj_from_bucket_` function, reads the file content, creates a BytesIO buffer, and passes it to the `extract_text_and_tables` function to extract text and tables from the Word document.
- .pptx: It retrieves the file from S3 using the `get_s3_obj_from_bucket_` function, reads the file content, creates a BytesIO buffer, and passes it to the `extract_text_from_pptx_s3` function to extract text from the PowerPoint presentation.
- Other file extensions: It raises a `ValueError` ("Input file format not supported"). A fallback based on the textract library is left commented out in the code.

In [38]:
def handle_doc_upload_or_s3(file):
    """Handle various document format"""
    dir_name, ext = os.path.splitext(file)
    if  ext.lower() in [".pdf", ".png", ".jpg",".tif",".jpeg"]:   
        content=exract_pdf_text_aws(file)
    elif ".csv"  == ext.lower():
        content=parse_csv_from_s3(file)
    elif ext.lower() in [".xlsx", ".xls"]:
        content=table_parser_utills(file)   
    elif  ".json"==ext.lower():      
        obj=get_s3_obj_from_bucket_(file)
        content = json.loads(obj['Body'].read())  
    elif  ext.lower() in [".txt",".py",".md"]:       
        obj=get_s3_obj_from_bucket_(file)
        content = obj['Body'].read().decode("utf-8", errors="replace")  # decode bytes so the text is not rendered as b'...'
    elif ".docx" == ext.lower():       
        obj=get_s3_obj_from_bucket_(file)
        content = obj['Body'].read()
        docx_buffer = io.BytesIO(content)
        content = extract_text_and_tables(docx_buffer)
    elif ".pptx" == ext.lower():       
        obj=get_s3_obj_from_bucket_(file)
        content = obj['Body'].read()
        docx_buffer = io.BytesIO(content)        
        content = extract_text_from_pptx_s3(docx_buffer)
    else:
        # Condition not met, raise an exception
        raise ValueError("Input file format not supported")
    # else:            
    #     obj=get_s3_obj_from_bucket_(file)
    #     content = obj['Body'].read()
    #     doc_buffer = io.BytesIO(doc_content)
    #     content = textract.process(doc_buffer).decode()
    # Implement any other file extension logic 
    return content
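
The extension-based routing above can also be written as a lookup table. The following is a minimal sketch, not the notebook's implementation: `_read_text`, `_read_table`, `HANDLERS`, and `dispatch_by_extension` are hypothetical names, and the stand-in handlers only return a label instead of reading from S3.

```python
import os

# Hypothetical stand-in handlers; the notebook's real parsers read from S3.
def _read_text(path):
    return f"text:{path}"

def _read_table(path):
    return f"table:{path}"

# Map each supported extension (lowercase) to the handler that parses it.
HANDLERS = {
    ".txt": _read_text,
    ".py": _read_text,
    ".md": _read_text,
    ".csv": _read_table,
    ".xlsx": _read_table,
    ".xls": _read_table,
}

def dispatch_by_extension(path):
    """Pick a handler from the file extension, ignoring case."""
    _, ext = os.path.splitext(path)
    handler = HANDLERS.get(ext.lower())
    if handler is None:
        raise ValueError(f"Input file format not supported: {ext or '(none)'}")
    return handler(path)

print(dispatch_by_extension("s3://my-bucket/Report.CSV"))
```

With this layout, supporting a new format means adding one entry to the table rather than another `elif` branch.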

Stores long-term chat history in DynamoDB or Local Disk

1. `put_db(messages)`: Takes a dictionary describing one chat turn and stores it in a DynamoDB table, using the user ID and session ID as the primary key. If an item with the same user ID and session ID already exists, the function retrieves the existing messages, appends the new one, and puts the updated chat item back into the table.
2. `save_chat_local(file_path, new_data)`: Saves new chat data to a local JSON file, appending it to the existing data for the current session. Handles conversion of Decimal objects to floats.
3. `load_chat_local(file_path)`: Loads the current session's chat history from a local JSON file, returning an empty list if the file doesn't exist.




In [39]:
def put_db(messages):
    """Store long term chat history in DynamoDB"""    
    chat_item = {
        "UserId": DYNAMODB_USER, # user id
        "SessionId": SESSIONID, # User session id
        "messages": [messages],  # 'messages' is a list of dictionaries
        "time":messages['time']
    }
    existing_item = DYNAMODB.Table(DYNAMODB_TABLE).get_item(Key={"UserId": DYNAMODB_USER, "SessionId":SESSIONID})
    if "Item" in existing_item:
        existing_messages = existing_item["Item"]["messages"]
        chat_item["messages"] = existing_messages + [messages]
    response = DYNAMODB.Table(DYNAMODB_TABLE).put_item(
        Item=chat_item
    )    
def save_chat_local(file_path, new_data):
    """Store long term chat history on local disk"""   
    try:
        # Read the existing JSON data from the file
        with open(file_path, "r",encoding='utf-8') as file:
            existing_data = json.load(file)
        if SESSIONID not in existing_data:
            existing_data[SESSIONID]=[]
    except FileNotFoundError:
        # If the file doesn't exist, start a new history with an empty list for this session
        existing_data = {SESSIONID:[]}
    # Convert Decimal values to floats so the records are JSON serializable
    from decimal import Decimal
    data = [{k: float(v) if isinstance(v, Decimal) else v for k, v in item.items()} for item in new_data]
    # Append the new data to this session's list
    existing_data[SESSIONID].extend(data)
    # Write the updated history back to the JSON file
    with open(file_path, "w",encoding='utf-8') as file:
        json.dump(existing_data, file)
        
def load_chat_local(file_path):
    """Load long term chat history from Local"""   
    try:
        # Read the existing JSON data from the file
        with open(file_path, "r",encoding='utf-8') as file:
            existing_data = json.load(file)
            if SESSIONID in existing_data:
                existing_data=existing_data[SESSIONID]
            else:
                existing_data=[]
    except FileNotFoundError:
        # If the file doesn't exist, initialize an empty list
        existing_data = []
    return existing_data
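
The Decimal handling matters because `json.dump` cannot serialize `Decimal` values, which is how the boto3 DynamoDB resource returns numbers. Below is a minimal, self-contained sketch of the append-and-save pattern. `to_jsonable` and `append_turns` are hypothetical names, and the session ID is passed explicitly instead of being read from a global.

```python
import json
import os
import tempfile
from decimal import Decimal

def to_jsonable(record):
    """Convert top-level Decimal values to float; leave everything else as is."""
    return {k: float(v) if isinstance(v, Decimal) else v for k, v in record.items()}

def append_turns(file_path, session_id, turns):
    """Append chat turns to the session's list in a JSON file and return the data."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        data = {}  # first write: no history yet
    data.setdefault(session_id, []).extend(to_jsonable(t) for t in turns)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    return data

# Write two turns into a temporary file, one call at a time.
path = os.path.join(tempfile.mkdtemp(), "chat-history.json")
append_turns(path, "s1", [{"user": "hi", "assistant": "hello", "input_token": Decimal("12")}])
history = append_turns(path, "s1", [{"user": "bye", "assistant": "goodbye", "input_token": Decimal("7")}])
print(len(history["s1"]))
```

Note that only top-level values are converted, as in the notebook; a nested Decimal would still need its own handling.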

Prepares chat history retrieved from either DynamoDB or local disk for the Claude conversation.

This function takes the chat history already retrieved from DynamoDB or local disk (`chat_histories`) and a `cutoff`, then prepares the most recent turns for the conversation based on the `claude3` flag and the `LOAD_DOC_IN_ALL_CHAT_CONVO` configuration.

If the `claude3` flag is set, images are passed to Claude 3 directly as image content; otherwise Amazon Textract or Python libraries are used to extract their content. If `LOAD_DOC_IN_ALL_CHAT_CONVO` is set, all documents in the loaded conversation history are processed and their contents are added as context to the chat history. The `cutoff` determines the number of recent chat turns to load into the current conversation.

In [40]:
def get_chat_history_db(chat_histories, cutoff,claude3):
    current_chat=[]
    if DYNAMODB_TABLE:
        chat_hist=chat_histories['Item']['messages'][-cutoff:] 
    else:
        chat_hist=chat_histories[-cutoff:] 
    for d in chat_hist:
        if d['image'] and claude3 and LOAD_DOC_IN_ALL_CHAT_CONVO:
            content=[]
            for img in d['image']:
                s3 = boto3.client('s3',region_name=REGION)
                match = re.match("s3://(.+?)/(.+)", img)
                image_name=os.path.basename(img)
                _,ext=os.path.splitext(image_name)
                if "jpg" in ext: ext=".jpeg"                        
                if match:
                    bucket_name = match.group(1)
                    key = match.group(2)    
                    obj = s3.get_object(Bucket=bucket_name, Key=key)
                    base_64_encoded_data = base64.b64encode(obj['Body'].read())
                    base64_string = base_64_encoded_data.decode('utf-8')                        
                    # Only add the image when it was actually fetched from S3
                    content.extend([{"type":"text","text":image_name},{
                      "type": "image",
                      "source": {
                        "type": "base64",
                        "media_type": f"image/{ext.lower().replace('.','')}",
                        "data": base64_string
                      }
                    }])
            content.extend([{"type":"text","text":d['user']}])
            current_chat.append({'role': 'user', 'content': content})
        elif d['document'] and LOAD_DOC_IN_ALL_CHAT_CONVO:
            doc='Here are the documents:\n'
            for docs in d['document']:
                uploads=handle_doc_upload_or_s3(docs)
                doc_name=os.path.basename(docs)
                doc+=f"<{doc_name}>\n{uploads}\n</{doc_name}>\n"
            if not claude3 and d["image"]:
                for docs in d['image']:
                    uploads=handle_doc_upload_or_s3(docs)
                    doc_name=os.path.basename(docs)
                    doc+=f"<{doc_name}>\n{uploads}\n</{doc_name}>\n"
            current_chat.append({'role': 'user', 'content': [{"type":"text","text":doc+d['user']}]})
        else:
            current_chat.append({'role': 'user', 'content': [{"type":"text","text":d['user']}]})
        current_chat.append({'role': 'assistant', 'content': d['assistant']})  
    return current_chat, chat_hist
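
To make the role of `cutoff` concrete, here is a minimal sketch that turns stored turns into alternating user and assistant messages. It covers only the text-only branch, `build_messages` is a hypothetical name, and the sample turns are made up for illustration.

```python
def build_messages(turns, cutoff):
    """Keep the last `cutoff` turns and expand each into a user/assistant pair."""
    messages = []
    for turn in turns[-cutoff:]:
        messages.append({"role": "user", "content": [{"type": "text", "text": turn["user"]}]})
        messages.append({"role": "assistant", "content": turn["assistant"]})
    return messages

# Made-up history of three turns.
turns = [
    {"user": "q1", "assistant": "a1"},
    {"user": "q2", "assistant": "a2"},
    {"user": "q3", "assistant": "a3"},
]
messages = build_messages(turns, cutoff=2)
print([m["role"] for m in messages])
```

One detail of Python slicing is worth keeping in mind: `turns[-0:]` is the same as `turns[0:]`, so a `cutoff` of 0 loads the whole history rather than none of it.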

`bedrock_streemer` processes the streamed response from the Bedrock model, extracting the generated text and the token counts.

`bedrock_claude_` invokes the Bedrock Claude model with the provided chat history, system message, prompt, and optional image(s).
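
As a minimal sketch of the image handling, the snippet below splits an `s3://` URI into bucket and key and builds an image content block of the shape used in this notebook. `parse_s3_uri` and `image_block` are hypothetical helper names, and the bytes are a stand-in rather than a real image fetched from S3.

```python
import base64
import os
import re

def parse_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key); raise if the URI does not match."""
    match = re.match(r"s3://(.+?)/(.+)", uri)
    if not match:
        raise ValueError(f"Not an S3 URI: {uri}")
    return match.group(1), match.group(2)

def image_block(image_name, raw_bytes):
    """Build a base64 image content block, mapping the .jpg extension to image/jpeg."""
    _, ext = os.path.splitext(image_name)
    subtype = ext.lower().lstrip(".")
    if subtype == "jpg":
        subtype = "jpeg"
    return {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": f"image/{subtype}",
            "data": base64.b64encode(raw_bytes).decode("utf-8"),
        },
    }

bucket, key = parse_s3_uri("s3://my-bucket/folder/photo.JPG")
block = image_block(os.path.basename(key), b"abc")  # stand-in bytes, not a real image
print(bucket, key, block["source"]["media_type"])
```

Raising on a non-matching URI makes a bad path fail at the point of parsing instead of later, when the image data would be missing.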

In [41]:
def bedrock_streemer(response):
    """Print streamed text as it arrives and return the full answer with token counts."""
    stream = response.get('body')
    answer = ""
    # Default to 0 so the return statement is safe if no metrics chunk arrives
    input_tokens = 0
    output_tokens = 0
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if  chunk:
                chunk_obj = json.loads(chunk.get('bytes').decode())
                if "delta" in chunk_obj:                    
                    delta = chunk_obj['delta']
                    if "text" in delta:
                        text=delta['text'] 
                        print(text, end="")
                        answer+=str(text)       
                if "amazon-bedrock-invocationMetrics" in chunk_obj:
                    input_tokens= chunk_obj['amazon-bedrock-invocationMetrics']['inputTokenCount']
                    output_tokens=chunk_obj['amazon-bedrock-invocationMetrics']['outputTokenCount']
                    print(f"\nInput Tokens: {input_tokens}\nOutput Tokens: {output_tokens}")
    return answer,input_tokens, output_tokens

def bedrock_claude_(chat_history,system_message, prompt,model_id,image_path=None):

    content=[]
    if image_path:       
        if not isinstance(image_path, list):
            image_path=[image_path]      
        for img in image_path:
            s3 = boto3.client('s3',region_name=REGION)
            match = re.match("s3://(.+?)/(.+)", img)
            image_name=os.path.basename(img)
            _,ext=os.path.splitext(image_name)
            if "jpg" in ext: ext=".jpeg"                        
            if match:
                bucket_name = match.group(1)
                key = match.group(2)    
                obj = s3.get_object(Bucket=bucket_name, Key=key)
                base_64_encoded_data = base64.b64encode(obj['Body'].read())
                base64_string = base_64_encoded_data.decode('utf-8')
                # Only add the image when it was actually fetched from S3
                content.extend([{"type":"text","text":image_name},{
                  "type": "image",
                  "source": {
                    "type": "base64",
                    "media_type": f"image/{ext.lower().replace('.','')}",
                    "data": base64_string
                  }
                }])

    content.append({
        "type": "text",
        "text": prompt
            })
    chat_history.append({"role": "user",
            "content": content})
    prompt = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1500,
        "temperature": 0.5,
        "system":system_message,
        "messages": chat_history
    }
    
    prompt = json.dumps(prompt)
    response = bedrock_runtime.invoke_model_with_response_stream(body=prompt, modelId=model_id, accept="application/json", contentType="application/json")
    answer,input_tokens,output_tokens=bedrock_streemer(response) 
    return answer, input_tokens, output_tokens
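
The stream parsing can be tried without calling Bedrock by feeding it hand-built events. This is a minimal sketch: `collect_stream` and `fake_event` are hypothetical names, and the events only mimic the keys that the code above reads (`chunk`, `bytes`, `delta`, `text`, and `amazon-bedrock-invocationMetrics`).

```python
import json

def collect_stream(events):
    """Concatenate text deltas and pick up token counts from the metrics chunk."""
    answer, input_tokens, output_tokens = "", 0, 0
    for event in events:
        chunk = event.get("chunk")
        if not chunk:
            continue
        payload = json.loads(chunk["bytes"].decode())
        # Chunks without a text delta contribute nothing to the answer.
        answer += payload.get("delta", {}).get("text", "")
        metrics = payload.get("amazon-bedrock-invocationMetrics")
        if metrics:
            input_tokens = metrics["inputTokenCount"]
            output_tokens = metrics["outputTokenCount"]
    return answer, input_tokens, output_tokens

def fake_event(payload):
    """Wrap a dict the way the code above expects a stream event to look."""
    return {"chunk": {"bytes": json.dumps(payload).encode()}}

events = [
    fake_event({"delta": {"text": "Hello"}}),
    fake_event({"delta": {"text": ", world"}}),
    fake_event({"amazon-bedrock-invocationMetrics": {"inputTokenCount": 10, "outputTokenCount": 3}}),
]
result = collect_stream(events)
print(result)
```

Starting the token counters at 0 means the function still returns cleanly when a stream ends without a metrics chunk.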

Invokes the Bedrock Claude model with retries and exponential backoff in case of throttling errors.


In [42]:
def _invoke_bedrock_with_retries(current_chat, chat_template, question, model_id, image_path):
    max_retries = 5
    backoff_base = 2
    max_backoff = 3  # Maximum backoff time in seconds
    retries = 0
    # Error codes that are retried with exponential backoff
    retryable_errors = ('ThrottlingException', 'ModelStreamErrorException', 'EventStreamError')

    while True:
        try:
            # Pass a copy: bedrock_claude_ appends the user turn to the list it receives,
            # so reusing the same list would duplicate that turn on every retry
            response,input_tokens,output_tokens= bedrock_claude_(list(current_chat), chat_template, question, model_id, image_path)
            return response,input_tokens,output_tokens
        except ClientError as e:
            if e.response['Error']['Code'] in retryable_errors and retries < max_retries:
                # Exponential backoff with jitter, capped at max_backoff
                sleep_time = min(max_backoff, backoff_base ** retries + random.uniform(0, 1))
                time.sleep(sleep_time)
                retries += 1
            else:
                # Retries exhausted or some other API error, rethrow
                raise
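
The backoff schedule can be seen in isolation with a generic helper and a function that fails a fixed number of times. This is a minimal sketch under stated assumptions: `retry_with_backoff`, `FlakyError`, and `make_flaky` are hypothetical names, and the `sleep` parameter exists only so the example records delays instead of waiting.

```python
import random
import time

def retry_with_backoff(func, is_retryable, max_retries=5, backoff_base=2,
                       max_backoff=3, sleep=time.sleep):
    """Call func until it succeeds, sleeping min(cap, base**n + jitter) between tries."""
    retries = 0
    while True:
        try:
            return func()
        except Exception as exc:
            if not is_retryable(exc) or retries >= max_retries:
                raise
            delay = min(max_backoff, backoff_base ** retries + random.uniform(0, 1))
            sleep(delay)
            retries += 1

class FlakyError(Exception):
    """Stand-in for a throttling error."""

def make_flaky(failures):
    """Return a function that raises FlakyError `failures` times, then returns 'ok'."""
    state = {"left": failures}
    def call():
        if state["left"] > 0:
            state["left"] -= 1
            raise FlakyError("throttled")
        return "ok"
    return call

delays = []  # record the delays instead of actually sleeping
outcome = retry_with_backoff(make_flaky(2), lambda e: isinstance(e, FlakyError),
                             sleep=delays.append)
print(outcome, len(delays))
```

With a base of 2 and a cap of 3 seconds, the first delay falls between 1 and 2 seconds, the second between 2 and 3, and every later delay is pinned to the 3-second cap.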
                

#### Chat Function

Conducts a conversation with the Bedrock Claude model based on the user's question and optional uploaded documents.

This function takes a user's question and a list of document paths (optional) as input. It retrieves the past chat history from DynamoDB (if configured) or uses local storage. It prepares the chat template based on whether documents are provided or not. If documents are provided, it handles the document uploads and extracts the text content. If the Claude3 model is used, it handles images separately. The function then invokes the Bedrock Claude model with retries and exponential backoff in case of throttling errors. The conversation history is stored in DynamoDB (if configured) or local disk for future reference.


In [43]:
from typing import List
def conversation_bedroc_chat_(question, model_id,upload_doc: List[str]):
    """
    Take a user query and a list of document paths (from S3 or local).
    Pass an empty list when there are no documents.
    """    
    local_chat_file_name = "chat-history.json"
    if not isinstance(upload_doc, list):
        raise TypeError("documents must be in a list format")
        
    # Check if Claude3 model is used and handle images with the CLAUDE3 Model
    claude3=False
    if "sonnet" in model_id or "haiku" in model_id:
        claude3=True
    current_chat=[]
   
    # Retrieve past chat history from Dynamodb
    if DYNAMODB_TABLE:
        chat_histories = DYNAMODB.Table(DYNAMODB_TABLE).get_item(Key={"UserId": DYNAMODB_USER, "SessionId":SESSIONID})
        if "Item" in chat_histories:            
            current_chat,chat_hist=get_chat_history_db(chat_histories, CHAT_HISTORY_LENGTH,claude3)
        else:
            chat_hist=[]
    # Retrieve from local
    else:
        chat_histories=load_chat_local(local_chat_file_name)
        if chat_histories:
            current_chat,chat_hist=get_chat_history_db(chat_histories, CHAT_HISTORY_LENGTH,claude3)
    ## prompt template for when a user uploads a doc
    doc_path=[]
    image_path=[]
    full_doc_path=[]
    doc=""
    if upload_doc:  
        doc='I have provided documents and/or images.\n'
        for ids,docs in enumerate(upload_doc):
            _,extensions=os.path.splitext(docs)
            if not docs.startswith("s3://"):
                docs=put_obj_in_s3_bucket_(docs)
            full_doc_path.append(docs)
            if extensions.lower() in [".jpg",".jpeg",".png",".gif",".webp"] and claude3:       
                image_path.append(docs)                
                continue
                
        new_upload_doc = [item for item in full_doc_path if item not in image_path]
        results, errors, result_string=process_files(new_upload_doc)    
        if errors:
            print(errors)
        doc+= result_string
        with open("prompt/doc_chat.txt","r",encoding='utf-8') as f:
            chat_template=f.read()       
    else:        
        # Chat template for open ended query
        with open("prompt/chat.txt","r",encoding='utf-8') as f:
            chat_template=f.read()    
    response,input_tokens,output_tokens=_invoke_bedrock_with_retries(current_chat, chat_template, doc+question, model_id, image_path)
    chat_history={"user":question,
    "assistant":response,
    "image":image_path,
    "document":new_upload_doc if upload_doc else [],
    "modelID":model_id,
    "time":str(time.time()),
    "input_token":round(input_tokens) ,
    "output_token":round(output_tokens)}         
                 
    #store conversation memory in DynamoDB table
    if DYNAMODB_TABLE:
        put_db(chat_history)
    # use local disk for storage
    else:        
        save_chat_local(local_chat_file_name,[chat_history])
    return response,doc
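
The routing of uploads into images and documents can be sketched on its own. This is a minimal illustration, not the notebook's code: `split_uploads` and `IMAGE_EXTENSIONS` are hypothetical names, the extension set mirrors the list used in the function above, and the paths are made up.

```python
import os

# Same image extensions as the list checked in the chat function above.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp"}

def split_uploads(paths, supports_images):
    """Separate image paths (sent to the model as images) from document paths."""
    images, documents = [], []
    for path in paths:
        _, ext = os.path.splitext(path)
        if supports_images and ext.lower() in IMAGE_EXTENSIONS:
            images.append(path)
        else:
            documents.append(path)  # parsed into text before prompting
    return images, documents

uploads = ["s3://my-bucket/chart.PNG", "s3://my-bucket/report.pdf", "s3://my-bucket/data.csv"]
images, documents = split_uploads(uploads, supports_images=True)
print(images, documents)
```

When the model does not accept images, every path falls through to the document list, which matches the notebook's behavior of extracting text from images in that case.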

#### Query the chatbot with your questions.
The function also takes one or more document paths stored in S3 or locally. Once a document path is passed, a different prompt template is used.

In [45]:
question="""what is the popular food in east asia?"""
model_id="anthropic.claude-3-sonnet-20240229-v1:0"
docu=[]  # pass a list of document names (strings) in local storage or S3 else leave as an empty list
res,d=conversation_bedroc_chat_(question, model_id,docu)

Some of the most popular foods in East Asia include:

### Rice
- Rice is a staple food in many East Asian cuisines, including Chinese, Japanese, Korean, and others. It is often served with other dishes.

### Noodles
- Noodle dishes like ramen, udon, soba (Japan), jajangmyeon, naengmyeon (Korea), and various noodle soups and stir-fries (China) are very popular.

### Dumplings
- Dumplings like jiaozi (China), gyoza (Japan), mandu (Korea) are common and come in many varieties - steamed, fried, or in soups.

### Sushi
- Sushi is an internationally renowned Japanese dish made with vinegared rice and various toppings like raw fish.

### Stir-Fries
- Stir-fried dishes with meat and vegetables like kung pao chicken, beef and broccoli, and chop suey are staples of Chinese cuisine.

### Curries
- Curries like Japanese curry rice, Korean curry rice, and Chinese curry dishes are widely eaten.

### Kimchi
- Kimchi, the famous fermented vegetable dish, is a national staple in Korean cuisine.

These 