In [1]:
# Install the required dependencies
# Additional Software Prerequisites: Tesseract, Ghostscript
!pip install numpy
!pip install pdf2image
!pip install opencv-python
!pip install pytesseract
!pip install langdetect
!pip install huggingface_hub
!pip install nltk
!pip install thefuzz



In [37]:
## BASIC SETUP ##

# Default modules
import tempfile
import glob # Pathname management
import os
import string
import re # Regex
import json
import getpass # Passkey management

# Third-party libraries
import numpy as np
import pdf2image # PDF handling
import pytesseract as tess # OCR
import cv2 # Image processing
import nltk # Natural language processing
import langdetect
import huggingface_hub # Hugging Face interface
from thefuzz import process as fuzzProcess # Fuzzy strng comparison

# Set up the directories and file structure
# os.path.join keeps the paths valid on every operating system; the trailing "" leaves a
# trailing separator so the rest of the notebook can keep appending file names directly
working_dir = os.getcwd()
input_dir = os.path.join(working_dir, "data", "") # Location of the PDF files to be scanned
int_dir = os.path.join(working_dir, "intermediate", "") # Used for storing intermediate results
output_dir = os.path.join(working_dir, "output", "") # Used for storing final output

# Make the output paths if they don't already exist
# Each directory is created independently, so an existing intermediate directory can no
# longer prevent the output directory from being created
os.makedirs(int_dir, exist_ok = True)
os.makedirs(output_dir, exist_ok = True)

# Collect all the filenames in the input directory
# The directory part is escaped in case the path contains glob characters such as [ or ],
# and the result is sorted so the documents are always processed in the same order
documents = sorted(glob.glob(glob.escape(input_dir) + "*.pdf"))

# Final dictionary that will hold results and be written to JSON
final_dict = dict()

In [38]:
## HUGGING FACE INITIALIZATION ##

# Resolve the Hugging Face API key: use the environment variable when it is set,
# and only fall back to an interactive prompt when it is missing
key = os.environ.get("HUGGINGFACE_API_TOKEN")
if key is None:
    key = getpass.getpass("Enter Hugging Face API Key")

# Hugging Face model identifiers, one per task
translation_model = "facebook/mbart-large-50-many-to-many-mmt"
summarization_model = "facebook/bart-large-cnn"
# Alternative question-answering model kept for reference: "deepset/roberta-base-squad2"
questioning_model = "distilbert/distilbert-base-cased-distilled-squad"
token_classification_model = "Babelscape/wikineural-multilingual-ner"

# One inference client shared by every task in the notebook
client = huggingface_hub.InferenceClient(token = key)

In [39]:
## UTILITY FUNCTIONS ##

def BART_lang_code(lang):
    """Map a two-letter language code (as produced by langdetect) to the code used by the translation model.

    Supported inputs are 'en', 'fr', 'de' and 'nl'.

    Raises:
        Exception: if `lang` is not one of the supported codes.
    """
    known_codes = (
        ('en', 'en_XX'),
        ('fr', 'fr_XX'),
        ('de', 'de_DE'),
        ('nl', 'nl_XX'),
    )

    # Walk the table and hand back the first matching translation-model code
    for detected, model_code in known_codes:
        if lang == detected:
            return model_code

    raise Exception(f"Unexpected language {lang} detected.")
    
def PUNKT_lang_code(lang):
    """Map a two-letter language code (as produced by langdetect) to the language name passed to NLTK's sentence tokenizer.

    Supported inputs are 'en', 'fr', 'de' and 'nl'.

    Raises:
        Exception: if `lang` is not one of the supported codes.
    """
    detected_codes = ('en', 'fr', 'de', 'nl')
    punkt_names = ('english', 'french', 'german', 'dutch')

    # The two tuples are parallel: position i of one corresponds to position i of the other
    for detected, punkt_name in zip(detected_codes, punkt_names):
        if lang == detected:
            return punkt_name

    raise Exception(f"Unexpected language {lang} detected.")

