In [None]:
# Reload imported project modules automatically before each cell runs, so edits to the
# source files are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import torch
import torchaudio

from tqdm import tqdm
from evaluate import load
import matplotlib.pyplot as plt 

from model.Speech2Text import Speech2Text
from model.SpeechGenerator import SpeechGenerator
from utils.Config import ConfigSLP, ConfigNAC, ConfigDiTTO
from utils.MLS import MLSDataset
from utils.Processing import Processing

from torch.utils.data import DataLoader

In [None]:
# Call the display() helper of each of the three configuration classes used in this notebook
# (presumably prints their current settings — confirm in utils.Config).
ConfigSLP.display()
ConfigNAC.display()
ConfigDiTTO.display()

In [None]:
# Disabled preprocessing calls, kept for reference: each one would pass the "audio" folder of a
# split (train / test / dev) and an "audio_clean" target folder to
# Processing.remove_metadata_from_audio_folder. Presumably a one-off cleaning step — uncomment to rerun.
# Processing.remove_metadata_from_audio_folder(ConfigSLP.TRAIN_PATH+"/"+"audio", ConfigSLP.TRAIN_PATH+"/"+"audio_clean",)
# Processing.remove_metadata_from_audio_folder(ConfigSLP.TEST_PATH+"/"+"audio", ConfigSLP.TEST_PATH+"/"+"audio_clean",)
# Processing.remove_metadata_from_audio_folder(ConfigSLP.DEV_PATH+"/"+"audio", ConfigSLP.DEV_PATH+"/"+"audio_clean",)

## Speech Generation with DiTTO-TTS and Vocoder

In [None]:
# Both splits are built with the same DiTTO settings; only the data directory differs.
_dataset_kwargs = dict(
    max_text_token_length=ConfigDiTTO.MAX_TOKEN_LENGTH,
    sampling_rate=ConfigDiTTO.SAMPLE_RATE,
    nb_samples=ConfigDiTTO.NB_SAMPLES,
    tokenizer_model="gpt2",
)
train_set = MLSDataset(data_dir=ConfigDiTTO.TRAIN_PATH, **_dataset_kwargs)
test_set = MLSDataset(data_dir=ConfigDiTTO.TEST_PATH, **_dataset_kwargs)

# Both loaders are shuffled, take their batch size from the NAC configuration and batch
# samples with the dataset's own collate function.
_loader_kwargs = dict(
    batch_size=ConfigNAC.BATCH_SIZE,
    shuffle=True,
    collate_fn=MLSDataset.collate_fn,
)
train_loader = DataLoader(train_set, **_loader_kwargs)
test_loader = DataLoader(test_set, **_loader_kwargs)

In [None]:
# Diffusion step count used for the rest of the notebook (presumably the number of
# sampling steps at generation time — confirm in the DiTTO configuration).
ConfigDiTTO.DIFFUSION_STEPS = 1000

# Checkpoint folder and training epoch, defined once so the notebook can be pointed at another
# machine or checkpoint by editing a single line instead of three absolute paths.
PARAMS_DIR = "/tempory/M2-DAC/UE_DEEP/AMAL/DiTTO-TTS/src/params"
CHECKPOINT_EPOCH = 20

# Remove SLP from the speech generator if you want to load it on Sorbonne's PPTI machines
# (disk quota exceeded).
speech_generator = SpeechGenerator(
    nac_model_path=f"{PARAMS_DIR}/NAC_epoch_{CHECKPOINT_EPOCH}.pth",
    ditto_model_path=f"{PARAMS_DIR}/DiTTO_epoch_{CHECKPOINT_EPOCH}.pth",
    slp_path=f"{PARAMS_DIR}/SLP_epoch_{CHECKPOINT_EPOCH}.pth",
    lambda_factor=ConfigNAC.LAMBDA_FACTOR,
    sample_rate=ConfigNAC.SAMPLE_RATE,
    device=ConfigDiTTO.DEVICE
)

