In [1]:
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
os.environ["TORCH_USE_CUDA_DSA"] = "1"

In [2]:
import torch

print(torch.cuda.is_available())

True


In [3]:
text_doc = "resources/images/lightfootcat/images/extracted_text.txt"
divisions = ["Dicotyledones", "Monocotyledones", "Pteridophyta", "Hepaticae", "Algae"]#

text_doc2 = "resources/images/hanbury/extracted_text.txt"
divisions2 = ["Dicotyledones", "Gamopetalae", "MONOCHLAMYDEAE","GYMNOSPERMEAE", "Monocotyledones"]

In [4]:
with open(text_doc, "r") as f:
    extracted_text = f.read()

with open(text_doc2, "r") as f:
    extracted_text2 = f.read()

In [5]:
from lib.utils.text_utils import FAMILY_REGEX, FAMILY_REGEX_WITH_LOOKAHEAD, FAMILY_REGEX_PATTERN
import re

In [29]:
from typing import Optional, Iterator, Match
from pygbif import species

class TextProcessor:

    def __init__(self, max_chunk_size: int=3000):
        
        # Family Regex
        self.family_regex = re.compile(rf"""(?P<PAGENOSTART>^\d+)?\s*
                                       (?P<INDEX>[IVXLCDM\.]+)?(\s+|-)
                                       (?P<FAMILY>{FAMILY_REGEX_PATTERN})\s*
                                       (?P<PAGENOEND>\d+$)?""", flags=re.VERBOSE)
        # Species Regex
        self.species_regex_pattern = "(?:\d+\.\s)?[A-Z][a-z]+(?:\s[a-z]+)?(?:\s(var\.|subsp\.|f\.)\s[a-z]+)?(?:\s[A-Z][a-z]+)?(?:\s\([\w\s]+\))?"
        self.species_regex = re.compile(rf"(?P<SPECIES>{self.species_regex_pattern})")

        # Known non-species words
        self.not_species_text = set()

        self.max_chunk_size = max_chunk_size

    
    def __call__(self, text: str, divisions: list, return_blocks: bool):
        
        # Pre-process input text to clean it
        text = self.preprocess_text(text, divisions[0])

        # split the structure by divisions
        div_struct = self.split_by_divisions(text, divisions)

        # Define a new structure
        struct = dict()

        for current_div, div_content in div_struct.items():
            current_div = current_div.strip()
            print(f"==> Processing {current_div}")
            
            # Split the text into paragraphs
            split_content = div_content.split("\n\n")

            current_family = None

            # Start a dict for the division
            struct[current_div] = dict(details=[], families = {})

            for line in split_content:
                line = line.strip()
                if not line:
                    continue

                family_matches = self._check_family(line)

                print(family_matches)
        
        return struct

    def _check_family(self, line: str) -> dict:
        """
        Check for all families in a line and organize their contents.
        
        Args:
            line (str): The line to check for families
        
        Returns:
            dict: A dictionary with family names as keys and their contents as values
        """
        if not line:
            return {}
        
        # Find all family matches
        family_matches = list(re.finditer(self.family_regex, line))
        
        # If no families found, return empty dict
        if not family_matches:
            return {}
        
        # Dictionary to store family information
        family_data = {}
        
        def clean_family_name (fname: str) -> str:
            fname = re.sub(r".*?([A-Za-z]+).*?", r"\1", fname)
            fname = fname.upper()
            return fname
        
        # Process each family match
        for idx, match in enumerate(family_matches):
            family_name = match.group('FAMILY').strip()
            family_name = clean_family_name(family_name)
            # Skip if this family was already processed (take the earliest match)
            if family_name in family_data:
                continue
            
            # Determine the text segment for this family (until next family or end of line)
            start_pos = match.end()
            
            # Find the next family match position, or end of line if this is the last family
            end_pos = len(line)
            for next_match in family_matches[idx+1:]:
                next_family_name = next_match.group('FAMILY').strip()
                next_family_name = clean_family_name(next_family_name)
                # Only consider this a boundary if it's a different family
                if next_family_name != family_name:
                    end_pos = next_match.start()
                    break
            
            # Extract the content belonging to this family
            family_content = line[start_pos:end_pos].strip()
            
            # Store the family data
            family_data[family_name] = {
                'match': match,
                'content': family_content,
                'species': []  # To be filled if needed
            }
        
        return family_data

    def _check_species(self, line: str, family: Optional[str]=None) -> object:
        
        species_matches = re.finditer(self.species_regex, line)

        # filter out any matches that are 5 characters and above and have atleast 2 words in it
        species_matches = filter(lambda x: (len(x.group(0)) > 5 and len(x.group(0).split(" ")) >= 2), species_matches)

        regex_check = lambda x: True if re.match(r"\d+?\.?\s*\w{3,}\s\w{2}\.*", x.group(0)) else False
        species_matches = filter(regex_check, species_matches)
        
        species_matches = filter(lambda x: x.group(0) not in self.not_species_text, species_matches)

        check_against_gbif = lambda x: self._check_against_gbif(x, family)
        species_matches = filter(check_against_gbif, species_matches)

        return species_matches

    def _check_against_gbif(self, species_match, family: str=None) -> bool:

        species_name = re.sub(r"^(\d+\.\s*)?", "", species_match.group(0))
        species_name = species_name.strip()
        
        if family:
            gbif_search = species.name_backbone(name=species_name, family=family, kingdom="plants", strict=False, verbose=True, limit=1)
        else:
            gbif_search = species.name_backbone(name=species_name, kingdom="plants", strict=False, verbose=True, limit=1)
        

        def check_gbif_dict(gbif_dict: dict) -> bool:
            
            if gbif_dict["matchType"] == "NONE":
                return False

            if (
                (gbif_dict["rank"].lower() in ["genus", "species"]) 
                and
                (gbif_dict["confidence"] >= 50)
                and
                (gbif_dict["status"].lower() == "ACCEPTED".lower())
                ):
                return True
            
            return False
        
        check_first_line = check_gbif_dict(gbif_search)

        if check_first_line:
            return True
        elif "alternatives" in gbif_search and len(gbif_search["alternatives"]) >= 1:
            # Only checking the first alternative
            check_alternative = check_gbif_dict(gbif_search["alternatives"][0])
            
            if check_alternative:
                return True
        
        self.not_species_text.add(species_match.group(0))
        return False
            


    def preprocess_text(self, text: str, first_division: str) -> str:
        """
        Preprocess the text for splitting into text blocks

        Args:
            text (str): Extracted text
            first_division (str): The first division in text

        Returns:
            str: Cleaned text
        """
        text = re.sub(rf"^.*?({re.escape(first_division)})", r"\1", text, flags=re.S | re.I)
         
        text = re.sub(r"\*\*(.+?)\*\*", r"\1", text, flags=re.MULTILINE) # Remove any markdown (bold) on string
        text = re.sub(r"\*", "", text, flags=re.MULTILINE)
        text = re.sub(r"```", "", text, flags=re.MULTILINE) # Remove any markdown
        text = re.sub(r"^(Catalogue|catalogue)$", "", text, flags=re.MULTILINE) # Remove Catalogue/catalogue
        text = re.sub(f"^\d+$", "", text, flags=re.MULTILINE)
        # Clean family ending
        text = re.sub(r"Æ", "AE", text, flags=re.MULTILINE)
        text = re.sub(r"ACE\.E\.", "ACEAE", text, flags= re.MULTILINE) # This changes for all family level ones
        text = re.sub(r"ace\.e\.", "aceae", text, flags= re.MULTILINE | re.I) # This changes for all others
        text = re.sub(r"OR\.E", "ORAE", text, flags= re.MULTILINE) # This changes for all family level ones
        text = re.sub(r"or\.e", "orae", text, flags= re.MULTILINE | re.I) # This changes for all others
        
        return text
    
    def _create_division_regex(self, divisions: Optional[list]=None) -> re.Pattern:
        """
        Generated the division regex

        Args:
            divisions (Optional[list], optional): List of divisions. Defaults to None.

        Returns:
            re.Pattern: Pattern for division regex
        """
        if not(divisions):
            return re.compile(f"(?:\d+\.?\s+)?([A-Z][a-z]+|[A-Z]+)\.?")
        
        division_str = "|".join(map(re.escape, divisions))
        return re.compile(f"(?:\d+\.?\s+)?({division_str})\.?", re.IGNORECASE)
    
    def split_by_divisions(self, text: str, divisions: list) -> dict:
        """
        Split the text by division and clean the output to get a structured hierarchy of divisions

        Args:
            text (str): extracted text
            divisions (list): List of divisions to split by

        Returns:
            dict: a structured hierarchy of divisions and their contents
        """

        # Generate div regexes
        # To split divisions
        div_regex = self._create_division_regex(divisions)
        # To check if a division
        div_check_regex = self._create_division_regex()

        # Intialise structure
        struct = {}

        #Split by divisions and clean
        div_split = re.split(div_regex, text)
        remove_newline = lambda x: not(re.match(re.compile(r"^(\n)+$"), x))
        div_split = list(filter(None,div_split))
        div_split = list(filter(remove_newline, div_split))

        # Pack into splits
        splits = list(zip(div_split[::2], div_split[1::2]))

        # Iterate through all divisions and Check if they match a divison, if not add it to previous divisions
        prev_div = None
        for div, content in splits:
            if re.match(div_check_regex, div):
                if div not in struct.keys():
                    struct[div] = content
                else:
                    struct[div] += content
                prev_div = div
            else:
                struct[prev_div] += div + content

        return struct