def PIL_to_CV2(img):
    """Convert an image from Pillow format (as output by pdf2image) to an OpenCV BGR array."""
    # Pillow stores the channels as RGB; OpenCV works with BGR
    rgb_pixels = np.array(img)
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)

def crop_img(img, left, top, right, bottom):
    """Return the rectangular region of an OpenCV image between (left, top) and (right, bottom).

    The right and bottom edges are exclusive, as with normal slicing.
    """
    # The first axis of the image runs vertically and the second horizontally
    rows = slice(top, bottom)
    cols = slice(left, right)
    return img[rows, cols]

def vertical_sliding_split_img(img, divOffsetY = 0, divOffsetLeft = 0, divOffsetRight = 0, divHeight = 20):
    """Find the first horizontal band of empty space in an OpenCV image, searching from top to bottom.

    A rectangle with corners
        (top, left)     = (divOffsetY, divOffsetLeft)
        (bottom, right) = (divOffsetY + divHeight, width - divOffsetRight)
    is slid down the image one pixel at a time. The first position at which every pixel
    inside the rectangle is zero ends the search.

    Parameters:
        img: single-channel OpenCV image to search.
        divOffsetY: vertical position at which the search starts.
        divOffsetLeft: number of columns ignored on the left.
        divOffsetRight: number of columns ignored on the right.
        divHeight: height of the sliding rectangle.

    Returns:
        The vertical midpoint of the first empty rectangle found.

    Raises:
        ValueError: if no empty rectangle exists. Previously the closing check could never
            be true once the loop had finished, so the function silently returned None.
    """
    # Get the dimensions of the image to be searched
    height, width = img.shape[:2]
    right = width - divOffsetRight

    # Slide the rectangular region down the image until it no longer fits
    while divOffsetY + divHeight <= height:
        # Crop out the area of the original image that corresponds to the rectangle
        mask = crop_img(img, left = divOffsetLeft, top = divOffsetY, right = right, bottom = divOffsetY + divHeight)

        # An empty rectangle has no non-zero pixels: return the centre of its vertical extent
        if cv2.countNonZero(mask) == 0:
            return divOffsetY + int(divHeight / 2)

        # If non-empty, slide the rectangle further down
        divOffsetY += 1

    # The whole image was searched without success
    raise ValueError("No valid split found")

def rgx_and_crop(text, rgx):
    """Extract the first regex match from a block of text and return it with the text that follows it.

    Parameters:
        text: the text to search.
        rgx: the regular expression to search for.

    Returns:
        A tuple (match, remainder). `match` is the matched text, stripped and upper-cased;
        `remainder` is everything after the match with surrounding whitespace removed.
        When nothing matches, the tuple is ("", text) with the text left untouched.
    """
    # Search the text
    found = re.search(rgx, text)

    # No match: return a blank string and the original text
    if found is None:
        return ("", text)

    # Extract the match and clean up the string
    matched_text = found.group(0).strip().upper()

    # end(0) is already the index of the first character AFTER the match, so slicing from it
    # keeps everything that follows. Slicing from end(0) + 1 dropped a character of real text
    # whenever the match was not followed by whitespace.
    remainder = text[found.end(0):].strip()

    # Return both the result and the clipped text
    return (matched_text, remainder)
        
def clean_string(s):
    """Remove any leading or trailing whitespace and punctuation from a string."""
    unwanted_chars = string.whitespace + string.punctuation
    return s.strip(unwanted_chars)

