In [1]:
!pip install jiwer



In [2]:
# No install is needed here: `butter` is imported from scipy.signal in the imports
# cell, and SciPy is already available (it is imported without any install step).
# The PyPI package named "butter" is an unrelated library, so installing it does
# not provide the filter-design function this notebook uses.



In [3]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from IPython import display
from jiwer import wer
import os
import warnings
import random
import librosa
import librosa.display
from scipy.signal import butter, lfilter

2024-03-27 06:30:14.803006: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-03-27 06:30:14.803067: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-03-27 06:30:14.804514: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [4]:
# Directory holding the LJSpeech .wav clips; file stems from the metadata are appended to it.
wavs_path = "/kaggle/input/ljspeech/ljspeech-1.1/LJSpeech-1.1/wavs/"

In [5]:
# The metadata file is pipe-delimited and has no header row. quoting=3 is
# csv.QUOTE_NONE, so quote characters inside transcripts are kept as plain text.
df = pd.read_csv(
    "/kaggle/input/ljspeech/ljspeech-1.1/LJSpeech-1.1/metadata.csv",
    sep="|",
    header=None,
    quoting=3,
)
df.head()

Unnamed: 0,0,1,2
0,LJ001-0001,"Printing, in the only sense with which we are ...","Printing, in the only sense with which we are ..."
1,LJ001-0002,in being comparatively modern.,in being comparatively modern.
2,LJ001-0003,For although the Chinese took impressions from...,For although the Chinese took impressions from...
3,LJ001-0004,"produced the block books, which were the immed...","produced the block books, which were the immed..."
4,LJ001-0005,the invention of movable metal letters in the ...,the invention of movable metal letters in the ...


In [6]:
# Name the three pipe-separated metadata columns
df.columns = ["file_name", "transcription", "normalized_transcription"]

# Shuffle the rows with a fixed seed so the train/validation split is the same on
# every "Restart & Run All". Without a seed, clips that were used for training in
# one session can end up in the validation set of the next one, which makes the
# reported word error rate of reloaded weights unreliable.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
df.head(3)

Unnamed: 0,file_name,transcription,normalized_transcription
0,LJ004-0122,fifty-nine had no division whatever to separat...,fifty-nine had no division whatever to separat...
1,LJ006-0010,"Mr. Samuel Hoare, when examined, considered it...","Mr. Samuel Hoare, when examined, considered it..."
2,LJ016-0038,in the wall above the chevaux-de-frise project...,in the wall above the chevaux-de-frise project...


In [7]:
# Hold out the last 10% of the rows for validation; the first 90% is used for training
split = int(len(df) * 0.90)
df_train, df_val = df.iloc[:split], df.iloc[split:]
print(
    f"size of the training set:{len(df_train)}",
    f"size of the validation set:{len(df_val)}",
    sep="\n",
)

size of the training set:11790
size of the validation set:1310


In [8]:
# Characters accepted in a transcription: lowercase letters, apostrophe, "?", "!" and space
characters = list("abcdefghijklmnopqrstuvwxyz'?! ")

# Forward lookup: character -> integer id, with "" configured as the out-of-vocabulary token
char_to_num = keras.layers.StringLookup(vocabulary=characters, oov_token="")

# Inverse lookup built from the very same vocabulary: integer id -> character
num_to_char = keras.layers.StringLookup(
    vocabulary=char_to_num.get_vocabulary(),
    oov_token="",
    invert=True,
)

print(f"The vocabulary is: {char_to_num.get_vocabulary()} (size ={char_to_num.vocabulary_size()})")

The vocabulary is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', ' '] (size =31)


In [9]:
# STFT window length, in samples.
frame_length = 256
# STFT hop size: the number of samples to step between successive windows.
frame_step = 160 
# FFT size passed to tf.signal.stft. The model is later built with
# fft_length // 2 + 1 input features per frame.
fft_length = 384

# Cutoff frequency (Hz) for the Butterworth low-pass filter
cutoff_frequency = 3000

