# ResNet Model specification and training

In [1]:
import pandas as pd 
import numpy as np 
import time
import matplotlib.pyplot as plt

# Tensorflow neural network imports
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, BatchNormalization, Add, AveragePooling2D, ReLU, GlobalAveragePooling2D
from tensorflow.keras import Input, Model
from tensorflow.keras import initializers

# Util imports
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

## Loading and pre-processing of the data

In [3]:
# Load the pre-computed MFCC feature array and the matching labels. Both files are read
# with NumPy's default allow_pickle=False, so they have to contain regular (non-object) arrays.
data = np.load('../preprocessed_data/resnet_mfcc_features_test.npy')
labels = np.load('../preprocessed_data/resnet_labels_test.npy')

# Create a dummy (one-hot) encoding of the labels: one column per distinct label value.
# The dtype is pinned explicitly because pandas >= 2.0 returns boolean dummy columns by
# default, while the categorical cross-entropy loss used for training works on numeric targets.
encoded_labels = pd.get_dummies(labels, dtype=np.float32).values

In [4]:
# Stage 1: keep 80% of the samples for training and hold out the remaining 20%,
# stratified on the encoded labels
data_train, data_holdout, labels_train, labels_holdout = train_test_split(
    data,
    encoded_labels,
    train_size=0.8,
    random_state=42,
    stratify=encoded_labels,
)

# Stage 2: cut the hold-out portion in half -> validation and test data (0.1 - 0.1 of all samples)
data_val, data_test, labels_val, labels_test = train_test_split(
    data_holdout,
    labels_holdout,
    train_size=0.5,
    random_state=42,
    stratify=labels_holdout,
)

## Model definition

