In [1]:
# !pip install --upgrade datasets fsspec aiohttp


## Importing libraries

In [1]:
from tensorflow.keras import layers, models
from huggingface_hub import login
import os
import numpy as np
import librosa
import tensorflow as tf
tf.config.run_functions_eagerly(True)
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.preprocessing import LabelEncoder
import soundfile as sf
from datasets import load_dataset
from tqdm import tqdm
from itertools import islice
import string 

2025-05-04 17:51:56.916832: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-05-04 17:51:56.999332: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1746361317.065639   12522 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1746361317.077862   12522 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1746361317.151852   12522 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

## Setting up variables for data downloading & training 

In [4]:
import os
login(token="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")
os.environ["HF_DATASETS_OFFLINE"] = "0"

In [None]:
max_samples = 1000
max_audio_duration =5.0
# ample_rate=16000

In [5]:
CHARS = string.ascii_lowercase + " '"
CHARS

"abcdefghijklmnopqrstuvwxyz '"

## Data Loading

### Data used in training is fetched from hugging face "common_voice_13_0" , As it is taking high resource I have used the streaming to collect only 1000 samples for training

In [None]:
def load_data(sample_rate=16000, max_samples=max_samples, max_audio_duration=max_audio_duration):

    dataset = load_dataset(
        "mozilla-foundation/common_voice_13_0",
        "en", 
        split="train", 
        streaming=True,
        storage_options={"http": {}} )

   
    print(f"Taking the first {max_samples} samples...")
    dataset_head = list(islice(dataset, max_samples))

    audio_proc = AudioPreprocessor(sample_rate, max_audio_duration)
    text_proc = TextPreprocessor()

    temp_audio_dir = "temp_audio"
    os.makedirs(temp_audio_dir, exist_ok=True)

    X = [] 
    texts = [] 
    processed_count = 0
    for i, item in tqdm(enumerate(dataset_head), total=max_samples):
        try:
            audio_array = item["audio"]["array"]
          
            path = os.path.join(temp_audio_dir, f"sample_{i}.wav")
            sf.write(path, audio_array, samplerate=sample_rate)
            processed_audio = audio_proc.load_and_process_audio(path)

            original_text = item["sentence"]

            if processed_audio is not None:
                X.append(processed_audio)
                texts.append(original_text)
                processed_count += 1


        except Exception as e:
            print(f"Error processing sample {i}: {e}")
         

    print(f"Successfully processed {processed_count} samples.")

    print("Encoding text data...")
    encoded_texts = [text_proc.encode(t) for t in texts]
    target_seq_length = audio_proc.max_len_time_steps
    y = pad_sequences(
        encoded_texts,
        maxlen=target_seq_length, 
        padding='post', 
        value=text_proc.padding_index )

    # Pad X to ensure consistent shape across all samples (should be consistent if audio processing is correct)
    # Use a padding value that the Masking layer in the model will ignore (0.0 in your model)
    X = pad_sequences(X, padding='post', dtype='float32', value=0.0)

    # Create sample weights: 1 for actual characters, 0 for padding
    # This mask tells the loss function which time steps to consider
    sample_weights = np.zeros_like(y, dtype=np.float32)
    for i, seq in enumerate(y):
        # Find where the actual data ends (before padding starts)
        # np.where returns a tuple, we need the first element (the array of indices)
        non_padding_indices = np.where(seq != text_proc.padding_index)[0]
        if len(non_padding_indices) > 0:
             # The weight should be 1.0 for all non-padding indices
             sample_weights[i, non_padding_indices] = 1.0


    # Ensure X, y, and sample_weights have the exact same number of samples
    # This handles cases where some samples might have failed processing
    min_samples = min(len(X), len(y), len(sample_weights))
    X = np.array(X[:min_samples])
    y = y[:min_samples]
    sample_weights = sample_weights[:min_samples]

    print("Data loading and preprocessing complete.")
    return X, y, sample_weights, text_proc

In [None]:


