In [438]:
import tensorflow as tf
import tensorflow.keras as keras
import copy
import numpy as np
import scipy.io as sio
from pathlib import Path

In [439]:
# generator for dataset
def battery_train_gen():
    train_x = sio.loadmat(str(Path(data_dir, 'TrainX_IM240')))['TrainX_IM240']
    train_y = sio.loadmat(str(Path(data_dir, 'TrainY_IM240')))['TrainY_IM240']
    length = len(train_x)
    for i in range(length):
        x = tf.expand_dims(tf.convert_to_tensor(train_x[i][0], dtype=tf.float32), axis=0)
        y = tf.expand_dims(tf.convert_to_tensor(train_y[i], dtype=tf.float32), axis=0)
        yield (x, y)
        
def battery_test_gen():
    test_x = sio.loadmat(str(Path(data_dir, 'TestX_IM240')))['TestX_IM240']
    test_y = sio.loadmat(str(Path(data_dir, 'TestY_IM240')))['TestY_IM240']
    length = len(test_x)
    for i in range(length):
        x = tf.expand_dims(tf.convert_to_tensor(test_x[i][0], dtype=tf.float32), axis=0)
        y = tf.expand_dims(tf.convert_to_tensor(test_y[i], dtype=tf.float32), axis=0)
        yield (x, y)
        

class InvertedResidualLayer2D(keras.layers.Layer):
    def __init__(self, filters, kernel_size, strides, expansion_factor=6, trainable=True, name=None, **kwargs):
        super(InvertedResidualLayer2D, self).__init__(trainable=trainable, name=name, **kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.expansion_factor = expansion_factor
        
    def build(self, input_shape):
        input_channels = int(input_shape[-1])
        self.ptwise_conv1 = keras.layers.Conv2D(int(input_channels * self.expansion_factor),
                                               kernel_size=(1,1),
                                               strides=(1,1),
                                               padding='same',
                                               use_bias=False)
        
        self.dwise = keras.layers.DepthwiseConv2D(kernel_size=self.kernel_size,
                                                 strides=self.strides,
                                                 padding='same')
        
        self.ptwise_conv2 = keras.layers.Conv2D(self.filters,
                                               kernel_size=(1,1),
                                               strides=(1,1),
                                               padding='same',
                                               use_bias=False)
        
        self.bn1 = keras.layers.BatchNormalization()
        self.bn2 = keras.layers.BatchNormalization()
        self.bn3 = keras.layers.BatchNormalization()
    
    def call(self, input_x):
        x = self.ptwise_conv1(input_x)
        x = self.bn1(x)
        x = tf.nn.relu6(x)
        
        x = self.dwise(x)
        x = self.bn2(x)
        x = tf.nn.relu6(x)
        
        x = self.ptwise_conv2(x)
        x = self.bn3(x)
        
        # residual connection
        if input_x.shape[1:] == x.shape[1:]:
            x += input_x
        
        return x
            
        
class MobileNetV2(keras.Model):
    def __init__(self, configs):
        '''
            params:
            configs = {
                'layers' : [
                    [type, expansion_factor, channel, n, stride], format of a layer configuration
                    * type
                        1 : conv2d
                        2 : bottleneck
                ]
            }
        '''
        super(MobileNetV2, self).__init__()
        
        if configs:
            self.configs = configs
        else:
            self.configs = {
                'input_size' : (1, 100, 3),
                'out_features' : 6,
                'kernel_size' : (3, 3),
                'strides' : [(0, 0), (1, 1), (2, 2)], 
                'layers' : [
                    [2, 0, 32, 1, 2],
                    [2, 1, 16, 1, 1],
                    [2, 6, 24, 2, 2],
                    [2, 6, 32, 3, 2],
                    [2, 6, 64, 4, 2],
                    [2, 0, 256, 1, 1],
                ]
            }
        padding = []
        for idx, kernel in enumerate(configs['kernel_size']):
            if kernel == 1:
                padding.append(0)
            elif kernel == 3:
                padding.append(1)

        self.input_size = [[configs['input_size'][0], configs['input_size'][1], configs['input_size'][2]]]
        for idx, layer in enumerate(configs['layers']):
            next_input = []
            next_input.append(int((self.input_size[idx][0] - configs['kernel_size'][0] + 2 * padding[0]) / layer[4]) + 1)
            next_input.append(int((self.input_size[idx][1] - configs['kernel_size'][1] + 2 * padding[1]) / layer[4]) + 1)
            next_input.append(layer[2])
            self.input_size.append(next_input)
        
        
        seq_layers = []
        for layer in configs['layers']:
            seq_layer = []
            stride_index = layer[4]
            for i in range(layer[3]):
                if i > 0:
                    stride_index = 1
                # Conv2D
                if layer[0] == 1:
                    seq_layer.append(
                        keras.layers.Conv2D(
                            layer[2],
                            kernel_size = configs['kernel_size'],
                            strides = configs['strides'][stride_index],
                            padding = 'same',
                            use_bias = False
                        )
                    )
                elif layer[0] == 2:
                    seq_layer.append(
                        InvertedResidualLayer2D(
                            layer[2],
                            kernel_size = configs['kernel_size'],
                            strides = configs['strides'][stride_index],
                            expansion_factor = layer[1]
                        )
                    )
            seq_layer.append(
                keras.layers.BatchNormalization()
            )
            seq_layer.append(
                keras.layers.ReLU(max_value=6.0)
            )
            seq_layers += seq_layer
        
        
        self.encoder = keras.Sequential(seq_layers)
        self.out_conv = keras.layers.Conv2D(
            configs['out_features'],
            kernel_size = (1,1),
            strides = (1,1),
            padding='same',
            use_bias = False
        )
            
    def call(self, inputs):
        x = self.encoder(inputs)
        x = tf.nn.avg_pool(x, ksize=(self.input_size[-1][0], self.input_size[-1][1]), strides=(1, 1), padding='VALID')
        x = self.out_conv(x)
        return x

In [440]:
epochs = 5
lr = 1.00e-4
batch_size = 256

configs = {
    'input_size' : (1, 100, 3),
    'out_features' : 6,
    'kernel_size' : (3, 3),
    'strides' : [(0, 0), (1, 1), (2, 2)], 
    'layers' : [
        [1, 0, 32, 1, 2],
        [2, 1, 16, 1, 1],
        [2, 6, 24, 2, 2],
        [2, 6, 32, 3, 2],
        [2, 6, 64, 4, 2],
        [1, 0, 256, 1, 1],
    ]
}

### Load dataset

In [441]:
data_dir = Path('.', 'data')
train_data = tf.data.Dataset.from_generator(
    battery_train_gen,
    output_signature=(
        tf.TensorSpec(shape=configs['input_size'], dtype=tf.float32),
        tf.TensorSpec(shape=(1, configs['out_features']), dtype=tf.float32)
    )
)

train_data = train_data.batch(batch_size)

test_data = tf.data.Dataset.from_generator(
    battery_test_gen,
    output_signature=(
        tf.TensorSpec(shape=(1, 100, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(1, 6), dtype=tf.float32)
    )
)

### Setup Training model

In [442]:
model = MobileNetV2(configs)

model.compile(
    optimizer = 'Adam',
    loss = 'MeanSquaredError',
    metrics = ['mse']
)

model.fit(
    train_data,
    batch_size = batch_size,
    epochs = epochs, 
)

Epoch 1/5
    395/Unknown - 87s 203ms/step - loss: 0.0235 - mse: 0.0235

KeyboardInterrupt: 