# Test model generator
Code that generate tensorflow-lite models to test the C++ inference code.  
Models are saved to the `test-tflite-models` directory.

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 
import numpy as np
import tensorflow as tf


print("Tensorflow version: " + tf.version.VERSION)

HOMEBASE = './'
OUTPUT_FOLDER = os.path.join(HOMEBASE,'test_models')
if not os.path.exists(OUTPUT_FOLDER) and os.path.isdir(OUTPUT_FOLDER):
    os.mkdir(OUTPUT_FOLDER)

Tensorflow version: 2.4.1


In [2]:
# class CustomNetwork():
#     def __init__(self, number_of_conv:int, filters_per_conv:list, kernel_sizes_per_conv:list, 
#                  pool_layers:list, pool_sizes_per_conv:list, 
#                  number_of_dense:int, dense_units:list, dropout_rates:list, activations:list, 
#                  batchnorm_after_layer:list,
#                  input_shape:list, output_shape:int):
#         self.number_of_conv = number_of_conv
#         self.filters_per_conv = filters_per_conv
#         assert len(self.filters_per_conv) == self.number_of_conv
#         self.kernel_sizes_per_conv = kernel_sizes_per_conv
#         assert len(self.kernel_sizes_per_conv) == self.number_of_conv
#         self.pool_sizes_per_conv = pool_sizes_per_conv
#         assert len(self.pool_sizes_per_conv) == self.number_of_conv
#         self.pool_layers = pool_layers
#         assert len(self.pool_layers) == self.number_of_conv

#         self.number_of_dense = number_of_dense
#         self.dense_units = dense_units
#         assert len(self.dense_units) == self.number_of_dense
#         self.dropout_rates = dropout_rates
#         assert len(self.dropout_rates) == self.number_of_dense
#         self.activations = activations
#         assert len(self.activations) == self.number_of_dense

#         self.batchnorm_after_layer = batchnorm_after_layer
#         assert len(self.batchnorm_after_layer) == self.number_of_dense + self.number_of_conv

#         self.input_shape = input_shape
#         self.output_shape = output_shape

#     def _get_conv_unit(filters:int, kernel_size:int, activation:str, pool_size:int, pool_layer:bool, batchnorm:bool, input_shape:tuple = None):
    
#         if input_shape is not None:
#             res = [tf.keras.layers.Conv1D(filters=filters, kernel_size=kernel_size, activation=activation, input_shape=input_shape)]
#         else:
#             res = [tf.keras.layers.Conv1D(filters=filters, kernel_size=kernel_size, activation=activation)]

#         res += [tf.keras.layers.BatchNormalization(),  
#                 tf.keras.layers.MaxPooling1D(pool_size=2)]
        

In [3]:
def define_model_architecture(num_classes:int, input_shape:tuple, conv:int, num_of_dense:int = 0, dense_node_count:int= 0, model_name:str= "test_model",force_in_shape=False, _verbose = False):
    # tf.keras.backend.set_floatx('float32')

    sequential_structure = []
    if len(input_shape) == 2:
        convLayer = tf.keras.layers.Conv2D
        maxpoolLayer = tf.keras.layers.MaxPooling2D
        avgpoolLayer = tf.keras.layers.AveragePooling2D
        if force_in_shape:
            input_shape = (input_shape[0], input_shape[1], 1)
    elif len(input_shape) == 1:
        convLayer = tf.keras.layers.Conv1D
        maxpoolLayer = tf.keras.layers.MaxPooling1D
        avgpoolLayer = tf.keras.layers.AveragePooling1D
        if force_in_shape:
            input_shape = (input_shape[0], 1)
    else:
        raise ValueError('Input shape must be either 1 or 2 dimensional. Got: %s'%str(input_shape))

    if conv > 0:
        for i in range(conv):
            if i == 0 and force_in_shape:
                sequential_structure.append(convLayer(filters=2, kernel_size=3, activation='relu', input_shape=input_shape))
                print('Adding input shape: %s'%str(input_shape))
            else:
                sequential_structure.append(convLayer(filters=2, kernel_size=3, activation='relu'))

            sequential_structure += [tf.keras.layers.BatchNormalization()]
            sequential_structure += [maxpoolLayer(pool_size=2)]

        sequential_structure += [tf.keras.layers.Flatten(),
                                 tf.keras.layers.Dropout(0.5)]

    for i in range(num_of_dense):
        if i == 0 and conv == 0 and force_in_shape:
            sequential_structure += [tf.keras.layers.Dense(dense_node_count,activation='relu',
                                                           kernel_initializer='he_uniform',
                                                           input_shape=input_shape)]
            print('Adding input shape: %s'%str(input_shape))
        else:
            sequential_structure += [tf.keras.layers.Dense(dense_node_count,activation='relu',
                                                        kernel_initializer='he_uniform')]
        sequential_structure += [tf.keras.layers.BatchNormalization(),
                                 tf.keras.layers.Dropout(0.5)]

    sequential_structure += [tf.keras.layers.Dense(num_classes)]               

    model = tf.keras.models.Sequential(sequential_structure)

    model._name = model_name

    return model

