In [None]:
import os

path = "/content/drive/My Drive"
os.chdir(path)

import csv
import numpy as np
import tensorflow as tf
import tensorflow.compat.v1 as tfv1
import scipy.io.wavfile as wav

from tensorflow.keras import layers
from tensorflow.keras import Model

!pip install python_speech_features
from python_speech_features import mfcc

In [None]:
# definition of DeepSpeech architecture
class DeepSpeech(Model):
  """DeepSpeech-style acoustic model: three dense layers, an LSTM, two dense layers.

  Every frame is first expanded into a window of 2 * n_context + 1 neighbouring
  frames, then processed frame-wise by the dense layers and sequence-wise by
  the LSTM. The output layer has no activation, so the model returns raw
  per-class scores.

  Args:
    n_hidden: number of units of dense layers 1, 2 and 5.
    n_cell_dim: number of units of dense layer 3 and of the LSTM.
    n_hidden_out: number of units of the output layer.
    n_context: number of context frames taken on each side of a frame.
    n_input: number of features per input frame.
  """

  def __init__(self, n_hidden, n_cell_dim, n_hidden_out, n_context, n_input):
    super(DeepSpeech, self).__init__()
    self.dense1 = layers.Dense(n_hidden, activation='relu')
    self.dense2 = layers.Dense(n_hidden, activation='relu')
    self.dense3 = layers.Dense(n_cell_dim, activation='relu')
    self.lstm = layers.LSTM(n_cell_dim, return_sequences=True)
    self.dense5 = layers.Dense(n_hidden, activation='relu')
    self.dense_out = layers.Dense(n_hidden_out)

    self.n_hidden = n_hidden
    self.n_cell_dim = n_cell_dim
    self.n_hidden_out = n_hidden_out
    self.n_context = n_context
    self.n_input = n_input

  def create_overlapping_windows(self, batch_x, n_context, n_input):
    """Expand every frame into a window of itself and its neighbouring frames.

    Args:
      batch_x: float tensor of shape [batch_size, n_steps, n_input].
      n_context: number of frames taken on each side of the centre frame.
      n_input: number of features per frame.
    Returns:
      Tensor of shape [batch_size, n_steps, 2 * n_context + 1, n_input].
    """
    batch_size = tf.shape(input=batch_x)[0]
    window_width = 2 * n_context + 1
    num_channels = n_input

    # Create a constant convolution filter using an identity matrix, so that the
    # convolution returns patches of the input tensor as is, and we can create
    # overlapping windows over the MFCCs.
    eye_filter = tf.constant(np.eye(window_width * num_channels).
                             reshape(window_width, num_channels, window_width * num_channels), tf.float32)

    # Create overlapping windows; stride 1 with 'SAME' padding keeps one
    # (zero-padded at the borders) window per input frame.
    batch_x = tf.nn.conv1d(input=batch_x, filters=eye_filter, stride=1, padding='SAME')

    # Split the flattened window axis into [window_width, n_input], giving
    # [batch_size, n_windows, window_width, n_input]
    batch_x = tf.reshape(batch_x, [batch_size, -1, window_width, num_channels])

    return batch_x

  def call(self, x):
    """Compute per-frame class scores for a batch of feature sequences.

    Args:
      x: float tensor of shape [batch_size, n_steps, n_input].
    Returns:
      Tensor of shape [n_steps, batch_size, n_hidden_out] (time major).
    """
    batch_size = tf.shape(input=x)[0]

    x = self.create_overlapping_windows(x, self.n_context, self.n_input)
    # Flatten to one row per frame so the dense layers are applied frame-wise.
    x = tf.reshape(x, [-1, self.n_input + 2 * self.n_input * self.n_context])

    x = self.dense1(x)
    x = self.dense2(x)
    x = self.dense3(x)

    # dense3 emits n_cell_dim features per frame, so the sequence layout must be
    # restored with n_cell_dim (using n_hidden here only works when both are equal).
    x = tf.reshape(x, [batch_size, -1, self.n_cell_dim])

    x = self.lstm(x)

    x = tf.reshape(x, [-1, self.n_cell_dim])

    x = self.dense5(x)
    x = self.dense_out(x)

    x = tf.reshape(x, [batch_size, -1, self.n_hidden_out])
    x = tf.transpose(x, [1, 0, 2]) # transpose to time major

    return x

