In [1]:
from IPython.lib.deepreload import reload
%load_ext autoreload
%autoreload 2

In [2]:
import spacy
import re
from spacy import displacy

from adept.preprocess import preprocessors
from adept.components.registry import ComponentsRegistry
from adept.components.sentencizer import Sentencizer
from spacy.pipeline import EntityRuler
from adept.components.base import BaseComponent
from pathlib import Path
from adept.config import CORPUS_DIR, spacy_config
from adept.utils.expand import ExpandSpan
import enum
from abc import ABCMeta, abstractmethod, ABC
from enum import Enum
from typing import Union
import yaml
from adept.config import ASSETS_DIR

from spacy.matcher import Matcher
from spacy.tokens import Span, Token
from adept.config import unit_registry
from spacy.util import filter_spans
from spacy.tokens import Span, Token, Doc

from adept.components.registry import ComponentsRegistry
from adept.components.sentencizer import Sentencizer
from adept.components.numeric import (NumericDimension, NumericExpand, NumericFraction, NumericMeasurement, NumericRange)
from adept.components.anatomical import AnatomicalEntity
from adept.components.traits import (CustomTraitsEntity, DiscreteTraitsEntity, NumericTraitsEntity)


In [3]:
# Load the spaCy "en_core_web_trf" pipeline; the custom components are added to it via the registry.
nlp = spacy.load("en_core_web_trf")

In [4]:
# Wrap the pipeline in the project's component registry and add the custom components to it.
registry = ComponentsRegistry(nlp)
registry.add_components([
    Sentencizer,
    AnatomicalEntity,
    CustomTraitsEntity,
    DiscreteTraitsEntity,
    NumericTraitsEntity,
    NumericExpand,
    NumericDimension,
    NumericMeasurement,
    NumericRange,
    NumericFraction,
])

INIT custom_sentencizer
INIT custom_traits_entity
INIT numeric_trait_entity
INIT numeric_expand
INIT numeric_dimensions
INIT numeric_measurements
INIT numeric_range
INIT numeric_fraction


In [5]:
# text = "Herbs, perennial, 40-100 cm tall, with long rhizomes; stems erect, unbranched or branched in upper part, often with short sterile branches at leaf axils above middle, striate, usually white villous. Leaves sessile; leaf blade lanceolate, oblong-lanceolate, or sublinear, 5-20 x 1-2.5 cm, (2 or)3-pinnatisect, abaxially densely villous, adaxially densely depressed glandular punctuate; ultimate segments lanceolate to linear, 0.5-1.5 x 0.3-0.5 mm, apex cartilaginous-mucronulate. Synflorescence a terminal flat-topped panicle 2-6 cm in diam. Capitula many. Involucres oblong or subovoid, ca. 4 x 3 mm; phyllaries in 3 rows, elliptic or oblong, 1.5-3 x 1-1.3 mm, scarious margin pale yellow or brown; midvein convex. Paleae oblong-elliptic, scarious, abaxially yellow gland-dotted. Ray florets 5; lamina white, pink, or violet-red, suborbicular, 1.5-3 x 2-2.5 mm, apex 2- or 3-denticulate. Disk florets yellow, tubular, 2.2-3 mm, exterior gland-dotted, apex 5-lobed. Achenes greenish, oblong, ca. 2 mm, with white lateral ribs. Corona absent. flower. and fruit. Jul-Sep. 2n = 18, 27, 36 + 0-2B, 45, 54 + 0-3B, 72."


text = "Perennials, 40-100 cm tall 1 stamen (usually rhizomatous, sometimes stoloniferous). Stems 1(-4), erect, simple or branched, densely lanate-tomentose to glabrate. Leaves petiolate (proximally) or sessile (distally, weakly clasping and gradually reduced); blades oblong or lanceolate, 3.5-35+ cm x 5-35 mm, 1-2-pinnately lobed (ultimate lobes +- lanceolate, often arrayed in multiple planes), faces glabrate to sparsely tomentose or densely lanate. Heads 10-100+, in simple or compound, corymbiform arrays. Phyllaries 20-30 in +- 3 series, (light green, midribs dark green to yellowish, margins green to light or dark brown) ovate to lanceolate, abaxial faces tomentose. Receptacles convex; paleae lanceolate, 1.5-4 mm. Ray florets (3-)5-8, pistillate, fertile; corollas white or light pink to deep purple, laminae 1.5-3 x 1.5-3 mm. Disc florets 10-20; corollas white to grayish white, 2-4.5 mm. Cypselae 1-2 mm (margins broadly winged). 2n = 18, 27, 36, 45, 54, 63, 72 (including counts from Europe). Morphologic characters that have been used to segregate these populations into species and/or varieties include: (1) degree and persistence of tomentum; (2) phyllaries with greenish, light brown, or dark brown margins; (3) shapes of capitulescences (rounded or flat-topped); and (4) degrees of leaf dissection and shapes of lobes."

In [8]:
# Run the pipeline over the sample description.
doc = nlp(text)

# Print the label and value of every entity that carries a trait value;
# entities without one are skipped.
for ent in doc.ents:
    if not ent._.trait_value:
        continue
    print(ent.label_)
    print(ent._.trait_value)





YYY
STAMEN_COUNT
1
PLOIDY
2n = 18, 27, 36, 45, 54, 63, 72


In [29]:
# Render the document's entities inline in the notebook.
displacy.render(doc, style='ent', jupyter=True)

In [101]:
from abc import ABCMeta, abstractmethod, ABC
from enum import Enum
from typing import Union
import yaml
from adept.config import ASSETS_DIR

# Location of the anatomical parts YAML asset, as configured in spacy_config.
anatomical_parts_path = Path(ASSETS_DIR / spacy_config['file']['anatomical_parts'])

# Read through a context manager so the file handle is closed once parsing is done
# (the bare .open() call left it open for the lifetime of the kernel).
# NOTE(review): yaml.full_load is acceptable for a trusted project asset; prefer
# yaml.safe_load if this file can ever come from outside the repository.
with anatomical_parts_path.open() as anatomical_parts_file:
    anatomical_parts = yaml.full_load(anatomical_parts_file)

