# Convolutional Neural Network


## Imports and paths

In [None]:
# standard packages used to handle files
import sys
import os 
import glob
import time

import tensorflow
# commonly used library for data manipulation

import pandas
# numerical
import numpy as np

# handle images - opencv
import cv2

# machine learning library
import sklearn
import sklearn.preprocessing


#used to serialize python objects to disk and load them back to memory
import pickle

# helper functions kindly provided for you by Matthias 
import helpers

#plotting
import matplotlib.pyplot as plt

import random

# tell matplotlib that we plot in a notebook
%matplotlib notebook


# Root locations: where the dataset lives and where this notebook writes output.
DATA_BASE_PATH = '../individual/data/data/'
OUTPUT_PATH = './'

# Each dataset split is a sub-folder of the data root.
DATA_TRAIN_PATH, DATA_TEST_PATH, DATA_TEST_GROUPED_PATH = (
    os.path.join(DATA_BASE_PATH, split_name)
    for split_name in ('train', 'test', 'test_grouped')
)

# Prediction files for submission are collected below the output root.
PREDICTION_PATH = os.path.join(OUTPUT_PATH, 'predictions')



In [None]:
# Every entry directly below the training directory is treated as one class;
# its base name is the class label.
folder_paths = glob.glob(os.path.join(DATA_TRAIN_PATH, '*'))
label_strings = np.sort(np.array(list(map(os.path.basename, folder_paths))))
num_classes = len(label_strings)

print(label_strings)
print(num_classes)


In [None]:
# Map each class label to the image paths found in its training folder.
train_paths = {
    label_string: helpers.getImgPaths(os.path.join(DATA_TRAIN_PATH, label_string))
    for label_string in label_strings
}

# Same mapping for the test images that are grouped by class.
test_grouped_paths = {
    label_string: helpers.getImgPaths(os.path.join(DATA_TEST_GROUPED_PATH, label_string))
    for label_string in label_strings
}

# Flat list of the ungrouped test images.
test_paths = helpers.getImgPaths(DATA_TEST_PATH)

# Background removal

In [None]:
import skimage


def maskForeground(image_array):
    """Estimate a binary foreground mask for an image using GrabCut.

    GrabCut is run on a half-resolution copy of the image and the resulting
    mask is scaled back up to the size of the input.

    Args:
        image_array: Image array indexed as (rows, cols, channels), in a
            format accepted by ``cv2.grabCut``.

    Returns:
        A tuple ``(image_array, mask)``. ``image_array`` is the input,
        returned unchanged. ``mask`` is a ``uint8`` array of shape
        (rows, cols) holding 1 for foreground and 0 for background, or
        ``None`` when the image is too small for the initial rectangle or
        when the foreground covers less than 5 % or more than 55 % of the
        image.
    """
    height, width = image_array.shape[:2]
    small_width, small_height = width // 2, height // 2

    # The initial rectangle leaves a 25 px border in full-resolution
    # coordinates. GrabCut runs on the half-resolution image, so the border
    # has to be expressed in that image's coordinates.
    margin = 25 // 2
    rect_width = small_width - 2 * margin
    rect_height = small_height - 2 * margin
    if rect_width <= 0 or rect_height <= 0:
        # Image too small to place the rectangle: report "no usable mask".
        return image_array, None

    # cv2 sizes are given as (width, height).
    image_small = cv2.resize(image_array, (small_width, small_height),
                             interpolation=cv2.INTER_LINEAR)
    mask_small = np.zeros(image_small.shape[:2], np.uint8)

    # Scratch model buffers in the shape used for cv2.grabCut.
    bgdModel = np.zeros((1, 65), np.float64)
    fgdModel = np.zeros((1, 65), np.float64)

    rect = (margin, margin, rect_width, rect_height)
    cv2.grabCut(image_small, mask_small, rect, bgdModel, fgdModel, 5,
                cv2.GC_INIT_WITH_RECT)

    # Collapse the four GrabCut labels to binary: (probable) background -> 0,
    # (probable) foreground -> 1.
    is_background = (mask_small == cv2.GC_BGD) | (mask_small == cv2.GC_PR_BGD)
    mask_small = np.where(is_background, 0, 1).astype('uint8')

    mask = cv2.resize(mask_small, (width, height), interpolation=cv2.INTER_LINEAR)

    # Fraction of pixels labelled foreground; extreme values are treated as
    # a failed segmentation.
    foreground_fraction = np.sum(mask) / (mask.shape[0] * mask.shape[1])
    if foreground_fraction < 0.05 or foreground_fraction > 0.55:
        mask = None

    return image_array, mask

## Image Preprocessing

In [None]:
from PIL import Image
def process_image_with_background(image_path):
    """Load an image as a 256x256 RGB float32 array scaled to [0, 1].

    Args:
        image_path: Path of the image file to load.

    Returns:
        ``float32`` array of shape (256, 256, 3) with values in [0, 1].
    """
    # The context manager closes the underlying file as soon as the pixels
    # have been read.
    with Image.open(image_path) as source:
        # Force three channels so grayscale / RGBA / palette files produce
        # the same array shape as RGB files. LANCZOS is the filter that
        # ANTIALIAS was an alias for.
        resized = source.convert('RGB').resize((256, 256), Image.LANCZOS)

    return np.array(np.array(resized) / 255, dtype='f')


