In [None]:
import os
import tensorflow as tf
import numpy as np
import os
import random
import pandas as pd
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.utils import class_weight
import math 

from tensorflow.keras.applications import VGG16
from tensorflow.keras.preprocessing import image
from sklearn.metrics import confusion_matrix, classification_report

from tensorflow.keras.preprocessing.image import ImageDataGenerator

tfk = tf.keras
tfkl = tf.keras.layers

!pip install split-folders
import splitfolders

In [None]:
# Use one fixed seed for every random number generator the notebook relies on
seed = 2113

os.environ['PYTHONHASHSEED'] = str(seed)
for seed_fn in (random.seed,
                np.random.seed,
                tf.random.set_seed,
                tf.compat.v1.set_random_seed):
    seed_fn(seed)

In [None]:
# Dataset folder: images obtained by applying the Canny edge detector to the original
# dataset via a Python script (see the end of the notebook, after post processing)
dataset_dir = '../input/cannyleavesdataset'
train_dir = os.path.join(dataset_dir, 'training')

# Class names, one per sub-folder of `train_dir`
labels = [
    "Apple", "Blueberry", "Cherry", "Corn", "Grape", "Orange", "Peach",
    "Pepper", "Potato", "Raspberry", "Soybean", "Squash", "Strawberry", "Tomato",
]

In [None]:
# Plot one example image per class on a two-column grid
num_col = 2
num_row = -(-len(labels) // num_col)  # ceiling division: an odd number of classes still fits
fig, axes = plt.subplots(num_row, num_col, figsize=(2*num_row,15*num_col), squeeze=False)
for i, ax in enumerate(axes.flat):
    if i >= len(labels):
        # Grid cell without a class (only happens with an odd number of classes)
        ax.axis('off')
        continue
    class_dir = os.path.join(train_dir, labels[i])
    # os.walk yields (dirpath, dirnames, filenames): keep the files directly inside the
    # class folder, sorted so that the same example is shown on every run
    class_imgs = sorted(next(os.walk(class_dir))[2])
    class_img = class_imgs[0]
    # The context manager closes the file handle once the pixels have been copied
    with Image.open(os.path.join(class_dir, class_img)) as img:
        ax.imshow(np.array(img))
    ax.set_title('{}'.format(labels[i]))
plt.tight_layout()
plt.show()

### Dataset Preprocessing

In [None]:
# Split the class folders of `train_dir` into train/validation/test sets
# (ratio 0.7 / 0.2 / 0.1); the result is written to the "output" folder
splitfolders.ratio(
    train_dir,
    output="output",
    seed=seed,
    ratio=(0.7, 0.2, 0.1),
    group_prefix=None,
)

In [None]:
# Folders holding the three subsets: <output_dir>/train, <output_dir>/val, <output_dir>/test
output_dir = './output'
training_dir, validation_dir, test_dir = (
    os.path.join(output_dir, subset) for subset in ('train', 'val', 'test')
)

In [None]:
# Image generators.
# Data augmentation is applied to the training set only; validation and test images
# are just rescaled by 1/255.
augmentation_options = dict(
    rotation_range=45,
    zoom_range=0.4,
    horizontal_flip=True,
    vertical_flip=True,
    width_shift_range=0.2,
    height_shift_range=0.2,
)
train_data_gen = ImageDataGenerator(rescale=1/255, **augmentation_options)
valid_data_gen = ImageDataGenerator(rescale=1/255)
test_data_gen = ImageDataGenerator(rescale=1/255)

# Options shared by the three directory iterators
flow_options = dict(
    target_size=(256, 256),  # images are resized to this size when they are read
    color_mode='rgb',
    classes=None,            # default: classes are read in lexicographic order and indexed accordingly
    batch_size=20,
    seed=seed,
)

# Only the training iterator shuffles its data (at each epoch)
train_gen = train_data_gen.flow_from_directory(directory=training_dir,
                                               shuffle=True,
                                               **flow_options)
valid_gen = valid_data_gen.flow_from_directory(directory=validation_dir,
                                               shuffle=False,
                                               **flow_options)
test_gen = test_data_gen.flow_from_directory(directory=test_dir,
                                             shuffle=False,
                                             **flow_options)

In [None]:
# Balanced class weights, computed from the class indices of the training iterator
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(train_gen.classes),
    y=train_gen.classes,
)

