Skip to content

Commit

Permalink
job-list: drop code for deferring job data updates
Browse files Browse the repository at this point in the history
Problem: code still exists for deferring job updates until
job-info lookups of R or jobspec have completed, but now that
those objects are included in the journal this cannot happen.

Drop unnecessary code.
  • Loading branch information
garlick committed Mar 30, 2024
1 parent 419d456 commit a8ab819
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 234 deletions.
8 changes: 0 additions & 8 deletions src/modules/job-list/job_data.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ void job_destroy (void *data)
json_decref (job->exception_context);
json_decref (job->jobspec_updates);
json_decref (job->R_updates);
zlist_destroy (&job->updates);
free (job);
errno = save_errno;
}
Expand All @@ -69,13 +68,6 @@ struct job *job_create (flux_t *h, flux_jobid_t id)
job->expiration = -1.0;
job->wait_status = -1;
job->result = FLUX_JOB_RESULT_FAILED;

if (!(job->updates = zlist_new ())) {
errno = ENOMEM;
job_destroy (job);
return NULL;
}

job->states_mask = FLUX_JOB_STATE_NEW;
job->states_events_mask = FLUX_JOB_STATE_NEW;
return job;
Expand Down
13 changes: 2 additions & 11 deletions src/modules/job-list/job_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,14 @@ struct job {
json_t *R;
json_t *exception_context;

/* All internal changes (most notably job state transitions) are
* placed on the updates list. We do not immediately update to
* the new state and place onto a new list until we have retrieved
* any necessary data associated to that state. For example, when
* the 'depend' state has been seen, we don't immediately place it
* on the `pending` list. We wait until we've retrieved data such
* as userid, urgency, etc.
*
* Track which states we have seen and have completed transition
/* Track which states we have seen and have completed transition
* to. States we've processed via the states_mask and states seen
* via events stream in states_events_mask.
*/
zlist_t *updates;
unsigned int states_mask;
unsigned int states_events_mask;
void *list_handle;
/* if updates in eventlog before jobspec / R read from KVS */
/* store updates that were received before jobspec/R objects */
json_t *jobspec_updates;
json_t *R_updates;

Expand Down
259 changes: 44 additions & 215 deletions src/modules/job-list/job_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,6 @@
#define STATE_TRANSITION_FLAG_REVERT 0x1
#define STATE_TRANSITION_FLAG_CONDITIONAL 0x2

typedef enum {
JOB_UPDATE_TYPE_STATE_TRANSITION,
JOB_UPDATE_TYPE_JOBSPEC_UPDATE,
JOB_UPDATE_TYPE_RESOURCE_UPDATE,
} job_update_type_t;

struct job_update {
job_update_type_t type;

/* state transitions */
flux_job_state_t state;
double timestamp;
int flags;
flux_job_state_t expected_state;

/* jobspec_update, resource_update */
json_t *update_context;

/* all updates */
bool processing; /* indicates we are waiting for
* current update to complete */
bool finished; /* indicates we are done, can remove
* from list */
};

static int submit_context_parse (flux_t *h,
struct job *job,
json_t *context);
Expand All @@ -94,8 +69,6 @@ static int memo_update (flux_t *h,
struct job *job,
json_t *context);

static void process_updates (struct job_state_ctx *jsctx, struct job *job);

static int journal_process_events (struct job_state_ctx *jsctx,
const flux_msg_t *msg);

Expand Down Expand Up @@ -291,154 +264,56 @@ static void eventlog_inactive_complete (struct job *job)
}
}

static void job_update_destroy (void *data)
{
struct job_update *updt = data;
if (updt) {
int saved_errno = errno;
json_decref (updt->update_context);
free (updt);
errno = saved_errno;
}
}

static struct job_update *job_update_create (job_update_type_t type)
{
struct job_update *updt = NULL;

if (!(updt = calloc (1, sizeof (*updt))))
return NULL;
updt->type = type;
updt->processing = false;
updt->finished = false;
return updt;
}

static int append_update (struct job *job, struct job_update *updt)
{
if (zlist_append (job->updates, updt) < 0) {
errno = ENOMEM;
return -1;
}
zlist_freefn (job->updates, updt, job_update_destroy, true);
return 0;
}

