In [46]:
import time 
import numpy as np
import ipyparallel as ipp
from cnn import CNN
import tensorflow as tf
from AggregatorFunc import snd_to_eng, wc_from_eng, global_update, bg_from_eng, belta_update, grad_update, delta_update, binary_search

In [13]:
# Create the ipyparallel client; later cells use it to reach the engines.
c = ipp.Client()

In [14]:
# View obtained by slicing the client with [:]; later cells use it to push
# objects to the engines and to execute code on them.
dview = c[:]

In [15]:
# Display the engine ids currently known to the client.
c.ids

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [60]:
def parameter_shape(gradient):
    '''
    Record the original shape of every layer's gradient.

    Input: the evaluated result of optimizer.compute_gradients, i.e. a
           sequence of (gradient, variable) pairs, one pair per layer.
    Output: a list with one entry per layer of the form
            ['Layer_<k>', original_shape, number_of_elements]
            where k starts at 1.

    The pairs are walked directly instead of first wrapping the whole input
    in np.array(...): layers have different shapes, and building a single
    array from such ragged data raises ValueError on recent NumPy releases.
    '''
    shape_list = []
    for layer_idx, pair in enumerate(gradient):
        # pair[0] is the gradient, pair[1] the variable value.
        grad = np.asarray(pair[0])
        # grad.size equals the length of the flattened gradient.
        shape_list.append(['Layer_' + str(layer_idx + 1), grad.shape, grad.size])

    return shape_list

In [61]:
def batch_gradient_collector(gradient):
    '''
    Collect the flattened gradient of every layer for one batch.

    Input: the evaluated result of optimizer.compute_gradients, i.e. a
           sequence of (gradient, variable) pairs, one pair per layer.
    Output: an array with one flattened gradient vector per layer.
            If every layer has the same number of elements this is a regular
            2-D array; otherwise it is a 1-D object array whose entries are
            the per-layer 1-D vectors.

    No summation is performed here; the caller accumulates across batches.
    '''
    # Walk the pairs directly: wrapping ragged per-layer data in np.array(...)
    # raises ValueError on recent NumPy releases.
    flat_grads = [np.asarray(pair[0]).flatten() for pair in gradient]

    if len({vec.shape[0] for vec in flat_grads}) <= 1:
        # Uniform (or empty) case: a plain array, exactly as before.
        return np.array(flat_grads)

    # Ragged case: build the object array explicitly so NumPy does not try
    # to broadcast the vectors into a rectangular array.
    collected = np.empty(len(flat_grads), dtype=object)
    for idx, vec in enumerate(flat_grads):
        collected[idx] = vec
    return collected

In [62]:
def batch_parameter_collector(gradient):
    '''
    Collect the flattened parameter values of every layer for one batch.

    Input: the evaluated result of optimizer.compute_gradients, i.e. a
           sequence of (gradient, variable) pairs, one pair per layer.
    Output: an array with one flattened parameter vector per layer (taken
            from the second element of each pair).
            If every layer has the same number of elements this is a regular
            2-D array; otherwise it is a 1-D object array whose entries are
            the per-layer 1-D vectors.
    '''
    # Walk the pairs directly: wrapping ragged per-layer data in np.array(...)
    # raises ValueError on recent NumPy releases.
    flat_params = [np.asarray(pair[1]).flatten() for pair in gradient]

    if len({vec.shape[0] for vec in flat_params}) <= 1:
        # Uniform (or empty) case: a plain array, exactly as before.
        return np.array(flat_params)

    # Ragged case: build the object array explicitly so NumPy does not try
    # to broadcast the vectors into a rectangular array.
    collected = np.empty(len(flat_params), dtype=object)
    for idx, vec in enumerate(flat_params):
        collected[idx] = vec
    return collected

