In [1]:
from collections.abc import Sequence, Iterable
import json
from random import sample

from tqdm.notebook import tqdm

from analyser import (BaseAnalyser,
                      punctuation_analyser,
                      SpacingEntity, spacing_analyser,
                      integer_analyser,
                     )
from dag import EntitiesDAG, BaseEntity, ConnectingEntity, TextEntity
from tokenization import text_to_tokens
from word_analyser import WordAnalyser, WordEntity

In [2]:
# Create the word analyser from the JSON file shipped in the data directory.
word_analyser = WordAnalyser.from_json('data/word_analyser.json')

# PersonNameAnalyser implementation

Given information about which words can be parts of a personal name (first names, surnames, patronymics), let's build an analyser that searches for chains like these:
```
           first_name >> patronimic
surname >> first_name >> patronimic
           first_name >> patronimic >> surname
surname >> first_name
           first_name >> surname
```
It also searches for surnames with initials, placed either after or before the surname (for simplicity, initials here are exactly a single letter followed by a period, repeated twice).

The analyser triggers on each word and checks whether any of the chains, defined as sequences of `LookupRule`s, can be found around it. Each `LookupRule` holds the required entity type(s) and a guard function, which takes an entity and returns `None` if the entity doesn't match, or a numerical matching score if it does. For example, the guard of `surname_rule` returns a score of 2 if the surname starts with a capital letter and 1 for other surnames.<br>
If a full chain is found, the analyser creates a new `PersonNameEntity` for it, which also stores the likelihood (the product of the matching scores of the matched entities).<br>
The analyser's lookup is implemented in a way that allows intermediate entities within the chain (for example, spaces and connecting entities).

In [3]:
class TextBasedEntity(BaseEntity):
    """Entity that keeps the text it represents inside its ``features``."""

    def __init__(self, text_content: str, *args, **kwargs):
        """Store ``text_content`` under ``features['text_content']``.

        All remaining positional and keyword arguments are forwarded unchanged
        to the parent initialiser.
        """
        # NOTE(review): relies on the parent initialiser creating `self.features` — confirm.
        super().__init__(*args, **kwargs)
        self.features['text_content'] = text_content

    @property
    def text_content(self):
        """Return the value stored under ``features['text_content']``."""
        return self.features['text_content']

class PersonNameEntity(TextBasedEntity):
    """Text-based entity for a detected personal name with a likelihood score."""

    def __init__(self, likelihood, *args, **kwargs):
        """Store ``likelihood`` under ``features['likelihood']``.

        All remaining positional and keyword arguments are forwarded unchanged
        to the parent initialiser.
        """
        super().__init__(*args, **kwargs)
        self.features['likelihood'] = likelihood

    def __str__(self):
        """Render as ``Name<text|likelihood>``; the likelihood uses the ``.2g`` format."""
        return f'Name<{self.text_content}|{self.features["likelihood"]:.2g}>'


def first_name_guard(entity: WordEntity):
    """Score ``entity`` as a first name.

    Returns ``None`` when the entity's ``person_name_part`` feature is not
    ``'first_name'``; otherwise 2 if its text starts with an upper-case
    character and 1 if it does not.
    """
    if entity.features.get('person_name_part') != 'first_name':
        return None
    return 2 if entity.text_content[0].isupper() else 1

def surname_guard(entity: WordEntity):
    """Score ``entity`` as a surname.

    Returns ``None`` when the entity's ``person_name_part`` feature is not
    ``'surname'``; otherwise 2 if its text starts with an upper-case
    character and 1 if it does not.
    """
    if entity.features.get('person_name_part') != 'surname':
        return None
    return 2 if entity.text_content[0].isupper() else 1

def patronimic_guard(entity: WordEntity):
    """Score ``entity`` as a patronymic.

    Returns ``None`` when the entity's ``person_name_part`` feature is not
    ``'patronimic'`` (spelling as used by the feature value); otherwise 2 if
    its text starts with an upper-case character and 1 if it does not.
    """
    if entity.features.get('person_name_part') != 'patronimic':
        return None
    return 2 if entity.text_content[0].isupper() else 1

