Skip to content

Commit

Permalink
Merge pull request #5911 from grondo/issue#5896
Browse files Browse the repository at this point in the history
notify user of stopped queue in `flux job attach`
  • Loading branch information
mergify[bot] committed Apr 25, 2024
2 parents 359e64b + af43f36 commit 9534cc9
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
89 changes: 89 additions & 0 deletions src/cmd/job/attach.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ struct attach_ctx {
bool statusline;
char *last_event;
bool fatal_exception;
int last_queue_update;
char *queue;
bool queue_stopped;
};

void attach_completed_check (struct attach_ctx *ctx)
Expand Down Expand Up @@ -914,6 +917,61 @@ static struct job_event_notifications attach_notifications[] = {
{ NULL, NULL, 0},
};

/* Continuation for the "job-manager.queue-status" RPC.
 *
 * f   - the RPC future; always destroyed before returning.
 * arg - the struct attach_ctx registered with flux_future_then().
 *
 * On a successfully decoded response, ctx->queue_stopped is set to the
 * inverse of the boolean "start" key. If the response cannot be
 * decoded, ctx->queue_stopped is left unchanged.
 */
static void queue_status_cb (flux_future_t *f, void *arg)
{
    struct attach_ctx *ctx = arg;
    int started;
    int rc;

    rc = flux_rpc_get_unpack (f, "{s:b}", "start", &started);
    if (rc == 0) {
        /* "start" false means the queue is stopped */
        ctx->queue_stopped = (started == 0);
    }
    flux_future_destroy (f);
}

/* Send an asynchronous "job-manager.queue-status" request for the
 * queue recorded in ctx->queue, with queue_status_cb() as the
 * continuation.
 *
 * Does nothing if ctx->queue has not been set yet. When the queue name
 * is "default" an empty request object is sent; otherwise the queue
 * name is sent under the "name" key.
 *
 * Failures are not reported to the caller: if the RPC cannot be
 * created nothing happens, and if the continuation cannot be
 * registered the future is destroyed.
 */
static void fetch_queue_status (struct attach_ctx *ctx)
{
    const char *topic = "job-manager.queue-status";
    flux_future_t *f;

    /* Queue name not known yet, nothing to ask about */
    if (ctx->queue == NULL)
        return;

    if (!streq (ctx->queue, "default")) {
        f = flux_rpc_pack (ctx->h,
                           topic,
                           0,
                           0,
                           "{s:s?}",
                           "name", ctx->queue);
    }
    else {
        f = flux_rpc (ctx->h, topic, "{}", 0, 0);
    }

    if (f == NULL)
        return;
    if (flux_future_then (f, -1., queue_status_cb, ctx) < 0)
        flux_future_destroy (f);
}

/* Continuation for the "job-list.list-id" RPC issued by
 * fetch_job_queue().
 *
 * f   - the RPC future; always destroyed before returning.
 * arg - the struct attach_ctx registered with flux_future_then().
 *
 * On a successfully decoded response, a copy of the job's "queue"
 * attribute is stored in ctx->queue. The attribute is optional in the
 * response; when it is absent the name "default" is stored instead.
 *
 * More than one list-id request may be outstanding at once, since the
 * request is re-issued while ctx->queue is still unset. Any previously
 * stored name is therefore freed when it is replaced, so repeated
 * responses do not leak. If the copy cannot be allocated, ctx->queue
 * is left as it was.
 */
static void job_queue_cb (flux_future_t *f, void *arg)
{
    struct attach_ctx *ctx = arg;
    const char *queue = "default";

    if (flux_rpc_get_unpack (f, "{s:{s?s}}", "job", "queue", &queue) == 0) {
        /* queue points into the future's storage (or at the literal
         * default), so copy it before the future is destroyed.
         */
        char *copy = strdup (queue);
        if (copy) {
            free (ctx->queue);
            ctx->queue = copy;
        }
    }
    flux_future_destroy (f);
}

/* Send an asynchronous "job-list.list-id" request for job ctx->id,
 * asking only for the "queue" attribute, with job_queue_cb() as the
 * continuation.
 *
 * Failures are not reported to the caller: if the RPC cannot be
 * created or the continuation cannot be registered, the future is
 * destroyed and the function returns.
 */
static void fetch_job_queue (struct attach_ctx *ctx)
{
    flux_future_t *f;

    f = flux_rpc_pack (ctx->h,
                       "job-list.list-id",
                       FLUX_NODEID_ANY,
                       0,
                       "{s:I s:[s]}",
                       "id", ctx->id,
                       "attrs", "queue");

    /* Success: the continuation now owns the future */
    if (f && flux_future_then (f, -1., job_queue_cb, ctx) >= 0)
        return;

    flux_future_destroy (f);
}

