<a href="https://colab.research.google.com/github/ADMoreau/raspi_gui/blob/master/pose_estimation_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!pip uninstall tensorflow -y
!pip install tensorflow==2.1
!pip install tensorflow-model-optimization
!pip install efficientnet

In [0]:
%tensorflow_version 2.x
from __future__ import absolute_import, division, print_function, unicode_literals

import efficientnet.tfkeras as efn 
import tensorflow as tf
import tensorflow.keras as tfk
from tensorflow.keras import layers
from tensorflow_model_optimization.sparsity import keras as sparsity
import json
import numpy as np
import math
import collections
import PIL
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import os
from six.moves import xrange
import string
#tf.compat.v1.disable_eager_execution()
tf.__version__

'2.1.0'

In [0]:
class convolutional(layers.Layer):
  def __init__(self, batch_normalize, filters, size, stride, pad, activation, **kwargs):
    super(convolutional, self).__init__(**kwargs)
    self.batch_normalize = batch_normalize
    self.filters = filters
    self.size = size
    self.stride = stride
    self.is_pad = pad
    self.pad = 'same' if self.is_pad else 'valid'
    self.activation = activation
    if self.batch_normalize:
      self.conv = layers.Conv2D(filters = self.filters, 
                                kernel_size = (self.size, self.size),
                                strides = (self.stride, self.stride),
                                padding = self.pad,
                                use_bias = False)
      self.bn = layers.BatchNormalization()
    else:
      self.conv = layers.Conv2D(filters = self.filters, 
                              kernel_size = (self.size, self.size),
                              strides = (self.stride, self.stride),
                              padding = self.pad)
    if self.activation == 'leaky':
      self.activation = layers.LeakyReLU(alpha=.1)
    elif self.activation == 'relu':
      self.activation = layers.ReLU()
  def call(self, inputs):
    x = self.conv(inputs)
    if self.batch_normalize:
      x = self.bn(x)
    if self.activation == 'linear':
      return x
    else:
      return self.activation(x)

class deconvolutional(layers.Layer):
  def __init__(self, batch_normalize, filters, size, stride, pad, activation, **kwargs):
    super(deconvolutional, self).__init__(**kwargs)
    self.batch_normalize = batch_normalize
    self.filters = filters
    self.size = size
    self.stride = stride
    self.is_pad = pad
    self.pad = 'same' if self.is_pad else 'valid'
    self.activation = activation
    if self.batch_normalize:
      self.conv = layers.Conv2DTranspose(filters = self.filters, 
                                kernel_size = (self.size, self.size),
                                strides = (self.stride, self.stride),
                                padding = self.pad,
                                use_bias = False)
      self.bn = layers.BatchNormalization()
    else:
      self.conv = layers.Conv2DTranspose(filters = self.filters, 
                              kernel_size = (self.size, self.size),
                              strides = (self.stride, self.stride),
                              padding = self.pad)
    if self.activation == 'leaky':
      self.activation = layers.LeakyReLU(alpha=.1)
    elif self.activation == 'relu':
      self.activation = layers.ReLU()
  def call(self, inputs):
    x = self.conv(inputs)
    if self.batch_normalize:
      x = self.bn(x)
    if self.activation == 'linear':
      return x
    else:
      return self.activation(x)

#Model based on https://www.zpascal.net/cvpr2019/Hu_Segmentation-Driven_6D_Object_Pose_Estimation_CVPR_2019_paper.pdf

In [0]:
def swish(x):
  return tf.keras.layers.Multiply()([x, tf.keras.layers.Activation('sigmoid')(x)])

def coreModel():
  yolo_input = tfk.Input(shape=(480, 640, 3), name='input')
  a = layers.Conv2D(filters=32, kernel_size=(3, 3), strides=1, padding='same')(yolo_input)
  a = swish(layers.BatchNormalization()(a))
  a_s = layers.Conv2D(filters=64, kernel_size=(3, 3), strides=2, padding='same')(a)
  a_s = swish(layers.BatchNormalization()(a_s))
  a = layers.Conv2D(filters=32, kernel_size=(1, 1), strides=1, padding='same')(a_s)
  a = swish(layers.BatchNormalization()(a))
  a_S = layers.Conv2D(filters=64, kernel_size=(3, 3), strides=1, padding='same')(a)
  a_S = swish(layers.BatchNormalization()(a_S))
  #shortcut layer -3 (all)
  b = layers.add([a_s, a_S])
  b_s = layers.Conv2D(filters=128, kernel_size=(3, 3), strides=2, padding='same')(b)
  b_s = swish(layers.BatchNormalization()(b_s))
  b = layers.Conv2D(filters=64, kernel_size=(1,1), strides=1, padding='same')(b_s)
  b = swish(layers.BatchNormalization()(b))
  b_S = layers.Conv2D(filters=128, kernel_size=(3, 3), strides=1, padding='same')(b)
  b_S = swish(layers.BatchNormalization()(b_S))
  #shortcut layer
  c_s = layers.add([b_s, b_S])
  c = layers.Conv2D(filters=64, kernel_size=(1,1), strides=1, padding='same')(c_s)
  c = swish(layers.BatchNormalization()(c))
  c_S = layers.Conv2D(filters=128, kernel_size=(3, 3), strides=1, padding='same')(b)
  c_S = swish(layers.BatchNormalization()(c_S))
  #shortcut layer
  d = layers.add([c_s, c_S])
  d_s = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=2, padding='same')(d)
  d_s = swish(layers.BatchNormalization()(d_s))
  d = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(d_s)
  d = swish(layers.BatchNormalization()(d))
  d_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(d)
  d_S = swish(layers.BatchNormalization()(d_S))
  #shortcut layer
  e_s = layers.add([d_s, d_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=128, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=256, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_36 = layers.add([e_s, e_S])
  e_s = layers.Conv2D(filters=512, kernel_size=(3,3), strides=2, padding='same')(e_36)
  e_s = swish(layers.BatchNormalization()(e_s))
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3,3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=512, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_61 = layers.add([e_s, e_S])
  e_s = layers.Conv2D(filters=1024, kernel_size=(3,3), strides=2, padding='same')(e_61)
  e_s = swish(layers.BatchNormalization()(e_s))
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=1024, kernel_size=(3,3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=1024, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=1024, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut layer
  e_s = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e_s)
  e = swish(layers.BatchNormalization()(e))
  e_S = layers.Conv2D(filters=1024, kernel_size=(3, 3), strides=1, padding='same')(e)
  e_S = swish(layers.BatchNormalization()(e_S))
  #shortcut
  e_74 = layers.add([e_s, e_S])
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e_74)
  e = swish(layers.BatchNormalization()(e))
  e = layers.Conv2D(filters=1024, kernel_size=(3, 3), strides=1, padding='same')(e)
  e = swish(layers.BatchNormalization()(e))
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e)
  e = swish(layers.BatchNormalization()(e))
  e = layers.Conv2D(filters=1024, kernel_size=(3, 3), strides=1, padding='same')(e)
  e = swish(layers.BatchNormalization()(e))
  e = layers.Conv2D(filters=512, kernel_size=(1,1), strides=1, padding='same')(e)
  e = swish(layers.BatchNormalization()(e))
  e = layers.Conv2D(filters=256, kernel_size=(1,1), strides=1, padding='same')(e)
  e = swish(layers.BatchNormalization()(e))
  """
  q = deconvolutional(batch_normalize=1, filters=256, size=2, stride=2, padding=0, activation='leaky')(q)
  #route -1, 61
  q = layers.Concatenate()([q, q_61])
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=512, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=512, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  #q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  #q = deconvolutional(batch_normalize=1, filters=128, size=2, stride=2, pad=0, activation='leaky')(q)
  #route -1, 36
  q = layers.Concatenate()([q, l_36])
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=3, stride=1, pad=1, activation='leaky')(q)
  """
  pos = layers.Conv2D(filters=16, kernel_size=(1,1), strides=1, padding='same', name='position')(e)
  sigmoid = layers.Conv2D(filters=8, kernel_size=(1,1), strides=1, padding='same', name='sigmoid')(e)
  
  yolo = tfk.Model(yolo_input, [sigmoid, pos])
  #out = tfk.activations.sigmoid(q_outlayer_1)
  #out = tfk.backend.max(out, axis = -1) #channels last
  #outlayer
  #route 74
  """Original moving to only classifying bounding box points 
  q = convolutional(batch_normalize=1, filters=512, size=1, stride=1, pad=1, activation='leaky')(q_74)
  q = convolutional(batch_normalize=1, filters=1024, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=512, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=1024, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=512, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  q = deconvolutional(batch_normalize=1, filters=256, size=2, stride=2, pad=0, activation='leaky')(q)
  #route -1, 61
  q = layers.Concatenate()([q, q_61])
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=512, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=512, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = deconvolutional(batch_normalize=1, filters=128, size=2, stride=2, pad=0, activation='leaky')(q)
  #route -1 36
  q = layers.Concatenate()([q, l_36])
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=3, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=128, size=1, stride=1, pad=1, activation='leaky')(q)
  q = convolutional(batch_normalize=1, filters=256, size=3, stride=1, pad=1, activation='leaky')(q)
  q_outlayer_2 = convolutional(batch_normalize=0, filters=24, size=1, stride=1, pad=1, activation='linear')(q)
  #outlayer
  yolo = tfk.Model(yolo_input, [q_outlayer_1, q_outlayer_2])
  """
  #yolo = tfk.Model(yolo_input, out)
  return yolo