In [None]:
# helpers
def sparse_tuple_from(sequences, dtype=np.int32):
  """Create a sparse representation of a batch of sequences.

  Args:
    sequences: a list of lists of type dtype where each element is a sequence
    dtype: numpy dtype used for the returned values array
  Returns:
    A tuple with (indices, values, shape), where indices is an int64 array of
    [sequence_number, position] pairs with shape [n_values, 2], values is the
    concatenation of all sequences, and shape is
    [number_of_sequences, length_of_longest_sequence].
  """

  indices = []
  values = []

  for n, seq in enumerate(sequences):
    indices.extend(zip([n]*len(seq), range(len(seq))))
    values.extend(seq)

  # Convert only once, after every sequence has been collected: converting
  # inside the loop turns the lists into arrays and breaks the next .extend().
  # reshape(-1, 2) keeps the [n_values, 2] layout even when there are no values.
  indices = np.asarray(indices, dtype=np.int64).reshape(-1, 2)
  values = np.asarray(values, dtype=dtype)
  # Same value as max position + 1, but also defined for empty input.
  max_len = max((len(seq) for seq in sequences), default=0)
  shape = np.asarray([len(sequences), max_len], dtype=np.int64)

  return indices, values, shape

# The following code is from: http://hetland.org/coding/python/levenshtein.py

# This is a straightforward implementation of a well-known algorithm, and thus
# probably shouldn't be covered by copyright to begin with. But in case it is,
# the author (Magnus Lie Hetland) has, to the extent possible under law,
# dedicated all copyright and related and neighboring rights to this software
# to the public domain worldwide, by distributing it under the CC0 license,
# version 1.0. This software is distributed without any warranty. For more
# information, see <http://creativecommons.org/publicdomain/zero/1.0>

def levenshtein(a, b):
  """Return the Levenshtein (edit) distance between the sequences a and b.

  Only two rows of the dynamic-programming table are kept, and the shorter
  sequence is laid out along the row so that memory is O(min(len(a), len(b))).
  """
  # Put the shorter sequence along the row; on equal lengths keep a there.
  shorter, longer = (a, b) if len(a) <= len(b) else (b, a)
  width = len(shorter)

  row = list(range(width + 1))
  for row_idx in range(1, len(longer) + 1):
    prev_row = row
    row = [row_idx] + [0] * width
    for col_idx in range(1, width + 1):
      # A substitution is free when the two elements already match.
      substitution = prev_row[col_idx - 1]
      if shorter[col_idx - 1] != longer[row_idx - 1]:
        substitution += 1
      insertion = prev_row[col_idx] + 1
      deletion = row[col_idx - 1] + 1
      row[col_idx] = min(insertion, deletion, substitution)

  return row[width]

def WER(truth, hypothesis):
  """Word error rate of hypothesis measured against the reference truth.

  Both strings are split on whitespace and the word-level Levenshtein distance
  is divided by the number of reference words.

  Args:
    truth: reference transcript.
    hypothesis: transcript to evaluate.
  Returns:
    The word error rate. For a reference without any words the divisor is
    clamped to 1, so the result is the number of hypothesis words (0 when the
    hypothesis is empty as well) instead of a ZeroDivisionError.
  """
  truth_words = truth.split()
  hypothesis_words = hypothesis.split()
  return levenshtein(truth_words, hypothesis_words) / max(len(truth_words), 1)

In [None]:
# prepare dataset

# Constants
SPACE_TOKEN = '<space>'
SPACE_INDEX = 0
FIRST_INDEX = ord('a') - 1

# Some configs
num_features = 13
num_classes = ord('z') - ord('a') + 1 + 1 + 1

# import data
!python DeepSpeech/import_ldc93s1.py DeepSpeech/data/ldc93s1

train_files = "DeepSpeech/data/ldc93s1/ldc93s1.csv"

# TO BE CONTINUED
with open(train_files) as f:
  reader = csv.reader(f)
  first_row = next(reader)
  for row in reader:
    audio_file = row[0]
    src_transcript = row[2]

