In [None]:
%pip install -r requirements.txt

## Setup

In [None]:
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display

## Load the Dataset

In [None]:
data_path = "train_adapt"   # put the path to the dataset
wavs_path = data_path + "/train/"
metadata_path = data_path + "/train.csv"

validation_path = data_path + "/adapt/"
validation_metadata_path = data_path + "/adapt.csv"

# Fixed seed for the row shuffles so that a fresh "Restart & Run All" yields
# the same sample ordering every time.
shuffle_seed = 42


def _load_metadata(csv_path, seed=None):
    """Read a two-column metadata CSV and return it with its rows shuffled.

    Args:
        csv_path: Path to a comma-separated file with a header row and
            exactly two columns (audio file id, transcript).
        seed: Optional seed forwarded to `DataFrame.sample` for a
            reproducible shuffle.

    Returns:
        A DataFrame with columns ["audio", "transcript"] and a fresh 0..n-1 index.

    Raises:
        ValueError: If the file does not contain exactly two columns
            (raised by the column rename).
    """
    # quoting=3 is csv.QUOTE_NONE: quote characters are kept as literal text.
    frame = pd.read_csv(csv_path, sep=",", header=0, quoting=3)
    frame.columns = ["audio", "transcript"]
    return frame.sample(frac=1, random_state=seed).reset_index(drop=True)


# Read and parse both metadata files with the same logic.
metadata_df = _load_metadata(metadata_path, seed=shuffle_seed)
validation_metadata_df = _load_metadata(validation_metadata_path, seed=shuffle_seed)


In [None]:
# Preview the first three rows of the (already shuffled) validation metadata.
validation_metadata_df.head(3)

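The training and validation sets come from two separate metadata files, so no split is needed: we simply give the two frames the names used in the rest of the notebook and report their sizes.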

In [None]:
# Both splits were loaded from their own CSV files above; bind them to the
# names the downstream cells expect.
df_train, df_val = metadata_df, validation_metadata_df
print(f"Size of the training set: {len(df_train)}")
print(f"Size of the validation set: {len(df_val)}")


## Preprocessing

We first prepare the vocabulary to be used.

In [None]:
# Every character that may appear in a transcript: Arabic letters plus the space.
characters = list("غظضذخثةتشقرصفعسمنلكيطحزوؤهدجبىائءإآأ ")
# Forward lookup (character -> integer id), with "" as the out-of-vocabulary token.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")
# Inverse lookup (integer id -> character), built from the forward layer's vocabulary.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)

Next, we create the function that describes the transformation that we apply to each
element of our dataset.

In [None]:
# STFT window length, in samples (passed as `frame_length` to tf.signal.stft).
frame_length = 240 
# STFT hop size: the number of samples to step between successive windows.
frame_step = 120 
# FFT size, set explicitly here. When it is not provided, tf.signal.stft uses
# the smallest power of 2 enclosing frame_length.

fft_length = 256 

# Sample rate in Hz — presumably that of the wav files; TODO confirm.
# Not referenced by the other cells visible in this notebook.
sample_rate = 16000
def _encode_sample(audio_dir, wav_file, label):
    """Turn one (file id, transcript) pair into (spectrogram, label ids).

    Shared implementation behind `encode_single_sample_train` and
    `encode_single_sample_validation`, which differ only in the directory
    the wav file is read from.

    Args:
        audio_dir: Directory prefix (with trailing slash) holding the wav files.
        wav_file: File name without the ".wav" extension.
        label: Transcript as a UTF-8 string.

    Returns:
        A `(spectrogram, label)` tuple: the normalised magnitude spectrogram
        with one row per STFT frame, and the transcript as a sequence of
        integer ids produced by `char_to_num`.
    """
    ###########################################
    ##  Process the Audio
    ##########################################
    # 1. Read wav file
    file = tf.io.read_file(audio_dir + wav_file + ".wav")
    # 2. Decode the wav file. desired_channels=1 keeps only the first channel,
    #    so the squeeze below also works for multi-channel recordings.
    audio, _ = tf.audio.decode_wav(file, desired_channels=1)
    audio = tf.squeeze(audio, axis=-1)
    # 3. Make sure the samples are float32
    audio = tf.cast(audio, tf.float32)
    # 4. Get the spectrogram
    stfts = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )

    # 5. We only need the magnitude, which can be derived by applying tf.abs;
    #    the square root compresses its dynamic range.
    spectrogram = tf.abs(stfts)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    # 6. Normalisation: each frame gets zero mean and unit variance across
    #    frequency bins (the epsilon guards against division by zero).
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    ###########################################
    ##  Process the label
    ##########################################
    # 7. Split the label into individual characters
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    # 8. Map the characters in label to numbers
    label = char_to_num(label)
    # 9. Return an (input, target) tuple
    return spectrogram, label


def encode_single_sample_train(wav_file, label):
    """Encode one training sample; the wav file is read from `wavs_path`."""
    return _encode_sample(wavs_path, wav_file, label)


def encode_single_sample_validation(wav_file, label):
    """Encode one validation sample; the wav file is read from `validation_path`."""
    return _encode_sample(validation_path, wav_file, label)

## Creating `Dataset` objects

We create a `tf.data.Dataset` object that yields
the transformed elements, in the same order as they
appeared in the input.

In [None]:
batch_size = 32
print(len(df_train["audio"]))
print(len(df_train["transcript"]))


def _build_dataset(frame, encode_fn):
    """Build a padded, batched and prefetched dataset from a metadata frame.

    `frame` supplies the "audio" and "transcript" columns; `encode_fn` maps
    one (audio id, transcript) pair to a (spectrogram, label) pair.
    """
    file_ids = np.array(frame["audio"].tolist())
    transcripts = np.array(frame["transcript"].tolist())
    pairs = tf.data.Dataset.from_tensor_slices((file_ids, transcripts))
    encoded = pairs.map(encode_fn, num_parallel_calls=tf.data.AUTOTUNE)
    # padded_batch pads every element of a batch to the longest one in that batch.
    batched = encoded.padded_batch(batch_size)
    return batched.prefetch(buffer_size=tf.data.AUTOTUNE)


# Define the training dataset
train_dataset = _build_dataset(df_train, encode_single_sample_train)

# Define the validation dataset
validation_dataset = _build_dataset(df_val, encode_single_sample_validation)


## Visualize the data

Let's visualize an example in our dataset, including the
audio clip, the spectrogram and the corresponding label.

In [None]:
fig = plt.figure(figsize=(8, 5))
for batch in validation_dataset.take(1):
    # First sample of the first batch; the padded_batch zeros are trimmed
    # from both ends of every frequency row before plotting.
    spectrogram = batch[0][0].numpy()
    spectrogram = np.array([np.trim_zeros(x) for x in np.transpose(spectrogram)])
    label = batch[1][0]
    # Spectrogram, titled with the transcript decoded back to text
    label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
    ax = plt.subplot(2, 1, 1)
    ax.imshow(spectrogram, vmax=1)
    ax.set_title(label)
    ax.axis("off")
    # Wav: the dataset is built from df_val in order and is not shuffled again,
    # so row 0 of df_val is the sample shown above.
    file = tf.io.read_file(validation_path + df_val["audio"].iloc[0] + ".wav")
    audio, wav_rate = tf.audio.decode_wav(file)
    audio = audio.numpy()
    ax = plt.subplot(2, 1, 2)
    ax.plot(audio)
    ax.set_title("Signal Wave")
    ax.set_xlim(0, len(audio))
    # Play back at the rate stored in the wav header rather than a hard-coded
    # value, so the clip sounds right whatever the file's sample rate is.
    display.display(display.Audio(np.transpose(audio), rate=int(wav_rate.numpy())))
plt.show()

## Model

We first define the CTC Loss function.

In [None]:

def CTCLoss(y_true, y_pred):
    """CTC loss with the (y_true, y_pred) signature Keras expects.

    Every sample is assigned the full time dimension of `y_pred` as its
    input length and the full second dimension of `y_true` as its label
    length, i.e. the padded sizes of the batch.

    Returns the result of `keras.backend.ctc_batch_cost`.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    n_time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    n_label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Column of ones used to broadcast the scalar lengths to shape (batch, 1).
    ones_column = tf.ones(shape=(n_samples, 1), dtype="int64")
    input_length = n_time_steps * ones_column
    label_length = n_label_steps * ones_column

    return keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)


We now define our model. We will define a model similar to
[DeepSpeech2](https://nvidia.github.io/OpenSeq2Seq/html/speech-recognition/deepspeech2.html).

In [None]:

def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=256, learning_rate=1e-7):
    """Build and compile a model similar to DeepSpeech2.

    Architecture: two 2D convolutions with ReLU, a stack of bidirectional
    GRUs, one dense + ReLU layer, and a softmax classification layer.

    Args:
        input_dim: Number of features per spectrogram frame.
        output_dim: Number of label classes; the output layer has
            `output_dim + 1` units (one extra class, presumably reserved
            for the CTC blank).
        rnn_layers: Number of bidirectional GRU layers.
        rnn_units: Units per GRU direction; the dense layer uses twice as many.
        learning_rate: Learning rate for the Adam optimizer. The default keeps
            the value that was previously hard-coded.
            NOTE(review): 1e-7 is very small for Adam — confirm it is intended.

    Returns:
        A `keras.Model` compiled with Adam and `CTCLoss`.
    """
    # Model's input
    input_spectrogram = layers.Input((None, input_dim), name="input")
    # Expand the dimension to use 2D CNN.
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)

    # Convolutional layers: (filters, kernel_size, strides) per layer
    for i, (filters, kernel_size, strides) in enumerate(
        [(96, [11, 41], [2, 2]), (128, [11, 21], [1, 2])
        ]
    ):
        x = layers.Conv2D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding="same",
            use_bias=False,
            name=f"conv_{i+1}",
            kernel_initializer=tf.initializers.GlorotUniform(),
        )(x)
        x = layers.ReLU(name=f"conv_{i+1}_relu")(x)

    # Reshape the resulted volume to feed the RNNs layers: the last two axes
    # are merged into a single feature axis.
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)
    # RNN layers
    for i in range(1, rnn_layers + 1):
        recurrent = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{i}",
            kernel_initializer=tf.initializers.GlorotUniform(),
        )
        x = layers.Bidirectional(
            recurrent, name=f"bidirectional_{i}", merge_mode="concat"
        )(x)

    # Dense layer
    x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)

    # Classification layer
    output = layers.Dense(units=output_dim + 1, activation="softmax")(x)
    model = keras.Model(input_spectrogram, output, name="DeepSpeech_2")
    # Optimizer
    opt = keras.optimizers.Adam(learning_rate=learning_rate)
    # Compile the model and return
    model.compile(optimizer=opt, loss=CTCLoss)
    return model


# Instantiate the network. An FFT of size fft_length yields
# fft_length // 2 + 1 frequency bins per frame.
n_freq_bins = fft_length // 2 + 1
n_label_classes = char_to_num.vocabulary_size()
model = build_model(input_dim=n_freq_bins, output_dim=n_label_classes, rnn_units=768)
model.summary(line_length=110)

In [None]:
# Helper that converts raw network output into text
def decode_batch_predictions(pred):
    """Greedy-decode a batch of network outputs into a list of strings.

    `pred` is the model output for one batch; every sample is decoded over
    the full time dimension `pred.shape[1]`.
    """
    sequence_lengths = np.ones(pred.shape[0]) * pred.shape[1]
    # Greedy search is used here; beam search is an option for harder tasks.
    decoded = keras.backend.ctc_decode(
        pred, input_length=sequence_lengths, greedy=True, beam_width=512
    )
    best_paths = decoded[0][0]
    # Map each decoded id sequence back to characters and join them into text.
    return [
        tf.strings.reduce_join(num_to_char(ids)).numpy().decode("utf-8")
        for ids in best_paths
    ]


# A callback class to output a few transcriptions during training
class CallbackEval(keras.callbacks.Callback):
    """Displays target/prediction pairs from the first batch after every epoch.

    Args:
        dataset: Iterable of `(inputs, labels)` batches; only the first batch
            is used.
        num_samples: Maximum number of pairs to print per epoch. Pairs are
            drawn without replacement, so none is printed twice.
    """

    def __init__(self, dataset, num_samples=32):
        super().__init__()
        self.dataset = dataset
        self.num_samples = num_samples

    def on_epoch_end(self, epoch: int, logs=None):
        """Transcribe the first batch with the model being trained and print samples."""
        first_batch = next(iter(self.dataset), None)
        if first_batch is None:
            # Nothing to transcribe; the sampling below would fail on an empty range.
            print("CallbackEval: the evaluation dataset is empty, nothing to display.")
            return

        X, y = first_batch
        # self.model is the model this callback is attached to by `fit`, so the
        # callback does not depend on a notebook-level `model` variable.
        predictions = decode_batch_predictions(self.model.predict(X))
        targets = [
            tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
            for label in y
        ]

        n_shown = min(self.num_samples, len(predictions))
        for i in np.random.choice(len(predictions), size=n_shown, replace=False):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)



Let's start the training process.

In [None]:
# Show the name of the GPU device TensorFlow reports; the training cell
# below targets '/device:GPU:0'.
tf.test.gpu_device_name()

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

In [None]:
# Number of passes over the training data.
epochs = 1

# Write the model weights to disk at the end of every epoch; the file name
# embeds the epoch number and that epoch's validation loss.
checkpoint_callback = ModelCheckpoint(
    filepath='milstones/model_{epoch:02d}_{val_loss:.2f}.h5',
    save_weights_only=True,
    save_freq="epoch",
    verbose=1,
)

# Print sample transcriptions of the validation set after each epoch.
validation_callback = CallbackEval(validation_dataset)

# Run the training on the first GPU.
with tf.device('/device:GPU:0'):
    history = model.fit(
        train_dataset,
        validation_data=validation_dataset,
        epochs=epochs,
        callbacks=[checkpoint_callback, validation_callback],
    )
