Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsubprocess: Support line buffering stdout/stderr #2262

Merged
merged 8 commits into from Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/common/libsubprocess/Makefile.am
Expand Up @@ -38,6 +38,7 @@ TESTS = \
check_PROGRAMS = \
$(TESTS) \
test_echo \
test_multi_echo \
test_fork_sleep

TEST_EXTENSIONS = .t
Expand Down Expand Up @@ -66,4 +67,6 @@ test_subprocess_t_LDADD = $(test_ldadd)

test_echo_SOURCES = test/test_echo.c

test_multi_echo_SOURCES = test/test_multi_echo.c

test_fork_sleep_SOURCES = test/test_fork_sleep.c
46 changes: 19 additions & 27 deletions src/common/libsubprocess/local.c
Expand Up @@ -154,14 +154,14 @@ static int channel_local_setup (flux_subprocess_t *p,
flux_watcher_f in_cb,
flux_watcher_f out_cb,
const char *name,
int channel_flags,
int buffer_size)
int channel_flags)
{
struct subprocess_channel *c = NULL;
int fds[2] = { -1, -1 };
char *e = NULL;
int save_errno;
int fd_flags;
int buffer_size;

if (!(c = channel_create (p, output_f, name, channel_flags))) {
flux_log_error (p->h, "calloc");
Expand All @@ -187,6 +187,11 @@ static int channel_local_setup (flux_subprocess_t *p,
goto error;
}

if ((buffer_size = cmd_option_bufsize (p, name)) < 0) {
flux_log_error (p->h, "cmd_option_bufsize");
goto error;
}

if ((channel_flags & CHANNEL_WRITE) && in_cb) {
c->buffer_write_w = flux_buffer_write_watcher_create (p->reactor,
c->parent_fd,
Expand All @@ -201,11 +206,18 @@ static int channel_local_setup (flux_subprocess_t *p,
}

if ((channel_flags & CHANNEL_READ) && out_cb) {
int wflag;

if ((wflag = cmd_option_line_buffer (p, name)) < 0) {
flux_log_error (p->h, "cmd_option_line_buffer");
goto error;
}

c->buffer_read_w = flux_buffer_read_watcher_create (p->reactor,
c->parent_fd,
buffer_size,
out_cb,
0,
wflag,
c);
if (!c->buffer_read_w) {
flux_log_error (p->h, "flux_buffer_read_watcher_create");
Expand Down Expand Up @@ -260,52 +272,38 @@ static int channel_local_setup (flux_subprocess_t *p,

static int local_setup_stdio (flux_subprocess_t *p)
{
int buffer_size;

if (p->flags & FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH)
return 0;

/* stdio is identical to channels, except they are limited to read
* and/or write, and the buffer's automatically get a NUL char
* appended on reads */

if ((buffer_size = cmd_option_bufsize (p, "STDIN")) < 0)
return -1;

if (channel_local_setup (p,
NULL,
local_in_cb,
NULL,
"STDIN",
CHANNEL_WRITE,
buffer_size) < 0)
CHANNEL_WRITE) < 0)
return -1;

if (p->ops.on_stdout) {
if ((buffer_size = cmd_option_bufsize (p, "STDOUT")) < 0)
return -1;

if (channel_local_setup (p,
p->ops.on_stdout,
NULL,
local_stdout_cb,
"STDOUT",
CHANNEL_READ,
buffer_size) < 0)
CHANNEL_READ) < 0)
return -1;
}

if (p->ops.on_stderr) {
if ((buffer_size = cmd_option_bufsize (p, "STDERR")) < 0)
return -1;

if (channel_local_setup (p,
p->ops.on_stderr,
NULL,
local_stderr_cb,
"STDERR",
CHANNEL_READ,
buffer_size) < 0)
CHANNEL_READ) < 0)
return -1;
}

Expand All @@ -332,18 +330,12 @@ static int local_setup_channels (flux_subprocess_t *p)

name = zlist_first (channels);
while (name) {
int buffer_size;

if ((buffer_size = cmd_option_bufsize (p, name)) < 0)
return -1;

if (channel_local_setup (p,
p->ops.on_channel_out,
local_in_cb,
p->ops.on_channel_out ? local_out_cb : NULL,
name,
channel_flags,
buffer_size) < 0)
channel_flags) < 0)
return -1;
name = zlist_next (channels);
}
Expand Down
41 changes: 15 additions & 26 deletions src/common/libsubprocess/remote.c
Expand Up @@ -245,6 +245,10 @@ static void remote_out_prep_cb (flux_reactor_t *r,
{
struct subprocess_channel *c = arg;

/* We won't handle line buffering as a special case. Since line
* buffering is enabled on the server side, we can safely assume
* we only get data when a line is available */

/* no need to handle failure states, on fatal error, these
* reactors are closed */
if (flux_buffer_bytes (c->read_buffer) > 0
Expand Down Expand Up @@ -293,18 +297,23 @@ static void remote_out_check_cb (flux_reactor_t *r,
static int remote_channel_setup (flux_subprocess_t *p,
flux_subprocess_output_f output_f,
const char *name,
int channel_flags,
int buffer_size)
int channel_flags)
{
struct subprocess_channel *c = NULL;
char *e = NULL;
int save_errno;
int buffer_size;

if (!(c = channel_create (p, output_f, name, channel_flags))) {
flux_log_error (p->h, "calloc");
goto error;
}

if ((buffer_size = cmd_option_bufsize (p, name)) < 0) {
flux_log_error (p->h, "cmd_option_bufsize");
goto error;
}

if (channel_flags & CHANNEL_WRITE) {
if (!(c->write_buffer = flux_buffer_create (buffer_size))) {
flux_log_error (p->h, "flux_buffer_create");
Expand Down Expand Up @@ -395,43 +404,29 @@ static int remote_channel_setup (flux_subprocess_t *p,

static int remote_setup_stdio (flux_subprocess_t *p)
{
int buffer_size;

/* stdio is identical to channels, except they are limited to read
* and/or write, and the buffer's automatically get a NUL char
* appended on reads */

if ((buffer_size = cmd_option_bufsize (p, "STDIN")) < 0)
return -1;

if (remote_channel_setup (p,
NULL,
"STDIN",
CHANNEL_WRITE,
buffer_size) < 0)
CHANNEL_WRITE) < 0)
return -1;

if (p->ops.on_stdout) {
if ((buffer_size = cmd_option_bufsize (p, "STDOUT")) < 0)
return -1;

if (remote_channel_setup (p,
p->ops.on_stdout,
"STDOUT",
CHANNEL_READ,
buffer_size) < 0)
CHANNEL_READ) < 0)
return -1;
}

if (p->ops.on_stderr) {
if ((buffer_size = cmd_option_bufsize (p, "STDERR")) < 0)
return -1;

if (remote_channel_setup (p,
p->ops.on_stderr,
"STDERR",
CHANNEL_READ,
buffer_size) < 0)
CHANNEL_READ) < 0)
return -1;
}

Expand All @@ -458,16 +453,10 @@ static int remote_setup_channels (flux_subprocess_t *p)

name = zlist_first (channels);
while (name) {
int buffer_size;

if ((buffer_size = cmd_option_bufsize (p, name)) < 0)
return -1;

if (remote_channel_setup (p,
p->ops.on_channel_out,
name,
channel_flags,
buffer_size) < 0)
channel_flags) < 0)
return -1;
name = zlist_next (channels);
}
Expand Down
44 changes: 30 additions & 14 deletions src/common/libsubprocess/subprocess.h
Expand Up @@ -230,20 +230,36 @@ int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name);