class MeasurementTrait(object):
    """Collects measurements for one anatomical part and reports min/max values.

    Values are stored per dimension type (width, length, ...) together with the
    unit they were written in; get_value() yields labelled min/max values,
    optionally converted to ``target_unit``.
    """

    class Type(Enum):
        """Kind of dimension a measurement describes."""
        WIDTH = 0
        LENGTH = 1
        HEIGHT = 2
        DEPTH = 3

    def __init__(self, anatomical_part, target_unit, dimensions=(Type.WIDTH, Type.LENGTH), minmax=True):
        """
        :param anatomical_part: part name; also used as the prefix of the output labels
        :param target_unit: unit that get_value() converts to when ``convert`` is true
        :param dimensions: dimension types tracked for this part
        :param minmax: accepted but not used by this class
        """
        self.anatomical_part = anatomical_part
        # A part may be listed with no synonyms (None), hence the fallback to []
        self.synonyms = anatomical_parts.get(anatomical_part, None) or []
        self.target_unit = target_unit
        self.dimensions = dimensions
        self._parts = set(self.synonyms + [anatomical_part])
        self._data = {d: {} for d in self.dimensions}
        self._source_units = {}

    def matches_part(self, part: Union[Span, str]):
        """Return True if ``part`` is this trait's part or one of its synonyms.

        A Span is compared by both its lower-cased text and its lemma; a string is compared as given.
        """
        _part = (part.text.lower(), part.lemma_) if isinstance(part, Span) else (part,)
        return bool(self._parts.intersection(_part))

    def __repr__(self):
        return f'MeasurementTrait({self.anatomical_part} - {self.target_unit})'

    def set_value(self, dimension_type: Type, ent: Span, unit):
        """Store the value of ``ent`` for ``dimension_type`` and record ``unit`` if one is given.

        If no number can be read from the entity an empty dict is stored, and
        get_value() skips that dimension.
        """
        self._data[dimension_type] = self._get_ent_value(ent)

        if unit:
            self.set_source_unit(unit)

    def get_value(self, convert=True):
        """Yield ``(label, magnitude)`` pairs for the min and max of every dimension that has data.

        :param convert: convert to ``target_unit`` when true, otherwise report in the source unit
        """
        for dimension_type, value_dict in self._data.items():
            source_unit = self._source_units.get(dimension_type)
            # Nothing to report without data; without a unit the value can be
            # neither labelled nor converted.
            if not value_dict or source_unit is None:
                continue

            # NOTE(review): presumably the range reads (lower-)from-to(-upper), so the
            # extremes are the smallest low bound and the largest high bound — confirm.
            bounds = {
                'min': min(self._pick(value_dict, ('lower', 'from')), default=None),
                'max': max(self._pick(value_dict, ('to', 'upper')), default=None),
            }

            if convert:
                bounds = {
                    k: None if v is None else self.convert_value(v, source_unit, self.target_unit)
                    for k, v in bounds.items()
                }
                unit = self.target_unit
            else:
                unit = source_unit

            unit_symbol = unit_registry.get_symbol(str(unit))

            for key, value in bounds.items():
                if value is None:
                    continue
                label = f'{self.anatomical_part} {key}. {dimension_type.name.lower()} [{unit_symbol}]'
                # Converted values expose the magnitude as `.m`; unconverted ones are already plain numbers
                yield label, getattr(value, 'm', value)

    def convert_value(self, value, source_unit, target_unit):
        """Convert ``value`` from ``source_unit`` to ``target_unit``, rounded to 2 decimals.

        Returns None (and logs the error) when ``value`` cannot be read as a number.
        """
        import logging

        try:
            measurement = float(value) * source_unit
        except (TypeError, ValueError) as e:
            logging.getLogger(__name__).error(e)
            return None
        else:
            converted_value = measurement.to(target_unit)
            return round(converted_value, 2)

    def set_source_unit(self, unit):
        """Record ``unit`` for every dimension that does not have a source unit yet."""
        # Sometimes a dimension only has units set for the second item e.g. 1 x 2 cm
        # So if a unit isn't available when a value is set, this fills in the earlier items.
        # NOTE(review): a unit recorded for a dimension is never replaced, so a later
        # value written in a different unit keeps the first unit — confirm intended.
        for dimension_type in self._data.keys():
            if dimension_type not in self._source_units:
                self._source_units[dimension_type] = unit

    @staticmethod
    def _pick(value_dict, keys):
        """Return the values stored under ``keys``, ignoring missing/empty entries (0 is kept)."""
        return [v for k in keys if (v := value_dict.get(k)) not in (None, '')]

    @staticmethod
    def _get_ent_value(ent: Span):
        """Return the entity's numeric range, or a from/to dict built from its NUM tokens.

        Returns an empty dict when the entity has no range and no parseable number.
        """
        if ent._.numeric_range:
            return ent._.numeric_range

        numbers = []
        for token in ent:
            if token.pos_ != 'NUM':
                continue
            try:
                number = float(token.text)
            except ValueError:
                # NUM tokens that are not numeric literals cannot be used
                continue
            # Keep whole numbers as ints; decimals such as "1.5" stay floats
            numbers.append(int(number) if number.is_integer() else number)

        if not numbers:
            return {}

        return {'from': min(numbers), 'to': max(numbers)}
        
# Measurement traits to extract: one per anatomical part, each with the unit its
# values are reported in. 'petal' does not pass `dimensions`, so it uses the class default.
measurement_traits = [
    MeasurementTrait('plant', target_unit=unit_registry.m, dimensions=(MeasurementTrait.Type.HEIGHT,)),
    MeasurementTrait('petal', target_unit=unit_registry.cm),
    MeasurementTrait('root', target_unit=unit_registry.cm, dimensions=(MeasurementTrait.Type.DEPTH,))
]

def get_measurement_trait(part):
    """Return the first entry of ``measurement_traits`` that matches ``part``, or None if none does."""
    matching = (candidate for candidate in measurement_traits if candidate.matches_part(part))
    return next(matching, None)

# Dimension types assigned, in order, to the values of an "A x B" expression.
# NOTE(review): the first value is stored as WIDTH and the second as LENGTH —
# confirm this matches the order used in the source descriptions.
dimension_types = [MeasurementTrait.Type.WIDTH, MeasurementTrait.Type.LENGTH]

