In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by
# path from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install jiwer, which provides the `wer` function imported in the next cell.
!pip install jiwer

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer


In [None]:
# Kaggle dataset page: https://www.kaggle.com/datasets/awsaf49/ljspeech-sr16k-dataset/

In [None]:
# Original LJ Speech dataset page: https://keithito.com/LJ-Speech-Dataset/

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import torchaudio.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Now defining a simple deep learning model for ASR
class ASRModel(nn.Module):
    """Single LSTM layer followed by a linear projection of every timestep.

    Args:
        input_size: Number of features in each input timestep.
        hidden_size: Size of the LSTM hidden state.
        output_size: Number of output units produced for every timestep.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, time, features).
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run ``x`` through the LSTM and apply ``fc`` to its full output sequence."""
        sequence_features, _state = self.lstm(x)
        return self.fc(sequence_features)


# Defining the vocabulary (characters) based on the dataset's transcriptions.
# NOTE(review): `lj_dataset` is only created further down in this cell, so on a
# fresh kernel this loop needs to run after that instantiation — confirm the
# intended ordering.
vocabulary = set()
# dropna()/str() keep a missing transcription from crashing set.update().
for transcription in lj_dataset.metadata['Normalized Transcription'].dropna():
    vocabulary.update(str(transcription))

special_tokens = ['<PAD>', '<SOS>', '<EOS>']
vocabulary.update(special_tokens)
# sorted() gives every symbol the same index on every run.
char_to_idx = {char: idx for idx, char in enumerate(sorted(vocabulary))}
idx_to_char = {idx: char for char, idx in char_to_idx.items()}

# Hyperparameters
# NOTE(review): load_and_preprocess_audio produces n_mfcc=13 coefficients, which
# does not match input_size=161 — confirm the intended feature pipeline.
input_size = 161
hidden_size = 256
# One output unit per vocabulary symbol (characters plus the special tokens).
output_size = len(vocabulary)

# Initializing the ASR model
model = ASRModel(input_size, hidden_size, output_size)

# Defining the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


import os
import pandas as pd
import torchaudio
from torch.utils.data import Dataset

# Defining a function to load and preprocess the audio data
def load_and_preprocess_audio(audio_path, target_sample_rate=22050):
    """Load an audio file and return its MFCC features.

    Args:
        audio_path: Path handed to ``torchaudio.load``.
        target_sample_rate: Rate the waveform is resampled to when the file's
            own sample rate differs from it.

    Returns:
        The result of a 13-coefficient ``torchaudio.transforms.MFCC`` applied
        to the (possibly resampled) waveform.
    """
    signal, native_rate = torchaudio.load(audio_path)

    # Resample only when the file is not already at the requested rate.
    needs_resampling = native_rate != target_sample_rate
    if needs_resampling:
        to_target_rate = torchaudio.transforms.Resample(
            orig_freq=native_rate, new_freq=target_sample_rate
        )
        signal = to_target_rate(signal)

    to_mfcc = torchaudio.transforms.MFCC(sample_rate=target_sample_rate, n_mfcc=13)
    return to_mfcc(signal)

# Creating a custom dataset class
class LJSpeechDataset(Dataset):
    """Dataset that pairs MFCC features of a clip with its normalized text.

    Args:
        metadata_file: Pipe-delimited CSV read with pandas; it must expose the
            columns ``'ID'`` and ``'Normalized Transcription'``.
        audio_dir: Directory the ``'ID'`` values are joined onto.
        target_sample_rate: Forwarded to ``load_and_preprocess_audio``.
    """

    def __init__(self, metadata_file, audio_dir, target_sample_rate=22050):
        self.metadata = pd.read_csv(metadata_file, sep='|')
        self.audio_dir = audio_dir
        self.target_sample_rate = target_sample_rate

    def __len__(self):
        """Number of rows in the metadata table."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Return ``(audio_features, transcription)`` for row ``idx``."""
        clip_ids = self.metadata['ID']
        texts = self.metadata['Normalized Transcription']

        clip_path = os.path.join(self.audio_dir, clip_ids[idx])
        # Features are computed on access; nothing is cached.
        features = load_and_preprocess_audio(clip_path, self.target_sample_rate)
        return features, texts[idx]

# Locations of the metadata table and the audio clips on the mounted Drive.
metadata_file = '/content/drive/MyDrive/metadata.csv'
audio_dir = '/content/drive/MyDrive/audio/files'

# Creating an instance of the LJSpeechDataset (reads the metadata CSV immediately)
lj_dataset = LJSpeechDataset(metadata_file, audio_dir)