class AudioPreprocessor:
    def __init__(self, sample_rate=16000, max_duration=5.0):
        self.sample_rate = sample_rate
        self.max_duration = max_duration  # seconds
        self.max_len_samples = int(sample_rate * max_duration)
        # Calculate the exact number of time steps after Mel spectrogram
        # Using default n_fft=2048, hop_length=512.
        # The formula is floor(n_samples / hop_length) + 1 for center=True (default)
        self.max_len_time_steps = int(np.floor(self.max_len_samples / 512)) + 1


    def load_and_process_audio(self, file_path):
        """Loads audio, pads/truncates, and computes log Mel spectrogram."""
        try:
            y, sr = librosa.load(file_path, sr=self.sample_rate)
        except Exception as e:
            print(f"Error loading audio file {file_path}: {e}")
            return None # Return None if loading fails

        # Pad or truncate audio samples
        if len(y) > self.max_len_samples:
            y = y[:self.max_len_samples]
        else:
            y = np.pad(y, (0, max(0, self.max_len_samples - len(y))))

        # Compute Mel spectrogram
        mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128)
        # Convert to log scale
        log_mel_spec = librosa.power_to_db(mel_spec, ref=np.max)

        # Ensure the time dimension is exactly max_len_time_steps
        # This handles potential off-by-one issues with librosa padding/truncation
        if log_mel_spec.shape[1] > self.max_len_time_steps:
             log_mel_spec = log_mel_spec[:, :self.max_len_time_steps]
        elif log_mel_spec.shape[1] < self.max_len_time_steps:
             # Pad the time dimension if necessary (shouldn't happen with fixed audio length, but as a safeguard)
             pad_width = self.max_len_time_steps - log_mel_spec.shape[1]
             log_mel_spec = np.pad(log_mel_spec, ((0, 0), (0, pad_width)), mode='constant')

        # Transpose to (Time, Features) for the model input
        return log_mel_spec.T

class TextPreprocessor:
    def __init__(self):
        # Use the predefined character set and add a padding character
        # Sort to ensure consistent mapping
        self.chars = sorted(list(CHARS))
        # Create mapping from character to integer ID
        self.char_to_int = {c: i for i, c in enumerate(self.chars)}
        # Reserve the last index for padding
        self.padding_index = len(self.chars)
        # Create mapping from integer ID back to character/token
        self.int_to_char = {i: c for c, i in self.char_to_int.items()}
        self.int_to_char[self.padding_index] = "<pad>" # Define a token string for padding


    def encode(self, text):
        """Encodes text to a sequence of integers."""
        # Convert text to lowercase and filter out characters not in our vocabulary
        encoded = [self.char_to_int[c] for c in text.lower() if c in self.char_to_int]
        return encoded

    def decode(self, seq):
        """Decodes a sequence of integers back to text."""
        # Decode, ignoring the padding index
        decoded_chars = [self.int_to_char[i] for i in seq if i != self.padding_index]
        return "".join(decoded_chars)

    def get_vocab_size(self):
        """Returns the size of the vocabulary, including the padding token."""
        return len(self.chars) + 1 # +1 for the padding index


