#        Running Inference on EC2 INF1 in K8 cluster 

This notebook outlines the flow for compiling a pretrained neural network model (ResNet50 using TensorFlow) for Inferentia and loading it on a K8s cluster with Inferentia nodes for inference.
#### Steps:
1. Save the Pretrained model as a pb file
2. Compile it for inferentia
3. Launch the K8 cluster with Inf1 nodes
4. Launch the Tensorflow Model Serving Container
5. Run the inference 

### 1. Generate the model file 
This step involves the following:
<ul>
    <li> Load the model from Keras (FP32) </li>
    <li> Optimize the model for inference </li>
    <li> Convert the FP32 model to FP16 model </li>
 </ul>

##### Generate the .pb file from the pretrained model

In [1]:
import tensorflow as tf
import numpy as np

from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions

from google.protobuf import text_format
import tensorflow.python.saved_model

# Set Keras global configuration before the model is built:
# learning phase 0 (Keras inference/test mode), NHWC layout, float32 weights.
tf.keras.backend.set_learning_phase(0)
tf.keras.backend.set_image_data_format('channels_last')
float_type = 'float32'
float_type2 = 'fp32'
tf.keras.backend.set_floatx(float_type)

# Load the pre-trained model using Keras (ImageNet weights).
model_name = 'resnet50_%s_keras'%float_type2
model = None
model = ResNet50(weights='imagenet')


# Output file names; frozen_file is written by this cell, opt_file is only
# printed here (the optimization cell writes a file with the same name).
frozen_file = model_name + '.pb'
opt_file = model_name + '_opt.pb'

# Derive graph node names from tensor names by dropping the ':0' output suffix.
model_input = model.input.name.replace(':0', '')
model_output = model.output.name.replace(':0', '')
batch, height, width, channels = model.input.shape

print ("model, frozen file, optimized file, input size, input node, output node,")
print ("%s, %s, %s, %dx%dx%d, %s, %s" %(model_name, frozen_file, opt_file, width, height, channels, model_input, model_output))

# Obtain the TF session that holds the Keras model's variables.
sess = tf.compat.v1.keras.backend.get_session()

# Save a checkpoint (and its .meta graph) plus a binary GraphDef under /tmp/<model_name>/.
# NOTE(review): graph_file is an absolute path passed as `name` together with
# logdir='.'; presumably the absolute path wins - confirm.
ckpt_file = '/tmp/' + model_name + '/' + model_name + '.ckpt'
graph_file = '/tmp/' + model_name + '/' + model_name + '.pb'
tf.compat.v1.train.Saver().save(sess, ckpt_file)
tf.io.write_graph(sess.graph.as_graph_def(), logdir='.', name=graph_file, as_text=False)

print(model_output)
# Reload the checkpoint into a fresh graph/session (this rebinds `sess`),
# convert the variables to constants, drop training-only nodes while keeping the
# output node, and write the frozen GraphDef to frozen_file.
with tf.compat.v1.Session(graph=tf.Graph()) as sess:
    saver = tf.compat.v1.train.import_meta_graph(ckpt_file + '.meta')
    saver.restore(sess, ckpt_file)
    output_graph_def = tf.compat.v1.graph_util.convert_variables_to_constants(
        sess, tf.compat.v1.get_default_graph().as_graph_def(), [model_output])
    output_graph_def = tf.compat.v1.graph_util.remove_training_nodes(
        output_graph_def, protected_nodes=[model_output])
    with open(frozen_file, 'wb') as f:
        f.write(output_graph_def.SerializeToString())

print('Done generating {} ......'.format(frozen_file))


Instructions for updating:
If using Keras pass *_constraint arguments to layers.
model, frozen file, optimized file, input size, input node, output node,
resnet50_fp32_keras, resnet50_fp32_keras.pb, resnet50_fp32_keras_opt.pb, 224x224x3, input_1, probs/Softmax
probs/Softmax
INFO:tensorflow:Restoring parameters from /tmp/resnet50_fp32_keras/resnet50_fp32_keras.ckpt
Instructions for updating:
Use `tf.compat.v1.graph_util.convert_variables_to_constants`
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
INFO:tensorflow:Froze 320 variables.
INFO:tensorflow:Converted 320 variables to const ops.
Instructions for updating:
Use `tf.compat.v1.graph_util.remove_training_nodes`
Done generating resnet50_fp32_keras.pb ......


