# Import Libraries  Initialize Var

In [None]:
!apt-get update -y

!apt install libgl1-mesa-glx -y
!apt-get install -y xxd 

!pip install matplotlib
!pip install opencv-python
!/usr/bin/python -m pip install --upgrade pip
!pip install -q tensorflow-model-optimization
print('Depencies Installed')

In [None]:

import tensorflow as tf
import pathlib
import os
import numpy as np
import matplotlib.pyplot as plt
import logging
import cv2 


AUTOTUNE = tf.data.AUTOTUNE
BATCH_SIZE = 32
WIDTH=240
HEIGHT=240
IMG_SIZE = (WIDTH, HEIGHT)
filename="model_count_insects_final_2"
project_folder=''
image_dataset_folder="images_sets"
train_image_folder=os.path.join(os.path.join(image_dataset_folder,'images_train_set'))
test_image_folder=os.path.join(os.path.join(image_dataset_folder,'images_test_set'))

path_saved_model=os.path.join(project_folder,'saved_models',f"{filename}.h5")
path_saved_model_tflite=os.path.join(project_folder,'saved_models',f"{filename}.tflite")
path_saved_model_tflite_quant=os.path.join(project_folder,'saved_models',f"{filename}_quant.tflite")
path_saved_model_tflite_quant_tpu=os.path.join(project_folder,'saved_models',f"{filename}_quant_tpu.tflite")

print('Import libraries')
print('Initialize Vars')



In [None]:
try:
    for h in logging.getLogger().handlers:
        logging.getLogger().removeHandler(h)
except:
    pass
mylogs = logging.getLogger(__name__)
mylogs.setLevel(logging.INFO)
# Handler - 1
h_file = logging.FileHandler(f'{filename}aa.log')
fileformat = logging.Formatter("%(asctime)s:%(levelname)s:%(message)s")
h_file.setLevel(logging.INFO)
h_file.setFormatter(fileformat)

h_stream = logging.StreamHandler()
h_streamformat = logging.Formatter("%(asctime)s:   %(message)s")
h_stream.setLevel(logging.INFO)
h_stream.setFormatter(h_streamformat)

# Adding all handlers to the logs
mylogs.addHandler(h_file)
mylogs.addHandler(h_stream)

        
mylogs.info(f'Libraries Imported')

In [None]:
def a(predict,label):
    """α=1-|Μc-Αc|/Mc"""
    #print(f'1-|{label}-{predict}|/{label}')

    return 1-abs(float(label)-float(predict))/float(label) if float(label)!=0.0 else  1-abs(float(label)-float(predict))

def model_evaluate(model_file_path,dataset,print_commends=False,show_images=False):
    try:
        model = tf.keras.models.load_model(model_file_path)
        mylogs.info(f'Load model {model_file_path}')
    except :
        mylogs.error('error')

    batch_prediction_floor = []
    batch_prediction_round = []
    batch_truth = []
    count=0
    sum_a=0 
    for image, label in dataset.unbatch():
        batch_truth.append(label)

        input_data =(np.float32(image)/255.0)
        input_data = np.expand_dims(input_data, axis=0)
        
        output_data=model.predict(input_data)

        predictions=np.floor(np.array(output_data).item(0))
        predictions_round=np.around(np.array(output_data).item(0))
        count=count+1
        batch_prediction_floor.append(predictions)
        batch_prediction_round.append(predictions_round)
        ac=a(np.array(output_data).item(0) , label) 
        sum_a=sum_a+ac 
        if show_images==True:
            plt.imshow(image)
            plt.show()
        
        if print_commends==True:
            if label!=predictions_round:
                mylogs.info(f'{"*****" if label==predictions_round else ""} label={label} predict={np.array(output_data).item(0)}  round={predictions_round}')
                mylogs.info(f' accurancy a ={ac}')
       

    mylogs.info(f' a= {sum_a/count}')
    mylogs.info(f'Number of test Images {count}')
    tflite_accuracy = tf.keras.metrics.Accuracy()
    tflite_accuracy(batch_prediction_floor, batch_truth)
    mylogs.info("TF Lite accuracy: {:.3%}".format(tflite_accuracy.result()))


    tflite_accuracy_round = tf.keras.metrics.Accuracy()
    tflite_accuracy_round(batch_prediction_round, batch_truth)
    mylogs.info("TF Lite accuracy round : {:.3%}".format(tflite_accuracy_round.result()))

    dataset_a=dataset.map(lambda image,label:(image/255,label),num_parallel_calls=AUTOTUNE)
    loss0,mae = model.evaluate(dataset_a,verbose=2)

