In [None]:
%pip install jiwer

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from IPython import display
from jiwer import wer
from tensorflow import keras
# from tensorflow.keras import layers
from keras import layers

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used by this
# notebook live under that mount point (Google Colab only).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# WAV_PATH: directory prefix for the audio clips (file name + '.wav' is appended to it).
# TSV_PATH: tab-separated index file that is loaded into the metadata frame.
WAV_PATH = '/content/drive/MyDrive/datasetSpeechToText/audioJava500/wavs/'
TSV_PATH = '/content/drive/MyDrive/datasetSpeechToText/audioJava500/line_indexEdited.tsv'

In [None]:
# Load the index without a header row. quoting=3 is csv.QUOTE_NONE, so quote
# characters in the file are read as literal text rather than field delimiters.
metadataDF = pd.read_csv(TSV_PATH, sep='\t', header=None, quoting = 3)

In [None]:
# Preview the last rows of the raw index.
metadataDF.tail(6)

In [None]:
# Preview the first rows of the raw index.
metadataDF.head(10)

In [None]:
# Name the two columns, then shuffle the rows once so the positional
# train/validation split is random. The shuffle is seeded so that a
# "Restart & Run All" reproduces exactly the same split (and therefore
# comparable WER numbers) instead of a different one on every run.
SHUFFLE_SEED = 42

metadataDF.columns = ["file_name", "transcription"]
metadataDF = metadataDF.sample(frac=1, random_state=SHUFFLE_SEED).reset_index(drop=True)
metadataDF.head(6)

In [None]:
# Positional 90/10 split of the metadata frame: the first `split` rows are
# used for training, the remainder for validation.
split = int(0.90 * len(metadataDF))
dfTrain, dfVal = metadataDF.iloc[:split], metadataDF.iloc[split:]

print(f"Size of the training set: {len(dfTrain)}")
print(f"Size of the validation set: {len(dfVal)}")

In [None]:
# Character vocabulary accepted in transcriptions. Anything outside this set
# is mapped to the out-of-vocabulary token "".
char = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Forward lookup (character -> integer id) and its inverse (id -> character).
# The inverse layer is built from the forward layer's own vocabulary so the
# two mappings stay aligned.
charToNum = keras.layers.StringLookup(vocabulary=char, oov_token="")
numToChar = keras.layers.StringLookup(vocabulary=charToNum.get_vocabulary(), oov_token="", invert=True)

# The first line prints the vocabulary itself, so label it as such
# (it was previously mislabelled as the vocabulary *size*).
print(f"The vocabulary is: {charToNum.get_vocabulary()}")
print(f"(size = {charToNum.vocabulary_size()})")

In [None]:
# Show the lookup layer's repr as a quick sanity check that it was created.
charToNum

In [None]:
# Short-time Fourier transform parameters passed to tf.signal.stft below.
frameLength = 256  # window length
frameStep = 160    # hop between consecutive windows
fftLength = 384    # FFT size; the model input width is fftLength // 2 + 1

def encodeSingleSample(wavFile, label):
    """Convert one (file stem, transcription) pair into model inputs.

    Args:
        wavFile: Audio file name without extension; the clip is read from
            ``WAV_PATH + wavFile + '.wav'``.
        label: Transcription text belonging to that clip.

    Returns:
        A ``(spectrogram, label)`` tuple. ``spectrogram`` is the
        square-root-compressed STFT magnitude, standardised along axis 1;
        ``label`` is the lower-cased transcription split into characters and
        mapped to integer ids by ``charToNum``.
    """
    rawBytes = tf.io.read_file(WAV_PATH + wavFile + '.wav')

    # desired_channels=1 keeps only the first channel. Mono clips decode
    # exactly as before, while a multi-channel clip no longer makes the
    # squeeze below fail. decode_wav already returns float32, so no cast
    # is needed afterwards.
    audio, _ = tf.audio.decode_wav(rawBytes, desired_channels=1)
    audio = tf.squeeze(audio, axis=-1)

    # Magnitude spectrogram with square-root compression.
    spectrogram = tf.signal.stft(
        audio,
        frame_length=frameLength,
        frame_step=frameStep,
        fft_length=fftLength,
    )
    spectrogram = tf.math.pow(tf.abs(spectrogram), 0.5)

    # Standardise along axis 1; the epsilon avoids division by zero when a
    # row is constant (for example, silence).
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)

    # Text -> lower case -> characters -> integer ids.
    label = tf.strings.lower(label)
    label = tf.strings.unicode_split(label, input_encoding = 'UTF-8')
    label = charToNum(label)

    return spectrogram, label

In [None]:
BATCH_SIZE = 4


