<a href="https://colab.research.google.com/github/depalmami/presidio/blob/main/Stan%2BMistral.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Installa le dipendenze necessarie
!pip install -q gradio transformers torch presidio-analyzer presidio-anonymizer spacy bitsandbytes accelerate
!python -m spacy download it_core_news_sm

Collecting it-core-news-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/it_core_news_sm-3.8.0/it_core_news_sm-3.8.0-py3-none-any.whl (13.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m118.8 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('it_core_news_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [7]:
import os
import torch
import json
import re
import time
import gradio as gr
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

# Inizializza il tokenizer e modello di Mistral
def initialize_mistral():
    print("Inizializzazione del modello Mistral...")
    try:
        # Verifica disponibilità di GPU e memoria
        if torch.cuda.is_available():
            free_gpu_memory = torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated(0)
            free_gb = free_gpu_memory / (1024**3)
            print(f"GPU disponibile con {free_gb:.2f} GB di memoria libera")
        else:
            print("GPU non disponibile, utilizzo CPU")
            free_gb = 0

        # Scegli il modello in base alla memoria disponibile
        if free_gb > 10:
            # Più di 10GB di memoria libera, prova Mistral 7B
            model_name = "mistralai/Mistral-7B-v0.1"
            print(f"Utilizzo {model_name} con 4-bit quantization")
            use_4bit = True
        elif free_gb > 5:
            # Tra 5 e 10GB, usa un modello più piccolo
            model_name = "google/gemma-2b"
            print(f"Utilizzo {model_name} con 4-bit quantization")
            use_4bit = True
        elif free_gb > 2:
            # Memoria molto limitata, usa un modello ancora più piccolo
            model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
            print(f"Utilizzo {model_name} con 8-bit quantization")
            use_4bit = False
        else:
            # Memoria GPU insufficiente o nessuna GPU
            model_name = "distilgpt2"
            print(f"Memoria GPU insufficiente, utilizzo {model_name} su CPU")
            use_4bit = False

        # Configurazione per la quantizzazione
        from transformers import BitsAndBytesConfig, AutoModelForCausalLM, AutoTokenizer

        # Prepara configurazioni di caricamento in base alla memoria disponibile
        load_kwargs = {
            "device_map": "auto" if torch.cuda.is_available() else "cpu"
        }

        if use_4bit and torch.cuda.is_available():
            # Configurazione 4-bit
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.float16
            )
            load_kwargs["quantization_config"] = bnb_config
            load_kwargs["torch_dtype"] = torch.float16
        elif torch.cuda.is_available():
            # Configurazione 8-bit o standard
            load_kwargs["torch_dtype"] = torch.float16

        # Aggiungi offload CPU se la memoria è scarsa
        if free_gb < 4 and torch.cuda.is_available():
            load_kwargs["offload_folder"] = "offload_folder"
            load_kwargs["offload_state_dict"] = True
            print("Aggiunta configurazione di offload su CPU/disco")

        # Carica modello e tokenizer separatamente
        print(f"Caricamento modello {model_name}...")
        try:
            model = AutoModelForCausalLM.from_pretrained(
                model_name,
                **load_kwargs
            )

            print("Caricamento tokenizer...")
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            # Inizializza pipeline con modello già configurato
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                max_new_tokens=512,
                do_sample=False
            )

            print(f"Modello {model_name} inizializzato con successo")
            return pipe
        except (ValueError, RuntimeError) as e:
            print(f"Errore durante il caricamento del modello {model_name}: {e}")
            # Se il modello è fallito e non è già distilgpt2, prova con distilgpt2
            if model_name != "distilgpt2":
                print("Tentativo con distilgpt2 (modello più piccolo)")
                return initialize_gpt2_fallback()
            else:
                raise e
    except Exception as e:
        print(f"Errore critico nell'inizializzazione del modello: {e}")
        return None

# Inizializzazione del modello fallback GPT-2 distillato (molto leggero)
def initialize_gpt2_fallback():
    try:
        print("Tentativo di caricamento del modello minimo distilgpt2...")

        # Carica senza quantizzazione né GPU se necessario
        model = AutoModelForCausalLM.from_pretrained("distilgpt2")
        tokenizer = AutoTokenizer.from_pretrained("distilgpt2")

        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Usa sempre CPU per il fallback
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device=-1,  # Forza CPU
            max_new_tokens=128,  # Ridotto per evitare timeout
            do_sample=False
        )

        print("Modello distilgpt2 caricato con successo su CPU")
        return pipe
    except Exception as e:
        print(f"Errore critico anche con distilgpt2: {e}")
        print("Nessun modello LLM disponibile, il sistema userà solo il modello Stanford")
        return None

