Aanvraag / besluit classifier

In [None]:
import numpy as np
import math
from PIL import Image
from scipy import misc
import keras.backend as K
import matplotlib.pyplot as plt
from sklearn import preprocessing
from sklearn.preprocessing import OneHotEncoder
from sklearn.calibration import calibration_curve

%matplotlib inline

import keras
from keras.datasets import mnist
from keras.models import Sequential, Model
from keras.layers import Input, Dense, Dropout, Flatten, Conv2D, MaxPooling2D
from keras import layers
from keras.initializers import RandomUniform
from keras.layers import LeakyReLU
from keras.preprocessing.image import ImageDataGenerator
 

from stats import list_stats, show_train_curves, show_prediction_list, show_prediction_images_new
# from stats import show_train_curves
from data import split_data
from examples.aanvraag_besluit.load_data import load_data_aanvraag, preprocess_X
from examples.aanvraag_besluit.transformer import Transformer
from image_display import show_image

# Hot reload packages
%load_ext autoreload
%autoreload 2


# Load dataset

In [None]:
img_dim = (200, 200, 3);
img_dim = (250, 250, 3);
# img_dim = (300, 300, 3);
# img_dim = (400, 400, 3);

[Xtrain_raw, Ytrain_raw, Xvalid_raw, Yvalid_raw] = load_data_aanvraag(
    {
        'images': f'examples/aanvraag_besluit/eerste_dataset/resized/{img_dim[0]}x{img_dim[1]}/',
        'labels': 'examples/aanvraag_besluit/eerste_dataset/labels/'
    },
    {
        'images': f'examples/aanvraag_besluit/tweede_dataset/images/{img_dim[0]}x{img_dim[1]}/',
        'labels': 'examples/aanvraag_besluit/tweede_dataset/labels/'
    },
)

print(f"shape Xtrain[0]: {Xtrain_raw[0].shape}")
print(f"shape Xtrain[1]: {Xtrain_raw[1].shape}")
print(f"shape Ytrain: {Ytrain_raw.shape}")

print(f"shape Xvalid[0]: {Xvalid_raw[0].shape}")
print(f"shape Xvalid[1]: {Xvalid_raw[1].shape}")
print(f"shape Yvalid: {Yvalid_raw.shape}")

In [None]:
# Preprocess (encode, transform features and labels)
transformer = Transformer()

Xdata_mix = Xtrain_raw[1].append(Xvalid_raw[1])
transformer.fit(Xdata_mix)

Xtrain = preprocess_X(Xtrain_raw[0], Xtrain_raw[1], transformer)
Xvalid = preprocess_X(Xvalid_raw[0], Xvalid_raw[1], transformer)
# print(Xtrain[1][:4])
# print(transformer.decode(Xtrain[1][:4]))
print(Xvalid[1][:4])
print(transformer.decode(Xvalid[1][:4]))

num_features = Xtrain[1].shape[1]
print(Xvalid[1].shape)
assert Xvalid[1].shape[1] == num_features

In [None]:
classes = list(set(Ytrain_raw))
print(classes)
num_classes = 2
assert len(classes) == num_classes


print('')
print('--- TRAIN ---')
list_stats(Ytrain_raw)

print('')
print('--- VALID ---')
list_stats(Yvalid_raw)

In [None]:
# enc = preprocessing.LabelEncoder()  # outputs 1d array, binary classification
# Ytrain = enc.transform(Ytrain_raw)
# Yvalid = enc.transform(Yvalid_raw)

enc = preprocessing.OneHotEncoder()  # outputs 2d array, multi class classification
assert Ytrain_raw.ndim == 1
enc.fit(Ytrain_raw.reshape(-1, 1))

print(Ytrain_raw.shape)
print(Ytrain_raw[:10])

Ytrain = enc.transform(Ytrain_raw.reshape(-1, 1)).toarray()
Yvalid = enc.transform(Yvalid_raw.reshape(-1, 1)).toarray()
print('Ytrain: ', Ytrain.shape)
print('Yvalid: ', Yvalid.shape)
print('Ytrain: ', Ytrain[:10])

# Define model

In [None]:
lr_alpha=0.1
drop_chance=0.1

def create_mlp(num_features):
    model = Sequential()
    model.add(Dense(8, input_dim=num_features, name="input-mlp"))
