In [1]:
import os
from dotenv import load_dotenv
import boto3
import json
import dateparser
from llmsherpa.readers import LayoutPDFReader
from functools import lru_cache
# from concurrent.futures import ThreadPoolExecutor

from Templates.ibis_aws_summary_template import IBIS_SUMMARY_TEMPLATE

# Load variables from a .env file into the process environment.
# NOTE(review): presumably this is where the AWS_* variables read via
# os.getenv further down come from — confirm.
load_dotenv()

# URL passed to LayoutPDFReader in get_text_pdf_part.
# NOTE(review): it targets localhost:5501, so the from_text path presumably
# needs a locally running llmsherpa parsing service — confirm.
llmsherpa_api_url = "http://localhost:5501/api/parseDocument?renderFormat=all"

In [2]:
# Region the Bedrock runtime client is created in.
AWS_REGION_NAME = 'us-west-2'
# Credentials are taken from the environment; os.getenv returns None when a
# variable is not set.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')

# Shared 'bedrock-runtime' client; response_to_template calls bedrock.converse on it.
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime.html
bedrock = boto3.client(
    service_name='bedrock-runtime',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=AWS_REGION_NAME
)

In [3]:
# Troubleshooting: to list the available foundation models, use a separate client with service_name 'bedrock', not 'bedrock-runtime'
# https://docs.aws.amazon.com/bedrock/latest/APIReference/welcome.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock.html

# client = boto3.client(
#     service_name='bedrock',
#     aws_access_key_id=aws_access_key_id,
#     aws_secret_access_key=aws_secret_access_key,
#     region_name=AWS_REGION_NAME
# )

# summ = client.list_foundation_models()['modelSummaries']
# [model for model in summ if 'Sonnet' in model['modelName']]

In [4]:
# [m for m in dir(bedrock) if not m.startswith('_')]
# help(bedrock.converse)

In [5]:
@lru_cache(maxsize=1000)
def get_raw_pdf_part(filename: str) -> dict:
    """Read a PDF from disk and wrap its bytes as a ``document`` content part.

    Args:
        filename: Path of the PDF file to read.

    Returns:
        A dict of the form
        ``{"document": {"format": "pdf", "name": "document", "source": {"bytes": ...}}}``
        holding the raw file content.

    Raises:
        OSError: If the file cannot be opened or read.

    Note:
        Results are memoized per ``filename`` by ``lru_cache``: repeated calls
        return the *same* dict object (do not mutate it), and a file that
        changes on disk is not re-read.
    """
    # The context manager closes the handle deterministically, even if read()
    # raises, instead of leaving it to the garbage collector.
    with open(filename, 'rb') as pdf_file:
        content = pdf_file.read()

    return {
        "document": {
            "format": "pdf",
            "name": 'document',
            "source": {
                "bytes": content
            }
        }
    }


@lru_cache(maxsize=1000)
def get_text_pdf_part(filename: str) -> dict:
    """Parse a PDF with llmsherpa and wrap the extracted text as a ``txt`` document part.

    This route works as well, but parsing is slow and the results are worse
    than sending the raw PDF.

    The value stored under ``"bytes"`` is whatever ``to_text()`` returns; it is
    not encoded here. Results are memoized per ``filename`` by ``lru_cache``.
    """
    parsed_doc = LayoutPDFReader(llmsherpa_api_url).read_pdf(filename)
    source = {"bytes": parsed_doc.to_text()}
    return {"document": {"format": "txt", "name": 'document', "source": source}}


def _extract_tool_input(response: dict) -> dict:
    """Return the ``input`` payload of the first ``toolUse`` block in a Converse response."""
    for block in response['output']['message']['content']:
        # Do not assume the tool call is the first content block.
        if 'toolUse' in block:
            return block['toolUse']['input']
    raise ValueError(
        "Bedrock response contains no toolUse block "
        f"(stopReason={response.get('stopReason')!r})"
    )


def _decode_json_strings(payload: dict) -> dict:
    """Replace, in place, string values that hold a JSON object/array with the decoded value."""
    for key, value in payload.items():
        if not isinstance(value, str):
            continue
        # startswith/endswith are safe on the empty string, unlike value[0].
        if value.startswith(('{', '[')) and value.endswith(('}', ']')):
            try:
                # Only existing keys are reassigned, so the dict size is stable
                # while iterating.
                payload[key] = json.loads(value)
            except (ValueError, RecursionError):
                # Best effort: a value that merely looks like JSON stays a string.
                pass
    return payload


