# Cargar librerías


In [None]:
import os
import cv2
from glob import glob
from scipy.io import loadmat
from PIL import Image

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow import keras
from tensorflow.keras import layers
from keras.api._v2.keras.layers import Normalization

import numpy as np

from IPython.display import clear_output
import matplotlib.pyplot as plt

In [None]:
# Record the TensorFlow version the notebook was executed with.
print(tf.__version__)

2.12.0


In [None]:
# Number of samples per batch in the tf.data pipelines.
BATCH_SIZE = 32
# Random seed value (the Augment layer's `seed` argument defaults to the same number).
seed=42
# Side length, in pixels, that every image and mask is resized to.
IMAGE_SIZE = 224
# Presumably the number of segmentation classes (the IoU metrics use 4) - confirm.
NUM_CLASSES = 4
# Root folder that holds the train/test image and mask sub-folders.
DATA_DIR = "/content/drive/MyDrive/MIoT/TFM"
# Upper bound on how many sorted training files are kept.
NUM_TRAIN_IMAGES= 323
# Alternative values noted here by the author: 376, 323
# Upper bound on how many sorted test files are kept.
NUM_TEST_IMAGES = 79
# Alternative values noted here by the author: 96, 79

# Generar los datos

In [None]:

# Collect the image and mask file paths. Both listings are sorted and then
# truncated, so entry i of an image list is paired with entry i of the
# corresponding mask list (assumes image and mask files sort in the same
# order - TODO confirm against the folder contents).
train_images = sorted(glob(os.path.join(DATA_DIR, "train_img/*")))[:NUM_TRAIN_IMAGES]
train_masks = sorted(glob(os.path.join(DATA_DIR, "train_mask_ids/*")))[:NUM_TRAIN_IMAGES]

val_images = sorted(glob(os.path.join(DATA_DIR, "test_img/*")))[:NUM_TEST_IMAGES]
val_masks = sorted(glob(os.path.join(DATA_DIR, "test_mask_ids/*")))[:NUM_TEST_IMAGES]

# Fail early with a readable message instead of an obscure tf.data error when
# the folders are missing/empty or images and masks do not line up.
assert train_images and val_images, (
    "No image files found under {!r}; check DATA_DIR and that the drive is mounted.".format(DATA_DIR)
)
assert len(train_images) == len(train_masks), (
    "Train images/masks count mismatch: {} vs {}".format(len(train_images), len(train_masks))
)
assert len(val_images) == len(val_masks), (
    "Test images/masks count mismatch: {} vs {}".format(len(val_images), len(val_masks))
)

def read_image(image_path, mask=False):
    """Read a PNG file and resize it to IMAGE_SIZE x IMAGE_SIZE.

    Args:
        image_path: Path (string or scalar string tensor) of the PNG file.
        mask: If True, decode the file as a single-channel label mask;
            otherwise decode it as a 3-channel image.

    Returns:
        A float32 tensor of shape (IMAGE_SIZE, IMAGE_SIZE, 1) for masks, or of
        shape (IMAGE_SIZE, IMAGE_SIZE, 3) for images, the latter passed through
        the ResNet50 `preprocess_input` function.
    """
    channels = 1 if mask else 3
    image = tf.io.read_file(image_path)
    image = tf.image.decode_png(image, channels=channels)
    image.set_shape([None, None, channels])

    if mask:
        # Masks hold class ids: nearest-neighbour resizing keeps every pixel a
        # valid id, whereas the default bilinear method would blend neighbouring
        # ids into meaningless fractional values along class borders.
        image = tf.image.resize(
            images=image, size=[IMAGE_SIZE, IMAGE_SIZE], method="nearest"
        )
        # Nearest-neighbour keeps the decoded uint8 dtype; cast so the dataset
        # element spec stays float32 as before.
        image = tf.cast(image, tf.float32)
    else:
        image = tf.image.resize(images=image, size=[IMAGE_SIZE, IMAGE_SIZE])
        # NOTE(review): ResNet50 preprocessing is applied regardless of which
        # backbone the loaded models use - confirm this matches training.
        image = tf.keras.applications.resnet50.preprocess_input(image)
    return image


