# TensorFlow model converter
This notebook converts TensorFlow Sequential models to:
- TensorFlow Lite   (Fully supported)
- Torch             (No general support; hardcoded conversion function for each layer type)
- ONNX              (No general support; exported from the converted Torch model)
- RTNeural weights  (JSON file containing the layer weights)

In [None]:
# TensorFlow version required for compatibility (compared against tf.version.VERSION below).
# Set the variable to None if no specific version is required.
TENSORFLOW_VERSION_REQUIRED = '2.4.1'
# TENSORFLOW_VERSION_REQUIRED = None

In [None]:
import tensorflow as tf
if tf.version.VERSION != TENSORFLOW_VERSION_REQUIRED:
    !pip uninstall tensorflow -y
    !pip install tensorflow==2.4.1

In [None]:
import tensorflow as tf
import numpy as np
import json
from json import JSONEncoder
import torch
import os

# Stop here when a version is required and the TensorFlow that was actually
# imported is a different one; otherwise report the imported version.
if TENSORFLOW_VERSION_REQUIRED and TENSORFLOW_VERSION_REQUIRED != tf.version.VERSION:
    raise Exception(
        f"Error! Tensorflow version ({tf.version.VERSION}) is different from the required version "
        f"({TENSORFLOW_VERSION_REQUIRED}). Run the previous cell and restart runtime if running on Colab."
    )
print(f"Imported TensorFlow {tf.version.VERSION}")

In [None]:
# Settings
TF_MODEL_PATH = "./TF_ModelA"     # Path to input TensorFlow Model (loaded below as a SavedModel)
OUT_MODEL_NAME = 'ModelA'         # Name used to build the output directory and the output file names

In [None]:
# Normalise the input path (this also removes a potential trailing separator)
TF_MODEL_PATH = os.path.abspath(TF_MODEL_PATH)
# Directory where the converted models are saved
SAVE_MODEL_PATH = OUT_MODEL_NAME+"-converted"
# Create the output directory from Python rather than through `mkdir -p`:
# no POSIX shell is needed and a failure raises instead of being ignored.
os.makedirs(SAVE_MODEL_PATH, exist_ok=True)

# Load tensorflow model
tfmodel = tf.keras.models.load_model(TF_MODEL_PATH)

## TF Lite Conversion
This is directly supported by TensorFlow.

Remember to set the TensorFlow version at the beginning of this notebook according to the version of the Interpreter that will run on the target system.

In [None]:
# Convert the SavedModel to a TensorFlow Lite model and write it to the output directory
tflite_path = f"{SAVE_MODEL_PATH}/tflite_{OUT_MODEL_NAME}_.tflite"

converter = tf.lite.TFLiteConverter.from_saved_model(TF_MODEL_PATH)
tflite_model = converter.convert()

with tf.io.gfile.GFile(tflite_path, 'wb') as tflite_file:
    tflite_file.write(tflite_model)

## Torch Conversion
General TF to Torch model conversion is not supported at the moment.
The code below was written for the special case of sequential models made of Dense/Linear layers (with linear or ReLU activation), BatchNormalization layers and Dropout layers, and **it will not work with other layer types**.

The code below can inspire the addition of more conversion functions (i.e. for different layer types).


