In [None]:
import os
import numpy as np
import pandas as pd

import keras
from keras.preprocessing import image_dataset_from_directory
from keras.preprocessing.image import ImageDataGenerator, load_img
import keras.backend as K

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import matplotlib.pyplot as plt
import cv2

In [None]:
data_dir = '../input/cassava-leaf-disease-classification/'
train_images_dir = os.path.join(data_dir, 'train_images')
test_images_dir = os.path.join(data_dir, 'test_images')
train_csv = pd.read_csv(os.path.join(data_dir, 'train.csv'), dtype=str)

train_csv, val_csv = train_test_split(train_csv, test_size=0.2, stratify=train_csv.label)

In [None]:
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=45,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest')

train_generator = train_datagen.flow_from_dataframe(
    dataframe=train_csv,
    directory=train_images_dir,
    x_col="image_id",
    y_col="label",
    class_mode='categorical',
    target_size=(256, 256),
    batch_size=16
)

val_datagen = ImageDataGenerator(rescale=1./255)
val_generator = val_datagen.flow_from_dataframe(
    dataframe=val_csv,
    directory=train_images_dir,
    x_col="image_id", y_col="label", class_mode='categorical',
    target_size=(256, 256),
    batch_size=16
)

In [None]:
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Dense, Flatten
from keras import Input, layers, Model, optimizers

In [None]:
def inception_block(input_layer):
    conv1 = Conv2D(64, (1, 1), activation='relu', padding='same')(input_layer)
    # 3x3 conv
    conv3 = Conv2D(32, (1, 1), padding='same', activation='relu')(input_layer)
    conv3 = Conv2D(64, (3, 3), padding='same', activation='relu')(conv3)
    # 5x5 conv
    conv5 = Conv2D(32, (1, 1), padding='same', activation='relu')(input_layer)
    conv5 = Conv2D(64, (5, 5), padding='same', activation='relu')(conv5)
    # 3x3 max pooling
    pool = MaxPooling2D((3, 3), strides=(1, 1), padding='same')(input_layer)
    pool = Conv2D(32, (1, 1), padding='same', activation='relu')(pool)
    # concatenate filters, assumes filters/channels last
    output_layer = layers.merge.concatenate([conv1, conv3, conv5, pool], axis=-1)
    return output_layer

In [None]:
def residual_block(input_layer):
    conv1 = Conv2D(64, (1, 1), activation='relu', padding='same', kernel_initializer='he_normal')(input_layer)
    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal')(input_layer)
    conv3 = Conv2D(64, (3, 3), activation='relu', padding='same', kernel_initializer='he_normal')(conv2)
    add_layer = layers.add([conv3, conv1])
    output_layer = layers.Activation('relu')(add_layer)
    return output_layer

In [None]:
def custom_loss(y_true, y_pred):
    loss = -K.log(y_pred)
    loss_weights = [1, 1, 1, 3, 1]
    weighted_loss = loss * loss_weights
    weighted_loss /= sum(loss_weights)
    weighted_loss = K.sum(weighted_loss, axis=1)
    return weighted_loss

In [None]:
# model = Sequential()
# model.add(Conv2D(32, (3, 3), padding='same', input_shape=(256, 256, 3), activation='relu'))
# model.add(Conv2D(32, (3, 3), activation='relu'))
# model.add(MaxPooling2D((2,2)))
# model.add(Conv2D(32, (3, 3), activation='relu'))
# model.add(Conv2D(32, (3, 3), activation='relu'))
# model.add(MaxPooling2D((2,2)))
# model.add(Flatten())
# model.add(Dense(10, activation='relu'))
# model.add(Dense(5, activation='softmax'))

# model.compile(keras.optimizers.Adam(lr=0.001), loss='categorical_crossentropy', metrics=["accuracy"])
# history = model.fit_generator(train_generator, steps_per_epoch=100, epochs=100)

In [None]:
dropout_rate=0.3
input = Input(shape=(256,256,3))
# batch_norm = layers.BatchNormalization()(input)
inception1 = inception_block(input)
o1 = Dropout(dropout_rate)(inception1)
residual1 = residual_block(o1)
o2 = Dropout(dropout_rate)(residual1)
# inception2 = inception_block(o2)
flatten = Flatten()(o2)
dense1 = Dense(64, activation='relu')(flatten)
output = Dense(5, activation='softmax')(dense1)
model2 = Model(input, output)
model2.summary()

In [None]:
model2.compile(loss=custom_loss,
                   optimizer=optimizers.RMSprop(lr=1e-8),
                   metrics=['accuracy'])

STEPS_PER_EPOCH = train_generator.n//train_generator.batch_size
# STEPS_PER_EPOCH = 200
epochs=10
history = model2.fit_generator(train_generator, steps_per_epoch=STEPS_PER_EPOCH, epochs=epochs, 
                               validation_data=val_generator, validation_steps=50, shuffle=True, use_multiprocessing=True)
# history = model2.fit(train_generator, steps_per_epoch=STEPS_PER_EPOCH, epochs=epochs, validation_data=val_generator, validation_steps=50)

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    import itertools
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()


In [None]:
pred_labels = model2.predict(val_generator)
pred_labels = np.argmax(pred_labels, axis=1)

In [None]:
cm = confusion_matrix(np.asarray(val_csv.label, dtype='int'), pred_labels)
classes = [0, 1, 2, 3, 4]
plot_confusion_matrix(cm, classes)

In [None]:
img = load_img(os.path.join(train_images_dir, train_csv.iloc[0][0]))
img = np.asarray(img)

In [None]:
# cv2.extractChannel(img)
# r, g, b = cv2.split(np.asarray(img))

In [None]:
# plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
plt.imshow(cv2.extractChannel(img, 2))
plt.show()