# Converting PyTorch to TensorFlow Lite for xCORE Using ONNX

ONNX is an open format built to represent machine learning models. We can convert from PyTorch to ONNX, then from ONNX to TensorFlow, then from TensorFlow to TensorFlow Lite, and finally, run it through xformer to optimise it for xCORE.
Ensure that you have installed Python 3.8 and have the installed requirements.txt

## Import PyTorch Model

For this example, we use mobilenet_v2.

In [None]:
import torch

pytorch_model = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)

# Switch the model to eval mode
pytorch_model.eval()

### Run inference on model (to test)

In [None]:
# Download an image to test against
import urllib
url, filename = ("https://github.com/pytorch/hub/raw/master/images/dog.jpg", "dog.jpg")
try: urllib.URLopener().retrieve(url, filename)
except: urllib.request.urlretrieve(url, filename)

In [None]:
# Download Image Labels
!wget https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt

In [None]:
# Read the categories
with open("imagenet_classes.txt", "r") as f:
    categories = [s.strip() for s in f.readlines()]

### Perform an infrence on the pytorch model directly

In [None]:
# We will test and train with these params
batch_size = 1
channels = 3
height = 224
width = 224

In [None]:
from PIL import Image
from torchvision import transforms

# Open testing image
input_image = Image.open(filename)

preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(height),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Note Pytorch is BCHW
input_tensor = preprocess(input_image)

In [None]:
input_batch = input_tensor.unsqueeze(0) # create a mini-batch as expected by the model

with torch.no_grad():
    output = pytorch_model(input_batch)

probabilities = torch.nn.functional.softmax(output[0], dim=0)

# Show top categories per image
vals, idxs = torch.topk(probabilities, 5)
pytorch_results = [(categories[idx], prob) for (idx, prob) in zip(idxs.tolist(), vals.tolist())]
for cat, prob in  pytorch_results:
    print(cat, ':', prob)

## Convert to ONNX


In [None]:
# This is only for shape info for tracing the model during conversion
sample_input = torch.rand((batch_size, channels, height, width))

onnx_model_path = "mobilenet_v2.onnx"

torch.onnx.export(
    pytorch_model,
    sample_input,
    onnx_model_path,
    input_names=['image'],
    output_names=['probabilities']
)

### Check the exported model

In [None]:
import onnx
onnx_model = onnx.load(onnx_model_path)
onnx.checker.check_model(onnx_model)

### Check ONNX Output

In [None]:
import onnxruntime 
import numpy as np
ort_session = onnxruntime.InferenceSession(onnx_model_path)

def to_numpy(tensor):
    return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()

def softmax(xs):
    return np.exp(xs)/sum(np.exp(xs))

# compute ONNX Runtime output prediction
input_batch_np = to_numpy(input_batch)
ort_inputs = {ort_session.get_inputs()[0].name: input_batch_np}
ort_outs = ort_session.run(None, ort_inputs)

# The input is still BCHW
data = zip(range(len(ort_outs[0][0])), softmax(ort_outs[0][0]))

onnx_results = [(categories[idx], prob) for (idx, prob) in sorted(data, key=lambda x: x[1], reverse=True)[:5]]
for cat, prob in  onnx_results:
    print(cat, ':', prob)

## Using onnx2tf

Using package: https://github.com/PINTO0309/onnx2tf

### Convert ONNX to Keras

In [None]:
import onnx2tf

keras_model_path = 'mobilenet_v2.tf'

keras_model = onnx2tf.convert(
    input_onnx_file_path=onnx_model_path,
    output_folder_path=keras_model_path,
    non_verbose=True,
)

### Check the conversion to keras

In [None]:
#transpose the input_batch into BHWC order for tensorflow
tf_input_data = np.transpose( input_batch.numpy(), [0, 2, 3, 1])

keras_output_data = keras_model(tf_input_data)

probs = softmax(keras_output_data[0])
data = zip(range(len(probs)), probs)
keras_results = [(categories[idx], prob) for (idx, prob) in sorted(data, key=lambda x: x[1], reverse=True)[:5]]
for cat, prob in  keras_results:
    print(cat, ':', prob)

## Convert Keras to TFLite (float32)

In [None]:
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
converter.inference_input_type = tf.float32 
converter.inference_output_type = tf.float32

tflite_model = converter.convert()

# Save the model.
tflite_float_model_path = 'mobilenet_v2_float.tflite'
with open(tflite_float_model_path, 'wb') as f:
  f.write(tflite_model)

### Check it Worked

In [None]:
tfl_interpreter = tf.lite.Interpreter(model_path=tflite_float_model_path)
tfl_interpreter.allocate_tensors()

tfl_input_details = tfl_interpreter.get_input_details()
tfl_output_details = tfl_interpreter.get_output_details()

