Skip to content

Commit

Permalink
libsubprocess: Add flux_subprocess_getline() tests
Browse files Browse the repository at this point in the history
  • Loading branch information
chu11 authored and mergify-bot committed Jul 25, 2019
1 parent 535d805 commit b095403
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 0 deletions.
154 changes: 154 additions & 0 deletions src/common/libsubprocess/test/subprocess.c
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,156 @@ void test_basic_multiple_lines (flux_reactor_t *r)
flux_cmd_destroy (cmd);
}

void output_read_line_until_eof_cb (flux_subprocess_t *p, const char *stream)
{
const char *ptr;
int lenp = 0;
int *counter;

if (!strcasecmp (stream, "STDOUT"))
counter = &stdout_output_cb_count;
else if (!strcasecmp (stream, "STDERR"))
counter = &stderr_output_cb_count;
else {
ok (false, "unexpected stream %s", stream);
return;
}

ptr = flux_subprocess_getline (p, stream, &lenp);
if ((*counter) == 0) {
ok (ptr != NULL,
"flux_subprocess_getline on %s success", stream);
ok (!strcmp (ptr, "foo\n"),
"flux_subprocess_getline returned correct data");
ok (lenp == 4,
"flux_subprocess_getline returned correct data len");
}
else if ((*counter) == 1) {
ok (ptr != NULL,
"flux_subprocess_getline on %s success", stream);
ok (!strcmp (ptr, "bar"),
"flux_subprocess_getline returned correct data");
ok (lenp == 3,
"flux_subprocess_getline returned correct data len");
}
else {
ok (ptr != NULL,
"flux_subprocess_getline on %s success", stream);
ok (lenp == 0,
"flux_subprocess_getline returned EOF");
}

(*counter)++;
}

void test_basic_read_line_until_eof (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", "-E", "-n", NULL };
flux_cmd_t *cmd;
flux_subprocess_t *p = NULL;

ok ((cmd = flux_cmd_create (4, av, environ)) != NULL, "flux_cmd_create");

flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_stdout = output_read_line_until_eof_cb,
.on_stderr = output_read_line_until_eof_cb
};
completion_cb_count = 0;
stdout_output_cb_count = 0;
stderr_output_cb_count = 0;
p = flux_local_exec (r, 0, cmd, &ops, NULL);
ok (p != NULL, "flux_local_exec");

ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING,
"subprocess state == RUNNING after flux_local_exec");

ok (flux_subprocess_write (p, "STDIN", "foo\n", 4) == 4,
"flux_subprocess_write success");

ok (flux_subprocess_write (p, "STDIN", "bar", 3) == 3,
"flux_subprocess_write success");

ok (flux_subprocess_close (p, "STDIN") == 0,
"flux_subprocess_close success");

int rc = flux_reactor_run (r, 0);
ok (rc == 0, "flux_reactor_run returned zero status");
ok (completion_cb_count == 1, "completion callback called 1 time");
ok (stdout_output_cb_count == 3, "stdout output callback called 3 times");
ok (stderr_output_cb_count == 3, "stderr output callback called 3 times");
flux_subprocess_destroy (p);
flux_cmd_destroy (cmd);
}

void output_read_line_until_eof_error_cb (flux_subprocess_t *p, const char *stream)
{
const char *ptr;
int lenp = 0;
int *counter;

if (!strcasecmp (stream, "STDOUT"))
counter = &stdout_output_cb_count;
else {
ok (false, "unexpected stream %s", stream);
return;
}

if ((*counter) == 0) {
ptr = flux_subprocess_getline (p, stream, &lenp);
ok (!ptr && errno == EPERM,
"flux_subprocess_getline returns EPERM "
"on non line-buffered stream");

/* drain whatever is in the buffer, we don't care about
* contents for this test */
ptr = flux_subprocess_read (p, stream, -1, &lenp);
ok (ptr != NULL && lenp > 0,
"flux_subprocess_read on %s success", stream);
}
else {
ptr = flux_subprocess_read (p, stream, -1, &lenp);
ok (ptr != NULL
&& lenp == 0,
"flux_subprocess_read on %s read EOF", stream);
}
(*counter)++;
}

