# Speech Recognition


## Imports

In [1]:
import os
import pickle
import sys
import warnings

import cv2
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
import seaborn as sns
import tensorflow as tf
from IPython.display import Image
from keras import backend as K
from keras.utils.vis_utils import plot_model

# The star import stays ahead of the explicit tensorflow.keras imports so that
# explicitly imported names always win over anything the wildcard brings in.
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import SGD, Adam, RMSprop
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

In [None]:
# Plotting defaults: seaborn's theme first, then matplotlib's 'ggplot' style on top.
sns.set()
plt.style.use('ggplot')
%matplotlib inline
# Silence every warning for the rest of the session.
warnings.filterwarnings("ignore")
# Pandas display: show all columns, do not truncate cell text, do not wrap
# wide frames, and print floats with two decimals.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
pd.set_option("expand_frame_repr", False)
pd.set_option('display.float_format', '{:.2f}'.format)
# Put the absolute path of ../scripts on sys.path so the project modules
# imported in the next cell can be found.
sys.path.append(os.path.abspath(os.path.join('../scripts')))

In [None]:
from clean_audio import CleanAudio
from file_handler import FileHandler
from audio_vis import AudioVis
from log_melgram_layer import LogMelgramLayer

In [None]:
# One instance of each project helper class imported in the previous cell.
clean_audio = CleanAudio()
file_handler = FileHandler()
audio_vis = AudioVis()

## Load Data


In [None]:
# Directory prefixes for the train and test clips; get_paths() appends
# "<key>.npy" to one of them for every metadata row.
PATH_TRAIN_WAV = "../data/AMHARIC_CLEAN/train/wav/"
PATH_TEST_WAV = "../data/AMHARIC_CLEAN/test/wav/"

In [None]:
# Load the cleaned metadata table and preview its first five rows.
data = pd.read_csv(r'../data/clean_data.csv')
data.head(5)

In [None]:
def get_paths(df):
  """Return the ``.npy`` file path for every row of ``df``, in row order.

  A row whose ``category`` equals ``"Train"`` is rooted at ``PATH_TRAIN_WAV``;
  any other row is rooted at ``PATH_TEST_WAV``.  The file name is the row's
  ``key`` followed by ``".npy"``.
  """
  return [
    (PATH_TRAIN_WAV if row["category"] == "Train" else PATH_TEST_WAV) + row["key"] + ".npy"
    for _, row in df.iterrows()
  ]

In [None]:
# Attach the audio path of every row, order the clips by ascending duration,
# renumber the index from 0, and keep only the columns used from here on.
data["path"] = get_paths(data)
data.sort_values(by=["duration"], inplace=True)
data.reset_index(drop=True, inplace=True)
data = data[["text", "char_length", "duration", "path"]]
# Display the table without the path column.
data[["text", "char_length", "duration"]]

## Tokenizer

In [None]:
class TokenizerWrap(Tokenizer):
    """Character-level Keras ``Tokenizer`` that also stores padded sequences.

    On construction the tokenizer is fitted on ``texts`` and three things are
    kept on the instance: ``index_to_word`` (token -> character), ``tokens``
    (one integer sequence per text, reversed when ``reverse`` is set) and
    ``tokens_padded`` (the same sequences padded/truncated to ``len_sent``).
    """

    def __init__(self, texts, padding, len_sent, filters, reverse=False):
        """Fit on ``texts`` and pre-compute the padded token matrix.

        Args:
            texts: iterable of strings to fit on and tokenize.
            padding: ``'pre'`` or ``'post'``, forwarded to ``pad_sequences``.
            len_sent: length every padded sequence is brought to.
            filters: forwarded unchanged to the Keras ``Tokenizer``.
            reverse: reverse every sequence and truncate at the front
                instead of at the end.
        """
        super().__init__(filters=filters, char_level=True)

        self.len_sent = len_sent
        self.fit_on_texts(texts)

        # Inverse vocabulary: integer token -> character.
        self.index_to_word = {index: char for char, index in self.word_index.items()}

        sequences = self.texts_to_sequences(texts)
        if reverse:
            sequences = [list(reversed(seq)) for seq in sequences]
        self.tokens = sequences

        self.tokens_padded = pad_sequences(
            self.tokens,
            maxlen=len_sent,
            padding=padding,
            truncating='pre' if reverse else 'post',
        )

    def token_to_word(self, token):
        """Return the character for ``token``; token 0 maps to a single space."""
        return " " if token == 0 else self.index_to_word[token]

    def tokens_to_string(self, tokens):
        """Decode a token sequence into text, skipping every 0 token."""
        return "".join(self.index_to_word[token] for token in tokens if token != 0)

    def text_to_tokens(self, text, reverse=False, padding=False):
        """Encode one string as an array with a leading axis of length 1.

        With ``padding=True`` the sequence is padded and truncated to
        ``self.len_sent``; both happen on the same side: the front when
        ``reverse`` is set, the end otherwise.
        """
        tokens = np.array(self.texts_to_sequences([text]))
        if reverse:
            tokens = np.flip(tokens, axis=1)

        if not padding:
            return tokens

        side = 'pre' if reverse else 'post'
        return pad_sequences(tokens, maxlen=self.len_sent, padding=side, truncating=side)


