In [1]:
import re
import spacy
import stanza
import textacy
from fastcoref import FCoref
from taxonerd import TaxoNERD
from spacy.matcher import Matcher
from spacy.matcher import DependencyMatcher, PhraseMatcher
from pprint import pprint
import requests

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class Tools:
    def __init__(self, *, text=None):
        # Tools
        self.sp_nlp = spacy.load("en_core_web_lg")
        self.sp_doc = None
        self.tn_nlp = TaxoNERD().load(model="en_ner_eco_biobert")
        self.tn_doc = None
        self.fcoref = FCoref(enable_progress_bar=False)
        self.token_map = None
        if text:
            self.update(text)

    @staticmethod
    def clean_text(text):
        cleaned_text = re.sub("[\(\[].*?[\)\]]", "", text)
        cleaned_text = re.sub("\s+", " ", cleaned_text)
        return cleaned_text
        
    def update(self, text):
        self.sp_doc = self.sp_nlp(text)
        self.tn_doc = self.tn_nlp(text)
        # Map Tokens to Index
        self.token_map = {}
        for token in self.sp_doc:
            self.token_map[token.idx] = token.i

In [3]:
class References:
    def __init__(self, tools, texts=None):
        self.tools = tools
        self.predictions = None
        self.cluster_map = None
        if texts:
            self.update(texts)

    def update(self, texts):
        self.predictions = self.tools.fcoref.predict(texts=texts)
        self.cluster_map = self.get_cluster_map(self.predictions)
        
    def get_cluster_map(self, predictions):
        cluster_map = {}
        for prediction in predictions:
            clusters = prediction.get_clusters(as_strings=False)
            for cluster in clusters:
                # Converting the spans in a cluster to tokens.
                # This makes it easier when using it later.
                token_cluster = []
                for span in cluster:
                    if span[0] not in self.tools.token_map:
                        raise Exception("Invalid Token")
                    index = self.tools.token_map[span[0]]
                    token_cluster.append(self.tools.sp_doc[index])
                # Mapping
                for token in token_cluster:
                    cluster_map[token.i] = list(filter(lambda t: t != token, token_cluster))
        return cluster_map
            
    def get_references(self, tokens):
        refs = []
        for token in tokens:
            index = token.i
            if index in self.cluster_map:
                refs += self.cluster_map[index]
        return refs