# Summary table with one row per class: name, number of files and weight.
# NOTE(review): the file counts are read from `train_dir`, whereas the weights are
# computed from `train_gen` -- confirm that mixing the two sources is intended.
dataset_stats = {
    i: [label, len(os.listdir(os.path.join(train_dir, label)))]
    for i, label in enumerate(labels)
}
df = pd.DataFrame.from_dict(dataset_stats, orient="index", columns=["Category", "Size"])
df["Weight"] = class_weights

df

In [None]:
# Map each class index to its weight
weights = dict(enumerate(class_weights))

weights

In [None]:
# Training configuration
epochs = 200                  # upper bound on the number of training epochs
input_shape = (256, 256, 3)   # height, width, channels of the network input

### Model

In [None]:
from datetime import datetime

def create_folders_and_callbacks(model_name):
    """Create the folders of a training run and build the Keras callbacks that use them.

    The run folder is ``data_loaded/<model_name>_<timestamp>`` (timestamp formatted
    as ``%b%d_%H-%M-%S``). Two sub-folders are created inside it: ``ckpts`` for the
    model checkpoints and ``tb_logs`` for the TensorBoard logs.

    Args:
        model_name: Prefix used for the name of the run folder.

    Returns:
        A list with, in this order, the ModelCheckpoint, TensorBoard and
        EarlyStopping callbacks.
    """
    # List of callbacks
    callbacks = []

    exps_dir = 'data_loaded'
    now = datetime.now().strftime('%b%d_%H-%M-%S')
    exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))

    # Model checkpoint: allows to automatically save the model during training
    # -------------------------------------------------------------------------

    # Subfolder which will contain the model checkpoints. `exist_ok=True` replaces the
    # check-then-create pattern; the missing parent folders are created as well.
    ckpt_dir = os.path.join(exp_dir, 'ckpts')
    os.makedirs(ckpt_dir, exist_ok=True)

    ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=os.path.join(ckpt_dir, 'cp'),  # where the checkpoints are saved
        save_weights_only=False,
        save_best_only=False)

    callbacks.append(ckpt_callback)

    # Visualize Learning on Tensorboard
    # ---------------------------------

    tb_dir = os.path.join(exp_dir, 'tb_logs')
    os.makedirs(tb_dir, exist_ok=True)

    tb_callback = tf.keras.callbacks.TensorBoard(
        log_dir=tb_dir,
        profile_batch=0,
        histogram_freq=1)  # allows to save the histogram of weights over the epochs (the value stands for the epoch frequency)

    callbacks.append(tb_callback)

    # Early Stopping
    # --------------

    es_callback = tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=8,
        min_delta=0.0003,
        restore_best_weights=True)

    callbacks.append(es_callback)

    return callbacks

In [None]:
# CNN made of three blocks of separable convolutions followed by a dense classifier.
# The layers are taken from `tfkl` (the alias of tf.keras.layers defined with the imports):
# the bare name `layers` is never imported, so using it fails on a fresh kernel.
model = tfk.Sequential([
    tfkl.Input(shape=input_shape),

    tfkl.SeparableConv2D(32, 3, activation='relu'), # Less parameters than Conv2D with same performances
    tfkl.SeparableConv2D(64, 3, activation='relu'),
    tfkl.BatchNormalization(), # used to stabilize and speed up the training
    tfkl.MaxPooling2D(2),

    tfkl.SeparableConv2D(64, 3, activation='relu'),
    tfkl.SeparableConv2D(128, 3, activation='relu'),
    tfkl.BatchNormalization(),
    tfkl.MaxPooling2D(2),

    tfkl.SeparableConv2D(128, 3, activation='relu'),
    tfkl.SeparableConv2D(256, 3, activation='relu'),
    tfkl.BatchNormalization(),
    tfkl.GlobalAveragePooling2D(), # we use GlobalAveragePooling instead of Flatten, since it reduces the number of parameters

    tfkl.Flatten(), # NOTE(review): presumably a no-op after the global pooling; kept so the layer list is unchanged
    tfkl.BatchNormalization(renorm=True),
    tfkl.Dropout(0.5, seed=seed), # add dropout to reduce overfitting
    tfkl.Dense(256, activation='relu'),
    tfkl.Dense(len(labels), activation='softmax')
])