In [None]:
# Length every padded token sequence is brought to.
# NOTE(review): this is below the ~150-char maximum quoted on the next line, so
# the longest transcripts are truncated when padded — confirm that is intended.
MAX_SENTENCE_LENGTH = 125       # The longest sentence in the data is around 150 chars
# Passed to the tokenizer as its `filters` argument.
# NOTE(review): the Keras Tokenizer appears not to apply `filters` when
# char_level=True — verify these characters are really excluded.
filters = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n።”፤፦’፥'  # { ።”፤፦’፥' } unique for amharic

In [None]:
%%time
# Fit the character-level tokenizer on every transcript; sequences are padded
# and truncated at the end ('post').
tokenizer = TokenizerWrap(texts=data.text,
                          padding='post',
                          reverse=False,
                          len_sent=MAX_SENTENCE_LENGTH,
                          filters=filters)

In [None]:
# Vocabulary size, then the full character -> token mapping.
print(len(tokenizer.word_index))
print(tokenizer.word_index)

In [None]:
# Transcript of the row labelled 1 (the index was reset after sorting by duration).
data.text[1]

In [None]:
# Encode that transcript, padded to MAX_SENTENCE_LENGTH tokens.
sample = tokenizer.text_to_tokens(data.text[1], padding=True)
sample

In [None]:
# Round trip: decode the tokens back to text (padding zeros are dropped).
print(tokenizer.tokens_to_string(sample[0]))

Save the fitted tokenizer

In [None]:
# Serialize the fitted tokenizer object to disk with pickle.
with open('../models/char_tokenizer_amharic.pickle', 'wb') as handle:
    pickle.dump(tokenizer, handle)

## Data Augmentation

In [None]:
class AudioAugment():
  """Random waveform augmentations used to enlarge the training data.

  Every method takes a 1-D NumPy waveform and draws its random parameters
  from NumPy's global random state.
  """

  def __init__(self):
    pass

  def change_speed(self, data):
    """Resample ``data`` by a random factor drawn from [0.8, 1.2).

    The resampled signal is zero-padded on both sides (when it got shorter)
    or truncated at the end (when it got longer), so the result always has
    ``len(data)`` samples.
    """
    speed_rate = np.random.uniform(0.8, 1.2)
    target_len = int(len(data) * speed_rate)
    # cv2.resize takes dsize as (width, height): the waveform is resized as a
    # one-pixel-wide image.  reshape(-1) is used instead of squeeze() because
    # squeeze() collapses a one-sample result to a 0-d array and breaks len().
    wav_speed_tune = cv2.resize(data, (1, target_len)).reshape(-1)

    if len(wav_speed_tune) < len(data):
      padding = len(data) - len(wav_speed_tune)
      offset = padding // 2
      wav_speed_tune = np.pad(wav_speed_tune, (offset, padding - offset), "constant")
    else:
      wav_speed_tune = wav_speed_tune[:len(data)]

    return wav_speed_tune

  def add_noise(self, data, noise_levels=(0, 0.3)):
    """Add standard-normal noise scaled by a random level to ``data``.

    Args:
      data: 1-D waveform.
      noise_levels: ``(low, high)`` bounds of the uniform distribution the
        noise scale is drawn from.

    Returns:
      ``data + level * noise`` with the same length as ``data``.
    """
    noise_level = np.random.uniform(*noise_levels)
    noise = np.random.randn(len(data))
    return data + noise_level * noise

  def change_pitch(self, data, sr=8000):
    """Shift the pitch of ``data`` by -1, 0 or +1 steps, chosen at random.

    Args:
      data: 1-D waveform.
      sr: sample rate of ``data``; defaults to the 8000 that used to be
        hard-coded here.
    """
    n_steps = np.random.randint(-1, 2)  # upper bound is exclusive
    # sr and n_steps are keyword-only in current librosa releases; passing
    # them by keyword also works on older ones.
    return librosa.effects.pitch_shift(data, sr=sr, n_steps=n_steps)


