# SYNC INTERN'S
Beamlak Tesfahun - Artificial Intelligence Intern

Task 1 - Plant Disease Detection

# Import Required Modules

In [None]:
import numpy as np
import time
import pandas as pd
import seaborn as sns
import os
import shutil
import pathlib
import itertools
import cv2
sns.set_style('darkgrid')
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras.preprocessing.image import ImageDataGenerator
#from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Activation, Dropout, BatchNormalization
from tensorflow.keras import regularizers
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
import pathlib
# Ignore Warnings
import warnings
warnings.filterwarnings("ignore")
print ('Modules have successfully been loaded')

# Define Required Functions

Function to create dataframe

In [None]:
# Function to make filepath--label dataframe
def filepath_label_data_frame(dataset_directory):
    """Build a dataframe of image file paths and their class labels.

    The dataset directory is expected to contain one sub-folder per class;
    every file inside a class folder becomes one row labelled with the
    folder name.

    Args:
        dataset_directory: Path of the folder that holds the class folders.

    Returns:
        A pandas DataFrame with two columns, 'filepaths' and 'labels'.
    """
    filepaths = []
    labels = []
    # Sorted so the row order (and therefore any seeded split made from it)
    # does not depend on the order in which the OS lists the directory.
    for class_name in sorted(os.listdir(dataset_directory)):
        class_dir = os.path.join(dataset_directory, class_name)
        # Skip stray files in the dataset root (README, .DS_Store, ...);
        # listing them as directories would raise NotADirectoryError.
        if not os.path.isdir(class_dir):
            continue
        for file_name in sorted(os.listdir(class_dir)):
            file_path = os.path.join(class_dir, file_name)
            # Only regular files can be images; ignore nested folders.
            if not os.path.isfile(file_path):
                continue
            filepaths.append(file_path)
            labels.append(class_name)
    # Put the paths and their labels side by side in one dataframe
    return pd.DataFrame({'filepaths': filepaths, 'labels': labels})

# Function to split the dataset into train, validation, and test dataframes
def split_dataset(dataset_directory):
    """Split the images under dataset_directory into three dataframes.

    The filepath/label dataframe is split 80% train / 20% hold-out, and the
    hold-out part is split in half into validation and test. Both splits are
    stratified on the 'labels' column and use random_state=42. Two random
    rows of each resulting dataframe are displayed as a preview.

    Returns:
        (train_data_frame, valid_data_frame, test_data_frame)
    """
    full_frame = filepath_label_data_frame(dataset_directory)

    # First cut: training set versus a hold-out set
    train_data_frame, holdout_frame = train_test_split(
        full_frame,
        train_size=0.8,
        shuffle=True,
        random_state=42,
        stratify=full_frame['labels'],
    )

    # Second cut: hold-out set halved into validation and test
    valid_data_frame, test_data_frame = train_test_split(
        holdout_frame,
        train_size=0.5,
        shuffle=True,
        random_state=42,
        stratify=holdout_frame['labels'],
    )

    # Preview two random rows from each split
    previews = (
        ('Train dataframe', train_data_frame),
        ('Validation dataframe', valid_data_frame),
        ('Test dataframe', test_data_frame),
    )
    for title, frame in previews:
        print(title)
        display(frame.sample(2))

    return train_data_frame, valid_data_frame, test_data_frame

Function to create image generators from the dataframes