# Crea un riconoscitore personalizzato basato sul modello Stanford
class StanfordDeidentifier:
    def __init__(self):
        print("Caricamento modello Stanford...")
        try:
            self.tokenizer = AutoTokenizer.from_pretrained("StanfordAIMI/stanford-deidentifier-base")
            self.model = AutoModelForTokenClassification.from_pretrained("StanfordAIMI/stanford-deidentifier-base")
            print("Modello caricato con successo!")
        except Exception as e:
            print(f"Errore nel caricamento del modello Stanford: {e}")
            print("Tentativo con nome alternativo del modello...")
            try:
                self.tokenizer = AutoTokenizer.from_pretrained("stanfordaimi/stanford-deidentifier-base")
                self.model = AutoModelForTokenClassification.from_pretrained("stanfordaimi/stanford-deidentifier-base")
                print("Modello caricato con successo!")
            except Exception as e:
                print(f"Errore nel caricamento del modello alternativo: {e}")
                print("Caricamento di un modello NER generico...")
                self.tokenizer = AutoTokenizer.from_pretrained("dslim/bert-base-NER")
                self.model = AutoModelForTokenClassification.from_pretrained("dslim/bert-base-NER")
                print("Modello NER generico caricato come fallback!")

        # Ottieni le etichette del modello
        self.id2label = self.model.config.id2label
        self.label2id = self.model.config.label2id
        print(f"Etichette del modello: {list(set([l.split('-')[1] if '-' in l else l for l in self.id2label.values() if l != 'O']))}")

    def analyze(self, text, threshold=0.5):
        # Tokenizzazione
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, padding=True)

        # Predizione
        with torch.no_grad():
            outputs = self.model(**inputs)

        # Processa le predizioni
        probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
        predictions = torch.argmax(probabilities, dim=-1)

        # Mappatura approssimativa token -> testo originale
        token_map = []
        tokens = self.tokenizer.convert_ids_to_tokens(inputs.input_ids[0])

        # Pre-processing per mappare token ai caratteri
        current_pos = 0
        for token in tokens:
            # Ignora token speciali
            if token in ["[CLS]", "[SEP]", "[PAD]", "<s>", "</s>", "<pad>"]:
                token_map.append((-1, -1))
                continue

            # Gestisci token normali
            clean_token = token.replace("##", "").replace("Ġ", "")
            if not clean_token:  # Skip token vuoti
                token_map.append((-1, -1))
                continue

            # Cerca nel testo
            pos = text.find(clean_token, current_pos)
            if pos >= 0:
                token_map.append((pos, pos + len(clean_token)))
                current_pos = pos + len(clean_token)
            else:
                # Se non trovato, cerca senza case sensitivity
                pos = text.lower().find(clean_token.lower(), current_pos)
                if pos >= 0:
                    token_map.append((pos, pos + len(clean_token)))
                    current_pos = pos + len(clean_token)
                else:
                    token_map.append((-1, -1))

        # Estrai entità
        entities = []
        current_entity = None

        for i, pred_id in enumerate(predictions[0]):
            if i >= len(token_map):
                continue

            pred_label = self.id2label[pred_id.item()]
            confidence = probabilities[0, i, pred_id].item()

            # Salta token speciali o sotto la soglia
            if token_map[i][0] == -1 or confidence < threshold or pred_label == "O":
                if current_entity is not None:
                    entities.append(current_entity)
                    current_entity = None
                continue

            # Inizio nuova entità
            if pred_label.startswith("B-"):
                if current_entity is not None:
                    entities.append(current_entity)

                entity_type = pred_label[2:]  # rimuovi "B-"
                current_entity = {
                    "entity_type": entity_type,
                    "start": token_map[i][0],
                    "end": token_map[i][1],
                    "text": text[token_map[i][0]:token_map[i][1]],
                    "score": confidence
                }

            # Continuazione entità
            elif pred_label.startswith("I-") and current_entity is not None:
                if token_map[i][0] >= 0:
                    current_entity["end"] = token_map[i][1]
                    current_entity["text"] = text[current_entity["start"]:current_entity["end"]]
                    current_entity["score"] = (current_entity["score"] + confidence) / 2

        # Aggiungi l'ultima entità
        if current_entity is not None:
            entities.append(current_entity)

        # In caso di errori nella mappatura, usa metodo basato su regex come fallback
        if not entities:
            entities = self.analyze_with_regex(text)

        return entities

    # Fallback con regex se il modello non funziona correttamente
    def analyze_with_regex(self, text):
        entities = []

        # Rileva nomi di persona (formato: parole che iniziano con maiuscola)
        for match in re.finditer(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b', text):
            entities.append({
                "entity_type": "PER",
                "start": match.start(),
                "end": match.end(),
                "text": match.group(),
                "score": 0.85
            })

        # Rileva date
        for match in re.finditer(r'\b\d{1,2}[\/\-\.]\d{1,2}[\/\-\.]\d{2,4}\b', text):
            entities.append({
                "entity_type": "DATE",
                "start": match.start(),
                "end": match.end(),
                "text": match.group(),
                "score": 0.9
            })

        # Rileva email
        for match in re.finditer(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text):
            entities.append({
                "entity_type": "EMAIL",
                "start": match.start(),
                "end": match.end(),
                "text": match.group(),
                "score": 0.95
            })

        # Rileva numeri di telefono
        for match in re.finditer(r'\b\+?[0-9]{10,15}\b|\b\+?[0-9]{2,4}[- ][0-9]{5,10}\b', text):
            entities.append({
                "entity_type": "PHONE",
                "start": match.start(),
                "end": match.end(),
                "text": match.group(),
                "score": 0.9
            })

        return entities

# Funzione per il refinement tramite Mistral
def refine_with_mistral(text, entities, mistral_pipe):
    if not mistral_pipe:
        print("Mistral non disponibile, skipping refinement")
        return entities, []

    # Salva le entità originali per confronto
    original_entities = entities.copy()

    # Prepara un contesto con le entità già estratte
    entities_context = ""
    for i, entity in enumerate(entities):
        entities_context += f"{i+1}. Tipo: {entity['entity_type']}, Testo: '{entity['text']}'\n"

    prompt = f"""Sei un assistente specializzato nell'anonimizzazione dei dati personali.
Nel testo seguente, ho già identificato alcune informazioni sensibili, ma potrebbero essercene altre che ho perso.

Testo da analizzare:
{text}

Entità già identificate:
{entities_context}

Per favore, identifica e aggiungi SOLO altre entità sensibili che NON ho già trovato, come:
- Nomi e cognomi di persone (PERSON)
- Codici fiscali italiani (CF) - formato: 16 caratteri alfanumerici
- Partite IVA italiane (PIVA) - formato: IT seguito da 11 cifre
- Date di nascita o altre date rilevanti (DATE)
- Indirizzi (LOCATION)
- Numeri di telefono (PHONE)
- IBAN o altri identificativi bancari (IBAN) - formato: IT seguito da caratteri alfanumerici
- Targhe di veicoli (TARGA) - formato italiano: 2 lettere, 3 numeri, 2 lettere
- Email (EMAIL)

Presta particolare attenzione a codici fiscali, IBAN, partite IVA e targhe che potrebbero essere stati ignorati.

Fornisci il risultato in questo formato JSON:
[
  {{
    "entity_type": "TIPO_ENTITÀ",
    "text": "testo dell'entità",
    "reason": "motivo dell'identificazione"
  }}
]

Se non ci sono altre entità da aggiungere, restituisci un array vuoto: []
"""

    # Lista per tenere traccia delle entità aggiunte da Mistral
    added_entities = []

    # Ottieni la risposta da Mistral
    try:
        start_time = time.time()
        response = mistral_pipe(prompt, temperature=0.1, max_new_tokens=256)[0]['generated_text'].replace(prompt, "").strip()
        end_time = time.time()
        print(f"Modello LLM ha impiegato {end_time - start_time:.2f} secondi per l'analisi")

        # Debug: mostra parte della risposta per diagnostica
        print(f"Risposta del modello (primi 100 caratteri): {response[:100]}...")

        # Estrai la parte JSON dalla risposta
        json_match = re.search(r'\[\s*{.*?}\s*(?:,\s*{.*?}\s*)*\]', response, re.DOTALL)
        if json_match:
            json_str = json_match.group(0)
            try:
                # Pulisci il JSON per renderlo valido se necessario
                json_str = json_str.replace("'", '"')  # Sostituisci apici singoli con doppi
                json_str = re.sub(r',\s*\]', ']', json_str)  # Rimuovi virgole finali

                print(f"Tentativo parsing JSON: {json_str[:100]}...")
                additional_entities = json.loads(json_str)

                # Aggiungi le nuove entità, trovando le posizioni nel testo
                new_entities_added = 0
                for new_entity in additional_entities:
                    entity_text = new_entity.get("text", "")
                    entity_type = new_entity.get("entity_type", "UNKNOWN")

                    # Normalizza il tipo di entità
                    if entity_type.upper() in ["NOME E COGNOME", "PERSON", "PERSONA", "NOME COGNOME", "NOME_COGNOME"]:
                        entity_type = "PERSON"
                    elif entity_type.upper() in ["TELEFONO", "NUMERO TELEFONO", "NUMERO DI TELEFONO", "NUMERO_TELEFONO"]:
                        entity_type = "PHONE"
                    elif entity_type.upper() in ["DATA", "DATA DI NASCITA", "DATA_NASCITA"]:
                        entity_type = "DATE"
                    elif entity_type.upper() in ["INDIRIZZO", "LOCATION", "LOCALITÀ"]:
                        entity_type = "LOCATION"
                    elif entity_type.upper() in ["CODICE FISCALE", "CF", "CODICE_FISCALE"]:
                        entity_type = "CF"
                    elif entity_type.upper() in ["PARTITA IVA", "PIVA", "P_IVA", "PARTITA_IVA"]:
                        entity_type = "PIVA"
                    elif entity_type.upper() in ["IBAN", "IBAN_CODE", "CODICE IBAN"]:
                        entity_type = "IBAN"

                    # Se il testo è vuoto, salta
                    if not entity_text:
                        continue

                    # Cerca la posizione nel testo
                    start_pos = text.find(entity_text)
                    if start_pos >= 0:
                        # Verifica che l'entità non sia già presente o sovrapposta
                        is_duplicate = False
                        is_overlapping = False

                        for existing_entity in entities:
                            # Controlla duplicati esatti
                            if (existing_entity["text"] == entity_text and
                                existing_entity["entity_type"] == entity_type):
                                is_duplicate = True
                                break

                            # Controlla sovrapposizioni
                            new_start = start_pos
                            new_end = start_pos + len(entity_text)
                            existing_start = existing_entity["start"]
                            existing_end = existing_entity["end"]

                            # Due intervalli si sovrappongono se l'inizio di uno è minore della fine dell'altro
                            # e la fine di uno è maggiore dell'inizio dell'altro
                            if (new_start < existing_end and new_end > existing_start):
                                is_overlapping = True
                                break

                        if not is_duplicate and not is_overlapping:
                            new_entity = {
                                "entity_type": entity_type,
                                "start": start_pos,
                                "end": start_pos + len(entity_text),
                                "text": entity_text,
                                "score": 0.85,  # Score fisso per le entità LLM
                                "source": "llm",
                                "reason": new_entity.get("reason", "Identificato da LLM")
                            }

                            entities.append(new_entity)
                            added_entities.append(new_entity)
                            new_entities_added += 1

                print(f"LLM ha identificato {new_entities_added} nuove entità valide")

            except json.JSONDecodeError as je:
                print(f"Errore nel parsing JSON dalla risposta: {je}")
                # Tentativo di estrarre manualmente i dati dalla risposta testuale
                try:
                    # Cerca pattern di entità nella risposta testuale
                    added_entities = manual_extraction(text, response, entities)
                except Exception as ex:
                    print(f"Fallito anche il parsing manuale: {ex}")
        else:
            print("Nessun JSON trovato nella risposta, tentativo estrazione manuale")
            added_entities = manual_extraction(text, response, entities)

    except Exception as e:
        print(f"Errore durante l'analisi con LLM: {e}")

    return entities, added_entities

# Funzione ausiliaria per estrarre manualmente entità dalla risposta testuale
def manual_extraction(text, response, entities):
    # Lista per tenere traccia delle entità aggiunte manualmente
    added_entities = []

    # Cerca pattern comuni per codici fiscali italiani
    cf_pattern = r'[A-Z]{6}\d{2}[A-Z]\d{2}[A-Z]\d{3}[A-Z]'
    for match in re.finditer(cf_pattern, text):  # Nota: cerchiamo direttamente nel testo originale, non nella risposta
        cf_text = match.group(0)
        start_pos = match.start()

        # Verifica che l'entità non sia già presente
        is_duplicate = False
        for existing_entity in entities:
            if (existing_entity["text"] == cf_text and
                existing_entity["entity_type"] == "CF"):
                is_duplicate = True
                break

        if not is_duplicate:
            new_entity = {
                "entity_type": "CF",
                "start": start_pos,
                "end": start_pos + len(cf_text),
                "text": cf_text,
                "score": 0.9,
                "source": "regex",
                "reason": "Estratto da pattern di codice fiscale"
            }
            entities.append(new_entity)
            added_entities.append(new_entity)

    # Cerca IBAN italiani
    iban_pattern = r'IT\d{2}[A-Z0-9]{10,30}'
    for match in re.finditer(iban_pattern, text):
        iban_text = match.group(0)
        start_pos = match.start()

        # Verifica che l'entità non sia già presente
        is_duplicate = False
        for existing_entity in entities:
            if (existing_entity["text"] == iban_text and
                existing_entity["entity_type"] == "IBAN"):
                is_duplicate = True
                break

        if not is_duplicate:
            new_entity = {
                "entity_type": "IBAN",
                "start": start_pos,
                "end": start_pos + len(iban_text),
                "text": iban_text,
                "score": 0.9,
                "source": "regex",
                "reason": "Estratto da pattern di IBAN"
            }
            entities.append(new_entity)
            added_entities.append(new_entity)

    # Cerca date nel formato gg/mm/aaaa
    date_pattern = r'\b\d{1,2}/\d{1,2}/\d{4}\b'
    for match in re.finditer(date_pattern, text):
        date_text = match.group(0)
        start_pos = match.start()

        # Verifica che l'entità non sia già presente
        is_duplicate = False
        for existing_entity in entities:
            if (existing_entity["text"] == date_text and
                existing_entity["entity_type"] == "DATE"):
                is_duplicate = True
                break

        if not is_duplicate:
            new_entity = {
                "entity_type": "DATE",
                "start": start_pos,
                "end": start_pos + len(date_text),
                "text": date_text,
                "score": 0.9,
                "source": "regex",
                "reason": "Estratto da pattern di data"
            }
            entities.append(new_entity)
            added_entities.append(new_entity)

    # Cerca targhe nel formato AA000BB
    targa_pattern = r'\b[A-Z]{2}\d{3}[A-Z]{2}\b'
    for match in re.finditer(targa_pattern, text):
        targa_text = match.group(0)
        start_pos = match.start()

        # Verifica che l'entità non sia già presente
        is_duplicate = False
        for existing_entity in entities:
            if (existing_entity["text"] == targa_text and
                existing_entity["entity_type"] == "TARGA"):
                is_duplicate = True
                break

        if not is_duplicate:
            new_entity = {
                "entity_type": "TARGA",
                "start": start_pos,
                "end": start_pos + len(targa_text),
                "text": targa_text,
                "score": 0.9,
                "source": "regex",
                "reason": "Estratto da pattern di targa"
            }
            entities.append(new_entity)
            added_entities.append(new_entity)

    # Cerca Partite IVA italiane
    piva_pattern = r'\b(IT)?\d{11}\b'
    for match in re.finditer(piva_pattern, text):
        piva_text = match.group(0)
        start_pos = match.start()

        # Verifica che l'entità non sia già presente
        is_duplicate = False
        for existing_entity in entities:
            if (existing_entity["text"] == piva_text and
                existing_entity["entity_type"] == "PIVA"):
                is_duplicate = True
                break

        if not is_duplicate:
            new_entity = {
                "entity_type": "PIVA",
                "start": start_pos,
                "end": start_pos + len(piva_text),
                "text": piva_text,
                "score": 0.9,
                "source": "regex",
                "reason": "Estratto da pattern di partita IVA"
            }
            entities.append(new_entity)
            added_entities.append(new_entity)

    # Cerca numeri di telefono
    phone_pattern = r'\b\+?\d{2,4}[-\s]?\d{3,10}[-\s]?\d{3,10}\b'
    for match in re.finditer(phone_pattern, text):
        phone_text = match.group(0)
        start_pos = match.start()

        # Verifica che l'entità non sia già presente
        is_duplicate = False
        for existing_entity in entities:
            if (existing_entity["text"] == phone_text and
                existing_entity["entity_type"] == "PHONE"):
                is_duplicate = True
                break

        if not is_duplicate:
            new_entity = {
                "entity_type": "PHONE",
                "start": start_pos,
                "end": start_pos + len(phone_text),
                "text": phone_text,
                "score": 0.9,
                "source": "regex",
                "reason": "Estratto da pattern di telefono"
            }
            entities.append(new_entity)
            added_entities.append(new_entity)

    # Riporta la lista delle entità aggiunte
    return added_entities

# Definizione dei colori per le entità
entity_colors = {
    "PER": "#fecaca",
    "PERSON": "#fecaca",
    "NOME": "#fecaca",
    "COGNOME": "#fecaca",
    "PHONE": "#fde68a",
    "TELEFONO": "#fde68a",
    "EMAIL": "#fed7aa",
    "DATE": "#fcd34d",
    "DATA": "#fcd34d",
    "LOC": "#fbcfe8",
    "LOCATION": "#fbcfe8",
    "INDIRIZZO": "#fbcfe8",
    "ORG": "#93c5fd",
    "ORGANIZATION": "#93c5fd",
    "ID": "#ddd6fe",
    "CF": "#ddd6fe",
    "PIVA": "#ddd6fe",
    "CODICE_FISCALE": "#ddd6fe",
    "PARTITA_IVA": "#ddd6fe",
    "IBAN": "#1e90ff",
    "IBAN_CODE": "#1e90ff",
    "MEDICALRECORD": "#6ee7b7",
    "AGE": "#fdba74",
    "DOCTOR": "#c4b5fd",
    "PATIENT": "#fecaca",
    "HOSPITAL": "#93c5fd",
    "TARGA": "#bdb76b"
}

entity_names = {
    "PER": "Nome Persona",
    "PERSON": "Nome Persona",
    "NOME": "Nome",
    "COGNOME": "Cognome",
    "PHONE": "Numero di Telefono",
    "TELEFONO": "Numero di Telefono",
    "EMAIL": "Email",
    "DATE": "Data",
    "DATA": "Data",
    "LOC": "Località",
    "INDIRIZZO": "Indirizzo",
    "ORG": "Organizzazione",
    "ORGANIZATION": "Organizzazione",
    "ID": "Identificativo",
    "CF": "Codice Fiscale",
    "PIVA": "Partita IVA",
    "CODICE_FISCALE": "Codice Fiscale",
    "PARTITA_IVA": "Partita IVA",
    "IBAN": "IBAN",
    "IBAN_CODE": "IBAN",
    "MEDICALRECORD": "Cartella Medica",
    "AGE": "Età",
    "DOCTOR": "Medico",
    "PATIENT": "Paziente",
    "HOSPITAL": "Ospedale",
    "TARGA": "Targa Veicolo"
}

# Funzione per evidenziare le entità in HTML
def highlight_entities_html(text, entities):
    if not entities:
        return text

    # Prepariamo HTML con span colorati
    chars = list(text)
    spans = []

    for entity in entities:
        entity_type = entity["entity_type"]
        color = entity_colors.get(entity_type, "#cccccc")
        name = entity_names.get(entity_type, entity_type)
        score = int(entity.get("score", 0.8) * 100)
        source = entity.get("source", "stanford")

        spans.append({
            "index": entity["start"],
            "content": f'<span style="background-color: {color}; padding: 2px; border-radius: 3px;" title="{name} ({score}%) - {source}">',
            "is_opening": True
        })

        spans.append({
            "index": entity["end"],
            "content": '</span>',
            "is_opening": False
        })

    # Ordina i span (chiusura prima dell'apertura se stesso indice)
    spans.sort(key=lambda x: (x["index"], not x["is_opening"]))

    # Inserisce i tag span nel testo
    offset = 0
    for span in spans:
        adjusted_index = span["index"] + offset
        if 0 <= adjusted_index <= len(chars):  # Controlla indici validi
            chars.insert(adjusted_index, span["content"])
            offset += 1

    return "".join(chars)

# Funzioni di anonimizzazione
def anonymize_text(text, entities, anonymization_type="replace"):
    # Usa implementazione personalizzata invece di Presidio
    if not entities:
        return text

    # Ordina le entità per posizione di fine in ordine decrescente
    sorted_entities = sorted(entities, key=lambda x: x["end"], reverse=True)

    if anonymization_type == "replace":
        # Sostituisci con tag
        anonymized = text
        for entity in sorted_entities:
            anonymized = (
                anonymized[:entity["start"]] +
                f"<{entity['entity_type']}>" +
                anonymized[entity["end"]:]
            )

    elif anonymization_type == "redact":
        # Sostituisci con asterischi
        anonymized = text
        for entity in sorted_entities:
            redaction_length = entity["end"] - entity["start"]
            redaction = "*" * redaction_length
            anonymized = (
                anonymized[:entity["start"]] +
                redaction +
                anonymized[entity["end"]:]
            )

    else:  # pseudonymize
        # Conta le occorrenze per tipo
        type_counts = {}
        entity_to_pseudonym = {}

        # Crea pseudonimi
        for entity in entities:
            entity_type = entity["entity_type"]
            entity_text = entity["text"]

            if entity_type not in type_counts:
                type_counts[entity_type] = 0

            if entity_text not in entity_to_pseudonym:
                type_counts[entity_type] += 1

                # Genera pseudonimo
                if entity_type in ["PER", "PERSON", "PATIENT", "NOME", "COGNOME"]:
                    pseudonym = f"Persona {type_counts[entity_type]}"
                elif entity_type == "DOCTOR":
                    pseudonym = f"Medico {type_counts[entity_type]}"
                elif entity_type == "EMAIL":
                    pseudonym = f"email{type_counts[entity_type]}@example.com"
                elif entity_type in ["PHONE", "TELEFONO"]:
                    pseudonym = f"+xx-xxx-{str(type_counts[entity_type]).zfill(4)}"
                elif entity_type in ["ORG", "ORGANIZATION", "HOSPITAL"]:
                    pseudonym = f"Organizzazione {type_counts[entity_type]}"
                elif entity_type in ["DATE", "DATA"]:
                    pseudonym = "GG/MM/AAAA"
                elif entity_type == "AGE":
                    pseudonym = "XX anni"
                elif entity_type in ["CF", "CODICE_FISCALE"]:
                    pseudonym = f"CODFIS{type_counts[entity_type]}"
                elif entity_type in ["PIVA", "PARTITA_IVA"]:
                    pseudonym = f"PIVA{type_counts[entity_type]}"
                elif entity_type == "MEDICALRECORD":
                    pseudonym = f"ID-XXXXX{type_counts[entity_type]}"
                elif entity_type in ["LOC", "LOCATION", "INDIRIZZO"]:
                    pseudonym = f"Luogo {type_counts[entity_type]}"
                elif entity_type in ["IBAN", "IBAN_CODE"]:
                    pseudonym = f"ITXX-XXXX-XXXX-{type_counts[entity_type]}"
                elif entity_type == "TARGA":
                    pseudonym = f"TARGA{type_counts[entity_type]}"
                else:
                    pseudonym = f"{entity_type} {type_counts[entity_type]}"

                entity_to_pseudonym[entity_text] = pseudonym

        # Sostituisci entità
        anonymized = text
        for entity in sorted_entities:
            pseudonym = entity_to_pseudonym[entity["text"]]
            anonymized = (
                anonymized[:entity["start"]] +
                pseudonym +
                anonymized[entity["end"]:]
            )

    return anonymized

def process_text(text, anonymization_type, threshold, use_llm):
    """
    Elabora il testo analizzandolo con Stanford e opzionalmente con LLM
    """
    # Analizza il testo con il modello Stanford
    start_time = time.time()
    entities = stanford_model.analyze(text, threshold)
    stanford_time = time.time() - start_time

    # Refine con LLM se richiesto
    llm_time = 0
    added_entities = []
    if use_llm and mistral_pipe:
        start_time = time.time()
        entities, added_entities = refine_with_mistral(text, entities, mistral_pipe)
        llm_time = time.time() - start_time

    # Genera il testo HTML con entità evidenziate
    highlighted = highlight_entities_html(text, entities)

    # Anonimizza il testo
    anonymized = anonymize_text(text, entities, anonymization_type)

    # Calcola statistiche
    type_count = {}
    source_count = {"stanford": 0, "llm": 0, "regex": 0}

    for entity in entities:
        entity_type = entity["entity_type"]
        type_count[entity_type] = type_count.get(entity_type, 0) + 1

        source = entity.get("source", "stanford")
        source_count[source] = source_count.get(source, 0) + 1

    avg_confidence = sum(entity.get("score", 0.8) for entity in entities) / len(entities) if entities else 0

    # Crea tabella entità in formato HTML per una migliore visualizzazione
    entity_table = "<table style='width:100%; border-collapse: collapse;'>"
    entity_table += "<tr style='background-color: #f2f2f2;'><th>Tipo</th><th>Testo</th><th>Fonte</th><th>Confidenza</th></tr>"

    # Ordina le entità per posizione nel testo
    sorted_entities = sorted(entities, key=lambda x: x["start"])

    for entity in sorted_entities:
        entity_type = entity["entity_type"]
        source = entity.get("source", "stanford")
        confidence = entity.get("score", 0.8) * 100

        # Colore di sfondo in base alla fonte
        bg_color = "#ffffff"
        if source == "stanford":
            bg_color = "#e6f7ff"  # Azzurro chiaro
        elif source == "llm":
            bg_color = "#f6ffed"  # Verde chiaro
        elif source == "regex":
            bg_color = "#fff7e6"  # Giallo chiaro

        entity_table += f"<tr style='background-color: {bg_color};'>"
        entity_table += f"<td>{entity_type}</td><td>{entity['text']}</td><td>{source}</td><td>{confidence:.1f}%</td>"
        entity_table += "</tr>"

    entity_table += "</table>"

    # Crea tabella separata per le entità aggiunte da LLM
    llm_contribution_table = ""
    if use_llm and added_entities:
        llm_contribution_table = "<h3>Entità Aggiunte da LLM/Pattern Matching</h3>"
        llm_contribution_table += "<table style='width:100%; border-collapse: collapse;'>"
        llm_contribution_table += "<tr style='background-color: #f2f2f2;'><th>Tipo</th><th>Testo</th><th>Fonte</th><th>Motivo</th></tr>"

        # Ordina le entità aggiunte per tipo e poi per testo
        sorted_added = sorted(added_entities, key=lambda x: (x["entity_type"], x["text"]))

        for entity in sorted_added:
            entity_type = entity["entity_type"]
            source = entity.get("source", "unknown")
            reason = entity.get("reason", "")

            # Colore di sfondo in base alla fonte
            bg_color = "#ffffff"
            if source == "llm":
                bg_color = "#f6ffed"  # Verde chiaro
            elif source == "regex":
                bg_color = "#fff7e6"  # Giallo chiaro

            llm_contribution_table += f"<tr style='background-color: {bg_color};'>"
            llm_contribution_table += f"<td><b>{entity_type}</b></td><td>{entity['text']}</td><td>{source}</td><td>{reason}</td>"
            llm_contribution_table += "</tr>"

        llm_contribution_table += "</table>"
    elif use_llm:
        llm_contribution_table = "<p><i>Il modello LLM non ha identificato entità aggiuntive.</i></p>"

    # Statistiche di performance
    stats = {
        "totalEntities": len(entities),
        "originalEntities": len(entities) - len(added_entities),
        "addedEntities": len(added_entities),
        "entityTypes": [{"type": t, "count": c, "name": entity_names.get(t, t)} for t, c in type_count.items()],
        "sourceCounts": source_count,
        "averageConfidence": avg_confidence,
        "processingTimes": {
            "stanford": f"{stanford_time:.2f}s",
            "llm": f"{llm_time:.2f}s",
            "total": f"{stanford_time + llm_time:.2f}s"
        }
    }

    # Combina le visualizzazioni
    combined_html = highlighted + "<br><br><h3>Dettaglio Entità Rilevate</h3>" + entity_table + "<br><br>" + llm_contribution_table

    # Restituisci i risultati
    return combined_html, anonymized, json.dumps(stats, indent=2)

# Esempi di testo
default_general_text = "Ciao, mi chiamo Mario Rossi e la mia email è mario.rossi@example.com. Puoi contattarmi al numero +39 123 456 7890."

default_medical_text = "Il paziente Mario Bianchi, nato il 15/04/1978, di 47 anni, è stato ricoverato presso l'Ospedale San Raffaele il 23/03/2025 con numero di cartella MED-12345678. Il Dr. Carlo Verdi ha diagnosticato un'ipertensione di grado moderato. Contattare il paziente al numero 333-1234567 o all'email mario.bianchi@email.it per il follow-up."

default_financial_text = "Gentile Cliente Antonio Neri (CF: NRENNTN80A01H501Z), la informiamo che l'accredito di €1.250,00 è stato eseguito sul suo conto IBAN: IT60X0542811101000000123456. Per ulteriori informazioni contatti il numero verde 800-123456 o passi presso la nostra filiale in Via Roma 123, Milano."

# Inizializza i modelli
stanford_model = StanfordDeidentifier()
mistral_pipe = initialize_mistral()
anonymizer = AnonymizerEngine()

# Interfaccia Gradio
demo = gr.Interface(
    fn=process_text,
    inputs=[
        gr.Textbox(label="Testo da analizzare", value=default_financial_text, lines=6),
        gr.Radio(["replace", "redact", "pseudonymize"], label="Tipo di anonimizzazione", value="replace"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.5, step=0.05, label="Soglia di confidenza"),
        gr.Checkbox(label="Utilizza LLM per migliorare il rilevamento", value=True, info="Usa Mistral o altro LLM disponibile per trovare entità aggiuntive")
    ],
    outputs=[
        gr.HTML(label="Entità rilevate con dettagli"),
        gr.Textbox(label="Testo anonimizzato", lines=6),
        gr.JSON(label="Statistiche")
    ],
    title="Sistema Ibrido di De-identificazione (Stanford + LLM)",
    description="""Analizza e anonimizza testi utilizzando un approccio ibrido: prima con Stanford Deidentifier, poi raffinando i risultati con un LLM.
    <br><b>Nota sui colori:</b> <span style="background-color:#e6f7ff; padding:2px">Azzurro</span> = Stanford, <span style="background-color:#f6ffed; padding:2px">Verde</span> = LLM, <span style="background-color:#fff7e6; padding:2px">Giallo</span> = Pattern matching""",
    examples=[
        [default_general_text, "replace", 0.5, True],
        [default_medical_text, "pseudonymize", 0.4, True],
        [default_financial_text, "redact", 0.5, True],
        ["Mario Bianchi, residente in Via Garibaldi 35, 20121 Milano, ha acquistato una nuova Fiat Panda con targa AB123CD.", "pseudonymize", 0.5, True]
    ]
)

# Avvia l'interfaccia
if __name__ == "__main__":
    demo.launch(debug=True, share=True)

Caricamento modello Stanford...
Modello caricato con successo!
Etichette del modello: ['PHONE', 'ID', 'DATE', 'VENDOR', 'PATIENT', 'HOSPITAL', 'HCW']
Inizializzazione del modello Mistral...
GPU disponibile con 10.89 GB di memoria libera
Utilizzo mistralai/Mistral-7B-v0.1 con 4-bit quantization
Caricamento modello mistralai/Mistral-7B-v0.1...


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Device set to use cuda:0


Caricamento tokenizer...
Modello mistralai/Mistral-7B-v0.1 inizializzato con successo
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://296af15c9749cee5e0.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




Modello LLM ha impiegato 15.89 secondi per l'analisi
Risposta del modello (primi 100 caratteri): ...
Nessun JSON trovato nella risposta, tentativo estrazione manuale
Modello LLM ha impiegato 15.93 secondi per l'analisi
Risposta del modello (primi 100 caratteri): ...
Nessun JSON trovato nella risposta, tentativo estrazione manuale
Modello LLM ha impiegato 16.12 secondi per l'analisi
Risposta del modello (primi 100 caratteri): ...
Nessun JSON trovato nella risposta, tentativo estrazione manuale
Modello LLM ha impiegato 15.79 secondi per l'analisi
Risposta del modello (primi 100 caratteri): ...
Nessun JSON trovato nella risposta, tentativo estrazione manuale
Keyboard interruption in main thread... closing server.


KeyboardInterrupt: 