# <span style="color:#0b486b">  FIT5215: Deep Learning (2021)</span>
***
*CE/Lecturer:*  **Dr Trung Le** | trunglm@monash.edu <br/>
*Head TA:*  **Dr Van Nguyen** | van.nguyen1@monash.edu <br/>
*Tutor:* **Mr Anh Bui** \[tuananh.bui@monash.edu\] | **Mr Tuan Nguyen** \[tuan.ng@monash.edu\] | **Dr Binh Nguyen** \[binh.nguyen1@monash.edu\] | **Dr Mahmoud Mohammad** \[mahmoud.hossam@monash.edu\]
<br/> <br/>
Faculty of Information Technology, Monash University, Australia
***


## Convolutional Neural Network with MiniVGG for Animals Dataset

In [None]:
import tensorflow as tf
from tensorflow.python.ops import control_flow_ops
import cv2
import os
from imutils import paths
import imutils
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

In [None]:
class SimplePreprocessor:
    """Resize an image to a fixed size and swap its first and third channels."""

    def __init__(self, width, height, inter=cv2.INTER_AREA):
        """Store the target size and the interpolation flag used when resizing.

        Args:
            width: Target width passed to ``cv2.resize``.
            height: Target height passed to ``cv2.resize``.
            inter: OpenCV interpolation flag (defaults to ``cv2.INTER_AREA``).
        """
        self.width = width
        self.height = height
        self.inter = inter

    def preprocess(self, image):
        """Return ``image`` resized to ``(width, height)`` with channels 0 and 2 swapped.

        Args:
            image: A three-channel image array.
                NOTE(review): presumably BGR as produced by ``cv2.imread``, which
                makes the result RGB -- confirm against the caller.

        Returns:
            The resized image with its channel order reversed.
        """
        target_size = (self.width, self.height)
        resized = cv2.resize(image, target_size, interpolation=self.inter)
        # Unpacking into exactly three names requires a three-channel image.
        first, second, third = cv2.split(resized)
        return cv2.merge((third, second, first))

In [None]:
class AnimalsDatasetManager:
    """Load images stored one folder per label, then encode, split and batch them.

    Typical call order: ``load`` -> ``process_data_label`` ->
    ``train_valid_test_split`` -> repeated ``next_batch``.
    """

    def __init__(self, preprocessors=None):
        """Create an empty manager.

        Args:
            preprocessors: Optional list of objects exposing
                ``preprocess(image)``; they are applied in order to every
                loaded image. ``None`` means no preprocessing.
        """
        self.cur_pos = 0
        self.preprocessors = preprocessors
        if self.preprocessors is None:
            self.preprocessors = list()

    def load(self, label_folder_dict, max_num_images=500, verbose=-1):
        """Read images from disk into ``self.data`` / ``self.labels``.

        Args:
            label_folder_dict: Mapping ``label -> folder`` to scan for images.
            max_num_images: Maximum number of image paths visited per folder.
            verbose: When positive, print a progress line every ``verbose``
                images.
        """
        data = []
        labels = []
        for label, folder in label_folder_dict.items():
            image_paths = list(paths.list_images(folder))
            print(label, len(image_paths))
            for i, image_path in enumerate(image_paths):
                image = cv2.imread(image_path)
                if image is None:
                    # cv2.imread signals an unreadable/corrupt file with None;
                    # skip it instead of crashing inside a preprocessor.
                    print("Skipping unreadable image: {}".format(image_path))
                else:
                    for p in self.preprocessors:
                        image = p.preprocess(image)
                    data.append(image)
                    labels.append(label)
                if verbose > 0 and i > 0 and (i + 1) % verbose == 0:
                    print("Processed {}/{}".format(i + 1, max_num_images))
                if i + 1 >= max_num_images:
                    break
        self.data = np.array(data)
        self.labels = np.array(labels)
        # Number of loaded images; replaced by the real training-split size
        # once train_valid_test_split() has run.
        self.train_size = int(self.data.shape[0])

    def process_data_label(self):
        """Encode labels as integers and scale the image data by 1/255.

        Sets ``self.classes`` to the label encoder's class names.
        """
        label_encoder = preprocessing.LabelEncoder()
        label_encoder.fit(self.labels)
        self.labels = label_encoder.transform(self.labels)
        self.data = self.data.astype("float") / 255.0
        self.classes = label_encoder.classes_

    def train_valid_test_split(self, train_size=0.8, test_size=0.1, rand_seed=33):
        """Split ``self.data`` / ``self.labels`` into train, validation and test sets.

        The validation fraction is ``1 - (train_size + test_size)``.

        Args:
            train_size: Fraction of the data used for training.
            test_size: Fraction of the data used for testing.
            rand_seed: Seed used for both splits so the partition is
                reproducible.
        """
        valid_size = 1 - (train_size + test_size)
        X1, X_test, y1, y_test = train_test_split(
            self.data, self.labels, test_size=test_size, random_state=rand_seed)
        self.X_test = X_test
        self.y_test = y_test
        # The second split works on the remaining data, so the validation
        # fraction has to be rescaled relative to (valid + train).
        X_train, X_valid, y_train, y_valid = train_test_split(
            X1, y1,
            test_size=float(valid_size) / (valid_size + train_size),
            random_state=rand_seed)
        self.X_train = X_train
        self.y_train = y_train
        self.X_valid = X_valid
        self.y_valid = y_valid
        # Keep the batching state in sync with the actual training split.
        self.train_size = int(X_train.shape[0])
        self.cur_pos = 0

    def next_batch(self, batch_size=32):
        """Return the next ``batch_size`` training samples, wrapping around at the end.

        The wrap-around point is the length of ``self.X_train`` itself, so
        every batch is full even if ``self.train_size`` still holds the total
        number of loaded images.

        Args:
            batch_size: Number of samples to return.

        Returns:
            Tuple ``(x_batch, y_batch)`` sliced from ``X_train`` / ``y_train``.
        """
        num_train = self.X_train.shape[0]
        end_pos = self.cur_pos + batch_size
        if end_pos <= num_train:
            x_batch = self.X_train[self.cur_pos:end_pos]
            y_batch = self.y_train[self.cur_pos:end_pos]
            self.cur_pos = end_pos
        else:
            # Take the tail of the data, then continue from the beginning.
            cur_pos_new = (end_pos - 1) % num_train + 1
            x_batch = np.concatenate(
                (self.X_train[self.cur_pos:num_train], self.X_train[0:cur_pos_new]))
            y_batch = np.concatenate(
                (self.y_train[self.cur_pos:num_train], self.y_train[0:cur_pos_new]))
            self.cur_pos = cur_pos_new
        return x_batch, y_batch
        


