<a href="https://colab.research.google.com/github/KJOELJOYSON2427/API_GATEWAY_MICROSERVICE/blob/main/speech_recognition_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<pre>
Preprocess.run()
    ↓
DatasetReader created
    ↓
store_samples("train")
    ↓
store_samples("test")

So the high-level order is:

Preprocess.run()

store_samples("train")

store_samples("test")

Both train and test follow the exact same inner steps.
</pre>

In [1]:
import logging
import os
import fnmatch
import random
import librosa
import numpy as np
from tqdm import tqdm

In [2]:

# ------------------------------------------
# POWER SPECTROGRAM CALCULATION
# ------------------------------------------
def calculatePowerSpectrogram(
    audio_data,
    samplerate,
    n_mels=128,
    n_fft=512,
    hop_length=160):
    """Compute a normalised log-mel spectrogram for one audio signal.

    Args:
        audio_data: audio samples, as returned by ``librosa.load``.
        samplerate: sampling rate of ``audio_data``.
        n_mels: number of mel bands.
        n_fft: FFT window size.
        hop_length: number of samples between successive frames.

    Returns:
        The transposed, normalised spectrogram. librosa documents the mel
        spectrogram as (n_mels, frames), so the result should be
        (frames, n_mels) -- TODO confirm against the installed version.
    """
    # Current librosa releases accept the signal only as the keyword `y`;
    # passing it positionally raises a TypeError.
    spect = librosa.feature.melspectrogram(
        y=audio_data,
        sr=samplerate,
        n_mels=n_mels,
        n_fft=n_fft,
        hop_length=hop_length,
    )
    spectLog = librosa.power_to_db(spect, ref=np.max)

    # A constant spectrogram (e.g. digital silence) has zero spread; divide by
    # 1 instead so the result is all zeros rather than NaN.
    spread = np.std(spectLog)
    if spread == 0:
        spread = 1.0

    spectNorm = (spectLog - np.mean(spectLog)) / spread
    return spectNorm.T


In [3]:
def letterToId(letter):
    """Return the integer class id for a single character.

    A space maps to 27 and an apostrophe to 26; any other character is
    numbered by its code-point offset from 'a'.
    """
    special_ids = {' ': 27, "'": 26}
    if letter in special_ids:
        return special_ids[letter]
    return ord(letter) - ord('a')

def sentenceToIds(sentence):
  """Lower-case `sentence` and return the list of per-character ids."""
  lowered = sentence.lower()
  return list(map(letterToId, lowered))



In [4]:
def recursiveTraverse(directory, file_pattern):
  """Lazily yield the path of every file below `directory` whose name
  matches the fnmatch-style `file_pattern`."""
  for current_dir, _subdirs, filenames in os.walk(directory):
    matching = fnmatch.filter(filenames, file_pattern)
    yield from (os.path.join(current_dir, fname) for fname in matching)