static int add_state_transition (struct job *job,
flux_job_state_t newstate,
double timestamp,
int flags,
flux_job_state_t expected_state)
{
struct job_update *updt = NULL;

if (!((flags & STATE_TRANSITION_FLAG_REVERT)
|| (flags & STATE_TRANSITION_FLAG_CONDITIONAL))
&& (newstate & job->states_events_mask))
return 0;

if (!(updt = job_update_create (JOB_UPDATE_TYPE_STATE_TRANSITION)))
return -1;

updt->state = newstate;
updt->timestamp = timestamp;
updt->flags = flags;
updt->expected_state = expected_state;

if (append_update (job, updt) < 0)
goto cleanup;

job->states_events_mask |= newstate;
return 0;

cleanup:
job_update_destroy (updt);
return -1;
}

static int add_update (struct job *job, json_t *context, job_update_type_t type)
{
struct job_update *updt = NULL;

if (!(updt = job_update_create (type)))
return -1;

updt->update_context = json_incref (context);

if (append_update (job, updt) < 0)
goto cleanup;

return 0;

cleanup:
job_update_destroy (updt);
return -1;
}

static int add_jobspec_update (struct job *job, json_t *context)
{
return add_update (job, context, JOB_UPDATE_TYPE_JOBSPEC_UPDATE);
}

static int add_resource_update (struct job *job, json_t *context)
{
return add_update (job, context, JOB_UPDATE_TYPE_RESOURCE_UPDATE);
}

