In [None]:
import pandas as pd

# Add the parent of the current working directory (where the 'utils' package is expected to live) to the Python path
import sys
import os 
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))
 

import utils.utils as u
import utils.utils_people as up

-------------------------------------------------------------------------------------------------------------------

# Rule-based spaCy model for Named Entity Recognition

-------------------------------------------------------------------------------------------------------------------

#### Load dataset of unresolved entries

In [None]:
# Read the step-5 pipeline file into a DataFrame.
dataset_DF = pd.read_json(
    '../data_catastici/data_post-processing/pipeline_steps/catastici_1741_step5.json'
)

# Keep only the rows whose owner code is still the 'TODO' placeholder.
is_unresolved = dataset_DF['owner_code'] == 'TODO'
dataset_unresolved_DF = dataset_DF.loc[is_unresolved].copy()

# Pass every owner name through the project's text_to_minimal helper.
minimal_owner_names = dataset_unresolved_DF['owner_name'].map(u.text_to_minimal)
dataset_unresolved_DF.loc[:, 'owner_name'] = minimal_owner_names
print(f"There are {len(dataset_unresolved_DF)} unresolved parcel entries.")

In [None]:
# Preview the first rows of the unresolved entries.
dataset_unresolved_DF.head()

In [None]:
# Columns removed from the unresolved entries to obtain the working frame
# used for owner-name matching.
to_drop = [
    "owner_code", "owner_count", "owner_count_remark",
    "owner_title", "owner_title_std",
    "owner_first_name", "owner_family_name", "owner_family_group",
    "owner_mestiere", "owner_entity",
    "ten_name", "function", "an_rendi", "id_napo",
    "quantity_income", "quality_income",
    "author", "place", "parish", "sestiere",
    "uid", "path_img",
]

owner_entries_DF = dataset_unresolved_DF.drop(to_drop, axis="columns")
owner_entries_DF.head()

#### Load necessary dictionaries

In [None]:
def _load_sorted_terms(json_path):
    """Read column 0 of the JSON file at ``json_path`` and return its values
    as a list ordered from the longest entry to the shortest."""
    terms = pd.read_json(json_path)[0].to_list()
    terms.sort(key=len, reverse=True)
    return terms


# People-related dictionaries
family_names = _load_sorted_terms('../dictionaries/PPL_dictionary/family_names.json')
family_names_comp = _load_sorted_terms('../dictionaries/PPL_dictionary/family_names_comp.json')
first_names = _load_sorted_terms('../dictionaries/PPL_dictionary/first_names.json')
mestieri = _load_sorted_terms('../dictionaries/PPL_dictionary/mestieri.json')
titles = _load_sorted_terms('../dictionaries/PPL_dictionary/titles.json')
unknown_relatives = _load_sorted_terms('../dictionaries/PPL_dictionary/unknown_relatives.json')
unknown_relatives_sing = _load_sorted_terms('../dictionaries/PPL_dictionary/unknown_relatives_sing.json')
unknown_relatives_plur = _load_sorted_terms('../dictionaries/PPL_dictionary/unknown_relatives_plur.json')

# Entity dictionary
entities = _load_sorted_terms('../dictionaries/ENT_dictionary/entities.json')

# Miscellaneous dictionaries
ignore_next = _load_sorted_terms('../dictionaries/MSC_dictionary/ignore_next.json')
separators = _load_sorted_terms('../dictionaries/MSC_dictionary/separators.json')

## Format owner names

Formatting removes all text that is certainly not part of the owner name. In particular, for parcels owned by people, this is everything that comes after an "ignore_next" expression.

