Skip to content

Commit

Permalink
MULTIPLE CHANGES Fixed Inefficient Scheduling (Issue soravux#68)
Browse files Browse the repository at this point in the history
Made the following changes

1.  Added a new function append_init which is used by futures.submit.
    This function adds newly spawned futures to the broker's task
    queue.

2.  Added a new state variable request_in_process to the FutureQueue
    object to track whether a future request is still outstanding, so
    that a new future request is sent iff the previous future request
    has been answered.

3.  Also, replaced function recvFuture with recvIncoming, which passes
    the received incoming messages back to updateQueue, where they are
    processed. Changed _recv to only perform preprocessing of the
    messages.
  • Loading branch information
maharjun committed Jun 16, 2017
1 parent 5c3e055 commit 8de3f0c
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 132 deletions.
23 changes: 23 additions & 0 deletions scoop/_comm/scoopmessages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Message-type tags used by the SCOOP communication layer. Each tag is a
# short byte string; the receive path in scoopzmq.py compares it against the
# first frame of an incoming multipart message (msg[0]). Both scoopzmq.py and
# _types.py pull these names in with a star import, so every name defined
# here becomes visible in those modules.

# Worker requests
INIT = b"I"
REQUEST = b"RQ"
TASK = b"T"  # msg[1] holds a pickled future
REPLY = b"RP"  # msg[1] holds a pickled future carrying its result fields
SHUTDOWN = b"S"
VARIABLE = b"V"
BROKER_INFO = b"B"
STATUS_READY = b"SD"
RESEND_FUTURE = b"RF"  # msg[1] holds the pickled id of the future to resend
HEARTBEAT = b"HB"
# NOTE(review): the four REQUEST_* tags below are not handled by any receive
# code shown with this change; presumably they support the request-status
# check (FutureQueue.checkRequestStatus) mentioned in _types.py -- confirm.
REQUEST_STATUS_REQUEST = b"RSR"
REQUEST_STATUS_ANS = b"RSA"
REQUEST_INPROCESS = b"RI"
REQUEST_UNKNOWN = b"RU"

# Task statuses
STATUS_HERE = b"H"
STATUS_GIVEN = b"G"
STATUS_NONE = b"N"

# Broker interconnection
CONNECT = b"C"
108 changes: 43 additions & 65 deletions scoop/_comm/scoopzmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,7 @@
from .. import shared, encapsulation, utils
from ..shared import SharedElementEncapsulation
from .scoopexceptions import Shutdown, ReferenceBroken

# Worker requests
INIT = b"I"
REQUEST = b"RQ"
TASK = b"T"
REPLY = b"RP"
SHUTDOWN = b"S"
VARIABLE = b"V"
BROKER_INFO = b"B"
STATUS_READY = b"SD"
HEARTBEAT = b"HB"
RESEND_FUTURE = b"RF"

# Task statuses
STATUS_HERE = b"H"
STATUS_GIVEN = b"G"
STATUS_NONE = b"N"

from .scoopmessages import *

LINGER_TIME = 1000

Expand Down Expand Up @@ -249,58 +232,49 @@ def _recv(self):
else:
msg = self.socket.recv_multipart()

try:
thisFuture = pickle.loads(msg[1])
except (AttributeError, ImportError) as e:
scoop.logger.error(
"An instance could not find its base reference on a worker. "
"Ensure that your objects have their definition available in "
"the root scope of your program.\n{error}".format(
error=e,
if msg[0] == TASK or msg[0] == REPLY:
try:
thisFuture = pickle.loads(msg[1])
except (AttributeError, ImportError) as e:
scoop.logger.error(
"An instance could not find its base reference on a worker. "
"Ensure that your objects have their definition available in "
"the root scope of your program.\n{error}".format(
error=e,
)
)
)
raise ReferenceBroken(e)

if msg[0] == TASK:
# Try to connect directly to this worker to send the result
# afterwards if Future is from a map.
if thisFuture.sendResultBack:
self.addPeer(thisFuture.id[0])
raise ReferenceBroken(e)

if msg[0] == TASK:
# Try to connect directly to this worker to send the result
# afterwards if Future is from a map.
if thisFuture.sendResultBack:
self.addPeer(thisFuture.id[0])

isCallable = callable(thisFuture.callable)
isDone = thisFuture._ended()
if not isCallable and not isDone:
# TODO: Also check in root module globals for fully qualified name
try:
module_found = hasattr(sys.modules["__main__"],
thisFuture.callable)
except TypeError:
module_found = False
if module_found:
thisFuture.callable = getattr(sys.modules["__main__"],
thisFuture.callable)
else:
raise ReferenceBroken("This element could not be pickled: "
"{0}.".format(thisFuture))
return (msg[0], thisFuture)

elif msg[0] == RESEND_FUTURE:
# TODO: This should not be here but in FuturesQueue.
future_id = pickle.loads(msg[1])
try:
scoop.logger.warning(
"Lost track of future {0}. Resending it..."
"".format(scoop._control.futureDict[future_id])
)
self.sendFuture(scoop._control.futureDict[future_id])
except KeyError:
# Future was received and processed meanwhile
scoop.logger.warning(
"Asked to resend unexpected future id {0}. future not found"
" (possibly received and processed in the meanwhile)"
"".format(future_id)
)
return
return (RESEND_FUTURE, future_id)

isCallable = callable(thisFuture.callable)
isDone = thisFuture._ended()
if not isCallable and not isDone:
# TODO: Also check in root module globals for fully qualified name
try:
module_found = hasattr(sys.modules["__main__"],
thisFuture.callable)
except TypeError:
module_found = False
if module_found:
thisFuture.callable = getattr(sys.modules["__main__"],
thisFuture.callable)
else:
raise ReferenceBroken("This element could not be pickled: "
"{0}.".format(thisFuture))
return thisFuture
else:
assert False, "Unrecognized incoming message {}".format(msg[0])

def pumpInfoSocket(self):
try:
Expand Down Expand Up @@ -360,7 +334,11 @@ def convertVariable(self, key, varName, varValue):
varName: result,
})

