# Building a Deep-Network Image Classifier with Transfer Learning

In [None]:
# import the required libraries
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

# from tensorflow.keras.preprocessing import image_dataset_from_directory
# from tensorflow.keras.preprocessing.image import ImageDataGenerator
# import tf.keras.utils
from PIL import Image
import numpy as np
import os
import base64
from io import BytesIO

## Dataset creation

In [None]:
# Location of the image dataset on disk
directory = "../dataset/images/"

batch_size = 32
image_size = (160, 160)  # every image is resized to 160x160 on load

# Build the training and validation sets from the same directory.
# Both subsets are requested with the same validation_split and the same
# seed so that they are complementary (no image lands in both); the only
# argument that differs between the two calls is `subset`.
train_set, validation_set = (
    tf.keras.utils.image_dataset_from_directory(
        directory,
        subset=subset_name,
        validation_split=0.2,
        seed=42,
        shuffle=True,
        batch_size=batch_size,
        image_size=image_size,
    )
    for subset_name in ('training', 'validation')
)

# Obtain Class names (labels)

In [None]:
# read the class names (labels) from the training set and print them
# (.class_names holds the classes of the images, taken from the directory names)
class_names = train_set.class_names
print(class_names)

# Plot samples

In [None]:
# Show the first nine images of one training batch in a 3x3 grid,
# each titled with its class name.
plt.figure(figsize=(10, 10))
for images, labels in train_set.take(1):  # a single batch
    for position in range(1, 10):  # subplot slots are 1-based
        sample_index = position - 1
        ax = plt.subplot(3, 3, position)
        pixels = images[sample_index].numpy().astype("uint8")
        plt.imshow(pixels)
        plt.title(class_names[labels[sample_index]])
        plt.axis("off")

# Import pre-trained model

In [None]:
# explore the complete MobileNetV2 network (classification top included)
image_shape = (*image_size, 3)  # image size plus a channel dimension of 3
pre_trained_model = tf.keras.applications.MobileNetV2(
    weights='imagenet',
    include_top=True,
    input_shape=image_shape,
)

In [None]:
# number of layers in the MobileNetV2 loaded with include_top=True
len(pre_trained_model.layers)

In [None]:
# reload MobileNetV2 without its final (top) layers so that a custom
# head can be attached
image_shape = (*image_size, 3)  # image size plus a channel dimension of 3
pre_trained_model = tf.keras.applications.MobileNetV2(
    weights='imagenet',
    include_top=False,
    input_shape=image_shape,
)

In [None]:
# number of layers in the MobileNetV2 loaded with include_top=False
len(pre_trained_model.layers)

In [None]:
# Build the classifier: a frozen MobileNetV2 feature extractor followed by
# a small trainable head (pooling -> dropout -> dense).

# MobileNetV2's own preprocessing function (normalizes inputs to [-1, 1])
preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
# freeze the base model by making it non trainable
pre_trained_model.trainable = False
# define the input layer
inputs = tf.keras.Input(shape=image_shape)
# preprocess the inputs
x = preprocess_input(inputs)
# add the pre-trained model; training=False keeps it in inference mode
x = pre_trained_model(x, training=False)
# add a pooling layer
x = tf.keras.layers.GlobalAveragePooling2D()(x)
# add a dropout layer for regularization
x = tf.keras.layers.Dropout(0.2)(x)
# add the output layer: one unit per class.
# The model is trained with 'sparse_categorical_crossentropy', which treats
# the classes as mutually exclusive, so the head uses softmax (scores that
# sum to 1) rather than sigmoid (independent per-unit scores).
outputs = tf.keras.layers.Dense(2, activation='softmax')(x)
# define the model with its inputs and outputs
model = tf.keras.Model(inputs, outputs)

# Compile Model

In [None]:
# Configure training: Adam optimizer, sparse categorical cross-entropy
# loss (integer class labels), accuracy as the reported metric.
base_learning_rate = 1e-3
model.compile(
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
)

# Train Model

In [None]:
# train the model
# fit for 10 epochs, evaluating on the validation set after each epoch;
# the returned History object feeds the plots below
history = model.fit(
    x=train_set,
    epochs=10,
    validation_data=validation_set,
)

