Skip to content

Commit

Permalink
libsubprocess: change default stream names
Browse files Browse the repository at this point in the history
Problem: upper case subprocess stream names STDIN, STDOUT,
and STDERR are not compatible with RFC 24 stream names.

Change the subprocess stream names to lower case.

Update users and tests.
  • Loading branch information
garlick committed Aug 14, 2019
1 parent 62cf2e2 commit add8e68
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 181 deletions.
2 changes: 1 addition & 1 deletion src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ static void runlevel_io_cb (runlevel_t *r, const char *name,
const char *msg, void *arg)
{
broker_ctx_t *ctx = arg;
int loglevel = !strcmp (name, "STDERR") ? LOG_ERR : LOG_INFO;
int loglevel = !strcmp (name, "stderr") ? LOG_ERR : LOG_INFO;
int runlevel = runlevel_get_level (r);

flux_log (ctx->h, loglevel, "rc%d: %s", runlevel, msg);
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)

void output_cb (flux_subprocess_t *p, const char *stream)
{
FILE *fstream = !strcasecmp (stream, "STDERR") ? stderr : stdout;
FILE *fstream = !strcmp (stream, "stderr") ? stderr : stdout;
const char *ptr;
int lenp;

Expand Down Expand Up @@ -207,7 +207,7 @@ static void stdin_cb (flux_reactor_t *r, flux_watcher_t *w,
while (p) {
if (flux_subprocess_state (p) == FLUX_SUBPROCESS_INIT
|| flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING) {
if (flux_subprocess_write (p, "STDIN", ptr, lenp) < 0)
if (flux_subprocess_write (p, "stdin", ptr, lenp) < 0)
log_err_exit ("flux_subprocess_write");
}
p = zlist_next (subprocesses);
Expand All @@ -216,7 +216,7 @@ static void stdin_cb (flux_reactor_t *r, flux_watcher_t *w,
else {
p = zlist_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "STDIN") < 0)
if (flux_subprocess_close (p, "stdin") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
}
Expand Down Expand Up @@ -398,13 +398,13 @@ int main (int argc, char *argv[])
if (optparse_getopt (opts, "verbose", NULL) > 0)
fprintf (stderr, "%03fms: Sent all requests\n", monotime_since (t0));

/* -n,--noinput: close subprocess STDIN
/* -n,--noinput: close subprocess stdin
*/
if (optparse_getopt (opts, "noinput", NULL) > 0) {
flux_subprocess_t *p;
p = zlist_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "STDIN") < 0)
if (flux_subprocess_close (p, "stdin") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
}
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/flux-job.c
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ void print_output (flux_t *h, flux_jobid_t id, optparse_t *p, bool missing_ok)
if (iodecode (context, &stream, &rank, &data, &len, NULL) < 0)
log_msg_exit ("malformed event context");
if (len > 0) {
FILE *fp = !strcmp (stream, "STDOUT") ? stdout : stderr;
FILE *fp = !strcmp (stream, "stdout") ? stdout : stderr;
if (optparse_hasopt (p, "label"))
fprintf (fp, "%d: ", rank);
fwrite (data, len, 1, fp);
Expand Down
12 changes: 6 additions & 6 deletions src/common/libsubprocess/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ static int local_setup_stdio (flux_subprocess_t *p)
NULL,
local_in_cb,
NULL,
"STDIN",
"stdin",
CHANNEL_WRITE) < 0)
return -1;

Expand All @@ -296,7 +296,7 @@ static int local_setup_stdio (flux_subprocess_t *p)
p->ops.on_stdout,
NULL,
local_stdout_cb,
"STDOUT",
"stdout",
CHANNEL_READ) < 0)
return -1;
}
Expand All @@ -306,7 +306,7 @@ static int local_setup_stdio (flux_subprocess_t *p)
p->ops.on_stderr,
NULL,
local_stderr_cb,
"STDERR",
"stderr",
CHANNEL_READ) < 0)
return -1;
}
Expand Down Expand Up @@ -451,14 +451,14 @@ static int local_child (flux_subprocess_t *p)
close_parent_fds (p);