In [2]:
# List the working directory; the frozen graph resnet50_fp32_keras.pb should now be present.
!ls

aws-neuron-sdk		    resnet50_fp32_keras.pb
compiler_workdir	    resnet50_fp32_keras_opt.pb
eks-demo-draft.ipynb	    rn50
nohup.out		    rn50_fp16
optimize_for_inference.py   rn50_fp16_compiled_b1_nc4
pod_manifests		    rn50_fp16_compiled_b1_nc4.zip
resnet50_fp16_keras_opt.pb


##### Optimize the model file for Inference

In [3]:
import re
import copy
import tensorflow as tf
import numpy as np
import string

from google.protobuf import text_format
from tensorflow.core.framework import node_def_pb2
from tensorflow.core.framework import attr_value_pb2
from tensorflow.python.framework import tensor_util
from tensorflow.tools.graph_transforms import TransformGraph

def clear_input(node):
  """Empty ``node.input`` in place by popping entries until none remain."""
  while len(node.input) > 0:
    node.input.pop()

def replace_name(node, name):
  """Set ``node.name`` to ``name`` in place."""
  node.name = name
     
def replace_input(node, input_name, new_name):
  """Rewrite ``node.input`` in place, substituting ``new_name`` for ``input_name``.

  Every entry equal to ``input_name`` is replaced; all other entries and the
  overall order are preserved. The list is rebuilt by clearing it and
  re-extending it with the rewritten entries.
  """
  rewritten = [new_name if entry == input_name else entry
               for entry in node.input]
  clear_input(node)
  node.input.extend(rewritten)

def swap_names(node1, node2):
  """Exchange the ``name`` attributes of ``node1`` and ``node2`` in place."""
  node1.name, node2.name = node2.name, node1.name

def get_const_node(const_node_name, const_by_name):
  """Look up a constant node by name, ignoring a trailing "/read" suffix.

  Raises KeyError when the stripped name is not a key of ``const_by_name``.
  """
  return const_by_name[re.sub("/read$", "", const_node_name)]

def get_const_ndarray(const_node_name, const_by_name):
  """Return the value of a constant node as a numpy array.

  The node is resolved the same way as in ``get_const_node`` (a trailing
  "/read" suffix is dropped before the lookup), then the tensor stored in its
  "value" attribute is converted with ``tf.make_ndarray``.
  """
  const_node = get_const_node(const_node_name, const_by_name)
  value_attr = const_node.attr.get("value")
  return tf.make_ndarray(value_attr.tensor)

def adjust_bias_values(bias_node, fbn_node, const_by_name):
  """Rescale a BiasAdd constant so the bias can be applied after batch norm.

  FusedBatchNorm computes gamma * (x - mean) / sqrt(variance + epsilon) + beta,
  hence BN(x + b) == BN(x) + b * gamma / sqrt(variance + epsilon). The bias
  constant is overwritten in place with that rescaled value. The original
  code divided by sqrt(variance) only, dropping epsilon.

  Args:
    bias_node: BiasAdd NodeDef; input[1] names the bias constant.
    fbn_node: FusedBatchNorm NodeDef; input[1] names gamma and input[4] names
      the variance constant.
    const_by_name: dict mapping constant node names to their NodeDefs; the
      bias entry is modified in place.

  Raises:
    KeyError: if one of the referenced constants is not in ``const_by_name``.
  """
  bias_val = get_const_ndarray(bias_node.input[1], const_by_name)
  gamma_val = get_const_ndarray(fbn_node.input[1], const_by_name)
  variance_val = get_const_ndarray(fbn_node.input[4], const_by_name)
  # Read epsilon without inserting a default entry into the attr map. When the
  # attribute is absent, fall back to 1e-4.
  # NOTE(review): 1e-4 is assumed to be the FusedBatchNorm op's registered default - confirm.
  if "epsilon" in fbn_node.attr:
    epsilon = fbn_node.attr["epsilon"].f
  else:
    epsilon = 1e-4
  # Cast epsilon to the variance dtype so the result keeps that precision.
  epsilon = variance_val.dtype.type(epsilon)
  new_bias = bias_val * gamma_val / np.sqrt(variance_val + epsilon)
  new_tensor = tensor_util.make_tensor_proto(new_bias, new_bias.dtype, new_bias.shape)
  bias_const_node = get_const_node(bias_node.input[1], const_by_name)
  bias_const_node.attr["value"].CopyFrom(attr_value_pb2.AttrValue(tensor=new_tensor))