void test_basic_read_line_until_eof_error (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", "hi", NULL };
flux_cmd_t *cmd;
flux_subprocess_t *p = NULL;

ok ((cmd = flux_cmd_create (3, av, environ)) != NULL, "flux_cmd_create");

ok (flux_cmd_setopt (cmd, "STDOUT_LINE_BUFFER", "false") == 0,
"flux_cmd_setopt set STDOUT_LINE_BUFFER success");

flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_stdout = output_read_line_until_eof_error_cb,
.on_stderr = NULL
};
completion_cb_count = 0;
stdout_output_cb_count = 0;
stderr_output_cb_count = 0;
p = flux_local_exec (r, 0, cmd, &ops, NULL);
ok (p != NULL, "flux_local_exec");

ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING,
"subprocess state == RUNNING after flux_local_exec");

int rc = flux_reactor_run (r, 0);
ok (rc == 0, "flux_reactor_run returned zero status");
ok (completion_cb_count == 1, "completion callback called 1 time");
ok (stdout_output_cb_count == 2, "stdout output callback called 2 times");
ok (stderr_output_cb_count == 0, "stderr output callback called 0 times");
flux_subprocess_destroy (p);
flux_cmd_destroy (cmd);
}

void test_write_after_close (flux_reactor_t *r)
{
char *av[] = { TEST_SUBPROCESS_DIR "test_echo", "-O", "-E", NULL };
Expand Down Expand Up @@ -2063,6 +2213,10 @@ int main (int argc, char *argv[])
test_basic_trimmed_line (r);
diag ("basic_multiple_lines");
test_basic_multiple_lines (r);
diag ("basic_read_line_until_eof");
test_basic_read_line_until_eof (r);
diag ("basic_read_line_until_eof_error");
test_basic_read_line_until_eof_error (r);
diag ("write_after_close");
test_write_after_close (r);
diag ("env_passed");
Expand Down
6 changes: 6 additions & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ check_PROGRAMS = \
rexec/rexec_signal \
rexec/rexec_ps \
rexec/rexec_count_stdout \
rexec/rexec_getline \
ingest/submitbench \
sched-simple/jj-reader \
shell/rcalc \
Expand Down Expand Up @@ -409,6 +410,11 @@ rexec_rexec_count_stdout_CPPFLAGS = $(test_cppflags)
rexec_rexec_count_stdout_LDADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL)

rexec_rexec_getline_SOURCES = rexec/rexec_getline.c
rexec_rexec_getline_CPPFLAGS = $(test_cppflags)
rexec_rexec_getline_LDADD = \
$(test_ldadd) $(LIBDL) $(LIBUTIL)

ingest_job_manager_dummy_la_SOURCES = ingest/job-manager-dummy.c
ingest_job_manager_dummy_la_CPPFLAGS = $(test_cppflags)
ingest_job_manager_dummy_la_LDFLAGS = $(fluxmod_ldflags) -module -rpath /nowhere
Expand Down
165 changes: 165 additions & 0 deletions t/rexec/rexec_getline.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/************************************************************\
* Copyright 2014 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <inttypes.h>
#include <flux/core.h>
#include <flux/optparse.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/read_all.h"

extern char **environ;

static struct optparse_option cmdopts[] = {
{ .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "rank",
.usage = "Specify rank for test" },
{ .name = "stdin2stream", .key = 'i', .has_arg = 1, .arginfo = "CHANNEL",
.usage = "Read in stdin and forward to subprocess channel" },
OPTPARSE_TABLE_END
};

optparse_t *opts;

int exit_code = 0;

void completion_cb (flux_subprocess_t *p)
{
int ec = flux_subprocess_exit_code (p);

if (ec > exit_code)
exit_code = ec;
}

void stdin2stream (flux_subprocess_t *p, const char *stream)
{
char *buf = NULL;
int tmp, len;

if ((len = read_all (STDIN_FILENO, (void **)&buf)) < 0)
log_err_exit ("read_all");

if (len) {
if ((tmp = flux_subprocess_write (p, stream, buf, len)) < 0)
log_err_exit ("flux_subprocess_write");

if (tmp != len)
log_err_exit ("overflow in write");
}

/* do not close for channel, b/c can race w/ data coming back */
if (!strcmp (stream, "STDIN")) {
if (flux_subprocess_close (p, stream) < 0)
log_err_exit ("flux_subprocess_close");
}

