<a href="https://colab.research.google.com/github/KJOELJOYSON2427/speech-recognition-CRNN_using_LibriSpeech_corpus_dataset/blob/main/speech_recognition_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<pre>
Preprocess.run()
    ↓
DatasetReader created
    ↓
store_samples("train")
    ↓
store_samples("test")
So the high-level order is:

Preprocess.run()

store_samples("train")

store_samples("test")

Both train and test follow the exact same inner steps.
</pre>

In [None]:
import logging
import os
import fnmatch
import random
import librosa
import numpy as np
from tqdm import tqdm

In [None]:

# ------------------------------------------
# POWER SPECTROGRAM CALCULATION
# ------------------------------------------
def calculatePowerSpectrogram(
    audio_data,
    samplerate,
    n_mels=128,
    n_fft=512,
    hop_length=160):
    """Compute a normalized log-mel power spectrogram for one audio clip.

    Args:
        audio_data: Audio samples, forwarded to librosa as ``y``.
        samplerate: Sampling rate of ``audio_data``, forwarded as ``sr``.
        n_mels: Number of mel bands.
        n_fft: FFT window size.
        hop_length: Hop between successive analysis frames, in samples.

    Returns:
        The transposed, globally normalized dB spectrogram. librosa documents
        the mel spectrogram as (n_mels, frames), so the result is presumably
        (frames, n_mels) -- confirm against the consumer.
    """
    # Pass the audio by keyword: recent librosa releases made the arguments
    # of melspectrogram keyword-only, and `y=` works on older releases too.
    spect = librosa.feature.melspectrogram(
        y=audio_data,
        sr=samplerate,
        n_mels=n_mels,
        n_fft=n_fft,
        hop_length=hop_length,
    )
    spectLog = librosa.power_to_db(spect, ref=np.max)

    # Normalize with statistics taken over the whole clip.
    centered = spectLog - np.mean(spectLog)
    std = np.std(spectLog)
    # A constant spectrogram (e.g. pure silence) has std == 0; dividing would
    # fill the features with NaN, so only the mean is removed in that case.
    spectNorm = centered / std if std > 0 else centered
    return spectNorm.T


In [None]:
def letterToId(letter):
    """Map a single character to its integer label id.

    A space maps to 27 and an apostrophe to 26. Every other character maps to
    its code-point offset from 'a', so 'a'..'z' give 0..25. Characters outside
    that set (digits, punctuation, uppercase) therefore produce values outside
    0..27, e.g. '(' gives -57.
    """
    special_ids = {' ': 27, "'": 26}
    if letter in special_ids:
        return special_ids[letter]
    return ord(letter) - ord('a')

def sentenceToIds(sentence):
  """Return the label ids of every character in the lower-cased sentence."""
  lowered = sentence.lower()
  return list(map(letterToId, lowered))



[19,
 7,
 4,
 27,
 5,
 20,
 13,
 2,
 19,
 8,
 14,
 13,
 27,
 14,
 17,
 3,
 -57,
 26,
 0,
 26,
 -56,
 27,
 17,
 4,
 19,
 20,
 17,
 13,
 18,
 27,
 19,
 7,
 4,
 27,
 8,
 13,
 19,
 4,
 6,
 4,
 17,
 27,
 -40,
 -42,
 -51,
 27,
 19,
 7,
 8,
 18,
 27,
 13,
 20,
 12,
 1,
 4,
 17,
 27,
 17,
 4,
 15,
 17,
 4,
 18,
 4,
 13,
 19,
 18,
 27,
 19,
 7,
 4,
 27,
 20,
 13,
 8,
 2,
 14,
 3,
 4,
 27,
 2,
 14,
 3,
 4,
 27,
 15,
 14,
 8,
 13,
 19,
 27,
 -57,
 22,
 7,
 8,
 2,
 7,
 27,
 8,
 13,
 2,
 11,
 20,
 3,
 4,
 18,
 27,
 19,
 7,
 4,
 27,
 0,
 18,
 2,
 8,
 8,
 27,
 21,
 0,
 11,
 20,
 4,
 -56,
 27,
 5,
 14,
 17,
 27,
 19,
 7,
 4,
 27,
 11,
 14,
 22,
 4,
 17,
 2,
 0,
 18,
 4,
 27,
 2,
 7,
 0,
 17,
 0,
 2,
 19,
 4,
 17]

