In [None]:
#| default_exp training

In [None]:
#| export
import matplotlib.pyplot as plt
import numpy as np
import cv2
import PIL
import tensorflow as tf
import os
import pandas as pd
from io import BytesIO
import subprocess

from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential

2023-03-14 16:35:58.450585: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-14 16:36:00.242729: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/taleta/.local/lib/python3.10/site-packages/cv2/../../lib64:
2023-03-14 16:36:00.242847: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/taleta/.local/lib/python3.10/site-packages/cv2/../../lib64

In [None]:
#| export

class train_model():

  if not os.path.exists('models'):
        os.mkdir('models')

  def __init__(self):
    self.MODELS_DIR = 'models'
    self.MODEL_TF = self.MODELS_DIR + 'model'
    self.MODEL_NO_QUANT_TFLITE = self.MODELS_DIR + '/model_no_quant.tflite'
    self.MODEL_TFLITE = self.MODELS_DIR + '/model.tflite'
    self.MODEL_TFLITE_MICRO = self.MODELS_DIR + '/model.cc'
    self.data_dir = 'data/'

  def load_data(self, img_height, img_width, batch_size):
    """
    Loads data from the directory provided in data_dir
    """

    train_ds = tf.keras.utils.image_dataset_from_directory(
      self.data_dir,
      validation_split=0.2,
      subset="training",
      seed=123,
      image_size=(img_height, img_width),
      batch_size=batch_size)
    
    val_ds = tf.keras.utils.image_dataset_from_directory(
      self.data_dir,
      validation_split=0.2,
      subset="validation",
      seed=123,
      image_size=(img_height, img_width),
      batch_size=batch_size)


    return train_ds, val_ds


  def train(self, img_height, img_width, epochs, optim_choice, train_ds, test_ds):
    """Model training 

    Args:
        `img_height` (_int_): image pixel height
        `img_width` (_int_): image pixel width
        `epochs` (_int_): Number of epochs to train
        `optim_choice` (_string_): Loss function to be used

    Returns:
        keras_model, statistics
    """

    class_names = train_ds.class_names
    
    #Enable caching for training
    AUTOTUNE = tf.data.AUTOTUNE
    train = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
    test = test_ds.cache().prefetch(buffer_size=AUTOTUNE)

    num_classes = len(class_names)

    model = Sequential([
      layers.Rescaling(1./255, input_shape=(img_height, img_width, 3)),
      layers.Conv2D(16, 3, padding='same', activation='relu'),
      layers.MaxPooling2D(),
      layers.Conv2D(32, 3, padding='same', activation='relu'),
      layers.MaxPooling2D(),
      layers.Conv2D(64, 3, padding='same', activation='relu'),
      layers.MaxPooling2D(),
      layers.Dropout(0.20),
      layers.Flatten(),
      layers.Dense(128, activation='relu'),
      layers.Dense(num_classes)
    ])

    if optim_choice == "Categorical crossentropy":
      loss_fn = keras.losses.CategoricalCrossentropy(from_logits=True)
    elif optim_choice == "Sparse Categorical crossentropy":
      loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)


    model.compile(optimizer='adam',
                  loss=loss_fn,
                  metrics=['accuracy'])

    model.summary()

    epochs=epochs

    history = model.fit(
      train,
      validation_data=test,
      epochs=epochs
    )
    
    epochs_range = range(epochs)

    return model, history, epochs_range

  def convert_model(self, model, train_ds):
    """Model conversion into TFLite model

    Args:
        `model` (_type_): _description_
        `train_ds` (_type_): _description_

    Yields:
        _type_: _description_
    """

    model.save('models/keras_model.h5')
    # Convert the model to the TensorFlow Lite format without quantization
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    model_no_quant_tflite = converter.convert()

    # Save the model to disk
    open(self.MODEL_NO_QUANT_TFLITE, "wb").write(model_no_quant_tflite)

    # Convert the model with quantization.
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    #uncomment to enable end-to-end int8-model, i.e. input and output is also int8/uint8.

    #converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    #converter.inference_input_type = tf.int8
    #converter.inference_output_type = tf.int8

    def representative_dataset():
      for data in tf.data.Dataset.from_tensor_slices((train_ds)).batch(1).take(100):
        yield [data[0]]

    #converter.representative_dataset = representative_dataset
    tflite_model = converter.convert()

    # Save the model.
    with open(self.MODEL_TFLITE, 'wb') as f:
      f.write(tflite_model)

  def convert_to_c_array(self):
    """C array conversion

    Args:
        `model_file_name` (string): TFLite model name for the conversion command
    """

    model_path = self.MODEL_TFLITE
    array_path = self.MODEL_TFLITE_MICRO
    
    subprocess.run(['xxd -i models/model.tflite > models/model.cc'], shell=True)

  def prediction(self, model, class_names):
    """Predicts on the image provided in the path.

    Args:
        `model` (tflite model): tflite model to be used in the prediction

    Returns:
        img: image predicted, result: formatted string for the result
    """

    path = '/data/1/1.png'
    img = cv2.imread(path)
    img = cv2.resize(img, (180,180))
    img_array = tf.keras.utils.img_to_array(img)
    img_array = tf.expand_dims(img_array, 0) # Create a batch

    predictions = model.predict(img_array)
    score = tf.nn.softmax(predictions[0])

    names = { 1 : "human",
             2: "not human"}
    result = ("This image most likely belongs to {} with a {:.2f} percent confidence.".format(names[np.argmax(score)], 100 * np.max(score)))
    
    return img, result

  def plot_statistics(self, history, epochs_range):
    """Plot model training statistics

    Args:
        `history` (tuple?): tuple containing loss and accuracy values over training
        `epochs_range` (int): amount of epochs used to train over

    Returns:
        BytesIO buffer: Matplotlib figure containing graphs about the training process
    """

    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']

    stats = plt.figure(figsize=(8, 8))
    plt.subplot(1, 2, 1)
    plt.plot(epochs_range, acc, label='Training Accuracy')
    plt.plot(epochs_range, val_acc, label='Validation Accuracy')
    plt.legend(loc='lower right')
    plt.title('Training and Validation Accuracy')

    plt.subplot(1, 2, 2)
    plt.plot(epochs_range, loss, label='Training Loss')
    plt.plot(epochs_range, val_loss, label='Validation Loss')
    plt.legend(loc='upper right')
    plt.title('Training and Validation Loss')

    buff = BytesIO()
    stats.savefig(buff, format="png")
    
    return buff

  def plot_size(self):
    """Plots the size difference before and after quantization

    Returns:
        pandas dataframe: Pandas dataframe containing information
    """

    size_no_quant_tflite = os.path.getsize(self.MODEL_NO_QUANT_TFLITE)
    size_tflite = os.path.getsize(self.MODEL_TFLITE)

    frame = pd.DataFrame.from_records(
        [["TensorFlow Lite", f"{size_no_quant_tflite} bytes "],
        ["TensorFlow Lite Quantized", f"{size_tflite} bytes", f"(reduced by {size_no_quant_tflite - size_tflite} bytes)"]],
        columns = ["Model", "Size", ""], index="Model")
      
    return frame