In [5]:
class ResNetModel():
    """Builder for a residual convolutional classifier with optional dilation.

    The network produced by ``build_model`` consists of: one bias-free 3x3
    convolution, ``res_depth`` residual blocks (two convolution + batch-norm
    pairs plus a skip connection each), one further convolution + batch-norm
    pair, global average pooling and a softmax output layer.

    Parameters
    ----------
    input_shape : sequence of int
        Shape of a single sample (without the batch axis); passed to ``Input``.
    n_classes : int, default 35
        Number of units of the softmax output layer.
    res_depth : int, default 6
        Number of residual blocks.
    conv_filters : int, default 45
        Number of filters used by every convolutional layer.
    use_dil_scheme : bool, default True
        If True, the dilation rate of the convolutions following the input
        convolution doubles every three layers (1, 1, 1, 2, 2, 2, 4, ...).
        If False, all of them use a dilation rate of 1.
    """

    def __init__(self, input_shape, n_classes=35, res_depth=6, conv_filters=45, use_dil_scheme=True):
        self.input_shape = input_shape
        self.n_classes = n_classes
        self.res_depth = res_depth
        self.conv_filters = conv_filters
        self.use_dil_scheme = use_dil_scheme

    def compute_dilation(self, index, use_scheme):
        """Return the dilation rate for convolution number ``index`` and the next index.

        Parameters
        ----------
        index : int
            Zero-based running counter of the dilated convolutions built so far.
        use_scheme : bool
            If True the dilation rate is ``2 ** (index // 3)``, otherwise it is 1.

        Returns
        -------
        tuple of (int, int)
            ``(dilation, index + 1)``. The counter is advanced in both modes so
            that it always reflects the number of convolutions created.
        """
        dilation = int(2 ** (index // 3)) if use_scheme else 1
        return dilation, index + 1

    def _conv_bn(self, tensor, dilation):
        """Apply a 3x3 ReLU convolution with the given dilation rate, then batch normalization."""
        tensor = Conv2D(filters=self.conv_filters, kernel_size=3, activation='relu',
                        padding='same', dilation_rate=dilation)(tensor)
        return BatchNormalization()(tensor)

    def residual_block(self, input_layer, index, use_scheme):
        """Append one residual block to ``input_layer``.

        The block stacks two convolution + batch-norm pairs and adds the result
        to ``input_layer``. Because of that element-wise addition, ``input_layer``
        must have the same shape as the block output (``conv_filters`` channels).

        Parameters
        ----------
        input_layer : tensor
            Output of the previous layer.
        index : int
            Running convolution counter, see ``compute_dilation``.
        use_scheme : bool
            Whether to apply the dilation scheme.

        Returns
        -------
        tuple
            ``(output_tensor, index)`` with the counter advanced by two.
        """
        x = input_layer
        # Two conv + bn pairs, each with the dilation rate belonging to its position
        for _ in range(2):
            dilation, index = self.compute_dilation(index, use_scheme)
            x = self._conv_bn(x, dilation)

        # Add output of convolutional operation to the input (skip connection)
        addition = Add()([x, input_layer])

        return addition, index

    def build_model(self):
        """Assemble the network and return it as an (uncompiled) Keras ``Model``."""
        # Define the input, followed by a single bias-free convolutional layer
        input_layer = Input(self.input_shape)
        x = Conv2D(filters=self.conv_filters, kernel_size=3, activation='relu',
                   padding='same', use_bias=False)(input_layer)

        # Create the desired number of residual blocks
        conv_index = 0
        for _ in range(self.res_depth):
            x, conv_index = self.residual_block(x, conv_index, self.use_dil_scheme)

        # Add non-residual conv and bn layer; the advanced counter is not needed afterwards
        dilation, _ = self.compute_dilation(conv_index, self.use_dil_scheme)
        x = self._conv_bn(x, dilation)

        # Final pooling and output layer
        x = GlobalAveragePooling2D()(x)
        output = Dense(self.n_classes, activation='softmax')(x)

        return Model(inputs=[input_layer], outputs=[output])

In [6]:
# Describe the architecture for samples of shape (1, 138, 40) and three output classes,
# build the Keras model from it and print its layer overview
resnet_spec = ResNetModel([1, 138, 40], n_classes=3)
model = resnet_spec.build_model()
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 1, 138, 40)] 0                                            
__________________________________________________________________________________________________
conv2d (Conv2D)                 (None, 1, 138, 45)   16200       input_1[0][0]                    
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 1, 138, 45)   18270       conv2d[0][0]                     
__________________________________________________________________________________________________
batch_normalization (BatchNorma (None, 1, 138, 45)   180         conv2d_1[0][0]                   
______________________________________________________________________________________________

## Experiments

In [7]:
def create_data_split(data, labels, train_size, random_state=42):
    '''
    Return a stratified portion of the given data and labels.

    This function can be used to use smaller portions of training data to investigate
    how this influences performance.

    Parameters
    ----------
    data : array-like
        Samples to draw from.
    labels : array-like
        Labels belonging to `data`; also used for stratification.
    train_size : float or int
        Fraction (float) or absolute number (int) of samples to keep, as understood
        by `train_test_split`.
    random_state : int, default 42
        Seed handed to `train_test_split`.

    Returns
    -------
    tuple
        `(data_subset, labels_subset)`. When the whole set is requested
        (`train_size` of 1.0, or a count equal to the number of samples) the inputs
        are returned unchanged, because `train_test_split` refuses to produce an
        empty remainder.
    '''
    n_samples = len(data)
    is_fraction = isinstance(train_size, (float, np.floating))
    is_count = isinstance(train_size, (int, np.integer)) and not isinstance(train_size, bool)

    # Nothing to split off: hand back the complete set instead of letting sklearn raise
    if (is_fraction and train_size == 1.0) or (is_count and train_size == n_samples):
        return data, labels

    data_train, _, labels_train, _ = train_test_split(
        data, labels, train_size=train_size, random_state=random_state, stratify=labels
    )

    return data_train, labels_train

In [8]:
# Configure training: Adam optimizer, categorical cross-entropy loss, accuracy as reported metric
model.compile(
    optimizer='adam',
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [10]:
# Fit the model on training data, evaluating on the validation data after every epoch
EPOCHS = 1
BATCH_SIZE = 64

# perf_counter() is a monotonic, high-resolution clock meant for measuring durations;
# time.time() follows the system clock, which can be adjusted while training is running
start = time.perf_counter()
history = model.fit(
    data_train,
    labels_train,
    validation_data=(data_val, labels_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
)
end = time.perf_counter()

print(f"Time elapsed during training: {end - start:.2f} seconds")

Time elapsed during training: 124.73 seconds


In [None]:
# Compute the model outputs for the held-out test split
test_predictions = model.predict(x=data_test)

## Store predictions and training output

In [None]:
# Store predictions and training history to hard drive

file_name = "resnet_15"

# Predictions: the array returned by model.predict for the test split
with open(f'{file_name}_predictions', 'wb') as f:
    np.save(f, test_predictions)

# Training history: history.history is a dict (metric name -> per-epoch values), which
# NumPy writes as a pickled 0-d object array. Read it back with
# np.load(path, allow_pickle=True).item()
with open(f'{file_name}_training_history', 'wb') as f:
    np.save(f, history.history)