def letter_guard(entity: TextEntity):
    """Score ``entity`` as a single-letter initial.

    Returns ``None`` unless the entity's text is exactly one alphabetic
    character; otherwise 2 for an upper-case letter and 1 for any other letter.
    """
    text = entity.text_content
    if len(text) != 1 or not text.isalpha():
        return None
    return 2 if text[0].isupper() else 1

def period_guard(entity: TextEntity):
    """Return 1 when the entity's text is exactly ``'.'``, otherwise ``None``."""
    return 1 if entity.text_content == '.' else None


class LookupRule:
    """One link of a lookup chain: accepted entity types plus a guard function.

    ``trigger_types`` is frozen into a tuple (so it can be handed straight to
    ``isinstance``); ``guard_func`` is stored as given.
    """

    def __init__(self, trigger_types, guard_func):
        """Remember the guard and a tuple copy of ``trigger_types``."""
        self.guard_func = guard_func
        self.trigger_types = tuple(trigger_types)


class PersonNameAnalyser(BaseAnalyser):
    """Analyser that finds personal-name chains around word entities.

    It triggers on every ``WordEntity`` and, depending on whether the word is
    a patronymic or a surname, searches the DAG before/after it for the
    remaining parts of a name. Every complete chain is embedded into the DAG
    as a new ``PersonNameEntity``.
    """

    def __init__(self):
        super().__init__(trigger_on_instances=[WordEntity])

    def trigger(self, dag_entity: WordEntity):
        """Look for name chains that contain ``dag_entity`` and embed each match.

        Chains anchored on a patronymic::

                       first_name >> patronimic
            surname >> first_name >> patronimic
                       first_name >> patronimic >> surname

        Chains anchored on a surname::

            surname L.L.
            L.L. surname
            surname >> first_name
            first_name >> surname

        The likelihood passed to ``embed_result`` is the product of the guard
        scores of all matched name parts, including ``dag_entity`` itself.
        """
        surname_rule = LookupRule(trigger_types=[WordEntity], guard_func=surname_guard)
        first_name_rule = LookupRule(trigger_types=[WordEntity], guard_func=first_name_guard)
        letter_rule = LookupRule(trigger_types=[TextEntity], guard_func=letter_guard)
        period_rule = LookupRule(trigger_types=[TextEntity], guard_func=period_guard)

        allowed_intermediate = (ConnectingEntity, SpacingEntity)

        def find(ref_entity, rules_chain, position):
            # Every lookup in this method shares the same intermediate types.
            return self.lookup(ref_entity=ref_entity,
                               rules_chain=rules_chain,
                               position=position,
                               allowed_intermediate=allowed_intermediate)

        patronimic_likelihood = patronimic_guard(dag_entity)
        if patronimic_likelihood is not None:
            for matched, likelihood in find(dag_entity, [first_name_rule], 'before'):
                # first_name >> patronimic
                self.embed_result(matched, likelihood*patronimic_likelihood)
                for matched2, likelihood2 in find(matched[0], [surname_rule], 'before'):
                    # surname >> first_name >> patronimic
                    # (matched[0] is the first name, already the tail of matched2)
                    self.embed_result(matched2 + matched[1:],
                                      likelihood2*likelihood*patronimic_likelihood)
                for matched2, likelihood2 in find(dag_entity, [surname_rule], 'after'):
                    # first_name >> patronimic >> surname
                    # (matched2[0] is the patronymic, already the tail of matched)
                    self.embed_result(matched + matched2[1:],
                                      likelihood2*likelihood*patronimic_likelihood)

        surname_likelihood = surname_guard(dag_entity)
        if surname_likelihood is not None:
            initials_chain = [letter_rule, period_rule, letter_rule, period_rule]
            surname_chains = (
                (initials_chain, 'after'),      # surname L.L.
                (initials_chain, 'before'),     # L.L. surname
                ([first_name_rule], 'after'),   # surname >> first_name
                ([first_name_rule], 'before'),  # first_name >> surname
            )
            for rules_chain, position in surname_chains:
                for matched, likelihood in find(dag_entity, rules_chain, position):
                    self.embed_result(matched, likelihood*surname_likelihood)

    def lookup(self, ref_entity: BaseEntity,
               rules_chain: Iterable[LookupRule],
               position: str = 'after',
               allowed_intermediate: tuple[type[BaseEntity], ...] = (ConnectingEntity,)):
        """Yield every path starting at ``ref_entity`` that satisfies ``rules_chain``.

        The search is breadth-first over ``next_entities`` (``position='after'``)
        or ``previous_entities`` (``position='before'``). ``rules_chain`` is
        always given in text order; for ``'before'`` it is applied in reverse
        while walking backwards. Entities that are instances of
        ``allowed_intermediate`` may be skipped between (or before) matched
        links without consuming a rule and without affecting the likelihood.

        Yields:
            ``(entities, likelihood)`` where ``entities`` is the full path in
            text order (including ``ref_entity`` and skipped intermediates) and
            ``likelihood`` is the product of the guard scores of the matched
            links. An empty ``rules_chain`` yields nothing.

        Raises:
            ValueError: if ``position`` is neither ``'after'`` nor ``'before'``
                (raised when iteration starts, since this is a generator).
        """
        if position not in ('after', 'before'):
            raise ValueError(f"position must be 'after' or 'before', got {position!r}")
        backwards = position == 'before'

        # Materialise the chain: `reversed()` would hand out a one-shot iterator
        # that the first candidate exhausts, breaking every other candidate, and
        # it does not accept arbitrary iterables at all.
        rules = list(rules_chain)
        if backwards:
            rules.reverse()
        if not rules:
            return

        def neighbours(entity):
            return entity.previous_entities if backwards else entity.next_entities

        # Queue items: (path walked so far, rules still to satisfy, likelihood so far).
        # The last element of the path is the candidate to test against rules[0].
        to_check = [([ref_entity, i], rules, 1) for i in neighbours(ref_entity)]
        while to_check:
            (*head, entity), (rule, *rules_tail), cur_likelihood = to_check.pop(0)
            if isinstance(entity, rule.trigger_types):
                likelihood = rule.guard_func(entity)
                if likelihood is not None:
                    # Matched element of the chain. Keep the product in a separate
                    # name so it does not leak into the "skip as intermediate"
                    # branch below.
                    matched_likelihood = cur_likelihood * likelihood
                    if not rules_tail:
                        # Reached the end of the chain: report the path in text order.
                        path = [*head, entity]
                        if backwards:
                            path.reverse()
                        yield path, matched_likelihood
                    else:
                        for i in neighbours(entity):
                            to_check.append(([*head, entity, i], rules_tail, matched_likelihood))
            if isinstance(entity, allowed_intermediate):
                # Step over the entity and keep looking for the same rule.
                for i in neighbours(entity):
                    to_check.append(([*head, entity, i], [rule, *rules_tail], cur_likelihood))

    def embed_result(self, matched_dag_entities: Sequence[BaseEntity], likelihood):
        """Create a ``PersonNameEntity`` for the matched path and wire it into the DAG.

        The new entity's text is the concatenation of ``text_content`` of the
        matched ``TextEntity``/``WordEntity``/``SpacingEntity`` instances. It is
        linked after every predecessor of the first matched entity and before
        every successor of the last one, and appended to ``part_of`` of each
        matched entity. ``matched_dag_entities`` must be non-empty.
        """
        text = ''.join(i.text_content for i in matched_dag_entities
                       if isinstance(i, (TextEntity, WordEntity, SpacingEntity)))
        new_entity = PersonNameEntity(likelihood=likelihood, text_content=text)
        new_entity.features['matched_entities'] = list(matched_dag_entities)
        for i in matched_dag_entities[0].previous_entities:
            i.add_next(new_entity)
        for i in matched_dag_entities[-1].next_entities:
            new_entity.add_next(i)
        for i in matched_dag_entities:
            i.part_of.append(new_entity)

