<a href="https://colab.research.google.com/github/7azem-walid/Contact-Management-System/blob/main/examples/audio/ipynb/ctc_asr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install jiwer tensorflow-addons



In [2]:
!pip uninstall keras -y  # Remove standalone keras if present

Found existing installation: keras 3.9.2
Uninstalling keras-3.9.2:
  Successfully uninstalled keras-3.9.2


In [3]:
!pip install --upgrade tensorflow

Collecting keras>=3.5.0 (from tensorflow)
  Using cached keras-3.9.2-py3-none-any.whl.metadata (6.1 kB)
Using cached keras-3.9.2-py3-none-any.whl (1.3 MB)
Installing collected packages: keras
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow-text 2.18.1 requires tensorflow<2.19,>=2.18.0, but you have tensorflow 2.19.0 which is incompatible.
tensorflow-decision-forests 1.11.0 requires tensorflow==2.18.0, but you have tensorflow 2.19.0 which is incompatible.
tf-keras 2.18.0 requires tensorflow<2.19,>=2.18, but you have tensorflow 2.19.0 which is incompatible.[0m[31m
[0mSuccessfully installed keras-3.9.2


In [5]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer


In [6]:
# Download and prepare dataset
data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
data_path = tf.keras.utils.get_file("LJSpeech-1.1", data_url, untar=True)

# Depending on the installed Keras version, get_file(..., untar=True) returns
# either the dataset folder itself or an extraction directory that contains a
# nested "LJSpeech-1.1" folder. Probe for the metadata file so that both
# layouts resolve to the folder that actually holds the data.
nested_data_path = data_path + "/LJSpeech-1.1"
if not tf.io.gfile.exists(data_path + "/metadata.csv") and tf.io.gfile.exists(
    nested_data_path + "/metadata.csv"
):
    data_path = nested_data_path

wavs_path = data_path + "/wavs/"
metadata_path = data_path + "/metadata.csv"

Downloading data from https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2
[1m2748572632/2748572632[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 0us/step


In [None]:
# Read metadata
# The file is pipe-separated with no header row; quoting=3 (csv.QUOTE_NONE)
# keeps quote characters inside the transcripts as literal text.
metadata_df = pd.read_csv(metadata_path, sep="|", header=None, quoting=3)
metadata_df.columns = ["file_name", "transcription", "normalized_transcription"]
metadata_df = metadata_df[["file_name", "normalized_transcription"]]
# Shuffle with a fixed seed so the train/validation split made from this frame
# is the same on every Restart & Run All.
metadata_df = metadata_df.sample(frac=1, random_state=42).reset_index(drop=True)

In [None]:
# Split dataset
# The first 90% of the (already shuffled) rows train the model; the rest validate it.
split = int(len(metadata_df) * 0.90)
df_train, df_val = metadata_df.iloc[:split], metadata_df.iloc[split:]

for line in (
    f"Training set size: {len(df_train)}",
    f"Validation set size: {len(df_val)}",
):
    print(line)


In [None]:
# Character processing
# Accepted output symbols: lowercase letters, apostrophe, "?", "!" and space.
characters = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Forward lookup (character -> integer id); anything outside the vocabulary
# maps to the empty-string OOV token.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")

# Inverse lookup (integer id -> character), built from the forward vocabulary
# so the two tables always agree.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

print(f"Vocabulary: {char_to_num.get_vocabulary()} (size={char_to_num.vocabulary_size()})")

In [None]:
# Audio processing parameters
# All three values are passed to tf.signal.stft inside encode_single_sample.
frame_length = 256  # STFT window length, in samples
frame_step = 160  # hop between consecutive windows, in samples
fft_length = 384  # FFT size; build_model sizes the encoder input as fft_length // 2 + 1

def encode_single_sample(wav_file, label):
    """Convert one (file stem, transcript) pair into model-ready tensors.

    Args:
        wav_file: File name without extension; resolved as
            ``wavs_path + wav_file + ".wav"``.
        label: Transcript string for that recording.

    Returns:
        A ``(spectrogram, label)`` tuple: the normalised square-root magnitude
        STFT of the audio, and the transcript as integer ids from
        ``char_to_num``.
    """
    # --- Audio -> spectrogram ---
    wav_bytes = tf.io.read_file(wavs_path + wav_file + ".wav")
    waveform, _sample_rate = tf.audio.decode_wav(wav_bytes)
    # Drop the trailing channel axis (requires single-channel audio).
    waveform = tf.cast(tf.squeeze(waveform, axis=-1), tf.float32)

    stft = tf.signal.stft(
        waveform,
        frame_length=frame_length,
        frame_step=frame_step,
        fft_length=fft_length,
    )
    # Keep magnitudes only and compress them with a square root.
    features = tf.math.pow(tf.abs(stft), 0.5)

    # Standardise along axis 1; the epsilon avoids division by zero.
    axis_mean = tf.math.reduce_mean(features, 1, keepdims=True)
    axis_std = tf.math.reduce_std(features, 1, keepdims=True)
    features = (features - axis_mean) / (axis_std + 1e-10)

    # --- Transcript -> character ids ---
    chars = tf.strings.unicode_split(tf.strings.lower(label), input_encoding="UTF-8")
    token_ids = char_to_num(chars)

    return features, token_ids

In [None]:
# Create datasets
batch_size = 32


def _make_dataset(frame):
    """Build the batched input pipeline for one metadata frame.

    Args:
        frame: DataFrame with ``file_name`` and ``normalized_transcription``
            columns.

    Returns:
        A ``tf.data.Dataset`` of ``(spectrogram, label)`` batches. Each batch
        holds ``batch_size`` samples padded to the longest sample in the batch,
        and batches are prefetched.
    """
    pairs = tf.data.Dataset.from_tensor_slices(
        (list(frame["file_name"]), list(frame["normalized_transcription"]))
    )
    return (
        pairs.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
        .padded_batch(batch_size)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )


# Both splits share one pipeline definition so they cannot drift apart.
train_dataset = _make_dataset(df_train)
validation_dataset = _make_dataset(df_val)

In [None]:
# Transformer Model Components
class PositionalEncoding(layers.Layer):
    """Adds a fixed sinusoidal position table to its input.

    The table is computed once in ``__init__`` for ``max_steps`` positions and
    ``max_dims`` channels, with sines in the even channels and cosines in the
    odd ones. ``call`` slices it to the input's last two dimensions.
    """

    def __init__(self, max_steps, max_dims, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        # Round an odd width up so every sine column has a cosine partner.
        max_dims += max_dims % 2
        positions, pair_index = np.meshgrid(
            np.arange(max_steps), np.arange(max_dims // 2)
        )
        # Shape after the transpose: (max_steps, max_dims // 2).
        angles = (positions / 10000 ** (2 * pair_index / max_dims)).T
        table = np.empty((1, max_steps, max_dims))
        table[0, :, 0::2] = np.sin(angles)
        table[0, :, 1::2] = np.cos(angles)
        self.positional_embedding = tf.constant(table.astype(self.dtype))

    def call(self, inputs):
        dims = tf.shape(inputs)
        steps, width = dims[-2], dims[-1]
        # Slice the table to the input's sequence length and channel count.
        return inputs + self.positional_embedding[:, :steps, :width]

class TransformerEncoder(layers.Layer):
    """Single transformer encoder block: self-attention followed by a
    two-layer feed-forward network, each with dropout, a residual connection
    and layer normalisation.

    Args:
        embed_dim: Width of the block's input/output; also used as the
            attention ``key_dim``.
        num_heads: Number of attention heads.
        feed_forward_dim: Hidden width of the feed-forward network.
        rate: Dropout rate applied after attention and after the feed-forward
            network.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, rate=0.1):
        super().__init__()
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(feed_forward_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on ``inputs``.

        ``training`` defaults to ``None`` so the layer can be called with
        inputs only (as ``build_model`` does). A required ``training``
        parameter makes that call fail with a missing-argument error when the
        framework does not supply the flag itself.
        """
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

class TransformerDecoder(layers.Layer):
    """Single transformer decoder block.

    It applies causal self-attention over the target sequence, then
    cross-attention over the encoder output, then a two-layer feed-forward
    network. Each stage is followed by dropout, a residual connection and
    layer normalisation.

    Args:
        embed_dim: Width of the block's input/output; also used as the
            attention ``key_dim``.
        num_heads: Number of attention heads.
        feed_forward_dim: Hidden width of the feed-forward network.
        dropout_rate: Dropout rate applied after each of the three stages.
    """

    def __init__(self, embed_dim, num_heads, feed_forward_dim, dropout_rate=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.feed_forward_dim = feed_forward_dim
        self.dropout_rate = dropout_rate

        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential([
            layers.Dense(feed_forward_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.dropout_1 = layers.Dropout(dropout_rate)
        self.dropout_2 = layers.Dropout(dropout_rate)
        self.dropout_3 = layers.Dropout(dropout_rate)
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, training=None, mask=None):
        """Decode ``inputs`` while attending to ``encoder_outputs``.

        Args:
            inputs: Embedded target sequence.
            encoder_outputs: Encoder output sequence to attend over.
            training: Forwarded to the attention and dropout layers. Defaults
                to ``None`` so the layer can be called without the flag, as
                ``build_model`` does.
            mask: Optional padding mask for ``inputs``.

        Returns:
            The decoded sequence, with the same shape as ``inputs``.
        """
        # The self-attention mask is always causal. When a padding mask for
        # the target sequence is supplied, it is merged in so padded keys are
        # hidden as well.
        self_attention_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, self_attention_mask)

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask,
            training=training,
        )
        attention_output_1 = self.dropout_1(attention_output_1, training=training)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention gets no mask: the mask built above is sized by the
        # target length on both axes, while the key axis here is the encoder
        # length, so the two shapes do not match in general.
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            training=training,
        )
        attention_output_2 = self.dropout_2(attention_output_2, training=training)
        attention_output_2 = self.layernorm_2(attention_output_1 + attention_output_2)

        ffn_output = self.ffn(attention_output_2)
        ffn_output = self.dropout_3(ffn_output, training=training)
        return self.layernorm_3(attention_output_2 + ffn_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 ``(batch, seq, seq)`` lower-triangular mask.

        Entry ``[b, i, j]`` is 1 when ``i >= j``, so each query position may
        attend to itself and earlier positions only.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

def build_model():
    """Assemble the encoder-decoder transformer.

    Returns:
        A ``keras.Model`` named ``"transformer"`` with two inputs:
        ``encoder_inputs`` (spectrogram frames with ``fft_length // 2 + 1``
        features each) and ``decoder_inputs`` (character ids). Its output is a
        softmax over the ``char_to_num`` vocabulary for every decoder position.
    """
    width = 256

    # Encoder: three stride-2 convolutions, a dense projection, one encoder block.
    spectrogram_in = keras.Input(
        shape=(None, fft_length // 2 + 1), name="encoder_inputs"
    )
    features = spectrogram_in
    for _ in range(3):
        features = layers.Conv1D(
            width, 11, strides=2, padding="same", activation="relu"
        )(features)
    features = layers.Dense(width, activation="relu")(features)
    encoded = TransformerEncoder(width, 4, 1024)(features)
    encoder = keras.Model(spectrogram_in, encoded, name="encoder")

    # Decoder: embedding + positional signal, one decoder block, softmax head.
    token_in = keras.Input(shape=(None,), name="decoder_inputs")
    memory_in = keras.Input(shape=(None, width), name="decoder_state_inputs")
    vocab_size = char_to_num.vocabulary_size()
    hidden = layers.Embedding(vocab_size, width)(token_in)
    hidden = PositionalEncoding(200, width)(hidden)
    hidden = TransformerDecoder(width, 4, 1024)(hidden, memory_in)
    hidden = layers.Dropout(0.5)(hidden)
    token_probs = layers.Dense(vocab_size, activation="softmax")(hidden)
    decoder = keras.Model([token_in, memory_in], token_probs, name="decoder")

    # Combined model: feed the encoder output into the decoder sub-model.
    return keras.Model(
        [spectrogram_in, token_in],
        decoder([token_in, encoded]),
        name="transformer",
    )

# Instantiate the transformer and print its layer/parameter summary.
model = build_model()
model.summary()

In [None]:
# Learning rate schedule
class CustomSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Warm-up then inverse-square-root learning-rate schedule.

    The rate is ``rsqrt(d_model) * min(rsqrt(step), step * warmup_steps**-1.5)``.
    It grows linearly in ``step`` for the first ``warmup_steps`` steps and then
    decays with the inverse square root of ``step``.

    Args:
        d_model: Model width used to scale the rate.
        warmup_steps: Number of steps over which the rate ramps up.
    """

    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        # Keep the raw value so get_config can return a plain Python number.
        self._d_model_config = d_model
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps**-1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialised.

        Without this override, serialising an optimizer or model that uses
        the schedule fails.
        """
        return {
            "d_model": self._d_model_config,
            "warmup_steps": self.warmup_steps,
        }

# Adam driven by the warm-up schedule; 256 is the model width used in build_model.
learning_rate = CustomSchedule(d_model=256)
optimizer = keras.optimizers.Adam(
    learning_rate=learning_rate,
    beta_1=0.9,
    beta_2=0.98,
    epsilon=1e-9,
)

# Loss and metrics
def loss_fn(y_true, y_pred):
    """Mean cross-entropy over the positions whose target id is not 0.

    Args:
        y_true: Target character ids; id 0 marks positions to ignore.
        y_pred: Predicted probabilities over the vocabulary for each position.

    Returns:
        A scalar: summed per-position loss over non-zero targets divided by
        their count (0 when there are none).
    """
    # Use the functional form so the loss stays per-position. The loss object
    # with its default reduction returns one scalar already averaged over
    # every position, and multiplying that scalar by the mask excludes nothing.
    per_position = keras.losses.sparse_categorical_crossentropy(
        tf.cast(y_true, tf.int64), y_pred, from_logits=False
    )
    mask = tf.cast(tf.not_equal(y_true, 0), dtype=per_position.dtype)
    # divide_no_nan returns 0 instead of NaN for a batch with no valid targets.
    return tf.math.divide_no_nan(
        tf.reduce_sum(per_position * mask), tf.reduce_sum(mask)
    )

def accuracy_fn(y_true, y_pred):
    """Fraction of non-zero target positions whose arg-max prediction is correct.

    Args:
        y_true: Target character ids; id 0 marks positions to ignore.
        y_pred: Predicted scores over the vocabulary for each position.

    Returns:
        A scalar: correct non-zero positions divided by the number of non-zero
        positions (0 when there are none).
    """
    preds = tf.cast(tf.argmax(y_pred, axis=-1), dtype=y_true.dtype)
    mask = tf.cast(tf.not_equal(y_true, 0), dtype=tf.float32)
    # Mask the matches as well as the denominator. Otherwise a predicted 0 at
    # an ignored position counts as a hit, and the ratio can exceed 1.
    matches = tf.cast(tf.equal(y_true, preds), dtype=tf.float32) * mask
    return tf.math.divide_no_nan(tf.reduce_sum(matches), tf.reduce_sum(mask))

model.compile(optimizer=optimizer, loss=loss_fn, metrics=[accuracy_fn])


def to_teacher_forcing(spectrogram, label):
    """Adapt a ``(spectrogram, label)`` batch to the model's two inputs.

    The model built by ``build_model`` takes ``encoder_inputs`` and
    ``decoder_inputs``, but the datasets yield only a spectrogram and a label.
    The decoder input is the label shifted right by one step, with id 0 (the
    padding / out-of-vocabulary id) in the first slot acting as the start
    token. The decoder therefore predicts character ``t`` from characters
    ``< t``.

    Returns:
        ``(inputs, targets)`` where ``inputs`` is a dict keyed by the model's
        input names and ``targets`` is the unmodified label batch.
    """
    decoder_inputs = tf.pad(label, [[0, 0], [1, 0]])[:, :-1]
    return {"encoder_inputs": spectrogram, "decoder_inputs": decoder_inputs}, label


# Training
epochs = 10
history = model.fit(
    train_dataset.map(to_teacher_forcing),
    validation_data=validation_dataset.map(to_teacher_forcing),
    epochs=epochs,
)

# Evaluation
# NOTE(review): predictions here are teacher-forced (the decoder is fed the
# reference transcript), so this WER is optimistic. A true inference-time WER
# needs an autoregressive decoding loop.
predictions = []
targets = []
for spectrogram, label in validation_dataset.take(20):
    model_inputs, _ = to_teacher_forcing(spectrogram, label)
    batch_probs = model.predict_on_batch(model_inputs)
    batch_predictions = tf.cast(tf.argmax(batch_probs, axis=-1), label.dtype)
    # Blank out positions whose target id is 0 (padding / out-of-vocabulary).
    # The loss ignores them, so the model's output there is arbitrary.
    batch_predictions = tf.where(
        tf.not_equal(label, 0), batch_predictions, tf.zeros_like(batch_predictions)
    )
    for pred, target in zip(batch_predictions, label):
        pred_text = tf.strings.reduce_join(num_to_char(pred)).numpy().decode("utf-8")
        label_text = tf.strings.reduce_join(num_to_char(target)).numpy().decode("utf-8")
        predictions.append(pred_text)
        targets.append(label_text)

wer_score = wer(targets, predictions)
print("-" * 100)
print(f"Final Word Error Rate: {wer_score:.4f}")
print("-" * 100)

# Show up to five examples; slicing avoids an IndexError on a short run.
for target_text, prediction_text in zip(targets[:5], predictions[:5]):
    print(f"Target: {target_text}")
    print(f"Prediction: {prediction_text}")
    print("-" * 100)