In [5]:
class DatasetReader:
  """Reads a speech corpus from disk and turns it into preprocessed samples.

  Transcripts are collected from every ``*.trans.txt`` file below the data
  directory; each line is split at the first space into an audio id and its
  text. `store_samples` writes one ``.npz`` file per ``.flac`` recording and
  `load_samples` reads those files back.
  """

  def __init__(self, data_directory):
    self._data_directory = data_directory
    # Filled on first access of the `_transcript_dict` property.
    self._transcript_dict_cache = None

  @property
  def _transcript_dict(self):
    """Mapping of audio id to encoded transcript, built once and cached."""
    if self._transcript_dict_cache is None:
      self._transcript_dict_cache = self._build_transcript()
    return self._transcript_dict_cache

  @staticmethod
  def _get_transcript_entries(transcript_directory):
    """Yield ``(audio_id, text)`` for every line of every transcript file."""
    for tfile in recursiveTraverse(transcript_directory, "*.trans.txt"):
      with open(tfile, 'r') as f:
        for line in f:
          line = line.rstrip('\n')
          # Blank lines (e.g. a trailing newline at the end of the file)
          # carry no entry and would break the split below.
          if not line.strip():
            continue
          audio_id, text = line.split(' ', 1)
          yield audio_id, text

  @staticmethod
  def _extract_audio_id(audio_file):
    """Return the file name of `audio_file` without directory or extension."""
    return os.path.splitext(os.path.basename(audio_file))[0]

  def _transform_sample(self, audio_file, preprocess_fnc):
    """Load one recording at its native sample rate and preprocess it.

    Returns:
        ``(audio_id, features)`` where `features` is whatever
        ``preprocess_fnc(audio, sample_rate)`` returns.
    """
    audio, sr = librosa.load(audio_file, sr=None)
    features = preprocess_fnc(audio, sr)
    audio_id = self._extract_audio_id(audio_file)
    return audio_id, features

  def _build_transcript(self):
    """Read all transcripts and encode each text with `sentenceToIds`."""
    return {
        audio_id: sentenceToIds(text)
        for audio_id, text in self._get_transcript_entries(self._data_directory)
    }

  def store_samples(self, directory, preprocess_fnc):
    """Preprocess every ``.flac`` file of one split and save it as ``.npz``.

    Args:
        directory: sub-directory of the data directory to read (the caller
            in this file passes "train" and "test").
        preprocess_fnc: callable ``(audio, sample_rate) -> features``.

    Files are written to ``<data_directory>/preprocessed/<directory>`` with
    the arrays ``audio_fragments`` and ``transcript``. Recordings without a
    transcript are skipped.
    """
    # Output path
    out_dir = os.path.join(self._data_directory, "preprocessed", directory)
    # The keyword is `exist_ok`; the former spelling `exists_ok` raised a
    # TypeError before any sample was processed.
    os.makedirs(out_dir, exist_ok=True)

    # Input audio
    audio_files = list(recursiveTraverse(
        os.path.join(self._data_directory, directory), "*.flac"
    ))
    transcripts = self._transcript_dict

    print(f"Processing {len(audio_files)} audio files...")

    for audio_file in tqdm(audio_files):
      audio_id = self._extract_audio_id(audio_file)

      if audio_id not in transcripts:
        continue

      transcript = transcripts[audio_id]
      audio_id, features = self._transform_sample(audio_file, preprocess_fnc)

      np.savez(
          os.path.join(out_dir, audio_id),
          audio_fragments=features,
          transcript=transcript
      )
    print("Done saving samples.")

  def load_samples(self, directory, loop_infinitely=False, limit_count=0,
                   feature_type='power'):
    """Yield ``(features, transcript)`` pairs written by `store_samples`.

    Args:
        directory: split name that was passed to `store_samples`.
        loop_infinitely: restart from the first file after the last one
            instead of stopping.
        limit_count: use only the first `limit_count` files (sorted by
            path); 0 or None uses all of them.
        feature_type: accepted for compatibility with callers that pass it.
            Each file holds a single feature array, so the value does not
            select anything here -- NOTE(review): confirm intended meaning.

    Raises:
        FileNotFoundError: no preprocessed sample exists for `directory`.
    """
    sample_dir = os.path.join(self._data_directory, "preprocessed", directory)
    # Sorted so that `limit_count` always selects the same files.
    sample_files = sorted(recursiveTraverse(sample_dir, "*.npz"))
    if limit_count:
      sample_files = sample_files[:limit_count]
    if not sample_files:
      # Without this check an infinite loop over nothing would spin forever.
      raise FileNotFoundError(f"No preprocessed samples found in {sample_dir}")

    while True:
      for sample_file in sample_files:
        # Read both arrays before yielding so the file is not held open
        # while the consumer works on the sample.
        with np.load(sample_file) as sample:
          features = sample["audio_fragments"]
          transcript = sample["transcript"].tolist()
        yield features, transcript
      if not loop_infinitely:
        break


In [6]:
class Preprocess:
  """Entry point that preprocesses the "train" and "test" splits."""

  def run(self, base_dir="/content/data"):
    """Store power-spectrogram samples for both splits found under `base_dir`."""
    dataset = DatasetReader(base_dir)
    # Same feature function for both splits, train first.
    for split in ("train", "test"):
      dataset.store_samples(split, calculatePowerSpectrogram)


<pre>
Raw Audio (.flac)
    ↓
Preprocessing (mel / power spectrogram)
    ↓
Saved .npz files
    ↓
Data Generator (yields features + transcript)
    ↓
✅ InputBatchLoader / SingleInputLoader (THIS CODE)
    ↓
Acoustic Model (CNN / LSTM / Transformer)
    ↓
CTC Loss
    ↓
Training / Decoding

</pre>


<pre>
Before padding:

Sample 1: [frame x 480]
Sample 2: [frame x 620]

After padding:

Sample 1: [480 real | 140 zeros]
Sample 2: [620 real]

</pre>