fs, audio = wav.read(audio_file)
feature = mfcc(audio, samplerate=fs)
feature = np.asarray(feature[np.newaxis, :])
feature = (feature - np.mean(features))/np.std(feature)
seq_len = [feature.shape[1]]

#transcript = ' '.join(transcript.strip().lower().split(' ')[2:]).replace('.', '')
transcript = src_transcript.replace(' ', '  ')
transcript = transcript.split(' ')
transcript = np.hstack([SPACE_TOKEN if x == '' else list(x) for x in transcript])
transcript = np.asarray([SPACE_INDEX if x == SPACE_TOKEN else ord(x) - FIRST_INDEX for x in transcript])
transcript = sparse_tuple_from([transcript])
transcript = tf.sparse.SparseTensor(transcript[0],transcript[1],transcript[2])

In [None]:
# training and testing

# Acoustic model; the output layer emits one score per class.
model = DeepSpeech(
    n_hidden=100,
    n_cell_dim=100,
    n_hidden_out=num_classes,
    n_context=7,
    n_input=num_features,
)

# CTC loss from the TF1 compatibility API, Adam with its default settings.
loss_fn = tfv1.nn.ctc_loss
optimizer = tf.keras.optimizers.Adam()

# Running means of the losses reported by train() and test().
train_loss = tf.keras.metrics.Mean(name='train_loss')
test_loss = tf.keras.metrics.Mean(name='test_loss')

@tf.function
def train(feature, transcript, sequence_length):
  """Run one optimisation step and add the resulting loss to train_loss.

  Args:
    feature: input features passed to the model.
    transcript: target labels passed to loss_fn.
    sequence_length: sequence lengths passed to loss_fn.
  """
  with tf.GradientTape() as recorder:
    # training=True only matters for layers that behave differently during
    # training and inference (e.g. Dropout).
    logits = model(feature, training=True)
    loss = loss_fn(transcript, logits, sequence_length)

  # Read the variables only after the forward pass, so lazily built weights exist.
  variables = model.trainable_variables
  grads = recorder.gradient(loss, variables)
  optimizer.apply_gradients(zip(grads, variables))

  train_loss(loss)

def test(feature, transcript, sequence_length, src_transcript):
  """Evaluate the model on one batch and decode its prediction into text.

  The loss is added to test_loss as a side effect.

  Args:
    feature: input features passed to the model.
    transcript: target labels passed to loss_fn.
    sequence_length: sequence lengths passed to loss_fn and to the decoder.
    src_transcript: reference text used for the word error rate.
  Returns:
    A tuple (res_transcript, wer) with the decoded text and its word error rate.
  """
  # training=False only matters for layers that behave differently during
  # training and inference (e.g. Dropout).
  logits = model(feature, training=False)
  test_loss(loss_fn(transcript, logits, sequence_length))

  top_paths, _ = tf.nn.ctc_beam_search_decoder(logits, sequence_length, beam_width = 500)
  label_ids = top_paths[0].values.numpy()

  # Shift the labels back to character codes, then drop the character one past
  # 'z' and turn the character one before 'a' into a space.
  chars = [chr(code) for code in np.asarray(label_ids) + FIRST_INDEX]
  dropped_char = chr(ord('z') + 1)
  space_char = chr(ord('a') - 1)
  res_transcript = ''.join(chars).replace(dropped_char, '').replace(space_char, ' ')

  return res_transcript, WER(src_transcript, res_transcript)

EPOCHS = 200

for epoch in range(EPOCHS):
  # Start every epoch with empty metric accumulators.
  for metric in (train_loss, test_loss):
    metric.reset_states()

  # One optimisation step, then an evaluation on the same sample.
  train(feature, transcript, seq_len)
  res_transcript, wer = test(feature, transcript, seq_len, src_transcript)

  template = 'Epoch {}, TRAIN Loss: {}, Test Loss: {}, WER: {} \n src: {} \n res: {}'
  report_fields = (
      epoch + 1,
      train_loss.result(),
      test_loss.result(),
      wer,
      src_transcript,
      res_transcript,
  )
  print(template.format(*report_fields))