for sent in doc.sents:

    # The opening sentence is treated as describing the whole plant; every other
    # sentence uses the anatomical part attached to it by the pipeline.
    sent_part = 'plant' if sent.start == 0 else sent._.anatomical_part

    if sent_part and (sent._.dimensions or sent._.measurements):
        measurement_trait = get_measurement_trait(sent_part)
        if not measurement_trait: continue

        if sent._.dimensions:
            # Only the first dimension expression of the sentence is used
            dimension = sent._.dimensions[0]
            for dimension_type, ent in zip(dimension_types, dimension.ents):
                measurement_trait.set_value(dimension_type, ent, ent._.measurement_unit)

        elif sent._.measurements:
            if len(sent._.measurements) == 2:
                for dimension_type, measurement in zip(dimension_types, sent._.measurements):
                    measurement_trait.set_value(dimension_type, measurement, measurement._.measurement_unit)
            elif len(sent._.measurements) == 1:
                measurement = sent._.measurements[0]
                # FIXME - need a smarter way of identifying measurement type
                # Currently uses the trait's only dimension if it has just one - or length if multiple
                if len(measurement_trait.dimensions) == 1:
                    dimension_type = measurement_trait.dimensions[0]
                else:
                    dimension_type = MeasurementTrait.Type.LENGTH

                measurement_trait.set_value(dimension_type, measurement, measurement._.measurement_unit)

        for trait, value in measurement_trait.get_value():
            print(trait)
            # get_value() already yields the bare magnitude, so there is no `.m` to unwrap here
            print(value)

                

        
    #     # FIXME: Subject is wrong!!!!         
    # print(sent)
    # print(sent_part)
    #     print(measurement_trait)
        # if i == 0:
#             dimension_trait = MeasurementTrait('plant', unit=unit_registry.m, measurement_type=MeasurementTrait.)
#         elif sent_part := ent.sent._.anatomical_part:
            
            
#             print(sent_part)
#             # pass
            
            
    
#     if sent._.dimensions:
            
#         for dimension in sent._.dimensions:

#             # width, length = dimension.ents
#             dimension_types = ['width', 'height']
#             dimensions = {}
#             for dimension_type, ent in zip(dimension_types, dimension.ents):



#                 unit = ent._.measurement_unit
                
                


                    
                

#     elif sent._.measurements:
#         pass
    
    
#         # pass need to detect width/height?         
                
        
        
#         # print(width._.numeric_range)
#         # print(length)
#         # print(length._.numeric_range)
        
#     # print(sent._.measurements)
    
    

plant min. height [m]
0.4
plant max. height [m]
1.0
petal min. width [cm]
0.15
petal max. width [cm]
0.3
petal min. length [cm]
0.15
petal max. length [cm]
0.3
petal min. width [cm]
0.15
petal max. width [cm]
0.3
petal min. length [cm]
0.2
petal max. length [cm]
0.45


In [177]:
from adept.traits.accdb import ACCDBTraits

# Instantiate the project's ACCDBTraits helper; later cells query it for terms and trait rows.
accdb = ACCDBTraits()

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

In [179]:
import pandas as pd

terms = accdb.get_terms()

# Print every item that contains an underscore.
# NOTE(review): if get_terms() returns a DataFrame (another cell indexes its result
# by column), this iterates the column labels rather than the term values — confirm.
for t in terms:
    if '_' not in t:
        continue
    print(t)
        


In [180]:
import yaml
from adept.config import ASSETS_DIR

# Plant group used to filter the trait rows further down (hard-coded for now)
plant_group = 'angiosperm' 

# Entities the pipeline labelled as TRAIT
trait_ents = [ent for ent in doc.ents if ent.label_ == 'TRAIT']

# NOTE(review): reads the helper's private `_df` attribute directly
df = accdb._df

# Distinct trait names present in the table
traits = df.trait.unique()

from collections import Counter

# Collect the second word of every two-word trait name (e.g. "leaf shape" -> "shape").
# Names that do not split into exactly two words are ignored.
trait_types = []

for trait in traits:
    words = trait.split()
    if len(words) != 2:
        continue
    trait_prefix, trait_suffix = words
    trait_types.append(trait_suffix)

# Trait types that occur in more than one trait name, in first-seen order
trait_type_counts = Counter(trait_types)
duplicated_trait_types = [trait_type for trait_type in trait_type_counts if trait_type_counts[trait_type] > 1]

print(duplicated_trait_types)


# print(df.columns)
# # 
# Accumulates trait name -> set of characters found in the document
data = {}

anatomical_parts_path = Path(ASSETS_DIR / spacy_config['file']['anatomical_parts'])

# Read through a context manager so the file handle is closed after parsing
# (the bare .open() call never closed it).
with anatomical_parts_path.open() as anatomical_parts_file:
    anatomical_parts = yaml.full_load(anatomical_parts_file)

class MeasurementTrait(object):
    """Placeholder for a measurement trait; no behaviour yet."""
    # NOTE(review): defining this name again replaces any MeasurementTrait class
    # defined by a cell that ran earlier — confirm that is intended.
    pass

# TODO: intended usage once the class accepts these arguments and a MeasurementType
# enum exists. Kept as a comment because the call raised when executed:
# `MeasurementType` is not defined in this notebook and the placeholder takes no arguments.
# MeasurementTrait('plant height [m]', unit=unit_registry.m, measurement_type=MeasurementType.HEIGHT)

class DimensionTrait(object):
    """Sketch of a trait holding a dimension for one anatomical part, in a given unit."""

    def __init__(self, anatomical_part, unit):
        """
        :param anatomical_part: the part the dimension belongs to
        :param unit: the unit associated with the trait
        """
        self.anatomical_part = anatomical_part
        self.unit = unit

    def set(self, range_dict):
        """Placeholder: accept a range dict; not implemented yet.

        `self` was missing from the signature, so calling this on an instance raised TypeError.
        """
        pass
        
    
# min_max = True