This function converts variable-length frame sequences into a fixed-size batch tensor while remembering the real lengths.

In [7]:
# ===============================
# 1. COLAB + TF1 SETUP
# ===============================
# Bind `tf` to the TensorFlow 1.x compatibility API: the loaders and the model
# in this file use TF1 graph-mode constructs (placeholders, queues, sessions).
import tensorflow.compat.v1 as tf
# Graph-mode ops such as tf.placeholder require eager execution to be off.
tf.disable_eager_execution()

import threading
import numpy as np
from abc import abstractmethod

# Printed so the notebook output records which TensorFlow build was used.
print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.19.0


In [8]:
from abc import abstractmethod

# ===============================
# 2. BASE INPUT LOADER
# ===============================

class BaseInputLoader:
  """Shared helpers for turning Python samples into feedable batch values.

  Args:
      input_size: number of features per time step of every input.
  """

  def __init__(self, input_size):
    self.input_size = input_size

  def _get_inputs_feed_item(self, input_list):
    """Zero-pad variable-length inputs into one batch array.

    Args:
        input_list: non-empty sequence (list or tuple) of arrays, each
            indexed as (time, input_size).

    Returns:
        ``(input_tensor, sequence_lengths, max_time)``: the padded array of
        shape (len(input_list), max_time, input_size), the unpadded length
        of every input, and the longest length.

    Raises:
        ValueError: `input_list` is empty.
    """
    if len(input_list) == 0:
      raise ValueError("input_list must contain at least one sample")

    # Callers pass plain lists/tuples of arrays, so the container itself has
    # no `.shape`; only the individual samples do.
    sequence_lengths = np.array([inp.shape[0] for inp in input_list])
    max_time = sequence_lengths.max()
    input_tensor = np.zeros((len(input_list), max_time, self.input_size))

    for idx, inp in enumerate(input_list):
      # Everything past the real length keeps its zero padding.
      input_tensor[idx, :inp.shape[0], :] = inp
    return input_tensor, sequence_lengths, max_time

  @staticmethod
  def _get_labels_feed_item(label_list, max_time):
    """Convert label sequences into a ``tf.SparseTensorValue``.

    Args:
        label_list: sequence of label-id sequences, one per batch entry.
        max_time: size used for the second dimension of the dense shape.

    Returns:
        Sparse value whose indices are ``[batch_index, position]`` pairs.
    """
    label_shape = np.array([len(label_list), max_time], dtype=np.int64)
    label_indices = [
        [label_idx, time_idx]
        for label_idx, label in enumerate(label_list)
        for time_idx in range(len(label))
    ]
    label_values = [value for label in label_list for value in label]

    # reshape keeps the (N, 2) layout even when there are no labels at all.
    label_indices = np.array(label_indices, dtype=np.int64).reshape(-1, 2)
    label_values = np.array(label_values, dtype=np.int32)
    return tf.SparseTensorValue(label_indices, label_values, label_shape)

  @abstractmethod
  def get_inputs(self):
    """Return the ``(inputs, sequence_lengths, labels)`` graph tensors."""
    raise NotImplementedError()

  def get_feed_dict(self):
    """Return the feed dict for one model step, or None if nothing is fed.

    SpeechModel.step calls this on its loader and treats a falsy result as
    an empty feed; loaders that feed placeholders directly override it.
    """
    return None

In [9]:
# ===============================
# 3. SINGLE INPUT LOADER (Inference)
# ===============================
class SingleInputLoader(BaseInputLoader):
  """Loader that feeds exactly one utterance per step through placeholders."""

  def __init__(self, input_size):
    super().__init__(input_size)
    # Sample waiting to be fed; consumed by get_feed_dict().
    self.speech_input = None
    with tf.device("/cpu:0"):
      # Batch dimension is fixed to 1, the time dimension is left open.
      self.inputs = tf.placeholder(
          tf.float32, [1, None, input_size], name="inputs"
      )
      self.sequence_lengths = tf.placeholder(
          tf.int32, [1], name="sequence_lengths"
      )

  def get_inputs(self):
    """Return the two placeholders; there are no labels for this loader."""
    return self.inputs, self.sequence_lengths, None

  def set_input(self, speech_input):
    """Remember `speech_input` as the sample for the next feed dict."""
    self.speech_input = speech_input

  def get_feed_dict(self):
    """Build the feed dict for the pending sample and clear it.

    Raises:
        ValueError: no sample was provided through set_input().
    """
    pending = self.speech_input
    if pending is None:
      raise ValueError("Call set_input() first")

    padded, lengths, _ = self._get_inputs_feed_item([pending])
    self.speech_input = None
    return {self.inputs: padded, self.sequence_lengths: lengths}


