Il progetto trae ispirazione dalle tecniche sviluppate da Pangeanic e dal suo prodotto commerciale. Sebbene non ci sia l'intenzione di competere direttamente con Pangeanic, il loro approccio ha fornito preziosi spunti e chiarezza sulle possibili soluzioni per il problema del "NER Tagging e Anonimizzazione".

[Link alle soluzioni di Pangeanic per l'anonimizzazione](https://pangeanic.com/it/soluzioni-nlp/anonimizzazione)

al momento il codice gestisce i tag PER , ORG , LOC e MISC.
le entità di tipo PER ed ORG vengono riconosciute, accorpate e sostituite con un randomico iperinomio, viene gestito inoltre, tramite il tag nel testo, una molteplice occorrenza dell'entità che viene indicata con il numero assegnato.
Per quel che riguarda le LOC, vengono sostituiti dal tag generico [Location], sempre numerato, ma è possibile aggiungere più dettagli.
Il MISC invece viene solo segnalato al redattore, in quanto un tag generico non ha uno standard di gestione.

In [180]:
import pandas as pd
import numpy as np
import pickle


def read_tagging(file_name):
    path = "../data/" + "it" + "/tagging/" + file_name + ".conllu"
    data = pd.read_csv(path, sep="\t", quoting=3, names=["POSITION", "WORD", "TAG"])
    return data



In [181]:
import random

# Funzione di [Sostituzione] per ottenere o generare un pseudonimo per un'entità di tipo PERSON, ORGANIZATION o LOCATION
def get_pseudonym(
    entity_type, entity, pseudonym_counters, pseudonyms, generic_synonyms_or_hypernyms
):

    if entity_type in [
        "PER",
        "ORG",
        "LOC",
    ]:  # Verifica se l'entità è di uno dei tipi gestiti
        if entity in pseudonyms:  # Verifica se l'entità ha già un pseudonimo assegnato
            return pseudonyms[entity]  # Restituisce l'pseudonimo già assegnato
        else:
            # Se l'entità non ha ancora un pseudonimo, genera uno nuovo
            if entity not in pseudonym_counters[entity_type]:
                # Assegna un numero incrementale come identificativo dell'entità
                pseudonym_counters[entity_type][entity] = (
                    len(pseudonym_counters[entity_type]) + 1
                )
            # Costruisce l'pseudonimo con il formato corretto
            pseudonym = (
                f"[{entity_type.capitalize()}{pseudonym_counters[entity_type][entity]}]"
            )
            # Se è disponibile un elenco di sinonimi generici per il tipo di entità, aggiunge un sinonimo casuale
            if entity_type in generic_synonyms_or_hypernyms:
                pseudonym += (
                    f" {random.choice(generic_synonyms_or_hypernyms[entity_type])}"
                )
            # Memorizza l'pseudonimo per l'entità per evitare duplicazioni
            pseudonyms[entity] = pseudonym
            return pseudonym
    else:
        return entity  # Restituisce l'entità originale se non è di un tipo gestito

# Funzione di Maschera [Parola --> P****a]
def anonymize_mask(entity, tag, entity_counters):
    if tag not in entity_counters:
        entity_counters[tag] = {}

    if entity not in entity_counters[tag]:
        entity_counters[tag][entity] = len(entity_counters[tag]) + 1

    count = entity_counters[tag][entity]

    if len(entity) > 2:
        masked_entity = f"{entity[0]}{'*' * (len(entity) - 2)}{entity[-1]}"
    else:
        masked_entity = "*" * len(entity)

    return f"[{tag.capitalize()}{count}]{masked_entity}"


# Funzione di Lacuna [Parola --> ________]
def anonymize_gap(entity, tag, entity_counters):
    # Inizializziamo il contatore per il tipo di entità se non esiste
    if tag not in entity_counters:
        entity_counters[tag] = {}

    # Incrementiamo il contatore per l'entità specifica se è la prima volta che la incontriamo
    if entity not in entity_counters[tag]:
        entity_counters[tag][entity] = len(entity_counters[tag]) + 1

    # Costruiamo la rappresentazione della lacuna con "_" di dimensione fissa 8
    gap_entity = "_" * 8

    # Restituiamo la stringa formattata con il tag e l'indice dell'entità
    return f"[{tag.capitalize()}{entity_counters[tag][entity]}]{gap_entity}"

def get_user_choice():

    technique_mapping = {
    1: "sostituzione",
    2: "lacuna",
    3: "mascheramento"
    }
    
    print("Scegli la tecnica di anonimizzazione:")
    print("1. Sostituzione")
    print("2. Lacuna")
    print("3. Mascheramento")
    choice = input("Inserisci il numero della tecnica scelta: ")
    int_choice = int(''.join(map(str, choice)))
    print("")

    # Verifica se la scelta è valida
    if int_choice not in technique_mapping:
     raise ValueError("Scelta non valida. Inserisci 1, 2 o 3.")
    
    return int_choice, technique_mapping, choice

In [182]:
import pandas as pd

def anonymize_document(ner_df, generic_synonyms_or_hypernyms, technique):

    # Conversione di tutte le parole a stringhe
    ner_df["WORD"] = ner_df["WORD"].astype(str)

    # Dizionario che tiene traccia del numero di pseudonimi già assegnati per ogni tipo di entità
    pseudonym_counters = {
        "PER": {},  # Contatore per le entità di tipo PERSON
        "ORG": {},  # Contatore per le entità di tipo ORGANIZATION
        "LOC": {},  # Contatore per le entità di tipo LOCATION
        "MISC": 1,  # Contatore per le entità di tipo MISC (inizia da 1)
    }

    # Dizionario che mappa gli pseudonimi già assegnati a ciascuna entità
    pseudonyms = {}

    entity_counters = {}

    anonymized_text = ""
    current_tag = None
    buffer = []

    for index, row in ner_df.iterrows():
        word = row["WORD"]
        tag = row["TAG"]

        if tag.startswith("B-"):
            if buffer:
                if current_tag in ["PER", "ORG", "LOC"]:

                    if technique == "1":
                        pseudonym = get_pseudonym(
                            current_tag,
                            " ".join(buffer),
                            pseudonym_counters,
                            pseudonyms,
                            generic_synonyms_or_hypernyms,
                        )
                        anonymized_text += "[" + pseudonym + "]" + " "

                    elif technique == "2":  # Lacuna
                        anonymized_text += (
                            "["
                            + anonymize_gap(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )

                    elif technique == "3":  # Mascheramento
                        anonymized_text += (
                            "["
                            + anonymize_mask(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )

                elif current_tag == "MISC":
                    if technique == "2":
                        anonymized_text += (
                            "["
                            + anonymize_gap(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )
                    else:
                        anonymized_text += (
                            "["
                            + anonymize_mask(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )
                else:
                    anonymized_text += " ".join(buffer) + " "

                buffer = []

            current_tag = tag[2:]
            buffer.append(word)

        elif tag.startswith("I-") and current_tag == tag[2:]:
            buffer.append(word)
        else:
            if buffer:
                if current_tag in ["PER", "ORG", "LOC"]:
                    if technique == "1":
                        pseudonym = get_pseudonym(
                            current_tag,
                            " ".join(buffer),
                            pseudonym_counters,
                            pseudonyms,
                            generic_synonyms_or_hypernyms,
                        )
                        anonymized_text += "[" + pseudonym + "]" + " "

                    elif technique == "2":  # Lacuna
                        anonymized_text += (
                            "["
                            + anonymize_gap(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )

                    elif technique == "3":  # Mascheramento
                        anonymized_text += (
                            "["
                            + anonymize_mask(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )

                elif current_tag == "MISC":
                    if technique == "2":
                        anonymized_text += (
                            "["
                            + anonymize_gap(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )
                    else:
                        anonymized_text += (
                            "["
                            + anonymize_mask(
                                " ".join(buffer), current_tag, entity_counters
                            )
                            + "]"
                            + " "
                        )
                else:
                    anonymized_text += " ".join(buffer) + " "
                buffer = []
                current_tag = None

            if word in [".", ",", "!", "?"]:
                anonymized_text = anonymized_text.rstrip() + word + " "
            else:
                anonymized_text += word + " "

    if buffer:
        if current_tag in ["PER", "ORG", "LOC"]:

            if technique == "1":
                pseudonym = get_pseudonym(
                    current_tag,
                    " ".join(buffer),
                    pseudonym_counters,
                    pseudonyms,
                    generic_synonyms_or_hypernyms,
                )
                anonymized_text += "[" + pseudonym + "]" + " "

            elif technique == "2":  # Lacuna
                anonymized_text += (
                    "["
                    + anonymize_gap(" ".join(buffer), current_tag, entity_counters)
                    + "]"
                    + " "
                )

            elif technique == "3":  # Mascheramento
                anonymized_text += (
                    "["
                    + anonymize_mask(" ".join(buffer), current_tag, entity_counters)
                    + "]"
                    + " "
                )

        elif current_tag == "MISC":
            if technique == "2":
                anonymized_text += (
                    "["
                    + anonymize_gap(" ".join(buffer), current_tag, entity_counters)
                    + "]"
                    + " "
                )
            else:
                anonymized_text += (
                    "["
                    + anonymize_mask(" ".join(buffer), current_tag, entity_counters)
                    + "]"
                    + " "
                )
        else:
            anonymized_text += " ".join(buffer) + " "

    anonymized_text = anonymized_text.strip()

    return anonymized_text

Valutazione delle performance

parametria classica come accuratezza, precisione(Di quelli trovati quali lo sono effettivamente) e recall(quanti non me ne sono scappati e sono passati in chiaro). [Visto che la qualità e robustezza dell'anonimizzazione dipende direttamente da quella del tagging, sarà su questo processo che avverrà l'analisi delle performance]

In [183]:
#conversione di un dataframe che contine le frasi etichettate 
#in un dataframe che contiene solo le entità etichettate nella forma di quadrupe (tag, sentence number, start index, end index)
def extract_entities_from_dataframe(dataframe):
    entity_spans = []

    current_entity_span = None
    current_sentence_index = 0

    for index, row in dataframe.iterrows():
        word = row['WORD']
        tag = row['TAG']
        position = row['POSITION']

        if position == 0:  # Inizio di una nuova frase
            current_sentence_index += 1

        if tag != 'O':
            if tag.startswith('B-'):
                # Se inizia una nuova entità, chiudi quella corrente e inizia una nuova
                if current_entity_span is not None:
                    entity_spans.append(current_entity_span)
                current_entity_span = {'Tag': tag[2:], 'Sentence Number': current_sentence_index}
                current_entity_span['Start Index'] = index
                current_entity_span['End Index'] = index
            elif tag.startswith('I-'):
                # Aggiungi la parola all'entità corrente
                if current_entity_span is not None:
                    current_entity_span['End Index'] = index
            else:
                print("Errore: Tag non riconosciuto.")

        else:
            # Se il tag è "O" ma siamo all'interno di una serie di tag non "O", chiudi l'entità corrente
            if current_entity_span is not None:
                entity_spans.append(current_entity_span)
                current_entity_span = None

    # Aggiungo l'ultima entità se presente
    if current_entity_span is not None:
        entity_spans.append(current_entity_span)

    # Creazione del dataframe di output
    output_data = {'Tag': [], 'Sentence Number': [], 'Start Index': [], 'End Index': []}
    for entity_span in entity_spans:
        output_data['Tag'].append(entity_span['Tag'])
        output_data['Sentence Number'].append(entity_span['Sentence Number'])
        output_data['Start Index'].append(entity_span['Start Index'])
        output_data['End Index'].append(entity_span['End Index'])

    output_df = pd.DataFrame(output_data)

    return output_df

In [184]:
def calculate_accuracy(system_df, golden_df):
    # Unione i due dataframe per confrontare i tag
    merged_df = pd.merge(system_df, golden_df, left_index=True, right_index=True, suffixes=('_system', '_golden'))
    # Conteggio di tag corrispondenti uguali
    correct_tags = (merged_df['TAG_system'] == merged_df['TAG_golden']).sum()
    accuracy = correct_tags / len(system_df)
    accuracy_percent = round(accuracy * 100, 1)
    return accuracy_percent

def calculate_precision_recall(predicted_df, golden_df):
    # Unione delle entità predette e delle entità corrette
    merged_df = pd.merge(predicted_df, golden_df, how='outer', indicator=True)
    # Calcolo dei true positives (TP), false positives (FP) e false negatives (FN)
    TP = merged_df[(merged_df['_merge'] == 'both')].shape[0]
    FP = merged_df[(merged_df['_merge'] == 'right_only')].shape[0]
    FN = merged_df[(merged_df['_merge'] == 'left_only')].shape[0]

    fn_list = merged_df[((merged_df['_merge'] == 'left_only'))].iloc[:, :3]

    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    precision_percent = round(precision * 100, 1)
    recall_percent = round(recall * 100, 1)

    return precision_percent, recall_percent, FN,  fn_list


In [185]:

from datetime import datetime

dataset_size = 28000
technique_choice, technique_mapping, choice = get_user_choice()
technique = technique_mapping[technique_choice]

current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

ner_df = read_tagging("viterbi_tag")[:dataset_size]
golden_df = read_tagging("golden_tag")[:dataset_size]

gold_quadruples_df = extract_entities_from_dataframe(golden_df)
ner_quadruples_df = extract_entities_from_dataframe(ner_df)

precision, recall, nfailed, listfailed = calculate_precision_recall(ner_quadruples_df,gold_quadruples_df)

# Dizionario di sinonimi o iperonimie generici per PERSON e ORGANIZATION
generic_synonyms_or_hypernyms = {
    "PER": ["il soggetto", "l'individuo", "la persona", "il cittadino"],
    "ORG": ["la compagnia", "l'azienda", "l'entità", "l'organizzazione"],
    "LOC": ["Lo Stato", "Il Paese", "La Zona", "il Circondario"],
    # è possibile aggiungere elementi al dizionario
}

golden_anonymized_document = anonymize_document(golden_df, generic_synonyms_or_hypernyms, choice)
vit_anonymized_document = anonymize_document(ner_df, generic_synonyms_or_hypernyms, choice)
print(vit_anonymized_document)

with open("Results.txt", "a", encoding="utf-8") as file:
 file.write("---HMM Taggin Anonimizzation------"+'\n')
 file.write(current_date+ " | tecnica selezionata: "+ technique)
 file.write('\n')
 file.write('\n')
 file.write(vit_anonymized_document)
 file.write('\n')
 file.write("______________________________________________"+'\n')
 file.write("accuratezza del tagging: "+str(calculate_accuracy(ner_df,golden_df))+"%"+'\n')
 file.write("Precisione del Tagging: "+str(precision)+"%"+'\n')
 file.write("Recall del tagging: " +str(recall)+"%"+'\n')
 file.write("______________________________________________"+'\n')
 file.write('\n')
 file.write("Attenzione, l'anonimizzazione automatica non ha elaborato "+str(nfailed)+" Elementi")
 file.write("lista delle entità non elaborate automaticamente:")
 file.write(pd.DataFrame.to_string(listfailed))
 file.write('\n')
 file.write('\n')
 file.write("------ Testo anonimizzato d'esempio -------"+'\n')
 file.write(golden_anonymized_document)

 print("Results have been saved to Results.txt")

Scegli la tecnica di anonimizzazione:
1. Sostituzione
2. Lacuna
3. Mascheramento

Si stabilì ad [[Loc1] Lo Stato] per la sua ammirazione nei confronti della letteratura tedesca ( aveva imparato la lingua in carcere ), specialmente per i romantici come [[Per1] il soggetto]. Anche i veicoli interplanetari utilizzano principalmente i razzi chimici, anche se alcuni hanno usato sperimentalmente con successo i nuovi propulsori ionici ( come ad esempio il satellite dell' [[Org1] la compagnia] SMART-1 ). Dal 1866 al 1869 egli frequentò il liceo di recente costruzione a [[Loc2] Il Paese], e nel 1870 egli passò gli esami sulle lingue classiche necessario per l' ammissione all' università. [[Misc1]U********a] militare venne costruita attraverso le colline che vanno da [[Loc3] il Circondario] fino a [[Loc4] La Zona]. Raggiunge la popolarità nel 1988 nel varietà televisivo " Cocco " di [[Org2] l'azienda] con il personaggio dell' automobilista " incazzato come una bestia! ". Nel 2009, 2010, 2011, 20