# Plot Loss/Accuracy

In [None]:
# Accuracy curves get a leading 0.0 so that they start from zero;
# the loss curves are plotted exactly as recorded.
acc = [0., *history.history['accuracy']]
val_acc = [0., *history.history['val_accuracy']]

loss = history.history['loss']
val_loss = history.history['val_loss']

# Two stacked panels: accuracy on top, loss below.
fig, (acc_ax, loss_ax) = plt.subplots(2, 1, figsize=(8, 8))

acc_ax.plot(acc, label='Training')
acc_ax.plot(val_acc, label='Validation')
acc_ax.legend(loc='lower right')
acc_ax.set_ylabel('Accuracy')
# keep the automatic lower bound, cap the upper bound at 1
acc_ax.set_ylim([min(acc_ax.get_ylim()), 1])
acc_ax.set_title('Accuracy')

loss_ax.plot(loss, label='Training')
loss_ax.plot(val_loss, label='Validation')
loss_ax.legend(loc='upper right')
loss_ax.set_ylabel('Cross Entropy')
loss_ax.set_ylim([0, 1.0])
loss_ax.set_title('Loss')
loss_ax.set_xlabel('epoch')

plt.show()

# Verify prediction results (tea-green images)

In [None]:
# Run the (non-augmented) model over every image file in the "tea-green"
# class folder and print the predicted class index of each one.
test_path = "../dataset/images/tea-green"
test_imgs = os.listdir(test_path)

ti = len(test_imgs)
print("number of images: ", ti)

# Room for one image per directory entry; rows are filled consecutively and
# the unused tail (entries that were skipped) is trimmed after the loop.
data = np.empty((ti, 160, 160, 3))
loaded = 0

for name in test_imgs:
    image_path = os.path.join(test_path, name)
    # Sub-directories are not images: report and skip them.
    if os.path.isdir(image_path):
        print("bypassing: "+image_path)
        continue
    with Image.open(image_path) as img:
        # force 3 channels and the 160x160 size the model was built for
        resized = img.convert('RGB').resize((160, 160), Image.LANCZOS)
        data[loaded] = np.asarray(resized)
    loaded += 1

# Keep only the rows that were actually filled, so skipped entries are not
# passed to the model as uninitialized memory.
data = data[:loaded]

predictions = model.predict(data)

print(np.argmax(predictions, axis=1))
# NOTE(review): assumes "tea-green" is class index 1 - confirm against the
# printed class_names.
print("Above, should be all 1's.")

# Verify prediction results (other images)

In [None]:
# Run the (non-augmented) model over every image file in the "other"
# class folder and print the predicted class index of each one.
test_path = "../dataset/images/other"
test_imgs = os.listdir(test_path)

ti = len(test_imgs)
print("number of images: ", ti)

# Room for one image per directory entry; rows are filled consecutively and
# the unused tail (entries that were skipped) is trimmed after the loop.
data = np.empty((ti, 160, 160, 3))
loaded = 0

for name in test_imgs:
    image_path = os.path.join(test_path, name)
    # Sub-directories are not images: report and skip them, as the
    # tea-green prediction cells do.
    if os.path.isdir(image_path):
        print("bypassing: "+image_path)
        continue
    with Image.open(image_path) as img:
        # force 3 channels and the 160x160 size the model was built for
        resized = img.convert('RGB').resize((160, 160), Image.LANCZOS)
        data[loaded] = np.asarray(resized)
    loaded += 1

# Keep only the rows that were actually filled.
data = data[:loaded]

predictions = model.predict(data)

print(np.argmax(predictions, axis=1))
# NOTE(review): assumes "other" is class index 0 - confirm against the
# printed class_names.
print("Above, should be all 0's.")

# With Data Augmentation

In [None]:
# Build a second classifier identical to the first one except for a data
# augmentation stage (random horizontal flip + random rotation) placed in
# front of the preprocessing step.