free (buf);
}

void output_cb (flux_subprocess_t *p, const char *stream)
{
FILE *fstream = !strcasecmp (stream, "STDERR") ? stderr : stdout;
const char *ptr;
int lenp;

if (!(ptr = flux_subprocess_getline (p, stream, &lenp)))
log_err_exit ("flux_subprocess_getline");
if (lenp)
fwrite (ptr, lenp, 1, fstream);
else
fprintf (fstream, "EOF\n");
}

int main (int argc, char *argv[])
{
flux_t *h;
flux_reactor_t *reactor;
flux_cmd_t *cmd;
char *cwd;
flux_subprocess_t *p = NULL;
flux_subprocess_ops_t ops = {
.on_completion = completion_cb,
.on_state_change = NULL,
.on_channel_out = NULL,
.on_stdout = output_cb,
.on_stderr = NULL,
};
const char *optargp;
int optindex;
int rank = 0;

log_init ("rexec-until-eof");

opts = optparse_create ("rexec-until-eof");
if (optparse_add_option_table (opts, cmdopts) != OPTPARSE_SUCCESS)
log_msg_exit ("optparse_add_option_table");
if ((optindex = optparse_parse_args (opts, argc, argv)) < 0)
exit (1);

if (optparse_getopt (opts, "rank", &optargp) > 0)
rank = atoi (optargp);

if (optindex == argc) {
optparse_print_usage (opts);
exit (1);
}

/* all args to cmd */
if (!(cmd = flux_cmd_create (argc - optindex, &argv[optindex], environ)))
log_err_exit ("flux_cmd_create");

if (!(cwd = get_current_dir_name ()))
log_err_exit ("get_current_dir_name");

if (flux_cmd_setcwd (cmd, cwd) < 0)
log_err_exit ("flux_cmd_setcwd");

if (optparse_getopt (opts, "stdin2stream", &optargp) > 0) {
if (strcmp (optargp, "STDIN")
&& strcmp (optargp, "STDOUT")
&& strcmp (optargp, "STDERR")) {
if (flux_cmd_add_channel (cmd, optargp) < 0)
log_err_exit ("flux_cmd_add_channel");
ops.on_channel_out = flux_standard_output;
}
}

if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");

if (!(reactor = flux_get_reactor (h)))
log_err_exit ("flux_get_reactor");

if (!(p = flux_rexec (h, rank, 0, cmd, &ops)))
log_err_exit ("flux_rexec");

if (optparse_getopt (opts, "stdin2stream", &optargp) > 0)
stdin2stream (p, optargp);

if (flux_reactor_run (reactor, 0) < 0)
log_err_exit ("flux_reactor_run");

/* Clean up.
*/
flux_subprocess_destroy (p);
flux_close (h);
log_fini ();

return exit_code;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
10 changes: 10 additions & 0 deletions t/t0005-rexec.t
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,14 @@ test_expect_success 'rexec line buffering can be disabled' '
test "$count" -gt 2
'

# the last line of output is "bar" without a newline. "EOF" is output
# from "rexec_getline", so if everything is working correctly, we
# should see the concatenation "barEOF" at the end of the output.
test_expect_success 'rexec read_getline call works on remote streams' '
/bin/echo -en "foo\nbar" | ${FLUX_BUILD_DIR}/t/rexec/rexec_getline -i STDIN ${TEST_SUBPROCESS_DIR}/test_echo -O -n > output 2>&1 &&
echo "foo" > expected &&
echo "barEOF" >> expected &&
test_cmp expected output
'

test_done

0 comments on commit b095403

Please sign in to comment.