In [2]:
from collections import namedtuple
from enum import Enum


class FileType(Enum):
    """Enumeration for different types of file formats.

    Args:
        Enum (Enum): The base class for creating enumerations.
    """

    PDF = ("application/pdf",)
    WORD_DOCUMENT = ("application/vnd.openxmlformats-officedocument.wordprocessingml.document",)
    EXCEL_FILE = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
        "text/csv",
        "application/octet-stream",
    )
    IMAGE = (
        "image/x-ms-bmp",
        "image/gif",
        "image/ief",
        "image/jpeg",
        "image/png",
        "image/svg+xml",
        "image/tiff",
        "image/vnd.microsoft.icon",
        "image/x-cmu-raster",
        "image/x-portable-anymap",
        "image/x-portable-bitmap",
        "image/x-portable-graymap",
        "image/x-portable-pixmap",
        "image/x-rgb",
        "image/x-xbitmap",
        "image/x-xpixmap",
        "image/x-xwindowdump",
    )
    ZIP = ("application/zip",)


class PredictionTypes(Enum):
    OTHERS = "Others"
    INVALID_FILE = "Invalid_File_Type"
    MULTI_TYPE = "Multiple_Documents_Detected"


FileEntry = namedtuple("FileEntry", ["filename", "file_object"])


In [3]:
import io
import logging
import math
import random
from typing import Union

import docx
import fitz
import nltk
import numpy as np
import pandas as pd
from nltk import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from paddleocr import PaddleOCR
from PIL import Image

# HACK
# os.environ["KMP_DUPLICATE_LIB_OK"] = "True"

nltk.download("punkt")
nltk.download("stopwords")
nltk.download("wordnet")

ocr = PaddleOCR(
    use_angle_cls=True,
    det_model_dir="../models/det_onnx/model.onnx",
    rec_model_dir="../models/rec_onnx/model.onnx",
    cls_model_dir="../models/cls_onnx/model.onnx",
    use_onnx=True,
    lang="en",
)


def filter_noun_phrases(noun_chunks, stopwords):
    """Filters the noun phrases by removing stopwords.

    Args:
        noun_chunks: A list of noun phrases.
        stopwords: A list of stopwords.

    Returns:
        A list of filtered noun phrases.
    """

    filtered_noun_phrases = []
    for noun_phrase in noun_chunks:
        if noun_phrase.text not in stopwords:
            filtered_noun_phrases.append(noun_phrase.text)

    return filtered_noun_phrases


def sanitize_text(text: str, lemmatize: bool = True, *args, **kwargs) -> str:
    """Removes all non-alphanumeric characters from a string.

    This function uses the `RegexpTokenizer` from the Natural Language Toolkit
    (NLTK) to split the input string into words using a regular expression
    pattern that matches only alphanumeric characters. It then joins the
    resulting list of words back into a single string separated by spaces.

    Args:
        text (str): The input string to be sanitized.

    Returns:
        str: The sanitized string, containing only alphanumeric characters
            separated by spaces.
    """

    tokenizer = RegexpTokenizer(r"\w+")
    text = text.lower()
    words = tokenizer.tokenize(text)

    stop_words = set(stopwords.words("english"))
    filtered_tokens = [token for token in words if token not in stop_words]
    if lemmatize:
        lemmatizer = WordNetLemmatizer()
        lemmatized_tokens = [lemmatizer.lemmatize(token) for token in filtered_tokens]

        result = " ".join(lemmatized_tokens)
    else:
        result = " ".join(filtered_tokens)

    return result


def sample_pages(documents: list) -> list:
    """Randomly samples a subset of pages from a list of documents.

    Args:
        documents (list): A list of document pages.

    Returns:
        list: A list containing the sampled pages.
    """

    num_documents = len(documents)

    if num_documents > 100:
        sample_size = int(math.sqrt(num_documents))
    else:
        sample_size = min(10, num_documents)

    sampled_documents = random.sample(documents, sample_size)

    return sampled_documents


