Skip to content

Commit

Permalink
Updated the spalloc configuration so it might work from within our tools
Browse files Browse the repository at this point in the history
To use, give the spalloc service address as something like
https://root:hunter2@spalloc.example.com:8080/spalloc-base/
The spalloc_user and spalloc_port configuration settings will be
ignored.
  • Loading branch information
dkfellows committed Sep 10, 2021
1 parent 4c09387 commit b84ec63
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,77 @@
AbstractMachineAllocationController)
from spinn_front_end_common.abstract_models.impl import (
MachineAllocationController)
from spinn_front_end_common.utilities.spalloc import (
SpallocClient, SpallocJob, SpallocState)


class _SpallocJobController(MachineAllocationController):
class _NewSpallocJobController(MachineAllocationController):
__slots__ = [
# the spalloc job object
"_job",
# the current job's old state
"_state",
"__client",
"__closer"
]

def __init__(self, client, job, closer):
"""
:param SpallocClient client:
:param SpallocJob job:
:param closer:
"""
if job is None:
raise Exception("must have a real job")
self.__client = client
self.__closer = closer
self._job: SpallocJob = job
self._state = job.get_state()
super().__init__("SpallocJobController")

@overrides(AbstractMachineAllocationController.extend_allocation)
def extend_allocation(self, new_total_run_time):
# Does Nothing in this allocator - machines are held until exit
pass

@overrides(AbstractMachineAllocationController.close)
def close(self):
super().close()
self.__closer.close()
self._job.destroy()
self.__client.close()

@overrides(AbstractMachineAllocationController.where_is_machine)
def where_is_machine(self, chip_x, chip_y):
"""
:param int chip_x:
:param int chip_y:
:rtype: tuple(int,int,int)
"""
return self._job.where_is_machine(chip_y=chip_y, chip_x=chip_x)

@overrides(MachineAllocationController._wait)
def _wait(self):
try:
if self._state != SpallocState.DESTROYED:
self._state = self._job.wait_for_state_change(self._state)
except TypeError:
pass
except Exception as e: # pylint: disable=broad-except
if not self._exited:
raise e
return self._state != SpallocState.DESTROYED

@overrides(MachineAllocationController._teardown)
def _teardown(self):
if not self._exited:
self.__closer.close()
self._job.close()
self.__client.close()
super()._teardown()