In [63]:
def create_new_conv_layer(input_data, num_input_channels, num_filters, filter_shape, pool_shape, name):
    '''
    Build one convolution -> bias -> ReLU -> max-pool stage.

    input_data: tensor fed to tf.nn.conv2d
    num_input_channels: channel count of input_data
    num_filters: number of convolution filters (output channels)
    filter_shape: (height, width) of each filter
    pool_shape: (height, width) of the max-pool window
    name: prefix for the variable names (name + '_W', name + '_b')

    Returns the max-pooled output tensor.
    '''
    # Kernel layout expected by tf.nn.conv2d: [height, width, in_ch, out_ch].
    filter_height, filter_width = filter_shape[0], filter_shape[1]
    kernel_dims = [filter_height, filter_width, num_input_channels, num_filters]

    # Trainable filter weights and per-filter bias, both randomly initialised.
    kernel = tf.Variable(tf.truncated_normal(kernel_dims, stddev=0.03), name=name+'_W')
    offset = tf.Variable(tf.truncated_normal([num_filters]), name=name+'_b')

    # Stride-1 convolution with 'SAME' padding, then bias and ReLU.
    convolved = tf.nn.conv2d(input_data, kernel, [1, 1, 1, 1], padding='SAME')
    activated = tf.nn.relu(convolved + offset)

    # Max pooling: window taken from pool_shape, stride fixed at 2x2.
    pool_window = [1, pool_shape[0], pool_shape[1], 1]
    pool_strides = [1, 2, 2, 1]
    return tf.nn.max_pool(activated, ksize=pool_window, strides=pool_strides,
                          padding='SAME')

In [64]:
class AdamOptimizer_Bing(tf.train.AdamOptimizer):
    """Adam optimizer whose minimize() also hands back the gradients.

    The base class is referenced as tf.train.AdamOptimizer because the
    notebook only imports `tensorflow as tf`; a bare `AdamOptimizer` name is
    never brought into scope.
    """

    def __init__(self, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8,
                 use_locking=False, name="Adam"):
        # Delegate to the real Adam constructor instead of re-implementing it
        # and skipping it via super(AdamOptimizer, self).
        super(AdamOptimizer_Bing, self).__init__(
            learning_rate=learning_rate, beta1=beta1, beta2=beta2,
            epsilon=epsilon, use_locking=use_locking, name=name)

        # Kept from the original constructor body.
        # Created in SparseApply if needed.
        self._updated_lr = None

    def minimize(self, loss, global_step=None, var_list=None,
                 gate_gradients=1, aggregation_method=None,
                 colocate_gradients_with_ops=False, name=None,
                 grad_loss=None):
        """
        Like Optimizer.minimize, but additionally returns the gradients.
        Created by: Big Bing in 7/28
        Purpose: To realize parallel computing (communicate gradient)

        Returns:
            A tuple (train_op, grads_and_vars) where train_op is the result
            of apply_gradients and grads_and_vars is the list of
            (gradient, variable) pairs that were applied.

        Raises:
            ValueError: if none of the retained variables has a gradient.
        """
        # Only the last 8 (gradient, variable) pairs are kept.
        # NOTE(review): presumably the 8 trainable tensors of the CNN built in
        # this notebook -- confirm if the architecture changes.
        grads_and_vars = self.compute_gradients(
            loss, var_list=var_list, gate_gradients=gate_gradients,
            aggregation_method=aggregation_method,
            colocate_gradients_with_ops=colocate_gradients_with_ops,
            grad_loss=grad_loss)[-8:]

        vars_with_grad = [v for g, v in grads_and_vars if g is not None]
        if not vars_with_grad:
            raise ValueError(
                "No gradients provided for any variable, check your graph for ops"
                " that do not support gradients, between variables %s and loss %s." %
                ([str(v) for _, v in grads_and_vars], loss))

        train_op = self.apply_gradients(grads_and_vars, global_step=global_step, name=name)
        return train_op, grads_and_vars

In [16]:
import os
# Change the working directory on the engines (one path entry per engine id),
# then print the directory each engine reports.
# NOTE(review): the path is absolute and machine-specific; it must be edited
# to run this notebook elsewhere.
dview.map(os.chdir, ['D:/GitHub/Parallel/classifier']*len(c.ids))
print(dview.apply_sync(os.getcwd))

['D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier', 'D:\\GitHub\\Parallel\\classifier']


In [18]:
# Load MNIST from the MNIST_data/ directory with one-hot encoded labels,
# using the TensorFlow tutorial helper.
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)