def get_loss():
    return tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

In [4]:
def compile_model(model,optimizer,loss_fn,_verbose = False):
    opt = None
    if optimizer["method"] == "sgd":
        opt = tf.keras.optimizers.SGD(learning_rate = optimizer["learning_rate"], momentum=optimizer["momentum"])
    elif optimizer["method"] == "adam":
        opt = tf.keras.optimizers.Adam(learning_rate = optimizer["learning_rate"])
    else:
        raise Exception("Optimizer method not supported")

    model.compile(optimizer=opt,
                  loss=loss_fn,
                  metrics=['accuracy'])
    
    if _verbose:
        print("Model compiled")

In [5]:
# TFLite conversion function
def convert2tflite(tf_model_dir,tflite_model_dir = None,model_name="model",quantization=None,dataset=None, _verbose = False):
    assert (quantization==None or quantization=="dynamic" or quantization=="float-fallback" or quantization=="full")
    # Convert the model saved in the previous step.
    converter = tf.lite.TFLiteConverter.from_saved_model(tf_model_dir)
    if _verbose:
        print("Model loaded")
    if quantization is not None:
        if _verbose:
            print("Quantization: %s"%quantization)
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        if quantization == "full" or quantization=="float-fallback":
            assert dataset is not None
            def representative_dataset_gen():
                for data in tf.data.Dataset.from_tensor_slices((dataset)).batch(1).take(100):
                    yield [tf.dtypes.cast(data, tf.float32)]
            converter.representative_dataset = representative_dataset_gen
        if quantization == "full":
            converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
            converter.inference_input_type = tf.int8  # or tf.uint8
            converter.inference_output_type = tf.int8  # or tf.uint8
        if quantization == "dynamic":
            assert dataset is None

    tflite_model = converter.convert()
    if _verbose:
        print("Model converted")

    # Save the TF Lite model.
    if tflite_model_dir is None:
        if _verbose:
            print("Saving model to: %s"%tf_model_dir)
        TF_MODEL_PATH = tf_model_dir + "/" + model_name + '.tflite'
    else:
        if _verbose:
            print("Saving model to: %s"%tflite_model_dir)
        TF_MODEL_PATH = tflite_model_dir + "/" + model_name + '.tflite'

    if _verbose:
        print("Model saved")
    with tf.io.gfile.GFile(TF_MODEL_PATH, 'wb') as f:
        f.write(tflite_model)
    return TF_MODEL_PATH

## USAGE
# model_path = MODELFOLDER + "/" + RUN_NAME + "/fold_1"
# convert2tflite(model_path)

In [6]:
test_model_params = {
    "input_shape": (10,10),
    "conv": 1,
    "num_of_dense": 1,
    "dense_node_count": 10,
    "model_name": "conv2d_1",
    "num_classes": 8
}

def save_test_model(test_model_params, mode='train'):
    assert test_model_params['input_shape'] is not None
    assert mode in ['train', 'define_input_shape'], "mode must be either 'train' or 'define_input_shape'"

    model = define_model_architecture(num_classes=test_model_params['num_classes'],
                                      input_shape = test_model_params['input_shape'],
                                      conv = test_model_params['conv'],
                                      num_of_dense = test_model_params['num_of_dense'],
                                      dense_node_count  = test_model_params['dense_node_count'],
                                      model_name = test_model_params['model_name'],
                                      force_in_shape = mode == 'define_input_shape',
                                      _verbose = True)
    
    compile_model(model,{"method":"adam","learning_rate":0.001},get_loss(),True)

    if mode == 'train':
        dataTo1DConv = lambda x: np.expand_dims(x,axis = -1)
        dataTo2DConv = lambda x,n,m: np.expand_dims(x.reshape((len(x),n,m)),axis = -1)

        tx = np.random.rand(5, *test_model_params['input_shape'])
        if len(test_model_params['input_shape']) == 1 and test_model_params['conv'] > 0:
            tx = dataTo1DConv(tx)
        elif len(test_model_params['input_shape']) == 2 and test_model_params['conv'] > 0:
            tx = dataTo2DConv(tx, *test_model_params['input_shape'])
            print('tx.shape:',tx.shape)

        ty = np.random.randint(0, 8, 5)

        # Force training on CPU
        with tf.device('/cpu:0'):
            print('Traning with input shape: %s and first layer of type'%str(tx.shape), model.layers[0].__class__.__name__)
            model.fit(tx, ty, epochs=10, batch_size=32)

    model.summary()
    # Print input shape to first layer
    print('Input shape: %s'%str(model.layers[0].input_shape))

    ORIG_PATH = os.path.join(OUTPUT_FOLDER, test_model_params['model_name'], 'original')
    model.save(ORIG_PATH)

    mpath = convert2tflite(ORIG_PATH, tflite_model_dir = os.path.dirname(ORIG_PATH), model_name = test_model_params['model_name'], _verbose=True)
    return mpath

