In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import seaborn as sns
import pathlib
import random
import os
import splitfolders
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.optimizers import SGD, RMSprop, Adam, Adagrad, Adadelta
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score, roc_auc_score, confusion_matrix
from sklearn.metrics import classification_report
from keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization, Conv2D, MaxPool2D, MaxPooling2D
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D

#Suppressing Warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

In [None]:
data_dir = "../input/pistachio-image-dataset/Pistachio_Image_Dataset/Pistachio_Image_Dataset/"

In [None]:
splitfolders.ratio(data_dir, output="output", seed=101, ratio=(.8, .1, .1))

Copying files: 2148 files [00:10, 199.26 files/s]


In [None]:
train_path = './output/train'
test_path = './output/test'
val_path = './output/val'

Notice the path I've passed for images.
Here I am displaying the size of the original images to see if resizing is required or not.

In [None]:
img = mpimg.imread('./output/train/Kirmizi_Pistachio/kirmizi 62.jpg')
img1 = mpimg.imread('./output/train/Siirt_Pistachio/siirt 603.jpg')

In [None]:
img.shape, img1.shape

((600, 600, 3), (600, 600, 3))

In [None]:
img_size = 512
batch = 16

In [None]:
labels = []
for i in os.listdir(train_path):
    labels+=[i]

In [None]:
labels

['Siirt_Pistachio', 'Kirmizi_Pistachio']

In [None]:
def load_random_imgs_from_folder(folder,label):
  plt.figure(figsize=(15,15))
  for i in range(3):
    file = random.choice(os.listdir(folder))
    image_path = os.path.join(folder, file)
    img=mpimg.imread(image_path)
    ax=plt.subplot(1,3,i+1)
    ax.title.set_text(label)
    plt.xlabel(f'Name: {file}')
    plt.imshow(img)

In [None]:
for label in labels:
    load_random_imgs_from_folder(f"{data_dir}/{label}",label)

In [None]:
train_datagen = ImageDataGenerator(rescale = 1.0/255.0,
                                   rotation_range = 0.5,
                                   width_shift_range = 0.2,
                                   height_shift_range = 0.2,
                                   shear_range = 0.2,
                                   zoom_range = 0.1,
                                   horizontal_flip = True,
                                   fill_mode = 'nearest'
                                  )

test_val_datagen = ImageDataGenerator(rescale = 1.0/255.0)

In [None]:
train_generator = train_datagen.flow_from_directory(directory = train_path,
                                                    batch_size = batch,
                                                    class_mode = "categorical",
                                                    target_size = (img_size,img_size)
                                                    )

val_generator = test_val_datagen.flow_from_directory(directory = val_path,
                                                    batch_size = batch,
                                                    class_mode = "categorical",
                                                    target_size = (img_size,img_size)
                                                    )

test_generator = test_val_datagen.flow_from_directory(directory = test_path,
                                                    batch_size = batch,
                                                    class_mode = "categorical",
                                                    target_size = (img_size,img_size)
                                                    )

In [None]:
data_train = image_dataset_from_directory(
    data_dir,
    label_mode='categorical',
    validation_split=0.2,
    subset="training",
    seed=0,
    color_mode="rgb",
    image_size=(img_size,img_size),
    batch_size=32,
)
data_test = image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    label_mode='categorical',
    seed=0,
    color_mode="rgb",
    image_size=(img_size,img_size),
    batch_size=32,
)

In [None]:
# # small train dataset with BATCH_SIZE*SMALL_DATASET_BATCHES images
# train_data_small = data_train.take(2)

In [None]:
from keras.applications.efficientnet import EfficientNetB6

In [None]:
base_model = VGG16(weights='imagenet', include_top=False,
                            input_shape=(img_size, img_size,3))

# freeze extraction layers
base_model.trainable = False

