In [None]:
from google.colab import drive
# Mount Google Drive so the /content/drive/MyDrive/... paths used by the cells below are reachable.
drive.mount('/content/drive')

In [None]:
!pip install -q torchaudio transformers datasets soundfile
!apt-get install -y -qq ffmpeg
!pip install gtts
!pip install torch torchaudio transformers librosa soundfile
!apt-get install -y ffmpeg
!pip install sacremoses
!pip install pythainlp


Creazione del testset per tipologia di errori inseriti

In [None]:
import pandas as pd
from datasets import load_from_disk

# === CONFIG ===
DATASET_DIR = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/modello_finetunato_th_errori"
CSV_ORIGINALE = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/merged_dataset.csv"
OUTPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset.csv"

# --- Load the preprocessed dataset and take its test split ---
dataset = load_from_disk(DATASET_DIR)
test_dataset = dataset["test"]

# --- Load the original CSV ---
df_orig = pd.read_csv(CSV_ORIGINALE)

# --- Rebuild audio_path and the text columns for the test split ---
# The saved dataset does not keep the original row indices, so the test rows
# are ASSUMED to be the last `num_test` rows of the CSV. This is a placeholder
# and must be adapted to the way the split was actually produced.
num_test = len(test_dataset)
if num_test > len(df_orig):
    # Otherwise the column-wise concat below would silently pad with NaN rows.
    raise ValueError(
        f"Il test split ha {num_test} righe ma il CSV originale ne ha solo {len(df_orig)}"
    )
# tail() rather than iloc[-num_test:]: with num_test == 0 the slice [-0:]
# would return every row instead of none.
df_test_csv = (
    df_orig[["audio_path", "trascrizione_originale", "trascrizione_errata", "dettagli_modifiche"]]
    .tail(num_test)
    .reset_index(drop=True)
)

# --- Convert the HuggingFace split to a DataFrame ---
df_test_hf = test_dataset.to_pandas()

# --- Put the two frames side by side (aligned on the fresh 0..n-1 index) ---
df_finale = pd.concat([df_test_hf.reset_index(drop=True), df_test_csv], axis=1)

# --- Rename ---
df_finale = df_finale.rename(columns={"trascrizione_originale": "trascrizione_corretta"})

# --- Keep only the wanted columns ---
df_finale = df_finale[["input_values", "labels", "trascrizione_corretta", "trascrizione_errata", "dettagli_modifiche", "audio_path"]]

# --- Save the CSV ---
# NOTE(review): input_values/labels are array-valued; to_csv stores their
# string representation, which is not a loss-free format for long arrays.
df_finale.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
print(f"✅ File di test salvato in: {OUTPUT_CSV}")
print(df_finale.head())


In [None]:
import pandas as pd
import difflib
import unicodedata

# --- Input / output locations ---
INPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset.csv"
OUTPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset_updated_from_pairs.csv"

# --- Mappings (same tables as the error-injection code) ---
# Thai tone marks
tones = ['่', '้', '๊', '๋']

# Confusion tables, grouped by severity and by character class
pronunciation_confusions = {
    'light': {
        'consonants': {
            'ด': ['ต'], 'ต': ['ด'],
            'ร': ['ล'], 'ล': ['ร'],
            'บ': ['ป'], 'ป': ['บ'],
            'น': ['ม'], 'ม': ['น'],
        },
        'vowels': {
            'า': ['ั'], 'ั': ['า'],
            'เ': ['แ'], 'แ': ['เ'],
            'ะ': ['า'], 'า': ['ะ'],
        },
    },
    'heavy': {
        'consonants': {
            'บ': ['พ'], 'พ': ['บ'],
            'ช': ['ซ'], 'ซ': ['ช'],
            'ง': ['น'], 'น': ['ง'],
            'ก': ['ข', 'ค'], 'ข': ['ก'], 'ค': ['ก'],
            'ญ': ['ย'], 'ย': ['ญ'],
        },
        'vowels': {
            'ิ': ['ี'], 'ี': ['ิ'],
            'ุ': ['ู'], 'ู': ['ุ'],
            'อ': ['โ'], 'โ': ['อ'],
        },
    },
    'tones': tones
}

# Vowel set: every key and every replacement found in the 'vowels' tables ...
vowel_set = {
    ch
    for severity in ('light', 'heavy')
    for src, targets in pronunciation_confusions[severity].get('vowels', {}).items()
    for ch in (src, *targets)
}
# ... plus further Thai vowel signs that may show up (extend if needed)
vowel_set |= set("าเแะิีุูโ็ํ")

# Consonant set: same construction from the 'consonants' tables ...
consonant_set = {
    ch
    for severity in ('light', 'heavy')
    for src, targets in pronunciation_confusions[severity].get('consonants', {}).items()
    for ch in (src, *targets)
}
# ... plus further Thai consonants (extend if needed)
consonant_set |= set("กขคฆงจชซฌญฎฏฐฑฒณดตถทธนบปผฝพฟภมยรฤลวสหฬฮ")

tone_set = set(tones)

# Utility: bring a cell value to a canonical string form before comparing
def normalize_text(s):
    """Return the NFC-normalised string form of `s`, or "" for missing values.

    Missing values are whatever `pd.isna` reports (None, NaN, ...); any other
    value is converted with `str()` first.
    """
    return "" if pd.isna(s) else unicodedata.normalize('NFC', str(s))

