<a href="https://colab.research.google.com/github/RogueRock/IDC-409-Speech_to_text/blob/main/idc409_1stdraft.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SPEECH TO TEXT

In [None]:
pip install jiwer

Collecting jiwer
  Downloading jiwer-3.0.3-py3-none-any.whl (21 kB)
Collecting rapidfuzz<4,>=3 (from jiwer)
  Downloading rapidfuzz-3.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m37.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz, jiwer
Successfully installed jiwer-3.0.3 rapidfuzz-3.4.0


https://pypi.org/project/jiwer/
<br>
The link above is the PyPI page for `jiwer`, a package for evaluating automatic speech recognition models. It is used in this notebook to compute the word error rate (WER) of the model's transcriptions.




In [None]:
##importing the required libraries
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display

from jiwer import wer

This notebook uses the LJ Speech dataset, which is available from the following links:
- https://keithito.com/LJ-Speech-Dataset/
- https://www.kaggle.com/datasets/mathurinache/the-lj-speech-dataset/data
- https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2

In [None]:
# Location of the LJSpeech 1.1 archive.
data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"

# Fetch the archive through Keras' download helper; `untar=True` asks it to
# extract the archive after downloading. The returned path is kept in
# `data_path` for the cells below.
data_path = keras.utils.get_file(
    fname="LJSpeech-1.1",
    origin=data_url,
    untar=True,
)



Downloading data from https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2


In [None]:
# Derive the audio directory and the metadata file location from the dataset root.
wavs_path = f"{data_path}/wavs/"
metadata_path = f"{data_path}/metadata.csv"

In [None]:
# Load the pipe-delimited metadata file into a pandas DataFrame. The file has
# no header row, and quote characters are treated as ordinary text.
metadata_df = pd.read_csv(
    metadata_path,
    sep="|",
    header=None,
    quoting=3,  # same value as csv.QUOTE_NONE
)
metadata_df.head(10)


Unnamed: 0,0,1,2
0,LJ001-0001,"Printing, in the only sense with which we are ...","Printing, in the only sense with which we are ..."
1,LJ001-0002,in being comparatively modern.,in being comparatively modern.
2,LJ001-0003,For although the Chinese took impressions from...,For although the Chinese took impressions from...
3,LJ001-0004,"produced the block books, which were the immed...","produced the block books, which were the immed..."
4,LJ001-0005,the invention of movable metal letters in the ...,the invention of movable metal letters in the ...
5,LJ001-0006,"And it is worth mention in passing that, as an...","And it is worth mention in passing that, as an..."
6,LJ001-0007,"the earliest book printed with movable types, ...","the earliest book printed with movable types, ..."
7,LJ001-0008,has never been surpassed.,has never been surpassed.
8,LJ001-0009,"Printing, then, for our purpose, may be consid...","Printing, then, for our purpose, may be consid..."
9,LJ001-0010,"Now, as all books not primarily intended as pi...","Now, as all books not primarily intended as pi..."


In [None]:
metadata_df.columns = ["file name", "transcription", "normalized transcription"]

# Shuffle the rows and discard the old index. A fixed `random_state` makes the
# shuffle (and therefore the train/test split derived from it below) the same
# on every Restart & Run All, so evaluation numbers are comparable across runs.
metadata_df = metadata_df.sample(frac=1, random_state=42).reset_index(drop=True)
metadata_df.head(3)

Unnamed: 0,file name,transcription,normalized transcription
0,LJ005-0229,"By another clause of the Jail Act, two justice...","By another clause of the Jail Act, two justice..."
1,LJ008-0296,The time of the arrival of this report was gen...,The time of the arrival of this report was gen...
2,LJ018-0351,"and that, in spite of the verdict of the jury,...","and that, in spite of the verdict of the jury,..."


In [None]:
# Split the shuffled DataFrame into training (first 90%) and test (last 10%)
# parts; `int` truncates the product to a usable row index.
split = int(len(metadata_df) * 0.90)
df_train = metadata_df[:split]
df_test = metadata_df[split:]