model.compile(
    # `learning_rate` is the supported argument name (`lr` is a deprecated alias)
    optimizer=tfk.optimizers.RMSprop(learning_rate=1e-4),
    loss=tfk.losses.CategoricalCrossentropy(),
    metrics=['accuracy'],
)

model.summary()

In [None]:
# Create folders and callbacks and fit
callbacks = create_folders_and_callbacks(model_name='CNN_Canny')

# Train the model.
# `batch_size` is deliberately not passed: `train_gen` already yields batches (their size
# is set when the iterator is created), and Keras documents that `batch_size` must not be
# specified for generator inputs.
history = model.fit(
    x = train_gen,
    validation_data=valid_gen,
    epochs=epochs,
    class_weight=weights,
    callbacks=callbacks
)

model.save("CNN_Canny")

### Postprocessing

In [None]:
# Run the CNN on the whole test set and display the shape of the result
predictions = model.predict(x=test_gen)
predictions.shape

In [None]:
# Confusion Matrix and Classification Report
# `model.predict` iterates over the whole test iterator, so there is no need for the
# deprecated `predict_generator` nor for a hard-coded number of steps.
Y_pred = model.predict(test_gen)
y_pred = np.argmax(Y_pred, axis=1)
print('Confusion Matrix')
plt.figure(figsize=(10,8))
cm = confusion_matrix(test_gen.classes, y_pred)
# The matrix is transposed so that true labels run along x and predictions along y
sns.heatmap(cm.T, xticklabels=labels, yticklabels=labels)
plt.xlabel('True labels')
plt.ylabel('Predicted labels')
plt.show()

In [None]:
# Text report of the classification metrics, one row per class
print('Classification Report')
report = classification_report(test_gen.classes, y_pred, target_names=labels)
print(report)

In [None]:
# Plot one test image next to the class scores predicted for it
batch = next(test_gen)
batch_pr = model.predict(batch[0])
# Random position inside the batch: the bound follows the actual batch length, so a
# batch with fewer than 20 images cannot cause an out-of-range index
ind = random.randint(0, len(batch[0]) - 1)
fig, (ax1, ax2) = plt.subplots(1,2)
fig.set_size_inches(18,5)

# NOTE: this name shadows the `image` module imported from tensorflow.keras.preprocessing;
# it is kept because the feature-map cell further down reads `image`.
image = batch[0][ind]
ax1.imshow(image)
cat_label = batch[1][ind]

# The label is a vector over the classes: its arg-max is the class index
label_index = np.argmax(cat_label)
ax1.set_title('True label: '+labels[label_index])
ax2.barh(labels, batch_pr[ind], color=plt.get_cmap('Paired').colors, log=True)
ax2.set_title('Predicted label: '+labels[np.argmax(batch_pr[ind])])
ax2.grid(alpha=.3)
plt.show()

In [None]:
# Build a model that returns the output of every convolutional layer of `model`.
# `model` is a flat Sequential network, so its own layers are scanned (there is no nested
# sub-model at `model.layers[2]`), and SeparableConv2D is listed explicitly because it is
# not a subclass of Conv2D. The list is not called `layers`, to avoid clobbering that name.
conv_outputs = [layer.output for layer in model.layers
                if isinstance(layer, (tf.keras.layers.Conv2D, tf.keras.layers.SeparableConv2D))]