if (!(p->flags & FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH)) {
if ((c = zhash_lookup (p->channels, "STDIN"))) {
if ((c = zhash_lookup (p->channels, "stdin"))) {
if (dup2 (c->child_fd, STDIN_FILENO) < 0) {
flux_log_error (p->h, "dup2");
_exit (1);
}
}

if ((c = zhash_lookup (p->channels, "STDOUT"))) {
if ((c = zhash_lookup (p->channels, "stdout"))) {
if (dup2 (c->child_fd, STDOUT_FILENO) < 0) {
flux_log_error (p->h, "dup2");
_exit (1);
Expand All @@ -467,7 +467,7 @@ static int local_child (flux_subprocess_t *p)
else
close (STDOUT_FILENO);

if ((c = zhash_lookup (p->channels, "STDERR"))) {
if ((c = zhash_lookup (p->channels, "stderr"))) {
if (dup2 (c->child_fd, STDERR_FILENO) < 0) {
flux_log_error (p->h, "dup2");
_exit (1);
Expand Down
6 changes: 3 additions & 3 deletions src/common/libsubprocess/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,22 +470,22 @@ static int remote_setup_stdio (flux_subprocess_t *p)

if (remote_channel_setup (p,
NULL,
"STDIN",
"stdin",
CHANNEL_WRITE) < 0)
return -1;

if (p->ops.on_stdout) {
if (remote_channel_setup (p,
p->ops.on_stdout,
"STDOUT",
"stdout",
CHANNEL_READ) < 0)
return -1;
}

if (p->ops.on_stderr) {
if (remote_channel_setup (p,
p->ops.on_stderr,
"STDERR",
"stderr",
CHANNEL_READ) < 0)
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ int flux_subprocess_server_terminate_by_uuid (flux_subprocess_server_t *s,
void flux_standard_output (flux_subprocess_t *p, const char *stream)
{
/* everything except stderr goes to stdout */
FILE *fstream = !strcasecmp (stream, "STDERR") ? stderr : stdout;
FILE *fstream = !strcasecmp (stream, "stderr") ? stderr : stdout;
const char *ptr;
int lenp;

Expand Down
28 changes: 14 additions & 14 deletions src/common/libsubprocess/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ typedef enum {
*/
enum {
/* flux_exec(): let parent stdin, stdout, stderr, carry to child.
* Do not create "STDIN", "STDOUT", or "STDERR" channels. Subsequently,
* Do not create "stdin", "stdout", or "stderr" channels. Subsequently,
* flux_subprocess_write()/close()/read()/read_line() will fail on
* streams of "STDIN", "STDOUT", or "STDERR".
* streams of "stdin", "stdout", or "stderr".
*/
FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH = 1,
/* flux_exec(): call setpgrp() before exec(2) */
Expand Down Expand Up @@ -250,9 +250,9 @@ int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name);
* The buffer size can be adjusted with this option.
*
* - name + "_BUFSIZE" - set buffer size on channel name
* - STDIN_BUFSIZE - set buffer size on stdin
* - STDOUT_BUFSIZE - set buffer size on stdout
* - STDERR_BUFSIZE - set buffer size on stderr
* - stdin_BUFSIZE - set buffer size on stdin
* - stdout_BUFSIZE - set buffer size on stdout
* - stderr_BUFSIZE - set buffer size on stderr
*
* "LINE_BUFFER" option
*
Expand All @@ -264,8 +264,8 @@ int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name);
* "true" to keep default behavior of line buffering.
*
* - name + "_LINE_BUFFER" - configuring line buffering on channel name
* - STDOUT_LINE_BUFFER - configure line buffering for stdout
* - STDERR_LINE_BUFFER - configure line buffering for stderr
* - stdout_LINE_BUFFER - configure line buffering for stdout
* - stderr_LINE_BUFFER - configure line buffering for stderr
*
* "STREAM_STOP" option
*
Expand All @@ -279,8 +279,8 @@ int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name);
* keep default behavior.
*
* - name + "_STREAM_STOP" - configure start/stop on channel name
* - STDOUT_STREAM_STOP - configure start/stop for stdout
* - STDERR_STREAM_STOP - configure start/stop for stderr
* - stdout_STREAM_STOP - configure start/stop for stdout
* - stderr_STREAM_STOP - configure start/stop for stderr
*/
int flux_cmd_setopt (flux_cmd_t *cmd, const char *var, const char *val);
const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var);
Expand Down Expand Up @@ -333,7 +333,7 @@ int flux_subprocess_stream_status (flux_subprocess_t *p, const char *stream);

/*
* Write data to "stream" stream of subprocess `p`. 'stream' can be
* "STDIN" or the name of a stream specified with flux_cmd_add_channel().
* "stdin" or the name of a stream specified with flux_cmd_add_channel().
*
* Returns the total amount of data successfully buffered.
*/
Expand All @@ -342,15 +342,15 @@ int flux_subprocess_write (flux_subprocess_t *p, const char *stream,

/*
* Close "stream" stream of subprocess `p` and schedule EOF to be sent.
* 'stream' can be "STDIN" or the name of a stream specified with
* 'stream' can be "stdin" or the name of a stream specified with
* flux_cmd_add_channel().
*/
int flux_subprocess_close (flux_subprocess_t *p, const char *stream);

/*
* Read up to `len` bytes of unread data from stream `stream`. To
* read all data, specify 'len' of -1. 'stream' can be "STDOUT",
* "STDERR", or the name of a stream specified with flux_cmd_add_channel().
* read all data, specify 'len' of -1. 'stream' can be "stdout",
* "stderr", or the name of a stream specified with flux_cmd_add_channel().
*
* Returns pointer to buffer on success and NULL on error with errno
* set. Buffer is guaranteed to be NUL terminated. User shall not
Expand All @@ -364,7 +364,7 @@ const char *flux_subprocess_read (flux_subprocess_t *p,

/*
* Read line unread data from stream `stream`. 'stream' can be
* "STDOUT", "STDERR", or the name of a stream specified with
* "stdout", "stderr", or the name of a stream specified with
* flux_cmd_add_channel().
*
* Returns pointer to buffer on success and NULL on error with errno
Expand Down
Loading

0 comments on commit add8e68

Please sign in to comment.