Skip to content

Commit

Permalink
flux-start: add embedded server for testing
Browse files Browse the repository at this point in the history
Problem: tests that involve starting and stopping brokers are
difficult to orchestrate using flux-start, but we will need
support for running such tests in CI.

Use the new usock_service to embed a server in flux-start.
The server creates a socket named 'start' in the rundir, so
a client just needs to make a substitution to the value of
FLUX_URI to be able to connect.

Currently the server has support for the following methods:

start.status
  Return an array of procs that includes broker PIDs in rank order

disconnect
  Log receipt of disconnect message.  This is a placeholder for
  future streaming socket management.

Add a test front-end command that provides the client side for the
start.status RPC and can be extended with sub-commands for simple,
shell-script-driven testing.  More sophisticated, event-driven test
programs would be written in Python and combine broker and flux-start
communication.

Add a few tests to t0001-basic.t to exercise basic function.
  • Loading branch information
garlick committed May 13, 2021
1 parent 64ef7c6 commit 8323da3
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 0 deletions.
92 changes: 92 additions & 0 deletions src/cmd/flux-start.c
Expand Up @@ -23,6 +23,7 @@
#include <signal.h>
#include <argz.h>
#include <sys/ioctl.h>
#include <jansson.h>
#include <flux/core.h>
#include <flux/optparse.h>

Expand All @@ -36,6 +37,7 @@
#include "src/common/libpmi/clique.h"
#include "src/common/libpmi/dgetline.h"
#include "src/common/libhostlist/hostlist.h"
#include "src/common/librouter/usock_service.h"

#define DEFAULT_KILLER_TIMEOUT 20.0

Expand All @@ -53,6 +55,8 @@ static struct {
zhash_t *kvs;
struct pmi_simple_server *srv;
} pmi;
flux_t *h;
flux_msg_handler_t **handlers;
} ctx;

struct client {
Expand Down Expand Up @@ -574,6 +578,90 @@ void restore_termios (void)
log_err ("tcsetattr");
}

/* Handle a start.status request.
 * Respond with {"procs":[{"pid":N},...]}, one entry per element of
 * ctx.clients, in list order.  If the payload cannot be constructed,
 * respond with an ENOMEM error instead.  Failure to send either
 * response is logged and otherwise ignored.
 */
void status_cb (flux_t *h,
                flux_msg_handler_t *mh,
                const flux_msg_t *msg,
                void *arg)
{
    struct client *cli;
    json_t *procs = NULL;

    if (!(procs = json_array ()))
        goto nomem;
    cli = zlist_first (ctx.clients);
    while (cli) {
        json_t *entry;

        if (!(entry = json_pack ("{s:i}",
                                 "pid", flux_subprocess_pid (cli->p))))
            goto nomem;
        /* json_array_append_new() steals the reference to entry and
         * releases it itself on failure, so entry must not be
         * decref'ed again here (that would be a double release).
         */
        if (json_array_append_new (procs, entry) < 0)
            goto nomem;
        cli = zlist_next (ctx.clients);
    }
    if (flux_respond_pack (h, msg, "{s:O}", "procs", procs) < 0)
        log_err ("error responding to status request");
    json_decref (procs);
    return;
nomem:
    errno = ENOMEM;
    if (flux_respond_error (h, msg, errno, NULL) < 0)
        log_err ("error responding to status request");
    /* procs may still be NULL here; json_decref() tolerates that. */
    json_decref (procs);
}

/* Handle a disconnect request.
 * When the "verbose" option is set and the first route of the message
 * can be fetched, log the first five characters of that route id.
 * Nothing else is done with the message.
 */
void disconnect_cb (flux_t *h,
                    flux_msg_handler_t *mh,
                    const flux_msg_t *msg,
                    void *arg)
{
    char *sender = NULL;

    /* Short-circuit keeps the original ordering: the verbose option is
     * only consulted once the route lookup has succeeded.
     */
    if (flux_msg_get_route_first (msg, &sender) >= 0
        && optparse_hasopt (ctx.opts, "verbose")) {
        log_msg ("disconnect from %.5s", sender);
    }
    free (sender);
}

/* Request handler table for the embedded flux-start server.
 * Each entry pairs a request topic string with the callback that
 * services it.  The table is registered on ctx.h by
 * start_server_initialize() and is terminated by
 * FLUX_MSGHANDLER_TABLE_END.
 */
const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "start.status", status_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "disconnect", disconnect_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

/* Set up test-related RPC handlers on local://${rundir}/start
 * Ensure that service-related reactor watchers do not contribute to the
 * reactor usecount, since the reactor is expected to exit once the
 * subprocesses are complete.
 *
 * rundir  - directory in which the 'start' socket is created
 * verbose - passed through to usock_service_create()
 *
 * Any failure is fatal: the process exits with a logged message.
 * On success ctx.h and ctx.handlers are set; release them with
 * start_server_finalize().
 */