In [10]:
# ===============================
# 4. BATCH INPUT LOADER (Training)
# ===============================
class InputBatchLoader(BaseInputLoader):
  """Loader that fills a FIFO queue with padded batches from worker threads.

  Args:
      input_size: number of features per time step.
      batch_size: number of samples per enqueued batch.
      data_generator_creator: zero-argument callable returning an iterable
          of ``(features, transcript)`` samples; called once per thread.
      max_steps: total number of batches to enqueue, or None for no limit.
  """

  def __init__(self, input_size, batch_size, data_generator_creator, max_steps=None):
    super().__init__(input_size)

    self.batch_size = batch_size
    self.data_generator_creator = data_generator_creator
    self.steps_left = max_steps
    # `steps_left` is shared by all producer threads started by
    # start_threads(), so updates to it are serialised.
    self._steps_lock = threading.Lock()

    with tf.device("/cpu:0"):
      self.inputs = tf.placeholder(
          tf.float32, [batch_size, None, input_size], name="inputs"
      )
      self.sequence_lengths = tf.placeholder(
          tf.int32, [batch_size], name="sequence_lengths"
      )
      self.labels = tf.sparse_placeholder(tf.int32, name="labels")

      self.queue = tf.FIFOQueue(
          capacity=50,
          dtypes=[tf.float32, tf.int32, tf.string]
      )

      # Sparse labels travel through the queue in serialised form.
      serialized_labels = tf.serialize_many_sparse(self.labels)

      self.enqueue_op = self.queue.enqueue(
          [self.inputs, self.sequence_lengths, serialized_labels]
      )

  def get_inputs(self):
    """Return dequeued ``(inputs, sequence_lengths, labels)`` tensors."""
    inputs, seq_lengths, labels = self.queue.dequeue()
    labels = tf.deserialize_many_sparse(labels, dtype=tf.int32)
    return inputs, seq_lengths, labels

  def get_feed_dict(self):
    """Return None: batches reach the graph through the queue, not a feed.

    SpeechModel.step calls get_feed_dict() on its loader and treats a falsy
    result as an empty feed.
    """
    return None

  def _batch(self, iterable):
    """Group `iterable` into tuples of `batch_size` consecutive items.

    A trailing group with fewer than `batch_size` items is dropped (zip
    stops at the first exhausted argument).
    """
    args = [iter(iterable)] * self.batch_size
    return zip(*args)

  def _consume_step(self):
    """Count one enqueued batch; return True once the budget is used up."""
    if self.steps_left is None:
      return False
    with self._steps_lock:
      self.steps_left -= 1
      # `<=` rather than `==`: several threads may have a batch in flight,
      # so the counter can pass zero without any thread observing exactly 0.
      return self.steps_left <= 0

  def _enqueue(self, sess, coord):
    """Thread body: enqueue batches until the data, the step budget or the
    coordinator says stop, then close the queue."""
    generator = self.data_generator_creator()
    try:
      for batch in self._batch(generator):
        input_list, label_list = zip(*batch)
        input_tensor, seq_lengths, max_time = \
            self._get_inputs_feed_item(input_list)
        labels = self._get_labels_feed_item(label_list, max_time)

        sess.run(self.enqueue_op, feed_dict={
            self.inputs: input_tensor,
            self.sequence_lengths: seq_lengths,
            self.labels: labels
        })

        if self._consume_step():
          break
        if coord.should_stop():
          break

      sess.run(self.queue.close())
    except tf.errors.CancelledError:
      # Another producer thread closed the queue first; enqueueing into (or
      # closing) a closed queue is reported as CancelledError. Nothing is
      # left to do for this thread.
      pass

  def start_threads(self, sess, coord, n_threads=1):
    """Start `n_threads` daemon producer threads and register them with
    `coord`; returns the list of threads."""
    threads = []
    for _ in range(n_threads):
      t = threading.Thread(target=self._enqueue, args=(sess, coord))
      t.daemon = True
      t.start()
      coord.register_thread(t)
      threads.append(t)
    return threads

In [11]:
# def _batch( iterable):
#         args = [iter(iterable)] * 2
#         return zip(*args)


