In [None]:
import fiftyone as fo
from PIL import Image
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

# use pickle to store data and models https://docs.python.org/3/library/pickle.html 

# Set to True if you don't have the images locally already: this enables the
# download loop at the bottom of this cell.
reload = False

# Class names requested from the dataset zoo; each one is also used as the
# fiftyone dataset name and as the sub-directory name under "datasets/".
classes = ["Sushi", "Hot dog", "Pizza"]
split = None #"validation" # ["train", "test" or "validation"] None --> Read all
max_samples = None # 10
# Derived from `classes` instead of being hard-coded, so the number of output
# units can never drift away from the list of classes.
num_classes = len(classes)

# Placeholders; nothing in this cell appends to them.
images = []
labels = []
label=0

if reload:

    # Download / import one fiftyone dataset per class
    for _class in classes:
        # Each class gets its own dataset, stored under datasets/<class name>
        dataset = fo.zoo.load_zoo_dataset(
            "open-images-v7", 
            split=split, 
            label_types=["classifications"], 
            classes = _class,
            max_samples=max_samples,
            seed=None,
            shuffle=False,
            dataset_name = _class,
            dataset_dir = "datasets/"+_class,
            download_if_necessary = True,
            drop_existing_dataset = False,
            overwrite = True,
            cleanup = False,
        )
        # File paths of the downloaded images (not used further in this cell)
        img_paths = dataset.values("filepath")

        # Running counter of processed classes (not used further in this cell)
        label += 1


# Preparation

In [None]:
import os

# Edge length of the square every imported image is resized to (VGG16 input size)
size = 224
# Number of images per batch
batch_size = 64


def _load_subset(subset):
    """Load one side of the 80/20 split of ./datasets/ as a batched tf.data.Dataset.

    `subset` is "training" or "validation". Both calls share the same
    directory, seed and validation_split, so they describe the same split.
    """
    return tf.keras.utils.image_dataset_from_directory(
        directory=os.getcwd()+"/datasets/",
        image_size=(size, size),
        validation_split=0.2,
        subset=subset,
        seed=1337,
        batch_size=batch_size)


# 80 % of the images for training, the remaining 20 % for testing
train_ds = _load_subset("training")
test_ds = _load_subset("validation")


## Show some Images

In [None]:
import matplotlib.pyplot as plt

def _show_sample_grid(dataset, class_names, n_images=9):
    """Draw up to `n_images` images of the first batch of `dataset` in a 3x3 grid.

    Each image is titled with the class name belonging to its integer label.
    Only one batch is pulled from the dataset: taking more would just redraw
    the same nine subplots over and over and leave only the last batch visible.
    """
    plt.figure(figsize=(10, 10))
    for batch_images, batch_labels in dataset.take(1):
        # A batch may hold fewer images than requested (small dataset / last batch)
        n_shown = min(n_images, int(batch_images.shape[0]))
        for i in range(n_shown):
            plt.subplot(3, 3, i + 1)
            plt.imshow(batch_images[i].numpy().astype("uint8"))
            plt.title(class_names[int(batch_labels[i])])
            plt.axis("off")


# Plot 9 examples including labels, first from the training and then from the
# test split. Both splits share the class names stored on train_ds.
_show_sample_grid(train_ds, train_ds.class_names)
_show_sample_grid(test_ds, train_ds.class_names)

## ONE HOT Encoding for Classification Training

In [None]:
def _one_hot_labels(batch_images, batch_labels):
    """Return the batch unchanged except for labels, which become one-hot vectors."""
    return batch_images, tf.one_hot(batch_labels, num_classes)


# Shuffle the batches (buffer = number of batches in the dataset), then
# one-hot encode the labels so they fit a categorical cross-entropy loss.
train_ds = train_ds.shuffle(len(train_ds)).map(_one_hot_labels)
test_ds = test_ds.shuffle(len(test_ds)).map(_one_hot_labels)

# 1. Transfer Learning (Base Model)

### Modify for 3 classes and compile

In [None]:
# ImageNet-pretrained VGG16, including its fully connected classifier layers
# NOTE(review): no tf.keras.applications.vgg16.preprocess_input step is visible
# in this notebook; confirm whether raw 0-255 RGB input is intended here.
vgg_model = tf.keras.applications.VGG16(
    input_shape=(size, size, 3),
    include_top=True,
    weights='imagenet',
)

# Show the architecture of the unmodified network
vgg_model.summary()

# Number of classes in your dataset
num_classes = 3

# Attach a fresh softmax classifier to the second-to-last VGG16 layer, i.e.
# everything except the original output layer is reused.
penultimate = vgg_model.layers[-2].output
output_layer = tf.keras.layers.Dense(num_classes, activation='softmax')(penultimate)