# Classify one changed span (a character or a short sequence) as tone / vowel / consonant
def classify_char_change(char):
    """Return 'TONO', 'VOCALE', 'CONSONANTE' or None for the given span.

    The classes are tested in that priority order and a class matches as soon
    as at least one character of the span belongs to it. An empty span, or a
    span with no character in any of the three sets, yields None.
    """
    if char == "":
        return None
    priority = (
        ('TONO', tone_set),
        ('VOCALE', vowel_set),
        ('CONSONANTE', consonant_set),
    )
    for label, members in priority:
        if any(c in members for c in char):
            return label
    # Fallback: not classifiable
    return None

# Main entry point: compare two transcriptions and report which kinds of change occur
def classify_changes_between_strings(ref, hyp):
    """Return the set of change types that turn `ref` into `hyp`.

    Args:
        ref: correct transcription (trascrizione_corretta); missing values are
            treated as the empty string by `normalize_text`.
        hyp: corrupted transcription (trascrizione_errata).

    Returns:
        A subset of {'TONO', 'VOCALE', 'CONSONANTE'}; empty when the strings
        are equal or no changed character belongs to a known class.

    Every character of every non-equal diff span is classified on its own.
    Classifying a whole multi-character span at once would report only its
    highest-priority class (tone over vowel over consonant) and silently drop
    the other kinds of change contained in the same span.
    """
    ref = normalize_text(ref)
    hyp = normalize_text(hyp)
    sm = difflib.SequenceMatcher(a=ref, b=hyp, autojunk=False)
    types_found = set()

    for tag, i1, i2, j1, j2 in sm.get_opcodes():
        # tag is one of 'replace', 'delete', 'insert', 'equal'
        if tag == 'equal':
            continue
        # Characters removed from ref followed by characters inserted in hyp
        for ch in ref[i1:i2] + hyp[j1:j2]:
            change_type = classify_char_change(ch)
            if change_type:
                types_found.add(change_type)

    return types_found

# --- Load the CSV and classify every row ---
df = pd.read_csv(INPUT_CSV, dtype=str)  # everything as strings, to be safe

# Fail early on the first required column that is absent
required_cols = ['trascrizione_corretta', 'trascrizione_errata']
missing_col = next((col for col in required_cols if col not in df.columns), None)
if missing_col is not None:
    raise ValueError(f"Colonna mancante nel CSV: {missing_col}")

# One label per row: sorted types joined by ';', or 'NESSUNA' when nothing was detected
new_types = [
    ';'.join(sorted(found)) if found else 'NESSUNA'
    for found in (
        classify_changes_between_strings(ref_text, hyp_text)
        for ref_text, hyp_text in zip(df['trascrizione_corretta'], df['trascrizione_errata'])
    )
]

df['dettagli_modifiche'] = new_types

# Write the updated CSV and show a preview
df.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')
print("Salvato:", OUTPUT_CSV)
print(df[['trascrizione_corretta','trascrizione_errata','dettagli_modifiche']].head(12))


In [None]:
 import pandas as pd
import difflib
import unicodedata

# --- CONFIG ---
INPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset.csv"
OUTPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset_with_counts.csv"

# --- Thai character classes ---
thai_vowels = set('ะัาำิีึืฺุูเแโใไฤฦๅ')  # Vowel signs
# 'ฝ' is part of the set so that changes involving it are counted too.
# 'ฤ' and 'ฦ' appear in both the vowel and the consonant set.
thai_consonants = set('กขฃคฅฆงจฉชซฌญฎฏฐฑฒณดตถทธนบปผฝพฟภมยรฤลฦวศษสหฬอฮ')
thai_tones = set('่้๊๋')  # Tone marks

# --- Helper functions ---
def classify_char(char):
    """Map a single character to 'TONO', 'VOCALE' or 'CONSONANTE'.

    The sets are consulted in that order, so a character present in more than
    one set gets the first matching label. Returns None when no set contains it.
    """
    lookup_order = (
        ('TONO', thai_tones),
        ('VOCALE', thai_vowels),
        ('CONSONANTE', thai_consonants),
    )
    for label, members in lookup_order:
        if char in members:
            return label
    return None

def normalize_text(s):
    """Return `s` as an NFC-normalised string with every space removed.

    Missing values (as reported by `pd.isna`) become the empty string.
    """
    if pd.isna(s):
        return ""
    nfc_text = unicodedata.normalize('NFC', str(s))
    # Splitting on the space character and re-joining drops all ' ' characters
    return "".join(nfc_text.split(" "))

def count_changes_between_strings(ref, hyp):
    """Count changed characters per class between two transcriptions.

    Both strings are normalised with `normalize_text`, then diffed with
    `difflib.SequenceMatcher`. Returns a dict with the keys 'TONO', 'VOCALE'
    and 'CONSONANTE' (in that order).

    Counting rule per diff span:
      * replace: every inserted character is counted, plus the removed
        characters beyond the length of the inserted span, so an aligned
        substitution counts once (by the class of the new character);
      * insert / delete: every removed and every inserted character is counted.
    Characters that `classify_char` cannot classify are ignored.
    """
    ref_norm = normalize_text(ref)
    hyp_norm = normalize_text(hyp)
    matcher = difflib.SequenceMatcher(a=ref_norm, b=hyp_norm, autojunk=False)
    counts = dict.fromkeys(('TONO', 'VOCALE', 'CONSONANTE'), 0)

    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'equal':
            continue

        removed = ref_norm[i1:i2]
        inserted = hyp_norm[j1:j2]

        if tag == 'replace':
            # All new characters + the unmatched tail of the removed span
            to_count = inserted + removed[len(inserted):]
        else:  # 'insert' or 'delete'
            to_count = removed + inserted

        for ch in to_count:
            label = classify_char(ch)
            if label:
                counts[label] += 1

    return counts