In [None]:
def recursiveTraverse(directory, file_pattern):
  """Yield the path of every file under `directory` whose name matches `file_pattern`.

  The tree is walked with os.walk and names are matched with fnmatch, so
  `file_pattern` is a shell-style pattern such as "*.flac".
  """
  for folder, _subdirs, filenames in os.walk(directory):
    matching = fnmatch.filter(filenames, file_pattern)
    yield from (os.path.join(folder, fname) for fname in matching)


In [None]:
class DatasetReader:
  """Pairs audio files with their transcripts and stores preprocessed samples.

  Transcripts are read from every "*.trans.txt" file found below the data
  directory; audio is read from "*.flac" files below a named sub-directory.
  """

  def __init__(self, data_directory):
    """Remember the dataset root; transcripts are loaded on first use."""
    self._data_directory = data_directory
    # Filled lazily by the _transcript_dict property.
    self._transcript_dict_cache = None

  @property
  def _transcript_dict(self):
    """Mapping audio id -> list of label ids, built once and then cached."""
    if self._transcript_dict_cache is None:
      self._transcript_dict_cache = self._build_transcript()
    return self._transcript_dict_cache

  @staticmethod
  def _get_transcript_entries(transcript_directory):
    """Yield (audio_id, text) for each line of every transcript file.

    Each line is split at its first space: the part before it is the audio
    id, the remainder is the transcript text.
    """
    for tfile in recursiveTraverse(transcript_directory, "*.trans.txt"):
      with open(tfile, 'r', encoding="utf-8") as f:
        for line in f:
          line = line.rstrip('\n')
          # Blank lines (e.g. a trailing newline at end of file) carry no
          # entry and would otherwise break the two-value unpacking below.
          if not line.strip():
            continue
          audio_id, text = line.split(' ', 1)
          yield audio_id, text

  @staticmethod
  def _extract_audio_id(audio_file):
    """Return the file name of `audio_file` without directory or extension."""
    return os.path.splitext(os.path.basename(audio_file))[0]

  def _transform_sample(self, audio_file, preprocess_fnc):
    """Load one audio file and return (audio_id, preprocess_fnc(audio, sr))."""
    # sr=None is forwarded to librosa.load unchanged.
    audio, sr = librosa.load(audio_file, sr=None)
    features = preprocess_fnc(audio, sr)
    audio_id = self._extract_audio_id(audio_file)
    return audio_id, features

  def _build_transcript(self):
    """Read all transcripts below the data directory into a dict of id lists."""
    transcripts = {}
    for audio_id, text in self._get_transcript_entries(self._data_directory):
      transcripts[audio_id] = sentenceToIds(text)
    return transcripts

  def store_samples(self, directory, preprocess_fnc):
    """Preprocess every .flac file of one split and save it as .npz.

    Args:
      directory: Sub-directory of the data directory to read audio from
        (e.g. "train"); also used as the output sub-directory name.
      preprocess_fnc: Callable taking (audio, samplerate) and returning the
        feature array to store.

    Files are written to <data_directory>/preprocessed/<directory>/ with the
    arrays `audio_fragments` and `transcript`. Audio files without a
    transcript entry are skipped.
    """
    # Output path
    out_dir = os.path.join(self._data_directory, "preprocessed", directory)
    # The keyword is `exist_ok`; the previous spelling raised TypeError.
    os.makedirs(out_dir, exist_ok=True)

    # Input audio
    audio_files = list(recursiveTraverse(
        os.path.join(self._data_directory, directory), "*.flac"
    ))
    transcripts = self._transcript_dict

    print(f"Processing {len(audio_files)} audio files...")

    for audio_file in tqdm(audio_files):
      audio_id = self._extract_audio_id(audio_file)

      # Check before loading so unlabeled audio is never decoded.
      if audio_id not in transcripts:
        continue

      transcript = transcripts[audio_id]
      _, features = self._transform_sample(audio_file, preprocess_fnc)

      np.savez(
          os.path.join(out_dir, audio_id),
          audio_fragments=features,
          transcript=transcript
      )
    print("Done saving samples.")


In [None]:
class Preprocess:
  """Entry point that preprocesses the "train" and "test" splits in turn."""

  def run(self, base_dir="/content/data"):
    """Store spectrogram samples for both splits found below `base_dir`."""
    dataset_reader = DatasetReader(base_dir)
    for split_name in ("train", "test"):
      dataset_reader.store_samples(split_name, calculatePowerSpectrogram)


