In [1]:
import spacy
from spacy.matcher import Matcher
import csv
import utils

class ner_model:
    """
    Loads and configures the spaCy pipelines used to extract person names from text.

    Two pipelines are loaded: the pre-trained Spanish model ``es_core_news_lg``, whose
    ``PER`` entities are treated as candidate names, and a custom model read from
    ``ner_model/model/model-best``, whose entities label individual words as
    ``FIRST_NAME`` or ``LAST_NAME``. Lists of known first and last names are read from
    CSV files and registered both in an ``entity_ruler`` appended to the custom pipeline
    and in a token ``Matcher``, so the two results can be cross-checked.
    """

    def __init__(self):
        # Load pre-trained and custom NER models for extraction
        self._nlp_es = spacy.load("es_core_news_lg")
        self._nlp_custom = spacy.load("ner_model/model/model-best")

        # Create a matcher to cross-check the custom pipeline's entities
        self._matcher = Matcher(self._nlp_es.vocab)

        # Load lists of known names
        names = self._load_names('ner_model/data/processed/first_names_processed.csv')
        last_names = self._load_names('ner_model/data/processed/last_names_processed.csv')

        # The ruler runs after the statistical NER component and may overwrite its entities
        ruler = self._nlp_custom.add_pipe("entity_ruler", after="ner", config={"overwrite_ents": True})

        # Every known name becomes a case-insensitive single-token pattern, registered
        # under the same label in both the matcher and the entity ruler.
        ruler_patterns = []
        for label, known_names in (("FIRST_NAME", names), ("LAST_NAME", last_names)):
            self._matcher.add(label, [[{"LOWER": name.lower()}] for name in known_names])
            ruler_patterns.extend(
                {"label": label, "pattern": [{"LOWER": name.lower()}]}
                for name in known_names
            )
        ruler.add_patterns(ruler_patterns)

    @staticmethod
    def _load_names(path):
        """Return the first column of every row in the CSV at ``path``, skipping blank rows."""
        with open(path, 'r', newline="") as file:
            # csv.reader yields [] for blank lines; indexing row[0] on those would raise
            return [row[0] for row in csv.reader(file, delimiter=",") if row and row[0].strip()]

    def extract_names(self, text_blocks, original_text, token_map):
        """
        Find person names in each text block and split them into first and last names.

        Args:
            text_blocks: Iterable of strings; each one is run through the Spanish pipeline.
            original_text: Forwarded unchanged to ``utils.get_bounding_box``.
            token_map: Forwarded unchanged to ``utils.get_bounding_box``.

        Returns:
            A list with one dict per ``PER`` entity found, with keys ``"first_name"`` and
            ``"last_name"`` (space-joined strings, or ``None`` when nothing was recognised)
            and ``"bounding_box"`` (whatever ``utils.get_bounding_box`` returned).
        """
        ## TODO: Need to find a way to be a bit more flexible with the choice

        results = []

        for block in text_blocks:
            print("---------- New Block ----------")
            doc = self._nlp_es(block)
            for ent in doc.ents:
                if ent.label_ != "PER":
                    continue
                print("## Name found by default model: ", ent, ent.label_)

                # Bounding box for the entire entity
                bounds = utils.get_bounding_box(ent.text, original_text, token_map)

                # Split on any whitespace run. Splitting on a single " " produced empty
                # strings for consecutive spaces, and the pair pass below then rebuilt
                # ("" + word) and counted the same name twice.
                tokens = ent.text.split()

                first_names = []
                last_names = []

                # Classify each word on its own
                for token in tokens:
                    self._classify_token(token, first_names, last_names)

                # Classify adjacent words glued together (names written with a space)
                for left, right in zip(tokens, tokens[1:]):
                    self._classify_pair(left, right, first_names, last_names)

                results.append({
                    "first_name": self._join_names(first_names, "first"),
                    "last_name": self._join_names(last_names, "last"),
                    "bounding_box": bounds
                })

        # Return the extracted names and their bounding boxes
        return results

    def _classify_token(self, token, first_names, last_names):
        """Append ``token`` to the matching list when custom model and matcher agree on its label."""
        name = self._nlp_custom(utils.remove_accent(token))
        matches = self._matcher(name)
        if not matches:
            # Without a matcher hit there is nothing to cross-check against
            return

        # Only the first matcher hit is used for the comparison
        match_id, start, end = matches[0]
        matcher_label = self._nlp_es.vocab[match_id].text

        for name_ent in name.ents:
            if name_ent.label_ == matcher_label:
                self._store(name_ent.label_, name_ent.text, first_names, last_names)
                print("     -- MATCHES MATCHER. Custom model results: ",
                      name_ent,
                      name_ent.label_,
                      "/ Matcher results: ",
                      name[start:end],
                      matcher_label)
            else:
                print("     -- DISCREPANCY WITH MATCHER. Custom model results: ",
                      name_ent, name_ent.label_,
                      name[start:end],
                      matcher_label)

    def _classify_pair(self, left, right, first_names, last_names):
        """Append the concatenation of two adjacent words when it is recognised as one name."""
        # Normalise like the single-word pass does, so both passes are compared against
        # the name lists in the same form (presumably remove_accent strips diacritics --
        # confirm against utils).
        combined = self._nlp_custom(utils.remove_accent(left + right))
        match = self._matcher(combined)
        if not match or not combined.ents:
            return

        label = combined.ents[0].label_
        if label == self._nlp_es.vocab[match[0][0]].text:
            self._store(label, combined.ents[0].text, first_names, last_names)

    @staticmethod
    def _store(label, text, first_names, last_names):
        """Route ``text`` to the list that corresponds to ``label``; other labels are ignored."""
        if label == "FIRST_NAME":
            first_names.append(text)
        elif label == "LAST_NAME":
            last_names.append(text)

    @staticmethod
    def _join_names(names, kind):
        """Join ``names`` with spaces, or return ``None`` when the list is empty."""
        if not names:
            print("#### No names found. Setting to none: ", names)
            return None
        print(f"#### Joining {kind} names...")
        joined = " ".join(names)
        print(f"     -- {kind.capitalize()} Names: ", joined)
        return joined
                    

