In [None]:
import random
import warnings
import os
import shutil

import numpy as np

from sklearn.metrics import accuracy_score, f1_score, recall_score, roc_auc_score, confusion_matrix, precision_score
from sklearn.model_selection import StratifiedKFold

from PIL import Image
from PIL import ImageFile
from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model, Sequential
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Input, Conv2D, Lambda, Dense, Flatten, MaxPooling2D, Activation, BatchNormalization
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from tensorflow.keras.optimizers import Adam

import pandas as pd
from pandas import DataFrame

In [None]:
# Seed every random number generator used in this notebook (Python's `random`,
# NumPy and TensorFlow) from one constant; the same constant is later passed
# to the K-fold splitter and the image generators.
GLOBAL_SEED = 744
# NOTE(review): PYTHONHASHSEED is presumably only read at interpreter start-up,
# so setting it here may affect child processes only - confirm.
os.environ['PYTHONHASHSEED'] = str(GLOBAL_SEED)
random.seed(GLOBAL_SEED)
np.random.seed(GLOBAL_SEED)
tf.random.set_seed(GLOBAL_SEED)

In [None]:
# Image size (rows, cols) used as `target_size` by the data generators and as
# the spatial part of the model input shape.
img_rows, img_cols = 220, 360

# Dataset root; it is listed per class, so it must contain one sub-directory
# for every entry of `all_classes`.
src_dir = '<path_to_your_dataset>'
# Working directories that are deleted and rebuilt for every fold.
train_path = './train/'
validation_path = './validation/'

# Class sub-directory names. The position of a name in `all_classes` is the
# integer label stored in Y.
class_double_print = 'double'
class_good = 'good'
class_interrupted = 'interrupted'
all_classes = [class_double_print, class_good, class_interrupted]

# Location of the checkpoint written during training and reloaded afterwards.
model_file_name = 'shaver_shell_simple_conv_v2.h5'
model_path = os.path.join('./models', model_file_name)

# File names (X) and integer class labels (Y) of all images; populated by the
# listing cell below.
X=[]
Y=[]

In [None]:
# Collect the file name and integer class label of every image under src_dir.
# The lists are built locally so that re-running this cell always starts from
# scratch instead of appending to (or failing on) the arrays bound to X and Y
# by a previous run.
file_names = []
class_ids = []
for class_idx, class_name in enumerate(all_classes):
    # os.listdir returns entries in arbitrary order; sorting keeps the sample
    # order - and therefore the seeded fold assignment - reproducible.
    for file_name in sorted(os.listdir(os.path.join(src_dir, class_name))):
        file_names.append(file_name)
        class_ids.append(class_idx)

X = np.asarray(file_names)
Y = np.asarray(class_ids)

In [None]:
# Model input shape: (rows, cols, channels). One channel, matching the
# color_mode="grayscale" setting of the data generators.
input_shape = (img_rows, img_cols, 1)

def conv_net(conv_blocks = 1, filter_size = (3,3), no_filters = 16, is_init = True, is_last = True):
    """Assemble a small sequential convolutional feature extractor.

    Args:
        conv_blocks: Number of Conv2D -> BatchNormalization -> ReLU ->
            Dropout(0.4) -> MaxPooling2D blocks to stack.
        filter_size: Kernel size given to every Conv2D layer.
        no_filters: Number of filters of every Conv2D layer.
        is_init: When true, the network starts with a 1/255 rescaling layer
            (declared with the module-level ``input_shape``) followed by a
            Normalization layer.
        is_last: When true, a Flatten layer is appended at the end.

    Returns:
        The assembled ``Sequential`` model.
    """
    preprocessing = tf.keras.layers.experimental.preprocessing
    net = Sequential()

    if is_init:
        # NOTE(review): the Normalization layer is added without an adapt()
        # call in this block - confirm that is intended.
        front_layers = (
            preprocessing.Rescaling(1./255, input_shape=input_shape),
            preprocessing.Normalization(),
        )
        for front_layer in front_layers:
            net.add(front_layer)

    for _ in range(conv_blocks):
        block_layers = (
            Conv2D(no_filters, filter_size, padding='same'),
            BatchNormalization(),
            Activation('relu'),
            tf.keras.layers.Dropout(0.4),
            MaxPooling2D(),
        )
        for block_layer in block_layers:
            net.add(block_layer)

    if is_last:
        net.add(Flatten())

    return net

