Skip to content

Commit

Permalink
Merge pull request #8 from SpiNNakerManchester/improve-client
Browse files Browse the repository at this point in the history
Improve client parallelism
  • Loading branch information
rowleya committed Jul 14, 2017
2 parents eecc429 + 1153be0 commit 7b0c60b
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 195 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ script:
py.test tests/ \
--cov spalloc \
--cov tests \
--durations=10
--durations=10 \
--timeout=120
# Code quality check
- flake8
after_success:
Expand Down
170 changes: 78 additions & 92 deletions spalloc/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ def __init__(self, *args, **kwargs):

# Connection to server (and associated lock)
self._client = ProtocolClient(hostname, port)
self._client_lock = threading.RLock()

# Set-up (but don't start) background keepalive thread
self._keepalive_thread = threading.Thread(
Expand Down Expand Up @@ -335,7 +334,8 @@ def __enter__(self):
self.destroy()
raise

def __exit__(self, type=None, value=None, traceback=None):
def __exit__(self, type=None, # @ReservedAssignment
value=None, traceback=None): # @UnusedVariable
self.destroy()

def _assert_compatible_version(self):
Expand Down Expand Up @@ -373,20 +373,18 @@ def _keepalive_thread(self):
if keepalive is not None:
keepalive /= 2.0
while not self._stop.wait(keepalive):
with self._client_lock:
# Keep trying to send the keep-alive packet, if this fails,
# keep trying to reconnect until it succeeds.
while not self._stop.is_set():
try:
self._client.job_keepalive(
self.id, timeout=self._timeout)
break
except (ProtocolTimeoutError, IOError, OSError):
# Something went wrong, reconnect, after a delay which
# may be interrupted by the thread being stopped
self._client.close()
if not self._stop.wait(self._reconnect_delay):
self._reconnect()
# Keep trying to send the keep-alive packet, if this fails,
# keep trying to reconnect until it succeeds.
while not self._stop.is_set():
try:
self._client.job_keepalive(self.id, timeout=self._timeout)
break
except (ProtocolTimeoutError, IOError, OSError):
# Something went wrong, reconnect, after a delay which
# may be interrupted by the thread being stopped
self._client._close()
if not self._stop.wait(self._reconnect_delay):
self._reconnect()

def destroy(self, reason=None):
"""Destroy the job and disconnect from the server.
Expand Down Expand Up @@ -431,14 +429,12 @@ def _get_state(self):
-------
:py:class:`._JobStateTuple`
"""
with self._client_lock:
state = self._client.get_job_state(self.id, timeout=self._timeout)
return _JobStateTuple(
state=JobState(state["state"]),
power=state["power"],
keepalive=state["keepalive"],
reason=state["reason"],
)
state = self._client.get_job_state(self.id, timeout=self._timeout)
return _JobStateTuple(
state=JobState(state["state"]),
power=state["power"],
keepalive=state["keepalive"],
reason=state["reason"])

def set_power(self, power):
"""Turn the boards allocated to the job on or off.
Expand All @@ -454,13 +450,10 @@ def set_power(self, power):
True to power on the boards, False to power off. If the boards are
already turned on, setting power to True will reset them.
"""
with self._client_lock:
if power:
self._client.power_on_job_boards(
self.id, timeout=self._timeout)
else:
self._client.power_off_job_boards(
self.id, timeout=self._timeout)
if power:
self._client.power_on_job_boards(self.id, timeout=self._timeout)
else:
self._client.power_off_job_boards(self.id, timeout=self._timeout)

def reset(self):
"""Reset (power-cycle) the boards allocated to the job.
Expand All @@ -480,21 +473,19 @@ def _get_machine_info(self):
-------
:py:class:`._JobMachineInfoTuple`
"""
with self._client_lock:
info = self._client.get_job_machine_info(
self.id, timeout=self._timeout)

return _JobMachineInfoTuple(
width=info["width"],
height=info["height"],
connections=({(x, y): hostname
for (x, y), hostname
in info["connections"]}
if info["connections"] is not None
else None),
machine_name=info["machine_name"],
boards=info["boards"],
)
info = self._client.get_job_machine_info(
self.id, timeout=self._timeout)

return _JobMachineInfoTuple(
width=info["width"],
height=info["height"],
connections=({(x, y): hostname
for (x, y), hostname
in info["connections"]}
if info["connections"] is not None
else None),
machine_name=info["machine_name"],
boards=info["boards"])

@property
def state(self):
Expand Down Expand Up @@ -601,8 +592,7 @@ def wait_for_state_change(self, old_state, timeout=None):
while finish_time is None or finish_time > time.time():
try:
# Watch for changes in this Job's state
with self._client_lock:
self._client.notify_job(self.id)
self._client.notify_job(self.id)

# Wait for job state to change
while finish_time is None or finish_time > time.time():
Expand All @@ -612,55 +602,51 @@ def wait_for_state_change(self, old_state, timeout=None):
return new_state

# Wait for a state change and keep the job alive
with self._client_lock:
# Since we're about to block holding the client lock,
# we must be responsible for keeping everything alive.
while finish_time is None or finish_time > time.time():
self._client.job_keepalive(
self.id, timeout=self._timeout)

# Wait for the job to change
try:
# Block waiting for the job to change for no longer
# than the user-specified timeout or half the
# keepalive interval.
if (finish_time is not None and
self._keepalive is not None):
time_left = finish_time - time.time()
wait_timeout = min(self._keepalive / 2.0,
time_left)
elif finish_time is None:
if self._keepalive is None:
wait_timeout = None
else:
wait_timeout = self._keepalive / 2.0
while finish_time is None or finish_time > time.time():
self._client.job_keepalive(
self.id, timeout=self._timeout)

# Wait for the job to change
try:
# Block waiting for the job to change for no longer
# than the user-specified timeout or half the
# keepalive interval.
if (finish_time is not None and
self._keepalive is not None):
time_left = finish_time - time.time()
wait_timeout = min(self._keepalive / 2.0,
time_left)
elif finish_time is None:
if self._keepalive is None:
wait_timeout = None
else:
wait_timeout = finish_time - time.time()
if wait_timeout is None or wait_timeout >= 0.0:
self._client.wait_for_notification(
wait_timeout)
break
except ProtocolTimeoutError:
# It's been a while, send a keep-alive since
# we're still holding the lock
pass
else:
# The user's timeout expired while waiting for a
# state change, return the old state and give up.
return old_state
wait_timeout = self._keepalive / 2.0
else:
wait_timeout = finish_time - time.time()
if wait_timeout is None or wait_timeout >= 0.0:
self._client.wait_for_notification(
wait_timeout)
break
except ProtocolTimeoutError:
# It's been a while, send a keep-alive since
# we're still holding the lock
pass
else:
# The user's timeout expired while waiting for a
# state change, return the old state and give up.
return old_state
except (IOError, OSError, ProtocolTimeoutError):
# Something went wrong while communicating with the server,
# reconnect after the reconnection delay (or timeout, whichever
# comes first).
with self._client_lock:
self._client.close()
if finish_time is not None:
delay = min(finish_time - time.time(),
self._reconnect_delay)
else:
delay = self._reconnect_delay
time.sleep(max(0.0, delay))
self._reconnect()
self._client._close()
if finish_time is not None:
delay = min(finish_time - time.time(),
self._reconnect_delay)
else:
delay = self._reconnect_delay
time.sleep(max(0.0, delay))
self._reconnect()

# If we get here, the timeout expired without a state change, just
# return the old state
Expand Down

0 comments on commit 7b0c60b

Please sign in to comment.