In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sea
import os
from tqdm.notebook import tqdm
import tensorflow as tf
from tensorflow import keras
from keras.layers import Dense, Dropout, Flatten, Input, Conv2D, MaxPool2D
from keras.models import Model
import cv2 as op
from keras.applications.mobilenet import preprocess_input
from tensorflow.keras.utils import to_categorical

# Matplotlib 3.6 renamed its bundled seaborn style sheets to 'seaborn-v0_8*'
# and later releases dropped the old names, so use whichever name this
# installation actually provides.
plt.style.use('seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn')
# Display the NumPy version as the cell output.
np.__version__

In [None]:
!pip install albumentations

In [None]:
# Root folder of the dataset; the loading loop further down treats every entry
# under it as a class sub-folder.
PATH = '/kaggle/input/bean-disease-dataset/Bean_Dataset/'
# List the entries directly under PATH as a quick sanity check.
os.listdir(PATH)

In [None]:
# Class-folder name -> integer class index.
labels = {
    'bean_rust' : 0,
    'healthy' : 1,
    'angular_leaf_spot' : 2
}

# Parallel lists: label_index[k] is the class index of the file at filepath[k].
label_index, filepath = [], []

# Walk every entry under PATH and record each file it contains together with
# the class index looked up from the folder name.
for class_dir in os.listdir(PATH):
    image_names = os.listdir(PATH + class_dir)
    for image_name in tqdm(image_names):
        label_index.append(labels[class_dir])
        filepath.append(f"{PATH}{class_dir}/{image_name}")

print(len(label_index), len(filepath))

In [None]:
# One row per image: where it lives on disk and its integer class index.
df = pd.DataFrame({
    'filepath' : filepath,
    'label_index' : label_index
})

# Shuffle the rows once. The fixed random_state makes the order (and everything
# derived from it) reproducible across kernel restarts; drop=True discards the
# old index instead of materialising it as a column and dropping it afterwards.
df = df.sample(frac = 1, random_state = 42).reset_index(drop = True)
df

In [None]:
# Bar chart of how many rows each class index has.
sea.countplot(x = 'label_index', data = df)
# The same counts as numbers (shown as the cell output).
df['label_index'].value_counts()

In [None]:
# Read one arbitrary image (row 400, column 0 = 'filepath') to inspect the
# shape of the raw array.
img = plt.imread(df.iloc[400,0])
img.shape

In [None]:
from sklearn.model_selection import train_test_split

# 80% train; the remaining 20% is split again 70/30 into validation and test.
# Both splits are stratified on the class index so every subset keeps the class
# proportions of the full data, and seeded so the split is reproducible.
df_train, df_val = train_test_split(
    df, test_size = 0.2, stratify = df['label_index'], random_state = 42)
df_val, df_test = train_test_split(
    df_val, test_size = 0.3, stratify = df_val['label_index'], random_state = 42)
df_train.shape, df_val.shape, df_test.shape

In [None]:
import albumentations as A
# Augmentation pipeline used by map_function for training samples only.
# Each transform is configured with p = 0.5.
transform = A.Compose([
    A.HorizontalFlip(p = 0.5),
    A.Blur(blur_limit = 3, p = 0.5), 
])

In [None]:
# Input shape given to the backbone and to the model's Input layer
# (224 x 224 pixels, 3 channels).
IMG_SHAPE = (224, 224, 3)
# One class per entry in `labels`.
NUM_CLASSES = len(labels)
# Number of samples per batch produced by create_dataset.
BATCH_SIZE = 64

