Commit
Merge pull request #401 from SpiNNakerManchester/keep_alive
simplify the spalloc keep alive
rowleya committed May 15, 2024
2 parents cdfc5e4 + 6a20a4f commit dab1e4e
Showing 3 changed files with 97 additions and 98 deletions.
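In outline: the keep-alive that previously ran in a separate multiprocessing.Process (started by callers through launch_keepalive_task() and stopped via a Queue) becomes a daemon threading.Thread that _SpallocJob starts in its own constructor and that stops once the job is destroyed. User code therefore needs no explicit keep-alive management; a sketch of the resulting usage (the service URL and credentials are placeholders, and the configuration calls made by the get_triad.py script below are omitted):

from spinnman.spalloc import SpallocClient

# Placeholder URL and credentials; substitute real ones.
client = SpallocClient("https://spalloc.example", "user", "password")
job = client.create_job(num_boards=1)  # the keep-alive thread starts here
with job:
    job.wait_until_ready()
    # ... use the allocated boards; the daemon thread pings spalloc ...
# leaving the block destroys the job, which also ends the keep-alive loop
client.close()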
58 changes: 58 additions & 0 deletions manual_scripts/get_triad.py
@@ -0,0 +1,58 @@
# Copyright (c) 2024 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from spinn_utilities.config_holder import set_config
from spinnman.spalloc import SpallocClient
from spinnman.config_setup import unittest_setup


SPALLOC_URL = "https://spinnaker.cs.man.ac.uk/spalloc"
SPALLOC_USERNAME = ""
SPALLOC_PASSWORD = ""

SPALLOC_MACHINE = "SpiNNaker1M"

x = 0
y = 3
b = 0 # Must be 0 if requesting a rect
RECT = True
WIDTH = 1 # In triads!
HEIGHT = 1 # In triads!

unittest_setup()
set_config("Machine", "version",5)
client = SpallocClient(SPALLOC_URL, SPALLOC_USERNAME, SPALLOC_PASSWORD)
if RECT:
job = client.create_job_rect_at_board(
WIDTH, HEIGHT, triad=(x, y, b), machine_name=SPALLOC_MACHINE,
max_dead_boards=1)
else:
job = client.create_job_board(
triad=(x, y, b), machine_name=SPALLOC_MACHINE)
print(job)
print("Waiting until ready...")
with job:
job.wait_until_ready()
print(job.get_connections())

txrx = job.create_transceiver()
# This call is for testing and can be changed without notice!
dims = txrx._get_machine_dimensions()
print(f"{dims.height=}, {dims.width=}")

machine = txrx.get_machine_details()
print(machine)

input("Press Enter to release...")
client.close()
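The empty credential strings above must be filled in before the script will talk to the service. A sketched variant (the environment-variable names here are our own convention, not part of any API) reads them from the environment instead:

import os

# Hypothetical convention: take spalloc credentials from the environment.
SPALLOC_URL = os.environ.get(
    "SPALLOC_URL", "https://spinnaker.cs.man.ac.uk/spalloc")
SPALLOC_USERNAME = os.environ["SPALLOC_USERNAME"]  # KeyError if unset
SPALLOC_PASSWORD = os.environ["SPALLOC_PASSWORD"]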
113 changes: 39 additions & 74 deletions spinnman/spalloc/spalloc_client.py
@@ -14,16 +14,15 @@
"""
Implementation of the client for the Spalloc web service.
"""

from contextlib import contextmanager
import time
from logging import getLogger
from multiprocessing import Process, Queue

import queue
import struct
import threading
from time import sleep
from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable,
Iterator, List, Mapping, Optional, Tuple, cast)
from typing import (Any, Callable, Dict, FrozenSet, Iterable, List, Mapping,
Optional, Tuple, cast)
from urllib.parse import urlparse, urlunparse, ParseResult

from packaging.version import Version
@@ -69,6 +68,8 @@
_msg = struct.Struct("<II")
_msg_to = struct.Struct("<IIIII")

KEEP_ALIVE_PERIOD = 120
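This constant becomes the default keepalive argument of the create_job* methods below; the interval is sent to the server as the keepalive-interval field in ISO-8601 duration form (PT…S). A quick sanity check of that formatting, reusing the f-string from create_job():

# The interval is serialised as an ISO-8601 duration, e.g. "PT120S".
keepalive = 120  # KEEP_ALIVE_PERIOD
assert f"PT{int(keepalive)}S" == "PT120S"
print(f"PT{int(keepalive)}S")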


def fix_url(url: Any) -> str:
"""
@@ -199,7 +200,7 @@ def _create(self, create: Mapping[str, JsonValue],
def create_job(
self, num_boards: int = 1,
machine_name: Optional[str] = None,
keepalive: int = 45) -> SpallocJob:
            keepalive: int = KEEP_ALIVE_PERIOD) -> SpallocJob:
return self._create({
"num-boards": int(num_boards),
"keepalive-interval": f"PT{int(keepalive)}S"
@@ -209,7 +210,7 @@ def create_job_rect(
def create_job_rect(
self, width: int, height: int,
machine_name: Optional[str] = None,
keepalive: int = 45) -> SpallocJob:
            keepalive: int = KEEP_ALIVE_PERIOD) -> SpallocJob:
return self._create({
"dimensions": {
"width": int(width),
@@ -224,7 +225,7 @@ def create_job_board(
physical: Optional[Tuple[int, int, int]] = None,
ip_address: Optional[str] = None,
machine_name: Optional[str] = None,
keepalive: int = 45) -> SpallocJob:
            keepalive: int = KEEP_ALIVE_PERIOD) -> SpallocJob:
board: JsonObject
if triad:
x, y, z = triad
@@ -248,7 +249,8 @@ def create_job_rect_at_board(
triad: Optional[Tuple[int, int, int]] = None,
physical: Optional[Tuple[int, int, int]] = None,
ip_address: Optional[str] = None,
machine_name: Optional[str] = None, keepalive: int = 45,
machine_name: Optional[str] = None,
            keepalive: int = KEEP_ALIVE_PERIOD,
max_dead_boards: int = 0) -> SpallocJob:
board: JsonObject
if triad:
@@ -285,27 +287,6 @@ class _ProxyServiceError(IOError):
"""


def _spalloc_keepalive(url, interval, term_queue, cookies, headers):
"""
Actual keepalive task implementation. Don't use directly.
"""
headers["Content-Type"] = "text/plain; charset=UTF-8"
while True:
requests.put(url, data="alive", cookies=cookies, headers=headers,
allow_redirects=False, timeout=10)
try:
term_queue.get(True, interval)
break
except queue.Empty:
continue
# On ValueError or OSError, just terminate the keepalive process
# They happen when the term_queue is directly closed
except ValueError:
break
except OSError:
break


class _SpallocMachine(SessionAware, SpallocMachine):
"""
Represents a Spalloc-controlled machine.
@@ -507,7 +488,7 @@ class _SpallocJob(SessionAware, SpallocJob):
Don't make this yourself. Use :py:class:`SpallocClient` instead.
"""
__slots__ = ("__machine_url", "__chip_url",
"_keepalive_url", "__keepalive_handle", "__proxy_handle",
"_keepalive_url", "__proxy_handle",
"__proxy_thread", "__proxy_ping")

def __init__(self, session: Session, job_handle: str):
@@ -519,11 +500,13 @@ def __init__(self, session: Session, job_handle: str):
logger.info("established job at {}", job_handle)
self.__machine_url = self._url + "machine"
self.__chip_url = self._url + "chip"
self._keepalive_url = self._url + "keepalive"
self.__keepalive_handle: Optional[Queue] = None
self._keepalive_url: Optional[str] = self._url + "keepalive"
self.__proxy_handle: Optional[WebSocket] = None
self.__proxy_thread: Optional[_ProxyReceiver] = None
self.__proxy_ping: Optional[_ProxyPing] = None
keep_alive = threading.Thread(
target=self.__start_keepalive, daemon=True)
keep_alive.start()

@overrides(SpallocJob.get_session_credentials_for_db)
def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]:
@@ -651,9 +634,7 @@ def wait_until_ready(self, timeout: Optional[int] = None,

@overrides(SpallocJob.destroy)
def destroy(self, reason: str = "finished"):
if self.__keepalive_handle:
self.__keepalive_handle.close()
self.__keepalive_handle = None
self._keepalive_url = None
if self.__proxy_handle is not None:
if self.__proxy_thread:
self.__proxy_thread.close()
@@ -663,38 +644,32 @@ def destroy(self, reason: str = "finished"):
self._delete(self._url, reason=str(reason))
logger.info("deleted job at {}", self._url)

    @overrides(SpallocJob.keepalive)
    def keepalive(self) -> None:
        self._put(self._keepalive_url, "alive")

    @overrides(SpallocJob.launch_keepalive_task, extend_doc=True)
    def launch_keepalive_task(
            self, period: float = 30) -> ContextManager[Process]:
        """
        .. note::
            Tricky! *Cannot* be done with a thread, as the main thread is
            known to do significant amounts of CPU-intensive work.
        """
        if self.__keepalive_handle is not None:
            raise SpallocException("cannot keep job alive from two tasks")
        q: Queue = Queue(1)
        p = Process(target=_spalloc_keepalive, args=(
            self._keepalive_url, 0 + period, q,
            *self._session_credentials), daemon=True)
        p.start()
        self.__keepalive_handle = q
        return self.__closer(q, p)

    @contextmanager
    def __closer(self, q: Queue, p: Process) -> Iterator[Process]:
        try:
            yield p
        finally:
            q.put("quit")
            # Give it a second, and if it still isn't dead, kill it
            p.join(1)
            if p.is_alive():
                p.kill()

    def __keepalive(self) -> bool:
        """
        Signal spalloc that we want the job to stay alive for a while longer.

        :return: True if the signal was sent; False if the job has already
            been destroyed, so no further keep-alives are needed.
        :rtype: bool
        """
        if self._keepalive_url is None:
            return False
        cookies, headers = self._session_credentials
        headers["Content-Type"] = "text/plain; charset=UTF-8"
        logger.debug(self._keepalive_url)
        requests.put(self._keepalive_url, data="alive", cookies=cookies,
                     headers=headers, allow_redirects=False, timeout=10)
        return True

    def __start_keepalive(self) -> None:
        """
        Target of the keep-alive thread: ping the server at half the
        keep-alive period until the job is destroyed.
        """
        try:
            while self.__keepalive():
                time.sleep(KEEP_ALIVE_PERIOD / 2)
        except Exception as ex:  # pylint: disable=broad-except
            logger.exception(ex)
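Since this loop is the heart of the change, here is the pattern in isolation as a minimal, self-contained sketch (the Job class and URL are hypothetical, not the API above). A daemon thread PUTs to the keep-alive URL at half the agreed period, so a single delayed request cannot let the job lapse, and destroy() stops the loop simply by clearing the URL; no queue, process handle, or shutdown protocol is needed. The old note's worry about a CPU-busy main thread is presumably tolerable now that the default period is 120 s rather than 45 s, and both requests.put() and time.sleep() release the GIL while they wait.

import threading
import time
from typing import Optional

import requests

KEEP_ALIVE_PERIOD = 120  # seconds, matching the constant above


class Job:
    """Hypothetical stand-in for _SpallocJob, keep-alive parts only."""

    def __init__(self, keepalive_url: str):
        self._keepalive_url: Optional[str] = keepalive_url
        # Daemon thread: it never blocks interpreter shutdown.
        threading.Thread(target=self._keepalive_loop, daemon=True).start()

    def _keepalive_loop(self) -> None:
        # Ping at half the period so one missed beat is harmless.
        while self._keepalive_url is not None:
            requests.put(self._keepalive_url, data="alive", timeout=10)
            time.sleep(KEEP_ALIVE_PERIOD / 2)

    def destroy(self) -> None:
        # Clearing the URL ends the loop at its next iteration.
        self._keepalive_url = None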

@overrides(SpallocJob.where_is_machine)
def where_is_machine(self, x: int, y: int) -> Optional[
@@ -705,16 +680,6 @@ def where_is_machine(self, x: int, y: int) -> Optional[
return cast(Tuple[int, int, int], tuple(
r.json()["physical-board-coordinates"]))

@property
def _keepalive_handle(self) -> Optional[Queue]:
return self.__keepalive_handle

@_keepalive_handle.setter
def _keepalive_handle(self, handle: Queue):
if self.__keepalive_handle is not None:
raise SpallocException("cannot keep job alive from two tasks")
self.__keepalive_handle = handle

@overrides(SpallocJob.create_transceiver)
def create_transceiver(self) -> Transceiver:
if self.get_state() != SpallocState.READY:
24 changes: 0 additions & 24 deletions spinnman/spalloc/spalloc_job.py
@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from contextlib import AbstractContextManager
from typing import Dict, Mapping, Optional, Tuple
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
from spinnman.constants import SCP_SCAMP_PORT
@@ -182,29 +181,6 @@ def destroy(self, reason: str = "finished"):
"""
raise NotImplementedError()

@abstractmethod
def keepalive(self) -> None:
"""
Signal the job that we want it to stay alive for a while longer.
"""
raise NotImplementedError()

@abstractmethod
def launch_keepalive_task(
self, period: int = 30) -> AbstractContextManager:
"""
        Starts a periodic task to keep a job alive.

:param int period:
How often to send a keepalive message (in seconds)
:return:
Some kind of closeable task handle; closing it terminates the task.
Destroying the job will also terminate the task.
"""
raise NotImplementedError()

@abstractmethod
def where_is_machine(self, x: int, y: int) -> Optional[
Tuple[int, int, int]]:
