Importing libraries

In [None]:
import matplotlib.pyplot as mpl 
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
#import torch
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import InputLayer
from keras.utils.np_utils import to_categorical
from keras import callbacks

#!pip install librosa
import librosa as lb
import librosa.display

import random
from torch.utils.data import random_split
from tensorflow.keras.utils import Sequence
from sklearn.model_selection import train_test_split
from scipy import signal

tf.__version__



'2.8.0'

# Configuration Variables

Setting seed


In [None]:
# Seed TensorFlow and NumPy separately so repeated runs draw the same numbers.
# The two seeds are independent and do not need to share a value.
TF_SEED, NP_SEED = 101, 102
tf.random.set_seed(TF_SEED)
np.random.seed(NP_SEED)

Creating the variables used in creating the encoder, predictor, joiner and the final RNN-T Model.

In [None]:
# --- Training ---
batch_size = 32        # batch size used in training
num_predictions = 2    # softmax outputs: one per accent (Jamaican, Trinidadian)

# --- Encoder ---
num_hidden_encoder = 512  # units in each encoder LSTM layer
num_encoder_dense = 128   # units in the encoder projection/dense layer
# Feature matrix per clip: 13 MFCC rows + fundamental frequency + energy = 15 rows,
# each 1198 frames long. The model iterates over the first axis (15) as timesteps.
encoder_input_shape = (15, 1198)

# --- Predictor ---
num_hidden_predictor = 64   # units in the predictor LSTM layer
num_predictor_dense = 32    # units in the predictor projection/dense layer
predictor_input_shape = (1, num_predictions)  # one step holding one probability per class

# --- Joiner ---
num_hidden_joiner = 64  # units in the joiner dense layer
# The joiner consumes the concatenated encoder and predictor projections, so its
# width is derived rather than hard-coded (128 + 32 = 160).
joiner_input_shape = (num_encoder_dense + num_predictor_dense,)

input_dim = 1024  # not referenced by the code visible in this notebook


# Training Data Preprocessing

## Mount drive

In [None]:
from google.colab import drive

# Mount Google Drive so the dataset folder is reachable from the Colab VM.
DRIVE_MOUNT_POINT = '/content/gdrive'
drive.mount(DRIVE_MOUNT_POINT)


Mounted at /content/gdrive


In [None]:
%cd gdrive/MyDrive/capstone_data
audpath='/content/gdrive/MyDrive/capstone_data'

/content/gdrive/MyDrive/capstone_data


## Audio PreProcessing Util Class

This class contains all the code needed for opening, rechannelling, padding or truncating audio, removing silence and background noise, and extracting the MFCCs, fundamental frequency (pitch), energy and formant frequencies using the librosa and scipy libraries.

