Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/comms #101

Merged
merged 17 commits into from
Dec 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
117 changes: 117 additions & 0 deletions libensemble/comms/tcp_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
TCP-based bidirectional communicator
------------------------------------

"""

from libensemble.comms.comms import QComm
from multiprocessing.managers import BaseManager
from multiprocessing import Queue


class ServerQCommManager:
    """Serve the named message queues shared by a manager and its workers.

    A multiprocessing manager server is started on construction; queues
    are looked up (and created on first request) by name through it.
    The object is also a context manager that shuts the server down on
    exit.
    """

    def __init__(self, port, authkey):
        """Start the queue server at address ('', port) with the given key.

        port    -- TCP port for the server address
        authkey -- authentication key passed to the manager server
        """
        registry = {'shared': Queue()}

        def get_queue(name):
            "Return the queue stored under name, creating it if absent."
            try:
                return registry[name]
            except KeyError:
                registry[name] = created = Queue()
                return created

        # A throwaway subclass keeps the 'get_queue' registration local
        # to this server rather than attaching it to BaseManager itself.
        class ServerQueueManager(BaseManager):
            pass

        ServerQueueManager.register('get_queue', callable=get_queue)
        self.manager = ServerQueueManager(address=('', port), authkey=authkey)
        self.manager.start()

    def shutdown(self):
        "Stop the underlying manager server."
        self.manager.shutdown()

    @property
    def address(self):
        "Address reported by the underlying manager."
        return self.manager.address

    def get_queue(self, name):
        "Fetch the queue registered under name from the manager."
        return self.manager.get_queue(name)

    def get_inbox(self, workerID):
        "Fetch the inbox queue belonging to worker workerID."
        queue_name = 'inbox{}'.format(workerID)
        return self.get_queue(queue_name)

    def get_outbox(self, workerID):
        "Fetch the outbox queue belonging to worker workerID."
        queue_name = 'outbox{}'.format(workerID)
        return self.get_queue(queue_name)

    def get_shared(self):
        "Fetch the shared queue on which workers announce themselves."
        return self.get_queue('shared')

    def await_workers(self, nworkers):
        """Collect nworkers sign-ups and return one QComm per worker.

        Worker IDs are read one at a time from the shared queue, in
        arrival order.
        """
        signups = self.get_shared()

        def attach(workerID):
            # Seen from this side the roles are swapped: the worker's
            # outbox is what we read from and its inbox is what we
            # write to.
            return QComm(self.get_outbox(workerID), self.get_inbox(workerID))

        return [attach(signups.get()) for _ in range(nworkers)]

    def __enter__(self):
        "Enter the context; the server object itself is the context value."
        return self

    def __exit__(self, etype, value, traceback):
        "Leave the context, shutting the server down."
        self.shutdown()


class ClientQCommManager:
    """Worker-side handle on the queues served by a QComm server.

    Connects to the server on construction and announces the worker's
    ID on the shared queue. Used as a context manager, it yields a
    QComm built from this worker's inbox and outbox.
    """

    def __init__(self, ip, port, authkey, workerID):
        """Connect to the server at (ip, port) and sign up as workerID.

        ip, port -- address of the queue server
        authkey  -- authentication key passed to the manager client
        workerID -- identifier used to name this worker's queues; it is
                    also what gets put on the shared sign-up queue
        """
        # A throwaway subclass keeps the 'get_queue' registration local
        # to this client rather than attaching it to BaseManager itself.
        class ClientQueueManager(BaseManager):
            pass

        ClientQueueManager.register('get_queue')
        self.workerID = workerID
        self.manager = ClientQueueManager(address=(ip, port), authkey=authkey)
        self.manager.connect()
        self.get_shared().put(workerID)

    def get_queue(self, name):
        "Fetch the queue registered under name from the server."
        return self.manager.get_queue(name)

    def get_inbox(self):
        "Fetch the inbox queue named after this worker's ID."
        queue_name = 'inbox{}'.format(self.workerID)
        return self.get_queue(queue_name)

    def get_outbox(self):
        "Fetch the outbox queue named after this worker's ID."
        queue_name = 'outbox{}'.format(self.workerID)
        return self.get_queue(queue_name)

    def get_shared(self):
        "Fetch the shared queue used for worker sign-up."
        return self.get_queue('shared')

    def __enter__(self):
        "Enter the context, yielding a QComm over (inbox, outbox)."
        inbox = self.get_inbox()
        outbox = self.get_outbox()
        return QComm(inbox, outbox)

    def __exit__(self, etype, value, traceback):
        "Leave the context; nothing is torn down here."
        return None
38 changes: 18 additions & 20 deletions libensemble/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,32 +287,30 @@ def register_calc(self, full_path, calc_type='sim', desc=None):
"Default {} app already set".format(calc_type))
self.default_apps[calc_type] = Application(full_path, calc_type, desc)

def manager_poll(self, comm):
    """Poll the communicator for a manager stop signal.

    If a message is waiting on comm and it carries STOP_TAG, the job
    controller's manager_signal attribute is set to 'finish' or 'kill'
    according to the signal value; an unrecognized value is logged and
    leaves manager_signal untouched. In every STOP_TAG case the message
    is pushed back onto comm afterwards.

    A waiting message with any other tag is received and dropped.

    comm -- communicator providing mail_flag(), recv() and push_back()
    """
    from libensemble.message_numbers import \
        STOP_TAG, MAN_SIGNAL_FINISH, MAN_SIGNAL_KILL

    # Check for messages; disregard anything but a stop signal
    if not comm.mail_flag():
        return
    mtag, man_signal = comm.recv()
    if mtag != STOP_TAG:
        return

    # Process the signal and push back on comm (for now)
    logger.info('Manager probe hit true')
    if man_signal == MAN_SIGNAL_FINISH:
        self.manager_signal = 'finish'
    elif man_signal == MAN_SIGNAL_KILL:
        self.manager_signal = 'kill'
    else:
        logger.warning("Received unrecognized manager signal {} - "
                       "ignoring".format(man_signal))
    # Return exactly what was received; the previous second argument
    # was a name that is not defined in this function.
    comm.push_back(mtag, man_signal)

def get_job(self, jobid):
""" Returns the job object for the supplied job ID """
Expand Down
153 changes: 144 additions & 9 deletions libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@

__all__ = ['libE']

import os
import sys
import logging
import traceback
import random
import socket

import numpy as np

import libensemble.util.launcher as launcher
from libensemble.util.timer import Timer
from libensemble.history import History
from libensemble.libE_manager import manager_main, ManagerException
from libensemble.libE_worker import worker_main
from libensemble.alloc_funcs.give_sim_work_first import give_sim_work_first
from libensemble.comms.comms import QCommProcess, Timeout
from libensemble.comms.logs import manager_logging_config

from libensemble.comms.tcp_mgr import ServerQCommManager, ClientQCommManager

logger = logging.getLogger(__name__)
#For debug messages in this module - uncomment
Expand Down Expand Up @@ -125,13 +129,13 @@ def libE(sim_specs, gen_specs, exit_criteria,
2 = Manager timed out and ended simulation
"""

