Skip to content

Commit

Permalink
Merge pull request #5 from SpiNNakerManchester/protocol-change
Browse files Browse the repository at this point in the history
Protocol change
  • Loading branch information
rowleya committed Jul 21, 2017
2 parents 7b0c60b + 1f69695 commit ba81c52
Show file tree
Hide file tree
Showing 15 changed files with 817 additions and 842 deletions.
3 changes: 2 additions & 1 deletion spalloc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from spalloc.version import __version__ # noqa

# Alias useful objects
from spalloc.protocol_client import ProtocolClient # noqa
from spalloc.protocol_client import ProtocolClient, ProtocolError # noqa
from spalloc.protocol_client import ProtocolTimeoutError # noqa
from spalloc.protocol_client import SpallocServerException # noqa
from spalloc.job import Job, JobDestroyedError, StateChangeTimeoutError # noqa
from spalloc.states import JobState # noqa
22 changes: 22 additions & 0 deletions spalloc/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import time


def time_left(timestamp):
"""Convert a timestamp into how long to wait for it."""
if timestamp is None:
return None
return max(0.0, timestamp - time.time())


def timed_out(timestamp):
"""Check if a timestamp has been reached."""
if timestamp is None:
return False
return timestamp < time.time()


def make_timeout(delay_seconds):
"""Convert a delay (in seconds) into a timestamp."""
if delay_seconds is None:
return None
return time.time() + delay_seconds
137 changes: 68 additions & 69 deletions spalloc/job.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
"""A high-level Python interface for allocating SpiNNaker boards."""

from collections import namedtuple
import logging
import threading
import time

from collections import namedtuple

from spalloc.protocol_client import ProtocolClient, ProtocolTimeoutError
from spalloc.config import read_config, SEARCH_PATH
from spalloc.states import JobState

import logging
from .protocol_client import ProtocolClient, ProtocolTimeoutError
from .config import read_config, SEARCH_PATH
from .states import JobState
from ._utils import time_left, timed_out, make_timeout

logger = logging.getLogger(__name__)

Expand All @@ -18,7 +17,6 @@
# https://docs.python.org/3.1/library/logging.html#configuring-logging-for-a-library
logger.addHandler(logging.StreamHandler())


VERSION_RANGE_START = (0, 4, 0)
VERSION_RANGE_STOP = (2, 0, 0)

Expand Down Expand Up @@ -264,13 +262,11 @@ def __init__(self, *args, **kwargs):
job_state = self._get_state()
if (job_state.state == JobState.unknown or
job_state.state == JobState.destroyed):
raise JobDestroyedError(
"Job {} does not exist: {}{}{}".format(
resume_job_id,
job_state.state.name,
": " if job_state.reason is not None else "",
job_state.reason
if job_state.reason is not None else ""))
raise JobDestroyedError("Job {} does not exist: {}{}{}".format(
resume_job_id,
job_state.state.name,
": " if job_state.reason is not None else "",
job_state.reason if job_state.reason is not None else ""))

# Snag the keepalive interval from the job
self._keepalive = job_state.keepalive
Expand All @@ -297,8 +293,8 @@ def __init__(self, *args, **kwargs):
# Sanity check arguments
if job_kwargs["owner"] is None:
raise ValueError("An owner must be specified.")
if ((job_kwargs["tags"] is not None) and
(job_kwargs["machine"] is not None)):
if (job_kwargs["tags"] is not None and
job_kwargs["machine"] is not None):
raise ValueError(
"Only one of tags and machine may be specified.")

Expand Down Expand Up @@ -337,6 +333,7 @@ def __enter__(self):
def __exit__(self, type=None, # @ReservedAssignment
value=None, traceback=None): # @UnusedVariable
self.destroy()
return False