def dummy_data_generator():
    """Endlessly yield random ``(features, transcript)`` samples for testing.

    Features are float32 arrays with 128 columns and 80-149 rows; the
    transcript is a list of 5-14 integers drawn from 0-27.
    """
    rng = np.random
    while True:
        n_frames = rng.randint(80, 150)
        features = rng.rand(n_frames, 128).astype(np.float32)
        n_labels = rng.randint(5, 15)
        labels = rng.randint(0, 28, size=n_labels).tolist()
        yield features, labels

# for batch in _batch(dummy_data_generator()):
#     print(len(batch[0]))
#     input_list, label_list = zip(*batch)
# print(_batch(dummy_data_generator()))

In [12]:
# ===============================
# 6. TEST TRAINING PIPELINE
# ===============================
# loader = InputBatchLoader(
#     input_size=128,
#     batch_size=2,
#     data_generator_creator=dummy_data_generator,
#     max_steps=3
# )

# inputs, seq_lengths, labels = loader.get_inputs()
# with tf.Session() as sess:
#     coord = tf.train.Coordinator()
#     threads=loader.start_threads(sess, coord)
#     for step in range(3):
#         x, sl, lb = sess.run([inputs, seq_lengths, labels])
#         print(f"\nStep {step + 1}")
#         print("Input shape:", x.shape)
#         print("Sequence lengths:", sl)
#         print("Sparse labels:", lb)
#     coord.request_stop()
#     coord.join(threads)

<pre>
 🧠 **Core SpeechModel**
 CREATE INPUT LOADER
        ↓
CREATE MODEL
  ├── __init__()
  │     ├── get_inputs()
  │     └── _create_network()
  │            └── _convolution() × many
  │
  ├── add_training_ops()
  ├── add_decoding_ops()
  ├── finalize()
        ↓
CREATE SESSION
        ↓
init_session()  → variables initialized
        ↓
TRAIN LOOP
  └── step(loss=True, update=True)
        ↓
INFERENCE
  └── step(decode=True)

</pre>

In [21]:
from tensorflow.keras.initializers import GlorotUniform

def xavier_initializer():
    """Return a fresh Glorot-uniform initializer instance for the conv filters."""
    initializer = GlorotUniform()
    return initializer

In [22]:

class SpeechModel:
  """Base class for CTC speech models built as a TF1-style graph.

  Subclasses supply the network by overriding `_create_network`. After
  construction the caller adds training and/or decoding ops, calls
  `finalize`, initialises or restores a session and then drives the model
  with `step`.

  Args:
      input_loader: loader whose get_inputs() provides
          ``(inputs, sequence_lengths, labels)``.
      input_size: number of features per time step.
      num_classes: size of the last dimension of the logits.
  """

  def __init__(self, input_loader: "BaseInputLoader", input_size: int, num_classes: int):
    self.input_loader = input_loader
    self.input_size = input_size
    # Number of convolution layers created so far; used to name their scopes.
    self.convolution_count = 0
    self.global_step = tf.Variable(0, trainable=False)

    # Graph inputs come from the loader.
    self.inputs, self.sequence_lengths, self.labels = input_loader.get_inputs()

    # Build the network.
    self.logits = self._create_network(num_classes)

    tf.summary.histogram('logits', self.logits)

  # =========================
  # TRAINING OPS
  # =========================
  def add_training_ops(self, learning_rate=1e-3,
                       learning_rate_decay_factor=0,
                       max_gradient_norm=5.0,
                       momentum=0.9):
    """Add the CTC loss and an Adam update with global-norm clipping.

    `learning_rate_decay_factor` and `momentum` are accepted but not used.
    When the loader provides no labels only the learning-rate variable is
    created; `cost`, `avg_loss` and `update` are then not defined.
    """
    self.learning_rate = tf.Variable(
        float(learning_rate), trainable=False, dtype=tf.float32
    )
    if self.labels is None:
      return

    with tf.name_scope('training'):
      # Lengths are halved for the logits -- presumably because the first
      # convolution of the concrete network uses stride 2; verify when
      # changing the architecture.
      self.cost = tf.nn.ctc_loss(
          self.labels,
          self.logits,
          self.sequence_lengths // 2
      )
      self.avg_loss = tf.reduce_mean(self.cost)
      optimizer = tf.train.AdamOptimizer(self.learning_rate)

      gradients, variables = zip(
          *optimizer.compute_gradients(self.avg_loss)
      )
      clipped, _ = tf.clip_by_global_norm(gradients, max_gradient_norm)

      self.update = optimizer.apply_gradients(
          zip(clipped, variables),
          global_step=self.global_step
      )

  # =========================
  # DECODING OPS
  # =========================
  def add_decoding_ops(self):
    """Add a greedy CTC decoder over the logits."""
    with tf.name_scope('decoding'):
      self.decoded, self.log_probs = tf.nn.ctc_greedy_decoder(
          self.logits,
          self.sequence_lengths // 2,
          merge_repeated=True
      )

  # =========================
  # SESSION INIT
  # =========================
  def init_session(self, sess, init_variables=True):
    """Optionally run the variable initializer, then write the graph to the
    summary writer. Requires `finalize` to have been called."""
    if init_variables:
      sess.run(self.init)

    self.summary_writer.add_graph(sess.graph)

  # =========================
  # FINALIZE
  # =========================
  def finalize(self, log_dir, run_name, run_type):
    """Create the initializer, saver, merged summaries and a summary writer
    that logs to ``<log_dir>/<run_name>_<run_type>``."""
    self.init = tf.global_variables_initializer()
    self.saver = tf.train.Saver()
    self.merged_summaries = tf.summary.merge_all()

    self.summary_writer = tf.summary.FileWriter(
        f"{log_dir}/{run_name}_{run_type}"
    )

  # =========================
  # CONVOLUTION LAYER
  # =========================
  def _convolution(self, value, filter_width, stride,
                   input_channels, out_channels,
                   apply_non_linearity=True):
    """Append one 1-D convolution layer (SAME padding, bias, optional ReLU).

    Variables live in the scope ``conv_<n>`` where n counts the layers
    created so far on this model.

    Returns:
        ``(output_tensor, out_channels)``.
    """
    layer_id = self.convolution_count
    self.convolution_count += 1

    with tf.variable_scope(f'conv_{layer_id}'):
      filters = tf.get_variable(
          'filters',
          shape=[filter_width, input_channels, out_channels],
          initializer=xavier_initializer()
      )
      bias = tf.get_variable(
          'bias',
          shape=[out_channels],
          initializer=tf.zeros_initializer()
      )

      conv = tf.nn.conv1d(value, filters, stride, padding='SAME')
      conv = tf.nn.bias_add(conv, bias)
      if apply_non_linearity:
        conv = tf.nn.relu(conv)
      return conv, out_channels

  # The method used to be defined under this misspelled name while the
  # subclass calls `_convolution`; the alias keeps the old name working.
  _convultion = _convolution

  # =========================
  # TRAIN / DECODE STEP
  # =========================
  def step(self, sess, loss=True, update=True, decode=False):
    """Run one session step and return the fetched values as a list.

    The list holds, in this order and only if requested: the average loss,
    the decoded output, the result of the update op.
    """
    fetches = []

    if loss:
      fetches.append(self.avg_loss)
    if decode:
      fetches.append(self.decoded)
    if update:
      fetches.append(self.update)

    # Not every loader feeds placeholders (a queue-based loader has nothing
    # to feed), so a missing get_feed_dict() is treated like an empty feed.
    get_feed_dict = getattr(self.input_loader, 'get_feed_dict', None)
    feed = (get_feed_dict() if get_feed_dict is not None else None) or {}

    return sess.run(fetches, feed_dict=feed)

  @abstractmethod
  def _create_network(self, num_classes):
    """Build the network and return its logits; subclasses must override."""
    # Failing loudly here is clearer than returning None and crashing later
    # in the constructor when the logits are used.
    raise NotImplementedError()

  def restore(self, session, checkpoint_directory: str, reset_learning_rate: float = None):
    """Restore variables from the latest checkpoint in `checkpoint_directory`.

    Args:
        reset_learning_rate: if truthy, assigned to the learning-rate
            variable after restoring.

    Raises:
        FileNotFoundError: the directory holds no checkpoint.
    """
    ckpt = tf.train.get_checkpoint_state(checkpoint_directory)

    if not (ckpt and ckpt.model_checkpoint_path):
      raise FileNotFoundError('No checkpoint for evaluation found')

    self.saver.restore(session, ckpt.model_checkpoint_path)
    self.init_session(session, init_variables=False)

    if reset_learning_rate:
      session.run(self.learning_rate.assign(reset_learning_rate))

  def restore_or_create(self, session, checkpoint_directory: str, reset_learning_rate: float = None):
    """Restore from a checkpoint if one exists, otherwise initialise all
    variables from scratch."""
    try:
      self.restore(session, checkpoint_directory, reset_learning_rate)
    except FileNotFoundError:
      self.init_session(session, init_variables=True)