In [None]:
from PIL import Image
def process_image(image_path, remove_background=True):
    """Load an image at 256x256 and optionally mask out its background.

    Args:
        image_path: Path of the image file to load.
        remove_background: When False, only the scaled image is returned.
            When True, the foreground mask from ``maskForeground`` is
            applied as well.

    Returns:
        * ``remove_background=False``: ``float32`` array of shape
          (256, 256, 3) with values in [0, 1].
        * ``remove_background=True``: tuple ``(X, with_background)`` where
          ``X`` is the masked image and ``with_background`` the unmasked
          one, both ``float32`` in [0, 1]. If no mask is available, ``X``
          is the unmasked image. ``None`` is returned if ``maskForeground``
          yields no image.
    """
    # The context manager closes the underlying file as soon as the pixels
    # have been read.
    with Image.open(image_path) as source:
        # Force three channels so every file yields a (256, 256, 3) array.
        # LANCZOS is the filter that ANTIALIAS was an alias for.
        im = np.array(source.convert('RGB').resize((256, 256), Image.LANCZOS))

    with_background = np.array(im / 255, dtype='f')
    if not remove_background:
        return with_background

    img, mask = maskForeground(im)
    if img is None:
        return None

    # Broadcast the 2-D mask over the colour channels; without a mask the
    # image is used as is.
    masked_img = img if mask is None else img * mask[:, :, np.newaxis]

    X = np.array(masked_img / 255, dtype='f')
    return (X, with_background)





In [None]:
from multiprocessing.dummy import Pool as ThreadPool

# Training images (X) and their string class labels (y).
X = []
y = []

# One thread pool is shared by all classes; the with-block shuts its worker
# threads down once loading is finished.
with ThreadPool(8) as pool:
    for animal in label_strings:
        paths = train_paths[animal]
        results = pool.map(process_image, paths)

        for result in results:
            if result is not None:
                # process_image returns (masked, with_background); the
                # unmasked image at index 1 is the one kept for training.
                X.append(result[1])
                y.append(animal)


In [None]:
# Images and string labels of the test set that is grouped by class.
# NOTE(review): X_test / y_test are reassigned by the train/validation split
# further down, so these values are discarded when the notebook is run top
# to bottom - confirm which set is meant to be used for evaluation.
X_test = []
y_test = []

# One thread pool is shared by all classes; the with-block shuts its worker
# threads down once loading is finished.
with ThreadPool(8) as pool:
    for animal in label_strings:
        paths = test_grouped_paths[animal]
        results = pool.map(process_image_with_background, paths)

        for result in results:
            if result is not None:
                X_test.append(result)
                y_test.append(animal)


# Use a labelencoder to obtain numerical labels

In [None]:

# Keep a reference to the string labels before y is replaced by integer ids.
y_labels = y

# Learn the label -> integer mapping and apply it in one step.
label_encoder = sklearn.preprocessing.LabelEncoder()
y = label_encoder.fit_transform(y)


## Split training and validation data

In [None]:
from sklearn.model_selection import train_test_split


X, y = np.array(X), np.array(y)

# Hold out 30 % of the samples; the fixed seed keeps the partition
# reproducible between runs.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=123456
)


## Convert to categorical arrays

In [None]:

from keras.utils import to_categorical


# Keep the partition produced by train_test_split. Building the training
# arrays from the full X / y would put every validation sample into the
# training set and make the validation metrics meaningless.
X_train = np.array(X_train)
y_train = np.array(y_train)
X_test = np.array(X_test)
y_test = np.array(y_test)

# One-hot encode the integer class ids, as required by the
# categorical_crossentropy loss.
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)

## Training

In [None]:
import keras
from keras.models import Sequential
from keras.layers import Dense, Conv2D, MaxPooling2D, Dropout, Flatten
from keras_preprocessing.image import ImageDataGenerator
from sklearn.utils import class_weight

batch_size = 64
epochs = 50

# Augmentation applied to the training stream.
# zca_whitening stays off: this generator is never fit() on the data, and an
# unfitted generator cannot whiten anything.
# NOTE(review): rotation_range is given in degrees, so 0.10 is close to no
# rotation at all - confirm the intended value.
datagen = ImageDataGenerator(
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
    rotation_range=0.10,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    vertical_flip=False,
    fill_mode='nearest',
    shear_range=0.2,
)
train_generator = datagen.flow(X_train, y_train, batch_size=batch_size, shuffle=True)

# Validation images are passed through unchanged. A default generator has no
# statistics to learn, so it needs no fit() call.
test_datagen = ImageDataGenerator()
validation_generator = test_datagen.flow(X_test, y_test, batch_size=batch_size)