def makeDataset(frame, batchSize=BATCH_SIZE):
    """Build the batched input pipeline for one metadata frame.

    Args:
        frame: DataFrame with ``file_name`` and ``transcription`` columns.
        batchSize: Number of examples per padded batch.

    Returns:
        A ``tf.data.Dataset`` of ``(spectrogram, label)`` batches. Examples
        are encoded with ``encodeSingleSample``, padded to the longest item
        in each batch and prefetched. Row order of ``frame`` is preserved
        (no shuffling is applied here).
    """
    pairs = tf.data.Dataset.from_tensor_slices(
        (list(frame['file_name']), list(frame['transcription']))
    )
    return (
        pairs.map(encodeSingleSample, num_parallel_calls=tf.data.AUTOTUNE)
        .padded_batch(batchSize)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )


# Both splits go through the same pipeline, so they cannot drift apart.
trainDataset = makeDataset(dfTrain)
valDataset = makeDataset(dfVal)

In [None]:
# Visual sanity check: spectrogram, waveform and audio player for the first
# training example.
fig = plt.figure(figsize=(10, 10))
for batch in trainDataset.take(1):
    # First example of the first batch; transpose so frequency runs along
    # the vertical image axis, and strip the zero padding added by batching.
    spectogram = batch[0][0].numpy()
    spectogram = np.array([np.trim_zeros(x) for x in np.transpose(spectogram)])
    label = batch[1][0]

    # Decode the integer ids back to text for the plot title.
    label = tf.strings.reduce_join(numToChar(label)).numpy().decode('utf-8')
    ax = fig.add_subplot(2, 1, 1)
    ax.imshow(spectogram, vmax=1)
    ax.set_title(label)
    ax.axis('off')

    # The training dataset is built in dfTrain row order, so the first
    # example above corresponds to the first row. .iloc[0] selects it by
    # position and does not depend on the frame's index labels.
    file = tf.io.read_file(WAV_PATH + dfTrain['file_name'].iloc[0] + '.wav')
    audio, sampleRate = tf.audio.decode_wav(file)
    audio = audio.numpy()
    ax = fig.add_subplot(2, 1, 2)
    ax.plot(audio)
    ax.set_title("Signal Wave")
    ax.set_xlim([0, len(audio)])
    # Play back at the rate stored in the WAV header rather than a
    # hard-coded 16000, so clips recorded at another rate sound correct.
    display.display(display.Audio(np.transpose(audio), rate=int(sampleRate.numpy())))
plt.show()

In [None]:
def CTCLoss(y_true, y_pred):
    """Per-sample CTC loss for one batch.

    Args:
        y_true: Batch of integer label sequences.
        y_pred: Batch of per-timestep class probabilities from the model.

    Returns:
        The tensor returned by ``keras.backend.ctc_batch_cost``.

    Note:
        Every sample is given the full width of ``y_pred`` / ``y_true`` as
        its input / label length, i.e. the lengths are taken from the batch
        tensor shapes, not from the individual examples.
    """
    nBatch = tf.cast(tf.shape(y_true)[0], dtype='int64')
    # Column of ones used to broadcast each scalar length to shape (batch, 1).
    perSample = tf.ones(shape=(nBatch, 1), dtype='int64')

    timeSteps = tf.cast(tf.shape(y_pred)[1], dtype='int64') * perSample
    labelSteps = tf.cast(tf.shape(y_true)[1], dtype='int64') * perSample

    return keras.backend.ctc_batch_cost(y_true, y_pred, timeSteps, labelSteps)

In [None]:
def buildModel(inputDim, outputDim, rnnLayers = 5, rnnUnits = 128):
    """Assemble and compile the "DeepSpeech2" network.

    Architecture: two Conv2D + BatchNorm + ReLU blocks, a stack of
    bidirectional GRUs with dropout between them, a ReLU dense layer with
    dropout, and a softmax classifier.

    Args:
        inputDim: Number of features per spectrogram frame.
        outputDim: Number of character classes; the classifier has
            ``outputDim + 1`` units (the extra unit is presumably the CTC
            blank label).
        rnnLayers: Number of bidirectional GRU layers.
        rnnUnits: Units per GRU direction.

    Returns:
        A ``keras.Model`` compiled with Adam (learning rate 1e-4) and
        ``CTCLoss``.
    """
    inputSpectogram = layers.Input((None, inputDim), name='input')

    # Add a trailing channel axis so the 2D convolutions can be applied.
    net = layers.Reshape((-1, inputDim, 1), name = "expandDim")(inputSpectogram)

    # (conv name, batch-norm name, relu name, kernel size, strides)
    convBlocks = (
        ('conv_1', 'conv_1_bn', 'conv_1_relu', [11, 41], [2, 2]),
        ('conv_2', 'conv_2_bn', 'conv_2_relu', [11, 21], [1, 2]),
    )
    for convName, bnName, reluName, kernel, stride in convBlocks:
        net = layers.Conv2D(
            filters=32,
            kernel_size=kernel,
            strides=stride,
            padding='same',
            use_bias=False,
            name=convName,
        )(net)
        net = layers.BatchNormalization(name=bnName)(net)
        net = layers.ReLU(name=reluName)(net)

    # Merge the feature and channel axes so each timestep is one flat vector
    # for the recurrent stack.
    flatWidth = net.shape[2] * net.shape[-1]
    net = layers.Reshape((-1, flatWidth))(net)

    # Bidirectional GRU stack; dropout follows every layer except the last.
    for i in range(1, rnnLayers + 1):
        gru = layers.GRU(
            units=rnnUnits,
            activation='tanh',
            recurrent_activation='sigmoid',
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f'gru_{i}'
        )
        net = layers.Bidirectional(gru, name=f'bidirectional_{i}', merge_mode="concat")(net)
        if i != rnnLayers:
            net = layers.Dropout(rate = 0.5)(net)

    # Dense head.
    net = layers.Dense(units = rnnUnits * 2, name = "dense_1")(net)
    net = layers.ReLU(name = "dense_1_relu")(net)
    net = layers.Dropout(rate = 0.5)(net)

    # Per-timestep class probabilities.
    output = layers.Dense(units = outputDim + 1, activation = "softmax")(net)

    model = keras.Model(inputs = inputSpectogram, outputs = output, name = "DeepSpeech2")
    model.compile(
        optimizer = keras.optimizers.Adam(learning_rate = 1e-4),
        loss = CTCLoss,
    )
    return model