def create_model():
    """Build and compile the two-branch classifier.

    The input image is fed to two ``conv_net`` branches in parallel: one with
    the default 3x3 kernels and one with 1x1 kernels. Both branches end in a
    Flatten layer; their feature vectors are concatenated and passed to a
    softmax layer with one unit per entry of ``all_classes``.

    Returns:
        A compiled Keras ``Model`` (Adam with learning rate 1e-5, categorical
        cross-entropy loss, accuracy reported under the name ``acc``).
    """
    inp = Input(input_shape)

    base = conv_net()(inp)
    detailed = conv_net(conv_blocks = 1, filter_size = (1,1), is_last = True)(inp)

    # Merge with a Keras layer rather than a raw tf.concat on symbolic tensors,
    # so the merge is an ordinary layer of the model that gets checkpointed
    # and reloaded.
    concat_layer = keras.layers.Concatenate(axis=1)([base, detailed])
    out = keras.layers.Dense(len(all_classes), activation="softmax",
                             kernel_regularizer=tf.keras.regularizers.l1_l2(l1=0.0001, l2=0.0001))(concat_layer)

    model = Model([inp], out)
    optimizer = Adam(0.00001)
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])

    return model

In [None]:
def calc_metrics(y_true, y_pred, y_proba):
    """Compute the evaluation metrics for one cross-validation fold.

    Args:
        y_true: Integer class labels (indices into ``all_classes``).
        y_pred: Predicted integer class labels.
        y_proba: Predicted class probabilities, one column per class in
            ``all_classes`` order.

    Returns:
        Dict mapping metric name to value. The ``binary_*`` and ``prec_good``
        metrics treat the problem as "good" versus everything else. Note that
        ``binary_recall`` uses ``pos_label=0``, i.e. it is the recall of the
        not-good group. The dict is also printed.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # Resolve the class indices from the shared class-name constants; the
    # previous hard-coded labels were not members of all_classes and made
    # list.index() raise ValueError.
    good_idx = all_classes.index(class_good)
    double_idx = all_classes.index(class_double_print)
    interrupted_idx = all_classes.index(class_interrupted)

    # One-vs-rest indicator vectors for the ground truth ...
    y_good = (y_true == good_idx).astype(int)
    y_double_print = (y_true == double_idx).astype(int)
    y_interrupted = (y_true == interrupted_idx).astype(int)
    # ... and for the predictions.
    pred_good = (y_pred == good_idx).astype(int)
    pred_double_print = (y_pred == double_idx).astype(int)
    pred_interrupted = (y_pred == interrupted_idx).astype(int)

    res = {'binary_roc_auc': roc_auc_score(y_good, y_proba[:, good_idx]),
           'binary_recall': recall_score(y_good, pred_good, pos_label=0),
           'multiclass_roc_auc': roc_auc_score(y_true, y_proba, multi_class='ovr', average='weighted'),
           'binary_acc': accuracy_score(y_good, pred_good),
           'multiclass_acc': accuracy_score(y_true, y_pred),
           'recall_double_print': recall_score(y_double_print, pred_double_print, pos_label=1),
           'recall_interrupted': recall_score(y_interrupted, pred_interrupted, pos_label=1),
           'prec_good': precision_score(y_good, pred_good, pos_label=1),
           }
    print('Result: ' ,res)
    return res

class ReductionStratifiedKFold:
    """Stratified K-fold splitter that can thin out the non-good training classes.

    Folds are produced by ``StratifiedKFold``. In every fold all training
    samples of ``good_class`` are kept, while for each other class only a
    random fraction ``keep`` of its training samples is retained. Test
    indices are returned unchanged.
    """

    def __init__(self, n_splits=3, keep=1.0, good_class=1, shuffle=True, random_state=0):
        """Store the splitter configuration.

        Args:
            n_splits: Number of folds.
            keep: Fraction of the training samples to retain for every class
                other than ``good_class``.
            good_class: Label of the class that is never reduced.
            shuffle: Whether the underlying StratifiedKFold shuffles samples
                before splitting.
            random_state: Seed for the fold shuffling and for the sub-sampling.
        """
        self.n_splits = n_splits
        self.keep = keep
        self.good_class = good_class
        # Previously accepted but silently ignored; now stored and honoured.
        self.shuffle = shuffle
        self.random_state = random_state

    def split(self, X, y, groups=None):
        """Yield ``(train_indices, test_indices)`` for every fold.

        Args:
            X: Samples; only forwarded to StratifiedKFold.
            y: Class labels, one per sample.
            groups: Unused; present for splitter-interface compatibility.

        Yields:
            Tuple of the sorted, reduced training indices and the test indices.
        """
        y = np.asarray(y)  # boolean-mask indexing below needs an ndarray
        rng = np.random.RandomState(self.random_state)
        # StratifiedKFold only accepts a random_state when shuffle is enabled.
        skf = StratifiedKFold(n_splits=self.n_splits, shuffle=self.shuffle,
                              random_state=self.random_state if self.shuffle else None)
        class_labels = np.unique(y)  # loop-invariant, hoisted out of the fold loop

        for train, test in skf.split(X, y):
            train_y = y[train]
            # The good class is always kept in full.
            train_new = [train[train_y == self.good_class]]

            for c in class_labels:
                if c != self.good_class:
                    c_inds = train[train_y == c]
                    train_new.append(
                        rng.choice(c_inds, round(len(c_inds) * self.keep), replace=False))
            yield np.sort(np.concatenate(train_new)), test

    def get_n_splits(self, X, y, groups=None):
        """Return the configured number of folds (arguments are ignored)."""
        return self.n_splits

In [None]:
def get_instance_number(class_label, root_dir=None):
    """Return the number of directory entries stored for one class.

    Args:
        class_label: Name of the class sub-directory to count.
        root_dir: Directory containing one sub-directory per class. Defaults
            to the notebook-wide ``src_dir`` so the counts refer to the same
            dataset that the rest of the notebook lists and copies from
            (the previous hard-coded root pointed at a different location).

    Returns:
        Number of entries in ``<root_dir>/<class_label>``.

    Raises:
        OSError: If the class directory does not exist or cannot be listed.
    """
    if root_dir is None:
        root_dir = src_dir
    return len(os.listdir(os.path.join(root_dir, class_label)))

def determine_class_weights(labels=None, reference_label=None, count_fn=None):
    """Derive an integer loss weight per class from the class sizes.

    A class smaller than the reference class gets the weight
    ``round((target - actual) / actual)``, but never less than 1; every other
    class gets weight 1.

    Args:
        labels: Class names in label-index order. Defaults to ``all_classes``.
        reference_label: Class whose size is the target. Defaults to
            ``class_good`` (the previous hard-coded label was not one of the
            configured class names).
        count_fn: Callable mapping a class name to its number of instances.
            Defaults to ``get_instance_number``.

    Returns:
        Dict mapping class index to integer weight (always >= 1).
    """
    if labels is None:
        labels = all_classes
    if reference_label is None:
        reference_label = class_good
    if count_fn is None:
        count_fn = get_instance_number

    target_number = count_fn(reference_label)

    weights = {}
    for cls_no, label in enumerate(labels):
        actual_number = count_fn(label)
        # The `0 <` guard avoids a ZeroDivisionError for an empty class.
        if 0 < actual_number < target_number:
            ratio = int(round((target_number - actual_number) / actual_number))
            # For a mildly under-represented class the ratio rounds to 0, and a
            # zero weight would remove the class from the loss - floor at 1.
            weights[cls_no] = max(1, ratio)
        else:
            weights[cls_no] = 1

    return weights

# Per-class loss weights, computed once when this cell runs. They are only used
# if the `class_weight = cls_weights` argument of model.fit is enabled in the
# training loop.
cls_weights = determine_class_weights()

In [None]:
def copy_split_images(X, Y, dest):
    """Copy the given images from the dataset into a per-class directory tree.

    Each file ``X[k]`` with label index ``Y[k]`` is copied from
    ``<src_dir>/<class name>/`` to ``<dest>/<class name>/``.

    Args:
        X: Sequence of image file names.
        Y: Sequence of integer class labels (indices into ``all_classes``),
            aligned with ``X``.
        dest: Root directory of the destination tree.

    Raises:
        ValueError: If ``X`` and ``Y`` differ in length or a label is not a
            valid index into ``all_classes``.
    """
    if len(X) != len(Y):
        raise ValueError(
            'X and Y must have the same length, got %d and %d' % (len(X), len(Y)))

    prepared_dirs = set()
    for file_name, class_idx in zip(X, Y):
        class_idx = int(class_idx)
        # Fail loudly rather than fall back to an empty label (which would
        # read from and write to the root directories themselves).
        if not 0 <= class_idx < len(all_classes):
            raise ValueError('Unknown class index %r for file %r' % (class_idx, file_name))
        label = all_classes[class_idx]

        target_dir = os.path.join(dest, label)
        if target_dir not in prepared_dirs:
            # Do not rely on the caller having created the class directory.
            os.makedirs(target_dir, exist_ok=True)
            prepared_dirs.add(target_dir)

        shutil.copy(os.path.join(src_dir, label, file_name),
                    os.path.join(target_dir, file_name))

In [None]:
# ===============Stratified K-Fold======================
# For every `keep` ratio, run a 10-fold stratified cross-validation. Each fold
# rebuilds the on-disk train/validation directory trees, trains a fresh model,
# reloads the best checkpoint and scores it on the held-out fold. One result
# dict per fold is appended to `all_results`.

batch_size = 4
val_split = 0.2

# The checkpoint callback writes to model_path; make sure its directory exists
# so the first save does not fail.
model_dir = os.path.dirname(model_path)
if model_dir:
    os.makedirs(model_dir, exist_ok=True)

all_results = []
for keep in [1.0]:  # other ratios to try: 0.75, 0.5, 0.25
    print("Results for keep: ",keep)
    # Look the good class up through the shared constant; the previous
    # hard-coded label was not in all_classes and raised ValueError.
    skf = ReductionStratifiedKFold(n_splits=10, keep=keep,
                                   good_class=all_classes.index(class_good),
                                   random_state=GLOBAL_SEED)
    foldNum=0

    for train_index, val_index in skf.split(X, Y):
        model=create_model()

        # Remove old split
        for split_dir in (validation_path, train_path):
            if os.path.exists(split_dir):
                shutil.rmtree(split_dir)

        # Recreate paths
        for label in all_classes:
            os.makedirs(os.path.join(train_path, label), exist_ok=True)
            os.makedirs(os.path.join(validation_path, label), exist_ok=True)

        foldNum+=1
        print("Results for fold: ", foldNum)

        X_train, X_val = X[train_index], X[val_index]
        Y_train, Y_val = Y[train_index], Y[val_index]

        # Copy train images of this keep and fold from full_data to the train folder
        copy_split_images(X_train, Y_train, train_path)

        # Copy validation images of this fold from full_data folder to the validation folder
        copy_split_images(X_val, Y_val, validation_path)

        # Create data loaders
        train_datagen = ImageDataGenerator(validation_split=val_split)
        train_val_datagen = ImageDataGenerator(validation_split=val_split)
        validation_datagen = ImageDataGenerator()

        # Settings shared by all three generators. Passing `classes` pins the
        # generators' class indices to the order of all_classes, which is the
        # label encoding used by Y and by calc_metrics.
        flow_kwargs = dict(
            target_size=(img_rows, img_cols),
            batch_size=batch_size,
            color_mode="grayscale",
            classes=all_classes,
            seed=GLOBAL_SEED)

        # Training part of the train folder.
        train_generator = train_datagen.flow_from_directory(
            train_path,
            class_mode='categorical',
            subset='training',
            **flow_kwargs)

        # Held-back part of the train folder, used to pick the best checkpoint.
        train_val_generator = train_val_datagen.flow_from_directory(
            train_path,
            class_mode='categorical',
            subset='validation',
            **flow_kwargs)

        # Held-out fold: no labels are yielded and the order is fixed, so that
        # predictions line up with validation_generator.classes.
        validation_generator = validation_datagen.flow_from_directory(
            validation_path,
            class_mode=None,
            shuffle=False,
            **flow_kwargs)

        # fit model
        mc = tf.keras.callbacks.ModelCheckpoint(model_path, monitor='val_loss', mode='min', save_best_only=True, verbose = 1)

        # class_weight is for loss weighting, comment in or out accordingly
        model.fit(train_generator, epochs=50, validation_data=train_val_generator,
                  shuffle=True, callbacks=[mc], verbose = 0) #, class_weight = cls_weights)  #epochs=200

        # Calculate results for current keep and split
        model = tf.keras.models.load_model(model_path)  # load the best checkpointed model
        # Model.predict accepts generators directly; predict_generator is deprecated.
        predictions = model.predict(validation_generator, verbose=1)
        y_pred = np.argmax(predictions, axis=1)
        true_classes = validation_generator.classes

        r = calc_metrics(true_classes, y_pred, predictions)
        r['model'] = 'custom_cnn'
        r['keep'] = keep
        r['fold'] = foldNum

        all_results.append(r)

In [None]:
# Turn the per-fold result dicts into a table. The plain DataFrame constructor
# accepts both the original list of dicts and an already-built frame, so this
# cell can be re-run after `all_results` has been rebound to a DataFrame.
all_results = DataFrame(all_results)
# Mean of every remaining column per (model, keep); note that the `fold`
# column is averaged as well.
all_results.groupby(['model', 'keep']).mean()

In [None]:
# Write the per-fold results table to a CSV file in the working directory.
all_results.to_csv('all_results.csv')