In [None]:
# !pip install mltu
import os
from datetime import datetime
from mltu.configs import BaseModelConfigs
class ModelConfigs(BaseModelConfigs):
    """Settings for the sound-to-text run: output path, spectrogram and training parameters."""

    def __init__(self):
        super().__init__()

        # One output folder per calendar day (YYYYMMDD).
        run_stamp = datetime.now().strftime("%Y%m%d")
        self.model_path = os.path.join("Models/05_sound_to_text", run_stamp)

        # Spectrogram parameters (handed to WavReader elsewhere in the notebook).
        self.frame_length, self.frame_step, self.fft_length = 256, 160, 384

        # Characters the model may emit.
        self.vocab = "abcdefghijklmnopqrstuvwxyz'?! "

        # Placeholders; the training cell fills these in after scanning the dataset.
        self.input_shape = None
        self.max_text_length = None
        self.max_spectrogram_length = None

        # Optimisation settings.
        self.batch_size = 8
        self.learning_rate = 0.0005
        self.train_epochs = 50
        self.train_workers = 20

In [2]:
import tensorflow as tf
from keras import layers
from keras.models import Model
from mltu.tensorflow.model_utils import residual_block, activation_layer
def train_model(input_dim, output_dim, activation="leaky_relu", dropout=0.2):
    """Build the Conv2D + stacked bidirectional-LSTM speech model.

    Args:
        input_dim: Shape of one input sample without the batch axis; passed
            straight to ``layers.Input``.
        output_dim: Number of symbols to classify. The final layer has
            ``output_dim + 1`` units (the extra unit is presumably the CTC
            blank -- confirm against the loss in use).
        activation: Activation name forwarded to ``activation_layer`` for every
            hidden activation. Previously this argument was accepted but
            ignored in favour of a hard-coded "leaky_relu".
        dropout: Dropout rate applied after the first four recurrent layers and
            after the dense layer.

    Returns:
        A Keras ``Model`` mapping the input tensor to per-step softmax scores.
    """
    inputs = layers.Input(shape=input_dim, name="input", dtype=tf.float32)

    # Add a trailing channel axis so the 2D convolutions can consume the input.
    # (Named ``expanded`` rather than ``input`` to avoid shadowing the builtin.)
    expanded = layers.Lambda(lambda x: tf.expand_dims(x, axis=-1))(inputs)

    # Convolution layer 1
    x = layers.Conv2D(filters=32, kernel_size=[11, 41], strides=[2, 2], padding="same", use_bias=False)(expanded)
    x = layers.BatchNormalization()(x)
    x = activation_layer(x, activation=activation)

    # Convolution layer 2
    x = layers.Conv2D(filters=32, kernel_size=[11, 21], strides=[1, 2], padding="same", use_bias=False)(x)
    x = layers.BatchNormalization()(x)
    x = activation_layer(x, activation=activation)

    # Merge the last two axes so the recurrent layers receive a 3D tensor.
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)

    # Five bidirectional LSTM layers; dropout follows all but the last one.
    rnn_layers = 5
    for layer_index in range(rnn_layers):
        x = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(x)
        if layer_index < rnn_layers - 1:
            x = layers.Dropout(dropout)(x)

    # Dense layer
    x = layers.Dense(256)(x)
    x = activation_layer(x, activation=activation)
    x = layers.Dropout(dropout)(x)

    # Classification layer
    output = layers.Dense(output_dim + 1, activation="softmax", dtype=tf.float32)(x)

    return Model(inputs=inputs, outputs=output)

In [3]:
# !pip install tf2onnx
import tensorflow as tf
# Best-effort: ask TensorFlow to enable memory growth on every visible GPU.
try:
    for gpu in tf.config.experimental.list_physical_devices("GPU"):
        tf.config.experimental.set_memory_growth(gpu, True)
except Exception as err:
    # Keep going if this fails, but report it instead of hiding it; catching
    # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through.
    print(f"Could not enable GPU memory growth: {err}")

import os
import csv
import tarfile
import pandas as pd
from tqdm import tqdm
from urllib.request import urlopen
from io import BytesIO

from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from mltu.preprocessors import WavReader

