From b0a10f4f77632307e1b4738bdb01b67cb5173694 Mon Sep 17 00:00:00 2001 From: Albert Chu Date: Tue, 7 May 2019 11:48:20 -0700 Subject: [PATCH] libsubprocess: A hook callbacks Add flux_subprocess_hooks_t argument to flux_exec() and flux_local_exec(), to allow caller to run special callbacks right before child executes command, in both child and parent. Also add `in_hook` flag to ensure that many libsubprocess API functions cannot be called inside a hook callback. Add unit tests and update callers appropriately. Fixes #2008 --- src/broker/runlevel.c | 3 +- src/cmd/builtin/proxy.c | 3 +- src/cmd/flux-start.c | 3 +- src/common/libsubprocess/local.c | 18 ++ src/common/libsubprocess/server.c | 6 +- src/common/libsubprocess/subprocess.c | 34 ++-- src/common/libsubprocess/subprocess.h | 20 ++- src/common/libsubprocess/subprocess_private.h | 2 + src/common/libsubprocess/test/subprocess.c | 156 ++++++++++++++---- 9 files changed, 190 insertions(+), 55 deletions(-) diff --git a/src/broker/runlevel.c b/src/broker/runlevel.c index a3e7c7715da8..b352f128440f 100644 --- a/src/broker/runlevel.c +++ b/src/broker/runlevel.c @@ -296,7 +296,8 @@ static int runlevel_start_subprocess (runlevel_t *r, int level) if (!(p = flux_exec (r->h, flags, r->rc[level].cmd, - &ops))) + &ops, + NULL))) goto error; if (flux_subprocess_aux_set (p, "runlevel", r, NULL) < 0) diff --git a/src/cmd/builtin/proxy.c b/src/cmd/builtin/proxy.c index 52e61adb0b84..983cd03e7f34 100644 --- a/src/cmd/builtin/proxy.c +++ b/src/cmd/builtin/proxy.c @@ -861,7 +861,8 @@ static int child_create (proxy_ctx_t *ctx, int ac, char **av, const char *workpa if (!(p = flux_local_exec (ctx->reactor, FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH, cmd, - &ops))) + &ops, + NULL))) goto error; if (flux_subprocess_aux_set (p, "ctx", ctx, NULL) < 0) diff --git a/src/cmd/flux-start.c b/src/cmd/flux-start.c index fb9f97a53cf6..d2781f665198 100644 --- a/src/cmd/flux-start.c +++ b/src/cmd/flux-start.c @@ -563,7 +563,8 @@ int client_run (struct client *cli) if (!(cli->p = flux_local_exec (ctx.reactor, FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH, cli->cmd, - &ops))) + &ops, + NULL))) log_err_exit ("flux_exec"); if (flux_subprocess_aux_set (cli->p, "cli", cli, NULL) < 0) log_err_exit ("flux_subprocess_aux_set"); diff --git a/src/common/libsubprocess/local.c b/src/common/libsubprocess/local.c index 36897bda789c..0edb90ca37ee 100644 --- a/src/common/libsubprocess/local.c +++ b/src/common/libsubprocess/local.c @@ -516,6 +516,15 @@ static int local_child (flux_subprocess_t *p) _exit (1); } + if (p->hooks.pre_exec_cb) { + /* always a chance caller may destroy subprocess in callback */ + flux_subprocess_ref (p); + p->in_hook = true; + (*p->hooks.pre_exec_cb) (p, p->hooks.pre_exec_arg); + p->in_hook = false; + flux_subprocess_unref (p); + } + if (p->flags & FLUX_SUBPROCESS_FLAGS_SETPGRP) { if (setpgrp () < 0) { flux_log_error (p->h, "setpgrp"); @@ -615,6 +624,15 @@ static int local_fork (flux_subprocess_t *p) p->state = FLUX_SUBPROCESS_STARTED; + if (p->hooks.post_fork_cb) { + /* always a chance caller may destroy subprocess in callback */ + flux_subprocess_ref (p); + p->in_hook = true; + (*p->hooks.post_fork_cb) (p, p->hooks.post_fork_arg); + p->in_hook = false; + flux_subprocess_unref (p); + } + return (0); } diff --git a/src/common/libsubprocess/server.c b/src/common/libsubprocess/server.c index 575c27d865ff..196cb77858f7 100644 --- a/src/common/libsubprocess/server.c +++ b/src/common/libsubprocess/server.c @@ -361,7 +361,11 @@ static void server_exec_cb (flux_t *h, flux_msg_handler_t *mh, goto error; } - if (!(p = flux_exec (s->h, FLUX_SUBPROCESS_FLAGS_SETPGRP, cmd, &ops))) { + if (!(p = flux_exec (s->h, + FLUX_SUBPROCESS_FLAGS_SETPGRP, + cmd, + &ops, + NULL))) { /* error here, generate FLUX_SUBPROCESS_EXEC_FAILED state */ if (flux_respond_pack (h, msg, "{s:s s:i s:i s:i}", "type", "state", diff --git a/src/common/libsubprocess/subprocess.c b/src/common/libsubprocess/subprocess.c index 782fb13254b9..871addea2100 100644 --- a/src/common/libsubprocess/subprocess.c +++ b/src/common/libsubprocess/subprocess.c @@ -146,6 +146,7 @@ static flux_subprocess_t * subprocess_create (flux_t *h, int flags, const flux_cmd_t *cmd, const flux_subprocess_ops_t *ops, + const flux_subprocess_hooks_t *hooks, int rank, bool local) { @@ -179,6 +180,9 @@ static flux_subprocess_t * subprocess_create (flux_t *h, if (ops) p->ops = *ops; + if (hooks) + p->hooks = *hooks; + p->h = h; p->reactor = r; p->rank = rank; @@ -555,7 +559,8 @@ static int subprocess_setup_completed (flux_subprocess_t *p) static flux_subprocess_t * flux_exec_wrap (flux_t *h, flux_reactor_t *r, int flags, const flux_cmd_t *cmd, - const flux_subprocess_ops_t *ops) + const flux_subprocess_ops_t *ops, + const flux_subprocess_hooks_t *hooks) { flux_subprocess_t *p = NULL; int valid_flags = (FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH @@ -572,7 +577,7 @@ static flux_subprocess_t * flux_exec_wrap (flux_t *h, flux_reactor_t *r, int fla return NULL; } - if (!(p = subprocess_create (h, r, flags, cmd, ops, -1, true))) + if (!(p = subprocess_create (h, r, flags, cmd, ops, hooks, -1, true))) goto error; if (subprocess_local_setup (p) < 0) @@ -597,7 +602,8 @@ static flux_subprocess_t * flux_exec_wrap (flux_t *h, flux_reactor_t *r, int fla flux_subprocess_t * flux_exec (flux_t *h, int flags, const flux_cmd_t *cmd, - const flux_subprocess_ops_t *ops) + const flux_subprocess_ops_t *ops, + const flux_subprocess_hooks_t *hooks) { flux_reactor_t *r; @@ -609,14 +615,15 @@ flux_subprocess_t * flux_exec (flux_t *h, int flags, if (!(r = flux_get_reactor (h))) return NULL; - return flux_exec_wrap (h, r, flags, cmd, ops); + return flux_exec_wrap (h, r, flags, cmd, ops, hooks); } flux_subprocess_t * flux_local_exec (flux_reactor_t *r, int flags, const flux_cmd_t *cmd, - const flux_subprocess_ops_t *ops) + const flux_subprocess_ops_t *ops, + const flux_subprocess_hooks_t *hooks) { - return flux_exec_wrap (NULL, r, flags, cmd, ops); + return flux_exec_wrap (NULL, r, flags, cmd, ops, hooks); } flux_subprocess_t *flux_rexec (flux_t *h, int rank, int flags, @@ -657,7 +664,7 @@ flux_subprocess_t *flux_rexec (flux_t *h, int rank, int flags, if (!(r = flux_get_reactor (h))) goto error; - if (!(p = subprocess_create (h, r, flags, cmd, ops, rank, false))) + if (!(p = subprocess_create (h, r, flags, cmd, ops, NULL, rank, false))) goto error; if (subprocess_remote_setup (p) < 0) @@ -688,7 +695,7 @@ int flux_subprocess_write (flux_subprocess_t *p, const char *stream, flux_buffer_t *fb; int ret; - if (!p || p->magic != SUBPROCESS_MAGIC) { + if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) { errno = EINVAL; return -1; } @@ -748,7 +755,7 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream) { struct subprocess_channel *c; - if (!p || p->magic != SUBPROCESS_MAGIC) { + if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) { errno = EINVAL; return -1; } @@ -800,7 +807,7 @@ static const char *subprocess_read (flux_subprocess_t *p, flux_buffer_t *fb; const char *ptr; - if (!p || p->magic != SUBPROCESS_MAGIC) { + if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) { errno = EINVAL; return NULL; } @@ -869,7 +876,8 @@ flux_future_t *flux_subprocess_kill (flux_subprocess_t *p, int signum) { flux_future_t *f = NULL; - if (!p || p->magic != SUBPROCESS_MAGIC || !signum) { + if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook) + || !signum) { errno = EINVAL; return NULL; } @@ -1060,7 +1068,7 @@ flux_reactor_t * flux_subprocess_get_reactor (flux_subprocess_t *p) int flux_subprocess_aux_set (flux_subprocess_t *p, const char *name, void *x, flux_free_f free_fn) { - if (!p || p->magic != SUBPROCESS_MAGIC) { + if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) { errno = EINVAL; return -1; } @@ -1069,7 +1077,7 @@ int flux_subprocess_aux_set (flux_subprocess_t *p, void * flux_subprocess_aux_get (flux_subprocess_t *p, const char *name) { - if (!p || p->magic != SUBPROCESS_MAGIC) { + if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) { errno = EINVAL; return NULL; } diff --git a/src/common/libsubprocess/subprocess.h b/src/common/libsubprocess/subprocess.h index b3bec734c48f..439ad89c7102 100644 --- a/src/common/libsubprocess/subprocess.h +++ b/src/common/libsubprocess/subprocess.h @@ -79,6 +79,7 @@ typedef void (*flux_subprocess_output_f) (flux_subprocess_t *p, const char *stream); typedef void (*flux_subprocess_state_f) (flux_subprocess_t *p, flux_subprocess_state_t state); +typedef void (*flux_subprocess_hook_f) (flux_subprocess_t *p, void *arg); /* * Functions for event-driven subprocess handling: @@ -96,6 +97,17 @@ typedef struct { flux_subprocess_output_f on_stderr; /* Read of stderr is ready */ } flux_subprocess_ops_t; +/* + * flux_subprocess_hooks_t: Hook functions to execute at pre-defined + * points. Hooks can only be executed on local processes. + */ +typedef struct { + flux_subprocess_hook_f pre_exec_cb; + void *pre_exec_arg; + flux_subprocess_hook_f post_fork_cb; + void *post_fork_arg; +} flux_subprocess_hooks_t; + /* * General support: */ @@ -222,8 +234,6 @@ int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name); int flux_cmd_setopt (flux_cmd_t *cmd, const char *var, const char *val); const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var); - - /* * Subprocesses: */ @@ -246,11 +256,13 @@ const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var); */ flux_subprocess_t *flux_exec (flux_t *h, int flags, const flux_cmd_t *cmd, - const flux_subprocess_ops_t *ops); + const flux_subprocess_ops_t *ops, + const flux_subprocess_hooks_t *hooks); flux_subprocess_t *flux_local_exec (flux_reactor_t *r, int flags, const flux_cmd_t *cmd, - const flux_subprocess_ops_t *ops); + const flux_subprocess_ops_t *ops, + const flux_subprocess_hooks_t *hooks); flux_subprocess_t *flux_rexec (flux_t *h, int rank, int flags, const flux_cmd_t *cmd, diff --git a/src/common/libsubprocess/subprocess_private.h b/src/common/libsubprocess/subprocess_private.h index 50d99317ccf0..8c9c89934e5f 100644 --- a/src/common/libsubprocess/subprocess_private.h +++ b/src/common/libsubprocess/subprocess_private.h @@ -97,7 +97,9 @@ struct flux_subprocess { /* fds[0] is parent/user, fds[1] is child */ int sync_fds[2]; /* socketpair for fork/exec sync */ + bool in_hook; /* if presently in a hook */ flux_watcher_t *child_w; + flux_subprocess_hooks_t hooks; /* remote */ diff --git a/src/common/libsubprocess/test/subprocess.c b/src/common/libsubprocess/test/subprocess.c index 3c5592268e3e..f9c9424da772 100644 --- a/src/common/libsubprocess/test/subprocess.c +++ b/src/common/libsubprocess/test/subprocess.c @@ -15,6 +15,7 @@ #include #include #include +#include #include "src/common/libtap/tap.h" #include "src/common/libsubprocess/subprocess.h" @@ -77,7 +78,7 @@ void test_basic (flux_reactor_t *r) .on_completion = completion_cb }; completion_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -121,7 +122,7 @@ void test_basic_fail (flux_reactor_t *r) .on_completion = completion_fail_cb }; completion_fail_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -144,13 +145,13 @@ void test_basic_errors (flux_reactor_t *r) ok (!flux_subprocess_server_start (NULL, NULL, NULL, 0) && errno == EINVAL, "flux_subprocess_server_start fails with NULL pointer inputs"); - ok (flux_exec (NULL, 0, NULL, NULL) == NULL + ok (flux_exec (NULL, 0, NULL, NULL, NULL) == NULL && errno == EINVAL, "flux_exec fails with NULL pointer inputs"); - ok (flux_local_exec (NULL, 0, NULL, NULL) == NULL + ok (flux_local_exec (NULL, 0, NULL, NULL, NULL) == NULL && errno == EINVAL, "flux_local_exec fails with NULL pointer inputs"); - ok (flux_local_exec (r, 1234, NULL, NULL) == NULL + ok (flux_local_exec (r, 1234, NULL, NULL, NULL) == NULL && errno == EINVAL, "flux_local_exec fails with invalid flag"); ok (flux_rexec (NULL, 0, 0, NULL, NULL) == NULL @@ -243,7 +244,7 @@ void test_errors (flux_reactor_t *r) .on_completion = completion_cb }; completion_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -344,7 +345,7 @@ void test_basic_stdout (flux_reactor_t *r) completion_cb_count = 0; stdout_output_cb_count = 0; stderr_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -374,7 +375,7 @@ void test_basic_stderr (flux_reactor_t *r) completion_cb_count = 0; stdout_output_cb_count = 0; stderr_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -407,7 +408,7 @@ void test_basic_stdout_and_stderr (flux_reactor_t *r) completion_cb_count = 0; stdout_output_cb_count = 0; stderr_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -436,7 +437,7 @@ void test_basic_default_output (flux_reactor_t *r) .on_stderr = flux_standard_output }; completion_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -490,7 +491,7 @@ void test_basic_stdout_default_stream (flux_reactor_t *r) }; completion_cb_count = 0; output_default_stream_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -518,7 +519,7 @@ void test_basic_stdin (flux_reactor_t *r) }; completion_cb_count = 0; stdout_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -552,7 +553,7 @@ void test_basic_stdin_default_stream (flux_reactor_t *r) }; completion_cb_count = 0; stdout_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -630,7 +631,7 @@ void test_basic_no_newline (flux_reactor_t *r) completion_cb_count = 0; stdout_output_cb_count = 0; stderr_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -698,7 +699,7 @@ void test_basic_trimmed_line (flux_reactor_t *r) completion_cb_count = 0; stdout_output_cb_count = 0; stderr_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -781,7 +782,7 @@ void test_basic_multiple_lines (flux_reactor_t *r) completion_cb_count = 0; multiple_lines_stdout_output_cb_count = 0; multiple_lines_stderr_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -822,7 +823,7 @@ void test_write_after_close (flux_reactor_t *r) }; completion_cb_count = 0; stdout_output_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -857,7 +858,11 @@ void test_flag_stdio_fallthrough (flux_reactor_t *r) .on_completion = completion_cb }; completion_cb_count = 0; - p = flux_local_exec (r, FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH, cmd, &ops); + p = flux_local_exec (r, + FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH, + cmd, + &ops, + NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -883,7 +888,7 @@ void test_flag_setpgrp (flux_reactor_t *r) .on_completion = completion_cb }; completion_cb_count = 0; - p = flux_local_exec (r, FLUX_SUBPROCESS_FLAGS_SETPGRP, cmd, &ops); + p = flux_local_exec (r, FLUX_SUBPROCESS_FLAGS_SETPGRP, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -940,7 +945,7 @@ void test_env_passed (flux_reactor_t *r) }; completion_cb_count = 0; env_passed_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -979,7 +984,7 @@ void test_kill (flux_reactor_t *r) .on_completion = completion_sigterm_cb }; completion_sigterm_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1065,7 +1070,7 @@ void test_kill_setpgrp (flux_reactor_t *r) output_processes_cb_count = 0; parent_pid = -1; child_pid = -1; - p = flux_local_exec (r, FLUX_SUBPROCESS_FLAGS_SETPGRP, cmd, &ops); + p = flux_local_exec (r, FLUX_SUBPROCESS_FLAGS_SETPGRP, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1129,7 +1134,7 @@ void test_kill_eofs (flux_reactor_t *r) completion_sigterm_cb_count = 0; stdout_eof_cb_count = 0; stderr_eof_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1177,7 +1182,7 @@ void test_state_change (flux_reactor_t *r) }; completion_cb_count = 0; state_change_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1216,7 +1221,7 @@ void test_exec_fail (flux_reactor_t *r) ok ((cmd = flux_cmd_create (1, av_eacces, NULL)) != NULL, "flux_cmd_create"); - p = flux_local_exec (r, 0, cmd, NULL); + p = flux_local_exec (r, 0, cmd, NULL, NULL); ok (p == NULL && errno == EACCES, "flux_local_exec failed with EACCES"); @@ -1225,7 +1230,7 @@ void test_exec_fail (flux_reactor_t *r) ok ((cmd = flux_cmd_create (1, av_enoent, NULL)) != NULL, "flux_cmd_create"); - p = flux_local_exec (r, 0, cmd, NULL); + p = flux_local_exec (r, 0, cmd, NULL, NULL); ok (p == NULL && errno == ENOENT, "flux_local_exec failed with ENOENT"); @@ -1247,7 +1252,7 @@ void test_context (flux_reactor_t *r) .on_completion = completion_cb }; completion_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1280,7 +1285,7 @@ void test_refcount (flux_reactor_t *r) .on_completion = completion_cb }; completion_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1349,7 +1354,7 @@ void test_channel_fd_env (flux_reactor_t *r) }; completion_cb_count = 0; channel_fd_env_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1412,7 +1417,7 @@ void test_channel_fd_in (flux_reactor_t *r) }; completion_cb_count = 0; channel_in_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1480,7 +1485,7 @@ void test_channel_fd_in_and_out (flux_reactor_t *r) }; completion_cb_count = 0; channel_in_and_out_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1567,7 +1572,7 @@ void test_channel_multiple_lines (flux_reactor_t *r) }; completion_cb_count = 0; multiple_lines_channel_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1650,7 +1655,7 @@ void test_bufsize (flux_reactor_t *r) .on_stderr = flux_standard_output }; completion_cb_count = 0; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p != NULL, "flux_local_exec"); ok (flux_subprocess_state (p) == FLUX_SUBPROCESS_RUNNING, @@ -1683,7 +1688,7 @@ void test_bufsize_error (flux_reactor_t *r) .on_stdout = flux_standard_output, .on_stderr = flux_standard_output }; - p = flux_local_exec (r, 0, cmd, &ops); + p = flux_local_exec (r, 0, cmd, &ops, NULL); ok (p == NULL && errno == EINVAL, "flux_local_exec fails with EINVAL due to bad bufsize input"); @@ -1691,6 +1696,85 @@ void test_bufsize_error (flux_reactor_t *r) flux_cmd_destroy (cmd); } +void shmem_hook_cb (flux_subprocess_t *p, void *arg) +{ + int *shmem_count = arg; + (*shmem_count)++; +} + +void test_pre_exec_hook (flux_reactor_t *r) +{ + char *av[] = { "/bin/true", NULL }; + flux_cmd_t *cmd; + flux_subprocess_t *p = NULL; + int *shmem_count; + + ok ((cmd = flux_cmd_create (1, av, NULL)) != NULL, "flux_cmd_create"); + + /* pre_exec_cb run in child, so we use shared memory */ + shmem_count = mmap (NULL, + sizeof (int), + PROT_READ | PROT_WRITE, + MAP_ANONYMOUS | MAP_SHARED, + -1, + 0); + ok (shmem_count != NULL, "mmap success"); + (*shmem_count) = 0; + + flux_subprocess_ops_t ops = { + .on_completion = completion_cb, + }; + flux_subprocess_hooks_t hooks = { + .pre_exec_cb = shmem_hook_cb, + .pre_exec_arg = shmem_count + }; + completion_cb_count = 0; + p = flux_local_exec (r, FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH, cmd, &ops, &hooks); + ok (p != NULL, "flux_local_exec"); + + int rc = flux_reactor_run (r, 0); + ok (rc == 0, "flux_reactor_run returned zero status"); + ok (completion_cb_count == 1, "completion callback called 1 time"); + ok ((*shmem_count) == 1, "pre_exec hook called correctly"); + flux_subprocess_destroy (p); + flux_cmd_destroy (cmd); + munmap (shmem_count, sizeof (int)); +} + +void count_hook_cb (flux_subprocess_t *p, void *arg) +{ + int *count = arg; + (*count)++; +} + +void test_post_fork_hook (flux_reactor_t *r) +{ + char *av[] = { "/bin/true", NULL }; + flux_cmd_t *cmd; + flux_subprocess_t *p = NULL; + int hook_count = 0; + + ok ((cmd = flux_cmd_create (1, av, NULL)) != NULL, "flux_cmd_create"); + + flux_subprocess_ops_t ops = { + .on_completion = completion_cb, + }; + flux_subprocess_hooks_t hooks = { + .post_fork_cb = count_hook_cb, + .post_fork_arg = &hook_count + }; + completion_cb_count = 0; + p = flux_local_exec (r, 0, cmd, &ops, &hooks); + ok (p != NULL, "flux_local_exec"); + + int rc = flux_reactor_run (r, 0); + ok (rc == 0, "flux_reactor_run returned zero status"); + ok (completion_cb_count == 1, "completion callback called 1 time"); + ok (hook_count == 1, "post_fork hook cb called 1 time"); + flux_subprocess_destroy (p); + flux_cmd_destroy (cmd); +} + int main (int argc, char *argv[]) { flux_reactor_t *r; @@ -1770,6 +1854,10 @@ int main (int argc, char *argv[]) test_bufsize (r); diag ("bufsize_error"); test_bufsize_error (r); + diag ("pre_exec_hook"); + test_pre_exec_hook (r); + diag ("post_fork_hook"); + test_post_fork_hook (r); end_fdcount = fdcount ();