def recvFuture(self):
def recvIncoming(self):
"""
This function continually reads the input from the socket, processes it
according to _recv and returns the result
"""
while self._poll(0):
received = self._recv()
if received:
Expand Down
100 changes: 72 additions & 28 deletions scoop/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import greenlet
import scoop
from scoop._comm import Communicator, Shutdown
from scoop._comm.scoopmessages import *

# Backporting collection features
if sys.version_info < (2, 7):
Expand Down Expand Up @@ -269,6 +270,7 @@ def __init__(self):
self.ready = deque()
self.inprogress = set()
self.socket = Communicator()
self.request_in_process = False
if scoop.SIZE == 1 and not scoop.CONFIGURATION.get('headless', False):
self.lowwatermark = float("inf")
self.highwatermark = float("inf")
Expand Down Expand Up @@ -310,24 +312,33 @@ def append_ready(self, future):
" before adding, on worker: {}").format(future.id,
scoop.worker))

def append_init(self, future):
    """Send a newly spawned local future out through the communicator.

    Unlike `append_movable`, the future is not stored in a local deque;
    it is passed to `self.socket.sendFuture` (i.e. sent to the broker).

    The future is accepted only if it has no greenlet, is not done, and
    the first element of its id equals `scoop.worker`.

    Raises:
        ValueError: if any of those three conditions does not hold.
    """
    unstarted = future.greenlet is None and not future.isDone
    if not (unstarted and future.id[0] == scoop.worker):
        raise ValueError((
            "The future id {} being added to queue initially is not "
            " movable before adding, on worker: {}").format(future.id,
                                                            scoop.worker))
    # Valid first-time submission: forward it instead of queueing locally.
    self.socket.sendFuture(future)

def append_movable(self, future):
"""
This appends a movable future to the queue.
NOTE: A Future is movable if it hasn't yet begun execution. This is
characterized by the lack of a greenlet and not having completed
characterized by the lack of a greenlet and not having completed. Also note
that this function is only called when appending a future retrieved from the
broker. to append a newly spawned future, use `FutureQueue.append_init`
"""
if future.greenlet is None and not future.isDone:
self.movable.append(future)