#Efficientnet from https://github.com/qubvel/efficientnet

could not use quantization on imported model

In [0]:
BlockArgs = collections.namedtuple('BlockArgs', [
    'kernel_size', 'num_repeat', 'input_filters', 'output_filters',
    'expand_ratio', 'id_skip', 'strides', 'se_ratio'
])
# defaults will be a public argument for namedtuple in Python 3.7
# https://docs.python.org/3/library/collections.html#collections.namedtuple
BlockArgs.__new__.__defaults__ = (None,) * len(BlockArgs._fields)

DEFAULT_BLOCKS_ARGS = [
        BlockArgs(kernel_size=3, num_repeat=1, input_filters=32, output_filters=16,
                  expand_ratio=1, id_skip=True, strides=[1, 1], se_ratio=0.25),
        BlockArgs(kernel_size=3, num_repeat=2, input_filters=16, output_filters=24,
                  expand_ratio=6, id_skip=True, strides=[2, 2], se_ratio=0.25),
        BlockArgs(kernel_size=5, num_repeat=2, input_filters=24, output_filters=40,
                  expand_ratio=6, id_skip=True, strides=[2, 2], se_ratio=0.25),
        BlockArgs(kernel_size=3, num_repeat=3, input_filters=40, output_filters=80,
                  expand_ratio=6, id_skip=True, strides=[2, 2], se_ratio=0.25),
        BlockArgs(kernel_size=5, num_repeat=3, input_filters=80, output_filters=112,
                  expand_ratio=6, id_skip=True, strides=[1, 1], se_ratio=0.25),
        BlockArgs(kernel_size=5, num_repeat=4, input_filters=112, output_filters=192,
                  expand_ratio=6, id_skip=True, strides=[2, 2], se_ratio=0.25),
        BlockArgs(kernel_size=3, num_repeat=1, input_filters=192, output_filters=320,
                  expand_ratio=6, id_skip=True, strides=[1, 1], se_ratio=0.25)
    ]

CONV_KERNEL_INITIALIZER = {
    'class_name': 'VarianceScaling',
    'config': {
        'scale': 2.0,
        'mode': 'fan_out',
        # EfficientNet actually uses an untruncated normal distribution for
        # initializing conv layers, but keras.initializers.VarianceScaling use
        # a truncated distribution.
        # We decided against a custom initializer for better serializability.
        'distribution': 'normal'
    }
}

DENSE_KERNEL_INITIALIZER = {
    'class_name': 'VarianceScaling',
    'config': {
        'scale': 1. / 3.,
        'mode': 'fan_out',
        'distribution': 'uniform'
    }
}

def get_swish():
    def swish(x):
        """Swish activation function: x * sigmoid(x).
        Reference: [Searching for Activation Functions](https://arxiv.org/abs/1710.05941)
        """
        return tf.keras.layers.Multiply()([x, tf.keras.layers.Activation('sigmoid')(x)])
    return swish

def get_dropout():
    """Wrapper over custom dropout. Fix problem of ``None`` shape for tf.keras.
    It is not possible to define FixedDropout class as global object,
    because we do not have modules for inheritance at first time.
    Issue:
        https://github.com/tensorflow/tensorflow/issues/30946
    """
    class FixedDropout(layers.Dropout):
        def _get_noise_shape(self, inputs):
            if self.noise_shape is None:
                return self.noise_shape

            symbolic_shape = tfk.shape(inputs)
            noise_shape = [symbolic_shape[axis] if shape is None else shape
                           for axis, shape in enumerate(self.noise_shape)]
            return tuple(noise_shape)

    return FixedDropout

