# Init

In [1]:
%cd /kaggle/input/leaves

In [2]:
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix
from PIL import Image

from tensorflow.keras.preprocessing.image import ImageDataGenerator
#import cv2 as cv

import skimage as ski
import skimage.filters as skif
import skimage.color as skic

from sklearn.utils import class_weight



# -------------------------------------

# Short aliases used throughout the notebook.
tfk = tf.keras
tfkl = tfk.layers
print(tf.__version__)


# Random seed for reproducibility: one value is pushed to every RNG in use.
seed = 43

os.environ['PYTHONHASHSEED'] = str(seed)
for seeder in (random.seed,
               np.random.seed,
               tf.random.set_seed,
               tf.compat.v1.set_random_seed):
    seeder(seed)


# Dataset folders
dataset_dir = '/kaggle/input/leaves'
training_dir = os.path.join(dataset_dir, 'training')

In [3]:
# Disabled experiment: the whole `preprocess` definition below is wrapped in a
# string literal, so no function is defined and nothing in the visible notebook
# calls it. The text describes replacing channel 0 of an image with an inverted
# Hessian-filter response of its grayscale version.
'''def preprocess(im):
    #im = im.astype('uint8')
    
    im_thresh = skic.rgb2gray(im)
    im_thresh = skif.hessian(im_thresh, mode='reflect', sigmas=[1], gamma=10)

    im_thresh = 1-im_thresh
    im_thresh *= 255
    
    #im = im.astype('float64')
    
    im[:,:,0] = im_thresh
    
    return im'''

In [4]:
def to_categorical(image, label):
    """Turn a class-index label into a 14-way one-hot float32 vector.

    Intended to be used with ``Dataset.map`` on (image, label) pairs.

    Args:
        image: Image tensor; returned unchanged.
        label: Class index (or batch of indices); cast to int32 before encoding.

    Returns:
        Tuple ``(image, label)`` where ``label`` is the one-hot encoding of
        depth 14, explicitly cast to float32.
    """
    # Casts to an Int and performs one-hot ops
    one_hot = tf.one_hot(tf.cast(label, tf.int32), 14)
    # Recasts it to Float32. The original stored this in a misspelled variable,
    # so the cast result was silently discarded.
    label = tf.cast(one_hot, tf.float32)
    return image, label

# Arguments common to both subsets: the training and validation datasets are
# requested with the same seed and validation_split.
split_kwargs = dict(
    labels='inferred',
    validation_split=0.3,
    seed=seed,
    image_size=(240, 240),
    batch_size=32,
)

train_ds = tfk.utils.image_dataset_from_directory(
    training_dir, subset="training", **split_kwargs)

# Random augmentation pipeline, applied to training batches only.
data_aug = tfk.Sequential([
    tfkl.RandomFlip("horizontal_and_vertical", seed=seed),
    tfkl.RandomRotation(0.5, fill_mode='constant', seed=seed),
    tfkl.RandomZoom(0.5, fill_mode='constant', seed=seed),
])