In [None]:
# Colab-specific upload helper (only meaningful inside a Colab runtime).
from google.colab import files
# The return value is discarded and nothing in this cell moves or unpacks the
# uploaded files. NOTE(review): presumably the dataset is expected to end up
# under /content/data by some other step -- confirm.
files.upload()
# Preprocess the "train" and "test" splits below /content/data.
p = Preprocess()
p.run("/content/data")


KeyboardInterrupt: 

<pre>
Raw Audio (.flac)
    ↓
Preprocessing (mel / power spectrogram)
    ↓
Saved .npz files
    ↓
Data Generator (yields features + transcript)
    ↓
✅ InputBatchLoader / SingleInputLoader (THIS CODE)
    ↓
Acoustic Model (CNN / LSTM / Transformer)
    ↓
CTC Loss
    ↓
Training / Decoding

</pre>


<pre>
Before padding:

Sample 1: [480 frames]
Sample 2: [620 frames]

After padding:

Sample 1: [480 real | 140 zeros]
Sample 2: [620 real]

</pre>


This function converts variable-length frame sequences into a fixed-size batch tensor while remembering the real lengths.

In [None]:
# ===============================
# 1️⃣ COLAB + TF1 SETUP
# ===============================
# Use the TF1-compatible API: the loaders in the following cells build
# placeholders and a FIFOQueue, so eager execution is turned off first.
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

# NOTE(review): threading is not used in the cells shown here; numpy was
# already imported in the first code cell.
import threading
import numpy as np
from abc import abstractmethod

# Report the installed TensorFlow version for reference.
print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.19.0


In [None]:
# ===============================
# 2️⃣ BASE INPUT LOADER
# ===============================

class BaseInputLoader:
  """Shared helpers that turn Python-side samples into feedable batch values.

  Subclasses are expected to implement `get_inputs`.
  """

  def __init__(self, input_size):
    """Store the feature dimension (last axis) of every input frame."""
    self.input_size = input_size

  def _get_inputs_feed_item(self, input_list):
    """Zero-pad variable-length inputs into one batch array.

    Args:
      input_list: Non-empty sequence (list or tuple) of arrays whose first
        axis is the number of frames and whose remaining shape fits
        `self.input_size`.

    Returns:
      (input_tensor, sequence_lengths, max_time): the padded array of shape
      (len(input_list), max_time, input_size), the original frame count of
      each input, and the largest of those counts.

    Raises:
      ValueError: If `input_list` is empty.
    """
    if len(input_list) == 0:
      raise ValueError("input_list must contain at least one sample")

    sequence_lengths = np.array([inp.shape[0] for inp in input_list])
    max_time = sequence_lengths.max()
    input_tensor = np.zeros((len(input_list), max_time, self.input_size))

    # Copy each sample into the front of its row; the tail stays zero.
    for idx, inp in enumerate(input_list):
      input_tensor[idx, :inp.shape[0], :] = inp
    return input_tensor, sequence_lengths, max_time

  # Alias for the original, misspelled method name so existing callers of
  # either spelling keep working.
  _get_inputs_feed_iyem = _get_inputs_feed_item

  @staticmethod
  def _get_labels_feed_item(label_list, max_time):
    """Convert label sequences to a SparseTensorValue (needed for CTC).

    Args:
      label_list: Sequence of label-id sequences, one per batch entry.
      max_time: Size used for the second dimension of the dense shape.

    Returns:
      tf.SparseTensorValue holding one (batch_index, position) index per
      label, the int32 label values, and dense shape
      [len(label_list), max_time].
    """
    label_shape = np.array([len(label_list), max_time], dtype=np.int64)
    label_indices = []
    label_values = []
    for label_idx, label in enumerate(label_list):
      for time_idx, value in enumerate(label):
        label_indices.append([label_idx, time_idx])
        label_values.append(value)
    # reshape keeps the indices two-dimensional even when there are no labels.
    label_indices = np.array(label_indices, dtype=np.int64).reshape(-1, 2)
    label_values = np.array(label_values, dtype=np.int32)
    return tf.SparseTensorValue(label_indices, label_values, label_shape)

  # NOTE: the class does not use ABCMeta, so this decorator does not block
  # instantiation; the NotImplementedError below is what signals the contract.
  @abstractmethod
  def get_inputs(self):
    """Return the input tensors; must be provided by subclasses."""
    raise NotImplementedError()