def round_filters(filters, width_coefficient, depth_divisor):
    """Round number of filters based on width multiplier."""

    filters *= width_coefficient
    new_filters = int(filters + depth_divisor / 2) // depth_divisor * depth_divisor
    new_filters = max(depth_divisor, new_filters)
    # Make sure that round down does not go down by more than 10%.
    if new_filters < 0.9 * filters:
        new_filters += depth_divisor
    return int(new_filters)

def round_repeats(repeats, depth_coefficient):
    """Round number of repeats based on depth multiplier."""
    return int(math.ceil(depth_coefficient * repeats))

def mb_conv_block(inputs, block_args, activation, drop_rate=None, prefix='', ):
    """Mobile Inverted Residual Bottleneck."""

    has_se = (block_args.se_ratio is not None) and (0 < block_args.se_ratio <= 1)
    bn_axis = 3 

    # workaround over non working dropout with None in noise_shape in tf.keras
    #Dropout = get_dropout()

    # Expansion phase
    filters = block_args.input_filters * block_args.expand_ratio
    if block_args.expand_ratio != 1:
        x = layers.Conv2D(filters, 1,
                          padding='same',
                          use_bias=False,
                          kernel_initializer=CONV_KERNEL_INITIALIZER,
                          name=prefix + 'expand_conv')(inputs)
        x = layers.BatchNormalization(axis=bn_axis, name=prefix + 'expand_bn')(x)
        x = layers.Activation(activation, name=prefix + 'expand_activation')(x)
    else:
        x = inputs

    # Depthwise Convolution
    x = layers.DepthwiseConv2D(block_args.kernel_size,
                               strides=block_args.strides,
                               padding='same',
                               use_bias=False,
                               depthwise_initializer=CONV_KERNEL_INITIALIZER,
                               name=prefix + 'dwconv')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=prefix + 'bn')(x)
    x = layers.Activation(activation, name=prefix + 'activation')(x)

    # Squeeze and Excitation phase
    if has_se:
        num_reduced_filters = max(1, int(
            block_args.input_filters * block_args.se_ratio
        ))
        se_tensor = layers.GlobalAveragePooling2D(name=prefix + 'se_squeeze')(x)

        target_shape = (1, 1, filters)
        se_tensor = layers.Reshape(target_shape, name=prefix + 'se_reshape')(se_tensor)
        se_tensor = layers.Conv2D(num_reduced_filters, 1,
                                  activation=activation,
                                  padding='same',
                                  use_bias=True,
                                  kernel_initializer=CONV_KERNEL_INITIALIZER,
                                  name=prefix + 'se_reduce')(se_tensor)
        se_tensor = layers.Conv2D(filters, 1,
                                  activation='sigmoid',
                                  padding='same',
                                  use_bias=True,
                                  kernel_initializer=CONV_KERNEL_INITIALIZER,
                                  name=prefix + 'se_expand')(se_tensor)
        x = layers.multiply([x, se_tensor], name=prefix + 'se_excite')

    # Output phase
    x = layers.Conv2D(block_args.output_filters, 1,
                      padding='same',
                      use_bias=False,
                      kernel_initializer=CONV_KERNEL_INITIALIZER,
                      name=prefix + 'project_conv')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=prefix + 'project_bn')(x)
    if block_args.id_skip and all(
            s == 1 for s in block_args.strides
    ) and block_args.input_filters == block_args.output_filters:
        #if drop_rate and (drop_rate > 0):
        #    x = Dropout(drop_rate,
        #                noise_shape=(None, 1, 1, 1),
        #                name=prefix + 'drop')(x)
        x = layers.add([x, inputs], name=prefix + 'add')

    return x