def clean_token_list(tokenList):
    """Clean a list of tokens (e.g. names of people) by removing duplicates, truncations and near-duplicates.

    Parameters:
        tokenList: an iterable of strings.

    Returns:
        A new list of cleaned tokens, ordered from longest to shortest. Tokens of equal
        length keep the order in which they first appeared.
    """
    # Start by cleaning the strings, dropping any that are empty once cleaned
    cleaned = [clean_string(t) for t in tokenList]
    cleaned = [t for t in cleaned if t]

    # Eliminate exact duplicates. Dictionary keys keep insertion order, so unlike a set this
    # gives the same ordering on every run
    tokens = list(dict.fromkeys(cleaned))

    # Sort them by length from longest to shortest (the sort is stable)
    tokens.sort(key = len, reverse = True)

    # Eliminate any shortened or cropped names by removing strings that are substrings of any larger strings
    # e.g. if we have "David McArthur" and "David McAr" we make the assumption that "David McAr" is meant to read "David McArthur"
    tokens = [t for t in tokens if not any(t in other for other in tokens if other != t)]

    # Perform fuzzy matching to remove close matches, keeping the earliest token of each group
    i = 0
    while i < len(tokens):
        current = tokens[i]

        # Compare against every OTHER token, so there is no need to assume that the token
        # itself is always the first result returned
        others = tokens[:i] + tokens[i + 1:]

        if others:
            # TheFuzz returns (text, ratio) pairs for the closest matches
            for matchText, matchRatio in fuzzProcess.extract(current, others):
                # Very close matches (ratio > 95) are treated as misspellings of the current token
                if matchRatio > 95:
                    tokens.remove(matchText)

        # Move on to the token after the current one. Its position is looked up again because
        # removals may have shifted it; the old code advanced twice after a removal and so
        # skipped tokens, leaving later near-duplicates in place
        i = tokens.index(current) + 1

    return tokens

In [106]:
## MAIN PROGRAM ##

def process_PDFs(debug = False):
    """Extract structured information from every PDF listed in the module-level `documents`.

    For each document the first page is rendered to an image, pre-processed, and split into
    an upper, middle and lower section. The sections are handed to process_upper,
    process_middle and process_lower respectively and their results merged.

    Parameters:
        debug: when True, intermediate images and JSON are written to `int_dir`.

    Returns:
        A dictionary mapping each file name (without its extension) to the merged
        dictionary of results for that document.

    Raises:
        Exception: if the dividing line between two sections cannot be found.
    """
    results = dict()

    # Temporary directory that holds the page images written by pdf2image during conversion
    with tempfile.TemporaryDirectory() as temp_dir:
        for doc in documents:
            print(f"Extracting text from {doc}")

            # Extract just the file name for later reuse
            # splitext only removes the extension, so names containing other dots are not truncated
            working_filename = os.path.splitext(os.path.basename(doc))[0]

            # Start by converting the pdf to an image and store the resulting PIL image
            # Only the first page is used, so only the first page is rendered
            img = pdf2image.convert_from_path(doc, output_folder = temp_dir, first_page = 1, last_page = 1)[0]

            # Convert the image from Pillow to CV2 format using our helper function
            img = PIL_to_CV2(img)

            # Get the dimensions
            height, width = img.shape[:2]

            # IMAGE PREPROCESSING #

            # Based on analysing the documents manually we can safely perform the following rough, initial crop
            img = crop_img(img, left = 275, top = 475, right = width - 100, bottom = height - 150)

            # Convert to grayscale
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

            # Invert the colours, so the page background becomes zero-valued
            img = cv2.bitwise_not(img)

            # Close any small holes
            img = cv2.morphologyEx(img, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (3,3)))

            # Downsize the image, this helps the OCR perform better
            # NOTE(review): width and height are still those of the page BEFORE the crop, so the
            # cropped image is not scaled by exactly one half. Left unchanged because the pixel
            # offsets below appear to be tuned against this output - confirm before changing.
            img = cv2.resize(img, (int(0.5*width), int(0.5*height)))

            # Update the width and height
            height, width = img.shape[:2]

            if debug:
                # Write the image
                cv2.imwrite(int_dir + working_filename + '.jpg', img)

            # Split the image into upper and lower halves by finding large, black, rectangular regions and taking the centroid of the first (from top to bottom) as the dividing line
            # See the Data Assumptions section of the readme
            divY = vertical_sliding_split_img(img, divOffsetY = 0, divOffsetLeft = 20, divOffsetRight = 55, divHeight = 20)
            if divY is None:
                raise Exception(f"No valid split found between the upper and lower sections of {doc}")
            upperImg = img[:divY, :]
            lowerImg = img[divY:, :]

            # Further split the lower section into a middle and lower section
            # The search runs on lowerImg itself, so the position found is expressed in the
            # coordinates of the image that is then sliced (it previously searched the full image)
            divY = vertical_sliding_split_img(lowerImg, divOffsetY = 40, divOffsetLeft = 60, divOffsetRight = 60, divHeight = 5)
            if divY is None:
                raise Exception(f"No valid split found between the middle and lower sections of {doc}")
            middleImg = lowerImg[:divY, :]
            lowerImg = lowerImg[divY:, :]

            # Write if debugging
            if debug:
                cv2.imwrite(f"{int_dir}{working_filename}_upper.jpg", upperImg)
                cv2.imwrite(f"{int_dir}{working_filename}_middle.jpg", middleImg)
                cv2.imwrite(f"{int_dir}{working_filename}_lower.jpg", lowerImg)

            # Process the lower, largest section first, since this is also the easiest to extract the language of the text from with no additional image processing
            document_info, lang = process_lower(lowerImg, output_filename = working_filename, debug = debug)

            # Then process the middle and upper sections
            document_type = process_middle(middleImg, lang, output_filename = working_filename, debug = debug)
            company_info = process_upper(upperImg, lang, output_filename = working_filename, debug = debug)

            # Merge the dictionaries
            doc_dict = company_info | document_type | document_info

            # Add the dictionary to the dictionary of all documents
            results[working_filename] = doc_dict

            if debug:
                # Snapshot of everything collected so far, including this document
                with open(f'{int_dir}{working_filename}.json', 'w', encoding = 'utf-8') as file:
                    file.write(json.dumps(results, indent = 4))

    return results