# Sample rate (Hz) handed to the low-pass filter as `fs`.
# NOTE(review): LJSpeech-1.1 clips are, to my knowledge, recorded at 22050 Hz rather
# than 16000 Hz - confirm. The cutoff above is only honoured when this value matches
# the rate of the audio that is actually decoded.
sample_rate = 16000

# Define a Butterworth low-pass filter function
def butter_lowpass_filter(data, cutoff, fs, order=5):
    """Low-pass filter a signal with a digital Butterworth filter.

    Args:
        data: Samples to filter (array-like, filtered along its last axis by lfilter).
        cutoff: Cutoff frequency, in the same unit as `fs`.
        fs: Sampling frequency of `data`.
        order: Order of the Butterworth design (default 5).

    Returns:
        The filtered samples as a float32 NumPy array.
    """
    # scipy expects the critical frequency as a fraction of the Nyquist frequency (fs / 2)
    wn = cutoff / (fs / 2)
    numerator, denominator = butter(order, wn, btype="low", analog=False)
    filtered = lfilter(numerator, denominator, data)
    # lfilter produces float64; downstream TensorFlow code is told to expect float32
    return filtered.astype(np.float32)

# Define a function to encode a single audio file and its label
def encode_single_simple(wav_file, label):
    """Turn one (file stem, transcript) pair into (normalized spectrogram, label ids).

    Args:
        wav_file: String tensor with the clip name, without directory or ".wav"
            suffix; it is resolved against the module-level `wavs_path`.
        label: String tensor with the transcript text.

    Returns:
        A `(spectrogram, label)` tuple. `spectrogram` is the square-root magnitude
        STFT of the low-pass-filtered audio, standardized per frame; `label` is the
        lower-cased transcript mapped character by character through `char_to_num`.
    """
    # Read and decode the audio file. desired_channels=1 guarantees that the channel
    # axis has size 1, so the squeeze below cannot fail on a multi-channel file.
    file = tf.io.read_file(wavs_path + wav_file + ".wav")
    audio, file_sample_rate = tf.audio.decode_wav(file, desired_channels=1)
    # Remove the channel dimension
    audio = tf.squeeze(audio, axis=-1)
    # Ensure that the samples are float32
    audio = tf.cast(audio, tf.float32)

    # Apply the low-pass filter to remove high-frequency noise. The filter is designed
    # with the sample rate stored in the file itself: designing it for a rate the
    # audio does not have would silently move the effective cutoff frequency.
    audio = tf.numpy_function(
        butter_lowpass_filter, [audio, cutoff_frequency, file_sample_rate], tf.float32
    )
    # tf.numpy_function returns a tensor without static shape information; declare
    # the 1-D shape again so that the STFT and the batching step see a known rank.
    audio.set_shape([None])

    # Compute the spectrogram of the audio
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )

    # Keep the magnitude only and compress it with a square root
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)

    # Standardize every frame (axis 1 is the frequency axis) to zero mean and unit
    # variance; the small epsilon avoids a division by zero on silent frames.
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)

    # Lower-case the transcript, split it into characters and map them to integer ids
    label = tf.strings.lower(label)
    label = tf.strings.unicode_split(label, input_encoding="UTF-8")
    label = char_to_num(label)
    return spectrogram, label

In [10]:
batch_size = 32


def _build_dataset(frame, drop_remainder):
    """Build a padded, batched and prefetched tf.data pipeline from a metadata frame.

    Args:
        frame: DataFrame with "file_name" and "transcription" columns.
        drop_remainder: Whether to discard the final batch when it holds fewer than
            `batch_size` examples.

    Returns:
        A `tf.data.Dataset` yielding `(spectrogram_batch, label_batch)` pairs.
    """
    # NOTE(review): the targets come from the raw "transcription" column; the
    # character vocabulary has no digits, so "normalized_transcription" may be the
    # better target - confirm before retraining.
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(frame["file_name"]), list(frame["transcription"]))
    )
    return (
        dataset.map(encode_single_simple, num_parallel_calls=tf.data.AUTOTUNE)
        .padded_batch(
            batch_size,
            padded_shapes=(
                [None, None],  # Spectrogram: variable time steps x frequency bins
                [None],  # Label: variable length
            ),
            padding_values=(0.0, char_to_num("")),  # Pad spectrogram with zeros, label with ""
            drop_remainder=drop_remainder,
        )
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )


# Training batches all have exactly batch_size examples
train_dataset = _build_dataset(df_train, drop_remainder=True)

# Validation keeps its final partial batch: dropping it would silently exclude
# len(df_val) % batch_size clips from the validation loss and word error rate.
validation_dataset = _build_dataset(df_val, drop_remainder=False)

In [12]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, as computed by `keras.backend.ctc_batch_cost`.

    Every sample is given the full time dimension of `y_pred` as its input length
    and the full (padded) width of `y_true` as its label length.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    time_steps = tf.cast(tf.shape(y_pred)[1], dtype="int64")
    max_label_len = tf.cast(tf.shape(y_true)[1], dtype="int64")

    # A (batch, 1) column of ones, scaled into the two per-sample length tensors
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")
    return keras.backend.ctc_batch_cost(
        y_true, y_pred, time_steps * per_sample, max_label_len * per_sample
    )

In [13]:
from tensorflow import keras
from tensorflow.keras import layers

def build_model(input_dim, output_dim, rnn_layers=5, rnn_units=128):
    """Build and compile the convolutional + bidirectional-GRU acoustic model.

    Architecture, in order: a reshape that adds a channel axis, two
    Conv2D/BatchNormalization/ReLU stages, `rnn_layers` bidirectional GRU layers
    (with 0.5 dropout after every one except the last), a ReLU dense layer with
    0.5 dropout, and a softmax output layer.

    Args:
        input_dim: Number of features per time step of the input spectrogram.
        output_dim: Number of output symbols; the output layer has
            `output_dim + 1` units (presumably the extra unit is the CTC blank -
            confirm against the loss implementation).
        rnn_layers: Number of stacked bidirectional GRU layers.
        rnn_units: Units per GRU direction; the dense layer has `rnn_units * 2` units.

    Returns:
        A `keras.Model` named "DeepSpeech_2", compiled with Adam (learning rate 1e-4)
        and `CTCLoss`.
    """
    # Define the input layer for the spectrogram (variable number of time steps)
    input_spectrogram = layers.Input((None, input_dim), name="input")
    
    # Expand dimension to add a channel dimension for Conv2D
    x = layers.Reshape((-1, input_dim, 1), name="expand_dim")(input_spectrogram)
    
    # First convolutional layer: strides of 2 along both the time and feature axes
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 41],
        strides=[2, 2],
        padding="same",
        use_bias=False,
        name="conv_1",
    )(x)
    x = layers.BatchNormalization(name="conv_1_bn")(x)
    x = layers.ReLU(name="conv_1_relu")(x)
    
    # Second convolutional layer: stride of 2 along the feature axis only
    x = layers.Conv2D(
        filters=32,
        kernel_size=[11, 21],
        strides=[1, 2],
        padding="same",
        use_bias=False,
        name="conv_2",
    )(x)
    x = layers.BatchNormalization(name="conv_2_bn")(x)
    x = layers.ReLU(name="conv_2_relu")(x)
    
    # Reshape for RNN input: merge the feature and channel axes into one vector per time step
    x = layers.Reshape((-1, x.shape[-2] * x.shape[-1]))(x)
    
    # Stacked Bidirectional GRU layers
    for i in range(1, rnn_layers + 1):
        recurrent = layers.GRU(
            units=rnn_units,
            activation="tanh",
            recurrent_activation="sigmoid",
            use_bias=True,
            return_sequences=True,
            reset_after=True,
            name=f"gru_{i}",
        )
        x = layers.Bidirectional(
            recurrent, name=f"bidirectional_{i}", merge_mode="concat"
        )(x)

        # Apply dropout except for the last layer
        if i < rnn_layers:
            x = layers.Dropout(rate=0.5)(x)

    # Dense layer
    x = layers.Dense(units=rnn_units * 2, name="dense_1")(x)
    x = layers.ReLU(name="dense_1_relu")(x)
    x = layers.Dropout(rate=0.5)(x)
    
    # Output layer with softmax activation
    output = layers.Dense(units=output_dim + 1, activation="softmax")(x)

    # Create the model
    model = keras.Model(input_spectrogram, output, name="DeepSpeech_2")

    # Optimizer
    opt = keras.optimizers.Adam(learning_rate=1e-4)

    # Compile the model using the CTC loss function
    model.compile(optimizer=opt, loss=CTCLoss)
    return model