In [None]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
show_doc(train_model.load_data)

---

### train_model.load_data

>      train_model.load_data (img_height, img_width, batch_size)

Loads data from the directory provided in data_dir

In [None]:
show_doc(train_model.train)

---

### train_model.train

>      train_model.train (img_height, img_width, epochs, optim_choice, train_ds,
>                         test_ds)

Model training 

Args:
    `img_height` (_int_): image pixel height
    `img_width` (_int_): image pixel width
    `epochs` (_int_): Number of epochs to train
    `optim_choice` (_string_): Loss function to be used

Returns:
    keras_model, statistics

In [None]:
show_doc(train_model.convert_model)

---

### train_model.convert_model

>      train_model.convert_model (model, train_ds)

Model conversion into TFLite model

Args:
    `model` (_type_): _description_
    `train_ds` (_type_): _description_

Yields:
    _type_: _description_

In [None]:
show_doc(train_model.convert_to_c_array)

---

### train_model.convert_to_c_array

>      train_model.convert_to_c_array ()

C array conversion

Args:
    model_file_name (string): TFLite model name for the conversion command

In [None]:
show_doc(train_model.prediction)

---

### train_model.prediction

>      train_model.prediction (model, class_names)

Predicts on the image provided in the path.

Args:
    `model` (tflite model): tflite model to be used in the prediction

Returns:
    img: image predicted, result: formatted string for the result

In [None]:
show_doc(train_model.plot_statistics)

---

### train_model.plot_statistics

>      train_model.plot_statistics (history, epochs_range)

Plot model training statistics

Args:
    history (tuple?): tuple containing loss and accuracy values over training
    epochs_range (int): amount of epochs used to train over

Returns:
    BytesIO buffer: Matplotlib figure containing graphs about the training process

In [None]:
show_doc(train_model.plot_size)

---

### train_model.plot_size

>      train_model.plot_size ()

Plots the size difference before and after quantization

Returns:
    pandas dataframe: Pandas dataframe containing information