activation_model = tf.keras.Model(inputs=model.input, outputs=conv_outputs)
# `image` is a single sample: add the batch axis before predicting
fmaps = activation_model.predict(tf.expand_dims(image, 0))

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
%matplotlib inline
def display_activation(fmaps, depth=0, first_n=-1):
    """Plot the feature maps of one layer on a grid with 8 columns.

    Args:
        fmaps: Sequence of activations, one entry per layer. The selected entry is
            indexed as (batch, height, width, channel); only batch item 0 is drawn.
        depth: Index of the entry of `fmaps` to display.
        first_n: When positive, only the first `first_n` channels are shown, resized
            to 128x128. Otherwise every channel is shown at its own resolution.
    """
    fmaps = fmaps[depth]
    if first_n > 0:
        # Slice the channel axis only: the batch axis must survive, because the maps
        # are indexed with four indices below
        fmaps = fmaps[:, :, :, :first_n]
        fmaps = tf.image.resize(fmaps, size=[128, 128])

    # Distribute on a grid for plotting
    col_size = 8
    n_channels = fmaps.shape[-1]
    # Ceiling division: a channel count that is not a multiple of 8 still gets enough rows
    row_size = -(-n_channels // col_size)
    fig = plt.figure(figsize=(30, 30))
    grid = ImageGrid(fig, 111,
                     nrows_ncols=(row_size, col_size),
                     axes_pad=0.1,
                     )
    for cell in range(row_size * col_size):
        if cell < n_channels:
            grid[cell].imshow(fmaps[0, :, :, cell], cmap='gray', aspect='auto')
        else:
            # Unused cell of the last row
            grid[cell].axis('off')
    plt.show()

In [None]:
# Show all the feature maps of the entry at index 1 of `fmaps`
display_activation(fmaps, depth=1, first_n=-1)

### Canny Edge Detector Algorithm

In [None]:
# Script used to preprocess the images with Canny Edge Algorithm and obtain a new dataset

from scipy import ndimage
from scipy.ndimage.filters import convolve

from scipy import misc
import numpy as np
    
def gaussian_kernel(size, sigma=1):
    """Build the 2-D Gaussian kernel used to reduce the noise in the image.

    Args:
        size: Kernel size. The kernel spans the offsets ``-(size // 2) .. size // 2``
            on both axes, i.e. it has ``2 * (size // 2) + 1`` rows and columns.
        sigma: Standard deviation of the Gaussian.

    Returns:
        The kernel as a 2-D float array, with values
        ``exp(-(x**2 + y**2) / (2 * sigma**2)) / (2 * pi * sigma**2)``.
    """
    half = int(size) // 2
    offsets = np.arange(-half, half + 1)
    rows = offsets[:, np.newaxis]   # x offsets, one per row
    cols = offsets[np.newaxis, :]   # y offsets, one per column
    scale = 1 / (2.0 * np.pi * sigma**2)
    return np.exp(-((rows**2 + cols**2) / (2.0*sigma**2))) * scale

def sobel_filters(img):
    """Apply the Sobel filters to extract the edges of an image.

    Args:
        img: 2-D array (the image is convolved with two 3x3 kernels).

    Returns:
        A tuple ``(G, theta)``: ``G`` is the gradient magnitude rescaled so that its
        maximum is 255 (left at zero when the image has no gradient at all), and
        ``theta`` is the gradient direction in radians, from ``arctan2(Iy, Ix)``.
    """
    Kx = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], np.float32)
    Ky = np.array([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], np.float32)

    # `ndimage.convolve` is the public entry point (the `ndimage.filters` namespace is deprecated)
    Ix = ndimage.convolve(img, Kx)
    Iy = ndimage.convolve(img, Ky)

    G = np.hypot(Ix, Iy)
    peak = G.max()
    if peak > 0:
        # Rescale only when there is some gradient: a flat image would otherwise be
        # divided by zero and turned into NaNs
        G = G / peak * 255
    theta = np.arctan2(Iy, Ix)
    return (G, theta)

def non_max_suppression(img, D):
    """Thin out the edges of the image obtained after the Sobel step.

    A pixel is kept only if its value is not smaller than the two neighbours that lie
    along its gradient direction (quantised to 0, 45, 90 or 135 degrees); otherwise it
    is set to 0. Border pixels are always 0 in the result.

    Args:
        img: 2-D array with the gradient magnitude.
        D: Array with the same shape as `img`, holding the gradient direction in radians.

    Returns:
        A 2-D ``int32`` array with the thinned edges (kept values are truncated to integers).

    Raises:
        ValueError: If `D` does not have the same shape as `img`.
    """
    M, N = img.shape
    if np.shape(D) != (M, N):
        # Fail loudly: a shape mismatch used to be swallowed by an `except IndexError`
        raise ValueError(
            'img and D must have the same shape, got {} and {}'.format(img.shape, np.shape(D)))

    Z = np.zeros((M,N), dtype=np.int32)
    # Directions in degrees, folded into [0, 180]
    angle = D * 180. / np.pi
    angle[angle < 0] += 180

    for i in range(1,M-1):
        for j in range(1,N-1):
            direction = angle[i,j]
            # Defaults used when the direction falls in none of the four sectors
            q = 255
            r = 255

            #angle 0
            if (0 <= direction < 22.5) or (157.5 <= direction <= 180):
                q = img[i, j+1]
                r = img[i, j-1]
            #angle 45
            elif (22.5 <= direction < 67.5):
                q = img[i+1, j-1]
                r = img[i-1, j+1]
            #angle 90
            elif (67.5 <= direction < 112.5):
                q = img[i+1, j]
                r = img[i-1, j]
            #angle 135
            elif (112.5 <= direction < 157.5):
                q = img[i-1, j-1]
                r = img[i+1, j+1]

            if (img[i,j] >= q) and (img[i,j] >= r):
                Z[i,j] = img[i,j]
            else:
                Z[i,j] = 0

    return Z

