# Deep Neural Network (DNN)
- MNIST Database
- Function: Sigmoid vs. ReLU func
- Initialization: RandomNormal vs. Xavier
- Dropout vs. Batch Normalization

In [1]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.utils import to_categorical    # converts a class vector (integers) to binary class matrix
from tensorflow.keras.datasets import mnist
from time import time
import os
print(tf.__version__)

2.3.0


### Checkpoint function

In [2]:
def load(model, checkpoint_dir):
    print(" [*] Reading checkpoints...")

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if ckpt :
        ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
        checkpoint = tf.train.Checkpoint(dnn=model)
        checkpoint.restore(save_path=os.path.join(checkpoint_dir, ckpt_name))
        counter = int(ckpt_name.split('-')[1])
        print(" [*] Success to read {}".format(ckpt_name))
        return True, counter
    else:
        print(" [*] Failed to find a checkpoint")
        return False, 0

def check_folder(dir):
    if not os.path.exists(dir):
        os.makedirs(dir)
    return dir

### Data load & pre-processing function

In [3]:
def load_mnist() :
    (train_data, train_labels), (test_data, test_labels) = mnist.load_data()
    train_data = np.expand_dims(train_data, axis=-1)    # [N, 28, 28] -> [N, 28, 28, 1]
    test_data = np.expand_dims(test_data, axis=-1)    # [N, 28, 28] -> [N, 28, 28, 1]

    train_data, test_data = normalize(train_data, test_data)

    train_labels = to_categorical(train_labels, 10)    # [N,] -> [N, 10]
    test_labels = to_categorical(test_labels, 10)    # [N,] -> [N, 10]

    return train_data, train_labels, test_data, test_labels

def normalize(train_data, test_data):
    train_data = train_data.astype(np.float32) / 255.0    # 명암이 0~255의 값으로 표현되므로
    test_data = test_data.astype(np.float32) / 255.0

    return train_data, test_data

### Performance function

In [4]:
# Cross-entropy loss: log loss, measures the performance of a classification model whose output is a probability value between 0 and 1.
def loss_fn(model, images, labels):
    logits = model(images, training=True)    # True: dropout on
    loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(y_true=labels, y_pred=logits, from_logits=True))
    return loss

def accuracy_fn(model, images, labels):
    logits = model(images, training=False)    # False: dropout off
    prediction = tf.equal(tf.argmax(labels, -1), tf.argmax(logits, -1))
    accuracy = tf.reduce_mean(tf.cast(prediction, tf.float32))
    return accuracy

def grad(model, images, labels):
    with tf.GradientTape() as tape:
        loss = loss_fn(model, images, labels)
    return tape.gradient(loss, model.variables)

### Model function

In [5]:
def flatten() :
    return tf.keras.layers.Flatten()

# to use fully-connected layer
def dense(label_dim, weight_init) :
    return tf.keras.layers.Dense(units=label_dim, use_bias=True, kernel_initializer=weight_init)

# Sigmoid func
def sigmoid() :
    return tf.keras.layers.Activation(tf.keras.activations.sigmoid)

# ReLU func
def relu() :
    return tf.keras.layers.Activation(tf.keras.activations.relu)

# Dropout
def dropout(rate):
    return tf.keras.layers.Dropout(rate)

# Batch Normalization
def batch_norm():
    return tf.keras.layers.BatchNormalization()

### Create model (class version)

In [6]:
# class create_model_class(tf.keras.Model):
#     def __init__(self, label_dim):
#         super(create_model_class, self).__init__()
#         weight_init = tf.keras.initializers.RandomNormal()
#         self.model = tf.keras.Sequential()
        
#         self.model.add(flatten())    # [N, 28, 28, 1] -> [N, 784]

#         for i in range(2):
#             # [N, 784] -> [N, 256] -> [N, 256]
#             self.model.add(dense(256, weight_init))
#             self.model.add(sigmoid())
#             self.model.add(dropout(rate=0.5))    # dropout

#         self.model.add(dense(label_dim, weight_init))    # [N, 256] -> [N, 10]