# New model: original VGG16 input -> new 3-class output
model = tf.keras.Model(inputs=vgg_model.input, outputs=output_layer)

# Labels are one-hot encoded, hence categorical cross-entropy
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

### Train VGG16 Model

In [None]:
num_epochs = 100
filepath = r"Transfer_Learning.h5"

# Checkpoint callback that would keep the full model with the lowest val_loss.
# It is only constructed here: the callbacks argument of fit() below is
# commented out, so no checkpoint is written during training.
save_best = tf.keras.callbacks.ModelCheckpoint(
    filepath,
    save_best_only=True,
    save_weights_only=False,
    monitor='val_loss',
    mode='auto',
    save_freq='epoch',
    verbose=1,
)

# Train the base model; test_ds doubles as the validation data
history = model.fit(
    train_ds,
    epochs=num_epochs,
    validation_data=test_ds,
    shuffle=True,
    verbose=1,
    # callbacks=[save_best],
)


In [None]:
# Plot every metric recorded during training (losses and accuracies) in one figure.
legend = list(history.history.keys())
fig1, ax = plt.subplots(dpi=200)
for key in legend:
    values = history.history[key]
    # The first epoch is left out of the plot. values[0] belongs to epoch 1, so
    # the remaining points are epochs 2..len(values); plotting against these
    # explicit numbers keeps the x-axis equal to the real epoch index.
    ax.plot(range(2, len(values) + 1), values[1:], label=key)
ax.set_title('Transfer Learning')
# Both loss and accuracy curves are drawn, so the axis is labelled accordingly
ax.set_ylabel('Loss / Accuracy')
ax.set_xlabel('Epoch')
ax.legend(loc='upper right')
ax.grid()
plt.show()
plt.close(fig1)

# 2. Transfer Learning + Augmentation  (Base Model with Augmentation) 

## Cleansing

In [None]:
# Auto-Check Function if Image is Blurry (NOT USED - Basically no blurry Images in dataset)

def is_image_blurry(image, threshold):
    """Flag an image as blurry when its mean Sobel gradient magnitude is low.

    The image is converted to grayscale (replicated to three channels), cast to
    float32, given a leading batch axis and passed through a Sobel filter. The
    per-pixel magnitude of the two gradient directions is averaged over the
    whole image and compared against `threshold`.

    Args:
        image: RGB image tensor; presumably a single unbatched (H, W, 3) image,
            since a batch axis is added here - TODO confirm against callers.
        threshold: value the average gradient magnitude is compared with.

    Returns:
        Scalar boolean tensor, True if the average gradient is below `threshold`.
    """
    gray = tf.image.rgb_to_grayscale(image)
    # Back to three (identical) channels
    gray_rgb = tf.image.grayscale_to_rgb(gray)
    # float32 plus a leading batch axis, as input for the Sobel filter
    batched = tf.expand_dims(tf.cast(gray_rgb, tf.float32), axis=0)
    edges = tf.image.sobel_edges(batched)
    # Last axis holds the two gradient directions -> Euclidean magnitude
    magnitude = tf.sqrt(tf.reduce_sum(edges ** 2, axis=-1))
    mean_magnitude = tf.reduce_mean(magnitude)
    return tf.less(mean_magnitude, threshold)

# --> Manual cleansing of the data by eye: sort out images that make no sense!

## Augmentation

In [None]:
# Define data augmentation settings

# len() reads the number of batches from the dataset's cardinality. Listing the
# whole dataset through as_numpy_iterator() would decode and hold every image
# in memory just to count batches.
print("Total Number of Batches for Training BEFORE Augmentation: ", len(train_ds))

# Random flips, zoom, rotation and translation, applied batch-wise
data_augmentation = tf.keras.Sequential([
    tf.keras.layers.RandomFlip("horizontal_and_vertical"),
    tf.keras.layers.RandomZoom(0.2), #tf.keras.layers.RandomCrop(size,size),
    tf.keras.layers.RandomRotation(0.2), 
    tf.keras.layers.RandomTranslation(0.2, 0.2),
])

# training=True forces the random layers to be active inside map()
aug_train_ds = train_ds.map(lambda x, y: (data_augmentation(x, training=True), y))
# NOTE(review): the test split is augmented as well, so validation metrics and
# the confusion matrices further down are computed partly on augmented images;
# confirm that this is intended.
aug_test_ds = test_ds.map(lambda x, y: (data_augmentation(x, training=True), y))

# Original batches followed by their augmented counterparts (doubles the size).
# Re-running this cell doubles the datasets again.
train_ds = train_ds.concatenate(aug_train_ds)
test_ds = test_ds.concatenate(aug_test_ds)

