Skip to content

Commit

Permalink
Merge d06f82f into eb92c9f
Browse files Browse the repository at this point in the history
  • Loading branch information
dbindel committed Dec 11, 2018
2 parents eb92c9f + d06f82f commit 3dda838
Show file tree
Hide file tree
Showing 25 changed files with 634 additions and 214 deletions.
117 changes: 117 additions & 0 deletions libensemble/comms/tcp_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
TCP-based bidirectional communicator
------------------------------------
"""

from libensemble.comms.comms import QComm
from multiprocessing.managers import BaseManager
from multiprocessing import Queue


class ServerQCommManager:
"""Set up a QComm manager server.
The QComm manager server provides shared (networked) access to message
queues for communication between the libensemble manager and workers.
"""

def __init__(self, port, authkey):
"Initialize the server on localhost at an indicated TCP port and key."
queues = {'shared': Queue()}
class ServerQueueManager(BaseManager):
pass
def get_queue(name):
if name not in queues:
queues[name] = Queue()
return queues[name]
ServerQueueManager.register('get_queue', callable=get_queue)
self.manager = ServerQueueManager(address=('', port), authkey=authkey)
self.manager.start()

def shutdown(self):
"Shutdown the manager"
self.manager.shutdown()

@property
def address(self):
"Get IP address for socket."
return self.manager.address

def get_queue(self, name):
"Get a queue from the shared manager"
return self.manager.get_queue(name)

def get_inbox(self, workerID):
"Get a worker inbox queue."
return self.get_queue('inbox{}'.format(workerID))

def get_outbox(self, workerID):
"Get a worker outbox queue."
return self.get_queue('outbox{}'.format(workerID))

def get_shared(self):
"Get a shared queue for worker subscription."
return self.get_queue('shared')

def await_workers(self, nworkers):
"Wait for a pool of workers to join."
sharedq = self.get_shared()
wqueues = []
for _ in range(nworkers):
workerID = sharedq.get()
inbox = self.get_outbox(workerID)
outbox = self.get_inbox(workerID)
wqueues.append(QComm(inbox, outbox))
return wqueues

def __enter__(self):
"Context enter."
return self

def __exit__(self, etype, value, traceback):
"Context exit."
self.shutdown()


class ClientQCommManager:
"""Set up a client to the QComm server.
The client runs at the worker and mediates access to the shared queues
provided by the server.
"""

def __init__(self, ip, port, authkey, workerID):
"Attach by TCP to (ip, port) with a uniquely given workerID"
self.workerID = workerID
class ClientQueueManager(BaseManager):
pass
ClientQueueManager.register('get_queue')
self.manager = ClientQueueManager(address=(ip, port), authkey=authkey)
self.manager.connect()
sharedq = self.get_shared()
sharedq.put(workerID)

def get_queue(self, name):
"Get a queue from the server."
return self.manager.get_queue(name)

def get_inbox(self):
"Get this worker's inbox."
return self.get_queue('inbox{}'.format(self.workerID))

def get_outbox(self):
"Get this worker's outbox."
return self.get_queue('outbox{}'.format(self.workerID))

def get_shared(self):
"Get the shared queue for worker sign-up."
return self.get_queue('shared')

def __enter__(self):
"Enter the context."
return QComm(self.get_inbox(), self.get_outbox())

def __exit__(self, etype, value, traceback):
"Exit the context."
pass
38 changes: 18 additions & 20 deletions libensemble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,32 +287,30 @@ def register_calc(self, full_path, calc_type='sim', desc=None):
"Default {} app already set".format(calc_type))
self.default_apps[calc_type] = Application(full_path, calc_type, desc)

def manager_poll(self):
def manager_poll(self, comm):
""" Polls for a manager signal
The job controller manager_signal attribute will be updated.
"""

#Will use MPI_MODE from settings.py but for now assume MPI
from libensemble.message_numbers import \
STOP_TAG, MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL
from mpi4py import MPI

# Manager Signals
# Stop tag may be manager interupt as diff kill/stop/pause....
comm = MPI.COMM_WORLD
status = MPI.Status()
if comm.Iprobe(source=0, tag=STOP_TAG, status=status):
logger.info('Manager probe hit true')
man_signal = comm.recv(source=0, tag=STOP_TAG, status=status)
if man_signal == MAN_SIGNAL_FINISH:
self.manager_signal = 'finish'
elif man_signal == MAN_SIGNAL_KILL:
self.manager_signal = 'kill'
else:
logger.warning("Received unrecognized manager signal {} - "
"ignoring".format(man_signal))
# Check for messages; disregard anything but a stop signal
if not comm.mail_flag():
return
mtag, man_signal = comm.recv()
if mtag != STOP_TAG:
return

# Process the signal and push back on comm (for now)
logger.info('Manager probe hit true')
if man_signal == MAN_SIGNAL_FINISH:
self.manager_signal = 'finish'
elif man_signal == MAN_SIGNAL_KILL:
self.manager_signal = 'kill'
else:
logger.warning("Received unrecognized manager signal {} - "
"ignoring".format(man_signal))
comm.push_back(mtag, Work)

def get_job(self, jobid):
""" Returns the job object for the supplied job ID """
Expand Down
153 changes: 144 additions & 9 deletions libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@

__all__ = ['libE']

import os
import sys
import logging
import traceback
import random
import socket

import numpy as np

import libensemble.util.launcher as launcher
from libensemble.util.timer import Timer
from libensemble.history import History
from libensemble.libE_manager import manager_main, ManagerException
from libensemble.libE_worker import worker_main
from libensemble.alloc_funcs.give_sim_work_first import give_sim_work_first
from libensemble.comms.comms import QCommProcess, Timeout
from libensemble.comms.logs import manager_logging_config

from libensemble.comms.tcp_mgr import ServerQCommManager, ClientQCommManager

logger = logging.getLogger(__name__)
#For debug messages in this module - uncomment
Expand Down Expand Up @@ -125,13 +129,13 @@ def libE(sim_specs, gen_specs, exit_criteria,
2 = Manager timed out and ended simulation
"""