def map_function(img, label, training):
    """Load one image from disk and turn it into a model-ready (image, label) pair.

    Written to be wrapped in tf.numpy_function, so it works on plain
    NumPy/Python values rather than tensors.

    Args:
        img: path of the image file; `.decode()` is called on it, so it is
            expected to arrive as bytes.
        label: integer class index of the image.
        training: truthy to run the `transform` augmentation pipeline.

    Returns:
        (img, label): the image resized to the spatial size in IMG_SHAPE and
        passed through `preprocess_input`, and the label one-hot encoded with
        NUM_CLASSES entries.
    """
    # Take the target size from IMG_SHAPE so the pipeline cannot drift away
    # from the shape the model is built with.
    height, width = IMG_SHAPE[:2]

    # Keep only the first three channels (drops an alpha channel if present).
    img = plt.imread(img.decode())[:, :, :3]
    # cv2.resize takes the target size as (width, height).
    img = op.resize(img, (width, height))
    if training:
        img = transform(image = img)['image']
    img = preprocess_input(img)
    label = to_categorical(label, num_classes = NUM_CLASSES)
    return img, label

def create_dataset(df, training = False):
    """Build a batched, prefetched tf.data pipeline from a dataframe.

    Args:
        df: dataframe with a 'filepath' column and a 'label_index' column.
        training: when True the samples are shuffled and `map_function`
            applies the augmentation pipeline.

    Returns:
        A tf.data.Dataset yielding (images, one_hot_labels) batches of up to
        BATCH_SIZE samples.
    """
    dataset = tf.data.Dataset.from_tensor_slices((df['filepath'], df['label_index']))

    # Shuffling only matters for training; evaluation results do not depend on
    # sample order. The buffer spans the whole split so the shuffle is uniform
    # (only short path strings are buffered at this stage).
    if training:
        dataset = dataset.shuffle(max(len(df), 1))

    def load_sample(path, label):
        image, one_hot = tf.numpy_function(
            map_function, [path, label, training], [tf.float32, tf.float32])
        # tf.numpy_function returns tensors of unknown static shape; restore it
        # so downstream layers and batching see fully defined shapes.
        image.set_shape(IMG_SHAPE)
        one_hot.set_shape((NUM_CLASSES,))
        return image, one_hot

    dataset = dataset.map(load_sample, num_parallel_calls = tf.data.experimental.AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE)
    return dataset.prefetch(buffer_size = tf.data.experimental.AUTOTUNE)

In [None]:
# Only the training pipeline is built with training=True, which is what turns
# on the augmentation inside map_function.
train_dataset = create_dataset(df_train, True)
val_dataset = create_dataset(df_val)
test_dataset = create_dataset(df_test)

In [None]:
# Smoke test: pull one batch from the training and validation pipelines so
# loading errors show up here rather than in the middle of training.
a, b = next(iter(train_dataset))
a, b = next(iter(val_dataset))

# The batches themselves are not needed afterwards.
del a, b

In [None]:
# Headless MobileNetV2 backbone initialised with the 'imagenet' weights.
base_model = keras.applications.MobileNetV2(
    include_top = False,
    weights = 'imagenet',
    input_shape = IMG_SHAPE,
)
base_model.trainable = True

# Freeze the first 70% of the backbone's layers; the rest stay trainable.
n = int(0.70 * len(base_model.layers))
for frozen_layer in base_model.layers[:n]:
    frozen_layer.trainable = False

# Print every layer's position and whether it is trainable.
for position, current_layer in enumerate(base_model.layers):
    print(position, current_layer.trainable)

In [None]:
def make_model():
    """Assemble the classifier: the shared `base_model` plus a small head.

    The head is Dropout -> Conv2D(256, 3x3) -> MaxPool2D(2) -> Dropout ->
    Flatten -> Dense(256) -> Dropout -> softmax over NUM_CLASSES classes.

    Returns:
        A keras Model taking inputs of shape IMG_SHAPE and producing one
        probability per class.
    """
    inp = Input(shape = IMG_SHAPE)
    x = base_model(inp)
    x = Dropout(0.2)(x)
    x = Conv2D(256, (3,3), activation = 'relu')(x)
    x = MaxPool2D(2)(x)
    x = Dropout(0.2)(x)
    x = Flatten()(x)
    x = Dense(256, activation = 'relu')(x)
    x = Dropout(0.2)(x)
    # Size the output from NUM_CLASSES so it always matches the one-hot labels
    # produced by the data pipeline.
    out = Dense(NUM_CLASSES, activation = 'softmax')(x)

    model = Model(inputs = inp, outputs = out)
    return model

