libsubprocess: support FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF #5975

Merged: 5 commits, May 30, 2024

Member

@chu11 chu11 commented May 16, 2024

Per discussion in #5919, some notes

  • built on top of my cleanups in libsubprocess: minor cleanup #5974
  • should the use of this flag in job-exec be a different PR?
  • in job-exec subprocess flags are passed in via the API to bulk-exec, so it felt wrong to just assume that flag is always set. So that led to adding a new flux_subprocess_get_flags() function and calling an appropriate "read" function depending on flags. This could be done differently of course, perhaps the flags aren't passed in from the API and are internalized within bulk-exec?
  • haven't tested to ensure that the performance profile changes as expected, dunno if we have a simple reproducer lying around? Edit: I did now, see notes in job-exec: possible scaling issue managing many job shells #5919

@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch from 5bb7b6e to 406d9ac Compare May 17, 2024 21:54
Member

@garlick garlick left a comment

Just a couple of quick comments on a first pass

Comment on lines 276 to 286
    /* This function is similar to flux_subprocess_read() but is used
     * exclusively with FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF.
     *
     * Unlike flux_subprocess_read() the returned data is not NUL
     * terminated. This can be useful for performance improvements,
     * especially when data is written out to storage instead of a user's
     * terminal.
     */
    int flux_subprocess_read_local_unbuf (flux_subprocess_t *p,
                                          const char *stream,
                                          const char **bufp);
Member

Why not just have the user call flux_subprocess_read() and change the behavior based on the flag?
The additional public function seems unnecessary?

Member Author

I guess the lack of a guaranteed NUL at the end made me want to create a new function and make that fact explicit, since it is a pretty big difference. It certainly could be done with flux_subprocess_read(), but I didn't like having returned data being different depending on flags.

So I guess a style choice. Shall we get another vote?

Member

Sure on another vote.

Documentation that flux_subprocess_read() does not NUL terminate seems sufficient to me but maybe I like to live dangerously :-)

Alternatively, maybe we could just change iodecode() to add the termination?

Contributor

I agree it is fine to change behavior of the flux_subprocess_read() function based on the flag.

I haven't re-familiarized myself with the code enough to comment on @garlick's proposal to change iodecode() as an alternative though.

Member

Well, if we've got two votes to not worry about changing the behavior then I would say let's not get into changing iodecode(), if @chu11 agrees.

Member Author

ok, let's just change flux_subprocess_read() based on flags.
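Since the agreed behavior is that data read under the flag is not NUL terminated, a caller has to carry the returned length around and avoid `strlen()` or `"%s"`. The following is a minimal sketch in plain C, not flux-core code; `emit_chunk()` and `copy_chunk_to_cstring()` are hypothetical helper names used only for illustration.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: write exactly len bytes of buf to fp.
 * buf is NOT assumed to be NUL terminated, so strlen() and "%s"
 * are deliberately avoided.  Returns 0 on success, -1 on short write.
 */
int emit_chunk (FILE *fp, const char *buf, int len)
{
    assert (fp != NULL && len >= 0);
    if (len == 0)
        return 0;
    if (fwrite (buf, 1, (size_t)len, fp) != (size_t)len)
        return -1;
    return 0;
}

/* Hypothetical helper: make a NUL-terminated copy for the cases where
 * a C string is really needed.  Caller frees the result.
 * Returns NULL on allocation failure.
 */
char *copy_chunk_to_cstring (const char *buf, int len)
{
    char *s;

    assert (len >= 0);
    if (!(s = malloc ((size_t)len + 1)))
        return NULL;
    if (len > 0)
        memcpy (s, buf, (size_t)len);
    s[len] = '\0';
    return s;
}
```

The copy helper reintroduces the cost the flag is meant to avoid, so it would only make sense on paths that truly need a C string.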

Comment on lines 228 to 231
    if (flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF) {
        if ((len = flux_subprocess_read_local_unbuf (p, stream, &s)) < 0) {
            flux_log_error (exec->h, "flux_subprocess_read_local_unbuf");
            return;
Member

I think it would be fine to assume the UNBUF flag, e.g. just |= to the flags provided to bulk_exec_push_cmd() and drop the new public function.

Member Author

sounds good

@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch 2 times, most recently from c7b178a to ea18008 Compare May 21, 2024 21:29
Member Author

chu11 commented May 21, 2024

re-pushed given the comments above, did an additional profiling run and it showed some small incremental improvements.

Member

@garlick garlick left a comment

I made a first pass and had some comments/questions

@@ -420,35 +420,37 @@ static int remote_channel_setup (flux_subprocess_t *p,
if (wflag)
Member

Commit message still mentions flux_subprocess_read_local_unbuf()

Comment on lines 583 to 590
    /* N.B. we technically should clear/memset c->unbuf_data
     * before returning EOF, as a user could read from it during
     * the eof callback. But for performance reasons, we won't do
     * that and assume the caller is smart enough not to do that.
     */
    c->read_eof_received = true;
    c->unbuf_len = 0;
    c->unbuf_eof = true;
Member

If the remote returns data + EOF, doesn't setting c->unbuf_len to zero cause the data to be lost?

Member

> N.B. we technically should clear/memset c->unbuf_data before returning EOF, as a user could read from it during the eof callback. But for performance reasons, we won't do that and assume the caller is smart enough not to do that.

Well also it's a const char * so it's not possible to memset it.

If setting c->unbuf_len to zero why not set c->unbuf_data to NULL?

Member Author

ahh that comment was leftover from a prior iteration of this PR. I should remove it.

Member Author

> If setting c->unbuf_len to zero why not set c->unbuf_data to NULL?

I guess it wasn't completely necessary, but you are correct, it wouldn't hurt.

> If the remote returns data + EOF, doesn't setting c->unbuf_len to zero cause the data to be lost?

Within this function, all the data is returned in the first block in the following code. Then the EOF in the following block. So the second setting of unbuf_len = 0 is intentional, to clear out that data and send EOF to the caller.

    if (data && len) {                                                                                                                      
        c->unbuf_data = data;                                                                                                               
        c->unbuf_len = len;                                                                                                                 
        c->unbuf_consumed = false;                                                                                                          
        if (eof)                                                                                                                            
            c->read_eof_received = true;                                                                                                    
        c->unbuf_eof = false;                                                                                                               
                                                                                                                                            
        c->output_cb (c->p, c->name);                                                                                                       
                                                                                                                                            
        c->unbuf_consumed = false;                                                                                                          
    }                                                                                                                                       
    /* N.B. any data not consumed by the user is lost, so if eof is                                                                         
     * seen, we can send it in the local unbuf immediately */                                                                               
    if (eof) {                                                                                                                              
        c->read_eof_received = true;                                                                                                        
        c->unbuf_data = NULL;                                                                                                               
        c->unbuf_len = 0;                                                                                                                   
        c->unbuf_eof = true;                                                                                                                
                                                                                                                                            
        c->output_cb (c->p, c->name);                                                                                                       
                                                                                                                                            
        c->eof_sent_to_caller = true;                                                                                                       
        c->p->channels_eof_sent++;                                                                                                          
    }                                

Member

Whoops! My bad, I did not catch the output_cb() call in there.
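The two-step delivery described in this thread (one callback carrying the data, then a separate zero-length callback for EOF) can be mimicked with a small mock. This is a sketch under stated assumptions: `struct mock_channel`, `mock_output_cb()`, and `mock_deliver()` are hypothetical stand-ins, not the libsubprocess internals, and the counters exist only to make the behavior observable.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the channel state discussed above. */
struct mock_channel {
    const char *unbuf_data;   /* borrowed pointer, valid only during callback */
    int unbuf_len;
    bool eof_sent_to_caller;
    int data_callbacks;       /* counters for illustration only */
    int eof_callbacks;
    int bytes_seen;
};

/* Mock output callback: "reads" whatever is currently exposed.
 * A zero length is interpreted as EOF.
 */
static void mock_output_cb (struct mock_channel *c)
{
    if (c->unbuf_len > 0) {
        c->data_callbacks++;
        c->bytes_seen += c->unbuf_len;
    }
    else
        c->eof_callbacks++;
}

/* Deliver one response that may carry data, EOF, or both. */
void mock_deliver (struct mock_channel *c, const char *data, int len, bool eof)
{
    assert (c != NULL);
    if (data && len > 0) {
        c->unbuf_data = data;
        c->unbuf_len = len;
        mock_output_cb (c);     /* first callback: the data */
    }
    if (eof) {
        c->unbuf_data = NULL;
        c->unbuf_len = 0;
        mock_output_cb (c);     /* second callback: zero length means EOF */
        c->eof_sent_to_caller = true;
    }
}
```

With a response that carries both data and EOF, the mock invokes the callback twice, so clearing the length before the second call does not drop the data from the first.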

Comment on lines 803 to 808
    /* If user called this function twice, return nothing. They
     * will hopefully call flux_subprocess_read_stream_closed() to
     * verify EOF was not yet reached.
     */
    if (c->unbuf_consumed)
        return 0;
Member

The flag and the check seem unnecessary. We can probably just document that reading twice returns the same data and it won't be very surprising given the lack of a buffer.

See doc comment below.

Member Author

> The flag and the check seem unnecessary. We can probably just document that reading twice returns the same data and it won't be very surprising given the lack of a buffer.

I was trying to be consistent with how the buffered case works. But yeah, we could tweak that.

Comment on lines 64 to 74
    /* flux_rexec(): In order to improve performance, do not locally
     * copy and buffer output from the remote subprocess. Immediately
     * call output callbacks. Users can directly retrieve data via
     * flux_subprocess_read(). Data will not NUL terminated. Use of
     * other read functions will result in a EPERM error.
     */
    FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF = 8,
Member

> Users can directly retrieve data via flux_subprocess_read().

We probably should specify that flux_subprocess_read() must be called exactly once when the output callback is invoked. If not called, data is lost. If called more than once, the same data is returned.

> Data will not NUL terminated.

Missing a "be".

> Use of other read functions will result in a EPERM error.

EPERM is kind of strange here. Would EINVAL be better?

Member Author

> EPERM is kind of strange here. Would EINVAL be better?

I currently use EPERM for another case where it is "illegal" to call a certain function. It seems an ok choice. To me EINVAL just makes you think bad input. Hmmm, a better option isn't coming to me. Will sleep on it.

Member

No big deal I suppose!

Comment on lines 832 to 851
-    return subprocess_read (p, stream, bufp, false, false, false, NULL);
+    return subprocess_read (p, stream, bufp, false, false, false, true, NULL);
Member

This method of reducing duplicated code with a common function with booleans that alter its behavior is hard to follow. Instead of adding one more boolean to this function, how about adding a check to flux_subprocess_read_line() et al like this:

if (p && (p->flags & FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF)) {
    errno = EINVAL;
    return -1;
}

Comment on lines 588 to 590
    c->read_eof_received = true;
    c->unbuf_len = 0;
    c->unbuf_eof = true;
Member

Why do we have both c->read_eof_received and c->unbuf_eof?

Member Author

I think unbuf_eof is left over from a prior attempt. Should remove it.

Comment on lines +322 to +325
    len = flux_subprocess_read_line (p, stream, &line);
    ok (len < 0
        && errno == EPERM,
        "flux_subprocess_read_line fails w/ EPERM w/ LOCAL_UNBUF");
Member

Tests like this should zero errno before testing that the function sets it.
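A sketch of that pattern, written against a stand-in rather than the real API: `mock_read_line()` is a hypothetical function that fails with EPERM when an "unbuffered" mode is set, and `fails_with_errno()` is a hypothetical check helper. Clearing `errno` first means a stale value left by an earlier call cannot satisfy the check.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a line-oriented read function that is
 * disallowed when the unbuffered flag is set.
 */
int mock_read_line (bool local_unbuf, const char **bufp)
{
    if (local_unbuf) {
        errno = EPERM;
        return -1;
    }
    if (bufp)
        *bufp = "line\n";
    return 5;
}

/* Returns true only if the call failed AND set errno to `expected`.
 * errno is cleared first so that a stale value from an earlier call
 * cannot make the check pass by accident.
 */
bool fails_with_errno (bool local_unbuf, int expected)
{
    const char *buf = NULL;

    errno = 0;
    return mock_read_line (local_unbuf, &buf) < 0 && errno == expected;
}
```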

Comment on lines 267 to 268
    /* bulk-exec defaults to use unbuffered for performance */
    c->flags = flags | FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF;
Member

I'd just say "bulk-exec always uses unbuffered reads for performance" since calling it a default sort of implies that there could be a way to change it.

@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch from ea18008 to f3562da Compare May 29, 2024 15:19
Member Author

chu11 commented May 29, 2024

re-pushed w/ tweaks per comments above. Only notable thing is kept EPERM vs EINVAL for the "can't use these functions" error case.

Member

garlick commented May 29, 2024

Sounds good. I'll get this on my test system that is configured for sdexec and see how it goes.

One concern is the lack of output buffering in sdexec, which in combination with this flag means no buffering at all. I'll try it and we should also audit job-exec with that in mind to see if we can spot any problem areas. We may need to add output buffering to sdexec before switching job-exec over if there are problems.

Member

garlick commented May 29, 2024

The shell barrier protocol is fine since I believe it's the only user of stdout, and the protocol consists of the shell sending a line of output and then job exec sending one line back. There is not much opportunity for buffering (or lack thereof) to screw that up.

@grondo suggested I print something from a userrc to check how job-exec handles shell output on stderr.

-- foo.lua
io.stderr:write("foo bar\n")
io.stderr:write("foo bar\n")
io.stderr:write("foo bar\n")
io.stderr:write("foo bar\n")

With master

$ flux run -o userrc=foo.lua true
0.040s: flux-shell[0]: stderr: foo bar
0.040s: flux-shell[0]: stderr: foo bar
0.040s: flux-shell[0]: stderr: foo bar
0.040s: flux-shell[0]: stderr: foo bar
$ flux job eventlog -H -p exec $(flux job last) | grep log
[  +0.017012] log component="flux-shell" stream="stderr" rank="0" data="foo bar\n"
[  +0.017024] log component="flux-shell" stream="stderr" rank="0" data="foo bar\n"
[  +0.017031] log component="flux-shell" stream="stderr" rank="0" data="foo bar\n"
[  +0.017038] log component="flux-shell" stream="stderr" rank="0" data="foo bar\n"

With this PR there is evidence that lines are not being buffered.
Surprisingly, this happens with and without sdexec. I thought we had remote line buffering with the normal subprocess server.

$ flux run -o userrc=foo true
0.064s: flux-shell[0]: stderr: foo bar
foo bar
foo bar
foo bar
$ flux job eventlog -H -p exec $(flux job last) | grep log
[  +0.033395] log component="flux-shell" stream="stderr" rank="0" data="foo bar\nfoo bar\nfoo bar\nfoo bar\n"

Member

garlick commented May 29, 2024

Oh! Even though the remote subprocess is line buffered (the default), we are using flux_subprocess_read() in the server to consume data from the buffer and put it into messages. We need to use the line oriented read functions so that we are sending only one line per message or else the benefits of line buffering are lost. I guess the server output callback needs to check the stream's LINE_BUFFER option setting and then use the appropriate read function.
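The idea of sending only one line per message can be illustrated outside of flux-core. The sketch below is plain C with hypothetical names (`send_lines()`, `chunk_f`, `count_cb()`); it is not the libsubprocess server code, only an illustration of splitting a buffered chunk so that each complete line produces its own callback.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical callback type: receives one chunk (not NUL terminated). */
typedef void (*chunk_f)(const char *buf, int len, void *arg);

/* Walk a buffer and invoke cb once per complete line (newline included).
 * Returns the number of bytes consumed; any trailing partial line is
 * left for the caller to keep until more data arrives.
 */
int send_lines (const char *buf, int len, chunk_f cb, void *arg)
{
    int consumed = 0;

    assert (buf != NULL && len >= 0 && cb != NULL);
    while (consumed < len) {
        const char *start = buf + consumed;
        const char *nl = memchr (start, '\n', (size_t)(len - consumed));
        int linelen;

        if (!nl)
            break;
        linelen = (int)(nl - start) + 1;
        cb (start, linelen, arg);
        consumed += linelen;
    }
    return consumed;
}

/* Example callback that just counts invocations. */
void count_cb (const char *buf, int len, void *arg)
{
    (void)buf;
    (void)len;
    (*(int *)arg)++;
}
```

Fed the four `foo bar` lines from the userrc test as one chunk, the splitter would call back four times instead of once, which is the per-line behavior seen on master.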

Member

garlick commented May 29, 2024

This seems to fix that problem.

diff --git a/src/common/libsubprocess/server.c b/src/common/libsubprocess/server.c
index fca01444e..139923f23 100644
--- a/src/common/libsubprocess/server.c
+++ b/src/common/libsubprocess/server.c
@@ -246,7 +246,10 @@ static void proc_output_cb (flux_subprocess_t *p, const char *stream)
     const char *buf;
     int len;
 
-    if ((len = flux_subprocess_read (p, stream, &buf)) < 0) {
+    len = flux_subprocess_getline (p, stream, &buf);
+    if (len < 0 && errno == EPERM) // not line buffered
+        len = flux_subprocess_read (p, stream, &buf);
+    if (len < 0) {
         llog_error (s,
                     "error reading from subprocess stream %s: %s",
                     stream,

Member Author

chu11 commented May 29, 2024

> This seems to fix that problem.

Hmmmm. There's a part of me wondering if I originally messed up and had intended to NOT line buffer on the server side, i.e. all line buffer handling occurs on the client side.

But now that we have LOCAL_UNBUF, perhaps that no longer matters and the approach you have is what needs to be done.

Member

garlick commented May 30, 2024

I guess it's a trade off. The client has turned out to be pretty heavy with the buffering on that end. OTOH if we buffer on the server end then we can end up with more message framing overhead when there are bursts of messages. But since it's a common idiom to have many clients in one place spread out across many servers, moving the buffering to the server side probably makes sense.

Should we add the server side change in this PR and also maybe a test to show that we get one callback per line in UNBUF mode with the libsubprocess server? Then we could follow up with a PR for sdexec to add some buffering to it. I can look at that.

@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch 2 times, most recently from 1d82dc8 to ef54471 Compare May 30, 2024 01:44

codecov bot commented May 30, 2024

Codecov Report

Attention: Patch coverage is 74.60317%, with 16 lines in your changes missing coverage. Please review.

Project coverage is 83.29%. Comparing base (e0f5541) to head (ef54471).
Report is 2 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5975      +/-   ##
==========================================
- Coverage   83.30%   83.29%   -0.01%     
==========================================
  Files         518      518              
  Lines       83439    83481      +42     
==========================================
+ Hits        69506    69534      +28     
- Misses      13933    13947      +14     
Files Coverage Δ
src/common/libsubprocess/server.c 78.86% <100.00%> (+0.19%) ⬆️
src/common/libsubprocess/subprocess.c 89.56% <100.00%> (+0.63%) ⬆️
src/modules/job-exec/bulk-exec.c 77.86% <66.66%> (ø)
src/common/libsubprocess/remote.c 77.38% <61.53%> (-0.19%) ⬇️

... and 12 files with indirect coverage changes

@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch from ef54471 to e116ac8 Compare May 30, 2024 14:46
Member Author

chu11 commented May 30, 2024

re-pushed, adding commit to update server side to send each line buffered chunk and added a multiline test.

Member

@garlick garlick left a comment

Seems like we're good!

@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch from e116ac8 to 1a567ee Compare May 30, 2024 19:32
chu11 added 4 commits May 30, 2024 15:01
Problem: When output is returned from a remote process, data is
copied into a local buffer before it is read by callers.  This can
become a performance issue at larger scales.

Support a new FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF flag.  When launching
a remote subprocess, do not locally buffer the data.  Instead, immediately
call the respective output callback and give the data to the user
via flux_subprocess_read().

Problem: If line buffering is enabled on a remote subprocess, the
server does not respond with output containing a single line of
data.  This was not an issue when output data was buffered on the
client side, but with the new LOCAL_UNBUF flag, multiple lines of
data could be read in a single callback.

In the server, prefer flux_subprocess_getline() over flux_subprocess_read().
Only use flux_subprocess_read() if line buffering was not enabled.

Problem: There is currently no testing for the new
FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF flag.

Add coverage in test/remote.c.  Add test in test/subprocess.c to
ensure the flag only works with remote subprocesses.

Problem: Tests show that when launching many remote subprocesses
through job-exec, there can be performance slowdown due to libsubprocess.

Use the new libsubprocess FLUX_SUBPROCESS_FLAGS_LOCAL_UNBUF to improve
performance.  This flag will skip the buffering of data before being
sent to the caller.  In addition, it will not bother to NUL terminate
the data, which job-exec does not need since it is immediately sent to
the eventlog.
Member Author

chu11 commented May 30, 2024

hmmm alpine builder failed

Error: not ok 13 - job-exec: can specify default-shell on cmdline
Error: not ok 16 - job-exec: update default shell via config reload
Error: not ok 17 - job-exec: cmdline default shell takes priority
Error: not ok 18 - job-exec: can specify imp path on cmdline
Error: not ok 21 - job-exec: update imp path via config reload
Error: not ok 22 - job-exec: cmdline imp path takes priority
Error: not ok 23 - job-exec: can specify exec service on cmdline
Error: not ok 25 - job-exec: update exec service via config reload
Error: not ok 26 - job-exec: cmdline exec service takes priority
Error: not ok 28 - job-exec: update exec service override via config reload
Error: not ok 32 - job-exec: update sdexec properties via config reload
Error: ERROR: t2403-job-exec-conf.t - exited with status 1

these are all job-exec module reload tests. Wondering if the recent additions (kill-timeout, etc.) have issues when the module is reloaded; on alpine, the globals may not be reset.

Here's one troubling failure

  flux-module: job-exec.stats-get: Out of memory
  not ok 13 - job-exec: can specify default-shell on cmdline

I'll restart the builder, this obviously passed on other runs allowing this to be merged.

Edit: failed again same way

Problem: Per comment from trws in

97421e8

The musl libc loader doesn't actually unload objects on
dlclose, so a subsequent dlopen doesn't re-clear globals and
similar.

Since we support config reloading in job-exec, we need
to re-initialize globals on each load, otherwise invalid data
will exist in globals.

The new signaling globals are not re-initialized on reload.

Solution: Re-initialize signaling globals in job_exec_set_config_globals().
@chu11 chu11 force-pushed the issue5919_subprocess_unbuf_local branch from 1a567ee to a35f615 Compare May 30, 2024 23:09
Member Author

chu11 commented May 30, 2024

without any better ideas, re-pushed with an extra commit that re-initializes the new signaling globals. I'm guessing my changes to job-exec tickled things enough to make things fail.

@mergify mergify bot merged commit 71a8541 into flux-framework:master May 30, 2024
33 checks passed
Contributor

grondo commented May 30, 2024

Ah, yeah, that makes sense. Probably we were only lucky to get away with it previously.

Member Author

chu11 commented May 30, 2024

oh whoops, i should have removed merge-when-passing until that new commit was reviewed .... hopefully it's ok :-) here it is

a35f615

@chu11 chu11 deleted the issue5919_subprocess_unbuf_local branch May 30, 2024 23:49
Contributor

grondo commented May 31, 2024

Well that change overwrites the previous setting on any configuration reload with the defaults. However, probably not a big deal since these are mostly used only for testing at this point...

Member Author

chu11 commented May 31, 2024

Well that change overwrites the previous setting on any configuration reload with the defaults. However, probably not a big deal since these are mostly used only for testing at this point...

Gah ... that's right, should have passed the saved argc/argv into it like we do with the exec_config ... I'll do a fix.

    if (job_exec_set_config_globals (h, conf, 0, NULL, &err) < 0)
        goto error;

    while ((impl = implementations[i]) && impl->name) {
        if (impl->config) {
            if ((*impl->config) (h, conf, ctx->argc, ctx->argv, &err) < 0)
                goto error;
        }
        i++;
    }
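The reload problem and the proposed follow-up can be sketched in isolation. Everything here is hypothetical and only illustrates the pattern: the global `kill_timeout`, its default value, the `kill-timeout=` option syntax, and the functions `set_config_globals()` and `get_kill_timeout()` are assumptions, not the job-exec implementation. Defaults are assigned explicitly on every call, since a loader that does not really unload on dlclose() will not re-run static initializers, and the saved command line is then re-applied so that a reload does not discard it.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define DEFAULT_KILL_TIMEOUT 5.0    /* illustrative value only */

/* Hypothetical module-level setting.  No static initializer is relied
 * upon; the value is assigned on every (re)load instead.
 */
static double kill_timeout;

/* Reset to defaults, then re-apply saved command line options.
 * Returns 0 on success, -1 on an unrecognized or malformed option.
 */
int set_config_globals (int argc, char **argv)
{
    const char *prefix = "kill-timeout=";
    size_t plen = strlen (prefix);

    kill_timeout = DEFAULT_KILL_TIMEOUT;

    for (int i = 0; i < argc; i++) {
        char *end;
        double d;

        if (strncmp (argv[i], prefix, plen) != 0)
            return -1;
        d = strtod (argv[i] + plen, &end);
        if (end == argv[i] + plen || *end != '\0')
            return -1;
        kill_timeout = d;
    }
    return 0;
}

double get_kill_timeout (void)
{
    return kill_timeout;
}
```

Calling the function with `0, NULL` on reload restores only the defaults, which matches the regression noted above; passing the saved `argc`/`argv` keeps the command line override in effect.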
