
wreck: rework job.submit, job.create, wrexecd.run handlers with continuations #1472

Merged
merged 6 commits into flux-framework:master from garlick:job_state2 on Apr 18, 2018

Conversation

6 participants
@garlick (Member) commented Apr 18, 2018

This is a hopefully somewhat better structured version of PR #1471.

This one doesn't add a hash of active jobs and doesn't limit the job module's ability to ingest jobs on ranks other than 0.

It adds a struct wjob data structure (with unit tests), and reworks the job.submit, job.create, and wrexecd.run request handlers to manipulate the KVS etc. using continuations rather than synchronous RPCs, to improve job module responsiveness.

It also satisfies a long-standing feature request for the ability to synchronously publish events; that is, to get a response indicating that the event has received a sequence number, as discussed in #337 and #342. That was necessary to remove the final synchronous step in job.create and job.submit, where the handlers block listening for an event to be published before sending the response to the user.

FWIW I reran a little script inspired by @trws that submits 50K jobs (with the sched module check disabled), running 200 submits in parallel:

#!/bin/bash
ulimit 65536
seq 1 50000 | xargs -n 1 -P 200  \
    flux submit -n 1 sleep 0

Before this PR I got 134 jobs per second. After, I get 442.

I haven't tried to probe the effect on job launch, but it should improve latency under load. For example, if there are many small jobs being launched, the overhead of checking a new job not targeting the local rank should not slow down (as much) the launch of one that does target the local rank, since new events can start to be handled before the last one has completed. E.g. there could be many fetches of R_lite for different jobs occurring simultaneously on each rank.
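To illustrate the pattern: instead of a blocking KVS lookup, the handler issues the lookup and registers a continuation, so the reactor keeps running while the response is in flight. A minimal sketch follows (signatures are from the current libflux future/KVS API and may differ slightly from the tree at the time of this PR; the key name is illustrative):

#include <flux/core.h>

/* Continuation runs when the R_lite lookup response arrives.  Other
 * events (e.g. lookups for other jobs) can be handled in the meantime. */
static void rlite_continuation (flux_future_t *f, void *arg)
{
    flux_t *h = flux_future_get_flux (f);
    const char *json_str;

    if (flux_kvs_lookup_get (f, &json_str) < 0) {
        flux_log_error (h, "R_lite lookup failed");
        goto done;
    }
    /* ... decide whether this rank participates, then chain the next
     * asynchronous step from here ... */
done:
    flux_future_destroy (f);
}

static int lookup_rlite_async (flux_t *h, const char *key)
{
    flux_future_t *f;

    if (!(f = flux_kvs_lookup (h, NULL, 0, key)))  /* key is illustrative */
        return -1;
    if (flux_future_then (f, -1., rlite_continuation, NULL) < 0) {
        flux_future_destroy (f);
        return -1;
    }
    return 0;
}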

Sorry the "rework" commits are basically unreadable. They each effectively rewrite a handler and its helper functions as a chain of continuations. It might be better to look at the final result.

@coveralls commented Apr 18, 2018

Coverage Status

Coverage increased (+0.05%) to 79.07% when pulling 4914855 on garlick:job_state2 into c644b7f on flux-framework:master.

@codecov-io commented Apr 18, 2018

Codecov Report

Merging #1472 into master will increase coverage by <.01%.
The diff coverage is 76.44%.

@@            Coverage Diff             @@
##           master    #1472      +/-   ##
==========================================
+ Coverage   78.71%   78.72%   +<.01%     
==========================================
  Files         163      164       +1     
  Lines       30264    30328      +64     
==========================================
+ Hits        23823    23875      +52     
- Misses       6441     6453      +12
| Impacted Files | Coverage Δ |
| --- | --- |
| src/broker/broker.c | 76.97% <61.11%> (-0.34%) ⬇️ |
| src/modules/wreck/job.c | 73.19% <74.69%> (+0.71%) ⬆️ |
| src/modules/wreck/wreck_job.c | 90.24% <90.24%> (ø) |
| src/common/libflux/keepalive.c | 86.66% <0%> (-6.67%) ⬇️ |
| src/common/libflux/mrpc.c | 85.49% <0%> (-1.18%) ⬇️ |
| src/common/libflux/rpc.c | 93.38% <0%> (-0.83%) ⬇️ |
| src/common/libutil/base64.c | 95.07% <0%> (-0.71%) ⬇️ |
| src/bindings/lua/flux-lua.c | 81.58% <0%> (-0.09%) ⬇️ |
| src/common/libflux/message.c | 81.36% <0%> (ø) ⬆️ |

... and 5 more
@grondo (Contributor) commented Apr 18, 2018

Very nice result @garlick! Thanks for thinking of doing that test. I wonder if we'd get even better throughput with a bulk submit tool that can issue job.submit requests asynchronously? A 4x improvement is good enough for now though, and really proves that blocking RPCs should be avoided in services!

I'll just review the final result, but I'm sure everything is much improved.

Thanks also for renaming struct job to struct wjob; it was going to be one of my comments on the last PR. I would have even suggested struct wreck_job to reduce the chance of future conflicts.

Just a thought: on commit descriptions, I've gone to using wreck: to prefix all of the modules/wreck based hacks and kludges. Not that this PR is a kludge, but there is some confusion as to when to use modules/wreck vs modules/job (which may conflict with a future subsystem), whereas simply wreck: could easily be grepped for in history (or more likely grep -v).

Nice improvement on the event publishing too, I'm sure that helped throughput as well!

@garlick (Member, Author) commented Apr 18, 2018

Thanks - I'll fix the commit messages, and look at renaming wjob to wreck_job.

The test above does run flux submit in parallel (up to 200 of them) due to xargs -P, so those results are probably similar to what you'd get from a single asynchronous submit tool.

@grondo (Contributor) commented Apr 18, 2018

The test above does run flux submit in parallel (up to 200 of them) due to xargs -P, so those results are probably similar to what you'd get from a single asynchronous submit tool.

Oops, sorry, I didn't look at the example closely enough!

@dongahn (Contributor) commented Apr 18, 2018

It also satisfies a long-standing feature request for the ability to synchronously publish events; that is, to get a response indicating that the event has received a sequence number, as discussed in #337 and #342. That was necessary to remove the final synchronous step in job.create and job.submit, where the handlers block listening for an event to be published before sending the response to the user.

@garlick, this is awesome and will make certain concurrent programming much more intuitive without taking a performance hit!

One question so that I understand the semantics: is the sync eventing suitable when you have a single receiver who needs to be synced with the sender? Will we have to use a different mechanism if there are multiple subscribers?

* User sends request message with topic "cmb.pub.<topic>" to rank 0.
* Service publishes event with same payload as request, topic=<topic>,
* then responds with success or failure.
* The synchronization use case driving the need need for this is

@dongahn (Contributor) Apr 18, 2018

Just a very small nit. Two needs.

@garlick (Member, Author) Apr 18, 2018

oops thanks!

@grondo (Contributor) approved these changes Apr 18, 2018

Just a couple of inline comments as I went through this quickly, but of course overall this looks great to me!

free (topic);
return;
if (snprintf (topic, sizeof (topic), "cmb.pub.wreck.state.%s", job->state)
>= sizeof (topic)) {

@grondo (Contributor) Apr 18, 2018

This is different enough from other RPCs that the cmb.pub. prefix here could use a comment.

As an aside, I wonder if you considered having the cmb.pub RPC encode the request payload like

{ "topic": "wreck.state.submitted", 
  "payload": { "jobid": 1, ... }
}

Which would make this more like other RPCs. (Though I think what you did here is also fine..)

@garlick (Member, Author) Apr 18, 2018

Thanks, I'll add a comment there.

I didn't see the need for the extra JSON decode/encode. It could certainly be done that way though!

@grondo (Contributor) Apr 18, 2018

Sorry, I did guess why you did it that way, and it seemed smart to me! I was just wondering if this was different enough from other "RPCs" that it might confuse users of the interface. However, if the interface is hidden behind an API call, or if this form of RPC topic becomes useful as standard practice, I don't see a problem with it (it threw me at first, though).

@garlick (Member, Author) Apr 18, 2018

I did add a little note about the topic string transformation just now. Hopefully that helps?

@grondo (Contributor) Apr 18, 2018

I guess my question was a more general one, but yes that clarifies that one line.

@garlick (Member, Author) Apr 18, 2018

Oh, sorry, more generally I was thinking that to generalize this we might want to implement some API functions for event publishing. (Currently we encode an event and pass it to flux_send()):

// future is fulfilled once sequence is assigned
flux_future_t *flux_event_publish_seq (flux_t *h, const char *topic, const char *json_str);

// fire and forget
int flux_event_publish (flux_t *h, const char *topic, const char *json_str);

Then whatever liberty we're taking behind the scenes with topic strings is just an implementation detail where a comment will suffice.

Updating the API seems like its own PR though. Am I still missing the point of your question?
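For illustration only, here is roughly how a request handler might use the proposed call, waiting for the rank 0 broker to accept the event before responding (flux_event_publish_seq() is just the hypothetical API sketched above; it does not exist yet):

#include <flux/core.h>

/* Continuation: rank 0 has accepted the event and assigned it a sequence
 * number, so subscribers will see events in request order and it is now
 * safe to respond to the original request. */
static void publish_continuation (flux_future_t *f, void *arg)
{
    flux_t *h = flux_future_get_flux (f);

    if (flux_future_get (f, NULL) < 0)
        flux_log_error (h, "event publish failed");
    /* ... send the response to the pending request here ... */
    flux_future_destroy (f);
}

static int publish_state_event (flux_t *h, const char *topic,
                                const char *json_str)
{
    flux_future_t *f;

    if (!(f = flux_event_publish_seq (h, topic, json_str)))
        return -1;
    if (flux_future_then (f, -1., publish_continuation, NULL) < 0) {
        flux_future_destroy (f);
        return -1;
    }
    return 0;
}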

@grondo (Contributor) Apr 18, 2018

Yeah, that satisfies my curiosity.
In this case updating the API is definitely its own PR. However, in future cases where we aren't just updating throwaway code, it is much preferable IMO to add the new service, add the new API calls, and add users of the new API calls together, rather than adding API calls after the fact and having to go through and find/update all callers (i.e. updating the API isn't always going to be its own PR).
(After writing that down it seems obvious, sorry!)

@garlick (Member, Author) Apr 18, 2018

At the time I was thinking of it as experimental and maybe not useful beyond this module. But having written down those proposed API calls, maybe I'm convincing myself it's not a bad idea? Anyway, if you're OK with deferring to another PR, let's do that. I'll open an issue to track it.

if (flux_request_decode (msg, NULL, &json_str) < 0)
goto error;
if (!json_str || !(o = json_tokener_parse (json_str)))
goto inval;

@grondo (Contributor) Apr 18, 2018

Since we're in here, would it be too difficult to remove vestiges of json-c?

@garlick (Member, Author) Apr 18, 2018

Hmm, maybe. Let me see how much work that is.

@grondo (Contributor) Apr 18, 2018

Don't worry about it if it is even more than a little work. I can take a look at it in the next round.

@garlick (Member, Author) Apr 18, 2018

It seems somewhat orthogonal to this PR so I'll skip that for now.

"ncores", &job->ncores,
"walltime", &job->walltime) < 0)
goto error;
wjob_set_state (job, "submitted");

@grondo (Contributor) Apr 18, 2018

I think there is a pending PR to also add "gpus" to the "job" structure. To keep things in one place, I wonder if there is a way to have functions like flux_rpc_pack_wjob and flux_request_unpack_wjob?

@garlick (Member, Author) Apr 18, 2018

Good idea.

@garlick (Member, Author) Apr 18, 2018

I would like to discuss the constraints on these messages before doing something like this.

@grondo (Contributor) Apr 18, 2018

Unimportant. I think we should save the design of "official" message constraints for the replacement.

We can look at abstracting the messages into the struct wreck_job API if it reduces the burden of adding new members to the structure along with the "gpus" PR.
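As a rough sketch of the kind of helper being discussed (the name and the exact field list and types are hypothetical; only fields visible in the snippet above are shown):

#include <flux/core.h>

/* Hypothetical helper: decode common wreck_job fields from a request in
 * one place, so adding a new field (e.g. "gpus") touches only this
 * function and the struct definition. */
static int wreck_job_request_unpack (const flux_msg_t *msg,
                                     struct wreck_job *job)
{
    return flux_request_unpack (msg, NULL, "{s:i s:i}",
                                "ncores", &job->ncores,
                                "walltime", &job->walltime);
}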

flux_log (h, LOG_INFO, "No %s.R_lite: %s",
kvspath, flux_strerror (errno));
flux_log (h, LOG_INFO, "No %s: %s", key, flux_strerror (errno));
if (lwj_targets_this_node (h, job))

@grondo (Contributor) Apr 18, 2018

It seems like it would be pretty simple to also convert lwj_targets_this_node() into asynchronous mode (really all it needs to do is check for the existence of <kvs_path>.rank.<rank> and that the key is a directory). Since sched still uses the rank.N method, it might be worth it here (and it would clean up the goto spawn path).

If you want to wait and do it in a future PR, that is fine with me; the spawn code also does a synchronous getattr that could be removed by keeping the wrexecd_path internal to the module if needed.
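A minimal sketch of such an asynchronous existence check (the key name is illustrative, and signatures follow the current libkvs API, so details may differ in this tree):

#include <errno.h>
#include <flux/core.h>

/* Look up <kvs_path>.rank.<rank> as a directory without blocking the
 * reactor; ENOENT means the job does not target this rank. */
static void rankdir_continuation (flux_future_t *f, void *arg)
{
    flux_t *h = flux_future_get_flux (f);
    const flux_kvsdir_t *dir;

    if (flux_kvs_lookup_get_dir (f, &dir) < 0) {
        if (errno != ENOENT)
            flux_log_error (h, "rank dir lookup failed");
        /* not targeted: nothing to do on this rank */
    }
    else {
        /* targeted: chain the spawn step here */
    }
    flux_future_destroy (f);
}

static int check_rank_dir_async (flux_t *h, const char *key)
{
    flux_future_t *f;

    if (!(f = flux_kvs_lookup (h, NULL, FLUX_KVS_READDIR, key)))
        return -1;
    if (flux_future_then (f, -1., rankdir_continuation, NULL) < 0) {
        flux_future_destroy (f);
        return -1;
    }
    return 0;
}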

@garlick (Member, Author) Apr 18, 2018

Yeah, I was just being lazy there - hoping that code would go away soon. Maybe not?

@grondo (Contributor) Apr 18, 2018

It didn't seem like an R_lite-producing sched was imminent, but maybe @dongahn can comment here. I'd hate for you to do extra work just to have it removed in a week.

@garlick (Member, Author) Apr 18, 2018

Just pushed a change to implement this.

@garlick (Member, Author) commented Apr 18, 2018

One question so that I understand the semantics: is the sync eventing suitable when you have a single receiver who needs to be synced with the sender? Will we have to use a different mechanism if there are multiple subscribers?

No, event delivery is still "open loop" or "fire and forget". This RPC returns when the event has been accepted on rank 0, meaning it has a place in the event sequence. The issue it was intended to solve, as I understand it, is event ordering in the following scenario:

Say A and B both implement a service that publishes an event upon request.

If C first sends an RPC to A generating E1, then sends an RPC to B, generating E2, what order do subscribers see the events? We don't know, it could be E1 then E2, or E2 then E1. This is because events are "fire and forget" from A and B's perspective, and maybe A's broker is busier than B's, so its messages reach the rank 0 broker after B's.

This new thing just provides an optional way to publish events with a response from the rank 0 broker indicating that the event has been accepted and assigned a sequence number. So if A and B wait for the response to the new RPC before responding to C, then C knows the events will be sent in the order requested.

Before in the job module, we were listening for events to loop back to the sender, which accomplished the same thing, but was a little more awkward to incorporate into the chain of continuations pattern in use in this rework.

It's kind of a niche use case I think, so don't get too excited :-)

@garlick garlick force-pushed the garlick:job_state2 branch from 980ff6a to 378cedc Apr 18, 2018

@garlick (Member, Author) commented Apr 18, 2018

Pushed some minor changes:

  • rename wjob to wreck_job
  • fix commit wording and add the wreck: prefix

(those were squashed)

Then I added a couple of small commits that began addressing review comments which can be squashed later. More coming tomorrow.

@garlick garlick changed the title job module: rework job.submit, job.create, wrexecd.run handlers with continuations wreck: rework job.submit, job.create, wrexecd.run handlers with continuations Apr 18, 2018

@trws (Member) commented Apr 18, 2018

This looks like a really excellent refinement!

@grondo (Contributor) commented Apr 18, 2018

LGTM, ready for merge?

@garlick (Member, Author) commented Apr 18, 2018

Let me squash down the incremental development, then OK.

garlick added some commits Apr 17, 2018

wreck: add wreck_job class
Add a 'struct wreck_job' data structure, useful for
packaging up job information that needs to be passed
through the void * argument of a continuation.

The class includes functions for keeping wreck_job's
in a zhash_t, should that be needed later on for
tracking active jobs.

An "aux" accessor allows ancillary data to be attached
to the wreck_job, with destructor.
wreck: drop explicit job.shutdown handler
Problem: job.shutdown is explicitly handled in the
job module, but its handler is identical to the
implicit one (stops the reactor).

Probably this is leftover from when the job module
registered a handler for "job.*".  Drop.
wreck: rework runevent_cb with continuations
Problem: the wrexec.run.<jobid> event handler makes
the following synchronous RPCs in reactor context:
- KVS lookup on R_lite
- (conditional fallback) KVS lookup on rank.N
leaving the job module reactor unresponsive for one or two RTTs.

Restructure runevent_cb() to issue these RPCs from
chained continuations, allowing the reactor to run
while waiting for responses.

Use 'struct wreck_job' to conveniently pass job information
through the continuation void *argument and to helpers.
wreck: rework job_create_cb with continuations
Problem: the job.create and job.submit request handlers
perform the following synchronous RPCs:
- fetch the next jobid
- synchronous KVS commit of job info
- listen for wreck.state.<state> event loopback
This leaves the job module unresponsive for three
back-to-back RTTs.

Restructure job_create_cb() to use chained continuations
to perform the same tasks without blocking the reactor.

Use 'struct wreck_job' to conveniently pass job information
through the continuation void *argument and to helpers.
broker: add synchronous event RPC
Problem: when multiple module instances publish
events, with the order coordinated by a third party,
there is no way to ensure that events sent
sequentially in time are processed in the same order
when they are rebroadcast.

Add a rank 0 broker service to publish events with
a response indicating that the event has received
a sequence number.

The "fire and forget" event transport is still the
primary one.  This is here primary to support the
wreck job module.  If it turns out to be more useful,
we can add it to the Flux API.

Fixes #342.

@garlick garlick force-pushed the garlick:job_state2 branch from e49bdb8 to 4914855 Apr 18, 2018

@garlick (Member, Author) commented Apr 18, 2018

Squashed.

I restarted a couple of builders that failed installing packages.

@grondo grondo merged commit cb2f8db into flux-framework:master Apr 18, 2018

2 checks passed

continuous-integration/travis-ci/pr: The Travis CI build passed
coverage/coveralls: Coverage increased (+0.05%) to 79.07%
@garlick (Member, Author) commented Apr 18, 2018

Thanks!

@garlick garlick deleted the garlick:job_state2 branch Apr 18, 2018

@grondo grondo referenced this pull request May 10, 2018

Closed

0.9.0 Release #1479
