In [1]:
# Install jiwer for WER evaluation
!pip install jiwer

# download & extract into /content/LJSpeech
!mkdir -p /content/LJSpeech
!wget -P /content/LJSpeech \
    https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2
!tar xvjf /content/LJSpeech/LJSpeech-1.1.tar.bz2 \
       -C /content/LJSpeech

import os, numpy as np, pandas as pd, tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from jiwer import wer
import matplotlib.pyplot as plt
from IPython import display
from tensorflow.keras import mixed_precision

# Enable mixed precision but with float32 accumulation for better stability
mixed_precision.set_global_policy('mixed_float16')

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
LJSpeech-1.1/wavs/LJ022-0089.wav
LJSpeech-1.1/wavs/LJ030-0192.wav
LJSpeech-1.1/wavs/LJ041-0078.wav
LJSpeech-1.1/wavs/LJ045-0249.wav
LJSpeech-1.1/wavs/LJ034-0035.wav
LJSpeech-1.1/wavs/LJ010-0152.wav
LJSpeech-1.1/wavs/LJ036-0174.wav
LJSpeech-1.1/wavs/LJ035-0076.wav
LJSpeech-1.1/wavs/LJ032-0176.wav
LJSpeech-1.1/wavs/LJ046-0113.wav
LJSpeech-1.1/wavs/LJ017-0096.wav
LJSpeech-1.1/wavs/LJ004-0098.wav
LJSpeech-1.1/wavs/LJ010-0147.wav
LJSpeech-1.1/wavs/LJ042-0230.wav
LJSpeech-1.1/wavs/LJ041-0033.wav
LJSpeech-1.1/wavs/LJ045-0229.wav
LJSpeech-1.1/wavs/LJ014-0199.wav
LJSpeech-1.1/wavs/LJ002-0082.wav
LJSpeech-1.1/wavs/LJ006-0055.wav
LJSpeech-1.1/wavs/LJ045-0120.wav
LJSpeech-1.1/wavs/LJ050-0028.wav
LJSpeech-1.1/wavs/LJ045-0215.wav
LJSpeech-1.1/wavs/LJ013-0121.wav
LJSpeech-1.1/wavs/LJ008-0025.wav
LJSpeech-1.1/wavs/LJ005-0240.wav
LJSpeech-1.1/wavs/LJ044-0026.wav
LJSpeech-1.1/wavs/LJ048-0127.wav
LJSpeech-1.1/wavs/LJ006-0195.wav
LJSpeech-1.

In [2]:
# Path setup
DATA_DIR      = "/content/LJSpeech/LJSpeech-1.1"
WAVS_PATH     = os.path.join(DATA_DIR, "wavs") + "/"
METADATA_PATH = os.path.join(DATA_DIR, "metadata.csv")

# Load and prepare data
df = pd.read_csv(METADATA_PATH, sep="|", header=None, quoting=3)
df.columns = ["file_name","transcription","normalized_transcription"]
df = df[["file_name","normalized_transcription"]]

# Filter out very long or very short samples for better batch efficiency
df['audio_length'] = df['file_name'].apply(lambda x: os.path.getsize(WAVS_PATH + x + '.wav'))
df = df[(df['audio_length'] > 10000) & (df['audio_length'] < 200000)]
df = df.drop('audio_length', axis=1).sample(frac=1, random_state=42).reset_index(drop=True)

# Split to train/validation sets
split = int(len(df)*0.9)
df_train, df_val = df[:split], df[split:]
print("train size:", len(df_train), "val size:", len(df_val))

# Use 20% of the data - increased from 15% for better representation
sample_fraction = 0.2
df_train = df_train.sample(frac=sample_fraction, random_state=42).reset_index(drop=True)
df_val = df_val.sample(frac=sample_fraction, random_state=42).reset_index(drop=True)

print(f"ORIGINAL: train={split}, val={len(df) - split}")
print(f"REDUCED: train={len(df_train)}, val={len(df_val)}")

