Skip to content

Commit

Permalink
Merge 71d53fd into eb92c9f
Browse files Browse the repository at this point in the history
  • Loading branch information
dbindel committed Dec 6, 2018
2 parents eb92c9f + 71d53fd commit d80f805
Show file tree
Hide file tree
Showing 23 changed files with 592 additions and 190 deletions.
117 changes: 117 additions & 0 deletions libensemble/comms/tcp_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
TCP-based bidirectional communicator
------------------------------------
"""

from libensemble.comms.comms import QComm
from multiprocessing.managers import BaseManager
from multiprocessing import Queue


class ServerQCommManager:
    """Host the queue server used for manager/worker communication.

    A ``BaseManager`` subclass is started on the requested TCP port and
    serves named message queues over the network.  The libensemble manager
    and its workers exchange messages through these queues.
    """

    def __init__(self, port, authkey):
        "Start the queue server on the given TCP port, guarded by authkey."
        registry = {'shared': Queue()}

        def get_queue(name):
            # A queue is created the first time its name is requested.
            try:
                return registry[name]
            except KeyError:
                registry[name] = Queue()
                return registry[name]

        class ServerQueueManager(BaseManager):
            pass

        ServerQueueManager.register('get_queue', callable=get_queue)
        self.manager = ServerQueueManager(address=('', port), authkey=authkey)
        self.manager.start()

    def shutdown(self):
        "Shut down the underlying manager."
        self.manager.shutdown()

    @property
    def address(self):
        "Address reported by the underlying manager."
        return self.manager.address

    def get_queue(self, name):
        "Fetch the queue registered under name from the manager."
        return self.manager.get_queue(name)

    def get_inbox(self, workerID):
        "Fetch the inbox queue belonging to workerID."
        queue_name = 'inbox{}'.format(workerID)
        return self.get_queue(queue_name)

    def get_outbox(self, workerID):
        "Fetch the outbox queue belonging to workerID."
        queue_name = 'outbox{}'.format(workerID)
        return self.get_queue(queue_name)

    def get_shared(self):
        "Fetch the shared queue on which workers announce themselves."
        return self.get_queue('shared')

    def await_workers(self, nworkers):
        "Read nworkers IDs off the shared queue; return one QComm per ID."
        signup = self.get_shared()
        comms = []
        for _ in range(nworkers):
            wid = signup.get()
            # Roles are mirrored on this side: the QComm's inbox argument is
            # the worker's outbox, and its outbox argument the worker's inbox.
            comms.append(QComm(self.get_outbox(wid), self.get_inbox(wid)))
        return comms

    def __enter__(self):
        "Enter the context; the manager object itself is the target."
        return self

    def __exit__(self, etype, value, traceback):
        "Leave the context, shutting the server down."
        self.shutdown()


class ClientQCommManager:
    """Worker-side handle on the QComm queue server.

    An instance lives in a worker process, connects to the server's
    manager over TCP, and gives access to the queues the server shares.
    """

    def __init__(self, ip, port, authkey, workerID):
        "Connect to the server at (ip, port) and announce workerID."
        class ClientQueueManager(BaseManager):
            pass

        ClientQueueManager.register('get_queue')
        self.workerID = workerID
        self.manager = ClientQueueManager(address=(ip, port), authkey=authkey)
        self.manager.connect()
        # Sign up by posting this worker's ID on the shared queue.
        signup = self.get_shared()
        signup.put(workerID)

    def get_queue(self, name):
        "Fetch the queue registered under name from the server."
        return self.manager.get_queue(name)

    def get_inbox(self):
        "Fetch the inbox queue of this worker."
        queue_name = 'inbox{}'.format(self.workerID)
        return self.get_queue(queue_name)

    def get_outbox(self):
        "Fetch the outbox queue of this worker."
        queue_name = 'outbox{}'.format(self.workerID)
        return self.get_queue(queue_name)

    def get_shared(self):
        "Fetch the shared sign-up queue."
        return self.get_queue('shared')

    def __enter__(self):
        "Enter the context, yielding a QComm over this worker's queues."
        inbox = self.get_inbox()
        outbox = self.get_outbox()
        return QComm(inbox, outbox)

    def __exit__(self, etype, value, traceback):
        "Leave the context; nothing is torn down here."
        return None
147 changes: 138 additions & 9 deletions libensemble/libE.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@

__all__ = ['libE']

import os
import sys
import logging
import traceback
import random
import socket

import numpy as np

import libensemble.util.launcher as launcher
from libensemble.history import History
from libensemble.libE_manager import manager_main, ManagerException
from libensemble.libE_worker import worker_main
from libensemble.alloc_funcs.give_sim_work_first import give_sim_work_first
from libensemble.comms.comms import QCommProcess, Timeout
from libensemble.comms.logs import manager_logging_config

from libensemble.comms.tcp_mgr import ServerQCommManager, ClientQCommManager

logger = logging.getLogger(__name__)
#For debug messages in this module - uncomment
Expand Down Expand Up @@ -125,13 +128,13 @@ def libE(sim_specs, gen_specs, exit_criteria,
2 = Manager timed out and ended simulation
"""