# --- Load the CSV ---
df = pd.read_csv(INPUT_CSV, dtype=str)

# --- Required columns: stop at the first one that is missing ---
required_cols = ['trascrizione_corretta', 'trascrizione_errata']
missing_col = next((col for col in required_cols if col not in df.columns), None)
if missing_col is not None:
    raise ValueError(f"Colonna mancante: {missing_col}")

# --- Per-row counts, then the derived label and the three count columns ---
row_counts = [
    count_changes_between_strings(ref_text, hyp_text)
    for ref_text, hyp_text in zip(df['trascrizione_corretta'], df['trascrizione_errata'])
]

# Types with a non-zero count joined by ';' ('NESSUNA' when there is none)
concat_types = [
    ';'.join(kind for kind, n_changes in per_row.items() if n_changes > 0) or 'NESSUNA'
    for per_row in row_counts
]
tono_counts = [per_row['TONO'] for per_row in row_counts]
vocale_counts = [per_row['VOCALE'] for per_row in row_counts]
consonante_counts = [per_row['CONSONANTE'] for per_row in row_counts]

df['dettagli_modifiche'] = concat_types
df['TONO_count'] = tono_counts
df['VOCALE_count'] = vocale_counts
df['CONSONANTE_count'] = consonante_counts

# --- Save the updated CSV ---
df.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')
print("✅ Salvato:", OUTPUT_CSV)


In [None]:
import pandas as pd

import os

# --- CONFIG ---
INPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset_with_counts.csv"
OUTPUT_DIR = "/content/drive/MyDrive/TesiMaggistrale/TestSuTipologia/"

# DataFrame.to_csv does not create missing parent directories, so make sure
# the output folder exists before anything is written into it.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Load the CSV with the per-type counts ---
df = pd.read_csv(INPUT_CSV, dtype=str)

# --- Filter by a single error type ---
def filter_single_type(df, tipo):
    """Return the rows whose `dettagli_modifiche` is exactly `tipo`.

    Rows with several types hold a ';'-joined label (e.g. 'TONO;VOCALE'), so
    an exact match keeps only the rows that contain this type and no other.
    """
    return df[df['dettagli_modifiche'] == tipo]

# Tone-only rows (first 10)
df_tono = filter_single_type(df, 'TONO').head(10)
df_tono.to_csv(f"{OUTPUT_DIR}testset_tono.csv", index=False, encoding='utf-8')

# Vowel-only rows (first 10)
df_vocale = filter_single_type(df, 'VOCALE').head(10)
df_vocale.to_csv(f"{OUTPUT_DIR}testset_vocale.csv", index=False, encoding='utf-8')

# Consonant-only rows (first 10)
df_consonante = filter_single_type(df, 'CONSONANTE').head(10)
df_consonante.to_csv(f"{OUTPUT_DIR}testset_consonante.csv", index=False, encoding='utf-8')

print("✅ Generati testset separati per TONO, VOCALE e CONSONANTE.")
print(f"- {len(df_tono)} frasi TONO")
print(f"- {len(df_vocale)} frasi VOCALE")
print(f"- {len(df_consonante)} frasi CONSONANTE")


Creazione testset per gli esperimenti sulla **quantità di errori** introdotti

In [None]:
import pandas as pd
import difflib
import unicodedata

# --- CONFIG ---
INPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset.csv"
OUTPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset_with_counts.csv"

# --- Thai character classes ---
thai_vowels = set('ะัาำิีึืฺุูเแโใไฤฦๅ')  # Vowel signs
# 'ฝ' is part of the set so that changes involving it are counted too.
# 'ฤ' and 'ฦ' appear in both the vowel and the consonant set.
thai_consonants = set('กขฃคฅฆงจฉชซฌญฎฏฐฑฒณดตถทธนบปผฝพฟภมยรฤลฦวศษสหฬอฮ')
thai_tones = set('่้๊๋')  # Tone marks

# --- Helper functions ---
def classify_char(char):
    """Return the class label of one character, or None if it has none.

    Tone marks are checked first, then vowels, then consonants; the first set
    containing the character decides the label ('TONO', 'VOCALE' or
    'CONSONANTE').
    """
    for label, members in (
        ('TONO', thai_tones),
        ('VOCALE', thai_vowels),
        ('CONSONANTE', thai_consonants),
    ):
        if char in members:
            return label
    return None

def normalize_text(s):
    """NFC-normalise `s` and strip out all space characters.

    Values that `pd.isna` treats as missing are returned as "".
    """
    if pd.isna(s):
        return ""
    normalized = unicodedata.normalize('NFC', str(s))
    return "".join(ch for ch in normalized if ch != " ")

