Merge pull request #30 from ummavi/dynamic_scheduling_mpi
MPI - Dynamic Scheduling
mschoengens committed Nov 21, 2017
2 parents 8fba70d + cf201f6 commit 2758fa5
Showing 1 changed file with 116 additions and 66 deletions.
182 changes: 116 additions & 66 deletions abcpy/backends/mpi.py
@@ -1,5 +1,6 @@
import numpy as np
import cloudpickle
import time
import pickle

from mpi4py import MPI
@@ -17,8 +18,19 @@ class BackendMPIMaster(Backend):
OP_PARALLELIZE, OP_MAP, OP_COLLECT, OP_BROADCAST, OP_DELETEPDS, OP_DELETEBDS, OP_FINISH = [1, 2, 3, 4, 5, 6, 7]
finalized = False

def __init__(self, master_node_ranks=[0]):

def __init__(self, master_node_ranks=[0],chunk_size=1):
"""
Parameters
----------
master_node_ranks: Python list
list of ranks on which computation should not happen.
Should include the master rank so it does not get
overwhelmed with work.
chunk_size: Integer
size of one chunk of data sent to a free
executor per request
"""
self.comm = MPI.COMM_WORLD
self.size = self.comm.Get_size()
self.rank = self.comm.Get_rank()
@@ -31,10 +43,17 @@ def __init__(self, master_node_ranks=[0]):

#Initialize a BDS store for both master & slave.
self.bds_store = {}
self.pds_store = {}

#Initialize a store for the pds data that
#.. hasn't been sent to the workers yet
self.pds_pending_store = {}

self.chunk_size = chunk_size
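# chunk_size trades scheduling overhead against load balance: a larger chunk
# means fewer request/reply round trips with the master, while chunk_size=1
# balances best when task run times are very uneven. A minimal, hypothetical
# construction using only the signature above (illustrative, not part of this commit):
#
#   backend = BackendMPIMaster(master_node_ranks=[0], chunk_size=4)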


def __command_slaves(self, command, data):
"""
"""Tell slaves to enter relevant execution block
This method handles the sending of the command to the slaves
telling them what operation to perform next.
@@ -55,7 +74,8 @@ def __command_slaves(self, command, data):
elif command == self.OP_MAP:
#In map we receive data as (pds_id,pds_id_new,func)
#Use cloudpickle to dump the function into a string.
function_packed = cloudpickle.dumps(data[2], pickle.HIGHEST_PROTOCOL)
# function_packed = self.__sanitize_and_pack_func()
function_packed = cloudpickle.dumps(data[2],pickle.HIGHEST_PROTOCOL)
data_packet = (command, data[0], data[1], function_packed)

elif command == self.OP_BROADCAST:
Expand All @@ -75,6 +95,7 @@ def __command_slaves(self, command, data):
_ = self.comm.bcast(data_packet, root=0)
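# cloudpickle (unlike plain pickle) can serialize lambdas and closures, which
# is why the map function is dumped to a byte string before being broadcast.
# A tiny standalone sketch of the round trip the master/slave pair performs
# (illustrative only, not part of this commit):
#
#   import cloudpickle, pickle
#   packed = cloudpickle.dumps(lambda x: x ** 2, pickle.HIGHEST_PROTOCOL)
#   func = cloudpickle.loads(packed)
#   assert func(3) == 9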



def __generate_new_pds_id(self):
"""
This method generates a new pds_id to associate a PDS with it's remote counterpart
@@ -130,21 +151,61 @@ def parallelize(self, python_list):
pds_id = self.__generate_new_pds_id()
self.__command_slaves(self.OP_PARALLELIZE, (pds_id,))

#Initialize empty data lists for the processes on the master node
rdd_masters = [[] for i in range(len(self.master_node_ranks))]

#Split the data only amongst the number of workers
rdd_slaves = np.array_split(python_list, self.size - len(self.master_node_ranks), axis=0)
#Don't send any data. Just keep it as a queue we're going to pop.
self.pds_store[pds_id] = list(python_list)

#Combine the lists into the final rdd before we split it across all ranks.
rdd = rdd_masters + rdd_slaves

data_chunk = self.comm.scatter(rdd, root=0)

pds = PDSMPI(data_chunk, pds_id, self)
pds = PDSMPI([], pds_id, self)

return pds

def orchestrate_map(self,pds_id):
"""Orchestrates the slaves/workers to perform a map function
This works by keeping track of the workers who haven't finished executing,
waiting for them to request the next chunk of data when they are free,
responding to them with the data, and finally sending them a sentinel
signalling that they can exit once all the data has been handed out.
"""
is_map_done = [True if i in self.master_node_ranks else False for i in range(self.size)]
status = MPI.Status()

#Copy the data into the pending store. Items are popped off this copy,
#so the PDS data is not empty when the master accesses it.
self.pds_pending_store[pds_id] = list(self.pds_store[pds_id])

#While we have some ranks that haven't finished
while sum(is_map_done)<self.size:
#Wait for a request from anyone
data_request = self.comm.recv(
source=MPI.ANY_SOURCE,
tag=MPI.ANY_TAG,
status=status,
)
request_from_rank = status.source

if data_request!=pds_id:
print("Ignoring stale PDS data request from",
request_from_rank,":",data_request,"/",pds_id)
continue

#Pointer so we don't have to keep doing dict lookups
current_pds_items = self.pds_pending_store[pds_id]
num_current_pds_items = len(current_pds_items)

#All the data has already been handed out.
# Send a sentinel and mark the node as finished
if num_current_pds_items == 0:
self.comm.send(None, dest=request_from_rank, tag=pds_id)
is_map_done[request_from_rank] = True
else:
#Create the chunk of data to send. Pop items off the queue and tag each
# with an index so the results can be sorted back into order later
chunk_to_send = []
for i in range(min(self.chunk_size, num_current_pds_items)):
chunk_to_send+=[(num_current_pds_items-i,current_pds_items.pop())]

self.comm.send(chunk_to_send, dest=request_from_rank, tag=pds_id)
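# The resulting master/worker exchange for one pds_id (a rough timeline,
# inferred from the code above and the slave-side map further below):
#
#   worker -> master : pds_id                  request for work
#   master -> worker : [(idx, item), ...]      next chunk, tag=pds_id
#   ... repeated while the pending queue is non-empty ...
#   master -> worker : None                    sentinel, tag=pds_id; worker exits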

def map(self, func, pds):
"""
@@ -175,9 +236,9 @@ def map(self, func, pds):
data = (pds_id, pds_id_new, func)
self.__command_slaves(self.OP_MAP, data)

rdd = list(map(func, pds.python_list))
self.orchestrate_map(pds_id)

pds_res = PDSMPI(rdd, pds_id_new, self)
pds_res = PDSMPI([], pds_id_new, self)

return pds_res
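# With dynamic scheduling the master no longer computes the map itself; it
# only orchestrates. The returned PDS is an empty placeholder and the real
# (index, result) pairs stay on the workers until collect() gathers and
# re-sorts them.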

@@ -201,15 +262,22 @@ def collect(self, pds):
# Tell the slaves to enter collect with the pds's pds_id
self.__command_slaves(self.OP_COLLECT, (pds.pds_id,))

python_list = self.comm.gather(pds.python_list, root=0)
all_data = self.comm.gather(pds.python_list, root=0)

#Initialize lists to accumulate results
all_data_indices,all_data_items = [],[]

for node_data in all_data:
for item in node_data:
all_data_indices+=[item[0]]
all_data_items+=[item[1]]

#Sort the accumulated data according to the indices we tagged
#them with when distributing
rdd_sorted = [all_data_items[i] for i in np.argsort(all_data_indices)]


# When we gather, the results are a list of lists one
# .. per rank. Undo that by one level and still maintain multi
# .. dimensional output (which is why we cannot use np.flatten)
combined_result = []
list(map(combined_result.extend, python_list))
return combined_result
return rdd_sorted
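# Worked example (hypothetical values): if the gather returns
#   [[], [(2, 'b'), (1, 'a')], [(3, 'c')]]
# (an empty list from the master rank plus one tagged list per worker), the
# accumulated indices are [2, 1, 3], np.argsort gives [1, 0, 2], and the
# sorted result is ['a', 'b', 'c'] -- the original list order.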


def broadcast(self, value):
@@ -274,7 +342,7 @@ def __del__(self):


class BackendMPISlave(Backend):
"""Defines the behavior of the slaves processes
"""Defines the behavior of the slaves/worker processes
This class defines how the slaves should behave during operation.
Slaves are those processes (not nodes, as in Spark) that have rank != 0
@@ -331,8 +399,8 @@ def slave_run(self):
if op == self.OP_PARALLELIZE:
pds_id = data[1]
self.__rec_pds_id = pds_id
pds = self.parallelize([])
self.pds_store[pds.pds_id] = pds
pds_id, pds_id_new = self.__get_received_pds_id()
self.pds_store[pds_id] = None


elif op == self.OP_MAP:
@@ -341,14 +409,10 @@ def slave_run(self):

#Use cloudpickle to convert back function string to a function
func = cloudpickle.loads(function_packed)
#Set the function's backend to current class
#so it can access bds_store properly
# func.backend = self


# Access an existing PDS
pds = self.pds_store[pds_id]
pds_res = self.map(func, pds)
#Enter the map so we can grab data and apply the func.
#The func was sent up front with the command, rather than here, for performance reasons
pds_res = self.map(func)

# Store the result in a newly generated PDS pds_id
self.pds_store[pds_res.pds_id] = pds_res
@@ -388,38 +452,11 @@ def __get_received_pds_id(self):
return self.__rec_pds_id, self.__rec_pds_id_result


def parallelize(self, python_list):
"""
This method distributes the list on the available workers and returns a
reference object.
The list is split into number of workers many parts as a numpy array.
Each part is sent to a separate worker node using the MPI scatter.

SLAVE: python_list should be [] and is ignored by the scatter()

Parameters
----------
list: Python list
the list that should get distributed on the worker nodes
Returns
-------
PDSMPI class (parallel data set)
A reference object that represents the parallelized list
"""

#Get the PDS id we should store this data in
pds_id, pds_id_new = self.__get_received_pds_id()

data_chunk = self.comm.scatter(None, root=0)

pds = PDSMPI(data_chunk, pds_id, self)

return pds

def parallelize(self):
pass


def map(self, func, pds):
def map(self, func):
"""
A distributed implementation of map that works on parallel data sets (PDS).
@@ -429,19 +466,31 @@ def map(self, func, pds):
----------
func: Python func
A function that can be applied to every element of the pds
pds: PDS class
A parallel data set to which func should be applied
Returns
-------
PDSMPI class
a new parallel data set that contains the result of the map
"""

map_start = time.time()

#Get the PDS id we operate on and the new one to store the result in
pds_id, pds_id_new = self.__get_received_pds_id()

rdd = list(map(func, pds.python_list))
rdd = []
while True:
#Ask the master for a chunk of data since this worker is free
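# sendrecv posts the request (the pds_id, sent to the master at rank 0 with
# the pds_id as tag) and blocks until the master replies with either a chunk
# of tagged items or the None sentinel.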
data_chunks = self.comm.sendrecv(pds_id, 0, pds_id)

#A sentinel (None) means all the data has been handed out and the worker can exit
if data_chunks is None:
break

#Accumulate the indices and the *processed* chunks
for chunk in data_chunks:
data_index,data_item = chunk
rdd+=[(data_index,func(data_item))]

pds_res = PDSMPI(rdd, pds_id_new, self)

@@ -507,6 +556,7 @@ def __init__(self, master_node_ranks=[0]):
raise Exception("Slaves exited main loop.")



class PDSMPI(PDS):
"""
This is an MPI wrapper for a Python parallel data set.

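A minimal end-to-end sketch of how the dynamically scheduled backend is driven from user code. The import path and BackendMPI wrapper class are assumptions based on abcpy's usual API rather than part of this commit, as is the mpirun invocation:

# driver.py -- run with e.g.: mpirun -n 4 python driver.py
from abcpy.backends import BackendMPI

# Rank 0 becomes the master; all other ranks enter slave_run() and wait for commands.
backend = BackendMPI()

# The master keeps the list as a pending queue; nothing is scattered up front.
pds = backend.parallelize(list(range(100)))

# Workers pull chunks on demand and apply the function; results stay remote.
pds_squared = backend.map(lambda x: x ** 2, pds)

# Gather the tagged results and restore the original order.
print(backend.collect(pds_squared))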