In [1]:
import tensorflow as tf
import numpy as np
import plotly.express as px
import os
from sklearn.model_selection import train_test_split

2023-08-22 18:39:03.617023: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
print(tf.config.list_physical_devices('GPU')[0])

PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


2023-08-22 18:39:05.774496: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-08-22 18:39:05.802103: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-08-22 18:39:05.802306: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysf

In [3]:
path = r'path'
input_file1 = 'dev.npy'
input_file2 = 'dev_labels.npy'
utterances = np.load(
    file=os.path.join(path, input_file1),
    allow_pickle=True,
    encoding='bytes'
    )
phoneme_states = np.load(
    file=os.path.join(path, input_file2),
    allow_pickle=True,
    encoding='bytes'
    )

In [5]:
rs=121
train_utterances, val_utterances, train_phoneme_states, val_phoneme_states = train_test_split(
    utterances, phoneme_states,
    test_size=0.15,
    random_state=rs
    )
print('Training array:', train_utterances.shape, '\nValidation array:', val_utterances.shape)

Training array: (937,) 
Validation array: (166,)


In [6]:
# np.save('train.npy', train_utterances)
# np.save('train_labels.npy', train_phoneme_states)
# np.save('val.npy', val_utterances)
# np.save('val_labels.npy', val_phoneme_states)

In [5]:
unique_phonemes = []
for i in train_phoneme_states:
    for j in i:
        if j not in unique_phonemes:
            unique_phonemes.append(j)
print(len(unique_phonemes))

138


In [6]:
num_frames = []
for i in utterances:
    frames = i.shape[0]
    num_frames.append(frames)
print(max(num_frames))

1738


In [7]:
fig = px.histogram(
    x=num_frames,
    nbins=40
    )
fig.update_layout(
    showlegend=False,
    width=1000,
    height=600,
    template="plotly_dark",
    )
fig.update_xaxes(title_text='Frames')
fig.update_yaxes(title_text='Count')
fig.show()