# MeasurementCharacter('leaf min width [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_LOWEST, range_type=Range.FROM, measurement_type=MeasurementType.WIDTH),
# MeasurementCharacter('leaf max width [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_HIGHEST, range_type=Range.TO, measurement_type=MeasurementType.WIDTH),
# MeasurementCharacter('leaf min length [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_LOWEST, range_type=Range.FROM, measurement_type=MeasurementType.LENGTH),
# MeasurementCharacter('leaf max length [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_HIGHEST, range_type=Range.TO, measurement_type=MeasurementType.LENGTH),
# MeasurementCharacter('dispersule min width [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_LOWEST, range_type=Range.FROM),
# MeasurementCharacter('dispersule max width [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_HIGHEST, range_type=Range.TO),
# MeasurementCharacter('dispersule min length [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_LOWEST, range_type=Range.FROM),
# MeasurementCharacter('dispersule max length [cm]', unit=unit_registry.cm, dedupe=Dedupe.PREFER_HIGHEST, range_type=Range.TO),
# MeasurementCharacter('seed min width [mm]', unit=unit_registry.mm, dedupe=Dedupe.PREFER_LOWEST, range_type=Range.FROM),
# MeasurementCharacter('seed max width [mm]', unit=unit_registry.mm, dedupe=Dedupe.PREFER_HIGHEST, range_type=Range.TO),
# MeasurementCharacter('seed min length [mm]', unit=unit_registry.mm, dedupe=Dedupe.PREFER_LOWEST, range_type=Range.FROM),
# MeasurementCharacter('seed max length [mm]', unit=unit_registry.mm, dedupe=Dedupe.PREFER_HIGHEST, range_type=Range.TO),        
# MeasurementCharacter('root depth [cm]', unit=unit_registry.cm),


#         numeric_traits = []

# TODO: Look up plant group
for ent in trait_ents:
    term = ent.text.lower().replace('-', '_')

    # A row matches when the term equals its term or character, or occurs in its synonym text.
    term_matches = (
        (df.term == term)
        | (df.character == term)
        # regex=False: the term is literal text taken from the document, so characters
        # such as "+" or "(" must not be interpreted as a regular expression.
        | df.synonym.astype(str).str.contains(term, regex=False)
    )
    rows = df[term_matches & (df['Plants Group'] == plant_group)]

    for row in rows.itertuples():

        # Try and split the trait into prefix, suffix e.g. leaf, shape
        # If the suffix is one that applies to multiple parts e.g. shape
        # then validate it against the sentence's part

        try:
            trait_prefix, trait_suffix = row.trait.split()
        except ValueError:
            # Trait name is not exactly two words: no part validation, accept the row as is
            pass
        else:
            # Is the trait suffix used across different traits - e.g. shape
            if trait_suffix in duplicated_trait_types:

                # We need the sentence to have an anatomical part
                sent_anatomical_part = ent.sent._.anatomical_part
                if not sent_anatomical_part:
                    continue

                sent_part = {sent_anatomical_part.text.lower(), sent_anatomical_part.lemma_}
                # .get(): a prefix that is not a listed anatomical part has no synonyms,
                # rather than aborting the whole loop with a KeyError
                trait_parts = set((anatomical_parts.get(trait_prefix) or []) + [trait_prefix])

                # If the trait's parts do not match the sentence, skip it
                if not sent_part.intersection(trait_parts):
                    continue

        data.setdefault(row.trait, set()).add(row.character)
        
        
        
        
        
    # Plant height [m]
        

        
# print(data)
                    

                    
                    
                    


['apex', 'shape', 'arrangement', 'architecture', 'margin', 'colour', 'surface', 'structure', 'orientation', 'base', 'position', 'cross-section', 'consistency']
{'life cycle': {'perennial'}, 'clonality': {'rhizomatous', 'stoloniferous'}, 'perennial organ': {'rhizome', 'stolon'}, 'indumentum': {'glabrous', 'tomentose', 'woolly'}, 'leaf architecture': {'lobed', 'sessile'}, 'leaf shape': {'lanceolate', 'oblong'}, 'leaf apex': {'lanceolate'}, 'leaf margin': {'repand'}, 'leaf arraangement': {'compound'}, 'inflorescence arrangement': {'compound', 'corymb'}, 'flower sex': {'pistillate'}, 'fruit type': {'cypsela'}, 'dispersal mode': {'anemochory'}, 'dispersion axillary': {'seed winged'}}


In [136]:
%env TOKENIZERS_PARALLELISM=(true | false)

env: TOKENIZERS_PARALLELISM=(true | false)


In [15]:
from adept.scripts.helpers import get_descriptions

In [16]:
import pandas as pd
from adept.config import RAW_DATA_DIR
from adept.traits.accdb import ACCDBTraits
from collections import Counter

traits = ACCDBTraits()

input_path = CORPUS_DIR / 'preprocessed-descriptions.csv'

# Per taxon group: sheet columns, terms listed under several parts, and trait
# types shared by several traits
columns = {}
nonunique_terms = {}
nonunique_trait_types = {}

# Open the workbook once, parse each sheet from that handle, and close it afterwards
# (previously the file was re-read from disk for every sheet and never closed).
with pd.ExcelFile(RAW_DATA_DIR / 'functional-trait-list.xlsx') as xls:
    for sheet_name in xls.sheet_names:
        sheet_df = xls.parse(sheet_name)
        # The taxon group is the sheet name with "traits" removed, e.g. "Angiosperm traits" -> "angiosperm"
        taxon_group = sheet_name.replace('traits', '').strip().lower()
        columns[taxon_group] = sheet_df.columns.to_list()

        df = traits.get_plant_group(taxon_group)
        terms_df = traits.get_terms(taxon_group)

        # Get terms which are not unique across parts - these will need to be validated as
        # belonging to a sentence group
        nonunique_terms[taxon_group] = (
            terms_df[['part', 'term']]
            .drop_duplicates()
            .groupby('term')
            .filter(lambda x: len(x) > 1)['term']
            .unique()
        )

        # Trait types that are used by more than one trait
        nonunique_trait_types[taxon_group] = (
            df[['trait', 'trait_type']]
            .drop_duplicates()
            .groupby('trait_type')
            .filter(lambda x: len(x) > 1)['trait_type']
            .unique()
        )
    


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

In [20]:
print(nonunique_trait_types)

# Parse only the first description: the loop breaks after one iteration, leaving
# `doc`, `taxon_group` and `description` bound for the cells that follow.
for source, taxon, taxon_group, description in get_descriptions(input_path):

    doc = nlp(description)
    print(taxon_group)
    
    break