In [None]:
def test_with_loader(loader, prompt=None, output_path="output.wav"):
    """Generate speech for the first sample of the first batch of `loader` and save it.

    Args:
        loader: iterable of batches providing batch["audio"], batch["padding_mask_audio"]
            and batch["text"]["input_ids"] / ["attention_mask"].
        prompt: text handed to the generator with ``is_tokenized=False``. When None, the
            sample's own token ids are used instead (``is_tokenized=True``).
        output_path: file the generated audio is written to with ``torchaudio.save``.

    Returns:
        The generated waveform moved to the CPU (the tensor that was saved), or None when
        the batch contains no sample.

    Side effects:
        Sets ``ConfigDiTTO.DIFFUSION_STEPS`` to 1000 and moves the batch tensors to
        ``ConfigDiTTO.DEVICE`` in place.
    """
    ConfigDiTTO.DIFFUSION_STEPS = 1000

    batch = next(iter(loader))
    batch["audio"] = batch["audio"].to(ConfigDiTTO.DEVICE)
    batch["text"]["input_ids"] = batch["text"]["input_ids"].to(ConfigDiTTO.DEVICE)
    batch["text"]["attention_mask"] = batch["text"]["attention_mask"].to(ConfigDiTTO.DEVICE)

    # A caller-supplied prompt is raw text; without one the sample's token ids are used.
    is_tokenized = prompt is None

    # Only the first sample of the batch is synthesized.
    samples = zip(batch["audio"], batch["padding_mask_audio"], batch["text"]["input_ids"])
    first_sample = next(samples, None)
    if first_sample is None:
        return None
    audio_tensor, padding_mask_audio, text_input = first_sample

    if prompt is None:
        prompt = text_input.unsqueeze(0)

    # Inference only: no autograd graph is needed, as in the evaluation loops.
    with torch.no_grad():
        generated_waveform = speech_generator.generate_speech_from_audio_tensor(
            audio_tensor.unsqueeze(0),
            padding_mask_audio.to(ConfigDiTTO.DEVICE).unsqueeze(0),
            prompt,
            is_tokenized=is_tokenized
        )

    waveform_cpu = generated_waveform.cpu()
    torchaudio.save(output_path, waveform_cpu, ConfigDiTTO.SAMPLE_RATE)
    return waveform_cpu

In [None]:
# Synthesize a custom French prompt, conditioned on the first sample of a training batch;
# the generated audio is written to output.wav.
wave = test_with_loader(train_loader, "Bonjour, comment çava tout le monde ?")

In [None]:
# Set the diffusion step count again (same value as assigned earlier in the notebook)
# before running the evaluation cells.
ConfigDiTTO.DIFFUSION_STEPS = 1000

## CER and WER Computation
- Train : 
    - CER score: 0.9305486490966351
    - WER score: 0.9981549815498155
- Test :
    - CER score: 0.9305370442963544
    - WER score: 0.9973509933774835

In [None]:
import torch
from tqdm import tqdm
from evaluate import load
from transformers import GPT2Tokenizer

# Character- and word-error-rate metrics, loaded by name through `evaluate.load`.
cer_metric = load("cer")
wer_metric = load("wer")

# Speech-to-text model constructed for 16 kHz audio; it transcribes the generated waveforms below.
model = Speech2Text(sampling_rate=16000)
model.eval()  # presumably switches the model to inference mode — confirm in Speech2Text


# Module-level lists that cer_wer_on_loader fills with the transcriptions and the decoded reference texts.
predictions = []
references = []
# GPT-2 tokenizer used to decode the reference token ids back into text.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

def cer_wer_on_loader(loader, max_batch=5):
    """Compute CER and WER of the generated speech on the first batches of `loader`.

    Every sample is re-synthesized from its own audio and token ids, the result is
    transcribed with the module-level speech-to-text `model`, and the transcriptions are
    scored against the decoded reference texts.

    Args:
        loader: iterable of batches providing batch["audio"], batch["padding_mask_audio"]
            and batch["text"]["input_ids"] / ["attention_mask"].
        max_batch: maximum number of batches to evaluate; None evaluates the whole loader.

    Returns:
        Tuple ``(cer_score, wer_score)``; both values are also printed.

    Side effects:
        The module-level `predictions` and `references` lists are emptied and then filled
        with the results of this call.
    """
    # Reset the shared accumulators so that a second call (e.g. on the test loader) is not
    # scored together with the results of the previous call.
    predictions.clear()
    references.clear()

    with torch.no_grad():
        for i, batch in tqdm(enumerate(loader)):
            # Stop once exactly `max_batch` batches have been evaluated.
            if max_batch is not None and i >= max_batch:
                break

            batch["audio"] = batch["audio"].to(ConfigDiTTO.DEVICE)
            batch["text"]["input_ids"] = batch["text"]["input_ids"].to(ConfigDiTTO.DEVICE)
            batch["text"]["attention_mask"] = batch["text"]["attention_mask"].to(ConfigDiTTO.DEVICE)

            for audio_tensor, padding_mask_audio, text_input in zip(batch["audio"], batch["padding_mask_audio"], batch["text"]["input_ids"]):
                # Audio and token ids are already on the device; only the mask still has to be moved.
                generated_waveform = speech_generator.generate_speech_from_audio_tensor(
                    audio_tensor.unsqueeze(0),
                    padding_mask_audio.to(ConfigDiTTO.DEVICE).unsqueeze(0),
                    text_input.unsqueeze(0),
                    is_tokenized=True
                )
                transcription = model(generated_waveform)
                # extend() would split a bare string into single characters.
                if isinstance(transcription, str):
                    transcription = [transcription]
                predictions.extend(transcription)

            ref_texts = tokenizer.batch_decode(batch["text"]["input_ids"], skip_special_tokens=True)
            references.extend(ref_texts)

    # Compute the metrics
    cer_score = cer_metric.compute(predictions=predictions, references=references)
    wer_score = wer_metric.compute(predictions=predictions, references=references)

    print("CER score:", cer_score)
    print("WER score:", wer_score)

    return cer_score, wer_score

In [None]:
# Report CER / WER of the generated speech on a few batches of the train split, then of the test split.
cer_wer_on_loader(train_loader)
cer_wer_on_loader(test_loader)