def MoveBiasAddAfterFusedBatchNorm(graphdef):
  """Return a copy of ``graphdef`` with each BiasAdd moved after its FusedBatchNorm.

  fold_batch_norm function of TransformGraph is unable to fold Keras ResNet50
  because of BiasAdd between Conv2D and FusedBatchNorm (BiasAdd is not needed
  if FusedBatchNorm is used, but it exists in Keras ResNet50). Here, we
  move BiasAdd to after FusedBatchNorm, and adjust the bias value via
  ``adjust_bias_values``.

  Changes from the earlier version:
    * No TF session is created and the graph is no longer imported into the
      default graph; neither was used, and the session was never closed.
    * Nodes are collected in a Python list and copied into the output GraphDef
      only at the end. Extending a repeated message field copies each message,
      so the bias rescaling (applied to the NodeDef objects held in
      ``const_by_name``) previously never reached the returned graph.
    * The FusedBatchNormV3 rewrite is applied to the copy, so the caller's
      ``graphdef`` is left unmodified.

  Args:
    graphdef: GraphDef whose nodes are listed so that inputs precede their
      consumers (inputs are looked up among the nodes already visited).

  Returns:
    A new GraphDef containing the reordered, possibly modified nodes.

  Raises:
    KeyError: if a FusedBatchNorm input name has not been seen yet.
  """
  output_nodes = []
  node_by_name = {}
  const_by_name = {}
  for node in graphdef.node:
    copied_node = node_def_pb2.NodeDef()
    copied_node.CopyFrom(node)
    # Hack: downgrade FusedBatchNormV3 to FusedBatchNorm so fold_batch_norm can
    # recognize it; the "U" type attribute is removed as part of the rewrite.
    if copied_node.op == "FusedBatchNormV3":
      copied_node.op = "FusedBatchNorm"
      if "U" in copied_node.attr:
        del copied_node.attr["U"]
    node_by_name[node.name] = copied_node
    skip_add_node = False
    if copied_node.op == "Const":
      const_by_name[node.name] = copied_node
    elif copied_node.op.startswith("FusedBatchNorm"):
      # Switch BiasAdd/FusedBatchNorm so the fold_batch_norm transform works.
      # Iterate over the untouched original inputs: copied_node.input is
      # rewritten inside the loop.
      for i in node.input:
        input_node = node_by_name[i]
        if input_node.op == "BiasAdd":
          output_nodes.remove(input_node)
          input_node_input0 = input_node.input[0]
          # Adjust bias values (see adjust_bias_values)
          adjust_bias_values(input_node, copied_node, const_by_name)
          # Hack: swap names to avoid changing input of activation
          swap_names(copied_node, input_node)
          # Fix inputs for these two ops
          replace_input(copied_node, i, input_node_input0)
          replace_input(input_node, input_node_input0, copied_node.name)
          # Fix order in node list: batch norm first, then the bias add
          output_nodes.append(copied_node)
          output_nodes.append(input_node)
          skip_add_node = True
    # Add maybe-modified nodes if not already done
    if not skip_add_node:
      output_nodes.append(copied_node)
  output_graph_def = tf.compat.v1.GraphDef()
  # Copy the final node states into the output proto in one go.
  output_graph_def.node.extend(output_nodes)
  return output_graph_def

def FoldFusedBatchNorm(graph_def):
  """Run the inference-oriented TransformGraph pipeline on ``graph_def``.

  The graph input is fixed to 'input_1' and the output to 'probs/Softmax'.
  The transforms are applied in the order listed below:
    - add default attributes
    - remove Identity and CheckNumerics nodes
    - fold constants (errors ignored)
    - fold batch norms (new and old style)
    - strip unused nodes
    - sort by execution order

  Returns the transformed GraphDef.
  """
  input_names = ['input_1']
  output_names = ['probs/Softmax']
  transforms = [
      'add_default_attributes',
      'remove_nodes(op=Identity, op=CheckNumerics)',
      'fold_constants(ignore_errors=true)',
      'fold_batch_norms',
      'fold_old_batch_norms',
      'strip_unused_nodes',
      'sort_by_execution_order',
  ]
  return TransformGraph(graph_def, input_names, output_names, transforms)

