Skip to content

Commit

Permalink
Merge 4914855 into c644b7f
Browse files Browse the repository at this point in the history
  • Loading branch information
garlick committed Apr 18, 2018
2 parents c644b7f + 4914855 commit 63c6719
Show file tree
Hide file tree
Showing 6 changed files with 646 additions and 250 deletions.
41 changes: 41 additions & 0 deletions src/broker/broker.c
Expand Up @@ -1276,6 +1276,46 @@ static void cmb_disconnect_cb (flux_t *h, flux_msg_handler_t *mh,
/* no response */
}

/* Publish event synchronously.
* User sends request message with topic "cmb.pub.<topic>" to rank 0.
* Service publishes event with same payload as request, topic=<topic>,
* then responds with success or failure.
* The synchronization use case driving the need for this is
* discussed in issue #342
*/
static void cmb_pub_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
broker_ctx_t *ctx = arg;
const char *topic;
const char *json_str;
flux_msg_t *event = NULL;

if (overlay_get_rank (ctx->overlay) > 0) {
errno = ENOSYS;
goto error;
}
if (flux_request_decode (msg, &topic, &json_str) < 0)
goto error;
if (strlen (topic) <= 8) {
errno = EPROTO;
goto error;
}
topic += 8; // push past "cmb.pub." (8 chars)
if (!(event = flux_event_encode (topic, json_str)))
goto error;
if (flux_send (h, event, 0) < 0)
goto error;
if (flux_respond (h, msg, 0, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
goto done;
error:
if (flux_respond (h, msg, errno, NULL) < 0)
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
done:
flux_msg_destroy (event);
}

static void cmb_sub_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -1346,6 +1386,7 @@ static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.pub.*", cmb_pub_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
Expand Down
23 changes: 22 additions & 1 deletion src/modules/wreck/Makefile.am
Expand Up @@ -30,7 +30,7 @@ fluxlibexec_PROGRAMS = \
fluxmod_libadd = $(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libflux-internal.la

job_la_SOURCES = job.c rcalc.c rcalc.h
job_la_SOURCES = job.c rcalc.c rcalc.h wreck_job.c wreck_job.h
job_la_LDFLAGS = $(AM_LDFLAGS) $(fluxmod_ldflags) -module
job_la_LIBADD = $(fluxmod_libadd)

Expand All @@ -50,6 +50,27 @@ wrexecd_LDADD = \
$(wrexecd_libs) \
$(ZMQ_LIBS) $(LUA_LIB) $(LIBPTHREAD)

TESTS = \
test_wreck_job.t

test_ldadd = \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la \
$(top_builddir)/src/common/libtap/libtap.la \
$(ZMQ_LIBS) $(LIBPTHREAD)

test_cppflags = \
$(AM_CPPFLAGS) \
-I$(top_srcdir)/src/common/libtap

check_PROGRAMS = $(TESTS)

test_wreck_job_t_SOURCES = test/wreck_job.c
test_wreck_job_t_CPPFLAGS = $(test_cppflags)
test_wreck_job_t_LDADD = \
$(top_builddir)/src/modules/wreck/wreck_job.o \
$(test_ldadd)

dist_wreckscripts_SCRIPTS = \
lua.d/01-env.lua \
lua.d/02-affinity.lua \
Expand Down

0 comments on commit 63c6719

Please sign in to comment.