<a href="https://colab.research.google.com/github/SammarieoBrown/AI-Expert-Roadmap/blob/main/deepspeech2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install jiwer

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
from IPython import display
from jiwer import wer
import os
import pandas as pd
import tarfile
from warnings import filterwarnings
filterwarnings('ignore')


In [None]:
from tensorflow.keras import mixed_precision

# Create a Keras dtype policy named "mixed_float16" and install it as the
# global policy for this session.
# NOTE(review): the CTC loss/decoding used later is expected to need float32
# probabilities -- confirm the model output and loss run in float32 under
# this policy.
policy = mixed_precision.Policy("mixed_float16")
mixed_precision.set_global_policy(policy)


In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
from psutil import virtual_memory

# Total memory reported by psutil, expressed in units of 1e9 bytes.
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# 20 is the cut-off this notebook uses to label a runtime "high-RAM".
print(
    'You are using a high-RAM runtime!'
    if ram_gb >= 20
    else 'Not using a high-RAM runtime'
)

In [None]:
# Fetch the LJSpeech-1.1 archive through keras.utils.get_file with untar=True;
# data_path is whatever location get_file returns.
data_url = 'https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2'
data_path = keras.utils.get_file(
    fname="LJSpeech-1.1",
    origin=data_url,
    untar=True,
)

In [None]:
# Locations inside the dataset directory: the audio clips and the transcript table.
wavs_path = f"{data_path}/wavs/"
metadata_path = f"{data_path}/metadata.csv"

In [None]:
# Load the pipe-separated metadata table. The file has no header row, and
# quoting=3 (csv.QUOTE_NONE) makes pandas treat quote characters as plain text.
metadata_df = pd.read_csv(
    metadata_path,
    sep="|",
    header=None,
    quoting=3,
)
metadata_df.head()

In [None]:

# Report how many rows the metadata table contains.
print(f"Number of samples: {len(metadata_df)}")

In [None]:
# Show the last rows of the metadata table.
metadata_df.tail()

In [None]:
# Name the three metadata columns, keep only the clip id and the normalized
# transcript, then shuffle the rows.
metadata_df.columns = ['file_name', 'transcription', 'normalized_transcription']
metadata_df = metadata_df[['file_name', 'normalized_transcription']]
# A fixed random_state makes the shuffle -- and therefore the train/validation
# split sliced from it -- identical on every fresh run of the notebook.
metadata_df = metadata_df.sample(frac=1, random_state=42).reset_index(drop=True)
metadata_df.head()

In [None]:
# The first 90% of the rows become the training set; the rest is validation.
split = int(0.90 * len(metadata_df))
df_train = metadata_df.iloc[:split]
df_val = metadata_df.iloc[split:]

print(f"Number of training samples: {len(df_train)}")
print(f"Number of validation samples: {len(df_val)}")

In [None]:
# The set of characters accepted in the transcription: the 26 lowercase
# letters plus apostrophe, question mark, exclamation mark and space.
# Every letter must be listed: a character missing from this vocabulary is
# mapped to the OOV token and cannot be recovered when decoding.
characters = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Mapping characters to integers (unknown characters map to the OOV token "").
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")
# Mapping integers back to original characters
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)

In [None]:
# STFT window length, in samples (passed as `frame_length` to tf.signal.stft).
frame_length = 256
# Number of samples to step between consecutive STFT windows.
frame_step = 160 
# FFT size (passed as `fft_length` to tf.signal.stft).
# if not provided, uses the smallest power of 2 enclosing frame_length
# The model's input dimension is later derived from it as fft_length // 2 + 1.
fft_length = 384


In [None]:

def encode_single_sample(wav_file, label):
  """Turn one (clip id, transcript) pair into a (spectrogram, label ids) pair.

  Args:
    wav_file: clip id without extension; the file read is
      `wavs_path + wav_file + ".wav"`.
    label: transcript string for that clip.

  Returns:
    A tuple `(spectrogram, label)` where `spectrogram` is the normalized
    square-root magnitude STFT of the clip and `label` holds the integer ids
    `char_to_num` assigns to each lower-cased character of the transcript.
  """
  # 1. read wav file
  file = tf.io.read_file(wavs_path + wav_file + ".wav")

  # 2. decode wav file. Requesting a single channel guarantees the squeeze
  #    below is valid even if a clip is not mono (and matches how the
  #    plotting cell decodes audio).
  audio, _ = tf.audio.decode_wav(file, desired_channels=1)
  audio = tf.squeeze(audio, axis=-1)
  # 3. make sure the samples are float32
  audio = tf.cast(audio, tf.float32)
  # 4. get the spectrogram
  spectrogram = tf.signal.stft(
      audio,
      frame_length=frame_length,
      frame_step=frame_step,
      fft_length=fft_length,
  )
  # 5. keep only the magnitude and compress it with a square root
  spectrogram = tf.abs(spectrogram)
  spectrogram = tf.math.pow(spectrogram, 0.5)
  # 6. normalize along axis 1 (zero mean / unit std); the epsilon avoids a
  #    division by zero on constant rows
  means = tf.math.reduce_mean(spectrogram, axis=1, keepdims=True)
  stddevs = tf.math.reduce_std(spectrogram, axis=1, keepdims=True)
  spectrogram = (spectrogram - means) / (stddevs + 1e-10)

  # process the label

  # 7. lower-case the transcript and split it into single characters
  label = tf.strings.lower(label)
  label = tf.strings.unicode_split(label, input_encoding="UTF-8")
  # 8. map the characters to numbers
  label = char_to_num(label)
  return spectrogram, label


