DENSE RETRIEVAL - DA TESTO A EMBEDDINGS - CALCOLO FAISS

whisper_env

In [1]:
from sentence_transformers import SentenceTransformer
import numpy as np
import os
import spacy
import json
import faiss

In [None]:
model = SentenceTransformer('intfloat/multilingual-e5-large')

In [3]:
# Carica il modello linguistico italiano di spaCy
nlp = spacy.load("it_core_news_lg")

def chunk_text(text, max_tokens=300, min_tokens=20):
    """
    Suddivide un testo in chunk (blocchi) di dimensioni specificate, rispettando i confini delle frasi.
    
    Args:
        text (str): Il testo da suddividere
        max_tokens (int, optional): Numero massimo di token per chunk. Default: 300
        min_tokens (int, optional): Numero minimo di token per chunk. Default: 20
    
    Returns:
        list: Una lista di chunk di testo
    """
    doc = nlp(text)
    
    chunks = []
    current_chunk = []
    current_len = 0

    for sent in doc.sents:
        token_len = len(sent.text.split())
        
        if current_len + token_len <= max_tokens:
            current_chunk.append(sent.text)
            current_len += token_len
        else:
            # Solo aggiungi il chunk se ha almeno min_tokens
            chunk_text_str = " ".join(current_chunk)
            if len(chunk_text_str.split()) >= min_tokens:
                chunks.append(chunk_text_str)
            current_chunk = [sent.text]
            current_len = token_len

    # Aggiungi l'ultimo chunk se è sufficientemente lungo
    if current_chunk:
        chunk_text_str = " ".join(current_chunk)
        if len(chunk_text_str.split()) >= min_tokens:
            chunks.append(chunk_text_str)

    return chunks


In [4]:
# Importa SequenceMatcher per il confronto tra stringhe
from difflib import SequenceMatcher

def normalized(word):
    """
    Normalizza una parola rimuovendo punteggiatura e convertendo in minuscolo.
    
    Args:
        word (str): Parola da normalizzare
    
    Returns:
        str: Parola normalizzata
    """
    return word.strip(".,?!;:").lower()

def similarity(a, b):
    """
    Calcola la similarità tra due stringhe usando il rapporto di SequenceMatcher.
    
    Args:
        a (str): Prima stringa da confrontare
        b (str): Seconda stringa da confrontare
    
    Returns:
        float: Punteggio di similarità (0.0-1.0)
    """
    return SequenceMatcher(None, a, b).ratio()

def match_chunk_with_timestamps_fuzzy(chunk_text, word_list, prev, tolerance=0.50):
    """
    Trova il miglior match tra un chunk di testo e una lista di parole con timestamp,
    usando un approccio fuzzy (approssimato).
    
    Args:
        chunk_text (str): Testo da abbinare
        word_list (list): Lista di dizionari con parole e timestamp (formato: {"text": "...", "start": x, "end": y})
        prev (float): Timestamp precedente (per evitare sovrapposizioni)
        tolerance (float, optional): Soglia minima di similarità. Default: 0.50
    
    Returns:
        tuple: (start_time, end_time) oppure (None, None) se nessun match trovato
    """
    print("fuzzy")

    # Normalizza le parole del chunk
    chunk_words = [normalized(w) for w in chunk_text.split()]

    start_time = None
    
    # Gestione caso speciale: chunk vuoto
    if not chunk_words:
        return None, None

    # Normalizza le parole della trascrizione
    transcript_words = [normalized(w["text"]) for w in word_list]

    max_score = 0  # Memorizza il punteggio di similarità massimo trovato
    best_match = None  # Memorizza l'indice del miglior match

    # Scorri la trascrizione con una finestra mobile della stessa lunghezza del chunk
    for i in range(len(transcript_words) - len(chunk_words) + 1):
        # Estrai la finestra corrente
        window = transcript_words[i:i + len(chunk_words)]
        
        # Calcola il punteggio medio di similarità per questa finestra
        score = sum(similarity(w1, w2) for w1, w2 in zip(chunk_words, window)) / len(chunk_words)

        # Aggiorna il miglior match se trovato un punteggio più alto
        if score > max_score:
            max_score = score
            best_match = i

    # Se il punteggio massimo supera la tolleranza, restituisci i timestamp
    if max_score >= tolerance:
        start_time = word_list[best_match]["start"]
        end_time = word_list[best_match + len(chunk_words) - 1]["end"]
        
        return start_time, end_time 
    
    print("Chunk senza match:", chunk_words[:200])  # Prime 200 chars del chunk problematico
    print("Trascrizione corrispondente:", [w["text"] for w in word_list][:20])  # Prime 20 parole della trascrizione

    # Gestione dei casi particolari per start_time
    if start_time == None:
        start_time = prev
    elif start_time < prev:
        start_time = prev

    # Nessun match sufficientemente buono trovato
    return start_time, None

