In [1]:
%%bash
pip install --upgrade pip
pip install smart_open minecart
pip install textract-trp



In [2]:
import time 
import re
import os
import trp
import boto3
import minecart
import json
import logging 

import numpy as np
import pandas as pd

from smart_open import open
from sagemaker.session import Session

In [3]:
# initiate s3 bucket and corresponding data folder
bucket = "ran-s3-systemic-risk"
data_folder ="Input/X-17A-5-Subsets/"

# script to perform OCR (using Textract) for X-17A-5 subsets
out_folder = 'Output/X-17A-5-BS/'

# Amazon Textract client and Sagemaker session
textract = boto3.client('textract')
s3 = boto3.client('s3')
session = Session()

# AWS Asynchronous Textract Script (requesting Job)
**Content modified from Amazon AWS Textract repository (refer to [URL](https://github.com/aws-samples/amazon-textract-code-samples/blob/master/python/12-pdf-text.py) below)** 

In [4]:
def startJob(s3BucketName:str, objectName:str) -> str:
    """
    Starts a Textract job on AWS server 
    """
    response = None
    client = boto3.client('textract')
    
    # issue response to AWS to start Textract job for table analysis 
    response = client.start_document_analysis(
        DocumentLocation={
            'S3Object': {
                'Bucket': s3BucketName,
                'Name': objectName
            }
        },
        FeatureTypes=['FORMS', 'TABLES']    # selecting forms and tables from the OCR
    )
    
    # return response job ID for service
    return response["JobId"]

In [5]:
def isJobComplete(jobId:str) -> str:
    """
    Tracks the completion status of the Textract job when qued
    """
    time.sleep(1)
    client = boto3.client('textract')
    response = client.get_document_analysis(JobId=jobId)
    status = response["JobStatus"]
    print("Job status: {}".format(status))
    
    # check current status of AWS job (ask server every 5 seconds for data)
    while(status == "IN_PROGRESS"):
        time.sleep(5)                   # lag before reporting status
        response = client.get_document_analysis(JobId=jobId)
        status = response["JobStatus"]
        print("Job status: {}".format(status))
    
    return status

In [6]:
def getJobResults(jobId:str) -> list:
    """
    Returns the contents of the Textract job, after completion status met
    """
    pages = []          # initialize list object to track pages

    client = boto3.client('textract')
    response = client.get_document_analysis(JobId=jobId)
    
    pages.append(response)
    print("Resultset page recieved: {}".format(len(pages)))
    
    nextToken = None
    if('NextToken' in response):
        nextToken = response['NextToken']
    
    # iterate through the pages and append to response figure
    while(nextToken):
        response = client.get_document_analysis(JobId=jobId, NextToken=nextToken)
        pages.append(response)
        print("Resultset page recieved: {}".format(len(pages)))
        nextToken = None
        if('NextToken' in response):
            nextToken = response['NextToken']

    return pages

In [7]:
def runJob(bucket:str, key:str):
    """
    Function designed to call an AWS Textract 
    """
    # S3 storage for files on AWS site   
    jobId = startJob(bucket, key)   # intialize Textract job 
    print("Started job with id: {}".format(jobId))

    # if job is complete from AWS return response object 
    if(isJobComplete(jobId)):
        response = getJobResults(jobId)
        
    return response

# AWS Extraction Scripts (Key-Value Pairs)
**The content was modified from AWS to extract key-value pairs in form documents from Block objects that are stored in a map. (refer to [URL](https://docs.aws.amazon.com/textract/latest/dg/examples-extract-kvp.html))**

In [8]:
def get_kv_relationship(key_map, value_map, block_map):
    """
    Retreaving the Key-Value relationship from FORM 
    """
    key_value_map = {}
    
    # unpack the key_map to retrieve the block id and key names
    for block_id, key_block in key_map.items():
        # retrieve value block given job id
        value_block = find_value_block(key_block, value_map)

        # get text value from key and value blocks
        key = get_text(key_block, block_map)
        val = get_text(value_block, block_map)
        
        # map the key and value pairs
        key_value_map[key] = val
        
    return key_value_map

In [9]:
def find_value_block(key_block, value_map):
    """
    Retrieving value block from AWS textract job 
    """
    for relationship in key_block['Relationships']:
        if relationship['Type'] == 'VALUE':
            for value_id in relationship['Ids']:
                value_block = value_map[value_id]
    return value_block

In [10]:
def get_text(result, blocks_map):
    """
    Retrieving text values from given work
    """
    text = ''
    if 'Relationships' in result:
        for relationship in result['Relationships']:
            if relationship['Type'] == 'CHILD':
                for child_id in relationship['Ids']:
                    word = blocks_map[child_id]
                    
                    # if word then we append to space
                    if word['BlockType'] == 'WORD':
                        text += word['Text'] + ' '
                    if word['BlockType'] == 'SELECTION_ELEMENT':
                        if word['SelectionStatus'] == 'SELECTED':
                            text += 'X '    
    return text

# OCR Wrapper Functions
**The scripts perform an OCR job from AWS Textract, and returning well formated data**

In [11]:
def trp2df(table:trp.Table) -> pd.DataFrame:
    """
    Function designed to convert a trp table into a dataframe
    :param table: a trp table object parsed from a pdf  
    :return: a DataFrame object housing a textracted trp table
    
    Complexity -> O(n^2) approx.
    """
    N = len(table.rows)               # number of rows in table
    M = len(table.rows[0].cells)      # number of columns in table
    arr = [0]*N
    
    # iterate through each row within the provided table
    for row in np.arange(N):
        
        # strip the text from the cell references to construct (N X M) matrix
        arr[row] = [table.rows[row].cells[col].text.strip() for col in np.arange(M)]
        
    return pd.DataFrame(arr)

In [12]:
def readTable(response:list) -> pd.DataFrame:
    """
    Function to transform AWS Textract object to a dataframe, by searching for tables
    :param response: AWS Textract response object
    """
    # in the event multiple tables detected on one page (concat them)
    catDF = []
    
    # format the Textract response type 
    doc = trp.Document(response)
    
    # iterate through document pages
    for page in doc.pages:
        
        # itterate through page tables
        for table in page.tables: 
            
            # convert trp-table into dataframe object
            df = trp2df(table)
            
            # remove columns that are completly empty
            empty_cols = [col for col in df.columns if (df[col] == '').all()]
            df = df.drop(empty_cols, axis=1)
  
            # number of columns in dataframe
            n = df.columns.size
            
            # reset the column names (avoid the column names)
            df.columns = np.arange(n)
            
            ##############################################################
            #                           NOTES
            #          a good dataframe should have 2-3 columns
            #      anything more or less is a reading error we ignore
            ##############################################################
            
            # if the dataframe has more than 3 columns then we most likley have an issue in parsing
            if n > 3:
                return None
            
            elif n > 1:
                
                ##############################
                # Balance Sheet Assummptions
                ##############################
                
                lineIndex = df.columns[0]

                # check for the word "cash" in a string at the begining, ignoring case sensitivity (asset check)
                assetCheck = df[lineIndex].str.contains('^Cash', regex=True, flags=re.IGNORECASE)

                # check for the word "Liabilities" in a string at the end, ignoring case sensitivity (liability check)
                debtCheck1 = df[lineIndex].str.contains('Liabilities$|^Liabilities', regex=True, flags=re.IGNORECASE)
                debtCheck2 = df[lineIndex].str.contains('Liability$|^Liability', regex=True, flags=re.IGNORECASE)
                
                # check for the presence of $ sign, we assume the balance sheet items should have presence of $ sign
                # used to handle table of contents error caused from prior reads
                dollarCheck = df[df.columns[1]].str.contains('\$[^\]]+', regex=True, flags=re.IGNORECASE)
                
                ##############################
                # Balance Sheet Determination
                ##############################
                
                # check if the key words have been found 
                check1 = df[assetCheck | debtCheck1 | debtCheck2].empty      # check for terms, and $ presence
                check2 = df[dollarCheck == True].empty                       # check for presence of '$' sign  
                check3 = df[debtCheck1 == True].empty                        # debt check for Liabilities
                check4 = df[debtCheck2 == True].empty                        # debt check for Liability 
                
                # if either asset term or liability term is found, with a $ sign we append the dataframe
                if not check1 and not check2:
                    catDF.append(df)      # we append since sometimes asset and liablility tables are seperated 

                    if not check3 or not check4:
                        # if liability table was found on the first iteration we simply concat data frames and return 
                        return pd.concat(catDF)
        

In [13]:
def readForm(response:list) -> dict:
    """
    Function to transform AWS Textract object to a dictionary, by searching for key value pairs
    :param response: AWS Textract response object
    """
    # format the Textract response type 
    doc = trp.Document(response)
    
    # initializing 
    key_map = {}
    value_map = {}
    block_map = {}

    # iterate through document pages
    for page in doc.pages:

        # itterate through page tables
        for block in page.blocks: 

            # store the block id in map to retrive information later
            block_id = block['Id']
            block_map[block_id] = block

            # if Key-value set has been seen 
            if block['BlockType'] == "KEY_VALUE_SET":

                # if KEY is labeled as entity type then we found Key, else we found VALUE
                if 'KEY' in block['EntityTypes']:
                    key_map[block_id] = block
                else:
                    value_map[block_id] = block

    return get_kv_relationship(key_map, value_map, block_map)

## Extract Balance Sheet information

In [14]:
def textractParse(path:str, index:int, csvDirectory:np.ndarray, bucket:str = "ran-s3-systemic-risk", 
                  out_folder:str = 'Output/X-17A-5-BS/') -> dict:
    """
    Function runs a Textract job and saves Balance Sheet information to .csv file in s3 folder 
    """
    errors = ''
    
    # track the presence of json file storing information on forms
    if os.path.exists('X17A5-forms.json'):
        with open('X17A5-forms.json', 'r') as f: forms = json.loads(f.read())
    else:
        forms = {}
    
    # baseFile name to name export .csv file e.g. 782124-2002.csv
    baseFile = '-'.join(path.split('/')[-1].split('-')[:2])
    fileName = baseFile + '.csv'
    print('\nPerforming OCR for {}'.format(baseFile))

    # if file is not found in directory we continue the iteration process
    if (out_folder + fileName not in csvDirectory) or (baseFile not in forms):

        # temporary data frame object for balance sheet information
        res = runJob("ran-s3-systemic-risk", path)
        
        # if Textract job did not fail we continue extraction
        if res[0]['JobStatus'] != 'FAILED':
            # retrieve structured data pulls from OCR
            forms[baseFile] = readForm(res)
            tempDF = readTable(res)
            
            print(tempDF)
            
            # checks for type of return, if none then we log an error
            if type(tempDF) == pd.DataFrame:
                
                # writing data frame to .csv file
                tempDF.to_csv(fileName, index=False)

                # save contents to AWS S3 bucket
                with open(fileName, 'rb') as data:
                    s3.put_object(Bucket=bucket, Key=out_folder + fileName, Body=data)

                # remove local file after it has been created
                os.remove(fileName)
                
                print('-----------------------------------------------------')
                print('Saved {} file to s3 bucket'.format(baseFile + '.csv'))
            else:
                errors = 'No Balance Sheet found, or parsing error'
        else:
            errors = 'Could not parse, JOB FAILEDs'
    else:
        print('{} has been downloaded'.format(fileName))
        
    # storing key-value line items
    with open('X17A5-forms.json', 'w') as f: json.dump(forms, f)
        
    return errors

In [18]:
# csv Directory to store balance sheet information 
csvs = np.array(session.list_s3_files(bucket, out_folder))

# discover all of the pdfs that you want to parse
paths = np.array(session.list_s3_files(bucket, data_folder))[1:]

errorDict = {}

# iterate through X-17A-5 subsets stored in s3 
for i, key in enumerate(['Input/X-17A-5-Subsets/91154-2017-subset.pdf']):     
    val = textractParse(key, i, csvs)
    
    if val != '':
        errorDict[key] = val


Performing OCR for 91154-2017
Started job with id: 9fc2eda3a9203ba54b0915e753d97d1998a37d306c3815d63ef33dba84f4e23b
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: IN_PROGRESS
Job status: SUCCEEDED
Resultset page recieved: 1
Resultset page recieved: 2
Resultset page recieved: 3
Resultset page recieved: 4
Resultset page recieved: 5
Resultset page recieved: 6
Resultset page recieved: 7
Resultset page recieved: 8
                                                    0          1
0                                              Assets           
1                           Cash and cash equivalents      $ 951
2   Cash segregated under federal and other regula...      4,435
3   Securities borrowed or purchased under agreeme...    160,143
4   Trading account assets ($29,

In [16]:
# storing unique list of asset items and liability line items
with open('textractErrors.json', 'w') as f: json.dump(errorDict, f)