In [2]:
import boto3
import json
import os
import sys
import re
import shutil
from collections import defaultdict

In [3]:
# The project root is the parent of the directory the kernel is running in.
root_path = os.path.dirname(os.getcwd())

# Two sibling folders under the root:
#   amazonOutput  -> where the Textract responses and derived results are written
#   imagesPerPage -> where the per-document image folders are read from
amazon_root_path, images_root_path = (
    os.path.join(root_path, sub_dir) for sub_dir in ("amazonOutput", "imagesPerPage")
)

In [None]:
# Location of the JSON settings file, relative to the kernel's working directory.
credentials = "./credentials.json"

# Parse the JSON document directly from the binary file handle; the resulting
# dict is what gets passed to create_text_extract_client further down.
with open(credentials, "rb") as file:
    creds = json.load(file)

<h2>Call Amazon Textract Service</h2>

In [2]:
def create_text_extract_client(aws_creds, role_session_name='testRole', duration_seconds=129600):
    """
    Build an Amazon Textract client from MFA-backed, role-assumed credentials.

    Steps:
      1. Create an STS client from the long-lived access key pair.
      2. Prompt on stdin for the current MFA token and exchange it for a
         temporary session token.
      3. Use that session to assume ``role_arn``.
      4. Create the Textract client from the temporary role credentials.

    Parameters
    ----------
    aws_creds : dict
        Must contain the keys "access_id", "secret_key", "region", "mfa"
        (passed to STS as the MFA device ``SerialNumber``) and "role_arn".
    role_session_name : str, optional
        Session name passed to ``assume_role``.
    duration_seconds : int, optional
        Lifetime requested for the MFA session token.

    Returns
    -------
    The boto3 client created for the 'textract' service.

    Raises
    ------
    KeyError
        If one of the required keys is missing from ``aws_creds``.
    Errors raised by the boto3 / STS calls are not caught here.
    """
    mfa_linked_access_id = aws_creds["access_id"]
    mfa_linked_secret_key = aws_creds["secret_key"]
    region = aws_creds["region"]
    mfa_serial = aws_creds["mfa"]
    role_arn = aws_creds["role_arn"]

    sts_client = boto3.client('sts',
          region_name=region,
          aws_access_key_id=mfa_linked_access_id,
          aws_secret_access_key=mfa_linked_secret_key)

    # Surrounding whitespace from a copy/paste would make the token invalid.
    mfa_token = input("Enter Token: ").strip()

    mfa_response = sts_client.get_session_token(
        DurationSeconds=duration_seconds,
        SerialNumber=mfa_serial,
        TokenCode=mfa_token
    )

    session_credentials = mfa_response['Credentials']

    # Same region as the first STS client so both calls hit the same endpoint.
    new_sts_client = boto3.client('sts',
                          region_name=region,
                          aws_access_key_id=session_credentials["AccessKeyId"],
                          aws_secret_access_key=session_credentials["SecretAccessKey"],
                          aws_session_token=session_credentials["SessionToken"])

    # Change role
    assumed_role = new_sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=role_session_name)

    role_credentials = assumed_role['Credentials']

    # The region comes from the ``aws_creds`` argument, never from a notebook
    # global, so the function works for any credentials dict it is given.
    text_client = boto3.client('textract',
              aws_access_key_id=role_credentials["AccessKeyId"],
              aws_secret_access_key=role_credentials["SecretAccessKey"],
              aws_session_token=role_credentials["SessionToken"],
              region_name=region)

    return text_client

In [3]:
# Build the Textract client from the loaded credentials.
# This prompts for the current MFA token on stdin.
client = create_text_extract_client(aws_creds=creds)

Enter Token: 250286