if 'nprocesses' in libE_specs:
libE_f = libE_local
else:
libE_f = libE_mpi

return libE_f(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)
comms_type = libE_specs.get('comms', 'mpi')
libE_funcs = {'mpi': libE_mpi,
'tcp': libE_tcp,
'local': libE_local}
assert comms_type in libE_funcs, "Unknown comms type: {}".format(comms_type)
return libE_funcs[comms_type](sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)


def libE_manager(wcomms, sim_specs, gen_specs, exit_criteria, persis_info,
Expand Down Expand Up @@ -290,6 +294,137 @@ def cleanup():
on_cleanup=cleanup)


# ==================== TCP version =================================


def get_ip():
    """Return the address the local hostname resolves to.

    Falls back to the string 'localhost' when resolution raises
    socket.gaierror.
    """
    try:
        hostname = socket.gethostname()
        address = socket.gethostbyname(hostname)
    except socket.gaierror:
        address = 'localhost'
    return address


def libE_tcp_authkey():
    """Generate an authkey if not assigned by manager.

    Returns a string of the form 'libE_auth_<integer>'.

    The key is what the TCP queue server authenticates clients with, so
    the nonce is drawn from the operating system's entropy source
    (random.SystemRandom) over a 128-bit range, instead of from the
    seedable default generator over fewer than 10**5 values.
    """
    nonce = random.SystemRandom().getrandbits(128)
    return 'libE_auth_{}'.format(nonce)