# Instantiate the network: the input width is the number of STFT frequency
# bins (fftLength // 2 + 1) and there is one output class per vocabulary entry.
model = buildModel(
    inputDim = fftLength // 2 + 1,
    outputDim = charToNum.vocabulary_size(),
    rnnUnits = 512,
)
model.summary()

In [None]:
def decodeBatchPredictions(pred):
    """Greedy-decode a batch of model outputs into text.

    Args:
        pred: Array of model predictions; axis 0 is the batch and axis 1 the
            timesteps.

    Returns:
        A list with one decoded string per batch entry.
    """
    # Every sequence is treated as spanning the full time axis.
    seqLens = np.ones(pred.shape[0]) * pred.shape[1]
    decoded = keras.backend.ctc_decode(pred, input_length=seqLens, greedy=True)[0][0]
    return [
        tf.strings.reduce_join(numToChar(ids)).numpy().decode('utf-8')
        for ids in decoded
    ]

class CallbackEval(keras.callbacks.Callback):
    """Report the word error rate and a few sample transcriptions per epoch.

    Args:
        dataset: Iterable of ``(X, y)`` batches to evaluate on, where ``y``
            holds integer-encoded label sequences.
    """

    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch, logs=None):
        """Decode the whole dataset, print the WER and five random examples."""
        predictions = []
        targets = []

        for X, y in self.dataset:
            # Use the model Keras attached to this callback rather than a
            # notebook-level global, so the callback evaluates whichever
            # model is actually being trained. verbose=0 suppresses the
            # per-batch progress bar that would otherwise flood the output.
            batchPredictions = self.model.predict(X, verbose=0)
            predictions.extend(decodeBatchPredictions(batchPredictions))
            for label in y:
                label = (
                    tf.strings.reduce_join(numToChar(label)).numpy().decode('utf-8')
                )
                targets.append(label)

        # With nothing to score, np.random.randint(0, 0, ...) below would
        # raise; report the situation and skip the evaluation instead.
        if not predictions:
            print("-" * 50)
            print("Word Error Rate: n/a (evaluation dataset is empty)")
            print("-" * 50)
            return

        werScore = wer(targets, predictions)
        print("-" * 50)
        print(f"Word Error Rate: {werScore:.4f}")
        print("-" * 50)
        for i in np.random.randint(0, len(predictions), 5):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 50)

In [None]:
# Train the model, running the WER callback on the validation set after
# every epoch.
EPOCHS = 135
valCallBack = CallbackEval(valDataset)
history = model.fit(
    trainDataset,
    validation_data=valDataset,
    epochs=EPOCHS,
    callbacks=[valCallBack],
)

In [None]:
# Save the trained model to 'model.h5' in the current working directory.
# NOTE(review): the model was compiled with the custom CTCLoss function, so
# reloading it will presumably need that function supplied (or compile=False)
# -- confirm before relying on this file.
model.save('model.h5')

In [None]:
# Final evaluation on the validation set: decode every batch, compute the
# word error rate and print five randomly chosen target/prediction pairs.
predictions, targets = [], []
for X, y in valDataset:
    predictions.extend(decodeBatchPredictions(model.predict(X)))
    targets.extend(
        tf.strings.reduce_join(numToChar(label)).numpy().decode('utf-8')
        for label in y
    )

werScore = wer(targets, predictions)
print("-" * 50)
print(f"Word Error Rate: {werScore:.4f}")
print("-" * 50)
for i in np.random.randint(0, len(predictions), 5):
    print(f"Target    : {targets[i]}")
    print(f"Prediction: {predictions[i]}")
    print("-" * 50)