Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libflux: add flux_event_publish() interface #1512

merged 15 commits into from May 9, 2018


None yet
4 participants
Copy link

garlick commented May 8, 2018

This PR adds functions for "synchronous" event publication. That is, they encapsulate an event in a request message and send it as an RPC to a broker service that assigns a sequence number to it, then responds to the RPC in parallel with the event propagation.

This was added in a more ad-hoc way recently, with the broker service. That is replaced here with a more elaborate service that breaks event publishing out from broker.c to publisher.[ch]. The new implementation adds a couple of missing features, like raw payloads and privacy flag.

The following missing API functions were added:

/* Publish an event.
 * The future is fulfilled once the event has been assigned a sequence number,
 * and does not indicate that the event has yet reached all subscribers.
flux_future_t *flux_event_publish (flux_t *h,
                                   const char *topic, int flags,
                                   const char *json_str);

flux_future_t *flux_event_publish_pack (flux_t *h,
                                        const char *topic, int flags,
                                        const char *fmt, ...);

flux_future_t *flux_event_publish_raw (flux_t *h,
                                       const char *topic, int flags,
                                       const void *data, int len);

/* Obtain the event sequence number from the fulfilled
 * flux_event_publish() future.
int flux_event_publish_get_seq (flux_future_t *f, int *seq);

The job module was switched from the ad-hoc interface to this one.

flux-event(1) was extended to optionally use the above interface (flux event pub -s ...).

Man pages and unit tests were (minimally) updated.


This comment has been minimized.

Copy link

coveralls commented May 8, 2018

Coverage Status

Coverage increased (+0.02%) to 78.853% when pulling a55a78f on garlick:event_pub into dc91642 on flux-framework:master.

@garlick garlick force-pushed the garlick:event_pub branch from e1be13a to 82d052e May 8, 2018


This comment has been minimized.

Copy link

codecov-io commented May 8, 2018

Codecov Report

Merging #1512 into master will increase coverage by 0.02%.
The diff coverage is 81.43%.

@@            Coverage Diff             @@
##           master    #1512      +/-   ##
+ Coverage   78.53%   78.55%   +0.02%     
  Files         164      165       +1     
  Lines       30653    30814     +161     
+ Hits        24074    24207     +133     
- Misses       6579     6607      +28
Impacted Files Coverage Δ
src/modules/wreck/job.c 70.18% <100%> (-0.07%) ⬇️
src/common/libflux/event.c 79.85% <62.26%> (-10.85%) ⬇️
src/broker/broker.c 76.77% <64.7%> (-0.21%) ⬇️
src/broker/publisher.c 84.94% <84.94%> (ø)
src/common/libflux/message.c 81.29% <86.66%> (-0.31%) ⬇️
src/cmd/flux-event.c 77.57% <96.49%> (+7.79%) ⬆️
src/common/libutil/base64.c 95.07% <0%> (-0.71%) ⬇️
src/common/libflux/future.c 88.98% <0%> (-0.45%) ⬇️
src/broker/overlay.c 73.81% <0%> (-0.32%) ⬇️
... and 7 more

This comment has been minimized.

Copy link
Member Author

garlick commented May 8, 2018

Yeah, I'll work on that coverage a bit!


This comment has been minimized.

Copy link

grondo commented May 8, 2018

Nice work! I'll merge once you are happy with the coverage.

garlick added some commits Apr 26, 2018

broker/publisher: add event publshing service
Problem: the broker RPC handler does not
handle raw (non-JSON) payloads or "private" events,
and is generally a little half baked.

Add a new broker service "" that is loaded
only on rank 0. The request contains
a topic string, flags, and optional base64-encoded payload.

The event's userid and rolemask parameters are copied
from the request message.
libflux/event: add flux_event_publish()
Add functions for publishing events.  These functions
return a future which is fulfilled once the event has
been allocated a sequence number:

The sequence number assigned to the event is available via
cmd/flux-event: add pub --synchronous option
Add an option:  flux event pub --synchronous ...
which uses flux_event_publish() to publish the
event (synchronously), and prints the event's
sequence number.
wreck: switch job module to flux_event_publish()
Switch the code in send_create_message() from
the hardwired RPC over to flux_event_publish().
build: update man3 clean target
Problem:  make clean in doc/man3 does not remove
flux_future_then.3 build product.

doc/flux-event(1): describe pub -s, -p options
Add descriptions for
  -s,--synchronous      use RPC
  -p,--privacy          set message privacy flag
t/t0004-event.t: cover RPC
Call flux-event with the -s option to exercise
the RPC, with different payload types,
and with and without the privacy flag.
cmd/flux-event: [cleanup] refactor event_pub()
Problem: the event_pub() function was getting too large
and unclear.

Factor out four event publication functions.
cmd/flux-event: add pub --private option
Add option to publish event with FLUX_MSGFLAG_PRIVATE
set, so only sender and instance owner will receive.
libflux/message: add checks to flux_msg_set_payload
Problem: flux_msg_set_payload() allows a FLUX_MSGFLAG_JSON
payload to be added that is not valid JSON.

Move the cursory check (not a full JSON object parse) from
flux_msg_set_json() to flux_msg_set_payload(), conditional
upon FLUX_MSGFLAG_JSON being set.

In addtion, veryfiy that the '\0' string terminator is
included in the payload, and that the string ends with '}'.

Finally, add an EINVAL check that the msg parameter != NULL,
since that would cause a segfault.
libflux/message: export flux_msg_set_flags()
Problem: some tests in the message unit test will no
longer be able to encode invalid messages using
flux_msg_set_payload() after we tighten up the
checks for JSON payloads.

Expose flux_msg_set_flags() for use by the unit test.
libflux/message: flux_msg_get_json catch no end brace
Problem: flux_msg_get_json()'s cursory JSON check
verifies open brace for JSON object but not closing one.

Check for closing brace and fail with EPROTO if missing.
libflux/test: fix flux_msg_set_payload() coverage
Problem: flux_msg_get_json unit test coverage relies
on the the ability to set an invalid payload
with flux_msg_set_payload().  This loophole is
about to be closed.

Use flux_msg_set_flags() to allow the same thing
to be accomplished with a lower level interface.

Also add coverage for additional some bad input to
lua/t0003-events.t: add coverage for
Send RPC with malformed input and
ensure correct response is received.

@garlick garlick force-pushed the garlick:event_pub branch from 19885f1 to a55a78f May 9, 2018


This comment has been minimized.

Copy link
Member Author

garlick commented May 9, 2018

I was able to get coverage up by adding a lua test to send malformed requests to the new service, and by augmenting the flux-event pub command to send private events. There was a bit of tweaking of libflux/message along the way (minor). I think this is probably ready to merge?


This comment has been minimized.

Copy link

grondo commented May 9, 2018


@grondo grondo merged commit 54fcc97 into flux-framework:master May 9, 2018

4 checks passed

codecov/patch 81.43% of diff hit (target 78.53%)
codecov/project 78.55% (+0.02%) compared to dc91642
continuous-integration/travis-ci/pr The Travis CI build passed
coverage/coveralls Coverage increased (+0.02%) to 78.853%

@garlick garlick deleted the garlick:event_pub branch May 9, 2018


This comment has been minimized.

Copy link
Member Author

garlick commented May 9, 2018


This was referenced May 10, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.