Skip to content

Commit

Permalink
Merge pull request #5635 from chu11/job_info_update_jobspec_remove_ma…
Browse files Browse the repository at this point in the history
…nual

job-info: support lookup of updated jobspec, remove manual construction of updated R / jobspec
  • Loading branch information
mergify[bot] committed Feb 21, 2024
2 parents 8190b78 + d5b38ff commit 45ad784
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 129 deletions.
88 changes: 24 additions & 64 deletions src/bindings/python/flux/job/kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
import errno
import json

import flux.constants
from _flux._core import ffi, lib
from flux.future import WaitAllFuture
from flux.job import JobID, JobspecV1
from flux.job.event import EventLogEvent
from flux.job import JobID
from flux.rpc import RPC
from flux.util import set_treedict


def _decode_field(data, key):
Expand Down Expand Up @@ -48,26 +47,19 @@ def get_decode(self):
return data


def job_info_lookup(flux_handle, jobid, keys=["jobspec"]):
payload = {"id": int(jobid), "keys": keys, "flags": 0}
def job_info_lookup(flux_handle, jobid, keys=["jobspec"], flags=0):
payload = {"id": int(jobid), "keys": keys, "flags": flags}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
rpc.jobid = jobid
return rpc


def _setup_lookup_keys(keys, original, base):
def _setup_lookup_keys(keys, original):
if "jobspec" in keys:
if original:
keys.remove("jobspec")
if "J" not in keys:
keys.append("J")
elif not base:
if "eventlog" not in keys:
keys.append("eventlog")
if "R" in keys:
if not base:
if "eventlog" not in keys:
keys.append("eventlog")


def _get_original_jobspec(job_data):
Expand All @@ -78,58 +70,14 @@ def _get_original_jobspec(job_data):
return result.decode("utf-8")


def _get_updated_jobspec(job_data):
if isinstance(job_data["jobspec"], str):
data = json.loads(job_data["jobspec"])
else:
data = job_data["jobspec"]
jobspec = JobspecV1(**data)
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "jobspec-update":
for key, value in event.context.items():
jobspec.setattr(key, value)
return jobspec.dumps()


def _get_updated_R(job_data):
if isinstance(job_data["R"], str):
R = json.loads(job_data["R"])
else:
R = job_data["R"]
for entry in job_data["eventlog"].splitlines():
event = EventLogEvent(entry)
if event.name == "resource-update":
for key, value in event.context.items():
if key == "expiration":
set_treedict(R, "execution.expiration", value)
return json.dumps(R, ensure_ascii=False)


def _update_keys(job_data, decode, keys, original, base):
remove_eventlog = False
def _update_keys(job_data, decode, keys, original):
if "jobspec" in keys:
if original:
job_data["jobspec"] = _get_original_jobspec(job_data)
if decode:
_decode_field(job_data, "jobspec")
if "J" not in keys:
job_data.pop("J")
elif not base:
job_data["jobspec"] = _get_updated_jobspec(job_data)
if decode:
_decode_field(job_data, "jobspec")
if "eventlog" not in keys:
remove_eventlog = True
if "R" in keys:
if not base:
job_data["R"] = _get_updated_R(job_data)
if decode:
_decode_field(job_data, "R")
if "eventlog" not in keys:
remove_eventlog = True
if remove_eventlog:
job_data.pop("eventlog")