In [107]:
def process_lower(img, output_filename = "", debug = False):
    """Process the lowest section of the document (the main body of text) using OCR and Gen AI.

    Parameters:
        img: image of the lower section of the document.
        output_filename: base name used for the debug files.
        debug: when True, the raw and translated text are written to `int_dir`.

    Returns:
        A tuple (document_info, lang). `document_info` is a dictionary with a 'people' entry
        (names mapped to their role, or 'Unknown') and a 'summary' entry. `lang` is the
        detected language code of the document.

    Raises:
        Exception: if the detected language is not one the notebook supports.
    """
    document_info = dict()

    # Process the text using Tesseract
    text = tess.image_to_string(img, lang = 'fra+nld+deu+eng', config = r'--psm 3')

    # Detect the most probable language
    # langdetect is non-deterministic unless its seed is fixed, so fix it to get the same
    # answer for the same text on every run
    langdetect.DetectorFactory.seed = 0
    lang = langdetect.detect(text)

    # Write the raw OCR'd text to file if debugging
    if debug:
        with open(f'{int_dir}{output_filename}_lower_{lang}.txt', 'w', encoding = 'utf-8') as file:
            file.write(text)

    # Work out the language codes once, rather than once per sentence
    punkt_lang = PUNKT_lang_code(lang)
    source_lang = BART_lang_code(lang)

    # Due to token limits with the translation we need to process the translation in smaller chunks
    # Sentence-by-sentence makes the most sense, and we can do this using nltk
    sentences = nltk.sent_tokenize(text, language = punkt_lang)

    # Translate the text to English sentence-by-sentence and rejoin them as a string
    translated = [
        client.translation(s, model = translation_model, src_lang = source_lang, tgt_lang = 'en_XX').translation_text
        for s in sentences
    ]
    text = " ".join(translated)

    # Write the translated text if debugging
    if debug:
        with open(f'{int_dir}{output_filename}_lower_en.txt', 'w', encoding = 'utf-8') as file:
            file.write(text)

    # Determine the people mentioned in the text using token classification
    classified_tokens = client.token_classification(text, model = token_classification_model)

    # Separate out the tokens corresponding to people's names and clean the list
    people_tokens = [c['word'] for c in classified_tokens if c['entity_group'] == 'PER']
    people_tokens = clean_token_list(people_tokens)

    # We store people in a dictionary with their names as the key, and their roles as the value
    people_dict = dict()

    # For each person we want to determine if they have a particular role using the question-answering model
    # We do this in the context of the body of the text
    for person in people_tokens:
        question = f'What role does {person} have?'
        answer = client.question_answering(question = question, model = questioning_model, context = text, doc_stride = 350)

        # Record the answer only if it is high-confidence, otherwise list the role as unknown
        if answer.score >= 0.9:
            people_dict[person.title()] = answer.answer.title()
        else:
            people_dict[person.title()] = 'Unknown'

    # Store the dictionary
    document_info['people'] = people_dict

    # Create the short summary using the summarization model
    document_info['summary'] = client.summarization(text, model = summarization_model).summary_text

    return (document_info, lang)