# f-strings interpolate the sizes; the previous `{len(...)}` outside an
# f-string built a one-element set and printed it as e.g. `{11790}`.
print(f"size of the training dataframe : {len(df_train)}")
print(f"size of the test dataframe : {len(df_test)}")

size of the training dataframe :  {11790}
size of the test dataframe :  {1310}


In [None]:
# Characters the model is allowed to emit. The trailing space is essential:
# without it every space in a transcription maps to the out-of-vocabulary
# token "" and word boundaries disappear from the labels, which makes the
# word error rate computed later meaningless.
characters = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Character -> integer id. Any character outside the vocabulary is mapped to
# the OOV token, which is the empty string here.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")

# Integer id -> character; `invert=True` reverses the lookup, reusing the
# exact vocabulary (OOV entry included) of `char_to_num`.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True
)

print(char_to_num.get_vocabulary())
print(char_to_num.vocabulary_size())

['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!']
30


In [None]:
# STFT parameters, all measured in samples.
frame_length = 256  # window length
frame_step = 160    # hop between successive windows
fft_length = 384    # FFT size; yields fft_length // 2 + 1 frequency bins

def encode_single_sample (wav_file, label):
  """Turn one (file stem, transcription) pair into (spectrogram, label ids).

  Args:
    wav_file: name of the audio file without directory or extension; it is
      resolved as ``wavs_path + wav_file + ".wav"``.
    label: transcription text for that audio file.

  Returns:
    A tuple ``(spectrogram, label)``: the normalised magnitude spectrogram
    of the audio and the transcription encoded as integer ids through
    ``char_to_num``.
  """
  # Read and decode the wav file; the sample rate returned by decode_wav is unused.
  file = tf.io.read_file (wavs_path + wav_file + ".wav")
  audio,_ = tf.audio.decode_wav(file)
  # Drop the trailing channel axis (assumes single-channel audio -- TODO confirm).
  audio = tf.squeeze(audio, axis = -1)
  audio = tf.cast(audio, tf.float32)

  # Short-time Fourier transform -> magnitude -> square-root compression.
  spectrogram = tf.signal.stft (audio, frame_length = frame_length, frame_step = frame_step, fft_length = fft_length)
  spectrogram = tf.abs(spectrogram)
  spectrogram = tf.math.pow(spectrogram, 0.5)

  # Standardise along axis 1. The epsilon is ADDED so that a zero standard
  # deviation cannot produce a division by zero (subtracting it would make
  # the divisor slightly negative in that case and flip the sign).
  means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
  stddev = tf.math.reduce_std(spectrogram, 1, keepdims=True)
  spectrogram = (spectrogram - means)/(stddev + 1e-10)

  # Lower-case the transcription, split it into characters and map them to ids.
  label = tf.strings.lower(label)
  label = tf.strings.unicode_split(label, input_encoding = "UTF-8")
  label = char_to_num(label)
  return spectrogram, label


In [None]:
batch_size = 32