class _OldSpallocJobController(MachineAllocationController):
__slots__ = [
# the spalloc job object
"_job",
Expand Down Expand Up @@ -129,6 +197,49 @@ def __call__(
n_boards += 1
n_boards = int(math.ceil(n_boards))

if (spalloc_server.lower().startswith("http:") or
spalloc_server.lower().startswith("https:")):
return self.allocate_job_new(spalloc_server, n_boards)
else:
return self.allocate_job_old(spalloc_server, n_boards)

def allocate_job_new(self, spalloc_server, n_boards):
"""
Request a machine from an old-style spalloc server that will fit the
given number of boards.
:param str spalloc_server:
The server from which the machine should be requested
:param int n_boards: The number of boards required
:rtype: tuple(str, int, None, bool, bool, None, None,
MachineAllocationController)
"""

spalloc_machine = get_config_str("Machine", "spalloc_machine")
client = SpallocClient(spalloc_server)
job = client.create_job(n_boards, spalloc_machine)
closer_for_keepalive_task = client.launch_keepalive_task(job)
job.wait_until_ready()
root = job.get_root_host()
machine_allocation_controller = _NewSpallocJobController(
client, job, closer_for_keepalive_task)
return (
root, self._MACHINE_VERSION, None, False,
False, None, None, machine_allocation_controller
)

def allocate_job_old(self, spalloc_server, n_boards):
"""
Request a machine from an old-style spalloc server that will fit the
given number of boards.
:param str spalloc_server:
The server from which the machine should be requested
:param int n_boards: The number of boards required
:rtype: tuple(str, int, None, bool, bool, None, None,
MachineAllocationController)
"""

spalloc_kw_args = {
'hostname': spalloc_server,
'owner': get_config_str("Machine", "spalloc_user")
Expand All @@ -141,7 +252,7 @@ def __call__(
spalloc_kw_args['machine'] = spalloc_machine

job, hostname = self._launch_job(n_boards, spalloc_kw_args)
machine_allocation_controller = _SpallocJobController(job)
machine_allocation_controller = _OldSpallocJobController(job)

return (
hostname, self._MACHINE_VERSION, None, False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from spalloc import ProtocolClient
from spinn_machine.virtual_machine import virtual_machine
from spinn_machine.machine import Machine
from spinn_front_end_common.utilities.spalloc import SpallocClient


class SpallocMaxMachineGenerator(object):
Expand All @@ -38,6 +39,72 @@ def __call__(
:return: A virtual machine
:rtype: ~spinn_machine.Machine
"""
if (spalloc_server.lower().startswith("http:") or
spalloc_server.lower().startswith("https:")):
width, height = self.discover_max_machine_area_new(
spalloc_server, spalloc_machine)
else:
width, height = self.discover_max_machine_area_old(
spalloc_server, spalloc_port, spalloc_machine)

if width is None:
raise Exception(
"The spalloc server appears to have no compatible machines")

n_cpus_per_chip = (Machine.max_cores_per_chip() -
max_machine_core_reduction)

# Return the width and height, and make no assumption about wrap-
# arounds or version.
return virtual_machine(
width=width, height=height,
n_cpus_per_chip=n_cpus_per_chip, validate=False)

def discover_max_machine_area_new(self, spalloc_server, spalloc_machine):
"""
Generate a maximum virtual machine a given allocation server can
generate, communicating with the spalloc server using the new protocol.
:param str spalloc_server: Spalloc server URL
:param spalloc_machine: Desired machine name, or ``None`` for default.
:type spalloc_machine: str or None
:return: the dimensions of the maximum machine
:rtype: tuple(int or None,int or None)
"""
max_width = None
max_height = None
max_area = -1

with SpallocClient(spalloc_server) as client:
for machine in client.list_machines().values():
if spalloc_machine is not None:
if spalloc_machine != machine.name:
continue
else:
if "default" not in machine.tags:
continue

# The "biggest" board is the one with the most chips
if machine.area > max_area:
max_area = machine.area
max_width = machine.width
max_height = machine.height

return max_width, max_height

def discover_max_machine_area_old(
self, spalloc_server, spalloc_port, spalloc_machine):
"""
Generate a maximum virtual machine a given allocation server can
generate, communicating with the spalloc server using the old protocol.
:param str spalloc_server: Spalloc server hostname
:param int spalloc_port: Spalloc server port
:param spalloc_machine: Desired machine name, or ``None`` for default.
:type spalloc_machine: str or None
:return: the dimensions of the maximum machine
:rtype: tuple(int or None,int or None)
"""
with ProtocolClient(spalloc_server, spalloc_port) as client:
machines = client.list_machines()
# Close the context immediately; don't want to keep this particular
Expand All @@ -47,7 +114,7 @@ def __call__(
max_height = None
max_area = -1

for machine in self._filter(machines, spalloc_machine):
for machine in self._filter_old(machines, spalloc_machine):
# Get the width and height in chips, and logical area in chips**2
width, height, area = self._get_size(machine)

Expand All @@ -57,21 +124,10 @@ def __call__(
max_width = width
max_height = height

if max_width is None:
raise Exception(
"The spalloc server appears to have no compatible machines")

n_cpus_per_chip = (Machine.max_cores_per_chip() -
max_machine_core_reduction)

# Return the width and height, and make no assumption about wrap-
# arounds or version.
return virtual_machine(
width=max_width, height=max_height,
n_cpus_per_chip=n_cpus_per_chip, validate=False)
return max_width, max_height

@staticmethod
def _filter(machines, target_name):
def _filter_old(machines, target_name):
"""
:param list(dict(str,str)) machines:
:param str target_name:
Expand Down
83 changes: 80 additions & 3 deletions spinn_front_end_common/utilities/spalloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,16 @@ def _credentials(self):
headers = {self.__csrf_header: self.__csrf}
return cookies, headers

def _purge(self):
"""
Clears out all credentials from this session, rendering the session
completely inoperable henceforth.
"""
self.__username = None
self.__password = None
self._session_id = None
self.__csrf = None


class SpallocClient:
"""
Expand All @@ -231,13 +241,18 @@ class SpallocClient:
__slots__ = ("__session",
"__machines_url", "__jobs_url", "version")

def __init__(self, service_url, username, password):
def __init__(self, service_url, username=None, password=None):
"""
:param str service_url: The reference to the service.
*Should not* include a username or password in it.
May have username and password supplied as part of the network
location; if so, the ``username`` and ``password`` arguments
*must* be ``None``.
:param str username: The user name to use
:param str password: The password to use
"""
if username is None and password is None:
service_url, username, password = self.__parse_service_url(
service_url)
self.__session = _Session(service_url, username, password)
obj = self.__session.renew()
v = obj["version"]
Expand All @@ -247,6 +262,24 @@ def __init__(self, service_url, username, password):
self.__jobs_url = obj["jobs-ref"]
logger.info("established session to {} for {}", service_url, username)

@staticmethod
def __parse_service_url(url):
"""
Parses a combined service reference.
:param str url:
:rtype: tuple(str,str,str)
"""
pieces = urlparse(url)
user = pieces.username
password = pieces.password
netloc = pieces.hostname
if pieces.port is not None:
netloc += f":{pieces.port}"
url = urlunparse((
pieces.scheme, netloc, pieces.path, None, None, None))
return url, user, password

def list_machines(self):
"""
Get the machines supported by the server.
Expand Down Expand Up @@ -406,6 +439,21 @@ def close(self):
p.start()
return closer

def close(self):
if self.__session is not None:
self.__session._purge()
self.__session = None

def __enter__(self):
"""
:rtype: SpallocClient
"""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False


def _SpallocKeepalive(url, cookies, headers, interval, term_queue):
headers["Content-Type"] = "text/plain; charset=UTF-8"
Expand Down Expand Up @@ -471,7 +519,7 @@ class SpallocJob:
Don't make this yourself. Use :py:class:`SpallocClient` instead.
"""
__slots__ = ("__session", "__url", "__machine_url",
__slots__ = ("__session", "__url", "__machine_url", "__chip_url",
"_keepalive_url", "__keepalive_handle")

def __init__(self, session, job_handle):
Expand All @@ -483,6 +531,7 @@ def __init__(self, session, job_handle):
self.__session = session
self.__url = _clean_url(job_handle)
self.__machine_url = self.__url + "machine"
self.__chip_url = self.__url + "chip"
self._keepalive_url = self.__url + "keepalive"
self.__keepalive_handle = None

Expand Down Expand Up @@ -528,6 +577,18 @@ def wait_for_state_change(self, old_state):
if s != old_state or s == SpallocState.DESTROYED:
return s

def wait_until_ready(self):
"""
Wait until the allocation is in the ``READY`` state.
:raises Exception: If the allocation is destroyed
"""
state = SpallocState.UNKNOWN
while state != SpallocState.READY:
state = self.wait_for_state_change(state)
if state == SpallocState.DESTROYED:
raise Exception("job was unexpectedly destroyed")

def destroy(self, reason="finished"):
"""
Destroy the job.
Expand All @@ -546,6 +607,22 @@ def keepalive(self):
"""
self.__session.put(self._keepalive_url, "alive")

def where_is_machine(self, x, y):
"""
Get the *physical* coordinates of the board hosting the given chip.
:param int x: Chip X coordinate
:param int y: Chip Y coordinate
:return: physical board coordinates (cabinet, frame, board), or
``None`` if there are no boards currently allocated to the job or
the chip lies outside the allocation.
:rtype: tuple(int,int,int) or None
"""
r = self.__session.get(self.__chip_url, x=int(x), y=int(y))
if r.status_code == 204:
return None
return tuple(r.json()["physical-board-coordinates"])

@property
def _keepalive_handle(self):
return self.__keepalive_handle
Expand Down

0 comments on commit b84ec63

Please sign in to comment.