# Automatic Speech Recognition for Amharic Language 

In [None]:
"""
Title: Automatic Amharic Speech Recognition using  Deep Speech and CTC
Authors:
Date created: 
Last modified: 
Description: Training a CTC-based model for automatic speech recognition.
"""

## 1. Import Libraries 

In [None]:
from IPython.display import Image
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer
import IPython.display as ipd
from etnltk.lang.am import normalize
from sklearn.utils import shuffle
import pickle
from datetime import datetime

## 2. ASR Architecture 

In [None]:
# Render the architecture diagram image from the ../Data folder.
Image(filename =r'../Data/ASR_arch.JPG')

## 3. Load Dataset

In [None]:
"""
## Load Amharic Dataset

The audio files are stored as `wav` files in the `wav/` folder under `data_path`.
The label (transcript) for each audio file is a string
given in the `metadata_amh.csv` file. After loading, its two columns are named:

- **file_name**: this is the name of the corresponding .wav file
- **transcription**: words spoken by the reader (UTF-8)

The transcription is normalized later in the notebook (`am_normalize_text`),
and the normalized text is what the model is trained on.

NOTE: the template this text was adapted from describes 13,100 audio files, each a
single-channel 16-bit PCM WAV with a sample rate of 22,050 Hz. Those figures have
not been confirmed for the Amharic corpus; see the sample-rate check in section 4.2.
"""

# Locations of the corpus on disk, all relative to the data root.
data_path = '../Data'
wavs_path = f"{data_path}/wav/"
metadata_path = f"{data_path}/metadata_amh.csv"

# Load the metadata table and give its columns the names used downstream.
metadata_df = pd.read_csv(metadata_path)
metadata_df.columns = ["file_name", "transcription"]
metadata_df.head(3)

## 4.Audio Preprocessing

In [None]:
def Preprocess_single_sample(wav_file):
    """Load one clip from `wavs_path` and return its waveform and sample rate.

    Args:
        wav_file: File name of the clip; it is appended to the module-level
            `wavs_path` to form the path that is read.

    Returns:
        A `(audio, fs)` pair: `audio` is the decoded signal with the channel
        axis squeezed away, `fs` is the sample rate converted with `.numpy()`.
    """
    # Read the raw bytes, then decode them forcing a single channel.
    raw_bytes = tf.io.read_file(wavs_path + wav_file)
    samples, rate = tf.audio.decode_wav(raw_bytes, desired_channels=1)
    # Drop the trailing channel axis so the waveform is one-dimensional.
    waveform = tf.squeeze(samples, axis=-1)
    return waveform, rate.numpy()

### 4.1. Handling audio chunk-size problems (special case)


In [None]:
# Rows at positions 2539..3139 (inclusive) are left out because their wav files
# fail to read (wave-read error: chunk size too short).
metadata_new_df = pd.concat(
    [metadata_df.iloc[:2539], metadata_df.iloc[3140:]],
    axis=0,
).reset_index(drop=True)
metadata_new_df.head(3)

### 4.2.Check sample rate of all samples 

In [None]:
# Extract sample rate and duration of each audio sample in a given dataset
def get_samplerate_duration(df):
    """Collect the sample rate and duration of every clip listed in `df`.

    Args:
        df: DataFrame with a `file_name` column; each value is passed to
            `Preprocess_single_sample`.

    Returns:
        A `(sample_rate_ls, duration_ls)` pair of lists, one entry per row of
        `df` in row order. Durations are `len(audio) / sample_rate`, i.e. in
        seconds.
    """
    sample_rate_ls = []
    duration_ls = []
    # Iterate over the column values rather than `df['file_name'][i]`: that
    # label-based lookup only works when the index is exactly 0..n-1 and fails
    # on any frame whose index has not been reset.
    for wav_file in df['file_name']:
        audio, sample_rate = Preprocess_single_sample(wav_file)
        sample_rate_ls.append(sample_rate)
        duration_ls.append(len(audio) / sample_rate)
    return sample_rate_ls, duration_ls

In [None]:
# Check the sample rates of all samples: a flat line means every clip shares one rate.
samplerate_total,duration_total = get_samplerate_duration(metadata_new_df)
plt.plot(samplerate_total)

In [None]:
# Check the audio vs. transcription mapping on the filtered dataset
index = 100 # arbitrarily chosen row
print(metadata_new_df['transcription'][index])
ipd.Audio(wavs_path + metadata_new_df['file_name'][index])

