# **Relevant Libraries**

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import soundfile as sf
import librosa
import os
import io
import re


from pathlib import Path
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, Dense, TimeDistributed, Activation, Input
from keras.optimizers import Adam

# **Storing the file paths of the training and testing data folders**

In [2]:
# Root folders of the two dataset splits. The loaders below walk two
# directory levels beneath each root and read the transcript and .flac files
# found in the innermost folders.
train1_dir = Path('/kaggle/input/transcription/LibriSpeech/dev-clean')  # split used for training
test1_dir = Path('/kaggle/input/test-data-1/LibriSpeech/test-clean')  # split used for evaluation

#   **Preprocessing the Training Data**

# **Helper Functions for Audio**

In [3]:
def load_flac(path):
    """Read an audio file with soundfile.

    Returns the decoded samples together with the file's sample rate, in
    that order.
    """
    samples, rate = sf.read(path)
    return samples, rate

target_sr = 16000  # Every clip is brought to a common 16 kHz sample rate


def sample_audio(data, prev_sr, target_sr = target_sr):
    """Down-mix a clip to mono and resample it to ``target_sr``.

    Parameters
    ----------
    data : array-like
        Audio samples, either 1-D (mono) or 2-D laid out as
        ``(frames, channels)``, which is how soundfile returns
        multi-channel files.
    prev_sr : int
        Sample rate of ``data``.
    target_sr : int, optional
        Desired sample rate (defaults to the module-level ``target_sr``).

    Returns
    -------
    numpy.ndarray
        1-D mono signal sampled at ``target_sr``.
    """
    data = np.asarray(data)

    # The down-mix has to happen BEFORE resampling: librosa.resample works
    # along the last axis by default, which for (frames, channels) input is
    # the channel axis rather than time. Previously the mono conversion ran
    # after resampling and its result was thrown away.
    if data.ndim == 2:
        data = np.mean(data, axis=1)

    return librosa.resample(data, orig_sr = prev_sr, target_sr = target_sr)

def normalize_volume(data):
    """Scale a clip with ``librosa.util.normalize`` so loudness is comparable across clips."""
    normalized = librosa.util.normalize(data)
    return normalized

n_mfcc = 13  # Number of MFCC coefficients kept per frame


def get_mfcc_features(data, sample_rate = target_sr, n_mfcc = n_mfcc):
    """Compute MFCC features for one clip.

    The matrix produced by ``librosa.feature.mfcc`` is transposed before it
    is returned, so callers index the first axis as time steps (the padding
    code further down relies on ``shape[0]`` being the sequence length).
    """
    coefficients = librosa.feature.mfcc(y=data, sr=sample_rate, n_mfcc=n_mfcc)
    return np.transpose(coefficients)

# **Helper Functions for Text**

In [4]:
#Generates the vocabulary text to fit the tokenizer to
def tokenizer_generator(path):
    """Yield every transcript sentence stored two folder levels below ``path``.

    Each ``.txt`` file is expected to hold one utterance per line in the form
    ``<utterance-id> <transcript>``. Only the transcript part is yielded, so
    the digits and dashes of the utterance ids do not end up in the
    tokenizer's character vocabulary (``read_txt`` strips the id the same
    way). Blank lines and lines without a transcript are skipped.

    Files are visited in sorted order so the result is reproducible, and
    stray non-directory entries in the tree are ignored instead of raising.

    Parameters
    ----------
    path : str or pathlib.Path
        Root folder of the dataset split.

    Yields
    ------
    str
        One transcript sentence, without its utterance id.
    """
    for transcript_path in sorted(Path(path).glob('*/*/*.txt')):
        with open(transcript_path, 'r', encoding='utf-8') as file:
            for line in file:
                _utterance_id, _sep, sentence = line.strip().partition(' ')
                sentence = sentence.strip()
                if sentence:
                    yield sentence