In [None]:
batch_size = 32


def _make_dataset(frame):
  """Build the batched, prefetched (spectrogram, label) pipeline for `frame`."""
  pairs = tf.data.Dataset.from_tensor_slices(
      (list(frame["file_name"]), list(frame["normalized_transcription"]))
  )
  encoded = pairs.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
  # Batches are padded because clips and transcripts have different lengths.
  batched = encoded.padded_batch(batch_size)
  return batched.prefetch(buffer_size=tf.data.AUTOTUNE)


# Define the training dataset
train_dataset = _make_dataset(df_train)

# Define the validation dataset
validation_dataset = _make_dataset(df_val)


In [None]:
# Show the first training sample: its spectrogram, its waveform, and an audio player.
fig = plt.figure(figsize=(8, 5))
for batch in train_dataset.take(1):
  # First sample of the first batch, with leading/trailing zeros trimmed
  # from each frequency row.
  spectrogram = batch[0][0].numpy()
  spectrogram = np.array([np.trim_zeros(x) for x in np.transpose(spectrogram)])
  label = batch[1][0]

  # plot the spectrogram, titled with the decoded transcript
  label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
  ax = fig.add_subplot(2, 1, 1)
  ax.imshow(spectrogram, aspect="auto", origin="lower")
  ax.set_title(label)
  ax.axis("off")

  # plot the waveform of the first training clip
  file = tf.io.read_file(wavs_path + list(df_train["file_name"])[0] + ".wav")
  audio, sample_rate = tf.audio.decode_wav(file, desired_channels=1)
  audio = audio.numpy()
  ax = fig.add_subplot(2, 1, 2)
  ax.plot(audio)
  ax.set_title("Waveform")
  ax.set_xlim(0, len(audio))
  # Play back at the rate stored in the wav header; a hard-coded rate that
  # differs from it changes the pitch and speed of the clip.
  display.display(display.Audio(np.transpose(audio), rate=int(sample_rate.numpy())))

plt.show()

In [None]:
def CTCLoss(y_true, y_pred):
  """Compute the CTC loss for one batch.

  Args:
    y_true: padded label ids, indexed as (batch, label position).
    y_pred: per-timestep class probabilities, indexed as (batch, time, class).

  Returns:
    The tensor returned by `keras.backend.ctc_batch_cost` for the batch.

  Every sample is treated as using the full time axis of `y_pred` and the
  full (padded) width of `y_true`.
  """
  # The notebook enables the mixed_float16 policy, so predictions can arrive
  # as float16; the CTC loss op works on float32, so upcast before using it.
  y_pred = tf.cast(y_pred, tf.float32)

  batch_len = tf.cast(tf.shape(y_true)[0], dtype="int64")
  input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64")
  label_length = tf.cast(tf.shape(y_true)[1], dtype="int64")

  # Broadcast the scalar lengths to one (batch, 1) column each.
  input_length = input_length * tf.ones(shape=(batch_len, 1), dtype="int64")
  label_length = label_length * tf.ones(shape=(batch_len, 1), dtype="int64")

  loss = keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)
  return loss