In [None]:
class AudioPreProc:
  """
  Collection of static audio helpers built on librosa, scipy and numpy.

  Every helper that takes ``aud``/``sig`` expects a ``(signal, sample_rate)``
  tuple as returned by ``open``.
  """

  @staticmethod
  def open(file):
    """
    Loads the given audio file as mono at 44100 Hz.

    :param file: path of the audio file
    :return: tuple (numpy signal array, sampling rate)
    """
    arr, sr = lb.load(file, mono=True, sr=44100)  # sampling rate of 44100 Hz
    return (arr, sr)

  @staticmethod
  def rechannel(aud, new_channel=1):
    """
    Converts the given audio to the requested number of channels.

    librosa represents mono audio as shape (n,) and multi-channel audio as
    (channels, n). Multi-channel input is averaged down to mono; when more
    than one channel is requested, the mono signal is duplicated.

    :param aud: tuple (signal, sampling rate)
    :param new_channel: desired number of channels
    :return: tuple (converted signal, sampling rate); the input tuple itself
             when it already has the requested channel count
    """
    sig, sr = aud
    current = 1 if sig.ndim == 1 else sig.shape[0]
    if current == new_channel:
      return aud

    # Collapse to a single (n,) track first, then fan out if required.
    mono = sig if sig.ndim == 1 else np.mean(sig, axis=0)
    if new_channel == 1:
      return (mono, sr)
    return (np.tile(mono, (new_channel, 1)), sr)

  @staticmethod
  def remove_silence(aud):
    """
    Splits the signal into its non-silent intervals (``top_db=10``).

    :param aud: tuple (signal, sampling rate)
    :return: tuple (list of non-silent signal segments, sampling rate)
    """
    sig, sr = aud
    clips = lb.effects.split(sig, top_db=10)
    wav_data = [sig[start:end] for start, end in clips]
    return (wav_data, sr)

  @staticmethod
  def pad_trunc(aud, max_ms):
    """
    Pads the given audio with trailing zeros, or truncates it, so that it
    holds exactly ``sr // 1000 * max_ms`` samples along its last axis.

    :param aud: tuple (signal, sampling rate); the signal may be (n,) or (channels, n)
    :param max_ms: target length in milliseconds
    :return: tuple (resized signal, sampling rate)
    """
    sig, sr = aud
    # Integer division is deliberate: at 44100 Hz and 12000 ms this gives
    # 528000 samples, which is what yields 1198 frames at a hop of 441.
    max_len = sr // 1000 * max_ms
    sig_len = sig.shape[-1]  # compare samples with samples, not milliseconds

    if sig_len > max_len:
      # Truncate along the time axis (works for mono and multi-channel).
      resig = sig[..., :max_len]
    elif sig_len < max_len:
      pad_width = [(0, 0)] * (sig.ndim - 1) + [(0, max_len - sig_len)]
      resig = np.pad(sig, pad_width, mode='constant')
    else:
      resig = sig
    return (resig, sr)

  @staticmethod
  def remove_background(aud):
    """
    Applies a 10th-order Butterworth high-pass filter with a 2000 Hz cutoff.

    NOTE(review): a 2000 Hz cutoff also removes the typical speech
    fundamental, and the filtered signal is later used for pitch
    estimation - confirm this cutoff is intended.

    :param aud: tuple (signal, sampling rate)
    :return: tuple (filtered signal, sampling rate)
    """
    sig, sr = aud
    b, a = signal.butter(10, 2000/(sr/2), btype='highpass')
    sig = signal.lfilter(b, a, sig)
    return (sig, sr)

  @staticmethod
  def get_fundamental_freq(aud, hop):
    """
    Estimates the fundamental frequency (pitch) per frame with YIN, searching
    between notes C2 and C7, and scales the result by its L2 norm.

    :param aud: tuple (signal, sampling rate)
    :param hop: hop length in samples
    :return: normalised f0 array, one value per frame
    """
    sig, sr = aud
    f0 = lb.yin(sig, sr=sr, fmin=lb.note_to_hz('C2'), fmax=lb.note_to_hz('C7'), hop_length=hop)
    return f0 / np.linalg.norm(f0)

  @staticmethod
  def get_energy(aud, hop):
    """
    Calculates the root-mean-square energy of each frame for the given hop length.

    :param aud: tuple (signal, sampling rate)
    :param hop: hop length in samples
    :return: RMS array as returned by librosa (presumably shape (1, frames) - verify)
    """
    sig, sr = aud
    # Audio is passed by keyword: newer librosa releases make ``y`` keyword-only.
    energy = lb.feature.rms(y=sig, hop_length=hop, center=True)
    return energy

  @staticmethod
  def get_mfccs(sig, num_mfccs=13, window_length=0.02):
    """
    Calculates MFCCs per frame for a window length given in seconds, using a
    hop of half a window, and scales the matrix by its overall norm.

    :param sig: tuple (signal, sampling rate)
    :param num_mfccs: number of coefficients to compute
    :param window_length: analysis window length in seconds
    :return: tuple (normalised MFCC matrix, hop length in samples); the matrix
             is (num_mfccs, 1198) for a 528000-sample clip at 44100 Hz
    """
    aud, sr = sig
    n_fft = int(sr * window_length)   # window length: 0.02 s
    hop_length = n_fft // 2           # 50 % overlap between windows
    mfccs = lb.feature.mfcc(y=aud, sr=sr, n_mfcc=num_mfccs, hop_length=hop_length, n_fft=n_fft)
    mfccs = mfccs / np.linalg.norm(mfccs)
    return mfccs, hop_length

  @staticmethod
  def get_formant_freq(sig, num_frames=1198):
    """
    Estimates formant frequencies from the roots of a 4th-order LPC
    polynomial, normalises them and repeats them to fill ``num_frames`` values.

    :param sig: tuple (signal, sampling rate)
    :param num_frames: length of the returned vector
    :return: 1-D array of length ``num_frames``
    """
    aud, sr = sig
    A = lb.lpc(aud, order=4)
    rts = np.roots(A)
    rts = rts[np.imag(rts) >= 0]  # keep one root of each conjugate pair
    angz = np.arctan2(np.imag(rts), np.real(rts))
    frqs = angz * sr / (2 * np.pi)
    frqs.sort()
    fin = frqs / np.linalg.norm(frqs)
    # Tile the short formant vector so it can be cut to exactly num_frames.
    makeup = (num_frames // len(fin)) + 1
    fin = np.tile(fin, makeup)
    return fin[:num_frames]
 
    

SyntaxError: ignored

# Data Generator Class

Created so that data is loaded in batches instead of all at once.

In [None]:
class DataGenerator(Sequence):
    """Sequence-based data generator for Keras.

    Loads and pre-processes audio one batch at a time so the whole data set
    never has to be held in memory. Suitable for training and prediction.
    """
    def __init__(self, filenames, audlabels, audio_path,
                 to_fit=True, batch_size=batch_size, dim=encoder_input_shape,
                 n_channels=1, n_classes=2, shuffle=True):
        """Initialization
        :param filenames: list of audio file names
        :param audlabels: respective accent_id labels for filenames
        :param audio_path: path to audio locations (stored only; files are opened by their own names)
        :param to_fit: True to return X and y, False to return X only
        :param batch_size: batch size at each iteration
        :param dim: tuple indicating the feature matrix dimension of one sample
        :param n_channels: number of audio channels
        :param n_classes: number of accent classes
        :param shuffle: True to shuffle sample indexes after every epoch
        """
        self.audlabels = audlabels
        self.filenames = filenames
        self.audio_path = audio_path
        self.to_fit = to_fit
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.target_dim = (1, n_classes)
        self.duration = 12000  # clip length in milliseconds (12 seconds)
        self.shift_pct = 0.4
        self.on_epoch_end()

    def __len__(self):
        """Denotes the number of batches per epoch
        :return: number of full batches per epoch (a trailing partial batch is dropped)
        """
        return int(np.floor(len(self.filenames) / self.batch_size))

    def __getitem__(self, index):
        """Generate one batch of data
        :param index: index of the batch
        :return: ([X, target], y) when fitting. X only when predicting
        """
        # Dataset positions that belong to this batch (shuffled when enabled).
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # File names and labels must be selected with the SAME dataset
        # positions, otherwise audio and labels no longer match.
        list_IDs_temp = [self.filenames[k] for k in indexes]
        batch_labels = [self.audlabels[k] for k in indexes]

        X, y, target = self._generate_data(list_IDs_temp, batch_labels)

        if self.to_fit:
            return [X, target], y
        else:
            return X

    def on_epoch_end(self):
        """Rebuilds the sample index list and reshuffles it when shuffling is on.
        """
        self.indexes = np.arange(len(self.filenames))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _generate_data(self, list_IDs_temp, batch_labels=None):
        """
        Converts every audio file in the given list to a feature matrix using
        the AudioPreProc class (high-pass filter, pad/truncate, then MFCCs,
        fundamental frequency and energy stacked row-wise).

        Also builds a target array with one noisy probability row per sample,
        used as predictor input, and an array with the label of each sample.

        :param list_IDs_temp: audio file names of the batch
        :param batch_labels: labels aligned with list_IDs_temp; when omitted the
                             first len(list_IDs_temp) entries of audlabels are used
        :return: X (features), y (labels), target (probability rows)
        """
        n_samples = len(list_IDs_temp)
        if batch_labels is None:
            # Legacy call style: labels taken by position within the batch.
            batch_labels = self.audlabels[:n_samples]

        # Initialization
        X = np.empty((n_samples, *self.dim), dtype=float)
        y = np.empty((n_samples), dtype=float)
        target = np.empty((n_samples, *self.target_dim), dtype=float)

        # Generate data
        for i, ID in enumerate(list_IDs_temp):
            (aud, sr) = AudioPreProc.open(ID)
            tot = AudioPreProc.remove_background((aud, sr))
            tot = AudioPreProc.pad_trunc(tot, self.duration)

            mfcc, hop = AudioPreProc.get_mfccs(tot)
            f0 = AudioPreProc.get_fundamental_freq(tot, hop)
            f0 = f0.reshape((1, f0.shape[0]))  # row vector so it stacks under the MFCCs
            energy = AudioPreProc.get_energy(tot, hop)

            X[i,] = np.concatenate([mfcc, f0, energy])
            label = batch_labels[i]
            y[i] = label

            # Noisy probability row: the true class draws from [0.5, 1.0),
            # every other class from [0.1, 0.5).
            target[i,] = np.array([
                np.random.uniform(low=0.5, high=1.0) if label == x
                else np.random.uniform(low=0.1, high=0.5)
                for x in range(self.n_classes)
            ]).reshape((1, self.n_classes))
        return X, y, target

  

# Loading Data and Data Generator Objects

In [None]:
# Label table without a header row: audio path in column 0, accent in column 1.
df = pd.read_csv('labels3.csv', header=None)
df.columns = ['path', 'accent']

# Numeric class ids: "Jamaican" -> 0, every other accent -> 1.
df['accent_id'] = [int(accent != "Jamaican") for accent in df['accent']]
df = df.drop(columns=['accent'])

# The first 521 rows are treated as the Trinidadian samples and the rest as
# the Jamaican samples (this relies on the row order of the CSV).
trin = df.iloc[:521]
# Random under-sampling of the Jamaican rows so both accents contribute 521 clips.
jam = df.iloc[521:].sample(521)

new_df = pd.concat([trin, jam], axis=0)



## Splitting data into training, testing and validation sets

In [None]:

# 80 % of the clips go to training; the held-out 20 % is split further below.
X_train, X_valtest, y_train, y_valtest = train_test_split(
    new_df['path'],
    new_df['accent_id'],
    test_size=0.20,
    random_state=21,
)
# Halve the held-out part so validation and test sets are the same size.
X_val, X_test, y_val, y_test = train_test_split(
    X_valtest,
    y_valtest,
    test_size=0.50,
    random_state=22,
)


In [None]:

# Convert every split to a plain numpy array so that the positional indexing
# done inside the data generator works.
X_train, X_val, X_test, y_train, y_val, y_test = (
    np.array(part)
    for part in (X_train, X_val, X_test, y_train, y_val, y_test)
)

#print(X_test)


## Creating validation and training data generators


In [None]:

# Batch generators for training and validation (defaults: shuffled, returns X and y).
training_generator = DataGenerator(filenames=X_train, audlabels=y_train, audio_path=audpath)
validation_generator = DataGenerator(filenames=X_val, audlabels=y_val, audio_path=audpath)
#prediction_generator=DataGenerator(X_test,y_test,audpath,to_fit=False,shuffle=False,batch_size=1)


# Creating RNNT Model

## Encoder Class

In [None]:

class Encoder(keras.layers.Layer):
  """
  Subclass of keras.layers.Layer.
  Consists of an input layer, a masking layer, three stacked LSTM layers
  (e_lstm1, e_lstm2, e_lstm4) and a projection/dense layer.
  """
  def __init__(self,num_hidden_encoder,encoder_input_shape,num_encoder_dense,**kwargs):
    """Initialization
        :param num_hidden_encoder: the number of units in each LSTM layer
        :param encoder_input_shape: the shape of data to be used as input
        :param num_encoder_dense: the number of units in the dense layer.
        :param kwargs: standard keras Layer arguments (name, trainable, dtype),
                       accepted so the layer can be rebuilt from get_config()
    """
    super(Encoder,self).__init__(**kwargs)
    self.num_hidden_e=num_hidden_encoder
    self.input_shape_e=encoder_input_shape
    self.num_enc_dense=num_encoder_dense
    self.e_input=InputLayer(input_shape=self.input_shape_e,name="encoder_input",dtype=tf.float64)
    self.e_masking=layers.Masking(mask_value=0.0,input_shape=self.input_shape_e)
    self.l1= LSTM(self.num_hidden_e,return_sequences=True,dropout=0.1,name="e_lstm1")
    self.l2= LSTM(self.num_hidden_e,return_sequences=True,dropout=0.1,name="e_lstm2")
    #self.l3= LSTM(self.num_hidden_e,return_sequences=True,dropout=0.1,name="e_lstm3")
    self.l4= LSTM(self.num_hidden_e,return_sequences=True,dropout=0.1,name="e_lstm4")
    self.e_dense=Dense(self.num_enc_dense)


  def call(self, input, is_sequence=False):
    """
    Passes input from the input layer to masking, to the lstm layers
    and then the dense layer.

    :param input: batch of feature matrices
    :param is_sequence: True returns the dense output for every timestep,
                        False returns only the output of the last timestep
    :return: projected encoder output
    """
    i= self.e_input(input)
    mask_out=self.e_masking(i)
    output=self.l1(mask_out)
    output=self.l2(output)
    #output=self.l3(output)
    output=self.l4(output)
    fin=self.e_dense(output)
    if is_sequence:
      return fin
    else:
      # Keep only the final timestep of every sample.
      return fin[:,-1]


  def initialize_states(self, batch_size=32):
    """
    Creates two zero tensors of shape (batch_size, num_hidden_encoder).
    Previously used to initialise the lstm encoder states.
    """
    return (tf.zeros([batch_size, self.num_hidden_e],tf.float32),
                tf.zeros([batch_size, self.num_hidden_e], tf.float32))

  def get_config(self):
    """
    Returns the layer configuration, including every constructor argument,
    so that from_config() can recreate the layer.
    """
    config = super(Encoder, self).get_config()
    config.update({
        "num_hidden_encoder": self.num_hidden_e,
        "encoder_input_shape": self.input_shape_e,
        "num_encoder_dense": self.num_enc_dense,
    })
    return config

## Predictor Class

In [None]:

class Single_Step_Predictor(keras.layers.Layer):
  """
  Subclass of keras.layers.Layer
  The predictor class of the RNN-T model. Consists of an input layer, one
  lstm layer that also returns its states, and a dense projection layer.
  """

  def __init__(self, num_hidden_predictor,predictor_input_shape,num_predictor_dense,**kwargs):
    """Initialization
        :param num_hidden_predictor: the number of units in the LSTM layer
        :param predictor_input_shape: the shape of data to be used as input
        :param num_predictor_dense: the number of units in the dense layer.
        :param kwargs: standard keras Layer arguments (name, trainable, dtype),
                       accepted so the layer can be rebuilt from get_config()
    """
    super(Single_Step_Predictor,self).__init__(**kwargs)
    self.num_hidden_p=num_hidden_predictor
    self.num_pred_dense=num_predictor_dense
    self.input_shape_p=predictor_input_shape
    self.p_input=InputLayer(input_shape=self.input_shape_p,name="input_predictor")
    self.pl1= LSTM(self.num_hidden_p,return_state=True,dropout=0.1,name="p_lstm")
    #self.pl2= LSTM(self.num_hidden_p,return_state=True, dropout=0.2)
    self.p_dense=Dense(self.num_pred_dense)


  def call(self, input, state_h,state_c):
    """
    Call method of the predictor. Accepts a probability matrix, a hidden state
    and a cell state, and uses the two states as the initial state of the
    lstm layer.

    :return: (dense projection of the lstm output, new hidden state, new cell state)
    """
    i=self.p_input(input)
    output,hidd_state, cell_state=self.pl1(i,initial_state=[state_h,state_c])
    fin=self.p_dense(output)

    return fin, hidd_state, cell_state

  def initialize_states(self, batch_size=4):
    """
    Creates two zero tensors of shape (batch_size, num_hidden_predictor) to act
    as the initial hidden and cell state of the lstm layer.
    """
    return (tf.zeros([batch_size, self.num_hidden_p],tf.float32),
                tf.zeros([batch_size, self.num_hidden_p],tf.float32))

  def get_config(self):
    """
    Returns the layer configuration, including every constructor argument,
    so that from_config() can recreate the layer.
    """
    config = super(Single_Step_Predictor, self).get_config()
    config.update({
        "num_hidden_predictor": self.num_hidden_p,
        "predictor_input_shape": self.input_shape_p,
        "num_predictor_dense": self.num_pred_dense,
    })
    return config



## Joiner Class

In [None]:
#@tf.keras.utils.register_keras_serializable(package='Custom', name=None)
class Joiner(keras.layers.Layer):
  """
  Subclass of keras.layers.Layer
  The joiner class of the RNN-T model. Consists of an input layer, one Dense
  layer without activation and a Dense layer with softmax activation whose
  width is the number of classes.
  """
  def __init__(self, num_hidden_joiner, joiner_input_shape, num_predictions, **kwargs):
    """Initialization
        :param num_hidden_joiner: the number of units in the regular Dense layer
        :param joiner_input_shape: the shape of data to be used as input
        :param num_predictions: the number of units in the dense layer with softmax activation.
        :param kwargs: standard keras Layer arguments (name, trainable, dtype),
                       accepted so the layer can be rebuilt from get_config()
    """
    super(Joiner,self).__init__(**kwargs)
    self.num_hidden_j=num_hidden_joiner
    self.input_shape_j=joiner_input_shape
    self.num_predictions=num_predictions
    self.j_input=InputLayer(input_shape=self.input_shape_j,name="joiner_input")
    self.d1= Dense(self.num_hidden_j,activation=None,name="joiner_dense",input_shape=self.input_shape_j)
    self.d2=Dense(self.num_predictions,activation='softmax')


  def call(self, input):
    """
    Simple layer call. Passes input through the input layer, then the first
    dense layer, then the softmax layer.

    :return: the output of the softmax layer
    """
    i=self.j_input(input)
    intermediate=self.d1(i)
    fin= self.d2(intermediate)

    return fin

  def get_config(self):
    """
    Returns the layer configuration, including every constructor argument,
    so that from_config() can recreate the layer.
    """
    config = super(Joiner, self).get_config()
    config.update({
        "num_hidden_joiner": self.num_hidden_j,
        "joiner_input_shape": self.input_shape_j,
        "num_predictions": self.num_predictions,
    })
    return config


## RNNT Class

In [None]:

class RNNT3(keras.Model):
  """
  A model which consists of an encoder, a predictor and a joiner.
  For each frame the encoder processes the input while the predictor
  processes the previous prediction.  The output of the encoder and the output
  of the predictor are then concatenated and used as input to the joiner layer,
  which produces a probability matrix.  This probability matrix is
  then used as input for the predictor on the next time step.
  """
  def __init__(self, num_hidden_encoder, num_hidden_predictor, num_hidden_joiner, num_predictions, joiner_input_shape,predictor_input_shape,encoder_input_shape,num_encoder_dense,num_predictor_dense):
    """Initialization
        :param num_hidden_encoder: the number of units in each of the encoder's LSTM layers.
        :param num_hidden_predictor: the number of units in the predictor's LSTM layer.
        :param num_hidden_joiner: the number of units in the joiner's regular Dense layer.
        :param num_predictions: the number of units in the joiner's dense layer with softmax activation.
        :param joiner_input_shape: the shape of data to be used as input to the joiner.
        :param predictor_input_shape: the shape of data to be used as input to the predictor.
        :param encoder_input_shape: the shape of data to be used as input to the encoder.
        :param num_encoder_dense: the number of units in the encoder's dense layer.
        :param num_predictor_dense: the number of units in the predictor's dense layer.
    """

    super(RNNT3,self).__init__()
    self.num_hidden_encoder=num_hidden_encoder
    self.num_hidden_predictor=num_hidden_predictor
    self.num_hidden_joiner=num_hidden_joiner
    self.num_predictions=num_predictions
    self.num_enc_dense=num_encoder_dense
    self.num_pred_dense=num_predictor_dense
    self.joiner_input_shape=joiner_input_shape
    self.predictor_input_shape=predictor_input_shape
    self.encoder_input_shape=encoder_input_shape
    self.encoder=Encoder(self.num_hidden_encoder,self.encoder_input_shape,num_encoder_dense)
    self.predictor=Single_Step_Predictor(self.num_hidden_predictor,self.predictor_input_shape,num_predictor_dense)
    self.joiner=Joiner(self.num_hidden_joiner, self.joiner_input_shape, self.num_predictions)

  def call(self, inputs):
    """
    Used for training only.
    The states of the predictor are initialised.  The input sequence
    is broken into the features extracted from the audio file (inputs[0])
    and a target array containing probability matrixes corresponding to an
    accent (inputs[1]).  The encoder is given the features while the predictor
    is given the target array.  The outputs of the encoder (last timestep) and
    the predictor are concatenated and used as input to the joiner.

    Previously the call method was implemented in a way similar to make_prediction,
    but validation accuracy still stagnated at 56.25%
    """
    features, target = inputs[0], inputs[1]
    # Size the predictor states from the actual batch instead of assuming 32.
    batch = tf.shape(features)[0]
    hid, cell = self.predictor.initialize_states(batch)
    encoder_out = self.encoder(features, is_sequence=False)
    pred_out, hid, cell = self.predictor(target, hid, cell)
    joiner_input = tf.concat([encoder_out, pred_out], axis=1)
    return self.joiner(joiner_input)


  def one_step_decode(self,encoder_out,timestep):
    """
    Decodes a single sample.  For every timestep in the output of the encoder,
    the predictor is called, the output of the predictor and the output of the
    encoder for that timestep is given to the joiner and a prediction is made,
    which is then used by the predictor in the calculation for the next frame.

    :param encoder_out: encoder output of one sample, indexed by timestep
    :param timestep: not used; the loop length comes from encoder_input_shape[0]
    :return: the last prediction made by the joiner (i.e. the prediction made
             using the last timestep of the encoder output)
    """
    prev_predict=[]
    state_h, state_c = self.predictor.initialize_states(1)
    # Start from a uniform guess of 0.5 for every class.
    pred_out = np.full(self.predictor_input_shape, 0.5, dtype=np.float32)
    for step in range(self.encoder_input_shape[0]):
        pred_out, state_h, state_c = self.predictor(tf.reshape(pred_out,[1,1,self.num_predictions]), state_h,state_c,training=False)
        joiner_input=tf.concat([tf.reshape(encoder_out[step],[1,self.num_enc_dense]),pred_out],axis=1)
        pred_out=self.joiner(joiner_input,training=False)
        prev_predict=pred_out
    return tf.convert_to_tensor(prev_predict)

  @tf.function
  def make_prediction(self,input,batch_size):
    """
    Used in validation. Propagates audio through the encoder,
    predictor and joiner for every timestep, one batch at a time.

    For every timestep in the output of the encoder, the predictor is called,
    the output of the predictor and the output of the encoder for that
    timestep is given to the joiner and a prediction is made, which is then
    used by the predictor in the calculation for the next frame.

    :param input: batch of feature matrices
    :param batch_size: number of samples in the batch (python int or scalar tensor)
    :return: (class index per sample, probability matrix per sample) taken
             from the final timestep
    """
    encoder_outputs = self.encoder(input, is_sequence=True, training=False)
    ins = tf.convert_to_tensor(encoder_outputs)

    state_h, state_c = self.predictor.initialize_states(batch_size)
    # Start from a uniform guess of 0.5 for every class.
    pred_out = tf.fill([batch_size, 1, self.num_predictions], 0.5)
    for step in range(self.encoder_input_shape[0]):
      pred_out, state_h, state_c = self.predictor(tf.reshape(pred_out,[batch_size,1,self.num_predictions]), state_h,state_c,training=False)
      joiner_input=tf.concat([tf.reshape(ins[:,step],[batch_size,self.num_enc_dense]),pred_out],axis=1)
      pred_out=self.joiner(joiner_input,training=False)

    # Pick the most probable class for each sample (class axis, not batch axis).
    np_output = tf.math.argmax(pred_out, axis=-1)
    argmax_output = pred_out

    return np_output, argmax_output

  def predict_val(self,input,batch_size):
    """ Calls one_step_decode for every audio in a batch.
    Used to make predictions, outside of training and validation.
    Replaces model.predict()

    :return: (list of predicted class indexes, list of probability matrixes)
    """
    np_output=[]
    argmax_outputs=[]
    encoder_outputs= self.encoder(input, is_sequence=True,training=False)
    ins=tf.convert_to_tensor(encoder_outputs)
    timesteps = ins.shape[1]
    print(ins.shape)
    for i in range(batch_size): # for every input in the batch
        decoded_seq = self.one_step_decode(ins[i], timesteps)
        np_output.append(np.argmax(decoded_seq))

        argmax_outputs.append(decoded_seq)
    return np_output,argmax_outputs

  @tf.function
  def test_step(self, data):
    """
    Validation step. Calculates losses based on the
    results of make_prediction.
    """
    # Unpack the data: X is [features, target]; only the features are used here.
    X, y = data
    features = X[0]
    # Read the batch size from the data so any validation batch size works.
    batch = tf.shape(features)[0]
    # Compute predictions
    _, raw = self.make_prediction(features, batch)
    raw = tf.reshape(raw, [-1, self.num_predictions])
    # Updates the metrics tracking the loss
    self.compiled_loss(tf.convert_to_tensor(y),tf.convert_to_tensor(raw) ,regularization_losses=self.losses)
    # Update the metrics.
    self.compiled_metrics.update_state(tf.convert_to_tensor(y),tf.convert_to_tensor(raw))
    # Return a dict mapping metric names to current value.
    # Note that it will include the loss (tracked in self.metrics).
    return {m.name: m.result() for m in self.metrics}


  # Disabled get_config implementation, kept as an inert string by the author.
  """def get_config(self):
    #config = super(RNNT3, self).get_config()
    config={}
    config.update({"num_hidden_encoder": self.num_hidden_encoder})
    config.update({"num_hidden_predictor": self.num_hidden_predictor})
    config.update({"num_hidden_joiner": self.num_hidden_joiner})
    config.update({"joiner_input_shape": self.joiner_input_shape})
    config.update({"predictor_input_shape": self.predictor_input_shape})
    config.update({"encoder_input_shape": self.encoder_input_shape})
    config.update({"num_predictions":self.num_predictions})
    return config"""


# Compiling and training Model

In [None]:

# Build the RNN-T model from the configuration variables; keyword arguments
# make the long parameter list order-proof.
rnn3 = RNNT3(
    num_hidden_encoder=num_hidden_encoder,
    num_hidden_predictor=num_hidden_predictor,
    num_hidden_joiner=num_hidden_joiner,
    num_predictions=num_predictions,
    joiner_input_shape=joiner_input_shape,
    predictor_input_shape=predictor_input_shape,
    encoder_input_shape=encoder_input_shape,
    num_encoder_dense=num_encoder_dense,
    num_predictor_dense=num_predictor_dense,
)

# Stop once validation loss has not improved for 5 epochs and roll back to
# the best weights seen.
earlystopping = callbacks.EarlyStopping(
    monitor="val_loss",
    mode="min",
    patience=5,
    restore_best_weights=True,
)

# Epoch number and validation accuracy are formatted into the file name.
checkpoint_path = "checkpoints/cp_teststep_with512_encoder-epoch{epoch:04d}_acc{val_accuracy:.3f}.ckpt"

# Write weights-only checkpoints whenever validation loss reaches a new minimum.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)



In [None]:
# Adam optimiser with a learning rate of 0.001.
opt = keras.optimizers.Adam(learning_rate=0.001)

# Labels are integer class ids, hence the sparse cross-entropy loss.
rnn3.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=opt,
    metrics=['accuracy'],
)