#     def call(self, x, training=None, mask=None):
#         x = self.model(x)

#         return x

### Create model (function version)

In [7]:
def create_model_function(label_dim) :
#     weight_init = tf.keras.initializers.RandomNormal()
    weight_init = tf.keras.initializers.glorot_uniform()    # Xavier initialization

    model = tf.keras.Sequential()
    model.add(flatten())    # [N, 28, 28, 1] -> [N, 784]

    for i in range(2) :
        # [N, 784] -> [N, 256] -> [N, 256]
        '''layer -> normalization -> activation 순서가 일반적'''
        model.add(dense(256, weight_init))
#         model.add(batch_norm())
#         model.add(sigmoid())
        model.add(relu())
#         model.add(dropout(rate=0.5))

    model.add(dense(label_dim, weight_init))

    return model

### Define data & hyper-parameter

In [8]:
""" dataset """
train_x, train_y, test_x, test_y = load_mnist()

""" parameters """
learning_rate = 0.001
batch_size = 128

training_epochs = 1
training_iterations = len(train_x) // batch_size    # 60,000 / 128 = 468.75

label_dim = 10

train_flag = True

""" Graph Input using Dataset API """
train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).\
    shuffle(buffer_size=100000).\
    prefetch(buffer_size=batch_size).\
    batch(batch_size, drop_remainder=True)

test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y)).\
    shuffle(buffer_size=100000).\
    prefetch(buffer_size=len(test_x)).\
    batch(len(test_x))

### Define model & optimizer

In [9]:
""" Model """
network = create_model_function(label_dim)

""" Training """
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

### Writer

In [10]:
# checkpoint_dir = 'checkpoints'
# logs_dir = 'logs'
# model_dir = 'nn_softmax'

# checkpoint_dir = os.path.join(checkpoint_dir, model_dir)
# check_folder(checkpoint_dir)
# checkpoint_prefix = os.path.join(checkpoint_dir, model_dir)
# logs_dir = os.path.join(logs_dir, model_dir)

### Restore checkpoint & start train or test phase

In [11]:
'''NO CHECKPOINT'''

start_epoch = 0
start_iteration = 0

start_time = time()
for epoch in range(start_epoch, training_epochs):
    for idx, (train_input, train_label) in enumerate(train_dataset):            
        grads = grad(network, train_input, train_label)
        optimizer.apply_gradients(grads_and_vars=zip(grads, network.variables))

        train_loss = loss_fn(network, train_input, train_label)
        train_accuracy = accuracy_fn(network, train_input, train_label)

        for test_input, test_label in test_dataset:                
            test_accuracy = accuracy_fn(network, test_input, test_label)
        
#         print(
#             "Epoch: [%2d] [%5d/%5d] time: %4.2f, train_loss: %.4f, train_accuracy: %.4f, test_Accuracy: %.4f" \
#             % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy, test_accuracy))
        
        if idx % 10 == 0:
            print(
                "Epoch: [%2d] [%5d/%5d] time: %4.2f, train_loss: %.4f, train_accuracy: %.4f, test_Accuracy: %.4f" \
                % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy, test_accuracy))
        elif idx == training_iterations - 1:
            print(
                "Epoch: [%2d] [%5d/%5d] time: %4.2f, train_loss: %.4f, train_accuracy: %.4f, test_Accuracy: %.4f" \
                % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy, test_accuracy))