In [11]:
# Send every page image to Textract and store the raw response as JSON under
# <amazon_root_path>/<folder>/annotation/<image name>.json
for folder in os.listdir(images_root_path):
    folder_dir = os.path.join(images_root_path,folder)

    # Stray files at the top level (e.g. .DS_Store) are not document folders.
    if not os.path.isdir(folder_dir):
        continue

    print(f"Processing Folder --> {folder}", end="\n")

    amazon_dir = os.path.join(amazon_root_path,folder,"annotation")

    os.makedirs(amazon_dir,exist_ok=True)

    for image_file in os.listdir(folder_dir):
        image_file_dir = os.path.join(folder_dir,image_file)

        # Hidden files and nested directories are not page images.
        if image_file.startswith(".") or not os.path.isfile(image_file_dir):
            continue

        # Build the output name from the file stem so the result always ends in
        # ".json", whatever the image extension (or its letter case) is.
        image_stem = os.path.splitext(image_file)[0]
        result_save_dir = os.path.join(amazon_dir,image_stem + ".json")

        with open(image_file_dir,"rb") as load_file:
            doc_bytes = load_file.read()

        response = client.analyze_document(Document={'Bytes': doc_bytes},FeatureTypes=['FORMS','TABLES'])

        with open(result_save_dir,"w") as write_file:
            write_file.write(json.dumps(response))

Processing Folder --> oncoextra-tnbc-ntrk-wm-sample-report_pdf
Processing Folder --> Caris-Molecular-Intelligence_MI-Profile_Breast_NOS_WEBchanged_pdf
Processing Folder --> F1CDx Sample Report (Lung) (copy)_pdf
Processing Folder --> F1CDx Sample Report (Lung) changed_pdf
Processing Folder --> CarisReport_2023_NSCLC_KRAS_G12C_PD-L1-unlocked_pdf
Processing Folder --> Sample-NGS-Thyroid-MTC-report_changed_pdf
Processing Folder --> Tempus-Onco_Clinical-Report-Sample_pdf
Processing Folder --> Positive-Report_pdf


<h2>Extract Key-Value From Textract Response</h2>

In [9]:
def get_kv_map(response):
    """
    Index the blocks of a Textract response by their Id.

    Returns a tuple ``(key_map, value_map, block_map)``:
      * ``block_map`` holds every block in ``response['Blocks']``;
      * ``key_map`` holds the KEY_VALUE_SET blocks whose ``EntityTypes``
        contains 'KEY';
      * ``value_map`` holds all other KEY_VALUE_SET blocks.
    """
    block_map = {}
    key_map = {}
    value_map = {}

    for blk in response['Blocks']:
        blk_id = blk['Id']
        block_map[blk_id] = blk

        # Only key/value-set blocks go into the two specialised maps.
        if blk['BlockType'] != "KEY_VALUE_SET":
            continue

        target = key_map if 'KEY' in blk['EntityTypes'] else value_map
        target[blk_id] = blk

    return key_map, value_map, block_map


def get_kv_relationship(key_map, value_map, block_map):
    """
    Pair every key block with the text of its value block.

    Parameters
    ----------
    key_map : dict
        Block Id -> key block.
    value_map : dict
        Block Id -> value block.
    block_map : dict
        Block Id -> block, used to resolve child text.

    Returns
    -------
    collections.defaultdict(list)
        Key text -> list of value texts (one entry per key block with that
        text). A key that has no linked value block gets an empty string.
    """
    kvs = defaultdict(list)
    for key_block in key_map.values():
        value_block = find_value_block(key_block, value_map)
        key = get_text(key_block, block_map)
        # No value block linked to this key: record an empty value, not a crash.
        val = get_text(value_block, block_map) if value_block is not None else ''
        kvs[key].append(val)
    return kvs


def find_value_block(key_block, value_map):
    """
    Return the value block referenced by ``key_block``.

    Walks the key block's 'VALUE' relationships and looks each referenced Id
    up in ``value_map``; when several Ids are listed, the last one wins.

    Returns
    -------
    dict or None
        The value block, or None when the key block has no 'Relationships'
        entry or no relationship of type 'VALUE'.

    Raises
    ------
    KeyError
        If a referenced Id is missing from ``value_map``.
    """
    value_block = None
    for relationship in key_block.get('Relationships', []):
        if relationship['Type'] == 'VALUE':
            for value_id in relationship['Ids']:
                value_block = value_map[value_id]
    return value_block

                            