if 'nprocesses' in libE_specs:
libE_f = libE_local
else:
libE_f = libE_mpi

return libE_f(sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)
comms_type = libE_specs.get('comms', 'mpi')
libE_funcs = {'mpi': libE_mpi,
'tcp': libE_tcp,
'local': libE_local}
assert comms_type in libE_funcs, "Unknown comms type: {}".format(comms_type)
return libE_funcs[comms_type](sim_specs, gen_specs, exit_criteria,
persis_info, alloc_specs, libE_specs, H0)


def libE_manager(wcomms, sim_specs, gen_specs, exit_criteria, persis_info,
Expand Down Expand Up @@ -290,6 +293,132 @@ def cleanup():
on_cleanup=cleanup)


# ==================== TCP version =================================


def get_ip():
    "Resolve the current host's name to an IP address."
    hostname = socket.gethostname()
    return socket.gethostbyname(hostname)


def libE_tcp_authkey():
    """Generate an authkey if not assigned by manager.

    Returns:
        A str of the form ``'libE_auth_<32 hex digits>'``.
    """
    # The key protects a server reachable over TCP, so it is drawn from the
    # operating system's entropy source rather than the default
    # (predictable) generator, and carries 128 bits instead of ~17.
    nonce = random.SystemRandom().getrandbits(128)
    return 'libE_auth_{:032x}'.format(nonce)


def libE_tcp_default_ID():
    "Build a fallback worker ID from the host IP and this process's PID."
    host = get_ip()
    pid = os.getpid()
    return "{}_pid{}".format(host, pid)


def libE_tcp(sim_specs, gen_specs, exit_criteria,
             persis_info, alloc_specs, libE_specs, H0):
    """Entry point for libE over TCP comms.

    After input checking, the presence of 'workerID' in libE_specs decides
    the role: with it, this process runs as a worker and returns
    ``([], persis_info, [])``; without it, the manager routine runs and its
    result is returned.
    """
    libE_specs = check_inputs(True, libE_specs,
                              alloc_specs, sim_specs, gen_specs,
                              exit_criteria, H0)

    if 'workerID' not in libE_specs:
        return libE_tcp_mgr(sim_specs, gen_specs, exit_criteria,
                            persis_info, alloc_specs, libE_specs, H0)

    libE_tcp_worker(sim_specs, gen_specs, libE_specs)
    return [], persis_info, []


def libE_tcp_worker_launcher(libE_specs):
    """Pick the function used to launch a worker.

    A user-supplied libE_specs['worker_launcher'] is returned as is.
    Otherwise libE_specs['worker_cmd'] is required and is wrapped in a
    function that passes it, with the per-worker specs, to launcher.launch.
    """
    if 'worker_launcher' in libE_specs:
        return libE_specs['worker_launcher']

    command = libE_specs['worker_cmd']

    def worker_launcher(specs):
        "Basic worker launch function."
        return launcher.launch(command, specs)

    return worker_launcher


def libE_tcp_start_team(manager, nworkers, workers,
                        ip, port, authkey, launchf):
    """Launch nworkers workers that attach back to a manager's server.

    Args:
        manager: object whose await_workers(nworkers) returns the worker
            communicators once the workers have signed up.
        nworkers: number of workers to launch; worker IDs run 1..nworkers.
        workers: None, or a sequence with one entry per worker; entry
            ``w-1`` is handed to worker ``w`` as 'worker_ip'.
        ip, port, authkey: passed to every worker as 'manager_ip',
            'manager_port' and 'authkey'.
        launchf: called once per worker with that worker's specs dict.

    Returns:
        (worker_procs, wcomms): the values returned by launchf, and the
        result of manager.await_workers(nworkers).
    """
    shared_specs = {'manager_ip': ip,
                    'manager_port': port,
                    'authkey': authkey}
    worker_procs = []
    for w in range(1, nworkers+1):
        logger.info("Manager is launching worker {}".format(w))
        # Each worker gets its own dict so that a launcher which keeps a
        # reference never sees a later worker's values.
        specs = dict(shared_specs, workerID=w)
        if workers is not None:
            # Worker IDs are 1-based while the workers sequence is 0-based.
            specs['worker_ip'] = workers[w-1]
            specs['tunnel_port'] = 0x71BE + w
        worker_procs.append(launchf(specs))
    logger.info("Manager is awaiting {} workers".format(nworkers))
    wcomms = manager.await_workers(nworkers)
    logger.info("Manager connected to {} workers".format(nworkers))
    return worker_procs, wcomms