def load_data(image_list, mask_list):
    """Load one (image, mask) pair.

    Args:
        image_list: Path of the image file to read.
        mask_list: Path of the matching mask file to read.

    Returns:
        A tuple (image, mask) as produced by `read_image`, the second element
        being read with `mask=True`.
    """
    return read_image(image_list), read_image(mask_list, mask=True)


def data_generator(image_list, mask_list):
    """Build a tf.data dataset of decoded (image, mask) pairs.

    Args:
        image_list: Sequence of image file paths.
        mask_list: Sequence of mask file paths, aligned index-by-index with
            `image_list`.

    Returns:
        An unbatched dataset whose elements are the output of `load_data` for
        each pair of paths.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
    # Decode and resize the files in parallel; the degree is tuned by tf.data.
    return path_pairs.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)


# Build the unbatched (image, mask) datasets from the train and test file lists.
train_dataset = data_generator(train_images, train_masks)
val_dataset = data_generator(val_images, val_masks)

# Print the datasets so their element specs (shape / dtype) can be checked.
print("Train Dataset:", train_dataset)
print("Val Dataset:", val_dataset)

Train Dataset: <_ParallelMapDataset element_spec=(TensorSpec(shape=(224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(224, 224, 1), dtype=tf.float32, name=None))>
Val Dataset: <_ParallelMapDataset element_spec=(TensorSpec(shape=(224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(224, 224, 1), dtype=tf.float32, name=None))>


# Aumentar los datos

In [None]:
class Augment(tf.keras.layers.Layer):
    """Expand one (image, mask) pair into the original plus four augmented copies.

    Four random layers (horizontal flip, rotation, translation, zoom) are
    created twice with the same seed: once for images and once for label
    masks. The intent is that both copies of a layer draw the same random
    transform so image and mask stay aligned.
    NOTE(review): confirm this same-seed synchronisation holds on the
    TensorFlow version in use.

    Args:
        seed: Seed passed to every random layer.
    """

    def __init__(self, seed=42):
        super().__init__()

        # Images are resampled with bilinear interpolation (the layers' default).
        self.augment_inputs = self._build_layers(seed, interpolation="bilinear")
        # Masks hold class ids, so they must be resampled with nearest-neighbour;
        # bilinear interpolation would blend ids into invalid fractional labels.
        self.augment_labels = self._build_layers(seed, interpolation="nearest")

    @staticmethod
    def _build_layers(seed, interpolation):
        """Return the flip / rotation / translation / zoom layers for one stream."""
        return [
            tf.keras.layers.RandomFlip(mode="horizontal", seed=seed),
            tf.keras.layers.RandomRotation(
                factor=0.25, fill_mode="wrap", interpolation=interpolation, seed=seed
            ),
            tf.keras.layers.RandomTranslation(
                height_factor=0.25,
                width_factor=0.25,
                fill_mode="wrap",
                interpolation=interpolation,
                seed=seed,
            ),
            tf.keras.layers.RandomZoom(
                height_factor=(-0.3, -0.2),
                fill_mode="wrap",
                interpolation=interpolation,
                seed=seed,
            ),
        ]

    def call(self, inputs, labels):
        """Return stacked (original + augmented) images and masks.

        Args:
            inputs: A single image tensor.
            labels: The mask tensor that belongs to `inputs`.

        Returns:
            A tuple (images, masks). Each is a stack along a new leading axis
            of length 5: the untouched tensor followed by the output of each
            augmentation layer applied independently to the original.
        """
        # Every layer is applied to the ORIGINAL tensor (not chained), so each
        # stacked entry carries exactly one kind of augmentation.
        augmented_inputs = [inputs] + [layer(inputs) for layer in self.augment_inputs]
        augmented_labels = [labels] + [layer(labels) for layer in self.augment_labels]

        return tf.stack(augmented_inputs), tf.stack(augmented_labels)

# Crear 'batch'

In [None]:
# Training pipeline: Augment turns each (image, mask) pair into a stack of the
# original plus four augmented copies; unbatch() splits that stack back into
# individual samples, which are then grouped into batches of BATCH_SIZE and
# prefetched.
train_batches = (
    train_dataset
    .map(Augment())
    .unbatch()
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.AUTOTUNE)
)

# The test data is only batched; no augmentation is applied.
test_batches = val_dataset.batch(BATCH_SIZE)

In [None]:
# Print the batched datasets so their element specs (shape / dtype) can be checked.
print("Train Dataset:", train_batches)
print("Val Dataset:", test_batches)

Train Dataset: <_PrefetchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 1), dtype=tf.float32, name=None))>
Val Dataset: <_BatchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 224, 224, 1), dtype=tf.float32, name=None))>


# Cuantizar el modelo

In [None]:

# Load the model
# NOTE(review): this is an absolute path on a local Windows machine, while the
# data cells use Google Drive paths under /content - adjust before running.
load_h5= r'C:\Users\Paola\Documents\master\TFM\seg_quanti\modelo_finales_h5\with_MobileNetV3Large_dt1_decay.h5'

model = tf.keras.models.load_model(load_h5)


# Load the dataset with data augmentations

def representative_dataset_gen(num_batches=1):
    """Yield calibration samples for TFLite post-training quantization.

    Samples are taken from `train_batches`, one image at a time.

    Args:
        num_batches: How many batches of `train_batches` to draw samples from.
            The default of 1 uses only the first batch.

    Yields:
        A one-element list holding a float32 tensor with a leading batch axis
        of size 1, i.e. shape (1, 224, 224, 3) for the pipeline built above.
    """
    for imgs, _ in train_batches.take(num_batches):
        for img in imgs:
            img = tf.cast(img, tf.float32)
            # The converter expects one list per sample, each entry carrying a
            # leading batch axis.
            yield [tf.expand_dims(img, 0)]


# Build a TFLite converter from the loaded Keras model.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
# This enables quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# This sets the representative dataset used to calibrate the quantization ranges
converter.representative_dataset = representative_dataset_gen
# This ensures that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# For full integer quantization, though supported types defaults to int8 only, we explicitly declare it for clarity.
converter.target_spec.supported_types = [tf.int8]
# These set the model's input and output tensors to int8 (signed), so callers
# must quantize inputs and dequantize outputs themselves.
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

# Progress markers around the (potentially slow) conversion step.
print('Comenzo')
tflite_model = converter.convert()
print('Acabo')

# Save the quantized model
# NOTE(review): the path below starts with a literal "..." and looks like a
# placeholder - replace it with a real directory before running.
save_tflite =r"...\modelos_finales_tflite\with_MobileNetV3Large_dt1_decay_int8_quant.tflite"
with open(save_tflite, 'wb') as f:
  f.write(tflite_model)


# Evaluación del modelo sin cuantizar

In [None]:
def infer(model, image_tensor):
    """Predict the per-pixel class ids of a single image.

    Args:
        model: Object exposing `predict(batch)` that returns per-class scores.
        image_tensor: One image without a batch axis.

    Returns:
        A 2-D integer array holding, for each pixel, the index of the
        highest-scoring class.
    """
    # The model works on batches, so wrap the image in a batch of one.
    single_batch = np.expand_dims(image_tensor, axis=0)
    # Drop the size-1 axes again, leaving (height, width, classes).
    class_scores = np.squeeze(model.predict(single_batch))
    return np.argmax(class_scores, axis=2)

In [None]:
# Load the trained, non-quantized Keras model.
new_model = tf.keras.models.load_model('/content/drive/MyDrive/MIoT/TFM/Modelos_finales/With_DenseNet201_dt1_decay_save')

# NOTE: only the first batch of `test_batches` is evaluated, so the IoU values
# printed below describe at most BATCH_SIZE images, not the whole test set.
batch_images, batch_labels = next(iter(test_batches))

prediction = []
truth = []
for image_tensor, ground_truth_tensor in zip(batch_images, batch_labels):
  prediction_mask = infer(image_tensor=image_tensor, model=new_model)
  # Drop the channel axis; astype(uint8) truncates any fractional label value.
  ground_truth_mask = np.squeeze(ground_truth_tensor).astype(np.uint8)
  prediction.append(prediction_mask)
  truth.append(ground_truth_mask)

prediction_stack = tf.stack(prediction)
truth_stack = tf.stack(truth)

# Mean IoU over all classes plus the IoU of class 1 and class 2 individually
# (labelled "Agua" and "Cyano" in the printed report).
keras_accuracy = [
    tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, sparse_y_true=True, sparse_y_pred=True),
    tf.keras.metrics.IoU(num_classes=NUM_CLASSES, target_class_ids=[1], sparse_y_true=True, sparse_y_pred=True),
    tf.keras.metrics.IoU(num_classes=NUM_CLASSES, target_class_ids=[2], sparse_y_true=True, sparse_y_pred=True),
]
# Keras metrics take (y_true, y_pred) in that order; pass them by keyword so
# ground truth and prediction cannot be swapped.
for metric in keras_accuracy:
  metric.update_state(y_true=truth_stack, y_pred=prediction_stack)


print("Raw model mIOU: {:.3%}".format(keras_accuracy[0].result()))
print("Raw model Agua IOU: {:.3%}".format(keras_accuracy[1].result()))
print("Raw model Cyano IOU: {:.3%}".format(keras_accuracy[2].result()))

Raw model mIOU: 87.214%
Raw model Agua IOU: 76.034%
Raw model Cyano IOU: 91.092%


# Evaluación del modelo cuantizado

In [None]:

# Take the first batch of the test set; only these images are used for the
# quantized-model evaluation in the rest of this cell.
batch_images, batch_labels = next(iter(test_batches))
def set_input_tensor(interpreter, input):
  """Write one image into the interpreter's first input tensor.

  When the input tensor has an integer dtype and a non-zero quantization
  scale, the float image is quantized as round(input / scale + zero_point)
  and clipped to the range of that dtype. Otherwise the values are copied
  as they are.

  Args:
    interpreter: TFLite interpreter whose tensors are already allocated.
    input: Float image without a batch axis, matching the per-sample shape of
      the model input.
  """
  input_details = interpreter.get_input_details()[0]
  tensor_index = input_details['index']
  # interpreter.tensor() returns a callable that yields the input buffer;
  # [0] selects the single sample of the batch.
  input_buffer = interpreter.tensor(tensor_index)()[0]
  scale, zero_point = input_details['quantization']

  values = np.asarray(input, dtype=np.float32)
  if np.issubdtype(input_buffer.dtype, np.integer) and scale:
    limits = np.iinfo(input_buffer.dtype)
    # Round to nearest and saturate: a bare integer cast truncates toward
    # zero and lets out-of-range values wrap around.
    values = np.clip(np.round(values / scale + zero_point), limits.min, limits.max)
  input_buffer[...] = values.astype(input_buffer.dtype)

def classify_image(interpreter, input):
  """Run the interpreter on one image and return the per-pixel class ids.

  Args:
    interpreter: TFLite interpreter whose tensors are already allocated.
    input: Float image without a batch axis.

  Returns:
    An integer array with the index of the highest-scoring class along the
    last axis of the model output, after removing the batch axis.
  """
  set_input_tensor(interpreter, input)
  interpreter.invoke()
  output_details = interpreter.get_output_details()[0]
  output = interpreter.get_tensor(output_details['index'])
  scale, zero_point = output_details['quantization']
  # Widen to float BEFORE subtracting the zero point: doing the subtraction in
  # the tensor's own 8-bit dtype overflows (e.g. 127 - (-128)) and wraps.
  output = output.astype(np.float32)
  if scale:
    output = scale * (output - zero_point)
  # Remove only the batch axis, then pick the best class per pixel.
  output = np.squeeze(output, axis=0)
  top_1 = np.argmax(output, axis=-1)
  return top_1

# Load the quantized model and allocate its tensors.
interpreter = tf.lite.Interpreter('/content/drive/MyDrive/MIoT/TFM/Modelos_tflite/with_ConvNeXtSmall_dt1_decay_int8_agua_t_aug_quant.tflite')
interpreter.allocate_tensors()

# Collect all inference predictions in a list
batch_prediction = []
# Ground truth: drop only the channel axis and cast to integer class ids, the
# same way the non-quantized evaluation prepares its masks.
batch_truth = np.squeeze(np.asarray(batch_labels), axis=-1).astype(np.uint8)

for i in range(len(batch_images)):
  prediction = classify_image(interpreter, batch_images[i])
  batch_prediction.append(prediction)

batch_prediction_stack= tf.stack(batch_prediction)

# Compare all predictions to the ground truth
# Mean IoU over all classes plus the IoU of class 1 and class 2 individually
# (labelled "Agua" and "Cyano" in the printed report).
tflite_accuracy = [
    tf.keras.metrics.MeanIoU(num_classes=NUM_CLASSES, sparse_y_true=True, sparse_y_pred=True),
    tf.keras.metrics.IoU(num_classes=NUM_CLASSES, target_class_ids=[1], sparse_y_true=True, sparse_y_pred=True),
    tf.keras.metrics.IoU(num_classes=NUM_CLASSES, target_class_ids=[2], sparse_y_true=True, sparse_y_pred=True),
]
# Keras metrics take (y_true, y_pred) in that order; pass them by keyword so
# ground truth and prediction cannot be swapped.
for metric in tflite_accuracy:
  metric.update_state(y_true=batch_truth, y_pred=batch_prediction_stack)

print("Quant TF Lite mIOU: {:.3%}".format(tflite_accuracy[0].result()))
print("Quant TF Lite Agua IOU: {:.3%}".format(tflite_accuracy[1].result()))
print("Quant TF Lite Cyano IOU: {:.3%}".format(tflite_accuracy[2].result()))

Quant TF Lite mIOU: 84.152%
Quant TF Lite Agua IOU: 69.521%
Quant TF Lite Cyano IOU: 87.455%


# Compilación .tflite para TPU

In [None]:
# Download Google's package signing key and register it with apt.
! curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -

# Add the Coral Edge TPU stable repository to apt's source lists.
! echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list

# Refresh the package index so the new repository is picked up.
! sudo apt-get update

# Install the Edge TPU compiler package.
! sudo apt-get install edgetpu-compiler

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  2659  100  2659    0     0  91689      0 --:--:-- --:--:-- --:--:-- 91689
OK
deb https://packages.cloud.google.com/apt coral-edgetpu-stable main
Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Get:2 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Get:3 https://packages.cloud.google.com/apt coral-edgetpu-stable InRelease [6,332 B]
Hit:4 http://archive.ubuntu.com/ubuntu focal InRelease
Get:5 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Get:8 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]


In [None]:
# Compile the quantized .tflite model for the Edge TPU.
# NOTE(review): the output recorded for this cell shows the compiler rejecting
# the model ("Didn't find op for builtin opcode 'CONV_2D' version '6'"), which
# suggests the installed compiler is older than the converter that produced the
# file - confirm and align the versions. The file name also differs from the
# "..._int8_agua_t_aug_quant.tflite" model evaluated earlier - verify.
! edgetpu_compiler /content/drive/MyDrive/MIoT/TFM/Modelos_tflite/with_ConvNeXtSmall_dt1_decay_int8_quant.tflite

Edge TPU Compiler version 16.0.384591198
ERROR: Didn't find op for builtin opcode 'CONV_2D' version '6'. An older version of this builtin might be supported. Are you using an old TFLite binary with a newer model?

ERROR: Registration failed.

Invalid model: /content/drive/MyDrive/MIoT/TFM/Modelos_tflite/with_ConvNeXtSmall_dt1_decay_int8_quant.tflite
Model could not be parsed