def get_text(result, blocks_map, key=True):
    """
    Concatenate the text carried by the CHILD blocks of ``result``.

    Each WORD child contributes its 'Text' followed by a space; each
    SELECTION_ELEMENT child whose 'SelectionStatus' is 'SELECTED' contributes
    'X '. Any other child adds nothing. Returns '' when ``result`` has no
    'Relationships' entry.

    ``key`` is accepted but not used. This notebook defines ``get_text`` a
    second time in the table-extraction section.
    """
    if 'Relationships' not in result:
        return ''

    pieces = []
    for relationship in result['Relationships']:
        if relationship['Type'] != 'CHILD':
            continue
        for child_id in relationship['Ids']:
            child = blocks_map[child_id]
            kind = child['BlockType']
            if kind == 'WORD':
                pieces.append(child['Text'] + " ")
            elif kind == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
                pieces.append('X ')

    return ''.join(pieces)

<h2>Extract Table From Textract Response</h2>

In [10]:
def get_rows_columns_map(table_result, blocks_map):
    """
    Collect the cells of one table block into a nested dict.

    Returns ``(rows, scores)`` where ``rows[row_index][col_index]`` is the
    text of the CELL child at that position (resolved through ``get_text``)
    and ``scores`` lists each cell's 'Confidence' converted to ``str``, in
    the order the cells were visited.
    """
    rows = {}
    scores = []

    for relationship in table_result['Relationships']:
        if relationship['Type'] != 'CHILD':
            continue

        for child_id in relationship['Ids']:
            cell = blocks_map[child_id]
            if cell['BlockType'] != 'CELL':
                continue

            row_index, col_index = cell['RowIndex'], cell['ColumnIndex']

            # First cell seen for a row creates that row's column dict.
            row = rows.setdefault(row_index, {})

            scores.append(str(cell['Confidence']))
            row[col_index] = get_text(cell, blocks_map)

    return rows, scores


def get_text(result, blocks_map):
    """
    Build the text of ``result`` from its CHILD blocks.

    WORD children add their 'Text' plus a trailing space, SELECTION_ELEMENT
    children add 'X ' when their 'SelectionStatus' is 'SELECTED', and every
    other child is ignored. A block without 'Relationships' yields ''.
    """
    def _token(child):
        # Text contributed by a single child block ('' when it adds nothing).
        if child['BlockType'] == 'WORD':
            return child['Text'] + ' '
        if child['BlockType'] == 'SELECTION_ELEMENT' and child['SelectionStatus'] == 'SELECTED':
            return 'X '
        return ''

    relationships = result['Relationships'] if 'Relationships' in result else []

    return ''.join(
        _token(blocks_map[child_id])
        for relationship in relationships
        if relationship['Type'] == 'CHILD'
        for child_id in relationship['Ids']
    )

def generate_table_csv(table_result, blocks_map, table_index):
    """
    Serialise one table block as "<sep>"-delimited text.

    Every cell is written as its text followed by "<sep>", every row ends
    with a newline, and the table is terminated by two extra newlines.

    Rows are emitted in ascending row index and cells in ascending column
    index, so the layout does not depend on the order in which the cells
    are listed in the block's relationships. The downstream parser treats
    the first line as the header row, so this ordering matters.

    Parameters
    ----------
    table_result : dict
        The table block.
    blocks_map : dict
        Block Id -> block.
    table_index : int
        Accepted for interface compatibility; not used.

    Returns
    -------
    str
    """
    rows, _scores = get_rows_columns_map(table_result, blocks_map)

    lines = []
    for row_index in sorted(rows):
        cols = rows[row_index]
        cells = ('{}'.format(cols[col_index]) + "<sep>" for col_index in sorted(cols))
        lines.append(''.join(cells) + '\n')

    return ''.join(lines) + '\n\n'

def get_table_csv_results(response):
    """
    Render every TABLE block of a Textract response as delimited text.

    Returns the sentinel string "<b> NO Table FOUND </b>" when the response
    has no TABLE block. Otherwise returns the concatenation, for each table
    in order, of ``generate_table_csv(...)`` followed by the
    '\\n---End--OF--Table\\n' marker.
    """
    blocks = response['Blocks']

    blocks_map = {block['Id']: block for block in blocks}
    table_blocks = [block for block in blocks if block['BlockType'] == "TABLE"]

    if not table_blocks:
        return "<b> NO Table FOUND </b>"

    chunks = []
    # Tables are numbered from 1 when handed to generate_table_csv.
    for table_number, table in enumerate(table_blocks, start=1):
        chunks.append(generate_table_csv(table, blocks_map, table_number))
        chunks.append('\n---End--OF--Table\n')

    return ''.join(chunks)