# freeze the base model by making it non trainable
pre_trained_model.trainable = False
# define the input layer
inputs = tf.keras.Input(shape=image_shape)
# apply data augmentation
x = tf.keras.Sequential([tf.keras.layers.RandomFlip('horizontal'),
                         tf.keras.layers.RandomRotation(0.2)])(inputs)

# preprocess the augmented inputs
x = preprocess_input(x)
# add the pre-trained model; training=False keeps it in inference mode
x = pre_trained_model(x, training=False)
# add a pooling layer
x = tf.keras.layers.GlobalAveragePooling2D()(x)
# add a dropout layer for regularization
x = tf.keras.layers.Dropout(0.2)(x)
# add the output layer: one unit per class.
# The model is trained with 'sparse_categorical_crossentropy', which treats
# the classes as mutually exclusive, so the head uses softmax (scores that
# sum to 1) rather than sigmoid (independent per-unit scores).
outputs = tf.keras.layers.Dense(2, activation='softmax')(x)
# define the model with its inputs and outputs
model_augmented = tf.keras.Model(inputs, outputs)

In [None]:
# print the layer-by-layer summary of the augmented model
model_augmented.summary()

# Compile augmented model

In [None]:
# Configure the augmented model the same way as the first one: Adam
# optimizer, sparse categorical cross-entropy loss, accuracy metric.
base_learning_rate = 1e-3
model_augmented.compile(
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
)

# Train for 20 epochs, validating after each one.
history_augmented = model_augmented.fit(
    x=train_set,
    epochs=20,
    validation_data=validation_set,
)

In [None]:
# Run the augmented model over every image file in the "tea-green" class
# folder and print the predicted class index of each one.
test_path = "../dataset/images/tea-green"
test_imgs = os.listdir(test_path)

ti = len(test_imgs)
print("number of images: ", ti)

# Room for one image per directory entry; rows are filled consecutively and
# the unused tail (entries that were skipped) is trimmed after the loop.
data = np.empty((ti, 160, 160, 3))
loaded = 0

for name in test_imgs:
    image_path = os.path.join(test_path, name)
    # Sub-directories are not images: report and skip them.
    if os.path.isdir(image_path):
        print("bypassing: "+image_path)
        continue
    with Image.open(image_path) as img:
        # force 3 channels and the 160x160 size the model was built for
        resized = img.convert('RGB').resize((160, 160), Image.LANCZOS)
        data[loaded] = np.asarray(resized)
    loaded += 1

# Keep only the rows that were actually filled, so skipped entries are not
# passed to the model as uninitialized memory.
data = data[:loaded]

predictions = model_augmented.predict(data)

print(np.argmax(predictions, axis=1))
# NOTE(review): assumes "tea-green" is class index 1 - confirm against the
# printed class_names.
print("Above, should be all 1's.")
print("Results should have improved after augmentation")

# Test single image

In [None]:
# Classify one sample picture with the augmented model.
testfile = "samples/bali-tea.jpeg"

# Load as RGB and resize to the 160x160 input size, then view as an array
img = np.asarray(
    Image.open(testfile).convert('RGB').resize((160, 160), Image.LANCZOS)
)

# The model expects a batch, so place the picture in a batch of one
mydata = np.empty((1, 160, 160, 3))
mydata[...] = img

mypredition = model_augmented.predict(mydata)

# raw scores first, then the index of the highest-scoring class
print(mypredition)
print(np.argmax(mypredition, axis=1))

# Save augmented model

In [None]:
# export the augmented model in SavedModel format
tf.saved_model.save(obj=model_augmented, export_dir="./models/base/2")

# Infer saved augmented model

### Load model

In [None]:
# Reload the exported augmented model from disk
smodel = tf.saved_model.load(export_dir="./models/base/2")

# The default serving signature; at this point it only accepts image
# tensors as input
signature = smodel.signatures["serving_default"]

print(signature)

### Obtain In/Out keys

In [None]:
# structured_input_signature has two parts; the first one is empty, the
# second one maps the input names to their specs
signIn = signature.structured_input_signature[1]

# name of the (first) input
keyIn = next(iter(signIn))
print(keyIn)

# name of the (first) output (typically 'dense_X')
keyOut = list(signature.structured_outputs)[0]
print(keyOut)