In [None]:
class TF2Torch(torch.nn.Module):
    """PyTorch counterpart of a Keras model made of Dense, BatchNormalization and Dropout layers.

    The Keras layers are converted one by one, in order, and chained in a
    ``torch.nn.Sequential`` stored in ``self.model``. Any other layer type, or a
    Dense activation other than linear/ReLU, raises ``NotImplementedError``.
    """

    def dense2linear(self, dense : tf.keras.layers.Dense):
        """Return a ``torch.nn.Linear`` holding the parameters of a Keras Dense layer."""
        # Retrieve dimensions
        in_features, out_features = dense.kernel.shape
        # A Dense layer built with use_bias=False has no bias variable
        has_bias = dense.bias is not None
        # Initialize fully connected (Linear) layer with given dimensions
        out = torch.nn.Linear(in_features, out_features, bias=has_bias)
        # Keras stores the kernel as (in, out) while torch expects (out, in)
        out.weight.data = torch.Tensor(dense.kernel.numpy().transpose())
        if has_bias:
            # The bias is one-dimensional, so no transposition is needed
            out.bias.data = torch.Tensor(dense.bias.numpy())
        return out

    def batchNormalization2BatchNorm1d(self, batch_normalization : tf.keras.layers.BatchNormalization):
        """Return a ``torch.nn.BatchNorm1d`` holding the parameters of a Keras BatchNormalization layer."""
        # The moving statistics always exist, unlike gamma/beta which are
        # absent when the layer was created with scale=False / center=False
        num_features = batch_normalization.moving_mean.shape[0]
        out : torch.nn.BatchNorm1d = torch.nn.BatchNorm1d(num_features)
        # Copy parameters; a missing gamma/beta keeps the torch defaults
        # (weight of ones, bias of zeros), i.e. no scaling / no shift
        if batch_normalization.gamma is not None:
            out.weight.data = torch.Tensor(batch_normalization.gamma.numpy())
        if batch_normalization.beta is not None:
            out.bias.data = torch.Tensor(batch_normalization.beta.numpy())
        out.running_mean = torch.Tensor(batch_normalization.moving_mean.numpy())
        out.running_var = torch.Tensor(batch_normalization.moving_variance.numpy())
        # Keras weights the OLD running statistic by `momentum`, torch weights
        # the NEW batch statistic by `momentum`: the two values are complementary
        out.momentum = 1.0 - batch_normalization.momentum
        out.eps = batch_normalization.epsilon
        return out

    def _convert_layers(self, tflayers):
        """Yield the torch modules equivalent to ``tflayers``, in order.

        Raises:
            NotImplementedError: for an unsupported layer type or Dense activation.
        """
        for layer in tflayers:
            # Exact type checks on purpose: a subclass may change the layer's
            # behavior, so it is rejected instead of being silently converted
            layer_type = type(layer)
            if layer_type is tf.keras.layers.BatchNormalization:
                yield self.batchNormalization2BatchNorm1d(layer)
            elif layer_type is tf.keras.layers.Dense:
                yield self.dense2linear(layer)
                # A linear activation needs no extra torch module
                if layer.activation is tf.keras.activations.relu:
                    yield torch.nn.ReLU()
                elif layer.activation is not tf.keras.activations.linear:
                    raise NotImplementedError("Conversion for activation function {} not implemented.".format(layer.activation))
            elif layer_type is tf.keras.layers.Dropout:
                yield torch.nn.Dropout(layer.rate)
            else:
                raise NotImplementedError("Conversion for layer type {} not implemented.".format(layer_type))

    def __init__(self, tfmodel):
        """Convert every layer of ``tfmodel`` and chain the results in ``self.model``."""
        super(TF2Torch, self).__init__()
        self.model = torch.nn.Sequential(*self._convert_layers(tfmodel.layers))

    def forward(self, x):
        """Run ``x`` through the converted layers."""
        return self.model(x)

In [None]:
# Convert to PyTorch model, export TorchScript model by scripting
torchmodel = TF2Torch(tfmodel)
# A newly built torch module is in training mode. Switch to inference mode before
# scripting so that the saved model uses the copied BatchNorm running statistics
# and does not apply Dropout.
torchmodel.eval()

scripted_model = torch.jit.script(torchmodel)
scripted_model.save(SAVE_MODEL_PATH + "/torchscript_"+OUT_MODEL_NAME+"_.pt")

## ONNX Conversion

In [None]:
# Export onnx model by tracing (see https://pytorch.org/tutorials/advanced/super_resolution_with_onnxruntime.html)
import torch.onnx

# Second dimension of the first layer's input shape (the first one is the batch dimension)
INPUT_SIZE = tfmodel.layers[0].get_input_at(0).get_shape().as_list()[1]
x = torch.ones(1, INPUT_SIZE) # dummy input for tracing: a batch holding a single sample

onnx_path = f"{SAVE_MODEL_PATH}/onnx_{OUT_MODEL_NAME}.onnx"

torch.onnx.export(
    torchmodel,                 # Model
    x,                          # Input
    onnx_path,                  # Output file
    export_params=True,         # Export trained parameters
    do_constant_folding=True,   # Perform constant folding
    input_names=['input'],      # Label for input
    output_names=['output'],    # Label for output
)

## RT Neural Conversion