def get_image_text(image: Union[bytes, np.ndarray], request_id: str, filename: str = "file") -> dict:
    """Extract text from an image using OCR and return the resulting text as a string.

    Args:
        image (list or str): A list of image paths or a single image path as a string.
        filename (str, optional): File name of the input. Defaults to "file".


    Returns:
        str: The text extracted from the image(s) as a string.
    """

    logger = logging.getLogger(request_id)

    logger.info("Running OCR on " + filename)

    if isinstance(image, bytes):
        try:
            pil_object = Image.open(io.BytesIO(image)).convert("RGB")
        except Exception as e:
            logger.error(e)
            logger.info("Image File type not supported: %s", filename)
            pil_object = Image.new("RGB", (500, 500))

        image = np.array(pil_object)

    text = ""
    result = ocr.ocr(image, cls=True)

    for line in result[0]:
        text += line[1][0] + " "

    ocr_result = sanitize_text(text)

    ocr_data = {"text": [ocr_result], "pages": 1}
    return ocr_data


def get_pdf_text(
    iobytes: bytes, request_id: str, filename: str = "file", segregate: bool = False, text_threshold: int = 500
) -> dict:
    """Extracts the text from a PDF document. If the text is less that a
    certain threshold then run the ocr.

    Args:
        iobytes (bytes): A byte stream containing the PDF document.
        request_id (str): The ID of the request.
        filename (str, optional): _description_. Defaults to "file".


    Returns:
        dict: The extracted text data from the PDF document.
    """

    logger = logging.getLogger(request_id)

    logger.info("Running fitz extraction on " + filename)

    ocr_data = {"text": [], "pages": 0}

    with fitz.open(stream=io.BytesIO(iobytes)) as doc:
        ocr_data["pages"] = doc.page_count

        if doc.is_encrypted:
            ocr_data["text"].append("encrypted")
            return ocr_data

        if segregate:
            for i, page in enumerate(doc):
                pdf_text = page.get_text("text")
                if len(pdf_text) <= text_threshold:
                    pix = page.get_pixmap()
                    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
                    ocr = get_image_text(
                        np.array(img),
                        filename=f"{filename}, page number [{i}]",
                        request_id=request_id,
                    )
                    pdf_text = ocr["text"][0]

                ocr_data["text"].append(sanitize_text(pdf_text))
        else:
            ocr_pages = []
            for page in doc:
                pdf_text = page.get_text("text")

                if len(pdf_text) <= text_threshold:
                    ocr_pages.append(page)
                    continue

                ocr_data["text"].append(sanitize_text(pdf_text))

            ocr_pages = sample_pages(ocr_pages)

            for i, page in enumerate(ocr_pages):
                pix = page.get_pixmap()
                img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
                ocr = get_image_text(
                    np.array(img),
                    filename=f"{filename}, page number [{i}]",
                    request_id=request_id,
                )

                ocr_data["text"].append(ocr["text"][0])

    return ocr_data


def get_excel_text(iobytes: bytes, request_id: str, filename: str = "file") -> dict:
    """After receiving the excel file, we will read it with the help of pandas
    library and concatenate the text.

    Args:
        iobytes (bytes): A byte stream containing the PDF document.
        request_id (str): The ID of the request.
        filename (str, optional): _description_. Defaults to "file".

    Returns:
        dict: The extracted text data from the Excel document.
    """

    logger = logging.getLogger(request_id)

    logger.info("Extracting Text from " + filename)

    excel_data = io.BytesIO(iobytes)
    df = pd.read_excel(excel_data)

    # Concatenate all the text in the Excel file
    ocr = " ".join(df.stack().astype(str))

    result = {"text": [sanitize_text(ocr)], "pages": 1}
    return result


def get_docx_text(iobytes: bytes, request_id: str, filename: str = "file") -> dict:
    """After receiving the DOC file as a byte stream, extract the text using
    python-docx library.

    Args:
        iobytes (bytes): A byte stream containing the DOC document.
        request_id (str): The ID of the request.
        filename (str, optional): The name of the file. Defaults to "file".

    Returns:
        dict: The extracted text data from the DOC document.
    """

    logger = logging.getLogger(request_id)

    logger.info("Extracting Text from " + filename)

    doc_stream = io.BytesIO(iobytes)
    doc = docx.Document(doc_stream)
    ocr = "\n".join([paragraph.text for paragraph in doc.paragraphs])
    result = {"text": [sanitize_text(ocr)], "pages": 1}
    return result


Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


