In [None]:
# ⚠️ Esegui questo in una cella separata PRIMA di importare torch
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# ⬇️ Installazione pacchetti (solo la prima volta)
!pip install -q transformers accelerate bitsandbytes nltk tqdm
!pip install -q nltk
import nltk

# Download the 'punkt_tab' data package
nltk.download('punkt_tab')

# ✅ Import standard
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
import re
import nltk
import time
from tqdm.notebook import tqdm
import concurrent.futures
import logging
from typing import List, Dict, Any, Tuple, Optional, Union
import os.path

# Configura logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("entity_extraction.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("entity_extraction")

# ✅ Scarichiamo le risorse necessarie per NLTK (per la tokenizzazione)
try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

# ✅ Libera la memoria GPU (in caso di errori precedenti)
torch.cuda.empty_cache()

# Classe configurabile per estrazione entità
class EntityExtractor:
    """
    Classe per l'estrazione di entità da testi utilizzando modelli di language generation.
    Supporta la personalizzazione di entità, l'elaborazione parallela e il caching dei risultati.
    """

    def __init__(
        self,
        model_id: str = "expertai/LLaMAntino-3-SLIMER-IT",
        entity_types: List[str] = None,
        entity_descriptions: Dict[str, str] = None,
        chunk_size: int = 800,
        max_new_tokens: int = 1024,
        cache_dir: str = "cache",
        use_cache: bool = True,
        parallelize: bool = False,
        max_workers: int = 4,
        device: str = "auto"
    ):
        """
        Inizializza l'estrattore di entità.

        Args:
            model_id: ID del modello Hugging Face da usare
            entity_types: Tipi di entità da estrarre in formato IOB
            entity_descriptions: Descrizioni personalizzate delle entità
            chunk_size: Dimensione massima dei chunk di testo
            max_new_tokens: Numero massimo di token generati in risposta
            cache_dir: Directory per il caching dei risultati
            use_cache: Se utilizzare il caching per risparmiare tempo
            parallelize: Se elaborare i chunk in parallelo
            max_workers: Numero massimo di worker per elaborazione parallela
            device: Dispositivo su cui eseguire il modello ("cpu", "cuda", "auto")
        """
        self.model_id = model_id
        self.chunk_size = chunk_size
        self.max_new_tokens = max_new_tokens
        self.cache_dir = cache_dir
        self.use_cache = use_cache
        self.parallelize = parallelize
        self.max_workers = max_workers

        # Crea directory cache se non esiste
        if self.use_cache and not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)

        # Default entity types se non forniti
        if entity_types is None:
            self.entity_types = [
                "B-LOC", "I-LOC",           # Luoghi
                "B-ORG", "I-ORG",           # Organizzazioni
                "B-PER", "I-PER",           # Persone
                "B-LAW", "I-LAW",           # Leggi e norme
                "B-ACT", "I-ACT",           # Atti amministrativi
                "B-OPA", "I-OPA",           # Oggetti procedura d'appalto
                "B-DATE", "I-DATE",         # Date
                "B-AMOUNT", "I-AMOUNT",     # Importi monetari
                "B-ROLE", "I-ROLE",         # Ruoli professionali
                "O"                         # Other
            ]
        else:
            self.entity_types = entity_types

        # Default entity descriptions se non fornite
        if entity_descriptions is None:
            self.entity_descriptions = {
                "LOC": "luoghi o posizioni geografiche rilevanti nei documenti",
                "PER": "persone fisiche menzionate nei documenti",
                "LAW": "riferimenti a leggi, decreti, normative o regolamenti",
                "ACT": "atti amministrativi, delibere, determine o provvedimenti",
                "ORG": "organizzazioni, enti, aziende o istituzioni",
                "OPA": "oggetti della procedura d'appalto, servizi, forniture o lavori",
                "DATE": "date e periodi temporali",
                "AMOUNT": "importi, cifre monetarie, percentuali",
                "ROLE": "ruoli professionali e titoli"
            }
        else:
            self.entity_descriptions = entity_descriptions

        logger.info(f"Caricamento modello {model_id}...")
        start_time = time.time()

        # Inizializza tokenizer e modello
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map=device,
            load_in_4bit=True,
            torch_dtype=torch.float16
        )

        load_time = time.time() - start_time
        logger.info(f"Modello caricato in {load_time:.2f} secondi")

    def create_prompt(self, text: str) -> str:
        """
        Crea un prompt per l'estrazione di entità.

        Args:
            text: Testo da cui estrarre le entità

        Returns:
            Prompt formattato per il modello
        """
        prompt = "Estrai tutte le entità dal seguente testo. Utilizza il formato IOB (Inside-Outside-Beginning):\n\n"
        prompt += "DEFINIZIONI:\n"

        # Aggiunge descrizioni delle entità al prompt
        for entity_type in self.entity_types:
            if entity_type.startswith('B-') or entity_type.startswith('I-'):
                entity = entity_type[2:]
                if entity in self.entity_descriptions:
                    prompt += f"- {entity}: {self.entity_descriptions[entity]}\n"

        prompt += "\nREGOLE DI ANNOTAZIONE:\n"
        prompt += "- B-XXX indica l'inizio di un'entità di tipo XXX\n"
        prompt += "- I-XXX indica la continuazione di un'entità di tipo XXX\n"
        prompt += "- O indica token che non appartengono a nessuna entità\n\n"
        prompt += f"TESTO:\n{text}\n\n"
        prompt += "Restituisci una lista di token con le relative etichette nel formato:\n"
        prompt += "[{\"token\": \"parola1\", \"tag\": \"etichetta1\"}, {\"token\": \"parola2\", \"tag\": \"etichetta2\"}, ...]\n"

        return prompt

    def parse_output(self, text: str) -> List[Dict[str, str]]:
        """
        Analizza l'output del modello e estrae le entità.

        Args:
            text: Testo di output dal modello

        Returns:
            Lista di dizionari {token, tag} per le entità estratte
        """
        # Prova a recuperare array di coppie ["token", "tag"]
        array_pattern = re.compile(r'\["([^"]+)",\s*"([^"]+)"\]')
        matches = array_pattern.findall(text)

        if matches:
            return [{"token": token, "tag": tag} for token, tag in matches]

        # Terzo tentativo: cercare solo coppie di parole tra virgolette
        third_pattern = re.compile(r'"([^"]+)",\s*"([^"]+)"')
        matches = third_pattern.findall(text)

        if matches:
            return [{"token": token, "tag": tag} for token, tag in matches]

        try:
            # Controlla se il testo è già un JSON valido
            parsed = json.loads(text)
            # Verifica che sia una lista di oggetti con token e tag
            if isinstance(parsed, list) and all(isinstance(item, dict) and "token" in item and "tag" in item for item in parsed):
                return parsed
        except:
            pass

        logger.warning(f"Formato di output non riconosciuto. Estratto grezzo: {text[:100]}...")
        return []

    def get_cache_key(self, text: str) -> str:
        """
        Genera una chiave di cache univoca per un testo.

        Args:
            text: Testo da cui estrarre le entità

        Returns:
            Chiave di cache univoca
        """
        # Usa i primi 100 caratteri + lunghezza del testo come chiave
        key = f"{text[:100]}_{len(text)}"
        # Rimuovi caratteri non validi per i nomi dei file
        key = re.sub(r'[^\w\-_\. ]', '_', key)
        # Limita la lunghezza della chiave
        if len(key) > 200:
            key = key[:200]
        return key

    def process_chunk(self, chunk: str) -> List[Dict[str, str]]:
        """
        Elabora un singolo chunk di testo per estrarre entità.

        Args:
            chunk: Chunk di testo da elaborare

        Returns:
            Lista di entità estratte
        """
        if not chunk.strip():
            return []

        cache_key = None
        cache_path = None

        # Verifica cache
        if self.use_cache:
            cache_key = self.get_cache_key(chunk)
            cache_path = os.path.join(self.cache_dir, f"{cache_key}.json")

            if os.path.exists(cache_path):
                try:
                    with open(cache_path, 'r', encoding='utf-8') as f:
                        return json.load(f)
                except Exception as e:
                    logger.warning(f"Errore lettura cache: {e}")

        # Prepara prompt
        prompt = self.create_prompt(chunk)
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True)

        # Sposta input al dispositivo corretto
        device = next(self.model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Genera risposta
        with torch.no_grad():
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=self.max_new_tokens,
                do_sample=False
            )

        # Decodifica l'output
        output_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        generated_response = output_text[len(prompt):].strip()

        # Analizza l'output
        parsed_tokens = self.parse_output(generated_response)

        # Salva in cache
        if self.use_cache and cache_path and len(parsed_tokens) > 0:
            try:
                with open(cache_path, 'w', encoding='utf-8') as f:
                    json.dump(parsed_tokens, f, ensure_ascii=False, indent=2)
            except Exception as e:
                logger.warning(f"Errore scrittura cache: {e}")

        return parsed_tokens

    def split_into_chunks(self, text: str) -> List[str]:
        """
        Divide il testo in chunk di dimensione appropriata.
        Utilizza frasi come unità di divisione quando possibile.

        Args:
            text: Testo da dividere in chunk

        Returns:
            Lista di chunk di testo
        """
        # Usa NLTK per dividere in frasi
        sentences = nltk.sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # Se la frase è più lunga del limite, dividila in parole
            if sentence_length > self.chunk_size:
                if current_chunk:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = []
                    current_length = 0

                words = sentence.split()
                temp_chunk = []
                temp_length = 0

                for word in words:
                    word_length = len(word) + 1  # +1 per lo spazio

                    if temp_length + word_length > self.chunk_size:
                        chunks.append(" ".join(temp_chunk))
                        temp_chunk = [word]
                        temp_length = word_length
                    else:
                        temp_chunk.append(word)
                        temp_length += word_length

                if temp_chunk:
                    chunks.append(" ".join(temp_chunk))

            # Altrimenti, aggiungi la frase al chunk corrente se c'è spazio
            elif current_length + sentence_length + 1 <= self.chunk_size:  # +1 per lo spazio
                current_chunk.append(sentence)
                current_length += sentence_length + 1

            # Se non c'è spazio, salva il chunk corrente e inizia uno nuovo
            else:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_length = sentence_length

        # Aggiungi l'ultimo chunk se non è vuoto
        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks

    def extract_entities(self, text: str) -> List[Dict[str, str]]:
        """
        Estrae entità dal testo completo, dividendolo in chunk.

        Args:
            text: Testo da cui estrarre le entità

        Returns:
            Lista di entità estratte
        """
        logger.info(f"Inizio estrazione entità da testo di {len(text)} caratteri")
        start_time = time.time()

        # Dividi in chunk
        chunks = self.split_into_chunks(text)
        logger.info(f"Testo diviso in {len(chunks)} chunk")

        all_entities = []

        # Elaborazione sequenziale o parallela
        if self.parallelize and len(chunks) > 1:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Invia chunk per elaborazione parallela con progress bar
                futures = [executor.submit(self.process_chunk, chunk) for chunk in chunks]

                for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Elaborazione chunk"):
                    all_entities.extend(future.result())
        else:
            # Elaborazione sequenziale con progress bar
            for i, chunk in enumerate(tqdm(chunks, desc="Elaborazione chunk")):
                chunk_entities = self.process_chunk(chunk)
                all_entities.extend(chunk_entities)
                logger.debug(f"Chunk {i+1}/{len(chunks)}: estratte {len(chunk_entities)} entità")

        elapsed_time = time.time() - start_time
        logger.info(f"Estrazione completata in {elapsed_time:.2f} secondi. Trovate {len(all_entities)} entità")

        return all_entities

    def post_process(self, entities: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Applica post-processing alle entità estratte.

        Args:
            entities: Lista di entità estratte

        Returns:
            Lista di entità dopo il post-processing
        """
        # Copia lista per non modificare l'originale
        processed = entities.copy()

        # 1. Correggi tag inconsistenti: se un token ha tag I-X ma non è preceduto da B-X o I-X
        for i in range(1, len(processed)):
            current = processed[i]
            previous = processed[i-1]

            if current["tag"].startswith("I-"):
                entity_type = current["tag"][2:]
                prev_tag = previous["tag"]

                # Se il tag precedente non è lo stesso tipo di entità, correggi
                if not (prev_tag == f"B-{entity_type}" or prev_tag == f"I-{entity_type}"):
                    processed[i]["tag"] = f"B-{entity_type}"

        # 2. Normalizza tag: garantisce che tutti i tag abbiano prefisso B- o I- (tranne O)
        for i in range(len(processed)):
            tag = processed[i]["tag"]
            if tag != "O" and not (tag.startswith("B-") or tag.startswith("I-")):
                # Se è un tag senza prefisso (es. "LOC"), aggiungi "B-"
                processed[i]["tag"] = f"B-{tag}"

        # 3. Merge di entità dello stesso tipo separate da spazi o punteggiatura
        # (Da implementare se necessario)

        return processed

    def extract_and_save(
        self,
        text: str,
        output_formats: List[str] = ["iob", "json", "tsv"]
    ) -> Dict[str, Any]:
        """
        Estrae entità e le salva in vari formati.

        Args:
            text: Testo da cui estrarre le entità
            output_formats: Formati di output da generare

        Returns:
            Dizionario con statistiche sulla estrazione
        """
        # Estrai entità
        entities = self.extract_entities(text)

        # Applica post-processing
        processed_entities = self.post_process(entities)

        stats = {
            "total_entities": len(processed_entities),
            "entity_types": {}
        }

        # Raccoglie statistiche sui tipi di entità
        for entity in processed_entities:
            tag = entity["tag"]
            # Normalizza tag per statistiche
            if tag.startswith("B-") or tag.startswith("I-"):
                entity_type = tag[2:]
            else:
                entity_type = tag

            stats["entity_types"][entity_type] = stats["entity_types"].get(entity_type, 0) + 1

        # Salva nei formati richiesti
        if "iob" in output_formats:
            iob_output = self.convert_to_iob(processed_entities)
            with open("annotazioni_iob.txt", "w", encoding="utf-8") as f:
                f.write(iob_output)

        if "json" in output_formats:
            with open("annotazioni_iob.json", "w", encoding="utf-8") as f:
                json.dump(processed_entities, f, ensure_ascii=False, indent=2)

        if "tsv" in output_formats:
            with open("annotazioni_iob.tsv", "w", encoding="utf-8") as f:
                f.write("Token\tTag\n")
                for entity in processed_entities:
                    f.write(f"{entity['token']}\t{entity['tag']}\n")

        logger.info(f"Risultati salvati nei formati: {', '.join(output_formats)}")
        return stats

    def convert_to_iob(self, entities: List[Dict[str, str]]) -> str:
        """
        Converte entità nel formato IOB standard.

        Args:
            entities: Lista di entità estratte

        Returns:
            Testo formattato in formato IOB
        """
        iob_lines = []
        for entity in entities:
            token = entity["token"]
            tag = entity["tag"]

            # Assicurati che il tag sia in formato IOB
            if tag != "O" and not (tag.startswith("B-") or tag.startswith("I-")):
                tag = f"B-{tag}"

            iob_lines.append(f"{token}\t{tag}")

        return "\n".join(iob_lines)

# Funzione semplificata per l'uso
def extract_entities_from_file(
    file_path: str,
    output_formats: List[str] = ["iob", "json", "tsv"],
    model_id: str = "expertai/LLaMAntino-3-SLIMER-IT",
    entity_types: List[str] = None,
    entity_descriptions: Dict[str, str] = None,
    chunk_size: int = 800,
    use_cache: bool = True,
    parallelize: bool = False
) -> Dict[str, Any]:
    """
    Estrae entità da un file di testo e le salva nei formati specificati.

    Args:
        file_path: Percorso del file da cui estrarre le entità
        output_formats: Formati di output ("iob", "json", "tsv")
        model_id: ID del modello da utilizzare
        entity_types: Tipi di entità da estrarre
        entity_descriptions: Descrizioni delle entità
        chunk_size: Dimensione massima dei chunk di testo
        use_cache: Se utilizzare il caching
        parallelize: Se elaborare i chunk in parallelo

    Returns:
        Dizionario con statistiche sulla estrazione
    """
    # Leggi il file
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    # Inizializza l'estrattore
    extractor = EntityExtractor(
        model_id=model_id,
        entity_types=entity_types,
        entity_descriptions=entity_descriptions,
        chunk_size=chunk_size,
        use_cache=use_cache,
        parallelize=parallelize
    )

    # Estrai e salva
    stats = extractor.extract_and_save(text, output_formats)

    return stats

# Esempio di utilizzo
if __name__ == "__main__":
    # Caricamento testo da file
    file_path = "/content/test_ricostruito.txt"

    # Esempio di aggiunta di entità personalizzate
    custom_entities = [
        "B-LOC", "I-LOC",           # Luoghi
        "B-ORG", "I-ORG",           # Organizzazioni
        "B-PER", "I-PER",           # Persone
        "B-LAW", "I-LAW",           # Leggi e norme
        "B-ACT", "I-ACT",           # Atti amministrativi
        "B-OPA", "I-OPA",           # Oggetti procedura d'appalto
        "B-DATE", "I-DATE",         # Date
        "B-AMOUNT", "I-AMOUNT",     # Importi monetari
        "B-ROLE", "I-ROLE",         # Ruoli professionali
        "O"                         # Other
    ]

    custom_descriptions = {
        "LOC": "luoghi o posizioni geografiche rilevanti nei documenti amministrativi",
        "PER": "persone fisiche menzionate nei documenti, come sindaci, assessori o responsabili",
        "LAW": "riferimenti a leggi, decreti, normative o regolamenti citati nelle delibere",
        "ACT": "atti amministrativi, delibere, determine o provvedimenti",
        "ORG": "organizzazioni, enti, aziende, istituzioni o uffici pubblici",
        "OPA": "oggetti della procedura d'appalto, servizi, forniture o lavori",
        "DATE": "date di delibere, decreti, scadenze o periodi temporali",
        "AMOUNT": "importi, cifre monetarie, percentuali o valori numerici rilevanti",
        "ROLE": "ruoli professionali, cariche pubbliche e titoli di persone"
    }

    # Esegui estrazione usando l'interfaccia semplificata
    print("Inizio estrazione entità...")
    stats = extract_entities_from_file(
        file_path=file_path,
        output_formats=["iob", "json", "tsv"],
        entity_types=custom_entities,
        entity_descriptions=custom_descriptions,
        chunk_size=800,
        use_cache=True,
        parallelize=True
    )

    # Mostra statistiche
    print("\nStatistiche estrazione:")
    print(f"Totale entità trovate: {stats['total_entities']}")
    print("\nDistribuzione per tipo:")
    for entity_type, count in stats["entity_types"].items():
        print(f"- {entity_type}: {count}")

OTTIMIZZATO

In [1]:
# ⚠️ Esegui questo in una cella separata PRIMA di importare torch
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# ⬇️ Installazione pacchetti (solo la prima volta)
!pip install -q transformers accelerate bitsandbytes nltk tqdm

# Scarica esplicitamente le risorse necessarie per NLTK
import nltk
try:
    # Prova a scaricare il pacchetto punkt (non punkt_tab)
    nltk.download('punkt')
except:
    print("Errore nel download delle risorse NLTK. Utilizzeremo una funzione di divisione in frasi alternativa.")

# ✅ Import standard
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
import re
import time
from tqdm.notebook import tqdm
import concurrent.futures
import logging
from typing import List, Dict, Any, Tuple, Optional, Union
import os.path

# Configura logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("entity_extraction.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("entity_extraction")

# ✅ Libera la memoria GPU (in caso di errori precedenti)
torch.cuda.empty_cache()

# Funzione alternativa per dividere in frasi (nel caso NLTK non funzioni)
def simple_sentence_tokenize(text):
    """
    Divide il testo in frasi usando euristiche semplici.
    Questa funzione viene usata come fallback se NLTK non è disponibile.
    """
    # Dividi il testo ai punti seguiti da spazio o a capo
    sentences = re.split(r'(?<=[.!?])\s+', text)
    # Filtra le frasi vuote
    return [s for s in sentences if s.strip()]

# Classe configurabile per estrazione entità
class EntityExtractor:
    """
    Classe per l'estrazione di entità da testi utilizzando modelli di language generation.
    Supporta la personalizzazione di entità, l'elaborazione parallela e il caching dei risultati.
    """

    def __init__(
        self,
        model_id: str = "expertai/LLaMAntino-3-SLIMER-IT",
        entity_types: List[str] = None,
        entity_descriptions: Dict[str, str] = None,
        chunk_size: int = 800,
        max_new_tokens: int = 1024,
        cache_dir: str = "cache",
        use_cache: bool = True,
        parallelize: bool = False,
        max_workers: int = 4,
        device: str = "auto"
    ):
        """
        Inizializza l'estrattore di entità.

        Args:
            model_id: ID del modello Hugging Face da usare
            entity_types: Tipi di entità da estrarre in formato IOB
            entity_descriptions: Descrizioni personalizzate delle entità
            chunk_size: Dimensione massima dei chunk di testo
            max_new_tokens: Numero massimo di token generati in risposta
            cache_dir: Directory per il caching dei risultati
            use_cache: Se utilizzare il caching per risparmiare tempo
            parallelize: Se elaborare i chunk in parallelo
            max_workers: Numero massimo di worker per elaborazione parallela
            device: Dispositivo su cui eseguire il modello ("cpu", "cuda", "auto")
        """
        self.model_id = model_id
        self.chunk_size = chunk_size
        self.max_new_tokens = max_new_tokens
        self.cache_dir = cache_dir
        self.use_cache = use_cache
        self.parallelize = parallelize
        self.max_workers = max_workers

        # Verifica se NLTK è disponibile
        try:
            nltk.sent_tokenize("Test sentence. Another test.")
            self.use_nltk = True
        except:
            logger.warning("NLTK non disponibile. Utilizzo divisione in frasi alternativa.")
            self.use_nltk = False

        # Crea directory cache se non esiste
        if self.use_cache and not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)

        # Default entity types se non forniti
        if entity_types is None:
            self.entity_types = [
                "B-LOC", "I-LOC",           # Luoghi
                "B-ORG", "I-ORG",           # Organizzazioni
                "B-PER", "I-PER",           # Persone
                "B-LAW", "I-LAW",           # Leggi e norme
                "B-ACT", "I-ACT",           # Atti amministrativi
                "B-OPA", "I-OPA",           # Oggetti procedura d'appalto
                "B-DATE", "I-DATE",         # Date
                "B-AMOUNT", "I-AMOUNT",     # Importi monetari
                "B-ROLE", "I-ROLE",         # Ruoli professionali
                "O"                         # Other
            ]
        else:
            self.entity_types = entity_types

        # Default entity descriptions se non fornite
        if entity_descriptions is None:
            self.entity_descriptions = {
                "LOC": "luoghi o posizioni geografiche rilevanti nei documenti",
                "PER": "persone fisiche menzionate nei documenti",
                "LAW": "riferimenti a leggi, decreti, normative o regolamenti",
                "ACT": "atti amministrativi, delibere, determine o provvedimenti",
                "ORG": "organizzazioni, enti, aziende o istituzioni",
                "OPA": "oggetti della procedura d'appalto, servizi, forniture o lavori",
                "DATE": "date e periodi temporali",
                "AMOUNT": "importi, cifre monetarie, percentuali",
                "ROLE": "ruoli professionali e titoli"
            }
        else:
            self.entity_descriptions = entity_descriptions

        logger.info(f"Caricamento modello {model_id}...")
        start_time = time.time()

        # Inizializza tokenizer e modello
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map=device,
            load_in_4bit=True,
            torch_dtype=torch.float16
        )

        load_time = time.time() - start_time
        logger.info(f"Modello caricato in {load_time:.2f} secondi")

    def create_prompt(self, text: str) -> str:
        """
        Crea un prompt per l'estrazione di entità.

        Args:
            text: Testo da cui estrarre le entità

        Returns:
            Prompt formattato per il modello
        """
        prompt = "Estrai tutte le entità dal seguente testo. Utilizza il formato IOB (Inside-Outside-Beginning):\n\n"
        prompt += "DEFINIZIONI:\n"

        # Aggiunge descrizioni delle entità al prompt
        for entity_type in self.entity_types:
            if entity_type.startswith('B-') or entity_type.startswith('I-'):
                entity = entity_type[2:]
                if entity in self.entity_descriptions:
                    prompt += f"- {entity}: {self.entity_descriptions[entity]}\n"

        prompt += "\nREGOLE DI ANNOTAZIONE:\n"
        prompt += "- B-XXX indica l'inizio di un'entità di tipo XXX\n"
        prompt += "- I-XXX indica la continuazione di un'entità di tipo XXX\n"
        prompt += "- O indica token che non appartengono a nessuna entità\n\n"
        prompt += f"TESTO:\n{text}\n\n"
        prompt += "Restituisci una lista di token con le relative etichette nel formato:\n"
        prompt += "[{\"token\": \"parola1\", \"tag\": \"etichetta1\"}, {\"token\": \"parola2\", \"tag\": \"etichetta2\"}, ...]\n"

        return prompt

    def parse_output(self, text: str) -> List[Dict[str, str]]:
        """
        Analizza l'output del modello e estrae le entità.

        Args:
            text: Testo di output dal modello

        Returns:
            Lista di dizionari {token, tag} per le entità estratte
        """
        # Prova a recuperare array di coppie ["token", "tag"]
        array_pattern = re.compile(r'\["([^"]+)",\s*"([^"]+)"\]')
        matches = array_pattern.findall(text)

        if matches:
            return [{"token": token, "tag": tag} for token, tag in matches]

        # Terzo tentativo: cercare solo coppie di parole tra virgolette
        third_pattern = re.compile(r'"([^"]+)",\s*"([^"]+)"')
        matches = third_pattern.findall(text)

        if matches:
            return [{"token": token, "tag": tag} for token, tag in matches]

        try:
            # Controlla se il testo è già un JSON valido
            parsed = json.loads(text)
            # Verifica che sia una lista di oggetti con token e tag
            if isinstance(parsed, list) and all(isinstance(item, dict) and "token" in item and "tag" in item for item in parsed):
                return parsed
        except:
            pass

        logger.warning(f"Formato di output non riconosciuto. Estratto grezzo: {text[:100]}...")
        return []

    def get_cache_key(self, text: str) -> str:
        """
        Genera una chiave di cache univoca per un testo.

        Args:
            text: Testo da cui estrarre le entità

        Returns:
            Chiave di cache univoca
        """
        # Usa i primi 100 caratteri + lunghezza del testo come chiave
        key = f"{text[:100]}_{len(text)}"
        # Rimuovi caratteri non validi per i nomi dei file
        key = re.sub(r'[^\w\-_\. ]', '_', key)
        # Limita la lunghezza della chiave
        if len(key) > 200:
            key = key[:200]
        return key

    def process_chunk(self, chunk: str) -> List[Dict[str, str]]:
        """
        Elabora un singolo chunk di testo per estrarre entità.

        Args:
            chunk: Chunk di testo da elaborare

        Returns:
            Lista di entità estratte
        """
        if not chunk.strip():
            return []

        cache_key = None
        cache_path = None

        # Verifica cache
        if self.use_cache:
            cache_key = self.get_cache_key(chunk)
            cache_path = os.path.join(self.cache_dir, f"{cache_key}.json")

            if os.path.exists(cache_path):
                try:
                    with open(cache_path, 'r', encoding='utf-8') as f:
                        return json.load(f)
                except Exception as e:
                    logger.warning(f"Errore lettura cache: {e}")

        # Prepara prompt
        prompt = self.create_prompt(chunk)
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True)

        # Sposta input al dispositivo corretto
        device = next(self.model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Genera risposta
        with torch.no_grad():
            output_ids = self.model.generate(
                **inputs,
                max_new_tokens=self.max_new_tokens,
                do_sample=False
            )

        # Decodifica l'output
        output_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        generated_response = output_text[len(prompt):].strip()

        # Analizza l'output
        parsed_tokens = self.parse_output(generated_response)

        # Salva in cache
        if self.use_cache and cache_path and len(parsed_tokens) > 0:
            try:
                with open(cache_path, 'w', encoding='utf-8') as f:
                    json.dump(parsed_tokens, f, ensure_ascii=False, indent=2)
            except Exception as e:
                logger.warning(f"Errore scrittura cache: {e}")

        return parsed_tokens

    def split_into_chunks(self, text: str) -> List[str]:
        """
        Divide il testo in chunk di dimensione appropriata.
        Utilizza frasi come unità di divisione quando possibile.

        Args:
            text: Testo da dividere in chunk

        Returns:
            Lista di chunk di testo
        """
        # Usa NLTK o la funzione di fallback per dividere in frasi
        if self.use_nltk:
            try:
                sentences = nltk.sent_tokenize(text)
            except:
                logger.warning("Errore con NLTK sent_tokenize. Uso divisore alternativo.")
                sentences = simple_sentence_tokenize(text)
        else:
            sentences = simple_sentence_tokenize(text)

        chunks = []
        current_chunk = []
        current_length = 0

        for sentence in sentences:
            sentence_length = len(sentence)

            # Se la frase è più lunga del limite, dividila in parole
            if sentence_length > self.chunk_size:
                if current_chunk:
                    chunks.append(" ".join(current_chunk))
                    current_chunk = []
                    current_length = 0

                words = sentence.split()
                temp_chunk = []
                temp_length = 0

                for word in words:
                    word_length = len(word) + 1  # +1 per lo spazio

                    if temp_length + word_length > self.chunk_size:
                        chunks.append(" ".join(temp_chunk))
                        temp_chunk = [word]
                        temp_length = word_length
                    else:
                        temp_chunk.append(word)
                        temp_length += word_length

                if temp_chunk:
                    chunks.append(" ".join(temp_chunk))

            # Altrimenti, aggiungi la frase al chunk corrente se c'è spazio
            elif current_length + sentence_length + 1 <= self.chunk_size:  # +1 per lo spazio
                current_chunk.append(sentence)
                current_length += sentence_length + 1

            # Se non c'è spazio, salva il chunk corrente e inizia uno nuovo
            else:
                chunks.append(" ".join(current_chunk))
                current_chunk = [sentence]
                current_length = sentence_length

        # Aggiungi l'ultimo chunk se non è vuoto
        if current_chunk:
            chunks.append(" ".join(current_chunk))

        return chunks

    def extract_entities(self, text: str) -> List[Dict[str, str]]:
        """
        Estrae entità dal testo completo, dividendolo in chunk.

        Args:
            text: Testo da cui estrarre le entità

        Returns:
            Lista di entità estratte
        """
        logger.info(f"Inizio estrazione entità da testo di {len(text)} caratteri")
        start_time = time.time()

        # Dividi in chunk
        chunks = self.split_into_chunks(text)
        logger.info(f"Testo diviso in {len(chunks)} chunk")

        all_entities = []

        # Elaborazione sequenziale o parallela
        if self.parallelize and len(chunks) > 1:
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Invia chunk per elaborazione parallela con progress bar
                futures = [executor.submit(self.process_chunk, chunk) for chunk in chunks]

                for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Elaborazione chunk"):
                    all_entities.extend(future.result())
        else:
            # Elaborazione sequenziale con progress bar
            for i, chunk in enumerate(tqdm(chunks, desc="Elaborazione chunk")):
                chunk_entities = self.process_chunk(chunk)
                all_entities.extend(chunk_entities)
                logger.debug(f"Chunk {i+1}/{len(chunks)}: estratte {len(chunk_entities)} entità")

        elapsed_time = time.time() - start_time
        logger.info(f"Estrazione completata in {elapsed_time:.2f} secondi. Trovate {len(all_entities)} entità")

        return all_entities

    def post_process(self, entities: List[Dict[str, str]]) -> List[Dict[str, str]]:
        """
        Applica post-processing alle entità estratte.

        Args:
            entities: Lista di entità estratte

        Returns:
            Lista di entità dopo il post-processing
        """
        # Copia lista per non modificare l'originale
        processed = entities.copy()

        # 1. Correggi tag inconsistenti: se un token ha tag I-X ma non è preceduto da B-X o I-X
        for i in range(1, len(processed)):
            current = processed[i]
            previous = processed[i-1]

            if current["tag"].startswith("I-"):
                entity_type = current["tag"][2:]
                prev_tag = previous["tag"]

                # Se il tag precedente non è lo stesso tipo di entità, correggi
                if not (prev_tag == f"B-{entity_type}" or prev_tag == f"I-{entity_type}"):
                    processed[i]["tag"] = f"B-{entity_type}"

        # 2. Normalizza tag: garantisce che tutti i tag abbiano prefisso B- o I- (tranne O)
        for i in range(len(processed)):
            tag = processed[i]["tag"]
            if tag != "O" and not (tag.startswith("B-") or tag.startswith("I-")):
                # Se è un tag senza prefisso (es. "LOC"), aggiungi "B-"
                processed[i]["tag"] = f"B-{tag}"

        # 3. Merge di entità dello stesso tipo separate da spazi o punteggiatura
        # (Da implementare se necessario)

        return processed

    def extract_and_save(
        self,
        text: str,
        output_formats: List[str] = ["iob", "json", "tsv"]
    ) -> Dict[str, Any]:
        """
        Estrae entità e le salva in vari formati.

        Args:
            text: Testo da cui estrarre le entità
            output_formats: Formati di output da generare

        Returns:
            Dizionario con statistiche sulla estrazione
        """
        # Estrai entità
        entities = self.extract_entities(text)

        # Applica post-processing
        processed_entities = self.post_process(entities)

        stats = {
            "total_entities": len(processed_entities),
            "entity_types": {}
        }

        # Raccoglie statistiche sui tipi di entità
        for entity in processed_entities:
            tag = entity["tag"]
            # Normalizza tag per statistiche
            if tag.startswith("B-") or tag.startswith("I-"):
                entity_type = tag[2:]
            else:
                entity_type = tag

            stats["entity_types"][entity_type] = stats["entity_types"].get(entity_type, 0) + 1

        # Salva nei formati richiesti
        if "iob" in output_formats:
            iob_output = self.convert_to_iob(processed_entities)
            with open("annotazioni_iob.txt", "w", encoding="utf-8") as f:
                f.write(iob_output)

        if "json" in output_formats:
            with open("annotazioni_iob.json", "w", encoding="utf-8") as f:
                json.dump(processed_entities, f, ensure_ascii=False, indent=2)

        if "tsv" in output_formats:
            with open("annotazioni_iob.tsv", "w", encoding="utf-8") as f:
                f.write("Token\tTag\n")
                for entity in processed_entities:
                    f.write(f"{entity['token']}\t{entity['tag']}\n")

        logger.info(f"Risultati salvati nei formati: {', '.join(output_formats)}")
        return stats

    def convert_to_iob(self, entities: List[Dict[str, str]]) -> str:
        """
        Converte entità nel formato IOB standard.

        Args:
            entities: Lista di entità estratte

        Returns:
            Testo formattato in formato IOB
        """
        iob_lines = []
        for entity in entities:
            token = entity["token"]
            tag = entity["tag"]

            # Assicurati che il tag sia in formato IOB
            if tag != "O" and not (tag.startswith("B-") or tag.startswith("I-")):
                tag = f"B-{tag}"

            iob_lines.append(f"{token}\t{tag}")

        return "\n".join(iob_lines)

# Funzione semplificata per l'uso
def extract_entities_from_file(
    file_path: str,
    output_formats: List[str] = ["iob", "json", "tsv"],
    model_id: str = "expertai/LLaMAntino-3-SLIMER-IT",
    entity_types: List[str] = None,
    entity_descriptions: Dict[str, str] = None,
    chunk_size: int = 800,
    use_cache: bool = True,
    parallelize: bool = False
) -> Dict[str, Any]:
    """
    Estrae entità da un file di testo e le salva nei formati specificati.

    Args:
        file_path: Percorso del file da cui estrarre le entità
        output_formats: Formati di output ("iob", "json", "tsv")
        model_id: ID del modello da utilizzare
        entity_types: Tipi di entità da estrarre
        entity_descriptions: Descrizioni delle entità
        chunk_size: Dimensione massima dei chunk di testo
        use_cache: Se utilizzare il caching
        parallelize: Se elaborare i chunk in parallelo

    Returns:
        Dizionario con statistiche sulla estrazione
    """
    # Leggi il file
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    # Inizializza l'estrattore
    extractor = EntityExtractor(
        model_id=model_id,
        entity_types=entity_types,
        entity_descriptions=entity_descriptions,
        chunk_size=chunk_size,
        use_cache=use_cache,
        parallelize=parallelize
    )

    # Estrai e salva
    stats = extractor.extract_and_save(text, output_formats)

    return stats

# Esempio di utilizzo
if __name__ == "__main__":
    # Caricamento testo da file
    file_path = "/content/test_ricostruito.txt"

    # Esempio di aggiunta di entità personalizzate
    custom_entities = [
        "B-LOC", "I-LOC",           # Luoghi
        "B-ORG", "I-ORG",           # Organizzazioni
        "B-PER", "I-PER",           # Persone
        "B-LAW", "I-LAW",           # Leggi e norme
        "B-ACT", "I-ACT",           # Atti amministrativi
        "B-OPA", "I-OPA",           # Oggetti procedura d'appalto
        "B-DATE", "I-DATE",         # Date
        "B-AMOUNT", "I-AMOUNT",     # Importi monetari
        "B-ROLE", "I-ROLE",         # Ruoli professionali
        "O"                         # Other
    ]

    custom_descriptions = {
        "LOC": "luoghi o posizioni geografiche rilevanti nei documenti amministrativi",
        "PER": "persone fisiche menzionate nei documenti, come sindaci, assessori o responsabili",
        "LAW": "riferimenti a leggi, decreti, normative o regolamenti citati nelle delibere",
        "ACT": "atti amministrativi, delibere, determine o provvedimenti",
        "ORG": "organizzazioni, enti, aziende, istituzioni o uffici pubblici",
        "OPA": "oggetti della procedura d'appalto, servizi, forniture o lavori",
        "DATE": "date di delibere, decreti, scadenze o periodi temporali",
        "AMOUNT": "importi, cifre monetarie, percentuali o valori numerici rilevanti",
        "ROLE": "ruoli professionali, cariche pubbliche e titoli di persone"
    }

    # Esegui estrazione usando l'interfaccia semplificata
    print("Inizio estrazione entità...")
    stats = extract_entities_from_file(
        file_path=file_path,
        output_formats=["iob", "json", "tsv"],
        entity_types=custom_entities,
        entity_descriptions=custom_descriptions,
        chunk_size=800,
        use_cache=True,
        parallelize=True
    )

    # Mostra statistiche
    print("\nStatistiche estrazione:")
    print(f"Totale entità trovate: {stats['total_entities']}")
    print("\nDistribuzione per tipo:")
    for entity_type, count in stats["entity_types"].items():
        print(f"- {entity_type}: {count}")

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Inizio estrazione entità...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

model-00003-of-00004.safetensors:  30%|###       | 1.48G/4.92G [00:00<?, ?B/s]

model-00001-of-00004.safetensors:  30%|###       | 1.51G/4.98G [00:00<?, ?B/s]

model-00002-of-00004.safetensors:  30%|##9       | 1.49G/5.00G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/194 [00:00<?, ?B/s]

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Elaborazione chunk:   0%|          | 0/17 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128001 for


Statistiche estrazione:
Totale entità trovate: 685

Distribuzione per tipo:
- D.Lgs.: 86
- Presidente: 2
- ROLE: 28
- ORG: 202
- ACT: 8
- OPA: 79
- LOC: 25
- DATE: 13
- SINDACO: 1
- LAW: 105
- PER: 20
- data: 2
- AMOUNT: 31
- Via Bandone: 1
- TO: 1
- Regolamento di Contabilità: 1
- art.: 1
- s.r.l.: 40
- Cuorgné: 39