In [None]:
def create_label_folder_dict(adir):
    """Map each immediate sub-directory name of ``adir`` to its absolute path.

    Args:
        adir: Directory whose sub-directories are treated as labels.

    Returns:
        Dict ``{sub_folder_name: absolute_path}``; plain files are ignored.
    """
    label_to_path = {}
    for entry in os.listdir(adir):
        candidate = os.path.join(adir, entry)
        if os.path.isdir(candidate):
            label_to_path[entry] = os.path.abspath(candidate)
    return label_to_path

# Build the dataset: one label per sub-folder of ./Data/Animals, images resized to 32x32.
label_folder_dict = create_label_folder_dict("./Data/Animals")
sp = SimplePreprocessor(32, 32)
data_manager = AnimalsDatasetManager(preprocessors=[sp])
data_manager.load(label_folder_dict, verbose=100)
data_manager.process_data_label()
data_manager.train_valid_test_split()

# Report the shape of every split, then the class names found by the label encoder.
for _split in ("train", "valid", "test"):
    print(getattr(data_manager, "X_" + _split).shape,
          getattr(data_manager, "y_" + _split).shape)
print(data_manager.classes)

In [None]:
class Layers:
    """Static helpers that add layer ops to the current TensorFlow graph."""

    @staticmethod
    def dense(inputs, output_size, name="dense1", act=None):
        """Add a fully connected layer ``act(inputs @ W + b)``.

        Args:
            inputs: Tensor whose second dimension is statically known.
            output_size: Number of output units.
            name: Name scope for the created ops and variables.
            act: Optional activation callable; ``None`` returns the linear output.

        Returns:
            The (optionally activated) output tensor.
        """
        with tf.name_scope(name):
            fan_in = int(inputs.get_shape()[1])
            weight_init = tf.random_normal([fan_in, output_size], mean=0, stddev=0.1,
                                           dtype=tf.float32)
            bias_init = tf.random_normal([output_size], mean=0, stddev=0.1,
                                         dtype=tf.float32)
            weights = tf.Variable(weight_init, name="W")
            biases = tf.Variable(bias_init, name="b")
            pre_activation = tf.matmul(inputs, weights) + biases
            return pre_activation if act is None else act(pre_activation)

    @staticmethod
    def conv2D(inputs, filter_shape, strides=[1,1,1,1], padding="SAME", name="conv1", act=None):
        """Add a 2-D convolution with bias and optional activation.

        Args:
            inputs: Input tensor handed to ``tf.nn.conv2d``.
            filter_shape: Shape of the filter variable; its fourth entry sizes the bias.
            strides: Strides handed to ``tf.nn.conv2d``.
            padding: Padding mode handed to ``tf.nn.conv2d``.
            name: Name scope for the created ops and variables.
            act: Optional activation callable; ``None`` returns the linear output.

        Returns:
            The (optionally activated) output tensor.
        """
        with tf.name_scope(name):
            kernel_init = tf.random_normal(filter_shape, mean=0, stddev=0.1, dtype=tf.float32)
            kernel = tf.Variable(kernel_init, name="W")
            num_filters = int(filter_shape[3])
            bias_init = tf.random_normal([num_filters], mean=0, stddev=0.1, dtype=tf.float32)
            biases = tf.Variable(bias_init, name="b")
            pre_activation = tf.nn.conv2d(input=inputs, filter=kernel, strides=strides,
                                          padding=padding) + biases
            return pre_activation if act is None else act(pre_activation)

    @staticmethod
    def max_pool(inputs, ksize=[1,2,2,1], strides=[1,2,2,1], padding="SAME"):
        """Forward to ``tf.nn.max_pool`` with the given window, strides and padding."""
        pooled = tf.nn.max_pool(value=inputs, ksize=ksize, strides=strides, padding=padding)
        return pooled

    @staticmethod
    def dropout(inputs, keep_prob):
        """Forward to ``tf.nn.dropout`` with the given ``keep_prob``."""
        dropped = tf.nn.dropout(inputs, keep_prob=keep_prob)
        return dropped

    @staticmethod
    def batch_norm(inputs, phase_train):
        """Forward to ``tf.contrib.layers.batch_norm``.

        Uses decay 0.99 with centering and scaling enabled; ``phase_train``
        is passed as ``is_training``.
        """
        return tf.contrib.layers.batch_norm(
            inputs,
            decay=0.99,
            is_training=phase_train,
            center=True,
            scale=True,
            reuse=False,
        )
         