/*
* Set generic string options for command object `cmd`. As with environment
* variables, this function adds the option `var` to with value `val` to
* variables, this function adds the option `var` with value `val` to
* the options array for this command. This can be used to enable optional
* behavior for executed processes (e.g. setpgrp(2))
*
* String options, note that name indicates the 'name' argument used
* in flux_cmd_add_channel() above.
*
* name + "_BUFSIZE" = buffer size
* STDIN_BUFSIZE = buffer size
* STDOUT_BUFSIZE = buffer size
* STDERR_BUFSIZE = buffer size
* "BUFSIZE" option
*
* By default, stdio and channels use an internal buffer of 1 meg.
* The buffer size can be adjusted with this option.
*
* - name + "_BUFSIZE" - set buffer size on channel name
* - STDIN_BUFSIZE - set buffer size on stdin
* - STDOUT_BUFSIZE - set buffer size on stdout
* - STDERR_BUFSIZE - set buffer size on stderr
*
* "LINE_BUFFER" option
*
* By default, output callbacks such as 'on_stdout' and 'on_stderr'
* are called when a line of data is available (with the exception
* with data after a subprocess has exited). By setting this
* option to "false", output callbacks will be called whenever any
* amount of data is available. These options can also be set to
* "true" to keep default behavior of line buffering.
*
* - name + "_LINE_BUFFER" - configuring line buffering on channel name
* - STDOUT_LINE_BUFFER - configure line buffering for stdout
* - STDERR_LINE_BUFFER - configure line buffering for stderr
*
* By default, stdio and channels use an internal buffer of 1 meg.
* The buffer size can be adjusted with this option.
*/
int flux_cmd_setopt (flux_cmd_t *cmd, const char *var, const char *val);
const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var);
Expand Down Expand Up @@ -308,10 +324,10 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream);
* "STDOUT".
*
* Returns pointer to buffer on success and NULL on error with errno
* set. If reading from "STDOUT" or "STDERR", buffer is guaranteed
* to be NUL terminated. User shall not free returned pointer.
* Length of buffer returned can optionally returned in 'lenp'. A
* length of 0 indicates that the subprocess has closed this stream.
* set. Buffer is guaranteed to be NUL terminated. User shall not
* free returned pointer. Length of buffer returned can optionally
* returned in 'lenp'. A length of 0 indicates that the subprocess
* has closed this stream.
*/
const char *flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
Expand All @@ -324,9 +340,9 @@ const char *flux_subprocess_read (flux_subprocess_t *p,
* "STDOUT".
*
* Returns pointer to buffer on success and NULL on error with errno
* set. If reading from "STDOUT" or "STDERR", buffer is guaranteed
* to be NUL terminated. User shall not free returned pointer.
* Length of buffer returned can optionally returned in 'lenp'.
* set. Buffer will include newline character and is guaranteed to
* be NUL terminated. User shall not free returned pointer. Length
* of buffer returned can optionally returned in 'lenp'.
*/
const char *flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
Expand Down