def count_changes_between_strings(ref, hyp):
    """Count, per character class, the differences between `ref` and `hyp`.

    The strings are normalised with `normalize_text` and aligned with
    `difflib.SequenceMatcher`. The result is a dict keyed by 'TONO', 'VOCALE'
    and 'CONSONANTE'.

    For a 'replace' span every inserted character is counted, together with
    the removed characters that exceed the inserted span's length; for
    'insert' and 'delete' spans all removed and inserted characters are
    counted. Characters without a class are skipped.
    """
    ref_norm = normalize_text(ref)
    hyp_norm = normalize_text(hyp)
    matcher = difflib.SequenceMatcher(a=ref_norm, b=hyp_norm, autojunk=False)
    counts = dict.fromkeys(('TONO', 'VOCALE', 'CONSONANTE'), 0)

    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag == 'equal':
            continue

        removed = ref_norm[i1:i2]
        inserted = hyp_norm[j1:j2]

        # Pick the characters that contribute to the counts for this span
        if tag == 'replace':
            to_count = inserted + removed[len(inserted):]
        else:  # 'insert' or 'delete'
            to_count = removed + inserted

        for ch in to_count:
            label = classify_char(ch)
            if label:
                counts[label] += 1

    return counts

# --- Load the CSV ---
df = pd.read_csv(INPUT_CSV, dtype=str)

# --- Required columns: stop at the first one that is missing ---
required_cols = ['trascrizione_corretta', 'trascrizione_errata']
missing_col = next((col for col in required_cols if col not in df.columns), None)
if missing_col is not None:
    raise ValueError(f"Colonna mancante: {missing_col}")

# --- Per-row counts, then the derived label and the three count columns ---
row_counts = [
    count_changes_between_strings(ref_text, hyp_text)
    for ref_text, hyp_text in zip(df['trascrizione_corretta'], df['trascrizione_errata'])
]

# Types with a non-zero count joined by ';' ('NESSUNA' when there is none)
concat_types = [
    ';'.join(kind for kind, n_changes in per_row.items() if n_changes > 0) or 'NESSUNA'
    for per_row in row_counts
]
tono_counts = [per_row['TONO'] for per_row in row_counts]
vocale_counts = [per_row['VOCALE'] for per_row in row_counts]
consonante_counts = [per_row['CONSONANTE'] for per_row in row_counts]

df['dettagli_modifiche'] = concat_types
df['TONO_count'] = tono_counts
df['VOCALE_count'] = vocale_counts
df['CONSONANTE_count'] = consonante_counts

# --- Total number of errors per row ---
df['TOTALE_errori'] = df['TONO_count'] + df['VOCALE_count'] + df['CONSONANTE_count']

# --- Save the updated CSV ---
df.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')
print("✅ Salvato con colonna TOTALE_errori:", OUTPUT_CSV)


In [None]:
import pandas as pd

import os

# --- CONFIG ---
INPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/testeset_with_counts.csv"
OUTPUT_DIR = "/content/drive/MyDrive/TesiMaggistrale/TestSuTotaleErrori/"

# DataFrame.to_csv does not create missing parent directories, so make sure
# the output folder exists before anything is written into it.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Load the CSV with the counts ---
df = pd.read_csv(INPUT_CSV, dtype=str)

# Everything was read as text: convert TOTALE_errori to int for numeric filtering
df['TOTALE_errori'] = df['TOTALE_errori'].astype(int)

# --- First n rows having a given total number of errors ---
def select_n_rows_by_total_errors(df, total_errors, n=10):
    """Return the first `n` rows of `df` whose TOTALE_errori equals `total_errors`."""
    filtered = df[df['TOTALE_errori'] == total_errors].head(n)
    return filtered

# --- One CSV per total of 1, 2, 3, 4 and 5 errors ---
for i in range(1, 6):
    df_subset = select_n_rows_by_total_errors(df, i, n=10)
    df_subset.to_csv(f"{OUTPUT_DIR}testset_{i}_errori.csv", index=False, encoding='utf-8')
    print(f"✅ {len(df_subset)} righe con {i} errore totale salvate in testset_{i}_errori.csv")


Genera testset per 8,9 e 10 errori


In [None]:
import random
import nltk
from pythainlp.corpus.common import thai_words
from pythainlp.corpus.wordnet import synsets

# Make sure the 'omw-1.4' NLTK corpus is available, downloading it if the lookup fails
try:
    nltk.data.find('corpora/omw-1.4')
except LookupError:
    nltk.download('omw-1.4')

# Confusion tables, grouped by severity and by character class
pronunciation_confusions = {
    'light': {
        'consonants': {
            'ด': ['ต'], 'ต': ['ด'],
            'ร': ['ล'], 'ล': ['ร'],
            'บ': ['ป'], 'ป': ['บ'],
            'น': ['ม'], 'ม': ['น'],
        },
        'vowels': {
            'า': ['ั'], 'ั': ['า'],
            'เ': ['แ'], 'แ': ['เ'],
            'ะ': ['า'], 'า': ['ะ'],
        },
    },
    'heavy': {
        'consonants': {
            'บ': ['พ'], 'พ': ['บ'],
            'ช': ['ซ'], 'ซ': ['ช'],
            'ง': ['น'], 'น': ['ง'],
            'ก': ['ข', 'ค'], 'ข': ['ก'], 'ค': ['ก'],
            'ญ': ['ย'], 'ย': ['ญ'],
        },
        'vowels': {
            'ิ': ['ี'], 'ี': ['ิ'],
            'ุ': ['ู'], 'ู': ['ุ'],
            'อ': ['โ'], 'โ': ['อ'],
        },
    },
    'tones': ['่', '้', '๊', '๋']
}