In [30]:
processor = TextProcessor(max_chunk_size=3000)

In [31]:
blocks = processor(extracted_text2, divisions2, return_blocks=False)

==> Processing DICOTYLEDONES
{'POLYPETALAE.': {'match': <re.Match object; span=(0, 16), match='I. POLYPETALAE. '>, 'content': '', 'species': []}, 'THALAMIFLORAE.': {'match': <re.Match object; span=(16, 33), match='I. THALAMIFLORAE.'>, 'content': '', 'species': []}}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{'MENISPERMACEAE': {'match': <re.Match object; span=(846, 863), match='. MENISPERMACEAE '>, 'content': '19. Abuta rufescens, Aublet. a. Leaves. This specimen is one of J. Correa de Mello\'s Pl. Bra., and is his "Menispermaceae, No. 6, Folhas de Abutus grande ou Pareira Brava Grande." It was compared with Aublet\'s specimen in British Museum, 17 June, 1873, by D. H., who adds in pencil, "same as 3660 Abuta, Rio de Janeiro, Coll. Glaziou, in Herb. Kew. Doubtfully referred to Abuta rufescens, Aubl. in Herb. Burchell at Kew, but seems to me the same species. 14 June, 1873."', 'species': []}}
{'THALAMIFLORAE.': {'match': <re.Match object; span=(0, 17), match='4 THALAMIFLORAE. '>, 'content': '20. C

In [9]:
blocks

{'DICOTYLEDONES': {'details': [], 'families': {}},
 'GAMOPETALAE': {'details': [], 'families': {}},
 'MONOCHLAMYDEAE': {'details': [], 'families': {}},
 'GYMNOSPERMEAE': {'details': [], 'families': {}},
 'MONOCOTYLEDONES': {'details': [], 'families': {}}}

In [10]:
list(re.finditer(family_regex, blocks["DICOTYLEDONES"].split("\n\n")[1]))[0].group("INDEX")

NameError: name 'family_regex' is not defined

In [None]:
family_regex = re.compile(rf"(?P<PAGENOSTART>^\d+)?\s*(?P<INDEX>[IVXLCDM\.]+)?\s*(?P<FAMILY>{FAMILY_REGEX_PATTERN})\s*(?P<PAGENOEND>\d+$)?", flags=re.VERBOSE)
species_regex_pattern = "(?:\d+\.\s)?[A-Z][a-z]+(?:\s[a-z]+)?(?:\s(var\.|subsp\.|f\.)\s[a-z]+)?(?:\s[A-Z][a-z]+)?(?:\s\([\w\s]+\))?"
species_regex = re.compile(rf"(?P<SPECIES>{species_regex_pattern})")
page_no_regex = re.compile(r"(?P<PAGENO>^\d+$)")

In [None]:
new_struct = {}

for div, dc in blocks.items():
    print(f"==> {div}")
    for fam, fc in dc["families"].items():
        print(f">> {fam}")
        print("--")
        for line in fc["species"]:
            family_matches = re.finditer(family_regex, line)
            species_matches = re.finditer(species_regex, line)
            page_matches = re.finditer(page_no_regex, line)
            print(line)
            if (family_matches is None) and (species_matches is None) and (page_matches is None):
                print("NOTHING FOUND")
                print("--")
                continue
            if family_matches is not None:
                print(list(family_matches))
                # start_group = family_matches.group("PAGENOSTART")
                # print("PAGE_NO_START: {}".format(list(start_group) if start_group is not None else "None"))
                # fam_group = family_matches.group("FAMILY")
                # print("FAMILY: {}".format(list(fam_group) if fam_group is not None else "None"))
                # end_group = family_matches.group("PAGENOEND")
                # print("PAGE_NO_END: {}".format(list(end_group) if end_group is not None else "None"))
                pass
            if species_matches is not None:
                print(list(species_matches))
                #print("SPECIES: {}".format(list(species_matches.group("SPECIES"))))
            if page_matches is not None:
                print(list(page_matches))
                #print("PAGE_NO: {}".format(list(page_matches.group("PAGENO"))))
            # matches = re.match(species_regex, line)
            # if matches:
            #     name = matches.group(0)
            #     name = re.sub(r"(\d+\.\s)", "", name)
            #     print(f"Searching {name}...")

            #     result = species.name_backbone(name=name, family=fam, kingdom="plants", strict=False, verbose=True, limit=1)
            #     if "rank" in result and result["rank"] == "FAMILY":
            #         print("False Positive")
            #         print(result)
            #     else:
            #         print(f"Positive =====> {result}")
            print("--")

==> DICOTYLEDONES
>> RANUNCULACEAE.
--
1. Aconitum Napellus, L. a. A portion of the plant in flower. Hohneck, Vosges, 13 Aug., 1855. Pie de Sancy, Auvergne, Sept., 1858. b. Plant in flower, with root attached.
[]
[<re.Match object; span=(0, 20), match='1. Aconitum Napellus'>, <re.Match object; span=(62, 69), match='Hohneck'>, <re.Match object; span=(71, 77), match='Vosges'>, <re.Match object; span=(82, 85), match='Aug'>, <re.Match object; span=(88, 106), match='1855. Pie de Sancy'>, <re.Match object; span=(108, 116), match='Auvergne'>, <re.Match object; span=(118, 122), match='Sept'>, <re.Match object; span=(134, 142), match='Plant in'>]
[]
--
2. Aconitum Lycoctonum, L. a. Plant in flower, with root attached. Hohneck, Vosges, 13 Aug., 1855.
[]
[<re.Match object; span=(0, 22), match='2. Aconitum Lycoctonum'>, <re.Match object; span=(30, 38), match='Plant in'>, <re.Match object; span=(67, 74), match='Hohneck'>, <re.Match object; span=(76, 82), match='Vosges'>, <re.Match object; span=(87,

In [None]:
# When looking at alternative, need to look for rank, confidence and status

In [None]:
re.match(FAMILY_REGEX, "TRIBE I. ARINEAE.")

<re.Match object; span=(0, 17), match='TRIBE I. ARINEAE.'>