In [2]:
from google.cloud import documentai_v1 as documentai
from google.oauth2 import service_account
from database import Database
import re
import utils

# Shared database handle. ocr_engine.process_documents uses it to list the images to
# process (storage_get_images) and to store results (create_document, create_bound).
db = Database()

# Identifiers of the Document AI processor. ocr_engine.__init__ passes all three to
# processor_path(), and `location` also forms the "<location>-documentai.googleapis.com"
# API endpoint.
project_id = "genealogy-ocr-index"
location = "us"
processor_id = "4cb2ae40b145c48"

class ocr_engine:
    """
    Runs OCR on images stored in a Google Cloud Storage bucket through a Google
    Document AI processor, extracts names from the recognised text with ``ner_model``
    and stores the results through the module-level ``db`` handle.
    """
    def __init__(self) -> None:

        # Create a client against the regional Document AI endpoint
        cred = service_account.Credentials.from_service_account_file("secrets/docai_credentials.json")
        self.__client = documentai.DocumentProcessorServiceClient(
            credentials=cred,
            client_options={"api_endpoint": f"{location}-documentai.googleapis.com"}
        )

        # Fully-qualified resource name of the processor
        self.__processor = self.__client.processor_path(project=project_id, location=location, processor=processor_id)

    def extract_year(self, original_text):
        """
        Return the first standalone four-digit number in ``original_text``.

        "Standalone" means not directly preceded or followed by another digit, so a year
        is found next to punctuation, a newline, or the start/end of the text as well.

        Returns:
            The four digits as a string, or ``None`` when the text contains no such number.
        """
        match = re.search(r"(?<!\d)\d{4}(?!\d)", original_text or "")
        return match.group(0) if match else None

    def get_map(self, doc, original_text):
        """
        Build an index of the OCR tokens keyed by their start offset in the text.

        Args:
            doc: OCR result exposing ``pages[*].tokens[*].layout``.
            original_text: Full text the token offsets refer to.

        Returns:
            A dict mapping each token's start index to the tuple
            ``(start, end, original_text[start:end], vertices)``. Tokens that carry no
            text segment are skipped. When two tokens share a start index the later
            one wins.
        """
        token_map = {}
        for page in doc.pages:
            for token in page.tokens:
                segments = token.layout.text_anchor.text_segments
                if len(segments) == 0:
                    # No anchor into the text, so there is nothing to index
                    continue
                idx = segments[0].start_index
                end = segments[0].end_index
                # The vertices go clockwise, starting on the upper left corner
                vertices = list(token.layout.bounding_poly.normalized_vertices)
                token_map[idx] = (idx, end, original_text[idx:end], vertices)

        return token_map

    @staticmethod
    def _mime_type(uri):
        """Derive the ``image/<subtype>`` MIME type from the file extension of ``uri``."""
        aliases = {"jpg": "jpeg", "jpe": "jpeg", "tif": "tiff"}
        filename = uri.rsplit("/", 1)[-1]
        if "." in filename:
            extension = filename.rsplit(".", 1)[-1].lower()
        else:
            # No extension at all: fall back to the trailing three characters
            extension = uri[-3:].lower()
        return f"image/{aliases.get(extension, extension)}"

    def process_documents(self, department, municipality):
        """
        OCR every image returned by ``db.storage_get_images()`` and store the names found.

        For each ``(uri, url)`` pair the image is sent to the processor, a document record
        is created with ``db.create_document`` and one record per recognised name is
        created with ``db.create_bound``. Names for which neither a first nor a last name
        was recognised are skipped.

        Args:
            department: Passed through to ``db.create_document``.
            municipality: Passed through to ``db.create_document``.

        Returns:
            None.
        """
        bucket_data = db.storage_get_images()

        # Loading the NER pipelines is expensive, so build the model once (on first use)
        # and reuse it for every image instead of reloading it per document.
        ner = None

        # Process each image
        for uri, url in bucket_data:
            # Create a raw document object with the MIME type matching the file extension
            document = documentai.GcsDocument(gcs_uri=uri, mime_type=self._mime_type(uri))

            # Create API request. language_hints is a repeated field, so it takes a list.
            request = documentai.ProcessRequest(name=self.__processor,
                                                gcs_document=document,
                                                process_options={"ocr_config": {
                                                    "hints": {"language_hints": ["es"]}
                                                    }
                                                })

            # Get OCR result and text
            ocr_result = self.__client.process_document(request=request).document
            text = ocr_result.text

            # Extract year from text (None when the page shows no four-digit number)
            year = self.extract_year(text)
            if year is None:
                print("No year found in document:", url)

            # Generate token map
            token_map = self.get_map(ocr_result, text)

            # Get processed blocks of text
            text_blocks = [
                utils.formatting(utils.get_text(idx, text))
                for idx in utils.get_indexes(ocr_result)
            ]

            # Extract names and bounding boxes from text blocks
            if ner is None:
                ner = ner_model()
            ner_results = ner.extract_names(text_blocks=text_blocks, original_text=text, token_map=token_map)

            # Add document to Firestore Document collection
            doc_id = db.create_document(year, department, municipality, url)

            # Add a bounding box to Firestore Bounds collection
            for ner_result in ner_results:
                first_name = ner_result["first_name"]
                last_name = ner_result["last_name"]

                if not first_name and not last_name:
                    print("Skipping empty entry for: ", first_name, last_name)
                    continue

                # A missing first or last name is stored as an empty string
                db.create_bound(ner_result["bounding_box"], doc_id, first_name or "", last_name or "")

            # Print Success Message
            print("Image processed successfully:", url)
        