def load_graph(model_file):
  """Read a serialized GraphDef from ``model_file`` and return the parsed proto."""
  with open(model_file, "rb") as pb_file:
    serialized = pb_file.read()
  parsed = tf.compat.v1.GraphDef()
  parsed.ParseFromString(serialized)
  return parsed


# Input: the frozen FP32 graph; output: the inference-optimized FP32 graph.
in_graph = 'resnet50_fp32_keras.pb'
out_graph = 'resnet50_fp32_keras_opt.pb'

# Pipeline: load -> move BiasAdd after FusedBatchNorm -> fold batch norms.
graph_orig = load_graph(in_graph)
graph_mod = MoveBiasAddAfterFusedBatchNorm(graph_orig)
graph_mod2 = FoldFusedBatchNorm(graph_mod)
# Serialize the optimized GraphDef to out_graph.
with tf.io.gfile.GFile(out_graph, "wb") as f:
    f.write(graph_mod2.SerializeToString())

print('Done Generating {} .....'.format(out_graph))

Done Generating resnet50_fp32_keras_opt.pb .....


In [4]:
# List the working directory; resnet50_fp32_keras_opt.pb should now be present.
!ls

aws-neuron-sdk		    resnet50_fp32_keras.pb
compiler_workdir	    resnet50_fp32_keras_opt.pb
eks-demo-draft.ipynb	    rn50
nohup.out		    rn50_fp16
optimize_for_inference.py   rn50_fp16_compiled_b1_nc4
pod_manifests		    rn50_fp16_compiled_b1_nc4.zip
resnet50_fp16_keras_opt.pb


##### Convert to FP16

In [5]:
import tensorflow as tf
import numpy as np

from google.protobuf import text_format
from tensorflow.core.framework import graph_pb2
from tensorflow.core.framework import node_def_pb2
from tensorflow.python.platform import gfile

from tensorflow.core.framework import attr_value_pb2
from tensorflow.python.framework import tensor_util

def ConvertFP32ToOther(graphdef):
  """Return a copy of an FP32 graph with float32 constants and dtypes cast to FP16.

  Every Const node whose dtype is float32 has its value converted to float16,
  and every other node has its float32 type attributes ("T", "Tparams",
  "dtype", "SrcT", "DstT") switched to half precision. Nodes and attributes of
  other types are copied unchanged.

  The conversion is done with numpy rather than by adding a ``tf.cast`` op and
  calling ``sess.run`` for each constant. The earlier version imported the
  graph into the default graph, opened a session that was never closed, and
  grew the graph with every constant it cast.

  Args:
    graphdef: GraphDef to convert; it is not modified.

  Returns:
    A new GraphDef with the converted nodes in the original order.
  """
  cast_type = "float16"
  np_cast_type = np.dtype(cast_type)
  dt_float_type_attr = attr_value_pb2.AttrValue(type=tf.float32.as_datatype_enum)
  dt_half_type_attr = attr_value_pb2.AttrValue(
      type=tf.as_dtype(cast_type).as_datatype_enum)
  # Attributes that carry an element type on non-constant nodes.
  type_attr_names = ("T", "Tparams", "dtype", "SrcT", "DstT")

  output_graph_def = graph_pb2.GraphDef()
  for node in graphdef.node:
    output_node = node_def_pb2.NodeDef()
    output_node.CopyFrom(node)
    if node.op == "Const":
      # Membership test first: indexing a proto map with a missing key would
      # insert an empty entry into the caller's node.
      if "dtype" in node.attr and node.attr["dtype"] == dt_float_type_attr:
        fp32_values = tensor_util.MakeNdarray(node.attr["value"].tensor)
        half_values = fp32_values.astype(np_cast_type)
        output_node.attr["dtype"].CopyFrom(dt_half_type_attr)
        output_node.attr["value"].CopyFrom(
            attr_value_pb2.AttrValue(
                tensor=tensor_util.make_tensor_proto(
                    half_values, dtype=cast_type, shape=half_values.shape)))
    else:
      for attr_name in type_attr_names:
        if attr_name in node.attr and node.attr[attr_name] == dt_float_type_attr:
          output_node.attr[attr_name].CopyFrom(dt_half_type_attr)
    output_graph_def.node.extend([output_node])
  return output_graph_def