print("Total Number of Batches for Training AFTER Augmentation: ", len(train_ds))

# SHUFFLE DATA BEFORE TRAINING so original and augmented batches are mixed
train_ds = train_ds.shuffle(len(train_ds))
test_ds = test_ds.shuffle(len(test_ds))

In [None]:
# Fresh ImageNet-pretrained VGG16 (with its fully connected top) for the
# augmented-data experiment
vgg_model = tf.keras.applications.VGG16(
    input_shape=(size, size, 3),
    include_top=True,
    weights='imagenet',
)

# Show the architecture of the unmodified network
vgg_model.summary()

# Number of classes in your dataset
num_classes = 3

# Reuse everything up to the second-to-last layer and put a new softmax
# classifier on top of it
penultimate = vgg_model.layers[-2].output
output_layer = tf.keras.layers.Dense(num_classes, activation='softmax')(penultimate)

# New model: original VGG16 input -> new 3-class output
model_augmented = tf.keras.Model(inputs=vgg_model.input, outputs=output_layer)

# Labels are one-hot encoded, hence categorical cross-entropy
model_augmented.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
num_epochs = 50
filepath = r"Transfer_Learning_Cleansed_Augmented.h5"

# Checkpoint callback that would keep the full model with the lowest val_loss.
# It is only constructed here: the callbacks argument of fit() below is
# commented out, so no checkpoint is written during training.
save_best = tf.keras.callbacks.ModelCheckpoint(
    filepath,
    save_best_only=True,
    save_weights_only=False,
    monitor='val_loss',
    mode='auto',
    save_freq='epoch',
    verbose=1,
)

# Train on the augmented training data; test_ds doubles as the validation data
history_augmented = model_augmented.fit(
    train_ds,
    epochs=num_epochs,
    validation_data=test_ds,
    shuffle=True,
    verbose=1,
    # callbacks=[save_best],
)


In [None]:
# Plot every metric recorded during training (losses and accuracies) in one figure.
legend = list(history_augmented.history.keys())
fig1, ax = plt.subplots(dpi=200)
for key in legend:
    values = history_augmented.history[key]
    # The first epoch is left out of the plot. values[0] belongs to epoch 1, so
    # the remaining points are epochs 2..len(values); plotting against these
    # explicit numbers keeps the x-axis equal to the real epoch index.
    ax.plot(range(2, len(values) + 1), values[1:], label=key)
ax.set_title('Transfer Learning  with Cleansing & Augmentation')
# Both loss and accuracy curves are drawn, so the axis is labelled accordingly
ax.set_ylabel('Loss / Accuracy')
ax.set_xlabel('Epoch')
ax.legend(loc='upper right')
ax.grid()
plt.show()
plt.close(fig1)

# 3. Transfer Learning + Cleansing + Augmentation + Architecture (Custom Model (New Architecture))

## Modified Architecture

In [None]:
# Freeze the first five entries of vgg_model.layers so they are not updated
# during training.
# NOTE(review): vgg_model is the network built for model_augmented, so the
# layers reused here are shared with that model - confirm this is intended.
for early_layer in vgg_model.layers[:5]:
    early_layer.trainable = False

# Cut the network after block3_conv3 and use its feature map as the new base
output = vgg_model.get_layer('block3_conv3').output

# New head: 1x1 conv -> 3x3 conv -> flatten -> dense -> softmax classifier
head_layers = [
    tf.keras.layers.Conv2D(512, (1, 1), padding="same", activation="relu"),
    tf.keras.layers.Conv2D(1024, (3, 3), padding="valid", activation="relu"),
    # NOTE(review): flattening the full conv feature map directly into a dense
    # layer is presumably very large in parameters - check model_custom.summary().
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(256, activation="relu"),
    tf.keras.layers.Dense(num_classes, activation="softmax"),
]
final = output
for head_layer in head_layers:
    final = head_layer(final)

# Custom model: original VGG16 input -> truncated VGG16 -> new head
model_custom = tf.keras.Model(inputs=vgg_model.input, outputs=final)
model_custom.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

In [None]:
num_epochs = 50
filepath = r"Transfer_Learning_Cleansed_Augmented_newArchitecture.h5"

# Checkpoint callback that would keep the full model with the lowest val_loss.
# It is only constructed here: the callbacks argument of fit() below is
# commented out, so no checkpoint is written during training.
save_best = tf.keras.callbacks.ModelCheckpoint(
    filepath,
    save_best_only=True,
    save_weights_only=False,
    monitor='val_loss',
    mode='auto',
    save_freq='epoch',
    verbose=1,
)