In [4]:
class Possession:
    # There's no definite names for these patterns as I do not know what
    # to call them. These patterns are used to extract possessive
    # relationships from a sentence. I also could not find better names for
    # the two variables below.
    OWNER = "owner"
    OWNED = "owned"
    
    patterns = {
        "Pattern1": [
            {
                "RIGHT_ID": OWNED,
                "RIGHT_ATTRS": {
                    "POS": {
                        "IN": ["NOUN", "PROPN"]
                    }
                }
            },
            {
                "LEFT_ID": OWNED,
                "REL_OP": ">",
                "RIGHT_ID": OWNER,
                "RIGHT_ATTRS": {
                    "DEP": "poss"
                }
            }
        ],
        "Pattern2": [
             {
                "RIGHT_ID": OWNED,
                "RIGHT_ATTRS": {
                    "POS": {
                        "IN": ["NOUN", "PROPN"]
                    }
                }
            },
            {
                "LEFT_ID": OWNED,
                "REL_OP": ">",
                "RIGHT_ID": "adp",
                "RIGHT_ATTRS": {
                    "DEP": "prep",
                    "POS": {
                        "IN": ["ADP"]
                    }
                }
            },
            {
                "LEFT_ID": "adp",
                "REL_OP": ">",
                "RIGHT_ID": OWNER,
                "RIGHT_ATTRS": {
                    "DEP": "pobj",
                    "POS": {
                        "IN": ["NOUN", "PROPN"]
                    }
                }
            }
        ],
        "Pattern3": [
            {
                "RIGHT_ID": "verb",
                "RIGHT_ATTRS": {"POS": {"IN": ["VERB"]}}
            },
            {
                "LEFT_ID": "verb",
                "REL_OP": ">",
                "RIGHT_ID": OWNER,
                "RIGHT_ATTRS": {
                    "DEP": "nsubj",
                    "POS": {"IN": ["PRON"]}
                }
            },
            {
                "LEFT_ID": "verb",
                "REL_OP": ">",
                "RIGHT_ID": OWNED,
                "RIGHT_ATTRS": {
                    "DEP": "dobj",
                    "POS": {"IN": ["NOUN", "PROPN"]}
                }
            }
        ],
        "Pattern4": [
            {
                "RIGHT_ID": "verb",
                "RIGHT_ATTRS": {"POS": {"IN": ["VERB"]}}
            },
            {
                "LEFT_ID": "verb",
                "REL_OP": ">",
                "RIGHT_ID": OWNED,
                "RIGHT_ATTRS": {
                    "DEP": "nsubj",
                    "POS": {"IN": ["NOUN", "PROPN"]}
                }
            },
            {
                "LEFT_ID": "verb",
                "REL_OP": ">",
                "RIGHT_ID": "adp",
                "RIGHT_ATTRS": {
                    "DEP": "prep",
                    "POS": {"IN": ["ADP"]}
                }
            },
            {
                "LEFT_ID": "adp",
                "REL_OP": ">",
                "RIGHT_ID": OWNER,
                "RIGHT_ATTRS": {
                    "DEP": "pobj",
                    "POS": {"IN": ["NOUN", "PROPN"]}
                }
            }
        ],
    }
    
    def __init__(self, tools):
        self.tools = tools
        self.matcher = DependencyMatcher(self.tools.sp_nlp.vocab)
        for pattern_id, pattern in Possession.patterns.items():
            self.matcher.add(pattern_id, [pattern])
        self.update()
    
    def update(self):
        matches = self.matcher(self.tools.sp_doc)
        owner_map, owned_map = self.get_ownership_map(matches)
        self.owner_map = owner_map # Maps Owner to Owned
        self.owned_map = owned_map # Maps Owned to Owner
        
    def get_ownership_map(self, matches):
        owner_map = {}
        owned_map = {}

        for match_id, token_ids in matches:
            pattern_id = self.tools.sp_nlp.vocab.strings[match_id]
            # print(pattern_id)
            owner = None
            owned = None
            for i in range(len(token_ids)):
                right_id = Possession.patterns[pattern_id][i]["RIGHT_ID"]
                if right_id == Possession.OWNER:
                    owner = self.tools.sp_doc[token_ids[i]]
                if right_id == Possession.OWNED:
                    owned = self.tools.sp_doc[token_ids[i]]

            # Owner to Owned
            if owner.i not in owner_map:
                owner_map[owner.i] = []
            owner_map[owner.i].append(owned)

            # Owned to Owner
            if owned.i not in owned_map:
                owned_map[owned.i] = []
            owned_map[owned.i].append(owner)
            
        return (owner_map, owned_map)

    def get_owner(self, tokens):
        owners = []
        for token in tokens:
            index = token.i
            if index in self.owned_map:
                owners += self.owned_map[index]
        return owners

    def get_owned(self, tokens):
        owned = []
        for token in tokens:
            index = token.i
            if index in self.owner_map:
                owned += self.owner_map[index]
        return owned

In [5]:
class Unit:
    def __init__(self, *, species=None, trait=None, change=None, cause=None):
        self.species = species
        self.trait = trait
        self.cause = cause
        self.change = change

    def empty(self):
        if not self.species and not self.trait and not self.cause and not self.change:
            return True
        return False

    def not_empty(self):
        return not self.empty()

    def can_merge(self, unit):
        # Two units can merge if there's no
        # overlap.
        if self.species and unit.species:
            return False
        if self.trait and unit.trait:
            return False
        if self.cause and unit.cause:
            return False
        if self.change and unit.change:
            return False
        return True

    def merge(self, unit):
        # We take the parts that the
        # other unit has; assuming that
        # there's no overlap, there's
        # no loss of information.
        if unit.species:
            self.species = unit.species
        if unit.trait:
            self.trait = unit.trait
        if unit.cause:
            self.cause = unit.cause
        if unit.change:
            self.change = unit.change

    def __str__(self):
        return f"Species: {self.species}, Trait: {self.trait}, Cause: ({self.cause}), Change: {self.change}"