### 4.3. Dataset duration analysis

In [None]:
#The list of durations for each samples 
def show_duration_distribution(dr, bins=10):
    """Plot a histogram of clip durations and print the total duration in hours.

    Args:
        dr: Iterable of per-clip durations in seconds, as returned by
            `get_samplerate_duration`.
        bins: Number of histogram bins. Defaults to 10, the value that used to
            be hard-coded, so existing calls behave as before.
    """
    plt.hist(dr, bins=bins)
    # Label the axes so the figure is readable on its own.
    plt.xlabel("duration (s)")
    plt.ylabel("number of clips")
    plt.show()
    # Note: the histogram shows that samples have different durations, so
    # cutting the audio to a fixed duration would cause a transcription mismatch.
    # Total dataset duration (seconds -> hours).
    print("dataset duration is :",sum(dr)/3600 ,"Hours")

In [None]:
# Duration histogram and total hours for the full (filtered) dataset.
show_duration_distribution(duration_total)

## 5.Dataset Train and Validation Split

In [None]:
# Shuffle, then split 90% / 10% into training and validation sets.
# A fixed seed keeps the split identical after "Restart Kernel -> Run All".
metadata_new_df = shuffle(metadata_new_df, random_state=42).reset_index(drop=True)
split = int(len(metadata_new_df) * 0.90)
# `.copy()` gives the training split its own frame, so adding columns to it
# later does not write into a slice of `metadata_new_df`.
df_train = metadata_new_df[:split].copy()
df_val = metadata_new_df[split:].reset_index(drop=True)
print(f"Size of the training set: {len(df_train)}")
# This line used to report the validation size under the "training set" label.
print(f"Size of the validation set: {len(df_val)}")

In [None]:
# Sample rates and durations of the training split, followed by its duration histogram.
fs_tr , t_tr = get_samplerate_duration(df_train)
show_duration_distribution(t_tr)

In [None]:
# Sample rates and durations of the validation split, followed by its duration histogram.
fs_ts , t_ts = get_samplerate_duration(df_val)
show_duration_distribution(t_ts)

In [None]:
# Check the audio vs. transcription mapping on the training split
indextr = 100 # arbitrarily chosen row
print(df_train['transcription'][indextr])
ipd.Audio(wavs_path + df_train['file_name'][indextr])

In [None]:
# Check the audio vs. transcription mapping on the validation split
indexts = 100 # arbitrarily chosen row
print(df_val['transcription'][indexts])
ipd.Audio(wavs_path + df_val['file_name'][indexts])

## 6. Label and Data Preprocessing

In [None]:
# Output alphabet of the recognizer, written as a whitespace-separated table.
# `.split()` turns the table into a flat list of symbols (one list entry per
# whitespace-delimited token), which is later used as the lookup vocabulary.
# The last row holds the punctuation marks.
Geez_char = """
ሀ ሁ ሂ ሄ ህ ሆ
ለ ሉ ሊ ላ ሌ ል ሎ ሏ
መ ሙ ሚ ማ ሜ ም ሞ ሟ
ረ ሩ ሪ ራ ሬ ር ሮ ሯ
ሰ ሱ ሲ ሳ ሴ ስ ሶ ሷ
ሸ ሹ ሺ ሻ ሼ ሽ ሾ ሿ
ቀ ቁ ቂ ቃ ቄ ቅ ቆ ቋ
በ ቡ ቢ ባ ቤ ብ ቦ ቧ
ቨ ቩ ቪ ቫ ቬ ቭ ቮ ቯ
ተ ቱ ቲ ታ ቴ ት ቶ ቷ
ቸ ቹ ቺ ቻ ቼ ች ቾ ቿ
ኋ
ነ ኑ ኒ ና ኔ ን ኖ ኗ
ኘ ኙ ኚ ኛ ኜ ኝ ኞ ኟ
አ ኡ ኢ ኤ እ ኦ ኧ
ከ ኩ ኪ ካ ኬ ክ ኮ ኳ
ወ ዉ ዊ ዋ ዌ ው ዎ
ዘ ዙ ዚ ዛ ዜ ዝ ዞ ዟ
ዠ ዡ ዢ ዣ ዤ ዥ ዦ ዧ
የ ዩ ዪ ያ ዬ ይ ዮ
ደ ዱ ዲ ዳ ዴ ድ ዶ ዷ
ጀ ጁ ጂ ጃ ጄ ጅ ጆ ጇ
ገ ጉ ጊ ጋ ጌ ግ ጐ ጓ ጔ
ጠ ጡ ጢ ጣ ጤ ጥ ጦ ጧ
ጨ ጩ ጪ ጫ ጬ ጭ ጮ ጯ
ጰ ጱ ጲ ጳ ጴ ጵ ጶ ጷ
ፀ ፁ ፂ ፃ ፄ ፅ ፆ ፇ
ፈ ፉ ፊ ፋ ፌ ፍ ፎ ፏ
ፐ ፑ ፒ ፓ ፔ ፕ ፖ
። ፣ ? !
""".split()