def load_graph(model_file):
  """Read a serialized GraphDef from ``model_file`` and return the parsed proto.

  Uses ``tf.compat.v1.GraphDef`` so the loader matches the rest of the
  notebook's ``tf.compat.v1`` usage instead of relying on the TF1-only
  top-level ``tf.GraphDef`` symbol.
  """
  graph_def = tf.compat.v1.GraphDef()

  with open(model_file, "rb") as f:
    graph_def.ParseFromString(f.read())

  return graph_def

print('Converting the Model to FP16 .......')

# Input: the optimized FP32 graph; output: the FP16 graph used for compilation.
in_graph = 'resnet50_fp32_keras_opt.pb'
out_graph = 'resnet50_fp16_keras_opt.pb'
graph_f32 = load_graph(in_graph)
graph_f16 = ConvertFP32ToOther(graph_f32)
output_xformed_graph_name = out_graph
# Serialize the converted GraphDef to out_graph.
with gfile.GFile(output_xformed_graph_name, "wb") as f:
    f.write(graph_f16.SerializeToString())
    
print('Done generating {} ......'.format(output_xformed_graph_name))

Converting the Model to FP16 .......


KeyboardInterrupt: 

In [None]:
# List the working directory; resnet50_fp16_keras_opt.pb should now be present.
!ls

### 2. Compile the model for Inferentia
Inferentia supports ahead-of-time compilation. The model will be compiled using neuron-cc. The script below captures the essential arguments passed to the neuron-cc compiler. Once the compilation is successful, the compiled model will be uploaded to an S3 bucket for later use by TF Model Serving.

##### Compile the model

In [None]:
import time
import shutil
import numpy as np
import argparse
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications import resnet50
import tensorflow.neuron as tfn

# Use NHWC layout, matching the frozen graph generated earlier.
tf.keras.backend.set_image_data_format('channels_last')

# Compilation parameters: both values are embedded in the compiled model's
# directory name and passed on to the compiler below.
batch_size = 1
num_neuroncores = 4

def pb_to_saved_model(pb_path, input_names, output_names, model_dir):
    """Convert a frozen GraphDef file into a SavedModel directory.

    The .pb file is now read inside a ``with`` block so the file handle is
    closed deterministically (it was previously left open), and the
    ``tf.compat.v1`` entry points are used, matching the earlier cells.

    Args:
        pb_path: path of the serialized GraphDef to load.
        input_names: dict mapping signature input keys to tensor names in the graph.
        output_names: dict mapping signature output keys to tensor names in the graph.
        model_dir: directory the SavedModel is written to.
    """
    graph_def = tf.compat.v1.GraphDef()
    with open(pb_path, 'rb') as pb_file:
        graph_def.ParseFromString(pb_file.read())
    with tf.compat.v1.Session(graph=tf.Graph()) as sess:
        # name='' keeps the original node names (no "import/" prefix), so the
        # tensor names supplied by the caller resolve unchanged.
        tf.import_graph_def(graph_def, name='')
        inputs = {name: sess.graph.get_tensor_by_name(ts_name) for name, ts_name in input_names.items()}
        outputs = {name: sess.graph.get_tensor_by_name(ts_name) for name, ts_name in output_names.items()}
        tf.compat.v1.saved_model.simple_save(sess, model_dir, inputs, outputs)

# Directory for the uncompiled FP16 SavedModel; removed first so the export starts clean.
saved_model_dir = "rn50_fp16"

shutil.rmtree(saved_model_dir, ignore_errors=True)

# Wrap the FP16 frozen graph in a SavedModel whose signature keys equal the tensor names.
pb_to_saved_model("resnet50_fp16_keras_opt.pb", {"input_1:0": "input_1:0"}, {"probs/Softmax:0" : "probs/Softmax:0"}, saved_model_dir)

# All-zero FP16 sample input used as the feed for compilation.
img_arr = np.zeros([batch_size, 224, 224, 3], dtype='float16')
# Compiled output goes to <saved_model_dir>_compiled_b<batch>_nc<cores>/1.
compiled_saved_model_dir = saved_model_dir + "_compiled_b" + str(batch_size) + "_nc" + str(num_neuroncores)
shutil.rmtree(compiled_saved_model_dir + "/1", ignore_errors=True)