In [None]:
# Function to create generators for given dataframes
def create_img_generators(train_df, valid_df, test_df, batch_size,
                          img_size=(224, 224), test_batch_size=8):
    """Create Keras image generators for the train, validation and test sets.

    Args:
        train_df, valid_df, test_df: Dataframes with a 'filepaths' column
            (image path) and a 'labels' column (class name).
        batch_size: Batch size of the train and validation generators.
        img_size: (height, width) every image is resized to.
        test_batch_size: Batch size of the test generator.

    Returns:
        (train_gen, valid_gen, test_gen). The train generator applies random
        horizontal flips and shuffles; the test generator is not shuffled, so
        its batches follow the row order of test_df.
    """
    # Pixel values are passed through unchanged, as in the original pipeline.
    # NOTE(review): tf.keras EfficientNet is documented to take raw 0-255
    # inputs - confirm this holds for the installed version.
    train_augmenter = ImageDataGenerator(horizontal_flip=True)
    plain_loader = ImageDataGenerator()

    # Use one explicit class list for all three generators so that a class
    # missing from a split cannot shift the label -> index mapping.
    class_names = sorted(train_df['labels'].unique())

    flow_options = dict(
        x_col='filepaths',
        y_col='labels',
        target_size=img_size,
        classes=class_names,
        class_mode='categorical',
        color_mode='rgb',
    )

    train_gen = train_augmenter.flow_from_dataframe(
        dataframe=train_df, shuffle=True, batch_size=batch_size, **flow_options)

    valid_gen = plain_loader.flow_from_dataframe(
        dataframe=valid_df, shuffle=True, batch_size=batch_size, **flow_options)

    # shuffle is off so predictions can be matched to test_gen.classes
    test_gen = plain_loader.flow_from_dataframe(
        dataframe=test_df, shuffle=False, batch_size=test_batch_size, **flow_options)

    return train_gen, valid_gen, test_gen

Function to display data samples

In [None]:
# Function that takes a data generator and shows a sample of its images
def show_samples(generator):
    """Plot up to 20 images from the next batch of generator with their labels.

    Args:
        generator: Iterator that yields (images, one-hot labels) batches and
            exposes a class_indices dict mapping class name -> index.
    """
    # Invert the mapping explicitly rather than relying on dict key order
    index_to_class = {index: name for name, index in generator.class_indices.items()}
    images, labels = next(generator)

    # The grid below is 4 x 5, so at most 20 images are shown
    sample = min(len(labels), 20)

    plt.figure(figsize=(25, 20))
    for position, (picture, one_hot) in enumerate(zip(images[:sample], labels[:sample]), start=1):
        plt.subplot(4, 5, position)
        plt.imshow(picture / 255)  # scale pixels to [0, 1] for imshow
        class_name = index_to_class[int(np.argmax(one_hot))]
        plt.title(class_name, color='Red', fontsize=14)
        plt.axis('off')
    # Spacing is identical for every subplot, so set it once after the loop
    plt.subplots_adjust(hspace=0.1, wspace=0.6)
    plt.show()

# Get Generators and Split Data

In [None]:
dataset_directory = r'C:\Users\Tesfahun Sahilu\Desktop\Coursera\plantvillage dataset\color' # Dataset root folder (machine-specific absolute path; change it before running elsewhere)
# Build the stratified train / validation / test dataframes from the dataset folder
train_df, valid_df, test_df = split_dataset(dataset_directory)

# Create the image generators; train and validation batches hold 32 images each
train_gen, valid_gen, test_gen = create_img_generators(train_df, valid_df, test_df, batch_size = 32)

# Display Image Samples

In [None]:
show_samples(train_gen) # Plot up to 20 labelled images taken from the next training batch

# Define Callbacks

In [None]:
# Callbacks: hooks run by Keras during training; used here to stop training early
# and to lower the learning rate when the validation loss stops improving
# Defining callback parameters
batch_size = 32 # the number of samples processed before model is updated
epochs = 10 # maximum number of training epochs
lr_patience = 1 # epochs without improvement to wait before reducing the learning rate
early_stop_patience = 2 # epochs without improvement to wait before stopping training
factor = 0.5 # factor to reduce lr by

