In [None]:
from keras.preprocessing.image import ImageDataGenerator
from keras import layers, models, callbacks, optimizers
from keras.utils import load_img, img_to_array
from keras.optimizers import RMSprop, SGD, Adam
from keras.applications import ResNet50, VGG16
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense, AveragePooling2D, GlobalAveragePooling2D
import re, sys, datetime, json, random, cv2, os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, confusion_matrix, classification_report
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import keras
from PIL import Image
import cv2
import os
import shutil
import tensorflow as tf
print(tf.__version__)

In [None]:
## If you are using the data by mounting the google drive, use the following :
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

In [None]:
!ls "/content/gdrive/MyDrive/images/face_recognition"

In [None]:
# Set the directory for the training images
test_dir = '/content/gdrive/MyDrive/images/face_recognition/test'

In [None]:
names = {0: "Dhiraj", 1: "Om", 2: "Tanushree"}

In [None]:
NUM_CLASSES = len(names)
print(NUM_CLASSES)

In [None]:
train_dir = "/content/gdrive/MyDrive/images/face_recognition/train"

"""
    # Use `ImageDataGenerator` to rescale the images.
    # Create the train generator and specify where the train dataset directory, image size, batch size.
    # Create the validation generator with similar approach as the train generator with the flow_from_directory() method.
"""

In [None]:
# We are trying to minimize the resolution of the images without loosing the 'Features'
# For facial recognition, this seems to be working fine, you can increase or decrease it

# Depending upon the total number of images you have set the batch size
FAST_RUN = False
IMAGE_WIDTH=224
IMAGE_HEIGHT=224
IMAGE_SIZE=(IMAGE_WIDTH, IMAGE_HEIGHT)
IMAGE_CHANNELS=3
BATCH_SIZE = 32
IMAGE_SHAPE = (IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_CHANNELS)
learning_rate = 0.0001
target_loss = 0.8


In [None]:
# Loop through all image files in the directory
def resize_images(path_to_directory, target_size):
  for filename in os.listdir(path_to_directory):
        root, ext = os.path.splitext(filename)
        if ext.lower() in [".jpg", ".jpeg", ".png"]:
            img_path = os.path.join(path_to_directory, filename)
            if Image.open(img_path).size != target_size:
              img = Image.open(img_path).resize(target_size)
              img.save(img_path)
  print(f"All images in {path_to_directory} resized to {target_size}")

In [None]:
for i in range(0, 3):
  resize_images(os.path.join(train_dir, str(i)), IMAGE_SIZE)


In [None]:
for i in range(0, 3):
  resize_images(os.path.join(test_dir, str(i)), IMAGE_SIZE)

In [None]:
# We need a data generator which rescales the images
# Pre-processes the images like re-scaling and other required operations for the next steps
test_datagen = ImageDataGenerator(
    rescale=1. / 255
    )

# Create an instance of ImageDataGenerator for the training set
train_datagen = ImageDataGenerator(
    rescale=1./255,      # Rescale the pixel values to the range [0, 1]
    rotation_range=20,   # Randomly rotate the images by up to 20 degrees
    width_shift_range=0.2,  # Randomly shift the images horizontally by up to 20% of the image width
    height_shift_range=0.2, # Randomly shift the images vertically by up to 20% of the image height
    shear_range=0.2,         # Randomly shear the images by up to 20%
    zoom_range=0.3,          # Randomly zoom the images by up to 20%
    horizontal_flip=True,    # Randomly flip the images horizontally
    vertical_flip=False,
    brightness_range = (0.8, 1.2), # brightness adjuster so that some partial images can detect when ground is dark
    fill_mode='nearest',      # Fill any empty pixels with the nearest available pixel value
    validation_split=0.3
)

# Normalize the data
train_datagen.mean = [123.68, 116.779, 103.939] # RGB channel mean values from ImageNet
train_datagen.std = [58.393, 57.12, 57.375]

