In [None]:
from __future__ import absolute_import, division, print_function
import tensorflow as tf
import numpy as np
import os
import time
from tensorflow.keras.datasets import fashion_mnist

## Fashion_Minist data processing

In [None]:
# Read in the fashion_MNIST dataset
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

x_train, x_test = np.array(x_train, np.float32), np.array(x_test, np.float32)
# Normalize images value from [0, 255] to [0, 1]
x_train, x_test = x_train / 255., x_test / 255.


In [None]:
print(' Size of x_train: ',x_train.shape, '\n', 'Size of x_test: ',  x_test.shape)

 Size of x_train:  (60000, 28, 28) 
 Size of x_test:  (10000, 28, 28)


## Define super parameters

In [None]:
num_classes = 10 # total classes of Fashion-MNIST dataset
learning_rate = 0.001
epoch = 5
batch_size = 128
training_steps = int(x_train.shape[0]/batch_size) * epoch
display_step = int(x_train.shape[0]/batch_size)/2
# Network parameters.
conv1_filters = 6 # number of filters for 1st conv layer.
conv2_filters = 16 # number of filters for 2nd conv layer.
fc1_units = 120 # number of neurons for 1st fully-connected layer.
fc2_units = 84  # number of neurons for 2st fully-connected layer.

In [None]:
# Shuffle and batch data.
train_data = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_data = train_data.repeat().shuffle(5000).batch(batch_size).prefetch(1)

## Define convolution and pooling function

In [None]:
def conv2d(x, W, b, strides=1):
    # Conv2D wrapper, with bias and relu activation.
    x = tf.nn.conv2d(x, W, strides=[1, strides, strides, 1], padding='VALID')
    x = tf.nn.bias_add(x, b)
    return tf.nn.relu(x)

def maxpool2d(x, k=2):
    # MaxPool2D wrapper.
    return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, k, k, 1], padding='SAME')

# Weights initialization
def weights_init():
    random_normal = tf.initializers.RandomNormal()
    weights = {
        # Conv Layer 1: 5x5 conv,  6 filters 
        'wc1': tf.Variable(random_normal([5, 5, 1, conv1_filters])),
        # Conv Layer 2: 5x5 conv, 16 filters.
        'wc2': tf.Variable(random_normal([5, 5, conv1_filters, conv2_filters])),
        # FC Layer 1: 4*4*64 inputs, 120 units.
        'wd1': tf.Variable(random_normal([4*4*16, fc1_units])),
        # FC Layer 2: 120 inputs, 84 units.
        'wd2': tf.Variable(random_normal([fc1_units, fc2_units])),
        # FC Out Layer: 84 inputs, 10 units (total number of classes)
        'w_out': tf.Variable(random_normal([fc2_units, num_classes]))
    }
    biases = {
        'bc1': tf.Variable(tf.zeros([conv1_filters])),
        'bc2': tf.Variable(tf.zeros([conv2_filters])),
        'bd1': tf.Variable(tf.zeros([fc1_units])),
        'bd2': tf.Variable(tf.zeros([fc2_units])),
        'bout': tf.Variable(tf.zeros([num_classes]))
    }

    return weights, biases 

weights, biases = weights_init()

# Question 1:  Baseline Model
# LeNet-5 style architecture


In [None]:
# Define model 
def conv_net(x):
    # Input shape: [batch_size, 28, 28, 1]. 
    # A batch of 28x28x1 (grayscale) images.
    x = tf.reshape(x, [-1, 28, 28, 1])
    # Convolution Layer. Output shape: [batch_size, 24, 24, 6].
    conv1 = conv2d(x, weights['wc1'], biases['bc1'])
    # Max Pooling. Output shape: [batch_size, 12, 12, 6].
    conv1 = maxpool2d(conv1, k=2)
   
    # Convolution Layer. Output shape: [batch_size, 8, 8, 16].
    conv2 = conv2d(conv1, weights['wc2'], biases['bc2'])
    # Max Pooling. Output shape: [batch_size, 4, 4, 16].
    conv2 = maxpool2d(conv2, k=2)

    # Reshape conv2 output to fit fully connected layer input
    # Output shape: [batch_size, 4*4*16=256].
    fc1 = tf.reshape(conv2, [-1, weights['wd1'].get_shape().as_list()[0]])
    # Fully connected layer, Output shape: [batch_size,120].
    fc1 = tf.add(tf.matmul(fc1, weights['wd1']), biases['bd1'])
    # Apply ReLU non-linearity.
    fc1 = tf.nn.relu(fc1)

    # Fully connected layer, Output shape: [batch_size, 84].
    fc2 = tf.add(tf.matmul(fc1, weights['wd2']), biases['bd2'])
    # Apply ReLU non-linearity.
    fc2 = tf.nn.relu(fc2)

    # Fully connected layer, Output shape: [batch_size, 10].
    out = tf.add(tf.matmul(fc2, weights['w_out']), biases['bout'])
    # Apply softmax to generate a probability distribution.
    return tf.nn.softmax(out)