def libE_tcp_default_ID():
    """Build a fallback worker ID from this host's address and process ID.

    Intended for use when the manager has not assigned an ID; the result
    is hoped, not guaranteed, to be unique.
    """
    host = get_ip()
    pid = os.getpid()
    return "{}_pid{}".format(host, pid)


def libE_tcp(sim_specs, gen_specs, exit_criteria,
             persis_info, alloc_specs, libE_specs, H0):
    """Entry point for running libE over TCP.

    After input checking, the presence of 'workerID' in libE_specs
    decides the role: with it, this process runs as a worker and
    returns ([], persis_info, []); without it, this process runs as the
    manager and returns whatever the manager routine returns.
    """
    libE_specs = check_inputs(True, libE_specs,
                              alloc_specs, sim_specs, gen_specs,
                              exit_criteria, H0)

    is_manager = 'workerID' not in libE_specs
    if is_manager:
        return libE_tcp_mgr(sim_specs, gen_specs, exit_criteria,
                            persis_info, alloc_specs, libE_specs, H0)

    libE_tcp_worker(sim_specs, gen_specs, libE_specs)
    return [], persis_info, []


def libE_tcp_worker_launcher(libE_specs):
    """Pick the function used to launch a worker.

    A callable supplied under 'worker_launcher' is returned as is.
    Otherwise 'worker_cmd' is read (KeyError if missing) and wrapped in
    a function that hands it, together with the per-worker specs, to
    launcher.launch.
    """
    if 'worker_launcher' not in libE_specs:
        # Read the command now so a missing key fails here, not at the
        # first launch.
        worker_cmd = libE_specs['worker_cmd']

        def worker_launcher(specs):
            "Launch one worker from the configured command."
            return launcher.launch(worker_cmd, specs)

        return worker_launcher

    return libE_specs['worker_launcher']


def libE_tcp_start_team(manager, nworkers, workers,
                        ip, port, authkey, launchf):
    """Launch nworkers workers that attach back to a manager's server.

    manager  -- server object providing await_workers(nworkers)
    nworkers -- number of workers to launch; IDs run from 1 to nworkers
    workers  -- optional sequence of per-worker addresses (indexed by
                worker ID minus one), or None
    ip, port, authkey -- connection details passed on to every worker
    launchf  -- callable taking a specs dict and returning a handle for
                the launched worker

    Returns (worker_procs, wcomms): the handles returned by launchf and
    the communicators returned by manager.await_workers.
    """
    common_specs = {'manager_ip': ip,
                    'manager_port': port,
                    'authkey': authkey}
    worker_procs = []
    with Timer() as timer:
        for w in range(1, nworkers+1):
            logger.info("Manager is launching worker {}".format(w))
            # Give each launch its own dict: a launcher that keeps a
            # reference to its specs must not see them overwritten by
            # the next worker's values.
            specs = dict(common_specs)
            if workers is not None:
                specs['worker_ip'] = workers[w-1]
                specs['tunnel_port'] = 0x71BE
            specs['workerID'] = w
            worker_procs.append(launchf(specs))
        logger.info("Manager is awaiting {} workers".format(nworkers))
        wcomms = manager.await_workers(nworkers)
        logger.info("Manager connected to {} workers ({} s)".
                    format(nworkers, timer.elapsed))
    return worker_procs, wcomms