def response_to_template(template: dict, filename: str, from_text: bool,
                         model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0") -> dict:
    """Ask the model to fill one tool template from a document and return the tool input.

    Args:
        template: Tool specification; it is sent as the ``toolSpec`` of the only
            tool offered to the model, and the model is forced to call it.
        filename: Path of the PDF to summarise.
        from_text: When False the raw PDF is attached as a document part. When
            True the text extracted by ``get_text_pdf_part`` is appended as a
            plain text block instead.
        model_id: Bedrock model identifier passed to ``converse``. Defaults to
            the Claude 3 Sonnet id this notebook has always used.

    Returns:
        The ``input`` dict of the model's tool call. If that dict has a
        ``properties`` key holding a dict, the nested dict is returned instead.
        String values that contain a JSON object or array are decoded in place.

    Raises:
        ValueError: If the response contains no ``toolUse`` block.
    """

    prompt = """
    You are an expert in extracting market and financial data from documents.
    Extract essential data from text in the enclosed document.

    Return the result in JSON format. Do not use non-JSON tags such as <property> or <UNKNOWN>.
    Use only simple keys with units, such as "historical_revenue_growth_percentage" or "establishments_count" or "revenue_dollars".
    """

    initial_message = {
        "role": "user",
        "content": [
            {
                "text": prompt,
            },
        ],
    }

    if not from_text:
        initial_message['content'].append(get_raw_pdf_part(filename))
    else:
        # The extracted text goes in as a plain text block rather than as the
        # "txt" document part that get_text_pdf_part builds.
        text = get_text_pdf_part(filename)['document']['source']['bytes']
        initial_message['content'].append({"text": text})

    # The forced tool choice must name a tool that is actually offered, so take
    # the name from the template; fall back to the historical constant.
    tool_name = template.get("name", "summarize_document")

    response = bedrock.converse(
        modelId=model_id,
        messages=[initial_message],
        inferenceConfig={
            "temperature": 0
        },
        toolConfig={
            "tools": [{
                "toolSpec": template
            }],
            "toolChoice": {
                "tool": {
                    "name": tool_name
                }
            }
        }
    )

    core_response = _extract_tool_input(response)
    # Some answers wrap the payload in a "properties" object; unwrap it only
    # when it really is a dict.
    nested = core_response.get('properties') if isinstance(core_response, dict) else None
    if isinstance(nested, dict):
        core_response = nested

    return _decode_json_strings(core_response)

In [6]:
def ibis_industry_summary(filename: str, from_text: bool) -> dict:
    """Fill every template in IBIS_SUMMARY_TEMPLATE for one document and merge the results.

    Args:
        filename: Path of the PDF to summarise.
        from_text: Forwarded to ``response_to_template`` to choose between the
            raw PDF and the extracted text.

    Returns:
        A single dict built by updating with each template's result in template
        order, so a key returned by a later template overwrites an earlier one.
    """
    total = {}
    # Requests are issued one after another on purpose: running them in
    # threads seemed to conflict with the service quota.
    for tool_spec in IBIS_SUMMARY_TEMPLATE:
        total.update(response_to_template(tool_spec, filename, from_text))

    return total

In [7]:
def parse_save_file(filename: str, from_text: bool):
    """Extract the industry summary from one PDF and save it as JSON.

    The output path is derived from ``filename`` by replacing ``'Source'`` with
    ``'Summary'`` and ``'.pdf'`` with ``'.json'`` (``'.txt.json'`` when
    ``from_text`` is True).

    Args:
        filename: Path of the source PDF.
        from_text: Forwarded to ``ibis_industry_summary``; also selects the
            output file suffix.

    Raises:
        KeyError: If the summary lacks ``'last_updated'`` or ``'industry_name'``.
        ValueError: If ``'last_updated'`` cannot be parsed as a date, or if the
            derived output path equals the input path.
        OSError: If the output file cannot be written.
    """
    ibis_summary = ibis_industry_summary(filename, from_text)

    raw_last_updated = ibis_summary['last_updated']
    parsed = dateparser.parse(raw_last_updated)
    if parsed is None:
        # dateparser signals an unparseable string by returning None.
        raise ValueError(
            f"Could not parse 'last_updated' value {raw_last_updated!r} from {filename!r}"
        )
    last_updated = parsed.isoformat()
    if parsed.utcoffset() is None:
        # Naive timestamps are labelled as UTC. Timezone-aware ones already
        # carry their offset in isoformat(), so nothing is appended to them.
        last_updated += '+00:00'

    out = {
        'source': 'IBIS',
        'type': 'Industry research',
        'subtype': 'Industry at a Glance',

        'industry_name': ibis_summary['industry_name'],
        'last_updated': last_updated,
        'industry_summary': ibis_summary,
    }

    out_filename = filename.replace('Source', 'Summary')
    if from_text:
        out_filename = out_filename.replace('.pdf', '.txt.json')
    else:
        out_filename = out_filename.replace('.pdf', '.json')

    if out_filename == filename:
        # Neither substitution matched; writing would overwrite the input file.
        raise ValueError(
            f"Refusing to overwrite the input file {filename!r}: "
            "expected 'Source' or '.pdf' in the path"
        )

    # Close (and flush) the output file deterministically.
    with open(out_filename, 'w') as out_file:
        json.dump(out, out_file, indent=2)

In [8]:
# Source PDFs available for summarisation.
filenames = [
    'IndustrySource/IBIS/Furniture Stores in the US.pdf',
    'IndustrySource/IBIS/Household Furniture Manufacturing in the US.pdf',
    "IndustrySource/IBIS/IT Consulting in the US.pdf",
]
# Only the first entry is processed (note the [:1] slice), using the raw-PDF
# path (from_text=False). Widen the slice to process more files.
for filename in filenames[:1]:
    parse_save_file(filename, from_text=False)