# We separate out data set into Training, Validation & Testing. Mostly you will see Training and Validation.
# We create generators for that, here we have train and validation generator.
# Create a train_generator
# Use the flow_from_directory method to load the images from the directory
train_generator = train_datagen.flow_from_directory(
    train_dir,                 # Path to the training set directory
    target_size=(IMAGE_WIDTH, IMAGE_HEIGHT),    # Resize the images to 256x256
    batch_size=BATCH_SIZE,             # Use batches of 32 images
    class_mode='categorical',   # Use categorical cross-entropy loss
    subset='training'
)

# Load the validation data into x_val and y_val
validation_generator = train_datagen.flow_from_directory(
    directory=train_dir,
    target_size=(IMAGE_WIDTH, IMAGE_HEIGHT),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    subset='validation'
)

# Load the test data into x_test and y_test
test_generator = test_datagen.flow_from_directory(
    directory=test_dir,
    target_size=(IMAGE_WIDTH, IMAGE_HEIGHT),
    batch_size=BATCH_SIZE//2,
    class_mode='categorical'
)

# Handle any exceptions that may occur during data generation
try:
    x_train, y_train = next(train_generator)
except Exception as e:
    print(f"Error during training data generation: {e}")
    
try:
    x_val, y_val = next(validation_generator)
except Exception as e:
    print(f"Error during validation data generation: {e}")

try:
    x_test, y_test = next(test_generator)
except Exception as e:
    print(f"Error during test data generation: {e}")

test_generator.class_indices

In [None]:
# Triggering a training generator for all the batches
for image_batch, label_batch in train_generator:
    break

# This will print all classification labels in the console
print(train_generator.class_indices)

# Creating a file which will contain all names in the format of next lines
labels = '\n'.join(sorted(train_generator.class_indices.keys()))

# Writing it out to the file which will be named 'labels.txt'
with open('labels.txt', 'w') as f:
    f.write(labels)

shutil.copy('labels.txt','gdrive/MyDrive/models/face_recognition_labels.txt')


In [None]:
dir_0 = os.path.join(train_dir + '/0')

dir_1 = os.path.join(train_dir + '/1')

dir_2 = os.path.join(train_dir + '/2')

In [None]:
names_0 = random.sample(os.listdir(dir_0), 10)
print(names_0[:10])

names_1 = random.sample(os.listdir(dir_1), 10)
print(names_1[:10])

names_2 = random.sample(os.listdir(dir_2), 10)
print(names_2[:10])


In [None]:
cnt0 = len(os.listdir(dir_0))
cnt1 = len(os.listdir(dir_1))
cnt2 = len(os.listdir(dir_2))

print('total training 0 images:', cnt0)
print('total training 1 images:', cnt1)
print('total training 2 images', cnt2)

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Parameters for our graph; we'll output images in a 3x3 configuration
nrows = 3
ncols = 3

# Index for iterating over images
pic_index = 0

In [None]:
# Set up matplotlib fig, and size it to fit 4x4 pics
fig = plt.gcf()
fig.set_size_inches(ncols * 3, nrows * 3)

pic_index += 3
pix0 = [os.path.join(dir_0, fname) 
                for fname in names_0[pic_index-3:pic_index]]
pix1 = [os.path.join(dir_1, fname) 
                for fname in names_1[pic_index-3:pic_index]]
pix2 = [os.path.join(dir_2, fname) 
                for fname in names_2[pic_index-3:pic_index]]

for i, img_path in enumerate(pix0 + pix1 + pix2):
  # Set up subplot; subplot indices start at 1
  sp = plt.subplot(nrows, ncols, i + 1)
  sp.axis('Off') # Don't show axes (or gridlines)

  img = mpimg.imread(img_path)
  plt.imshow(img)
  if i < 3:
    plt.text(2, 10, names[0], fontsize=15, color='red')
  elif i >=3 and i < 6:
    plt.text(2, 10, names[1], fontsize=15, color='red')
  else:
    plt.text(2, 10, names[2], fontsize=15, color='red')

plt.show()