#Extracts text from files
def read_txt(path):
    """Return the transcript of every utterance listed in a transcript file.

    Each non-blank line is expected to look like
    ``<utterance-id> <transcript>``; everything up to the first space is
    dropped. Blank lines are skipped, and a line that carries only an id
    contributes an empty string, so the returned list still has one entry
    per listed utterance (the previous implementation raised ``IndexError``
    in both situations).

    Parameters
    ----------
    path : str or path-like
        Location of the transcript file.

    Returns
    -------
    list of str
        Transcripts in file order, stripped of surrounding whitespace.
    """
    text_data = []
    with open(path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            _utterance_id, _sep, text = line.partition(' ')
            text_data.append(text.strip())
    return text_data

#Pads all text to equal lengths
def pad_sequence(token_data):
    """Pad token sequences with Keras' ``pad_sequences``, adding the padding at the end."""
    return pad_sequences(token_data, padding='post')
    

# **Loading Audio and Text**

In [5]:
def load_data(path):
    """Load a dataset split into padded MFCC inputs and character-id targets.

    The tree under ``path`` is expected to look like
    ``<speaker>/<chapter>/`` where every chapter folder contains a transcript
    file named ``<speaker>-<chapter>.trans.txt`` whose lines read
    ``<utterance-id> <transcript>``, plus one ``<utterance-id>.flac`` per line.

    Audio and text are paired through the utterance id taken from the
    transcript, so the i-th input always belongs to the i-th target even if
    the utterance numbering has gaps. Folders are visited in sorted order so
    that repeated runs (and therefore ``validation_split``) see the samples
    in the same order.

    Parameters
    ----------
    path : str or pathlib.Path
        Root folder of the split.

    Returns
    -------
    mfcc_array : list of numpy.ndarray
        One ``(max_length, n_mfcc)`` array per utterance, zero-padded at the
        end of the time axis.
    padded_text : numpy.ndarray
        Integer array of shape ``(num_utterances, max_length, 1)`` holding
        the character ids of each transcript, zero-padded at the end.
    max_length : int
        Number of MFCC frames of the longest utterance.
    tokenizer : Tokenizer
        Character-level tokenizer fitted on the transcripts of this split.

    Raises
    ------
    ValueError
        If no utterance is found under ``path``.
    """
    root = Path(path)

    #Tokenizing the Text Data
    tokenizer = Tokenizer(char_level = True)
    tokenizer.fit_on_texts(tokenizer_generator(root))

    mfcc_array = []
    token_sequences = []

    for speaker_dir in sorted(entry for entry in root.iterdir() if entry.is_dir()):
        for chapter_dir in sorted(entry for entry in speaker_dir.iterdir() if entry.is_dir()):
            transcript_path = chapter_dir / f"{speaker_dir.name}-{chapter_dir.name}.trans.txt"
            with open(transcript_path, 'r', encoding='utf-8') as transcript:
                for line in transcript:
                    utterance_id, _sep, sentence = line.strip().partition(' ')
                    if not utterance_id:
                        continue  # blank line

                    data, samplerate = load_flac(chapter_dir / f"{utterance_id}.flac")
                    resampled_data = sample_audio(data, samplerate)
                    normalized_data = normalize_volume(resampled_data)
                    mfcc_array.append(get_mfcc_features(normalized_data))

                    # Wrapping the sentence in a list makes the tokenizer treat
                    # it as ONE text and return a flat list of character ids.
                    token_sequences.append(tokenizer.texts_to_sequences([sentence.strip()])[0])

    if not mfcc_array:
        raise ValueError(f"No utterances found under {root}")

    # MFCC matrices are (time_steps, n_mfcc); pad the time axis to the longest clip.
    max_length = max(mfcc.shape[0] for mfcc in mfcc_array)
    mfcc_array = [
        np.pad(mfcc, pad_width=((0, max_length - mfcc.shape[0]), (0, 0)), mode='constant')
        for mfcc in mfcc_array
    ]

    # One target id per audio frame. truncating='post' keeps the beginning of
    # a transcript in the (unlikely) case it is longer than max_length.
    padded_text = pad_sequences(token_sequences, maxlen = max_length,
                                padding = 'post', truncating = 'post')
    padded_text = padded_text[..., np.newaxis]  # (num_utterances, max_length, 1)

    return mfcc_array, padded_text, max_length, tokenizer
  
                           

# Build the training set: MFCC inputs, character-id targets, the padded
# sequence length and the fitted tokenizer.
mfcc_sequences, output_data, audio_max_len, tokenizer = load_data(train1_dir)

# Stack the per-utterance MFCC matrices into one array for Keras.
input_data = np.array(mfcc_sequences)


In [6]:
# Sanity check: inputs and targets must agree on the first two axes.
for label, array in (("Input data shape:", input_data),
                     ("Output data shape:", output_data)):
    print(label, array.shape)

Input data shape: (2703, 1021, 13)
Output data shape: (2703, 1021, 1)


# **Model Construction and Training**

In [None]:
# Two stacked GRU layers produce one hidden vector per MFCC frame; the
# time-distributed dense layer turns each of them into a probability
# distribution over the character vocabulary. The extra "+1" class accounts
# for id 0, which the padded targets use and the tokenizer never assigns.
transcription_model = Sequential([
    # An explicit Input layer replaces the `input_shape=` argument on the first
    # GRU, which recent Keras versions warn about in Sequential models.
    Input(shape = (audio_max_len, n_mfcc)),
    GRU(64, return_sequences=True),
    GRU(64, return_sequences=True),
    # 'sparse_categorical_crossentropy' (given by name) expects probabilities,
    # so the output layer needs a softmax; without it the loss is computed on
    # raw, unnormalised scores.
    TimeDistributed(Dense(units = len(tokenizer.word_index)+1, activation = 'softmax')),
])

# Mean squared error between class ids and class probabilities carries no
# information for a classifier, so only accuracy is tracked. evaluate() then
# returns exactly [loss, accuracy].
transcription_model.compile(optimizer='Adam', loss='sparse_categorical_crossentropy', metrics = ['accuracy'])
transcription_model.summary()

transcription_model.fit(input_data, output_data, epochs=3, batch_size = 128, validation_split = 0.2)

  super().__init__(**kwargs)


Epoch 1/3


# **Process Test Data**

In [None]:
def load_test_data(path, tokenizer, max_audio_size):
    """Load a dataset split for evaluation with an already fitted tokenizer.

    The tree under ``path`` is expected to look like
    ``<speaker>/<chapter>/`` where every chapter folder contains a transcript
    file named ``<speaker>-<chapter>.trans.txt`` whose lines read
    ``<utterance-id> <transcript>``, plus one ``<utterance-id>.flac`` per line.

    Audio and text are paired through the utterance id taken from the
    transcript, and folders are visited in sorted order. Both the MFCC
    sequences and the character-id sequences are cut or zero-padded at the
    END to exactly ``max_audio_size`` steps, so a clip that is too long loses
    the same side of its audio and of its transcript.

    Parameters
    ----------
    path : str or pathlib.Path
        Root folder of the split.
    tokenizer : Tokenizer
        Tokenizer fitted on the training transcripts.
    max_audio_size : int
        Sequence length the model was built for.

    Returns
    -------
    mfcc_array : list of numpy.ndarray
        One ``(max_audio_size, n_mfcc)`` array per utterance.
    padded_text : numpy.ndarray
        Integer array of shape ``(num_utterances, max_audio_size, 1)``.
    max_length : int
        Same value as ``max_audio_size``.
    tokenizer : Tokenizer
        The tokenizer that was passed in, returned unchanged.

    Raises
    ------
    ValueError
        If no utterance is found under ``path``.
    """
    root = Path(path)
    max_length = max_audio_size

    mfcc_array = []
    token_sequences = []

    for speaker_dir in sorted(entry for entry in root.iterdir() if entry.is_dir()):
        for chapter_dir in sorted(entry for entry in speaker_dir.iterdir() if entry.is_dir()):
            transcript_path = chapter_dir / f"{speaker_dir.name}-{chapter_dir.name}.trans.txt"
            with open(transcript_path, 'r', encoding='utf-8') as transcript:
                for line in transcript:
                    utterance_id, _sep, sentence = line.strip().partition(' ')
                    if not utterance_id:
                        continue  # blank line

                    data, samplerate = load_flac(chapter_dir / f"{utterance_id}.flac")
                    resampled_data = sample_audio(data, samplerate)
                    normalized_data = normalize_volume(resampled_data)

                    # Cut clips that are too long, then zero-pad the time axis.
                    mfcc_features = get_mfcc_features(normalized_data)[:max_length]
                    pad_width = max_length - mfcc_features.shape[0]
                    mfcc_array.append(
                        np.pad(mfcc_features, pad_width=((0, pad_width), (0, 0)), mode='constant')
                    )

                    # Wrapping the sentence in a list makes the tokenizer treat
                    # it as ONE text and return a flat list of character ids.
                    token_sequences.append(tokenizer.texts_to_sequences([sentence.strip()])[0])

    if not mfcc_array:
        raise ValueError(f"No utterances found under {root}")

    padded_text = pad_sequences(token_sequences, maxlen = max_length,
                                padding = 'post', truncating = 'post')
    padded_text = padded_text[..., np.newaxis]  # (num_utterances, max_length, 1)
    print(padded_text.shape)
    return mfcc_array, padded_text, max_length, tokenizer
  
                           

# Build the evaluation set with the tokenizer and sequence length obtained
# from the training data.
test_mfcc_sequences, test_output, testmaxlen, test_tokenizer = load_test_data(
    test1_dir, tokenizer, audio_max_len
)

# Stack the per-utterance MFCC matrices into one array for Keras.
test_input = np.array(test_mfcc_sequences)


In [None]:
# Training and test tensors should only differ in their first axis.
for label, array in (("Input data shape:", input_data),
                     ("Output data shape:", output_data),
                     ("Input test data shape:", test_input),
                     ("Output test data shape:", test_output)):
    print(label, array.shape)

In [None]:
# Score the trained model on the held-out split; evaluate() returns the loss
# followed by one value for every metric passed to compile().
results = transcription_model.evaluate(x=test_input, y=test_output, batch_size=64)
print("test loss, test acc:", results)


In [None]:
# Keep the evaluation scores as an array and show how many values came back.
results = np.array(results)
print(np.shape(results))

In [None]:
# Persist the trained model next to the notebook so it can be reloaded
# without retraining.
transcription_model.save('transcription.h5')

In [None]:
#print(output_data)
def unprocess(test, tokenizer):
    """Turn nested token ids back into text.

    ``test`` is iterated as batches, each batch as sequences, and each
    sequence as individual token ids. Ids that are missing from
    ``tokenizer.index_word`` are dropped. The result mirrors the two outer
    levels of ``test``: one list per batch holding one decoded string per
    sequence.
    """
    index_to_char = tokenizer.index_word  # Mapping from index to character

    def decode(sequence):
        # NumPy rows are converted to plain Python lists before the lookup.
        tokens = sequence.tolist() if isinstance(sequence, np.ndarray) else sequence
        return ''.join(index_to_char[token] for token in tokens if token in index_to_char)

    return [[decode(sequence) for sequence in batch] for batch in test]

In [None]:
# Transcribe a single test utterance. predict() expects a leading batch axis.
next_input_data = np.expand_dims(test_input[56], axis=0)
text_tokens = transcription_model.predict(next_input_data)
print(text_tokens.shape)

# The model emits one score per vocabulary entry for every frame, whereas
# unprocess() looks up integer character ids. Pick the best-scoring id per
# frame and keep a trailing axis of size 1 so the layout matches the targets
# (batch, frames, 1). Passing the raw scores decoded nothing useful.
predicted_ids = np.argmax(text_tokens, axis=-1)[..., np.newaxis]
text = unprocess(predicted_ids, tokenizer)

string_text = ''.join(text[0])
print(string_text)

In [None]:
# unprocess() needs both the token ids and the tokenizer; calling it without
# arguments raises a TypeError.
# NOTE(review): decoding the reference transcript of the utterance predicted
# above is presumably what was intended here - confirm.
text_2 = unprocess(test_output[56:57], tokenizer)
print(''.join(text_2[0]))