In [0]:
# Which GPU? nvidia-smi reports the GPU attached to this runtime
!nvidia-smi

In [0]:
# Install the GPU build of TensorFlow, pinned to version 2.1.0
!pip install tensorflow-gpu==2.1.0

In [0]:
# TensorFlow version: print it to confirm which release the kernel actually loaded
import tensorflow as tf
print(tf.__version__)

In [0]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from imutils import paths
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import random
import shutil
import time
import cv2
import os

In [0]:
# Mount Google Drive at /content/gdrive so the dataset archive stored there can be read
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
# List the Drive root (long format, oldest first) to confirm the mount and locate the archive
!ls -lrt "/content/gdrive/My Drive"

In [0]:
# Make sure the unzip utility is installed
!apt install unzip

In [0]:
# Extract the dataset archive from Drive into /content/dataset
!unzip "/content/gdrive/My Drive/dataset.zip" -d /content/dataset

In [0]:
# Show the working directory; the relative paths used below resolve against it
!pwd

In [0]:
# List the working directory contents (long format, oldest first)
!ls -lrt

In [0]:
# Collect the path of every image found under the "dataset" directory
ALL_IMG_PATHS = list(paths.list_images("dataset"))
# Peek at the first few paths to check the directory layout
print(ALL_IMG_PATHS[:10])

In [0]:
# Shuffle the image paths and preview.
# Sort first and seed the RNG: directory listing order is filesystem-dependent,
# so without this the shuffle (and the train/val split derived from it)
# would differ between runs and between re-executions of this cell.
ALL_IMG_PATHS.sort()
random.seed(42)
random.shuffle(ALL_IMG_PATHS)
ALL_IMG_PATHS[:5]

In [0]:
# Show a 5x5 grid of randomly drawn images, each captioned with the
# second component of its path
plt.figure(figsize=(15,15))
for cell in range(1, 26):
    sample_path = np.random.choice(ALL_IMG_PATHS)
    thumbnail = cv2.resize(plt.imread(sample_path), (128, 128))
    # you might want to verify the labels before
    # you put this to use
    sample_label = sample_path.split("/")[1]
    plt.subplot(5, 5, cell)
    plt.xticks([])
    plt.yticks([])
    plt.grid(True)
    plt.imshow(thumbnail)
    plt.xlabel(sample_label)
plt.show()

In [0]:
# Hold out 15% of the image paths (the tail of the list) as the test split;
# everything before that point is the training split
total_imgs = len(ALL_IMG_PATHS)
testImgs = int(total_imgs * 0.15)
trainImgs = total_imgs - testImgs
trainImgPaths, testImgPaths = ALL_IMG_PATHS[:trainImgs], ALL_IMG_PATHS[trainImgs:]

In [0]:
# Specify the directory paths (relative to the working directory) that will
# receive the training and validation images
train_dir = "train"
val_dir = "val"

In [0]:
def move_images(imgPaths, outputDir):
    """Copy each image into ``outputDir/<label>/``, creating folders as needed.

    Despite the name, files are copied with ``shutil.copy2`` rather than
    moved, so the source images are left in place.

    Args:
        imgPaths: iterable of "/"-separated image paths whose second
            component is the class label (e.g. ``dataset/<label>/<file>``).
        outputDir: root directory under which one sub-folder per label is
            created.
    """
    # iterate through the image paths
    for imagePath in imgPaths:
        # extract the label from the current image path
        label = imagePath.split("/")[1]

        # make sure the label's directory exists; exist_ok avoids the separate
        # check-then-create step and keeps the call safe to repeat
        imageDir = os.path.join(outputDir, label)
        os.makedirs(imageDir, exist_ok=True)

        # copy the current image to the respective folder
        shutil.copy2(imagePath, imageDir)

In [0]:
# Populate the train/ and val/ directory trees from the two path lists
move_images(trainImgPaths, train_dir)
move_images(testImgPaths, val_dir)

In [0]:
# Setup data generators; both rescale pixel values by 1/255
train_aug = ImageDataGenerator(rescale=1/255.)
val_aug = ImageDataGenerator(rescale=1/255.)

# Training batches: shuffled, one-hot ("categorical") labels, 224x224 RGB
train_gen = train_aug.flow_from_directory("train",
    class_mode="categorical",
    target_size=(224, 224),
    color_mode="rgb",
    shuffle=True,
    batch_size=32
)
# Validation batches come from val_aug (not train_aug) so that any
# augmentation later added to train_aug cannot leak into evaluation.
# shuffle=False keeps the batch order fixed.
val_gen = val_aug.flow_from_directory("val",
    class_mode="categorical",
    target_size=(224, 224),
    color_mode="rgb",
    shuffle=False,
    batch_size=32
)

