# Direct Feedback Alignment Provides Learning In Deep Neural Networks

Feedforward network implementation using direct feedback alignment for optimization.
Data utilities are from the source code of the Udacity deep learning course.

In [4]:
# These are all the modules we'll be using later. Make sure you can import them
# before proceeding further.

# GPU setting
#import os
#os.environ["CUDA_VISIBLE_DEVICES"]="0"

from __future__ import print_function
import numpy as np
import tensorflow as tf
from six.moves import cPickle as pickle
from six.moves import range
from tensorflow.examples.tutorials.mnist import input_data
# Read the MNIST data sets from the local "MNIST_data/" directory; one_hot=True
# makes every label a one-hot vector rather than a class index.
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)


Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz


Load data set.

In [5]:
# Training split: images and their one-hot labels.
digits = mnist.train.images[:]
labels = mnist.train.labels[:]

# Test split, used only for the final accuracy report.
test_digits = mnist.test.images[:]
test_labels = mnist.test.labels[:]

In [6]:
# Side length (in pixels) of one square input image, and the number of classes.
image_size, num_labels = 28, 10

def accuracy(predictions, labels):
    """Return the percentage of rows that are classified correctly.

    Args:
        predictions: 2-D array of per-class scores, one row per example.
        labels: 2-D array with the same number of rows; the index of the
            largest entry in a row is taken as that example's true class.

    Returns:
        100 * (number of rows whose arg-max matches) / (number of rows).
    """
    predicted_classes = np.argmax(predictions, 1)
    true_classes = np.argmax(labels, 1)
    num_correct = np.sum(predicted_classes == true_classes)
    # The 100.0 factor keeps the division floating point.
    return 100.0 * num_correct / predictions.shape[0]


# Sanity check: show the array shapes of the images and labels in each split.
print('Training set', digits.shape, labels.shape)
print('Test set', test_digits.shape, test_labels.shape)

Training set (55000, 784) (55000, 10)
Test set (10000, 784) (10000, 10)


In [7]:
def drelu(x):
    """Derivative of ReLU evaluated element-wise at `x`.

    Returns a tensor shaped like `x` holding 1 where `x > 0` and 0 elsewhere.
    The constant tensors are built from `x.get_shape()`.
    """
    shape = x.get_shape()
    zeros = tf.zeros(shape)
    is_positive = tf.greater(x, zeros)
    return tf.select(is_positive, tf.ones(shape), zeros)

def dtanh(x):
    """Derivative of tanh evaluated element-wise at `x`: 1 - tanh(x)^2.

    The tanh op is created once and reused for both factors of the square,
    instead of adding two identical tanh ops to the graph.
    """
    tanh_x = tf.nn.tanh(x)
    return 1 - tf.mul(tanh_x, tanh_x)

def act_ftn(name):
    """Look up an activation function by name.

    Args:
        name: "tanh" or "relu".

    Returns:
        `tf.nn.tanh` or `tf.nn.relu` respectively.

    Raises:
        ValueError: if `name` is neither "tanh" nor "relu". Failing here
            avoids handing `None` back to the caller, which would only
            surface later as an unrelated-looking error.
    """
    if name == "tanh":
        return tf.nn.tanh
    if name == "relu":
        return tf.nn.relu
    raise ValueError("not tanh or relu: %r" % (name,))
        
def dact_ftn(name):
    """Look up the derivative of an activation function by name.

    Args:
        name: "tanh" or "relu".

    Returns:
        `dtanh` or `drelu` respectively.

    Raises:
        ValueError: if `name` is neither "tanh" nor "relu". Failing here
            avoids handing `None` back to the caller, which would only
            surface later as an unrelated-looking error.
    """
    if name == "tanh":
        return dtanh
    if name == "relu":
        return drelu
    raise ValueError("not tanh or relu: %r" % (name,))