# Define callbacks
# The callbacks are taken from tf.keras, the same package the model is built with.
# Early Stopping: stop once the validation loss has not improved for
# early_stop_patience epochs and roll back to the best weights seen
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=early_stop_patience,
    verbose=1,
    mode='min',
    baseline=None,
    restore_best_weights=True,
)
# Reduce the learning rate when the validation loss has stopped improving.
# The validation loss is monitored (not the training accuracy) so the schedule
# reacts to the same signal as early stopping; with lr_patience < early_stop_patience
# the model gets one epoch at the lower rate before training is stopped.
lr_reduction_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    mode='min',
    patience=lr_patience,
    factor=factor,
)
# list of early_stop and lr
callback_list = [early_stop, lr_reduction_on_plateau]

# Model Creation

In [None]:
# Create Model Structure
img_shape = (224, 224, 3)
class_count = len(train_gen.class_indices)  # one output unit per class

# EfficientNetB3 (ImageNet weights, no classification head) is the transfer-learning base;
# pooling='max' makes it output one feature vector per image
base_model = tf.keras.applications.efficientnet.EfficientNetB3(
    include_top=False,
    weights="imagenet",
    input_shape=img_shape,
    pooling='max',
)
base_model.trainable = True

# Freeze all layers except for the last 80 layers
# Only the last 80 layers of the base model will be trained
for layer in base_model.layers[:-80]:
    layer.trainable = False
# NOTE(review): any BatchNormalization layers among the last 80 stay trainable;
# fine-tuning guides commonly keep them frozen - confirm this is intended.

# Stack the classification head on top of the base model
model = Sequential([
    base_model,
    layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001),
    layers.Dense(
        256,
        # The factor is passed positionally: the legacy `l=` keyword is not
        # accepted by newer Keras releases, where the argument is named `l2`.
        kernel_regularizer=keras.regularizers.l2(0.01),
        activity_regularizer=keras.regularizers.l1(0.001),
        bias_regularizer=keras.regularizers.l1(0.001),
        activation='relu',
    ),
    layers.Dropout(rate=0.4, seed=42),
    layers.Dense(class_count, activation='softmax'),
])