In [None]:
def format_owner_name(owner_name, use_minimal=False):
    """Return ``owner_name`` with the text after "ignore_next" expressions removed.

    Args:
        owner_name: Owner text of one entry.
        use_minimal: Forwarded to ``u.has_multiple_owners`` and
            ``u.string_contains_one_of_substrings``.

    Returns:
        - Single owner: the result of ``up.remove_ignore_next`` on the
          stripped text.
        - Several owners and the text contains one of ``entities``: the input,
          unchanged (ignore_next is applied to people only).
        - Several owners otherwise: each owner is cleaned separately with
          ``up.remove_ignore_next`` and the results are joined with ",".
    """
    if not u.has_multiple_owners(owner_name, separators, use_minimal):
        return up.remove_ignore_next(owner_name.strip(), ignore_next, family_names)

    # don't apply ignore next for entities - people only
    if u.string_contains_one_of_substrings(owner_name, entities, use_minimal):
        return owner_name

    # Replace every separator with one common marker, then split on it.
    marked_text = owner_name
    for sep in separators:
        marked_text = marked_text.replace(sep, " || ")

    return ",".join(
        up.remove_ignore_next(single_owner.strip(), ignore_next, family_names)
        for single_owner in marked_text.split(" || ")
    )


In [None]:
# Run format_owner_name (with its default use_minimal) on every owner name.
owner_entries_DF['owner_name'] = owner_entries_DF['owner_name'].apply(format_owner_name)

## spaCy

In [None]:
import spacy
from spacy.lang.it import Italian
from spacy.pipeline import EntityRuler
import json

In [None]:
def load_data(file):
    """Return the parsed content of the UTF-8 encoded JSON file at ``file``."""
    with open(file, mode='r', encoding="utf-8") as handle:
        return json.loads(handle.read())

def save_data(file, data):
    """Write ``data`` as JSON to ``file`` (UTF-8), replacing any existing content."""
    with open(file, mode='w', encoding="utf-8") as handle:
        json.dump(data, fp=handle)

In [None]:
def create_name_patterns():
    """Combine every first name with every family name, in both word orders.

    Pairs in which the first name equals the family name are skipped.

    Returns:
        A list of unique "first last" / "last first" strings. The list is
        built from a set, so its order is not meaningful.
    """
    unique_names = {
        full_name
        for surname in family_names
        for given in first_names
        if given != surname
        for full_name in (f"{given} {surname}", f"{surname} {given}")
    }
    return list(unique_names)

In [None]:
def create_reference_data(type):
    """Build the entity-ruler patterns for one label.

    Args:
        type: Label name; it selects the term list the patterns are built from.
            An unrecognised label selects no terms.

    Returns:
        A list of ``{"label": type, "pattern": term}`` dicts sorted by
        ``pattern``; empty for an unrecognised label.
    """
    # Each source is wrapped in a lambda so that only the list belonging to
    # the requested label is looked up (or, for full names, computed).
    term_sources = (
        ("FIRST_AND_LAST_NAME", lambda: create_name_patterns()),
        ("TITLE", lambda: titles),
        ("MESTIERE", lambda: mestieri),
        ("UNKNOWN_RELATIVES_S", lambda: unknown_relatives_sing),
        ("UNKNOWN_RELATIVES_P", lambda: unknown_relatives_plur),
        ("ENT", lambda: entities),
        ("LAST_NAME", lambda: family_names),
        ("FIRST_NAME", lambda: first_names),
        ("SEPARATOR", lambda: separators),
    )
    terms = next((fetch() for label, fetch in term_sources if type == label), [])

    labelled = [{"label": type, "pattern": term} for term in terms]
    labelled.sort(key=lambda entry: entry['pattern'])
    return labelled

In [None]:
# Labels whose patterns are loaded into the ruler, in the order they are added.
pattern_labels = (
    "ENT",
    "FIRST_AND_LAST_NAME",
    "LAST_NAME",
    "FIRST_NAME",
    "TITLE",
    "MESTIERE",
    "SEPARATOR",
    "UNKNOWN_RELATIVES_S",
    "UNKNOWN_RELATIVES_P",
)

patterns = []
for label in pattern_labels:
    patterns.extend(create_reference_data(label))

### NER Model spaCy