def init_ftn(name, num_input, num_output, runiform_range):
    """Build a random initial-value tensor of shape [num_input, num_output].

    Args:
        name: "normal" for `tf.truncated_normal`, or "uniform" for
            `tf.random_uniform`.
        num_input: number of rows of the returned tensor.
        num_output: number of columns of the returned tensor.
        runiform_range: half-width of the uniform interval; values are drawn
            from [-runiform_range, runiform_range). Ignored for "normal".

    Returns:
        The initial-value tensor.

    Raises:
        ValueError: if `name` is neither "normal" nor "uniform". Failing here
            avoids handing `None` back to the caller, which would only
            surface later as an unrelated-looking error.
    """
    shape = [num_input, num_output]
    if name == "normal":
        return tf.truncated_normal(shape)
    if name == "uniform":
        return tf.random_uniform(shape, minval=-runiform_range, maxval=runiform_range)
    raise ValueError("not normal or uniform: %r" % (name,))

In [8]:
class Weights:
    """One fully connected layer trained with direct feedback alignment.

    The layer owns a weight matrix, a bias vector and a random feedback
    matrix. `optimize` only ever assigns to the weights and biases; the
    feedback matrix keeps its initial value.

    Calling the instance builds the forward pass. When called with the
    training batch size it also caches the layer input and pre-activation,
    which `optimize` needs to build the update ops.
    """

    def __init__(self, batch_size, num_input, num_output, num_final,
                 act_f, init_f, notfinal = True, back_init_f = "uniform",
                 weight_uni_range = 0.05, back_uni_range = 0.5):
        """Create the layer's variables.

        Args:
            batch_size: training minibatch size; forward passes with this
                batch size are cached for `optimize`.
            num_input: width of the layer input.
            num_output: width of the layer output.
            num_final: width of the network output (the error signal).
            act_f: activation name understood by `act_ftn` / `dact_ftn`.
            init_f: initializer name (see `init_ftn`) for the weights.
            notfinal: True for hidden layers; False for the output layer,
                which returns its pre-activation unchanged.
            back_init_f: initializer name for the feedback matrix.
            weight_uni_range: uniform range passed to the weight initializer.
            back_uni_range: uniform range passed to the feedback initializer.
        """
        self.weights = tf.Variable(init_ftn(init_f, num_input, num_output, weight_uni_range))
        self.biases = tf.Variable(tf.zeros([num_output]))
        backward_t = tf.Variable(init_ftn(back_init_f, num_final, num_output, back_uni_range))
        # One copy of the feedback matrix per example so it can be used in a
        # batched matmul: [batch_size, num_final, num_output].
        self.backward = tf.reshape(tf.pack([backward_t for _ in range(batch_size)]),
                                   [batch_size, num_final, num_output])

        self.batch_size = batch_size
        self.num_input = num_input
        self.num_output = num_output
        self.num_final = num_final
        self.activation = act_ftn(act_f)
        self.dactivation = dact_ftn(act_f)
        self.notfinal = notfinal

        # Filled in by __call__ on a forward pass with the training batch size.
        self.inputs = None
        self.before_activation = None

    def __call__(self, x, batch_size):
        """Build the forward pass for input `x` and return the layer output.

        Args:
            x: input tensor of shape [batch_size, num_input].
            batch_size: number of rows in `x`. Only when it equals the
                training batch size are the input and pre-activation cached
                for `optimize`; other passes (e.g. evaluation) leave the
                cached tensors untouched.

        Returns:
            The activated output for hidden layers, or the raw pre-activation
            for the final layer.
        """
        before_activation = tf.matmul(x, self.weights) + self.biases
        if batch_size == self.batch_size:
            self.inputs = tf.reshape(x, [batch_size, self.num_input, 1])
            self.before_activation = before_activation
        if self.notfinal:
            return self.activation(before_activation)
        return before_activation

    def optimize(self, dError_dy, lr = 0.01):
        """Build the ops that apply one update to this layer.

        Args:
            dError_dy: gradient of the loss with respect to the network
                output, shaped [batch_size, 1, num_final].
            lr: learning rate.

        Returns:
            A pair (change_weights, change_biases) of assign ops.

        Raises:
            ValueError: if no forward pass with the training batch size has
                been built yet, so there is nothing cached to differentiate.
        """
        if self.inputs is None or self.before_activation is None:
            raise ValueError("call the layer with the training batch before optimize()")

        if self.notfinal:
            # Project the output error straight back through the feedback
            # matrix: [batch, 1, num_final] x [batch, num_final, num_output].
            fed_back = tf.batch_matmul(dError_dy, self.backward)
            # Scaling column h by the activation derivative is an
            # element-wise product; this equals multiplying by
            # diag(derivative) but avoids building a
            # [batch, num_output, num_output] tensor.
            slope = tf.reshape(self.dactivation(self.before_activation),
                               [self.batch_size, 1, self.num_output])
            dError_dhidden = tf.mul(fed_back, slope)
        else:
            # The final layer uses the output error directly.
            dError_dhidden = dError_dy

        # Per-example outer product input x error, averaged over the batch.
        delta_weights = tf.reduce_mean(tf.batch_matmul(self.inputs, dError_dhidden), 0)
        delta_biases = tf.reduce_mean(dError_dhidden, 0)

        change_weights = tf.assign_sub(self.weights, lr*delta_weights)
        change_biases = tf.assign_sub(self.biases, lr*tf.reshape(delta_biases,(self.num_output,)))
        return change_weights, change_biases