In [5]:
def match_chunk_exact(chunk_text, word_list, prev, lookahead=10):
    """
    Cerca il match esatto delle prime N parole del chunk nella trascrizione.
    
    Args:
        chunk_text (str): Testo del chunk da abbinare
        word_list (list): Lista di dizionari delle parole con timestamp
        prev (float): Ultimo timestamp valido conosciuto
        lookahead (int): Numero di parole da usare per il matching iniziale
    
    Returns:
        tuple: (start_time, end_time) o (None, None) se non trovato
    """
    # Prendi le prime 'lookahead' parole del chunk (normalizzate)
    chunk_words = [normalized(w) for w in chunk_text.split()[:lookahead] if w.strip()]
    start_time = None
    
    if not chunk_words:
        return None, None
    
    # Crea una lista di parole normalizzate dalla trascrizione
    transcript_words = [normalized(w["text"]) for w in word_list]
    
    # Cerca la posizione di inizio del match
    for i in range(len(transcript_words) - len(chunk_words) + 1):
        match = True
        for j in range(len(chunk_words)):
            if transcript_words[i+j] != chunk_words[j]:
                match = False
                break
        
        if match:
            start_idx = i
            end_idx = i + len(chunk_text.split()) - 1
            
            # Controllo sicurezza sugli indici
            if end_idx >= len(word_list):
                end_idx = len(word_list) - 1
                
            start_time = word_list[start_idx]["start"]
            end_time = word_list[end_idx]["end"]
            
            
                
            return start_time, end_time
    # Gestione casi particolari
    if start_time is None:
        start_time = prev
    elif start_time < prev:
        start_time = prev

    return start_time, None

In [6]:
def match_chunk_hybrid(chunk_text, word_list, prev):
    # Prima prova matching esatto sulle prime 3 parole
    exact_match = match_chunk_exact(chunk_text, word_list, prev)
    if exact_match != (None, None):
        return exact_match
    
    # Fallback al fuzzy solo se necessario
    return match_chunk_with_timestamps_fuzzy(chunk_text, word_list, prev, tolerance=0.7)

In [7]:
# Inizializzazione delle liste per memorizzare i chunk e i metadati associati
all_chunks = []      # Contiene il testo dei chunk
all_chunk_data = []  # Contiene dizionari con metadati e informazioni sui chunk
prev = 0             # Tiene traccia del timestamp finale dell'ultimo chunk processato

# Directory contenente i file di testo da elaborare
input_dir = "datasetOnlyTextUni\\onlyTextLessonsTurbo"

# Itera attraverso tutti i file nella directory di input
for filename in os.listdir(input_dir):
    print(filename)  # Stampa il nome del file corrente per tracciare l'avanzamento
    
    # Processa solo i file con estensione .txt
    if filename.endswith(".txt"):
        input_path = os.path.join(input_dir, filename)

        # Legge il contenuto del file di testo
        with open(input_path, "r", encoding="utf-8") as f:
            long_text = f.read()

        # Divide il testo lungo in chunk utilizzando la funzione chunk_text
        chunks = chunk_text(long_text)
        all_chunks.extend(chunks)  # Aggiunge i chunk alla lista globale
        
        # Carica il file JSON con i timestamp corrispondente al file di testo
        with open(f"datasetOnlyTextUni\\timestamp_json_turbo\\{filename.replace(".txt", ".json")}", 
                 "r", encoding="utf-8") as f:
            words = json.load(f)  # Carica i dati JSON contenenti parole e timestamp
        
        # Itera attraverso tutti i chunk creati dal file corrente
        for i, c in enumerate(chunks):
            print(f"{i} / {len(chunks)}")  # Stampa progresso elaborazione chunk
            
            # Trova i timestamp di inizio e fine per il chunk corrente
            start, end = match_chunk_hybrid(c, words, prev)
            
            # Aggiorna il timestamp precedente per il prossimo chunk
            if end != None:
                prev = end
            else:
                prev = 0  # Resetta se non è stato trovato un match
                
            # Estrae le entità nominate dal chunk usando spaCy
            entities = [{"text": ent.text, "label": ent.label_} for ent in nlp(c).ents]

            # Entity linking con relik
            #linked_entities = link_entities(c, chunk_id=i)

            # Aggiunge tutte le informazioni alla lista dei metadati
            all_chunk_data.append({
                "text": c,            # Testo del chunk
                "source": filename,    # Nome del file di origine
                "chunk_id": i,        # Indice del chunk nel file
                "start_time": start,   # Timestamp di inizio (se trovato)
                "end_time": end,      # Timestamp di fine (se trovato)
                "entities": entities  # Lista di entità nominate trovate
                #"kb_links": linked_entities  # Aggiungi questo campo
            })