In [None]:
# Vocabulary: every symbol listed in `Geez_char`, in table order.
characters = list(Geez_char)
# Forward lookup (character -> integer id); " " is the out-of-vocabulary token.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token=" ")
# Reverse lookup (integer id -> character), built from the forward vocabulary.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(), oov_token=" ", invert=True
)

print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

In [None]:
def am_normalize_text(df):
    """Normalize every transcript with `etnltk.lang.am.normalize`.

    Args:
        df: pandas Series of transcription strings (despite the parameter
            name, a single column is expected, e.g. `df_train["transcription"]`).

    Returns:
        A one-column DataFrame named `normalized_transcription` carrying the
        same index as `df`, so it aligns row-for-row when assigned back to the
        frame the Series came from.
    """
    # Build the column in a single pass instead of growing a DataFrame one
    # `.loc[i]` row at a time, and iterate by value so the input does not need
    # a 0..n-1 index.
    normalized = [normalize(text) for text in df]
    return pd.DataFrame({'normalized_transcription': normalized}, index=df.index)

In [None]:
"""
Next, we create the function that describes the transformation that we apply to each
element of our dataset.
"""

# An integer scalar Tensor. The window length in samples.
frame_length = 256
# An integer scalar Tensor. The number of samples to step.
frame_step = 160
# An integer scalar Tensor. The size of the FFT to apply.
# If not provided, uses the smallest power of 2 enclosing frame_length.
fft_length = 384


def encode_single_sample(wav_file, label):
    """Convert one (file name, transcript) pair into (spectrogram, label ids).

    Args:
        wav_file: File name of the clip; appended to the module-level `wavs_path`.
        label: Transcript as a UTF-8 string tensor. The datasets built in this
            notebook pass the already normalized transcription.

    Returns:
        A `(spectrogram, label)` tuple: the normalized square-root magnitude
        STFT of the clip and the transcript mapped to integer ids by
        `char_to_num`.
    """
    ###########################################
    ##  Process the Audio
    ##########################################
    # 1. Read wav file
    file = tf.io.read_file(wavs_path + wav_file)
    # 2. Decode the wav file. Force a single channel, as
    #    `Preprocess_single_sample` does; without it the squeeze below fails
    #    on any clip that has more than one channel.
    audio, _ = tf.audio.decode_wav(file, desired_channels=1)
    audio = tf.squeeze(audio, axis=-1)
    # 3. Change type to float
    audio = tf.cast(audio, tf.float32)
    # 4. Get the spectrogram
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    # 5. We only need the magnitude, which can be derived by applying tf.abs
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    # 6. Normalisation along axis 1; the small epsilon avoids division by zero
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    ###########################################
    ##  Process the label
    ##########################################
    # 7. Split the label into individual Unicode characters
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    # 8. Map the characters in label to numbers
    label = char_to_num(label)
    # 9. Return the (features, target) tuple consumed by `model.fit`
    return spectrogram, label

## 7. Tensorflow Dataset split

In [None]:
# Add the normalized transcript as a new column on both splits.
for _split_df in (df_train, df_val):
    _split_df['normalized_transcription'] = am_normalize_text(_split_df["transcription"])

In [None]:
# Inspect the validation split after adding the normalized_transcription column.
df_val

In [None]:
# Inspect the training split after adding the normalized_transcription column.
df_train

In [None]:
"""
## Creating `Dataset` objects

We create a `tf.data.Dataset` object that yields
the transformed elements, in the same order as they
appeared in the input.
"""

batch_size = 32


def _make_dataset(split_df):
    """Build the padded, batched, prefetching `tf.data.Dataset` for one split."""
    pairs = tf.data.Dataset.from_tensor_slices(
        (list(split_df["file_name"]), list(split_df["normalized_transcription"]))
    )
    encoded = pairs.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    return encoded.padded_batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)


# Define the training dataset
train_dataset = _make_dataset(df_train)