def libE_tcp_mgr(sim_specs, gen_specs, exit_criteria,
                 persis_info, alloc_specs, libE_specs, H0):
    """Main routine for TCP multiprocessing launch of libE at manager.

    Starts a ServerQCommManager, launches the worker team, and passes the
    resulting worker communicators to libE_manager.

    libE_specs keys read here:
        'nprocesses' or 'workers': one is required.  'nprocesses' gives the
            worker count directly; 'workers' is a sequence with one entry
            per worker.  'nprocesses' wins if both are present.
        'worker_launcher' or 'worker_cmd': see libE_tcp_worker_launcher.
        'ip', 'port', 'authkey': optional; default to get_ip(), 0, and
            libE_tcp_authkey() respectively.
        'worker_timeout': optional; passed to launcher.cancel on cleanup.

    Returns:
        Whatever libE_manager returns.

    Raises:
        ValueError: if neither 'nprocesses' nor 'workers' is in libE_specs.
    """

    # Get worker launch parameters; fail early with a clear message rather
    # than with an unbound-variable error later on.
    if 'nprocesses' in libE_specs:
        workers = None
        nworkers = libE_specs['nprocesses']
    elif 'workers' in libE_specs:
        workers = libE_specs['workers']
        nworkers = len(workers)
    else:
        raise ValueError("TCP comms require 'nprocesses' or 'workers' "
                         "in libE_specs")

    hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0)

    # Set up a worker launcher
    launchf = libE_tcp_worker_launcher(libE_specs)

    # Fill in defaults for the TCP/IP connection.  The defaults are only
    # computed when needed: get_ip() performs a hostname lookup, which can
    # fail, and should not run when the caller supplied an address.
    ip = libE_specs['ip'] if 'ip' in libE_specs else get_ip()
    port = libE_specs.get('port', 0)
    if 'authkey' in libE_specs:
        authkey = libE_specs['authkey']
    else:
        authkey = libE_tcp_authkey()

    with ServerQCommManager(port, authkey.encode('utf-8')) as manager:

        # Get port if needed because of auto-assignment
        if port == 0:
            _, port = manager.address

        manager_logging_config(filename='ensemble.log', level=logging.DEBUG)
        logger.info("Launched server at ({}, {})".format(ip, port))

        # Launch worker team and set up logger
        worker_procs, wcomms = \
            libE_tcp_start_team(manager, nworkers, workers,
                                ip, port, authkey, launchf)

        def cleanup():
            "Handler to clean up launched team."
            for wp in worker_procs:
                launcher.cancel(wp, timeout=libE_specs.get('worker_timeout'))

        # Run generic manager
        return libE_manager(wcomms, sim_specs, gen_specs, exit_criteria,
                            persis_info, alloc_specs, libE_specs, hist,
                            on_cleanup=cleanup)


def libE_tcp_worker(sim_specs, gen_specs, libE_specs):
    """Run a libE worker that talks to the manager over TCP.

    Connection details ('ip', 'port', 'authkey') and the 'workerID' are all
    required keys of libE_specs.  The worker loop runs over the communicator
    produced by a ClientQCommManager context.
    """
    ip, port, authkey, workerID = (
        libE_specs[key] for key in ('ip', 'port', 'authkey', 'workerID'))

    client = ClientQCommManager(ip, port, authkey, workerID)
    with client as comm:
        worker_main(comm, sim_specs, gen_specs,
                    workerID=workerID, log_comm=True)
        logger.debug("Worker {} exiting".format(workerID))


# ==================== Common input checking =================================