def EfficientNet(width_coefficient,
                 depth_coefficient,
                 default_resolution,
                 dropout_rate=0.2,
                 drop_connect_rate=0.2,
                 depth_divisor=8,
                 model_name='efficientnet',
                 pooling=None):

    blocks_args=DEFAULT_BLOCKS_ARGS

    bn_axis = 3 
    activation = get_swish()

    input_tensor = tfk.Input(shape=(480, 640, 3), name='input')
    x = layers.Conv2D(round_filters(32, width_coefficient, depth_divisor), 3,
                        strides=(2, 2),
                        padding='same',
                        use_bias=False,
                        kernel_initializer=CONV_KERNEL_INITIALIZER,
                        name='stem_conv')(input_tensor)
    x = layers.BatchNormalization(axis=bn_axis, name='stem_bn')(x)
    x = layers.Activation(activation, name='stem_activation')(x)
    # Build blocks
    num_blocks_total = sum(block_args.num_repeat for block_args in blocks_args)
    block_num = 0
    for idx, block_args in enumerate(blocks_args):
        assert block_args.num_repeat > 0
        # Update block input and output filters based on depth multiplier.
        block_args = block_args._replace(
            input_filters=round_filters(block_args.input_filters,
                                        width_coefficient, depth_divisor),
            output_filters=round_filters(block_args.output_filters,
                                         width_coefficient, depth_divisor),
            num_repeat=round_repeats(block_args.num_repeat, depth_coefficient))

        # The first block needs to take care of stride and filter size increase.
        drop_rate = drop_connect_rate * float(block_num) / num_blocks_total
        x = mb_conv_block(x, block_args,
                          activation=activation,
                          drop_rate=drop_rate,
                          prefix='block{}a_'.format(idx + 1))
        block_num += 1
        if block_args.num_repeat > 1:
            # pylint: disable=protected-access
            block_args = block_args._replace(
                input_filters=block_args.output_filters, strides=[1, 1])
            # pylint: enable=protected-access
            for bidx in xrange(block_args.num_repeat - 1):
                drop_rate = drop_connect_rate * float(block_num) / num_blocks_total
                block_prefix = 'block{}{}_'.format(
                    idx + 1,
                    string.ascii_lowercase[bidx + 1]
                )
                x = mb_conv_block(x, block_args,
                                  activation=activation,
                                  drop_rate=drop_rate,
                                  prefix=block_prefix)
                block_num += 1

    # Build top
    x = layers.Conv2D(round_filters(1280, width_coefficient, depth_divisor), 1,
                      padding='same',
                      use_bias=False,
                      kernel_initializer=CONV_KERNEL_INITIALIZER,
                      name='top_conv')(x)
    x = layers.BatchNormalization(axis=bn_axis, name='top_bn')(x)
    x = layers.Activation(activation, name='top_activation')(x)
    pos = layers.Conv2D(filters=16,
                        kernel_size=(1,1),
                        strides=1,
                        padding='same',
                        name='position')(x)
    sigmoid = layers.Conv2D(filters=8,
                            kernel_size=(1,1),
                            strides=1,
                            padding='same',
                            name='sigmoid')(x)
  
    efficientnet = tfk.Model(input_tensor, [sigmoid, pos])
    return efficientnet

In [0]:
'''
efficientnet = EfficientNet(
        1.0, 1.0, 224, 0.2,
        model_name='efficientnet-b0'
    )
'''
efficientnet = EfficientNet(
        .1, .1, 100, 0.2,
        model_name='efficientnet-b0'
    )
efficientnet.summary()

Model: "model_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input (InputLayer)              [(None, 480, 640, 3) 0                                            
__________________________________________________________________________________________________
stem_conv (Conv2D)              (None, 240, 320, 8)  216         input[0][0]                      
__________________________________________________________________________________________________
stem_bn (BatchNormalization)    (None, 240, 320, 8)  32          stem_conv[0][0]                  
__________________________________________________________________________________________________
stem_activation (Activation)    (None, 240, 320, 8)  0           stem_bn[0][0]                    
____________________________________________________________________________________________

In [0]:
with open('/content/drive/My Drive/pose_estimation/via_export_json.json') as f:
  data = json.load(f)

In [0]:
input_frame = np.zeros(shape=(len(data), 480, 640, 3))
position_frame = np.zeros(shape=(len(data), 15, 20, 16))
sigmoid_frame = np.zeros(shape=(len(data), 15, 20, 8))

