## mpi4py example (https://www.youtube.com/watch?v=36nCgG40DJo&t=1215s):
allreduce functions listed: https://education.molssi.org/parallel-programming/03-distributed-examples-mpi4py/index.html

In [None]:
from mpi4py import MPI
comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()
print("my_rank =",my_rank)
p = comm.Get_size()
print("p =",p)

In [None]:
if my_rank != 0:
    message = "Hello from "+str(my_rank)
    comm.send(message, dest=0)
else:
    for procid in range(1,p):
        message = comm.recv(source=procid)
        print("process 0 receives message from process", procid,":",message)

#### With allreduce:

In [None]:
# Each process holds different local data.
if my_rank == 0:
    data = [1,2,3]
else:
    data = [4,5,6]
# Reduce to a scalar first: the lowercase allreduce combines Python objects
# with +, which would concatenate the lists instead of adding their entries.
local_sum = sum(data)
all_data_sum = comm.allreduce(local_sum, op=MPI.SUM)
print("process", my_rank, "has computed sum =", all_data_sum)

Call with:
mpiexec -n 4 python script.py
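
For readers without an MPI installation, the reduction can be imitated in plain Python. This is only a sketch of the semantics, not of how MPI communicates: `simulate_allreduce` and `rank_data` are hypothetical names, and the four simulated ranks mirror the `-n 4` in the command above. It also shows why passing lists straight to the reduction is risky, on the assumption that mpi4py's pickle-based `allreduce` combines Python objects with `+`.

```python
from functools import reduce
import operator

def simulate_allreduce(values_per_rank, op=operator.add):
    """Return the list of results that each simulated rank would receive."""
    combined = reduce(op, values_per_rank)
    return [combined for _ in values_per_rank]

# Rank 0 holds [1, 2, 3]; every other rank holds [4, 5, 6].
rank_data = [[1, 2, 3], [4, 5, 6], [4, 5, 6], [4, 5, 6]]

# Reducing local scalar sums gives every rank the same global total.
scalar_results = simulate_allreduce([sum(d) for d in rank_data])

# Reducing the raw lists with + concatenates them instead of adding entries.
list_results = simulate_allreduce(rank_data)

print(scalar_results)
print(len(list_results[0]))
```

If an element-wise sum is what is wanted, numpy arrays are the usual choice, since `+` on arrays adds entry by entry.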

## MapReduce example (https://www.youtube.com/watch?v=EmHc9hV5Xi8):

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep

In [None]:
class RatingsBreakdown(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_ratings,
                  reducer=self.reducer_count_ratings)
        ]
    
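    def mapper_get_ratings(self, _, line):
        # Each input line is tab-separated: userID, movieID, rating, timestamp
        (userID, movieID, rating, timestamp) = line.split('\t')
        yield rating, 1  # mappers must yield (key, value) pairs, not return them
    
    def reducer_count_ratings(self, key, values):
        yield key, sum(values)  # how many times each rating value occurs
    
if __name__ == '__main__':
    RatingsBreakdown.run()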
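
To make the data flow concrete without installing mrjob, here is a rough pure-Python sketch of the same map, shuffle and reduce phases. The function names (`map_phase`, `shuffle_phase`, `reduce_phase`) and the sample lines are made up for illustration; a real framework distributes these phases across machines rather than running them in one process.

```python
from collections import defaultdict

# Made-up sample lines in the tab-separated format the mapper expects:
# userID, movieID, rating, timestamp
sample_lines = [
    "1\t10\t3\t0",
    "2\t11\t3\t0",
    "3\t12\t1\t0",
    "4\t13\t2\t0",
    "5\t14\t1\t0",
]

def map_phase(lines):
    """Emit one (rating, 1) pair per input line."""
    for line in lines:
        user_id, movie_id, rating, timestamp = line.split("\t")
        yield rating, 1

def shuffle_phase(pairs):
    """Group all values that share a key, as the framework does between steps."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Sum the grouped values to count how often each rating occurs."""
    for key in sorted(grouped):
        yield key, sum(grouped[key])

rating_counts = dict(reduce_phase(shuffle_phase(map_phase(sample_lines))))
print(rating_counts)
```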

## Multiprocessing async SGD for training a linear regression model example:

In [24]:
import scipy.sparse
import numpy as np
from multiprocessing.sharedctypes import Array
from ctypes import c_double
from multiprocessing import Pool

In [9]:
n=10 # number of features
m=20000 # number of training examples

X = scipy.sparse.random(m,n, density=.2).toarray() # Guarantees sparse grad updates
real_w = np.random.uniform(0,1,size=(n,1)) # Define our true weight vector

X = X/X.max() # Normalizing for training

y = np.dot(X,real_w)

print("Size training data:", X.shape)
print("Size training labels:", y.shape)

Size training data: (20000, 10)
Size training labels: (20000, 1)


First, we need to place the weight vector w in shared memory so that every worker process can read and write it without locks. We use the “sharedctypes.Array” class from multiprocessing for this, with lock=False. We then use numpy’s frombuffer function to view the same memory as a numpy array, so no copy is made.

In [12]:
coef_shared = Array(c_double, 
        (np.random.normal(size=(n,1)) * 1./np.sqrt(n)).flat,
        lock=False) # Hogwild!
w = np.frombuffer(coef_shared)
w = w.reshape((n,1)) 
print("size accessible w:", w.shape)

size accessible w: (10, 1)
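
A small check, with hypothetical names (`shared_demo`, `view_demo`), that the numpy array really is a view onto the shared buffer rather than a copy. It is only a single-process sketch: it shows that writes through either handle are visible through the other, which is the property the workers rely on.

```python
import numpy as np
from ctypes import c_double
from multiprocessing.sharedctypes import Array

n_demo = 4
# lock=False returns the raw ctypes array without a synchronizing wrapper.
shared_demo = Array(c_double, [0.0] * n_demo, lock=False)

# frombuffer wraps the existing memory; reshape on a contiguous array keeps the view.
view_demo = np.frombuffer(shared_demo, dtype=np.float64).reshape((n_demo, 1))

shared_demo[2] = 3.5     # write through the ctypes array
view_demo[0, 0] = -1.0   # write through the numpy view

print(view_demo.ravel())
print(list(shared_demo))
```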


Our ultimate goal is to perform the gradient update in parallel. To do this, we need to define what this update is:

In [14]:
# The calculation has been adjusted to allow for mini-batches
learning_rate = .001
def mse_gradient_step(X_y_tuple):
    global w # Only for instructive purposes!
    X, y = X_y_tuple # Required for how multiprocessing.Pool.map works
    
    # Calculate the gradient
    err = y.reshape((len(y),1))-np.dot(X,w)
    grad = -2.*np.dot(np.transpose(X),err)/ X.shape[0]

    # Update only the weights whose gradient magnitude exceeds .01, one at a time
    for index in np.where(abs(grad) > .01)[0]:
        coef_shared[index] -= learning_rate*grad[index,0]
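
The gradient used above follows from the mean squared error over a batch of b rows, L(w) = (1/b) * ||y - Xw||^2, whose gradient is -(2/b) * X^T (y - Xw). A quick way to gain confidence in that formula is to compare it with a finite-difference estimate. The sketch below does this on small random data; all `_demo` names and the `mse` helper are hypothetical and separate from the notebook's variables.

```python
import numpy as np

rng = np.random.default_rng(0)
X_demo = rng.uniform(size=(8, 3))
y_demo = rng.uniform(size=(8, 1))
w_demo = rng.normal(size=(3, 1))

def mse(w_vec):
    """Mean squared error of the linear model on the demo batch."""
    residual = y_demo - X_demo @ w_vec
    return float(np.mean(residual ** 2))

# Analytic gradient, written the same way as in mse_gradient_step.
analytic = -2.0 * X_demo.T @ (y_demo - X_demo @ w_demo) / X_demo.shape[0]

# Central finite differences, one coordinate at a time.
eps = 1e-6
numeric = np.zeros_like(w_demo)
for j in range(w_demo.shape[0]):
    step = np.zeros_like(w_demo)
    step[j, 0] = eps
    numeric[j, 0] = (mse(w_demo + step) - mse(w_demo - step)) / (2 * eps)

max_gap = float(np.max(np.abs(analytic - numeric)))
print("largest difference between analytic and numeric gradient:", max_gap)
```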

Preparing the examples for multiprocessing.Pool: We have to cut the training data into (X, y) tuples, one per mini-batch, to pass to our workers. With batch_size=1 this is one tuple per training example. We also reshape each slice to match the way the gradient step is written above. Larger mini-batches can be used by changing the batch_size variable.

In [15]:
batch_size=1
num_batches = X.shape[0] // batch_size # integer division; leftover rows are dropped
examples=[None]*num_batches
for k in range(num_batches):
    Xx = X[k*batch_size : (k+1)*batch_size,:].reshape((batch_size,X.shape[1]))
    yy = y[k*batch_size : (k+1)*batch_size].reshape((batch_size,1))
    examples[k] = (Xx, yy)
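
One consequence of the slicing above is worth seeing on a toy case: when the number of rows is not a multiple of the batch size, the trailing rows never reach a worker. The sketch below uses hypothetical `_demo` names and a batch size of 4 on 10 rows.

```python
import numpy as np

X_demo = np.arange(20.0).reshape((10, 2))  # 10 rows, 2 features
y_demo = np.arange(10.0).reshape((10, 1))
batch_size_demo = 4

num_batches_demo = X_demo.shape[0] // batch_size_demo
batches_demo = []
for k in range(num_batches_demo):
    rows = slice(k * batch_size_demo, (k + 1) * batch_size_demo)
    batches_demo.append((X_demo[rows, :], y_demo[rows]))

rows_used = sum(xb.shape[0] for xb, _ in batches_demo)
rows_dropped = X_demo.shape[0] - rows_used

print("batches:", len(batches_demo))
print("batch shapes:", batches_demo[0][0].shape, batches_demo[0][1].shape)
print("rows dropped:", rows_dropped)
```

With the notebook's 20000 rows and batch_size=1 nothing is dropped; the effect only matters for batch sizes that do not divide the row count.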

The Asynchronous Bit:

In [None]:
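# Assumes the "fork" start method, so that the workers inherit coef_shared and w.
with Pool(5) as pool:
    pool.map(mse_gradient_step, examples)

print('Mean absolute error on the training set:', np.mean(abs(y-np.dot(X,w))))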
print('Difference from the real weight vector:', abs(real_w-w).sum())
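
As a point of comparison for the parallel run, a single-process SGD loop over the same kind of problem can serve as a baseline. This is a sketch on a much smaller, made-up problem (200 rows, 3 features) with hypothetical names throughout. It uses a larger step size than the notebook and omits the .01 gradient threshold, so its numbers are not directly comparable to the output above.

```python
import numpy as np

rng = np.random.default_rng(0)
X_demo = rng.uniform(size=(200, 3))
true_w = rng.uniform(size=(3, 1))
y_demo = X_demo @ true_w          # noiseless labels, as in the notebook

w_seq = np.zeros((3, 1))
step_size = 0.1                   # assumption: chosen for this tiny problem

def mse(w_vec):
    """Mean squared error on the full demo set."""
    return float(np.mean((y_demo - X_demo @ w_vec) ** 2))

initial_loss = mse(w_seq)
for epoch in range(20):
    for i in range(X_demo.shape[0]):
        x_i = X_demo[i:i + 1, :]              # keep the row two-dimensional
        err = y_demo[i:i + 1] - x_i @ w_seq
        grad = -2.0 * x_i.T @ err             # gradient for a batch of one row
        w_seq -= step_size * grad
final_loss = mse(w_seq)

print("loss before:", initial_loss)
print("loss after:", final_loss)
print("distance from true weights:", float(np.abs(true_w - w_seq).sum()))
```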