In [1]:
import tensorflow as tf
import keras
from keras import Input, Model
from keras.layers import UpSampling2D, add
from keras.layers import Input, BatchNormalization, Activation, Dense, Dropout, LeakyReLU, GlobalAveragePooling2D
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Conv2D, MaxPooling2D, ZeroPadding2D
from keras.optimizers import SGD, rmsprop
from keras.activations import relu, softmax, sigmoid
from keras.losses import binary_crossentropy
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.utils import np_utils
from keras.utils import to_categorical
from keras.datasets import cifar10
from keras.applications.resnet50 import ResNet50
from keras import regularizers
from keras.callbacks import LearningRateScheduler, ModelCheckpoint
import matplotlib.pyplot as plt
import matplotlib
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # Disable warning message of tensorflow
import numpy as np
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
#from sklearn.metrics import classification_report,confusion_matrix
import csv
import cv2

Using TensorFlow backend.


In [2]:
# Load X data
path1 = './train_64/'
#X_data = os.listdir(path1)
#X_data = sorted(X_data)
X_data = []
Y_data = []
with open('annot_train.csv', 'r') as csvFile:
    reader = csv.reader(csvFile)
    for row in reader:
        X_data.append(row[0])
        Y_data.append(row[2])
csvFile.close()

X_data = X_data[1:len(X_data)]

X = np.array([np.array(cv2.imread(path1 + str(img),0)).flatten() for img in X_data],'f') 
X = np.array(X)

Y_data = Y_data[1:len(Y_data)]
Y = np.array(Y_data)

In [3]:
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, random_state = 4)
X_train = X_train.reshape(X_train.shape[0], 64, 64, 1)
X_val = X_val.reshape(X_val.shape[0], 64, 64, 1)

# Assigning X_train and X_val as float
X_train = X_train.astype('float32') 
X_val = X_val.astype('float32')

# Normalization of data 
# Data pixels are between 0 and 1
X_train /= 255
X_val /= 255

Y_train = Y_train.astype('int64')
Y_val = Y_val.astype('int64')
# Y_train = np_utils.to_categorical(Y_train, 2)
# Y_val = np_utils.to_categorical(Y_val, 2)

In [4]:
def block(n_output, upscale=False):
    # n_output: number of feature maps in the block
    # upscale: should we use the 1x1 conv2d mapping for shortcut or not
    
    # keras functional api: return the function of type
    # Tensor -> Tensor
    def f(x):
        
        # H_l(x):
        # first pre-activation
        h = BatchNormalization()(x)
        h = Activation(sigmoid)(h)
        # first convolution
        h = Conv2D(kernel_size=3, filters=n_output, strides=1, padding='same', kernel_regularizer=regularizers.l2(0.01))(h)
        
        # second pre-activation
        h = BatchNormalization()(x)
        h = Activation(sigmoid)(h)
        # second convolution
        h = Conv2D(kernel_size=3, filters=n_output, strides=1, padding='same', kernel_regularizer=regularizers.l2(0.01))(h)
        
        # f(x):
        if upscale:
            # 1x1 conv2d
            f = Conv2D(kernel_size=1, filters=n_output, strides=1, padding='same')(x)
        else:
            # identity
            f = x
        
        # F_l(x) = f(x) + H_l(x):
        return add([f, h])
    
    return f

In [5]:
input_tensor = Input((64, 64, 1))

# first conv2d with post-activation to transform the input data to some reasonable form
x = Conv2D(kernel_size=3, filters=8, strides=1, padding='same', kernel_regularizer=regularizers.l2(0.01))(input_tensor)
x = BatchNormalization()(x)
x = Activation(sigmoid)(x)

# F_1
x = block(8)(x)
# F_2
x = block(8)(x)

# F_3
# H_3 is the function from the tensor of size 28x28x16 to the the tensor of size 28x28x32
# and we can't add together tensors of inconsistent sizes, so we use upscale=True
x = block(16, upscale=True)(x)       # !!! <------- Uncomment for local evaluation
# F_4
x = block(16)(x)                     # !!! <------- Uncomment for local evaluation
# F_5
x = block(16)(x)                     # !!! <------- Uncomment for local evaluation

# F_6
x = block(24, upscale=True)(x)       # !!! <------- Uncomment for local evaluation
# F_7
x = block(24)(x)                     # !!! <------- Uncomment for local evaluation

# last activation of the entire network's output
x = BatchNormalization()(x)
x = Activation(sigmoid)(x)

# average pooling across the channels
# 28x28x48 -> 1x48
x = GlobalAveragePooling2D()(x)

# dropout for more robust learning
x = Dropout(0.2)(x)

# last softmax layer
x = Dense(units=1, kernel_regularizer=regularizers.l2(0.01))(x)
out1 = Activation(sigmoid)(x)


In [None]:
# Optimizer used is Stochastic Gradient Descent 
# Loss is calculated using categorical cross entropy
model = Model(inputs=input_tensor, outputs=out1)
model.compile(loss='binary_crossentropy', optimizer='SGD', metrics=['accuracy'])

def sigmoidal_decay(e, start=0, end=20, lr_start=0.001, lr_end=1e-5):
    if e < start:
        return lr_start
    
    if e > end:
        return lr_end
    
    middle = (start + end) / 2
    s = lambda x: 1 / (1 + np.exp(-x))
    
    return s(13 * (-e + middle) / np.abs(end - start)) * np.abs(lr_start - lr_end) + lr_end

lr = LearningRateScheduler(lambda e: sigmoidal_decay(e, end=100))

results = model.fit(X_train, Y_train, epochs=20, validation_data=(X_val, Y_val), batch_size=256, callbacks=[lr])

Train on 26088 samples, validate on 6523 samples
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20

In [None]:
# visualizing losses and accuracy
train_loss = results.history['loss']
val_loss = results.history['val_loss'] 
train_acc = results.history['acc']
val_acc = results.history['val_acc']

plt.figure(figsize=(8, 8))
plt.title("Learning curve")
plt.plot(results.history["loss"], label="loss")
plt.plot(results.history["val_loss"], label="val_loss")
plt.plot( np.argmin(results.history["val_loss"]), np.min(results.history["val_loss"]), marker="x", color="r", label="best model")
plt.xlabel("Epochs")
plt.ylabel("loss")
plt.legend();

In [None]:
# Testing
# Load X data
path1 = './test_64/'
#X_data = os.listdir(path1)
#X_data = sorted(X_data)
X_data = []
Y_data = []
with open('annot_test.csv', 'r') as csvFile:
    reader = csv.reader(csvFile)
    for row in reader:
        X_data.append(row[0])
        Y_data.append(row[1])
csvFile.close()

X_data = X_data[1:len(X_data)]

X = np.array([np.array(cv2.imread(path1 + str(img),0)).flatten() for img in X_data],'f') 
X_test = np.array(X)

Y_data = Y_data[1:len(Y_data)]
Y_test = np.array(Y_data)


X_test = X_test.reshape(X_test.shape[0], 64, 64, 1)

# Assigning X_train and X_test as float
X_test = X_test.astype('float32') 

# Normalization of data 
# Data pixels are between 0 and 1
X_test /= 255

# Y_test = np_utils.to_categorical(Y_test, 2)

Y_preds = model.predict(X_test)


In [None]:
#print(Y_preds,Y_test)
score = model.evaluate(X_test, Y_test, verbose=0)
# print(score)
pred = model.predict_classes(X_test[5])
print(pred,np.argmax(Y_test[5]))