In [None]:
class MiniVGGNet():
    """MiniVGG-style CNN classifier built on the TensorFlow 1.x graph API.

    Architecture: (conv-conv-pool-dropout) x 2 -> dense(512) -> dropout ->
    dense(num_classes). Call ``build()`` once before training or predicting.
    The instance owns a ``tf.Session``; release it with ``close()`` or by
    using the instance in a ``with`` statement.
    """

    def __init__(self, width=32, height=32, depth=3, num_classes=4, keep_prob=0.8,
                 batch_size=10, epochs=20, optimizer=None, learning_rate=0.0001):
        """Reset the default graph, store hyper-parameters and open a session.

        Args:
            width: Input image width.
            height: Input image height.
            depth: Number of input channels.
            num_classes: Number of output classes.
            keep_prob: Dropout keep probability fed during training.
            batch_size: Stored for reference; not used by this class.
            epochs: Stored for reference; not used by this class.
            optimizer: Optimizer instance to use. ``None`` creates a fresh
                ``tf.train.AdamOptimizer(learning_rate)`` per network instead
                of sharing one built at class-definition time.
            learning_rate: Learning rate of the default optimizer; ignored
                when ``optimizer`` is supplied.
        """
        tf.reset_default_graph()
        self.width = width
        self.height = height
        self.depth = depth
        self.num_classes = num_classes
        self.keep_prob = keep_prob
        self.batch_size = batch_size
        self.epochs = epochs
        self.learning_rate = learning_rate
        if optimizer is None:
            # Created after the graph reset so learning_rate actually takes effect.
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
        self.optimizer = optimizer
        self.session = tf.Session()

    def build(self):
        """Construct the graph (placeholders, layers, loss, metrics) and initialise variables."""
        self.X = tf.placeholder(shape=[None, self.height, self.width, self.depth], dtype=tf.float32)
        self.y = tf.placeholder(shape=[None], dtype=tf.int64)
        self.keep_prob_holder = tf.placeholder(dtype=tf.float32)
        # NOTE: batch normalisation (Layers.batch_norm) is currently not applied;
        # phase_train is still fed so it can be inserted after any conv/dense layer.
        self.phase_train = tf.placeholder(dtype=tf.bool)

        # Input channel count follows self.depth instead of a hard-coded 3.
        conv1 = Layers.conv2D(inputs=self.X, filter_shape=[3, 3, self.depth, 32], act=tf.nn.relu)
        conv2 = Layers.conv2D(conv1, [3, 3, 32, 32], act=tf.nn.relu)
        pool2 = Layers.max_pool(conv2)
        pool2_drop = Layers.dropout(pool2, self.keep_prob_holder)

        conv3 = Layers.conv2D(pool2_drop, [3, 3, 32, 64], act=tf.nn.relu)
        conv4 = Layers.conv2D(conv3, [3, 3, 64, 64], act=tf.nn.relu)
        pool4 = Layers.max_pool(conv4)
        pool4_drop = Layers.dropout(pool4, self.keep_prob_holder)

        # Derive the flattened size from the static shape so that input sizes
        # other than 32x32 work (8*8*64 is only correct for 32x32 inputs).
        flat_dim = 1
        for dim in pool4_drop.get_shape().as_list()[1:]:
            flat_dim *= int(dim)
        pool4_flat = tf.reshape(pool4_drop, [-1, flat_dim])

        full5 = Layers.dense(pool4_flat, 512, act=tf.nn.relu)
        # Dropout must be applied to full5; applying it to pool4_flat would
        # bypass the 512-unit dense layer entirely.
        full5_drop = Layers.dropout(full5, self.keep_prob_holder)
        self.outputs = Layers.dense(full5_drop, self.num_classes)

        with tf.name_scope("train"):
            cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
                labels=self.y, logits=self.outputs)
            self.loss = tf.reduce_mean(cross_entropy)
            self.train = self.optimizer.minimize(self.loss)
        with tf.name_scope("predict"):
            self.y_pred = tf.argmax(self.outputs, 1)
            corrections = tf.equal(self.y_pred, self.y)
            self.accuracy = tf.reduce_mean(tf.cast(corrections, tf.float32))
        self.session.run(tf.global_variables_initializer())

    def partial_fit(self, X_batch, y_batch):
        """Run one optimisation step on a batch, with dropout enabled."""
        self.session.run(
            [self.train],
            feed_dict={self.X: X_batch, self.y: y_batch,
                       self.keep_prob_holder: self.keep_prob, self.phase_train: True})

    def predict(self, X, y):
        """Return ``(predicted_labels, accuracy)`` for ``X`` against labels ``y``.

        Dropout is disabled by feeding a keep probability of 1.
        """
        y_pred, acc = self.session.run(
            [self.y_pred, self.accuracy],
            feed_dict={self.X: X, self.y: y, self.keep_prob_holder: 1,
                       self.phase_train: False})
        return y_pred, acc

    def compute_acc_loss(self, X, y):
        """Return ``(loss, accuracy)`` for ``X`` / ``y`` with dropout disabled."""
        loss, acc = self.session.run(
            [self.loss, self.accuracy],
            feed_dict={self.X: X, self.y: y, self.keep_prob_holder: 1,
                       self.phase_train: False})
        return loss, acc

    def close(self):
        """Close the underlying TensorFlow session."""
        self.session.close()

    def __enter__(self):
        """Allow ``with MiniVGGNet(...) as net:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session when leaving a ``with`` block."""
        self.session.close()
        