print("\n*** Batch size {}, num NeuronCores {} (input shape: {}, saved model dir: {}) ***\n".format(batch_size, num_neuroncores, img_arr.shape, compiled_saved_model_dir))
# Arguments forwarded to the compiler.
# NOTE(review): the (batch_size + 6)*10 value for --sb_size is unexplained - confirm its origin.
compiler_args = ['--batching_en', '--rematerialization_en', '--spill_dis',
                 '--sb_size', str((batch_size + 6)*10), 
                 '--enable-replication', 'True',
                 '--num-neuroncores', str(num_neuroncores)]
# --static-weights is only requested when compiling for 8 or more NeuronCores.
static_weights = False
if num_neuroncores >= 8:
    compiler_args.append('--static-weights')
    static_weights = True

shutil.rmtree('compiler_workdir', ignore_errors=True)
start = time.time()
rslts = tfn.saved_model.compile(saved_model_dir, compiled_saved_model_dir + "/1",
               model_feed_dict={'input_1:0' : img_arr},
               compiler_workdir='compiler_workdir',
               dynamic_batch_size=True,
               compiler_args = compiler_args)
delta = time.time() - start
# 'OnNeuronRatio' is scaled by 100 and reported below as the percentage of
# operations placed on Inferentia.
perc_on_inf = rslts['OnNeuronRatio'] * 100

# The compile counts as successful only when at least 50% of operations are on Inferentia.
# A retry happens only if --static-weights was used in the first attempt; with
# fewer than 8 NeuronCores a low ratio is reported and not retried.
compile_success = False
if perc_on_inf < 50:
    print("\nERROR: Compilation finished in {:.0f} seconds with less than 50% operations placed on Inferentia ({:.1f}%)\n".format(delta, perc_on_inf))
    if '--static-weights' in compiler_args:
        print("INFO: Retry compilation without static weights")
        compiler_args.remove('--static-weights')
        static_weights = False
        shutil.rmtree(compiled_saved_model_dir + "/1", ignore_errors=True)
        shutil.rmtree('compiler_workdir2', ignore_errors=True)
        start = time.time()
        rslts = tfn.saved_model.compile(saved_model_dir, compiled_saved_model_dir + "/1",
                   model_feed_dict={'input_1:0' : img_arr},
                   compiler_workdir='compiler_workdir2',
                   dynamic_batch_size=True,
                   compiler_args = compiler_args)
        delta = time.time() - start
        perc_on_inf = rslts['OnNeuronRatio'] * 100
        if perc_on_inf < 50:
            print("\nERROR: Retry compilation finished in {:.0f} seconds with less than 50% operations placed on Inferentia ({:.1f}%)\n".format(delta, perc_on_inf))
        else:    
            print("\nINFO: Retry compilation finished in {:.0f} seconds with {:.1f}% operations placed on Inferentia\n".format(delta, perc_on_inf))
            compile_success = True
else:    
    print("\nINFO: Compilation finished in {:.0f} seconds with {:.1f}% operations placed on Inferentia\n".format(delta, perc_on_inf))
    compile_success = True

# Prepare SavedModel for uploading to Inf1 instance
# completion_code: 0 = compilation failed, 1 = success without static weights,
# 2 = success with static weights.
completion_code = 0
if compile_success:
    shutil.make_archive('./' + compiled_saved_model_dir, 'zip', './', compiled_saved_model_dir)
    completion_code = 1 + int(static_weights)

print(completion_code)


In [None]:
# List the working directory; the compiled model directory and its .zip archive should now be present.
!ls

In [None]:
# Stage version 1 of the model for TF Serving. Copy the Neuron-compiled
# SavedModel written by the compile step ({compiled_saved_model_dir}/1), not the
# uncompiled rn50_fp16 export, so the model uploaded to S3 is the compiled one.
!mkdir -p rn50/1
!cp {compiled_saved_model_dir}/1/saved_model.pb rn50/1/

##### Upload the compiled model to S3 for TF serving

In [None]:
# Upload the staged rn50/ directory to the eks-tests bucket (the bucket name is hard-coded here).
!aws s3 sync rn50 s3://eks-tests/rn50

In [None]:
# List what is stored under the rn50 prefix to confirm the upload.
!aws s3 ls s3://eks-tests/rn50

### 3. Launch the K8 cluster with INF1 nodes

