# Setup

In [101]:
import json
import os
import re
import ssl
import time
import unicodedata
import urllib
import urllib.parse
import urllib.request
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional

import requests
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from fuzzywuzzy import fuzz
from unidecode import unidecode

# OCR Functions

In [4]:
class FileHandler:
    """Singleton helper that downloads remote files into a local directory."""

    _instance = None

    # Directory in which downloaded files are stored.
    temp_dir = "/temp"

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use.

        Constructor arguments are accepted but ignored; forwarding them to
        ``object.__new__`` raises ``TypeError``.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def download_url_file(
        self, url: str, verify_ssl: bool = False, timeout: float = 60.0
    ) -> Optional[str]:
        """Download ``url`` and save it under a generated name in ``temp_dir``.

        Args:
            url (str): URL of the invoice.
            verify_ssl (bool): Verify the server certificate and host name.
                Defaults to False to keep the historical behaviour.
            timeout (float): Socket timeout in seconds for the download.

        Returns:
            Path of the saved file as a string, or None when the download
            or the write failed.
        """
        try:
            print(f"Downloading URL {url}")
            # Use only the path component so a query string or fragment
            # does not end up in the file extension.
            url_file_extension = Path(urllib.parse.urlparse(url).path).suffix
            _, file_path = self.get_new_file_name(url_file_extension)
            if file_path is None:
                return None

            print(f"Downloading URL to file {file_path}")

            context = ssl.create_default_context()
            if not verify_ssl:
                # SECURITY NOTE(review): certificate and host name checks are
                # disabled here, which allows man-in-the-middle tampering.
                # Pass verify_ssl=True unless the source needs otherwise.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE

            # Open the URL and download the file
            req = urllib.request.Request(url)
            with urllib.request.urlopen(
                req, context=context, timeout=timeout
            ) as response:
                status = response.getcode()
                file_data = response.read()

            if status != 200:
                print(f"Can not download file from {url}")
                return None

            # Save the file to disk, creating the target directory if needed
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as _f:
                _f.write(file_data)

            print("Downloaded file")
            return file_path
        except Exception as exc:  # best effort: report and signal failure
            print(f"Can not download file from {url}")
            print(exc)
            return None

    def get_new_file_name(self, file_extension: str):
        """Generate a unique local file name and path for an extension.

        Args:
            file_extension (str): File extension, with or without the
                leading dot. An empty value yields a name with no extension.

        Returns:
            str: file name
            str: file path
        """
        file_name = None
        file_path = None
        try:
            print("Generating new file name")
            extension = file_extension or ""
            if extension and not extension.startswith("."):
                extension = "." + extension
            file_name = uuid.uuid4().hex + extension
            file_path = os.path.join(self.temp_dir, file_name)
            print(f"Generated new filename {file_path}")
        except Exception:
            print("Can not generate new file name")
        return file_name, file_path


# Module-level FileHandler instance (FileHandler hands out one shared object).
file_handler = FileHandler()

