In [None]:
# the following notebook is partially based on https://keras.io/guides/transfer_learning/

In [None]:
%matplotlib inline

from IPython.display import display as d
from IPython.display import Image 

import pandas as pd
import numpy as np
import seaborn as sns
# Plot defaults: 12x12 figure size and seaborn's "white" style.
sns.set(rc={'figure.figsize':(12,12)})
sns.set_style("white")
import glob
import json
import os
import sys
import json
import shutil
import os
import sys
import multiprocessing
import shutil
from pprint import pprint

# Worker pool sized to the CPU count reported by multiprocessing.
# NOTE(review): `pool` is not referenced by any cell visible in this notebook;
# confirm it is still needed (it is never closed here either).
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())

In [None]:
#!pip3 install keras tensorflow

In [None]:
# Hide every CUDA device from this process so that computation runs on the CPU.
# This is done ahead of the TensorFlow import that follows.
import os
os.environ.update({"CUDA_VISIBLE_DEVICES": "-1"})

In [None]:
import tensorflow as tf
import keras

from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

# Enable on-demand GPU memory growth and open an interactive session with that
# configuration.
config = ConfigProto(gpu_options=tf.compat.v1.GPUOptions(allow_growth=True))
session = InteractiveSession(config=config)

# Show which GPUs TensorFlow can see.
# NOTE(review): CUDA_VISIBLE_DEVICES is set to '-1' in an earlier cell, so this
# is presumably expected to print an empty list.
print(tf.config.list_physical_devices('GPU'))

In [None]:
# Root folder of the image dataset handed to read_images().
directory = "data/rule_of_thirds/"
# Number of images per batch.
batch_size = 32
# Model input shape; read_images() uses the first two entries as the image size.
input_shape = (224, 224, 3)

def read_images(directory, subset, batch_size=32, validation_split=0.1, seed=42, image_size=None):
    """Load one subset of an image directory as a shuffled, batched dataset.

    Thin wrapper around ``tf.keras.preprocessing.image_dataset_from_directory``
    with labels inferred from the directory structure (``label_mode='int'``)
    and RGB images resized with bilinear interpolation.

    Args:
        directory: Root directory of the images.
        subset: ``"training"`` or ``"validation"``.
        batch_size: Number of images per batch.
        validation_split: Fraction of the data reserved for validation
            (default 0.1, i.e. 10%).
        seed: Seed for shuffling and for the train/validation split. The
            training and validation calls must use the same ``seed`` and
            ``validation_split``, otherwise the two subsets can overlap.
        image_size: ``(height, width)`` to resize to. Defaults to the first
            two entries of the notebook-level ``input_shape``.

    Returns:
        The dataset object produced by ``image_dataset_from_directory``.
    """
    if image_size is None:
        # Fall back to the notebook-wide model input resolution.
        image_size = input_shape[0:2]

    return tf.keras.preprocessing.image_dataset_from_directory(
        directory,
        labels='inferred',
        label_mode='int',
        color_mode='rgb',
        batch_size=batch_size,
        image_size=image_size,
        shuffle=True,
        seed=seed,
        validation_split=validation_split,
        subset=subset,
        interpolation='bilinear',
    )

# Training subset of the image directory.
training = read_images(directory, subset="training", batch_size=batch_size)
print(training)

# Validation subset of the same directory.
validation = read_images(directory, subset="validation", batch_size=batch_size)
print(validation)

In [None]:
import matplotlib.pyplot as plt

def overview_dataset(data):
    """Plot up to nine images from the first batch of ``data`` in a 3x3 grid.

    Each panel is titled with the class name looked up from
    ``data.class_names`` via the image's label; axes are hidden.
    """
    class_names = data.class_names
    plt.figure(figsize=(10, 10))

    for images, labels in data.take(1):
        n_shown = min(len(images), 9)
        for idx in range(n_shown):
            plt.subplot(3, 3, idx + 1)
            pixels = images[idx].numpy().astype("uint8")
            plt.imshow(pixels)
            plt.title(class_names[labels[idx]])
            plt.axis("off")



In [None]:
# Show up to nine labelled sample images from the first training batch.
overview_dataset(training)