def load_data(sample_rate=16000, max_samples=1000, max_audio_duration=5.0):
    """Loads and preprocesses audio and text data from Common Voice."""
    print("Loading Common Voice dataset...")
    dataset = load_dataset(
        "mozilla-foundation/common_voice_13_0",
        "en", # Specify the English subset
        split="train", # Use the training split
        streaming=True, # Stream the data to avoid loading everything into memory
        storage_options={"http": {}} # Required for streaming from http
    )

    # Grab first max_samples from the streaming dataset
    print(f"Taking the first {max_samples} samples...")
    dataset_head = list(islice(dataset, max_samples))

    audio_proc = AudioPreprocessor(sample_rate, max_audio_duration)
    text_proc = TextPreprocessor()

    # Create a temporary directory to save audio files
    temp_audio_dir = "temp_audio"
    os.makedirs(temp_audio_dir, exist_ok=True)

    X = [] # List to store processed audio features
    texts = [] # List to store original text transcripts
    print("Processing audio and text data...")
    processed_count = 0
    for i, item in tqdm(enumerate(dataset_head), total=max_samples):
        try:
            audio_array = item["audio"]["array"]
            # Create a temporary file path
            path = os.path.join(temp_audio_dir, f"sample_{i}.wav")
            # Save the audio array to a temporary WAV file
            sf.write(path, audio_array, samplerate=sample_rate)

            # Process the audio file
            processed_audio = audio_proc.load_and_process_audio(path)

            # Process the text transcript
            original_text = item["sentence"]

            # Only append if audio processing was successful
            if processed_audio is not None:
                X.append(processed_audio)
                texts.append(original_text)
                processed_count += 1
            else:
                print(f"Skipping sample {i} due to audio processing error.")

        except Exception as e:
            print(f"Error processing sample {i}: {e}")
            # Continue to the next sample if an error occurs

    print(f"Successfully processed {processed_count} samples.")

    # Encode texts to integer sequences using the TextPreprocessor
    print("Encoding text data...")
    encoded_texts = [text_proc.encode(t) for t in texts]

    # Determine the target sequence length based on the audio time steps
    target_seq_length = audio_proc.max_len_time_steps
    print(f"Target sequence length for text (matching audio time steps): {target_seq_length}")

    # Pad the encoded text sequences to the target sequence length (157)
    # Use the padding_index defined in TextPreprocessor
    y = pad_sequences(
        encoded_texts,
        maxlen=target_seq_length, # Pad to the same length as audio time steps
        padding='post', # Pad at the end
        value=text_proc.padding_index # Use the padding index for padding
    )

    # Pad X to ensure consistent shape across all samples (should be consistent if audio processing is correct)
    # Use a padding value that the Masking layer in the model will ignore (0.0 in your model)
    X = pad_sequences(X, padding='post', dtype='float32', value=0.0)

    # Create sample weights: 1 for actual characters, 0 for padding
    # This mask tells the loss function which time steps to consider
    sample_weights = np.zeros_like(y, dtype=np.float32)
    for i, seq in enumerate(y):
        # Find where the actual data ends (before padding starts)
        # np.where returns a tuple, we need the first element (the array of indices)
        non_padding_indices = np.where(seq != text_proc.padding_index)[0]
        if len(non_padding_indices) > 0:
             # The weight should be 1.0 for all non-padding indices
             sample_weights[i, non_padding_indices] = 1.0


    # Ensure X, y, and sample_weights have the exact same number of samples
    # This handles cases where some samples might have failed processing
    min_samples = min(len(X), len(y), len(sample_weights))
    X = np.array(X[:min_samples])
    y = y[:min_samples]
    sample_weights = sample_weights[:min_samples]

    print("Data loading and preprocessing complete.")
    return X, y, sample_weights, text_proc

def build_asr_model(input_shape, output_dim):
    """Builds the ASR model using Bidirectional LSTMs and TimeDistributed Dense layers."""
    inputs = layers.Input(shape=input_shape, name="input_features")

    # Masking layer to ignore padded audio frames (value=0.0 as used in pad_sequences for X)
    # This layer automatically generates a mask based on the mask_value
    # The mask is then propagated to subsequent layers that support masking
    x = layers.Masking(mask_value=0.0, name="masking_input")(inputs)

    # Bidirectional LSTM layers process the sequence in both forward and backward directions
    # They support masking and propagate the mask
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True), name="bidirectional_lstm_1")(x)
    x = layers.Bidirectional(layers.LSTM(128, return_sequences=True), name="bidirectional_lstm_2")(x)

    # TimeDistributed Dense layers apply a Dense layer independently to each time step
    # They are designed to handle masks propagated from previous layers
    x = layers.TimeDistributed(layers.Dense(256, activation='relu'), name="time_distributed_dense_1")(x)
    # Output layer predicts probability distribution over characters for each time step
    # The output dimension is the size of the vocabulary (including padding)
    outputs = layers.TimeDistributed(layers.Dense(output_dim, activation='softmax'), name="output_softmax")(x)

    model = models.Model(inputs, outputs, name="asr_model")
    model.compile(
        optimizer='adam', # Adam optimizer is a good default
        loss='sparse_categorical_crossentropy', # Appropriate loss for integer targets and softmax output
        metrics=['accuracy'] # Monitor accuracy during training
    )
    return model

