In [None]:
import os
import pandas as pd
import numpy as np
import librosa
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
from torchaudio.transforms import MelSpectrogram
from sklearn.model_selection import train_test_split

: 

In [None]:
def create_manifest(data_path):
    """Build a manifest of (audio file, transcript) pairs from a dataset tree.

    Walks ``data_path`` recursively looking for files whose name ends in
    ``.trans.txt``. Every non-blank line of such a file is read as
    ``<utterance-id> <transcript>``: the audio path is formed as
    ``<directory of the .trans.txt>/<utterance-id>.flac`` and the transcript
    is lower-cased. The existence of the audio file is not checked.

    Args:
        data_path: Root directory to scan.

    Returns:
        pandas.DataFrame with the columns ``audio_path`` and ``transcript``,
        one row per transcript line. The columns are present even when no
        transcript was found.
    """
    records = []
    for root, dirs, files in os.walk(data_path):
        # os.walk yields entries in arbitrary order; sorting keeps the manifest
        # (and therefore any seeded split made from it) reproducible.
        dirs.sort()
        for file in sorted(files):
            if not file.endswith('.trans.txt'):
                continue
            with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        # A blank line would otherwise yield '<root>/.flac'.
                        continue
                    utterance_id, _, text = line.partition(' ')
                    records.append({
                        'audio_path': os.path.join(root, utterance_id + '.flac'),
                        'transcript': text.lower(),
                    })
    return pd.DataFrame(records, columns=['audio_path', 'transcript'])

# Root directory that create_manifest scans for '*.trans.txt' files.
data_path = '' # TODO: the dataset is still missing (we are working out how to do this in Spanish)
# One row per utterance, with the columns 'audio_path' and 'transcript'.
manifest_df = create_manifest(data_path)

In [None]:
# Sanity check: show the first manifest rows and the total number of samples.
print(manifest_df.head())
print(f"Cantidad de muestras: {len(manifest_df)}")

In [None]:
# Hold out 10% of the manifest for validation; the fixed random_state makes
# the split repeatable for a given manifest row order.
train_df, val_df = train_test_split(manifest_df, test_size=0.1, random_state=42)
print(f"Muestras de entrenamiento: {len(train_df)}, Muestras de validacion: {len(val_df)}")

In [None]:
# Character vocabulary used to turn transcripts into integer sequences.
#
# Index 0 is reserved for the CTC blank symbol: the loss is built with
# nn.CTCLoss(blank=0) and pad_sequence pads targets with 0, so no real
# character may share that index. The space character gets its own entry
# because text_to_int_sequence uses char_map[' '] as the fallback index.
char_map_str = """
<BLANK> 0
a 1
b 2
c 3
d 4
e 5
f 6
g 7
h 8
i 9
j 10
k 11
l 12
m 13
n 14
o 15
p 16
q 17
r 18
s 19
t 20
u 21
v 22
w 23
x 24
y 25
z 26
' 27
<SPACE> 28
"""

# Placeholder tokens that cannot be written literally in the whitespace-separated
# table above. The blank decodes to the empty string, so blanks and padding
# disappear when an index sequence is converted back to text.
_SPECIAL_TOKENS = {'<BLANK>': '', '<SPACE>': ' '}

char_map = {}    # character -> index
index_map = {}   # index -> character
for line in char_map_str.strip().split('\n'):
    ch, index = line.split()
    ch = _SPECIAL_TOKENS.get(ch, ch)
    char_map[ch] = int(index)
    index_map[int(index)] = ch

In [None]:
def text_to_int_sequence(text):
    """Encode a string as a list of vocabulary indices using ``char_map``.

    Characters missing from ``char_map`` are replaced by the index of the
    space character. If the vocabulary has no space entry either, such
    characters are dropped instead of raising ``KeyError``.

    Args:
        text: The transcript to encode.

    Returns:
        list[int]: One index per encodable character, in order.
    """
    # Use .get() for the fallback: indexing char_map[' '] directly is evaluated
    # for every character and fails whenever the vocabulary lacks a space.
    fallback = char_map.get(' ')
    encoded = (char_map.get(c, fallback) for c in text)
    return [idx for idx in encoded if idx is not None]

def int_sequence_to_text(seq):
    """Decode an iterable of vocabulary indices into a string via ``index_map``.

    Raises:
        KeyError: If an index is not present in ``index_map``.
    """
    chars = []
    for idx in seq:
        chars.append(index_map[idx])
    return ''.join(chars)