if 'nprocesses' in libE_specs:
libE_f = libE_local
else:
libE_f = libE_mpi

return libE_f(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)
comms_type = libE_specs.get('comms', 'mpi')
libE_funcs = {'mpi': libE_mpi,
'tcp': libE_tcp,
'local': libE_local}
assert comms_type in libE_funcs, "Unknown comms type: {}".format(comms_type)
return libE_funcs[comms_type](sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)


def libE_manager(wcomms, sim_specs, gen_specs, exit_criteria, persis_info,
Expand Down Expand Up @@ -290,6 +294,137 @@ def cleanup():
on_cleanup=cleanup)


# ==================== TCP version =================================


def get_ip():
"Get the IP address of the current host"
try:
return socket.gethostbyname(socket.gethostname())
except socket.gaierror:
return 'localhost'


def libE_tcp_authkey():
"Generate an authkey if not assigned by manager."
nonce = random.randrange(99999)
return 'libE_auth_{}'.format(nonce)


def libE_tcp_default_ID():
"Assign a (we hope unique) worker ID if not assigned by manager."
return "{}_pid{}".format(get_ip(), os.getpid())


def libE_tcp(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0):
"Main routine for TCP multiprocessing launch of libE."

libE_specs = check_inputs(True, libE_specs,
alloc_specs, sim_specs, gen_specs,
exit_criteria, H0)

if 'workerID' in libE_specs:
libE_tcp_worker(sim_specs, gen_specs, libE_specs)
return [], persis_info, []

return libE_tcp_mgr(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)


def libE_tcp_worker_launcher(libE_specs):
"Get a launch function from libE_specs."
if 'worker_launcher' in libE_specs:
worker_launcher = libE_specs['worker_launcher']
else:
worker_cmd = libE_specs['worker_cmd']
def worker_launcher(specs):
"Basic worker launch function."
return launcher.launch(worker_cmd, specs)
return worker_launcher


def libE_tcp_start_team(manager, nworkers, workers,
ip, port, authkey, launchf):
"Launch nworkers workers that attach back to a managers server."
worker_procs = []
specs = {'manager_ip' : ip,
'manager_port' : port,
'authkey' : authkey}
with Timer() as timer:
for w in range(1, nworkers+1):
logger.info("Manager is launching worker {}".format(w))
if workers is not None:
specs['worker_ip'] = workers[w-1]
specs['tunnel_port'] = 0x71BE
specs['workerID'] = w
worker_procs.append(launchf(specs))
logger.info("Manager is awaiting {} workers".format(nworkers))
wcomms = manager.await_workers(nworkers)
logger.info("Manager connected to {} workers ({} s)".
format(nworkers, timer.elapsed))
return worker_procs, wcomms