static const char *job_event_notify_string (const char *name)
{
struct job_event_notifications *t = attach_notifications;
Expand All @@ -940,7 +998,9 @@ static void attach_notify (struct attach_ctx *ctx,
const char *event_name,
double ts)
{
char buf[64];
const char *msg;

if (!event_name)
return;
if (ctx->statusline
Expand All @@ -950,6 +1010,34 @@ static void attach_notify (struct attach_ctx *ctx,
int width = 80;
struct winsize w;

if (streq (msg, "waiting for resources")) {
/* Fetch job queue if not already available so queue status
* can be checked in case allocations are stopped:
*/
if (!ctx->queue)
fetch_job_queue (ctx);
else {
/* Check queue status, only check again every ~10s
*/
if (ctx->last_queue_update <= 0
|| (dt - ctx->last_queue_update >= 10)) {
ctx->last_queue_update = dt;
fetch_queue_status (ctx);
}
}

/* Amend status if queue is stopped:
*/
if (ctx->queue_stopped) {
if (snprintf (buf,
sizeof (buf),
"%s (%s queue stopped)",
msg,
ctx->queue) < sizeof (buf))
msg = buf;
}
}

/* Adjust width of status so timer is right justified:
*/
if (ioctl(0, TIOCGWINSZ, &w) == 0)
Expand Down Expand Up @@ -1245,6 +1333,7 @@ int cmd_attach (optparse_t *p, int argc, char **argv)
free (ctx.service);
free (totalview_jobid);
free (ctx.last_event);
free (ctx.queue);
free (ctx.stdin_ranks);

if (ctx.fatal_exception && ctx.exit_code == 0)
Expand Down
25 changes: 25 additions & 0 deletions t/t2500-job-attach.t
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,31 @@ test_expect_success 'attach: --show-status properly accounts prolog-start events
flux job wait-event $jobid2 clean &&
flux jobtap remove perilog-test.so
'
# Check that the status line of `flux job attach --show-status` mentions
# a stopped queue. The queue is stopped and a job is submitted, then
# attach runs in the background under $runpty with its output recorded
# in stopped-queue.out. waitfile.lua waits (timeout 15, presumably
# seconds) for "default queue stopped" to appear in that file. The queue
# is then started so the job can run, and the background attach is
# reaped with `wait`. test_when_finished also restarts the queue if the
# test fails early.
test_expect_success NO_CHAIN_LINT 'attach: --show-status notes stopped queue' '
flux queue stop &&
test_when_finished "flux queue start" &&
jobid=$(flux submit hostname) &&
$runpty -f asciicast -o stopped-queue.out \
flux job attach --show-status $jobid &
waitfile.lua -v -t 15 -p "default queue stopped" stopped-queue.out &&
flux queue start &&
wait
'
# Same check as the stopped-queue test, but for a named queue. A config
# defining queues "batch" and "debug" is loaded, all queues are stopped,
# and a job is submitted to "batch". Attach runs in the background under
# $runpty with output recorded in stopped-batch.out, and waitfile.lua
# waits (timeout 15, presumably seconds) for "batch queue stopped".
# All queues are then started, the background attach is reaped, and an
# empty config is loaded again.
# NOTE(review): unlike the previous test, no test_when_finished cleanup
# is registered here, so a failure before `flux queue start --all` looks
# like it would leave the queues stopped and the queue config loaded for
# later tests -- confirm whether that is intended.
test_expect_success NO_CHAIN_LINT 'attach: --show-status notes stopped named queue' '
flux config load <<-EOF &&
[queues.batch]
[queues.debug]
EOF
flux queue stop --verbose --all &&
jobid=$(flux submit -qbatch hostname) &&
$runpty -f asciicast -o stopped-batch.out \
flux job attach --show-status $jobid &
waitfile.lua -v -t 15 -p "batch queue stopped" stopped-batch.out &&
flux queue start --all &&
wait &&
flux config load </dev/null &&
flux queue status
'
# Attach to the job whose id is stored in the file jobid1 (written
# outside this excerpt) and require "foo" in the attach output.
# The attach is wrapped in run_timeout 5.
test_expect_success 'attach: shows output from job' '
run_timeout 5 flux job attach $(cat jobid1) | grep foo
'
Expand Down

0 comments on commit 9534cc9

Please sign in to comment.