In [108]:
class InvoiceOCR:
    """Invoice OCR Class.

    Runs invoices/receipts through the Azure prebuilt invoice model, cleans
    the extracted fields and tags the ones that look like alcohol.
    """

    # Punctuation and control characters replaced by a space when cleaning.
    _PUNCTUATION_RE = re.compile(
        "[!\"#$%&'()*+,-./:;<=>?@[\\]^_`{\\|}~\t\n\r]"
    )

    # Fuzzy partial-ratio score above which a text counts as a keyword match.
    _FUZZY_THRESHOLD = 89

    def __init__(self):
        # Path of the JSON file holding the alcohol keywords. A ready-made
        # iterable of keywords may be assigned here instead of a path.
        self.alcohol_names = "/code/src/ocr/alcohol_keywords/alcohol_keywords.json"

    @staticmethod
    def _flatten_keywords(raw):
        """Collect every non-empty string leaf of ``raw`` as a lower-case keyword."""
        # NOTE(review): the schema of the keyword file is not visible here;
        # string leaves of lists and dict values are treated as keywords.
        # Confirm against the real alcohol_keywords.json.
        keywords = []
        pending = [raw]
        while pending:
            node = pending.pop()
            if isinstance(node, str):
                word = node.strip().lower()
                if word:  # an empty keyword would match every text
                    keywords.append(word)
            elif isinstance(node, dict):
                pending.extend(node.values())
            elif isinstance(node, (list, tuple, set, frozenset)):
                pending.extend(node)
        return list(dict.fromkeys(keywords))

    def _get_alcohol_keywords(self):
        """Return the alcohol keywords referenced by ``self.alcohol_names``.

        When ``alcohol_names`` is a path, the file is read once and cached
        (a failed load is cached as an empty list so the error is reported
        only once). Otherwise it is taken to already be the keywords.
        """
        source = self.alcohol_names
        if not isinstance(source, str):
            return self._flatten_keywords(source)

        cached = getattr(self, "_alcohol_keywords_cache", None)
        if cached is not None and cached[0] == source:
            return cached[1]

        try:
            with open(source, encoding="utf-8") as _fd:
                raw = json.load(_fd)
        except (OSError, ValueError) as exc:
            print(f"Can not load alcohol keywords from {source}: {exc}")
            raw = []
        keywords = self._flatten_keywords(raw)
        self._alcohol_keywords_cache = (source, keywords)
        return keywords

    @staticmethod
    def _as_percent(confidence):
        """Convert a 0-1 confidence to a percentage, passing None through."""
        if confidence is None:
            return None
        return round(confidence * 100, 2)

    def set_ocr_service(self):
        """Creates the Azure Invoice OCR client from the service key and endpoint.

        The endpoint and key are read from the environment variables
        AZURE_FORM_RECOGNIZER_ENDPOINT and AZURE_FORM_RECOGNIZER_KEY when set.

        Returns:
            Object of class DocumentAnalysisClient, or None on failure.
        """
        result = None
        try:
            # SECURITY NOTE(review): the fallback key below is committed to
            # source control; rotate it and drop the literal once the
            # environment variables are configured everywhere.
            endpoint = (
                os.environ.get("AZURE_FORM_RECOGNIZER_ENDPOINT")
                or "https://konaairecognizer.cognitiveservices.azure.com/"
            )
            key = (
                os.environ.get("AZURE_FORM_RECOGNIZER_KEY")
                or "764a24f39025460087bce59a76053544"
            )
            result = DocumentAnalysisClient(
                endpoint=endpoint,
                credential=AzureKeyCredential(key),
            )
        except Exception as exc:
            print("Can not create OCR service")
            print(exc)
        return result

    def set_translation_service(self):
        """Builds the request settings for the Azure Translator service.

        The subscription key is read from the environment variable
        AZURE_TRANSLATOR_KEY when set.

        Returns:
            url (str), params (dict), headers (dict)
        """
        url = "https://api.cognitive.microsofttranslator.com/translate"
        params = {
            "api-version": "3.0",
            "to": "en",
        }
        # SECURITY NOTE(review): the fallback key below is committed to
        # source control; rotate it and drop the literal once the
        # environment variable is configured everywhere.
        key = (
            os.environ.get("AZURE_TRANSLATOR_KEY")
            or "a8c4d4aaf84748028bdbc9fe2f62e3a7"
        )
        headers = {
            "Ocp-Apim-Subscription-Key": key,
            "Ocp-Apim-Subscription-Region": "global",
            "Content-type": "application/json",
            "X-ClientTraceId": str(uuid.uuid4()),
        }
        return url, params, headers

    def get_translated_text(self, text):
        """Translates non-english text into english.

        Args:
            text (str): Text to be translated.

        Returns:
            str: Translated text, or an empty string when the text is empty
            or the translation failed.
        """
        result = ""
        if not text:
            return result
        try:
            url, params, headers = self.set_translation_service()
            response = requests.post(
                url,
                params=params,
                headers=headers,
                json=[{"text": text}],
                timeout=30,  # never hang the whole extraction on one call
            )
            response.raise_for_status()
            result = response.json()[0]["translations"][0]["text"]
        except Exception as exc:
            print(f"Can not translate to english for {text}")
            print(exc)
        return result

    def detect_text_script(self, text):
        """Detects the script of the text.

        The script is the first word of the Unicode name of the first
        non-ASCII letter. Digits, spaces and punctuation are ignored.

        Args:
            text (str): Text whose script needs to be detected.

        Returns:
            str: Code for the detected text script (eg: LATIN). "LATIN" is
            returned when the text has no non-ASCII letters.
        """
        script = "LATIN"
        try:
            for char in text:
                if ord(char) < 128 or not char.isalpha():
                    continue
                name = unicodedata.name(char, "")
                if name:
                    script = name.split(" ")[0]
                    break
        except Exception:
            print(f"Can not detect text script type for {text}")
        return script

    def check_alcohol(self, text):
        """Checks whether the item is alcohol or not.

        The cleaned, lower-cased text is first searched for the keywords as
        substrings. Non-LATIN text is translated to english, and when no
        substring matched the (translated) text is fuzzy-matched.

        Args:
            text (str): Item text.

        Returns:
            is_alcohol (bool): True if item is an alcohol.
            text_script (str): Code for the detected text script.
            text_copy (str): Translated text if item text is non-LATIN.
        """
        is_alcohol, text_script, text_copy = False, "", ""
        try:
            text = self.clean_text(text.lower())
            keywords = self._get_alcohol_keywords()

            is_alcohol = any(keyword in text for keyword in keywords)

            text_script = self.detect_text_script(text)
            if text_script != "LATIN":
                text = self.get_translated_text(text)
                text_copy = text

            if not is_alcohol and text:
                # Keywords are lower-case, and so must be the translation.
                candidate = text.lower()
                is_alcohol = any(
                    fuzz.partial_ratio(candidate, keyword) > self._FUZZY_THRESHOLD
                    for keyword in keywords
                )
        except Exception as exc:
            print(f"Can not validate alcohol for {text}")
            print(exc)
        return is_alcohol, text_script, text_copy

    def fetch_amount(self, amount_data):
        """Extracts the cost of item billed.

        Args:
            amount_data: Object exposing ``to_dict()``; either a field (its
                dict has a "value_type" key) or a bare currency value.

        Returns:
            dict: The "value" entry when the input is a field, otherwise the
            whole dict. None when there is no input or extraction failed.
        """
        amount_dict = None
        print("Extracting amount")
        try:
            if amount_data is not None:
                dict_data = amount_data.to_dict()
                if "value_type" in dict_data:
                    amount_dict = dict_data["value"]
                else:
                    amount_dict = dict_data
        except Exception:
            print("Can not extract amount for the item")
        return amount_dict

    def fetch_address(self, add_data):
        """Extracts the address in a detailed format.

        Args:
            add_data: Field whose ``value`` exposes ``to_dict()``.

        Returns:
            dict: The address as a dictionary, or None when missing.
        """
        result = None
        try:
            print("Extracting address")
            if add_data is not None:
                result = add_data.value.to_dict()
        except Exception:
            print("Can not extract restaurant address")
        return result

    def fetch_date(self, data):
        """Extracts a date field formatted as "DD Month YYYY".

        Args:
            data: Field exposing ``to_dict()``, or None.

        Returns:
            str: Formatted date, or None when the field is missing, is not
            a date, or has no value.
        """
        result = None
        if data is None:
            return result
        try:
            data = data.to_dict()
            value = data.get("value")
            if data.get("value_type") == "date" and value is not None:
                result = value.strftime("%d %B %Y")
        except Exception:
            print("Can not extract date")
        return result

    def clean_text(self, data):
        """Replaces punctuation, tabs and line breaks in a text with spaces.

        Args:
            data: A string, or an object carrying the text in ``value``
                (invoice fields are passed in directly).

        Returns:
            str: Cleaned, stripped text. Numeric values are returned as
            their plain string form so the decimal point survives. An empty
            string is returned for None or on failure.
        """
        result = ""
        try:
            # Unwrap field-like objects down to the raw value.
            while (
                data is not None
                and not isinstance(data, str)
                and hasattr(data, "value")
            ):
                inner = data.value
                if inner is data:
                    break
                data = inner

            if data is None:
                return result
            if isinstance(data, (int, float)) and not isinstance(data, bool):
                return str(data)
            result = self._PUNCTUATION_RE.sub(" ", str(data)).strip()
        except Exception as e:
            print(f"Can not clean text {data}")
            print(e)
        return result

    def extract_invoice_items(self, items):
        """Extracts the invoice line items.

        An item that cannot be processed is reported and skipped, and the
        remaining items are still extracted.

        Args:
            items: Field whose ``value`` is the list of line-item fields.

        Returns:
            list: One dictionary per extracted item.
        """
        print("Extracting invoice items")
        items_list = []
        if items is None:
            return items_list

        for entry in getattr(items, "value", None) or []:
            try:
                item = entry.value

                description_field = item.get("Description")
                description = (
                    description_field.value
                    if description_field is not None
                    else None
                )

                item_tags, item_script, item_translation = [], "", ""
                if description:
                    (
                        alcohol_indicator,
                        item_script,
                        item_translation,
                    ) = self.check_alcohol(description)
                    if alcohol_indicator:
                        item_tags.append("alcohol")

                amount_field = item.get("Amount")
                amount_value = (
                    amount_field.value if amount_field is not None else None
                )

                items_list.append(
                    {
                        "item_description": description,
                        "item_tags": item_tags,
                        "script_tag": item_script,
                        "item_description_english": item_translation,
                        "item_quantity": self.clean_text(item.get("Quantity")),
                        "unit": self.clean_text(item.get("Unit")),
                        "unit_price": self.fetch_amount(item.get("UnitPrice")),
                        "product_code": self.clean_text(
                            item.get("ProductCode")
                        ),
                        "date": self.fetch_date(item.get("Date")),
                        "tax": self.fetch_amount(item.get("Tax")),
                        "amount": self.fetch_amount(amount_value),
                    }
                )
            except Exception as exc:
                print("Can not extract invoice items")
                print(exc)
        return items_list

    def _fetch_relevant_data(self, doc):
        """Reduces a dict of analysed fields to values, confidences and tags.

        Args:
            doc (dict): Field name mapped to the field's dictionary form.

        Returns:
            dict: String, date and currency fields reduced to "value" and
            "confidence" (strings also get "script", "english_translation"
            and, for alcohol, "tags"); "Items" becomes a list of such dicts.
            Other fields are dropped.
        """
        new_doc = {}
        for k, v in doc.items():
            if not isinstance(v, dict):
                continue
            value_type = v.get("value_type")
            value = v.get("value")

            if value_type == "string" and value is not None:
                content = v.get("content")
                if content is None:
                    content = str(value)
                # remove new line characters and punctuations
                cleaned = self._PUNCTUATION_RE.sub(" ", content).strip()
                # convert non-ascii characters to ascii
                string_value = unidecode(cleaned)
                # Transliteration erases the script, so non-LATIN text is
                # checked in its original form to allow translation.
                if self.detect_text_script(cleaned) == "LATIN":
                    text_to_check = string_value
                else:
                    text_to_check = cleaned
                (
                    alcohol_indicator,
                    item_script,
                    item_translation,
                ) = self.check_alcohol(text_to_check)
                new_doc[k] = {
                    "value": string_value,
                    "confidence": self._as_percent(v.get("confidence")),
                }
                if alcohol_indicator:
                    new_doc[k]["tags"] = ["alcohol"]
                new_doc[k]["script"] = item_script
                new_doc[k]["english_translation"] = item_translation

            elif value_type == "date" and value is not None:
                new_doc[k] = {
                    "value": value.strftime("%d %B %Y"),
                    "confidence": self._as_percent(v.get("confidence")),
                }

            elif value_type == "currency" and value is not None:
                new_doc[k] = {
                    "value": value,
                    "confidence": self._as_percent(v.get("confidence")),
                }

            elif k == "Items":
                new_doc[k] = [
                    self._fetch_relevant_data(item.get("value") or {})
                    for item in value or []
                ]
        return new_doc

    def extract_invoice_data(self, invoice_path="", output_file_path=""):
        """Extracts the data from Invoice/Receipt file using pre-built Azure Invoice OCR model.

        Args:
            invoice_path (str): Invoice/Receipt file path.
            output_file_path (str): Where the extracted data is written as
                JSON. Nothing is written when empty.

        Returns:
            str: ``output_file_path`` as passed in.

        Raises:
            RuntimeError: When the OCR client can not be created.
            Exception: File and service errors are propagated to the caller.
        """
        with open(invoice_path, "rb") as _fd:
            document = _fd.read()

        document_analysis_client = self.set_ocr_service()
        if document_analysis_client is None:
            raise RuntimeError("Can not create OCR service")

        poller = document_analysis_client.begin_analyze_document(
            "prebuilt-invoice", document
        )
        # result() blocks until the OCR operation completes
        invoices = poller.result()

        result_docs = {
            f"Document {idx}": self._fetch_relevant_data(
                doc.to_dict().get("fields") or {}
            )
            for idx, doc in enumerate(invoices.documents or [])
        }

        # write result_docs as json to the output file path
        if output_file_path:
            with open(output_file_path, "w", encoding="utf-8") as f:
                f.write(json.dumps(result_docs, indent=4))

        return output_file_path