In [11]:
def generate_json_from_csv(csv, file):
    """
    Convert the delimited table text into a ``{header: [values]}`` dict.

    * If ``csv`` contains the "<b> NO Table FOUND </b>" sentinel, returns
      ``{"Comment": "No Table FOUND"}``.
    * Otherwise the text is split into tables on the end-of-table marker;
      in each table the first line supplies the headers and every later
      line is a data row. Cells that are blank after stripping are dropped.
      A row with fewer remaining cells than there are headers is skipped;
      extra trailing cells are ignored. Values from all tables accumulate
      under the stripped header text.

    ``file`` is accepted but not used.
    """
    if "<b> NO Table FOUND </b>" in csv:
        return {"Comment": "No Table FOUND"}

    out = {}

    for table in csv.split("\n\n\n\n---End--OF--Table\n"):
        parsed_rows = []
        for raw_line in table.strip().split("\n"):
            cells = [cell for cell in raw_line.split("<sep>") if cell.strip() != ""]
            parsed_rows.append(cells)

        headers, data_rows = parsed_rows[0], parsed_rows[1:]

        # A chunk without any header cell cannot contribute values.
        if not headers:
            continue

        for row in data_rows:
            if len(row) < len(headers):
                continue
            for header, value in zip(headers, row):
                out.setdefault(header.strip(), []).append(value.strip())

    return out

In [12]:
def combine_table_kvs(table_json,kvs):
    """
    Used to Combine Key-Value and Table Information
    Into a single json per image file.

    Every key of ``kvs`` is stripped and its (stripped) values are appended
    to the list stored under that key in ``table_json``, creating the list
    when the key is new.

    If ``table_json`` already holds a non-list value under the key (for
    example the ``{"Comment": "No Table FOUND"}`` placeholder), that value
    is kept as the first element of a new list instead of raising
    AttributeError.

    Parameters
    ----------
    table_json : dict
        Header -> list of values. Updated in place.
    kvs : dict
        Key text -> list of value strings.

    Returns
    -------
    dict
        The same ``table_json`` object, after the update.
    """
    for key, value in kvs.items():
        stripped_key = key.strip()
        stripped_value = [i.strip() for i in value]

        if stripped_key not in table_json:
            table_json[stripped_key] = stripped_value
            continue

        existing = table_json[stripped_key]
        if isinstance(existing, list):
            existing.extend(stripped_value)
        else:
            # Scalar placeholder: promote it to a list before appending.
            table_json[stripped_key] = [existing] + stripped_value

    return table_json

In [13]:
# For every stored Textract response, extract tables and key/value pairs and
# write <name>_table_result.csv and <name>_json_result.json under "results".
for folder in os.listdir(amazon_root_path):
    annotation_dir = os.path.join(amazon_root_path,folder,"annotation")

    # Skips .DS_Store, other stray files and folders with no annotations.
    if not os.path.isdir(annotation_dir):
        continue

    results_dir = os.path.join(amazon_root_path,folder,"results")

    os.makedirs(results_dir,exist_ok=True)

    for jsonFile in os.listdir(annotation_dir):
        # Only Textract response files are processed.
        if not jsonFile.endswith(".json"):
            continue

        # Strip just the trailing extension so a ".json" inside the name survives.
        base_name = jsonFile[:-len(".json")]

        payload_dir = os.path.join(annotation_dir,jsonFile)
        table_save_dir = os.path.join(results_dir,base_name + "_table_result.csv")
        json_table_save_dir = os.path.join(results_dir,base_name + "_json_result.json")

        with open(payload_dir,"r") as load_file:
            json_payload = json.load(load_file)

        csv = get_table_csv_results(json_payload)
        table_json = generate_json_from_csv(csv,jsonFile)

        key_map, value_map, block_map = get_kv_map(json_payload)
        kvs = get_kv_relationship(key_map, value_map, block_map)

        master = combine_table_kvs(table_json,kvs)

        with open(table_save_dir,"w") as save_file:
            save_file.write(csv)

        with open(json_table_save_dir, "w") as json_save_file:
            json_save_file.write(json.dumps(master))