# Data parallelism: Exercise

For this exercise we will be build upon last week's vanilla gradient descent example. Included in the next codebox are functions to perform feedforward and backprop on a single minibatch. The computeMinibatchGradientsTuple() function is the same as the computeMinibatchGradients() function, but its inputs in a single tuple will make using Python's ThreadPool easier later on.

You don’t need to do modify this first block of code. 

If you do not have scikit-learn then you can get it here: https://scikit-learn.org/stable/install.html

In [0]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
import time

import os

os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OMP_NUM_THREADS'] = '1'

# In order to run this in class, we're going to reduce the dataset by a factor of 5
X, y = fetch_openml('mnist_784', version=1, return_X_y=True)
X = X[::5]
y = y.astype(int)[::5]
X, X_test, y, y_test = train_test_split(X, y)

# Here we specify the size of our neural network.
# We are mapping from 784 to 10 with 256 hiden layer nodes.

m = len(X)
n_0 = 784
n_1 = 256
N = 10


# Function to convert categorical labels into one-hot matrix.
def convert_to_one_hot(y, n_classes):
    T = np.zeros((y.shape[0], n_classes))
    for t, yy in zip(T, y):
        t[yy] = 1
    return T


# Convert the data to one hot notation
one_hot_y_actual = convert_to_one_hot(y, N)
one_hot_y_test = convert_to_one_hot(y_test, N)


# Sigmoid function (activation)
def sigmoid(a):
    return 1. / (1 + np.exp(-a))


# Softmax function (final layer for classification)
def softmax(A):
    numerator = np.exp(A)
    denominator = numerator.sum(axis=1)
    return numerator / denominator[:, np.newaxis]


# Categorical cross-entropy
def L(T, S, W1, W2, alpha_1=1e-2, alpha_2=1e-5):
    return -1. / len(T) * np.sum(T * np.log(S)) + np.sum(0.5 * alpha_1 * W1 ** 2) + np.sum(0.5 * alpha_2 * W2 ** 2)


# Run the neural network forward, given some weights and biases
def feedforward(X, W1, W2, b1, b2):
    # Feedforward
    A1 = X @ W1 + b1
    Z1 = sigmoid(A1)
    A2 = Z1 @ W2 + b2
    y_pred = softmax(A2)
    return y_pred, Z1


# Compute the neural network gradients using backpropagation
def backpropogate(y_pred, Z1, X, y_obs, alpha_1=1e-2, alpha_2=1e-5):
    # Backpropogate
    delta_2 = (1. / len(y_pred)) * (y_pred - y_obs)
    grad_W2 = Z1.T @ delta_2 + alpha_2 * W2
    grad_b2 = delta_2.sum(axis=0)

    delta_1 = delta_2 @ W2.T * Z1 * (1 - Z1)
    grad_W1 = X.T @ delta_1 + alpha_1 * W1
    grad_b1 = delta_1.sum(axis=0)
    return grad_W1, grad_W2, grad_b1, grad_b2