In [15]:
class Wav2LetterModel(SpeechModel):
  """Stack of 1-D convolutions producing per-frame class logits."""

  def _create_network(self, num_classes):
    """Build the convolution stack and return the logits with the first two
    axes swapped."""
    # Entry layer: wide filter, stride 2, input_size -> 250 channels.
    net, width = self._convolution(self.inputs, 48, 2, self.input_size, 250)

    # Seven identical layers that keep the channel count.
    for _ in range(7):
      net, width = self._convolution(net, 7, 1, width, width)

    # Wide layer that multiplies the channel count by 8.
    net, width = self._convolution(net, 32, 1, width, width * 8)

    # Two width-1 layers; the last one maps to the classes without ReLU.
    net, width = self._convolution(net, 1, 1, width, width)
    net, width = self._convolution(net, 1, 1, width, num_classes, False)

    return tf.transpose(net, [1, 0, 2])



In [16]:
def create_default_model(command, input_size, speech_input):
    """Build a finalized Wav2LetterModel with training and decoding ops.

    command: 'train' selects explicit training settings; any other value
        uses the defaults of add_training_ops(). It is also used as the
        run type of the summary directory.
    input_size: e.g. 128 (mel features)
    speech_input: InputBatchLoader or SingleInputLoader
    """
    # Constructing the model already builds the network.
    model = Wav2LetterModel(
        input_loader=speech_input,
        input_size=input_size,
        num_classes=29
    )

    # Only the arguments of add_training_ops() depend on the mode.
    if command == 'train':
        training_kwargs = {'learning_rate': 1e-3, 'max_gradient_norm': 5.0}
    else:
        training_kwargs = {}
    model.add_training_ops(**training_kwargs)
    model.add_decoding_ops()

    model.finalize(log_dir='logs', run_name='wav2letter', run_type=command)
    return model


# TestExecutor

In [17]:
from abc import ABCMeta, abstractmethod
from functools import partial
# Keep `tf` bound to the TF1 compatibility API. A plain `import tensorflow
# as tf` here would rebind the name to the TF2 namespace, which lacks
# tf.placeholder, tf.Session, tf.variable_scope and the other graph-mode
# calls used throughout this file.
import tensorflow.compat.v1 as tf


In [18]:
class TestExecutor(metaclass=ABCMeta):
  """Common setup for runs that push dataset samples through the model.

  The constructor creates a DatasetReader, derives the feature size from one
  sample and builds an InputBatchLoader. Subclasses provide the samples
  (`create_sample_generator`) and the sample limit
  (`get_loader_limit_count`).
  """

  def __init__(self, data_dir='data'):
    # 1. Dataset reader
    self.reader = DatasetReader(data_dir)
    # 2. Feature size (e.g. 128), taken from an actual sample
    self.input_size = self.determine_input_size()

    # 3. Input loader; each producer thread gets its own generator by
    #    calling create_sample_generator(<limit count>).
    self.speech_input = InputBatchLoader(
        input_size=self.input_size,
        batch_size=64,
        data_generator_creator=partial(
            self.create_sample_generator,
            self.get_loader_limit_count()
        ),
        max_steps=self.get_max_steps()
    )

  # ------------------------------------
  # INPUT SIZE
  # ------------------------------------
  def determine_input_size(self):
    """Take ONE sample and return the size of its second dimension."""
    sample, _ = next(self.create_sample_generator(limit_count=1))
    return sample.shape[1]

  @abstractmethod
  def create_sample_generator(self, limit_count: int):
    """Must yield: (audio_features, transcript)"""
    pass

  def get_max_steps(self):
    """Number of batches the loader may enqueue; None means no step limit."""
    return None

  # ------------------------------------
  # PIPELINE START
  # ------------------------------------
  def start_pipeline(self, sess, n_threads=2):
    """Start the loader's producer threads and return their coordinator."""
    coord = tf.train.Coordinator()
    self.speech_input.start_threads(
        sess=sess,
        coord=coord,
        n_threads=n_threads
    )
    return coord

  # ------------------------------------
  # MODEL CREATION
  # ------------------------------------
  def create_model(self, sess, checkpoint_dir):
    """Create the evaluation model and restore it from `checkpoint_dir`.

    Returns:
        The restored model.

    Raises:
        FileNotFoundError: `checkpoint_dir` holds no checkpoint.
    """
    model = create_default_model(
        command='evaluate',
        input_size=self.input_size,
        speech_input=self.speech_input
    )
    # A finalized model is unusable until its session is initialised or
    # restored, and callers need the model object back.
    model.restore(sess, checkpoint_dir)
    return model

  @abstractmethod
  def get_loader_limit_count(self) -> int:
    """Return the sample limit passed to create_sample_generator()."""
    pass