# Send the oldest future in the movable deque until under the hwm
over_hwm = self.timelen(self.movable) > self.highwatermark
while over_hwm and len(self.movable) > 1:
sending_future = self.movable.popleft()
if sending_future.id[0] != scoop.worker:
scoop._control.delFuture(sending_future)
self.socket.sendFuture(sending_future)
over_hwm = self.timelen(self.movable) > self.highwatermark
assert len(self.movable) == 1, "movable size isnt adding up"
else:
raise ValueError((
"The future id {} being added to movable queue is not "
Expand All @@ -345,13 +356,18 @@ def pop(self):

# Check if queue is empty
while len(self) == 0:
# If so, Block until message arrives. Also, keep sending requests. This is
# because if a node disconnects and reconnects and is lost in between, there
# is a possibility that it has been removed from the brokers list of
# assignable workers, in which case, it needs to add itself by sending a
# future request. The future requests do not add up and only one future
# is returned
self.requestFuture()
# If so, Block until message arrives. Only send future request once (to
# ensure FCFS). This has the following potential issue. If a node
# disconnects and reconnects and is considered by the broker to be lost,
# there is a possibility that it has been removed from the brokers list
# of assignable workers, in which case, this worker will forever be
# stuck in this loop. However, this is a problem that is expected to
# NEVER happen and therefore we leave it be. Currently, I have added
# some code that can be used to protect against this (see
# FutureQueue.checkRequestStatus, REQUEST_STATUS_REQUEST and related)
if not self.request_in_process:
self.requestFuture()

self.socket._poll(POLLING_TIME)
self.updateQueue()
if len(self.ready) != 0:
Expand All @@ -373,37 +389,65 @@ def flush(self):
def requestFuture(self):
    """Send a future request and record that it is now pending.

    `request_in_process` is set only after `sendRequest` has returned, so
    a send that raises leaves the flag unchanged.
    """
    communicator = self.socket
    communicator.sendRequest()
    self.request_in_process = True

def updateQueue(self):
"""Process inbound communication buffer.
Updates the local queue with elements from the broker.
Note that the broker only sends either non-executed (movable)
futures, or completed futures"""
for future in self.socket.recvFuture():
if future._ended():
for incoming_msg in self.socket.recvIncoming():
incoming_msg_categ = incoming_msg[0]
incoming_msg_value = incoming_msg[1]

if incoming_msg_categ == REPLY:
future = incoming_msg_value
# If the answer is coming back, update its entry
try:
thisFuture = scoop._control.futureDict[future.id]
except KeyError:
# Already received?
scoop.logger.warn('{0}: Received an unexpected future: '
'{1}'.format(scoop.worker, future.id))
continue
return
thisFuture.resultValue = future.resultValue
thisFuture.exceptionValue = future.exceptionValue
thisFuture.executor = future.executor
thisFuture.isDone = future.isDone
self.finalizeReturnedFuture(thisFuture)
elif future.id not in scoop._control.futureDict:
# This is the case where the worker is executing a remotely
# generated future
scoop._control.futureDict[future.id] = future
self.append_movable(scoop._control.futureDict[future.id])
elif incoming_msg_categ == TASK:
future = incoming_msg_value
if future.id not in scoop._control.futureDict:
# This is the case where the worker is executing a remotely
# generated future
scoop._control.futureDict[future.id] = future
self.append_movable(scoop._control.futureDict[future.id])
else:
# This is the case where the worker is executing a locally
# generated future
self.append_movable(scoop._control.futureDict[future.id])
if len(self.movable) > 0:
# This means that a future has been returned corresponding to the
# future request
self.request_in_process = False
elif incoming_msg_categ == RESEND_FUTURE:
future_id = incoming_msg_value
try:
scoop.logger.warning(
"Lost track of future {0}. Resending it..."
"".format(scoop._control.futureDict[future_id])
)
self.socket.sendFuture(scoop._control.futureDict[future_id])
except KeyError:
# Future was received and processed meanwhile
scoop.logger.warning(
"Asked to resend unexpected future id {0}. future not found"
" (likely received and processed in the meanwhile)"
"".format(future_id)
)
else:
# This is the case where the worker is executing a locally
# generated future
self.append_movable(scoop._control.futureDict[future.id])
assert False, "Unrecognized incoming message"

def finalizeReturnedFuture(self, future):
"""Finalize a future that was generated here and executed remotely.
Expand Down
Loading

0 comments on commit 8de3f0c

Please sign in to comment.