Skip to content

Commit

Permalink
Merge pull request #5813 from grondo/issue#5811
Browse files Browse the repository at this point in the history
job-exec: improve cleanup after lost shell events
  • Loading branch information
mergify[bot] committed Mar 21, 2024
2 parents 1f25e05 + 8b0ef6f commit 9c32493
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 81 deletions.
9 changes: 6 additions & 3 deletions src/modules/job-exec/bulk-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ static int exec_exit_notify (struct bulk_exec *exec)
return 0;
}

static void exit_batch_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void exit_batch_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct bulk_exec *exec = arg;
exec_exit_notify (exec);
Expand Down Expand Up @@ -274,7 +276,8 @@ static void subprocess_destroy_finish (flux_future_t *f, void *arg)
flux_subprocess_t *p = arg;
if (flux_future_get (f, NULL) < 0) {
flux_t *h = flux_subprocess_aux_get (p, "flux_t");
flux_log_error (h, "subprocess_kill: %ju: %s",
flux_log_error (h,
"subprocess_kill: %ju: %s",
(uintmax_t) flux_subprocess_pid (p),
future_strerror (f, errno));
}
Expand Down
68 changes: 44 additions & 24 deletions src/modules/job-exec/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ static int exec_barrier_enter (struct bulk_exec *exec)
return 0;
}