# Adamax is used as an optimizer
model.compile(
    optimizer=keras.optimizers.Adamax(learning_rate=0.0005),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary() # print summary of model

# Train Model

In [None]:
# Train for at most `epochs` epochs; the callbacks in callback_list run during training
history = model.fit(
    train_gen,
    epochs=epochs,
    verbose=1,
    callbacks=callback_list,
    validation_data=valid_gen,
    validation_steps=None,
    shuffle=False,
)

# Create Functions to Display Model Performance

Function to plot the training history

In [None]:
# plot_training_history function: receives the history of the trained model and plots
# its loss and accuracy curves, highlighting the best validation epoch of each metric
def plot_training_history(history_):
    """Plot training/validation loss and accuracy per epoch.

    Args:
        history_: Object whose .history dict holds the per-epoch lists
            'accuracy', 'loss', 'val_accuracy' and 'val_loss'.

    The left panel shows the losses and marks the epoch with the lowest
    validation loss; the right panel shows the accuracies and marks the epoch
    with the highest validation accuracy. Epochs are numbered from 1.
    """
    records = history_.history
    train_accuracy = records['accuracy']
    train_loss = records['loss']
    valid_accuracy = records['val_accuracy']
    valid_loss = records['val_loss']

    # Zero-based positions of the best validation epochs
    best_loss_pos = np.argmin(valid_loss)
    best_acc_pos = np.argmax(valid_accuracy)

    epoch_axis = list(range(1, len(train_accuracy) + 1))

    def draw_panel(slot, curves, marker, title, y_label):
        # curves: (values, colour, legend label); marker: (x, y, colour, legend label)
        plt.subplot(1, 2, slot)
        for values, colour, legend in curves:
            sns.lineplot(x=epoch_axis, y=values, color=colour, label=legend)
        marker_x, marker_y, marker_colour, marker_legend = marker
        plt.scatter(marker_x, marker_y, s=250, c=marker_colour, alpha=0.3, label=marker_legend)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()

    sns.set_style("whitegrid")
    plt.figure(figsize=(20, 8))

    draw_panel(
        1,
        [(train_loss, 'g', 'Training loss'), (valid_loss, 'b', 'Validation loss')],
        (best_loss_pos + 1, valid_loss[best_loss_pos], 'red', f'best epoch= {best_loss_pos + 1}'),
        'Training and Validation Losses',
        'Loss',
    )
    draw_panel(
        2,
        [(train_accuracy, 'r', 'Training Accuracy'), (valid_accuracy, 'g', 'Validation Accuracy')],
        (best_acc_pos + 1, valid_accuracy[best_acc_pos], 'blue', f'best epoch= {best_acc_pos + 1}'),
        'Training and Validation Accuracies',
        'Accuracy',
    )

    # Let matplotlib pick the padding between the two panels
    plt.tight_layout()
    plt.show()


Function to plot the confusion matrix

In [None]:
def plot_confusion_matrix(generator, y_true, y_pred):
    """Plot the row-normalised confusion matrix of the predictions.

    Args:
        generator: Object exposing class_indices (class name -> index); its
            keys, in order, are used as the tick labels.
        y_true: Integer class indices of the true labels.
        y_pred: Integer class indices of the predicted labels.

    Each cell shows the fraction of samples of the true class (row) that was
    predicted as the column class; the cell colour encodes the same value.
    """
    sns.set_style('white')
    # Get the class labels and their indices
    classes = list(generator.class_indices.keys())
    class_ids = np.arange(len(classes))

    # labels= keeps the matrix aligned with the ticks even when a class does
    # not occur in y_true or y_pred
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    # Normalise each row; rows without samples stay 0 instead of becoming NaN
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_norm = np.divide(cm, row_totals,
                        out=np.zeros(cm.shape, dtype=float),
                        where=row_totals != 0)

    # Draw the normalised matrix so the colours match the 0..1 colour scale
    # (raw counts would saturate every non-zero cell at vmax=1)
    plt.figure(figsize=(25, 25))
    plt.imshow(cm_norm, interpolation='nearest', cmap=plt.cm.Greens, vmin=0, vmax=1)
    plt.title('Confusion matrix')
    plt.colorbar()
    plt.xticks(class_ids, classes, rotation=90)
    plt.yticks(class_ids, classes)

    # Write the value into each cell, light text on dark cells and vice versa
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        value = cm_norm[i, j]
        plt.text(j, i, "{:.2f}".format(value),
                 horizontalalignment="center",
                 verticalalignment="center",
                 color="white" if value > 0.5 else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Called after the axis labels are set so they are included in the layout
    plt.tight_layout()
    plt.show()


Function to plot the classification report

In [None]:
def plot_classification_report(y_true, y_pred, classes):
    """Plot per-class precision, recall and F1-score as a heat map.

    Args:
        y_true: True labels.
        y_pred: Predicted labels.
        classes: Class names, passed to classification_report as target_names.
    """
    report = classification_report(y_true, y_pred, target_names=classes, output_dict=True)
    report_df = pd.DataFrame(report)

    # Select the per-class columns and the three score rows by name, which
    # leaves out the 'support' row and the summary (accuracy/average) columns
    # without depending on their position in the report.
    per_class_scores = report_df.loc[['precision', 'recall', 'f1-score'], list(classes)]

    # Plot heatmap: one row per class, one column per metric
    plt.figure(figsize=(6, 15))
    ax = sns.heatmap(per_class_scores.T, cmap='Blues', annot=True, fmt='.2f', cbar=False)
    ax.set_title('Classification Report')
    ax.set_xlabel('Metrics')
    ax.set_ylabel('Classes')
    plt.xticks(rotation=90, ha='right')
    plt.show()


Function for visualizing model predictions

In [None]:
# Shows one batch of test images titled with their actual and predicted classes
def visualize_predictions(test_generator, pred_generator):
    """Display the next batch of test images with actual vs. predicted labels.

    Args:
        test_generator: Iterator yielding (images, one-hot labels) batches and
            exposing class_indices (class name -> index).
        pred_generator: Iterator yielding one predicted class index per image,
            in the same order as the images of test_generator.

    One prediction is consumed per image of the batch, so repeated calls walk
    through both iterators in step whatever the batch size is (including a
    shorter final batch). Nothing is plotted when no predictions are left.
    """
    classes = list(test_generator.class_indices.keys())
    images, labels = next(test_generator)

    # Take exactly as many predictions as there are images in this batch
    pred_values = list(itertools.islice(pred_generator, len(images)))
    shown = len(pred_values)
    if shown == 0:
        print('No predictions left to display')
        return

    columns = 4
    rows = -(-shown // columns)  # ceiling division; 2 rows for a batch of 8

    plt.figure(figsize=(28, 6 * rows))
    for i in range(shown):
        # Show image
        plt.subplot(rows, columns, i + 1)
        plt.imshow(images[i] / 255)  # scale image pixels to [0, 1]
        # Get the actual and the predicted class of the image
        class_name_true = classes[np.argmax(labels[i])]
        class_name_pred = classes[pred_values[i]]
        plt.title(f'Actual: {class_name_true}\nPredicted: {class_name_pred}', color='purple', fontsize=14)
        plt.axis('off')
    # Spacing is the same for every subplot, so set it once
    plt.subplots_adjust(hspace=0.3, wspace=0.1)
    plt.show()
    

# Plot the Training History

In [None]:
# Plot the loss and accuracy curves recorded by model.fit
plot_training_history(history)

# Evaluate Model

In [None]:
# Number of rows in each split
test_df_length = len(test_df)
train_df_length = len(train_df)
valid_df_length = len(valid_df)

test_batch_size = 8 # batch size of the test generator

# Evaluate over every batch of each generator. len(generator) counts the final
# partial batch as well, whereas `length // batch_size` silently left the
# remaining samples out of the reported scores.
test_steps = len(test_gen)
train_steps = len(train_gen)
valid_steps = len(valid_gen)

# Evaluate scores: each result is [loss, accuracy], matching the compiled metrics
train_score = model.evaluate(train_gen, steps=train_steps, verbose=1)
valid_score = model.evaluate(valid_gen, steps=valid_steps, verbose=1)
test_score = model.evaluate(test_gen, steps=test_steps, verbose=1)

# Print scores
print("Train Loss: ", train_score[0])
print("Train Accuracy: ", train_score[1])
print("Validation Loss: ", valid_score[0])
print("Validation Accuracy: ", valid_score[1])
print("Test Loss: ", test_score[0])
print("Test Accuracy: ", test_score[1])

# Get Predictions

In [None]:
# True class indices as stored by the test generator
y_true = test_gen.classes
# Per-class scores for every test image; the highest-scoring class is the prediction
preds = model.predict(test_gen)
y_pred = preds.argmax(axis=1)

# Class names taken from the generator's name -> index mapping
g_dict = test_gen.class_indices
classes = list(g_dict)

# Plot Classification Report

In [None]:
# Heat map of per-class precision, recall and F1-score on the test set
plot_classification_report(y_true, y_pred, classes)

# Plot Confusion Matrix

In [None]:
# Confusion matrix of the test-set predictions, labelled with the test generator's class names
plot_confusion_matrix(test_gen, y_true, y_pred)    

# Display Predictions

In [None]:
# Rewind the test generator so its next batch is the first one again
test_gen.reset()
# Iterator that hands out the predicted class indices one at a time
pred_gen = iter(y_pred)

In [None]:
# Show a batch of test images with their actual and predicted classes
visualize_predictions(test_gen, pred_gen)

In [None]:
# Each call consumes the next batch of images and predictions, so this shows a new batch
visualize_predictions(test_gen, pred_gen)

In [None]:
# Show one more batch of test images with actual vs. predicted classes
visualize_predictions(test_gen, pred_gen)