# Severity-independent tables used by the forced fallback: for each source
# character, the union of its 'light' and 'heavy' replacements.
_merged = {'consonants': {}, 'vowels': {}}
for lvl in ('light', 'heavy'):
    for kind, bucket in _merged.items():
        for src_char, replacements in pronunciation_confusions[lvl][kind].items():
            bucket.setdefault(src_char, set()).update(replacements)
combined_consonants = {src_char: list(reps) for src_char, reps in _merged['consonants'].items()}
combined_vowels = {src_char: list(reps) for src_char, reps in _merged['vowels'].items()}

# Word list used for the lexical validity check
thai_vocab = set(thai_words())
# Characters after which a tone mark may be inserted
tone_possible_chars = set("กขคฆงจชซฌญฎฏฐฑฒณดตถทธนบปผฝพฟภมยรฤลวสหฬอฮ")


def is_valid_lexical(word):
    """Return True when `word` is contained in `thai_vocab`."""
    known = word in thai_vocab
    return known


def is_valid_semantic(word):
    """Return True when the WordNet lookup `synsets(word)` yields at least one entry."""
    found = synsets(word)
    return len(found) > 0


def generate_pronunciation_variants(word, severity='light', max_steps=3):
    """Enumerate corrupted spellings of `word` reachable in up to `max_steps` edits.

    Args:
        word: the word to corrupt.
        severity: key of `pronunciation_confusions` ('light' or 'heavy') that
            selects the consonant and vowel tables; an unknown key yields
            empty tables, leaving only the tone edits.
        max_steps: maximum number of single-character edits applied in sequence.

    Returns:
        A list of `(variant_word, changes)` pairs, where `changes` is a tuple
        of `(old, new, label)` triples, one per edit. The list comes from a
        set, so its order is not meaningful. A variant may equal `word` again
        (e.g. an edit followed by its inverse).
    """
    consonants = pronunciation_confusions.get(severity, {}).get('consonants', {})
    vowels = pronunciation_confusions.get(severity, {}).get('vowels', {})
    tones = pronunciation_confusions['tones']

    # Shared accumulator of (variant, changes) pairs, filled by the recursion
    variants = set()

    def recursive_modify(current_word, steps_left, changes):
        # Depth-first expansion: apply every possible single edit to
        # `current_word`, then recurse on each new result with one step less.
        if steps_left == 0:
            return

        for i, char in enumerate(current_word):
            new_variants = []

            # Consonant substitution
            if char in consonants:
                for rep in consonants[char]:
                    mod_word = current_word[:i] + rep + current_word[i+1:]
                    new_variants.append((mod_word, changes + [(char, rep, 'suono_consonante')]))

            # Vowel substitution
            if char in vowels:
                for rep in vowels[char]:
                    mod_word = current_word[:i] + rep + current_word[i+1:]
                    new_variants.append((mod_word, changes + [(char, rep, 'suono_vocale')]))

            # Tone insertion (right after the character)
            if char in tone_possible_chars:
                for tone in tones:
                    mod_word = current_word[:i+1] + tone + current_word[i+1:]
                    new_variants.append((mod_word, changes + [('', tone, 'aggiunta_tono')]))

            # Vowel / tone replacement
            # NOTE(review): the vowel branch below produces the same words as the
            # 'suono_vocale' branch above, only with the label 'sostituzione_vocale',
            # so each vowel substitution appears twice in the result.
            if char in list(vowels.keys()):
                alt_chars = [v for v in vowels[char] if v != char]
                for alt in alt_chars:
                    mod_word = current_word[:i] + alt + current_word[i+1:]
                    new_variants.append((mod_word, changes + [(char, alt, 'sostituzione_vocale')]))
            if char in tones:
                alt_chars = [t for t in tones if t != char]
                for alt in alt_chars:
                    mod_word = current_word[:i] + alt + current_word[i+1:]
                    new_variants.append((mod_word, changes + [(char, alt, 'sostituzione_tono')]))

            # Record each unseen (word, change history) pair and expand it further
            for variant, var_changes in new_variants:
                if (variant, tuple(var_changes)) not in variants:
                    variants.add((variant, tuple(var_changes)))
                    recursive_modify(variant, steps_left - 1, var_changes)

    recursive_modify(word, max_steps, [])
    return list(variants)


def force_modify_word(word):
    """Apply one edit to `word` at a random position, without any validity check.

    Returns `(new_word, changes)` where `changes` is a one-element list holding
    an `(old, new, label)` triple, or `(word, [])` when `word` is empty or no
    edit could be made.

    At the chosen position the options are tried in this order:
      1. consonant substitution from `combined_consonants` (taken with probability 0.6),
      2. vowel substitution from `combined_vowels` (taken with probability 0.6),
      3. insertion of a random tone mark after the character,
      4. replacement by a random character drawn from all known replacements.
    """
    if not word:
        return word, []

    pos = random.randrange(len(word))
    char = word[pos]
    head, tail = word[:pos], word[pos + 1:]

    # Options 1 and 2: table-driven substitution, each gated by its own coin flip
    substitution_tables = (
        (combined_consonants, 'forzata_sostituzione_consonante'),
        (combined_vowels, 'forzata_sostituzione_vocale'),
    )
    for table, label in substitution_tables:
        if char in table and random.random() < 0.6:
            rep = random.choice(table[char])
            return head + rep + tail, [(char, rep, label)]

    # Option 3: add a tone mark right after the character
    if char in tone_possible_chars:
        tone = random.choice(pronunciation_confusions['tones'])
        return head + char + tone + tail, [('', tone, 'forzata_aggiunta_tono')]

    # Option 4: generic replacement drawn from every replacement in both tables
    all_replacements = [
        rep
        for table in (combined_consonants, combined_vowels)
        for reps in table.values()
        for rep in reps
    ]
    if all_replacements:
        rep = random.choice(all_replacements)
        return head + rep + tail, [(char, rep, 'forzata_sostituzione_generica')]

    return word, []