static void output_cb (struct bulk_exec *exec, flux_subprocess_t *p,
static void output_cb (struct bulk_exec *exec,
flux_subprocess_t *p,
const char *stream,
const char *data,
int len,
Expand Down Expand Up @@ -163,7 +164,7 @@ static void lost_shell_continuation (flux_future_t *f, void *arg)
}

static int lost_shell (struct jobinfo *job,
bool raise_exception,
bool critical,
int shell_rank,
const char *fmt,
...)
Expand All @@ -173,6 +174,7 @@ static int lost_shell (struct jobinfo *job,
int msglen = sizeof (msgbuf);
char *msg = msgbuf;
va_list ap;
int severity = critical ? 0 : FLUX_JOB_EXCEPTION_CRIT;

if (fmt) {
va_start (ap, fmt);
Expand All @@ -181,8 +183,11 @@ static int lost_shell (struct jobinfo *job,
va_end (ap);
}

if (raise_exception) {
/* Raise a non-fatal job exception */
if (!critical) {
/* Raise a non-fatal job exception if the lost shell was not critical.
* The job exec service will raise a fatal exception later for
* critical shells.
*/
jobinfo_raise (job,
"node-failure",
FLUX_JOB_EXCEPTION_CRIT,
Expand All @@ -201,7 +206,7 @@ static int lost_shell (struct jobinfo *job,
"exception",
"{s:s s:i s:i s:s}",
"type", "lost-shell",
"severity", FLUX_JOB_EXCEPTION_CRIT,
"severity", severity,
"shell_rank", shell_rank,
"message", msg)))
return -1;
Expand All @@ -212,6 +217,11 @@ static int lost_shell (struct jobinfo *job,
return 0;
}

static bool is_critical_rank (struct jobinfo *job, int shell_rank)
{
return idset_test (job->critical_ranks, shell_rank);
}

static void error_cb (struct bulk_exec *exec, flux_subprocess_t *p, void *arg)
{
struct jobinfo *job = arg;
Expand All @@ -226,21 +236,28 @@ static void error_cb (struct bulk_exec *exec, flux_subprocess_t *p, void *arg)
*/
if (cmd) {
if (errnum == EHOSTUNREACH) {
if (!idset_test (job->critical_ranks, shell_rank)
&& lost_shell (job,
true,
shell_rank,
"%s on %s (shell rank %d)",
"lost contact with job shell",
hostname,
shell_rank) == 0)
return;
jobinfo_fatal_error (job,
0,
"%s on broker %s (rank %d)",
"lost contact with job shell",
hostname,
rank);
bool critical = is_critical_rank (job, shell_rank);

/* Always notify rank 0 shell of a lost shell.
*/
lost_shell (job,
critical,
shell_rank,
"%s on %s (shell rank %d)",
"lost contact with job shell",
hostname,
shell_rank);

/* Raise a fatal error and terminate job immediately if
* the lost shell was critical.
*/
if (critical)
jobinfo_fatal_error (job,
0,
"%s on broker %s (rank %d)",
"lost contact with job shell",
hostname,
rank);
}
else if (errnum == ENOSYS) {
jobinfo_fatal_error (job,
Expand Down Expand Up @@ -310,7 +327,8 @@ static void exit_cb (struct bulk_exec *exec,
if (ctx->barrier_completion_count == 0
|| ctx->barrier_enter_count > 0) {
if (bulk_exec_write (exec, "stdin", "exit=1\n", 7) < 0)
jobinfo_fatal_error (job, 0,
jobinfo_fatal_error (job,
0,
"failed to terminate barrier: %s",
strerror (errno));
}
Expand All @@ -328,7 +346,7 @@ static void exit_cb (struct bulk_exec *exec,
if (p && signo > 0) {
if (shell_rank != 0)
lost_shell (job,
false,
is_critical_rank (job, shell_rank),
shell_rank,
"shell rank %d (on %s): %s",
shell_rank,
Expand Down Expand Up @@ -487,8 +505,10 @@ static int exec_init (struct jobinfo *job)
return -1;
}

static void exec_check_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void exec_check_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct jobinfo *job = arg;
struct bulk_exec *exec = job->data;
Expand Down
17 changes: 11 additions & 6 deletions src/modules/job-exec/exec_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ static const char *jobspec_get_job_shell (json_t *jobspec)
{
const char *path = NULL;
if (jobspec)
(void) json_unpack (jobspec, "{s:{s:{s:{s:s}}}}",
"attributes", "system", "exec",
"job_shell", &path);
(void) json_unpack (jobspec,
"{s:{s:{s:{s:s}}}}",
"attributes",
"system",
"exec",
"job_shell", &path);
return path;
}

Expand All @@ -53,9 +56,11 @@ static const char *jobspec_get_cwd (json_t *jobspec)
{
const char *cwd = NULL;
if (jobspec)
(void) json_unpack (jobspec, "{s:{s:{s:s}}}",
"attributes", "system",
"cwd", &cwd);
(void) json_unpack (jobspec,
"{s:{s:{s:s}}}",
"attributes",
"system",
"cwd", &cwd);
return cwd;
}

Expand Down
102 changes: 65 additions & 37 deletions src/modules/job-exec/job-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ static int jobid_exception (flux_t *h, flux_jobid_t id,
"note", note);
}

static int jobinfo_respond_error (struct jobinfo *job, int errnum,
static int jobinfo_respond_error (struct jobinfo *job,
int errnum,
const char *msg)
{
return jobid_exception (job->ctx->h,
Expand All @@ -319,21 +320,27 @@ static int jobinfo_send_release (struct jobinfo *job,
int rc;
flux_t *h = job->ctx->h;
// XXX: idset ignored for now. Always release all resources
rc = flux_respond_pack (h, job->req, "{s:I s:s s{s:s s:b}}",
"id", job->id,
"type", "release",
"data", "ranks", "all",
"final", true);
rc = flux_respond_pack (h,
job->req,
"{s:I s:s s{s:s s:b}}",
"id", job->id,
"type", "release",
"data",
"ranks", "all",
"final", true);
return rc;
}

static int jobinfo_respond (flux_t *h, struct jobinfo *job,
static int jobinfo_respond (flux_t *h,
struct jobinfo *job,
const char *event)
{
return flux_respond_pack (h, job->req, "{s:I s:s s:{}}",
"id", job->id,
"type", event,
"data");
return flux_respond_pack (h,
job->req,
"{s:I s:s s:{}}",
"id", job->id,
"type", event,
"data");
}

static void jobinfo_complete (struct jobinfo *job, const struct idset *ranks)
Expand All @@ -348,11 +355,13 @@ static void jobinfo_complete (struct jobinfo *job, const struct idset *ranks)
jobinfo_emit_event_pack_nowait (job, "complete",
"{ s:i }",
"status", job->wait_status);
if (flux_respond_pack (h, job->req, "{s:I s:s s:{s:i}}",
"id", job->id,
"type", "finish",
"data",
"status", job->wait_status) < 0)
if (flux_respond_pack (h,
job->req,
"{s:I s:s s:{s:i}}",
"id", job->id,
"type", "finish",
"data",
"status", job->wait_status) < 0)
flux_log_error (h, "jobinfo_complete: flux_respond");
}
}
Expand All @@ -370,8 +379,10 @@ static void kill_shell_timer_cb (flux_reactor_t *r,
(*job->impl->kill) (job, SIGKILL);
}

static void kill_timer_cb (flux_reactor_t *r, flux_watcher_t *w,
int revents, void *arg)
static void kill_timer_cb (flux_reactor_t *r,
flux_watcher_t *w,
int revents,
void *arg)
{
struct jobinfo *job = arg;
flux_future_t *f;
Expand Down Expand Up @@ -424,7 +435,12 @@ static void timelimit_cb (flux_reactor_t *r,
/* Timelimit reached. Generate "timeout" exception and send SIGALRM.
* Wait for a gracetime then forcibly terminate job.
*/
if (jobid_exception (job->h, job->id, job->req, "timeout", 0, 0,
if (jobid_exception (job->h,
job->id,
job->req,
"timeout",
0,
0,
"resource allocation expired") < 0)
flux_log_error (job->h,
"failed to generate timeout exception for %s",
Expand Down Expand Up @@ -540,8 +556,10 @@ static void jobinfo_cancel (struct jobinfo *job)

static int jobinfo_finalize (struct jobinfo *job);

static void jobinfo_fatal_verror (struct jobinfo *job, int errnum,
const char *fmt, va_list ap)
static void jobinfo_fatal_verror (struct jobinfo *job,
int errnum,
const char *fmt,
va_list ap)
{
int n;
char msg [256];
Expand Down Expand Up @@ -575,7 +593,8 @@ static void jobinfo_fatal_verror (struct jobinfo *job, int errnum,
}
}

void jobinfo_fatal_error (struct jobinfo *job, int errnum,
void jobinfo_fatal_error (struct jobinfo *job,
int errnum,
const char *fmt, ...)
{
flux_t *h = job->ctx->h;
Expand Down Expand Up @@ -1077,12 +1096,14 @@ static int job_start (struct job_exec_ctx *ctx, const flux_msg_t *msg)

job->ctx = ctx;

if (flux_request_unpack (job->req, NULL, "{s:I s:i s:O s:b s:o}",
"id", &job->id,
"userid", &job->userid,
"jobspec", &job->jobspec,
"reattach", &job->reattach,
"R", &R) < 0) {
if (flux_request_unpack (job->req,
NULL,
"{s:I s:i s:O s:b s:o}",
"id", &job->id,
"userid", &job->userid,
"jobspec", &job->jobspec,
"reattach", &job->reattach,
"R", &R) < 0) {
flux_log_error (ctx->h, "start: flux_request_unpack");
jobinfo_decref (job);
return -1;
Expand Down Expand Up @@ -1143,8 +1164,10 @@ static int job_start (struct job_exec_ctx *ctx, const flux_msg_t *msg)
return -1;
}

static void start_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void start_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct job_exec_ctx *ctx = arg;

Expand All @@ -1158,19 +1181,23 @@ static void start_cb (flux_t *h, flux_msg_handler_t *mh,
}
}

static void exception_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
static void exception_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct job_exec_ctx *ctx = arg;
flux_jobid_t id;
int severity = 0;
const char *type = NULL;
struct jobinfo *job = NULL;

if (flux_event_unpack (msg, NULL, "{s:I s:s s:i}",
"id", &id,
"type", &type,
"severity", &severity) < 0) {
if (flux_event_unpack (msg,
NULL,
"{s:I s:s s:i}",
"id", &id,
"type", &type,
"severity", &severity) < 0) {
flux_log_error (h, "job-exception event");
return;
}
Expand Down Expand Up @@ -1204,7 +1231,8 @@ static void critical_ranks_cb (flux_t *h,
struct idset *idset;
struct jobinfo *job;

if (flux_request_unpack (msg, NULL,
if (flux_request_unpack (msg,
NULL,
"{s:I s:s}",
"id", &id,
"ranks", &ranks) < 0)
Expand Down
8 changes: 5 additions & 3 deletions src/modules/job-exec/rset.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,13 @@ static int rset_read_time_window (struct resource_set *r, json_error_t *errp)
*/
r->expiration = 0.;
r->starttime = 0.;
if (json_unpack_ex (r->R, errp, 0,
if (json_unpack_ex (r->R,
errp,
0,
"{s:{s?F s?F}}",
"execution",
"starttime", &r->starttime,
"expiration", &r->expiration) < 0)
"starttime", &r->starttime,
"expiration", &r->expiration) < 0)
return -1;
return 0;
}
Expand Down

0 comments on commit 9c32493

Please sign in to comment.