<a href="https://colab.research.google.com/github/ShehabOrban/shehab1/blob/main/SpeechProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install jiwer

import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer

import keras
import pandas as pd

data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
data_path = keras.utils.get_file("LJSpeech-1.1", data_url, untar=True)
wavs_path = data_path + "/LJSpeech-1.1/wavs/"
metadata_path = data_path + "/LJSpeech-1.1/metadata.csv"

# Read metadata file and parse it
metadata_df = pd.read_csv(metadata_path, sep="|", header=None, quoting=3)
metadata_df.columns = ["file_name", "transcription", "normalized_transcription"]
metadata_df = metadata_df[["file_name", "normalized_transcription"]]
metadata_df = metadata_df.sample(frac=1).reset_index(drop=True)
metadata_df.head(3)

split = int(len(metadata_df) * 0.90)
df_train = metadata_df[:split]
df_val = metadata_df[split:]

print(f"Size of the training set: {len(df_train)}")
print(f"Size of the training set: {len(df_val)}")

# The set of characters accepted in the transcription.
characters = [x for x in "abcdefghijklmnopqrstuvwxyz'?! "]
# Mapping characters to integers
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")
# Mapping integers back to original characters
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)

# An integer scalar Tensor. The window length in samples.
frame_length = 256
# An integer scalar Tensor. The number of samples to step.
frame_step = 160
# An integer scalar Tensor. The size of the FFT to apply.
# If not provided, uses the smallest power of 2 enclosing frame_length.
fft_length = 384


def encode_single_sample(wav_file, label):
    ###########################################
    ##  Process the Audio
    ##########################################
    # 1. Read wav file
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    # 2. Decode the wav file
    audio, _ = tf.audio.decode_wav(file)
    audio = tf.squeeze(audio, axis=-1)
    # 3. Change type to float
    audio = tf.cast(audio, tf.float32)
    # 4. Get the spectrogram
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    # 5. We only need the magnitude, which can be derived by applying tf.abs
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    # 6. normalisation
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    ###########################################
    ##  Process the label
    ##########################################
    # 7. Convert label to Lower case
    label = tf.strings.lower(label)
    # 8. Split the label
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    # 9. Map the characters in label to numbers
    label = char_to_num(label)
    # 10. Return a dict as our model is expecting two inputs
    return spectrogram, label