# Train the custom architecture; test_ds doubles as the validation data
history_custom = model_custom.fit(
    train_ds,
    epochs=num_epochs,
    validation_data=test_ds,
    shuffle=True,
    verbose=1,
    # callbacks=[save_best],
)


In [None]:
# Plot every metric recorded during training (losses and accuracies) in one figure.
legend = list(history_custom.history.keys())
fig1, ax = plt.subplots(dpi=200)
for key in legend:
    values = history_custom.history[key]
    # The first epoch is left out of the plot. values[0] belongs to epoch 1, so
    # the remaining points are epochs 2..len(values); plotting against these
    # explicit numbers keeps the x-axis equal to the real epoch index.
    ax.plot(range(2, len(values) + 1), values[1:], label=key)
ax.set_title('Transfer Learning  with Cleansing & New Augmentation & Diff. Architecture')
# Both loss and accuracy curves are drawn, so the axis is labelled accordingly
ax.set_ylabel('Loss / Accuracy')
ax.set_xlabel('Epoch')
ax.legend(loc='upper right')
ax.grid()
plt.show()
plt.close(fig1)

## Summary of Evaluation

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

## Base Model

In [None]:
# base model
model.summary()

# Collect predictions and true labels in ONE pass over test_ds. The dataset is
# shuffled and reshuffles on every iteration, so predicting in one pass and
# reading the labels in a second pass would pair each prediction with the
# label of a different image.
y_true_batches = []
y_pred_batches = []
for batch_images, batch_labels in test_ds:
    batch_probs = model.predict_on_batch(batch_images)
    # Class probabilities / one-hot labels -> integer class indices
    y_pred_batches.append(np.argmax(batch_probs, axis=1))
    y_true_batches.append(np.argmax(batch_labels.numpy(), axis=1))
y_pred = np.concatenate(y_pred_batches)
y_true = np.concatenate(y_true_batches)

# Label indices follow the alphanumerically sorted class directory names, not
# the order in which `classes` was written down.
# Assumes "datasets/" holds exactly one sub-directory per entry of `classes`.
class_names = sorted(classes)

# Compute the confusion matrix; labels= keeps it square even if a class is
# missing from both y_true and y_pred, so the tick labels always line up.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))

# Display the confusion matrix
print(cm)

# Plot the confusion matrix as an image
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

## Base Model with Augmentation

In [None]:
# base model with augmented data
model_augmented.summary()

# Collect predictions and true labels in ONE pass over test_ds. The dataset is
# shuffled and reshuffles on every iteration, so predicting in one pass and
# reading the labels in a second pass would pair each prediction with the
# label of a different image.
y_true_batches = []
y_pred_batches = []
for batch_images, batch_labels in test_ds:
    batch_probs = model_augmented.predict_on_batch(batch_images)
    # Class probabilities / one-hot labels -> integer class indices
    y_pred_batches.append(np.argmax(batch_probs, axis=1))
    y_true_batches.append(np.argmax(batch_labels.numpy(), axis=1))
y_pred = np.concatenate(y_pred_batches)
y_true = np.concatenate(y_true_batches)

# Label indices follow the alphanumerically sorted class directory names, not
# the order in which `classes` was written down.
# Assumes "datasets/" holds exactly one sub-directory per entry of `classes`.
class_names = sorted(classes)

# Compute the confusion matrix; labels= keeps it square even if a class is
# missing from both y_true and y_pred, so the tick labels always line up.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))

# Display the confusion matrix
print(cm)

# Plot the confusion matrix as an image
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

## Custom Model (New Architecture)

In [None]:
# modified model with augmented data
model_custom.summary()

# Collect predictions and true labels in ONE pass over test_ds. The dataset is
# shuffled and reshuffles on every iteration, so predicting in one pass and
# reading the labels in a second pass would pair each prediction with the
# label of a different image.
y_true_batches = []
y_pred_batches = []
for batch_images, batch_labels in test_ds:
    batch_probs = model_custom.predict_on_batch(batch_images)
    # Class probabilities / one-hot labels -> integer class indices
    y_pred_batches.append(np.argmax(batch_probs, axis=1))
    y_true_batches.append(np.argmax(batch_labels.numpy(), axis=1))
y_pred = np.concatenate(y_pred_batches)
y_true = np.concatenate(y_true_batches)

# Label indices follow the alphanumerically sorted class directory names, not
# the order in which `classes` was written down.
# Assumes "datasets/" holds exactly one sub-directory per entry of `classes`.
class_names = sorted(classes)

# Compute the confusion matrix; labels= keeps it square even if a class is
# missing from both y_true and y_pred, so the tick labels always line up.
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))

# Display the confusion matrix
print(cm)

# Plot the confusion matrix as an image
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()