train size: 2362 val size: 263
ORIGINAL: train=2362, val=263
REDUCED: train=472, val=53


In [3]:
all_text = " ".join(df_train.normalized_transcription).lower()
chars    = sorted(set(all_text))
# 0: CTC blank, 1: PAD, 2: SOS, 3: EOS
char_to_idx = {c:i+4 for i,c in enumerate(chars)}
char_to_idx.update({"[blank]":0, "[pad]":1, "[sos]":2, "[eos]":3})
idx_to_char = {i:c for c,i in char_to_idx.items()}
VOCAB_SIZE  = len(char_to_idx)
print("vocab size:", VOCAB_SIZE)

# TF-lookup from string to int
keys   = tf.constant(list(char_to_idx.keys()))
vals   = tf.constant(list(char_to_idx.values()), tf.int64)
char_table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(keys, vals),
    default_value=1  # PAD
)

vocab size: 41


In [20]:
# Optimized spectrogram parameters
FRAME_LEN, FRAME_STEP, FFT_LEN = 256, 160, 512  # Increased FFT_LEN for better frequency resolution

@tf.function
def encode_single_sample(wav_file, label):
    # 1) load wav
    f = tf.io.read_file(WAVS_PATH + wav_file + ".wav")
    audio, _ = tf.audio.decode_wav(f)
    audio = tf.squeeze(audio, -1)
    audio = tf.cast(audio, tf.float32)

    # 2) More aggressive preprocessing
    # Apply a pre-emphasis filter
    pre_emphasis = 0.97
    audio = tf.concat([audio[:1], audio[1:] - pre_emphasis * audio[:-1]], 0)

    # 3) stft with improved parameters
    spec = tf.signal.stft(audio, FRAME_LEN, FRAME_STEP, FFT_LEN, window_fn=tf.signal.hann_window)
    spec = tf.abs(spec)

    # Apply power law compression (cube root instead of square root)
    spec = tf.pow(spec, 0.33)

    # 4) Better normalization - global normalization with fixed values
    # These fixed values help prevent extreme normalization
    mean = 0.5  # Fixed mean
    std = 0.1   # Fixed std
    spec = (spec - mean) / std

    # Clip outliers
    spec = tf.clip_by_value(spec, -3, 3)

    # 5) label → char ids with lower case conversion
    txt = tf.strings.lower(label)
    chars = tf.strings.unicode_split(txt, "UTF-8")
    ids = char_table.lookup(chars)
    ids = tf.cast(ids, tf.int32)

    # 6) prepare CTC labels, decoder in/out
    ctc_labels = ids
    dec_in = tf.concat([[char_to_idx["[sos]"]], ids], axis=0)
    dec_out = tf.concat([ids, [char_to_idx["[eos]"]]], axis=0)

    return spec, (ctc_labels, dec_in, dec_out)

In [21]:
# Increased batch size (from 8 to 16) for better training efficiency
BATCH = 16

def prepare_dataset(df, is_training=True):
    ds = tf.data.Dataset.from_tensor_slices((
        list(df.file_name), list(df.normalized_transcription)
    ))

    # More efficient mapping with parallel processing
    ds = ds.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)

    # Cache data if training for faster access
    if is_training:
        ds = ds.cache()

    # Shuffle with a large buffer for better randomization
    if is_training:
        ds = ds.shuffle(buffer_size=1000, seed=42)

    # Padded batch with more efficient padding values
    ds = ds.padded_batch(
        BATCH,
        padded_shapes=(
          [None, None],          # spec
          ([None], [None], [None])  # ctc, dec_in, dec_out
        ),
        padding_values=(
          0.0,  # spec pad
          (
            char_to_idx["[blank]"],  # ctc pad
            char_to_idx["[pad]"],    # dec_in pad
            char_to_idx["[pad]"]     # dec_out pad
          )
        )
    )

    return ds.prefetch(tf.data.AUTOTUNE)

