In [1]:
!pip install -q google-cloud-documentai
!pip install -q PyMuPDF
!pip install -q google-cloud-storage
!pip show -q google-cloud-documentai
!pip install -q --upgrade google-cloud-documentai
!pip install -q opencv-python
!pip install -q pdf2image
!pip install aspose-words -q

In [26]:
import csv
import os
import tempfile
from typing import Optional

import aspose.words as aw
from google.api_core.client_options import ClientOptions
from google.cloud import documentai

In [27]:
def data_extraction(document):
    """Collect name/rank/regiment/page entities from a processed document.

    Each line of ``document.text`` is scanned for the ``mention_text`` of every
    entity whose ``type_`` is one of the tracked columns. When several entities
    of the same type occur in one line, the last one in ``document.entities``
    wins.

    Args:
        document: Object exposing ``text`` (str) and ``entities`` (iterable of
            objects with ``type_`` and ``mention_text`` attributes).

    Returns:
        dict mapping ``"last_name"``, ``"first_name"``, ``"rank"``,
        ``"regiment"`` and ``"page"`` to lists of strings. Missing values are
        dropped from every column independently, so columns may differ in
        length; ``"page"`` is always the first detected page value (or
        ``"Page Number Not Detected"``) repeated once per last name.
    """
    entity_types = ("last_name", "first_name", "rank", "regiment", "page")
    extracted_data = {entity_type: [] for entity_type in entity_types}

    # Entities with an empty mention_text are skipped: "" is a substring of
    # every line, so such an entity would otherwise be recorded on every line
    # and overwrite genuine matches of the same type.
    candidates = [
        (entity.type_, entity.mention_text)
        for entity in document.entities
        if entity.type_ in extracted_data and entity.mention_text
    ]

    # One slot per line and per column; None marks "nothing found on this line"
    for line in document.text.split("\n"):
        entities_from_line = dict.fromkeys(entity_types)
        for entity_type, mention_text in candidates:
            if mention_text in line:
                entities_from_line[entity_type] = mention_text
        for entity_type, entity_value in entities_from_line.items():
            extracted_data[entity_type].append(entity_value)

    # A line carrying both names but no rank gets an explicit placeholder so
    # the rank column keeps one value per person after the None removal below.
    names_and_ranks = zip(
        extracted_data["last_name"],
        extracted_data["first_name"],
        extracted_data["rank"],
    )
    for index, (last_name, first_name, rank) in enumerate(names_and_ranks):
        if last_name is not None and first_name is not None and rank is None:
            extracted_data["rank"][index] = "No Rank"

    # Remove all null values from all columns (each column is compacted on its
    # own, so values are re-aligned by position rather than by source line).
    for key, value_list in extracted_data.items():
        extracted_data[key] = [value for value in value_list if value is not None]

    # Use the first detected page value for every row of this document
    if extracted_data["page"]:
        page_number = extracted_data["page"][0]
    else:
        page_number = "Page Number Not Detected"

    extracted_data["page"] = [page_number] * len(extracted_data["last_name"])

    return extracted_data