Expand Down
97 changes: 97 additions & 0 deletions libensemble/tests/regression_tests/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
Common plumbing for regression tests
"""

import sys
import os
import argparse


def _build_parser():
    "Assemble the command-line parser shared by the regression tests."
    cli = argparse.ArgumentParser()

    # Which communication front end to use.
    cli.add_argument('--comms', nargs='?', type=str, default='mpi',
                     choices=['local', 'tcp', 'ssh', 'client', 'mpi'],
                     help='Type of communicator')

    # Sizing / placement of the worker team.
    cli.add_argument('--nworkers', nargs='?', type=int,
                     help='Number of local forked processes')
    cli.add_argument('--workers', nargs='+', type=str,
                     help='List of worker nodes')

    # Options used when the script is started as a TCP client worker.
    cli.add_argument('--workerID', nargs='?', type=int,
                     help='Client worker ID')
    cli.add_argument('--server', nargs=3, type=str,
                     help='Triple of (ip, port, authkey) used to reach manager')

    # Miscellaneous.
    cli.add_argument('--pwd', nargs='?', type=str,
                     help='Working directory to be used')
    cli.add_argument('--tester_args', nargs='*', type=str,
                     help='Additional arguments for use by specific testers')
    return cli


parser = _build_parser()


def mpi_parse_args(args):
    """Build the test configuration for MPI comms.

    The worker count is the size of MPI.COMM_WORLD minus one, and the
    process with rank 0 is reported as the master.
    """
    from mpi4py import MPI
    world = MPI.COMM_WORLD
    nworkers = world.Get_size() - 1
    is_master = (world.Get_rank() == 0)
    libE_specs = {
        'comm': world,
        'color': 0,
        'comms': 'mpi',
    }
    return nworkers, is_master, libE_specs, args.tester_args


def local_parse_args(args):
    """Build the test configuration for local multiprocessing comms.

    Falls back to 4 workers when --nworkers is absent (or zero).  The
    process is always reported as the master.
    """
    requested = args.nworkers
    nworkers = requested if requested else 4
    libE_specs = {
        'nprocesses': nworkers,
        'comms': 'local',
    }
    return nworkers, True, libE_specs, args.tester_args


def tcp_parse_args(args):
    """Build the test configuration for a manager using local TCP comms.

    The worker command re-runs this same script with the current
    interpreter in 'client' mode.  The brace placeholders in the command
    are left unexpanded here.
    """
    requested = args.nworkers
    nworkers = requested if requested else 4

    rerun_script = [sys.executable, sys.argv[0], "--comms", "client"]
    reach_manager = ["--server", "{manager_ip}", "{manager_port}", "{authkey}"]
    identity = ["--workerID", "{workerID}", "--nworkers", str(nworkers)]

    libE_specs = {
        'nprocesses': nworkers,
        'worker_cmd': rerun_script + reach_manager + identity,
        'comms': 'tcp',
    }
    return nworkers, True, libE_specs, args.tester_args


def ssh_parse_args(args):
    """Parse arguments for SSH with reverse tunnel.

    Builds a worker command that opens an ssh session to each worker node
    with a reverse tunnel back to the manager's port, and runs this script
    there as a TCP client.

    The brace placeholders are left unexpanded here.
    NOTE(review): presumably the launcher fills them from the per-worker
    specs ('worker_ip', 'tunnel_port', 'manager_port', 'authkey',
    'workerID') -- confirm against libensemble.util.launcher.

    Returns:
        (nworkers, True, libE_specs, args.tester_args)

    Raises:
        ValueError: if --workers was not given.
    """
    if args.workers is None:
        raise ValueError("SSH comms require --workers (list of worker nodes)")
    nworkers = len(args.workers)
    cmd = ["ssh", "-R", "{tunnel_port}:localhost:{manager_port}",
           # ssh needs a destination; without it "python" would be read as
           # the host name.
           "{worker_ip}",
           "python", sys.argv[0],
           # The remote script must take the client front end; the parser
           # default is 'mpi'.
           "--comms", "client",
           # TODO: the remote working directory is still a placeholder.
           "--pwd", "/TODO/",
           "--server", "localhost", "{tunnel_port}", "{authkey}",
           "--workerID", "{workerID}", "--nworkers", str(nworkers)]
    libE_specs = {'workers': args.workers,
                  'worker_cmd': cmd,
                  'comms': 'tcp'}
    return nworkers, True, libE_specs, args.tester_args


def client_parse_args(args):
    """Build the test configuration for a TCP client (worker) process.

    args.server supplies the (ip, port, authkey) triple; the port is
    converted to int and the authkey is UTF-8 encoded to bytes.  The process
    is reported as not being the master.
    """
    requested = args.nworkers
    nworkers = requested if requested else 4
    ip, port, authkey = args.server
    libE_specs = {
        'ip': ip,
        'port': int(port),
        'authkey': authkey.encode('utf-8'),
        'workerID': args.workerID,
        'nprocesses': nworkers,
        'comms': 'tcp',
    }
    return nworkers, False, libE_specs, args.tester_args


def parse_args():
    """Parse the regression-test command line and dispatch on --comms.

    If --pwd was given, the working directory is changed to it before the
    selected front end runs.  The front end's return value is passed
    through unchanged.
    """
    args = parser.parse_args(sys.argv[1:])
    front_ends = {
        'mpi': mpi_parse_args,
        'local': local_parse_args,
        'tcp': tcp_parse_args,
        'ssh': ssh_parse_args,
        'client': client_parse_args,
    }
    if args.pwd:
        os.chdir(args.pwd)
    # An empty --comms value falls back to MPI.
    comms = args.comms or 'mpi'
    handler = front_ends[comms]
    return handler(args)

0 comments on commit d80f805

Please sign in to comment.