## Training rnnt3

In [None]:
# Train for at most 10 epochs, validating after each one; early stopping and
# checkpointing are handled by the callbacks.
hist = rnn3.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=10,
    callbacks=[earlystopping, cp_callback],
    verbose=1,
)


# Testing Model




In [None]:
def get_prediction_data(X_test):
  """
  Builds the feature matrices for the given audio files using the same
  pre-processing as the training data generator (high-pass filter,
  pad/truncate to 12 s, then MFCCs, fundamental frequency and energy).

  :param X_test: sequence of audio file paths
  :return: tuple (number of files, feature array of shape
           (number of files, *encoder_input_shape))
  """
  n_samples = len(X_test)
  X = np.empty((n_samples, *encoder_input_shape), dtype=float)
  print('Batch size: ', n_samples)
  for i, path in enumerate(X_test):
    tot = AudioPreProc.open(path)
    tot = AudioPreProc.rechannel(tot, 1)
    # Training features are extracted from the high-pass filtered signal,
    # so prediction features must be filtered the same way.
    tot = AudioPreProc.remove_background(tot)
    tot = AudioPreProc.pad_trunc(tot, 12000)

    mfcc, hop = AudioPreProc.get_mfccs(tot)
    f0 = AudioPreProc.get_fundamental_freq(tot, hop)
    f0 = f0.reshape((1, f0.shape[0]))  # row vector so it stacks under the MFCCs
    energy = AudioPreProc.get_energy(tot, hop)

    X[i,] = np.concatenate([mfcc, f0, energy])

  return n_samples, X


# Prepare the first 10 test clips for prediction.
batchsize,Xt=get_prediction_data(X_test[:10])

Batch size:  10


In [None]:

#rnn3.load_weights('checkpoints/checkpoints/cpc-epoch0005--loss0.6417.ckpt') #cpc has 256 enc, 4 pcells, joiner input of 260, overall prediction of 49% good
#mix of jam and trinidadian 256 denser encoder, 4 dense predictor

#rnn3.load_weights('checkpoints/cp_teststep_with64_predictor-epoch0001_acc0.562.ckpt')
# Decode every prepared clip; the count is read from Xt so it cannot drift
# from the number of clips that were actually prepared.
pred,non_argmax=rnn3.predict_val(Xt,len(Xt))

(10, 15, 128)


### Accuracy Rate Calculation

In [None]:

pred = np.array(pred)
# Only the labels of the clips that were actually predicted are relevant.
labels = np.asarray(y_test)[:len(pred)]

# Percentage of predictions that match their label (NaN when nothing was predicted).
count = int(np.sum(pred == labels))
accuracy = 100.0 * count / len(pred) if len(pred) else float('nan')
print(accuracy)

print(pred)
print(labels)