
implement flux_future_t API #1083

Merged
merged 9 commits into flux-framework:master from garlick:futures2 on Jun 11, 2017

@garlick (Member) commented Jun 1, 2017

This is a preliminary PR, not yet ready for merging.

It's an implementation of futures, as discussed in #1053. The core API is:

typedef struct flux_future_struct flux_future_t;

typedef void (*flux_continuation_f)(flux_future_t *f, void *arg);

flux_future_t *flux_future_create (flux_reactor_t *r);
void flux_future_destroy (flux_future_t *f);

int flux_future_set_timeout (flux_future_t *f, double timeout);

int flux_future_get (flux_future_t *f, void *result);

bool flux_future_check (flux_future_t *f);

int flux_future_then (flux_future_t *f, flux_continuation_f cb, void *arg);

void flux_future_set_result (flux_future_t *f,
                             void *result, flux_free_f free_fn);
void flux_future_set_error (flux_future_t *f, int errnum);

The basic idea is that a future is completed by calling set_result() or set_error(). It can also optionally be completed by a timeout. The get(), check(), and then() functions work like our current RPC API.

Internally, get() and check() create a temporary reactor and run it until the result is available, or until it would block (respectively). then() registers the continuation to be called when the result is available using the main reactor passed to the create call. The continuation is expected to call get(), which will not block in that context.
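To make the usage concrete, here is a minimal sketch (assuming the signatures above; header names and error handling abbreviated) of a provider fulfilling a future and a consumer retrieving the result:

#include <stdlib.h>
#include <string.h>

/* Continuation runs on the main reactor once the future is fulfilled;
 * get() will not block here since the result is already available. */
static void continuation (flux_future_t *f, void *arg)
{
    char *result;
    if (flux_future_get (f, &result) < 0)
        return; /* errno set, e.g. ETIMEDOUT */
    /* use result ... */
    flux_future_destroy (f);
}

void example (flux_reactor_t *r)
{
    flux_future_t *f = flux_future_create (r);

    /* Provider side: fulfill now (could equally happen later,
     * from a watcher or message handler callback). */
    flux_future_set_result (f, strdup ("hello"), free);

    /* Consumer side, asynchronous: */
    flux_future_then (f, continuation, NULL);

    /* Or synchronously: flux_future_get (f, &result) would spin the
     * temporary reactor until the result is available. */
}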

Next we have some utility functions intended to make creating complex futures easy:

typedef void (*flux_future_init_f)(flux_future_t *f, flux_reactor_t *r, void *arg);

void flux_future_set_init (flux_future_t *f, flux_future_init_f cb, void *arg);

The init is a callback that sets up reactor watchers in the context passed to it, which may be either the main reactor or the temporary reactor. It will be called at most twice, once for each of the two contexts, which are created on demand depending on how the functions above are called. (For example, a synchronous operation may consist of create() and get(); in that case only the temporary reactor context would be used, and the init callback would be called only once.) A sketch appears below.

Edit: renamed "promise" to "init" per discussion with @trws 5 Jun 2017
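As a rough illustration of an init callback (a sketch built on the signatures above; sleep_async and the 1-second delay are made up), arming a timer in whichever reactor context it is handed and fulfilling the future when the timer fires:

#include <stdlib.h>
#include <string.h>

static void timer_cb (flux_reactor_t *r, flux_watcher_t *w,
                      int revents, void *arg)
{
    flux_future_t *f = arg;
    flux_future_set_result (f, strdup ("done"), free);
}

/* Called at most twice, once per reactor context, as the contexts
 * are created on demand. */
static void sleep_init (flux_future_t *f, flux_reactor_t *r, void *arg)
{
    flux_watcher_t *w = flux_timer_watcher_create (r, 1.0, 0.,
                                                   timer_cb, f);
    flux_watcher_start (w);
    /* Anonymous aux entry ties the watcher's lifetime to the future
     * (see the aux discussion below). */
    flux_future_aux_set (f, NULL, w, (flux_free_f)flux_watcher_destroy);
}

flux_future_t *sleep_async (flux_reactor_t *r)
{
    flux_future_t *f = flux_future_create (r);
    flux_future_set_init (f, sleep_init, NULL);
    return f;
}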

Because it's handy to attach objects to the future so they can be destroyed when the future is destroyed, we have

void *flux_future_aux_get (flux_future_t *f, const char *name);
int flux_future_aux_set (flux_future_t *f, const char *name,
                         void *aux, flux_free_f destroy);

These work exactly like the aux functions found elsewhere in the API, except that they allow a NULL name parameter. I found that useful when creating watchers from init callbacks in the two contexts: I wanted the watchers destroyed when the future was destroyed, but it was inconvenient to choose a unique name for them, since the init function isn't really supposed to be aware of which context it's being called in.
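For example (a sketch; assumes the future wraps an RPC object and a watcher created in an init callback):

void attach_objects (flux_future_t *f, flux_rpc_t *rpc, flux_watcher_t *w)
{
    /* Named: retrievable later with flux_future_aux_get (f, "rpc"). */
    flux_future_aux_set (f, "rpc", rpc, (flux_free_f)flux_rpc_destroy);

    /* Anonymous (NULL name): not retrievable; attached only so the
     * destructor runs when the future is destroyed. */
    flux_future_aux_set (f, NULL, w, (flux_free_f)flux_watcher_destroy);
}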

Finally, message handlers, which look a lot like reactor watchers but are not quite the same, were a bit of a problem. I wanted them to work in the context of init callbacks as described above, but to do that, the temporary reactor context would need a separate message dispatcher and flux_t watcher from those of the main reactor/handle. Eventually I came up with flux_clone(), which creates a new handle identical to the original except that it has a different aux hash, which means it can have a different reactor/dispatcher associated with it.

If a future is to be used with message handlers, one can set the main handle (presumably the one associated with the reactor passed to create()) with a set_flux() call, and from an init callback obtain one that works with the appropriate reactor context with a get_flux() call:

void flux_future_set_flux (flux_future_t *f, flux_t *h);
flux_t *flux_future_get_flux (flux_future_t *f);

The clone is created on demand, if needed.
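Putting those pieces together, a sketch (hypothetical names; match criteria and error handling omitted) of an init callback that registers a message handler on the context-appropriate handle:

static void response_cb (flux_t *h, flux_msg_handler_t *mh,
                         const flux_msg_t *msg, void *arg)
{
    flux_future_t *f = arg;
    /* decode msg, then fulfill: */
    flux_future_set_result (f, NULL, NULL);
}

static void msg_init (flux_future_t *f, flux_reactor_t *r, void *arg)
{
    /* Returns either the handle set with set_flux() or, in the
     * temporary reactor context, an on-demand clone of it. */
    flux_t *h = flux_future_get_flux (f);
    flux_msg_handler_t *mh = flux_msg_handler_create (h,
                                 FLUX_MATCH_RESPONSE, response_cb, f);
    flux_msg_handler_start (mh);
    flux_future_aux_set (f, NULL, mh,
                         (flux_free_f)flux_msg_handler_destroy);
}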

The PR then goes on to reimplement RPCs in terms of futures, but internally as an experiment, so its API doesn't change. The basics work but I'm working through the corner cases now.

@grondo (Contributor) commented Jun 1, 2017

Wow, nice work. It will take me a bit to work through this, but so far the overall scheme looks good to me. A few initial (naive I'm sure) questions:

  • Could you explain again why the temporary reactor is needed for synchronous get and check? I think you explained this to me already, but I'm still not sure why we can't block using the main reactor (something about not expecting other watchers to be active while blocking synchronously?)
  • I guess I thought that flux_rpc_check never blocked, could check() just check to see if the promise has yet fulfilled the future?
  • I was confused too when reading about promise vs future; however, I agree with your use here, except that the name of the function flux_future_promise confused me at first, because it sounds like that function is the promise, when actually you are setting the promise. Maybe flux_future_set_promise? (Though once I "got" it, flux_future_promise was OK.) If every future requires a promise, we could also add the promise directly to flux_future_create

It might also be helpful to work through an example. Say in the new subprocess Flux API, there is a flux_future_t *flux_subprocess_kill (flux_subprocess_t *p, int signum). The subprocess_kill function needs to return a future in case the kill is implemented under the covers as an RPC. However, in the local case, kill() is just a call to kill(2), and the future should be fulfilled immediately, with future_check() returning true right away. Will this work with the current scheme?

@garlick (Member, Author) commented Jun 1, 2017

Could you explain again why the temporary reactor is needed for synchronous get and check? I think you explained this to me already, but I'm still not sure why we can't block using the main reactor (something about not expecting other watchers to be active while blocking synchronously?)

It could work that way, but it's a change to the concurrency model. It means any call to get or check from a reactor watcher/message handler becomes a "scheduling point" where other watchers/handlers, or another thread of control through the original watcher/handler, could run. It means that handlers have to be reentrant, and that new critical sections appear where shared state is incrementally changed. There could be unchecked stack growth - think of a stream of requests to a particular message handler that calls get or check. The stream of requests could trigger "reactor calls handler calls reactor calls handler calls reactor calls handler..." etc.

Potentially we could do some auditing and make it work this way, but given the number of API calls we currently have that hide a synchronous RPC (disappointing actually), I think we will have no end of trouble going that way.
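To sketch the hazard (query_backend is a hypothetical helper returning a future):

static void request_cb (flux_t *h, flux_msg_handler_t *mh,
                        const flux_msg_t *msg, void *arg)
{
    flux_future_t *f = query_backend (h);   /* hypothetical */
    void *result;

    /* If get() re-entered the *main* reactor here, another queued
     * request could invoke request_cb again before this frame
     * returns: reactor -> request_cb -> reactor -> request_cb -> ...
     * growing the stack and re-entering shared state mid-update. */
    flux_future_get (f, &result);
    flux_future_destroy (f);
}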

@garlick (Member, Author) commented Jun 1, 2017

I guess I thought that flux_rpc_check never blocked, could check() just check to see if the promise has yet fulfilled the future?

Correct, it doesn't block, but it does take a spin through the reactor with FLUX_REACTOR_NOWAIT to give watchers/handlers a chance to process any available events that would complete the future.
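Conceptually something like this (an illustrative pseudo-implementation with hypothetical accessors, not the actual source):

#include <stdbool.h>

bool future_check_sketch (flux_future_t *f)
{
    /* One non-blocking spin through the temporary reactor lets any
     * pending events fulfill the future... */
    (void)flux_reactor_run (temp_reactor (f), FLUX_REACTOR_NOWAIT);
    /* ...then report whether a result or error has been recorded. */
    return future_is_fulfilled (f);
}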

Maybe flux_future_set_promise

Works for me. I was thinking of promise (v.) as equivalent to make promise or set promise, but you're right, that's a bit confusing.

@garlick (Member, Author) commented Jun 1, 2017

On subprocess_kill(), yes, that should be fine. In the kill() case, the implementation would call flux_future_set_result() or flux_future_set_error() immediately. In the RPC case, an RPC continuation would make those calls.
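For example, a sketch of that shape (everything subprocess-related here is hypothetical, following the example above):

#include <errno.h>
#include <signal.h>

flux_future_t *flux_subprocess_kill (flux_subprocess_t *p, int signum)
{
    flux_future_t *f = flux_future_create (subprocess_reactor (p));

    if (subprocess_is_local (p)) {            /* hypothetical helpers */
        if (kill (subprocess_pid (p), signum) < 0)
            flux_future_set_error (f, errno);   /* fulfilled now */
        else
            flux_future_set_result (f, NULL, NULL);
    }
    else {
        /* Send the kill RPC; its response continuation later calls
         * flux_future_set_result() or flux_future_set_error() on f. */
    }
    return f;
}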

@grondo (Contributor) commented Jun 1, 2017

Potentially we could do some auditing and make it work this way, but given the number of API calls we currently have that hide a synchronous RPC (disappointing actually), I think we will have no end of trouble going that way.

Thanks for explaining that to me again! Makes good sense now.

Works for me. I was thinking of promise (v) as == to make promise or set promise but you're right that's a bit confusing.

Yeah, I can see that now. I'm not sold on making any change here, and once I understood what the call does the name does make a kind of sense. I'll leave it up to you

@grondo (Contributor) commented Jun 1, 2017

In other languages that support this kind of asynchronous future or deferred result, I've seen specific continuations for errors, which can be nice because it splits the error handling code into a separate function, simplifying the functions for the normal cases.

I'm not sure it makes that much sense here, but I bring it up as something to think about.

For instance, an optional call

int flux_future_on_error (flux_future_t *f, flux_continuation_f cb, void *arg);

could be provided that is called instead of the then continuation when flux_future_set_error() is used. If the error handler is not set, the normal continuation is called as before. A similar function could be used to register a separate handler for timeouts.

The benefit, if any, is that calls to flux_future_get are streamlined for the normal case inside continuation callbacks, and perhaps in some cases a single error handling function can be shared, reducing the total amount of code.

However, the drawbacks are possibly many: more state would have to be stored in each future object, probably for the uncommon case at that. So I'm not sure it's worth it; I bring it up only because it is something I've seen done before. A sketch follows.
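As an illustration of the proposal (flux_future_on_error is hypothetical and was not adopted; other signatures follow the API above):

static void result_cb (flux_future_t *f, void *arg)
{
    void *result;
    flux_future_get (f, &result);   /* streamlined: error path split out */
}

static void error_cb (flux_future_t *f, void *arg)
{
    /* shared handling for errors and timeouts */
}

void register_callbacks (flux_future_t *f, void *arg)
{
    flux_future_then (f, result_cb, arg);
    flux_future_on_error (f, error_cb, arg);   /* hypothetical */
}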

@garlick (Member, Author) commented Jun 1, 2017

That would be easy to implement, but since get() can be called synchronously, outside of a continuation, the get() call still has to be able to indicate error (including ETIMEDOUT).

Would it be weird to then have get() calls in continuations that aren't checked for error?

@grondo (Contributor) commented Jun 1, 2017

Would it be weird to then have get() calls in continuations that aren't checked for error?

Yes, it might be a bit weird, so I don't see much benefit to adding any kind of on_error calls.

@codecov-io commented Jun 2, 2017

Codecov Report

Merging #1083 into master will decrease coverage by 0.04%.
The diff coverage is 79.62%.

@@            Coverage Diff             @@
##           master    #1083      +/-   ##
==========================================
- Coverage   77.93%   77.88%   -0.05%     
==========================================
  Files         150      151       +1     
  Lines       25930    26169     +239     
==========================================
+ Hits        20208    20383     +175     
- Misses       5722     5786      +64
Impacted Files Coverage Δ
src/common/libflux/dispatch.c 85.15% <70.73%> (-1.4%) ⬇️
src/common/libflux/future.c 79.34% <79.34%> (ø)
src/common/libflux/handle.c 84.97% <88.88%> (-0.42%) ⬇️
src/common/libflux/response.c 83.76% <0%> (-1.71%) ⬇️
src/common/libutil/dirwalk.c 93.33% <0%> (-1.49%) ⬇️
src/modules/connector-local/local.c 70.49% <0%> (-1.44%) ⬇️
src/common/libflux/mrpc.c 85.31% <0%> (-1.2%) ⬇️
src/broker/modservice.c 79.2% <0%> (-1%) ⬇️
src/common/libflux/message.c 81.21% <0%> (-0.36%) ⬇️
... and 6 more
@coveralls commented Jun 2, 2017

Coverage Status

Coverage increased (+0.07%) to 78.212% when pulling b39d707 on garlick:futures2 into d453552 on flux-framework:master.

@garlick garlick force-pushed the garlick:futures2 branch from b39d707 to 784493e Jun 2, 2017

@garlick (Member, Author) commented Jun 2, 2017

I added some man pages and squashed the memory leak fix. The man pages could use some review to be sure they are clear.

Still trying to track down this failure in t2100-aggregate.t:

not ok 3 - flux-aggregate: abort works
#	
#	    test_expect_code 1 run_timeout 5 flux exec -r 0-7 flux aggregate . 1
#	

expecting success: 
    run_timeout 2 flux exec -r 0-7 bash -c "flux aggregate test \$(flux getattr rank)" &&
    $kvstest test "x.count == 8" &&
    $kvstest test "x.min == 0" &&
    $kvstest test "x.max == 7"

aggregate: synchronous subscribe took 0.407ms
aggregate: rank 0 setup took 0.724ms
aggregate: barrier took 0.014s
aggregate: push took 0.493ms
2017-06-02T18:26:02.308211Z connector-local.err[0]: response_cb: topic unsubscribe: missing sender uuid

(I think the missing uuid log message is likely another problem introduced by this PR, but is not causing the test failure, so need to track that down too).

Something's also gone wrong in the python tests:

PASS: ../t/t9990-python-tests.t 1 test.event.TestEvent.test_full_event
PASS: ../t/t9990-python-tests.t 2 test.event.TestEvent.test_t1_0_sub
PASS: ../t/t9990-python-tests.t 3 test.event.TestEvent.test_t1_1_unsub
PASS: ../t/t9990-python-tests.t 4 test.wrapper.TestWrapper.test_automatic_unwrapping
PASS: ../t/t9990-python-tests.t 5 test.wrapper.TestWrapper.test_call_insufficient_arguments
PASS: ../t/t9990-python-tests.t 6 test.wrapper.TestWrapper.test_call_invalid_argument_type
PASS: ../t/t9990-python-tests.t 7 test.wrapper.TestWrapper.test_call_non_existant
ERROR: ../t/t9990-python-tests.t - missing test plan
ERROR: ../t/t9990-python-tests.t - exited with status 139 (terminated by signal 11?)

Anyway, just a before-lunch checkpoint.

@coveralls commented Jun 2, 2017

Coverage Status

Coverage increased (+0.04%) to 78.179% when pulling 784493e on garlick:futures2 into d453552 on flux-framework:master.

@grondo (Contributor) commented Jun 2, 2017

I'll look into the aggregate abort test. The flux-aggregate utility should exit after it prints that the aggregate was aborted, but it looks like either it is timing out instead or exiting with an unexpected exit code.

(BTW, in this case at least I think the output from the failing test comes before the not ok. The output pasted is from a succeeding test. Not sure if that caused some confusion here)

@grondo (Contributor) commented Jun 2, 2017

I'll look into the aggregate abort test. The flux-aggregate utility should exit after it prints that the aggregate was aborted

The problem appears to actually be in flux-exec. All the executed flux-aggregate processes are exiting as expected, however, flux-exec is hanging around because (verified with some printfs) one stdout stream is not yet closed.

I'm not sure why this would happen only in the aggregate abort test, but it could be a new timing that is triggering a latent bug. I'll keep poking at it.

@grondo (Contributor) commented Jun 3, 2017

I can't seem to make too much progress on this one. To me it seems like the stdout message that has eof=true disappears between the broker and the flux-exec command. The strange thing is that I can't reproduce this outside of this specific aggregate abort scenario, even though I think, from the exec perspective, the same thing should happen for any process that emits some I/O and then exits with nonzero status.

I can reproduce with a single broker, using the same invocation as the test:

$ flux exec -r 0 flux aggregate . 1
aggregate: synchronous subscribe took 0.343ms
aggregate: rank 0 setup took 0.952ms
aggregate: barrier took 0.001s
aggregate: push took 0.565ms
2017-06-03T01:31:10.338275Z aggregator.err[0]: sink: refusing to sink to rootdir
2017-06-03T01:31:10.338285Z aggregator.err[0]: sink: aborting aggregate .

flux-aggregate: aggregate to `.` aborted!
[hangs]

I added a debug printf to the child I/O handler in broker/exec.c, ran with FLUX_HANDLE_TRACE set, added another debug message to the flux-exec "complete" check, and I get the following:

FLUX_HANDLE_TRACE=1 flux exec -vexec -r 0 flux aggregate . 
exec: 1.488447ms: Starting flux on 0
--------------------------------------
>[000] ||
>[008] cmb.exec
>[1848] { "env": { "DBUS_SESSION_BUS_ADDRESS": "unix:path=\/run\/user\/6885\/bus",  ... }, "cmdline": [ "flux", "aggregate", ".", "1" ], "cwd": "\/home\/grondo\/git\/f" }
>[020] 8E01010FFFFFFFFF000000000000000000000001
exec: 2.067804ms: Sent all requests
--------------------------------------
<[000] ||
<[008] cmb.exec
<[039] {"rank":0,"state":"Running","pid":4152}
<[020] 8E01020F00001AE5000000010000000000000001
child_io: '{"eof": false, "data": "YWdncmVnYXRlOiBzeW5jaHJvbm91cyBzdWJzY3JpYmUgdG9vayAwLjQxMG1zCg==", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}'
--------------------------------------
<[000] ||
<[008] cmb.exec
<[146] {"eof": false, "data": "YWdncmVnYXRlOiBzeW5jaHJvbm91cyBzdWJzY3JpYmUgdG9vayAwLjQxMG1zCg==", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}
<[020] 8E01020F00001AE5000000010000000000000001
aggregate: synchronous subscribe took 0.410ms
child_io: '{"eof": false, "data": "YWdncmVnYXRlOiByYW5rIDAgc2V0dXAgdG9vayAxLjI3NW1zCg==", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}'
--------------------------------------
<[000] ||
<[008] cmb.exec
<[134] {"eof": false, "data": "YWdncmVnYXRlOiByYW5rIDAgc2V0dXAgdG9vayAxLjI3NW1zCg==", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}
<[020] 8E01020F00001AE5000000010000000000000001
aggregate: rank 0 setup took 1.275ms
child_io: '{"eof": false, "data": "YWdncmVnYXRlOiBiYXJyaWVyIHRvb2sgMC4wMDFzCg==", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}'
--------------------------------------
<[000] ||
<[008] cmb.exec
<[126] {"eof": false, "data": "YWdncmVnYXRlOiBiYXJyaWVyIHRvb2sgMC4wMDFzCg==", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}
<[020] 8E01020F00001AE5000000010000000000000001
2017-06-03T01:29:56.912737Z aggregator.err[0]: sink: refusing to sink to rootdir
aggregate: barrier took 0.001s
child_io: '{"eof": false, "data": "YWdncmVnYXRlOiBwdXNoIHRvb2sgMC42NjJtcwo=", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}'
--------------------------------------
<[000] ||
<[008] cmb.exec
<[122] {"eof": false, "data": "YWdncmVnYXRlOiBwdXNoIHRvb2sgMC42NjJtcwo=", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}
<[020] 8E01020F00001AE5000000010000000000000001
aggregate: push took 0.662ms
2017-06-03T01:29:56.923016Z aggregator.err[0]: sink: refusing to sink to rootdir
2017-06-03T01:29:56.923026Z aggregator.err[0]: sink: aborting aggregate .

child_io: '{"eof": false, "data": "Zmx1eC1hZ2dyZWdhdGU6IGFnZ3JlZ2F0ZSB0byBgLmAgYWJvcnRlZCEK", "pid": 4152, "type": "io", "name": "stderr", "rank": 0}'
--------------------------------------
<[000] ||
<[008] cmb.exec
<[138] {"eof": false, "data": "Zmx1eC1hZ2dyZWdhdGU6IGFnZ3JlZ2F0ZSB0byBgLmAgYWJvcnRlZCEK", "pid": 4152, "type": "io", "name": "stderr", "rank": 0}
<[020] 8E01020F00001AE5000000010000000000000001
child_io: '{"eof": true, "data": "", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}'
child_io: '{"eof": true, "data": "", "pid": 4152, "type": "io", "name": "stderr", "rank": 0}'
flux-aggregate: aggregate to `.` aborted!
--------------------------------------
<[000] ||
<[008] cmb.exec
<[081] {"eof": true, "data": "", "pid": 4152, "type": "io", "name": "stderr", "rank": 0}
<[020] 8E01020F00001AE5000000010000000000000001
exec: complete: nexited=0 nclosed.stdout=0 stderr=1
--------------------------------------
<[000] ||
<[008] cmb.exec
<[069] {"rank": 0, "state": "Exited", "pid": 4152, "status": 256, "code": 1}
<[020] 8E01020F00001AE5000000010000000000000001
exec: complete: nexited=1 nclosed.stdout=0 stderr=1
2017-06-03T01:29:56.924216Z connector-local.err[0]: response_cb: topic unsubscribe: missing sender uuid

The final flux-exec "completion check" shows that it hasn't yet received EOF for stdout:

exec: complete: nexited=1 nclosed.stdout=0 stderr=1

The EOF was sent (or at least attempted to be sent) from the broker's child_io handler:

child_io: '{"eof": true, "data": "", "pid": 4152, "type": "io", "name": "stdout", "rank": 0}'

But the handle trace output shows it was never received by the handle, though EOF for stderr was. Could it somehow be queued up by zeromq?

@garlick (Member, Author) commented Jun 3, 2017

Wow, thanks for all that @grondo.

I wonder if the missing exec response message somehow got matched up with the unsubscribe request, hence the "extra" unsubscribe response with the missing uuid? Hmm, maybe I can add a check to the unsubscribe RPC to assert that the response has topic=unsubscribe?

@grondo (Contributor) commented Jun 3, 2017

I wonder if the missing exec response message somehow got matched up with the unsubscribe request, hence the "extra" unsubscribe response with the missing uuid?

Oh, that's a pretty good thought! It will be interesting to find out, because I can't reproduce the same error in any of the other scenarios I've tried (including exec'ing processes that use the local connector, kvs watch, etc.). So I was still wondering if it was somehow the aggregator's fault.

@garlick (Member, Author) commented Jun 3, 2017

Oh, yikes, this seems to fix it for me. In the dispatcher "slowpath" a call to flux_msg_cmp() verifies that the matchtag is from the handle's pool by verifying that the route stack is empty. The fastpath was missing that check:

diff --git a/src/common/libflux/dispatch.c b/src/common/libflux/dispatch.c
index ddc99d3..f269053 100644
--- a/src/common/libflux/dispatch.c
+++ b/src/common/libflux/dispatch.c
@@ -310,7 +310,8 @@ static bool dispatch_message (struct dispatch *d,
 
     /* fastpath */
     if (type == FLUX_MSGTYPE_RESPONSE) {
-        if (fastpath_response_lookup (d, msg, &w) == 0 && w->running) {
+        if (fastpath_response_lookup (d, msg, &w) == 0 && w->running
+                                            && flux_msg_cmp (msg, w->match)) {
             call_handler (w, msg);
             match = true;
         }
@garlick (Member, Author) commented Jun 3, 2017

I think that particular problem is solved, and the wrong uuid warning too.

Thanks for the hard work @grondo! I was really stuck on that one this afternoon.

@grondo (Contributor) commented Jun 3, 2017

Great, I can verify your patch fixed it for me as well.
Nice description of why the bug was only triggered with the addition of futures too! (I was really wondering about that)

Should we try to add a test case specific to this issue? I'm a bit concerned it was only tested as a side effect of the aggregator abort case.

@grondo (Contributor) commented Jun 3, 2017

I'm also seeing a segv from the python bindings:

[New LWP 28900]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
Core was generated by `python ../src/bindings/python/test_commands/test_runner.t'.
Program terminated with signal SIGSEGV, Segmentation fault.
#0  msglist_pollfd (l=0x0) at msglist.c:178
178	    if (l->pollfd < 0) {
(gdb) where
#0  msglist_pollfd (l=0x0) at msglist.c:178
#1  0x00007f35528ea062 in flux_pollfd (h=<optimized out>, 
    h@entry=0x55632e6e03f0) at handle.c:828
#2  0x00007f35528f4b50 in ev_flux_init (w=w@entry=0x55632e6e3308, 
    cb=cb@entry=0x7f35528ea1a0 <handle_cb>, h=h@entry=0x55632e6e03f0, 
    events=events@entry=1) at ev_flux.c:80
#3  0x00007f35528eac4a in flux_handle_watcher_create (
    r=r@entry=0x55632e6ddc40, h=h@entry=0x55632e6e03f0, events=events@entry=1, 
    cb=cb@entry=0x7f35528eb580 <handle_cb>, arg=arg@entry=0x55632e6f31d0)
    at reactor.c:312
#4  0x00007f35528ebc25 in dispatch_get (h=h@entry=0x55632e6e03f0)
    at dispatch.c:153
#5  0x00007f35528ebfc8 in dispatch_get (h=0x55632e6e03f0) at dispatch.c:542
#6  flux_msg_handler_create (h=0x55632e6e03f0, match=..., 
    cb=cb@entry=0x7f35528f1db0 <response_cb>, arg=arg@entry=0x55632e6f2d50)
    at dispatch.c:521
#7  0x00007f35528f1d4b in promise_cb (f=0x55632e6f3550, r=<optimized out>, 
    arg=0x55632e6f2d50) at rpc.c:122
#8  0x00007f35528f561d in run_now (f=f@entry=0x55632e6f3550, 
    reactor_flags=reactor_flags@entry=1) at future.c:289
#9  0x00007f35528f570e in run_now (reactor_flags=1, f=0x55632e6f3550)
    at future.c:323
#10 flux_future_check (f=0x55632e6f3550) at future.c:324
#11 0x00007f35528f1e49 in flux_rpc_destroy (rpc=0x55632e6f2d50) at rpc.c:75
#12 0x00007f35530e7706 in _cffi_f_flux_rpc_destroy (self=<optimized out>, 
    arg0=0x7f3553584b48) at _core.c:12771
#13 0x000055632c393d3c in PyEval_EvalFrameEx ()
#14 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#15 0x000055632c3a6e3e in ?? ()
#16 0x000055632c378da3 in PyObject_Call ()
#17 0x000055632c3bcb4e in ?? ()
#18 0x000055632c378da3 in PyObject_Call ()
#19 0x000055632c41b057 in ?? ()
#20 0x000055632c378da3 in PyObject_Call ()
#21 0x000055632c3923f2 in PyEval_EvalFrameEx ()
#22 0x000055632c3921df in PyEval_EvalFrameEx ()
#23 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#24 0x000055632c3a6e3e in ?? ()
#25 0x000055632c378da3 in PyObject_Call ()
#26 0x000055632c3bcb4e in ?? ()
#27 0x000055632c378da3 in PyObject_Call ()
#28 0x000055632c396a6d in PyEval_CallObjectWithKeywords ()
#29 0x000055632c43069c in ?? ()
#30 0x000055632c3c5b0c in ?? ()
#31 0x000055632c3a1827 in ?? ()
#32 0x000055632c3a7ebc in ?? ()
#33 0x000055632c3a7e9b in ?? ()
#34 0x000055632c368da8 in PyDict_SetItem ()
#35 0x000055632c36c60c in PyDict_SetItemString ()
#36 0x000055632c3935d9 in PyEval_EvalFrameEx ()
#37 0x000055632c3921df in PyEval_EvalFrameEx ()
#38 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#39 0x000055632c3a6ff8 in ?? ()
#40 0x000055632c378da3 in PyObject_Call ()
#41 0x000055632c38f59f in PyEval_EvalFrameEx ()
#42 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#43 0x000055632c3a6e3e in ?? ()
#44 0x000055632c378da3 in PyObject_Call ()
#45 0x000055632c3bcb4e in ?? ()
#46 0x000055632c378da3 in PyObject_Call ()
#47 0x000055632c41b057 in ?? ()
#48 0x000055632c378da3 in PyObject_Call ()
#49 0x000055632c3923f2 in PyEval_EvalFrameEx ()
#50 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#51 0x000055632c3a6ff8 in ?? ()
#52 0x000055632c378da3 in PyObject_Call ()
#53 0x000055632c38f59f in PyEval_EvalFrameEx ()
#54 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#55 0x000055632c3a6e3e in ?? ()
#56 0x000055632c378da3 in PyObject_Call ()
#57 0x000055632c3bcb4e in ?? ()
#58 0x000055632c378da3 in PyObject_Call ()
#59 0x000055632c41b057 in ?? ()
#60 0x000055632c378da3 in PyObject_Call ()
#61 0x000055632c3923f2 in PyEval_EvalFrameEx ()
#62 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#63 0x000055632c3a6ff8 in ?? ()
#64 0x000055632c378da3 in PyObject_Call ()
#65 0x000055632c38f59f in PyEval_EvalFrameEx ()
#66 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#67 0x000055632c3a6e3e in ?? ()
#68 0x000055632c378da3 in PyObject_Call ()
#69 0x000055632c3bcb4e in ?? ()
#70 0x000055632c378da3 in PyObject_Call ()
#71 0x000055632c41b057 in ?? ()
#72 0x000055632c378da3 in PyObject_Call ()
#73 0x000055632c3923f2 in PyEval_EvalFrameEx ()
#74 0x000055632c3921df in PyEval_EvalFrameEx ()
#75 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#76 0x000055632c3924dd in PyEval_EvalFrameEx ()
#77 0x000055632c38ab85 in PyEval_EvalCodeEx ()
#78 0x000055632c38a929 in PyEval_EvalCode ()
#79 0x000055632c3ba04f in ?? ()
#80 0x000055632c3b4b72 in PyRun_FileExFlags ()
#81 0x000055632c3b32fe in PyRun_SimpleFileExFlags ()
#82 0x000055632c366262 in Py_Main ()
#83 0x00007f3554c103f1 in __libc_start_main (main=0x55632c365b90 <main>, 
    argc=2, argv=0x7fff1a38f278, init=<optimized out>, fini=<optimized out>, 
    rtld_fini=<optimized out>, stack_end=0x7fff1a38f268)
    at ../csu/libc-start.c:291
#84 0x000055632c365a8a in _start ()

`flux_future_promise()` registers a promise callback. The promise sets
up class-specific watchers on the reactor to fulfill the promise as
asynchronous events occur. The watchers must eventually call

@grondo (Contributor) Jun 3, 2017

It took me a bit to come to grips with the use of the term "fulfilling the promise" here. I would have preferred "fulfill the future" since the future is the object representing the unknown/deferred result.

Though there are examples in other languages, e.g. C++, of the use of "fulfill the promise", this seems to be when a promise is a full-fledged object that acts as the provider-side handle of a future-promise pair (so resolution of the future is a side effect of promise::fulfill).

I was also literally going off this quote from Wikipedia, though I'm not sure how much weight to give it:

Setting the value of a future is also called resolving, fulfilling, or binding it.

Though reading through the manpage, I did kind of get where you were going with "fulfill the promise" as meaning that the promise callback has completed.

I think in the end as long as we're consistent it will be fine, however I just wanted to note my initial trouble with terming flux_future_set_result as "fulfilling the promise".

@garlick (Member, Author) Jun 3, 2017

No, I think you are right here - I should use consistent terminology. I'll go with "fulfill the future". (I didn't really mean to introduce a separate concept, "fulfill the promise" - I was using that interchangeably with "complete the future".) Good catch.

`flux_future_aux_get()` retrieves an object by _name_.
Anonymous objects cannot be retrieved. All aux objects are destroyed
when the future is destroyed.

@grondo (Contributor) Jun 3, 2017

Should this be "all aux objects with destructors set are destroyed when the future is destroyed"?

Also, why can't anonymous objects be retrieved with flux_future_aux_get (f, NULL)? If there is some reason for this it could be explained here.

@garlick (Member, Author) Jun 3, 2017

Yes on the first, will change.

There can be multiple anon objects, so there's no way to tell which one would be retrieved by aux_get (NULL).

@grondo (Contributor) Jun 3, 2017

Ok, good point. It wasn't clear from the manpage, or I missed it (and I think you told me but I've forgotten), what the purpose of the anonymous objects is, and how you'd access or use them in the context of a promise, especially if multiple are allowed. Is it meant to be a way to simply hold a reference to something for the duration of the promise?

@garlick (Member, Author) Jun 3, 2017

There was this, but I'll see what I can do to make that more clear. The idea was not to make the objects retrievable, just to get a destructor to run during future destruction.

Since the promise may be called in either reactor context (at most once each),
and is unaware of which context that is, it should take care when managing
any context-specific state not to overwrite the state from a prior call.
The ability to attach objects with destructors anonymously to the future
with flux_future_aux_set() may be useful for managing the life cycle
of reactor watchers and message handlers created by promises.

`flux_future_get_flux()` is context sensitive. If called in the main
reactor context, it directly returns the broker handle registered with
`flux_future_set_flux()`. If called in the internal reactor context,
it returns a clone of that handle, obtained with `flux_clone()`, and

@grondo (Contributor) Jun 3, 2017

flux_clone(3) is mentioned here -- just a reminder that we might want a manpage for it, if you've not already done it.

@garlick (Member, Author) Jun 3, 2017

Thanks! Will do that.

@garlick garlick force-pushed the garlick:futures2 branch from 27f038c to 29c021e Jun 3, 2017

@garlick (Member, Author) commented Jun 3, 2017

Found the python SIGSEGV - looks like the handle was being closed before the RPC was destroyed, and flux_rpc_destroy() was attempting to access the handle. I fixed that and also streamlined what I realized was a poor implementation of the check for whether the response had been received.

Those commits are broken out, but I did force an update to fix typos in one commit message, and to remove an explanation from another commit which I believe was incorrect.

Still todo: man page fixups, more tests, and need to evaluate whether it makes sense to go all in with futures in flux_rpc, e.g. have it return a flux_future_t and change all the callers.

@coveralls commented Jun 3, 2017

Coverage Status

Coverage increased (+0.02%) to 78.164% when pulling 29c021e on garlick:futures2 into d453552 on flux-framework:master.

@garlick garlick force-pushed the garlick:futures2 branch from d91abf3 to 596b652 Jun 3, 2017

@coveralls commented Jun 3, 2017

Coverage Status

Coverage increased (+0.07%) to 78.209% when pulling 596b652 on garlick:futures2 into d453552 on flux-framework:master.

@garlick garlick force-pushed the garlick:futures2 branch from 6922e7c to 8b795c1 Jun 8, 2017

@garlick (Member, Author) commented Jun 8, 2017

Just rebased, squashing the incremental development, and added some comments in future.c. The coverage sadness seems to be mostly due to uncovered error handling code. I think the actual coverage of mainline code here should be pretty decent.

@morrone and I had a face-to-face chat and I think we're at an impasse. He doesn't like the imbalance of creating a future with a wrapper function and destroying it with flux_future_destroy(), and also feels that the event-based mode (continuation) and the non-event-based mode (wait_on) are entirely different things that don't belong in the same class, and that continuations have nothing to do with "futures" (I assume C++ futures?). Did I characterize that correctly?

I think that's a key design point (having both modes in the same class) and don't want to give that up. Given that we are already using this idiom extensively in the form of flux_rpc_t, and IMHO successfully with a minimum of confusion, I think we should keep moving forward and do this. Meanwhile, if after seeing the RPC get converted to futures, and how @grondo uses them in libsubprocess, people feel this was a bad step, we can open an issue and discuss alternatives. My $0.02.

@coveralls commented Jun 8, 2017

Coverage Status

Coverage decreased (-0.06%) to 78.126% when pulling 8b795c1 on garlick:futures2 into 33589c6 on flux-framework:master.

@coveralls commented Jun 8, 2017

Coverage Status

Coverage decreased (-0.04%) to 78.145% when pulling 8b795c1 on garlick:futures2 into 33589c6 on flux-framework:master.

@grondo (Contributor) commented Jun 9, 2017

int flux_future_then (flux_future_t *f, flux_reactor_t *r, double timeout, flux_continuation_f cb, void *arg);

I'm struggling with the addition of a reactor argument to flux_future_then(). I understand it might be a good hint to the caller that then() is registering a continuation callback with a reactor, however, I'm not sure I like leaving it up to the caller which reactor to pass in to the then().

The future is created and returned by a provider that knows which reactor should be used to wait for completion and fulfill the future. Therefore, it should be up to the provider to register the reactor that makes sense. In fact, I can't think of a case off the top of my head where the proper reactor won't already have been passed into the providing function, either explicitly because the providing function is purely reactor based, or implicitly via a flux_t handle.

The added benefit of removing the reactor arg from then() is saving lots of otherwise-required calls to flux_get_reactor (h) (for the most common case of futures based on RPCs).

Does that make any sense?

@garlick (Member, Author) commented Jun 9, 2017

That does make sense. To make it more concrete, think of RPC:

flux_future_t *flux_rpc (flux_t *h, const char *topic, const char *json_str,
                         uint32_t nodeid, int flags);

Here the rpc implementation calls flux_future_create() and flux_future_set_flux (h).

The user then must pass the reactor on which the dispatcher for h was registered to flux_future_then(), or else the future will never be fulfilled. IMHO r should be passed to flux_future_create() by the RPC implementation so the user isn't invited to fall into this trap.

If we find a use case later for allowing the continuation to be registered on an arbitrary reactor, we do have the ability to clone the handle and create a new dispatcher.
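Concretely, the shape being proposed (a sketch with assumed signatures; the timeout parameter of then() is presumed retained, with -1. meaning no timeout):

static void continuation (flux_future_t *f, void *arg)
{
    /* handle the response ... */
}

void example (flux_t *h)
{
    /* Provider side (e.g. inside flux_rpc): bind the handle's
     * reactor at creation so the caller can't pick the wrong one. */
    flux_future_t *f = flux_future_create (flux_get_reactor (h));
    flux_future_set_flux (f, h);

    /* Caller side: no reactor argument at then() time. */
    flux_future_then (f, -1., continuation, NULL);
}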

@grondo (Contributor) commented Jun 9, 2017

If you are looking for any extra testing, it appears there are no users of flux_future_get/set_flux(). (And I see now why flux_future_set_flux() is special.) I imagine the rpc changes will be the first user of this interface?

Also, there is something weird in the coverage of flux_dispatch_requeue(). It seems to be used in the code, but coverage shows that most lines of the function aren't executed? (It could be wrong.)

@garlick (Member, Author) commented Jun 9, 2017

Ah, thanks for the test hint. Will look at that.

garlick added a commit to garlick/flux-core that referenced this pull request Jun 9, 2017

libflux/future: flux_future_create takes reactor arg
Per discussion in flux-framework#1083, move reactor argument from
flux_future_then() to flux_future_create().
@coveralls commented Jun 9, 2017

Coverage Status

Coverage decreased (-0.02%) to 78.168% when pulling 153618b on garlick:futures2 into 33589c6 on flux-framework:master.

@garlick garlick force-pushed the garlick:futures2 branch from 153618b to a7dfa53 Jun 9, 2017

@coveralls commented Jun 9, 2017

Coverage Status

Coverage decreased (-0.03%) to 78.149% when pulling a7dfa53 on garlick:futures2 into 33589c6 on flux-framework:master.

@garlick garlick force-pushed the garlick:futures2 branch from a7dfa53 to bd20f58 Jun 9, 2017

test/dispatch: add dispatch test
Add dispatch test and verify that fastpath does not
match responses in the wrong matchtag domain.
@coveralls commented Jun 9, 2017

Coverage Status

Coverage decreased (-0.04%) to 78.146% when pulling bd20f58 on garlick:futures2 into 33589c6 on flux-framework:master.

@grondo (Contributor) commented Jun 9, 2017

This seems close to ready?

up class-specific watchers on the reactor to handle asynchronous events.
The watchers must eventually call `flux_future_fulfill()` or
`flux_future_fulfill_error()` to fulfill the future. The callback may
occurs in one or both of two contexts. A call in the first context occurs

@grondo (Contributor) Jun 9, 2017

Typo here: "The callback may occurs"

@garlick garlick force-pushed the garlick:futures2 branch from bd20f58 to 10b6923 Jun 10, 2017

@garlick (Member, Author) commented Jun 10, 2017
garlick commented Jun 10, 2017

Thanks, fixed the typo you mentioned and made another pass through the man pages tightening up explanations. I also moved flux_future_get() to the flux_future_then(3) page and adjusted language to suggest that in some cases it might not be wrapped by another class.

Also changed flux_future_wait_for() handling of a zero timeout so that it doesn't enter the reactor in that case. I realized in doing the RPCs that it's handy to be able to check whether the future is fulfilled without entering (or triggering creation of) the internal reactor and thought that might be generally useful. Adjusted man page description to track that.
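For example (a sketch, assuming wait_for returns 0 when fulfilled and -1 with errno == ETIMEDOUT otherwise):

#include <errno.h>
#include <stdbool.h>

bool fulfilled (flux_future_t *f)
{
    /* A zero timeout returns immediately without entering (or
     * triggering creation of) the internal reactor. */
    if (flux_future_wait_for (f, 0.) == 0)
        return true;               /* get() will not block */
    return false;                  /* errno == ETIMEDOUT */
}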

Rebased and pushed. If tests pass I'm for merging.

@coveralls commented Jun 10, 2017

Coverage Status

Coverage decreased (-0.02%) to 78.167% when pulling 10b6923 on garlick:futures2 into 33589c6 on flux-framework:master.

@grondo (Contributor) commented Jun 11, 2017
grondo commented Jun 11, 2017

Nice!

@grondo grondo merged commit 89dafb9 into flux-framework:master Jun 11, 2017

2 checks passed

continuous-integration/travis-ci/pr: The Travis CI build passed
coverage/coveralls: Coverage decreased (-0.02%) to 78.167%

@grondo grondo referenced this pull request Aug 23, 2017

Closed

0.8.0 Release #1160

@garlick garlick deleted the garlick:futures2 branch Sep 6, 2017