In [None]:
class SpeechDataset(Dataset):
    """Dataset yielding (features, encoded transcript) pairs from a manifest.

    Args:
        df: DataFrame with the columns ``audio_path`` and ``transcript``.
        char_map: Character-to-index mapping. It is stored on the instance;
            the encoding itself is done by ``text_to_int_sequence``.
        transform: Optional callable applied to the mono 16 kHz waveform
            (e.g. a mel-spectrogram transform). When omitted, the raw
            waveform is returned as a single-feature sequence.
    """

    TARGET_SAMPLE_RATE = 16000

    def __init__(self, df, char_map, transform=None):
        self.df = df
        self.char_map = char_map
        self.transform = transform

    def __len__(self):
        return len(self.df)

    @staticmethod
    def _to_mono(waveform):
        """Average a (channels, samples) waveform down to shape (1, samples)."""
        if waveform.dim() == 2 and waveform.size(0) > 1:
            return waveform.mean(dim=0, keepdim=True)
        return waveform

    @staticmethod
    def _to_time_major(features):
        """Drop the leading channel axis and put time first.

        (1, n_features, time) -> (time, n_features); (1, time) -> (time, 1).
        """
        features = features.squeeze(0)
        if features.dim() == 1:
            # Raw waveform without a transform: treat it as one feature per step
            # so that transpose(0, 1) below is valid.
            features = features.unsqueeze(0)
        return features.transpose(0, 1)

    def __getitem__(self, idx):
        """Load one utterance.

        Returns:
            tuple: ``(features, transcript_seq)`` where ``features`` has shape
            (time, n_features) and ``transcript_seq`` is a 1-D int32 tensor of
            vocabulary indices.
        """
        row = self.df.iloc[idx]

        waveform, sample_rate = torchaudio.load(row['audio_path'])
        # Without the downmix, squeeze(0) would leave a multi-channel tensor
        # untouched and the transpose would produce a wrongly shaped result.
        waveform = self._to_mono(waveform)

        if sample_rate != self.TARGET_SAMPLE_RATE:
            resampler = torchaudio.transforms.Resample(sample_rate, self.TARGET_SAMPLE_RATE)
            waveform = resampler(waveform)

        features = self.transform(waveform) if self.transform is not None else waveform

        transcript_seq = torch.tensor(text_to_int_sequence(row['transcript']), dtype=torch.int32)

        return self._to_time_major(features), transcript_seq

In [None]:
def collate_fn(batch):
    """Merge a list of (features, transcript) samples into one padded batch.

    Args:
        batch: List of ``(spectrogram, transcript_seq)`` pairs, where
            ``spectrogram`` is a tensor whose first axis is time and
            ``transcript_seq`` is a 1-D tensor of vocabulary indices.

    Returns:
        tuple: ``(spectrograms, transcript_seqs, input_lengths, target_lengths)``.
        The first two are tensors zero-padded to the longest item in the batch
        (batch first); the last two are lists with the unpadded lengths of
        every item.
    """
    spectrograms = [spectrogram for spectrogram, _ in batch]
    transcript_seqs = [transcript_seq for _, transcript_seq in batch]

    # Record the true lengths before padding hides them.
    input_lengths = [spectrogram.shape[0] for spectrogram in spectrograms]
    target_lengths = [len(transcript_seq) for transcript_seq in transcript_seqs]

    spectrograms = torch.nn.utils.rnn.pad_sequence(spectrograms, batch_first=True)
    transcript_seqs = torch.nn.utils.rnn.pad_sequence(transcript_seqs, batch_first=True)

    return spectrograms, transcript_seqs, input_lengths, target_lengths

In [None]:
# Feature extractor: mel spectrogram with 128 mel bands for 16 kHz audio.
transform = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_mels=128)

# reset_index(drop=True) gives each split a fresh 0..n-1 index after the shuffle
# performed by train_test_split.
train_dataset = SpeechDataset(train_df.reset_index(drop=True), char_map, transform=transform)
val_dataset = SpeechDataset(val_df.reset_index(drop=True), char_map, transform=transform)