# Define the validation dataset
validation_dataset = _make_dataset(df_val)


### 8.Audio Visualization

In [None]:
"""
## Visualize the data

Let's visualize an example in our dataset, including the
audio clip, the spectrogram and the corresponding label.
"""

fig = plt.figure(figsize=(8, 5))
for batch in train_dataset.take(1):
    # First example of the first batch: its spectrogram and its label ids.
    spectrogram = batch[0][0].numpy()
    # Transpose for display and strip the zeros added by `padded_batch`.
    spectrogram = np.array([np.trim_zeros(x) for x in np.transpose(spectrogram)])
    label = batch[1][0]
    # Spectrogram
    label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
    ax = plt.subplot(2, 1, 1)
    ax.imshow(spectrogram, vmax=1)
    #ax.set_title(label)
    ax.axis("off")
    # Wav
    file = tf.io.read_file(wavs_path + df_train["file_name"].iloc[0])
    # Keep the decoded sample rate: playing the clip back at a hard-coded
    # 16000 Hz changes its speed and pitch whenever the file uses another rate.
    audio, sample_rate = tf.audio.decode_wav(file)
    audio = audio.numpy()
    ax = plt.subplot(2, 1, 2)
    plt.plot(audio)
    ax.set_title("Signal Wave")
    ax.set_xlim(0, len(audio))
    display.display(display.Audio(np.transpose(audio), rate=int(sample_rate.numpy())))
plt.show()

## 9. Model

### 9.1 CTC loss Function

In [None]:
"""
## Model

We first define the CTC Loss function.
"""


def CTCLoss(y_true, y_pred):
    """Training-time CTC loss for one batch.

    Every example is given the full padded length: the time dimension of
    `y_pred` as its input length and the second dimension of `y_true` as its
    label length.

    Args:
        y_true: Batch of label-id sequences.
        y_pred: Batch of per-timestep model outputs.

    Returns:
        The result of `keras.backend.ctc_batch_cost` for the batch.
    """
    n_examples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    label_steps = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # One (batch, 1) column per length argument, filled with the shared value.
    length_shape = (n_examples, 1)
    input_length = time_steps * tf.ones(shape=length_shape, dtype="int64")
    label_length = label_steps * tf.ones(shape=length_shape, dtype="int64")

    return keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

### 9.2 DeepSpeech2 Architecture

In [None]:
"""
We now define our model. We will define a model similar to
[DeepSpeech2](https://nvidia.github.io/OpenSeq2Seq/html/speech-recognition/deepspeech2.html).
"""


def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    """Build and compile a DeepSpeech2-like network trained with `CTCLoss`.

    Args:
        input_dim: Size of the last axis of the input spectrogram.
        output_dim: Number of output symbols; the final softmax has
            `output_dim + 1` units.
        rnn_layers: Number of stacked bidirectional GRU layers.
        rnn_units: Units per GRU direction.

    Returns:
        A compiled `keras.Model` named "DeepSpeech_2".
    """
    # Input spectrogram, expanded with a channel axis for the 2D convolutions.
    spectrogram_in = layers.Input((None, input_dim), name="input")
    net = layers.Reshape((-1, input_dim, 1), name="expand_dim")(spectrogram_in)

    # Two Conv -> BatchNorm -> ReLU stages that differ only in kernel and stride.
    conv_specs = (
        ("conv_1", [11, 41], [2, 2]),
        ("conv_2", [11, 21], [1, 2]),
    )
    for conv_name, kernel, stride in conv_specs:
        net = layers.Conv2D(
            filters=32,
            kernel_size=kernel,
            strides=stride,
            padding="same",
            use_bias=False,
            name=conv_name,
        )(net)
        net = layers.BatchNormalization(name=f"{conv_name}_bn")(net)
        net = layers.ReLU(name=f"{conv_name}_relu")(net)

    # Merge the last two axes so the recurrent layers receive a sequence of vectors.
    net = layers.Reshape((-1, net.shape[-2] * net.shape[-1]))(net)

    # Stacked bidirectional GRUs, with dropout between (not after) them.
    for layer_idx in range(1, rnn_layers + 1):
        gru = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{layer_idx}",
        )
        net = layers.Bidirectional(
            gru, name=f"bidirectional_{layer_idx}", merge_mode="concat"
        )(net)
        if layer_idx < rnn_layers:
            net = layers.Dropout(rate=0.5)(net)

    # Dense head followed by the per-timestep softmax classifier.
    net = layers.Dense(units=rnn_units * 2, name="dense_1")(net)
    net = layers.ReLU(name="dense_1_relu")(net)
    net = layers.Dropout(rate=0.5)(net)
    probabilities = layers.Dense(units=output_dim + 1, activation="softmax")(net)

    model = keras.Model(spectrogram_in, probabilities, name="DeepSpeech_2")
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-4), loss=CTCLoss)
    return model