### Load image and predict

In [None]:
# Classify one sample picture through the reloaded model's serving signature.
testfile = "samples/bali-tea.jpeg"

# read the raw file and decode it into a 3-channel image tensor
content = tf.io.read_file(testfile)
image = tf.image.decode_jpeg(content, channels=3)

# resize to 160x160, then prepend a batch dimension to match the signature
image = tf.expand_dims(tf.image.resize(image, [160, 160]), 0)

# predictor function
f = smodel.signatures["serving_default"]

# the signature is called by keyword, using the input name found earlier
kwargs = {keyIn: image}
myprediction = f(**kwargs)

print(myprediction)
print("predicted class: ", np.argmax(myprediction[keyOut],axis=1))

# print(type(tf.argmax(myprediction[keyOut])))

# # print(tf.argmax(myprediction,axis=1))
# print(tf.argmax(myprediction[keyOut],axis=1))

# print(mypredition)
# print(tf.argmax(myprediction[keyOut], axis=1))

# Save model with Base64 Signature

### Load labels

In [None]:
# Show the class names, then store them in a constant tensor.
# The extra pair of brackets nests the list once, so the tensor gets a
# leading dimension in front of the class-name dimension.
# NOTE: this rebinds `labels`, which the sample-plotting cell also used as
# its loop variable.
print(class_names)
labels = tf.constant([class_names])
print(labels)

### Load augmented model

In [None]:
# Reload the exported augmented model; it is re-exported further down with
# a different input signature
smodel = tf.saved_model.load(export_dir="./models/base/2")

# Current signature: only accepts image tensors as input
signature = smodel.signatures["serving_default"]
print(signature)

# name of the (first) output (typically 'dense_X')
keyOutput = list(signature.structured_outputs)[0]


@tf.function()
def my_predict(image_b64):
    """Classify one encoded image and return its label and score as strings.

    Relies on the module-level `signature` (the loaded model's serving
    function), `keyOutput` (its output name) and `labels` (class-name
    tensor).

    Args:
        image_b64: string tensor whose first element holds the encoded image
            bytes. The bytes are passed straight to the JPEG decoder; no
            base64 decoding happens in this function.
            NOTE(review): presumably the caller/serving layer has already
            decoded the base64 payload - confirm.

    Returns:
        A string tensor made of the predicted label followed by the
        corresponding model output value rendered as a string.
    """
    # first (only) element: the encoded image
    raw_bytes = image_b64[0]

    # decode to 3 channels, resize to 160x160 and add a batch dimension so
    # the tensor matches what the model signature expects
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    batch = tf.expand_dims(tf.image.resize(decoded, [160, 160]), 0)

    # per-class scores for the single image in the batch
    scores = signature(batch)[keyOutput]

    # index of the highest score, and the label stored at that index
    best = tf.argmax(scores, axis=1)
    best_label = tf.gather(labels, best, batch_dims=1)

    # the winning score itself
    best_score = scores[0, best[0]]

    # [label, score-as-string]
    return tf.concat([best_label, [tf.as_string(best_score)]], axis=0)


# Trace my_predict for a one-element string tensor named "image_b64";
# the resulting concrete function becomes the new serving signature
new_signature = my_predict.get_concrete_function(
    image_b64=tf.TensorSpec(shape=[1], dtype=tf.string, name="image_b64")
)

# Re-export the loaded model with that signature
tf.saved_model.save(smodel, "./models/redbag/2", signatures=new_signature)

# Test Base64 Model with single image

In [None]:
# Load the model that was exported with the string-input signature
smodel = tf.saved_model.load(export_dir="./models/redbag/2")

# Its default serving signature
signature = smodel.signatures["serving_default"]

print(signature)

### Run Inference

In [None]:
# Send the raw bytes of one sample picture through the string-input signature.
testfile = "samples/bali-tea.jpeg"

# read the file and shape the scalar string into the one-element tensor
# the signature expects
content = tf.reshape(tf.io.read_file(testfile), [1])

# serving function of the loaded model
f = smodel.signatures["serving_default"]

# run prediction
myprediction = f(image_b64=content)
print(myprediction)