In [17]:
# hyper parameter setting

# Data dimensions.
image_size = 28
num_labels = 10
num_data_input = image_size ** 2

# Batch sizes.
batch_size = 100
test_size = 10000
valid_size = test_size

# Network shape.
num_hidden = 1000
num_layer = 3  # should be >= 3
act_f = "relu"  # Activation function

# Initialization. Recommend to use uniform for both distributions.
init_f = "uniform"  # distribution for the forward weights
back_init_f = "uniform"  # distribution for the direct feedback weights
weight_uni_range = 0.05
back_uni_range = 0.5

# Optimization.
lr = 0.005
num_steps = 5000

In [18]:
# Build the whole computation (model, loss, update ops, predictions) in a
# dedicated graph so the session below can run it.
graph = tf.Graph()

with graph.as_default():

    # Input data. For the training data, we use a placeholder that will be fed
    # at run time with a training minibatch.
    tf_train_dataset = tf.placeholder(tf.float32,
                                      shape=(batch_size, image_size * image_size))
    tf_train_labels = tf.placeholder(tf.float32, shape=(batch_size, num_labels))
    # tf_valid_dataset = tf.constant(valid_dataset)
    # The test images are baked into the graph as a constant.
    tf_test_dataset = tf.constant(test_digits)
    
    # model building
    # Layers are stored under the keys "W0", "W1", ..., "W<num_layer-2>".
    Weight_list = {}

    # First layer: input -> hidden.
    name = "W0"
    Weight_list[name] = Weights(batch_size, num_data_input, num_hidden, num_labels, act_f, init_f, True, back_init_f, weight_uni_range, back_uni_range)

    # Middle layers: hidden -> hidden (none when num_layer == 3).
    for i in range(num_layer-3):
        name = "W" + str(i+1)
        Weight_list[name] = Weights(batch_size, num_hidden, num_hidden, num_labels, act_f, init_f, True, back_init_f, weight_uni_range, back_uni_range)

    # Final layer: hidden -> labels; notfinal=False so it returns raw logits.
    name = "W" + str(num_layer-2)
    Weight_list[name] = Weights(batch_size, num_hidden, num_labels, num_labels, act_f, init_f, False, back_init_f, weight_uni_range, back_uni_range)

    # Training forward pass. It is called with batch_size, so each layer
    # caches its input and pre-activation for optimize().
    y_train = None
    x_train = tf_train_dataset
    for i in range(num_layer-1):
        name = "W"+str(i)
        if (i != num_layer - 2):
            x_train = Weight_list[name](x_train, batch_size)
        else:
            y_train = Weight_list[name](x_train, batch_size)
    logits = y_train
    cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits, tf_train_labels)
    loss = tf.reduce_mean(cross_entropy)
    
    # Gradient of the per-example loss w.r.t. the logits, reshaped to the
    # [batch_size, 1, num_labels] layout that Weights.optimize expects.
    dError_dy = tf.reshape(tf.gradients(cross_entropy, logits)[0], [batch_size, 1, num_labels])
    
    # optimization
    # Every layer receives the same output error; collect all assign ops.
    train_list = []
    for i in range(num_layer-1):
        name = "W"+str(i)
        train_list += Weight_list[name].optimize(dError_dy, lr)

    # Validation forward pass, disabled: kept only as an unused string literal.
    '''y_valid = None
    x_valid = tf_valid_dataset
    for i in range(num_layer-1):
        name = "W"+str(i)
        if (i != num_layer - 2):
            x_valid = Weight_list[name](x_valid, valid_size)
        else:
            y_valid = Weight_list[name](x_valid, valid_size)
    logits_valid = y_valid'''
    
    # Test forward pass through the same layers. It is called with test_size,
    # which differs from batch_size, so the tensors cached for training are
    # not overwritten.
    y_test = None
    x_test = tf_test_dataset
    for i in range(num_layer-1):
        name = "W"+str(i)
        if (i != num_layer - 2):
            x_test = Weight_list[name](x_test, test_size)
        else:
            y_test = Weight_list[name](x_test, test_size)
    logits_test = y_test
    
    # Predictions for the training, validation, and test data.
    train_prediction = tf.nn.softmax(logits)
    # valid_prediction = tf.nn.softmax(logits_valid)
    test_prediction = tf.nn.softmax(logits_test)

