Skip to content

Commit

Permalink
Merge pull request #2152 from chu11/issue2008_wip
Browse files Browse the repository at this point in the history
libsubprocess: pre-exec & post-fork hooks
  • Loading branch information
grondo committed May 17, 2019
2 parents 4a0dc35 + ef41fe7 commit 15ad6f6
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 55 deletions.
3 changes: 2 additions & 1 deletion src/broker/runlevel.c
Expand Up @@ -296,7 +296,8 @@ static int runlevel_start_subprocess (runlevel_t *r, int level)
if (!(p = flux_exec (r->h,
flags,
r->rc[level].cmd,
&ops)))
&ops,
NULL)))
goto error;

if (flux_subprocess_aux_set (p, "runlevel", r, NULL) < 0)
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/builtin/proxy.c
Expand Up @@ -861,7 +861,8 @@ static int child_create (proxy_ctx_t *ctx, int ac, char **av, const char *workpa
if (!(p = flux_local_exec (ctx->reactor,
FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH,
cmd,
&ops)))
&ops,
NULL)))
goto error;

if (flux_subprocess_aux_set (p, "ctx", ctx, NULL) < 0)
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/flux-start.c
Expand Up @@ -563,7 +563,8 @@ int client_run (struct client *cli)
if (!(cli->p = flux_local_exec (ctx.reactor,
FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH,
cli->cmd,
&ops)))
&ops,
NULL)))
log_err_exit ("flux_exec");
if (flux_subprocess_aux_set (cli->p, "cli", cli, NULL) < 0)
log_err_exit ("flux_subprocess_aux_set");
Expand Down
18 changes: 18 additions & 0 deletions src/common/libsubprocess/local.c
Expand Up @@ -516,6 +516,15 @@ static int local_child (flux_subprocess_t *p)
_exit (1);
}

if (p->hooks.pre_exec) {
/* always a chance caller may destroy subprocess in callback */
flux_subprocess_ref (p);
p->in_hook = true;
(*p->hooks.pre_exec) (p, p->hooks.pre_exec_arg);
p->in_hook = false;
flux_subprocess_unref (p);
}