In [0]:
# Write the class names, sorted and one per line, to facemask_labels.txt;
# they are needed again at inference time
# Reference: https://colab.sandbox.google.com/github/google-coral/tutorials/blob/master/retrain_classification_ptq_tf1.ipynb
class_to_index = train_gen.class_indices
print (class_to_index)
labels = '\n'.join(sorted(class_to_index))
with open('facemask_labels.txt', 'w') as label_file:
    label_file.write(labels)

In [0]:
# Print the label file that was just written
!cat facemask_labels.txt

In [0]:
# Load the MobileNetV2 model but exclude the classification layers
# (include_top=False); ImageNet weights, 224x224 RGB input to match the generators
EXTRACTOR = MobileNetV2(weights="imagenet", include_top=False,
                    input_shape=(224, 224, 3))

In [0]:
def get_training_model(num_classes=2):
    """Build and compile a classifier on top of the global ``EXTRACTOR``.

    The extractor is marked trainable (full fine-tuning) and extended with a
    head of global average pooling -> Dense(512, relu) -> Dropout(0.5) ->
    Dense(num_classes, softmax).

    Args:
        num_classes: number of output units of the softmax layer. Defaults to
            2, the value that was previously hard-coded.

    Returns:
        The compiled Keras model (categorical cross-entropy loss, "adam"
        optimizer, accuracy metric).
    """
    # We are fine-tuning the extractor model
    # (note: this mutates the shared module-level EXTRACTOR object)
    EXTRACTOR.trainable = True
    # Construct the head of the model that will be placed on top of
    # the base model
    class_head = EXTRACTOR.output
    class_head = GlobalAveragePooling2D()(class_head)
    class_head = Dense(512, activation="relu")(class_head)
    class_head = Dropout(0.5)(class_head)
    class_head = Dense(num_classes, activation="softmax")(class_head)

    # Create the new model
    classifier = Model(inputs=EXTRACTOR.input, outputs=class_head)

    # Compile and return the model
    classifier.compile(loss="categorical_crossentropy",
                          optimizer="adam",
                          metrics=["accuracy"])

    return classifier

In [0]:
# LR schedule configuration (read as globals by lrfn)
start_lr = 0.00001  # learning rate at epoch 0, where the linear ramp-up starts
min_lr = 0.00001  # floor that the exponential decay approaches
max_lr = 0.00005  # peak learning rate that the ramp-up climbs towards
rampup_epochs = 5  # number of epochs over which the rate rises linearly
sustain_epochs = 0  # number of epochs to hold max_lr before decaying
exp_decay = .8  # per-epoch multiplicative factor applied during the decay phase

In [0]:
# LR schedule
def lrfn(epoch):
    """Return the learning rate for ``epoch`` from the module-level schedule settings.

    Three phases: a linear climb from ``start_lr`` towards ``max_lr`` over
    ``rampup_epochs``; a plateau at ``max_lr`` for ``sustain_epochs``; then an
    exponential decay (factor ``exp_decay`` per epoch) towards ``min_lr``.
    """
    # Phase 1: linear ramp-up
    if epoch < rampup_epochs:
        slope = (max_lr - start_lr)/rampup_epochs
        return slope * epoch + start_lr

    # Phase 2: hold the peak
    if epoch < rampup_epochs + sustain_epochs:
        return max_lr

    # Phase 3: exponential decay towards the floor
    epochs_into_decay = epoch-rampup_epochs-sustain_epochs
    return (max_lr - min_lr) * exp_decay**epochs_into_decay + min_lr
    
# Keras callback that asks lrfn for the learning rate of each epoch;
# verbose=True makes it log the value it applies
lr_callback = tf.keras.callbacks.LearningRateScheduler(lambda epoch: lrfn(epoch), verbose=True)

In [0]:
# Preview the LR schedule over epochs 0-9 and print its first and last values
rng = list(range(10))
y = [lrfn(epoch_idx) for epoch_idx in rng]
plt.plot(rng, y)
print(y[0], y[-1])

In [0]:
# Number of images in the train and val directories, displayed as a tuple
len(list(paths.list_images("train"))), len(list(paths.list_images("val")))

In [0]:
# Image counts per split. NOTE: despite the names, both are plain integers:
# trainX is the number of training images, trainY the number of validation images
trainX = len(list(paths.list_images("train")))
trainY = len(list(paths.list_images("val")))

