Skip to content

Commit

Permalink
Merge pull request #3045 from grondo/f58
Browse files Browse the repository at this point in the history
Add support for RFC19 F58 encoded JOBIDs
  • Loading branch information
mergify[bot] committed Jul 24, 2020
2 parents 997d9b1 + b0e2635 commit 7953a1a
Show file tree
Hide file tree
Showing 13 changed files with 894 additions and 109 deletions.
82 changes: 82 additions & 0 deletions src/bindings/python/flux/job.py
Expand Up @@ -86,6 +86,88 @@ def job_kvs_guest(flux_handle, jobid):
return flux.kvs.get_dir(flux_handle, kvs_key)


def id_parse(jobid_str):
"""
returns: An integer jobid
:rtype int
"""
jobid = ffi.new("flux_jobid_t[1]")
RAW.id_parse(jobid_str, jobid)
return int(jobid[0])


def id_encode(jobid, encoding="f58"):
"""
returns: Jobid encoded in encoding
:rtype str
"""
buflen = 128
buf = ffi.new("char[]", buflen)
RAW.id_encode(int(jobid), encoding, buf, buflen)
return ffi.string(buf, buflen).decode("utf-8")


class JobID(int):
"""Class used to represent a Flux JOBID
JobID is a subclass of `int`, so may be used in place of integer.
However, a JobID may be created from any valid RFC 19 FLUID
encoding, including:
- decimal integer (no prefix)
- hexidecimal integer (prefix 0x)
- dotted hex (dothex) (xxxx.xxxx.xxxx.xxxx)
- kvs dir (dotted hex with `job.` prefix)
- RFC19 F58: (Base58 encoding with prefix `ƒ` or `f`)
A JobID object also has properties for encoding a JOBID into each
of the above representations, e.g. jobid.f85, jobid.words, jobid.dothex...
"""

def __new__(cls, value, *args, **kwargs):
if isinstance(value, int):
jobid = value
else:
jobid = id_parse(value)
return super(cls, cls).__new__(cls, jobid)

def encode(self, encoding="dec"):
"""Encode a JobID to alternate supported format"""
return id_encode(self, encoding)

@property
def f58(self):
"""Return RFC19 F58 representation of a JobID"""
return self.encode("f58")

@property
def hex(self):
"""Return 0x-prefixed hexidecimal representation of a JobID"""
return self.encode("hex")

@property
def dothex(self):
"""Return dotted hexidecimal representation of a JobID"""
return self.encode("dothex")

@property
def words(self):
"""Return words (mnemonic) representation of a JobID"""
return self.encode("words")

@property
def kvs(self):
"""Return KVS directory path of a JobID"""
return self.encode("kvs")

def __str__(self):
return self.encode()

def __repr__(self):
return f"JobID({self})"


class SubmitFuture(Future):
def get_id(self):
return submit_get_id(self)
Expand Down
2 changes: 1 addition & 1 deletion src/bindings/python/flux/wrapper.py
Expand Up @@ -190,7 +190,7 @@ def __call__(self, calling_object, *args_in):
# Unpack wrapper objects
args[i] = args[i].handle
elif isinstance(args[i], six.text_type):
args[i] = args[i].encode("utf-8")
args[i] = args[i].encode("utf-8", errors="surrogateescape")

try:
result = self.fun(*args)
Expand Down
85 changes: 29 additions & 56 deletions src/cmd/flux-job.c
Expand Up @@ -33,7 +33,6 @@
#endif
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/fluid.h"
#include "src/common/libjob/job.h"
#include "src/common/libutil/read_all.h"
#include "src/common/libutil/monotime.h"
Expand Down Expand Up @@ -244,12 +243,8 @@ static struct optparse_option status_opts[] = {
};

static struct optparse_option id_opts[] = {
{ .name = "from", .key = 'f', .has_arg = 1,
.arginfo = "dec|kvs|hex|words",
.usage = "Convert jobid from specified form",
},
{ .name = "to", .key = 't', .has_arg = 1,
.arginfo = "dec|kvs|hex|words",
.arginfo = "dec|kvs|hex|dothex|words|f58",
.usage = "Convert jobid to specified form",
},
OPTPARSE_TABLE_END
Expand Down Expand Up @@ -510,6 +505,14 @@ int main (int argc, char *argv[])
return (exitval);
}

static flux_jobid_t parse_jobid (const char *s)
{
flux_jobid_t id;
if (flux_job_id_parse (s, &id) < 0)
log_msg_exit ("error parsing jobid: \"%s\"", s);
return id;
}

/* Parse a free argument 's', expected to be a 64-bit unsigned.
* On error, exit complaining about parsing 'name'.
*/
Expand Down Expand Up @@ -609,7 +612,7 @@ int cmd_priority (optparse_t *p, int argc, char **argv)
if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");

id = parse_arg_unsigned (argv[optindex++], "jobid");
id = parse_jobid (argv[optindex++]);
priority = parse_arg_unsigned (argv[optindex++], "priority");