# Three blocks of (Conv, Conv, MaxPool, Dropout) followed by a dense classifier.
model = keras.Sequential()
model.add(Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(256, 256, 3)))
model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(32, (3, 3), padding='same', activation='relu'))
model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.5))

model.add(Conv2D(64, (3, 3), padding='same', activation='relu'))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.5))

model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
# The output width must match the one-hot label width (num_classes).
model.add(Dense(num_classes, activation='softmax'))

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Keyword arguments: recent scikit-learn versions reject positional
# classes / y.
# NOTE(review): class_weights is computed but not passed to the training
# call below - confirm whether it should be supplied as class_weight=...
class_weights = class_weight.compute_class_weight('balanced',
                                                  classes=np.unique(y),
                                                  y=y)

# steps_per_epoch is three times the number of batches in the training set,
# so every epoch draws roughly three augmented passes over the data.
history = model.fit_generator(
    train_generator,
    epochs=epochs,
    validation_steps=len(X_test) // batch_size,
    steps_per_epoch=len(X_train) // batch_size * 3,
    validation_data=validation_generator,
    workers=8)


## Plot learning curves

In [None]:
import matplotlib.pyplot as plt

print(history.history.keys())

# Older Keras releases log accuracy as 'acc' / 'val_acc', newer ones as
# 'accuracy' / 'val_accuracy'; use whichever key is present.
accuracy_key = 'acc' if 'acc' in history.history else 'accuracy'

# One figure per metric: training curve and validation curve.
for metric_key, metric_name in ((accuracy_key, 'accuracy'), ('loss', 'loss')):
    # A fresh figure keeps the second plot from being drawn into the first.
    plt.figure()
    plt.plot(history.history[metric_key])
    plt.plot(history.history['val_' + metric_key])
    plt.title('model ' + metric_name)
    plt.ylabel(metric_name)
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

## Save model to disk

In [None]:
# The model is persisted as two files: architecture (JSON) and weights (HDF5).
model_json = model.to_json()
with open("model_final.json", mode="w") as json_file:
    json_file.write(model_json)

model.save_weights("model_final.h5")

print("Saved model to disk")

## Load model from disk

In [None]:
# load json and create model
from keras.models import model_from_json
import keras
# Rebuild the architecture from its JSON description; the with-block closes
# the file even if reading fails.
with open('model_final.json', 'r') as json_file:
    loaded_model_json = json_file.read()
loaded_model = model_from_json(loaded_model_json)

# load weights into new model
loaded_model.load_weights("model_final.h5")
print("Loaded model from disk")

# A model restored from JSON + weights carries no training configuration,
# so it is compiled again here.
model = loaded_model
optimizer = keras.optimizers.RMSprop(lr=0.0008)
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])


## Load Test images

In [None]:

from PIL import Image

# Load the unlabeled test images as 256x256 RGB float arrays in [0, 1].
# NOTE(review): this replaces the validation images held in X_test while
# y_test still holds the validation labels, so the confusion-matrix cell
# below compares unrelated arrays when the notebook is run top to bottom.
X_test = []
for image in test_paths:
    # The context manager closes the file once the pixels have been read.
    with Image.open(image) as source:
        # Force three channels so every file yields a (256, 256, 3) array.
        # LANCZOS is the filter that ANTIALIAS was an alias for.
        im = source.convert('RGB').resize((256, 256), Image.LANCZOS)
    X_test.append(np.array(im, dtype='f') / 255)
    

## Plot confusion matrix

In [None]:
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# For this softmax model predict() already returns the class probabilities.
pred = model.predict(np.array(X_test))

# Passing every class id explicitly keeps the matrix num_classes x num_classes
# even when a class never occurs, so it always lines up with label_strings.
# NOTE(review): X_test and y_test must describe the same samples here -
# verify that X_test has not been replaced since y_test was built.
cm = confusion_matrix(np.argmax(y_test, axis=1),
                      np.argmax(pred, axis=1),
                      labels=np.arange(num_classes))

# Row-normalise so each row shows how one true class is distributed over the
# predictions; rows without any samples stay at 0 instead of dividing by 0.
row_totals = cm.sum(axis=1, keepdims=True)
cm_nor = np.divide(cm.astype('float'), row_totals,
                   out=np.zeros(cm.shape, dtype='float'),
                   where=row_totals != 0).round(decimals=2)

df_cm = pd.DataFrame(cm_nor, label_strings, label_strings)
plt.figure(figsize=(10, 7))
sn.set(font_scale=1.4)  # for label size
sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}, cmap="Blues")  # font size

## Predict

In [None]:
# Class probabilities for every image in X_test; for this softmax model
# predict() returns them directly.
predictions = model.predict(np.array(X_test))

In [None]:

# Build a submission
# Make sure the output directory exists before a file is written into it.
os.makedirs(PREDICTION_PATH, exist_ok=True)
pred_file_path = os.path.join(PREDICTION_PATH, helpers.generateUniqueFilename('prediction','csv'))
helpers.writePredictionsToCsv(predictions,pred_file_path,label_strings)