{'angiosperm': array(['shape', 'base', 'arrangement', 'apex', 'colour', 'structure',
       'architecture'], dtype=object), 'bryophyte': array(['surface', 'colour', 'apex', 'shape', 'base', 'architecture',
       'arrangement', 'structure', 'orientation', 'cross-section'],
      dtype=object), 'pteridophyte': array(['shape', 'surface', 'architecture', 'arrangement', 'apex', 'base',
       'margin', 'orientation', 'position', 'cross-section', 'structure',
       'consistency'], dtype=object)}




ERROR
3-
Angiosperm


In [94]:
from adept.config import ASSETS_DIR
import yaml

anatomical_parts_path = Path(ASSETS_DIR / spacy_config['file']['anatomical_parts'])

# Read through a context manager so the file handle is closed after parsing
# (the bare .open() call never closed it).
with anatomical_parts_path.open() as anatomical_parts_file:
    anatomical_parts = yaml.full_load(anatomical_parts_file)

# Reverse lookup: synonym -> part. Parts listed without synonyms (None) contribute nothing.
# The loop variable is named part_synonyms so it does not shadow the dict being built.
synonyms = {syn: part for part, part_synonyms in anatomical_parts.items() for syn in part_synonyms or []}


# Terms for all plant groups (no group passed); filtered by group inside the loop below
terms_df = traits.get_terms()

# Entities the pipeline labelled as TRAIT
trait_ents = [ent for ent in doc.ents if ent.label_ == 'TRAIT']
# NOTE(review): relies on `taxon_group` still being bound by an earlier cell
taxon_group = taxon_group.lower()

df = traits.get_plant_group(taxon_group)


# TODO: DISPERSION FOR MEASUREMENT
# Manual mappings of a part to a trait e.g. seed sentence will include dispersal data
trait_part_mappings = {
    'seed': [
        'dispersion axillary',
        'dispersal mode'
    ]
}

# Accumulates trait name -> set of characters
data = {}

def add_trait(name, character):
    """Record ``character`` under trait ``name`` in the module-level ``data`` dict."""
    characters = data.setdefault(name, set())
    characters.add(character)

# Traits that are accepted even when their term is listed under more than one part
traits_overide_multiple_parts = [
    'clonality',
    'perennial organ'
]

from abc import ABC, abstractmethod



        


for ent in trait_ents:
    term = ent.text.lower().replace('-', '_')
    rows = terms_df[(terms_df.term == term) & (terms_df['Plants Group'] == taxon_group)]

    # Does this term have multiple parts?
    # We check unique parts, as it can have multiple rows but if they're for the
    # same part, we don't care
    term_has_multiple_parts = len(df[df.term == term].part.unique()) > 1

    # TODO: extract the checks below into an is_valid_trait_for_sent() helper

    for row in rows.itertuples():

        # The override depends on the row's trait, so it is decided here: `row`
        # does not exist before this loop (it was previously read above it, which
        # used a stale row from an earlier iteration or failed on a fresh kernel).
        overrides_multiple_parts = row.trait in traits_overide_multiple_parts
        is_nonunique_trait_types = row.trait_type in nonunique_trait_types[taxon_group]

        needs_part_check = (term_has_multiple_parts and not overrides_multiple_parts) or is_nonunique_trait_types

        if not needs_part_check:
            # Unambiguous term and trait type: accept without looking at the sentence
            add_trait(row.trait, row.character)
            continue

        # Work out which part the sentence is about
        if ent.sent.start == 0:
            sent_part = 'plant'
        elif ent.sent._.anatomical_part:
            sent_part = ent.sent._.anatomical_part.lemma_
            # Map a synonym onto the part it belongs to
            sent_part = synonyms.get(sent_part, sent_part)
        else:
            # No part detected for the sentence, so the row cannot be validated
            continue

        trait_part = [row.part]
        if row.part in synonyms: trait_part.append(synonyms[row.part])

        is_mapped_to_sent_part = sent_part in trait_part_mappings and row.trait in trait_part_mappings[sent_part]

        if is_mapped_to_sent_part or sent_part in trait_part or overrides_multiple_parts:
            add_trait(row.trait, row.character)
        else:
            print('--SKIP--')
            print('SENT: ', sent_part)
            print(ent.sent)
            print(row.part)
            print(row.character)
            print(ent)

print(data)

print(description)
    

--SKIP--
SENT:  plant
A strongly scented perennial herb with far-creeping stolons and erect, furrowed, usually simple,  woolly, short, barren stems and taller flowering stems 8-45 (-60) cm.
leaf
simple
simple
--SKIP--
SENT:  inflorescence
Inflorescence of +- dense terminal corymbs, 4-6 cm across.
gynoecium
terminal
terminal
--SKIP--
SENT:  inflorescence
Capitula >25(-50) Involucre ovoid, bracts rigid, oblong, blunt, keeled, +- glabrous, with a broad brown or blackish scarious margin.
fruit
ovoid
ovoid
--SKIP--
SENT:  inflorescence
Capitula >25(-50) Involucre ovoid, bracts rigid, oblong, blunt, keeled, +- glabrous, with a broad brown or blackish scarious margin.
leaf
oblong
oblong
--SKIP--
SENT:  inflorescence
Capitula >25(-50) Involucre ovoid, bracts rigid, oblong, blunt, keeled, +- glabrous, with a broad brown or blackish scarious margin.
fruit
brown
brown
--SKIP--
SENT:  inflorescence
Capitula >25(-50) Involucre ovoid, bracts rigid, oblong, blunt, keeled, +- glabrous, with a broad br

In [37]:
import dataclasses
from dataclasses import dataclass, field

class Postprocessor(ABC):
    """Base class for callables that post-process an input for one taxon group.

    Subclasses implement ``_postprocess``; calling the instance delegates to it.
    """

    def __init__(self, taxon_group):
        """
        :param taxon_group: the taxon group this postprocessor is for
        """
        self._characters = set()
        self.taxon_group = taxon_group

    # Not a property: the method takes an argument and is invoked by __call__,
    # which a property getter cannot support.
    @abstractmethod
    def _postprocess(self, text):
        """Process ``text`` and return the result; this base implementation returns it unchanged."""
        return text

    def __call__(self, doc):
        """Run ``_postprocess`` on ``doc`` and return its result."""
        return self._postprocess(doc)