In [None]:
class NumpyArrayEncoder(JSONEncoder):
    """JSON encoder that also serialises NumPy arrays and NumPy scalar values."""

    def default(self, obj):
        """Convert NumPy objects to plain Python values; defer everything else to the base class."""
        # tolist() gives nested Python lists for arrays and a plain Python
        # scalar for NumPy scalar types such as np.float32 or np.int64
        if isinstance(obj, (np.ndarray, np.generic)):
            return obj.tolist()
        # The base implementation raises TypeError for unsupported objects
        return super().default(obj)

class Tf2RtneuralConverter:
    """Exports the layers of a Keras model as a JSON description (RTNeural json weights)."""

    def save_model_json(self,model):
        """Build a JSON-serialisable description of ``model``.

        Returns:
            A dict with the key "in_shape" (``model.input_shape``) and the key
            "layers", a list with one dict per recognised layer holding its
            "type", "activation", "shape" and "weights" (plus "kernel_size"
            and "dilation" for Conv1D layers).

        Layers of a type that is not recognised are left out of the result;
        a warning is issued for each of them so the omission is not silent.
        """
        # Local import: only this method needs it
        import warnings

        def get_layer_type(layer):
            # Checked in this order; the first matching class wins
            known_types = (
                (tf.keras.layers.TimeDistributed, 'time-distributed-dense'),
                (tf.keras.layers.GRU, 'gru'),
                (tf.keras.layers.LSTM, 'lstm'),
                (tf.keras.layers.Dense, 'dense'),
                (tf.keras.layers.Conv1D, 'conv1d'),
            )
            for keras_class, type_name in known_types:
                if isinstance(layer, keras_class):
                    return type_name
            return 'unknown'

        def get_layer_activation(layer):
            # A TimeDistributed wrapper reports the activation of the wrapped layer
            if isinstance(layer, tf.keras.layers.TimeDistributed):
                return get_layer_activation(layer.layer)

            known_activations = (
                (tf.keras.activations.tanh, 'tanh'),
                (tf.keras.activations.relu, 'relu'),
                (tf.keras.activations.sigmoid, 'sigmoid'),
                (tf.keras.activations.softmax, 'softmax'),
            )
            for activation_fn, activation_name in known_activations:
                if layer.activation == activation_fn:
                    return activation_name
            # Any other activation is exported as an empty string
            return ''

        def save_layer(layer):
            layer_type = get_layer_type(layer)
            if layer_type == 'unknown':
                warnings.warn(
                    "Layer '{}' of type {} is not supported by the RTNeural export and was skipped.".format(
                        layer.name, type(layer).__name__))
                return None

            layer_dict = {
                "type"       : layer_type,
                "activation" : get_layer_activation(layer),
                "shape"      : layer.output_shape,
                "weights"    : layer.get_weights()
            }
            if layer_type == "conv1d":
                layer_dict["kernel_size"] = layer.kernel_size
                layer_dict["dilation"] = layer.dilation_rate
            return layer_dict

        exported_layers = [save_layer(layer) for layer in model.layers]
        return {
            "in_shape": model.input_shape,
            "layers": [entry for entry in exported_layers if entry is not None],
        }

    def save_model(self, model, filename):
        """Write the description built by ``save_model_json`` to ``filename`` as indented JSON."""
        model_dict = self.save_model_json(model)
        with open(filename, 'w') as outfile:
            # NumpyArrayEncoder turns the NumPy weight arrays into JSON lists
            json.dump(model_dict, outfile, cls=NumpyArrayEncoder, indent=4)

In [None]:
# Export RTNeural model (json weights)

rtneural_json_path = f"{SAVE_MODEL_PATH}/rtneural_{OUT_MODEL_NAME}_.json"
rtneuralconverter = Tf2RtneuralConverter()
rtneuralconverter.save_model(tfmodel, rtneural_json_path)

## Compress models

In [None]:
archivename = 'convertedmodels_' +OUT_MODEL_NAME + '.tar.gz'
outdirname = os.path.dirname(SAVE_MODEL_PATH)
outlastdname = os.path.basename(SAVE_MODEL_PATH)

if outdirname != "":
    !cd "$outdirname" ; tar -czvf "$archivename" "$outlastdname"
else:
    !tar -czvf "$archivename" "$outlastdname"