# Genera gli embeddings per tutti i chunk usando il modello
embeddings = model.encode(all_chunks, normalize_embeddings=True)

economia_applicata_clean_01_Lez001.txt
0 / 12
1 / 12
2 / 12
3 / 12
4 / 12
5 / 12
6 / 12
7 / 12
8 / 12
9 / 12
10 / 12
11 / 12
economia_applicata_clean_02_Lez002.txt
0 / 17
1 / 17
2 / 17
3 / 17
4 / 17
5 / 17
6 / 17
7 / 17
8 / 17
9 / 17
10 / 17
11 / 17
12 / 17
13 / 17
14 / 17
15 / 17
16 / 17
economia_applicata_clean_03_Lez003.txt
0 / 9
1 / 9
2 / 9
3 / 9
4 / 9
5 / 9
6 / 9
7 / 9
8 / 9
economia_applicata_clean_04_Lez004.txt
0 / 10
1 / 10
2 / 10
3 / 10
4 / 10
5 / 10
6 / 10
7 / 10
8 / 10
9 / 10
economia_applicata_clean_05_Lez005.txt
0 / 13
1 / 13
2 / 13
3 / 13
4 / 13
5 / 13
6 / 13
7 / 13
8 / 13
9 / 13
10 / 13
11 / 13
12 / 13
economia_applicata_clean_06_Lez006.txt
0 / 11
1 / 11
2 / 11
3 / 11
4 / 11
5 / 11
6 / 11
7 / 11
8 / 11
9 / 11
10 / 11
economia_applicata_clean_07_Lez007.txt
0 / 16
1 / 16
2 / 16
3 / 16
4 / 16
5 / 16
6 / 16
7 / 16
8 / 16
9 / 16
10 / 16
11 / 16
12 / 16
13 / 16
14 / 16
15 / 16
economia_applicata_clean_08_Lez008.txt
0 / 16
1 / 16
2 / 16
3 / 16
4 / 16
5 / 16
6 / 16
7 / 16
8 / 16

In [8]:
#cercando di riempire timestamp vuoti

for i, c in enumerate(all_chunk_data):
    if c["start_time"] is None and c["end_time"] is None:
        # Usa il fine del chunk precedente come inizio, se disponibile
        if i > 0 and all_chunk_data[i - 1]["end_time"] is not None:
            c["start_time"] = all_chunk_data[i - 1]["end_time"]

        # Usa l'inizio del chunk successivo come fine, se disponibile
        if i < len(all_chunk_data) - 1 and all_chunk_data[i + 1]["start_time"] is not None:
            c["end_time"] = all_chunk_data[i + 1]["start_time"]


In [None]:
# Salva gli embeddings (come float32 per FAISS)
np.save("embeddings300.npy", embeddings.astype("float32"))

# Crea l'indice FAISS (cosine similarity: inner product su embeddings normalizzati)
index = faiss.IndexFlatIP(embeddings.shape[1])
index.add(embeddings)

# Salva l'indice
faiss.write_index(index, "faiss_index300.index")
print("✅ Indice FAISS salvato!")

# Salva i metadati
with open("chunks_metadata300.json", "w", encoding="utf-8") as f:
    json.dump(all_chunk_data, f, ensure_ascii=False, indent=2)

print("✅ Embeddings e metadati salvati!")

✅ Indice FAISS salvato!
✅ Embeddings e metadati salvati!