def inject_pronunciation_error(word, severity='light', max_steps=3, require_valid=True, allow_force=False):
    """Corrupt a single word, returning `(new_word, changes, applied)`.

    Candidates come from `generate_pronunciation_variants`; those equal to
    `word` are discarded and, when `require_valid` is set, so are the ones
    that are neither in the vocabulary nor known to WordNet. One surviving
    candidate is picked at random. If none survives and `allow_force` is set,
    `force_modify_word` is tried. When nothing works the result is
    `(word, [], False)`.
    """
    candidates = generate_pronunciation_variants(word, severity=severity, max_steps=max_steps)

    def _acceptable(candidate):
        # Validity is checked before the inequality, and only when requested
        if require_valid:
            return (is_valid_lexical(candidate) or is_valid_semantic(candidate)) and candidate != word
        return candidate != word

    usable = [(cand, list(cand_changes)) for (cand, cand_changes) in candidates if _acceptable(cand)]

    if usable:
        picked_word, picked_changes = random.choice(usable)
        return picked_word, picked_changes, True

    if allow_force:
        forced_word, forced_changes = force_modify_word(word)
        if forced_word != word and forced_changes:
            return forced_word, forced_changes, True

    return word, [], False


def inject_exact_num_errors(sentence, reference_sentence, target_errors=8,
                            severity='light', require_valid=True, allow_force=True, max_per_word=None):
    """Try to inject exactly `target_errors` pronunciation errors into `sentence`.

    Args:
        sentence: whitespace-separated sentence to corrupt.
        reference_sentence: only its word count is used, to truncate the output.
        target_errors: number of single-character edits to reach.
        severity: confusion table passed on to `inject_pronunciation_error`.
        require_valid: whether the first stage accepts only valid words.
        allow_force: whether the last stage may fall back to forced edits.
        max_per_word: optional cap on the edits applied to one word position.

    Returns:
        `(final_sentence, full_changes, errors_injected, success)`, where
        `full_changes` is a list of dicts with the keys 'index', 'original',
        'modified' and 'changes', and `success` tells whether
        `errors_injected == target_errors`.
    """
    words = sentence.split()
    new_words = words[:]
    reference_words = reference_sentence.split()

    errors_injected = 0
    change_log = []
    per_word_count = [0] * len(new_words)

    # Progressively more permissive stages: caller's validity setting first,
    # then any variant, finally forced edits (if allowed).
    stages = [
        {'require_valid': require_valid, 'allow_force': False},
        {'require_valid': False, 'allow_force': False},
        {'require_valid': False, 'allow_force': allow_force},
    ]

    for stage in stages:
        req = stage['require_valid']
        af = stage['allow_force']

        if errors_injected >= target_errors:
            break

        # A stage is abandoned after 3 consecutive passes without a new edit
        no_progress_rounds = 0
        while errors_injected < target_errors and no_progress_rounds < 3:
            # Visit the word positions in a fresh random order on every pass
            indices = list(range(len(new_words)))
            random.shuffle(indices)
            progress_this_round = False

            for i in indices:
                if errors_injected >= target_errors:
                    break

                if max_per_word is not None and per_word_count[i] >= max_per_word:
                    continue

                # The current (possibly already corrupted) word is the starting point
                original_word = new_words[i]
                chosen_word, changes, applied = inject_pronunciation_error(
                    original_word, severity=severity, max_steps=1, require_valid=req, allow_force=af
                )

                if applied and chosen_word != original_word and len(changes) > 0:
                    change_log.append((i, original_word, chosen_word, changes))
                    new_words[i] = chosen_word
                    per_word_count[i] += len(changes)
                    errors_injected += len(changes)
                    progress_this_round = True

                    if errors_injected >= target_errors:
                        break

            if not progress_this_round:
                no_progress_rounds += 1
            else:
                no_progress_rounds = 0

    # Overshoot: undo the most recent edits until the total is at or below the
    # target (this may leave it below; per_word_count is not adjusted).
    if errors_injected > target_errors:
        while errors_injected > target_errors and change_log:
            idx, orig, neww, changes = change_log.pop()
            new_words[idx] = orig
            errors_injected -= len(changes)

    success = (errors_injected == target_errors)
    # Keep at most as many words as the reference sentence has
    final_sentence = ' '.join(new_words[:len(reference_words)])

    # Turn the internal tuples into dictionaries for the caller
    full_changes = []
    for idx, orig, new_w, changes in change_log:
        full_changes.append({
            'index': idx,
            'original': orig,
            'modified': new_w,
            'changes': changes
        })

    return final_sentence, full_changes, errors_injected, success