In [4]:
# Single analyser instance, reused by the single-phrase demo and the batch run.
person_name_analyser = PersonNameAnalyser()

In [5]:
# Demo on a single phrase: build the token DAG, run the analysers, print the DAG.
input_text = 'Мельниченко Иван Иванович'
# Alternative sample inputs to try (uncomment one):
# input_text = 'Зубенко Михаил Петрович'
# input_text = 'Михаил Петрович Зубенко'
# input_text = 'Зубенко р.П. Мельниченко З.  П.'
# input_text = '''Борейчук Максим Петрович'''
tokens = text_to_tokens(input_text)
dag = EntitiesDAG(tokens)
# Analysers run in this order: spacing, then words, then person names.
spacing_analyser.analyse(dag)
word_analyser.analyse(dag)
person_name_analyser.analyse(dag)
dag.pprint()

•Мельниченко• •Иван• •Иванович•
                    ␣ Word<Иванович|иванович>
               Word<Иван|иван>
             ␣ Name<Иван Иванович|4>
 Word<Мельниченко|мельниченко>
 Word<Мельниченко|мельниченко>
 Name<Мельниченко Иван Иванович|8>
 Name<Мельниченко Иван|4>


# Test on real-world data

To check that the idea works in general, I applied the pipeline to some messages from public social media groups and printed the matched `PersonNameEntity`s. The results show that the analyser matches personal names. From a quick overview I'd say it gives a low number of false positives, but, by design, due to the limited vocabulary it gives quite a lot of false negatives.