@dataclass()
class Trait(ABC):
    """Base class for a named trait tied to an anatomical part.

    Assigning ``part`` also refreshes the synonyms used by ``matches_part``.
    Subclasses implement ``get_value``.
    """

    name: str
    # NOTE(review): the property defined below rebinds `part`, so the dataclass
    # machinery appears to take the property object as this field's default; the
    # generated __init__ then accepts an optional `part` argument and runs the
    # setter once with that object before a subclass assigns the real part — confirm intended.
    part: str = field(init=False)
    _part: str = field(init=False, repr=False)
    _synonyms: list[str] = field(default_factory=list, init=False)

    @property
    def part(self) -> str:
        """The anatomical part this trait belongs to."""
        return self._part

    @part.setter
    def part(self, part: str):
        self._part = part
        try:
            # A part listed without synonyms (None) yields an empty list
            self._synonyms = anatomical_parts[part] or []
        except KeyError:
            # Not a listed anatomical part: no synonyms
            self._synonyms = []

    def matches_part(self, part):
        """Return True if ``part`` equals this trait's part or one of its synonyms."""
        return part == self.part or part in self._synonyms

    # Not a property: subclasses implement get_value as a regular method. The old
    # body returned an undefined name, so the base now raises explicitly instead.
    @abstractmethod
    def get_value(self):
        """Return or yield the trait's value(s); must be implemented by subclasses."""
        raise NotImplementedError
    
@dataclass
class DiscreteTrait(Trait):
    """Trait whose value is a set of discrete characters.

    The anatomical part is derived from a two-word trait name ("<part> <type>").
    """

    # Second word of a two-word name whose first word resolved to an anatomical part; otherwise None
    trait_type: str = field(default=None, init=False)
    # Boolean flag (it was annotated as str)
    _require_anatomical_part: bool = field(default=True, init=False)
    _characters: set[str] = field(default_factory=set, init=False)

    def __post_init__(self):
        self._parse_name_part()

    @property
    def require_anatomical_part(self):
        """True when the flag is set and the trait actually has a part."""
        # Only require anatomical part if there is a part
        return self._require_anatomical_part and bool(self._part)

    def _parse_name_part(self):
        """Set ``part`` (and ``trait_type``) from the trait name.

        A name that is not exactly two words, or whose first word is neither an
        anatomical part nor a synonym of one, leaves ``part`` as None.
        """
        part = None
        trait_type = None

        words = self.name.split()
        if len(words) == 2:
            candidate, suffix = words
            if candidate in synonyms:
                part = synonyms[candidate]
            elif candidate in anatomical_parts:
                part = candidate

            # The parsed type used to be discarded, leaving the field permanently None.
            # NOTE(review): only recorded when the first word is a known part — confirm
            # whether names like "life cycle" should also get a trait_type.
            if part is not None:
                trait_type = suffix

        self.part = part
        self.trait_type = trait_type

    def set_character(self, character):
        """Add ``character`` to the characters recorded for this trait."""
        self._characters.add(character)

    def get_value(self):
        """Yield a single ``(name, characters)`` pair."""
        yield self.name, self._characters
             
@dataclass()
class MeasurementTrait(Trait):
    """Trait collecting measurements for a part and reporting labelled min/max values.

    The trait name doubles as the anatomical part. Values are stored per
    dimension type together with the unit they were written in.
    """

    class Type(Enum):
        """Kind of dimension a measurement describes."""
        WIDTH = 0
        LENGTH = 1
        HEIGHT = 2
        DEPTH = 3

    # Unit that get_value() converts to when `convert` is true
    target_unit: str = field(default=None)
    # Dimension types tracked for this part
    dimensions: list[Type] = field(default=(Type.WIDTH, Type.LENGTH))
    _data: dict = field(default_factory=dict, init=False)
    _source_units: dict = field(default_factory=dict, init=False)

    def __post_init__(self):
        self._data = {d: {} for d in self.dimensions}
        self.part = self.name

    def set_value(self, dimension_type: Type, ent: Span, unit):
        """Store the value of ``ent`` for ``dimension_type`` and record ``unit`` if one is given.

        If no number can be read from the entity an empty dict is stored, and
        get_value() skips that dimension.
        """
        self._data[dimension_type] = self._get_ent_value(ent)

        if unit:
            self.set_source_unit(unit)

    def get_value(self, convert=True):
        """Yield ``(label, magnitude)`` pairs for the min and max of every dimension that has data.

        :param convert: convert to ``target_unit`` when true, otherwise report in the source unit
        """
        if not any(self._data.values()):
            return

        for dimension_type, value_dict in self._data.items():
            source_unit = self._source_units.get(dimension_type)
            # Nothing to report without data; without a unit the value can be
            # neither labelled nor converted (indexing used to raise KeyError here).
            if not value_dict or source_unit is None:
                continue

            # NOTE(review): presumably the range reads (lower-)from-to(-upper), so the
            # extremes are the smallest low bound and the largest high bound — confirm.
            bounds = {
                'min': min(self._pick(value_dict, ('lower', 'from')), default=None),
                'max': max(self._pick(value_dict, ('to', 'upper')), default=None),
            }

            if convert:
                bounds = {
                    k: None if v is None else self.convert_value(v, source_unit, self.target_unit)
                    for k, v in bounds.items()
                }
                unit = self.target_unit
            else:
                unit = source_unit

            unit_symbol = unit_registry.get_symbol(str(unit))

            for key, value in bounds.items():
                if value is None:
                    continue
                label = f'{self.part} {key}. {dimension_type.name.lower()} [{unit_symbol}]'
                # Converted values expose the magnitude as `.m`; unconverted ones are already plain numbers
                yield label, getattr(value, 'm', value)

    def convert_value(self, value, source_unit, target_unit):
        """Convert ``value`` from ``source_unit`` to ``target_unit``, rounded to 2 decimals.

        Returns None when ``value`` cannot be read as a number.
        """
        try:
            measurement = float(value) * source_unit
        except (ValueError, TypeError):
            # Best effort: an unreadable value is reported as missing rather than raising
            return None
        else:
            converted_value = measurement.to(target_unit)
            return round(converted_value, 2)

    def set_source_unit(self, unit):
        """Record ``unit`` for every dimension that does not have a source unit yet."""
        # Sometimes a dimension only has units set for the second item e.g. 1 x 2 cm
        # So if a unit isn't available when a value is set, this fills in the earlier items.
        # NOTE(review): a unit recorded for a dimension is never replaced, so a later
        # value written in a different unit keeps the first unit — confirm intended.
        for dimension_type in self._data.keys():
            if dimension_type not in self._source_units:
                self._source_units[dimension_type] = unit

    @staticmethod
    def _pick(value_dict, keys):
        """Return the values stored under ``keys``, ignoring missing/empty entries (0 is kept)."""
        return [v for k in keys if (v := value_dict.get(k)) not in (None, '')]

    @staticmethod
    def _get_ent_value(ent: Span):
        """Return the entity's numeric range, or a from/to dict built from its NUM tokens.

        Returns an empty dict when the entity has no range and no parseable number.
        """
        if ent._.numeric_range:
            return ent._.numeric_range

        numbers = []
        for token in ent:
            if token.pos_ != 'NUM':
                continue
            try:
                number = float(token.text)
            except ValueError:
                # NUM tokens that are not numeric literals cannot be used
                continue
            # Keep whole numbers as ints; decimals such as "1.5" stay floats
            numbers.append(int(number) if number.is_integer() else number)

        if not numbers:
            return {}

        return {'from': min(numbers), 'to': max(numbers)}
    