def libE_tcp_mgr(sim_specs, gen_specs, exit_criteria,
                 persis_info, alloc_specs, libE_specs, H0):
    """Main routine for TCP multiprocessing launch of libE at manager.

    Starts the queue server, launches the worker team, and runs the
    generic manager, cancelling the launched workers on cleanup.

    libE_specs keys read here:
      'nprocesses' or 'workers' -- worker count, or a sequence of
                                   per-worker addresses (one is required)
      'ip'      -- manager address given to workers (default: get_ip())
      'port'    -- server port (default 0; the port actually bound is
                   then read back from the server)
      'authkey' -- str authentication key (default: generated)
      'worker_timeout' -- passed to launcher.cancel during cleanup

    Raises ValueError if neither 'nprocesses' nor 'workers' is given.
    """
    # Resolve the team size first so a missing setting is reported
    # clearly instead of surfacing later as an unbound local.
    if 'nprocesses' in libE_specs:
        workers = None
        nworkers = libE_specs['nprocesses']
    elif 'workers' in libE_specs:
        workers = libE_specs['workers']
        nworkers = len(workers)
    else:
        raise ValueError("libE_specs must contain 'nprocesses' or "
                         "'workers' for TCP comms")

    hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0)

    # Set up a worker launcher
    launchf = libE_tcp_worker_launcher(libE_specs)

    # Fill in defaults for TCP/IP conn
    ip = libE_specs.get('ip', None) or get_ip()
    port = libE_specs.get('port', 0)
    if 'authkey' in libE_specs:
        authkey = libE_specs['authkey']
    else:
        # Only generate a key when one was not supplied.
        authkey = libE_tcp_authkey()

    with ServerQCommManager(port, authkey.encode('utf-8')) as manager:

        # Get port if needed because of auto-assignment
        if port == 0:
            _, port = manager.address

        manager_logging_config(filename='ensemble.log', level=logging.DEBUG)
        logger.info("Launched server at ({}, {})".format(ip, port))

        # Launch worker team and set up logger
        worker_procs, wcomms = \
            libE_tcp_start_team(manager, nworkers, workers,
                                ip, port, authkey, launchf)

        def cleanup():
            "Handler to clean up launched team."
            for wp in worker_procs:
                launcher.cancel(wp, timeout=libE_specs.get('worker_timeout'))

        # Run generic manager
        return libE_manager(wcomms, sim_specs, gen_specs, exit_criteria,
                            persis_info, alloc_specs, libE_specs, hist,
                            on_cleanup=cleanup)


def libE_tcp_worker(sim_specs, gen_specs, libE_specs):
    """Main routine for TCP worker launched by libE.

    Reads 'ip', 'port', 'authkey' and 'workerID' from libE_specs,
    connects to the manager's queue server, and runs the worker main
    loop over the resulting communicator.

    'authkey' may be bytes or str. The manager side encodes its key as
    UTF-8 before starting the server, so a str key is encoded the same
    way here; bytes are passed through unchanged.
    """
    ip = libE_specs['ip']
    port = libE_specs['port']
    authkey = libE_specs['authkey']
    workerID = libE_specs['workerID']

    # multiprocessing managers require a byte-string key.
    if isinstance(authkey, str):
        authkey = authkey.encode('utf-8')

    with ClientQCommManager(ip, port, authkey, workerID) as comm:
        worker_main(comm, sim_specs, gen_specs,
                    workerID=workerID, log_comm=True)
        logger.debug("Worker {} exiting".format(workerID))


# ==================== Common input checking =================================


Expand Down
9 changes: 5 additions & 4 deletions libensemble/sim_funcs/job_control_hworld.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#Alt send values through X
sim_count = 0

def polling_loop(jobctl, job, timeout_sec=6.0, delay=1.0):
def polling_loop(comm, jobctl, job, timeout_sec=6.0, delay=1.0):
import time
start = time.time()

Expand All @@ -17,7 +17,7 @@ def polling_loop(jobctl, job, timeout_sec=6.0, delay=1.0):
time.sleep(delay)

#print('Probing manager at time: ', time.time() - start)
jobctl.manager_poll()
jobctl.manager_poll(comm)
if jobctl.manager_signal == 'finish':
jobctl.kill(job)
calc_status = MAN_SIGNAL_FINISH # Worker will pick this up and close down
Expand Down Expand Up @@ -62,10 +62,11 @@ def polling_loop(jobctl, job, timeout_sec=6.0, delay=1.0):
return job, calc_status


def job_control_hworld(H, persis_info, sim_specs, _):
def job_control_hworld(H, persis_info, sim_specs, libE_specs):
""" Test of launching and polling job and exiting on job finish"""
jobctl = MPIJobController.controller
cores = sim_specs['cores']
comm = libE_specs['comm']

args_for_sim = 'sleep 3'
#pref send this in X as a sim_in from calling script
Expand All @@ -86,7 +87,7 @@ def job_control_hworld(H, persis_info, sim_specs, _):
timeout = 20.0

job = jobctl.launch(calc_type='sim', num_procs=cores, app_args=args_for_sim, hyperthreads=True)
job, calc_status = polling_loop(jobctl, job, timeout)
job, calc_status = polling_loop(comm, jobctl, job, timeout)

#assert job.finished, "job.finished should be True. Returned " + str(job.finished)
#assert job.state == 'FINISHED', "job.state should be FINISHED. Returned " + str(job.state)
Expand Down