In [None]:
# Cross-Entropy loss function.
def cross_entropy(y_pred, y_true):
    y_true = tf.one_hot(y_true, depth=num_classes)
    y_pred = tf.clip_by_value(y_pred, 1e-9, 1.)
    return tf.reduce_mean(-tf.reduce_sum(y_true * tf.math.log(y_pred)))
    
# Accuracy metric.
def accuracy(y_pred, y_true):
    # Predicted class is the index of highest score in prediction vector (i.e. argmax).
    correct_prediction = tf.equal(tf.argmax(y_pred, 1), tf.cast(y_true, tf.int64))
    return tf.reduce_mean(tf.cast(correct_prediction, tf.float32), axis=-1)

# ADAM optimizer.
optimizer = tf.optimizers.Adam(learning_rate)

## Optimization process.
## Question 2:  Add L2 weight decay regularization
## Question 3:  Add L1 weight decay regularization

In [None]:
# Optimization process. 
# reg_type:  1: L1 norm;  2: L2 norm;  other: Baseline without regularization
# lamda: regularizaton strength,  default is 0, without regularization. 
def run_optimization(x, y, reg_type = 0, lamda = 0 ):
    # Wrap computation inside a GradientTape for automatic differentiation.
    with tf.GradientTape() as g:
        pred = conv_net(x)
        loss = cross_entropy(pred, y)
        # Add L1 or L2 weight decay regularization to the loss
        # Using  kernel_regularizer
        if reg_type == 1:       # L1 norm
            kernel_weights = tf.reduce_sum(tf.abs(weights['wc1'])) + tf.reduce_sum(tf.abs(weights['wc2'])) + tf.reduce_sum(tf.abs(weights['wd1'])) \
             + tf.reduce_sum(tf.abs(weights['wd2'])) + tf.reduce_sum(tf.abs(weights['w_out']))   
            loss += lamda * kernel_weights
        elif reg_type == 2:     # L2 norm
            kernel_weights = tf.reduce_sum(tf.square(weights['wc1'])) + tf.reduce_sum(tf.square(weights['wc2'])) \
            + tf.reduce_sum(tf.square(weights['wd1'])) + tf.reduce_sum(tf.square(weights['wd2'])) \
            + tf.reduce_sum(tf.square(weights['w_out'])) 
            loss += lamda * kernel_weights
        else:                   # no regularization
            pass

    # trainable variables.
    trainable_variables = list(weights.values()) + list(biases.values())
    # Compute gradients.
    gradients = g.gradient(loss, trainable_variables)
    # Update W and b following gradients.
    optimizer.apply_gradients(zip(gradients, trainable_variables))

## Calculate the number of model for Question 4.
## Modify the architecture to remove the fully-connected layers at the backend of the network. For example, replace with Global Average Pooling or an alternative. Report the change in the number of parameters for this model compared to previous.

In [None]:
# Calculate the number of model for Question 4.
def model_size(model_params):
  num_para = 0
  for i in range(len(model_params)):
    num_para += len(tf.reshape(model_params[i],-1))

  return num_para


trainable_variables = list(weights.values()) + list(biases.values())
Baseline_Size = model_size(trainable_variables)
print(Baseline_Size )

44426


# Question 5: three runs of train / test 

In [None]:
## Question 5: three runs of train / test 

## Training  and Testing

In [None]:
## Question 5: three runs of train / test 
## set super parameters and run three times, manually 
regularizer = 1   # 0: no regularization; 1: L1; 2: L2 
weight_decay = 1e-3  # 1e-4, 1e-3

# Weights initialization 
weights, biases = weights_init() # clear weights and biases before new run of train/test
start_time = time.time()
with tf.device('/device:GPU:0'):
    for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
        run_optimization(batch_x, batch_y, regularizer, weight_decay)
        if step % display_step == 0:
            pred = conv_net(batch_x)
            loss = cross_entropy(pred, batch_y)/batch_size
            acc = accuracy(pred, batch_y)
            print("step: %i, loss: %f, accuracy: %f" % (step, loss, acc))