#     model.add(layers.BatchNormalization())
    model.add(LeakyReLU(alpha=lr_alpha))
    model.add(Dropout(drop_chance))
    
    model.add(Dense(8))
#     model.add(layers.BatchNormalization())
    model.add(LeakyReLU(alpha=lr_alpha))
    model.add(Dropout(drop_chance))
    
#     model.add(Dense(2, activation='softmax'))
    
    return model

def create_cnn(img_dim):
    inputs = Input(shape=img_dim, name="input-cnn")

    x = Conv2D(16, (3, 3), use_bias=False)(inputs)
#     x = layers.BatchNormalization()(x)
    x = LeakyReLU(alpha=lr_alpha)(x)

    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Conv2D(32, (3, 3), use_bias=False)(x)
#     x = layers.BatchNormalization()(x)
    x = LeakyReLU(alpha=lr_alpha)(x)

    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Conv2D(32, (3, 3), use_bias=False)(x)
#     x = layers.BatchNormalization()(x)
    x = LeakyReLU(alpha=lr_alpha)(x)

    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Conv2D(32, (3, 3), use_bias=False)(x)
#     x = layers.BatchNormalization()(x)
    x = LeakyReLU(alpha=lr_alpha)(x)

    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Flatten()(x)

    x = Dense(16, kernel_initializer = RandomUniform())(x)
#     x = layers.BatchNormalization()(x)
    x = LeakyReLU(alpha=lr_alpha)(x)
    x = Dropout(drop_chance)(x)
    
#     x = Dense(2, activation="softmax", name='output')(x)
    
    model = Model(inputs, x)
    return model


def build_multi_feature(num_classes, img_dim, num_features):
    # Multi feature system based on https://www.pyimagesearch.com/2019/02/04/keras-multiple-inputs-and-mixed-data/
    print('img_dim: ', img_dim)
    print('num_features: ', num_features)
    
    # create the MLP and CNN models
    mlp = create_mlp(num_features)
    cnn = create_cnn(img_dim)

    # Merge the two branches
    combinedInput = layers.concatenate([mlp.output, cnn.output])

    # Final logic
    x = Dense(8, activation="relu")(combinedInput)
    x = Dropout(drop_chance)(x)
    
    x = Dense(num_classes, activation="softmax", name='output')(x)

    # our final model will accept categorical/numerical data on the MLP
    # input and images on the CNN input
    # Output: softmax
    model = Model(inputs=[cnn.input, mlp.input], outputs=x)
    return model


model = build_multi_feature(num_classes, img_dim, num_features)
# model = create_cnn(img_dim)
# model = create_mlpB(num_features)

model.summary()

In [None]:
# Xtrain = [0, Ytrain]
# Xvalid = [0, Yvalid]

# Train

In [None]:
batch_size = 100
epochs = 100

datagen = ImageDataGenerator(
        zoom_range=0.1,        # randomly zoom into images
        rotation_range=5,      # randomly rotate images in the range (degrees, 0 to 180)
        width_shift_range=0.1, # randomly shift images horizontally (fraction of total width)
        height_shift_range=0.1,# randomly shift images vertically (fraction of total height)
        horizontal_flip=True,  # randomly flip images
        vertical_flip=False    # randomly flip images
)
 

def is_binary(model):
    n_classes = model.get_layer('output').output_shape[1]
    return n_classes == 1
    
def compile_model(model):
    assert(K.image_data_format() == 'channels_last')
    
#     if is_binary(model):
#         loss= keras.losses.binary_crossentropy
#     else:
    loss=keras.losses.categorical_crossentropy
    
    model.compile(
        loss=loss,
#         optimizer=keras.optimizers.Adadelta(),
#         optimizer='rmsprop',
        optimizer=keras.optimizers.Adam(),        
        metrics=['accuracy']
    )

def train(model, X_train, Y_train, X_test, Y_test, batch_size, epochs):
    compile_model(model)
    
    history = model.fit(X_train, Y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=1,
              validation_data=(X_test, Y_test)
           )
    return history

def train_gen(model, X_train, Y_train, X_test, Y_test, batch_size, epochs):
    compile_model(model)
    
    history = model.fit_generator(
        datagen.flow(X_train,
                     Y_train,
                     batch_size=batch_size
        ),
        steps_per_epoch=int(np.ceil(X_train.shape[0] / float(batch_size))),
        epochs=epochs,
        validation_data=(X_test, Y_test),
        workers=4
    )
    return history