In [None]:
# Train the network, report minibatch metrics every 50 steps, then evaluate
# once on the test set.
with tf.Session(graph=graph) as session:
    tf.initialize_all_variables().run()
    print("Initialized")
    # Run for the configured number of steps rather than a hard-coded count,
    # so the num_steps hyperparameter actually takes effect.
    for step in range(num_steps):
        # Generate a minibatch. Its size must equal batch_size because the
        # placeholders were created with that fixed leading dimension.
        batch_data, batch_labels = mnist.train.next_batch(batch_size)
        # Prepare a dictionary telling the session where to feed the minibatch.
        # The key of the dictionary is the placeholder node of the graph to be fed,
        # and the value is the numpy array to feed to it.
        feed_dict = {tf_train_dataset : batch_data, tf_train_labels : batch_labels}
        # Measure loss and predictions first, then apply the updates in a
        # separate run so the reported metrics are pre-update values.
        l, predictions = session.run([loss, train_prediction], feed_dict=feed_dict)
        session.run(train_list, feed_dict = feed_dict)
        if (step % 50 == 0):
            print("Minibatch loss at step %d: %f" % (step, l))
            print("Minibatch accuracy: %.1f%%" % accuracy(predictions, batch_labels))
            # print("Validation accuracy: %.1f%%" % accuracy(
            #   valid_prediction.eval(), valid_labels))
    print("Test accuracy: %.1f%%" % accuracy(test_prediction.eval(), test_labels))

Initialized
Minibatch loss at step 0: 2.293149
Minibatch accuracy: 11.0%
Minibatch loss at step 50: 2.167094
Minibatch accuracy: 32.0%
Minibatch loss at step 100: 1.804323
Minibatch accuracy: 65.0%
Minibatch loss at step 150: 1.480418
Minibatch accuracy: 64.0%
Minibatch loss at step 200: 1.159681
Minibatch accuracy: 74.0%