Epoch: [ 0] [    0/  468] time: 0.50, train_loss: 2.0879, train_accuracy: 0.4297, test_Accuracy: 0.2652
Epoch: [ 0] [   10/  468] time: 1.74, train_loss: 0.9723, train_accuracy: 0.7812, test_Accuracy: 0.7719
Epoch: [ 0] [   20/  468] time: 2.89, train_loss: 0.5492, train_accuracy: 0.8203, test_Accuracy: 0.8416
Epoch: [ 0] [   30/  468] time: 4.05, train_loss: 0.4106, train_accuracy: 0.8906, test_Accuracy: 0.8788
Epoch: [ 0] [   40/  468] time: 5.18, train_loss: 0.4604, train_accuracy: 0.8359, test_Accuracy: 0.8864
Epoch: [ 0] [   50/  468] time: 6.30, train_loss: 0.3938, train_accuracy: 0.8750, test_Accuracy: 0.8934
Epoch: [ 0] [   60/  468] time: 7.46, train_loss: 0.3481, train_accuracy: 0.9219, test_Accuracy: 0.8986
Epoch: [ 0] [   70/  468] time: 8.60, train_loss: 0.2592, train_accuracy: 0.9297, test_Accuracy: 0.9094
Epoch: [ 0] [   80/  468] time: 9.74, train_loss: 0.2235, train_accuracy: 0.9375, test_Accuracy: 0.9120
Epoch: [ 0] [   90/  468] time: 10.88, train_loss: 0.2992, train

In [12]:
# ORIGINAL SOURCE CODE

"""

if train_flag :

    checkpoint = tf.train.Checkpoint(dnn=network)

    # create writer for tensorboard
    summary_writer = tf.summary.create_file_writer(logdir=logs_dir)
    start_time = time()

    # restore check-point if it exits
    could_load, checkpoint_counter = load(network, checkpoint_dir)    

    if could_load:
        start_epoch = (int)(checkpoint_counter / training_iterations)        
        counter = checkpoint_counter        
        print(" [*] Load SUCCESS")
    else:
        start_epoch = 0
        start_iteration = 0
        counter = 0
        print(" [!] Load failed...")

    # train phase
    with summary_writer.as_default():  # for tensorboard
        for epoch in range(start_epoch, training_epochs):
            for idx, (train_input, train_label) in enumerate(train_dataset):            
                grads = grad(network, train_input, train_label)
                optimizer.apply_gradients(grads_and_vars=zip(grads, network.variables))

                train_loss = loss_fn(network, train_input, train_label)
                train_accuracy = accuracy_fn(network, train_input, train_label)
                
                for test_input, test_label in test_dataset:                
                    test_accuracy = accuracy_fn(network, test_input, test_label)

                tf.summary.scalar(name='train_loss', data=train_loss, step=counter)
                tf.summary.scalar(name='train_accuracy', data=train_accuracy, step=counter)
                tf.summary.scalar(name='test_accuracy', data=test_accuracy, step=counter)

                print(
                    "Epoch: [%2d] [%5d/%5d] time: %4.4f, train_loss: %.8f, train_accuracy: %.4f, test_Accuracy: %.4f" \
                    % (epoch, idx, training_iterations, time() - start_time, train_loss, train_accuracy,
                       test_accuracy))
                counter += 1                
        checkpoint.save(file_prefix=checkpoint_prefix + '-{}'.format(counter))
        
# test phase      
else :
    _, _ = load(network, checkpoint_dir)
    for test_input, test_label in test_dataset:    
        test_accuracy = accuracy_fn(network, test_input, test_label)

    print("test_Accuracy: %.4f" % (test_accuracy))

"""

'\n\nif train_flag :\n\n    checkpoint = tf.train.Checkpoint(dnn=network)\n\n    # create writer for tensorboard\n    summary_writer = tf.summary.create_file_writer(logdir=logs_dir)\n    start_time = time()\n\n    # restore check-point if it exits\n    could_load, checkpoint_counter = load(network, checkpoint_dir)    \n\n    if could_load:\n        start_epoch = (int)(checkpoint_counter / training_iterations)        \n        counter = checkpoint_counter        \n        print(" [*] Load SUCCESS")\n    else:\n        start_epoch = 0\n        start_iteration = 0\n        counter = 0\n        print(" [!] Load failed...")\n\n    # train phase\n    with summary_writer.as_default():  # for tensorboard\n        for epoch in range(start_epoch, training_epochs):\n            for idx, (train_input, train_label) in enumerate(train_dataset):            \n                grads = grad(network, train_input, train_label)\n                optimizer.apply_gradients(grads_and_vars=zip(grads, network.var