In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import Counter
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_datasets as tfds
#tfds.disable_progress_bar()
from keras.models import Sequential
from keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D
from keras.preprocessing.image import ImageDataGenerator
import datetime
%matplotlib inline
try:
  # Use the %tensorflow_version magic if in colab.
  %tensorflow_version 2.x
except Exception:
  pass

In [None]:
# Enable eager execution
# tf.enable_v2_behavior()

# Check out all available datasets 
#tfds.list_builders()

In [None]:
# Load the three splits of the TFDS `beans` dataset in one call.
# `as_supervised = True` yields `(image, label)` pairs; `with_info = True`
# additionally returns the dataset metadata object.
beans_splits, ds_info = tfds.load(
    name = 'beans',
    split = ['train', 'validation', 'test'],
    as_supervised = True,
    with_info = True,
)
ds_train, ds_validation, ds_test = beans_splits
# print(ds_info)  # uncomment to inspect the dataset metadata

In [None]:
# Image geometry used for the model input and as the crop/pad target.
image_height, image_width = 500, 500
num_channels = 3
num_classes = 3  # healthy, angular leaf spot disease, bean rust disease

# Input-pipeline hyperparameters.
batch_size = 32


# Let's keep the dimensions the same (no resizing for now)

def normalize_image(image, label, target_height = 500, target_width = 500):
    """Rescale an image by 1/255 and crop or pad it to a fixed size.

    Args:
        image: Image tensor; it is cast to float32 before scaling
            (presumably uint8 pixel data, so the result lies in [0, 1] -
            TODO confirm against the dataset).
        label: Passed through unchanged.
        target_height: Output height handed to `resize_with_crop_or_pad`.
        target_width: Output width handed to `resize_with_crop_or_pad`.

    Returns:
        Tuple `(image, label)` with the processed float32 image.
    """
    scaled = tf.cast(image, tf.float32) / 255.
    # No interpolation happens here: the image is cropped or padded to the
    # requested size rather than resampled.
    fitted = tf.image.resize_with_crop_or_pad(scaled, target_height, target_width)
    return fitted, label