# jobs_kvs_lookup simple variant for one jobid
Expand All @@ -153,15 +101,21 @@ def job_kvs_lookup(
:base: For 'jobspec' or 'R', get base value, do not apply updates from eventlog
"""
keyslookup = list(keys)
_setup_lookup_keys(keyslookup, original, base)
payload = {"id": int(jobid), "keys": keyslookup, "flags": 0}
_setup_lookup_keys(keyslookup, original)
# N.B. JobInfoLookupRPC() has a "get_decode()" member
# function, so we will always get the non-decoded result from
# job-info.
flags = 0
if not base:
flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT
payload = {"id": int(jobid), "keys": keyslookup, "flags": flags}
rpc = JobInfoLookupRPC(flux_handle, "job-info.lookup", payload)
try:
if decode:
rsp = rpc.get_decode()
else:
rsp = rpc.get()
_update_keys(rsp, decode, keys, original, base)
_update_keys(rsp, decode, keys, original)
# The job does not exist!
except FileNotFoundError:
return None
Expand Down Expand Up @@ -242,7 +196,7 @@ def __init__(
self.original = original
self.base = base
self.errors = []
_setup_lookup_keys(self.keyslookup, self.original, self.base)
_setup_lookup_keys(self.keyslookup, self.original)

def fetch_data(self):
"""Initiate the job info lookup to the Flux job-info module
Expand All @@ -255,9 +209,15 @@ def fetch_data(self):
JobKVSLookupFuture.errors is non-empty, then it will contain a
list of errors returned via the query.
"""
flags = 0
# N.B. JobInfoLookupRPC() has a "get_decode()" member
# function, so we will always get the non-decoded result from
# job-info.
if not self.base:
flags |= flux.constants.FLUX_JOB_LOOKUP_CURRENT
listids = JobKVSLookupFuture()
for jobid in self.ids:
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup))
listids.push(job_info_lookup(self.handle, jobid, self.keyslookup, flags))
return listids

def data(self):
Expand All @@ -276,5 +236,5 @@ def data(self):
if hasattr(rpc, "errors"):
self.errors = rpc.errors
for job_data in data:
_update_keys(job_data, self.decode, self.keys, self.original, self.base)
_update_keys(job_data, self.decode, self.keys, self.original)
return data
64 changes: 3 additions & 61 deletions src/cmd/flux-job.c
Original file line number Diff line number Diff line change
Expand Up @@ -3308,42 +3308,6 @@ int cmd_wait_event (optparse_t *p, int argc, char **argv)
return (0);
}

char *reconstruct_current_jobspec (const char *jobspec_str,
const char *eventlog_str)
{
json_t *jobspec;
json_t *eventlog;
size_t index;
json_t *entry;
char *result;

if (!(jobspec = json_loads (jobspec_str, 0, NULL)))
log_msg_exit ("error decoding jobspec");
if (!(eventlog = eventlog_decode (eventlog_str)))
log_msg_exit ("error decoding eventlog");
json_array_foreach (eventlog, index, entry) {
const char *name;
json_t *context;

if (eventlog_entry_parse (entry, NULL, &name, &context) < 0)
log_msg_exit ("error decoding eventlog entry");
if (streq (name, "jobspec-update")) {
const char *path;
json_t *value;

json_object_foreach (context, path, value) {
if (jpath_set (jobspec, path, value) < 0)
log_err_exit ("failed to update jobspec");
}
}
}
if (!(result = json_dumps (jobspec, JSON_COMPACT)))
log_msg_exit ("failed to encode jobspec object");
json_decref (jobspec);
json_decref (eventlog);
return result;
}

void info_usage (void)
{
fprintf (stderr,
Expand Down Expand Up @@ -3403,33 +3367,11 @@ int cmd_info (optparse_t *p, int argc, char **argv)
log_msg_exit ("Failed to unwrap J to get jobspec: %s", error.text);
val = new_val;
}
/* The current (non --base) jobspec has to be reconstructed by fetching
* the jobspec and the eventlog and updating the former with the latter.
*/
else if (!optparse_hasopt (p, "base") && streq (key, "jobspec")) {
const char *jobspec;
const char *eventlog;

// fetch the two keys in parallel
if (!(f = flux_rpc_pack (h,
"job-info.lookup",
FLUX_NODEID_ANY,
0,
"{s:I s:[ss] s:i}",
"id", id,
"keys", "jobspec", "eventlog",
"flags", 0))
|| flux_rpc_get_unpack (f,
"{s:s s:s}",
"jobspec", &jobspec,
"eventlog", &eventlog) < 0)
log_msg_exit ("%s", future_strerror (f, errno));
val = new_val = reconstruct_current_jobspec (jobspec, eventlog);
}
/* The current (non --base) R is obtained through the
/* The current (non --base) R and jobspec are obtained through the
* job-info.lookup RPC w/ the CURRENT flag.
*/
else if (!optparse_hasopt (p, "base") && streq (key, "R")) {
else if (!optparse_hasopt (p, "base")
&& (streq (key, "R") || streq (key, "jobspec"))) {
if (!(f = flux_rpc_pack (h,
"job-info.lookup",
FLUX_NODEID_ANY,
Expand Down
2 changes: 1 addition & 1 deletion src/common/libjob/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum job_lookup_flags {
FLUX_JOB_LOOKUP_JSON_DECODE = 1,
/* get current value of special fields by applying eventlog
* updates for those fields
* - currently works for R
* - currently works for jobspec and R
*/
FLUX_JOB_LOOKUP_CURRENT = 2,
};
Expand Down
13 changes: 11 additions & 2 deletions src/modules/job-info/lookup.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ static int lookup_current (struct lookup_ctx *l,

if (streq (key, "R"))
update_event_name = "resource-update";
else if (streq (key, "jobspec"))
update_event_name = "jobspec-update";

if (!(value_object = json_loads (value, 0, NULL))) {
errno = EINVAL;
Expand Down Expand Up @@ -205,6 +207,12 @@ static int lookup_current (struct lookup_ctx *l,
if (streq (name, update_event_name)) {
if (streq (key, "R"))
apply_updates_R (l->ctx->h, l->id, key, value_object, context);
else if (streq (key, "jobspec"))
apply_updates_jobspec (l->ctx->h,
l->id,
key,
value_object,
context);
}
}

Expand Down Expand Up @@ -288,7 +296,8 @@ static void info_lookup_continuation (flux_future_t *fall, void *arg)
}

if ((l->flags & FLUX_JOB_LOOKUP_CURRENT)
&& streq (keystr, "R")) {
&& (streq (keystr, "R")
|| streq (keystr, "jobspec"))) {
if (lookup_current (l, fall, keystr, s, &current_value) < 0)
goto error;
s = current_value;
Expand Down Expand Up @@ -442,7 +451,7 @@ static int lookup_cached (struct lookup_ctx *l)

key_str = json_string_value (key);

if (!streq (key_str, "R"))
if (!streq (key_str, "R") && !streq (key_str, "jobspec"))
return 0;

if ((ret = update_watch_get_cached (l->ctx,
Expand Down
16 changes: 15 additions & 1 deletion src/modules/job-info/update.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ static struct update_ctx *update_ctx_create (struct info_ctx *ctx,
goto error;
if (streq (key, "R"))
uc->update_name = "resource-update";
else if (streq (key, "jobspec"))
uc->update_name = "jobspec-update";
else {
errno = EINVAL;
goto error;
Expand Down Expand Up @@ -180,6 +182,12 @@ static void eventlog_continuation (flux_future_t *f, void *arg)
uc->key,
uc->update_object,
context);
else if (streq (uc->key, "jobspec"))
apply_updates_jobspec (uc->ctx->h,
uc->id,
uc->key,
uc->update_object,
context);

msg = flux_msglist_first (uc->msglist);
while (msg) {
Expand Down Expand Up @@ -315,6 +323,12 @@ static void lookup_continuation (flux_future_t *f, void *arg)
uc->key,
uc->update_object,
context);
else if (streq (uc->key, "jobspec"))
apply_updates_jobspec (uc->ctx->h,
uc->id,
uc->key,
uc->update_object,
context);
uc->initial_update_count++;
}
else if (streq (name, "clean"))
Expand Down Expand Up @@ -474,7 +488,7 @@ void update_watch_cb (flux_t *h, flux_msg_handler_t *mh,
errmsg = "update-watch request rejected without streaming RPC flag";
goto error;
}
if (!streq (key, "R")) {
if (!streq (key, "R") && !streq (key, "jobspec")) {
errno = EINVAL;
errmsg = "update-watch unsupported key specified";
goto error;
Expand Down
19 changes: 19 additions & 0 deletions src/modules/job-info/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,25 @@ void apply_updates_R (flux_t *h,
}
}

void apply_updates_jobspec (flux_t *h,
flux_jobid_t id,
const char *key,
json_t *jobspec,
json_t *context)
{
const char *ckey;
json_t *value;

json_object_foreach (context, ckey, value) {
if (jpath_set (jobspec,
ckey,
value) < 0)
flux_log (h, LOG_INFO,
"%s: failed to update job %s %s",
__FUNCTION__, idf58 (id), key);
}
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
7 changes: 7 additions & 0 deletions src/modules/job-info/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ void apply_updates_R (flux_t *h,
json_t *R,
json_t *context);

/* apply context updates to the jobspec object */
void apply_updates_jobspec (flux_t *h,
flux_jobid_t id,
const char *key,
json_t *jobspec,
json_t *context);

#endif /* ! _FLUX_JOB_INFO_UTIL_H */

/*
Expand Down
1 change: 1 addition & 0 deletions t/python/t0014-job-kvslookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def test_info_00_job_info_lookup(self):
data = rpc.get()
self.check_jobspec_str(data, self.jobid1, 0)
data = rpc.get_decode()
self.check_jobspec_decoded(data, self.jobid1, 0)
self.assertEqual(data["id"], self.jobid1, 0)

def test_info_01_job_info_lookup_keys(self):
Expand Down

0 comments on commit 45ad784

Please sign in to comment.