In [None]:
# Show up to nine labelled sample images from the first validation batch.
overview_dataset(validation)

In [None]:
from tensorflow import keras
from tensorflow.keras import layers


class RandomJPEG(layers.experimental.preprocessing.PreprocessingLayer):
    """Preprocessing layer applying ``tf.image.random_jpeg_quality`` per image.

    NOTE(review): the only visible use of this layer (in ``data_augmentation``)
    is commented out as "does not work", so treat it as unverified. Possibly
    related: confirm the pixel value range this op expects for float images.
    """

    def __init__(self, min_comp=70, max_comp=99):
        """Store the two bounds forwarded to ``tf.image.random_jpeg_quality``."""
        super().__init__()
        self.min_comp, self.max_comp = min_comp, max_comp

    def call(self, images):
        """Map the random JPEG quality op over the first axis of ``images``."""
        def _recompress(img):
            return tf.image.random_jpeg_quality(img, self.min_comp, self.max_comp)

        return tf.map_fn(_recompress, images)

class RandomHue(layers.experimental.preprocessing.PreprocessingLayer):
    """Preprocessing layer applying ``tf.image.random_hue`` to each image."""

    def __init__(self, max_delta=0.2):
        """Store ``max_delta``, which is forwarded to ``tf.image.random_hue``."""
        super().__init__()
        self.max_delta = max_delta

    def call(self, images):
        """Map the random hue op over the first axis of ``images``.

        The op is applied per element (via ``tf.map_fn``) rather than once to
        the whole tensor.
        """
        def _shift_hue(img):
            return tf.image.random_hue(img, self.max_delta)

        return tf.map_fn(_shift_hue, images)
    

class RandomSat(layers.experimental.preprocessing.PreprocessingLayer):
    """Preprocessing layer applying ``tf.image.random_saturation`` to each image.

    NOTE(review): the defaults (3 and 6) are passed as the lower/upper bounds
    of the saturation op; confirm that such large values are intended.
    """

    def __init__(self, lower=3, upper=6):
        """Store the bounds forwarded to ``tf.image.random_saturation``."""
        super().__init__()
        self.lower, self.upper = lower, upper

    def call(self, images):
        """Map the random saturation op over the first axis of ``images``."""
        def _scale_saturation(img):
            return tf.image.random_saturation(img, self.lower, self.upper)

        return tf.map_fn(_scale_saturation, images)

    
# Augmentation pipeline, applied in list order: random flips, random contrast,
# then the custom hue and saturation layers.
data_augmentation = keras.Sequential(
    [
        layers.experimental.preprocessing.RandomFlip(mode="horizontal_and_vertical"),
        layers.experimental.preprocessing.RandomContrast(factor=0.5),
        RandomHue(),
        RandomSat(),
        # RandomJPEG(),  # this does not work
    ]
)

In [None]:
# Pre-trained DenseNet201 base without its classification head.
base_model = keras.applications.DenseNet201( # MobileNetV2( # VGG19(
    weights="imagenet",  # Load weights pre-trained on ImageNet.
    input_shape=input_shape,
    include_top=False,
)  # Do not include the ImageNet classifier at the top.

# Freeze the base_model
base_model.trainable = False

# Create new model on top
inputs = keras.Input(shape=input_shape)

# Apply the DenseNet-specific input preprocessing to the raw input tensor.
x = tf.keras.applications.densenet.preprocess_input(inputs)

# The frozen base is called with training=False.
x = base_model(x, training=False)

# Flatten is used for the head; the global-average-pooling alternative is left
# commented out.
#x = keras.layers.GlobalAveragePooling2D()(x)
x = keras.layers.Flatten()(x)  
x = keras.layers.Dropout(0.2)(x)  # Regularize with dropout
# Single sigmoid unit: the model outputs a value in (0, 1), not a raw logit.
outputs = keras.layers.Dense(1, activation="sigmoid")(x)  # , activation='softmax'

model = keras.Model(inputs, outputs)

model.summary()