# Sanity check: indexing the dataset loads one clip and returns
# (audio features, transcription); show the text and the feature shape.
sample_audio, transcription = lj_dataset[0]
print(f"Transcription: {transcription}")
print(f"Audio Shape: {sample_audio.shape}")

# Creating a custom dataset class
class SpeechRecognitionDataset(Dataset):
    """Dataset yielding ``{'audio': ..., 'label': ...}`` sample dictionaries.

    Args:
        metadata_file: Handed to ``load_dataset``, whose result is unpacked
            into ``(data, labels)``.
        transform: Optional callable applied to each sample dict before it is
            returned.
    """

    def __init__(self, metadata_file, transform=None):
        # NOTE(review): `load_dataset` is not defined in the visible part of
        # this notebook — it must be provided before this class is instantiated.
        self.data, self.labels = load_dataset(metadata_file)
        self.transform = transform

    def __len__(self):
        """Number of entries in ``self.data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Build the sample dict for ``idx`` and run it through ``transform`` if set."""
        sample = dict(audio=self.data[idx], label=self.labels[idx])
        return self.transform(sample) if self.transform else sample

# Splitting the dataset into training and validation sets
metadata_file = 'path_to_metadata_file.csv'  # Replace with the actual path

# train_test_split needs a collection of samples, not a file path, so the
# dataset is built first and its *indices* are split 80/20. random_state makes
# the split reproducible across kernel restarts.
full_dataset = SpeechRecognitionDataset(metadata_file)
train_indices, val_indices = train_test_split(
    list(range(len(full_dataset))), test_size=0.2, random_state=42
)
train_dataset = torch.utils.data.Subset(full_dataset, train_indices)
val_dataset = torch.utils.data.Subset(full_dataset, val_indices)

# Creating data loaders for training and validation
# NOTE(review): the default collate function stacks samples, which assumes every
# 'audio'/'label' entry already has the same shape — TODO confirm, or supply a
# padding collate_fn.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0

    for batch in tqdm(train_loader):
        inputs, labels = batch['audio'], batch['label']
        optimizer.zero_grad()
        outputs = model(inputs)
        # Collapse all leading dimensions so the loss sees one row of
        # `output_size` scores per target entry.
        loss = criterion(outputs.reshape(-1, output_size), labels.reshape(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Report once per epoch, after every batch has been added to the sum;
    # max(..., 1) avoids a ZeroDivisionError on an empty loader.
    mean_loss = total_loss / max(len(train_loader), 1)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {mean_loss}")

# Validation pass: count how many per-position predictions match the labels.
model.eval()
total_correct = 0
total_samples = 0

# Gradients are not needed for evaluation.
with torch.no_grad():
    for batch in tqdm(val_loader):
        inputs = batch['audio']
        labels = batch['label']
        outputs = model(inputs)
        # Index of the highest score along dimension 2.
        predicted = outputs.argmax(dim=2)
        total_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0) * labels.size(1)

accuracy = total_correct / total_samples
print(f"Validation Accuracy: {accuracy * 100:.2f}%")


In [None]:
# keras.utils.get_file(..., untar=True) needs a direct link to an archive; a
# dataset landing page is HTML and cannot be extracted. This is the official
# LJSpeech-1.1 tarball.
# NOTE: clips in this archive are sampled at 22.05 kHz — any hard-coded
# playback rate elsewhere in the notebook has to match.
data_url = "https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2"
data_path = keras.utils.get_file("LJSpeech-1.1", data_url, untar=True)

In [None]:
# Folder holding the audio clips (trailing slash kept: file names are appended
# to it by plain string concatenation) and the metadata table.
wavs_path = f"{data_path}/wavs/"
metadata_path = f"{data_path}/metadata.csv"

In [None]:
# Load the metadata table: pipe-separated with no header row. quoting=3 is
# csv.QUOTE_NONE, so quote characters are kept as ordinary text.
metadata_df = pd.read_csv(metadata_path, sep="|", header=None, quoting=3)

In [None]:
# Peek at the last ten rows of the metadata table.
metadata_df.tail(10)

In [None]:
# Peek at the first ten rows of the metadata table.
metadata_df.head(10)

In [None]:
# Name the three columns of the metadata table.
metadata_df.columns = ["file_name", "transcription", "normalized_transcription"]
# Keep the normalized text: the tf.data pipelines further down read the
# "normalized_transcription" column, so dropping it raises a KeyError there.
metadata_df = metadata_df[["file_name", "normalized_transcription"]]
# Shuffle all rows and renumber the index from 0.
metadata_df = metadata_df.sample(frac=1).reset_index(drop=True)
metadata_df.head(3)

In [None]:
# Use the first 90% of the (already shuffled) rows for training and the
# remaining 10% for validation.
split = int(len(metadata_df) * 0.90)
df_train = metadata_df[:split]
df_val = metadata_df[split:]

print(f"Size of the training set: {len(df_train)}")
print(f"Size of the validation set: {len(df_val)}")

In [None]:
# The set of characters accepted in the transcription.
characters = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Forward lookup: character -> integer id, with "" as the out-of-vocabulary token.
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")
# Inverse lookup built from the very same vocabulary: integer id -> character.
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)
print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)