from mltu.tensorflow.dataProvider import DataProvider
from mltu.transformers import LabelIndexer, LabelPadding, SpectrogramPadding
from mltu.tensorflow.losses import CTCloss
from mltu.tensorflow.callbacks import Model2onnx, TrainLogger
from mltu.tensorflow.metrics import CERMetric, WERMetric


def download_and_unzip(url, extract_to="Datasets", chunk_size=1024*1024):
    """Download a tar archive from ``url`` and extract it under ``extract_to``.

    The whole archive is buffered in memory before extraction.

    Args:
        url: Location of the archive; anything ``urlopen`` accepts.
        extract_to: Directory the archive members are extracted into.
        chunk_size: Number of bytes requested per read.
    """
    payload = BytesIO()
    with urlopen(url) as response:
        # The size is unknown when the response carries no length (e.g. no
        # Content-Length header); tqdm then shows an open-ended counter.
        content_length = getattr(response, "length", None)
        total_chunks = None if content_length is None else content_length // chunk_size + 1

        with tqdm(total=total_chunks) as progress:
            # Read until the stream is exhausted rather than trusting the
            # advertised length.
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                payload.write(chunk)
                progress.update(1)

    payload.seek(0)
    with tarfile.open(fileobj=payload) as archive:
        if hasattr(tarfile, "data_filter"):
            # Downloaded archives are untrusted input: where the interpreter
            # supports extraction filters, refuse members that would escape
            # ``extract_to``.
            archive.extractall(path=extract_to, filter="data")
        else:
            archive.extractall(path=extract_to)


# Fetch and unpack the dataset the first time the notebook runs.
dataset_path = "Datasets/nptel-pure"
if not os.path.exists(os.path.join("Datasets", "nptel-pure")):
    download_and_unzip(
        "https://github.com/AI4Bharat/NPTEL2020-Indian-English-Speech-Dataset/releases/download/v0.1/nptel-pure-set.tar.gz",
        extract_to="Datasets",
    )

# Locations inside the extracted dataset.
metadata_path = f"{dataset_path}/metadata.csv"
wavs_path = f"{dataset_path}/wav/"

# Build metadata.csv from the per-utterance transcript files, then load it back.
transcripts_dir = os.path.join(dataset_path, "original_txt")
# Sorted so the row order does not depend on the file system's listing order.
files = sorted(os.listdir(transcripts_dir))
with open(metadata_path, "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(["file_name", "normalized_transcription"])
    for file in files:
        with open(os.path.join(transcripts_dir, file), "r") as txt_file:
            # Drop surrounding whitespace (e.g. a trailing newline) from the transcript.
            text = txt_file.read().strip()
        # Remove only a trailing ".txt"; str.replace would also strip it mid-name.
        stem = file[:-len(".txt")] if file.endswith(".txt") else file
        writer.writerow([stem, text])

# Read with the same quoting rules the writer used; QUOTE_NONE would split
# transcripts containing commas. dtype=str and keep_default_na=False keep
# numeric-looking file names and empty transcripts as plain strings.
metadata_df = pd.read_csv(
    metadata_path,
    sep=",",
    header=0,
    quoting=csv.QUOTE_MINIMAL,
    dtype=str,
    keep_default_na=False,
)

# structure the dataset where each row is a list of [wav_file_path, sound transcription]
dataset = [[f"Datasets/nptel-pure/wav/{file}.wav", label.lower()] for file, label in metadata_df.values.tolist()]
# Instantiate the run configuration.
configs = ModelConfigs()

# Walk the dataset once, tracking the longest in-vocabulary label and the
# longest spectrogram (first axis).
max_text_length = 0
max_spectrogram_length = 0
for file_path, label in tqdm(dataset):
    spectrogram = WavReader.get_spectrogram(
        file_path,
        frame_length=configs.frame_length,
        frame_step=configs.frame_step,
        fft_length=configs.fft_length,
    )
    # Only characters present in the vocabulary count towards the label length.
    valid_label = [c for c in label if c in configs.vocab]
    if len(valid_label) > max_text_length:
        max_text_length = len(valid_label)
    if spectrogram.shape[0] > max_spectrogram_length:
        max_spectrogram_length = spectrogram.shape[0]
    # Model input shape: [longest first axis seen so far, second axis of this spectrogram].
    configs.input_shape = [max_spectrogram_length, spectrogram.shape[1]]

# Record the measured maxima and persist the configuration.
configs.max_spectrogram_length = max_spectrogram_length
configs.max_text_length = max_text_length
configs.save()
# Preprocessing step: load each wav file as a spectrogram.
wav_preprocessors = [
    WavReader(
        frame_length=configs.frame_length,
        frame_step=configs.frame_step,
        fft_length=configs.fft_length,
    ),
]

# Per-sample transforms: pad spectrograms, index labels with the vocabulary,
# then pad labels using len(vocab) as the padding value.
sample_transformers = [
    SpectrogramPadding(max_spectrogram_length=configs.max_spectrogram_length, padding_value=0),
    LabelIndexer(configs.vocab),
    LabelPadding(max_word_length=configs.max_text_length, padding_value=len(configs.vocab)),
]

# Data provider over the full dataset.
data_provider = DataProvider(
    dataset=dataset,
    skip_validation=True,
    batch_size=configs.batch_size,
    data_preprocessors=wav_preprocessors,
    transformers=sample_transformers,
)

# Split into training and validation providers with split=0.9.
train_data_provider, val_data_provider = data_provider.split(split=0.9)
# Build the network from the measured input shape and the vocabulary size.
model = train_model(input_dim=configs.input_shape, output_dim=len(configs.vocab), dropout=0.5)

# Optimiser, loss and metrics, then compile and print the layer summary.
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=configs.learning_rate)
ctc_loss = CTCloss()
eval_metrics = [
    CERMetric(vocabulary=configs.vocab),
    WERMetric(vocabulary=configs.vocab),
]
model.compile(optimizer=adam_optimizer, loss=ctc_loss, metrics=eval_metrics, run_eagerly=False)
model.summary(line_length=110)
# Callbacks: all monitoring callbacks watch the validation character error rate.
monitored_metric = "val_CER"
weights_file = f"{configs.model_path}/model.h5"