def mini_batch(x_sample, y_sample, start_batch_size):
    """
    Takes a copy of x_sample and y_sample and returns mini batch matrices of both and number of batches
    """

    # Batches must divide evenly into total number of samples for numpy arrays to be happy.
    # Gets number of bathes by finding next smallest number that evenly divides
    num_batches = start_batch_size
    while len(x_sample) % num_batches != 0:
        num_batches -= 1

    # randomly shuffle indices
    np.random.seed(42)
    random_indices = np.random.choice(range(len(x_sample)), len(x_sample), replace=False)

    # instantiate lists to hold batches
    x_list = [[] for i in range(num_batches)]
    y_list = [[] for i in range(num_batches)]

    # populate batches matrix with random mini batch indices
    for i in range(len(x_sample)):

        x_list[i // 105].append(x_sample[random_indices[i]])
        y_list[i // 105].append(y_sample[random_indices[i]])
    
    # Convert to numpy arrays
    x_batch = np.array(x_list)
    y_batch = np.array(y_list)

    return x_batch, y_batch, num_batches, num_batches


#computes the gradients of a single minibatch
def computeMinibatchGradients(W1, W2, b1, b2, x_batch, y_batch):
    y_pred, Z1 = feedforward(x_batch, W1, W2, b1, b2)
    """
    These are your gradients with respect to weight matrices W1 and W2 
    as well as your biases b1 and b2
    """
    grad_W1, grad_W2, grad_b1, grad_b2 = backpropogate(y_pred, Z1, x_batch, y_batch)
    
    return grad_W1, grad_W2, grad_b1, grad_b2

#computes the gradients of a single minibatch
def computeMinibatchGradientsTuple(inputTuple):
    W1, W2, b1, b2, x_batch, y_batch = inputTuple
    y_pred, Z1 = feedforward(x_batch, W1, W2, b1, b2)
    """
    These are your gradients with respect to weight matrices W1 and W2 
    as well as your biases b1 and b2
    """
    grad_W1, grad_W2, grad_b1, grad_b2 = backpropogate(y_pred, Z1, x_batch, y_batch)
    
    return grad_W1, grad_W2, grad_b1, grad_b2


# Vanilla Gradient Descent

This next codebox should look familiar; it performs vanilla gradient descent. You don't need to change this codebox, either. Run this, and notice that it now also prints out the time taken to evaluate each epoch. We'll use these times to evaluate how much of a speedup data parallelism will give us in a simple multithreading environment.

In [0]:
"""
Vanilla Gradient Descent
"""

# Hyper Parameters
eta = 1e-3
initial_batch_size = 104
epochs = 250

# Initialize random parameter matrices
np.random.seed(42)
W1 = 0.001 * np.random.randn(n_0, n_1)
W2 = 0.001 * np.random.randn(n_1, N)

b1 = 0.1 * np.random.randn(1, n_1)
b2 = 0.1 * np.random.randn(1, N)

# data for analysis
vanilla_loss = []


# Perform gradient descent
for i in range(epochs):
    epochStartTime = time.time()
    
    # generate mini batches
    x_batches, y_batches, num_batches, actual_batch_size = mini_batch(X, one_hot_y_actual, initial_batch_size)

    # perform gradient descent on mini batches
    for j in range(0, num_batches):
        
        grad_W1, grad_W2, grad_b1, grad_b2 = computeMinibatchGradients(W1, W2, b1, b2, x_batches[j], y_batches[j])
        '''
        use the gradients to update weights and biases
        '''
        W1 -= eta * grad_W1
        W2 -= eta * grad_W2
        b1 -= eta * grad_b1
        b2 -= eta * grad_b2


    # calc loss at end of each epoch
    y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
    vanilla_loss.append(L(one_hot_y_actual, y_entire_pred, W1, W2))
    
    #find the time taken to compute the epoch
    epochTimeTaken = (time.time() - epochStartTime)*1000

    # Print some summary statistics every ten iterations
    if i % 10 == 0:
        y_pred_test, Z1_test = feedforward(X_test, W1, W2, b1, b2)
        acc = sum(y_test == np.argmax(y_pred_test, axis=1)) / len(y_test)
        y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
        print("Epoch %d Loss %f Accuracy %f time taken %d ms" % (i, L(one_hot_y_actual, y_entire_pred, W1, W2), acc, epochTimeTaken))



Epoch 0 Loss 2.168636 Accuracy 0.624571 time taken 1005 ms
Epoch 10 Loss 0.777884 Accuracy 0.894286 time taken 997 ms
Epoch 20 Loss 0.460583 Accuracy 0.917429 time taken 1023 ms
Epoch 30 Loss 0.338518 Accuracy 0.922857 time taken 1016 ms
Epoch 40 Loss 0.273251 Accuracy 0.928000 time taken 974 ms
Epoch 50 Loss 0.231777 Accuracy 0.929429 time taken 1022 ms
Epoch 60 Loss 0.202757 Accuracy 0.932286 time taken 1011 ms
Epoch 70 Loss 0.181362 Accuracy 0.932286 time taken 1014 ms
Epoch 80 Loss 0.164755 Accuracy 0.933714 time taken 1017 ms
Epoch 90 Loss 0.151529 Accuracy 0.934571 time taken 1005 ms
Epoch 100 Loss 0.140723 Accuracy 0.934857 time taken 1005 ms
Epoch 110 Loss 0.131645 Accuracy 0.936000 time taken 1011 ms
Epoch 120 Loss 0.123826 Accuracy 0.936000 time taken 1006 ms
Epoch 130 Loss 0.117152 Accuracy 0.936571 time taken 1021 ms
Epoch 140 Loss 0.111325 Accuracy 0.936571 time taken 1016 ms
Epoch 150 Loss 0.106220 Accuracy 0.936857 time taken 1021 ms
Epoch 160 Loss 0.101701 Accuracy 0.93

# Updating Vanilla Gradient Descent with Data Parallelism

Now that we have some baseline timings, we're going to be updating this example to employ data parallelism. The Ben-Nun et.al. paper mainly focuses on parallelism in a distributed computing environment, but using a library like MPI for distributed parallelism would be well outside the scope of these assignments, so we're going to using Python's multiprocessing package to perform data parallelism with a ThreadPool

First, read the documentation on python's Pool class, located here:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing

We're going to be using the Pool's faster (and less documented) cousin, the ThreadPool. The Pool and ThreadPool have the same interface, but while Pool uses a single thread, trading it between the pool's workers, the ThreadPool actually spins up multiple instances of the Python interpreter in different threads to perform true parallel computation.

The next codeblock uses the ThreadPool's map function to give each process a different minibatch in parallel. 


1.
On line 60, use the ThreadPool's map function to parallelize the gradient calculation for each of the parallel batches.

2.
We will need to average the gradients returned from each parallel batch in order to perform gradient descent, but the thread pool returns a list of the list of each batch's gradients. To make averaging the gradients easier, line 58 uses the zip function to make a new list such that the first element in the list contains all the W1 gradients, the second element contains all the W2 gradients, etc. On lines 59-62, use the np.mean function to average all W1, W2, b1, and b2 gradients, and use those averages to update W1, W2, b1, and b2.


In [0]:
"""
Vanilla Gradient Descent with Data Parallelism
"""

#import the ThreadPool
from multiprocessing.pool import ThreadPool


# Hyper Parameters
eta = 1e-3
initial_batch_size = 100
epochs = 100

#add additional hyperparameters related to the data parallelism
threads_in_pool = 4
parallel_batches = 10

#create the thread pool
pool = ThreadPool(processes=threads_in_pool) 


# Initialize random parameter matrices
np.random.seed(42)
W1 = 0.001 * np.random.randn(n_0, n_1)
W2 = 0.001 * np.random.randn(n_1, N)

b1 = 0.1 * np.random.randn(1, n_1)
b2 = 0.1 * np.random.randn(1, N)

# data for analysis
vanilla_loss = []

# Perform gradient descent
for i in range(epochs):
    epochStartTime = time.time()
    
    # generate mini batches
    x_batches, y_batches, num_batches, actual_batch_size = mini_batch(X, one_hot_y_actual, initial_batch_size)
    
    
    # perform gradient descent on mini batches
    for j in range(0, num_batches, parallel_batches):
        
        #create the list of inputs for the pool threads
        #this might look weird, but by creating a list of tuples, the input data can be easily given to
        #each worker thread in the ThreadPool
        minibatchGradientInputLists = []
        for k in range(parallel_batches):
            minibatchGradientInputLists.append((W1, W2, b1, b2, x_batches[j+k], y_batches[j+k]))
        
        #TODO: use the ThreadPool's map function to compute minibatch gradients in parallel.
        gradientOutputs = pool.map(computeMinibatchGradientsTuple, minibatchGradientInputLists)
        
        
        '''
        use the gradients to update weights and biases
        '''
        gradients = list(zip(*gradientOutputs))
       # print(len(gradients), len(gradients[0]))
        W1 -= eta * np.mean(gradients[0], axis=0)
        W2 -= eta * np.mean(gradients[1], axis=0)
        b1 -= eta * np.mean(gradients[2], axis=0)
        b2 -= eta * np.mean(gradients[3], axis=0)

    # calc loss at end of each epoch
    y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
    vanilla_loss.append(L(one_hot_y_actual, y_entire_pred, W1, W2))
    
    #find the time taken to compute the epoch
    epochTimeTaken = (time.time() - epochStartTime) * 1000

    # Print some summary statistics every ten iterations
    if i % 10 == 0:
        y_pred_test, Z1_test = feedforward(X_test, W1, W2, b1, b2)
        acc = sum(y_test == np.argmax(y_pred_test, axis=1)) / len(y_test)
        y_entire_pred, Z1 = feedforward(X, W1, W2, b1, b2)
        print("Epoch %d Loss %f Accuracy %f time taken %d ms" % (i, L(one_hot_y_actual, y_entire_pred, W1, W2), acc, epochTimeTaken))

#kill the pool so it doesn't hang around without getting garbage collected
pool.terminate()

Epoch 0 Loss 2.297578 Accuracy 0.097714 time taken 1631 ms
Epoch 10 Loss 2.147794 Accuracy 0.644857 time taken 1404 ms
Epoch 20 Loss 1.916114 Accuracy 0.667714 time taken 1388 ms
Epoch 30 Loss 1.690979 Accuracy 0.708286 time taken 1460 ms
Epoch 40 Loss 1.495940 Accuracy 0.741714 time taken 1346 ms
Epoch 50 Loss 1.331413 Accuracy 0.782571 time taken 1384 ms
Epoch 60 Loss 1.193276 Accuracy 0.828857 time taken 1313 ms
Epoch 70 Loss 1.077234 Accuracy 0.853429 time taken 1479 ms
Epoch 80 Loss 0.979588 Accuracy 0.870857 time taken 1366 ms
Epoch 90 Loss 0.897108 Accuracy 0.884857 time taken 1428 ms


# Performance assessment and questions

Now that your data parallel implementation is finished, play around with the threads_in_pool and parallel_batches hyperparameters, and answer the following questions.

1. How does the speed of the data parallel implementation compare to the non-parallelized version.


2. Adjusting the threads_in_pool and parallel_batches hyperparameters, where do you see the most improvement in speed? When does increasing these hyperparameters stop making the computation faster?


3. Section 3 of the paper discusses Generalization in the context of statistical accuracy. How does the generalization issue relate to the parallel_batches hyperparameter?


4. Using a library like mpi4py, we could take the local, thread-parallel approach and do it in a true distributed environment. If the computeMinibatchGradients function was being run on different processors in a distributed system, what data would you have to send to the processors for each minibatch? What information would these distributed processors need to send back?


5. As we discussed in class on Tuesday, model parallelism involves splitting up a network between processors such that different portions of the same layer might be computed on different processors. Knowing that the example network is comprised of two full-connected layers, what changes would you have to make to the code to be able to employ model parallelism. (Note, actually doing this would be an enormous amount of work, but think critically about which parts of the network would need to be rewritten to achieve model parallelism.) 


6. Pipeline parallelism involves splitting up a network between processors such that each processor is responsible for one or more contiguous operators. How might you change the example to perform pipeline-parallelism? Would this be easier to implement than model parallelism, or harder?


7. If a pipeline-parallel network such as the one from the previous question was implemented, how would data quantization help improve performance in a distributed environment?

##answers

1. The parallel version is slightly slower under default parameters (4 threads, 4 minibatch), and the accuracy increased at a slightly slower rate when compared to the nonparallel version, converging to a final accuracy of 0.934000.

2. Running with 16 threads was faster than running with 4 threads, however, neither was faster than the single threaded version

3. Increasing the parallel batches decreased the rate of learning. This is because we are effectively increasing the minibatch size. In theory it should allow for slightly better generalization.

4. Each process would need to be sent the weights to the network. Each process would have to send back their gradient updates

5. A model parallel implemenation of fully connected layers would likely use parallel algorithms for matrix multiplication. This has the potential to speed up very large fully connected layers, but comes with a significant amount of overhead and additional programming work

6. You would have each process compute an entire layer. To implement this, you would create a pipeline where each layer is given a piece of data, and then the output from that layer is given to the next layer/process on the next cycle. This seems algorithmically easier to implement than model parallelism.

7. Quantizing the data would greatly decrease the amount of data that would need to be sent between processes.