# Reconocimiento de fonemas usando CTC

In [2]:
import json
import torch
import torch.utils.data as data
import torchaudio
import torch.nn as nn
import torchaudio.transforms as F
from torch.nn.utils.rnn import pad_sequence
from collections import OrderedDict
from torch.utils.data import  Dataset, DataLoader
from torchaudio.models.decoder import ctc_decoder # vamos a hacer un decoder greedy, por razones 
                                                  # didácticas, este se usaría si quisiera
                                                  # implementarlo con beam search

# Select the compute device: CUDA when a GPU is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

cuda


## Implementación del dataset y el dataloader

### Dataset
Convierte los datos crudos para que puedan ser usados por `DataLoader`. Permite implementar una función `__getitem__()` en la cual leemos los datos y devolvemos, por ejemplo, el wav y la transcripción de cada dato.

In [3]:
# JSON files for the train / validation / test splits.
train_json = 'data/train.json'
valid_json = 'data/dev.json'
test_json = 'data/test.json'
# Text file holding the phoneme vocabulary ('phoneme=>index' per line).
vocab_file = 'data/label_encoder_new.txt'

def load_phoneme_vocabulary(filepath: str) -> tuple[dict, dict]:
    """
    Load a phoneme vocabulary from a text file.

    Every non-blank line must have the form 'phoneme=>index'. Surrounding
    whitespace is ignored and single quotes around the phoneme are removed.
    Blank (or whitespace-only) lines are skipped.

    Args:
        filepath (str): Path to the vocabulary file (read as UTF-8).

    Returns:
        tuple[dict, dict]: A tuple containing:
            - phoneme_to_idx (dict): Maps phoneme string to integer index.
            - idx_to_phoneme (dict): Maps integer index to phoneme string.

    Raises:
        ValueError: If a non-blank line does not split into exactly two parts
            around '=>', or if the index part is not an integer.
    """
    phoneme_to_idx = {}
    idx_to_phoneme = {}

    with open(filepath, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                # Blank lines (e.g. extra newlines at the end of the file)
                # carry no entry and must not abort the load.
                continue

            parts = line.split('=>')
            if len(parts) != 2:
                raise ValueError(f"Error: Línea mal formada (se esperaba 'fonema=>indice'): '{line}'")

            phoneme = parts[0].strip().strip("'")
            try:
                index = int(parts[1].strip())
            except ValueError as err:
                # Keep the original parsing error attached as the cause.
                raise ValueError(f"Error: Índice inválido en la línea: '{line}'") from err

            phoneme_to_idx[phoneme] = index
            idx_to_phoneme[index] = phoneme

    return phoneme_to_idx, idx_to_phoneme


class TimitDataset(Dataset):
    """Dataset that yields (waveform, phoneme-index tensor) pairs.

    The JSON file is a dictionary keyed by sample id; each entry is read for
    its 'wav' (audio file path) and 'phn' (whitespace-separated phoneme
    string) fields.

    Args:
        json_file: Path to the JSON file describing the samples.
        vocab_file: Path to the phoneme vocabulary file, passed to
            ``load_phoneme_vocabulary``.

    Raises:
        FileNotFoundError: If ``json_file`` does not exist.
    """

    def __init__(self, json_file, vocab_file):
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                self.datos_json = json.load(f)
        except FileNotFoundError as err:
            # Fail here with a clear message instead of continuing with a
            # half-initialised object that breaks on the next attribute access.
            raise FileNotFoundError(f"Error: El archivo {json_file} no se encuentra.") from err
        # Sample ids (top-level keys), used to map an integer index to an entry.
        self.datos_ids = list(self.datos_json)
        # Phoneme <-> integer lookup tables.
        self.str2int, self.int2str = load_phoneme_vocabulary(vocab_file)

    def __len__(self):
        """Return the number of samples in the JSON file."""
        return len(self.datos_json)

    def __getitem__(self, idx):
        """Load one sample.

        Args:
            idx: Integer position into the list of sample ids.

        Returns:
            A ``(waveform, labels)`` tuple: the loaded audio with its leading
            dimension squeezed out when that dimension has size 1, and a
            tensor with the integer index of each phoneme.

        Raises:
            KeyError: If a phoneme is not present in the vocabulary.
        """
        entry = self.datos_json[self.datos_ids[idx]]
        # Only the audio tensor is used; the sample rate is discarded.
        waveform, _ = torchaudio.load(entry['wav'])
        # presumably (channels, time) with a single channel -> (time,); TODO confirm
        waveform = waveform.squeeze(0)
        # Map each whitespace-separated phoneme to its vocabulary index.
        phn_list = [self.str2int[phoneme] for phoneme in entry['phn'].strip().split()]

        return waveform, torch.tensor(phn_list)



def collate_fn(batch):
    """Collate a list of (waveform, labels) pairs into padded batch tensors.

    Args:
        batch: List of tuples ``[(wav1, label1), (wav2, label2), ...]`` where
            every element is a tensor whose first dimension is its length.

    Returns:
        ``((padded_wav, wav_length), (padded_phn, phn_length))`` where
        waveforms are right-padded with 0, labels are right-padded with -1,
        and each ``*_length`` tensor holds the item lengths divided by the
        longest length in the batch (so the longest item has length 1.0).
    """
    # Split the list of pairs into a tuple of waveforms and a tuple of labels.
    waveforms, targets = zip(*batch)

    # Right-pad every item up to the longest one in the batch.
    padded_wav = pad_sequence(waveforms, batch_first=True, padding_value=0)
    padded_phn = pad_sequence(targets, batch_first=True, padding_value=-1)

    # Lengths expressed as a fraction of the longest item in the batch.
    wav_sizes = torch.tensor([wav.shape[0] for wav in waveforms], dtype=torch.long).float()
    phn_sizes = torch.tensor([tgt.shape[0] for tgt in targets], dtype=torch.long).float()
    wav_length = wav_sizes / wav_sizes.max()
    phn_length = phn_sizes / phn_sizes.max()

    return (padded_wav, wav_length), (padded_phn, phn_length)

# One dataset per split, all sharing the same phoneme vocabulary.
train_ds = TimitDataset(train_json,vocab_file)
test_ds = TimitDataset(test_json,vocab_file)
valid_ds = TimitDataset(valid_json,vocab_file)

# Each loader must wrap its own split; only the training loader is shuffled.
train_dl = DataLoader(train_ds, batch_size=8, shuffle=True, collate_fn=collate_fn)
test_dl = DataLoader(test_ds, batch_size=16, shuffle=False, collate_fn=collate_fn)
valid_dl = DataLoader(valid_ds, batch_size=16, shuffle=False, collate_fn=collate_fn)

  


## Implementación del modelo

In [4]:
# Parámetros
# Hyperparameters, grouped by the part of the pipeline their names refer to.
par = {
    # Audio / feature settings
    'sample_rate': 16000,
    'n_fft': 400,
    'n_mels': 40,
    # Optimisation / regularisation
    'lr': 0.1,
    'dropout': 0.15,
    # CNN settings
    'cnn_channels': (128, 256),
    'cnn_kernelsize': (3, 3),
    # RNN settings
    'rnn_layers': 4,
    'rnn_neurons': 512,
    # DNN settings
    'dnn_blocks': 2,
    'dnn_neurons': 512,
}
        

In [5]:
# Bloque cnn similar al de speechbrain: 
# conv1d-->layer normalization-->Leaky Relu-->Pooling-->drop-out

# Modulo transpose para usarlo como una capa cualquiera
class Transpose(nn.Module):
    """Module that swaps two dimensions of its input.

    Wrapping the transpose in an ``nn.Module`` lets it be placed inside an
    ``nn.Sequential`` like any other layer.

    Args:
        dim0: First dimension to swap.
        dim1: Second dimension to swap.
    """

    def __init__(self, dim0: int, dim1: int):
        super().__init__()
        self.dim0, self.dim1 = dim0, dim1

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` with dimensions ``dim0`` and ``dim1`` exchanged."""
        return torch.transpose(x, self.dim0, self.dim1)
    
class CNN_block(nn.Module):
    """Convolutional block: Conv1d -> LayerNorm -> LeakyReLU -> MaxPool1d -> Dropout.

    Args:
        n_mels: Number of input channels of the convolution.
        n_channels: Number of output channels of the convolution.
        kernel_size: Convolution kernel size ("same" padding, replicate mode,
            so the convolution keeps the time length unchanged).
        pool_kernel_size: Max-pooling window. The pooling stride is 1, so the
            time axis shrinks by ``pool_kernel_size - 1`` frames.
        do_prob: Dropout probability.
    """

    def __init__(self, n_mels: int, n_channels: int, kernel_size: int, 
                    pool_kernel_size: int, do_prob: float = 0.0):
        super().__init__()
        self.bloque_cnn = nn.Sequential(
            nn.Conv1d(in_channels=n_mels, out_channels=n_channels, kernel_size=kernel_size,
                      stride=1, padding="same", padding_mode="replicate"),
            # LayerNorm normalises the last dimension, so the channel axis is
            # moved to the end for the normalisation and moved back afterwards.
            Transpose(1, 2),
            nn.LayerNorm(n_channels),
            Transpose(1, 2),
            nn.LeakyReLU(),
            nn.MaxPool1d(kernel_size=pool_kernel_size, stride=1),
            nn.Dropout(p=do_prob),
        )
        self.pool_kernel_size = pool_kernel_size

    def forward(self, x: torch.Tensor, lengths: torch.Tensor):
        """Apply the block and zero out the frames beyond each sequence's length.

        Args:
            x: Input of shape (batch, n_mels, time).
            lengths: 1-D tensor with the number of valid frames of each
                sequence, in frames (it is compared against frame indices).

        Returns:
            ``(x_masked, new_lengths)``: the block output with padded frames
            set to zero, and the lengths reduced by ``pool_kernel_size - 1``.
        """
        x = self.bloque_cnn(x)

        # Stride-1 pooling without padding removes pool_kernel_size - 1 frames.
        new_lengths = lengths - self.pool_kernel_size + 1

        # Boolean mask: True for valid frames, False for padding. The frame
        # index is created on the device of `lengths` so the comparison never
        # mixes devices; the mask is then moved next to `x`.
        frame_idx = torch.arange(x.size(2), device=new_lengths.device)
        mask = frame_idx.unsqueeze(0) < new_lengths.unsqueeze(1)
        mask = mask.unsqueeze(1).to(x.device)  # (batch, 1, time), broadcast over channels
        x_masked = x * mask

        return x_masked, new_lengths



In [6]:
from collections import OrderedDict
import torch
import torch.nn as nn
# Sketch of the model layout: every block is a ReLU placeholder, named by
# kind ("cnn", "rnn", "dnn") and 1-based position within that kind.
n_cnn = 2
n_rnn = 2
n_dnn = 4
layers = OrderedDict()
for prefix, count in (("cnn", n_cnn), ("rnn", n_rnn), ("dnn", n_dnn)):
    for i in range(count):
        layers[f"{prefix}_block_{i+1}"] = nn.ReLU()
model = nn.Sequential(layers)



In [None]:
capas = []
for i in range(n_cnn):