In [None]:
# STFT parameters passed to tf.signal.stft in encode_single_sample.
frame_length = 256  # window length, in samples
frame_step = 160    # hop between consecutive windows, in samples
fft_length = 384    # size of the FFT applied to each window

def encode_single_sample(wav_file, label):
    """Turn one (file name, transcription) pair into model-ready tensors.

    Args:
        wav_file: File name without extension; ``wavs_path`` is prepended and
            ``".wav"`` appended to locate the clip.
        label: Transcription string for that clip.

    Returns:
        A ``(spectrogram, label)`` tuple: the normalized magnitude spectrogram
        of the clip and the transcription as integer ids from ``char_to_num``.
    """
    # 1. Read and decode the wav file, dropping the trailing channel axis.
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    audio, _ = tf.audio.decode_wav(file)
    audio = tf.squeeze(audio, axis=-1)
    audio = tf.cast(audio, tf.float32)

    # 2. Short-time Fourier transform -> magnitude -> square-root compression.
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)

    # 3. Standardize along axis 1; the epsilon guards against a zero std-dev.
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)

    # 4. Lower-case the text, split it into characters and map them to ids.
    label = tf.strings.lower(label)
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    label = char_to_num(label)
    return spectrogram, label

# Creating the dataset objects

In [None]:
batch_size = 32

# Training pipeline: (file name, transcription) pairs -> encoded samples ->
# batches padded to the longest element of each batch -> prefetch.
train_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_train["file_name"]), list(df_train["normalized_transcription"]))
)
train_dataset = (
    train_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# Validation pipeline: identical processing applied to the held-out rows.
validation_dataset = tf.data.Dataset.from_tensor_slices(
    (list(df_val["file_name"]), list(df_val["normalized_transcription"]))
)
validation_dataset = (
    validation_dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    .padded_batch(batch_size)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

In [None]:
# Show the first training example: its spectrogram (top), its waveform
# (bottom) and an audio player for listening to it.
fig = plt.figure(figsize=(8, 5))
for batch in train_dataset.take(1):
    spectrogram = batch[0][0].numpy()
    # Transpose, then strip the leading/trailing zeros from every row
    # (batches are zero-padded by padded_batch).
    spectrogram = np.array([np.trim_zeros(x) for x in np.transpose(spectrogram)])
    label = batch[1][0]
    # Spectrogram
    label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
    ax = plt.subplot(2, 1, 1)
    ax.imshow(spectrogram, vmax=1)
    ax.set_title(label)
    ax.axis("off")
    # Wav
    file = tf.io.read_file(wavs_path + df_train["file_name"].iloc[0] + ".wav")
    audio, sample_rate = tf.audio.decode_wav(file)
    audio = audio.numpy()
    ax = plt.subplot(2, 1, 2)
    ax.plot(audio)
    ax.set_title("Signal Wave")
    ax.set_xlim(0, len(audio))
    # Play back at the rate stored in the file rather than a hard-coded one.
    display.display(display.Audio(np.transpose(audio), rate=int(sample_rate.numpy())))
plt.show()



# Model

In [None]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, via ``keras.backend.ctc_batch_cost``.

    Every sample is given the same input length (dimension 1 of ``y_pred``)
    and the same label length (dimension 1 of ``y_true``).
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    n_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    n_labels = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # Column of ones, one row per sample, used to broadcast the two lengths.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")
    input_length = n_steps * per_sample
    label_length = n_labels * per_sample

    return keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

# Model: DeepSpeech2

In [None]:
def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    """Build and compile a DeepSpeech2-style model trained with ``CTCLoss``.

    Args:
        input_dim: Number of features per input frame.
        output_dim: Vocabulary size; the output layer has ``output_dim + 1``
            units.
        rnn_layers: Number of stacked bidirectional GRU layers.
        rnn_units: Units per GRU direction.

    Returns:
        A compiled ``keras.Model`` named ``"DeepSpeech_2"``.
    """
    # Variable-length sequence of feature frames.
    input_spectrogram = layers.Input((None, input_dim), name="input")
    # Add a trailing channel axis so the 2D convolutions can be applied.
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)

    # Convolution block 1
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 41],
        strides=[2, 2],
        padding="same",
        use_bias=False,
        name="conv_1",
    )(x)
    x = layers.BatchNormalization(name="conv_1_bn")(x)
    x = layers.ReLU(name="conv_1_relu")(x)

    # Convolution block 2
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 21],
        strides=[1, 2],
        padding="same",
        use_bias=False,
        name="conv_2",
    )(x)
    x = layers.BatchNormalization(name="conv_2_bn")(x)
    x = layers.ReLU(name="conv_2_relu")(x)

    # Merge the last two axes into one feature vector per step for the RNNs.
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)

    # Stack of bidirectional GRUs, with dropout between (not after) layers.
    for i in range(1, rnn_layers + 1):
        recurrent = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{i}",
        )
        x = layers.Bidirectional(
            recurrent, name=f"bidirectional_{i}", merge_mode="concat"
        )(x)
        if i < rnn_layers:
            x = layers.Dropout(rate=0.5)(x)

    # Dense block; a dropout rate must lie in [0, 1).
    x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)
    x = layers.Dropout(rate=0.5)(x)

    # Per-step softmax over output_dim + 1 classes.
    output = layers.Dense(units=output_dim + 1, activation="softmax")(x)

    model = keras.Model(input_spectrogram, output, name="DeepSpeech_2")
    opt = keras.optimizers.Adam(learning_rate=1e-4)
    model.compile(optimizer=opt, loss=CTCLoss)
    return model