Extracting MNIST_data/train-images-idx3-ubyte.gz
Extracting MNIST_data/train-labels-idx1-ubyte.gz
Extracting MNIST_data/t10k-images-idx3-ubyte.gz
Extracting MNIST_data/t10k-labels-idx1-ubyte.gz


In [59]:
# Send the MNIST data set to the engines, one at a time, under the name 'mnist'.
# NOTE(review): c[i] is indexed with 0..len(c.ids)-1, which matches the ids
# shown above ([0..9]); confirm this still holds if engine ids are not contiguous.
for i in range(len(c.ids)):
    c[i]['mnist'] = mnist

In [None]:
# State and hyper-parameters of the aggregator loop run in a later cell.
t = 0  # running total of torque_aggregator over all rounds
s = 0  # running total of consumption (local consumption + aggregator time)
b = 0 # aggregator consumption
R = 15  # budget: the loop schedules its final round once the projected total reaches R

# Python optimisation variables
batch_size = 50
# Network description passed to CNN(...).
# NOTE(review): presumably two conv layers as [in_channels, filters, filter_shape,
# pool_shape], followed by the two dense layer sizes -- confirm against cnn.CNN.
layer_size = [[1, 32, [5, 5], [2, 2]],
                [32, 64, [5, 5], [2, 2]],
                1000,
                10]

# Passed to global_update / belta_update / grad_update / delta_update.
# NOTE(review): two entries here, while the cluster above lists ten engines -- confirm.
data_size = np.array([15000,15000])
gamma = 10  # passed to binary_search
phi = 0.2  # passed to binary_search
torque_aggregator = 1  # sent to the engines every round; replaced by binary_search's result
stop = False  # set to True when the budget check fires; the loop exits on the next round

In [None]:
# Must be adjusted whenever the data or the network layout changes.
# One entry per parameter tensor.
# NOTE(review): presumably conv1 W/b, conv2 W/b, dense1 W/b, dense2 W/b for
# the layer_size defined above -- confirm against cnn.CNN.
parameter_length_list = [32*5*5*1,32,32*64*5*5,64,1000*7*7*64,1000,1000*10,10]
# NOTE(review): cnn_parameter_initial is neither defined nor imported in the
# visible cells; it must come from elsewhere for this cell to run.
w_aggregator = cnn_parameter_initial(parameter_length_list)

In [None]:
# Build the CNN locally from the layer description.
cnn = CNN(layer_size)
# Push the CNN class itself to the engines under the name 'CNN'.
dview.push(dict(CNN = CNN))
# Send the CNN instance to the engines under the name 'cnn'.
dview['cnn'] = cnn

In [None]:
# Aggregator control loop.
# Each round: send the global parameters to the engines, let every engine run
# its local work, collect and aggregate the results, re-estimate the number of
# local iterations (torque_aggregator), and stop once the budget R is reached.
# NOTE(review): flatten_matrix and wrangle_matrix are defined in cells that
# appear later in this notebook; those cells must be run before this one.

# G values returned by binary_search. Initialised so the budget branch below
# cannot hit an unbound name when it fires before the first search has run.
G_list = []