batch_size = 32
# Define the training dataset
train_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_train["file_name"]), list(df_train["normalized_transcription"]))
)
train_dataset = (
    train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Define the validation dataset
validation_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_val["file_name"]), list(df_val["normalized_transcription"]))
)
validation_dataset = (
    validation_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

Collecting jiwer
  Downloading jiwer-3.1.0-py3-none-any.whl.metadata (2.6 kB)
Collecting rapidfuzz>=3.9.7 (from jiwer)
  Downloading rapidfuzz-3.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading jiwer-3.1.0-py3-none-any.whl (22 kB)
Downloading rapidfuzz-3.13.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m32.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.1.0 rapidfuzz-3.13.0
Downloading data from https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2
[1m2748572632/2748572632[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 0us/step
Size of the training set: 11790
Size of the training set: 1310
The vocabulary is: ['', np.str_('a'), np.str_('b'), np.str_('c'), np.str_('d'), np.str_('e'), np.str_('f'), np.str_('g'), np.str_('h'), np.str_('i'), np.str_('j'), np.str_

**Build the Transformer Encoder**

In [2]:
!pip install tensorflow jiwer matplotlib pandas numpy



In [3]:
# Define input shape parameters for the model
def CTCLoss(y_true, y_pred):
    batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

    input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
    label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

    loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
    return loss


In [4]:
class TokenEmbedding(layers.Layer):
    def __init__(self, num_vocab=1000, maxlen=100, embed_dim=64):
        super().__init__()
        self.emb = tf.keras.layers.Embedding(num_vocab, embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        x = self.emb(x)
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        return x + positions


In [5]:
class SpeechFeatureEmbedding(layers.Layer):
    def __init__(self, embed_dim=192, dropout=0.0):
        super().__init__()
        self.conv1 = layers.Conv1D(
            embed_dim, 11, strides=2, padding="same", activation="relu"
        )
        self.conv2 = layers.Conv1D(
            embed_dim, 11, strides=2, padding="same", activation="relu"
        )
        self.conv3 = layers.Conv1D(
            embed_dim, 11, strides=2, padding="same", activation="relu"
        )
        self.pos_emb = layers.Embedding(input_dim=2048, output_dim=embed_dim)
        self.dropout = layers.Dropout(dropout)

    def call(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)

        # Add positional embeddings
        positions = tf.range(start=0, limit=tf.shape(x)[1], delta=1)
        positions = self.pos_emb(positions)
        x = x + positions
        return self.dropout(x)


In [6]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)


In [7]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = layers.LayerNormalization(epsilon=1e-6)

        self.self_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.enc_att = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )

        self.ffn = keras.Sequential(
            [
                layers.Dense(feed_forward_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )

        self.dropout1 = layers.Dropout(dropout_rate)
        self.dropout2 = layers.Dropout(dropout_rate)
        self.dropout3 = layers.Dropout(dropout_rate)

    def causal_attention_mask(self, batch_size, n_dest, n_src, dtype):
        """Masks the upper half of the dot product matrix in self attention"""
        i = tf.range(n_dest)[:, None]
        j = tf.range(n_src)
        m = i >= j - n_src + n_dest
        mask = tf.cast(m, dtype)
        mask = tf.reshape(mask, [1, n_dest, n_src])
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], 0
        )
        return tf.tile(mask, mult)

    def call(self, enc_output, target, training=False):
        input_shape = tf.shape(target)
        batch_size = input_shape[0]
        seq_len = input_shape[1]

        # Self-attention on decoder inputs with causal mask
        causal_mask = self.causal_attention_mask(batch_size, seq_len, seq_len, tf.bool)
        target_att = self.self_att(target, target, attention_mask=causal_mask)
        target_att = self.dropout1(target_att, training=training)
        out1 = self.layernorm1(target + target_att)

        # Attention with encoder outputs
        enc_att = self.enc_att(out1, enc_output)
        enc_att = self.dropout2(enc_att, training=training)
        out2 = self.layernorm2(out1 + enc_att)

        # Feed forward network
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        return self.layernorm3(out2 + ffn_output)


In [8]:
# Define the model
def build_asr_model():
    # Input features: spectrograms
    input_spectrogram = layers.Input((None, fft_length // 2 + 1), name="input")

    # Embedding for spectrogram
    x = layers.Reshape((-1, fft_length // 2 + 1, 1))(input_spectrogram)
    x = layers.Conv2D(32, 3, activation="relu", padding="same")(x)
    x = layers.Conv2D(32, 3, activation="relu", padding="same")(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(64, 3, activation="relu", padding="same")(x)
    x = layers.Conv2D(64, 3, activation="relu", padding="same")(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)

    # Speech feature embedding
    x = SpeechFeatureEmbedding(embed_dim=192)(x)

    # Transformer encoder blocks
    embed_dim = 192
    num_heads = 8
    ff_dim = 512

    encoder_output = x
    for _ in range(4):  # 4 transformer encoder blocks
        encoder_output = TransformerEncoder(embed_dim, num_heads, ff_dim)(encoder_output)

    # CTC Head
    ctc_output = layers.Dense(char_to_num.vocabulary_size(), activation="softmax", name="ctc_output")(encoder_output)

    # Decoder input
    decoder_input = layers.Input(shape=(None,), dtype=tf.int32, name="decoder_input")
    decoder_emb = TokenEmbedding(num_vocab=char_to_num.vocabulary_size(), embed_dim=embed_dim)(decoder_input)

    # Transformer decoder blocks
    decoder_output = decoder_emb
    for _ in range(2):  # 2 transformer decoder blocks
        decoder_output = TransformerDecoder(embed_dim, num_heads, ff_dim)(encoder_output, decoder_output)

    # Decoder head
    decoder_output = layers.Dense(char_to_num.vocabulary_size(), activation="softmax", name="decoder_output")(decoder_output)

    # Create the model
    model = keras.Model(
        inputs=[input_spectrogram, decoder_input],
        outputs=[ctc_output, decoder_output]
    )

    # Compile the model
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1e-4),
        loss={
            "ctc_output": CTCLoss,
            "decoder_output": keras.losses.SparseCategoricalCrossentropy()
        },
        metrics={
            "decoder_output": ["accuracy"]
        }
    )

    return model


In [9]:
# For inference (prediction), we will need a separate model
def build_inference_model(trained_model):
    # Encoder model for extracting features
    encoder_input = trained_model.get_layer("input").input
    encoder_output = trained_model.get_layer("ctc_output").input  # Get output before CTC layer
    encoder_model = keras.Model(encoder_input, encoder_output)

    # CTC prediction model
    ctc_prediction = trained_model.get_layer("ctc_output").output
    ctc_model = keras.Model(encoder_input, ctc_prediction)

    # Decoder model for autoregressive prediction
    decoder_input = trained_model.get_layer("decoder_input").input
    decoder_output = trained_model.get_layer("decoder_output").output
    decoder_model = keras.Model(
        [encoder_input, decoder_input],
        decoder_output
    )

    return encoder_model, ctc_model, decoder_model


In [10]:
# Generate target sequences for decoder during training
def get_decoder_input(batch_targets):
    # Add start token (0) to the beginning of each target sequence
    decoder_input = tf.concat(
        [tf.ones((batch_targets.shape[0], 1), dtype=tf.int32) * char_to_num.vocabulary_size(),
         batch_targets[:, :-1]], axis=1
    )
    return decoder_input


In [11]:
# Example custom training loop
def custom_train_step(model, input_batch, target_batch, optimizer):
    decoder_input = get_decoder_input(target_batch)

    with tf.GradientTape() as tape:
        ctc_output, decoder_output = model([input_batch, decoder_input], training=True)

        # Calculate losses
        ctc_loss = CTCLoss(target_batch, ctc_output)
        decoder_loss = keras.losses.sparse_categorical_crossentropy(
            target_batch, decoder_output
        )

        # Combine losses
        total_loss = ctc_loss + decoder_loss

    # Get gradients and update weights
    gradients = tape.gradient(total_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    return {"ctc_loss": ctc_loss, "decoder_loss": decoder_loss}


In [12]:
# Implement CTC decoding
def decode_ctc_predictions(predictions):
    input_len = tf.ones(shape=tf.shape(predictions)[0]) * tf.shape(predictions)[1]
    # Use tf.keras.backend.ctc_decode for beam search
    results = tf.keras.backend.ctc_decode(
        predictions, input_length=input_len, greedy=False, beam_width=5
    )[0][0]
    # Convert to text
    output_texts = []
    for result in results:
        indices = tf.gather(result, tf.where(tf.not_equal(result, -1)))
        decoded = tf.strings.reduce_join(num_to_char(indices))
        output_texts.append(decoded.numpy().decode("utf-8"))
    return output_texts


In [13]:
# Example function for getting predictions
def predict_example(spectrogram, model):
    encoder_model, ctc_model, decoder_model = build_inference_model(model)

    # CTC prediction (greedy)
    ctc_preds = ctc_model.predict(tf.expand_dims(spectrogram, 0))
    ctc_text = decode_ctc_predictions(ctc_preds)[0]

    # For demonstration of decoder (not a full implementation)
    # In practice, you'd need to handle start tokens and iterate through decoding
    max_len = 100
    decoder_input = tf.ones((1, 1), dtype=tf.int32) * char_to_num.vocabulary_size()  # Start token
    encoder_output = encoder_model.predict(tf.expand_dims(spectrogram, 0))

    decoded_text = ""
    for i in range(max_len):
        predictions = decoder_model.predict([tf.expand_dims(spectrogram, 0), decoder_input])
        next_char_idx = tf.argmax(predictions[0, i, :], axis=-1)
        if next_char_idx == char_to_num('?'):  # End token
            break
        char = num_to_char(next_char_idx).numpy().decode("utf-8")
        decoded_text += char
        decoder_input = tf.concat([decoder_input, tf.ones((1, 1), dtype=tf.int32) * next_char_idx], axis=1)

    return ctc_text, decoded_text

**Implement Training**

In [14]:
# Create the model
model = build_asr_model()
model.summary()


In [15]:
# We'll need to prepare the target sequences for the decoder input
class ASRDataGenerator(keras.utils.Sequence):
    def __init__(self, dataset, batch_size=32):
        self.dataset = dataset
        self.batch_size = batch_size
        self.indices = list(range(len(dataset)))

    def __len__(self):
        return len(self.dataset) // self.batch_size

    def __getitem__(self, idx):
        batch = list(self.dataset.take(1).as_numpy_iterator())[0]
        spectrograms, labels = batch

        # Create decoder inputs by shifting the labels
        decoder_inputs = np.zeros_like(labels)
        decoder_inputs[:, 1:] = labels[:, :-1]
        decoder_inputs[:, 0] = char_to_num.vocabulary_size()  # Start token

        return [spectrograms, decoder_inputs], [labels, labels]


In [16]:
# Train the model
epochs = 1  # You might need more epochs for good performance
learning_rate = 1e-4
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

In [18]:
# Custom training for more control
for epoch in range(epochs):
    print(f"\nEpoch {epoch+1}/{epochs}")

    # Training loop
    train_loss = {"ctc": 0.0, "decoder": 0.0}
    num_batches = 0

    for batch in train_dataset:
        spectrograms, labels = batch

        # Create decoder inputs (add start token, shift right)
        decoder_inputs = tf.zeros_like(labels)
        decoder_inputs = tf.concat(
            [tf.ones((tf.shape(labels)[0], 1), dtype=tf.int64) * char_to_num.vocabulary_size(),
             labels[:, :-1]],
            axis=1
        )

        with tf.GradientTape() as tape:
            # Forward pass
            ctc_output, decoder_output = model([spectrograms, decoder_inputs], training=True)

            # Calculate CTC loss
            batch_len = tf.shape(labels)[0]
            input_length = tf.shape(ctc_output)[1]
            label_length = tf.shape(labels)[1]

            # Reshape to 1D tensors for ctc_loss
            # Change dtype to "int32" to match tf.nn.ctc_loss requirements
            input_length = tf.cast(input_length, dtype="int32") * tf.ones(shape=(batch_len,), dtype="int32")
            label_length = tf.cast(label_length, dtype="int32") * tf.ones(shape=(batch_len,), dtype="int32")

            # Use tf.nn.ctc_loss instead of keras.backend.ctc_batch_cost
            ctc_loss = tf.nn.ctc_loss(
                labels=labels,
                logits=ctc_output,
                label_length=label_length,
                logit_length=input_length,
                logits_time_major=False, # Set to False as your logits are (batch, time, features)
                blank_index=-1 # Set to -1 for compatibility with StringLookup
            )

            # Calculate decoder loss (cross entropy)
            decoder_loss = keras.losses.sparse_categorical_crossentropy(
                labels, decoder_output
            )

            # Combined loss (you can adjust weights if needed)
            total_loss = tf.reduce_mean(ctc_loss) + tf.reduce_mean(decoder_loss) # Calculate the mean of ctc_loss

        # Calculate gradients and update weights
        gradients = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Update metrics
        train_loss["ctc"] += tf.reduce_mean(ctc_loss)
        train_loss["decoder"] += tf.reduce_mean(decoder_loss)
        num_batches += 1

    # Show epoch results
    train_loss["ctc"] /= num_batches
    train_loss["decoder"] /= num_batches
    print(f"Training CTC Loss: {train_loss['ctc']:.4f}")
    print(f"Training Decoder Loss: {train_loss['decoder']:.4f}")

    # Validation
    if epoch % 5 == 0:  # Every 5 epochs
        val_loss = {"ctc": 0.0, "decoder": 0.0}
        num_val_batches = 0

        for batch in validation_dataset:
            spectrograms, labels = batch

            # Create decoder inputs
            decoder_inputs = tf.zeros_like(labels)
            decoder_inputs = tf.concat(
                [tf.ones((tf.shape(labels)[0], 1), dtype=tf.int64) * char_to_num.vocabulary_size(),
                 labels[:, :-1]],
                axis=1
            )

            # Forward pass (no training)
            ctc_output, decoder_output = model([spectrograms, decoder_inputs], training=False)

            # Calculate CTC loss
            batch_len = tf.shape(labels)[0]
            input_length = tf.shape(ctc_output)[1]
            label_length = tf.shape(labels)[1]

            # Reshape to 1D tensors for ctc_loss
            # Change dtype to "int32" to match tf.nn.ctc_loss requirements
            input_length = tf.cast(input_length, dtype="int32") * tf.ones(shape=(batch_len,), dtype="int32")
            label_length = tf.cast(label_length, dtype="int32") * tf.ones(shape=(batch_len,), dtype="int32")

            # Use tf.nn.ctc_loss
            ctc_loss = tf.nn.ctc_loss(
                labels=labels,
                logits=ctc_output,
                label_length=label_length,
                logit_length=input_length,
                logits_time_major=False,
                blank_index=-1
            )

            # Calculate decoder loss
            decoder_loss = keras.losses.sparse_categorical_crossentropy(
                labels, decoder_output
            )

            # Update metrics
            val_loss["ctc"] += tf.reduce_mean(ctc_loss)
            val_loss["decoder"] += tf.reduce_mean(decoder_loss)
            num_val_batches += 1

        val_loss["ctc"] /= num_val_batches
        val_loss["decoder"] /= num_val_batches
        print(f"Validation CTC Loss: {val_loss['ctc']:.4f}")
        print(f"Validation Decoder Loss: {val_loss['decoder']:.4f}")


Epoch 1/1
Training CTC Loss: 796.7677
Training Decoder Loss: 1.4726
Validation CTC Loss: 796.8536
Validation Decoder Loss: 1.4127


In [20]:
# Save the model with the .keras extension
model.save("asr_model_ctc_decoder.keras")

**Implement Inference Code**

In [21]:
# Create inference models
def create_inference_models(trained_model):
    """Create separate models for inference"""
    # CTC inference model
    input_spectrogram = trained_model.get_layer("input").input
    ctc_output = trained_model.get_layer("ctc_output").output
    ctc_model = keras.Model(input_spectrogram, ctc_output)

    # Encoder model (extract features)
    encoder_output = trained_model.layers[-3].output  # Get encoder output
    encoder_model = keras.Model(input_spectrogram, encoder_output)

    # Decoder model (for autoregressive inference)
    decoder_input = keras.Input(shape=(None,), dtype=tf.int32)
    encoder_outputs = keras.Input(shape=(None, encoder_output.shape[-1]))

    # Get decoder layers from trained model
    decoder_emb = trained_model.get_layer("token_embedding")
    decoder_layers = [layer for layer in trained_model.layers if isinstance(layer, TransformerDecoder)]
    decoder_dense = trained_model.get_layer("decoder_output")

    # Build decoder inference model
    x = decoder_emb(decoder_input)
    for decoder_layer in decoder_layers:
        x = decoder_layer(encoder_outputs, x)
    output = decoder_dense(x)

    decoder_model = keras.Model([encoder_outputs, decoder_input], output)

    return ctc_model, encoder_model, decoder_model


In [22]:
def decode_sequence(input_spectrogram, encoder_model, decoder_model, max_length=100):
    """Decode sequence using autoregressive transformer decoder"""
    # Get encoder output
    encoder_output = encoder_model.predict(tf.expand_dims(input_spectrogram, 0))

    # Initialize target sequence with start token
    target_seq = np.zeros((1, 1), dtype=np.int32)
    target_seq[0, 0] = char_to_num.vocabulary_size()  # Start token

    # Collect the generated characters
    decoded_sentence = ""

    # Autoregressive generation
    for i in range(max_length):
        # Get predictions
        output_tokens = decoder_model.predict([encoder_output, target_seq])

        # Sample next token (use argmax for simplicity)
        sampled_token_idx = np.argmax(output_tokens[0, i, :])

        # Exit condition: either hit max length or end token
        if sampled_token_idx == char_to_num("?") or len(decoded_sentence) > max_length:
            break

        # Update target sequence for next iteration
        char = num_to_char(sampled_token_idx).numpy().decode("utf-8")
        decoded_sentence += char

        # Update the target sequence
        target_seq = np.concatenate([target_seq, [[sampled_token_idx]]], axis=-1)

    return decoded_sentence


In [23]:
def ctc_decode(logits):
    """Perform CTC decoding on logits"""
    # Use beam search decoding
    input_len = np.ones(logits.shape[0]) * logits.shape[1]
    results = keras.backend.ctc_decode(
        logits, input_length=input_len, greedy=False, beam_width=5
    )[0][0]

    # Convert to text
    output_text = ""
    for result in results:
        result = tf.gather(result, tf.where(tf.not_equal(result, -1)))
        decoded = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
        output_text += decoded

    return output_text


In [24]:
# Example of how to use the inference models
def predict_example(wav_file):
    # Load and preprocess audio file similar to training pipeline
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    audio, _ = tf.audio.decode_wav(file)
    audio = tf.squeeze(audio, axis=-1)
    audio = tf.cast(audio, tf.float32)

    # Extract features
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)

    # Load trained model (assuming it's been saved)
    model = keras.models.load_model(
        "asr_model_ctc_decoder",
        custom_objects={
            "TokenEmbedding": TokenEmbedding,
            "SpeechFeatureEmbedding": SpeechFeatureEmbedding,
            "TransformerEncoder": TransformerEncoder,
            "TransformerDecoder": TransformerDecoder,
            "CTCLoss": CTCLoss
        }
    )

    # Create inference models
    ctc_model, encoder_model, decoder_model = create_inference_models(model)

    # Get CTC prediction
    ctc_preds = ctc_model.predict(tf.expand_dims(spectrogram, 0))
    ctc_text = ctc_decode(ctc_preds)

    # Get decoder prediction
    decoder_text = decode_sequence(spectrogram, encoder_model, decoder_model)

    return {
        "ctc_prediction": ctc_text,
        "decoder_prediction": decoder_text
    }


In [26]:
# Example evaluation
def evaluate_model(test_dataset=validation_dataset, num_samples=10):
    """Evaluate the model on test data"""
    # Load trained model
    model = keras.models.load_model(
        "asr_model_ctc_decoder",
        custom_objects={
            "TokenEmbedding": TokenEmbedding,
            "SpeechFeatureEmbedding": SpeechFeatureEmbedding,
            "TransformerEncoder": TransformerEncoder,
            "TransformerDecoder": TransformerDecoder,
            "CTCLoss": CTCLoss
        }
    )

    # Create inference models
    ctc_model, encoder_model, decoder_model = create_inference_models(model)

    # Track metrics
    wer_scores = {"ctc": [], "decoder": []}

    # Take a sample of test data
    sample_data = test_dataset.take(num_samples)

    for batch in sample_data.as_numpy_iterator():
        spectrograms, labels = batch

        for i in range(len(spectrograms)):
            # Ground truth
            label_indices = [j for j in labels[i] if j < char_to_num.vocabulary_size()]
            ground_truth = "".join([num_to_char(j).numpy().decode("utf-8") for j in label_indices])

            # CTC prediction
            ctc_preds = ctc_model.predict(np.expand_dims(spectrograms[i], 0))
            ctc_text = ctc_decode(ctc_preds)

            # Decoder prediction
            decoder_text = decode_sequence(spectrograms[i], encoder_model, decoder_model)

            # Calculate WER
            wer_scores["ctc"].append(wer(ground_truth, ctc_text))
            wer_scores["decoder"].append(wer(ground_truth, decoder_text))

            print(f"Sample {i+1}")
            print(f"Ground truth: {ground_truth}")
            print(f"CTC prediction: {ctc_text}")
            print(f"Decoder prediction: {decoder_text}")
            print(f"CTC WER: {wer_scores['ctc'][-1]:.4f}")
            print(f"Decoder WER: {wer_scores['decoder'][-1]:.4f}")
            print("-" * 50)

    # Calculate average WER
    avg_ctc_wer = sum(wer_scores["ctc"]) / len(wer_scores["ctc"])
    avg_decoder_wer = sum(wer_scores["decoder"]) / len(wer_scores["decoder"])

    print(f"Average CTC WER: {avg_ctc_wer:.4f}")
    print(f"Average Decoder WER: {avg_decoder_wer:.4f}")

**Model Testing**

In [27]:
# Function to visualize model predictions
def visualize_prediction(wav_file):
    """Visualize the waveform, spectrogram, and predictions for an audio file"""
    # Load audio file
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    audio, _ = tf.audio.decode_wav(file)
    audio = tf.squeeze(audio, axis=-1)

    # Get ground truth from metadata
    ground_truth = metadata_df[metadata_df['file_name'] == wav_file]['normalized_transcription'].values[0]

    # Process audio for model input
    audio_float = tf.cast(audio, tf.float32)
    spectrogram = tf.signal.stft(
        audio_float, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram_norm = (spectrogram - means) / (stddevs + 1e-10)

    # Get predictions
    predictions = predict_example(wav_file)
    ctc_text = predictions["ctc_prediction"]
    decoder_text = predictions["decoder_prediction"]

    # Calculate WER
    ctc_wer_value = wer(ground_truth, ctc_text)
    decoder_wer_value = wer(ground_truth, decoder_text)

    # Create visualization
    plt.figure(figsize=(16, 10))

    # Plot waveform
    plt.subplot(3, 1, 1)
    plt.plot(audio.numpy())
    plt.title("Waveform")
    plt.ylabel("Amplitude")
    plt.grid(True)

    # Plot spectrogram
    plt.subplot(3, 1, 2)
    plt.imshow(tf.transpose(spectrogram), aspect="auto", origin="lower")
    plt.title("Spectrogram")
    plt.xlabel("Time")
    plt.ylabel("Frequency")
    plt.colorbar()

    # Display text results
    plt.subplot(3, 1, 3)
    plt.axis("off")
    result_text = (
        f"Ground Truth: {ground_truth}\n\n"
        f"CTC Prediction: {ctc_text}\n"
        f"CTC Word Error Rate: {ctc_wer_value:.4f}\n\n"
        f"Decoder Prediction: {decoder_text}\n"
        f"Decoder Word Error Rate: {decoder_wer_value:.4f}"
    )
    plt.text(0.1, 0.7, result_text, fontsize=12, verticalalignment="top")

    plt.tight_layout()
    plt.show()


In [28]:
# Function to track model progress during training
def plot_training_history(history):
    """Plot training and validation metrics"""
    plt.figure(figsize=(15, 5))

    # Plot CTC loss
    plt.subplot(1, 2, 1)
    plt.plot(history['ctc_loss'], label='CTC Training Loss')
    plt.plot(history['val_ctc_loss'], label='CTC Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('CTC Loss')
    plt.legend()
    plt.grid(True)

    # Plot decoder loss and accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history['decoder_loss'], label='Decoder Training Loss')
    plt.plot(history['val_decoder_loss'], label='Decoder Validation Loss')
    if 'decoder_accuracy' in history:
        plt.plot(history['decoder_accuracy'], label='Decoder Accuracy')
        plt.plot(history['val_decoder_accuracy'], label='Decoder Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Loss / Accuracy')
    plt.title('Decoder Metrics')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()


In [29]:
# Function to run comprehensive evaluation
def comprehensive_evaluation(num_samples=20):
    """Run comprehensive evaluation on random samples"""
    # Get random samples from validation set
    val_files = df_val['file_name'].sample(num_samples).tolist()

    # Track metrics
    results = {
        'ctc_wer': [],
        'decoder_wer': [],
        'combined_wer': [],
        'processing_time': []
    }

    for i, file_name in enumerate(val_files):
        print(f"Processing sample {i+1}/{num_samples}: {file_name}")

        # Get ground truth
        ground_truth = df_val[df_val['file_name'] == file_name]['normalized_transcription'].values[0]

        # Time the prediction
        start_time = time.time()
        predictions = predict_example(file_name)
        end_time = time.time()

        # Extract predictions
        ctc_text = predictions["ctc_prediction"]
        decoder_text = predictions["decoder_prediction"]

        # Simple ensemble (choose the shorter WER)
        ctc_wer_value = wer(ground_truth, ctc_text)
        decoder_wer_value = wer(ground_truth, decoder_text)
        combined_text = ctc_text if ctc_wer_value < decoder_wer_value else decoder_text
        combined_wer_value = min(ctc_wer_value, decoder_wer_value)

        # Store results
        results['ctc_wer'].append(ctc_wer_value)
        results['decoder_wer'].append(decoder_wer_value)
        results['combined_wer'].append(combined_wer_value)
        results['processing_time'].append(end_time - start_time)

        # Print sample results
        print(f"Ground truth: {ground_truth}")
        print(f"CTC prediction: {ctc_text}")
        print(f"CTC WER: {ctc_wer_value:.4f}")
        print(f"Decoder prediction: {decoder_text}")
        print(f"Decoder WER: {decoder_wer_value:.4f}")
        print(f"Processing time: {end_time - start_time:.2f} seconds")
        print("-" * 80)

    # Calculate average metrics
    avg_ctc_wer = sum(results['ctc_wer']) / len(results['ctc_wer'])
    avg_decoder_wer = sum(results['decoder_wer']) / len(results['decoder_wer'])
    avg_combined_wer = sum(results['combined_wer']) / len(results['combined_wer'])
    avg_time = sum(results['processing_time']) / len(results['processing_time'])

    print("\n===== EVALUATION RESULTS =====")
    print(f"Average CTC WER: {avg_ctc_wer:.4f}")
    print(f"Average Decoder WER: {avg_decoder_wer:.4f}")
    print(f"Average Combined WER: {avg_combined_wer:.4f}")
    print(f"Average processing time: {avg_time:.2f} seconds per sample")

    # Plot distribution of WER
    plt.figure(figsize=(12, 6))
    plt.hist([results['ctc_wer'], results['decoder_wer'], results['combined_wer']],
             bins=10, alpha=0.7, label=['CTC', 'Decoder', 'Combined'])
    plt.xlabel('Word Error Rate')
    plt.ylabel('Number of Samples')
    plt.title('Distribution of Word Error Rate')
    plt.legend()
    plt.grid(True)
    plt.show()

    return results


In [31]:
# Example of attention visualization
def visualize_attention(wav_file):
    """Visualize the attention weights from the decoder"""
    # Note: This requires modifying the model to output attention weights
    # This is a placeholder function showing how it could be implemented

    # For this to work, you'd need to:
    # 1. Modify the TransformerDecoder to return attention weights
    # 2. Create a separate model for attention visualization

    plt.figure(figsize=(12, 8))
    plt.title("Decoder Attention Visualization")
    plt.xlabel("Encoder Timesteps")
    plt.ylabel("Decoder Timesteps")

    # Placeholder - in a real implementation, you'd get actual attention weights
    # attention_weights = get_attention_weights(wav_file)
    # plt.imshow(attention_weights, aspect='auto', origin='lower')

    plt.colorbar(label="Attention Weight")
    plt.tight_layout()
    plt.show()