earlystopper = EarlyStopping(monitor=monitored_metric, patience=20, verbose=1, mode="min")
checkpoint = ModelCheckpoint(weights_file, monitor=monitored_metric, verbose=1, save_best_only=True, mode="min")
trainLogger = TrainLogger(configs.model_path)
tb_callback = TensorBoard(f"{configs.model_path}/logs", update_freq=1)
reduceLROnPlat = ReduceLROnPlateau(
    monitor=monitored_metric,
    factor=0.8,
    min_delta=1e-10,
    patience=5,
    verbose=1,
    mode="auto",
)
model2onnx = Model2onnx(weights_file)

# Run training.
training_callbacks = [earlystopper, checkpoint, trainLogger, reduceLROnPlat, tb_callback, model2onnx]
model.fit(
    train_data_provider,
    validation_data=val_data_provider,
    epochs=configs.train_epochs,
    callbacks=training_callbacks,
    workers=configs.train_workers,
)

# Write the train/validation splits next to the model as csv files.
for provider, csv_name in ((train_data_provider, "train.csv"), (val_data_provider, "val.csv")):
    provider.to_csv(os.path.join(configs.model_path, csv_name))



  0%|          | 0/1000 [00:00<?, ?it/s]

librosa version: 0.10.1


100%|██████████| 1000/1000 [00:13<00:00, 73.39it/s]
INFO:DataProvider:Skipping Dataset validation...