while True:
    tic = time.time()
    snd_to_eng(w_aggregator, torque_aggregator, c)
    t_0 = t
    t = t + torque_aggregator
    # Code executed on every engine. The engines only hold `cnn` and `mnist`
    # (pushed in earlier cells), so the model to fit is `cnn`; the previous
    # `svm.fit(mnist)` referenced a name that is never sent to the engines.
    dview.execute("""
import numpy as np
import time    
cnn.Rec_from_Agg(w_aggregator, torque_aggregator)
cnn.time_record()
if cnn.t > 0:
    cnn.Est_Belta(mnist)
cnn.fit(mnist)
    """)

    w_local, c_local = wc_from_eng(c, 'cnn')
    w_local = flatten_matrix(w_local)
    # calculate local consumption per iteration
    c_per = np.array(c_local) / torque_aggregator
    w_aggregator = global_update(w_local, data_size)
    w_aggregator = wrangle_matrix(w_aggregator, parameter_length_list)
    if stop:
        # The previous round was flagged as the last one: keep its result.
        w_final = w_aggregator
        break
    # c_local.sum() equal to c*t
    s = s + np.array(c_local).sum() + b

    if t_0 > 0:
        belta_local, grad_local = bg_from_eng(c, 'cnn')
        grad_local = flatten_matrix(grad_local)
        belta_aggregator = belta_update(belta_local, data_size)
        grad_aggregator = grad_update(grad_local, data_size)
        delta_aggregator = delta_update(grad_local, grad_aggregator, data_size)
        torque_aggregator, G_list = binary_search(torque_aggregator, delta_aggregator, belta_aggregator, gamma, phi)
        print('New torque is:', torque_aggregator)

    toc = time.time()
    b = toc - tic
    # Projected total consumption if one more round is run with the current torque.
    temp = s + torque_aggregator * c_per.sum() + b
    if temp >= R:
        # Largest torque that still fits in the remaining budget.
        torque_max = (R - b - s) / c_per.sum()
        if len(G_list) > 0:
            G_list = np.array(G_list)
            G_min = G_list.min()
            for i, item in enumerate(G_list):
                # NOTE(review): this compares a G value against a torque
                # bound; masking by index (i + 1 > torque_max) may have been
                # intended -- confirm. The condition is left as written.
                if item >= torque_max:
                    # Was `itme = G_min`, a typo that left G_list untouched.
                    G_list[i] = G_min
            torque_aggregator = np.argmax(G_list) + 1
        # Without any G values yet, keep the current torque for the last round.
        stop = True

In [95]:
# The default padding matches the 'SAME' padding used in create_new_conv_layer().
def conv_output_size(input_size, filter_size, stride, padding = 'Same'):
    '''
    Spatial output size of a convolution along one dimension.

    input_size: size of the input along that dimension
    filter_size: size of the filter along that dimension
    stride: convolution stride
    padding: 'Same' (any letter case, e.g. 'SAME') or anything else, which is
             treated as no padding

    With 'Same' padding the output size is ceil(input_size / stride), which
    equals input_size for stride 1. Without padding it is
    int((input_size - filter_size) / stride) + 1.
    '''
    if isinstance(padding, str) and padding.upper() == 'SAME':
        # Ceiling division written with floor division so that an integer
        # input yields an integer result.
        return -(-input_size // stride)
    return int((input_size - filter_size)/stride) + 1
# The default stride of 2 matches the pooling stride used in create_new_conv_layer().
def pooling_output_size(input_size, filter_size, stride=2):
    '''
    Spatial output size of a pooling layer along one dimension.

    Computed as int((input_size - filter_size) / stride) + 1, i.e. the number
    of window positions that fit entirely inside the input.
    '''
    span = input_size - filter_size
    full_steps = int(span / stride)
    return full_steps + 1

In [1]:
def flatten_matrix(matrix):
    '''
    Concatenate the rows of a nested sequence into one flat list.

    Only one level of nesting is removed: every element of every row is
    copied, in order, into the result.

    matrix: parameter or gradient matrix received from the cnn objects,
            indexable by 0..len(matrix)-1
    output: a flat Python list
    '''
    return [entry for row_idx in range(len(matrix)) for entry in matrix[row_idx]]

In [2]:
def wrangle_matrix(vector, parameter_length_list):
    '''
    Reverse function of flatten_matrix: split a flat vector back into
    consecutive per-layer chunks.

    Var:
        vector: a flat, sliceable sequence of parameters
        parameter_length_list: the number of parameters of each layer, in order

    Returns:
        An array with one chunk per layer. If all chunks have the same length
        this is a regular 2-D array; otherwise it is a 1-D object array whose
        entries are the individual chunks.
    '''
    chunks = []
    start = 0
    for length in parameter_length_list:
        # The slice end must be relative to the running offset. Using the
        # bare length as the end index produced wrong (mostly empty or
        # truncated) chunks for every layer after the first.
        chunks.append(vector[start:start + length])
        start += length

    if len({len(chunk) for chunk in chunks}) <= 1:
        # Uniform (or empty) case: a plain array.
        return np.array(chunks)

    # Ragged case: fill an object array element by element; np.array(...) on
    # ragged input raises ValueError on recent NumPy releases.
    matrix = np.empty(len(chunks), dtype=object)
    for idx, chunk in enumerate(chunks):
        matrix[idx] = chunk
    return matrix