for i, val in tqdm(enumerate(data)):
  rgb_image = PIL.Image.open("/content/drive/My Drive/pose_estimation/test_images/{}".format(data[val]['filename'].
                                                                                             replace('.png', '.jpg')))
  rgb_image = np.array(rgb_image)
  input_frame[i, :, :, :] = rgb_image
  for point in data[val]['regions']:
    x = point['shape_attributes']['cx']
    y = point['shape_attributes']['cy']
    sig_x = math.floor(x / 32)
    sig_y = math.floor(y / 32)
    label = point['region_attributes']['point_name']
    if label == 'ft':
      sigmoid_frame[i, sig_y, sig_x, 0] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 0] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 1] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'fl':
      sigmoid_frame[i, sig_y, sig_x, 1] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 2] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 3] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'fb':
      sigmoid_frame[i, sig_y, sig_x, 2] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 4] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 5] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'fr':
      sigmoid_frame[i, sig_y, sig_x, 3] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 6] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 7] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'bt':
      sigmoid_frame[i, sig_y, sig_x, 4] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 8] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 9] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'bl':
      sigmoid_frame[i, sig_y, sig_x, 5] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 10] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 11] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'bb':
      sigmoid_frame[i, sig_y, sig_x, 6] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 12] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 13] = (y - ((32 * b) + 15)) / 480 #y positions
    elif label == 'br':
      sigmoid_frame[i, sig_y, sig_x, 7] = 1
      for a in range(15):
        for b in range(20):
          position_frame[i, a, b, 14] = (x - ((32 * a) + 15)) / 640 #x positions
          position_frame[i, a, b, 15] = (y - ((32 * b) + 15)) / 480 #y positions

99it [01:22,  1.20it/s]


In [0]:
X = (input_frame / 255.0).astype(np.float32)
y_sigmoid = sigmoid_frame.astype(np.float32)
y_pos = position_frame.astype(np.float32)

X_train, X_test = X[:80], X[80:],
y_train_sigmoid, y_test_sigmoid = y_sigmoid[:80], y_sigmoid[80:]
y_train_pos, y_test_pos = y_pos[:80], y_pos[80:]
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, {"sigmoid": y_train_sigmoid, "position": y_train_pos}))
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, {"sigmoid": y_test_sigmoid, "position": y_test_pos}))
AUTO = tf.data.experimental.AUTOTUNE
train_dataset = train_dataset.shuffle(1).batch(4).prefetch(AUTO)
test_dataset = test_dataset.shuffle(1).batch(4).prefetch(AUTO)

In [0]:
def custom_loss(model, x, y):
  predictions = model(x)
  true_foci, true_loc = y['sigmoid'], y['position']
  predict_foci, predict_loc = tfk.activations.sigmoid(predictions[0]), predictions[1]

  def loss_object(true_foci, true_loc, predict_foci, predict_loc):
    #Positional L1 loss
    delta_c = tf.math.subtract(true_loc, predict_loc)
    delta_c = tf.math.abs(delta_c)
    #get Tau
    delta_c_max = tf.math.reduce_max(delta_c)
    delta_c_min = tf.math.reduce_min(delta_c)
    tau = -1 * (delta_c_max - delta_c_min) / (delta_c_max + delta_c_min)
    #L2
    temp = tf.math.subtract(true_loc, predict_loc)
    temp = tf.math.square(temp)
    #multiply by Tau and exp
    temp = tf.math.scalar_mul(tau, temp)
    temp = tf.math.exp(temp)
    #reduce the dimension of the temp array
    a1 = temp[:, :, :, 0]
    a2 = temp[:, :, :, 1]
    a = tf.math.add(a1, a2)
    a = tf.scalar_mul(.5, a)

    b1 = temp[:, :, :, 2]
    b2 = temp[:, :, :, 3]
    b = tf.math.add(b1, b2)
    b = tf.scalar_mul(.5, b)

    c1 = temp[:, :, :, 4]
    c2 = temp[:, :, :, 5]
    c = tf.math.add(c1, c2)
    c = tf.scalar_mul(.5, c)

    d1 = temp[:, :, :, 6]
    d2 = temp[:, :, :, 7]
    d = tf.math.add(d1, d2)
    d = tf.scalar_mul(.5, d)

    e1 = temp[:, :, :, 8]
    e2 = temp[:, :, :, 9]
    e = tf.math.add(e1, e2)
    e = tf.scalar_mul(.5, e)

    f1 = temp[:, :, :, 10]
    f2 = temp[:, :, :, 11]
    f = tf.math.add(f1, f2)
    f = tf.scalar_mul(.5, f)

    g1 = temp[:, :, :, 12]
    g2 = temp[:, :, :, 13]
    g = tf.math.add(g1, g2)
    g = tf.scalar_mul(.5, g)

    h1 = temp[:, :, :, 14]
    h2 = temp[:, :, :, 15]
    h = tf.math.add(h1, h2)
    h = tf.scalar_mul(.5, h)
    temp = tf.stack([a, b, c, d, e, f, g, h], axis=3)

    #L1
    temp = tf.math.subtract(predict_foci, temp)
    temp = tf.math.abs(temp)
    
    beta = 1
    gamma = 1
    position_loss = tf.math.reduce_sum(delta_c)
    conf_loss = tf.math.reduce_sum(temp)
    loss = beta * position_loss + gamma * conf_loss
    loss = loss / true_foci.shape[0] #BATCH SIZE ?
    return loss

  return loss_object(true_foci, true_loc, predict_foci, predict_loc)