In [None]:
# Train MiniVGGNet on the training split, report loss/accuracy on the training
# and validation splits after every epoch, then evaluate on the test split.
batch_size = 10
epochs = 100
num_classes = len(data_manager.classes)
training_size = data_manager.X_train.shape[0]
# Ceiling division: exactly enough batches to cover the training set once.
# int(training_size / batch_size) + 1 would run one extra, wrapped-around
# batch whenever training_size is an exact multiple of batch_size.
iter_per_epoch = (training_size + batch_size - 1) // batch_size
network = MiniVGGNet(batch_size=batch_size, epochs=epochs, num_classes=num_classes)
network.build()
for epoch in range(epochs):
    for i in range(iter_per_epoch):
        X_batch, y_batch = data_manager.next_batch(batch_size)
        network.partial_fit(X_batch, y_batch)
    train_loss, train_acc = network.compute_acc_loss(data_manager.X_train, data_manager.y_train)
    val_loss, val_acc = network.compute_acc_loss(data_manager.X_valid, data_manager.y_valid)
    print("Epoch {}: train loss={}, val loss={}\n".format(epoch, train_loss, val_loss))
    print("########: train acc={}, val acc={}\n".format(train_acc, val_acc))
print("Finish training and come to testing")
# NOTE: despite its name, y_test receives the labels predicted for the test set.
y_test, acc = network.predict(data_manager.X_test, data_manager.y_test)
print("Testing accuracy= {}".format(acc))