Skip to content

Commit

Permalink
Merge pull request #5950 from chu11/issue5940_libsubprocess_read_no_len
Browse files Browse the repository at this point in the history
libsubprocess: remove unused len parameter from flux_subprocess_read()
  • Loading branch information
mergify[bot] committed May 13, 2024
2 parents 1c626ca + 64af2e4 commit 9f5ad97
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/cmd/job/mpir.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ static void output_cb (flux_subprocess_t *p, const char *stream)

line = flux_subprocess_read_trimmed_line (p, stream, &len);
if (line && len == 0)
line = flux_subprocess_read (p, stream, -1, &len);
line = flux_subprocess_read (p, stream, &len);
if (len)
log_msg ("MPIR: rank %d: %s: %s: %s", rank, prog, stream, line);
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/libsubprocess/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ static void proc_output_cb (flux_subprocess_t *p, const char *stream)
const char *ptr;
int lenp;

if (!(ptr = flux_subprocess_read (p, stream, -1, &lenp))) {
if (!(ptr = flux_subprocess_read (p, stream, &lenp))) {
llog_error (s,
"error reading from subprocess stream %s: %s",
stream,
Expand Down
22 changes: 8 additions & 14 deletions src/common/libsubprocess/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ void subprocess_standard_output (flux_subprocess_t *p, const char *stream)

/* we're at the end of the stream, read any lingering data */
if (!lenp && flux_subprocess_read_stream_closed (p, stream)) {
if (!(ptr = flux_subprocess_read (p, stream, -1, &lenp))) {
if (!(ptr = flux_subprocess_read (p, stream, &lenp))) {
log_err ("subprocess_standard_output: read_line");
return;
}
Expand Down Expand Up @@ -762,7 +762,7 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)

static const char *subprocess_read (flux_subprocess_t *p,
const char *stream,
int len, int *lenp,
int *lenp,
bool read_line,
bool trimmed,
bool line_buffered_required,
Expand All @@ -778,11 +778,6 @@ static const char *subprocess_read (flux_subprocess_t *p,
return NULL;
}

if (!read_line && len == 0) {
errno = EINVAL;
return NULL;
}

c = zhash_lookup (p->channels, stream);
if (!c || !(c->flags & CHANNEL_READ)) {
errno = EINVAL;
Expand Down Expand Up @@ -816,7 +811,7 @@ static const char *subprocess_read (flux_subprocess_t *p,
}
}
else {
if (!(ptr = fbuf_read (fb, len, lenp)))
if (!(ptr = fbuf_read (fb, -1, lenp)))
return NULL;
}

Expand All @@ -825,24 +820,23 @@ static const char *subprocess_read (flux_subprocess_t *p,

const char *flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
int len,
int *lenp)
{
return subprocess_read (p, stream, len, lenp, false, false, false, NULL);
return subprocess_read (p, stream, lenp, false, false, false, NULL);
}

const char *flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
int *lenp)
{
return subprocess_read (p, stream, 0, lenp, true, false, false, NULL);
return subprocess_read (p, stream, lenp, true, false, false, NULL);
}

const char *flux_subprocess_read_trimmed_line (flux_subprocess_t *p,
const char *stream,
int *lenp)
{
return subprocess_read (p, stream, 0, lenp, true, true, false, NULL);
return subprocess_read (p, stream, lenp, true, true, false, NULL);
}

bool flux_subprocess_read_stream_closed (flux_subprocess_t *p,
Expand Down Expand Up @@ -874,12 +868,12 @@ const char *flux_subprocess_getline (flux_subprocess_t *p,
int len;
bool readonly;

ptr = subprocess_read (p, stream, 0, &len, true, false, true, &readonly);
ptr = subprocess_read (p, stream, &len, true, false, true, &readonly);

/* if no lines available and EOF received, read whatever is
* lingering in the buffer */
if (ptr && len == 0 && readonly)
ptr = flux_subprocess_read (p, stream, -1, &len);
ptr = flux_subprocess_read (p, stream, &len);

if (lenp)
(*lenp) = len;
Expand Down
21 changes: 15 additions & 6 deletions src/common/libsubprocess/subprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,27 @@ int flux_subprocess_write (flux_subprocess_t *p,
int flux_subprocess_close (flux_subprocess_t *p, const char *stream);

/*
* Read up to `len` bytes of unread data from stream `stream`. To
* read all data, specify 'len' of -1. 'stream' can be "stdout",
* Read unread data from stream `stream`. 'stream' can be "stdout",
* "stderr", or the name of a stream specified with flux_cmd_add_channel().
*
* Returns pointer to buffer on success and NULL on error with errno
* set. Buffer is guaranteed to be NUL terminated. User shall not
* free returned pointer. Length of buffer returned can optionally
* returned in 'lenp'. A length of 0 indicates that the subprocess
* has closed this stream.
* returned in 'lenp'.
*
* In most cases, a length of 0 indicates that the subprocess has
* closed this stream. A length of 0 could be returned if read
* functions are called multiple times within a single output
* callback, so it is generally recommended to call this function
* once per output callback. flux_subprocess_read_stream_closed()
* can always be used to verify if the stream is in fact closed.
*/
const char *flux_subprocess_read (flux_subprocess_t *p,
const char *stream,
int len,
int *lenp);

/*
* Read line unread data from stream `stream`. 'stream' can be
* Read line of unread data from stream `stream`. 'stream' can be
* "stdout", "stderr", or the name of a stream specified with
* flux_cmd_add_channel().
*
Expand All @@ -215,6 +219,11 @@ const char *flux_subprocess_read (flux_subprocess_t *p,
* be NUL terminated. If no line is available, returns pointer and
* length of zero. User shall not free returned pointer. Length of
* buffer returned can optionally returned in 'lenp'.
*
* A length of zero may be returned if the stream is closed OR if
* the stream is line buffered and a line is not yet available. Use
* flux_subprocess_read_stream_closed() to distinguish between the
* two.
*/
const char *flux_subprocess_read_line (flux_subprocess_t *p,
const char *stream,
Expand Down
10 changes: 5 additions & 5 deletions src/common/libsubprocess/test/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void channel_fd_env_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -142,7 +142,7 @@ void channel_in_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -213,7 +213,7 @@ void channel_in_and_out_cb (flux_subprocess_t *p, const char *stream)
/* no check of flux_subprocess_read_stream_closed(), we aren't
* closing channel in test below */

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -303,7 +303,7 @@ void channel_multiple_lines_cb (flux_subprocess_t *p, const char *stream)
/* no check of flux_subprocess_read_stream_closed(), we aren't
* closing channel in test below */

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -378,7 +378,7 @@ void channel_nul_terminate_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down
100 changes: 88 additions & 12 deletions src/common/libsubprocess/test/stdio.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ int multiple_lines_stderr_output_cb_count;
int stdin_closed_stdout_cb_count;
int stdin_closed_stderr_cb_count;
int timer_cb_count;
char outputbuf[1024];
int outputbuf_len;

static int fdcount (void)
{
Expand Down Expand Up @@ -96,7 +98,7 @@ void output_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -135,6 +137,78 @@ void test_basic_stdout (flux_reactor_t *r)
flux_cmd_destroy (cmd);
}

void output_no_readline_cb (flux_subprocess_t *p, const char *stream)
{
const char *ptr;
char cmpbuf[1024];
int lenp = 0;
int *counter;

if (!strcasecmp (stream, "stdout"))
counter = &stdout_output_cb_count;
else if (!strcasecmp (stream, "stderr"))
counter = &stderr_output_cb_count;
else {
ok (false, "unexpected stream %s", stream);
return;
}

ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL,
"flux_subprocess_read on %s success", stream);

if (lenp > 0) {
memcpy (outputbuf + outputbuf_len, ptr, lenp);
outputbuf_len += lenp;
}
else {
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

sprintf (cmpbuf, "%s:hi\n", stream);
ok (streq (outputbuf, cmpbuf),
"flux_subprocess_read_line returned correct data");
/* 1 + 2 + 1 for ':', "hi", '\n' */
ok (outputbuf_len == (strlen (stream) + 1 + 2 + 1),
"flux_subprocess_read returned correct amount of data");
}

(*counter)++;
}

/* use flux_subprocess_read() instead of flux_subprocess_read_line() */
void test_basic_stdout_no_readline (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-P", "-O", "hi", NULL };
flux_cmd_t *cmd;
flux_subprocess_t *p = NULL;

ok ((cmd = flux_cmd_create (4, av, environ)) != NULL, "flux_cmd_create");

flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_stdout = output_no_readline_cb
};
completion_cb_count = 0;
stdout_output_cb_count = 0;
stderr_output_cb_count = 0;
memset (outputbuf, '\0', sizeof (outputbuf));
outputbuf_len = 0;
p = flux_local_exec (r, 0, cmd, &ops);
ok (p != NULL, "flux_local_exec");

ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING,
"subprocess state == RUNNING after flux_local_exec");

int rc = flux_reactor_run (r, 0);
ok (rc == 0, "flux_reactor_run returned zero status");
ok (completion_cb_count == 1, "completion callback called 1 time");
ok (stdout_output_cb_count >= 2, "stdout output callback called atleast 2 times");
ok (stderr_output_cb_count == 0, "stderr output callback called 0 times");
flux_subprocess_destroy (p);
flux_cmd_destroy (cmd);
}

void test_basic_stderr (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-P", "-E", "hi", NULL };
Expand Down Expand Up @@ -249,7 +323,7 @@ void output_default_stream_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", "stdout");

ptr = flux_subprocess_read (p, "stdout", -1, &lenp);
ptr = flux_subprocess_read (p, "stdout", &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", "stdout");
Expand Down Expand Up @@ -314,7 +388,7 @@ void output_no_newline_cb (flux_subprocess_t *p, const char *stream)
&& lenp == 0,
"flux_subprocess_read_line on %s read 0 lines", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp > 0,
"flux_subprocess_read on %s read success", stream);
Expand All @@ -331,7 +405,7 @@ void output_no_newline_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -402,7 +476,7 @@ void output_trimmed_line_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -494,7 +568,7 @@ void multiple_lines_output_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -564,7 +638,7 @@ void stdin_closed_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -709,12 +783,12 @@ void output_read_line_until_eof_error_cb (flux_subprocess_t *p, const char *stre

/* drain whatever is in the buffer, we don't care about
* contents for this test */
ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL && lenp > 0,
"flux_subprocess_read on %s success", stream);
}
else {
ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
Expand Down Expand Up @@ -889,7 +963,7 @@ void line_output_cb (flux_subprocess_t *p, const char *stream)
ok (flux_subprocess_read_stream_closed (p, stream),
"flux_subprocess_read_stream_closed saw EOF on %s", stream);

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
ok (ptr != NULL && lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
}
Expand Down Expand Up @@ -1089,7 +1163,7 @@ void start_stdout_after_stderr_cb (flux_subprocess_t *p, const char *stream)
return;
}

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
(*counter)++;
(*len_counter)+= lenp;

Expand Down Expand Up @@ -1190,7 +1264,7 @@ void mid_stop_cb (flux_subprocess_t *p, const char *stream)
return;
}

ptr = flux_subprocess_read (p, stream, -1, &lenp);
ptr = flux_subprocess_read (p, stream, &lenp);
if (stdout_output_cb_count == 0) {
flux_watcher_t *tw = NULL;
ok (ptr && lenp > 0,
Expand Down Expand Up @@ -1283,6 +1357,8 @@ int main (int argc, char *argv[])

diag ("basic_stdout");
test_basic_stdout (r);
diag ("basic_stdout_no_readline");
test_basic_stdout_no_readline (r);
diag ("basic_stderr");
test_basic_stderr (r);
diag ("basic_stdout_and_stderr");
Expand Down

0 comments on commit 9f5ad97

Please sign in to comment.