if __name__ == "__main__":
    # Small demo: corrupt one sample sentence and print what was changed
    original_sentence = "ฉัน รัก ภาษาไทย มาก มาก จริง ๆ วันนี้ สนุก"
    reference_sentence = original_sentence

    # Target error count picked at random among 8, 9 and 10
    target = random.choice([8, 9, 10])

    # max_per_word=1 keeps any single word from being modified repeatedly
    corrupted, error_log, injected_count, success = inject_exact_num_errors(
        original_sentence,
        reference_sentence,
        target_errors=target,
        severity='light',
        require_valid=True,
        allow_force=True,
        max_per_word=1,
    )

    separator = "=" * 60
    print("\n" + separator)
    print(f"Target scelto casualmente: {target}")
    print("Originale: ", original_sentence)
    print("Corrotto:  ", corrupted)
    print(f"Errori iniettati: {injected_count}  Successo esatto?: {success}")
    print("--- Log dettagliato ---")
    if error_log:
        for idx, entry in enumerate(error_log, start=1):
            print(f"{idx}. index parola={entry['index']} '{entry['original']}' -> '{entry['modified']}'")
            for old, new_c, err_type in entry['changes']:
                if old == '':
                    detail = f"    + Aggiunto tono: '{new_c}' ({err_type})"
                elif new_c == '':
                    detail = f"    - Rimosso '{old}' ({err_type})"
                else:
                    detail = f"    x {err_type.upper()} - '{old}' → '{new_c}'"
                print(detail)
    else:
        print("Nessuna modifica effettuata.")


In [None]:
import os
import csv
import random
import torchaudio
import torchaudio.transforms as T
from transformers import pipeline
from pythaitts import TTS

# Paths: source recordings, output folder (created if missing) and the CSV index inside it
AUDIO_DIR = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/U/Office/Wav/UOM046_Pa046"
OUTPUT_DIR = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/Corpus_Injection_error_thai/"
os.makedirs(OUTPUT_DIR, exist_ok=True)
CSV_PATH = os.path.join(OUTPUT_DIR, "testseterrori.csv")

# Models: speech recognition pipeline and text-to-speech engine
sr_pipe = pipeline(
    task="automatic-speech-recognition",
    model="airesearch/wav2vec2-large-xlsr-53-th",
)
tts_model = TTS(pretrained="khanomtan", version="1.0", mode="best_model")

# Prepare an audio file for the ASR pipeline
def prepara_audio(path):
    """Load `path` and return its samples as a NumPy array for the ASR pipeline.

    Multi-channel audio is averaged across channels and anything not sampled
    at 16000 Hz is resampled to 16000 Hz.
    """
    waveform, sr = torchaudio.load(path)

    has_several_channels = waveform.shape[0] > 1
    if has_several_channels:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sr != 16000:
        to_16k = T.Resample(sr, 16000)
        waveform = to_16k(waveform)

    samples = waveform.squeeze()
    return samples.numpy()

# Create the CSV with its header row unless it already exists
if not os.path.exists(CSV_PATH):
    header = [
        "filename",
        "trascrizione_originale",
        "trascrizione_errata",
        "Totale_errori",
        "modificata",
        "dettagli_modifiche",
        "audio_path",
    ]
    with open(CSV_PATH, mode="w", encoding="utf-8", newline="") as f:
        csv.writer(f).writerow(header)

# Collect every .wav file below AUDIO_DIR (sub-folders included)
audio_files = [
    os.path.join(root, file)
    for root, _, files in os.walk(AUDIO_DIR)
    for file in files
    if file.endswith(".wav")
]

print(f"Trovati {len(audio_files)} file audio.")