if (!(f = flux_job_set_priority (h, id, priority)))
Expand All @@ -636,7 +639,7 @@ int cmd_raise (optparse_t *p, int argc, char **argv)
exit (1);
}

id = parse_arg_unsigned (argv[optindex++], "jobid");
id = parse_jobid (argv[optindex++]);
if (optindex < argc)
note = parse_arg_message (argv + optindex, "message");

Expand Down Expand Up @@ -842,7 +845,7 @@ int cmd_kill (optparse_t *p, int argc, char **argv)
exit (1);
}

id = parse_arg_unsigned (argv[optindex++], "jobid");
id = parse_jobid (argv[optindex++]);

s = optparse_get_str (p, "signal", "SIGTERM");
if ((signum = str2signum (s))< 0)
Expand Down Expand Up @@ -932,7 +935,7 @@ int cmd_cancel (optparse_t *p, int argc, char **argv)
exit (1);
}

id = parse_arg_unsigned (argv[optindex++], "jobid");
id = parse_jobid (argv[optindex++]);
if (optindex < argc)
note = parse_arg_message (argv + optindex, "message");

Expand Down Expand Up @@ -1118,7 +1121,7 @@ int cmd_list_ids (optparse_t *p, int argc, char **argv)

ids_len = argc - optindex;
for (i = 0; i < ids_len; i++) {
flux_jobid_t id = parse_arg_unsigned (argv[optindex + i], "id");
flux_jobid_t id = parse_jobid (argv[optindex + i]);
flux_future_t *f;
if (!(f = flux_job_list_id (h, id, list_attrs)))
log_err_exit ("flux_job_list_id");
Expand Down Expand Up @@ -2020,7 +2023,7 @@ int cmd_attach (optparse_t *p, int argc, char **argv)
optparse_print_usage (p);
exit (1);
}
ctx.id = parse_arg_unsigned (argv[optindex++], "jobid");
ctx.id = parse_jobid (argv[optindex++]);
ctx.p = p;

if (!(ctx.h = flux_open (NULL, 0)))
Expand Down Expand Up @@ -2191,7 +2194,7 @@ int cmd_status (optparse_t *p, int argc, char **argv)

for (i = 0; i < njobs; i++) {
struct job_status *stat = &stats[i];
stat->id = parse_arg_unsigned (argv[optindex+i], "jobid");
stat->id = parse_jobid (argv[optindex+i]);
stat->exception_exit_code = exception_exit_code;

if (!(f = flux_job_event_watch (h, stat->id, "eventlog", 0)))
Expand Down Expand Up @@ -2239,51 +2242,21 @@ int cmd_status (optparse_t *p, int argc, char **argv)

void id_convert (optparse_t *p, const char *src, char *dst, int dstsz)
{
const char *from = optparse_get_str (p, "from", "dec");
const char *to = optparse_get_str (p, "to", "dec");
flux_jobid_t id;

/* src to id
/* Parse as any valid JOBID
*/
if (!strcmp (from, "dec")) {
id = parse_arg_unsigned (src, "input");
}
else if (!strcmp (from, "hex")) {
if (fluid_decode (src, &id, FLUID_STRING_DOTHEX) < 0)
log_msg_exit ("%s: malformed input", src);
}
else if (!strcmp (from, "kvs")) {
if (strncmp (src, "job.", 4) != 0)
log_msg_exit ("%s: missing 'job.' prefix", src);
if (fluid_decode (src + 4, &id, FLUID_STRING_DOTHEX) < 0)
log_msg_exit ("%s: malformed input", src);
}
else if (!strcmp (from, "words")) {
if (fluid_decode (src, &id, FLUID_STRING_MNEMONIC) < 0)
log_msg_exit ("%s: malformed input", src);
}
else
log_msg_exit ("Unknown from=%s", from);
if (flux_job_id_parse (src, &id) < 0)
log_msg_exit ("%s: malformed input", src);

/* id to dst
/* Now encode into requested representation:
*/
if (!strcmp (to, "dec")) {
snprintf (dst, dstsz, "%ju", (uintmax_t) id);
}
else if (!strcmp (to, "kvs")) {
if (flux_job_kvs_key (dst, dstsz, id, NULL) < 0)
log_msg_exit ("error encoding id");
if (flux_job_id_encode (id, to, dst, dstsz) < 0) {
if (errno == EPROTO)
log_msg_exit ("Unknown to=%s", to);
log_msg_exit ("Unable to encode id %ju to %s", (uintmax_t) id, to);
}
else if (!strcmp (to, "hex")) {
if (fluid_encode (dst, dstsz, id, FLUID_STRING_DOTHEX) < 0)
log_msg_exit ("error encoding id");
}
else if (!strcmp (to, "words")) {
if (fluid_encode (dst, dstsz, id, FLUID_STRING_MNEMONIC) < 0)
log_msg_exit ("error encoding id");
}
else
log_msg_exit ("Unknown to=%s", to);
}

char *trim_string (char *s)
Expand Down Expand Up @@ -2325,7 +2298,7 @@ int cmd_id (optparse_t *p, int argc, char **argv)
static void print_job_namespace (const char *src)
{
char ns[64];
flux_jobid_t id = parse_arg_unsigned (src, "jobid");
flux_jobid_t id = parse_jobid (src);
if (flux_job_kvs_namespace (ns, sizeof (ns), id) < 0)
log_msg_exit ("error getting kvs namespace for %ju", id);
printf ("%s\n", ns);
Expand Down Expand Up @@ -2501,7 +2474,7 @@ int cmd_eventlog (optparse_t *p, int argc, char **argv)
exit (1);
}

ctx.id = parse_arg_unsigned (argv[optindex++], "jobid");
ctx.id = parse_jobid (argv[optindex++]);
ctx.path = optparse_get_str (p, "path", "eventlog");
ctx.p = p;
entry_format_parse_options (p, &ctx.e);
Expand Down Expand Up @@ -2656,7 +2629,7 @@ int cmd_wait_event (optparse_t *p, int argc, char **argv)
optparse_print_usage (p);
exit (1);
}
ctx.id = parse_arg_unsigned (argv[optindex++], "jobid");
ctx.id = parse_jobid (argv[optindex++]);
ctx.p = p;
ctx.wait_event = argv[optindex++];
ctx.timeout = optparse_get_duration (p, "timeout", -1.0);
Expand Down Expand Up @@ -2734,7 +2707,7 @@ int cmd_info (optparse_t *p, int argc, char **argv)
exit (1);
}