Model: "model"
______________________________________________________________________________________________________________
 Layer (type)                                    Output Shape                                Param #          
 input (InputLayer)                              [(None, 2720, 193)]                         0                
                                                                                                              
 lambda (Lambda)                                 (None, 2720, 193, 1)                        0                
                                                                                                              
 conv2d (Conv2D)                                 (None, 1360, 97, 32)                        14432            
                                                                                                              
 batch_normalization (BatchNormalization)        (None, 1360, 97, 32)                        128 

  saving_api.save_model(


Epoch 2/50
Epoch 2: val_CER improved from 0.98817 to 0.98817, saving model to Models/05_sound_to_text/20240306/model.h5
Epoch 3/50
Epoch 3: val_CER improved from 0.98817 to 0.98165, saving model to Models/05_sound_to_text/20240306/model.h5
Epoch 4/50
Epoch 4: val_CER did not improve from 0.98165
Epoch 5/50
Epoch 5: val_CER did not improve from 0.98165
Epoch 6/50
Epoch 6: val_CER did not improve from 0.98165
Epoch 7/50
Epoch 7: val_CER did not improve from 0.98165
Epoch 8/50
Epoch 8: val_CER did not improve from 0.98165

Epoch 8: ReduceLROnPlateau reducing learning rate to 0.0004000000189989805.
Epoch 9/50
Epoch 9: val_CER did not improve from 0.98165
Epoch 10/50
Epoch 10: val_CER did not improve from 0.98165
Epoch 11/50
Epoch 11: val_CER did not improve from 0.98165
Epoch 12/50
Epoch 12: val_CER did not improve from 0.98165
Epoch 13/50
Epoch 13: val_CER did not improve from 0.98165

Epoch 13: ReduceLROnPlateau reducing learning rate to 0.00032000001519918444.
Epoch 14/50
Epoch 14: val_

In [None]:
import typing
import numpy as np
from mltu.inferenceModel import OnnxInferenceModel
from mltu.preprocessors import WavReader
from mltu.utils.text_utils import ctc_decoder, get_cer, get_wer

class WavToTextModel(OnnxInferenceModel):
    """ONNX inference wrapper that turns model output into text with ``ctc_decoder``."""

    def __init__(self, char_list: typing.Union[str, list], *args, **kwargs):
        """Store the character list; all other arguments go to the base class."""
        super().__init__(*args, **kwargs)
        self.char_list = char_list

    def predict(self, data: np.ndarray):
        """Run the model on a single sample and return the decoded text.

        Args:
            data: One input sample; a leading batch axis is added before inference.

        Returns:
            The first entry of the ``ctc_decoder`` result for this sample.
        """
        batch = np.expand_dims(data, axis=0)
        feed = {self.input_names[0]: batch}
        # Only the first model output is decoded.
        first_output = self.model.run(self.output_names, feed)[0]
        decoded = ctc_decoder(first_output, self.char_list)
        return decoded[0]

if __name__ == "__main__":
    # Local imports so this cell does not depend on names leaked from earlier cells.
    import os
    from datetime import datetime

    import pandas as pd
    from tqdm import tqdm
    from mltu.configs import BaseModelConfigs

    # Runs are stored under Models/05_sound_to_text/<YYYYMMDD>. Prefer today's
    # run; if it does not exist (training happened on another day), fall back
    # to the newest run directory that contains a configs.yaml.
    models_root = "Models/05_sound_to_text"
    model_dir = f"{models_root}/{datetime.now().strftime('%Y%m%d')}"
    if not os.path.exists(f"{model_dir}/configs.yaml") and os.path.isdir(models_root):
        previous_runs = sorted(
            name for name in os.listdir(models_root)
            if os.path.isfile(os.path.join(models_root, name, "configs.yaml"))
        )
        if previous_runs:
            # YYYYMMDD names sort chronologically, so the last one is the newest.
            model_dir = f"{models_root}/{previous_runs[-1]}"

    configs = BaseModelConfigs.load(f"{model_dir}/configs.yaml")
    model = WavToTextModel(model_path=configs.model_path, char_list=configs.vocab, force_cpu=True)
    df = pd.read_csv(f"{model_dir}/val.csv").values.tolist()

    accum_cer, accum_wer = [], []
    for wav_path, label in tqdm(df):
        # Normalise Windows-style separators stored in the csv.
        wav_path = wav_path.replace("\\", "/")
        spectrogram = WavReader.get_spectrogram(wav_path, frame_length=configs.frame_length, frame_step=configs.frame_step, fft_length=configs.fft_length)
        WavReader.plot_raw_audio(wav_path, label)
        # Zero-pad along the first axis up to the length recorded in the configs.
        padded_spectrogram = np.pad(spectrogram, ((0, configs.max_spectrogram_length - spectrogram.shape[0]),(0,0)), mode="constant", constant_values=0)
        WavReader.plot_spectrogram(spectrogram, label)
        text = model.predict(padded_spectrogram)
        # Compare against the label restricted to in-vocabulary characters.
        true_label = "".join([l for l in label.lower() if l in configs.vocab])

        cer = get_cer(text, true_label)
        wer = get_wer(text, true_label)

        accum_cer.append(cer)
        accum_wer.append(wer)

    print(f"Average CER: {np.average(accum_cer)}, Average WER: {np.average(accum_wer)}")