def _assert_compatible_version(self):
"""Assert that the server version is compatible."""
Expand Down Expand Up @@ -480,8 +477,7 @@ def _get_machine_info(self):
width=info["width"],
height=info["height"],
connections=({(x, y): hostname
for (x, y), hostname
in info["connections"]}
for (x, y), hostname in info["connections"]}
if info["connections"] is not None
else None),
machine_name=info["machine_name"],
Expand Down Expand Up @@ -586,72 +582,78 @@ def wait_for_state_change(self, old_state, timeout=None):
:py:class:`~spalloc.JobState`
The new state, or old state if timed out.
"""
finish_time = time.time() + timeout if timeout is not None else None
finish_time = make_timeout(timeout)

# We may get disconnected while waiting so keep listening...
while finish_time is None or finish_time > time.time():
while not timed_out(finish_time):
try:
# Watch for changes in this Job's state
self._client.notify_job(self.id)

# Wait for job state to change
while finish_time is None or finish_time > time.time():
while not timed_out(finish_time):
# Has the job changed state?
new_state = self._get_state().state
if new_state != old_state:
return new_state

# Wait for a state change and keep the job alive
while finish_time is None or finish_time > time.time():
self._client.job_keepalive(
self.id, timeout=self._timeout)

# Wait for the job to change
try:
# Block waiting for the job to change no-longer
# than the user-specified timeout or half the
# keepalive interval.
if (finish_time is not None and
self._keepalive is not None):
time_left = finish_time - time.time()
wait_timeout = min(self._keepalive / 2.0,
time_left)
elif finish_time is None:
if self._keepalive is None:
wait_timeout = None
else:
wait_timeout = self._keepalive / 2.0
else:
wait_timeout = finish_time - time.time()
if wait_timeout is None or wait_timeout >= 0.0:
self._client.wait_for_notification(
wait_timeout)
break
except ProtocolTimeoutError:
# Its been a while, send a keep-alive since
# we're still holding the lock
pass
else:
# The user's timeout expired while waiting for a
# state change, return the old state and give up.
if not self._do_wait_for_a_change(finish_time):
# The user's timeout expired while waiting for a state
# change, return the old state and give up.
return old_state
except (IOError, OSError, ProtocolTimeoutError):
# Something went wrong while communicating with the server,
# reconnect after the reconnection delay (or timeout, whichever
# came first.
self._client._close()
if finish_time is not None:
delay = min(finish_time - time.time(),
self._reconnect_delay)
else:
delay = self._reconnect_delay
time.sleep(max(0.0, delay))
self._reconnect()
self._do_reconnect(finish_time)

# If we get here, the timeout expired without a state change, just
# return the old state
return old_state

def _do_wait_for_a_change(self, finish_time):
"""Wait for a state change and keep the job alive.
"""
# Since we're about to block holding the client lock, we must be
# responsible for keeping everything alive.
while not timed_out(finish_time):
self._client.job_keepalive(self.id, timeout=self._timeout)

# Wait for the job to change
try:
# Block waiting for the job to change no-longer than the
# user-specified timeout or half the keepalive interval.
if finish_time is not None and self._keepalive is not None:
wait_timeout = min(self._keepalive / 2.0,
time_left(finish_time))
elif finish_time is None:
wait_timeout = None if self._keepalive is None \
else self._keepalive / 2.0
else:
wait_timeout = time_left(finish_time)
if wait_timeout is None or wait_timeout >= 0.0:
self._client.wait_for_notification(wait_timeout)
return True
except ProtocolTimeoutError:
# Its been a while, send a keep-alive since we're still
# holding the lock
pass
# The user's timeout expired while waiting for a state change
return False

def _do_reconnect(self, finish_time):
"""Reconnect after the reconnection delay (or timeout, whichever
came first).
"""
self._client.close()
if finish_time is not None:
delay = min(time_left(finish_time), self._reconnect_delay)
else:
delay = self._reconnect_delay
time.sleep(max(0.0, delay))
self._reconnect()

def wait_until_ready(self, timeout=None):
"""Block until the job is allocated and ready.
Expand All @@ -669,8 +671,8 @@ def wait_until_ready(self, timeout=None):
If the job was destroyed before becoming ready.
"""
cur_state = None
finish_time = time.time() + timeout if timeout is not None else None
while finish_time is None or finish_time > time.time():
finish_time = make_timeout(timeout)
while not timed_out(finish_time):
if cur_state is None:
# Get initial state (NB: done here such that the command is
# never sent if the timeout has already occurred)
Expand All @@ -693,11 +695,8 @@ def wait_until_ready(self, timeout=None):
"Spalloc server no longer recognises job.")

# Wait for a state change...
if finish_time is None:
time_left = None
else:
time_left = finish_time - time.time()
cur_state = self.wait_for_state_change(cur_state, time_left)
cur_state = self.wait_for_state_change(
cur_state, time_left(finish_time))

# Timed out!
raise StateChangeTimeoutError()
Expand Down

0 comments on commit ba81c52

Please sign in to comment.