"""
We have to create the base model from the pre-trained CNN

Create the base model from the **MobileNet V2** model developed at Google
and pre-trained on the ImageNet dataset, a large dataset of 1.4M images and 1000 classes of web images.

First, pick which intermediate layer of MobileNet V2 will be used for feature extraction. 
A common practice is to use the output of the very last layer before the flatten operation,
The so-called "bottleneck layer". The reasoning here is that the following fully-connected layers 
will be too specialized to the task the network was trained on, and thus the features learned by
These layers won't be very useful for a new task. The bottleneck features, however, retain much generality.

Let's instantiate an MobileNet V2 model pre-loaded with weights trained on ImageNet. 
By specifying the `include_top=False` argument, we load a network that doesn't include the
classification layers at the top, which is ideal for feature extraction.
"""

In [None]:
# Create the base model from the pre-trained model MobileNet V2 Hidden and output layer is in the model The first
# layer does a redundant work of classification of features which is not required to be trained
# (This is also called as bottle neck layer)

# Hence, creating a model with EXCLUDING the top layer
base_model = tf.keras.applications.MobileNetV2(input_shape=IMAGE_SHAPE,
                                               include_top=False,
                                               weights='imagenet')
# base_model = VGG16(weights='imagenet', include_top=False, input_shape=(IMAGE_WIDTH, IMAGE_HEIGHT, IMAGE_CHANNELS))

# Feature extraction
# You will freeze the convolution base created from the previous step and use that as a feature extractor
# Add a classifier on top of it and train the top-level classifier.

# We will be tweaking the model with our own classifications
# Hence we don't want our tweaks to affect the layers in the 'base_model'
# Hence we disable their training
# Freeze all layers except the last one
# for layer in base_model.layers[: -2]:
#     layer.trainable = False
base_model.trainable = False



In [None]:
# very normal custom model based on the base model
model = models.Sequential()
model.add(base_model)
model.add(layers.AveragePooling2D(pool_size=(2, 2)))
model.add(layers.Conv2D(filters=32, kernel_size=(3,3), strides=(1,1), activation='relu', padding="same"))
model.add(layers.MaxPooling2D(pool_size=(3,3), strides=(2,2), padding="same"))
model.add(layers.Dropout(0.25))

model.add(layers.Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), activation='relu', padding="same"))
model.add(layers.MaxPooling2D(pool_size=(3,3), strides=(2,2), padding="same"))
model.add(layers.Dropout(0.25))

model.add(layers.Conv2D(filters=128, kernel_size=(3,3), strides=(1,1), activation='relu', padding="same"))
model.add(layers.Dropout(0.25))
model.add(layers.GlobalAveragePooling2D())
model.add(layers.Flatten())
model.add(layers.Dense(NUM_CLASSES, activation='softmax'))


In [None]:
# You must compile the model before training it.  Since there are two classes, use a binary cross-entropy loss.
# Since we have added more classification nodes, to our existing model, we need to compile the whole thing
# as a single model, hence we will compile the model now

# 1 - BP optimizer [Adam/Xavier algorithms help in Optimization]
# 2 - Weights are changed depending upon the 'LOSS' ['RMS, 'CROSS-ENTROPY' are some algorithms]
# 3 - On basis of which parameter our loss will be calculated? here we are going for accuracy
optimizer = Adam(lr=learning_rate)

model.compile(optimizer=optimizer, 
              loss='categorical_crossentropy', 
              metrics=['accuracy'])

# model.compile(loss='categorical_crossentropy',
#               optimizer=RMSprop(learning_rate=0.0001),
#               metrics=['accuracy'])

# To see the model summary in a tabular structure
model.summary()

# Printing some statistics
print('Number of trainable variables = {}'.format(len(model.trainable_variables)))



In [None]:
# callback class
class myCallback(callbacks.Callback):
    def __init__(self, target_loss):
        super(myCallback, self).__init__()
        self.target_loss = target_loss

    def on_epoch_end(self, epoch, logs={}):
        '''
        Halts the training after reaching 98 percent accuracy

        Args:
            epoch (integer) - index of epoch (required but unused in the function definition below)
            logs (dict) - metric results from the training epoch
        '''

        # Check accuracy
        if(logs.get('accuracy') > 0.98 and logs.get('val_accuracy') > 0.83 and logs.get('val_loss') < 0.40):

            # Stop if threshold is met
            print("\nTraining and validation accuracy are higher than 0.98 so cancelling training!")
            self.model.stop_training = True