In [6]:
class Species:
    def __init__(self, tools):
        self.tools = tools
        self.species_indices = None
        self.update()

    def update(self):
        self.species_indices = self.get_species_indices()
        
    def get_species_indices(self):
        indices = []

        lowered_text = self.tools.sp_doc.text.lower()
        for token in self.tools.sp_doc:
            if token.pos_ not in ["NOUN", "PROPN"]:
                continue
            results = requests.get(f"https://api.inaturalist.org/v1/search?q={token.lemma_}&sources=taxa&include_taxon_ancestors=false")
            results = results.json()
            results = results["results"]
            for result in results:
                if "record" not in result or "name" not in result["record"]:
                    continue
                if lowered_text.find(result["record"]["name"].lower()) == -1:
                    continue
                indices.append(token.i)
                
        for species_span in self.tools.tn_doc.ents:
            for species in species_span:
                if species.idx not in self.tools.token_map:
                    raise Exception("Invalid Token")
                index = self.tools.token_map[species.idx]
                if index in indices:
                    continue
                indices.append(index)
        return indices

    def is_species(self, token):
        index = token.i
        return index in self.species_indices
        
    def contains_species(self, tokens):
        for token in tokens:
            if token.i in self.species_indices:
                return True
        return False

In [7]:
class Parser:
    def __init__(self, text, *, tools=None, species=None, possession=None, references=None):
        self.tools = tools if tools else Tools(text=text)
        self.species = species if species else Species(self.tools)
        self.possession = possession if possession else Possession(self.tools)
        self.references = references if references else References(self.tools, texts=[text])
        self.change_keywords = []
        for keyword in {"increase", "decrease", "change", "weaken"}:
            self.change_keywords.append(self.tools.sp_nlp(keyword))
        self.change_quantity_keywords = []
        for keyword in {"tenfold", "half", "double", "triple", "quadruple", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"}:
            self.change_quantity_keywords.append(self.tools.sp_nlp(keyword))
            
    def update(self, text):
        self.tools.update(text)
        self.species.update()
        self.possession.update()
        self.references.update(texts=[text])

    def is_change_keyword(self, token):
        # print(f"Lemma of {token}: {token.lemma_}")
        token_lemma = self.tools.sp_nlp(token.lemma_)
        for keyword in self.change_keywords:
            if keyword.similarity(token_lemma) > 0.9:
                return True
        return False

    def has_change_quantity(self, tokens):
        for token in tokens:
            if token.pos_ == "NUM":
                return True
            if token.lower_ == "to":
                return True
            if token.pos_ != "NOUN":
                continue
            token_lemma = self.tools.sp_nlp(token.lemma_)
            for keyword in self.change_quantity_keywords:
                if keyword.similarity(token_lemma) > 0.9:
                    return True
        return False
                
    def parse_segment(self, l_i, r_i):
        # print(f"\n\nPARSING SEGMENT\n")
        # print(f"Text: {self.tools.sp_doc[l_i:r_i+1].text}")
        used = []

        # Find Cause
        cause = []
        for token in self.tools.sp_doc[l_i:r_i+1]:
            if token not in used and token.pos_ in ["SCONJ"]:
                start_i = token.i + 1
                end_i = start_i
                while end_i <= r_i and self.tools.sp_doc[end_i] not in used and self.tools.sp_doc[end_i].pos_ in ["ADP", "DET", "NOUN", "PROPN", "AUX", "ADV", "PRON", "ADJ"]:
                    used.append(self.tools.sp_doc[end_i])
                    cause.append(self.tools.sp_doc[end_i])
                    end_i += 1
                used.append(token)
        # print(f"Cause: {cause}")
        
        # Find Species
        species = None
        for token in self.tools.sp_doc[l_i:r_i+1]:
            if self.species.is_species(token) and token not in used and (token.head and (token.head.pos_ not in ["SCONJ", "ADP"] or token.head.lower_ == "of")):
                species = token
                used.append(species)
                break
        # print(f"Species: {species}")
        
        # Find Change
        change_keywords = {"increase", "decrease", "change"}
        change = []
        for token in self.tools.sp_doc[l_i:r_i+1]:
            if token not in used and not token.is_oov:
                if self.is_change_keyword(token):
                    change.append(token)
                    used.append(token)
            # I only want one word that represents
            # the change for simplicity
            if change:
                break
        # print(f"Change 1: {change}")

        # Next Method to Find Change
        for token in self.tools.sp_doc[l_i:r_i+1]:
            if token not in used and token.pos_ == "ADP" and token.lower_ != "of":
                # print("In Next Method...", token, token not in used, token.pos_)
                start_i = token.i + 1
                end_i = start_i
                possible_changes = []
                while end_i <= r_i and self.tools.sp_doc[end_i].pos_ in ["NUM", "SYM", "NOUN", "ADP", "DET"]:
                    possible_changes.append(self.tools.sp_doc[end_i])
                    end_i += 1
                if not self.has_change_quantity(possible_changes):
                    continue
                # print(f"Actual Changes: {possible_changes}")
                for possible_change in possible_changes:
                    used.append(possible_change)
                    change.append(possible_change)
                used.append(token)
                break
        # print(f"Change 2: {change}")

        # Find Trait
        trait = []
        if species:
            possible_traits = self.possession.get_owned([species])
            trait = possible_traits
        if change:
            # The trait is listed before the change (i.e. "diet shifts from ...")
            prev_i = change[0].i - 1
            prev_token = None if prev_i < 0 else self.tools.sp_doc[prev_i]
            if prev_token and prev_token not in used and prev_token.pos_ == "NOUN":
                used.append(prev_token)
                trait.append(prev_token)
            else:
                # Look for "in" (i.e. "increase in ...")
                for child in change[0].children:
                    # print(child, child.pos_, child.children)
                    if child in used:
                        continue
                    if child.pos_ == "ADP" and child.children:
                        children = list(child.children)
                        if children[0] not in used:
                            used.append(children[0])
                            trait.append(children[0])
        # print(f"Trait: {trait}")
        
        # Find Cause
        for token in self.tools.sp_doc[l_i:r_i+1]:
            if token not in used and token.pos_ in ["ADP"]:
                start_i = token.i + 1
                end_i = start_i
                buffer = []
                noun_found = False
                while end_i <= r_i and self.tools.sp_doc[end_i] not in used and self.tools.sp_doc[end_i].pos_ in ["ADP", "DET", "NOUN", "PROPN", "AUX", "ADV", "PRON", "ADJ"]:
                    if self.tools.sp_doc[end_i].pos_ in ["NOUN", "PROPN", "PRON"]:
                        noun_found = True
                    buffer.append(self.tools.sp_doc[end_i])
                    end_i += 1
                if noun_found:
                    for token in buffer:
                        used.append(token)
                        cause.append(token)
                    used.append(token)
        # print(f"Cause: {cause}")
        
        unit = Unit(species=species, trait=trait, change=change, cause=cause)
        return unit

    def parse_sentence(self, l_i, r_i):
        # Find Verb
        # The verb is used to divide
        # the "parsing" space, which
        # makes the work simpler.
        verb = None
        for token in self.tools.sp_doc[l_i:r_i+1]:
            if token.pos_ == "VERB":
                verb = token
                break

        # Base Case
        # If there is no verb, we have
        # reached the simplest case and
        # can extract information.
        if verb == None:
            return self.parse_segment(l_i, r_i)
        # Merge
        # Given two units of information,
        # we need to merge them.
        else:
            verb_is_change = self.is_change_keyword(verb)
            print(f"Verb: {verb}")
            print(f"Text: {self.tools.sp_doc[l_i:r_i+1].text}")
            l_unit = self.parse_sentence(l_i, verb.i - 1)
            r_unit = self.parse_sentence(verb.i + 1, r_i)
            m_unit = None
            print(f"\tL Unit: {l_unit}")
            print(f"\tR Unit: {r_unit}")
            if l_unit.not_empty() and r_unit.empty():
                print(1)
                if verb_is_change:
                    l_unit.change.append(verb)
                m_unit = l_unit
            elif r_unit.not_empty() and l_unit.empty():
                print(2)
                if verb_is_change:
                    r_unit.change.append(verb)
                m_unit = r_unit    
            elif l_unit.can_merge(r_unit):
                print(3)
                l_unit.merge(r_unit)
                if verb_is_change:
                    l_unit.change.append(verb)
                m_unit = l_unit
            elif self.is_change_keyword(verb):
                print(4)
                r_unit.cause = l_unit
                if verb_is_change:
                    r_unit.change.append(verb)
                m_unit = r_unit
            else:
                print(5)
                if verb.i - 1 > r_i - (verb.i + 1):
                    m_unit = l_unit
                else:
                    m_unit = r_unit
            print(f"M Unit: {m_unit}")
            return m_unit

    def parse(self):
        units = []
        for sent in self.tools.sp_doc.sents:
            units.append(self.parse_sentence(sent.start, sent.end - 1))
        return units

In [8]:
# text = Tools.clean_text("Acridoidea and Selachii exhibited significant diet shifts from grass to herbs (Kruskal-Wallis test, P 0.01, df 3) when they were in the presence of the comparatively sedentary species (the smaller Pisaurina and the larger Hogna) compared to controls without spiders (Fig. 2).")
# text = Tools.clean_text("Our results show that phototrophs can indirectly decrease the population density of heterotrophic bacteria by modification of the nature of bacterial interactions with predators.")
# text = Tools.clean_text("Our results show that Selachii can indirectly decrease the population density of Selachimorpha by modification of the nature of bacterial interactions with predators.")
# text = Tools.clean_text("All predators inflicted significant mortality on the prey at each prey density compared to the predator-free control for that density")
text = Tools.clean_text("Our results show that an increase in sediment organic matter content is associated to a decline in the abundance of Loripes lucinalis (lucinid bivalve) in the Cymodocea nodosa meadows studied, which potentially may weaken the mutualism between the two species.")
print(text)

Our results show that an increase in sediment organic matter content is associated to a decline in the abundance of Loripes lucinalis in the Cymodocea nodosa meadows studied, which potentially may weaken the mutualism between the two species.


In [9]:
tools = Tools(text=text)
species = Species(tools)
possession = Possession(tools)
references = References(tools, texts=[text])

05/01/2025 00:16:53 - INFO - 	 missing_keys: []
05/01/2025 00:16:53 - INFO - 	 unexpected_keys: []
05/01/2025 00:16:53 - INFO - 	 mismatched_keys: []
05/01/2025 00:16:53 - INFO - 	 error_msgs: []
05/01/2025 00:16:53 - INFO - 	 Model Parameters: 90.5M, Transformer: 82.1M, Coref head: 8.4M
05/01/2025 00:17:02 - INFO - 	 Tokenize 1 inputs...
Map: 100%|██████████| 1/1 [00:00<00:00, 36.95 examples/s]
05/01/2025 00:17:02 - INFO - 	 ***** Running Inference on 1 texts *****


In [10]:
parser = Parser(text, tools=tools, species=species, possession=possession, references=references)
parser.update(text)
units = parser.parse()
for unit in units:
    print(unit)

05/01/2025 00:17:10 - INFO - 	 Tokenize 1 inputs...
Map: 100%|██████████| 1/1 [00:00<00:00, 46.81 examples/s]
05/01/2025 00:17:11 - INFO - 	 ***** Running Inference on 1 texts *****


Verb: show
Text: Our results show that an increase in sediment organic matter content is associated to a decline in the abundance of Loripes lucinalis in the Cymodocea nodosa meadows studied, which potentially may weaken the mutualism between the two species.
Verb: associated
Text: that an increase in sediment organic matter content is associated to a decline in the abundance of Loripes lucinalis in the Cymodocea nodosa meadows studied, which potentially may weaken the mutualism between the two species.
Verb: lucinalis
Text: to a decline in the abundance of Loripes lucinalis in the Cymodocea nodosa meadows studied, which potentially may weaken the mutualism between the two species.
Verb: studied
Text: in the Cymodocea nodosa meadows studied, which potentially may weaken the mutualism between the two species.
Verb: weaken
Text: , which potentially may weaken the mutualism between the two species.
	L Unit: Species: None, Trait: [], Cause: ([]), Change: []
	R Unit: Species: None, Trait: [

In [11]:
species.species_indices

[20, 24, 24, 24, 25, 21, 38]