In [0]:
def grad(model, inputs, targets):
  with tf.GradientTape() as tape:
    loss_value = custom_loss(model, inputs, targets)
  return loss_value, tape.gradient(loss_value, model.trainable_variables)

optimizer = tfk.optimizers.Adam()

In [0]:
checkpoint_file = '/content/drive/My Drive/pose_estimation/efficientnet.h5'
num_epochs = 300
num_train_samples = 80
batch_size = 4
end_step = np.ceil(1.0 * num_train_samples / batch_size).astype(np.int32) * num_epochs
new_pruning_params = {
      'pruning_schedule': sparsity.PolynomialDecay(initial_sparsity=0.50,
                                                   final_sparsity=0.95,
                                                   begin_step=100,
                                                   end_step=end_step,
                                                   frequency=20)
}

coreModel = sparsity.prune_low_magnitude(efficientnet, **new_pruning_params)

In [0]:
for epoch in range(num_epochs):
  epoch_loss_avg = tf.keras.metrics.Mean()
  best = math.inf
  for x, y in train_dataset:
    loss_value, grads = grad(efficientnet, x, y)
    optimizer.apply_gradients(zip(grads, efficientnet.trainable_variables))
    epoch_loss_avg(loss_value)
  print("Epoch {:03d}: Loss: {:.3f}".format(epoch, epoch_loss_avg.result()))
  sparsity.UpdatePruningStep()
  if float(epoch_loss_avg.result()) < best:
    best = epoch_loss_avg.result()
    tf.keras.models.save_model(efficientnet, checkpoint_file, include_optimizer=True)


Epoch 000: Loss: 2074.397
Epoch 001: Loss: 1650.071
Epoch 002: Loss: 1396.481
Epoch 003: Loss: 1274.292
Epoch 004: Loss: 1212.144
Epoch 005: Loss: 1162.082
Epoch 006: Loss: 1129.366
Epoch 007: Loss: 1080.897
Epoch 008: Loss: 1076.704
Epoch 009: Loss: 1048.339
Epoch 010: Loss: 1031.123
Epoch 011: Loss: 1023.201
Epoch 012: Loss: 1008.259
Epoch 013: Loss: 1001.447
Epoch 014: Loss: 991.928
Epoch 015: Loss: 986.772
Epoch 016: Loss: 981.104
Epoch 017: Loss: 977.490
Epoch 018: Loss: 969.831
Epoch 019: Loss: 963.237
Epoch 020: Loss: 956.676
Epoch 021: Loss: 942.596
Epoch 022: Loss: 947.284
Epoch 023: Loss: 941.282
Epoch 024: Loss: 925.814
Epoch 025: Loss: 915.160
Epoch 026: Loss: 931.739
Epoch 027: Loss: 908.804
Epoch 028: Loss: 902.888
Epoch 029: Loss: 871.364
Epoch 030: Loss: 852.749
Epoch 031: Loss: 845.628
Epoch 032: Loss: 802.582
Epoch 033: Loss: 804.998
Epoch 034: Loss: 755.671
Epoch 035: Loss: 733.397
Epoch 036: Loss: 742.251
Epoch 037: Loss: 724.123
Epoch 038: Loss: 696.598
Epoch 039: 

In [0]:
final_model = sparsity.strip_pruning(efficientnet)
tf.keras.models.save_model(final_model,
                           '/content/drive/My Drive/pose_estimation/pruned_efficientnet2.h5',
                           include_optimizer=False)

In [0]:
converter = tf.lite.TFLiteConverter.from_keras_model(final_model)

converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE]

tflite_quant_model = converter.convert()