[2024/02/05 14:41:42] ppocr DEBUG: Namespace(help='==SUPPRESS==', use_gpu=False, use_xpu=False, use_npu=False, ir_optim=True, use_tensorrt=False, min_subgraph_size=15, precision='fp32', gpu_mem=500, gpu_id=0, image_dir=None, page_num=0, det_algorithm='DB', det_model_dir='../models/det_onnx/model.onnx', det_limit_side_len=960, det_limit_type='max', det_box_type='quad', det_db_thresh=0.3, det_db_box_thresh=0.6, det_db_unclip_ratio=1.5, max_batch_size=10, use_dilation=False, det_db_score_mode='fast', det_east_score_thresh=0.8, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_sast_score_thresh=0.5, det_sast_nms_thresh=0.2, det_pse_thresh=0, det_pse_box_thresh=0.85, det_pse_min_area=16, det_pse_scale=1, scales=[8, 16, 32], alpha=1.0, beta=1.0, fourier_degree=5, rec_algorithm='SVTR_LCNet', rec_model_dir='../models/rec_onnx/model.onnx', rec_image_inverse=True, rec_image_shape='3, 48, 320', rec_batch_num=6, max_text_length=25, rec_char_dict_path='/Users/nanakdeepsingh/Projects/Code/JIRA

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/nanakdeepsingh/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/nanakdeepsingh/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/nanakdeepsingh/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [4]:
import hashlib
import hmac
import json


def calculate_sha256(data: bytes) -> str:
    """Calculate the SHA256 hash value of the input data.

    Args:
        data (bytes): The data to be hashed.

    Returns:
        str: The hexadecimal representation of the SHA256 hash value.
    """
    sha256_hash = hashlib.sha256()
    sha256_hash.update(data)
    sha256_hex = sha256_hash.hexdigest()

    return sha256_hex


def calculate_hmac(secret_key: bytes, response: dict) -> str:
    """
    Calculates the HMAC (hash-based message authentication code) of the provided
    file bytes using the provided secret key and SHA-256 hash algorithm.

    Args:
        secret_key (bytes): The secret key to use for generating the HMAC.
        file_bytes (bytes): The bytes of the file to generate the HMAC for.

    Returns:
        str: The hexadecimal representation of the computed HMAC.
    """

    request_body_json = json.dumps(response).encode("utf-8")

    hash_algorithm = hashlib.sha256
    computed_hmac = hmac.new(secret_key, request_body_json, hash_algorithm).hexdigest()

    return computed_hmac


In [5]:
import csv
import io
import logging
import zipfile

import backoff
import magic
import numpy as np
import pandas as pd
import requests

# from utils.constants import FileEntry, FileType, PredictionTypes
# from utils.encryption import calculate_sha256
# from utils.ocr import get_docx_text, get_excel_text, get_image_text, get_pdf_text


class FileProcessor:
    def __init__(self, documents: list, request_id: str, segregate: bool = False):
        self.documents = documents
        self.request_id = request_id
        self.segregate = segregate
        self.request_info = {}
        self.extracted_documents = []

    def process_files(self):
        """Process files and does the ocr in the zip archive.

        Returns:
            str: Returns ocr data in csv format.
        """

        logger = logging.getLogger(self.request_id)

        csv_data = []

        for document in self.documents:
            filename = document.filename
            file = document.file_object
            mime_type = magic.from_buffer(file, mime=True)
            logger.info("Processing %s", filename)

            for key in self.file_type_mapping:
                if mime_type in key:
                    process_func = self.file_type_mapping[key]
                    result = process_func(self, contents=file, filename=filename)
                    csv_data.extend(result)
                    break
            else:
                logger.info(
                    "The file type is not supported by the platform, please check the file %s",
                    mime_type,
                )

                contents_hash = calculate_sha256(file)
                csv_data.extend([[contents_hash, filename, " ", 0]])
                self.request_info.update(
                    {contents_hash: {"filename": filename, "pages": 0, "remarks": PredictionTypes.INVALID_FILE.value}}
                )

        headers = ["parent_hash", "filename", "text", "page_number"]
        result_csv = self._create_csv_string(headers, csv_data)

        return result_csv

    def _do_post_processing(self, ocr_data: dict, contents: bytes, filename: str) -> list:
        """Performs post-processing on OCR data and returns a list of results.

        Args:
            ocr_data (dict): A dictionary containing OCR data, including the extracted text and number of pages.
            contents (bytes): The contents of the file.
            filename (str): The name of the file.

        Returns:
            list: A list of post-processed results. Each result is represented as a sublist with the following elements:
                - contents_hash: The SHA256 hash of the file contents.
                - filename: The name of the file.
                - ocr: The post-processed OCR text.
                - page_index: The index of the page in the OCR data.
        """
        file_result = []
        ocr_result, number_of_pages = (
            ocr_data["text"],
            ocr_data["pages"],
        )

        contents_hash = calculate_sha256(contents)

        self.request_info.update({contents_hash: {"filename": filename, "pages": number_of_pages}})
        for i, ocr in enumerate(ocr_result):
            if not (ocr and len(ocr)):
                ocr = " "
            file_result.append([contents_hash, filename, ocr, i])

        extracted_document = FileEntry(filename, contents)
        self.extracted_documents.append(extracted_document)
        return file_result

    def _process_zip(self, contents: bytes, filename: str) -> list:
        """Process a zip file and extract information from its contents.

        Args:
            contents (bytes): The binary contents of the zip file.
            filename (str): The name of the zip file.

        Returns:
            list: A list of extracted information from the zip file.
        """
        logger = logging.getLogger(self.request_id)
        zip_result = []
        logger.info("Opening zip file")
        zip_file = zipfile.ZipFile(io.BytesIO(contents))
        file_list = zip_file.infolist()

        file_name_list = [file.filename for file in file_list]

        logger.info("Files in zip %s", str(file_name_list))
        for file_info in file_list:
            if file_info.is_dir() or file_info.filename.startswith("__MACOSX"):
                continue

            filename = file_info.filename
            logger.info("Processing %s", filename)

            with zip_file.open(filename) as file:
                individual_file = file.read()
                mime_type = magic.from_buffer(individual_file, mime=True)

                for key in self.file_type_mapping:
                    if mime_type in key:
                        process_func = self.file_type_mapping[key]
                        result = process_func(self, contents=individual_file, filename=filename)
                        zip_result.extend(result)
                        break
                else:
                    logger.info(
                        "The file type is not supported by the platform, please check the file %s",
                        mime_type,
                    )

                    contents_hash = calculate_sha256(individual_file)
                    zip_result.extend([[contents_hash, filename, " ", 0]])
                    self.request_info.update(
                        {
                            contents_hash: {
                                "filename": filename,
                                "pages": 0,
                                "remarks": PredictionTypes.INVALID_FILE.value,
                            }
                        }
                    )

        return zip_result

    def _process_pdf(self, contents: bytes, filename: str) -> list:
        """Process a PDF file and extract text from its contents using OCR.

        Args:
            contents (bytes): The binary contents of the PDF file.
            filename (str): The name of the PDF file.

        Returns:
            list: A list of extracted information from the PDF file.
        """
        pdf_data = get_pdf_text(
            iobytes=contents, request_id=self.request_id, filename=filename, segregate=self.segregate
        )
        pdf_result = self._do_post_processing(ocr_data=pdf_data, contents=contents, filename=filename)
        return pdf_result

    def _process_image(self, contents: bytes, filename: str) -> list:
        """Process a image file and extract text from its contents using OCR.

        Args:
            contents (bytes): The binary contents of the PDF file.
            filename (str): The name of the PDF file.

        Returns:
            list: A list of extracted information from the image file.
        """
        image_data = get_image_text(image=contents, request_id=self.request_id, filename=filename)
        image_result = self._do_post_processing(ocr_data=image_data, contents=contents, filename=filename)
        return image_result

    def _process_excel(self, contents: bytes, filename: str) -> list:
        """Process a excel file and extract text from it.

        Args:
            contents (bytes): The binary contents of the PDF file.
            filename (str): The name of the PDF file.

        Returns:
            list: A list of extracted information from the excel file.
        """
        excel_data = get_excel_text(iobytes=contents, request_id=self.request_id, filename=filename)
        excel_result = self._do_post_processing(ocr_data=excel_data, contents=contents, filename=filename)
        return excel_result

    def _process_word_doc(self, contents: bytes, filename: str) -> list:
        """Process a docx file and extract text from it.

        Args:
            contents (bytes): The binary contents of the PDF file.
            filename (str): The name of the PDF file.

        Returns:
            list: A list of extracted information from the PDF file.
        """
        docx_data = get_docx_text(iobytes=contents, request_id=self.request_id, filename=filename)
        docx_result = self._do_post_processing(ocr_data=docx_data, contents=contents, filename=filename)
        return docx_result

    def _create_csv_string(self, headers: list, csv_data: list) -> str:
        """Create a CSV string from a list of data.

        Args:
            data (list): A list of rows, where each row is a list of values.

        Returns:
            str: The CSV string representation of the data.
        """

        csv_string = io.StringIO()
        csv_writer = csv.writer(csv_string, quoting=csv.QUOTE_ALL)
        csv_writer.writerow(headers)

        for row in csv_data:
            csv_writer.writerow(row)

        csv_string.seek(0)
        csv_formatted_data = csv_string.getvalue()
        csv_string.close()
        return csv_formatted_data

    file_type_mapping = {
        FileType.PDF.value: _process_pdf,
        FileType.EXCEL_FILE.value: _process_excel,
        FileType.WORD_DOCUMENT.value: _process_word_doc,
        FileType.IMAGE.value: _process_image,
        FileType.ZIP.value: _process_zip,
    }



In [10]:
import io

filepath = "../samples/test_doc.docx"

# Open the file in binary mode and read its content
with open(filepath, "rb") as f:
    file_content = f.read()
print(type(file_content),file_content)
# Create a BytesIO object using the read content
myBytesIO = io.BytesIO(file_content)

# Seek to the beginning of the BytesIO object


<class 'bytes'> b'PK\x03\x04\x14\x00\x06\x00\x08\x00\x00\x00!\x00\xdf\xa4\xd2lZ\x01\x00\x00 \x05\x00\x00\x13\x00\x08\x02[Content_Types].xml \xa2\x04\x02(\xa0\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0

In [9]:

files=[myBytesIO]
documents=[]
for file in files:
        contents = file.read()
        print(contents)
        document = FileEntry("test_doc.docx", contents)
        print(type(contents))
        documents.append(document)
print(documents)

b'PK\x03\x04\x14\x00\x06\x00\x08\x00\x00\x00!\x00\xdf\xa4\xd2lZ\x01\x00\x00 \x05\x00\x00\x13\x00\x08\x02[Content_Types].xml \xa2\x04\x02(\xa0\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0

In [None]:
obj=FileProcessor(documents,0) 

In [None]:
obj.process_files()

'"parent_hash","filename","text","page_number"\r\n"d8d54a582cc62cc4c5cdb999b943dcfeb2984087262d7cb483163f74cde610d5","test_doc.docx","mask personally identificable information pii including person name text john doe currently life 1234 elm street springfield anywhere 12345 reached johndoe email com phone number 555 123 4567 social security number 123 45 6789 bank account number 9876543210 springfield bank john attended springfield university earned bachelor degree computer science work acme corp employee id 123456 john medical record number mrn 001234 history asthma high blood pressure primary care physician dr jane smith practice springfield medical center recent blood test result show cholesterol level 200 mg dl blood glucose level 90 mg dl","0"\r\n'

In [None]:
files=[myBytesIO]
documents=[]
for file in files:
        document = FileEntry(file["filename"], file["content"])
        documents.append(document)
print(documents)