if (p->flags & FLUX_SUBPROCESS_FLAGS_SETPGRP) {
if (setpgrp () < 0) {
flux_log_error (p->h, "setpgrp");
Expand Down Expand Up @@ -615,6 +624,15 @@ static int local_fork (flux_subprocess_t *p)

p->state = FLUX_SUBPROCESS_STARTED;

if (p->hooks.post_fork) {
/* always a chance caller may destroy subprocess in callback */
flux_subprocess_ref (p);
p->in_hook = true;
(*p->hooks.post_fork) (p, p->hooks.post_fork_arg);
p->in_hook = false;
flux_subprocess_unref (p);
}

return (0);
}

Expand Down
6 changes: 5 additions & 1 deletion src/common/libsubprocess/server.c
Expand Up @@ -361,7 +361,11 @@ static void server_exec_cb (flux_t *h, flux_msg_handler_t *mh,
goto error;
}

if (!(p = flux_exec (s->h, FLUX_SUBPROCESS_FLAGS_SETPGRP, cmd, &ops))) {
if (!(p = flux_exec (s->h,
FLUX_SUBPROCESS_FLAGS_SETPGRP,
cmd,
&ops,
NULL))) {
/* error here, generate FLUX_SUBPROCESS_EXEC_FAILED state */
if (flux_respond_pack (h, msg, "{s:s s:i s:i s:i}",
"type", "state",
Expand Down
34 changes: 21 additions & 13 deletions src/common/libsubprocess/subprocess.c
Expand Up @@ -146,6 +146,7 @@ static flux_subprocess_t * subprocess_create (flux_t *h,
int flags,
const flux_cmd_t *cmd,
const flux_subprocess_ops_t *ops,
const flux_subprocess_hooks_t *hooks,
int rank,
bool local)
{
Expand Down Expand Up @@ -179,6 +180,9 @@ static flux_subprocess_t * subprocess_create (flux_t *h,
if (ops)
p->ops = *ops;

if (hooks)
p->hooks = *hooks;

p->h = h;
p->reactor = r;
p->rank = rank;
Expand Down Expand Up @@ -555,7 +559,8 @@ static int subprocess_setup_completed (flux_subprocess_t *p)

static flux_subprocess_t * flux_exec_wrap (flux_t *h, flux_reactor_t *r, int flags,
const flux_cmd_t *cmd,
const flux_subprocess_ops_t *ops)
const flux_subprocess_ops_t *ops,
const flux_subprocess_hooks_t *hooks)
{
flux_subprocess_t *p = NULL;
int valid_flags = (FLUX_SUBPROCESS_FLAGS_STDIO_FALLTHROUGH
Expand All @@ -572,7 +577,7 @@ static flux_subprocess_t * flux_exec_wrap (flux_t *h, flux_reactor_t *r, int fla
return NULL;
}

if (!(p = subprocess_create (h, r, flags, cmd, ops, -1, true)))
if (!(p = subprocess_create (h, r, flags, cmd, ops, hooks, -1, true)))
goto error;

if (subprocess_local_setup (p) < 0)
Expand All @@ -597,7 +602,8 @@ static flux_subprocess_t * flux_exec_wrap (flux_t *h, flux_reactor_t *r, int fla

flux_subprocess_t * flux_exec (flux_t *h, int flags,
const flux_cmd_t *cmd,
const flux_subprocess_ops_t *ops)
const flux_subprocess_ops_t *ops,
const flux_subprocess_hooks_t *hooks)
{
flux_reactor_t *r;

Expand All @@ -609,14 +615,15 @@ flux_subprocess_t * flux_exec (flux_t *h, int flags,
if (!(r = flux_get_reactor (h)))
return NULL;

return flux_exec_wrap (h, r, flags, cmd, ops);
return flux_exec_wrap (h, r, flags, cmd, ops, hooks);
}

flux_subprocess_t * flux_local_exec (flux_reactor_t *r, int flags,
const flux_cmd_t *cmd,
const flux_subprocess_ops_t *ops)
const flux_subprocess_ops_t *ops,
const flux_subprocess_hooks_t *hooks)
{
return flux_exec_wrap (NULL, r, flags, cmd, ops);
return flux_exec_wrap (NULL, r, flags, cmd, ops, hooks);
}

flux_subprocess_t *flux_rexec (flux_t *h, int rank, int flags,
Expand Down Expand Up @@ -657,7 +664,7 @@ flux_subprocess_t *flux_rexec (flux_t *h, int rank, int flags,
if (!(r = flux_get_reactor (h)))
goto error;

if (!(p = subprocess_create (h, r, flags, cmd, ops, rank, false)))
if (!(p = subprocess_create (h, r, flags, cmd, ops, NULL, rank, false)))
goto error;

if (subprocess_remote_setup (p) < 0)
Expand Down Expand Up @@ -688,7 +695,7 @@ int flux_subprocess_write (flux_subprocess_t *p, const char *stream,
flux_buffer_t *fb;
int ret;

if (!p || p->magic != SUBPROCESS_MAGIC) {
if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) {
errno = EINVAL;
return -1;
}
Expand Down Expand Up @@ -748,7 +755,7 @@ int flux_subprocess_close (flux_subprocess_t *p, const char *stream)
{
struct subprocess_channel *c;

if (!p || p->magic != SUBPROCESS_MAGIC) {
if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) {
errno = EINVAL;
return -1;
}
Expand Down Expand Up @@ -800,7 +807,7 @@ static const char *subprocess_read (flux_subprocess_t *p,
flux_buffer_t *fb;
const char *ptr;

if (!p || p->magic != SUBPROCESS_MAGIC) {
if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) {
errno = EINVAL;
return NULL;
}
Expand Down Expand Up @@ -869,7 +876,8 @@ flux_future_t *flux_subprocess_kill (flux_subprocess_t *p, int signum)
{
flux_future_t *f = NULL;

if (!p || p->magic != SUBPROCESS_MAGIC || !signum) {
if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)
|| !signum) {
errno = EINVAL;
return NULL;
}
Expand Down Expand Up @@ -1060,7 +1068,7 @@ flux_reactor_t * flux_subprocess_get_reactor (flux_subprocess_t *p)
int flux_subprocess_aux_set (flux_subprocess_t *p,
const char *name, void *x, flux_free_f free_fn)
{
if (!p || p->magic != SUBPROCESS_MAGIC) {
if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) {
errno = EINVAL;
return -1;
}
Expand All @@ -1069,7 +1077,7 @@ int flux_subprocess_aux_set (flux_subprocess_t *p,

void * flux_subprocess_aux_get (flux_subprocess_t *p, const char *name)
{
if (!p || p->magic != SUBPROCESS_MAGIC) {
if (!p || p->magic != SUBPROCESS_MAGIC || (p->local && p->in_hook)) {
errno = EINVAL;
return NULL;
}
Expand Down
20 changes: 16 additions & 4 deletions src/common/libsubprocess/subprocess.h
Expand Up @@ -79,6 +79,7 @@ typedef void (*flux_subprocess_output_f) (flux_subprocess_t *p,
const char *stream);
typedef void (*flux_subprocess_state_f) (flux_subprocess_t *p,
flux_subprocess_state_t state);
typedef void (*flux_subprocess_hook_f) (flux_subprocess_t *p, void *arg);

/*
* Functions for event-driven subprocess handling:
Expand All @@ -96,6 +97,17 @@ typedef struct {
flux_subprocess_output_f on_stderr; /* Read of stderr is ready */
} flux_subprocess_ops_t;

/*
* flux_subprocess_hooks_t: Hook functions to execute at pre-defined
* points. Hooks can only be executed on local processes.
*/
typedef struct {
flux_subprocess_hook_f pre_exec;
void *pre_exec_arg;
flux_subprocess_hook_f post_fork;
void *post_fork_arg;
} flux_subprocess_hooks_t;

/*
* General support:
*/
Expand Down Expand Up @@ -222,8 +234,6 @@ int flux_cmd_add_channel (flux_cmd_t *cmd, const char *name);
int flux_cmd_setopt (flux_cmd_t *cmd, const char *var, const char *val);
const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var);



/*
* Subprocesses:
*/
Expand All @@ -246,11 +256,13 @@ const char *flux_cmd_getopt (flux_cmd_t *cmd, const char *var);
*/
flux_subprocess_t *flux_exec (flux_t *h, int flags,
const flux_cmd_t *cmd,
const flux_subprocess_ops_t *ops);
const flux_subprocess_ops_t *ops,
const flux_subprocess_hooks_t *hooks);

flux_subprocess_t *flux_local_exec (flux_reactor_t *r, int flags,
const flux_cmd_t *cmd,
const flux_subprocess_ops_t *ops);
const flux_subprocess_ops_t *ops,
const flux_subprocess_hooks_t *hooks);

flux_subprocess_t *flux_rexec (flux_t *h, int rank, int flags,
const flux_cmd_t *cmd,
Expand Down
2 changes: 2 additions & 0 deletions src/common/libsubprocess/subprocess_private.h
Expand Up @@ -97,7 +97,9 @@ struct flux_subprocess {

/* fds[0] is parent/user, fds[1] is child */
int sync_fds[2]; /* socketpair for fork/exec sync */
bool in_hook; /* if presently in a hook */
flux_watcher_t *child_w;
flux_subprocess_hooks_t hooks;

/* remote */

Expand Down

0 comments on commit 15ad6f6

Please sign in to comment.