In [108]:
def process_middle(img, lang = 'fr', output_filename = "", debug = False):
    """Process the middle section of the document: a single line of text giving the Purpose of the Act.

    Parameters:
        img: image of the middle section of the document.
        lang: language code of the document, as detected from the lower section.
        output_filename: base name used for the debug files.
        debug: when True, the raw and translated text are written to `int_dir`.

    Returns:
        A dictionary with a single 'purpose' entry holding the translated, title-cased purpose.
        The entry is an empty string when no text could be read from the image.
    """
    # We should only be dealing with a single line of text so we use Tesseract with the --psm 11 option to maximize extraction of raw text
    purposeText = clean_string(tess.image_to_string(img, lang = 'fra+nld+deu+eng', config = r'--psm 11'))

    # Write the text if debugging
    if debug:
        with open(f'{int_dir}{output_filename}_middle_{lang}.txt', 'w', encoding = 'utf-8') as file:
            file.write(purposeText)

    # Keep only what follows the first colon, i.e. drop the header that precedes the purpose itself
    # The previous regex contained an unescaped '.', which matches any character, so it always
    # matched from the very first character and never removed anything. When no colon is
    # found the whole text is kept, which is what that regex did in every case.
    _, colon, after_colon = purposeText.partition(':')
    if colon:
        purposeText = clean_string(after_colon)

    # Translate the purpose; there is nothing to send when the OCR found no text
    # (this case previously crashed on the failed regex search)
    if purposeText:
        purposeText = client.translation(purposeText, model = translation_model, src_lang = BART_lang_code(lang), tgt_lang = 'en_XX').translation_text

    # Write the translated text if debugging
    if debug:
        with open(f'{int_dir}{output_filename}_middle_en.txt', 'w', encoding = 'utf-8') as file:
            file.write(purposeText)

    return {'purpose' : purposeText.title()}