# Instantiate callbacks
callbacks = [
    myCallback(target_loss=target_loss),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.1, patience=3)
]


In [None]:
# Train the model
# We will do it in 50 Iterations
epochs = 50

# Fitting / Training the model
history = model.fit(train_generator,
                    epochs=epochs,
                    validation_data=validation_generator,
                    callbacks=[callbacks])



In [None]:
# Visualizing the Learning curves [OPTIONAL]
#
# Let's take a look at the learning curves of the training and validation accuracy/loss
# When using the MobileNet V2 base model as a fixed feature extractor.


acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()), 1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0, 1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()


"""## Fine tuning
In our feature extraction experiment, you were only training a few layers on top of an MobileNet V2 base model. The weights of the pre-trained network were **not** updated during training.

One way to increase performance even further is to train (or "fine-tune") the weights of the top layers of the pre-trained model alongside the training of the classifier you added. The training process will force the weights to be tuned from generic features maps to features associated specifically to our dataset.

### Un-freeze the top layers of the model

All you need to do is unfreeze the `base_model` and set the bottom layers be un-trainable. Then, recompile the model (necessary for these changes to take effect), and resume training.
"""

# Set the base model to trainable
base_model.trainable = True

# Print the layer names in the base model
print("Base model layers:")
for layer in base_model.layers:
    print(layer.name)

# Fine-tune from this layer onwards
fine_tune_from_layer = -2

# Freeze all the layers before the `fine_tune_from_layer` layer
for layer in base_model.layers[:fine_tune_from_layer]:
    layer.trainable = False


In [None]:
# Compile the model
# Compile the model using a much lower training rate.
# Notice the parameter in Adam() function, parameter passed to Adam is the learning rate
model.compile(loss='categorical_crossentropy',
              optimizer=tf.keras.optimizers.Adam(1e-5),
              metrics=['accuracy'])

# Getting the summary of the final model
model.summary()
# Printing Training Variables
print('Number of trainable variables = {}'.format(len(model.trainable_variables)))



In [None]:
# Continue Train the model
history_fine = model.fit(train_generator,
                         epochs=5,
                         validation_data=validation_generator
                         )
# Saving the Trained Model to the keras h5 format.
# So in future, if we want to convert again, we don't have to go through the whole process again
saved_model_dir = 'gdrive/MyDrive/models/face_recognition.h5'
model.save(saved_model_dir)
print("Model Saved to gdrive/MyDrive/models/face_recognition.h5")




In [None]:
# Load the saved model
loaded_model = models.load_model('gdrive/MyDrive/models/face_recognition.h5')

In [None]:
# from keras.preprocessing.image import load_img, img_to_array
from keras.utils import load_img, img_to_array
import numpy as np

# img_name = "1/Om.jpg"
# img_name = "0/Dhirajx.jpg"
# img_name = "2/Tanushree.jpg"
# img_name = "2/Tanushree2.jpg"
img_name = "2/Tanushree3.jpg"
# img_name = "2/tanushree_passport_sign.jpg"
img_path = os.path.join(test_dir, img_name)

# Load and preprocess your image
img = load_img(img_path, target_size=(224, 224))
# img = load_img(img_path, target_size=(128, 128))
# Show image
plt.imshow(img)
plt.show()

img = img_to_array(img)
img = np.expand_dims(img, axis=0)
img = img/255.0

# Make a prediction
prediction = loaded_model.predict(img)
print(prediction)
# Get the predicted class
predicted_class = np.argmax(prediction, axis=1)

print(predicted_class)
print(names[predicted_class[0]])

In [None]:
import tensorflow as tf

# Convert the model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

In [None]:
# Save the model.
with open('face_recognition_model.tflite', 'wb') as f:
  f.write(tflite_model)

shutil.copy('face_recognition_model.tflite','gdrive/MyDrive/models/face_recognition_model.tflite')

In [None]:

# # tf.saved_model.save(model, saved_model_dir)

# converter = tf.lite.TFLiteConverter.from_keras_model_file(model)
# # Use this if 238 fails tf.lite.TFLiteConverter.from_keras_model(model)

# tflite_model = converter.convert()

# with open('model.tflite', 'wb') as f:
#     f.write(tflite_model)