# class DiscreteTraitsPostprocessor(Postprocessor):
    
#     def __init__(self, taxon_group):
#         super().__init__(taxon_group)
#         self.df = traits.get_terms(taxon_group)
#         self.traits = self._build_traits()
        
#     def _build_traits(self):
#         anatomical_parts_path = Path(ASSETS_DIR / spacy_config['file']['anatomical_parts'])
#         anatomical_parts = yaml.full_load(anatomical_parts_path.open())
#         synonyms = {syn: part for part, synonyms in anatomical_parts.items() for syn in synonyms or [] }

#         traits = {name: DiscreteTrait(name) for name in self.df.trait.unique()}
        
#         # Make any custom modifications         
#         traits['dispersal mode'].part = 'seed'
#         traits['dispersion axillary'].part = 'seed'
        
#         return traits
    
#     def _postprocess(self, doc):        
#         trait_ents = [ent for ent in doc.ents if ent.label_ == 'TRAIT']
#         for ent in trait_ents:
#             term = ent.text.lower().replace('-', '_')
#             rows = self.df[self.df.term == term]
                        
#             # term_has_multiple_parts = len(rows) > 1
            
#             for row in rows.itertuples():
#                 trait = self.traits[row.trait]
#                 if trait.require_anatomical_part:
#                     if not ent.sent._.anatomical_part:
#                         self._log("MISSING SENT PART", ent, trait)
#                         continue
#                     elif not trait.matches_part(ent.sent._.anatomical_part.lemma_):
#                         self._log("PART NO MATCH", ent, trait)
#                         continue
                
#                 trait.set_character(row.character)
                
#         return self.traits.values()
                
                
#     def _log(self, err, ent, trait):
#         pass
#         # print(err)
#         # print(ent)
#         # print(ent.sent)
#         # print(trait.name)
#         # print(trait.part)
        

        

        
# class MeasurementTraitsPostprocessor(Postprocessor):  
    
#     traits = {
#         'angiosperm': [
#             MeasurementTrait('plant', target_unit=unit_registry.m, dimensions=(MeasurementTrait.Type.HEIGHT,)),
#             MeasurementTrait('petal', target_unit=unit_registry.cm),
#             MeasurementTrait('root', target_unit=unit_registry.cm, dimensions=(MeasurementTrait.Type.DEPTH,)),
#             MeasurementTrait('leaf', target_unit=unit_registry.cm),
#             MeasurementTrait('dispersule', target_unit=unit_registry.cm),
#             MeasurementTrait('seed', target_unit=unit_registry.cm)
#         ]
#     }
    
#     default_dimension_types = [MeasurementTrait.Type.WIDTH, MeasurementTrait.Type.LENGTH]
        
#     def _get_trait(self, part):
#         for trait in self.traits[self.taxon_group]:
#             if trait.matches_part(part):
#                 return trait  
            
#     def _postprocess(self, doc):
#         for sent in doc.sents:

#             if sent.start == 0:
#                 sent_part = 'plant' 
#             elif sent._.anatomical_part:
#                 sent_part = sent._.anatomical_part.lemma_
#             else:
#                 # We always need a sent part for measurements                  
#                 continue

#             if sent_part and (sent._.dimensions or sent._.measurements):
#                 measurement_trait = self._get_trait(sent_part)
#                 if not measurement_trait: continue

#                 if sent._.dimensions:
#                     dimension = sent._.dimensions[0]
#                     for dimension_type, ent in zip(self.default_dimension_types, dimension.ents):
#                         measurement_trait.set_value(dimension_type, ent, ent._.measurement_unit)

#                 elif sent._.measurements:
#                     if len(sent._.measurements) == 2:
#                         for dimension_type, measurement in zip(dimension_types, sent._.measurements):
#                             measurement_trait.set_value(dimension_type, measurement, measurement._.measurement_unit)
#                     elif len(sent._.measurements) == 1:
#                         measurement = sent._.measurements[0]
#                         # FIXME - need a smarter way of identifying measurement type
#                         # Currently uses the default trait if there's one - or length if multiple                 
#                         if len(measurement_trait.dimensions) == 1:
#                             dimension_type = measurement_trait.dimensions[0]
#                         else:
#                             dimension_type = MeasurementTrait.Type.LENGTH

#                         measurement_trait.set_value(dimension_type, measurement, measurement._.measurement_unit)

#         return self.traits[self.taxon_group]
        
        
from adept.postprocess.traits.trait import Trait


@dataclass()
class CustomTrait(Trait):
    """Trait that holds a single free-form value under its name."""

    # Extracted value; None until one is provided. Values are not restricted
    # to strings - integer counts are stored here as well.
    value: Union[str, int, None] = field(default=None)

    def get_value(self):
        """Yield a single ``(name, value)`` pair for this trait."""
        yield self.name, self.value
    
class CustomTraitsPostprocessor(Postprocessor):
    """Postprocessor that turns entities carrying a ``trait_value`` into ``CustomTrait`` objects."""

    def _postprocess(self, doc):
        """Return one ``CustomTrait`` per entity in ``doc.ents`` with a truthy ``trait_value``.

        The trait name is the entity label, lower-cased; entities whose
        ``trait_value`` extension is falsy are skipped.
        """
        custom_traits = []
        for ent in doc.ents:
            trait_value = ent._.trait_value
            if not trait_value:
                continue
            custom_traits.append(CustomTrait(name=ent.label_.lower(), value=trait_value))
        return custom_traits

    