def threshold(img, lowThresholdRatio=0.05, highThresholdRatio=0.15):
    """Classify the pixels as strong, weak or non-relevant.

    Strong pixels have an intensity so high that we are sure they contribute to the
    final edge; weak pixels have an intensity that is not enough to be considered
    strong, but not small enough to be considered non-relevant for the edge detection.

    Args:
        img: 2-D array of intensities.
        lowThresholdRatio: The low threshold is this fraction of the high threshold.
        highThresholdRatio: The high threshold is this fraction of the maximum of `img`.

    Returns:
        A 2-D ``int32`` array with 255 for strong pixels (``img >= high``), 75 for weak
        pixels (``low <= img < high``) and 0 elsewhere. If the maximum of `img` is not
        positive, no pixel is an edge and the result is all zeros.
    """
    highThreshold = img.max() * highThresholdRatio
    lowThreshold = highThreshold * lowThresholdRatio

    M, N = img.shape
    res = np.zeros((M,N), dtype=np.int32)

    weak = np.int32(75)
    strong = np.int32(255)

    if highThreshold <= 0:
        # Blank image: with both thresholds at 0 every pixel would pass the comparisons
        return res

    strong_mask = img >= highThreshold
    # The weak band excludes the strong pixels, so a pixel exactly at the high threshold
    # stays strong instead of being overwritten as weak
    weak_mask = (img >= lowThreshold) & ~strong_mask

    res[strong_mask] = strong
    res[weak_mask] = weak

    return (res)

def hysteresis(img):
    """Turn weak pixels into strong ones, or discard them.

    A weak pixel (75) becomes strong (255) if and only if at least one of the eight
    pixels around it is strong; otherwise it is set to 0. The image is scanned row by
    row and modified in place, so a pixel promoted earlier in the scan counts as strong
    for the pixels visited after it. Border pixels are not processed.

    Args:
        img: 2-D array produced by the thresholding step; it is modified in place.

    Returns:
        The same array object, after the update.
    """
    weak, strong = 75, 255
    n_rows, n_cols = img.shape

    for row in range(1, n_rows - 1):
        for col in range(1, n_cols - 1):
            if img[row, col] != weak:
                continue
            # 3x3 window centred on the pixel; the centre is weak, so only the
            # eight neighbours can match `strong`
            window = img[row-1:row+2, col-1:col+2]
            img[row, col] = strong if (window == strong).any() else 0

    return img

def canny_edge(image):
    """Run the whole Canny edge pipeline on one image.

    Steps: grayscale conversion, smoothing with a Gaussian kernel (size 5, sigma 1),
    Sobel gradients, non-maximum suppression, thresholding and hysteresis.

    Args:
        image: Array whose last axis holds the colour channels; only the first
            three channels are used.

    Returns:
        The edge map replicated on three channels along a new last axis.
    """
    # rgb to grayscale conversion (weighted sum of the first three channels)
    grayscale = np.dot(image[...,:3], [0.2989, 0.5870, 0.1140])
    smoothed = convolve(grayscale, gaussian_kernel(5, 1))
    magnitude, direction = sobel_filters(smoothed)
    edges = hysteresis(threshold(non_max_suppression(magnitude, direction)))
    # grayscale to rgb conversion: the same map on each of the three channels
    return np.stack([edges, edges, edges], axis=-1)