In [0]:
# Train the model
facemask_model = get_training_model()
start = time.time()
# Take the step counts from the generators themselves: len(gen) is
# ceil(samples / batch_size), so the final partial batch is included and
# validation can never end up with 0 steps (which `count // 32` produces
# for fewer than 32 images). It also removes the duplicated batch size.
history = facemask_model.fit(train_gen,
              steps_per_epoch=len(train_gen),
              validation_data=val_gen,
              validation_steps=len(val_gen),
              epochs=10,
              callbacks=[lr_callback])
print("Total training time: ",time.time()-start)

In [0]:
# Plot loss and accuracy for both splits, per epoch, on a single figure
N = len(history.history["loss"])
epoch_axis = np.arange(0, N)
curves = (
    ("loss", "train_loss"),
    ("val_loss", "val_loss"),
    ("accuracy", "train_acc"),
    ("val_accuracy", "val_acc"),
)

plt.figure()
for history_key, legend_name in curves:
    plt.plot(epoch_axis, history.history[history_key], label=legend_name)
plt.title("Training Loss and Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left")
plt.show()

In [0]:
# Extract the image paths from the train set, shuffle them, and
# choose 100 images
image_paths = list(paths.list_images("train"))
random.shuffle(image_paths)
image_paths = image_paths[:100]

# An empty list as a placeholder for the dataset
rep_ds = []

# Iterate over the image paths
for image in tqdm(image_paths):
    # Read with cv2.imread rather than plt.imread: it always returns an
    # 8-bit, 3-channel array, whereas plt.imread returns floats already in
    # [0, 1] for PNG files (so the /255 below would scale them twice) and
    # may return 1- or 4-channel arrays that do not fit the model input.
    image_pixels = cv2.imread(image)
    if image_pixels is None:
        # unreadable file: leave it out of the calibration set
        continue

    # OpenCV loads BGR; convert to the RGB order used for training, then
    # change the datatype, resize the image, add batch dimension and
    # normalize the pixel values
    image_pixels = cv2.cvtColor(image_pixels, cv2.COLOR_BGR2RGB).astype("float32")
    image_pixels = cv2.resize(image_pixels, (224, 224))
    image_pixels = np.expand_dims(image_pixels, 0)
    image_pixels = image_pixels / 255.

    # Append to the list
    rep_ds.append(image_pixels)

In [0]:
# Convert to TensorFlow dataset: stack the list into one array, then slice it
# along the first axis so each dataset element is one stored sample (which
# still carries the leading batch dimension added when it was loaded)
rep_ds = np.array(rep_ds)
rep_ds = tf.data.Dataset.from_tensor_slices((rep_ds))

In [0]:
# Creating a representative dataset for int quantization
def representative_dataset():
    """Yield up to 100 samples from ``rep_ds``, each wrapped in a one-element list."""
    calibration_samples = rep_ds.take(100)
    for sample in calibration_samples:
        yield [sample]

In [0]:
# TF Lite conversion 

# Instantiate the converter, instruct TF Lite to optimize for size, and
# specify the representative dataset (the generator the converter draws
# calibration samples from)
converter = tf.lite.TFLiteConverter.from_keras_model(facemask_model) 
converter.optimizations = [tf.lite.Optimize.OPTIMIZE_FOR_SIZE] 
converter.representative_dataset = representative_dataset

# We are going for full INT8 quantization
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# NOTE(review): uint8 input/output is requested here, yet later cells feed
# float32 arrays to the interpreter — confirm the actual tensor dtypes with
# interpreter.get_input_details() for the TensorFlow version in use
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8

# Convert finally; the result is the serialized model that is later written to disk
tflite_model = converter.convert()

In [0]:
# Unoptimized (SavedModel format): saved for a size comparison with the TFLite model
facemask_model.save("facemask_model_no_op")

In [0]:
# Check size: human-readable disk usage of every file in the SavedModel directory
!du --all -h facemask_model_no_op

In [0]:
# Serialize the TFLite model and check it's size
!mkdir tflite_model

f = open("tflite_model/facemask_model.tflite", "wb")
f.write(tflite_model)
f.close()

!du --all -h tflite_model

In [0]:
# Load the model into an interpreter straight from the in-memory bytes,
# allocate its tensors, and remember the indices of the first input and
# first output tensor for set_tensor / get_tensor calls
interpreter_quant = tf.lite.Interpreter(model_content=tflite_model)
interpreter_quant.allocate_tensors()
input_index_quant = interpreter_quant.get_input_details()[0]["index"]
output_index_quant = interpreter_quant.get_output_details()[0]["index"]

In [0]:
# Prepare validation sets
# Extract the image paths from the validation set
image_paths = list(paths.list_images("val"))