App initialized


In [3]:
# Run OCR and name extraction over every image returned by db.storage_get_images(),
# filing the results under the given department and municipality.
# NOTE: process_documents has no return statement, so `result` is always None.
engine = ocr_engine()
result = engine.process_documents("Cundinamarca","Gachetá")



---------- New Block ----------
---------- New Block ----------
## Name found by default model:  Guacamayas PER
### Searching bounding box...
#### Indexes:  106 117 String:  'Guacamayas.'
#### No names found. Setting to none:  []
#### No names found. Setting to none:  []
---------- New Block ----------
## Name found by default model:  Eleuterio PER
### Searching bounding box...
#### Indexes:  166 176 String:  'Eleuterio '
     -- MATCHES MATCHER. Custom model results:  Eleuterio FIRST_NAME / Matcher results:  Eleuterio FIRST_NAME
#### Joining first names...
     -- First Names:  Eleuterio
#### No names found. Setting to none:  []
## Name found by default model:  Julian Patricio PER
### Searching bounding box...
#### Indexes:  178 196 String:  'Ju-\nlian Patricio '
     -- MATCHES MATCHER. Custom model results:  Julian FIRST_NAME / Matcher results:  Julian FIRST_NAME
     -- MATCHES MATCHER. Custom model results:  Patricio FIRST_NAME / Matcher results:  Patricio FIRST_NAME
#### Joining 