# Main loop: for every recording, transcribe it, inject 8-10 errors into the
# transcription, synthesise the corrupted text and append one row to the CSV.
# A failure on one file is printed and the loop moves on to the next file.
for audio_path in audio_files:
    try:
        base_name = os.path.splitext(os.path.basename(audio_path))[0]

        # Step 1: ASR
        audio_data = prepara_audio(audio_path)
        trascrizione = sr_pipe(audio_data)["text"].strip()
        reference_sentence = trascrizione

        # Step 2: error injection (aim for exactly 8, 9 or 10 errors)
        target_errors = random.choice([8, 9, 10])
        max_attempts = 10  # avoids an endless loop
        attempt = 0
        success = False

        # Retry with the same target until it is hit exactly or attempts run out
        while attempt < max_attempts and not success:
            frase_errata, log_error, injected_count, success_flag = inject_exact_num_errors(
                trascrizione,
                reference_sentence,
                target_errors=target_errors,
                severity='light',
                require_valid=True,
                allow_force=True,
                max_per_word=1
            )
            if injected_count == target_errors and success_flag:
                success = True
            else:
                attempt += 1

        # On failure the result of the last attempt is used anyway
        if not success:
            print(f"⚠️ Attenzione: impossibile iniettare esattamente {target_errors} errori in {base_name}. Si procede comunque.")

        # Step 3: TTS
        audio_out_path = os.path.join(OUTPUT_DIR, f"{base_name}_corrupted.wav")
        tts_model.tts(
            text=frase_errata,
            speaker_idx="Tsyncone",
            language_idx="th-th",
            return_type="file",
            filename=audio_out_path
        )

        # Step 4: human-readable description of the changes
        if not log_error:
            dettagli = "Nessuna modifica"
        else:
            dettaglio_lista = []
            for entry in log_error:
                original = entry['original']
                modified = entry['modified']
                for old, new_c, tipo in entry['changes']:
                    if old == '':
                        dettaglio_lista.append(f"Aggiunto tono '{new_c}' in '{modified}'")
                    elif new_c == '':
                        dettaglio_lista.append(f"Rimosso '{old}' da '{original}'")
                    else:
                        dettaglio_lista.append(f"{tipo.upper()} - '{original}': '{old}'→'{new_c}'")
            dettagli = " | ".join(dettaglio_lista)

        # Step 5: append the row to the CSV (7 columns, same order as the header)
        # NOTE(review): the "modificata" column receives whether the exact target
        # was reached, not whether the sentence was changed — confirm intent.
        with open(CSV_PATH, mode="a", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow([
                base_name,
                trascrizione,
                frase_errata,
                injected_count,             # Totale_errori comes before "modificata"
                "SI" if success else "NO",  # modificata
                dettagli,
                audio_out_path
            ])

        print(f"✅ Salvato: {base_name} (target errori={target_errors}, effettivi={injected_count})")

    except Exception as e:
        print(f"❌ Errore su {audio_path}: {str(e)}")

print("✅ Creazione database completata.")


In [None]:
import pandas as pd
import os
import torchaudio
from transformers import AutoProcessor

# === CONFIG ===
PRETRAINED_MODEL_NAME = "airesearch/wav2vec2-large-xlsr-53-th"
SAMPLING_RATE = 16000
CSV_PATH = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/Corpus_Injection_error_thai/testseterrori.csv"
OUTPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/Corpus_Injection_error_thai/testseterrori_preprocessed.csv"

# === STEP 1: Load the CSV (all columns) ===
# Rows without a corrupted transcription are dropped straight away ...
df = pd.read_csv(CSV_PATH).dropna(subset=["trascrizione_errata"])
# ... and so are the rows whose audio file is not on disk
audio_exists = df["audio_path"].apply(os.path.exists)
df = df[audio_exists].reset_index(drop=True)

# === STEP 2: Load the processor ===
processor = AutoProcessor.from_pretrained(PRETRAINED_MODEL_NAME)

# === STEP 3: Extract input_values and labels for one row ===
def extract_features(row):
    """Compute the model inputs for one CSV row.

    Args:
        row: a row exposing "audio_path" and "trascrizione_errata".

    Returns:
        A `pd.Series` with "input_values" (processed audio) and "labels"
        (token ids of the corrupted transcription). If anything fails, the
        error is printed and both entries are None.
    """
    try:
        waveform, sr = torchaudio.load(row["audio_path"])

        # Downmix multi-channel audio to mono. Without this, squeeze() below
        # would return a 2-D array for stereo files instead of a single
        # waveform.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample if needed
        if sr != SAMPLING_RATE:
            resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=SAMPLING_RATE)
            waveform = resampler(waveform)

        # Input values for the model
        input_values = processor(
            waveform.squeeze().numpy(),
            sampling_rate=SAMPLING_RATE
        ).input_values[0]

        # Labels (tokenised transcription)
        # NOTE(review): as_target_processor() may be deprecated in recent
        # transformers releases — confirm against the installed version.
        with processor.as_target_processor():
            labels = processor(row["trascrizione_errata"]).input_ids

        return pd.Series({
            "input_values": input_values,
            "labels": labels
        })

    except Exception as e:
        print(f"Errore su file {row['audio_path']}: {e}")
        return pd.Series({"input_values": None, "labels": None})

# === STEP 4: Compute the features for every row ===
features = df.apply(extract_features, axis=1)

# === STEP 5: Put input_values and labels first and keep only the wanted columns ===
ordered_cols = [
    "input_values",
    "labels",
    "trascrizione_originale",
    "trascrizione_errata",
    "dettagli_modifiche",
    "audio_path",
    "Totale_errori",
]
df = pd.concat([features, df], axis=1)
df = df.loc[:, ordered_cols]

# === STEP 6: Write the new CSV ===
# NOTE(review): input_values/labels are array-valued; to_csv stores their
# string representation, which is not a loss-free format for long arrays.
df.to_csv(OUTPUT_CSV, index=False)

print("CSV preprocessato salvato in:", OUTPUT_CSV)
print("Righe valide:", len(df))


In [None]:
import pandas as pd

import os

# --- CONFIG ---
INPUT_CSV = "/content/drive/MyDrive/TesiMaggistrale/audiErrati/Corpus_Injection_error_thai/testseterrori.csv"
OUTPUT_DIR = "/content/drive/MyDrive/TesiMaggistrale/TestSuTotaleErrori/"

# DataFrame.to_csv does not create missing parent directories, so make sure
# the output folder exists before anything is written into it.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Load the CSV ---
df = pd.read_csv(INPUT_CSV, dtype=str)

# --- Normalise the column name ---
# This CSV names the column "Totale_errori"; the filter below expects "TOTALE_errori".
if "Totale_errori" in df.columns:
    df.rename(columns={"Totale_errori": "TOTALE_errori"}, inplace=True)

# Everything was read as text: convert to int for numeric filtering
df["TOTALE_errori"] = df["TOTALE_errori"].astype(int)

# --- First n rows having a given total number of errors ---
def select_n_rows_by_total_errors(df, total_errors, n=10):
    """Return the first `n` rows of `df` whose TOTALE_errori equals `total_errors`."""
    return df[df["TOTALE_errori"] == total_errors].head(n)

# --- One CSV per total of 8, 9 and 10 errors ---
for i in range(8, 11):
    df_subset = select_n_rows_by_total_errors(df, i, n=10)
    df_subset.to_csv(f"{OUTPUT_DIR}testset_{i}_errori.csv", index=False, encoding="utf-8")
    print(f"✅ {len(df_subset)} righe con {i} errori totali salvate in testset_{i}_errori.csv")
