Skip to content

Commit

Permalink
Merge pull request #5965 from chu11/issue5953_libsubprocess_update_fu…
Browse files Browse the repository at this point in the history
…nctions

libsubprocess: improve read API
  • Loading branch information
mergify[bot] committed May 16, 2024
2 parents 8a6eb0e + c436b85 commit 7bd3e35
Show file tree
Hide file tree
Showing 21 changed files with 314 additions and 331 deletions.
2 changes: 1 addition & 1 deletion src/broker/runat.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ static void stdio_cb (flux_subprocess_t *p, const char *stream)
const char *line;
int len;

if ((line = flux_subprocess_getline (p, stream, &len)) && len > 0) {
if ((len = flux_subprocess_getline (p, stream, &line)) > 0) {
if (streq (stream, "stderr"))
flux_log (r->h, LOG_ERR, "%s.%d: %s", entry->name, index, line);
else
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/flux-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,16 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
void output_cb (flux_subprocess_t *p, const char *stream)
{
FILE *fstream = streq (stream, "stderr") ? stderr : stdout;
const char *ptr;
int lenp;
const char *buf;
int len;

if (!(ptr = flux_subprocess_getline (p, stream, &lenp)))
if ((len = flux_subprocess_getline (p, stream, &buf)) < 0)
log_err_exit ("flux_subprocess_getline");

if (lenp) {
if (len) {
if (optparse_getopt (opts, "label-io", NULL) > 0)
fprintf (fstream, "%d: ", flux_subprocess_rank (p));
fwrite (ptr, lenp, 1, fstream);
fwrite (buf, len, 1, fstream);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/cmd/flux-start.c
Original file line number Diff line number Diff line change
Expand Up @@ -418,17 +418,17 @@ static void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
void channel_cb (flux_subprocess_t *p, const char *stream)
{
struct client *cli = flux_subprocess_aux_get (p, "cli");
const char *ptr;
int rc, lenp;
const char *buf;
int rc, len;

assert (cli);
assert (streq (stream, "PMI_FD"));

if (!(ptr = flux_subprocess_read_line (p, stream, &lenp)))
if ((len = flux_subprocess_read_line (p, stream, &buf)) < 0)
log_err_exit ("%s: flux_subprocess_read_line", __FUNCTION__);

if (lenp) {
rc = pmi_simple_server_request (ctx.pmi.srv, ptr, cli, cli->rank);
if (len) {
rc = pmi_simple_server_request (ctx.pmi.srv, buf, cli, cli->rank);
if (rc < 0)
log_err_exit ("%s: pmi_simple_server_request", __FUNCTION__);
if (rc == 1)
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/job/mpir.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ static void output_cb (flux_subprocess_t *p, const char *stream)
const char *prog = basename (MPIR_executable_path);
int rank = flux_subprocess_rank (p);

line = flux_subprocess_read_trimmed_line (p, stream, &len);
if (line && len == 0)
line = flux_subprocess_read (p, stream, &len);
if (len)
len = flux_subprocess_read_trimmed_line (p, stream, &line);
if (len == 0)
len = flux_subprocess_read (p, stream, &line);
if (len > 0)
log_msg ("MPIR: rank %d: %s: %s: %s", rank, prog, stream, line);
}

Expand Down
10 changes: 5 additions & 5 deletions src/common/libsubprocess/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,19 @@ static void proc_output_cb (flux_subprocess_t *p, const char *stream)
{
subprocess_server_t *s = flux_subprocess_aux_get (p, srvkey);
const flux_msg_t *request = flux_subprocess_aux_get (p, msgkey);
const char *ptr;
int lenp;
const char *buf;
int len;

if (!(ptr = flux_subprocess_read (p, stream, &lenp))) {
if ((len = flux_subprocess_read (p, stream, &buf)) < 0) {
llog_error (s,
"error reading from subprocess stream %s: %s",
stream,
strerror (errno));
goto error;
}

if (lenp) {
if (proc_output (p, stream, s, request, ptr, lenp, false) < 0)
if (len) {
if (proc_output (p, stream, s, request, buf, len, false) < 0)
goto error;
}
else {
Expand Down
98 changes: 49 additions & 49 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,27 @@ void subprocess_standard_output (flux_subprocess_t *p, const char *stream)
{
/* everything except stderr goes to stdout */
FILE *fstream = !strcasecmp (stream, "stderr") ? stderr : stdout;
const char *ptr;
int lenp;
const char *buf;
int len;

/* Do not use flux_subprocess_getline(), this should work
* regardless if stream is line buffered or not */

if (!(ptr = flux_subprocess_read_line (p, stream, &lenp))) {
if ((len = flux_subprocess_read_line (p, stream, &buf)) < 0) {
log_err ("subprocess_standard_output: read_line");
return;
}

/* we're at the end of the stream, read any lingering data */
if (!lenp && flux_subprocess_read_stream_closed (p, stream)) {
if (!(ptr = flux_subprocess_read (p, stream, &lenp))) {
log_err ("subprocess_standard_output: read_line");
if (len == 0 && flux_subprocess_read_stream_closed (p, stream)) {
if ((len = flux_subprocess_read (p, stream, &buf)) < 0) {
log_err ("subprocess_standard_output: read");
return;
}
}

if (lenp)
fwrite (ptr, lenp, 1, fstream);
if (len)
fwrite (buf, len, 1, fstream);
}

/*
Expand Down Expand Up @@ -764,38 +764,39 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
return 0;
}

static const char *subprocess_read (flux_subprocess_t *p,
const char *stream,
int *lenp,
bool read_line,
bool trimmed,
bool line_buffered_required,
bool *readonly)
static int subprocess_read (flux_subprocess_t *p,
const char *stream,
const char **bufp,
bool read_line,
bool trimmed,
bool line_buffered_required,
bool *readonly)
{
struct subprocess_channel *c;
struct fbuf *fb;
const char *ptr;
int len;

if (!p || !stream
|| (p->local && p->in_hook)) {
errno = EINVAL;
return NULL;
return -1;
}

c = zhash_lookup (p->channels, stream);
if (!c || !(c->flags & CHANNEL_READ)) {
errno = EINVAL;
return NULL;
return -1;
}

if (line_buffered_required && !c->line_buffered) {
errno = EPERM;
return NULL;
return -1;
}

if (p->local) {
if (!(fb = fbuf_read_watcher_get_buffer (c->buffer_read_w)))
return NULL;
return -1;
}
else
fb = c->read_buffer;
Expand All @@ -806,41 +807,44 @@ static const char *subprocess_read (flux_subprocess_t *p,

if (read_line) {
if (trimmed) {
if (!(ptr = fbuf_read_trimmed_line (fb, lenp)))
return NULL;
if (!(ptr = fbuf_read_trimmed_line (fb, &len)))
return -1;
}
else {
if (!(ptr = fbuf_read_line (fb, lenp)))
return NULL;
if (!(ptr = fbuf_read_line (fb, &len)))
return -1;
}
}
else {
if (!(ptr = fbuf_read (fb, -1, lenp)))
return NULL;
if (!(ptr = fbuf_read (fb, -1, &len)))
return -1;
}

return ptr;
if (bufp && len > 0)
(*bufp) = ptr;
return len;
}

const char *flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
int *lenp)
int flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
const char **bufp)
{
return subprocess_read (p, stream, lenp, false, false, false, NULL);
return subprocess_read (p, stream, bufp, false, false, false, NULL);
}

const char *flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
int *lenp)

int flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
const char **bufp)
{
return subprocess_read (p, stream, lenp, true, false, false, NULL);
return subprocess_read (p, stream, bufp, true, false, false, NULL);
}

const char *flux_subprocess_read_trimmed_line (flux_subprocess_t *p,
const char *stream,
int *lenp)
int flux_subprocess_read_trimmed_line (flux_subprocess_t *p,
const char *stream,
const char **bufp)
{
return subprocess_read (p, stream, lenp, true, true, false, NULL);
return subprocess_read (p, stream, bufp, true, true, false, NULL);
}

bool flux_subprocess_read_stream_closed (flux_subprocess_t *p,
Expand All @@ -864,25 +868,21 @@ bool flux_subprocess_read_stream_closed (flux_subprocess_t *p,
return fb ? fbuf_is_readonly (fb) : false;
}

const char *flux_subprocess_getline (flux_subprocess_t *p,
const char *stream,
int *lenp)
int flux_subprocess_getline (flux_subprocess_t *p,
const char *stream,
const char **bufp)
{
const char *ptr;
int len;
bool readonly;

ptr = subprocess_read (p, stream, &len, true, false, true, &readonly);
len = subprocess_read (p, stream, bufp, true, false, true, &readonly);

/* if no lines available and EOF received, read whatever is
* lingering in the buffer */
if (ptr && len == 0 && readonly)
ptr = flux_subprocess_read (p, stream, &len);

if (lenp)
(*lenp) = len;
if (len == 0 && readonly)
len = flux_subprocess_read (p, stream, bufp);

return ptr;
return len;
}

static flux_future_t *add_pending_signal (flux_subprocess_t *p, int signum)
Expand Down
44 changes: 21 additions & 23 deletions src/common/libsubprocess/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream);
* Read unread data from stream `stream`. 'stream' can be "stdout",
* "stderr", or the name of a stream specified with flux_cmd_add_channel().
*
* Returns pointer to buffer on success and NULL on error with errno
* set. Buffer is guaranteed to be NUL terminated. User shall not
* free returned pointer. Length of buffer returned can optionally
* returned in 'lenp'.
* Returns length of data on success and -1 on error with errno set.
* Buffer of data is returned in bufp. Buffer is guaranteed to be
* NUL terminated. User shall not free returned pointer.
*
* In most cases, a length of 0 indicates that the subprocess has
* closed this stream. A length of 0 could be returned if read
Expand All @@ -205,36 +204,36 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream);
* once per output callback. flux_subprocess_read_stream_closed()
* can always be used to verify if the stream is in fact closed.
*/
const char *flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
int *lenp);
int flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
const char **bufp);

/*
* Read line of unread data from stream `stream`. 'stream' can be
* "stdout", "stderr", or the name of a stream specified with
* flux_cmd_add_channel().
*
* Returns pointer to buffer on success and NULL on error with errno
* set. Buffer will include newline character and is guaranteed to
* be NUL terminated. If no line is available, returns pointer and
* length of zero. User shall not free returned pointer. Length of
* buffer returned can optionally returned in 'lenp'.
* Returns length of data on success and -1 on error with errno set.
* Buffer with line is returned in bufp. Buffer will include
* newline character and is guaranteed to be NUL terminated. If no
* line is available, returns length of zero. User shall not free
* returned pointer.
*
* A length of zero may be returned if the stream is closed OR if
* the stream is line buffered and a line is not yet available. Use
* flux_subprocess_read_stream_closed() to distinguish between the
* two.
*/
const char *flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
int *lenp);
int flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
const char **bufp);

/* Identical to flux_subprocess_read_line(), but does not return
* trailing newline.
*/
const char *flux_subprocess_read_trimmed_line (flux_subprocess_t *p,
const char *stream,
int *lenp);
int flux_subprocess_read_trimmed_line (flux_subprocess_t *p,
const char *stream,
const char **bufp);

/* Determine if the read stream has is closed / received an EOF. This
* function can be useful if you are reading lines via
Expand All @@ -258,14 +257,13 @@ bool flux_subprocess_read_stream_closed (flux_subprocess_t *p,
* last data on the stream does not terminate in a newline
* character, this function will return that last data without the
* trailing newline.
* - if the stream has been closed / reached EOF, lenp will be set to
* 0.
* - if the stream has been closed / reached EOF, 0 will be returned.
* - if the stream is not line buffered, NULL and errno = EPERM will
* be returned.
*/
const char *flux_subprocess_getline (flux_subprocess_t *p,
const char *stream,
int *lenp);
int flux_subprocess_getline (flux_subprocess_t *p,
const char *stream,
const char **bufp);

/*
* Create RPC to send signal `signo` to subprocess `p`.
Expand Down

0 comments on commit 7bd3e35

Please sign in to comment.