Skip to content

Commit

Permalink
trimming out some comms code that isn't used anywhere, but is still t…
Browse files Browse the repository at this point in the history
…ested (#1123)
  • Loading branch information
jlnav committed Oct 11, 2023
1 parent 5ca64ba commit 79e4171
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 596 deletions.
328 changes: 0 additions & 328 deletions libensemble/comms/comms.py
Expand Up @@ -28,12 +28,9 @@

# from multiprocessing import Process, Queue, Value, Lock
from multiprocessing import Process, Queue
from threading import Thread
from time import time
from traceback import format_exc

import numpy as np


class Timeout(Exception):
"""Communication timeout exception."""
Expand Down Expand Up @@ -152,66 +149,6 @@ def mail_flag(self):
return not self._inbox.empty()


class QCommThread(Comm):
"""Launch a user function in a thread with an attached QComm."""

def __init__(self, main, nworkers, *args, **kwargs):
self.inbox = thread_queue.Queue()
self.outbox = thread_queue.Queue()
self.main = main
self._result = None
self._exception = None
kwargs["comm"] = QComm(self.inbox, self.outbox, nworkers, True)
self.thread = Thread(target=self._qcomm_main, args=args, kwargs=kwargs)

def send(self, *args):
"""Send a message to the thread (called from creator)"""
self.inbox.put(copy.deepcopy(args))

def recv(self, timeout=None):
"""Return a message from the thread or raise TimeoutError."""
try:
if not self.outbox.empty():
return self.outbox.get()
return self.outbox.get(timeout=timeout)
except thread_queue.Empty:
raise Timeout()

def mail_flag(self):
"""Check whether we know a message is ready for receipt."""
return not self.outbox.empty()

def run(self):
"""Start the thread."""
self.thread.start()

def result(self):
"""Join and return the thread main result (or re-raise an exception)."""
self.thread.join()
if isinstance(self._exception, Exception):
raise self._exception
return self._result

@property
def running(self):
"""Check if the thread is running."""
return self.thread.is_alive()

def _qcomm_main(self, *args, **kwargs):
"""Main routine -- handles return values and exceptions."""
try:
self._result = self.main(*args, **kwargs)
except Exception as e:
self._exception = e

def __enter__(self):
self.run()
return self

def __exit__(self, etype, value, traceback):
self.thread.join()


class QCommProcess(Comm):
"""Launch a user function in a process with an attached QComm."""

Expand All @@ -223,9 +160,6 @@ def __init__(self, main, nworkers, *args, **kwargs):
self._done = False
comm = QComm(self.inbox, self.outbox, nworkers)

# with QComm.lock:
# QComm._ncomms.value += 1

self.process = Process(target=QCommProcess._qcomm_main, args=(comm, main) + args, kwargs=kwargs)

def _is_result_msg(self, msg):
Expand Down Expand Up @@ -284,8 +218,6 @@ def result(self, timeout=None):
raise Timeout()
if self._exception is not None:
raise RemoteException(self._exception.msg, self._exception.exc)
# with QComm.lock:
# QComm._ncomms.value -= 1
return self._result

def terminate(self, timeout=None):
Expand All @@ -295,8 +227,6 @@ def terminate(self, timeout=None):
self.process.join(timeout=timeout)
if self.running:
raise Timeout()
# with QComm.lock:
# QComm._ncomms.value -= 1

@property
def running(self):
Expand All @@ -319,261 +249,3 @@ def __enter__(self):

def __exit__(self, etype, value, traceback):
self.process.join()


class CommHandler(ABC):
"""Comm wrapper with message handler dispatching.
The comm wrapper defines a message processor that dispatches to
different handler methods based on message types. An incoming message
with the tag 'foo' gets dispatched to a handler 'on_foo'; if 'on_foo'
is not defined, we pass to the 'on_unhandled_message' routine.
"""

def __init__(self, comm):
"""Set the comm to be wrapped."""
self.comm = comm

def send(self, *args):
"""Send via the comm."""
self.comm.send(*args)

def process_message(self, timeout=None):
"""Receive and process a message via the comm."""
msg = self.comm.recv(timeout)
msg_type = msg[0]
args = msg[1:]
try:
method = f"on_{msg_type}"
handler = getattr(self, method)
except AttributeError:
return self.on_unhandled_message(msg)
return handler(*args)

def on_unhandled_message(self, msg):
"""Handle any messages for which there are no named handlers."""
raise ValueError(f"No handler available for message {msg[0]}{msg[1:]}")


class GenCommHandler(CommHandler):
"""Wrapper for handling messages at a persistent gen."""

def send_request(self, recs):
"""Request new evaluations."""
self.send("request", recs)

def send_kill(self, sim_id):
"""Kill an evaluation."""
self.send("kill", sim_id)

def send_get_history(self, lo, hi):
"""Request history from manager."""
self.send("get_history", lo, hi)

def send_subscribe(self):
"""Request subscription to updates on sims not launched by this gen."""
self.send("subscribe")

def on_stop(self):
"""Handle stop message."""
raise ManagerStop()

@abstractmethod
def on_worker_avail(self, nworker):
"""Handle updated number of workers available to perform sims."""

@abstractmethod
def on_queued(self, sim_id):
"""Handle sim_id assignment in response to a request"""

@abstractmethod
def on_result(self, sim_id, recs):
"""Handle simulation results"""

@abstractmethod
def on_update(self, sim_id, recs):
"""Handle simulation updates"""

@abstractmethod
def on_killed(self, sim_id):
"""Handle a simulation kill"""


class SimCommHandler(CommHandler):
"""Wrapper for handling messages at sim."""

def send_result(self, sim_id, recs):
"""Send a simulation result"""
self.send("result", sim_id, recs)

def send_update(self, sim_id, recs):
"""Send a simulation update"""
self.send("update", sim_id, recs)

def send_killed(self, sim_id):
"""Send notification that a simulation was killed"""
self.send("killed", sim_id)

def on_stop(self):
"""Handle stop message."""
raise ManagerStop()

@abstractmethod
def on_request(self, sim_id, recs):
"""Handle a request for a simulation"""

@abstractmethod
def on_kill(self, sim_id):
"""Handle a request to kill a simulation"""


class CommEval(GenCommHandler):
"""Future-based interface for generator comms"""

def __init__(self, comm, workers=0, gen_specs=None):
super().__init__(comm)
self.sim_started = 0
self.sim_pending = 0
self.workers = workers
self.gen_specs = gen_specs
self.promises = {}
self.returning_promises = None
self.waiting_for_queued = 0

def request(self, recs):
"""Request simulations, return promises"""
self.sim_started += len(recs)
self.sim_pending += len(recs)
self.send_request(recs)
self.waiting_for_queued = len(recs)
while self.waiting_for_queued > 0:
self.process_message()
returning_promises = self.returning_promises
self.returning_promises = None
return returning_promises

def __call__(self, *args, **kwargs):
"""Request a simulation and return a promise"""
assert not (args and kwargs), "Must specify simulation args by position or keyword, but not both"
assert args or kwargs, "Must specify simulation arguments."
rec = np.zeros(1, dtype=self.gen_specs["out"])
if args:
assert len(args) == len(self.gen_specs["out"]), "Wrong number of positional arguments in sim call."
for k, spec in enumerate(self.gen_specs["out"]):
name = spec[0]
rec[name] = args[k]
else:
for name, value in kwargs.items():
rec[name] = value
return self.request(rec)[0]

def wait_any(self):
"""Wait for any pending simulation to be done"""
sim_pending = self.sim_pending
while sim_pending == self.sim_pending:
self.process_message()

def wait_all(self):
"""Wait for all pending simulations to be done"""
while self.sim_pending > 0:
self.process_message()

# --- Message handlers

def on_worker_avail(self, nworker):
"""Update worker count"""
self.workers = nworker
return -1

def on_queued(self, sim_id):
"""Set up futures with indicated simulation IDs"""
lo = sim_id
hi = sim_id + self.waiting_for_queued
self.waiting_for_queued = 0
self.returning_promises = []
for s in range(lo, hi):
promise = Future(self, s)
self.promises[s] = promise
self.returning_promises.append(promise)
return -1

def on_result(self, sim_id, recs):
"""Handle completed simulation"""
for k, rec in enumerate(recs):
self.sim_pending -= 1
self.promises[sim_id + k].on_result(rec)
return sim_id

def on_update(self, sim_id, recs):
"""Handle updated simulation"""
for k, rec in enumerate(recs):
self.promises[sim_id + k].on_update(rec)
return sim_id

def on_killed(self, sim_id):
"""Handle killed simulation"""
self.sim_pending -= 1
self.promises[sim_id].on_killed()
return sim_id


class Future:
"""Future objects for monitoring asynchronous simulation calls.
The Future objects are not meant to be instantiated on their own;
they are only produced by a call on a CommEval object.
"""

def __init__(self, ceval, sim_id):
self._ceval = ceval
self._id = sim_id
self._comm = ceval.comm
self._result = None
self._killed = False
self._success = False

@property
def current_result(self):
"""Return the current (possibly incomplete) result immediately."""
return self._result

def cancelled(self):
"""Return True if the simulation was killed."""
return self._killed

def done(self):
"""Return True if the simulation completed successfully or was killed."""
return self._success or self._killed

def cancel(self):
"""Cancel the simulation."""
self._ceval.send_kill(self._id)

def result(self, timeout=None):
"""Get the result of the simulation or throw a timeout."""
while not self.done():
if timeout is not None and timeout < 0:
raise Timeout()
tstart = time()
try:
self._ceval.process_message(timeout)
except Timeout:
pass
if timeout is not None:
timeout -= time() - tstart
return self._result

# --- Message handlers

def on_result(self, result):
"""Handle an incoming result."""
self._result = result
self._success = True

def on_update(self, result):
"""Handle an incoming update."""
self._result = result

def on_killed(self):
"""Handle a kill notification."""
self._killed = True

0 comments on commit 79e4171

Please sign in to comment.