## DataGenerator


In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields augmented audio batches for CTC training.

    Every source row contributes four rows to a batch: the original waveform,
    two independently noised copies and one speed-perturbed copy.
    ``batch_size`` therefore counts rows *after* augmentation, and
    ``batch_size // 4`` source rows are read per batch.

    Args:
        data: DataFrame providing the ``text``, ``char_length``, ``duration``
            and ``path`` columns read below.
        sr: sample rate used to turn ``duration`` into a sample count.
        batch_size: number of rows per yielded batch; must be at least 4.
            Values that are not a multiple of 4 are rounded down to one.
        shuffle: when true, reshuffle after every epoch (see ``on_epoch_end``).

    Raises:
        ValueError: if ``batch_size`` is smaller than 4.
    """

    # Rows produced per source clip: original + 2x noise + 1x speed change.
    COPIES_PER_SAMPLE = 4

    def __init__(self, data, sr, batch_size=32, shuffle=True):
        super().__init__()
        if batch_size < self.COPIES_PER_SAMPLE:
            raise ValueError(
                "batch_size must be at least {}".format(self.COPIES_PER_SAMPLE))

        self.data = data
        self.sr = sr
        # Number of *source* rows per batch.  Integer division keeps every
        # index and shape computed from it an int.
        self.batch_size = batch_size // self.COPIES_PER_SAMPLE
        self.audio_augment = AudioAugment()
        self.len = data.shape[0] // self.batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Number of complete batches per epoch."""
        return self.len

    def __data_generation(self, batch_data):
        """Build one ``(inputs, outputs)`` pair from the rows of ``batch_data``.

        Audio and transcripts are zero-padded to the longest item of the
        batch.  ``input_length`` reports that padded audio length for every
        row; ``label_length`` is the row's ``char_length``.
        """
        copies = self.COPIES_PER_SAMPLE
        n_rows = self.batch_size * copies

        longest_audio = int(batch_data["duration"].max() * self.sr)
        longest_trans = int(batch_data["char_length"].max())

        X_audio = np.zeros([n_rows, longest_audio], dtype="float32")
        # Zero-filled so that any column the tokenizer does not provide matches
        # the tokenizer's own padding value.
        y_trans = np.zeros([n_rows, longest_trans], dtype="int64")
        X_length = np.full([n_rows, 1], longest_audio, dtype="int64")
        y_length = np.zeros([n_rows, 1], dtype="int64")

        for sample_idx, (_, row) in enumerate(batch_data.iterrows()):
            first = sample_idx * copies
            rows = slice(first, first + copies)

            # All copies of a clip share the same transcription.  The padded
            # token row can be narrower than longest_trans (it is capped at the
            # tokenizer's len_sent), so only the columns it has are written.
            tokens = tokenizer.text_to_tokens(row["text"], padding=True)[0, :longest_trans]
            y_trans[rows, :len(tokens)] = tokens
            y_length[rows] = row["char_length"]

            wav = np.load(row["path"])
            # Use the real sample count, clipped to the buffer width, so a
            # rounding difference against duration * sr cannot raise.
            n_samples = min(len(wav), longest_audio)

            # Order matters for reproducibility with a seeded RNG:
            # original, noise, noise, speed change.
            variants = (
                wav,
                self.audio_augment.add_noise(wav),
                self.audio_augment.add_noise(wav),
                self.audio_augment.change_speed(wav),
            )
            for offset, variant in enumerate(variants):
                X_audio[first + offset, :n_samples] = variant[:n_samples]

        # Dummy target for the 'ctc' output; the loss value is produced inside
        # the model, so the target content is irrelevant.
        outputs = {'ctc': tf.zeros([n_rows], dtype=tf.dtypes.float32)}
        inputs = {
            'the_input': tf.convert_to_tensor(X_audio),
            'the_labels': tf.convert_to_tensor(y_trans),
            'input_length': tf.convert_to_tensor(X_length, dtype="float32"),
            'label_length': tf.convert_to_tensor(y_length)
        }
        return (inputs, outputs)

    def on_epoch_end(self):
        """Reset the row order; when shuffling, permute batches and their rows.

        Rows are only shuffled *within* their batch and batches are shuffled
        as whole units, so a batch always holds neighbouring rows of ``data``.
        """
        self.indexes = np.arange(self.len * self.batch_size)

        if self.shuffle:
            batches = self.indexes.reshape(self.len, self.batch_size)
            np.random.shuffle(batches)      # order of the batches
            for batch in batches:
                np.random.shuffle(batch)    # order inside each batch
            self.indexes = batches.reshape(-1)

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(inputs, outputs)`` pair."""
        start = index * self.batch_size
        indexes = self.indexes[start:start + self.batch_size]
        batch_data = self.data.iloc[indexes]
        return self.__data_generation(batch_data)


In [None]:
# Sample rate and batch size for an unshuffled generator over the whole
# table, used only for inspection in the following cells.
sr = 8000
batch_size = 128
sample_generator = DataGenerator(data, sr, batch_size, False)

In [None]:
# Number of batches the generator provides per epoch.
sample_generator.__len__()

In [None]:
%%time
# Build batch 261 and time how long it takes.
sample_data = sample_generator.__getitem__(261)

In [None]:
# Element 0 of the batch tuple is the dict of model inputs.
sample_audios = sample_data[0]["the_input"]
sample_labels = sample_data[0]["the_labels"]
sample_audios_length = sample_data[0]["input_length"]
sample_labels_length = sample_data[0]["label_length"]

In [None]:
# Shapes of the four input tensors.
print(sample_audios.shape)
print(sample_labels.shape)
print(sample_audios_length.shape)
print(sample_labels_length.shape)

In [None]:
# Token ids of the first row in the batch.
sample_labels[0]


In [None]:
# Decode the first row's labels back to text and play the matching audio.
print(tokenizer.tokens_to_string(sample_labels[0].numpy()))
audio_vis.play_audio(sample_audios[0], sr)

In [None]:
# NOTE(review): `hkfhghj` is not defined anywhere in the visible source, so
# running this cell raises NameError — presumably a deliberate stop for
# "Run All"; confirm or remove.
hkfhghj

## Log Melgram


In [None]:
def preprocessin_model(fft_size, hop_size, n_mels, mfcc=False):
    """Build the Keras model that maps raw waveforms to normalised log-mel features.

    The waveform input goes through ``LogMelgramLayer`` and then
    ``BatchNormalization(axis=2)``.  The sample rate is read from the
    module-level ``sr``; the upper frequency bound is ``sr // 2``.

    Args:
        fft_size: passed to the layer as ``num_fft``.
        hop_size: passed to the layer as ``hop_length``.
        n_mels: passed to the layer as ``num_mels``.
        mfcc: accepted but not used by this function.

    Returns:
        A ``Model`` named ``"preprocessin_model"``.
    """
    waveform = Input(name='input', shape=(None,), dtype="float32")

    melgram_layer = LogMelgramLayer(
        num_fft=fft_size,
        hop_length=hop_size,
        num_mels=n_mels,
        sample_rate=sr,
        f_min=0.0,
        f_max=sr // 2,
        eps=1e-6)
    log_mel = melgram_layer(waveform)
    normalised = BatchNormalization(axis=2)(log_mel)

    return Model(inputs=waveform, outputs=normalised, name="preprocessin_model")

### Choosing hop_size and n_mels


In [None]:
def compare(i, fft_size, n_mels_list, hop_size_list, sr=16000):
    """Plot a grid of log-mel spectrograms, one per (n_mels, hop_size) pair.

    The first clip of batch ``i`` from ``sample_generator`` is pushed through a
    freshly built ``preprocessin_model`` for every combination.  Rows of the
    grid follow ``n_mels_list`` and columns follow ``hop_size_list``; the two
    lists may have different lengths.

    Args:
        i: index of the batch to take the clip from.
        fft_size: FFT size handed to ``preprocessin_model``.
        n_mels_list: mel-bin counts to try (one grid row each).
        hop_size_list: hop sizes to try (one grid column each).
        sr: sample rate passed to ``librosa.display.specshow``.  The
            spectrogram model itself is built by ``preprocessin_model`` and
            does not receive this value.
    """
    sample_data = sample_generator.__getitem__(i)
    sample_audios = sample_data[0]["the_input"]
    sample_labels = sample_data[0]["the_labels"]

    nrows, ncols = len(n_mels_list), len(hop_size_list)
    # figsize is (width, height): width grows with columns, height with rows.
    plt.figure(figsize=(4 * ncols, 4 * nrows))

    for row, n_mels in enumerate(n_mels_list):
        for col, hop_size in enumerate(hop_size_list):
            plt.subplot(nrows, ncols, row * ncols + col + 1)

            model = preprocessin_model(fft_size, hop_size, n_mels)
            pred = model.predict(sample_audios)

            # First clip of the batch, last axis dropped.
            pred = pred[0, :, :, 0]
            librosa.display.specshow(pred.T, sr=sr, hop_length=hop_size, cmap="jet")
            plt.title('hop: {}, n_mels: {}, shape: {}'.format(hop_size, n_mels, pred.shape), fontsize=11)

    print("The longest sentence in this batch has {} characters".format(sample_labels.shape[1]))

    plt.tight_layout()
    plt.show()

In [None]:
# Candidate settings: every n_mels value is combined with every hop size
# on batch 260.
# NOTE(review): assuming a standard STFT, fft_size=256 yields 129 frequency
# bins, so n_mels of 160 or 256 would exceed that resolution — confirm.
fft_size = 256
n_mels_list = [256, 160, 128, 64]
hop_size_list = [256, 160, 128, 64]
compare(260, fft_size, n_mels_list, hop_size_list, sr)

## Final Choice

In [None]:
# Settings kept for the rest of the notebook.  hop_size is also read as a
# module-level name by vis_model() and input_lengths_lambda_func().
fft_size = 256
hop_size = 128
n_mels = 128


In [None]:
# Shared preprocessing model; build_model() and vis() use it by name.
melspecModel = preprocessin_model(fft_size, hop_size, n_mels)
melspecModel.summary()

In [None]:
def vis(j=5, n_batches=220):
    """Plot the mel spectrogram of the first clip from roughly ``j`` batches.

    Batches are taken from ``sample_generator`` at indices
    ``0, step, 2 * step, ...`` below ``n_batches``, where
    ``step = n_batches // j`` (at least 1).  For each one the function prints
    the batch's label width and the base-2 logarithm of the ratio between
    spectrogram time steps and that width, then draws the spectrogram with
    ``vis_model``.

    Args:
        j: approximate number of batches to show; must be positive.
        n_batches: exclusive upper bound on the batch index; defaults to the
            220 that used to be hard-coded here.

    Raises:
        ValueError: if ``j`` is not positive.
    """
    if j <= 0:
        raise ValueError("j must be a positive integer")

    # Never let the step reach 0 (range() rejects it) when j > n_batches.
    step = max(1, n_batches // j)

    for i in range(0, n_batches, step):
        sample_data = sample_generator.__getitem__(i)
        sample_audios = sample_data[0]["the_input"]
        sample_labels = sample_data[0]["the_labels"]

        melspec = melspecModel.predict(sample_audios)

        print('\n')
        print('-' * 100)

        print("The longest sentence in this batch has {} characters".format(sample_labels.shape[1]))
        # The value printed is log2(time steps / label width), i.e. how many
        # doublings separate the two lengths, not a plain multiplier.
        print("We have to multiply the longest sentence by {} to reach length of Time steps".format(
            np.log2([melspec.shape[1] / sample_labels.shape[1]])[0]))

        print('-' * 100)
        print('\n')

        # Open a fresh 16x4 figure for vis_model to draw into.
        fig, ax = plt.subplots(figsize=(16, 4))
        pred = melspec[0, :, :, 0]
        vis_model(pred, "Mel-frequency spectrogram")

In [None]:
def vis_model(pred, title, cmap="jet"):
    """Draw ``pred`` as a mel spectrogram on the current figure and show it.

    ``pred`` is transposed before plotting.  The sample rate and hop length
    come from the module-level ``sr`` and ``hop_size``.  The plot title is
    ``title`` followed by the shape of ``pred``.
    """
    librosa.display.specshow(
        pred.T,
        sr=sr,
        hop_length=hop_size,
        x_axis='time',
        y_axis='mel',
        cmap=cmap,
    )
    plt.title(f'{title}. Shape = {pred.shape}')
    plt.colorbar(format='%+2.0f dB')
    plt.tight_layout()
    plt.show()

In [None]:
# Show spectrograms for batches sampled across the generator (j=5).
vis(5)

## CTC

In [None]:
def ctc_lambda_func(args):
    """Compute the CTC batch cost from a 4-item list of tensors.

    ``args`` is unpacked as ``(y_pred, labels, input_length, label_length)``;
    note that predictions come first in the list while the backend function
    takes the labels first.
    """
    predictions, targets, prediction_lengths, target_lengths = args
    return K.ctc_batch_cost(
        y_true=targets,
        y_pred=predictions,
        input_length=prediction_lengths,
        label_length=target_lengths,
    )

In [None]:
def input_lengths_lambda_func(args):
    """Return ``ceil(args / hop_size)`` cast to float32.

    ``hop_size`` is the module-level value; ``args`` is the tensor of input
    lengths handed in by the ``Lambda`` layer.
    """
    frame_counts = tf.math.ceil(args / hop_size)
    return tf.cast(frame_counts, dtype="float32")

In [None]:
# Sanity check: apply the length conversion to the second row of the sample
# batch's input lengths.
x = input_lengths_lambda_func(sample_audios_length[1]).numpy()
x

In [None]:
def add_ctc_loss(model_builder):
    """Wrap ``model_builder`` in a model whose single output is the CTC loss.

    Three extra inputs are added: ``the_labels``, ``input_length`` and
    ``label_length``.  The input lengths are first converted with
    ``input_lengths_lambda_func`` and then, if ``model_builder.output_length``
    is truthy, passed through that callable as well.

    Returns:
        A ``Model`` taking ``[model_builder.input, the_labels, input_length,
        label_length]`` and producing the output of the ``'ctc'`` layer.
    """
    labels_in = Input(name='the_labels', shape=(None,), dtype='float32')
    audio_len_in = Input(name='input_length', shape=(1,), dtype='float32')
    label_len_in = Input(name='label_length', shape=(1,), dtype='float32')

    frame_lengths = Lambda(input_lengths_lambda_func)(audio_len_in)
    length_fn = model_builder.output_length
    output_lengths = Lambda(length_fn)(frame_lengths) if length_fn else frame_lengths

    # The CTC loss itself lives in a Lambda layer named 'ctc'.
    ctc_layer = Lambda(ctc_lambda_func, output_shape=(1,), name='ctc')
    loss_out = ctc_layer([model_builder.output, labels_in, output_lengths, label_len_in])

    return Model(
        inputs=[model_builder.input, labels_in, audio_len_in, label_len_in],
        outputs=loss_out,
    )

## Main Model

In [None]:
def simple_rnn_model(input_dim, output_dim=224):
    """Build a one-layer GRU model with a softmax over ``output_dim`` units.

    Args:
        input_dim: size of the last axis of the input sequence.
        output_dim: number of GRU units, which is also the softmax width.

    Returns:
        A ``Model`` named ``"simple_rnn_model"`` carrying an
        ``output_length`` attribute that is the identity function.
    """
    features = Input(name='the_input', shape=(None, input_dim))
    recurrent = GRU(output_dim, return_sequences=True, implementation=2, name='rnn')(features)
    probabilities = Activation('softmax', name='softmax')(recurrent)

    net = Model(inputs=features, outputs=probabilities, name="simple_rnn_model")
    # The model returns one output step per input step, so lengths pass through.
    net.output_length = lambda x: x
    return net

In [None]:
# Build the GRU model (input width 128, 224 units), save its diagram and
# print the layer summary.
# NOTE(review): this rebinds the name `simple_rnn_model` from the builder
# function to the model it returns, so the builder cannot be called again
# without re-running the cell that defines it.
simple_rnn_model = simple_rnn_model(128, 224)
plot_model(simple_rnn_model, to_file='../img/simple_rnn_model.png')
simple_rnn_model.summary()

## Model Builder


In [None]:
def build_model(output_dim, custom_model, mfcc=False, calc=None):
    """Chain the shared preprocessing model and ``custom_model`` into one model.

    Raw audio enters through an input named ``'the_input'``, goes through the
    module-level ``melspecModel``, has axis 3 squeezed away and is then fed to
    ``custom_model``.

    Args:
        output_dim: accepted but not used by this function.
        custom_model: callable applied to the squeezed features.
        mfcc: accepted but not used by this function.
        calc: stored on the result as ``output_length``.

    Returns:
        A ``Model`` named ``"model_builder"``.
    """
    waveform = Input(name='the_input', shape=(None,))
    features = tf.squeeze(melspecModel(waveform), [3])
    predictions = custom_model(features)

    assembled = Model(inputs=waveform, outputs=predictions, name="model_builder")
    assembled.output_length = calc
    return assembled

In [None]:
# Put the GRU model behind the shared preprocessing front end.
# NOTE(review): the first argument (vocabulary size + 2) is not used by
# build_model as written; the output width was fixed when the GRU model was
# built — confirm the two agree.
model0 = build_model(len(tokenizer.word_index) + 2, simple_rnn_model)
model0.summary()


## Train

In [None]:
# Parameters
batch_size = 32
shuffle = True

In [None]:
split_point = int(data.shape[0] * .8)
train_data = data[:split_point]
val_data = data[split_point:]

In [None]:
# The training generator honours the `shuffle` parameter defined above; the
# validation generator keeps a fixed order.
train_gen = DataGenerator(train_data, sr, batch_size, shuffle)
val_gen = DataGenerator(val_data, sr, batch_size, False)

In [None]:
def train(model_builder,
          model_name,
          epochs=20,
          verbose=1,
          optimizer=None,
          ):
    """Attach the CTC loss to ``model_builder`` and fit it on the generators.

    Training data comes from the module-level ``train_gen`` and validation
    data from ``val_gen``.  The model is checkpointed to
    ``../models/<model_name>.h5`` and the loss history is pickled to
    ``../models/<model_name>.pickle``.

    Args:
        model_builder: model exposing ``input``, ``output`` and
            ``output_length``, as expected by ``add_ctc_loss``.
        model_name: base file name for the checkpoint and history files.
        epochs: maximum number of epochs.
        verbose: verbosity passed to ``fit``.
        optimizer: optimizer to compile with.  When ``None`` a fresh SGD
            (learning rate 0.002, Nesterov momentum 0.9, clipnorm 5) is
            created for this call.
    """
    if optimizer is None:
        # Created per call: an optimizer instance built in the signature would
        # be shared, with its internal state, by every model trained here.
        # NOTE(review): `decay` is a legacy argument; newer Keras optimizers
        # may reject it — confirm against the installed version.
        optimizer = SGD(learning_rate=0.002, decay=1e-6, momentum=0.9, nesterov=True, clipnorm=5)

    model = add_ctc_loss(model_builder)

    # The 'ctc' layer already outputs the loss value, so the Keras loss
    # function simply returns the prediction.
    model.compile(loss={'ctc': lambda y_true, y_pred: y_pred}, optimizer=optimizer)
    model.summary()

    # make the ../models directory, if necessary
    os.makedirs('../models', exist_ok=True)

    # checkpoint after every epoch; stop early once val_loss stalls for 10 epochs
    checkpointer = ModelCheckpoint(filepath="../models/" + model_name + '.h5', verbose=0)
    early_stopping = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

    # train the model (Model.fit accepts Sequence generators directly)
    hist = model.fit(train_gen,
                     validation_data=val_gen,
                     epochs=epochs,
                     callbacks=[checkpointer, early_stopping],
                     verbose=verbose)

    # save model loss
    with open("../models/" + model_name + '.pickle', 'wb') as f:
        pickle.dump(hist.history, f)


In [None]:
# Single-epoch run of the GRU baseline.
train(model_builder=model0, model_name="simple_rnn_model", epochs=1)

In [None]:
# NOTE(review): by this point `simple_rnn_model` is bound to the Keras model
# created earlier, and the builder defined above takes only
# (input_dim, output_dim) — this five-argument call looks like a leftover from
# an earlier signature; confirm or remove.
# NOTE(review): the diagram is written to 'models/...' here but to
# '../img/...' in the earlier cell.
simple_rnn_model = simple_rnn_model(sr, 12, fft_size, hop_size, n_mels)
plot_model(simple_rnn_model, to_file='models/simple_rnn_model.png')
simple_rnn_model.summary()