# Requires fft_length and char_to_num from the earlier cells.
# input_dim is fft_length // 2 + 1 features per frame; output_dim is the size of the
# character vocabulary. rnn_units overrides the build_model default of 128.
model = build_model(
    input_dim=fft_length // 2 + 1,
    output_dim=char_to_num.vocabulary_size(),
    rnn_units=512,
)

# Display the model summary
model.summary(line_length=110)

In [14]:
def decode_batch_predictions(pred):
    """Greedy-CTC-decode a batch of model outputs into a list of strings.

    Args:
        pred: Model output for one batch; axis 0 is the batch, axis 1 the time steps.

    Returns:
        One decoded transcription string per sample in the batch.
    """
    # Every sample is decoded over the full time axis of the prediction tensor
    n_samples, n_steps = pred.shape[0], pred.shape[1]
    input_len = np.full(n_samples, n_steps, dtype=np.int32)
    decoded = keras.backend.ctc_decode(pred, input_length=input_len, greedy=True)[0][0]
    # Map each integer sequence back to characters and join them into one string
    return [
        tf.strings.reduce_join(num_to_char(ids)).numpy().decode("utf-8")
        for ids in decoded
    ]

class CallbackEval(keras.callbacks.Callback):
    """Report the word error rate on a dataset at the end of every epoch.

    After each epoch the callback transcribes every batch of `dataset`, prints the
    word error rate together with two randomly chosen target/prediction pairs, and
    appends the score to `wer_score.txt` in the current working directory.

    Args:
        dataset: Iterable of `(spectrogram_batch, label_batch)` pairs.
    """

    def __init__(self, dataset):
        super().__init__()
        self.dataset = dataset

    def on_epoch_end(self, epoch: int, logs=None):
        predictions = []
        targets = []
        for X, y in self.dataset:
            # Use the model this callback is attached to (Keras sets `self.model`)
            # instead of whatever object the global name `model` happens to refer to.
            # verbose=0 avoids printing one progress bar per batch.
            batch_predictions = self.model.predict(X, verbose=0)
            # Decode the batch predictions
            batch_predictions = decode_batch_predictions(batch_predictions)
            predictions.extend(batch_predictions)
            # Convert the label ids back to text
            for label in y:
                label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
                targets.append(label)
        # Compute the word error rate between targets and predictions
        wer_score = wer(targets, predictions)
        print("-" * 100)
        print(f"word error rate: {wer_score:.4f}")
        print("-" * 100)
        # Show two random examples for a qualitative check
        for i in np.random.randint(0, len(predictions), 2):
            print(f"Target: {targets[i]}")
            print(f"Predictions: {predictions[i]}")
            print("-" * 100)

        # Append the WER score to a file so the history survives the session output
        with open('wer_score.txt', 'a') as file:
            file.write(f"Epochs {epoch +1}: {wer_score:.4f}\n")

In [None]:
# Number of passes over the training dataset
epochs = 50
# Prints the validation word error rate and sample transcriptions after every epoch
validation_callback = CallbackEval(validation_dataset)
# Train the model; `history` holds the object returned by model.fit
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=epochs,
    callbacks=[validation_callback],
)

In [None]:
# Save the trained model's weights (weights only, not the full model) to the Kaggle working directory
model.save_weights('/kaggle/working/DeepSpeech_model_weights_50(Latest).weights.h5')