## SIM-o and SIM-r Computation
- Train :
    - SIM-o score: 0.27285963
    - SIM-r score: 0.010607217
- Test : 
    - SIM-o score: 0.18861449
    - SIM-r score: 0.009921809

In [None]:
import torch
import torchaudio
from tqdm import tqdm
from speechbrain.pretrained import SpeakerRecognition
import numpy as np
from scipy.spatial.distance import cosine

# Pretrained speaker-recognition model fetched from the "speechbrain/spkrec-ecapa-voxceleb" source;
# its encode_batch() output serves as the speaker embedding in the similarity metrics below.
# `savedir` is presumably the local folder where the downloaded files are stored — confirm.
spk_recog = SpeakerRecognition.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="tmp"
)

def extract_embedding(audio_tensor):
    """Return the speaker embedding of `audio_tensor` as a NumPy array.

    The tensor is encoded with the module-level `spk_recog` model; the result is squeezed,
    detached from the autograd graph and moved to the CPU before conversion.
    """
    encoded = spk_recog.encode_batch(audio_tensor)
    flattened = encoded.squeeze().detach().cpu()
    return flattened.numpy()

def compute_similarity(audio_emb1, audio_emb2):
    """Return the cosine similarity of two embeddings (1 minus SciPy's cosine distance)."""
    distance = cosine(audio_emb1, audio_emb2)
    return 1 - distance

def compute_sim_o_sim_r(loader, max_batch=5):
    """Compute speaker-similarity scores of the generated speech on the first batches of `loader`.

    Every sample is re-synthesized from its own audio and token ids, and speaker embeddings
    of the original and the generated audio are compared.

    * SIM-o: mean cosine similarity between each generated sample and its own original audio.
    * SIM-r: per sample, that same similarity minus the mean similarity between the generated
      sample and the original audio of the *other* samples in the batch (the plain
      similarity when the batch holds a single sample), averaged over all samples.

    NOTE(review): in the zero-shot TTS literature SIM-r usually denotes similarity to the
    codec-reconstructed prompt; the batch-relative contrast used here differs — confirm intent.

    Args:
        loader: iterable of batches providing batch["audio"], batch["padding_mask_audio"]
            and batch["text"]["input_ids"] / ["attention_mask"].
        max_batch: maximum number of batches to evaluate; None evaluates the whole loader.

    Returns:
        Tuple ``(sim_o_score, sim_r_score)``; both values are also printed.
    """
    similarities_o = []
    similarities_r = []

    with torch.no_grad():
        for i, batch in tqdm(enumerate(loader)):
            # Stop once exactly `max_batch` batches have been evaluated.
            if max_batch is not None and i >= max_batch:
                break

            batch["audio"] = batch["audio"].to(ConfigDiTTO.DEVICE)
            batch["text"]["input_ids"] = batch["text"]["input_ids"].to(ConfigDiTTO.DEVICE)
            batch["text"]["attention_mask"] = batch["text"]["attention_mask"].to(ConfigDiTTO.DEVICE)

            # Per-batch storage, needed for the cross-sample comparisons of SIM-r.
            ref_embeddings = []
            gen_embeddings = []
            own_similarities = []

            for audio_tensor, padding_mask_audio, text_input in zip(batch["audio"], batch["padding_mask_audio"], batch["text"]["input_ids"]):
                # Audio and token ids are already on the device; only the mask still has to be moved.
                generated_waveform = speech_generator.generate_speech_from_audio_tensor(
                    audio_tensor.unsqueeze(0),
                    padding_mask_audio.to(ConfigDiTTO.DEVICE).unsqueeze(0),
                    text_input.unsqueeze(0),
                    is_tokenized=True
                )

                emb_ref = extract_embedding(audio_tensor)
                emb_gen = extract_embedding(generated_waveform)

                ref_embeddings.append(emb_ref)
                gen_embeddings.append(emb_gen)
                own_similarities.append(compute_similarity(emb_ref, emb_gen))

            similarities_o.extend(own_similarities)

            # SIM-r: reuse the similarity to the sample's own reference computed above and
            # subtract the mean similarity to the references of the other samples.
            for idx, (emb_gen, own_similarity) in enumerate(zip(gen_embeddings, own_similarities)):
                other_similarities = [
                    compute_similarity(emb_other, emb_gen)
                    for j, emb_other in enumerate(ref_embeddings)
                    if j != idx
                ]
                if other_similarities:
                    sim_r = own_similarity - np.mean(other_similarities)
                else:
                    sim_r = own_similarity

                similarities_r.append(sim_r)

    sim_o_score = np.mean(similarities_o)
    sim_r_score = np.mean(similarities_r)

    print("SIM-o score:", sim_o_score)
    print("SIM-r score:", sim_r_score)

    return sim_o_score, sim_r_score

In [None]:
# Report SIM-o / SIM-r of the generated speech on a few batches of the train split, then of the test split.
compute_sim_o_sim_r(train_loader)
compute_sim_o_sim_r(test_loader)