tflite_quant_model_file = '/content/drive/My Drive/pose_estimation/efficient_sparse_quant2.tflite'
with open(tflite_quant_model_file, 'wb') as f:
  f.write(tflite_quant_model)

In [0]:
restored_model = tf.keras.models.load_model('/content/drive/My Drive/pose_estimation/pruned_model.h5')
restored_model.compile()
score = restored_model.evaluate(test_dataset, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])



ValueError: ignored

In [0]:
final_model = sparsity.strip_pruning(pruned_model)
final_model.summary()

NameError: ignored

In [0]:
compressionModel = tfk.models.load_model("/content/drive/My Drive/pose_estimation/weights")

In [0]:
import os

checkpoint_directory = "/content/drive/My Drive/pose_estimation/training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_directory, "ckpt")
checkpoint = tf.train.Checkpoint(model=compressionModel)
checkpoint.save(file_prefix=checkpoint_prefix)

In [0]:
keras_file = "/content/drive/My Drive/pose_estimation/weights.h5"
tf.keras.models.save_model(compressionModel, keras_file, include_optimizer=False)

In [0]:
test = coreModel()
test.load_weights("/content/drive/My Drive/pose_estimation/training_checkpoints/ckpt-1.index")

In [0]:
loadedmodel = tfk.models.load_model("/content/drive/My Drive/pose_estimation/training_checkpoints")

In [0]:
from tensorflow_model_optimization.sparsity import keras as sparsity

epochs = 5
num_train_samples = 80
batch_size = 4
end_step = np.ceil(1.0 * num_train_samples / batch_size).astype(np.int32) * epochs
new_pruning_params = {
      'pruning_schedule': sparsity.PolynomialDecay(initial_sparsity=0.50,
                                                   final_sparsity=0.90,
                                                   begin_step=0,
                                                   end_step=end_step,
                                                   frequency=100)
}
with prune.prune_scope():
  model = tfk.models.load_model("/content/drive/My Drive/pose_estimation/weights")

new_pruned_model = sparsity.prune_low_magnitude(model, **new_pruning_params)
new_pruned_model.summary()

In [0]:
for epoch in range(num_epochs):
  epoch_loss_avg = tf.keras.metrics.Mean()
  best = math.inf
  for x, y in train_dataset:
    loss_value, grads = grad(coreModel, x, y)
    optimizer.apply_gradients(zip(grads, coreModel.trainable_variables))
    epoch_loss_avg(loss_value)
  print("Epoch {:03d}: Loss: {:.3f}".format(epoch, epoch_loss_avg.result()))
  sparsity.UpdatePruningStep()
  if epoch_loss_avg.result() < best:
    best = epoch_loss_avg.result()
    coreModel.save('/content/drive/My Drive/pose_estimation/weights')

In [0]:
model = tfk.models.load_model("/content/drive/My Drive/pose_estimation/weights")
#model.summary()

In [0]:
rgb_image = PIL.Image.open("/content/drive/My Drive/pose_estimation/test_images/image_90.jpg")
rgb_image = np.array(rgb_image) / 255.0
rgb_image = np.expand_dims(rgb_image, axis=0)
img = tf.convert_to_tensor(rgb_image, dtype=tf.float32)
result = model.predict(img)
result[0].shape, result[1].shape #, result[0], result[1]

In [0]:
a = result[0][:, :, :, 1].squeeze()
b = a.flatten()
ind = np.argpartition(b, -40)[-40:]
print(b[ind])
points = []
for i in b[ind]:
  points.append(np.where(a == i))

In [0]:
points = np.array(points).squeeze()
print(points)
print(points.shape)
rgb = result[1].squeeze()
print(rgb.shape)
p = []
for point in points:
  x = rgb[point[0], point[1], 2]
  y = rgb[point[0], point[1], 3] 

  x = ((32 * point[0]) + 15) + (x * 640) #x positions
  y = ((32 * point[1]) + 15) + (y * 480) #y positions
  p.append([x, y])
p

In [0]:
temp = result[:, :, :, 1].flatten()
tops = temp.argsort()[-100:]
vals = np.unravel_index(tops, result[:, :, :, 1].shape)
vals[1].mean(), vals[2].mean()
#np.where(result[:, :, :, 1] == np.amax(result[:, :, :, 1]))
#np.where(result[:, :, :, 2] == np.amax(result[:, :, :, 2]))