# Convert PyTorch Input Tensor into Numpy Matrix and Reshape for TensorFlow
# (Pytorch model expects C x H x W but TF expects H x W x C)
tfl_interpreter.set_tensor(tfl_input_details[0]['index'], tf_input_data)
tfl_interpreter.invoke()

tfl_output_data = tfl_interpreter.get_tensor(tfl_output_details[0]['index'])

probs = softmax(tfl_output_data[0])
data = zip(range(len(probs)), probs)
tfl_float32_results = [(categories[idx], prob) for (idx, prob) in sorted(data, key=lambda x: x[1], reverse=True)[:5]]
for cat, prob in  tfl_float32_results:
    print(cat, ':', prob)

## Convert Keras to TFLite (int8)
We will still feed the data into the model in float32 format for convinence but the internals of the model will be int8. This will require representitive data but as we interface in float32 we can use the pytorch preprocessing. 

## Representative Dataset

To convert a model into to a TFLite flatbuffer, a representative dataset is required to help in quantisation. Refer to [Converting a keras model into an xcore optimised tflite model](https://colab.research.google.com/github/xmos/ai_tools/blob/develop/docs/notebooks/keras_to_xcore.ipynb) for more details on this.

#### Download & Extract Images from Data Set

In [None]:
# We will use the imagenette dataset(326MB)
!mkdir ./imagenet-dataset
!wget https://s3.amazonaws.com/fast-ai-imageclas/imagenette2-320.tgz -O ./imagenet-dataset/imagenette2-320.tgz
!tar -xf ./imagenet-dataset/imagenette2-320.tgz -C ./imagenet-dataset/

In [None]:
import os
import glob
import random

all_files = glob.glob(os.path.join("./imagenet-dataset/", '**/*.JPEG'), recursive=True)

# Randomly select a subset of images, let's just use 1k for speed
sampled_files = random.sample(all_files, 1000)

def representative_dataset():

    # Iterate over the sampled images and preprocess them
    for image_path in sampled_files:
        pil_image = Image.open(image_path).convert("RGB")
        pytorch_tensor = preprocess(pil_image).unsqueeze(0)
        img_array_np = pytorch_tensor.numpy()
        
        #swap the axes to BHWC(tf) from BCHW(pytorch)
        img_array_np = np.transpose( img_array_np, [0, 2, 3, 1])
        yield [img_array_np]

In [None]:
# Now do the conversion to int8
import tensorflow as tf

converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.float32 
converter.inference_output_type = tf.float32

tflite_int8_model = converter.convert()

# Save the model.
tflite_int8_model_path = 'mobilenet_v2.tflite'
with open(tflite_int8_model_path, 'wb') as f:
  f.write(tflite_int8_model)

In [None]:
tfl_interpreter = tf.lite.Interpreter(model_path=tflite_int8_model_path)
tfl_interpreter.allocate_tensors()

tfl_input_details = tfl_interpreter.get_input_details()
tfl_output_details = tfl_interpreter.get_output_details()

# Convert PyTorch Input Tensor into Numpy Matrix and Reshape for TensorFlow
tfl_interpreter.set_tensor(tfl_input_details[0]['index'], tf_input_data)
tfl_interpreter.invoke()

tfl_output_data = tfl_interpreter.get_tensor(tfl_output_details[0]['index'])

probs = softmax(tfl_output_data[0])
data = zip(range(len(probs)), probs)
tfl_int8_results = [(categories[idx], prob) for (idx, prob) in sorted(data, key=lambda x: x[1], reverse=True)[:5]]
for cat, prob in  tfl_int8_results:
    print(cat, ':', prob)

# Analysing Models

Defined below is a function to print out the operator counts of a model.

In [None]:
import io
from contextlib import redirect_stdout

def get_operator_counts(model_content):
    with io.StringIO() as buf, redirect_stdout(buf):
        tf.lite.experimental.Analyzer.analyze(model_content=model_content)
        model_structure = buf.getvalue()

    operators = [op.strip().split(" ")[1].split("(")[0] for op in model_structure.split("\n") if "Op#" in op]
    op_counts = {}
    for operator in operators:
        if operator in op_counts:
            op_counts[operator] = op_counts[operator]+1
        else:
            op_counts[operator] = 1
        
    return (len(operators), op_counts)

def print_operator_counts(model_content):
    total_op_count, op_counts = get_operator_counts(model_content)
    print(f"{'Operator'.upper():<20} {'Count'.upper():>6}")
    print("-"*20 + " " + "-"*6)
    
    for operator, count in op_counts.items():
        print(f"{operator.lower():<20} {count:>6}")
        
    print("-"*20 + " " + "-"*6)
    print(f"{'Total'.upper():<20} {total_op_count:>6}")
    print("-"*20 + " " + "-"*6)

In [None]:
# Let's inspect the int8 model
print_operator_counts(tflite_int8_model)