In [5]:
class SpeechDataGeneratorRNN(tf.keras.utils.Sequence):
    def __init__(
        self,
        filenames,
        path = r'path',
        max_frames=None,
        pad_frames=1738,
        batch_size=2,
        shuffle=True
        ):
        self.dataX, self.dataY = self._load_np_arrays(filenames, path)
        self.batch_size = batch_size
        self.shuffle = shuffle        
        self.num_frequencies = self.dataX[0].shape[1]
        self.idxMap = []
        for utterance_idx in range(self.dataX.shape[0]):
            self.idxMap.append(utterance_idx)
        self.pad_frames = self._get_max_frames()
        if max_frames == None:
            self.max_frames = self.pad_frames
        else:
            self.max_frames = max_frames
        self.num_classes = self._get_num_classes()
            
    def _load_np_arrays(self, filenames, path):
        f1, f2 = filenames
        utterances = np.load(
            file=os.path.join(path, f1),
            allow_pickle=True,
            encoding='bytes'
            )
        phoneme_states = np.load(
            file=os.path.join(path, f2),
            allow_pickle=True,
            encoding='bytes'
            )
        return utterances, phoneme_states
    
    def _get_max_frames(self):
        num_frames = []
        for utterance in self.dataX:
            frames = utterance.shape[0]
            num_frames.append(frames)
        return max(num_frames)
    
    def _get_num_classes(self):
        unique_phonemes = []
        for i in self.dataY:
            for j in i:
                if j not in unique_phonemes:
                    unique_phonemes.append(j)
        return len(unique_phonemes)
               
    def __getitem__(
        self,
        batch_idx
        ):
        batch_idxMap = self.idxMap[batch_idx*self.batch_size : (batch_idx+1)*self.batch_size]
        batchedX = np.empty((self.batch_size, self.max_frames, self.num_frequencies))
        batchedY = np.empty((self.batch_size, self.max_frames))
        for batch_idx, utterance_idx in enumerate(batch_idxMap):
            utterance = self.dataX[utterance_idx]
            utterance = (utterance - (np.mean(utterance) + 1e-8)) / np.std(utterance)
            phoneme_vector = self.dataY[utterance_idx].reshape(-1,1)
            pad_size = self.pad_frames - utterance.shape[0]
            if pad_size % 2 == 0:
                zero_padding = tf.constant([[pad_size//2, pad_size//2,], [0, 0]])
            else:
                zero_padding = tf.constant([[pad_size//2, pad_size//2 + 1,], [0, 0]])
            utterance = tf.pad(
                tensor = utterance,
                paddings = zero_padding,
                mode = "CONSTANT"
                ).numpy()[:self.max_frames, :]
            phoneme_vector = tf.pad(
                tensor = phoneme_vector,
                paddings = zero_padding,
                mode = "CONSTANT",
                constant_values = self.num_classes
                ).numpy()[:self.max_frames, :].reshape(-1,)
            batchedX[batch_idx,] = utterance
            batchedY[batch_idx,] = phoneme_vector
        return batchedX, batchedY
    
    def __len__(self):
        return len(self.idxMap) // self.batch_size
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.idxMap)

In [6]:
batch_size = 8
train_data_generator = SpeechDataGeneratorRNN(
    filenames=('train.npy', 'train_labels.npy'),
    batch_size=batch_size,
    shuffle=True
    )
val_data_generator = SpeechDataGeneratorRNN(
    filenames=('val.npy', 'val_labels.npy'),
    batch_size=batch_size,
    shuffle=True
    )    

RNN

In [38]:
rnn = tf.keras.Sequential()
rnn.add(tf.keras.Input(shape=(None, train_data_generator.num_frequencies)))
rnn.add(
    tf.keras.layers.SimpleRNN(64, return_sequences=True, activation='tanh')
    )
rnn.add(
    tf.keras.layers.Dense(len(unique_phonemes)+1)
    )
rnn._name = "RNN"
rnn.summary()

Model: "RNN"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 simple_rnn_2 (SimpleRNN)    (None, None, 64)          6720      
                                                                 
 dense_6 (Dense)             (None, None, 139)         9035      
                                                                 
Total params: 15755 (61.54 KB)
Trainable params: 15755 (61.54 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [39]:
rnn.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
    )
rnn.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=1
    )



<keras.src.callbacks.History at 0x7f8898da2ac0>

BIDIRECTIONAL RNN

In [26]:
rnn_bidirectional = tf.keras.Sequential()
rnn_bidirectional.add(tf.keras.Input(shape=(None, train_data_generator.num_frequencies)))
rnn_bidirectional.add(
    tf.keras.layers.Bidirectional(
        tf.keras.layers.SimpleRNN(64, return_sequences=True, activation='tanh')
        )
    )
rnn_bidirectional.add(
    tf.keras.layers.Dense(len(unique_phonemes)+1)
    )
rnn_bidirectional._name = "Bidirectional_RNN"
rnn_bidirectional.summary()

Model: "Bidirectional_RNN"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bidirectional (Bidirection  (None, None, 128)         13440     
 al)                                                             
                                                                 
 dense_1 (Dense)             (None, None, 139)         17931     
                                                                 
Total params: 31371 (122.54 KB)
Trainable params: 31371 (122.54 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [27]:
rnn_bidirectional.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
    )
rnn_bidirectional.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=1
    )



<keras.src.callbacks.History at 0x7f88d0a5dd30>

GRU

In [28]:
gru = tf.keras.Sequential()
gru.add(tf.keras.Input(shape=(None, train_data_generator.num_frequencies)))
gru.add(
    tf.keras.layers.GRU(64, return_sequences=True, activation='tanh')
    )
gru.add(
    tf.keras.layers.Dense(len(unique_phonemes)+1)
    )
gru._name = "GRU"
gru.summary()

Model: "GRU"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 gru (GRU)                   (None, None, 64)          20352     
                                                                 
 dense_2 (Dense)             (None, None, 139)         9035      
                                                                 
Total params: 29387 (114.79 KB)
Trainable params: 29387 (114.79 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [29]:
gru.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
    )
gru.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=1
    )

2023-08-21 16:11:40.548919: I tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:432] Loaded cuDNN version 8600




<keras.src.callbacks.History at 0x7f88d0228070>

BIDIRECTIONAL GRU

In [30]:
gru_bidirectional = tf.keras.Sequential()
gru_bidirectional.add(tf.keras.Input(shape=(None, train_data_generator.num_frequencies)))
gru_bidirectional.add(
    tf.keras.layers.Bidirectional(
        tf.keras.layers.GRU(64, return_sequences=True, activation='tanh')
        )
    )
gru_bidirectional.add(
    tf.keras.layers.Dense(len(unique_phonemes)+1)
    )
gru_bidirectional._name = "Bidirectional_GRU"
gru_bidirectional.summary()

Model: "Bidirectional_GRU"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bidirectional_1 (Bidirecti  (None, None, 128)         40704     
 onal)                                                           
                                                                 
 dense_3 (Dense)             (None, None, 139)         17931     
                                                                 
Total params: 58635 (229.04 KB)
Trainable params: 58635 (229.04 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [31]:
gru_bidirectional.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
    )
gru_bidirectional.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=1
    )