# A real-input FFT of size fft_length yields fft_length // 2 + 1 frequency
# bins, which is the feature size of every spectrogram frame.
model = build_model(
    input_dim=fft_length // 2 + 1,
    output_dim=char_to_num.vocabulary_size(),
    rnn_units=512,
)

model.summary(line_length=110)

# Training and Evaluating


In [None]:

def decode_batch_predictions(pred):
    """Greedy-CTC-decode a batch of model outputs into a list of strings.

    Args:
        pred: Batch of model outputs; dimension 0 is the batch and dimension 1
            is used as the input length of every sequence.

    Returns:
        One decoded string per element of the batch.
    """
    n_samples, n_steps = pred.shape[0], pred.shape[1]
    # Every sequence is decoded over the full number of steps.
    input_len = np.ones(n_samples) * n_steps
    decoded = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    return [
        tf.strings.reduce_join(num_to_char(sequence)).numpy().decode("utf-8")
        for sequence in decoded
    ]
class CallbackEval(keras.callbacks.Callback):
    """Report the word error rate and two sample transcriptions after each epoch.

    Args:
        dataset: Iterable of ``(X, y)`` batches to evaluate on.
    """

    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch: int, logs=None):
        """Decode every batch of ``self.dataset`` and print the WER plus examples."""
        predictions = []
        targets = []
        for batch in self.dataset:
            X, y = batch
            # self.model is the model this callback was attached to by fit();
            # verbose=0 keeps predict() from printing a progress bar per batch.
            batch_predictions = self.model.predict(X, verbose=0)
            batch_predictions = decode_batch_predictions(batch_predictions)
            predictions.extend(batch_predictions)
            for label in y:
                label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                targets.append(label)

        # Nothing to score or sample from if the dataset yielded no batches.
        if not predictions:
            return

        wer_score = wer(targets, predictions)
        print("-" * 100)
        print(f"Word Error Rate: {wer_score:.4f}")
        print("-" * 100)
        # Show two randomly chosen target/prediction pairs.
        for i in np.random.randint(0, len(predictions), 2):
            print(f"Target        : {targets[i]}")
            print(f"Prediction: {predictions[i]}")
            print("-" * 100)

Let's start the training process.

In [None]:
# Number of passes over the training set.
epochs = 2
# Reports the word error rate on the validation set after every epoch.
validation_callback = CallbackEval(validation_dataset)
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[validation_callback],
)

# Inference

In [None]:
# Decode the whole validation set, then report the word error rate and five
# randomly chosen target/prediction pairs.
predictions = []
targets = []
for batch in validation_dataset:
    X, y = batch
    # verbose=0 keeps predict() from printing a progress bar for every batch.
    batch_predictions = model.predict(X, verbose=0)
    batch_predictions = decode_batch_predictions(batch_predictions)
    predictions.extend(batch_predictions)
    for label in y:
        label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        targets.append(label)

wer_score = wer(targets, predictions)
print("-" * 100)
print(f"Word Error Rate: {wer_score:.4f}")
print("-" * 100)
for i in np.random.randint(0, len(predictions), 5):
    print(f"Target    : {targets[i]}")
    print(f"Prediction: {predictions[i]}")
    print("-" * 100)