end_time = time.time()
print('Training loss: %f \n' %loss,  'Training acc: %f' % acc)
print ('Time cost: ', end_time - start_time)

# Test model on validation set.
pred = conv_net(x_test)
loss = cross_entropy(pred, y_test)/y_test.shape[0]
print('Test loss: %f' % loss)
print("Test Accuracy: %f" % accuracy(pred, y_test))

step: 234, loss: 0.415999, accuracy: 0.859375
step: 468, loss: 0.394258, accuracy: 0.828125
step: 702, loss: 0.488098, accuracy: 0.828125
step: 936, loss: 0.275866, accuracy: 0.937500
step: 1170, loss: 0.278102, accuracy: 0.906250
step: 1404, loss: 0.402531, accuracy: 0.843750
step: 1638, loss: 0.373588, accuracy: 0.843750
step: 1872, loss: 0.330263, accuracy: 0.898438
step: 2106, loss: 0.272336, accuracy: 0.898438
step: 2340, loss: 0.314002, accuracy: 0.875000
Training loss: 0.314002 
 Training acc: 0.875000
Time cost:  23.348323106765747
Test loss: 0.357108
Test Accuracy: 0.869300


In [None]:
def run3():
    # Weights initialization 
    weights, biases = weights_init() # clear weights and biases before new run of train/test
    start_time = time.time()
    with tf.device('/device:GPU:0'):
        for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
            run_optimization(batch_x, batch_y, regularizer, weight_decay)
            
                pred = conv_net(batch_x)
                loss = cross_entropy(pred, batch_y)/batch_size
                acc = accuracy(pred, batch_y)
                print("step: %i, loss: %f, accuracy: %f" % (step, loss, acc))

    end_time = time.time()
    print('Training loss: %f \n' %loss,  'Training acc: %f' % acc)
    print ('Time cost: ', end_time - start_time)

    # Test model on validation set.
    pred = conv_net(x_test)
    loss = cross_entropy(pred, y_test)/y_test.shape[0]
    print('Test loss: %f' % loss)
    print("Test Accuracy: %f" % accuracy(pred, y_test))

In [None]:
## Question 5: three runs of train / test 
## set super parameters and run three times, manually 
regularizer = 1      # 0: no regularization; 1: L1; 2: L2 
weight_decay = 1e-3  # 1e-4, 1e-3
run3()

step: 234, loss: 0.345885, accuracy: 0.882812
step: 468, loss: 0.245843, accuracy: 0.921875
step: 702, loss: 0.466272, accuracy: 0.820312
step: 936, loss: 0.293578, accuracy: 0.882812
step: 1170, loss: 0.220783, accuracy: 0.890625
step: 1404, loss: 0.308055, accuracy: 0.867188
step: 1638, loss: 0.269659, accuracy: 0.890625
step: 1872, loss: 0.304259, accuracy: 0.898438
step: 2106, loss: 0.278094, accuracy: 0.906250
step: 2340, loss: 0.349781, accuracy: 0.859375
Training loss: 0.349781 
 Training acc: 0.859375
Time cost:  23.416645050048828
Test loss: 0.327695
Test Accuracy: 0.880100


In [None]:
# FC layer 1
Hoyer_layer1 = Hoyer_index(weights['wd1'])  
# FC layer 2
Hoyer_layer2 = Hoyer_index(weights['wd2'])

print(float(Hoyer_layer1), float(Hoyer_layer2))

tf.Tensor(1277.594, shape=(), dtype=float32) 30720
tf.Tensor(400.74744, shape=(), dtype=float32) 10080
0.9969193339347839 0.9947248101234436


In [None]:
y_test.shape[0]

10000

In [None]:
# Test model on validation set.
loss = 0 
pred = 0
pred = conv_net(x_test)
loss = cross_entropy(pred, y_test)
print('Test loss: %f' % loss)
print("Test Accuracy: %f" % accuracy(pred, y_test))

Test loss: 3414.680664
Test Accuracy: 0.876500


## Question 4: Remove fully-connected layers

In [None]:
# Q4: Remove fully-connected layers
def globalAveragePooling2d(x):
    return  tf.reduce_mean(x, axis=[1,2])