# One-hot encode labels, then derive the augmented view of the training data.
train_ds = train_ds.map(to_categorical)
train_ds_aug = train_ds.map(
    lambda x, y: (data_aug(x, training=True), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)

# Validation data gets one-hot labels but no augmentation.
valid_ds = tfk.utils.image_dataset_from_directory(
    training_dir, subset="validation", **split_kwargs
).map(to_categorical)

In [5]:
# Map class index -> class (folder) name.
# os.listdir returns entries in arbitrary order, while labels='inferred' in
# image_dataset_from_directory assigns indices to class folders in sorted
# (alphanumeric) order, so the names must be sorted for the mapping to match.
# Only sub-directories are kept, since stray files are not classes.
values = sorted(
    entry for entry in os.listdir(training_dir)
    if os.path.isdir(os.path.join(training_dir, entry))
)
keys = list(range(len(values)))

inv_labels = dict(zip(keys, values))
print(inv_labels)

# keys_onehot = list(tf.one_hot(keys, 14).numpy())
# print(keys_onehot)

# cat_labels = dict(zip(keys_onehot, values))

In [6]:
#image data generator

# The generator is only used to read the per-sample class indices
# (train_gen.classes) from which the balancing weights are computed.
# NOTE(review): this uses validation_split=0.2 while train_ds/valid_ds use 0.3,
# so the weights are computed on a different subset than the one trained on —
# confirm whether that is intended.
gen = ImageDataGenerator(validation_split=0.2)

train_gen=gen.flow_from_directory(directory = training_dir,
                                  subset="training",
                                  color_mode='rgb',
                                  classes=None,
                                  class_mode='categorical',
                                  seed=seed)
# valid_gen=gen.flow_from_directory(directory = training_dir,
#                                   subset="validation",
#                                   color_mode='rgb',
#                                   classes=None,
#                                   class_mode='categorical',
#                                   seed=seed)             

keys = range(14)
# `classes` and `y` are keyword-only in current scikit-learn; passing them
# positionally raises a TypeError there, so they are named explicitly.
class_weights = class_weight.compute_class_weight(
                class_weight='balanced',
                classes=np.unique(train_gen.classes),
                y=train_gen.classes)

# class index -> weight, in the form expected by model.fit(class_weight=...)
weights = dict(zip(keys,class_weights))
print(weights)

# Data Visualization _from generator_

## train_ds

In [None]:
# Show the first eight images of one training batch on a 2x4 grid.
plt.figure(figsize=(12, 6))
for images, labels in train_ds.take(1):
    for slot in range(1, 9):
        ax = plt.subplot(2, 4, slot)
        picture = images[slot - 1].numpy().astype("uint8")
        plt.imshow(picture)

## valid_ds

In [None]:
# Show the first eight images of one validation batch on a 2x4 grid.
plt.figure(figsize=(12, 6))
for images, labels in valid_ds.take(1):
    for slot in range(1, 9):
        ax = plt.subplot(2, 4, slot)
        picture = images[slot - 1].numpy().astype("uint8")
        plt.imshow(picture)
#         plt.title(inv_labels[labels.numpy()[i]])

# __Model__

In [7]:
from keras import backend as K

def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    """Precision over all entries of the given tensors.

    Counts rounded ``y_true * y_pred`` entries as true positives and rounded
    ``y_pred`` entries as predicted positives (both clipped to [0, 1] first).

    Returns:
        true positives / (predicted positives + K.epsilon()).
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    predicted_mask = K.round(K.clip(y_pred, 0, 1))
    # epsilon keeps the ratio finite when nothing is predicted positive
    return K.sum(hit_mask) / (K.sum(predicted_mask) + K.epsilon())

def f1_m(y_true, y_pred):
    """F1 score built from ``precision_m`` and ``recall_m``.

    Returns:
        2 * (precision * recall) / (precision + recall + K.epsilon()).
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    harmonic_half = (p * r) / (p + r + K.epsilon())
    return 2 * harmonic_half


In [8]:
# Model input: 240x240 matches the image_size requested when the datasets were
# built; the trailing 3 is the channel count.
input_shape = (240, 240, 3)

In [9]:
from tensorflow.keras.applications.efficientnet import EfficientNetB1

# ImageNet-pretrained EfficientNetB1 backbone without its classification head.
# `classes` and `classifier_activation` only configure the top layer, which is
# not built when include_top=False, so they are not passed.
supernet = EfficientNetB1(
    include_top=False,
    weights="imagenet",
    input_shape=input_shape,
)

# Name under which the backbone can later be fetched with model.get_layer();
# read from the model rather than hard-coded so the two cannot drift apart.
supernet_layer = supernet.name

supernet.summary()
#tfk.utils.plot_model(supernet)

In [12]:
def add_conv1_block(model, filters, activation='relu', dropout=0.3):
    """Append a 1x1 convolution block to ``model`` in place.

    Layers are added in this order: Conv2D (kernel 1, stride 1), Dropout,
    BatchNormalization and, unless ``activation`` is None, an Activation layer.

    Args:
        model: Object exposing ``add(layer)`` (e.g. a Sequential model).
        filters: Number of filters of the 1x1 convolution.
        activation: Activation identifier, or None to skip the activation layer.
        dropout: Rate passed to the Dropout layer.
    """
    model.add(tfkl.Conv2D(filters=filters, kernel_size=1, strides=1))
    model.add(tfkl.Dropout(dropout))
    model.add(tfkl.BatchNormalization())
    # identity check is the idiomatic (and override-proof) test for None
    if activation is not None:
        model.add(tfkl.Activation(activation))

def add_dense_block(model, units, activation='relu', dropout=0.2):
    """Append a Dense layer followed by Dropout to ``model`` in place.

    The Dense kernel is initialised with GlorotUniform seeded by the
    notebook-level ``seed``.

    Args:
        model: Object exposing ``add(layer)`` (e.g. a Sequential model).
        units: Width of the Dense layer.
        activation: Activation passed to the Dense layer.
        dropout: Rate passed to the Dropout layer.
    """
    glorot = tfk.initializers.GlorotUniform(seed)
    dense = tfkl.Dense(units=units, activation=activation, kernel_initializer=glorot)
    for layer in (dense, tfkl.Dropout(dropout)):
        model.add(layer)

In [None]:
# Partially fine-tune the supernet: layers before index 176 stay frozen, later
# layers are trained, BatchNormalization layers are always kept frozen.
#
# The container itself must be trainable: a Keras model whose own `trainable`
# flag is False exposes no trainable weights, whatever its sub-layers say, so
# setting `supernet.trainable = False` and then re-enabling individual layers
# would leave the whole backbone frozen.
supernet.trainable = True

#73, 176, 234, 293, 308 
for i, layer in enumerate(supernet.layers):    #ENB1,ENB2 block 7a7b
    layer.trainable = (i >= 176) and not isinstance(layer, tfk.layers.BatchNormalization)
        
# for i, layer in enumerate(supernet.layers):
#     if "batch_normalization" in layer.name:
#         supernet.layers[i] = GroupNormalization(groups=32, axis=-1, epsilon=0.00001)

dropout_rate = 0.15


#MODEL
model = tfk.Sequential()
    
#input & supernet
model.add(tfkl.Input(shape=input_shape))
model.add(supernet)
model.add(tfkl.GlobalAveragePooling2D())

#DENSE
add_dense_block(model=model, units=768, dropout=dropout_rate)
add_dense_block(model=model, units=196, dropout=dropout_rate)
add_dense_block(model=model, units=48, dropout=dropout_rate)
# add_dense_block(model=model, units=14, dropout=0, activation='softmax')
# The output must be a probability distribution: the model is compiled with the
# 'kl_divergence' loss and the f1_m metric, both of which treat predictions as
# probabilities (they clip them to [0, 1]); raw logits would make them meaningless.
model.add(tfkl.Dense(units=14, activation='softmax', kernel_initializer=tfk.initializers.GlorotUniform(seed)))


#FULLY_CONV
# add_conv1_block(model=model, filters=704, dropout=hp_dropout)
# add_conv1_block(model=model, filters=176, dropout=hp_dropout)
# add_conv1_block(model=model, filters=44, dropout=hp_dropout)
# add_conv1_block(model=model, filters=14, activation=None, dropout=hp_dropout)
# model.add(tfkl.GlobalMaxPooling2D())
# model.add(tfkl.Activation('softmax'))


print('model built')

In [None]:
# Compile the model: Adam at 1e-3, KL-divergence loss, accuracy + custom F1.
# loss='categorical_crossentropy'
loss = 'kl_divergence'
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=['accuracy', f1_m],
)
model.summary()