<keras.src.callbacks.History at 0x7f88c18867c0>

LSTM

In [32]:
lstm = tf.keras.Sequential()
lstm.add(tf.keras.Input(shape=(None, train_data_generator.num_frequencies)))
lstm.add(
    tf.keras.layers.LSTM(64, return_sequences=True, activation='tanh')
    )
lstm.add(
    tf.keras.layers.Dense(len(unique_phonemes)+1)
    )
lstm._name = 'LSTM'
lstm.summary()

Model: "LSTM"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, None, 64)          26880     
                                                                 
 dense_4 (Dense)             (None, None, 139)         9035      
                                                                 
Total params: 35915 (140.29 KB)
Trainable params: 35915 (140.29 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [34]:
lstm.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
    )
lstm.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=1
    )



<keras.src.callbacks.History at 0x7f88a41c5fa0>

BIDIRECTIONAL LSTM

In [35]:
lstm_bidirectional = tf.keras.Sequential()
lstm_bidirectional.add(tf.keras.Input(shape=(None, train_data_generator.num_frequencies)))
lstm_bidirectional.add(
    tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(64, return_sequences=True, activation='tanh')
        )
    )
lstm_bidirectional.add(
    tf.keras.layers.Dense(len(unique_phonemes)+1)
    )
lstm_bidirectional._name = 'Bidirectional_LSTM'
lstm_bidirectional.summary()

Model: "Bidirectional_LSTM"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bidirectional_2 (Bidirecti  (None, None, 128)         53760     
 onal)                                                           
                                                                 
 dense_5 (Dense)             (None, None, 139)         17931     
                                                                 
Total params: 71691 (280.04 KB)
Trainable params: 71691 (280.04 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [37]:
lstm_bidirectional.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
    )
lstm_bidirectional.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=1
    )



<keras.src.callbacks.History at 0x7f889dbc0820>

BIDIRECTIONAL GRU & BIDIRECTIONAL LSTM both performed better than the rest, plus they are time saving too in comparison with RNN