# --- Main Execution ---
if __name__ == "__main__":
    print("Starting ASR model training script...")
    # Adjust max_samples for faster testing if needed
    # A larger number of samples will give a more meaningful result but take longer
    MAX_SAMPLES_TO_LOAD = 5000 # Keep small for quick testing

    X, y, sample_weights, text_proc = load_data(max_samples=MAX_SAMPLES_TO_LOAD)

    print("\nData shapes after loading and preprocessing:")
    print(f"Shape of X (audio features): {X.shape}")
    print(f"Shape of y (target text sequences): {y.shape}")
    print(f"Shape of sample_weights (mask for loss): {sample_weights.shape}")

    # Get the vocabulary size including the padding token
    output_dim = text_proc.get_vocab_size()
    print(f"Output dimension (vocabulary size including padding): {output_dim}")

    print("\nBuilding model...")
    # Input shape for the model is (time_steps, features) excluding the batch size
    input_shape = (X.shape[1], X.shape[2])
    model = build_asr_model(input_shape, output_dim)

    # Print the model summary to see the layers and parameter counts
    model.summary()

    print("\nTraining model...")
    batch_size = 32 # Number of samples per gradient update
    epochs = 10 # Number of passes over the entire dataset
    validation_split = 0.2 # Fraction of the training data to use for validation

    # Train the model
    # Pass sample_weight to ignore the loss from padded target steps
    history = model.fit(
        X,
        y,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=validation_split,
        sample_weight=sample_weights # Pass the sample weights here to mask loss
    )

    print("\nTraining finished.")

    # Optional: Save the trained model
    # try:
    #     model.save("asr_model.h5")
    #     print("Model saved to asr_model.h5")
    # except Exception as e:
    #     print(f"Error saving model: {e}")

    # Optional: You might want to save the text_proc object or its mappings
    # so you can use it for inference later to decode model predictions.
    # For example, save char_to_int and int_to_char to a JSON file.


Starting ASR model training script...
Loading Common Voice dataset...
Taking the first 5000 samples...


Reading metadata...: 1013968it [00:59, 16928.62it/s]


Processing audio and text data...


100%|███████████████████████████████████████| 5000/5000 [02:23<00:00, 34.89it/s]


Successfully processed 5000 samples.
Encoding text data...
Target sequence length for text (matching audio time steps): 157
Data loading and preprocessing complete.

Data shapes after loading and preprocessing:
Shape of X (audio features): (5000, 157, 128)
Shape of y (target text sequences): (5000, 157)
Shape of sample_weights (mask for loss): (5000, 157)
Output dimension (vocabulary size including padding): 29

Building model...


2025-05-04 17:29:54.172730: E external/local_xla/xla/stream_executor/cuda/cuda_platform.cc:51] failed call to cuInit: INTERNAL: CUDA error: Failed call to cuInit: UNKNOWN ERROR (303)



Training model...




Epoch 1/10


2025-05-04 17:29:55.297937: E tensorflow/core/util/util.cc:131] oneDNN supports DT_BOOL only on platforms with AVX-512. Falling back to the default Eigen-based implementation if present.


[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m593s[0m 5s/step - accuracy: 0.0569 - loss: 1.1037 - val_accuracy: 0.0601 - val_loss: 1.0841
Epoch 2/10
[1m 42/125[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m5:26[0m 4s/step - accuracy: 0.0612 - loss: 1.0880

"abcdefghijklmnopqrstuvwxyz '"