# Weights initialization for Question 4. 
def weights_init_Q4():
random_normal = tf.initializers.RandomNormal():
    weights = {
        # Conv Layer 1: 5x5 conv,  6 filters 
        'wc1': tf.Variable(random_normal([5, 5, 1, conv1_filters])),
        # Conv Layer 2: 5x5 conv, 16 filters.
        'wc2': tf.Variable(random_normal([5, 5, conv1_filters, conv2_filters])),
        # FC Out Layer: 84 inputs, 10 units (total number of classes)
        'w_out': tf.Variable(random_normal([conv2_filters, num_classes]))
    }
    biases = {
        'bc1': tf.Variable(tf.zeros([conv1_filters])),
        'bc2': tf.Variable(tf.zeros([conv2_filters])),
        'bout': tf.Variable(tf.zeros([num_classes]))
    }
# Define model
def conv_net_Q4(x):
    # Input shape: [batch_size, 28, 28, 1]. 
    # A batch of 28x28x1 (grayscale) images.
    x = tf.reshape(x, [-1, 28, 28, 1])
    # Convolution Layer. Output shape: [batch_size, 24, 24, 6].
    conv1 = conv2d(x, weights['wc1'], biases['bc1'])
    # Max Pooling. Output shape: [batch_size, 12, 12, 6].
    conv1 = maxpool2d(conv1, k=2)
   
    # Convolution Layer. Output shape: [batch_size, 8, 8, 16].
    conv2 = conv2d(conv1, weights['wc2'], biases['bc2'])
    # Max Pooling. Output shape: [batch_size, 4, 4, 16].
    conv2 = maxpool2d(conv2, k=2)

    # Global Average Pooling. Output shape: [batch_size, 16].
    GAP1 = globalAveragePooling2d(conv2)
    # Fully connected layer, Output shape: [batch_size, 10].
    out = tf.add(tf.matmul(GAP1, weights['w_out']), biases['bout'])
    # Apply softmax to generate a probability distribution.
    return tf.nn.softmax(out)

def run_optimization_Q4(x, y ):
    with tf.GradientTape() as g:
        pred = conv_net_Q4(x)
        loss = cross_entropy(pred, y)
    
    trainable_variables = list(weights.values()) + list(biases.values())
    gradients = g.gradient(loss, trainable_variables)
    optimizer.apply_gradients(zip(gradients, trainable_variables))

## Q4: Training  and Testing

In [None]:
# Q4: Training and Testing
# Weights initialization 
weights, biases = weights_init_Q4() # clear weights and biases before new run of train/test

# train model
start_time = time.time()
with tf.device('/device:GPU:0'):
    for step, (batch_x, batch_y) in enumerate(train_data.take(training_steps), 1):
        run_optimization_Q4(batch_x, batch_y)
        if step % display_step == 0:
            pred = conv_net_Q4(batch_x)
            loss = cross_entropy(pred, batch_y)
            acc = accuracy(pred, batch_y)
            print("step: %i, loss: %f, accuracy: %f" % (step, loss, acc))

end_time = time.time()
print ( 'Time cost: ', end_time - start_time)

# Test model on validation set.
pred = conv_net_Q4(x_test)
print("Test Accuracy: %f" % accuracy(pred, y_test))

## Question 4: 
## The change in the number of parameters for the model of Question 4 compared to previous is:
## 41684.

In [None]:
trainable_variables = list(weights.values()) + list(biases.values())
Model_Q4_Size  = model_size(trainable_variables)
print('The difference of number of parameters between Baseline model and the model using GAP: \n', \
      Baseline_Size - Model_Q4_Size)

The difference of number of parameters between Baseline model and the model using GAP: 
 41684


## Question 6: Analyze the weights of the regularized models
## Hoyer's index 

In [None]:

def Hoyer_index(w_fc):
    sum_cj = tf.reduce_sum(tf.abs(w_fc)) 
    sqrt_sumsquare_cj = tf.sqrt(tf.reduce_sum(tf.square(w_fc))) 
    # Number of elements of FC layer
    N = int(tf.size(w_fc)) 
    # Hoyer's index
    return ((np.sqrt(N) - sum_cj/ sqrt_sumsquare_cj)*(1/(np.sqrt(N)-1)))
    
# FC layer 1
Hoyer_layer1 = Hoyer_index(weights['wd1'])  
# FC layer 2
Hoyer_layer2 = Hoyer_index(weights['wd2'])

print(float(Hoyer_layer1), float(Hoyer_layer2))

0.46089068055152893 0.47526222467422485