# Training input pipeline: normalise -> cache -> shuffle -> batch -> prefetch.
# Caching sits before the shuffle so the cached elements are the normalised
# images, while shuffling and batching are still applied on every pass.
# NOTE: `ds_train` is rebound to its own transformed version, so re-running
# this cell without re-running the load cell applies `normalize_image` twice.
ds_train = (
    ds_train
    .map(normalize_image, num_parallel_calls = tf.data.experimental.AUTOTUNE)
    .cache()
    .shuffle(ds_info.splits['train'].num_examples)
    .batch(batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [None]:
def _build_eval_pipeline(dataset):
    """Return `dataset` normalised, batched, cached and prefetched (no shuffling)."""
    return (
        dataset
        .map(normalize_image, num_parallel_calls = tf.data.experimental.AUTOTUNE)
        .batch(batch_size)
        .cache()
        .prefetch(tf.data.experimental.AUTOTUNE)
    )

# Validation and test splits share the exact same evaluation pipeline.
# NOTE: both names are rebound in place, so re-running this cell without
# re-running the load cell normalises the images a second time.
ds_validation = _build_eval_pipeline(ds_validation)
ds_test = _build_eval_pipeline(ds_test)

In [None]:
def return_class_labels(ds):
    """Collect every class label of a dataset into one flat list.

    Args:
        ds: Dataset-like object whose `take(-1)` iterates over
            `(features, labels)` pairs, where `labels` exposes `.numpy()`.
            Labels may be batched (1-D per element) or unbatched (scalar).

    Returns:
        A flat list with one entry per example, in iteration order. An empty
        dataset yields an empty list.
    """
    l_labels = []
    # take(-1) iterates over the entire dataset.
    for _, labels in ds.take(-1):
        # atleast_1d lets scalar labels from an unbatched dataset be handled
        # exactly like a batch of size one instead of raising on iteration.
        l_labels.extend(np.atleast_1d(labels.numpy()))
    return l_labels

In [None]:
# Flatten each split's labels and report how many examples fall in each class.
training_labels = return_class_labels(ds_train)
training_counts = Counter(training_labels)
print("The distribution of training labels is: ", training_counts)

validation_labels = return_class_labels(ds_validation)
validation_counts = Counter(validation_labels)
print("The distribution of validation labels is: ", validation_counts)

test_labels = return_class_labels(ds_test)
test_counts = Counter(test_labels)
print("The distribution of test labels is: ", test_counts)

In [None]:
# Pull one element from the training pipeline and convert both tensors to
# NumPy. `ds_train` is batched, so `image` and `label` each hold a whole batch.
example = ds_train.take(1)
for sample in example:
    image, label = (tensor.numpy() for tensor in sample)

In [None]:
# Custom Activation
from keras.utils.generic_utils import get_custom_objects
from keras.layers import Activation
from keras import backend as K

def Generic_Para(x):
    """Parabolic activation: returns ``0.65 * x**2 + 0.5 * x``."""
    quadratic_term = 0.65 * K.pow(x, 2)
    linear_term = 0.5 * x
    return quadratic_term + linear_term

def SBAF_Para(x):
    """Parabolic activation: returns ``1 - x + x**2``."""
    one_minus_x = 1 - x
    return one_minus_x + K.pow(x, 2)

# Module-level gain; A_ReLU_Para looks it up each time it is called.
k = 1.25
def A_ReLU_Para(x):
    """Scaled square activation: returns ``k * x**2`` using the module-level ``k``."""
    squared = K.pow(x, 2)
    return k * squared

def Abs_Func(x):
    """Absolute-value activation: returns ``|x|``."""
    magnitude = K.abs(x)
    return magnitude

def A_ReLU(x):
    """Power-law ReLU: ``0.94 * x**1.1`` for positive ``x`` and ``0`` otherwise.

    The previous body returned ``0.94 * |x|**1.1``, which is symmetric in
    ``x`` and therefore gave negative inputs the same response as positive
    ones instead of zero.

    Args:
        x: Input tensor.

    Returns:
        Tensor of the same shape as ``x``.
    """
    # Clamp negatives to zero *before* the fractional power: pow() never sees
    # a negative base (which would give NaN), and negative inputs map to 0.
    return 0.94 * K.pow(K.relu(x), 1.1)

def Leaky_A_ReLU(x):
    """Leaky power-law ReLU.

    Returns ``k * x**n`` where ``x >= 0`` and ``a * k * |x|**n`` where
    ``x < 0``, with ``a = 0.01``, ``k = 1.06`` and ``n = 1.1``.

    The previous body branched with a Python ``if x >= 0`` on the tensor
    itself, which cannot select per element; the choice is now made
    element-wise with ``K.switch``.

    Args:
        x: Input tensor.

    Returns:
        Tensor of the same shape as ``x``.
    """
    a = 0.01
    k = 1.06
    n = 1.1
    # |x| equals x on the non-negative side, so one power term serves both
    # branches and pow() is never evaluated on a negative base (NaN).
    powered = k * K.pow(K.abs(x), n)
    return K.switch(K.greater_equal(x, 0), powered, a * powered)
  
# Register each custom activation under its own function name so that layers
# can select it by string, e.g. Dense(..., activation = 'A_ReLU_Para').
_custom_activation_fns = (
    Generic_Para,
    SBAF_Para,
    A_ReLU_Para,
    Abs_Func,
    A_ReLU,
    Leaky_A_ReLU,
)
get_custom_objects().update(
    {fn.__name__: Activation(fn) for fn in _custom_activation_fns}
)


In [None]:

# Two strided-convolution -> max-pool -> dropout stages, followed by a dense
# classifier head that ends in a softmax over `num_classes`.
model = Sequential([
    Conv2D(15, 10, input_shape = (image_height, image_width, num_channels),
           strides = 2, padding = 'same', activation = 'relu'),
    MaxPooling2D(pool_size = 4),
    Dropout(0.3),
    Conv2D(15, 10, strides = 2, padding = 'same', activation = 'relu'),
    MaxPooling2D(pool_size = 4),
    Dropout(0.3),
    Flatten(),
    # Alternative hidden layer using one of the custom activations:
    # Dense(128, activation = 'A_ReLU_Para', kernel_initializer = 'he_uniform'),
    Dense(128, activation = 'swish'),
    Dropout(0.5),
    Dense(num_classes, activation = 'softmax'),
])

In [None]:
learning_rate = 0.001

# Print the layer-by-layer summary before compiling.
model.summary()

# Adam optimiser with the step size above; accuracy is tracked as a metric.
adam_optimizer = tf.keras.optimizers.Adam(learning_rate)
model.compile(
    optimizer = adam_optimizer,
    loss = 'sparse_categorical_crossentropy',
    metrics = ['accuracy'],
)

In [None]:
num_epochs = 40

# Train on the batched training pipeline and score the validation split
# after every epoch; the returned History object is kept for plotting.
fit_options = dict(
    epochs = num_epochs,
    verbose = 1,
    validation_data = ds_validation,
)
history = model.fit(ds_train, **fit_options)

In [None]:
# Per-epoch curves recorded by model.fit().
acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

loss = history.history['loss']
val_loss = history.history['val_loss']

# Use the number of epochs actually recorded rather than `num_epochs`, so the
# plot still works if training stopped early or `num_epochs` was edited since.
epochs_range = range(len(acc))

fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(12, 6))

ax_acc.plot(epochs_range, acc, label='Training Accuracy')
ax_acc.plot(epochs_range, val_acc, label='Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.legend(loc='lower right')
ax_acc.set_title('Training and Validation Accuracy')

ax_loss.plot(epochs_range, loss, label='Training Loss')
ax_loss.plot(epochs_range, val_loss, label='Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.legend(loc='upper right')
ax_loss.set_title('Training and Validation Loss')

plt.show()


In [None]:
from sklearn import metrics

# model.evaluate() returns [loss, accuracy] for the metrics compiled above.
score = model.evaluate(ds_test)
print('\n', 'Test accuracy:', round(score[1], 5))
print('\n', 'Test Loss:', round(score[0], 5))

# Un-batch the test split into per-example lists of images and labels.
y_test = []
x_test = []
for data, labels in ds_test.take(-1):
    y_test.extend(labels.numpy())
    x_test.extend(data.numpy())

testY1 = np.array(y_test)
testY = testY1.flatten()
testX1 = np.array(x_test)

# `Sequential.predict_classes` was removed from recent TensorFlow releases;
# taking the argmax over the softmax outputs is the documented replacement.
predY = np.argmax(model.predict(testX1), axis = -1)
print(metrics.classification_report(testY, predY, digits=3))