In [None]:
# Print index, name and trainable flag of every layer inside the backbone.
backbone_layers = model.get_layer(supernet_layer).layers
for i, layer in enumerate(backbone_layers):
    print(i, layer.name, layer.trainable)

In [None]:
# Utility function to create folders and callbacks for training
from datetime import datetime

def create_folders_and_callbacks(model_name, patience=10, monitor='f1_m', mode='max'):
    """Create a timestamped experiment folder and the training callbacks.

    Folders are created under ``models/<model_name>_<timestamp>/`` relative to
    the current working directory: ``ckpts`` for checkpoints and ``tb_logs``
    for TensorBoard.

    Args:
        model_name: Prefix of the experiment folder name.
        patience: Epochs without improvement before EarlyStopping halts training.
        monitor: Metric watched by EarlyStopping and ReduceLROnPlateau.
        mode: Direction of improvement for ``monitor``. Defaults to 'max'
            because the default monitor is an F1 score: with mode 'auto' Keras
            only infers 'max' for accuracy-like names, so 'f1_m' would be
            minimised and a *dropping* F1 would count as progress.

    Returns:
        List of callbacks in this order: ModelCheckpoint, TensorBoard,
        EarlyStopping, ReduceLROnPlateau.
    """
    now = datetime.now().strftime('%b%d_%H-%M-%S')
    exp_dir = os.path.join('models', model_name + '_' + str(now))
    ckpt_dir = os.path.join(exp_dir, 'ckpts')
    tb_dir = os.path.join(exp_dir, 'tb_logs')

    # makedirs creates the intermediate 'models' and experiment folders too
    for folder in (ckpt_dir, tb_dir):
        os.makedirs(folder, exist_ok=True)

    callbacks = []

    # Model checkpoint
    # ----------------
    ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(ckpt_dir, 'cp.ckpt'),
        save_weights_only=False,  # True to save only weights
        save_best_only=True)      # True to save only the best epoch
    callbacks.append(ckpt_callback)

    # Visualize Learning on Tensorboard
    # ---------------------------------
    # By default shows losses and metrics for both training and validation
    tb_callback = tf.keras.callbacks.TensorBoard(
        log_dir=tb_dir,
        profile_batch=0,
        histogram_freq=1)  # if > 0 (epochs) shows weights histograms
    callbacks.append(tb_callback)

    # Early Stopping
    # --------------
    es_callback = tf.keras.callbacks.EarlyStopping(
        monitor=monitor,
        mode=mode,
        patience=patience,
        restore_best_weights=True)
    callbacks.append(es_callback)

    # Halve the learning rate after 4 epochs without improvement
    rop_callback = tf.keras.callbacks.ReduceLROnPlateau(
        monitor=monitor,
        mode=mode,
        factor=0.5,
        patience=4,
        min_lr=0.000001,
        verbose=1)
    callbacks.append(rop_callback)

    return callbacks