class Postprocess():
    """Run every configured postprocessor over a doc and merge their trait values."""

    taxon_group = 'angiosperm'
    # Instantiated once, when the class body executes, and shared by all instances.
    processors = [
        # DiscreteTraitsPostprocessor(taxon_group),
        # MeasurementTraitsPostprocessor(taxon_group),
        CustomTraitsPostprocessor(taxon_group)
    ]

    def __call__(self, doc):
        """Collect trait values extracted from ``doc``.

        Args:
            doc: Object passed unchanged to each processor in ``processors``.

        Returns:
            dict mapping trait name to value. Only truthy values are kept;
            when several traits yield the same name, the last one wins.
        """
        data = {}
        for processor in self.processors:
            for trait in processor(doc):
                # get_value() yields (name, value) pairs.
                for trait_name, value in trait.get_value():
                    if value:
                        data[trait_name] = value

        # Hand the result back to the caller; as the last expression of a
        # notebook cell it is displayed automatically.
        return data

                
        
    
# Instantiate the postprocessing pipeline and run it over `doc`
# (`doc` is defined outside this section of the notebook).
postprocess = Postprocess()

postprocess(doc)


# print(description)

# DiscreteTrait('hello')

# @dataclass
# class Parent:
#     name: str
    
# @dataclass
# class DiscreteTrait(Trait):
#     trait_type: str = field(default=None, init=False) 
    
# @dataclass
# class DiscreteTrait(Trait):
    
#     trait_type: str = field(default_factory=None)    
    
# DiscreteTrait('leaf shape')    
    
# print(description)

[CustomTrait(name='stamen_count', part=<property object at 0x1473180e0>, _synonyms=[], value=1), CustomTrait(name='ploidy', part=<property object at 0x1473180e0>, _synonyms=[], value='2n = 18, 27, 36, 45, 54, 63, 72')]
{'stamen_count': 1, 'ploidy': '2n = 18, 27, 36, 45, 54, 63, 72'}


In [180]:
import numpy as np
# np.empty allocates a (4, 1) array without initialising it, so the values
# displayed are arbitrary and can differ between runs.
np.empty((4,1))

array([[3.],
       [0.],
       [4.],
       [0.]])

In [154]:
from abc import ABC, abstractmethod

df = traits.get_terms(taxon_group)

# Get terms which are not unique across parts - these will need to be validated as 
# belonging to a sentence group
# nonunique_terms[taxon_group] = df[['part', 'term']].drop_duplicates().groupby('term').filter(lambda x: len(x) > 1)['term'].unique()

anatomical_parts_path = Path(ASSETS_DIR / spacy_config['file']['anatomical_parts'])
# Open via a context manager so the file handle is closed once parsing is done.
with anatomical_parts_path.open() as anatomical_parts_file:
    anatomical_parts = yaml.full_load(anatomical_parts_file)

# Reverse lookup: synonym -> part. Parts with an empty/None synonym list add no entries.
synonyms = {
    synonym: part
    for part, part_synonyms in anatomical_parts.items()
    for synonym in part_synonyms or []
}

# nonunique_characters[taxon_group] = df[['trait', 'trait_type']].drop_duplicates().groupby('trait_type').filter(lambda x: len(x) > 1)['trait_type'].unique()

# print(anatomical_parts)

class Character():
    """A trait character, optionally tied to an anatomical part.

    Relies on the module-level ``anatomical_parts`` (part -> synonyms) and
    ``synonyms`` (synonym -> part) lookups.
    """

    def __init__(self, name):
        """Create the character and try to derive its part from ``name``."""
        self.name = name
        self.trait_type = None
        self.part = None
        self.synonyms = None
        self._require_anatomical_part = True
        self._parse_part_from_name(name)

    @property
    def require_anatomical_part(self):
        """Whether a matching anatomical part is required.

        Only ever true when the character actually has a part.
        """
        return self._require_anatomical_part and bool(self.part)

    @require_anatomical_part.setter
    def require_anatomical_part(self, value):
        # Allows opting a character out of the part requirement. Setting it
        # to True has no visible effect while ``part`` is unset.
        self._require_anatomical_part = bool(value)

    def _parse_part_from_name(self, name):
        """Split a two-word name into ``<part> <trait_type>``.

        Names that do not consist of exactly two whitespace-separated words
        are left unparsed (``part`` and ``trait_type`` stay ``None``).
        """
        try:
            part, trait_type = name.split()
        except ValueError:
            return

        # Map a synonym onto its part; other words pass through unchanged.
        part = synonyms.get(part, part)

        self.set_part(part)
        self.trait_type = trait_type

    def set_part(self, part):
        """Set the anatomical part and its synonyms.

        Parts that are not keys of ``anatomical_parts`` are ignored, leaving
        the current part unchanged.
        """
        if part in anatomical_parts:
            self.synonyms = anatomical_parts[part]
            self.part = part

    def __repr__(self):
        name = f'{self.name} [{self.part}]' if self.part else self.name
        return f'Character({name})'

# One Character per distinct trait name (the grouped rows themselves are not needed).
characters = {name: Character(name) for name, _ in df.groupby(df.trait)}

# # Set require_anatomical_part to false for some characters
# for char in [
#     'life form',
#     'habitat',
#     'habit',
#     'clonality',
#     'perennial organ'
# ]: characters[char].require_anatomical_part = False

# And manually modify the part for others
for trait_name in ('dispersal mode', 'dispersion axillary'):
    characters[trait_name].set_part('seed')


# Show, per character, whether an anatomical part is required.
for char in characters.values():
    # if not char.part:
    print(char.require_anatomical_part)

False
True
True
False
True
True
True
True
True
True
True
True
True
True
True
True
True
True
False
True
False
False
False
True
True
True
True
True
True
True
True
True
True
True
False
False
False
True
True
True
False
False
False
False
False
True
True
False
True
False
False


In [360]:
# Debug aid: print the `trait_value` extension of every entity in `doc`
# (one line per entity; `None` where no value has been set).
for ent in doc.ents:
    print(ent._.trait_value)
    

None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
None