train_ds = prepare_dataset(df_train, is_training=True)
val_ds = prepare_dataset(df_val, is_training=False)

In [22]:
def get_pos_enc(maxlen, dm):
    pos = np.arange(maxlen)[:,None]
    i = np.arange(dm)[None,:]
    angle = pos / np.power(10000, (2*(i//2))/dm)
    angle[:,0::2] = np.sin(angle[:,0::2])
    angle[:,1::2] = np.cos(angle[:,1::2])
    return tf.cast(angle, tf.float32)

def create_padding_mask(seq):
    mask = tf.cast(tf.equal(seq, char_to_idx["[pad]"]), tf.float32)
    return mask[:,None,None,:]  # [B,1,1,T]

def create_look_ahead_mask(sz):
    return 1 - tf.linalg.band_part(tf.ones((sz,sz)), -1, 0)

In [23]:
def encoder_layer(dm, nh, pf):
    inp = layers.Input((None, dm))
    att = layers.MultiHeadAttention(nh, key_dim=dm)(inp, inp)
    x = layers.LayerNormalization(epsilon=1e-6)(inp + att)  # Added epsilon
    f = layers.Dense(pf, activation="relu")(x)
    f = layers.Dense(dm)(f)
    out = layers.LayerNormalization(epsilon=1e-6)(x + f)
    return keras.Model(inp, out, name="enc_layer")

def decoder_layer(dm, nh, pf):
    di = layers.Input((None, dm))
    eo = layers.Input((None, dm))
    la = layers.Input((1,None,None))
    pm = layers.Input((1,None,None))
    att1 = layers.MultiHeadAttention(nh, key_dim=dm)(
              di, di, attention_mask=la)
    x1 = layers.LayerNormalization(epsilon=1e-6)(di + att1)
    att2 = layers.MultiHeadAttention(nh, key_dim=dm)(
              x1, eo, attention_mask=pm)
    x2 = layers.LayerNormalization(epsilon=1e-6)(x1 + att2)
    f = layers.Dense(pf, activation="relu")(x2)
    f = layers.Dense(dm)(f)
    out = layers.LayerNormalization(epsilon=1e-6)(x2 + f)
    return keras.Model([di,eo,la,pm], out, name="dec_layer")

In [25]:
# Model hyperparameters - simplified further
DM = 96           # Reduced model dimension
N_ENC = 2         # Fewer encoder layers
N_DEC = 1         # Only one decoder layer
NH = 4            # Number of attention heads
PF = 256          # Smaller feed-forward dimension
FREQ_BINS = FFT_LEN // 2 + 1
DROPOUT_RATE = 0.2  # Increased dropout for regularization

# 1) Precompute positional encoding
pos_enc = get_pos_enc(5000, DM)
pos_enc = tf.constant(pos_enc, tf.float32)

# 2) Encoder input + projection
enc_in = layers.Input(shape=(None, FREQ_BINS), name="enc_in")
x = layers.Dense(DM, kernel_initializer='he_normal', name="proj_enc")(enc_in)

# 3) Add positional encoding
def add_positional_encoding(x):
    return x + tf.cast(pos_enc[:tf.shape(x)[1], :], x.dtype)

x = layers.Lambda(add_positional_encoding, name="add_pos_enc")(x)
x = layers.Dropout(DROPOUT_RATE, name="drop_proj")(x)

# 4) Encoder blocks - FIXED WITH UNIQUE NAMES
for i in range(N_ENC):
    # Create encoder layers with unique names for each component
    # Self-attention
    att = layers.MultiHeadAttention(
              num_heads=NH, key_dim=DM,
              name=f"enc_mha_{i}"
          )(x, x)
    x1 = layers.LayerNormalization(epsilon=1e-6, name=f"enc_ln1_{i}")(x + att)

    # Feed-forward
    f = layers.Dense(PF, activation="relu", name=f"enc_ffn1_{i}")(x1)
    f = layers.Dense(DM, name=f"enc_ffn2_{i}")(f)
    x = layers.LayerNormalization(epsilon=1e-6, name=f"enc_ln2_{i}")(x1 + f)

enc_out = x

# 5) CTC head
ctc_logits = layers.Dense(VOCAB_SIZE, name="ctc_logits")(enc_out)

# 6) Decoder input + embed
dec_in = layers.Input(shape=(None,), dtype=tf.int32, name="dec_in")
y = layers.Embedding(VOCAB_SIZE, DM, name="emb_dec")(dec_in)
y = layers.Lambda(add_positional_encoding, name="add_pos_dec")(y)
y = layers.Dropout(DROPOUT_RATE, name="drop_emb")(y)

# 7) Build masks
look_ahead_mask = layers.Lambda(
    lambda x: create_look_ahead_mask(tf.shape(x)[1]),
    name="look_mask"
)(dec_in)

pad_mask = layers.Lambda(
    lambda x: create_padding_mask(x),
    name="pad_mask"
)(dec_in)

combined_mask = layers.Lambda(
    lambda x: tf.maximum(x[0], x[1]),
    name="combined_mask"
)([look_ahead_mask, pad_mask])

# 8) Decoder blocks - ENSURE UNIQUE NAMES
for i in range(N_DEC):
    # Masked self-attention
    att1 = layers.MultiHeadAttention(
               num_heads=NH, key_dim=DM,
               name=f"dec_mha1_{i}"
           )(y, y, attention_mask=combined_mask)
    y1 = layers.LayerNormalization(epsilon=1e-6, name=f"dec_ln1_{i}")(y + att1)

    # Cross-attention
    att2 = layers.MultiHeadAttention(
               num_heads=NH, key_dim=DM,
               name=f"dec_mha2_{i}"
           )(y1, enc_out)
    y2 = layers.LayerNormalization(epsilon=1e-6, name=f"dec_ln2_{i}")(y1 + att2)

    # Feed-forward
    ffn = layers.Dense(PF, activation="relu", name=f"dec_ffn1_{i}")(y2)
    ffn = layers.Dense(DM, name=f"dec_ffn2_{i}")(ffn)
    y = layers.LayerNormalization(epsilon=1e-6, name=f"dec_ln3_{i}")(y2 + ffn)

# 9) Output projection
dec_logits = layers.Dense(VOCAB_SIZE, name="dec_logits")(y)

# 10) Model definition
model = keras.Model(inputs=[enc_in, dec_in],
                   outputs=[ctc_logits, dec_logits])

In [26]:
# CTC loss function with explicit handling of blank index
def ctc_loss_fn(y_true, y_pred):
    # Scale predictions more aggressively
    y_pred = tf.nn.log_softmax(y_pred, axis=-1)

    batch_size = tf.shape(y_pred)[0]
    input_length = tf.shape(y_pred)[1]

    # Calculate label lengths but ensure they're appropriate
    label_length = tf.math.count_nonzero(
        tf.not_equal(y_true, 0), # Using 0 for blank directly
        axis=1,
        dtype=tf.int32
    )

    # Ensure no zero-length labels
    label_length = tf.maximum(label_length, 1)

    # Simplify: make input length much shorter - this helps with alignment
    input_length = tf.fill([batch_size], tf.minimum(20, input_length))

    # Ensure tensors are the right type
    y_true = tf.cast(y_true, tf.int32)

    # Use sparse CTC loss
    loss = tf.nn.ctc_loss(
        labels=y_true,
        logits=y_pred,
        label_length=label_length,
        logit_length=input_length,
        blank_index=0,  # Using 0 directly
        logits_time_major=False
    )

    # Higher clip value to see actual progress
    loss = tf.clip_by_value(loss, 0, 300)

    return tf.reduce_mean(loss)
# Improved sequence loss with masking
def seq2seq_loss(y_true, y_pred):
    # Create mask to ignore padding tokens
    mask = tf.cast(tf.not_equal(y_true, char_to_idx["[pad]"]), tf.float32)

    # Also mask [eos] token (optional)
    eos_mask = tf.cast(tf.not_equal(y_true, char_to_idx["[eos]"]), tf.float32)
    mask = mask * eos_mask

    # Cross-entropy loss
    loss = tf.keras.losses.sparse_categorical_crossentropy(
        y_true, y_pred, from_logits=True
    )

    # Apply mask and calculate mean
    return tf.reduce_sum(loss * mask) / (tf.reduce_sum(mask) + 1e-6)

In [27]:
# Define a custom learning rate scheduler for better convergence
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

    def get_config(self):
        return {
            "d_model": self.d_model,
            "warmup_steps": self.warmup_steps
        }

# Fixed and improved CTC loss function
def ctc_loss_fn(y_true, y_pred):
    # Apply log softmax for numerical stability
    y_pred = tf.nn.log_softmax(y_pred, axis=-1)

    batch_size = tf.shape(y_pred)[0]
    input_length = tf.shape(y_pred)[1]

    # Ensure label lengths are appropriate
    label_length = tf.math.count_nonzero(
        tf.not_equal(y_true, char_to_idx["[blank]"]),
        axis=1,
        dtype=tf.int32
    )
    label_length = tf.maximum(label_length, 1)  # Avoid zero lengths

    # Ensure input lengths aren't too long compared to labels
    # This helps prevent CTC alignment issues
    max_label_len = tf.reduce_max(label_length)
    input_length = tf.fill([batch_size], tf.minimum(input_length, max_label_len * 4))

    # Ensure tensors are the right type
    y_true = tf.cast(y_true, tf.int32)

    # Use CTC loss with better parameters
    loss = tf.nn.ctc_loss(
        labels=y_true,
        logits=y_pred,
        label_length=label_length,
        logit_length=input_length,
        blank_index=char_to_idx["[blank]"],
        logits_time_major=False
    )

    # Clip to avoid extreme values
    loss = tf.clip_by_value(loss, 0, 100)

    return tf.reduce_mean(loss)

# Use a smaller learning rate
learning_rate = 5e-5  # Reduced from 1e-4

# Create optimizer with gradient clipping
optimizer = tf.keras.optimizers.Adam(
    learning_rate=learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
    clipnorm=0.5  # More aggressive gradient clipping
)

# Initially focus only on CTC loss (disable decoder)
model.compile(
    optimizer=optimizer,
    loss={
        "ctc_logits": ctc_loss_fn,
        "dec_logits": seq2seq_loss
    },
    loss_weights={
        "ctc_logits": 1.0,  # Only use CTC loss initially
        "dec_logits": 0.0   # Disable decoder loss
    }
)

# Display model summary
model.summary()

# Print training configuration for reference
print(f"Model parameters: DM={DM}, N_ENC={N_ENC}, N_DEC={N_DEC}, NH={NH}, PF={PF}")
print(f"Learning rate: {learning_rate}")
print(f"Optimizer: Adam with gradient clipping (clipnorm=1.0)")
print(f"Loss weights: CTC={0.4}, Decoder={0.6}")
print(f"Batch size: {BATCH}")

Model parameters: DM=96, N_ENC=2, N_DEC=1, NH=4, PF=256
Learning rate: 5e-05
Optimizer: Adam with gradient clipping (clipnorm=1.0)
Loss weights: CTC=0.4, Decoder=0.6
Batch size: 16


In [None]:
# Data preparation helper function
def pack_for_fit(spectrogram, labels):
    ctc_labels, dec_inp, dec_out = labels
    x = (spectrogram, dec_inp)
    y = (ctc_labels, dec_out)
    return x, y

# Prepare datasets
train_ds_fit = train_ds.map(pack_for_fit, num_parallel_calls=tf.data.AUTOTUNE)
val_ds_fit = val_ds.map(pack_for_fit, num_parallel_calls=tf.data.AUTOTUNE)

# Enhanced callbacks for better training
# Fix: Add .keras extension to checkpoint filepath
checkpoint_path = "speech_model_checkpoint.keras"

# Simple callbacks to avoid conflicts
callbacks = [
    # Model checkpoint to save best model
    tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path,
        save_best_only=True,
        monitor='val_loss',
        verbose=1
    ),

    # Early stopping with patience
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
        verbose=1
    ),

    # Add a callback to reduce batch size if needed
    tf.keras.callbacks.TerminateOnNaN()
]

# Clear backend session and free memory before training
import gc
gc.collect()
tf.keras.backend.clear_session()

# Disable XLA JIT compilation to avoid CTC loss errors
tf.config.optimizer.set_jit(False)

print("Starting training...")
print(f"Using {len(df_train)} training samples and {len(df_val)} validation samples")
print(f"Model dimensions: DM={DM}, Encoder layers={N_ENC}, Decoder layers={N_DEC}")

# Train with fewer epochs initially
history = model.fit(
    train_ds_fit,
    validation_data=val_ds_fit,
    epochs=10,  # Reduced number of epochs
    callbacks=callbacks,
    verbose=1
)

# Save the final model
model.save("speech_recognition_model.keras")

Starting training...
Using 472 training samples and 53 validation samples
Model dimensions: DM=96, Encoder layers=2, Decoder layers=1
Epoch 1/10
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13s/step - ctc_logits_loss: 100.0000 - dec_logits_loss: 4.4008 - loss: 100.0000 
Epoch 1: val_loss improved from inf to 100.00000, saving model to speech_model_checkpoint.keras
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m420s[0m 13s/step - ctc_logits_loss: 100.0000 - dec_logits_loss: 4.4010 - loss: 100.0000 - val_ctc_logits_loss: 100.0000 - val_dec_logits_loss: 4.3987 - val_loss: 100.0000
Epoch 2/10
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11s/step - ctc_logits_loss: 100.0000 - dec_logits_loss: 4.4144 - loss: 100.0000 
Epoch 2: val_loss did not improve from 100.00000
[1m30/30[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m361s[0m 11s/step - ctc_logits_loss: 100.0000 - dec_logits_loss: 4.4143 - loss: 100.0000 - val_ctc_logits_loss: 100.0000 - 

In [None]:
# Plot training history
plt.figure(figsize=(12, 4))

# Plot loss
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss Over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# Plot individual losses if available
plt.subplot(1, 2, 2)
if 'ctc_logits_loss' in history.history:
    plt.plot(history.history['ctc_logits_loss'], label='CTC Loss')
    plt.plot(history.history['dec_logits_loss'], label='Decoder Loss')
    plt.title('Component Losses')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
else:
    plt.text(0.5, 0.5, 'Component losses not available',
             horizontalalignment='center', verticalalignment='center')

plt.tight_layout()
plt.show()

In [None]:
def greedy_decode(spect):
    """Optimized greedy decoding function"""
    # Add batch dimension
    s = tf.expand_dims(spect, 0)  # [1,T,F]

    # Get encoder output - run through encoder
    enc_output = None
    x = model.get_layer("proj_enc")(s)
    x = model.get_layer("add_pos_enc")(x)
    x = model.get_layer("drop_proj")(x)

    # Run through encoder layers
    for i in range(N_ENC):
        for layer in model.layers:
            if layer.name == f"enc_layer_{i}":
                x = layer(x)
                break

    enc_output = x

    # CTC greedy decoding
    ctc_logits = model.get_layer("ctc_logits")(enc_output)
    ctc_logits = tf.cast(ctc_logits, tf.float32)  # Ensure float32

    # Transpose for ctc_greedy_decoder
    ctc_t = tf.transpose(ctc_logits, [1, 0, 2])

    # Get sequence length
    seq_len = tf.fill([1], tf.shape(ctc_logits)[1])

    # Run greedy decoder
    decoded, _ = tf.nn.ctc_greedy_decoder(ctc_t, seq_len)

    # Convert to dense
    seq = tf.sparse.to_dense(decoded[0])[0].numpy().tolist()

    # Convert to text - filtering out special tokens
    ctc_str = "".join(idx_to_char[i] for i in seq if i > 3)

    # Autoregressive decoding
    di = [char_to_idx["[sos]"]]
    max_len = 200  # Maximum sequence length

    for _ in range(max_len):
        # Get output for current sequence
        dec_input = tf.expand_dims(di, 0)  # Add batch dimension

        # Get decoder logits
        out = model.predict([s, dec_input], verbose=0)[1]

        # Get most likely next token
        next_token = tf.argmax(out[0, -1, :], axis=-1).numpy()

        # Break if EOS token
        if next_token == char_to_idx["[eos]"]:
            break

        # Add next token to sequence
        di.append(next_token)

    # Convert sequence to text, filtering out special tokens
    seq_str = "".join(idx_to_char[i] for i in di[1:]
                     if idx_to_char[i] not in ["[pad]", "[eos]", "[sos]", "[blank]"])

    return ctc_str, seq_str

# Function to calculate Word Error Rate
def calculate_wer(reference, hypothesis):
    return wer(reference, hypothesis)

In [None]:
# Test on multiple validation samples
num_samples = 5
total_wer_ctc = 0
total_wer_seq = 0

print("Evaluating model on validation samples...")
print("-" * 50)

for spec, (_, _, dec_out) in val_ds.take(num_samples):
    spec = spec[0].numpy()
    true_ids = dec_out[0].numpy()
    true_str = "".join(idx_to_char[i] for i in true_ids if i > 3 and i != char_to_idx["[eos]"])

    p_ctc, p_seq = greedy_decode(spec)

    # Calculate WER
    wer_ctc = calculate_wer(true_str, p_ctc)
    wer_seq = calculate_wer(true_str, p_seq)

    total_wer_ctc += wer_ctc
    total_wer_seq += wer_seq

    print("True     :", true_str)
    print("CTC pred :", p_ctc)
    print("Seq pred :", p_seq)
    print(f"WER(CTC) : {wer_ctc:.4f}")
    print(f"WER(Seq) : {wer_seq:.4f}")
    print("-" * 50)

# Calculate average WER
avg_wer_ctc = total_wer_ctc / num_samples
avg_wer_seq = total_wer_seq / num_samples

print(f"Average WER (CTC): {avg_wer_ctc:.4f}")
print(f"Average WER (Seq): {avg_wer_seq:.4f}")

In [None]:
# Optional: Function to process user-uploaded audio files
def process_audio_file(audio_file_path):
    """Process an uploaded audio file for inference"""
    # Read the audio file
    audio, sr = tf.audio.decode_wav(
        tf.io.read_file(audio_file_path)
    )
    audio = tf.squeeze(audio, -1)

    # Resample to 22050 Hz if needed
    if sr != 22050:
        # Implement resampling here if needed
        pass

    # Convert to spectrogram
    spec = tf.signal.stft(audio, FRAME_LEN, FRAME_STEP, FFT_LEN)
    spec = tf.abs(spec)
    spec = tf.pow(spec, 0.5)

    # Normalize
    mean = tf.reduce_mean(spec, axis=1, keepdims=True)
    std = tf.math.reduce_std(spec, axis=1, keepdims=True) + 1e-6
    spec = (spec - mean) / std

    # Get transcriptions
    ctc_pred, seq_pred = greedy_decode(spec)

    return {
        "CTC Prediction": ctc_pred,
        "Sequence Prediction": seq_pred
    }

print("Model ready for inference. Upload an audio file or use sample validation data.")