# Let's take a look at the learning curves of the training and validation accuracy/loss
# When fine tuning the last few layers of the MobileNet V2 base model and training the classifier on top of it.
# The validation loss is much higher than the training loss, so you may get some overfitting.
# You may also get some overfitting as the new training set is
# Relatively small and similar to the original MobileNet V2 datasets.

acc = history_fine.history['accuracy']
val_acc = history_fine.history['val_accuracy']
print(acc)
print(val_acc)
loss = history_fine.history['loss']
val_loss = history_fine.history['val_loss']
print(loss)
print(val_loss)
plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()), 1])
plt.title('Training and Validation Accuracy')

plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.ylim([0, 1.0])
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
plt.show()



# Summary:
'''
* **Using a pre-trained model for feature extraction**:  When working with a small dataset, it is common to take advantage of features learned by a model trained on a larger dataset in the same domain. This is done by instantiating the pre-trained model and adding a fully-connected classifier on top. The pre-trained model is "frozen" and only the weights of the classifier get updated during training.
In this case, the convolutional base extracted all the features associated with each image and you just trained a classifier that determines the image class given that set of extracted features.

* **Fine-tuning a pre-trained model**: To further improve performance, one might want to repurpose the top-level layers of the pre-trained models to the new dataset via fine-tuning.
In this case, you tuned your weights such that your model learned high-level features specific to the dataset. This technique is usually recommended when the training dataset is large and very similar to the orginial dataset that the pre-trained model was trained on.
'''

However this tflite will not run in edgetpu eg. raspberrypi
So we need to do as following

In [None]:
# A generator that provides a representative dataset

def representative_data_gen():
  dataset_list = tf.data.Dataset.list_files(test_dir + '/*/*')
  for i in range(100):
    image = next(iter(dataset_list))
    # file_type = os.path.splitext(image)[1]
    # if file_type not in ['.jpeg', '.jpg', '.png', '.bmp']:
    #   continue
    try:
      image = tf.io.read_file(image)
      image = tf.io.decode_jpeg(image, channels=3)
      image = tf.image.resize(image, [IMAGE_WIDTH, IMAGE_HEIGHT])
      image = tf.cast(image / 255., tf.float32)
      image = tf.expand_dims(image, 0)
    except tf.errors.InvalidArgumentError as e:
      continue
    yield [image]

converter = tf.lite.TFLiteConverter.from_keras_model(model)
# This enables quantization
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# This sets the representative dataset for quantization
converter.representative_dataset = representative_data_gen
# This ensures that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# For full integer quantization, though supported types defaults to int8 only, we explicitly declare it for clarity.
converter.target_spec.supported_types = [tf.int8]
# These set the input and output tensors to uint8 (added in r2.3)
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
tflite_model = converter.convert()

with open('face_recognition_model_edge.tflite', 'wb') as f:
  f.write(tflite_model)

shutil.copy('face_recognition_model_edge.tflite','gdrive/MyDrive/models/face_recognition_model_edge.tflite')


## Compile the edgetpu tflite model

Finally, we're ready to compile the model for the Edge TPU.

First download the Edge TPU Compiler:

In [None]:
! curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -

! echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list

! sudo apt-get update

! sudo apt-get install edgetpu-compiler	

In [None]:
! edgetpu_compiler face_recognition_model_edge.tflite

shutil.copy('face_recognition_model_edge_edgetpu.tflite','gdrive/MyDrive/models/face_recognition_model_edge_edgetpu.tflite')

In [None]:
# print (train_generator.class_indices)

# labels = '\n'.join(sorted(train_generator.class_indices.keys()))

# with open('updated_snow_detection_on_solar_panel_labels.txt', 'w') as f:
#   f.write(labels)
# shutil.copy('updated_snow_detection_on_solar_panel_labels.txt','gdrive/MyDrive/models/updated_snow_detection_on_solar_panel_labels.txt')

In [None]:
from google.colab import files

files.download('face_recognition_model.tflite')
files.download('face_recognition_model_edge.tflite')
files.download('face_recognition_model_edge_edgetpu.tflite')
files.download('labels.txt')