In [109]:
def process_upper(img, lang = 'fr', output_filename = "", debug = False):
    """Process the upper section of the document: a table-like structure with headers on the left and information on the right.

    The information is assumed to always have the same structure, in this order from top to bottom:
    - Company identifier
    - Company name
    - Company type
    - Company address

    The division between the headers and the information is found by locating all the colons
    in the image and taking the median of their horizontal positions. The median is used
    because the address section can be inconsistent in where its colons appear.

    Parameters:
        img: image of the upper section of the document.
        lang: language code of the document, as detected from the lower section.
        output_filename: base name used for the debug files.
        debug: when True, the cropped image is written to `int_dir`.

    Returns:
        A dictionary with the entries 'identifier', 'name', 'company type' and 'address'.

    Raises:
        ValueError: if no colon can be found, so the headers cannot be separated from the information.
    """
    company_info = dict()

    # We perform an initial OCR using pytesseract to detect any colons in particular
    # Dilate the image first slightly since these colons may be particularly small and easy-to-miss with OCR detection
    colon_img = cv2.dilate(img, kernel = np.ones((2,2)))

    # Perform the OCR
    ocr_tree = tess.image_to_data(colon_img, lang = 'fra+nld+deu+eng', output_type = tess.Output.DATAFRAME, config = r'--psm 3')

    # Detect the locations of the colons
    # The column is converted to strings first so the lookup also works when it holds no strings at all
    colon_rows = ocr_tree[ocr_tree['text'].astype(str).str.contains(':', regex = False)]
    if colon_rows.empty:
        raise ValueError("No colon found in the upper section, so the headers cannot be separated from the information")

    # Take the median position (see above for reasoning)
    # Apply a slight pixel offset so the position is actually to the right and clears the colons
    semi_loc = int(colon_rows['left'].median()) + 5

    # Slice and preserve everything right of the colons
    height, width = img.shape[:2]
    img = crop_img(img, left = semi_loc, top = 0, right = width, bottom = height)

    # Write the image if debugging
    if debug:
        cv2.imwrite(int_dir + output_filename + '_upper.jpg', img)

    # Perform a secondary OCR on the cropped image
    # Note the use of --psm 11 to extract as much raw text as possible
    text = tess.image_to_string(img, lang = 'fra+nld+deu+eng', config = r'--psm 11')

    # Attempt to extract the company identifier using regex to pick out strings of consecutive digits with spaces or hyphens between them
    # Inside [...] a '|' is a literal character, not "or": the old class [0-9|-| ] read '|-|' as
    # a range and therefore matched pipes but never hyphens
    identifier_rgx = r"[0-9][0-9\- ]+"
    identifier, text = rgx_and_crop(text, identifier_rgx)

    # Remove all whitespaces and dashes
    identifier = identifier.replace(" ", "")
    identifier = identifier.replace("-", "")

    # Clean the ends
    company_info["identifier"] = clean_string(identifier)

    # Use a similar technique for the company name by detecting the next textual string up to a newline character
    # The line must start with a digit or letter; a stray '|' is no longer accepted as a first character
    text_rgx = r'[0-9A-Za-zÀ-ÿ].*'
    name, text = rgx_and_crop(text, text_rgx)

    # Clean the ends
    company_info["name"] = clean_string(name).title()

    # Repeat this technique for the company type
    company_type, text = rgx_and_crop(text, text_rgx)

    # Clean the string
    company_type = clean_string(company_type)

    # For this data we also want to translate it; there is nothing to send when no text was found
    if company_type:
        company_type = client.translation(company_type, model = translation_model, src_lang = BART_lang_code(lang), tgt_lang = 'en_XX').translation_text
    company_info["company type"]  = company_type.title()

    # Repeat the process again for the address
    address, text = rgx_and_crop(text, text_rgx)

    # Clean the ends and fix the formatting
    company_info["address"] = clean_string(address).title()

    return company_info

In [None]:
# Run the whole pipeline over every PDF found in the input directory
json_data = process_PDFs(debug = False)

# Serialise the collected results and write them to the final output JSON
with open(f'{output_dir}output.json', 'w', encoding = 'utf-8') as out_file:
    serialised = json.dumps(json_data, indent = 4)
    out_file.write(serialised)
    print(f"{output_dir}output.json written")

print("Complete")
        

Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000001.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000002.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000003.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000004.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000005.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000006.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000007.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000008.pdf
Extracting text from g:\Sync\Work\Maths\Jupyter_Notebooks\Creditsafe Interview Task\data\24000009.pdf