In [14]:
class SpeechDataGeneratorRNN(tf.keras.utils.Sequence):
    def __init__(
        self,
        filenames,
        path = r'path',
        max_frames=None,
        batch_size=2,
        shuffle=True
        ):
        self.dataX, self.dataY = self._load_np_arrays(filenames, path)
        self.batch_size = batch_size
        self.shuffle = shuffle        
        self.num_frequencies = self.dataX[0].shape[1]
        self.idxMap = []
        for utterance_idx in range(self.dataX.shape[0]):
            self.idxMap.append(utterance_idx)
        self.pad_frames = self._get_max_frames()
        if max_frames == None:
            self.max_frames = self.pad_frames
        else:
            self.max_frames = max_frames
        self.num_classes = self._get_num_classes()
            
    def _load_np_arrays(self, filenames, path):
        f1, f2 = filenames
        utterances = np.load(
            file=os.path.join(path, f1),
            allow_pickle=True,
            encoding='bytes'
            )
        phoneme_states = np.load(
            file=os.path.join(path, f2),
            allow_pickle=True,
            encoding='bytes'
            )
        return utterances, phoneme_states
    
    def _get_max_frames(self):
        num_frames = []
        for utterance in self.dataX:
            frames = utterance.shape[0]
            num_frames.append(frames)
        return max(num_frames)
    
    def _get_num_classes(self):
        unique_phonemes = []
        for i in self.dataY:
            for j in i:
                if j not in unique_phonemes:
                    unique_phonemes.append(j)
        return len(unique_phonemes)
               
    def __getitem__(
        self,
        batch_idx
        ):
        batch_idxMap = self.idxMap[batch_idx*self.batch_size : (batch_idx+1)*self.batch_size]
        batchedX = np.empty((self.batch_size, self.max_frames, self.num_frequencies))
        batchedY = np.empty((self.batch_size, self.max_frames))
        for batch_idx, utterance_idx in enumerate(batch_idxMap):
            utterance = self.dataX[utterance_idx]
            utterance = (utterance - (np.mean(utterance, axis=0) + 1e-8))
            phoneme_vector = self.dataY[utterance_idx].reshape(-1,1)
            pad_size = self.pad_frames - utterance.shape[0]
            zero_padding = tf.constant([[0, pad_size,], [0, 0]])
            utterance = tf.pad(
                tensor = utterance,
                paddings = zero_padding,
                mode = "CONSTANT"
                ).numpy()[:self.max_frames, :]
            phoneme_vector = tf.pad(
                tensor = phoneme_vector,
                paddings = zero_padding,
                mode = "CONSTANT",
                constant_values = self.num_classes
                ).numpy()[:self.max_frames, :].reshape(-1,)
            batchedX[batch_idx,] = utterance
            batchedY[batch_idx,] = phoneme_vector
        return batchedX, batchedY
    
    def __len__(self):
        return len(self.idxMap) // self.batch_size
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.idxMap)

In [15]:
class Conv1DBlock(tf.keras.layers.Layer):
    def __init__(
        self,
        out_channels,
        kernel_size=3,
        include_bn=True
        ):
        super().__init__()
        self.conv = tf.keras.layers.Conv1D(out_channels, kernel_size, padding='same')
        self.act = tf.nn.tanh
        self.include_bn = include_bn
        self.dropout = tf.nn.dropout
        if self.include_bn == True:
            self.bn = tf.keras.layers.BatchNormalization()
        
    def call(
        self,
        input_tensor,
        training=False
        ):
        rate = 0.3
        x = self.conv(input_tensor, training=training)
        x = self.act(x)
        # x = self.act(x, beta=beta)
        x = self.dropout(x, rate=rate)
        if self.include_bn == True:
            return self.bn(x, training=training)
        else:
            return x 
    
class ResidualConv1DBlock(tf.keras.layers.Layer):
    def __init__(
        self,
        block_channels
        ):
        super().__init__()
        self.cnn_blocks = [Conv1DBlock(out_channels) for out_channels in block_channels] 
        self.pooling = tf.keras.layers.MaxPooling1D()
        self.skip_connection = tf.keras.layers.Conv1D(
            block_channels[-1],
            1,
            padding='same'
            )
        self.act = tf.nn.tanh
        self.bn = tf.keras.layers.BatchNormalization()
        
    def call(
        self,
        input_tensor,
        training=False,
        ):
        leak = 0.02
        x = tf.keras.Sequential(self.cnn_blocks)(input_tensor, training=training)
        x = self.act(x + self.skip_connection(input_tensor))
        return self.bn(x)

class BiLSTMBlock(tf.keras.layers.Layer):
    def __init__(
        self,
        units,
        include_bn=True,
        include_bi=True
        ):
        super().__init__()
        self.include_bn = include_bn
        if include_bi:
            self.lstm = tf.keras.layers.Bidirectional(
                tf.compat.v1.keras.layers.CuDNNGRU(
                    units,
                    kernel_regularizer=tf.keras.regularizers.L2(1e-3),
                    return_sequences=True
                    ),
                merge_mode='concat'  
                )
        else:
            self.lstm = tf.compat.v1.keras.layers.CuDNNGRU(
                units,
                kernel_regularizer=tf.keras.regularizers.L2(1e-3),
                return_sequences=True
                )              
        self.act = tf.nn.tanh
        self.dropout = tf.nn.dropout
        if self.include_bn:
            self.bn = tf.keras.layers.BatchNormalization()
        
    def call(
        self,
        input_tensor,
        training=False
        ):
        rate = 0.4
        x = self.lstm(input_tensor, training=training)
        x = self.act(x)
        x = self.dropout(x, rate=rate)
        if self.include_bn:
            return self.bn(x, training=training)
        else:
            return x 
        # return x
        