In [28]:
def save_to_csv(extracted_data, output_file):
    """Write the extracted columns to ``output_file`` as CSV.

    The header row is the mapping's keys in order. Columns are aligned by
    position; a row is written only when every column has a non-None value at
    that position, so rows beyond the shortest column are dropped.

    The caller's mapping and lists are left untouched: padding is done on
    copies.

    Args:
        extracted_data: Mapping of column name to a list of values. Lists may
            differ in length; an empty mapping produces an empty header row.
        output_file: Path of the CSV file to create or overwrite.
    """
    entities = list(extracted_data.keys())
    columns = [list(extracted_data[entity]) for entity in entities]
    # default=0 keeps an empty mapping from raising inside max()
    max_length = max((len(column) for column in columns), default=0)

    # Pad copies of the shorter columns with None to match the longest one
    padded_columns = [column + [None] * (max_length - len(column)) for column in columns]

    # Filter out rows with null values
    complete_rows = [
        row for row in zip(*padded_columns)
        if all(value is not None for value in row)
    ]

    # newline='' lets the csv module control line endings itself
    with open(output_file, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(entities)
        writer.writerows(complete_rows)

In [38]:
def determine_process(project_id, location, processor_id, path, field_mask=None, processor_version_id=None):
    """Run the extraction pipeline on a single file or on every file in a directory.

    Args:
        project_id: Google Cloud project identifier passed to the processor.
        location: Processor location (e.g. ``"us"``).
        processor_id: Document AI processor identifier.
        path: Path to one input file, or to a directory whose direct child
            files are all processed (sub-directories are not descended into).
        field_mask: Optional field mask forwarded to the process request.
        processor_version_id: Optional processor version; when omitted the
            processor is addressed without a version.

    Files whose extension has no known MIME type are reported and skipped
    instead of being submitted. A path that is neither a file nor a directory
    is reported and nothing is processed.
    """
    if os.path.isfile(path):
        # Single file: its extension decides the MIME type
        extension = os.path.splitext(os.path.basename(path))[1]
        targets = [(path, determine_mime_type(extension))]
    elif os.path.isdir(path):
        # Directory: one target per regular file directly inside it
        targets = [
            (os.path.join(path, file_name), mime_type)
            for file_name, mime_type in directory_mime_types(path).items()
        ]
    else:
        print(f"Invalid path: '{path}'")
        return

    for file_path, mime_type in targets:
        # "Unknown" is the sentinel determine_mime_type returns for extensions
        # it does not recognise; submitting such a file would fail the request
        # and abort the remaining files of a directory run.
        if mime_type == "Unknown":
            print(f"Skipping unsupported file type: '{file_path}'")
            continue
        process_document_sample(project_id, location, processor_id, file_path, mime_type, field_mask, processor_version_id)

In [39]:
def directory_mime_types(path):
    """Map each regular file directly inside ``path`` to its MIME type.

    Sub-directories are ignored. The MIME type is derived from the file
    extension by ``determine_mime_type``.

    Returns:
        dict of file name (not full path) -> MIME type string.
    """
    return {
        entry: determine_mime_type(os.path.splitext(entry)[1])
        for entry in os.listdir(path)
        if os.path.isfile(os.path.join(path, entry))
    }

In [40]:
def determine_mime_type(file_extension):
    """Return the MIME type for a file extension, or ``"Unknown"``.

    Args:
        file_extension: Extension including the leading dot (e.g. ``".pdf"``);
            matching is case-insensitive.
    """
    known_types = dict([
        (".pdf", "application/pdf"),
        (".gif", "image/gif"),
        (".tiff", "image/tiff"),
        (".tif", "image/tiff"),
        (".jpg", "image/jpeg"),
        (".jpeg", "image/jpeg"),
        (".png", "image/png"),
        (".bmp", "image/bmp"),
        (".webp", "image/webp"),
    ])
    normalized = file_extension.lower()
    if normalized in known_types:
        return known_types[normalized]
    # Anything not listed above is reported with a sentinel string
    return "Unknown"

In [45]:
def _extract_from_content(client, name, content: bytes, mime_type: str, field_mask: Optional[str]) -> dict:
    """Submit one raw document to the processor and return its extracted columns."""
    request = documentai.ProcessRequest(
        name=name,
        raw_document=documentai.RawDocument(content=content, mime_type=mime_type),
        field_mask=field_mask,
        process_options=documentai.ProcessOptions(),
    )
    result = client.process_document(request=request)
    return data_extraction(result.document)


def process_document_sample(
    project_id: str,
    location: str,
    processor_id: str,
    file_path: str,
    mime_type: str,
    field_mask: Optional[str] = None,
    processor_version_id: Optional[str] = None,
) -> None:
    """Process one file with Document AI and save the extracted entities as CSV.

    A PDF is split into single-page PDFs and each page is submitted on its
    own; any other MIME type is submitted as a single document. The columns
    returned by ``data_extraction`` for every submission are concatenated and
    written by ``save_to_csv`` to ``<input file stem>_extracted_data.csv`` in
    the current working directory.

    Args:
        project_id: Google Cloud project identifier.
        location: Processor location; also used to build the API endpoint.
        processor_id: Document AI processor identifier.
        file_path: Path of the file to process.
        mime_type: MIME type of ``file_path``.
        field_mask: Optional field mask forwarded to the process request.
        processor_version_id: Optional processor version to address; when
            falsy the processor itself is addressed.
    """
    # Initialize Document AI client against the regional endpoint
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")
    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if processor_version_id:
        name = client.processor_version_path(project_id, location, processor_id, processor_version_id)
    else:
        name = client.processor_path(project_id, location, processor_id)

    all_extracted_data = {"last_name": [], "first_name": [], "rank": [], "regiment": [], "page": []}

    def collect(content: bytes) -> None:
        """Process one submission and append its columns to the running totals."""
        extracted_data = _extract_from_content(client, name, content, mime_type, field_mask)
        for key, value_list in extracted_data.items():
            all_extracted_data[key].extend(value_list)

    if mime_type == "application/pdf":
        doc = aw.Document(file_path)
        # Page files live in a private scratch directory so they cannot clash
        # with files in the working directory, and they are removed even when
        # a request fails part-way through the document.
        with tempfile.TemporaryDirectory() as scratch_dir:
            for page in range(doc.page_count):
                page_file_path = os.path.join(scratch_dir, f"page_{page}.pdf")
                doc.extract_pages(page, 1).save(page_file_path)

                with open(page_file_path, "rb") as page_file:
                    page_content = page_file.read()
                # Drop the page file right away to keep the scratch directory small
                os.remove(page_file_path)

                collect(page_content)
    else:
        # Non-PDF inputs are sent to the processor as a single document
        with open(file_path, "rb") as image:
            collect(image.read())

    # Save all extracted data to a single CSV file
    output_file = os.path.splitext(os.path.basename(file_path))[0] + "_extracted_data.csv"
    save_to_csv(all_extracted_data, output_file)
    print(f"All extracted data saved to {output_file}")

In [46]:
# Run the pipeline on the local "test" path; determine_process accepts either a
# single file or a directory of files.
determine_process(
    project_id="322996871040",
    location="us",
    processor_id="eb7f025d9a48d15",
    path="test",  
    # Uncomment to address a specific processor version instead of the processor itself:
    #processor_version_id="69f0b6febc1d5673"
)

All extracted data saved to test_extracted_data.csv
All extracted data saved to 00000024_extracted_data.csv