void start_server_initialize (const char *rundir, bool verbose)
{
    char path[1024];
    int len;
    /* Service related watchers:
     * - usock server listen fd
     * - flux_t handle watcher (adds 2 active prep/check watchers)
     */
    int ignore_watchers = 3;

    /* snprintf() returns a negative value on error and the untruncated
     * length otherwise; check both explicitly rather than relying on
     * the implicit int -> size_t conversion.
     */
    len = snprintf (path, sizeof (path), "%s/start", rundir);
    if (len < 0 || (size_t)len >= sizeof (path))
        log_msg_exit ("internal buffer overflow");
    if (!(ctx.h = usock_service_create (ctx.reactor, path, verbose)))
        log_err_exit ("could not create embedded flux-start server");
    if (flux_msg_handler_addvec (ctx.h, htab, NULL, &ctx.handlers) < 0)
        log_err_exit ("could not register service methods");
    while (ignore_watchers-- > 0)
        flux_reactor_active_decref (ctx.reactor);
}

/* Tear down the embedded server set up by start_server_initialize().
 * The message handlers registered on ctx.h are removed before the
 * handle itself is closed.  Both context members are reset to NULL so
 * that no dangling pointers remain in the global ctx afterwards.
 */
void start_server_finalize (void)
{
    flux_msg_handler_delvec (ctx.handlers);
    ctx.handlers = NULL;
    flux_close (ctx.h);
    ctx.h = NULL;
}

/* Start an internal PMI server, and then launch the requested number of
* broker processes that inherit a file descriptor to the internal PMI
* server. They will use that to bootstrap. Since the PMI server is
Expand Down Expand Up @@ -612,6 +700,9 @@ int start_session (const char *cmd_argz, size_t cmd_argz_len,
else
scratch_dir = create_scratch_dir ();

start_server_initialize (scratch_dir,
optparse_hasopt (ctx.opts, "verbose"));

if (optparse_hasopt (ctx.opts, "trace-pmi-server"))
flags |= PMI_SIMPLE_SERVER_TRACE;

Expand Down Expand Up @@ -649,6 +740,7 @@ int start_session (const char *cmd_argz, size_t cmd_argz_len,
log_err_exit ("flux_reactor_run");

pmi_server_finalize ();
start_server_finalize ();

hostlist_destroy (hosts);
free (scratch_dir);
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Expand Up @@ -221,6 +221,7 @@ EXTRA_DIST= \
shell/initrc/tests \
flux-jobs/tests \
scripts/run_timeout.py \
scripts/startctl.py \
jobspec \
flux-resource \
resource/get-xml-test.py \
Expand Down
33 changes: 33 additions & 0 deletions t/scripts/startctl.py
@@ -0,0 +1,33 @@
###############################################################
# Copyright 2021 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################

# startctl - tell flux-start to do things
#
# Usage: flux start -s1 flux python startctl.py

import os
import flux


def status(h):
    """Send a start.status RPC on handle ``h`` and print the reply string."""
    future = h.rpc("start.status")
    reply = future.get_str()
    print(reply)


def main():
    """Connect to the flux-start embedded server and print its status.

    The server URI is derived from FLUX_URI by substituting "start" for
    the "local-0" socket name.  Exits with an explanatory message when
    FLUX_URI is not set in the environment.
    """
    broker_uri = os.environ.get("FLUX_URI")
    if broker_uri is None:
        raise SystemExit("startctl: FLUX_URI is not set in the environment")
    # Substitute only the last "local-0" so that a directory component
    # that happens to contain the same text is left untouched.  A URI
    # without "local-0" passes through unchanged.
    uri = "start".join(broker_uri.rsplit("local-0", 1))
    h = flux.Flux(uri)
    status(h)


if __name__ == "__main__":
    main()


# vi: ts=4 sw=4 expandtab
15 changes: 15 additions & 0 deletions t/t0001-basic.t
Expand Up @@ -13,6 +13,7 @@ test -n "$FLUX_TESTS_LOGFILE" && set -- "$@" --logfile
. `dirname $0`/sharness.sh

RPC=${FLUX_BUILD_DIR}/t/request/rpc
startctl=${SHARNESS_TEST_SRCDIR}/scripts/startctl.py

test_expect_success 'TEST_NAME is set' '
test -n "$TEST_NAME"
Expand Down Expand Up @@ -101,6 +102,20 @@ test_expect_success 'flux-start --test-hosts with insufficient hosts fails' "
test_expect_success 'flux-start --test-hosts with garbled hosts fails' "
test_must_fail flux start ${ARGS} -s2 --test-hosts=foo] /bin/true
"
# Run startctl.py as the initial program of a single-broker instance with
# flux-start in verbose mode (-v).  stdout and stderr are saved to
# startctl.out / startctl.err for the checks in the tests that follow.
test_expect_success 'flux-start embedded server works from initial program' "
flux start -v ${ARGS} -s1 flux python ${startctl} \
>startctl.out 2>startctl.err
"
# Requires jq: the captured stdout must parse as JSON.
test_expect_success HAVE_JQ 'flux-start embedded server status got JSON' "
jq -c . <startctl.out
"
# NOTE(review): presumably the embedded server logs 'hi' and 'bye' lines in
# verbose mode when a client connects and disconnects - confirm.  These bare
# patterns match any stderr line containing those substrings.
test_expect_success 'flux-start embedded server logs hi/bye from client' "
grep hi startctl.err &&
grep bye startctl.err
"
# flux-start's disconnect handler logs 'disconnect from <id>' when verbose.
test_expect_success 'flux-start embedded server logs disconnect notification' "
grep 'disconnect from' startctl.err
"
test_expect_success 'flux-start in exec mode passes through errors from command' "
test_must_fail flux start ${ARGS} /bin/false
"
Expand Down

0 comments on commit 8323da3

Please sign in to comment.