So far I haven't performed any numerical evaluations on available labeled datasets.

In [6]:
# Pass the encoding explicitly: JSON files are UTF-8, and without it `open()`
# falls back to the platform's locale encoding, which can fail on or garble
# the Cyrillic message texts.
with open('data/messages_list.json', encoding='utf-8') as f:
    messages_list = json.load(f)
# Keep only the text of the first 15,000 messages.
texts = [i['text'] for i in messages_list[:15_000]]

In [7]:
# Peek at five randomly chosen messages (no seed is set, so the selection changes on every run).
sample(texts, 5)

['В вашем ассортименте топовое 8 общежитие \nЗа 3 и 6 не шарю',
 'Ти шо тут робиш',
 'Спиш?',
 'Слінченко, Слінченко!!!!',
 'F']

In [8]:
# Analysers are applied to every message's DAG in this order.
analysers = [spacing_analyser, word_analyser, person_name_analyser]
# Only names whose likelihood reaches this score are printed.
MIN_LIKELIHOOD = 4

for text in tqdm(texts):
    tokens = text_to_tokens(text)
    dag = EntitiesDAG(tokens)
    for a in analysers:
        a.analyse(dag)

    smth_printed = False
    for i in dag:
        if isinstance(i, PersonNameEntity) and i.features['likelihood'] >= MIN_LIKELIHOOD:
            print(i)
            smth_printed = True
    if smth_printed:
        # Blank line separates the matches of one message from the next.
        print()

  0%|          | 0/15000 [00:00<?, ?it/s]

Name<Тараса Шевченка|4>

Name<Карла Маркса|4>

Name<Карпенко А.М.|8>

Name<Коваленко Максим|4>

Name<Эдуард Петрович|4>
Name<Эдуард Петрович|4>
Name<Эдуард Петрович|4>

Name<Роман Богданович|4>
Name<Роман Богданович|4>

Name<Паша Коваль|4>

Name<Виталий Андреевич|4>

Name<Степанов Денис|4>

Name<Микола Тарасович|4>
Name<Андрійчук Микола Тарасович|8>
Name<Андрійчук Микола|4>

Name<Гринь А.Р.|8>
Name<Білоконь В.П.|8>

Name<Пушик Максим|4>

Name<Андрей Анатольевич|4>



P.S. It prints the captured names together with the likelihood score of each match. Also, by design, even when it captures a full name like "Андрійчук Микола Тарасович", it also captures parts of that name, such as "Микола Тарасович" and "Андрійчук Микола"; that may be useful if the full capture is erroneous.