In [None]:
# Transcribe the whole validation dataset and compare against the reference text
predictions, targets = [], []
for X, y in validation_dataset:
    predictions.extend(decode_batch_predictions(model.predict(X)))
    # Turn each row of label ids back into its reference string
    targets.extend(
        tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        for label in y
    )

wer_score = wer(targets, predictions)
print("-" * 100)
print(f"Word Error Rate: {wer_score:.4f}")
print("-" * 100)

# Print five randomly chosen reference/prediction pairs
for i in np.random.randint(0, len(predictions), 5):
    print(f"Target    : {targets[i]}")
    print(f"Prediction: {predictions[i]}")
    print("-" * 100)

In [15]:
# Load previously trained weights into the model built above.
# NOTE(review): this reads from /kaggle/input/newest-saved-trained-model/, not from the
# /kaggle/working/ path the save cell writes to - presumably the weights were uploaded
# as a Kaggle dataset after an earlier training run; confirm they match this architecture.
model.load_weights('/kaggle/input/newest-saved-trained-model/DeepSpeech_model_weights_50(Latest).weights.h5')
model.summary()

  trackable.load_own_variables(weights_store.get(inner_path))


In [None]:
import IPython.display as ipd
# Build an inline audio player for a sample clip.
# NOTE(review): this plays MKH800_21_0015.wav, while the transcription cell further
# down uses MKH800_22_0015.wav - confirm which clip is intended.
aud = ipd.Audio('/kaggle/input/clear-speech-dataset/MKH800_21_0015.wav')

In [None]:
aud

In [42]:
# Predictions
def transcribe_single_voice(audio_file_path):
    """Transcribe a single WAV file with the global `model`.

    The audio goes through the same steps as the training pipeline: low-pass
    filtering, square-root magnitude STFT and per-frame standardization.

    Args:
        audio_file_path: Path of the WAV file to transcribe.

    Returns:
        The greedy-CTC-decoded transcription as a string.
    """
    # Read and decode the audio file. desired_channels=1 guarantees that the channel
    # axis has size 1, so the squeeze below also works for multi-channel recordings.
    file = tf.io.read_file(audio_file_path)
    audio, file_sample_rate = tf.audio.decode_wav(file, desired_channels=1)
    audio = tf.squeeze(audio, axis=-1)
    # Change type to float
    audio = tf.cast(audio, tf.float32)
    # Apply the low-pass filter, designed with the sample rate stored in the file
    # itself: a user-supplied recording need not have the rate assumed elsewhere,
    # and a wrong rate would silently move the effective cutoff frequency.
    audio = tf.numpy_function(
        butter_lowpass_filter, [audio, cutoff_frequency, file_sample_rate], tf.float32
    )

    # Square-root magnitude spectrogram
    spectrogram = tf.signal.stft(
        audio, frame_length=frame_length, frame_step=frame_step, fft_length=fft_length
    )
    spectrogram = tf.abs(spectrogram)
    spectrogram = tf.math.pow(spectrogram, 0.5)
    # Standardize every frame; the epsilon avoids a division by zero on silent frames
    means = tf.math.reduce_mean(spectrogram, 1, keepdims=True)
    stddevs = tf.math.reduce_std(spectrogram, 1, keepdims=True)
    spectrogram = (spectrogram - means) / (stddevs + 1e-10)
    # Add a batch axis to match the model input shape
    spectrogram = tf.expand_dims(spectrogram, 0)
    # Predict transcription
    predictions = model.predict(spectrogram)
    # Decode predictions; the batch holds a single item
    transcription = decode_batch_predictions(predictions)[0]
    return transcription

# Path to the custom voice recording (a clip from outside the LJSpeech training data)
custom_voice_path = "/kaggle/input/clear-speech-dataset/MKH800_22_0015.wav"
# Transcribe the custom voice with the loaded model and show the result
transcription = transcribe_single_voice(custom_voice_path)
print("Transcription:", transcription)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step
Transcription:  pill othentthi frashien fbingenle