def interpreter_evaluation(model_tflite_file_path,dataset):
    interpreter = tf.lite.Interpreter(
      model_path=model_tflite_file_path, num_threads=None)

    mylogs.info(f'Load interpreter model {path_saved_model_tflite}')

    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    height = input_details[0]['shape'][1]
    width = input_details[0]['shape'][2]

    batch_prediction_floor = []
    batch_prediction_round = []
    batch_truth = []
    count=0
    plt.figure(figsize=(10, 20))
    mylogs.info(f'Wait to evalluate test images......')
    sum_a=0
    for image, label in dataset.unbatch():
        batch_truth.append(label)

        input_data =(np.float32(image)/255)
        input_data = np.expand_dims(input_data, axis=0).astype(input_details[0]["dtype"])

        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_details[0]['index'])

        predictions=np.floor(np.array(output_data).item(0))
        predictions_round=np.around(np.array(output_data).item(0))
        count=count+1
        batch_prediction_floor.append(predictions)
        batch_prediction_round.append(predictions_round)
        ac=a(np.array(output_data).item(0) , label) 
        sum_a=sum_a+ac
        
    
    mylogs.info(f' a= {sum_a/count}')
    mylogs.info(f'Number of test Images {count}')
    tflite_accuracy = tf.keras.metrics.Accuracy()
    tflite_accuracy(batch_prediction_floor, batch_truth)
    mylogs.info("TF Lite accuracy: {:.3%}".format(tflite_accuracy.result()))

    tflite_accuracy_round = tf.keras.metrics.Accuracy()
    tflite_accuracy_round(batch_prediction_round, batch_truth)
    mylogs.info("TF Lite accuracy round : {:.3%}".format(tflite_accuracy_round.result()))
print('Init Functions')

# Create Dataset

In [None]:
imagePaths= []
dataset_file_list=os.path.join(image_dataset_folder,'list_insects_14000.txt')

with open(dataset_file_list,'r') as f:
    imagePaths=eval(f.readline())
DATASET_SIZE=len(imagePaths)
  

def load_images(imagePath):
  image = tf.io.read_file(imagePath)
  image = tf.image.decode_jpeg(image, channels=3)
  image = tf.image.rgb_to_grayscale(image)
  #image = tf.image.resize(image, (240, 240)) / 255.0
  label=int(tf.strings.split(tf.strings.split(imagePath,'_')[-1],'.')[-2])
  return (image, label)

mylogs.info(len(imagePaths))

train_imagePaths = imagePaths[:int(len(imagePaths)*0.7)]
mylogs.info(len(train_imagePaths))

train_ds = tf.data.Dataset.from_tensor_slices(train_imagePaths).map(load_images, num_parallel_calls=AUTOTUNE)
train_ds = train_ds.cache()

train_ds = train_ds.batch(BATCH_SIZE)
train_ds = train_ds.prefetch(AUTOTUNE)
mylogs.info('train_ds ok')

test_imagePaths = imagePaths[int(len(imagePaths)*0.7):]
mylogs.info(len(test_imagePaths))

val_ds = tf.data.Dataset.from_tensor_slices(test_imagePaths).map(load_images, num_parallel_calls=AUTOTUNE)
val_ds = val_ds.cache()
val_ds = val_ds.batch(BATCH_SIZE)
val_ds = val_ds.prefetch(AUTOTUNE)

mylogs.info('val_ds ok')
mylogs.info(f'Data set folder \n Train set size {len(train_ds)}  \n Validation set size {len(val_ds)}  \n General size {DATASET_SIZE}')



In [None]:

test_imagePaths= []
for image_filename in os.listdir(test_image_folder):
  test_imagePaths.append(os.path.join(test_image_folder,image_filename))

def load_images(imagePath):
  image = tf.io.read_file(imagePath)
  image = tf.image.decode_jpeg(image, channels=3)
  image = tf.image.rgb_to_grayscale(image)
  #image = tf.image.resize(image, (240, 240)) / 255.0
  label=int(tf.strings.split(tf.strings.split(imagePath,'_')[-1],'.')[-2])
  return (image, label)

mylogs.info(len(test_imagePaths))