class ResidualBiLSTMBlock(tf.keras.layers.Layer):
    def __init__(
        self,
        block_units
        ):
        super().__init__()
        self.lstm_blocks = [BiLSTMBlock(units) for units in block_units]
        self.act = tf.nn.tanh
        
    def call(
        self,
        input_tensor,
        training=False,
        ):
        x = tf.keras.Sequential(self.lstm_blocks)(input_tensor, training=training)
        x = self.act(x + input_tensor)
        return x

class Conv1DLSTMModel(tf.keras.Model):
    def __init__(
        self,
        blocks_channels,
        blocks_units,
        train_data_generator,
        ):
        super().__init__()
        self.train_data_generator = train_data_generator
        self.resconv_blocks = [ResidualConv1DBlock(block_channels) for block_channels in blocks_channels]
        self.reslstm_layers = [ResidualBiLSTMBlock(block_units) for block_units in blocks_units] 
        self.classifier = tf.keras.layers.Dense(train_data_generator.num_classes + 1, name='Output_Layer')
        
    def call(
        self,
        input_tensor,
        training=False
        ):
        x = tf.keras.Sequential(self.resconv_blocks)(input_tensor, training=training)
        x = tf.keras.Sequential(self.reslstm_layers)(x, training=training)
        return self.classifier(x)
    
    def model(
        self
        ):
        x = tf.keras.layers.Input(
            shape=(self.train_data_generator.max_frames, self.train_data_generator.num_frequencies),
            batch_size=self.train_data_generator.batch_size,
            name="Input_Layer"
            )
        return tf.keras.Model(
            inputs=[x],
            outputs=self.call(x)    
            )

In [16]:
batch_size = 4
max_frames = 1000
train_data_generator = SpeechDataGeneratorRNN(
    filenames=('train.npy', 'train_labels.npy'),
    batch_size=batch_size,
    max_frames=max_frames,
    shuffle=True
    )
val_data_generator = SpeechDataGeneratorRNN(
    filenames=('val.npy', 'val_labels.npy'),
    batch_size=batch_size,
    max_frames=max_frames,
    shuffle=True
    )    

In [17]:
conv_lstm = Conv1DLSTMModel(
    blocks_channels=[
        [128, 256],
        [128, 256],
        ],
    blocks_units=[
        [128, 128],
        ],
    train_data_generator=train_data_generator,
    ).model()
base_input = conv_lstm.layers[0].input
base_output = conv_lstm.layers[-1].output
conv_lstm = tf.keras.Model(base_input, base_output)
conv_lstm.summary()

Model: "model_7"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 Input_Layer (InputLayer)    [(4, 1000, 40)]           0         
                                                                 
 sequential_6 (Sequential)   (4, 1000, 256)            43520     
                                                                 
 sequential_7 (Sequential)   (4, 1000, 256)            297472    
                                                                 
 Output_Layer (Dense)        (4, 1000, 139)            35723     
                                                                 
Total params: 376715 (1.44 MB)
Trainable params: 375179 (1.43 MB)
Non-trainable params: 1536 (6.00 KB)
_________________________________________________________________


In [18]:
epochs = 5
initial_learning_rate = 1e-3
final_learning_rate = 9e-4
learning_rate_decay_factor = (final_learning_rate / initial_learning_rate)**(1/epochs)
steps_per_epoch = train_data_generator.__len__()
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=initial_learning_rate,
    decay_steps=steps_per_epoch,
    decay_rate=learning_rate_decay_factor,
    staircase=True
    )
conv_lstm.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=lr_schedule,
                                              clipvalue=0.25,
                                              clipnorm=1
                                              ),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
    )

In [None]:
conv_lstm_history = conv_lstm.fit(
    x=train_data_generator,
    validation_data=val_data_generator,
    epochs=epochs,
    )