Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job-manager: make some replay errors non-fatal #5150

Merged
merged 3 commits into from May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/modules/job-list/job_state.c
Expand Up @@ -673,7 +673,7 @@
flux_future_t *f3 = NULL;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit message "treads" -> "treats"

const char *eventlog, *jobspec, *R;
char path[64];
int rc = -1;
int rc = 0; // non-fatal error

if (strlen (key) <= dirskip) {
errno = EINVAL;
Expand Down Expand Up @@ -714,7 +714,7 @@
if (job->states_mask & FLUX_JOB_STATE_RUN) {
if (flux_job_kvs_key (path, sizeof (path), id, "R") < 0) {
errno = EINVAL;
return -1;
goto done;

Check warning on line 717 in src/modules/job-list/job_state.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-list/job_state.c#L717

Added line #L717 was not covered by tests
}
if (!(f3 = flux_kvs_lookup (jsctx->h, NULL, 0, path)))
goto done;
Expand All @@ -736,7 +736,7 @@

rc = 1;
done:
if (rc < 0)
if (rc == 0)
job_destroy (job);
flux_future_destroy (f1);
flux_future_destroy (f2);
Expand Down
34 changes: 30 additions & 4 deletions src/modules/job-manager/restart.c
Expand Up @@ -46,7 +46,10 @@
return count;
}

static struct job *lookup_job (flux_t *h, flux_jobid_t id, flux_error_t *error)
static struct job *lookup_job (flux_t *h,
flux_jobid_t id,
flux_error_t *error,
bool *fatal)
{
flux_future_t *f1 = NULL;
flux_future_t *f2 = NULL;
Expand All @@ -63,24 +66,33 @@
"cannot send lookup requests for job %ju: %s",
(uintmax_t)id,
strerror (errno));
*fatal = true;

Check warning on line 69 in src/modules/job-manager/restart.c

View check run for this annotation

Codecov / codecov/patch

src/modules/job-manager/restart.c#L69

Added line #L69 was not covered by tests
goto done;
}
if (flux_kvs_lookup_get (f1, &eventlog) < 0) {
errprintf (error, "lookup %s: %s", k1, strerror (errno));
*fatal = false;
goto done;
}
if (flux_kvs_lookup_get (f2, &jobspec) < 0) {
errprintf (error, "lookup %s: %s", k2, strerror (errno));
*fatal = false;
goto done;
}
if (!(job = job_create_from_eventlog (id, eventlog, jobspec, &e)))
if (!(job = job_create_from_eventlog (id, eventlog, jobspec, &e))) {
errprintf (error, "replay %s: %s", k1, e.text);
*fatal = true;
}
done:
flux_future_destroy (f1);
flux_future_destroy (f2);
return job;
}

/* Create a 'struct job' from the KVS, using synchronous KVS RPCs.
* Return 1 on success, 0 on non-fatal error, or -1 on a fatal error,
* where a fatal error will prevent flux from starting.
*/
static int depthfirst_map_one (flux_t *h,
const char *key,
int dirskip,
Expand All @@ -101,8 +113,22 @@
errprintf (error, "could not decode %s to job ID", key + dirskip + 1);
return -1;
}
if (!(job = lookup_job (h, id, error)))
return -1;

flux_error_t lookup_error;
bool fatal = false;
if (!(job = lookup_job (h, id, &lookup_error, &fatal))) {
if (fatal) {
errprintf (error, "%s", lookup_error.text);
return -1;
}
flux_log (h,
LOG_ERR,
"job %ju not replayed: %s",
(uintmax_t)id,
lookup_error.text);
return 0;
}

if (cb (job, arg, error) < 0)
goto done;
rc = 1;
Expand Down
15 changes: 15 additions & 0 deletions t/t2219-job-manager-restart.t
Expand Up @@ -19,6 +19,16 @@ restart_flux() {
flux module stats job-manager
}

# Returns 0 if dump file is replayed successfully AND one or more
# "not replayed" warnings were logged
restart_with_job_warning() {
local out=$(basename $1).dmesg
flux start -o,-Scontent.restore=$1 /bin/true 2>$out
result=$?
cat $out
test $result -eq 0 && grep -q "not replayed:" $out
}

test_expect_success 'verify that job manager can restart with current dump' '
restart_flux dump.tar >stats.out
'
Expand Down Expand Up @@ -193,6 +203,11 @@ for dump in ${DUMPS}/valid/*.tar.bz2; do
test_expect_success 'successfully started from '$testname "restart_flux $dump"
done

for dump in ${DUMPS}/warn/*.tar.bz2; do
testname=$(basename $dump)
test_expect_success 'successfully started with warning from '$testname "restart_with_job_warning $dump"
done

for dump in ${DUMPS}/invalid/*.tar.bz2; do
testname=$(basename $dump)
test_expect_success 'failed on '$testname "test_must_fail restart_flux $dump"
Expand Down