Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flux-exec: ensure stdin is restored to blocking mode on exit #1814

Merged
merged 21 commits into from Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0e89a2e
testsuite: use flux exec with -n when stdin not used
garlick Nov 7, 2018
2d5757e
cmd/flux-exec: don't open /dev/null if -n
garlick Nov 8, 2018
a350488
cmd/flux-exec: set stdin O_NONBLOCK flag and restore
garlick Nov 8, 2018
872cc6a
libsubprocess: set O_NONBLOCK on fd
garlick Nov 8, 2018
2e17870
testsuite: set O_NONBLOCK on stdin/out in reactorcat
garlick Nov 8, 2018
20555e5
libflux/reactor: buffer read/write watchers require O_NONBLOCK
garlick Nov 8, 2018
89c3ad7
testsuite: libflux/reactor: update buffer write watcher tests
grondo Nov 8, 2018
90132c2
doc/flux-exec(1): drop lightweight job (LWJ) reference
garlick Nov 8, 2018
7a75a65
doc/flux-exec(1): reword -n description
garlick Nov 8, 2018
190a5fc
libutil/fdutils: add utility functions for fds
grondo Nov 8, 2018
3de3f52
testsuite: add unit tests for libutil/fdutils
grondo Nov 8, 2018
1f11379
testsuite: libflux: avoid use of fcntl(2) in reactor test
grondo Nov 8, 2018
7d44e04
libutil/popen2: fix close-on-exec flag use
grondo Nov 8, 2018
deec724
cmd/proxy: update to common fdutils code
grondo Nov 8, 2018
26bb95d
subprocess/local: update to commonn fdutils code
grondo Nov 8, 2018
30e0fef
libzio: update to common fdutils code
grondo Nov 8, 2018
8ac0cff
connectors/local: update to common libutils code
grondo Nov 8, 2018
dc51a09
connectors/ssh: update to common fdutils code
grondo Nov 8, 2018
7ed950a
modules/connector-local: update to common fdutils code
grondo Nov 8, 2018
dfaf937
wreck/luastack: remove unneeded include of fcntl.h
grondo Nov 8, 2018
008fdb5
libflux/reactor.c: update to common fdutils code
grondo Nov 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions doc/man1/flux-exec.adoc
Expand Up @@ -25,8 +25,8 @@ stdout and stderr.
On receipt of SIGINT and SIGTERM signals, flux-exec(1) shall forward
the received signal to all currently running remote processes.

flux-exec(1) is meant as an administrative and test utility, and should not
be used for executing lightweight jobs (LWJs) or user commands.
flux-exec(1) is meant as an administrative and test utility, and cannot
be used to launch Flux jobs.

EXIT STATUS
-----------
Expand All @@ -46,7 +46,7 @@ OPTIONS
Label lines of output with the source RANK.

*-n, --noinput*::
Redirect `stdin` from `/dev/null`.
Do not attempt to forward stdin. Send EOF to remote process stdin.

*-d, --dir*'=DIR'::
Set the working directory of remote 'COMMANDS' to 'DIR'. The default is to
Expand Down
24 changes: 5 additions & 19 deletions src/cmd/builtin/proxy.c
Expand Up @@ -38,7 +38,6 @@
#include <errno.h>
#include <libgen.h>
#include <stdbool.h>
#include <fcntl.h>
#include <argz.h>
#include <glob.h>
#include <czmq.h>
Expand All @@ -47,6 +46,7 @@
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/oom.h"
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/fdutils.h"
#include "src/common/libsubprocess/subprocess.h"

#define LISTEN_BACKLOG 5
Expand Down Expand Up @@ -126,20 +126,6 @@ static proxy_ctx_t *ctx_create (flux_t *h)
return ctx;
}

static int set_nonblock (int fd, bool nonblock)
{
int flags = fcntl (fd, F_GETFL);
if (flags < 0)
return -1;
if (nonblock)
flags |= O_NONBLOCK;
else
flags &= ~O_NONBLOCK;
if (fcntl (fd, F_SETFL, flags) < 0)
return -1;
return 0;
}