# Empty lists for storing images and labels
val_images = []
val_labels = []

# Iterate over the image paths
for image in tqdm(image_paths):
    # Read with cv2.imread rather than plt.imread: it always returns an
    # 8-bit, 3-channel array, whereas plt.imread returns floats already in
    # [0, 1] for PNG files (so the /255 below would scale them twice) and
    # may return 1- or 4-channel arrays that do not fit the model input.
    image_pixels = cv2.imread(image)
    if image_pixels is None:
        # unreadable file: skip it so images and labels stay aligned
        continue

    # OpenCV loads BGR; convert to the RGB order used for training, then
    # change the datatype, resize the image, add batch dimension and
    # normalize the pixel values
    image_pixels = cv2.cvtColor(image_pixels, cv2.COLOR_BGR2RGB).astype("float32")
    image_pixels = cv2.resize(image_pixels, (224, 224))
    image_pixels = np.expand_dims(image_pixels, 0)
    image_pixels = image_pixels / 255.

    # Extract the label (second component of the path)
    label = image.split("/")[1]

    # Append to the lists
    val_images.append(image_pixels)
    val_labels.append(label)

# Create NumPy array
val_images = np.array(val_images)

In [0]:
# Define the class labels (should be alphabetical). The list position is used
# as the class index when decoding predictions, so the order must match the
# one written to facemask_labels.txt
CLASSES = ['with_mask', 'without_mask']

In [0]:
# Run a demo prediction on the first validation image
sample_idx = 0
test_image = val_images[sample_idx]
interpreter_quant.set_tensor(input_index_quant, test_image)
interpreter_quant.invoke()
predictions = interpreter_quant.get_tensor(output_index_quant)

# Show the image with its true and predicted class names in the title
predicted_name = str(CLASSES[np.argmax(predictions[0])])
template = "True:{true}, predicted:{predict}"
plt.imshow(val_images[sample_idx].squeeze())
_ = plt.title(template.format(true=val_labels[sample_idx],
                              predict=predicted_name))
plt.grid(False)

In [0]:
# Run a demo prediction on a second validation image.
# The index is clamped so the cell still works when the validation set has
# fewer than 206 images.
sample_idx = min(205, len(val_images) - 1)
test_image = val_images[sample_idx]
interpreter_quant.set_tensor(input_index_quant, test_image)
interpreter_quant.invoke()
predictions = interpreter_quant.get_tensor(output_index_quant)

# See the prediction. The true label is looked up with the same index as the
# image (it used to be hard-coded to index 0, so the title showed the wrong
# ground truth).
plt.imshow(val_images[sample_idx].squeeze())
template = "True:{true}, predicted:{predict}"
_ = plt.title(template.format(true=val_labels[sample_idx],
                              predict=str(CLASSES[np.argmax(predictions[0])])))
plt.grid(False)