In [None]:
# Italian language pipeline; the entity ruler below is the only component added to it here.
nlp = Italian()

def generate_ruler_model(patterns):
    """Add an entity ruler to the notebook-level ``nlp`` pipeline and load ``patterns``.

    Args:
        patterns: List of ``{"label": ..., "pattern": ...}`` dicts.

    Returns:
        None. ``nlp`` is modified in place.
    """
    # The component that ends up in the pipeline is the one created by
    # add_pipe, so its settings have to be passed through `config`; a
    # separately constructed EntityRuler instance would never be used.
    entity_ruler = nlp.add_pipe("entity_ruler", config={"overwrite_ents": True})
    entity_ruler.initialize(lambda: [], nlp=nlp, patterns=patterns)

generate_ruler_model(patterns)

# =====================================================================

In [None]:
def test_model(model, text):
    doc = model(text)
    results = []
    for ent in doc.ents:
        results.append((ent.label_, ent.text, ent.start_char, ent.end_char))
    return (results)

In [None]:
# One Series per row of the owner entries frame.
all_owners = [owner_row for _, owner_row in owner_entries_DF.iterrows()]

found_patterns = []
for owner_row in all_owners:
    minimal_text = u.text_to_minimal(owner_row['owner_name'])

    matches = test_model(nlp, minimal_text)
    if not matches:
        # Entries without any match are left out of the output.
        continue

    record = {
        'uidx': owner_row['uidx'],
        'owner_text': owner_row['owner_name'],
        'owner_text_minimal': minimal_text,
    }
    # pattern_1, pattern_2, ... in the order the matches were returned.
    record.update(
        (f"pattern_{position}", match)
        for position, match in enumerate(matches, start=1)
    )
    found_patterns.append(record)

save_data("model_output_step6.json", found_patterns)

### Create data structure of unresolved patterns

In [None]:
# Reload the model output saved to 'model_output_step6.json' as a DataFrame
# and preview its first rows.
dataset_unresolved_patterns = pd.read_json('model_output_step6.json')
dataset_unresolved_patterns.head()

In [None]:
# Accumulator: one dict per entry with its uidx, owner text and matched patterns.
unresolved_patterns = []

In [None]:
# Start from an empty list so that re-running this cell does not append a
# second copy of every record.
unresolved_patterns = []

for _, row in dataset_unresolved_patterns.iterrows():
    # Named `row_patterns` rather than `patterns`: the notebook-level
    # `patterns` list holds the ruler patterns used to build the model and
    # must not be overwritten here.
    row_patterns = []

    # Walk the columns pattern_1, pattern_2, ... and stop at the first one
    # that is missing or does not hold a list (presumably the NaN filler for
    # rows with fewer matches - TODO confirm).
    i = 1
    column = 'pattern_1'
    while column in row and isinstance(row[column], list):
        match = row[column]
        row_patterns.append({
            "label": match[0],
            "pattern": match[1],
            "start_idx": match[2],
            "end_idx": match[3]
        })
        i += 1
        column = f"pattern_{i}"

    unresolved_patterns.append({
        "uidx": row['uidx'],
        "owner_text": u.remove_extra_spaces(row['owner_text']),
        "patterns": row_patterns
    })

In [None]:
# Turn the list of record dicts into a DataFrame and display it.
unresolved_patterns_DF = pd.DataFrame(unresolved_patterns)
unresolved_patterns_DF

### ⚠️⚠️⚠️ Write the updated dataset to file

In [None]:
# orient='records' writes a list of row objects and never includes the index,
# so no `index` argument is passed (it expects a bool, not a list of column
# names).
unresolved_patterns_DF.to_json(
    '../data_catastici/data_post-processing/pipeline_steps/catastici_1741_step6_patterns.json',
    orient='records',
)

In [None]:
import time

# Print the local wall-clock time at which this cell ran
# (strftime uses the current local time when no time tuple is given).
print("Current time:", time.strftime("%H:%M:%S"))