static client_t * client_create (proxy_ctx_t *ctx, int rfd, int wfd)
{
client_t *c;
Expand Down Expand Up @@ -168,12 +154,12 @@ static client_t * client_create (proxy_ctx_t *ctx, int rfd, int wfd)
flux_watcher_start (c->inw);
flux_msg_iobuf_init (&c->inbuf);
flux_msg_iobuf_init (&c->outbuf);
if (set_nonblock (c->rfd, true) < 0) {
flux_log_error (h, "set_nonblock");
if (fd_set_nonblocking (c->rfd) < 0) {
flux_log_error (h, "fd_set_nonblocking");
goto error;
}
if (c->wfd != c->rfd && set_nonblock (c->wfd, true) < 0) {
flux_log_error (h, "set_nonblock");
if (c->wfd != c->rfd && fd_set_nonblocking (c->wfd) < 0) {
flux_log_error (h, "fd_set_nonblocking");
goto error;
}

Expand Down
52 changes: 39 additions & 13 deletions src/cmd/flux-exec.c
Expand Up @@ -68,8 +68,8 @@ zlist_t *subprocesses;

optparse_t *opts = NULL;

int stdin_flags;
flux_watcher_t *stdin_w;
int stdin_fd = STDIN_FILENO;

void completion_cb (flux_subprocess_t *p)
{
Expand Down Expand Up @@ -108,10 +108,12 @@ void state_cb (flux_subprocess_t *p, flux_subprocess_state_t state)
exited++;
}

if (started == rank_count)
flux_watcher_start (stdin_w);
if (exited == rank_count)
flux_watcher_stop (stdin_w);
if (stdin_w) {
if (started == rank_count)
flux_watcher_start (stdin_w);
if (exited == rank_count)
flux_watcher_stop (stdin_w);
}

if (state == FLUX_SUBPROCESS_EXEC_FAILED
|| state == FLUX_SUBPROCESS_FAILED) {
Expand Down Expand Up @@ -221,6 +223,15 @@ void subprocess_destroy (void *arg)
flux_subprocess_destroy (p);
}

/* atexit handler
* This is a good faith attempt to restore stdin flags to what they were
* before we set O_NONBLOCK per bug #1803.
*/
void restore_stdin_flags (void)
{
(void)fcntl (STDIN_FILENO, F_SETFL, stdin_flags);
}

int main (int argc, char *argv[])
{
const char *optargp;
Expand Down Expand Up @@ -315,16 +326,31 @@ int main (int argc, char *argv[])
if (optparse_getopt (opts, "verbose", NULL) > 0)
fprintf (stderr, "%03fms: Sent all requests\n", monotime_since (t0));

/* -n,--noinput: close subprocess STDIN
*/
if (optparse_getopt (opts, "noinput", NULL) > 0) {
if ((stdin_fd = open ("/dev/null", O_RDONLY)) < 0)
log_err_exit ("open");
flux_subprocess_t *p;
p = zlist_first (subprocesses);
while (p) {
if (flux_subprocess_close (p, "STDIN") < 0)
log_err_exit ("flux_subprocess_close");
p = zlist_next (subprocesses);
}
}
/* configure stdin watcher
*/
else {
if ((stdin_flags = fcntl (STDIN_FILENO, F_GETFL)) < 0)
log_err_exit ("fcntl F_GETFL stdin");
if (atexit (restore_stdin_flags) != 0)
log_err_exit ("atexit");
if (fcntl (STDIN_FILENO, F_SETFL, stdin_flags | O_NONBLOCK) < 0)
log_err_exit ("fcntl F_SETFL stdin");
if (!(stdin_w = flux_buffer_read_watcher_create (r, STDIN_FILENO,
1 << 20, stdin_cb,
0, NULL)))
log_err_exit ("flux_buffer_read_watcher_create");
}

if (!(stdin_w = flux_buffer_read_watcher_create (r, stdin_fd,
1 << 20, stdin_cb,
0, NULL)))
log_err_exit ("flux_buffer_read_watcher_create");

if (signal (SIGINT, signal_cb) == SIG_ERR)
log_err_exit ("signal");

Expand Down
44 changes: 13 additions & 31 deletions src/common/libflux/reactor.c
Expand Up @@ -42,6 +42,7 @@
#include "src/common/libev/ev.h"
#include "src/common/libutil/ev_zmq.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/fdutils.h"

struct flux_reactor {
struct ev_loop *loop;
Expand Down Expand Up @@ -379,30 +380,6 @@ int flux_fd_watcher_get_fd (flux_watcher_t *w)
/* buffer
*/

static int fd_set_nonblocking (int fd, int *flags_orig)
{
int fval;

if ((fval = fcntl (fd, F_GETFL, 0)) < 0)
return (-1);

if (fcntl (fd, F_SETFL, fval | O_NONBLOCK) < 0)
return (-1);

if (flags_orig)
(*flags_orig) = fval;

return (0);
}

static int fd_set_flags (int fd, int flags)
{
if (fcntl (fd, F_SETFL, flags) < 0)
return (-1);

return (0);
}

static void buffer_read_start (flux_watcher_t *w)
{
struct ev_buffer_read *ebr = (struct ev_buffer_read *)w->data;
Expand Down Expand Up @@ -442,15 +419,18 @@ flux_watcher_t *flux_buffer_read_watcher_create (flux_reactor_t *r, int fd,
{
struct ev_buffer_read *ebr;
flux_watcher_t *w = NULL;
int flags_orig;
int fd_flags;

if (fd < 0) {
errno = EINVAL;
return NULL;
}

if (fd_set_nonblocking (fd, &flags_orig) < 0)
if ((fd_flags = fd_get_flags (fd)) < 0)
return NULL;
if (!(fd_flags & O_NONBLOCK)) {
errno = EINVAL;
return NULL;
}

if (!(w = flux_watcher_create (r,
sizeof (*ebr),
Expand All @@ -477,7 +457,6 @@ flux_watcher_t *flux_buffer_read_watcher_create (flux_reactor_t *r, int fd,

cleanup:
flux_watcher_destroy (w);
fd_set_flags (fd, flags_orig);
return NULL;
}

Expand Down Expand Up @@ -527,15 +506,19 @@ flux_watcher_t *flux_buffer_write_watcher_create (flux_reactor_t *r, int fd,
{
struct ev_buffer_write *ebw;
flux_watcher_t *w = NULL;
int flags_orig;
int fd_flags;

if (fd < 0) {
errno = EINVAL;
return NULL;
}

if (fd_set_nonblocking (fd, &flags_orig) < 0)
if ((fd_flags = fd_get_flags (fd)) < 0)
return NULL;
if (!(fd_flags & O_NONBLOCK)) {
errno = EINVAL;
return NULL;
}

if (!(w = flux_watcher_create (r,
sizeof (*ebw),
Expand All @@ -559,7 +542,6 @@ flux_watcher_t *flux_buffer_write_watcher_create (flux_reactor_t *r, int fd,

cleanup:
flux_watcher_destroy (w);
fd_set_flags (fd, flags_orig);
return NULL;
}

Expand Down
35 changes: 19 additions & 16 deletions src/common/libflux/test/reactor.c
Expand Up @@ -6,6 +6,7 @@

#include "src/common/libflux/reactor.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/fdutils.h"
#include "src/common/libtap/tap.h"

static const size_t zmqwriter_msgcount = 1024;
Expand Down Expand Up @@ -170,23 +171,12 @@ static void fdreader (flux_reactor_t *r, flux_watcher_t *w,
flux_reactor_stop_error (r);
}

static int set_nonblock (int fd)
{
int flags = fcntl (fd, F_GETFL, NULL);
if (flags < 0 || fcntl (fd, F_SETFL, flags | O_NONBLOCK) < 0) {
fprintf (stderr, "fcntl: %s\n", strerror (errno));
return -1;
}
return 0;
}

static void test_fd (flux_reactor_t *reactor)
{
int fd[2];
flux_watcher_t *r, *w;

ok (socketpair (PF_LOCAL, SOCK_STREAM, 0, fd) == 0
&& set_nonblock (fd[0]) == 0 && set_nonblock (fd[1]) == 0,
ok (socketpair (PF_LOCAL, SOCK_STREAM|SOCK_NONBLOCK, 0, fd) == 0,
"fd: successfully created non-blocking socketpair");
r = flux_fd_watcher_create (reactor, fd[0], FLUX_POLLIN, fdreader, NULL);
w = flux_fd_watcher_create (reactor, fd[1], FLUX_POLLOUT, fdwriter, NULL);
Expand Down Expand Up @@ -374,7 +364,7 @@ static void test_buffer (flux_reactor_t *reactor)
int count;
char buf[1024];

ok (socketpair (PF_LOCAL, SOCK_STREAM, 0, fd) == 0,
ok (socketpair (PF_LOCAL, SOCK_STREAM|SOCK_NONBLOCK, 0, fd) == 0,
"buffer: successfully created socketpair");

/* read buffer test */
Expand Down Expand Up @@ -587,6 +577,19 @@ static void test_buffer (flux_reactor_t *reactor)
count = 0;
ok (pipe (pfds) == 0,
"buffer: hey I can has a pipe!");

w = flux_buffer_write_watcher_create (reactor,
pfds[1],
1024,
buffer_write,
0,
&count);
ok (w == NULL && errno == EINVAL,
"buffer: write_watcher_create fails with EINVAL if fd !nonblocking");

ok (fd_set_nonblocking (pfds[1]) >= 0,
"buffer: fd_set_nonblocking");

w = flux_buffer_write_watcher_create (reactor,
pfds[1],
1024,
Expand Down Expand Up @@ -811,7 +814,7 @@ static void test_buffer_corner_case (flux_reactor_t *reactor)

/* read buffer corner case test - other end closes stream */

ok (socketpair (PF_LOCAL, SOCK_STREAM, 0, fd) == 0,
ok (socketpair (PF_LOCAL, SOCK_STREAM|SOCK_NONBLOCK, 0, fd) == 0,
"buffer corner case: successfully created socketpair");

bfc.count = 0;
Expand Down Expand Up @@ -848,7 +851,7 @@ static void test_buffer_corner_case (flux_reactor_t *reactor)

/* read line buffer corner case test - other end closes stream */

ok (socketpair (PF_LOCAL, SOCK_STREAM, 0, fd) == 0,
ok (socketpair (PF_LOCAL, SOCK_STREAM|SOCK_NONBLOCK, 0, fd) == 0,
"buffer corner case: successfully created socketpair");

bfc.count = 0;
Expand Down Expand Up @@ -885,7 +888,7 @@ static void test_buffer_corner_case (flux_reactor_t *reactor)

/* read line buffer corner case test - left over data not a line */

ok (socketpair (PF_LOCAL, SOCK_STREAM, 0, fd) == 0,
ok (socketpair (PF_LOCAL, SOCK_STREAM|SOCK_NONBLOCK, 0, fd) == 0,
"buffer corner case: successfully created socketpair");

bfc.count = 0;
Expand Down
11 changes: 8 additions & 3 deletions src/common/libsubprocess/local.c
Expand Up @@ -37,6 +37,7 @@

#include "src/common/libutil/log.h"
#include "src/common/libutil/fdwalk.h"
#include "src/common/libutil/fdutils.h"

#include "subprocess.h"
#include "subprocess_private.h"
Expand Down Expand Up @@ -192,6 +193,7 @@ static int channel_local_setup (flux_subprocess_t *p,
int fds[2] = { -1, -1 };
char *e = NULL;
int save_errno;
int fd_flags;

if (!(c = channel_create (p, output_f, name, channel_flags))) {
flux_log_error (p->h, "calloc");
Expand All @@ -212,6 +214,11 @@ static int channel_local_setup (flux_subprocess_t *p,
fds[0] = -1;
fds[1] = -1;

if ((fd_flags = fd_set_nonblocking (c->parent_fd)) < 0) {
flux_log_error (p->h, "fd_set_nonblocking");
goto error;
}

if ((channel_flags & CHANNEL_WRITE) && in_cb) {
c->buffer_write_w = flux_buffer_write_watcher_create (p->reactor,
c->parent_fd,
Expand Down Expand Up @@ -426,9 +433,7 @@ static void closefd_child (void *arg, int fd)
c = zhash_first (p->channels);
while (c) {
if (c->child_fd == fd) {
int flags = fcntl (fd, F_GETFD, 0);
if (flags >= 0)
(void) fcntl (fd, F_SETFD, flags & ~FD_CLOEXEC);
(void) fd_unset_cloexec (fd);
return;
}
c = zhash_next (p->channels);
Expand Down