test_ds = tf.data.Dataset.from_tensor_slices(test_imagePaths).map(load_images, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.cache()
test_ds = test_ds.batch(BATCH_SIZE)
test_ds = test_ds.prefetch(AUTOTUNE)
mylogs.info('test_ds ok')


In [None]:
plt.figure(figsize=(10, 10))
for images, labels in test_ds.take(1):
  for i in range(12):
    ax = plt.subplot(3, 4, i + 1)
    plt.imshow(images[i][:,:,0],cmap='gray', vmin=0, vmax=255)
    plt.title(str(labels[i].numpy()))
    plt.axis("off")

## Load and get accurancy

In [None]:
mylogs.info(path_saved_model)
model_evaluate(path_saved_model,test_ds)
model_evaluate(path_saved_model,val_ds)



# Conver* h5 to TFLite



In [None]:
# Convert the model to the TensorFlow Lite format with full integer quantization
model_lite = tf.keras.models.load_model(path_saved_model)
converter = tf.lite.TFLiteConverter.from_keras_model(model_lite)
tflite_model = converter.convert()

with open(path_saved_model_tflite, 'wb') as f:
  f.write(tflite_model)

mylogs.info(f'Create tflite model {path_saved_model_tflite}')

## Evaluate TFLite model

In [None]:
interpreter_evaluation(path_saved_model_tflite,test_ds)

# Convert To TFLite Quant

In [None]:

def representative_data_gen():
  
  for setimage , setlabel  in train_ds.take(50):
    for index in range(len(setimage)):
      image = setimage[index]
      image = (np.float32(image) / 255.0)
      image=np.expand_dims(image, axis=0) 
      yield [image.astype(np.float32)]

model = tf.keras.models.load_model(path_saved_model)
mylogs.info(f'Load  model {path_saved_model}')

converter = tf.lite.TFLiteConverter.from_keras_model(model)
# This enables quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# This sets the representative dataset for quantization
#converter.representative_dataset = representative_data_gen
converter.representative_dataset =tf.lite.RepresentativeDataset(representative_data_gen)

# This ensures that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# For full integer quantization, though supported types defaults to int8 only, we explicitly declare it for clarity.
converter.target_spec.supported_types = [tf.int8]
# These set the input and output tensors to uint8 (added in r2.3)
converter.inference_input_type = tf.int8
#converter.inference_input_type = tf.float32
converter.inference_output_type = tf.int8
tflite_model_quant = converter.convert()

with open(path_saved_model_tflite_quant, 'wb') as f:
  f.write(tflite_model_quant)

mylogs.info(f'Create Quant file {path_saved_model_tflite_quant}')


## Evulate Quant model

In [None]:

interpreter_qt = tf.lite.Interpreter(
      model_path=path_saved_model_tflite_quant, num_threads=None)

mylogs.info(f'Load interpreter model {path_saved_model_tflite_quant}')


interpreter_qt.allocate_tensors()

input_details = interpreter_qt.get_input_details()

output_details = interpreter_qt.get_output_details()

height = input_details[0]['shape'][1]
width = input_details[0]['shape'][2]
#mylogs.info(f"height {height}  width {width}")

batch_prediction_floor = []
batch_prediction_round = []

batch_truth = []
count=0
sum_a=0
sum_a_category_of_images=[0,0,0,0,0,0,0]
correct_predict_category_of_images=[0,0,0,0,0,0,0]
count_a_category_of_images=[0,0,0,0,0,0,0]
for image, label in test_ds.unbatch():
  batch_truth.append(label)
  
  
  scale, zero_point = input_details[0]['quantization']
  
  input_data= (np.float32(image) / 255.0) / (scale*1.0) + (zero_point*1.0)
  
  input_data = np.expand_dims(input_data, axis=0).astype(input_details[0]["dtype"])
  
  interpreter_qt.set_tensor(input_details[0]['index'], input_data)
  interpreter_qt.invoke()
  
  #prepair output
  output_data1 = interpreter_qt.get_tensor(output_details[0]['index'])
  
  scale, zero_point = output_details[0]['quantization']
  
  output_data = ((scale*1.0) * (output_data1 - zero_point*1.0))
  
  ac=a(np.array(output_data).item(0) , label) 

  sum_a_category_of_images[int(label)]=sum_a_category_of_images[int(label)]+ac
  count_a_category_of_images[int(label)]=count_a_category_of_images[int(label)]+1
  sum_a=sum_a+ac

  predictions=np.floor(np.array(output_data).item(0))
  predictions_round=np.around(np.array(output_data).item(0))
  if int(label)==predictions_round:
    correct_predict_category_of_images[int(label)]=correct_predict_category_of_images[int(label)]+1

  count=count+1
  batch_prediction_floor.append(predictions)
  batch_prediction_round.append(predictions_round)
  #image=cv2.putText(np.float32(image),str(predictions_round),(5,25),cv2.FONT_HERSHEY_SIMPLEX,1,(0,0,0),1)
  #cv2.imwrite(f'predicte_bug_{count}_{predictions_round}.jpg',image)  

mylogs.info(f' Mean a = {sum_a/count}')
mylogs.info(f'Number of test Images {count}')
tflite_accuracy = tf.keras.metrics.Accuracy()
tflite_accuracy(batch_prediction_floor, batch_truth)
mylogs.info("TF Lite accuracy: {:.3%}".format(tflite_accuracy.result()))

tflite_accuracy_round = tf.keras.metrics.Accuracy()
tflite_accuracy_round(batch_prediction_round, batch_truth)
mylogs.info("TF Lite accuracy round : {:.3%}".format(tflite_accuracy_round.result()))

for index in range(0,7):
  avg_a_category=sum_a_category_of_images[index]/count_a_category_of_images[index]
  mylogs.info(f'Category {index} a={avg_a_category}')
  mylogs.info(f'Category {index} total count={count_a_category_of_images[index]} correct round={correct_predict_category_of_images[index]}')




# Create CC file for microcontroller



In [None]:
# Save to disk
#!apt-get update --fix-missing

path_saved=os.path.join(project_folder,'saved_models')

!cd '{path_saved}'

!xxd -i '{path_saved_model_tflite_quant}' > '{filename}_quant.cc'
#!sed -i 's/{filename}_quant.tflite/model_data_tflite/g'  "{filename}_quant.cc"

# Create TensorFlow Lite TPU for Coral

In [None]:
! curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -

! echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" |  tee /etc/apt/sources.list.d/coral-edgetpu.list

! apt-get update

! apt-get install edgetpu-compiler	

! edgetpu_compiler '{path_saved_model_tflite_quant}' -m 13 -s -o '{path_saved_model_tflite_quant_tpu}'