static void process_state_transition_update (struct job_state_ctx *jsctx,
struct job *job,
struct job_update *updt)
flux_job_state_t state,
double timestamp,
int flags,
flux_job_state_t expected_state)
{
if ((updt->flags & STATE_TRANSITION_FLAG_REVERT)) {
if ((flags & STATE_TRANSITION_FLAG_REVERT)) {
/* only revert if the current state is what is expected */
if (job->state == updt->expected_state) {
if (job->state == expected_state) {
job->states_mask &= ~job->state;
job->states_mask &= ~updt->state;
update_job_state_and_list (jsctx, job, updt->state, updt->timestamp);
job->states_mask &= ~state;
update_job_state_and_list (jsctx, job, state, timestamp);
}
else {
updt->finished = true;
else
return;
}
}
else if ((updt->flags & STATE_TRANSITION_FLAG_CONDITIONAL)) {
else if ((flags & STATE_TRANSITION_FLAG_CONDITIONAL)) {
/* if current state isn't what we expected, move on */
if (job->state != updt->expected_state) {
updt->finished = true;
if (job->state != expected_state)
return;
}
}
if (updt->state == FLUX_JOB_STATE_DEPEND) {
if (state == FLUX_JOB_STATE_DEPEND) {
// process job->jobspec which was obtained from journal
if (job_parse_jobspec_cached (job, job->jobspec_updates) < 0) {
flux_log_error (jsctx->h,
"%s: error parsing jobspec",
idf58 (job->id));
}
update_job_state_and_list (jsctx, job, updt->state, updt->timestamp);
updt->finished = true;
update_job_state_and_list (jsctx, job, state, timestamp);
}
else if (updt->state == FLUX_JOB_STATE_RUN) {
else if (state == FLUX_JOB_STATE_RUN) {
// process job->R which was obtained from journal
if (job_parse_R_cached (job, NULL) < 0) {
flux_log_error (jsctx->h,
"%s: error parsing R",
idf58 (job->id));
}
update_job_state_and_list (jsctx, job, updt->state, updt->timestamp);
updt->finished = true;
update_job_state_and_list (jsctx, job, state, timestamp);
}
else {
/* FLUX_JOB_STATE_PRIORITY */
/* FLUX_JOB_STATE_SCHED */
/* FLUX_JOB_STATE_CLEANUP */
/* FLUX_JOB_STATE_INACTIVE */

if (updt->state == FLUX_JOB_STATE_INACTIVE)
if (state == FLUX_JOB_STATE_INACTIVE)
eventlog_inactive_complete (job);

update_job_state_and_list (jsctx, job, updt->state, updt->timestamp);
updt->finished = true;
update_job_state_and_list (jsctx, job, state, timestamp);
}
}

Expand Down Expand Up @@ -474,23 +349,6 @@ static void update_jobspec (struct job_state_ctx *jsctx,
job_stats_add_queue (jsctx->statsctx, job);
}

static void process_jobspec_update (struct job_state_ctx *jsctx,
struct job *job,
struct job_update *updt)
{
/* Generally speaking, after a job is running, jobspec-update
* events should have no effect. Note that in some cases,
* such as job duration, jobspec-updates can alter a job's
* behavior, but it is via an update to R. In this case, we
* elect to not update the job duration seen by the user in
* the jobspec. The effect will be seen changes in R (in this
* example, via the job expiration time in R).
*/
if (job->state < FLUX_JOB_STATE_RUN)
update_jobspec (jsctx, job, updt->update_context, true);
updt->finished = true;
}

static void update_resource (struct job_state_ctx *jsctx,
struct job *job,
json_t *context)
Expand All @@ -513,40 +371,6 @@ static void update_resource (struct job_state_ctx *jsctx,
job_R_update (job, context);
}

static void process_resource_update (struct job_state_ctx *jsctx,
struct job *job,
struct job_update *updt)
{
/* Generally speaking, resource-update events only have an effect
* when a job is running. */
if (job->state == FLUX_JOB_STATE_RUN)
update_resource (jsctx, job, updt->update_context);
updt->finished = true;
}

static void process_updates (struct job_state_ctx *jsctx, struct job *job)
{
struct job_update *updt;

while ((updt = zlist_head (job->updates))
&& (!updt->processing || updt->finished)) {

if (updt->finished)
goto next;

if (updt->type == JOB_UPDATE_TYPE_STATE_TRANSITION)
process_state_transition_update (jsctx, job, updt);
else if (updt->type == JOB_UPDATE_TYPE_JOBSPEC_UPDATE)
process_jobspec_update (jsctx, job, updt);
else /* updt->type == JOB_UPDATE_TYPE_RESOURCE_UPDATE */
process_resource_update (jsctx, job, updt);

next:
if (updt->finished)
zlist_remove (job->updates, updt);
}
}

void job_state_pause_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -602,16 +426,19 @@ static int job_transition_state (struct job_state_ctx *jsctx,
int flags,
flux_job_state_t expected_state)
{
if (add_state_transition (job,
newstate,
timestamp,
flags,
expected_state) < 0) {
flux_log_error (jsctx->h, "%s: add_state_transition",
__FUNCTION__);
return -1;
}
process_updates (jsctx, job);
if (!((flags & STATE_TRANSITION_FLAG_REVERT)
|| (flags & STATE_TRANSITION_FLAG_CONDITIONAL))
&& (newstate & job->states_events_mask))
return 0;

job->states_events_mask |= newstate;

process_state_transition_update (jsctx,
job,
newstate,
timestamp,
flags,
expected_state);
return 0;
}

Expand Down Expand Up @@ -979,12 +806,16 @@ static int journal_jobspec_update_event (struct job_state_ctx *jsctx,
errno = EPROTO;
return -1;
}

if (add_jobspec_update (job, context) < 0) {
flux_log_error (jsctx->h, "%s: add_jobspec_update", __FUNCTION__);
return -1;
}
process_updates (jsctx, job);
/* Generally speaking, after a job is running, jobspec-update
* events should have no effect. Note that in some cases,
* such as job duration, jobspec-updates can alter a job's
* behavior, but it is via an update to R. In this case, we
* elect to not update the job duration seen by the user in
* the jobspec. The effect will be seen changes in R (in this
* example, via the job expiration time in R).
*/
if (job->state < FLUX_JOB_STATE_RUN)
update_jobspec (jsctx, job, context, true);
return 0;
}

Expand All @@ -999,12 +830,10 @@ static int journal_resource_update_event (struct job_state_ctx *jsctx,
errno = EPROTO;
return -1;
}

if (add_resource_update (job, context) < 0) {
flux_log_error (jsctx->h, "%s: add_resource_update", __FUNCTION__);
return -1;
}
process_updates (jsctx, job);
/* Generally speaking, resource-update events only have an effect
* when a job is running. */
if (job->state == FLUX_JOB_STATE_RUN)
update_resource (jsctx, job, context);
return 0;
}

Expand Down

0 comments on commit a8ab819

Please sign in to comment.