# Training pipeline: encode every (file name, transcription) pair of df_train,
# pad each batch to its longest element and prefetch in the background.
file_names = np.array(df_train["file name"])
transcriptions = np.array(df_train["normalized transcription"])
train_dataset = tf.data.Dataset.from_tensor_slices((file_names, transcriptions))
train_dataset = (
    train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Held-out pipeline: built from df_test. It was previously built from the
# df_train arrays, so validation loss and WER were measured on training data.
test_file_names = np.array(df_test["file name"])
test_transcriptions = np.array(df_test["normalized transcription"])
test_dataset = tf.data.Dataset.from_tensor_slices((test_file_names, test_transcriptions))
test_dataset = (
    test_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [None]:
# Materialise the two training columns as NumPy arrays and show the dtype of
# the file-name array.
file_names, transcriptions = (
    np.array(df_train[column])
    for column in ("file name", "normalized transcription")
)
data_type = file_names.dtype
print(data_type)

object


In [None]:
def CTCloss (y_true, y_pred):
  """CTC loss wrapper usable as a Keras ``loss`` function.

  Every sample in the batch is given the same input length (axis 1 of
  ``y_pred``) and the same label length (axis 1 of ``y_true``); both are
  passed to ``keras.backend.ctc_batch_cost`` as int64 column vectors with
  one row per sample.

  Args:
    y_true: batch of label id sequences.
    y_pred: batch of per-step model outputs.

  Returns:
    The value returned by ``keras.backend.ctc_batch_cost``.
  """
  label_shape = tf.shape(y_true)
  pred_shape = tf.shape(y_pred)

  n_samples = tf.cast(label_shape[0], dtype = "int64")
  n_steps = tf.cast(pred_shape[1], dtype = "int64")
  n_labels = tf.cast(label_shape[1], dtype = "int64")

  # Column of ones, one row per sample, used to broadcast the scalar lengths.
  per_sample = tf.ones(shape = (n_samples, 1), dtype = "int64")

  return keras.backend.ctc_batch_cost(
      y_true, y_pred, n_steps * per_sample, n_labels * per_sample
  )


In [None]:
def draft_model (inputdim, outputdim, rnn_layers = 5, rnn_units = 128):
  """Build and compile the convolutional + bidirectional-GRU speech model.

  Architecture: two Conv2D/BatchNorm/ReLU blocks, ``rnn_layers`` bidirectional
  GRU layers (dropout between them, none after the last), a ReLU dense layer
  with dropout, and a softmax output layer.

  Args:
    inputdim: size of the last axis of each input spectrogram.
    outputdim: base number of output classes; the softmax layer has
      ``outputdim + 1`` units.
    rnn_layers: number of bidirectional GRU layers.
    rnn_units: units per GRU direction.

  Returns:
    A ``keras.Model`` named "Deepspeech_2", compiled with Adam
    (learning rate 1e-4) and ``CTCloss``.
  """
  input_spectrogram = layers.Input((None, inputdim), name = "input")

  # Append a channel axis so the spectrogram can be fed to 2D convolutions.
  net = layers.Reshape((-1, inputdim, 1), name = "expanddim")(input_spectrogram)

  # (layer name, kernel size, strides) for the two convolutional blocks.
  conv_specs = (
      ("conv_1", [11, 41], [2, 2]),
      ("conv_2", [11, 21], [1, 2]),
  )
  for conv_name, kernel, stride in conv_specs:
    net = layers.Conv2D(
        filters = 32,
        kernel_size = kernel,
        strides = stride,
        padding = "same",
        use_bias = False,
        name = conv_name,
    )(net)
    net = layers.BatchNormalization(name = f"{conv_name}_bn")(net)
    net = layers.ReLU(name = f"{conv_name}_relu")(net)

  # Merge the last two axes so the recurrent stack sees one vector per step.
  net = layers.Reshape((-1, net.shape[-2] * net.shape[-1]))(net)

  for layer_no in range(1, rnn_layers + 1):
    gru = layers.GRU(
        units = rnn_units,
        activation = "tanh",
        recurrent_activation = "sigmoid",
        use_bias = True,
        return_sequences = True,
        reset_after = True,
        name = f"gru_{layer_no}",
    )
    net = layers.Bidirectional(gru, name = f"bidirectional_{layer_no}", merge_mode = "concat")(net)
    # Dropout between recurrent layers only, not after the final one.
    if layer_no != rnn_layers:
      net = layers.Dropout(rate = 0.5)(net)

  net = layers.Dense(units = rnn_units * 2, name = "dense_1")(net)
  net = layers.ReLU(name = "dense_1_relu")(net)
  net = layers.Dropout(rate = 0.5)(net)
  output = layers.Dense(units = outputdim + 1, activation = "softmax")(net)

  model = keras.Model(input_spectrogram, output, name = "Deepspeech_2")
  model.compile(optimizer = keras.optimizers.Adam(learning_rate = 1e-4), loss = CTCloss)
  return model
# Instantiate the network: the input width is the number of STFT frequency
# bins (fft_length // 2 + 1) and the output size comes from the vocabulary.
model = draft_model(
    inputdim = fft_length // 2 + 1,
    outputdim = char_to_num.vocabulary_size(),
    rnn_units = 512,
)
model.summary(line_length = 110)



Model: "Deepspeech_2"
______________________________________________________________________________________________________________
 Layer (type)                                    Output Shape                                Param #          
 input (InputLayer)                              [(None, None, 193)]                         0                
                                                                                                              
 expanddim (Reshape)                             (None, None, 193, 1)                        0                
                                                                                                              
 conv_1 (Conv2D)                                 (None, None, 97, 32)                        14432            
                                                                                                              
 conv_1_bn (BatchNormalization)                  (None, None, 97, 32)                     

In [None]:
def decode_batch_predictions (pred):
  """Greedy-CTC-decode a batch of model outputs into Python strings.

  Args:
    pred: array of model outputs for one batch; axis 0 indexes samples and
      axis 1 indexes steps.

  Returns:
    A list with one decoded ``str`` per sample in ``pred``.
  """
  # Every sample is decoded over the full step axis of the batch.
  input_len = np.ones(pred.shape[0]) * pred.shape [1]
  results = keras.backend.ctc_decode(pred, input_length = input_len, greedy = True)[0][0]
  output_text = []
  for result in results :
    # Ids -> characters -> one joined string tensor -> Python str.
    # (`.numpy().decode(...)` must be chained on the joined tensor; it was
    # previously passed as an argument to reduce_join, raising NameError.)
    result = tf.strings.reduce_join(num_to_char(result)).numpy().decode("utf-8")
    output_text.append(result)
  return output_text

class CallbackEval(keras.callbacks.Callback):
  """Keras callback that reports the word error rate after every epoch.

  At the end of each epoch the model being trained is run over ``dataset``,
  its outputs are decoded with ``decode_batch_predictions``, and the word
  error rate against the reference transcriptions is printed together with
  two randomly chosen target/prediction pairs.
  """

  def __init__(self, dataset):
    """Store the dataset of (spectrogram, label) batches to evaluate on."""
    super().__init__()
    self.dataset = dataset

  def on_epoch_end(self, epoch : int, logs = None):
    """Decode predictions for ``self.dataset`` and print WER plus samples."""
    predictions = []
    targets = []
    for batch in self.dataset:
      X, y = batch
      # Use the model Keras attached to this callback rather than whatever
      # global happens to be named `model`.
      batch_predictions = self.model.predict(X)
      batch_predictions = decode_batch_predictions (batch_predictions)
      predictions.extend(batch_predictions)
      for label in y :
        label = (tf.strings.reduce_join (num_to_char(label)).numpy().decode("utf-8"))
        targets.append (label)
    wer_score = wer(targets,predictions)
    print ("." *100)
    print (f"word error rate: , {wer_score:.4f}")
    print ("." *100)
    # Show two random examples for a qualitative check.
    for i in np.random.randint (0, len(predictions),2):
      print ("target : ", (targets[i]))
      print ("prediction: ", (predictions[i]))
      print ("." *100)


In [None]:
# Train the model; after each epoch the callback prints the word error rate
# computed on `test_dataset`, which is also used as Keras validation data.
epochs = 1
validation_callback = CallbackEval(test_dataset)
history = model.fit(
    train_dataset,
    validation_data = test_dataset,
    epochs = epochs,
    callbacks = [validation_callback],
)


 72/369 [====>.........................] - ETA: 10:01:56 - loss: 358.3846

In [None]:
# Final evaluation: decode the model's output for every batch of
# `test_dataset`, rebuild the reference text from the label ids, then report
# the word error rate and two random target/prediction pairs.
predictions = []
targets = []
for batch in test_dataset:
  X, y = batch
  batch_predictions = decode_batch_predictions(model.predict(X))
  predictions.extend(batch_predictions)
  for label in y:
    label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
    targets.append(label)

wer_score = wer(targets, predictions)
rule = "." * 100
print(rule)
print(f"word error rate: , {wer_score:.4f}")
print(rule)
for i in np.random.randint(0, len(predictions), 2):
  print("target : ", targets[i])
  print("prediction: ", predictions[i])
  print(rule)