In [None]:
%cd /kaggle/working

In [None]:
#Training of classifier

# Only show TensorFlow errors while training.
tf.get_logger().setLevel('ERROR') #('WARNING') 

epochs = 150
aug_callbacks = create_folders_and_callbacks(model_name='EfficientNet', patience=21)

# Fit on the augmented training data, validate on the untouched validation
# split, and weight the loss per class with the balanced class weights.
fit_result = model.fit(
    x=train_ds_aug,
    validation_data=valid_ds,
    epochs=epochs,
    class_weight=weights,
    callbacks=aug_callbacks,
)
history = fit_result.history

In [None]:
# Save best epoch model
# Writes the in-memory `model` to "models/Best", relative to the current
# working directory.
# NOTE(review): this is the "best epoch" only if the EarlyStopping callback
# (created with restore_best_weights=True) actually restored those weights
# before fit() returned — confirm for the TensorFlow version in use.
model.save("models/Best")

# Evaluation


In [None]:
%cd /kaggle/working

In [None]:
# The model was compiled with the custom metric function f1_m. Keras can only
# rebuild that compile configuration on load if the function is supplied via
# custom_objects; otherwise the metric is unknown to the loader.
model = tfk.models.load_model("models/Best", custom_objects={"f1_m": f1_m})

In [None]:
# Evaluate the reloaded model on valid_ds (the non-augmented validation subset)
# and collect the metrics as a {name: value} dict.
# NOTE(review): the printed label says "Test", but the data evaluated here is
# the validation split; no separate test set is visible in this notebook.
model_aug_test_metrics = model.evaluate(valid_ds, return_dict=True)
print("Test metrics with data augmentation")
print(model_aug_test_metrics)