history = train(model, Xtrain, Ytrain, Xvalid, Yvalid, batch_size, epochs)
# history = train(model, Xtrain[1], Ytrain, Xvalid[1], Yvalid, batch_size, epochs)
# history = train_gen(model, Xtrain[0], Ytrain, Xvalid[0], Yvalid, batch_size, epochs)

show_train_curves(history)

In [None]:
# Y_train = np.argmax(Y_train, axis=1)
# Y_test = np.argmax(Y_test, axis=1)

In [None]:
train_score = model.evaluate(Xtrain, Ytrain, verbose=1)
print('Train loss:', round(train_score[0], 3))
print(f'Train accuracy: {round(train_score[1] * 100, 2)}%')

valid_score = model.evaluate(Xvalid, Yvalid, verbose=1)
print('Test loss:', round(valid_score[0], 3))
valid_acc_str = f'{round(valid_score[1] * 100, 2)}%'
print(f'Test accuracy: {valid_acc_str}')


In [None]:
print(f"types: {classes}")

print("train predictions, truth")
predictions_train =  model.predict(Xtrain, verbose=1)
show_prediction_list(predictions_train, Ytrain_oh)

print("test predictions, truth")
predictions_valid = model.predict(Xvalid, verbose=1)
show_prediction_list(predictions_valid, Yvalid_oh)

In [None]:
idx = 11
print(ids_test[idx])
show_image(Xvalid, idx)

In [None]:
# Y_train_idx = np.argmax(Y_train, axis=1)        
# Y_test_idx = np.argmax(Y_test, axis=1)

# print("train set:")
# show_prediction_images_new(Xtrain, Ytrain_oh, predictions_train, Ytrain_meta, enc, 10)

print("test set:")
show_prediction_images_new(Xvalid, Yvalid_oh, predictions_valid, Yvalid_meta, enc, 50)

In [None]:
# def multi_class_to_binary(class_true: np.ndarray, class_pred: np.ndarray):
#     # Converting to probablilty that Y_true == 1
#     assert class_true.shape[1] == 2  # 2 classes
#     assert class_pred.shape[1] == 2  # 2 classes
#     assert class_true.shape[0] == class_pred.shape[0]
    
#     y_true = np.argmax(class_true, axis=1)
    
# #     pred_ids = np.argmax(class_pred, axis=1)
# #     y_prob = class_pred[range(class_pred.shape[0]), pred_ids]
#     y_prob = class_pred[:, 1]
#     assert y_true.shape == y_prob.shape
#     return [y_true, y_prob]
    
# [y_true, y_prob] = multi_class_to_binary(Y_test, predictions_test)


In [None]:
print(Yvalid[0:20])
print(np.round(predictions_valid[0:20], 3))
predictions_valid.shape
# print(np.round(y_prob[0:10], 3))

In [None]:
# class_pred = predictions_test
# pred_ids = np.argmax(class_pred, axis=1)
# y_prob = class_pred[range(class_pred.shape[0]), pred_ids]
y_prob = predictions_valid

In [None]:
# reference https://scikit-learn.org/stable/auto_examples/calibration/plot_calibration_curve.html#sphx-glr-auto-examples-calibration-plot-calibration-curve-py

n_bins = 5

print(f'accuracy: {test_acc_str}')
conf_avg = np.average(predictions_test)
conf_avg_str = f'{round(conf_avg * 100, 2)}%'
print(f'confidence avg: {conf_avg_str}')

def draw_confidence_histogram(y_prob, n_bins):
    plt.figure()
    plt.title('Confidence histogram')
    plt.xlabel("Confidence")
    plt.ylabel("Sample count")
    plt.hist(y_prob, bins=n_bins)    
draw_confidence_histogram(y_prob, n_bins=n_bins)

def draw_reliability_curve(y_true, y_prob, n_bins):
    plt.figure()
    plt.title('Reliability curve')
    plt.xlabel("Confidence")
    plt.ylabel("Accuracy")
    [prob_true_bins, prob_pred_bins] = calibration_curve(y_true, y_prob, n_bins=n_bins)

    plt.plot([0, 1], [0, 1], "k:", label="Perfectly calibrated")
    plt.plot(prob_pred_bins, prob_true_bins, marker='s')
draw_reliability_curve(y_true, y_prob, n_bins)