# add custom top layers
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dropout(0.2)(x)
x = Dense(4096,activation="relu")(x)
x = Dropout(0.2)(x)
x = Dense(4096,activation="relu")(x)
x = Dropout(0.2)(x)
x = Dense(2096,activation="relu")(x)
x = Dropout(0.2)(x)
x = Dense(2096,activation="relu")(x)
x = Dropout(0.4)(x)
predictions = Dense(2, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# confirm unfrozen layers
for layer in model.layers:
    if layer.trainable==True:
        print(layer)

In [None]:
# model.summary()

In [None]:
from tensorflow.keras.utils import plot_model
from IPython.display import Image
plot_model(model, to_file='convnet.png', show_shapes=True,show_layer_names=True)
Image(filename='convnet.png')

### Callback to continually monitor the validation loss and stop the training of model when it overfits

In [None]:
callbacks = [EarlyStopping(monitor='val_loss', patience=5, verbose=1),
                ModelCheckpoint('model.hdf5',
                                 save_best_only=True)]

In [None]:
opt = Adam(learning_rate=0.0001)
model.compile(
  loss='categorical_crossentropy',
  optimizer=opt,
  metrics=['accuracy']
)

### Reason for 50 epochs is that if the model begins to overfit or the validation loss does not decrease much then callback will automatically stop the training

In [None]:
history=model.fit(data_train,
                  epochs=50,
                  validation_data=data_test,
                  validation_steps=int(0.1 * len(data_test)),
                  verbose=1,
                  callbacks=callbacks)

In [None]:
# history = model.fit(train_generator,
#                     epochs = 50,
#                     validation_data = val_generator,
#                     verbose = 1,
#                    callbacks=callbacks)

In [None]:
from tensorflow.keras.models import load_model

### Loading Best Model Saved during Callback

In [None]:
model = load_model('./model.hdf5')

In [None]:
def learning_curve(model_fit, key='accuracy', ylim=(0, 1.01)):
    plt.figure(figsize=(12,6))
    plt.plot(model_fit.history[key])
    plt.plot(model_fit.history['val_' + key])
    plt.title('Learning Curve')
    plt.ylabel(key.title())
    plt.xlabel('Epoch')
    plt.ylim(ylim)
    plt.legend(['train', 'val'], loc='best')
    plt.show()

In [None]:
learning_curve(history,'loss', ylim=(0,1))
learning_curve(history, 'accuracy', ylim=(0,1))

In [None]:
test_loss, test_acc = model.evaluate(data_test, steps=len(data_test), verbose=1)
print('Loss: %.3f' % (test_loss))
print('Accuracy: %.3f' % (test_acc * 100.0))

Loss: 0.070
Accuracy: 98.135


In [None]:
predictions = np.array([])
labels =  np.array([])
for x, y in data_test:
    predictions = np.concatenate([predictions, np.argmax(model.predict(x),axis=1)])
    labels = np.concatenate([labels, np.argmax(y.numpy(), axis=-1)])

In [None]:
classes = []
for i in os.listdir(train_path):
    classes+=[i]

classes

In [None]:
def fbeta(y_true, y_pred, threshold_shift=0):
    beta_squared = 4

    y_pred = K.clip(y_pred, 0, 1)

    y_pred_bin = K.round(y_pred + threshold_shift)

    tp = K.sum(K.round(y_true * y_pred_bin)) + K.epsilon()
    fp = K.sum(K.round(K.clip(y_pred_bin - y_true, 0, 1)))
    fn = K.sum(K.round(K.clip(y_true - y_pred, 0, 1)))

    precision = tp / (tp + fp)
    recall = tp / (tp + fn)

    return (beta_squared + 1) * (precision * recall) / (beta_squared * precision + recall + K.epsilon())

In [None]:
def confusion_matrix_plot(matrix):
    plt.figure(figsize=(12,10))
    cmap = "YlGnBu"
    ax= plt.subplot()
    sns.heatmap(matrix, annot=True, fmt='g', ax=ax, cmap=cmap);  #annot=True to annotate cells, ftm='g' to disable scientific notation
    plt.savefig('/kaggle/working/img1.png')
    # labels, title and ticks
    ax.set_xlabel('Predicted labels');
    ax.set_ylabel('True labels');
    ax.set_title('Confusion Matrix');
    ax.xaxis.set_ticklabels(classes);
    ax.yaxis.set_ticklabels(classes[::-1]);
    plt.show()

In [None]:
def cal_score(model, key):
    matrix = confusion_matrix(predictions, labels)
    print(matrix)
    print('\n')

    f1 = f1_score(predictions, labels, average='weighted')
    print(f'F1 Score: {f1}')
    print('\n')

    print(classification_report(predictions, labels, target_names=classes))

    if key==1:
        confusion_matrix_plot(matrix)

In [None]:
cal_score(model, 1)

In [None]:
# test_loss, test_acc = model.evaluate(test_generator, steps=len(test_generator), verbose=1)
# print('Loss: %.3f' % (test_loss * 100.0))
# print('Accuracy: %.3f' % (test_acc * 100.0))