In [19]:
import time

In [20]:
class Train(TestExecutor):
  """Training run: feeds the 'train' split to the model and checkpoints it.

  Args:
      data_dir: root directory handed to the dataset reader.
      limit: sample limit passed to the sample generator.
  """

  # Where checkpoints are restored from and written to.
  CHECKPOINT_DIR = 'train/best-weights'
  # A progress line is printed and a checkpoint saved every this many steps.
  CHECKPOINT_EVERY = 1000

  def __init__(self, data_dir='data', limit=0):
    # Must be set first: the base constructor calls get_loader_limit_count().
    self.limit = limit
    super().__init__(data_dir)

  # ----------------------------------
  # DATA GENERATOR
  # ----------------------------------
  def create_sample_generator(self, limit_count: int):
    """Return an endless generator over the 'train' samples of the reader."""
    return self.reader.load_samples(
        directory='train',
        loop_infinitely=True,
        limit_count=limit_count,
        feature_type='power'
    )

  def get_loader_limit_count(self) -> int:
    return self.limit

  # ----------------------------------
  # MODEL
  # ----------------------------------
  def create_model(self, sess, checkpoint_dir=None):
    """Create the training model, restoring it when a checkpoint exists.

    Args:
        checkpoint_dir: directory to restore from; defaults to
            CHECKPOINT_DIR. Accepted so the signature is compatible with
            the base class.
    """
    model = create_default_model(
        command='train',
        input_size=self.input_size,
        speech_input=self.speech_input
    )

    model.restore_or_create(
        session=sess,
        checkpoint_directory=checkpoint_dir or self.CHECKPOINT_DIR,
        reset_learning_rate=1e-4
    )
    return model

  def _report_and_save(self, sess, model, avg_loss):
    """Print one progress line and write a checkpoint."""
    global_step = sess.run(model.global_step)
    learning_rate = sess.run(model.learning_rate)
    # Large losses would overflow exp(); report infinity instead.
    perplexity = np.exp(avg_loss) if avg_loss < 300 else float("inf")
    print(
        f"step {global_step} | "
        f"lr {learning_rate:.6f} | "
        f"loss {avg_loss:.2f} | "
        f"ppl {perplexity:.2f}"
    )

    # SAVE MODEL
    os.makedirs(self.CHECKPOINT_DIR, exist_ok=True)
    checkpoint_path = os.path.join(self.CHECKPOINT_DIR, 'speech.ckpt')
    model.saver.save(sess, checkpoint_path, global_step=model.global_step)
    print("üíæ Weights saved")

  def run(self):
    """Train until the input queue is exhausted or the coordinator stops."""
    # Imported here so this method uses the TF1 API no matter which
    # TensorFlow namespace the module-level name `tf` is bound to.
    import tensorflow.compat.v1 as tf1

    # The loader's placeholders and queue were created in __init__.
    # Resetting the default graph at this point would leave them in the old
    # graph while the model is built in a new one, and ops from different
    # graphs cannot be combined. Build and run everything in the loader's
    # graph instead.
    graph = self.speech_input.inputs.graph
    with graph.as_default(), tf1.Session(graph=graph) as sess:
      model = self.create_model(sess)
      coord = self.start_pipeline(sess, n_threads=2)

      current_step = 0
      print("üöÄ Begin training")

      try:
        while not coord.should_stop():
          current_step += 1

          # This runs the graph: one loss evaluation plus one update.
          avg_loss, _ = model.step(
              sess,
              loss=True,
              update=True,
              decode=False
          )

          if current_step % self.CHECKPOINT_EVERY == 0:
            self._report_and_save(sess, model, avg_loss)
      except tf1.errors.OutOfRangeError:
        # Raised by the dequeue once the queue is closed and empty.
        print("‚úÖ Training finished")
      finally:
        coord.request_stop()



In [None]:
# limit=0 means all samples are used.
trainer = Train(data_dir='data', limit=0)
trainer.run()