ctx.id = parse_arg_unsigned (argv[optindex++], "jobid");
ctx.id = parse_jobid (argv[optindex++]);

if (!(ctx.keys = json_array ()))
log_msg_exit ("json_array");
Expand Down Expand Up @@ -2800,7 +2773,7 @@ int cmd_wait (optparse_t *p, int argc, char **argv)
exit (1);
}
if (optindex < argc) {
id = parse_arg_unsigned (argv[optindex++], "jobid");
id = parse_jobid (argv[optindex++]);
if (optparse_hasopt (p, "all"))
log_err_exit ("jobid not supported with --all");
}
Expand Down
21 changes: 17 additions & 4 deletions src/cmd/flux-jobs.py
Expand Up @@ -22,11 +22,11 @@
import json
from datetime import datetime, timedelta

import flux.job
import flux.constants
import flux.util
from flux.core.inner import raw
from flux.memoized_property import memoized_property
from flux.job import JobID

LOGGER = logging.getLogger("flux-jobs")

Expand Down Expand Up @@ -129,6 +129,9 @@ def __init__(self, info_resp):
combined_dict = self.defaults.copy()
combined_dict.update(info_resp)

# Cast jobid to JobID
combined_dict["id"] = JobID(combined_dict["id"])

# Rename "state" to "state_id" and "result" to "result_id"
# until returned state is a string:
if "state" in combined_dict:
Expand Down Expand Up @@ -305,6 +308,11 @@ def fetch_jobs_flux(args, fields):
# Note there is no attr for "id", its always returned
fields2attrs = {
"id": (),
"id.hex": (),
"id.f58": (),
"id.kvs": (),
"id.words": (),
"id.dothex": (),
"userid": ("userid",),
"username": ("userid",),
"priority": ("priority",),
Expand Down Expand Up @@ -477,8 +485,8 @@ def parse_args():
)
parser.add_argument(
"jobids",
type=int,
metavar="JOBID",
type=JobID,
nargs="*",
help="Limit output to specific Job IDs",
)
Expand Down Expand Up @@ -549,6 +557,11 @@ def get_field(self, field_name, args, kwargs):
# List of legal format fields and their header names
headings = {
"id": "JOBID",
"id.hex": "JOBID",
"id.f58": "JOBID",
"id.kvs": "JOBID",
"id.words": "JOBID",
"id.dothex": "JOBID",
"userid": "UID",
"username": "USER",
"priority": "PRI",
Expand All @@ -571,7 +584,7 @@ def get_field(self, field_name, args, kwargs):
"t_inactive": "T_INACTIVE",
"runtime": "RUNTIME",
"status": "STATUS",
"status_abbrev": "STATUS",
"status_abbrev": "ST",
"exception.occurred": "EXCEPTION-OCCURRED",
"exception.severity": "EXCEPTION-SEVERITY",
"exception.type": "EXCEPTION-TYPE",
Expand Down Expand Up @@ -636,7 +649,7 @@ def main():
fmt = args.format
else:
fmt = (
"{id:>18} {username:<8.8} {name:<10.10} {status_abbrev:>6.6} "
"{id.f58:>12} {username:<8.8} {name:<10.10} {status_abbrev:>2.2} "
"{ntasks:>6} {nnodes:>6h} {runtime!F:>8h} "
"{ranks:h}"
)
Expand Down

0 comments on commit 7953a1a

Please sign in to comment.