In [None]:
# Build the classifier and print its layer summary.
model = make_model()
model.summary()

In [None]:

# Loss and optimizer used by the custom training loop.
lossfxn = keras.losses.CategoricalCrossentropy()
optimizer = keras.optimizers.Adam(learning_rate = 1e-4)

# Separate accuracy trackers for training and validation so that their
# accumulated state never mixes.
train_acc_metric = keras.metrics.CategoricalAccuracy()
val_acc_metric = keras.metrics.CategoricalAccuracy()

In [None]:
@tf.function
def train_step(x, y):
    """Run one optimisation step on a single batch.

    Args:
        x: batch of model inputs.
        y: batch of targets in the format `lossfxn` and `train_acc_metric`
            expect.

    Returns:
        (loss, acc): the loss of this batch and the current value of
        `train_acc_metric`, i.e. the accuracy accumulated since the metric was
        last reset (not the accuracy of this batch alone).
    """
    with tf.GradientTape() as tape:
        y_pred = model(x, training = True)
        # Keras losses are called as loss(y_true, y_pred). Cross-entropy is not
        # symmetric, so swapping the arguments optimises the wrong objective.
        loss = lossfxn(y, y_pred)

    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    train_acc_metric.update_state(y, y_pred)
    acc = train_acc_metric.result()

    return loss, acc

@tf.function
def val_step(x, y):
    """Evaluate the model on a single validation batch (no weight update).

    Args:
        x: batch of model inputs.
        y: batch of targets in the format `lossfxn` and `val_acc_metric`
            expect.

    Returns:
        (loss, acc): the loss of this batch and the current value of
        `val_acc_metric`, i.e. the accuracy accumulated since the metric was
        last reset.
    """
    y_pred = model(x, training = False)
    # Keras losses are called as loss(y_true, y_pred).
    loss = lossfxn(y, y_pred)

    val_acc_metric.update_state(y, y_pred)
    # Report the validation tracker, not the training one.
    acc = val_acc_metric.result()

    return loss, acc

def append_metrics(train_loss, train_acc, val_loss, val_acc):
    """Record one epoch's results in the module-level history lists.

    Each value is appended to its matching list: TRAIN_LOSS, TRAIN_ACC,
    VAL_LOSS and VAL_ACC respectively.
    """
    recordings = (
        (TRAIN_LOSS, train_loss),
        (TRAIN_ACC, train_acc),
        (VAL_LOSS, val_loss),
        (VAL_ACC, val_acc),
    )
    for history, value in recordings:
        history.append(value)
    

In [None]:
EPOCHS = 20
NUM_BATCHES = len(train_dataset)
TEST_BATCHES = len(val_dataset)   # number of validation batches
# Per-epoch history, filled through append_metrics and plotted afterwards.
TRAIN_LOSS, TRAIN_ACC, VAL_LOSS, VAL_ACC = [], [], [], []