In [None]:
def augment_images(images):
    """Return ``images`` with ``data_augmentation`` applied to every batch.

    Args:
        images: Iterable of ``(batch, label)`` pairs, typically the
            ``tf.data.Dataset`` returned by ``read_images``.

    Returns:
        For a ``tf.data.Dataset`` input, a mapped dataset. Unlike a plain
        generator, it can be iterated again on every epoch instead of being
        exhausted after one pass. For any other iterable, a lazy generator
        of ``(augmented_batch, label)`` pairs.
    """
    def _augment(batch, label):
        # training=True is passed explicitly so the random Keras preprocessing
        # layers are active outside of model.fit().
        return data_augmentation(batch, training=True), label

    if isinstance(images, tf.data.Dataset):
        return images.map(_augment)
    return (_augment(batch, label) for batch, label in images)


In [None]:
from tqdm.keras import TqdmCallback
from keras.callbacks import ModelCheckpoint

In [None]:
# The model's last layer already applies a sigmoid, so its outputs are
# probabilities. The loss must therefore NOT treat them as logits; with
# from_logits=True the sigmoid would effectively be applied twice.
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=[keras.metrics.BinaryAccuracy()],
)

# Checkpoint callback: evaluated once per epoch, writes "best_model.hdf5" only
# when the monitored validation loss improves.
checkpoint = ModelCheckpoint(
    filepath="best_model.hdf5",
    monitor="val_loss",
    mode="auto",
    save_best_only=True,
    save_freq="epoch",
    verbose=0,
)


# Train with Keras' own progress output disabled (verbose=0); progress is
# reported through the tqdm callback, and the checkpoint callback is attached.
epochs = 2
res = model.fit(
    x=training,
    validation_data=validation,
    epochs=epochs,
    callbacks=[TqdmCallback(verbose=2), checkpoint],
    verbose=0,
)

In [None]:
# Predict on the validation dataset with the fitted model.
# NOTE(review): `validation` is created with shuffle=True, so the row order of
# these predictions may differ from the order seen by any separate iteration
# over `validation`.
y_pred = res.model.predict(validation)


In [None]:
# Show the raw prediction shape, then collapse it to a 1-D vector.
pprint(y_pred.shape)
y_pred = y_pred.flatten()
pprint(y_pred)

In [None]:
# Collect ground-truth labels AND predictions in a single pass over the
# validation set. `validation` is built with shuffle=True, so a separate
# predict() call and a separate label loop would see the samples in different
# orders, leaving y_pred and y_truth misaligned.
y_truth_batches = []
y_pred_batches = []
for images, labels in validation:
    y_truth_batches.append(labels.numpy())
    y_pred_batches.append(np.asarray(res.model.predict_on_batch(images)))

y_truth = np.concatenate(y_truth_batches).flatten()
# Overwrites the earlier y_pred with predictions aligned row-by-row to y_truth.
y_pred = np.concatenate(y_pred_batches).flatten()

In [None]:
# One row per validation sample: model output next to the true label.
df = pd.DataFrame(dict(y_pred=y_pred, y_truth=y_truth))

In [None]:
# Scatter plot of model output (x) against the true label (y).
df.plot.scatter(x="y_pred", y="y_truth")

In [None]:
from sklearn.metrics import classification_report

# The model ends in a sigmoid, so y_pred holds probabilities in (0, 1).
# Threshold at 0.5; a threshold of 0.0 would label every sample as class 1.
y_pred_labels = (np.asarray(y_pred) > 0.5).astype(int)

print(
    classification_report(y_truth, y_pred_labels, zero_division=0)
)

In [None]:
# Work on a shallow copy so that adding the "epoch" column does not modify the
# History object's own dict.
hist = dict(res.history)
hist["epoch"] = list(range(len(hist["loss"])))

pprint(hist)

# One row per epoch, one column per recorded metric plus "epoch".
dh = pd.DataFrame(hist)

In [None]:
# Training vs. validation loss per epoch.
dh.plot.line(x="epoch", y=["loss", "val_loss"], figsize=(12, 8))

In [None]:
# Training vs. validation binary accuracy per epoch.
dh.plot.line(x="epoch", y=["binary_accuracy", "val_binary_accuracy"], figsize=(12, 8))