def libE_tcp_mgr(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0):
"Main routine for TCP multiprocessing launch of libE at manager."

hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0)

# Set up a worker launcher
launchf = libE_tcp_worker_launcher(libE_specs)

# Get worker launch parameters and fill in defaults for TCP/IP conn
if 'nprocesses' in libE_specs:
workers = None
nworkers = libE_specs['nprocesses']
elif 'workers' in libE_specs:
workers = libE_specs['workers']
nworkers = len(workers)
ip = libE_specs.get('ip', None) or get_ip()
port = libE_specs.get('port', 0)
authkey = libE_specs.get('authkey', libE_tcp_authkey())

with ServerQCommManager(port, authkey.encode('utf-8')) as manager:

# Get port if needed because of auto-assignment
if port == 0:
_, port = manager.address

manager_logging_config(filename='ensemble.log', level=logging.DEBUG)
logger.info("Launched server at ({}, {})".format(ip, port))

# Launch worker team and set up logger
worker_procs, wcomms =\
libE_tcp_start_team(manager, nworkers, workers,
ip, port, authkey, launchf)

def cleanup():
"Handler to clean up launched team."
for wp in worker_procs:
launcher.cancel(wp, timeout=libE_specs.get('worker_timeout'))

# Run generic manager
return libE_manager(wcomms, sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, hist,
on_cleanup=cleanup)


def libE_tcp_worker(sim_specs, gen_specs, libE_specs):
"Main routine for TCP worker launched by libE."

ip = libE_specs['ip']
port = libE_specs['port']
authkey = libE_specs['authkey']
workerID = libE_specs['workerID']

with ClientQCommManager(ip, port, authkey, workerID) as comm:
worker_main(comm, sim_specs, gen_specs,
workerID=workerID, log_comm=True)
logger.debug("Worker {} exiting".format(workerID))


# ==================== Common input checking =================================


Expand Down
9 changes: 5 additions & 4 deletions libensemble/sim_funcs/job_control_hworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#Alt send values through X
sim_count = 0

def polling_loop(jobctl, job, timeout_sec=6.0, delay=1.0):
def polling_loop(comm, jobctl, job, timeout_sec=6.0, delay=1.0):
import time
start = time.time()

Expand All @@ -17,7 +17,7 @@ def polling_loop(jobctl, job, timeout_sec=6.0, delay=1.0):
time.sleep(delay)

#print('Probing manager at time: ', time.time() - start)
jobctl.manager_poll()
jobctl.manager_poll(comm)
if jobctl.manager_signal == 'finish':
jobctl.kill(job)
calc_status = MAN_SIGNAL_FINISH # Worker will pick this up and close down
Expand Down Expand Up @@ -62,10 +62,11 @@ def polling_loop(jobctl, job, timeout_sec=6.0, delay=1.0):
return job, calc_status


def job_control_hworld(H, persis_info, sim_specs, _):
def job_control_hworld(H, persis_info, sim_specs, libE_specs):
""" Test of launching and polling job and exiting on job finish"""
jobctl = MPIJobController.controller
cores = sim_specs['cores']
comm = libE_specs['comm']

args_for_sim = 'sleep 3'
#pref send this in X as a sim_in from calling script
Expand All @@ -86,7 +87,7 @@ def job_control_hworld(H, persis_info, sim_specs, _):
timeout = 20.0

job = jobctl.launch(calc_type='sim', num_procs=cores, app_args=args_for_sim, hyperthreads=True)
job, calc_status = polling_loop(jobctl, job, timeout)
job, calc_status = polling_loop(comm, jobctl, job, timeout)

#assert job.finished, "job.finished should be True. Returned " + str(job.finished)
#assert job.state == 'FINISHED', "job.state should be FINISHED. Returned " + str(job.state)
Expand Down

0 comments on commit 3dda838

Please sign in to comment.