# collate_fn pads the variable-length samples of a batch; only the training
# loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class SpeechRecognitionModel(nn.Module):
    """Two-layer bidirectional LSTM followed by a per-frame linear classifier.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden size of each LSTM direction.
        output_size: Number of output classes per time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=2,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(in_features=2 * hidden_size, out_features=output_size)

    def forward(self, x):
        """Map (batch, time, input_size) features to (batch, time, output_size) log-probabilities."""
        recurrent_out, _state = self.lstm(x)
        logits = self.fc(recurrent_out)
        return F.log_softmax(logits, dim=2)

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

input_size = 128  # features per frame; matches n_mels=128 of the MelSpectrogram transform
hidden_size = 256  # hidden units per LSTM direction
output_size = len(char_map)  # one output class per vocabulary entry

model = SpeechRecognitionModel(input_size, hidden_size, output_size).to(device)

In [None]:
# CTC loss with class index 0 as the blank symbol; zero_infinity=True replaces
# infinite losses (and their gradients) with zero.
# NOTE(review): index 0 of char_map must therefore not be a real character - confirm.
criterion = nn.CTCLoss(blank=0, zero_infinity=True)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train for a fixed number of epochs, logging the loss every 10 steps and the
# mean batch loss at the end of each epoch.
num_epochs = 5

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0  # sum of the batch losses of this epoch
    for i, (inputs, targets, input_lengths, target_lengths) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        # The model returns (batch, time, classes); nn.CTCLoss takes
        # (time, batch, classes).
        outputs = outputs.permute(1, 0, 2)

        # input_lengths / target_lengths are the unpadded lengths from collate_fn.
        loss = criterion(outputs, targets, input_lengths, target_lengths)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if i % 10 == 0:
            print(f"Epoch {epoch+1}/{num_epochs}, Step {i+1}/{len(train_loader)}, Loss: {loss.item():.4f}")

    print(f"Epoch {epoch+1} se completo con una perdida/loss de: {running_loss/len(train_loader):.4f}")

In [None]:
def cer(prediction, reference):
    """Character error rate between a predicted and a reference transcript.

    Spaces are removed from both strings, then the Levenshtein edit distance
    (insertions, deletions and substitutions) is divided by the length of the
    reference.

    Args:
        prediction: Predicted text (a string or an iterable of strings).
        reference: Ground-truth text (a string or an iterable of strings).

    Returns:
        float: Edit distance / reference length. For an empty reference the
        result is 0.0 when the prediction is empty too and 1.0 otherwise.
    """
    prediction = ''.join(prediction).replace(' ', '')
    reference = ''.join(reference).replace(' ', '')

    if not reference:
        # Avoid dividing by zero; count a non-empty prediction as fully wrong.
        return 0.0 if not prediction else 1.0

    # Row-by-row dynamic programme: previous[j] is the distance between the
    # prediction prefix processed so far and reference[:j].
    previous = list(range(len(reference) + 1))
    for i, pred_char in enumerate(prediction, start=1):
        current = [i]
        for j, ref_char in enumerate(reference, start=1):
            current.append(min(
                previous[j] + 1,                           # drop pred_char
                current[j - 1] + 1,                        # insert ref_char
                previous[j - 1] + (pred_char != ref_char),  # match / substitute
            ))
        previous = current

    return previous[-1] / len(reference)

# Greedy CTC evaluation on the validation set: mean character error rate per
# utterance.
model.eval()
total_cer = 0.0
num_samples = 0
blank_index = 0  # same blank index as nn.CTCLoss(blank=0) used for training

with torch.no_grad():
    for inputs, targets, input_lengths, target_lengths in val_loader:
        inputs = inputs.to(device)

        outputs = model(inputs)  # (batch, time, classes) log-probabilities
        # argmax returns the class *indices*; torch.max's first return value
        # would be the log-probabilities themselves.
        best_paths = outputs.argmax(dim=2).cpu()

        for j in range(best_paths.size(0)):
            # Ignore the frames that are only padding for this utterance.
            path = best_paths[j, :input_lengths[j]].tolist()
            # CTC decoding: merge consecutive repeats, then drop blanks.
            collapsed = [
                idx for k, idx in enumerate(path)
                if idx != blank_index and (k == 0 or idx != path[k - 1])
            ]
            pred_text = int_sequence_to_text(collapsed)
            # Cut the target at its true length so padding is not decoded.
            target_text = int_sequence_to_text(targets[j, :target_lengths[j]].tolist())
            total_cer += cer(pred_text, target_text)
            num_samples += 1

    # Average over utterances, not over batches.
    print(f"Promedio CER: {total_cer / max(num_samples, 1):.4f}")

In [None]:
def predict(audio_path, model, transform, device):
    """Transcribe a single audio file with greedy CTC decoding.

    Args:
        audio_path: Path of the audio file to load with ``torchaudio.load``.
        model: Network mapping (batch, time, features) to
            (batch, time, classes) log-probabilities.
        transform: Callable turning a (1, samples) waveform into a
            (1, features, time) tensor.
        device: Device on which the model runs.

    Returns:
        str: The decoded transcript.
    """
    waveform, sample_rate = torchaudio.load(audio_path)
    if waveform.size(0) > 1:
        # Downmix to mono so squeeze(0) below really removes the channel axis.
        waveform = waveform.mean(dim=0, keepdim=True)
    if sample_rate != 16000:
        resampler = torchaudio.transforms.Resample(sample_rate, 16000)
        waveform = resampler(waveform)

    spectrogram = transform(waveform).squeeze(0).transpose(0, 1)  # (time, features)
    batch = spectrogram.unsqueeze(0).to(device)                   # (1, time, features)

    with torch.no_grad():
        log_probs = model(batch)  # (1, time, classes)

    # Most likely class index per frame of the only utterance in the batch.
    best_path = log_probs.argmax(dim=2)[0].tolist()

    # CTC decoding: merge consecutive repeats, then drop blanks.
    blank_index = 0  # same blank index as nn.CTCLoss(blank=0) used for training
    collapsed = []
    previous = None
    for idx in best_path:
        if idx != previous and idx != blank_index:
            collapsed.append(idx)
        previous = idx

    return int_sequence_to_text(collapsed)

In [None]:

# Transcribe a single example file with the trained model.
test_audio_path = 'test.flac'
predicted_text = predict(test_audio_path, model, transform, device)
print(f"Predicted Transcript: {predicted_text}")