In [0]:
def evaluate_model(interpreter, images=None, labels=None, classes=None):
    """Return the top-1 accuracy of a TFLite interpreter on a labelled image set.

    Args:
        interpreter: a TFLite interpreter whose tensors are already allocated.
        images: iterable of input arrays, each already carrying the batch
            dimension and scaled to [0, 1]. Defaults to the global
            ``val_images``.
        labels: class-name strings aligned with ``images``. Defaults to the
            global ``val_labels``.
        classes: sequence mapping an output index to its class name. Defaults
            to the global ``CLASSES``.

    Returns:
        The fraction of images whose predicted class name equals its label,
        or 0.0 when there are no images to evaluate.
    """
    images = val_images if images is None else images
    labels = val_labels if labels is None else labels
    classes = CLASSES if classes is None else classes

    input_details = interpreter.get_input_details()[0]
    input_index = input_details["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    # A model with an integer input tensor expects quantized values:
    # q = real / scale + zero_point. A scale of 0 means "no quantization".
    input_dtype = input_details.get("dtype", np.float32)
    scale, zero_point = input_details.get("quantization", (0.0, 0))
    quantize_input = bool(np.issubdtype(input_dtype, np.integer)) and scale != 0

    accurate_count = 0
    total = 0
    # Run predictions on every image in the evaluation set.
    for image, label in zip(images, labels):
        if quantize_input:
            limits = np.iinfo(input_dtype)
            image = np.clip(np.round(image / scale + zero_point),
                            limits.min, limits.max)
        interpreter.set_tensor(input_index, np.asarray(image, dtype=input_dtype))

        # Run inference.
        interpreter.invoke()

        # Post-processing: remove batch dimension and find the class with the
        # highest score.
        probability = interpreter.get_tensor(output_index)
        facemask_id = np.argmax(probability[0])

        # Compare the prediction with the ground truth label.
        if classes[facemask_id] == label:
            accurate_count += 1
        total += 1

    # Guard against an empty evaluation set (previously a ZeroDivisionError)
    return accurate_count / total if total else 0.0

In [0]:
# Check, check: accuracy of the converted model on the validation images
print(evaluate_model(interpreter_quant))

In [0]:
%%bash
# Register Google's Coral apt repository (signing key + source list entry),
# refresh the package index, and install the Edge TPU compiler
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
sudo apt-get update
# -y: this cell has no interactive terminal, so apt-get must not stop to ask
# for confirmation
sudo apt-get install -y edgetpu-compiler

In [0]:
# Compile the TFLite model for the Edge TPU; the compiler's log reports how
# many operations were mapped to the Edge TPU
!edgetpu_compiler /content/tflite_model/facemask_model.tflite

In [0]:
# List the directory that holds the serialized TFLite model
!ls -lrt /content/tflite_model/

In [0]:
# Download the artifacts to the local machine: the TFLite model, the
# Edge TPU-compiled model and the label file
from google.colab import files
files.download("tflite_model/facemask_model.tflite")
files.download("facemask_model_edgetpu.tflite")
files.download("facemask_labels.txt")

We can see that most of the operations of the new compiled model can be executed on an [Edge TPU](https://coral.ai/docs/edgetpu/faq/) - "Number of operations that will run on Edge TPU: 71".

You can plug an Edge TPU USB accelerator into a supported device (the Raspberry Pi is supported as well :D) and follow [this documentation](https://coral.ai/docs/reference/edgetpu.classification.engine/) and [this tutorial](https://github.com/google-coral/tflite/blob/master/python/examples/classification/classify_image.py) to use the model we just compiled. Here's a sample inference script that you can use:

```python
# Usage
# python classify_image.py --image 30.jpg
# one can change the input image path

# Imports
import tflite_runtime.interpreter as tflite
import matplotlib.pyplot as plt
import numpy as np
import argparse
import imutils
import time
import cv2

# Utility function for loading and processing a given image
def load_and_preprocess(image_path):
	"""Load an image and return it as a (1, 224, 224, 3) float32 RGB array in [0, 1].

	cv2.imread is used instead of plt.imread because it always returns an
	8-bit, 3-channel array; plt.imread returns floats already in [0, 1] for
	PNG files (so dividing by 255 would scale them twice) and may return
	1- or 4-channel arrays.

	Raises:
		FileNotFoundError: if the image cannot be read.
	"""
	image_pixels = cv2.imread(image_path)
	if image_pixels is None:
		raise FileNotFoundError("could not read image: {}".format(image_path))

	# OpenCV loads BGR; convert to RGB before scaling
	image_pixels = cv2.cvtColor(image_pixels, cv2.COLOR_BGR2RGB).astype("float32")
	image_pixels = cv2.resize(image_pixels, (224, 224))
	image_pixels = np.expand_dims(image_pixels, 0)
	image_pixels = image_pixels / 255.

	return image_pixels

# Argument parser
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--image", required=True,
	help="path to input image")
args = vars(ap.parse_args())

# Load a sample image
image_pixels = load_and_preprocess(args["image"])

# Load the same image for drawing predictions
original = cv2.imread(args["image"])
original = imutils.resize(original, width=500)

# Open and load the label file; the context manager closes the handle, and
# `labels` now only ever refers to the list of class names
with open("facemask_labels.txt", "r") as label_file:
	labels = [label.strip() for label in label_file.readlines()]

# Load the model into interpreters with TFLite delegate
interpreter = tflite.Interpreter(model_path="facemask_model_edgetpu.tflite",
	experimental_delegates=[tflite.load_delegate("libedgetpu.so.1")])
interpreter.allocate_tensors()
input_index = interpreter.get_input_details()[0]["index"]
output_index = interpreter.get_output_details()[0]["index"]

# Run inference and parse the predictions (only invoke() is timed)
interpreter.set_tensor(input_index, image_pixels)
start = time.time()
interpreter.invoke()
print("Inference took {:.2f} seconds".format(time.time() - start))
predictions = interpreter.get_tensor(output_index)

# Draw the predicted label on the image and show it
class_label = str(labels[np.argmax(predictions[0])])
confidence = float(predictions[0].max())
text = "Label: {} Conf: {}".format(class_label, confidence * 100)
cv2.putText(original, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX,
			0.5, (0, 255, 0), 2)
cv2.imshow("Image", original)
cv2.waitKey(0)
```