for epoch in range(EPOCHS):
    # Start every epoch from clean metric state, so re-running this cell after
    # an interrupted run cannot leak counts into the first epoch.
    train_acc_metric.reset_states()
    val_acc_metric.reset_states()

    train_loss_sum, val_loss_sum = 0.0, 0.0

    for x, y in tqdm(train_dataset):
        batch_loss, _ = train_step(x, y)
        train_loss_sum += float(batch_loss)

    for x, y in val_dataset:
        batch_loss, _ = val_step(x, y)
        val_loss_sum += float(batch_loss)

    # Loss: mean of the per-batch losses.
    train_loss = train_loss_sum / NUM_BATCHES
    val_loss = val_loss_sum / TEST_BATCHES

    # Accuracy: read each tracker once at the end of the epoch. The value the
    # step functions return is already a running average over the epoch, so
    # averaging those per-batch values again would over-weight early batches.
    train_acc = float(train_acc_metric.result())
    val_acc = float(val_acc_metric.result())

    train_acc_metric.reset_states()
    val_acc_metric.reset_states()

    # Plain Python floats are stored so the history lists plot and format
    # without depending on tensor objects.
    append_metrics(train_loss, train_acc, val_loss, val_acc)

    print("Epoch: {} Training: [Loss:{:.3f} Acc:{:.3f}] Validation: [Loss:{:.3f} Acc:{:.3f}]".format(
            epoch, train_loss, train_acc, val_loss, val_acc))
    

In [None]:
# Training curves side by side: loss on the left, accuracy on the right.
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize = (16, 5))

loss_ax.set_title('Loss')
loss_ax.plot(TRAIN_LOSS, marker = 'o', label = 'Training')
loss_ax.plot(VAL_LOSS, '--r', label = 'Validation')
loss_ax.legend()

acc_ax.set_title('Accuracy')
acc_ax.plot(TRAIN_ACC, marker = 'o', label = 'Training')
acc_ax.plot(VAL_ACC, '--r', label = 'Validation')
acc_ax.legend()

In [None]:
# Predicted and ground-truth class indices for the whole test set, collected
# batch by batch so both lists stay aligned.
test_pred, true = [], []

for batch_images, batch_onehot in test_dataset:
    batch_probs = model(batch_images, training = False)
    test_pred.extend(np.argmax(batch_probs, axis = 1))
    true.extend(np.argmax(batch_onehot, axis = 1))

len(test_pred), len(true)

In [None]:
from sklearn.metrics import confusion_matrix
# Confusion matrix of the test set (ground truth passed first, predictions
# second), drawn as an annotated heatmap.
cf_matrix = confusion_matrix(true, test_pred)
sea.heatmap(cf_matrix, annot=True, cmap = 'Blues')

In [None]:
# `true` and `test_pred` hold class indices, not one-hot vectors, so they are
# compared directly. CategoricalAccuracy takes an argmax over the last axis of
# both inputs, which would collapse each 1-D list to a single number and report
# only 0.0 or 1.0.
acc = float(np.mean(np.asarray(true) == np.asarray(test_pred)))

print("Accuracy:", acc)

In [None]:
from sklearn.metrics import precision_recall_fscore_support as prf

# Precision, recall, F-score and support computed from the test-set ground
# truth and predictions (shown as the cell output).
prf(true, test_pred)

In [None]:
# Class index -> readable class name (inverse of `labels`).
d_names = {
    0: 'bean_rust',
    1: 'healthy',
    2: 'angular_leaf_spot'
}

# Draw up to 20 random test rows; never ask for more than the split holds.
samples = df_test.sample(n = min(20, len(df_test)))

# Load and resize every sampled image, keeping the un-normalised version for
# display.
images = []
for path in samples['filepath']:
    img = plt.imread(path)[:, :, :3]
    images.append(op.resize(img, (224, 224)))

# Predict all samples in one batched call instead of one model.predict per
# image. np.stack adds the batch axis and fails loudly on a shape mismatch,
# whereas np.resize would silently repeat or truncate data.
batch = preprocess_input(np.stack(images).astype(np.float32))
pred_indices = np.argmax(model.predict(batch), axis = 1)

plt.figure(figsize = (20, 18))

for i, (img, pred_index, true_index) in enumerate(
        zip(images, pred_indices, samples['label_index'])):
    pred = d_names[int(pred_index)]
    l = d_names[int(true_index)]

    plt.subplot(4,5,i + 1)
    plt.axis('off')
    plt.imshow(img)
    plt.title('Actual:{}\nPredicted:{}\n{}'.format(l, pred,  "Correct" if l == pred else "Incorrect"))