# Instantiate the network: the input width is the number of STFT bins for the
# chosen `fft_length`, the output size is the lookup vocabulary size.
model = build_model(
    fft_length // 2 + 1,
    char_to_num.vocabulary_size(),
    rnn_units=256,   #512
)
model.summary(line_length=110)

### 9.3 Training and Evaluating

In [None]:
"""
## Training and Evaluating
"""

# A utility function to decode the output of the network
def decode_batch_predictions(pred):
    """Decode a batch of network outputs into text with greedy CTC decoding.

    Args:
        pred: Batch of model outputs; axis 0 is the batch, axis 1 the time steps.

    Returns:
        A list with one decoded string per example in the batch.
    """
    # Every example is decoded over the full time axis.
    input_len = np.ones(pred.shape[0]) * pred.shape[1]
    # Greedy search; beam search is the alternative for harder tasks.
    decoded = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    # Map each id sequence back to characters and join them into one string.
    return [
        tf.strings.reduce_join(num_to_char(sequence)).numpy().decode("utf-8")
        for sequence in decoded
    ]


# A callback class to output a few transcriptions during training
class CallbackEval(keras.callbacks.Callback):
    """Prints the word error rate and two sample transcriptions after every epoch.

    Args:
        dataset: Iterable of `(X, y)` batches to evaluate on, where `y` holds
            label-id sequences.
    """

    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch: int, logs=None):
        """Decode the whole dataset, then print the WER and two random examples."""
        predictions = []
        targets = []
        for batch in self.dataset:
            X, y = batch
            # Use the model Keras attaches to the callback rather than the
            # notebook-global `model`, so the callback evaluates whichever
            # model it is passed to `fit` with.
            batch_predictions = self.model.predict(X)
            batch_predictions = decode_batch_predictions(batch_predictions)
            predictions.extend(batch_predictions)
            for label in y:
                label = (
                    tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                )
                targets.append(label)
        wer_score = wer(targets, predictions)
        print("-" * 100)
        print(f"Word Error Rate: {wer_score:.4f}")
        print("-" * 100)
        # Show two randomly picked target/prediction pairs.
        for i in np.random.randint(0, len(predictions), 2):
            print(f"Target    : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)


### 9.4 Start the training process

In [None]:
"""
Let's start the training process.
"""

# Define the number of epochs.
epochs = 100
# Callback function to check transcription on the val set.
validation_callback = CallbackEval(validation_dataset)
start_time = datetime.now()
# Train the model
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[validation_callback],
)
# Record the finish time; `end_time` was previously used without ever being
# assigned, which raised a NameError right after training completed.
end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))
print('Done')

### 9.5 Check validation samples

In [None]:
"""
## Inference
"""

# Evaluate on the whole validation set: collect decoded predictions and the
# reference transcripts, then report the word error rate and five examples.
predictions = []
targets = []
for X, y in validation_dataset:
    predictions.extend(decode_batch_predictions(model.predict(X)))
    targets.extend(
        tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        for label in y
    )
wer_score = wer(targets, predictions)
separator = "-" * 100
print(separator)
print(f"Word Error Rate: {wer_score:.4f}")
print(separator)
for i in np.random.randint(0, len(predictions), 5):
    print(f"Target    : {targets[i]}")
    print(f"Prediction: {predictions[i]}")
    print(separator)



### 10.Save model to disk

In [None]:
# Serialize the trained model to a pickle file.
# NOTE(review): pickling a Keras model compiled with a custom loss may not
# round-trip in every environment — consider `model.save(...)`; confirm before
# relying on this artifact.
with open('ASR_Amharic_epoch_100_(Aug_xx_22).pkl', 'wb') as model_file:
    pickle.dump(model, model_file)

### 11.Plot model result

In [None]:

# Training vs. validation loss per epoch.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
# The curves are losses, so the title says so (it used to read 'model accuracy').
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
# Save before `plt.show()` so the written figure is not empty.
plt.savefig('ASR_Amharic_epoch_100_(jul29_22).png')
plt.show()
%matplotlib inline