Skip to content

Commit

Permalink
Merge b36362b into 465eadc
Browse files Browse the repository at this point in the history
  • Loading branch information
dkfellows authored May 11, 2018
2 parents 465eadc + b36362b commit 7ba9e4e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
six
future
enum34
futures; python_version == "2.7"
SpiNNUtilities >= 1!4.0.1, < 1!5.0.0
SpiNNMachine >= 1!4.0.1, < 1!5.0.0
SpiNNStorageHandlers >= 1!4.0.1, < 1!5.0.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@
'SpiNNMachine >= 1!4.0.1, < 1!5.0.0',
'enum34',
'future',
'futures; python_version == "2.7"',
'six']
)
28 changes: 15 additions & 13 deletions spinnman/connections/connection_listener.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from threading import Thread
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)
_POOL_SIZE = 4
Expand Down Expand Up @@ -31,27 +31,29 @@ def __init__(self, connection, n_processes=_POOL_SIZE, timeout=_TIMEOUT):
self.daemon = True
self._connection = connection
self._timeout = timeout
self._callback_pool = ThreadPool(processes=n_processes)
self._callback_pool = ThreadPoolExecutor(max_workers=n_processes)
self._done = False
self._callbacks = set()

def _run_step(self, handler):
if self._connection.is_ready_to_receive(timeout=self._timeout):
message = handler()
for callback in self._callbacks:
self._callback_pool.apply_async(callback, [message])
self._callback_pool.submit(callback, message)

def run(self):
handler = self._connection.get_receive_method()
while not self._done:
try:
self._run_step(handler)
except Exception:
if not self._done:
logger.warning("problem when dispatching message",
exc_info=True)
self._callback_pool.close()
self._callback_pool.join()
try:
handler = self._connection.get_receive_method()
while not self._done:
try:
self._run_step(handler)
except Exception:
if not self._done:
logger.warning("problem when dispatching message",
exc_info=True)
finally:
self._callback_pool.shutdown()
self._callback_pool = None

def add_callback(self, callback):
""" Add a callback to be called when a message is received
Expand Down

0 comments on commit 7ba9e4e

Please sign in to comment.