# OCR Extraction

In [111]:
# import clear output
from IPython.display import clear_output

# Open folder and get all input documents
def get_input_files(input_folder, extensions=(".pdf", ".jpg")):
    """Return the paths of the input documents found in a folder.

    Extensions are matched case-insensitively, only regular files are
    returned, and the result is sorted by file name so runs are repeatable.

    Args:
        input_folder (str): input folder path
        extensions (tuple): file extensions to accept, including the dot

    Returns:
        list: file paths (folder joined with file name); empty when the
        folder can not be read
    """
    files = []
    try:
        suffixes = tuple(ext.lower() for ext in extensions)
        for entry in sorted(os.listdir(input_folder)):
            path = os.path.join(input_folder, entry)
            if entry.lower().endswith(suffixes) and os.path.isfile(path):
                files.append(path)
    except (OSError, TypeError) as e:
        print("Can not get input files", e)
    return files

# Folder holding the documents to run through OCR
input_folder_path = "/Users/roopakkprajapat/Documents/dev/KonaAI_ML/code/notebooks/2-way match/data/Sample Physical Docs"
input_files = get_input_files(input_folder_path)

ocr = InvoiceOCR()

# Extraction status per document, keyed by file name without extension
result = {}

# Run the OCR extraction for every input file
for file in list(input_files):
    # name of the folder containing the file
    parent_folder = os.path.basename(os.path.dirname(file))

    # file name with the extension stripped
    file_name = os.path.splitext(os.path.basename(file))[0]

    # the JSON output is written next to the input file
    output_file_path = os.path.join(os.path.dirname(file), f"{file_name}.json")

    output_path = ""
    try:
        output_path = ocr.extract_invoice_data(file, output_file_path)
    except Exception as e:
        print(f"Can not extract data - {file_name}\n", e)

    # the extraction counts as successful only when the output file exists
    result[file_name] = (
        "Extraction successful"
        if os.path.exists(output_path)
        else "Extraction failed"
    )

print("_" * 100)

# show the status of every document
print(result)



Can not extract data (InvalidRequest) Invalid request.
Code: InvalidRequest
Message: Invalid request.
Inner error: {
    "code": "InvalidContentLength",
    "message": "The input image is too large. Refer to documentation for the maximum file size."
}
____________________________________________________________________________________________________
{'6118454316': 'Extraction successful', '6119889614': 'Extraction successful', '6118331260': 'Extraction successful', '6119626945': 'Extraction successful', '6120103875': 'Extraction successful', '6119070420': 'Extraction successful', '6120876946': 'Extraction successful', '6121338644': 'Extraction successful', '6118339664': 'Extraction failed', '6119565096': 'Extraction successful', '6120770720': 'Extraction successful', '6120271797': 'Extraction successful', '6119393216': 'Extraction successful', '6118416020': 'Extraction successful', '6120874831': 'Extraction successful', '6120752786': 'Extraction successful', '6119418937': 'Extraction 

In [112]:
# Tally the statuses recorded in `result` and report both counts
outcomes = list(result.values())
print(
    "Total successful extractions",
    sum(status == "Extraction successful" for status in outcomes),
)
print(
    "Total failed extractions",
    sum(status == "Extraction failed" for status in outcomes),
)

Total successful extractions 23
Total failed extractions 1


In [113]:
# List every file whose extraction did not succeed
print("File names with failed extractions")
failed_names = [
    name for name, status in result.items() if status == "Extraction failed"
]
if failed_names:
    print("\n".join(failed_names))

File names with failed extractions
6118339664


# HTML Creation for JSON

In [126]:
import glob
import pandas as pd

# list all json files in the input folder
json_files = glob.glob(input_folder_path + "/*.json")

# write one HTML file per JSON file, with one table per document
for file in json_files:
    # read json file
    with open(file, encoding="utf-8") as f:
        data = json.load(f)

    # the page starts with the file name, followed by the document tables
    docs = [f"<h1>{os.path.basename(file)}</h1>"]

    for doc in data.values():
        # every field except the line items is a header field
        keys = [k for k in doc if k != "Items"]

        # keep only the value of each header field
        new_doc = {k: doc[k].get("value") for k in keys}
        items = doc.get("Items")

        if items:
            # one row per line item, header fields repeated on every row;
            # item columns get the Item_ prefix
            new_doc["Items"] = items
            df = pd.json_normalize(
                new_doc, record_path="Items", meta=keys, record_prefix="Item_"
            )
            # header columns first, then the item columns
            df = df[keys + [col for col in df.columns if col not in keys]]
        else:
            # json_normalize can not expand a missing or empty record path,
            # so a document without line items becomes a single header row
            df = pd.DataFrame([new_doc], columns=keys)

        # render everything as text and blank out missing values
        df = df.astype(str).replace("nan", "")

        docs.append(df.to_html())

    # swap only the file extension, never a ".json" inside a folder name
    html_path = os.path.splitext(file)[0] + ".html"
    with open(html_path, "w", encoding="utf-8") as f:
        f.write("<br>".join(docs))




            