In [None]:
def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
  """Build and compile the DeepSpeech2-style acoustic model.

  Architecture: two Conv2D + BatchNorm + ReLU blocks, `rnn_layers`
  bidirectional GRU layers (dropout between them), a ReLU dense layer with
  dropout, and a softmax classifier.

  Args:
    input_dim: number of features per spectrogram frame.
    output_dim: number of character classes; the classifier emits
      `output_dim + 1` probabilities per timestep.
    rnn_layers: number of bidirectional GRU layers.
    rnn_units: units per GRU direction.

  Returns:
    A `keras.Model` compiled with Adam (learning rate 1e-4) and `CTCLoss`.
  """
  # model input
  input_spectrogram = layers.Input((None, input_dim), name="input")
  # Expand the dimension of the spectrogram to use the 2D convolution
  x = layers.Reshape((-1, input_dim, 1), name='expand_dim')(input_spectrogram)
  # Convolutional layer 1
  x = layers.Conv2D(
      filters=32,
      kernel_size=[11, 41],
      strides=[2, 2],
      padding="same",
      use_bias=False,
      name="conv_1",
  )(x)
  x = layers.BatchNormalization(name="conv_1_bn")(x)
  x = layers.ReLU(name="conv_1_relu")(x)

  # Convolutional layer 2
  x = layers.Conv2D(
      filters=32,
      kernel_size=[11, 21],
      strides=[1, 2],
      padding="same",
      use_bias=False,
      name="conv_2",
  )(x)
  x = layers.BatchNormalization(name="conv_2_bn")(x)
  x = layers.ReLU(name="conv_2_relu")(x)

  # Merge the last two axes of the conv output so the RNN layers receive one
  # feature vector per timestep
  x = layers.Reshape((-1, x.shape[-2]*x.shape[-1]))(x)

  # RNN layers
  for i in range(1, rnn_layers+1):
    recurrent = layers.GRU(
        units=rnn_units,
        activation="tanh",
        recurrent_activation="sigmoid",
        use_bias=True,
        return_sequences=True,
        reset_after=True,
        name=f"gru_{i}",
    )
    x = layers.Bidirectional(
      recurrent, name=f"bidirectional_{i}", merge_mode="concat"
    )(x)
    # No dropout after the last recurrent layer
    if i < rnn_layers:
      x = layers.Dropout(rate=0.5)(x)
  # Dense layer
  x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
  x = layers.ReLU(name="dense_1_relu")(x)
  x = layers.Dropout(rate=0.5)(x)

  # Classification layer. It is pinned to float32 so that, with the global
  # mixed_float16 policy enabled, the softmax probabilities handed to the CTC
  # loss and decoder are float32 rather than float16.
  output = layers.Dense(
      units=output_dim + 1, activation="softmax", dtype="float32"
  )(x)

  # define the model
  model = keras.models.Model(input_spectrogram, output, name="DeepSpeech_2")

  # optimizer
  opt = keras.optimizers.Adam(learning_rate=1e-4)

  # compile the model
  model.compile(optimizer=opt, loss=CTCLoss)

  return model



In [None]:
# get the model

# input_dim is derived from the FFT size (fft_length // 2 + 1) and output_dim
# is the vocabulary size reported by the char_to_num lookup layer.
model = build_model(
    input_dim=fft_length// 2 + 1,
    output_dim= char_to_num.vocabulary_size(),
    rnn_layers=5,
)
model.summary(line_length=110)

In [None]:
#  utils function for the training and evaluation
def decode_batch_predictions(pred):
  """Greedy-decode a batch of model outputs into text.

  Args:
    pred: model output for one batch, indexed as (batch, time, class).

  Returns:
    A list with one decoded string per batch element.
  """
  # The notebook enables the mixed_float16 policy, so predictions can be
  # float16; the CTC decoder works on float32, so upcast first.
  pred = tf.cast(pred, tf.float32)
  # Every sequence is decoded over the full time axis.
  input_len = np.ones(pred.shape[0]) * pred.shape[1]
  # use greedy search. For complex tasks, you can use beam search
  results = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
  # map each decoded id sequence back to characters and join them into text
  return [
      tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
      for result in results
  ]

In [None]:
class CallbackEval(keras.callbacks.Callback):
  """Report the word error rate on a dataset at the end of every epoch.

  After each epoch the callback transcribes every batch of `dataset`, prints
  the word error rate against the reference transcripts, and shows two
  randomly chosen target/prediction pairs.
  """

  def __init__(self, dataset):
    """Store the dataset of (spectrogram, label) batches to evaluate on."""
    super().__init__()
    self.dataset = dataset

  def on_epoch_end(self, epoch:int, logs=None):
    """Transcribe `self.dataset` and print the WER plus two sample pairs."""
    predictions = []
    targets = []
    for batch in self.dataset:
      X, y = batch
      # Use the model Keras attached to this callback rather than a
      # notebook-level global, and silence predict's per-batch progress bar.
      batch_predictions = self.model.predict(X, verbose=0)
      batch_predictions = decode_batch_predictions(batch_predictions)
      predictions.extend(batch_predictions)
      for label in y:
        label = (
            tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        )
        targets.append(label)
    wer_score = wer(targets, predictions)
    print("-" * 100)
    print(f"Word Error Rate: {wer_score:.4f}")
    print("-" * 100)
    # Show two random examples so progress can be judged by eye
    for i in np.random.randint(0, len(predictions), 2):
      print(f"Target: {targets[i]}")
      print(f"Prediction: {predictions[i]}")
      print("-" * 100)

In [None]:
# Number of passes over the training dataset

epochs =2

# Callback that transcribes the validation dataset after each epoch and
# prints the word error rate

validation_callback = CallbackEval(validation_dataset)

# Train the model. The validation dataset is passed both as validation_data
# and to the callback above.
history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=validation_dataset,
    callbacks=[validation_callback],
) 