In [None]:
# ===============================
# 4️⃣ BATCH INPUT LOADER (Training)
# ===============================
class InputBatchLoader(BaseInputLoader):
  """Input loader that moves fixed-size batches through a TensorFlow FIFOQueue.

  The constructor builds the placeholders, the queue and the enqueue op;
  `get_inputs` returns the dequeued tensors.
  """

  def __init__(self, input_size, batch_size, data_generator_creator, max_steps=None):
    """Build the graph-side placeholders, queue and enqueue op.

    Args:
      input_size: Feature dimension of every input frame.
      batch_size: Number of samples per batch (first placeholder dimension).
      data_generator_creator: Zero-argument callable returning an iterable of
        (input, label) pairs.
      max_steps: Stored as `steps_left`; not read by the code in this class.
    """
    super().__init__(input_size)

    self.batch_size = batch_size
    self.data_generator_creator = data_generator_creator
    self.steps_left = max_steps

    with tf.device("/cpu:0"):
      self.inputs = tf.placeholder(
          tf.float32, [batch_size, None, input_size], name="inputs"
      )
      self.sequence_lengths = tf.placeholder(
          tf.int32, [batch_size], name="sequence_lengths"
      )
      self.labels = tf.sparse_placeholder(tf.int32, name="labels")

      self.queue = tf.FIFOQueue(
          capacity=50,
          dtypes=[tf.float32, tf.int32, tf.string]
      )

      # Sparse labels are serialized so they can travel through the queue as
      # a tf.string element.
      serialized_labels = tf.serialize_many_sparse(self.labels)

      self.enqueue_op = self.queue.enqueue(
          [self.inputs, self.sequence_lengths, serialized_labels]
      )

  # The three methods below were previously dedented to module level even
  # though they take `self`; they belong to the class so that `get_inputs`
  # overrides the base-class stub.

  def get_inputs(self):
    """Dequeue one batch and return (inputs, sequence_lengths, sparse labels)."""
    inputs, seq_lengths, labels = self.queue.dequeue()
    labels = tf.deserialize_many_sparse(labels, dtype=tf.int32)
    return inputs, seq_lengths, labels

  def _batch(self, iterable):
    """Group `iterable` into tuples of `batch_size` consecutive items.

    A trailing group with fewer than `batch_size` items is dropped, because
    zip stops at the first exhausted argument.
    """
    # The same iterator object repeated batch_size times makes zip pull
    # consecutive items into each tuple.
    args = [iter(iterable)] * self.batch_size
    return zip(*args)

  def _enqueue(self, sess, coord):
    """Iterate over generated batches and pad their inputs.

    TODO: the method is unfinished -- the padded batch is computed but never
    fed to `self.enqueue_op`, and `sess`, `coord` and `label_list` are not
    used yet.
    """
    generator = self.data_generator_creator()
    for batch in self._batch(generator):
      input_list, label_list = zip(*batch)
      input_tensor, seq_lengths, max_time = \
          self._get_inputs_feed_item(input_list)






In [14]:
def _batch( iterable):
        args = [iter(iterable)] * 2
        return zip(*args)


def dummy_data_generator():
    """Endlessly yield random (mel_features, transcript) pairs for experiments.

    mel_features is a float32 array of shape (time_steps, 128) with
    80 <= time_steps < 150; transcript is a list of 5 to 14 ints in 0..27.
    """
    while True:
        frame_count = np.random.randint(80, 150)
        features = np.random.rand(frame_count, 128).astype(np.float32)
        # Draw the label count before the labels themselves so the random
        # draws happen in the same order as in the one-line form.
        label_count = np.random.randint(5, 15)
        labels = np.random.randint(0, 28, size=label_count).tolist()
        yield features, labels

from itertools import islice

# dummy_data_generator() never stops yielding, so an unbounded loop over its
# batches would run forever; look at the first few batches only.
for batch in islice(_batch(dummy_data_generator()), 3):
    # batch[0] is one (mel_features, transcript) pair, so this prints 2.
    print(len(batch[0]))
    input_list, label_list = zip(*batch)
# _batch returns a lazy zip object; printing it shows the object, not batches.
print(_batch(dummy_data_generator()))

<zip object at 0x7f336d4c4580>
<zip object at 0x7f336d4c4580>