In [7]:
def prepend_reshape_to_input(model, verbose=False):
    new_in_shape = None
    if isinstance(model.layers[0],tf.keras.layers.Conv2D):
        shape = model.layers[0].input_shape[1:]
        if verbose:
            print('shape',shape)
        new_in_shape = (shape[0]*shape[1],)
        if verbose:
            print('new_in_shape',new_in_shape)
        new_reshape_layer = tf.keras.layers.Reshape(shape, input_shape=new_in_shape)
    elif isinstance(model.layers[0],tf.keras.layers.Conv1D):
        shape = model.layers[0].input_shape[1:]
        new_in_shape = (shape[0],)
        new_reshape_layer = tf.keras.layers.Reshape(shape, input_shape=new_in_shape)
    else:
        strtype = str(type(model.layers[0]))
        strtype = strtype.split('.')[-1]
        if verbose:
            raise Exception('You might not need reshaping with first layer of type: %s'%(strtype))

    # print('Reshaper %s > %s'%(model.layers[0].input_shape,new_reshape_layer.output_shape))

    new_model = tf.keras.Sequential()
    new_model.add(new_reshape_layer)
    new_model.add(model)

    #compile the model
    # new_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    if verbose:
        new_model.summary()
    # test with fake input  
    if verbose:
        print('New input shape: ',new_in_shape)


    fakedata = np.random.sample(new_in_shape[0]).reshape((-1,1)).T
    with tf.device('/cpu:0'):
        e = new_model(fakedata).numpy()

    return new_model
curpath = save_test_model(copy.deepcopy(test_model_params), mode='train')
assert curpath is not None and os.path.exists(curpath), "Model not saved"
shutil.copyfile(curpath,os.path.join(os.path.expanduser('~/Develop/deep-classf-runtime-wrappers/TFLiteWrapper2.4.1/data/'), os.path.basename(curpath)))






test_model_params = {
    "input_shape": (10,10),
    "conv": 1,
    "num_of_dense": 1,
    "dense_node_count": 10,
    "model_name": "_",
    "num_classes": 8
}

test_model_params['model_name'] = "conv2d_train"
curpath = save_test_model(copy.deepcopy(test_model_params), mode='train')
assert curpath is not None and os.path.exists(curpath), "Model not saved"
shutil.copyfile(curpath,os.path.join(os.path.expanduser('~/Develop/deep-classf-runtime-wrappers/TFLiteWrapper2.4.1/data/'), os.path.basename(curpath)))


test_model_params['model_name'] = "conv2d_define_input_shape"
curpath = save_test_model(copy.deepcopy(test_model_params), mode='define_input_shape')
assert curpath is not None and os.path.exists(curpath), "Model not saved"
shutil.copyfile(curpath,os.path.join(os.path.expanduser('~/Develop/deep-classf-runtime-wrappers/TFLiteWrapper2.4.1/data/'), os.path.basename(curpath)))





Adding input shape: (10, 1)
Model compiled
Model: "conv1d_define_input_shape"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv1d (Conv1D)              (None, 8, 2)              8         
_________________________________________________________________
batch_normalization (BatchNo (None, 8, 2)              8         
_________________________________________________________________
max_pooling1d (MaxPooling1D) (None, 4, 2)              0         
_________________________________________________________________
flatten (Flatten)            (None, 8)                 0         
_________________________________________________________________
dropout (Dropout)            (None, 8)                 0         
_________________________________________________________________
dense (Dense)                (None, 10)                90        
________________________________________________________________

'/home/cimil-01/Develop/deep-classf-runtime-wrappers/TFLiteWrapper2.4.1/data/conv2d_define_input_shape.tflite'