#### Steps:
  1. Launch the cluster
  2. Apply the neuron device plugin as a daemon set. [This device plugin is responsible for exposing the neuron devices to containers]


##### Launch the K8s INF1 cluster

In [None]:
# Create a one-node EKS cluster with an inf1.2xlarge worker in us-west-2.
# TODO: remove --node-ami once EKS Inf1 support is announced.
# (The note lives here because a comment placed after a trailing backslash
# breaks the line continuation and truncates the command.)
!eksctl create cluster --name=eks-demo-inf1 \
                       --nodes=1 \
                       --node-ami=ami-0b7eb206d4a5ad3ea \
                       --node-type=inf1.2xlarge \
                       --ssh-access \
                       --region=us-west-2 \
                       --ssh-public-key ~/.ssh/id_rsa.pub

In [None]:
# Show pods in all namespaces to check that the new cluster is reachable.
!kubectl get pods -A

In [None]:
# Switch to the directory holding the pod manifests used below (the path is specific to this demo's layout).
%cd ~/demo_k8s_inf1/pod_manifests

In [None]:
# Print the device-plugin manifest for review before applying it.
!cat k8s-neuron-device-plugin.yml

##### Apply neuron device plugin

In [None]:
# Create or update the resources defined in the device-plugin manifest.
!kubectl apply -f k8s-neuron-device-plugin.yml

In [None]:
# Show pods in all namespaces; presumably the device-plugin pod should now be listed - verify.
!kubectl get pods -A

In [None]:
# Apply secret.yml, which holds the AWS credentials the worker nodes need to access the S3 bucket.
# Keep the real secret.yml out of version control.
!kubectl apply -f secret.yml

### 4. Launch the Tensorflow Model Server

The model server POD should host two containers
1. TF-SERVING container, which performs the following
    a. Pulls the compiled model from S3 and loads the model into the Inferentia neuron core by calling neuron-rtd gRPC APIs
    b. Exposes a northbound interface for the clients to run the inference
2. neuron-rtd sidecar container that manages the Inferentia device

It is recommended to keep both the neuron-rtd and model serving containers in the same POD.

In [None]:
# Print the model-server manifest for review before applying it.
!cat rn50_service.yml

In [None]:
# Create or update the resources defined in the model-server manifest.
!kubectl apply -f rn50_service.yml

In [None]:
# Show pods in all namespaces; presumably the model-server pod should now be listed - verify.
!kubectl get pods -A

In [None]:
%%bash --bg
# Forward local port 9000 to the inf-k8s-test service. The command is run in
# the background: in the foreground it never returns, so the kernel would stay
# blocked and the inference cell could not run.
kubectl port-forward svc/inf-k8s-test 9000:9000

In [None]:
# Show services in all namespaces.
!kubectl get svc -A

### 5. Run Inference

In [None]:
import numpy as np
import grpc
import tensorflow as tf
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras.applications.resnet50 import decode_predictions
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2_grpc

print('Starting the Inference.........')
# Connect to the model server through the locally forwarded port 9000.
channel = grpc.insecure_channel('localhost:9000')
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# Download the sample image (get_file returns the local path of the file).
img_file = tf.keras.utils.get_file(
        "./kitten_small.jpg",
        "https://raw.githubusercontent.com/awslabs/mxnet-model-server/master/docs/images/kitten_small.jpg")

# Resize to 224x224, add a leading batch axis and apply ResNet50 preprocessing.
img = image.load_img(img_file, target_size=(224, 224))
img_array = preprocess_input(image.img_to_array(img)[None, ...])

request = predict_pb2.PredictRequest()
request.model_spec.name = 'rn50_fp16'

# Cast the input to fp16 to match the model's FP16 input.
img_array = img_array.astype(np.float16)
# tf.make_tensor_proto replaces the TF1-only tf.contrib.util alias and matches
# the tf.make_ndarray call used to decode the response below.
request.inputs['input_1:0'].CopyFrom(
        tf.make_tensor_proto(img_array, shape=img_array.shape))
result = stub.Predict(request)
prediction = tf.make_ndarray(result.outputs['probs/Softmax:0'])
# Class name of the first entry returned for the first image.
res = decode_predictions(prediction)[0][0][1]
if res == 'tabby':
    print("Infer Test Passed..")
else:
    print('Inference result missmatch. Expected tabby got %s' % res)
