Skip to content

Commit

Permalink
Merge pull request #5232 from grondo/batch-conf
Browse files Browse the repository at this point in the history
add `--conf` option for registering instance config in `flux-batch` and `flux-alloc`
  • Loading branch information
mergify[bot] committed Jun 7, 2023
2 parents c624084 + 1db7d1b commit 54865cf
Show file tree
Hide file tree
Showing 10 changed files with 428 additions and 14 deletions.
37 changes: 37 additions & 0 deletions doc/man1/common/submit-other-options.rst
Expand Up @@ -48,6 +48,43 @@ OTHER OPTIONS
encode multiple files. Note: As documented in RFC 14, the file names
``script`` and ``conf.json`` are both reserved.

**--conf=FILE|KEY=VAL|STRING|NAME**
The ``--conf`` option allows configuration for a Flux instance started
via ``flux-batch(1)`` or ``flux-alloc(1)`` to be iteratively built on
the command line. On first use, a ``conf.json`` entry is added to the
internal jobspec file archive, and ``-c{{tmpdir}}/conf.json`` is added
to the flux broker command line. Each subsequent use of the ``--conf``
option updates this configuration.

The argument to ``--conf`` may be in one of several forms:

* A multiline string, e.g. from a batch directive. In this case the string
is parsed as JSON or TOML::

# flux: --conf="""
# flux: [resource]
# flux: exclude = "0"
# flux: """

* A string containing a ``=`` character, in which case the argument is
parsed as ``KEY=VAL``, where ``VAL`` is parsed as JSON, e.g.::

--conf=resource.exclude=\"0\"

* A string ending in ``.json`` or ``.toml``, in which case configuration
is loaded from a JSON or TOML file.

* If none of the above conditions match, then the argument ``NAME`` is
assumed to refer to a "named" config file ``NAME.toml`` or ``NAME.json``
within the following search path, in order of precedence:

- ``XDG_CONFIG_HOME/flux/config`` or ``$HOME/.config/flux/config`` if
``XDG_CONFIG_HOME`` is not set

- ``$XDG_CONFIG_DIRS/flux/config`` or ``/etc/xdg/flux/config`` if
``XDG_CONFIG_DIRS`` is not set. Note that ``XDG_CONFIG_DIRS`` may
be a colon-separated path.

**--begin-time=+FSD|DATETIME**
Convenience option for setting a ``begin-time`` dependency for a job.
The job is guaranteed to start after the specified date and time.
Expand Down
2 changes: 1 addition & 1 deletion doc/man1/flux-jobs.rst
Expand Up @@ -470,7 +470,7 @@ The ``flux-jobs`` command supports registration of named output formats
in configuration files. The command loads configuration files from
``flux-jobs.EXT`` from the following paths in order of increasing precedence:

* ``$XDG_CONFIG_DIRS/flux`` or ``/etc/flux/xdg`` if ``XDG_CONFIG_DIRS`` is
* ``$XDG_CONFIG_DIRS/flux`` or ``/etc/xdg/flux`` if ``XDG_CONFIG_DIRS`` is
not set. Note that ``XDG_CONFIG_DIRS`` is traversed in reverse order
such that entries first in the colon separated path are highest priority.

Expand Down
2 changes: 2 additions & 0 deletions doc/test/spell.en.pws
Expand Up @@ -706,3 +706,5 @@ unbuffered
statex
pmix
SIGUSR
iteratively
noverify
1 change: 1 addition & 0 deletions src/bindings/python/flux/cli/alloc.py
Expand Up @@ -70,6 +70,7 @@ def init_jobspec(self, args):
num_nodes=args.nodes,
broker_opts=base.list_split(args.broker_opts),
exclusive=args.exclusive,
conf=args.conf.config,
)

# For --bg, always allocate a pty, but not interactive,
Expand Down
134 changes: 134 additions & 0 deletions src/bindings/python/flux/cli/base.py
Expand Up @@ -18,6 +18,7 @@
import json
import logging
import os
import pathlib
import re
import resource
import signal
Expand All @@ -28,12 +29,18 @@
from string import Template
from urllib.parse import parse_qs, urlparse

try:
import tomllib # novermin
except ModuleNotFoundError:
from flux.utils import tomli as tomllib

import flux
from flux import debugged, job, util
from flux.constraint.parser import ConstraintParser, ConstraintSyntaxError
from flux.idset import IDset
from flux.job import JobspecV1
from flux.progress import ProgressBar
from flux.util import dict_merge, set_treedict

LOGGER = logging.getLogger("flux")

Expand Down Expand Up @@ -375,6 +382,122 @@ def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, "env", items)


class BatchConfig:
"""Convenience class for handling a --conf=[FILE|KEY=VAL] option
Iteratively build a "config" dict from successive updates.
"""

loaders = {".toml": tomllib.load, ".json": json.load}

def __init__(self):
self.config = None

def update_string(self, value):
# Update config with JSON or TOML string
try:
conf = json.loads(value)
except json.decoder.JSONDecodeError:
# Try TOML
try:
conf = tomllib.loads(value)
except tomllib.TOMLDecodeError:
raise ValueError(
"--conf: failed to parse multiline as TOML or JSON"
) from None
self.config = dict_merge(self.config, conf)
return self

def update_keyval(self, keyval):
# dotted key (e.g. resource.noverify=true)
key, _, value = keyval.partition("=")
try:
value = json.loads(value)
except json.decoder.JSONDecodeError:
value = str(value)
set_treedict(self.config, key, value)
return self

def update_file(self, path, extension=".toml"):
# Update from file in the filesystem
try:
loader = self.loaders[extension]
except KeyError:
raise ValueError("--conf: {path} must end in .toml or .json")
try:
with open(path, "rb") as fp:
conf = loader(fp)
except OSError as exc:
raise ValueError(f"--conf: {exc}") from None
except (json.decoder.JSONDecodeError, tomllib.TOMLDecodeError) as exc:
raise ValueError(f"--conf: parse error: {path}: {exc}") from None
self.config = dict_merge(self.config, conf)
return self

def _find_config(self, name):
# Find a named config as either TOML or JSON in XDG search path
for path in util.xdg_searchpath(subdir="config"):
# Take the first matching filename preferring TOML:
for ext in (".toml", ".json"):
filename = f"{path}/{name}{ext}"
if os.path.exists(filename):
return filename, self.loaders[ext]
return None, None

def update_named_config(self, name):
# Update from a named configuration file in a standard path or paths.
filename, loader = self._find_config(name)
if filename is not None:
try:
with open(filename, "rb") as fp:
self.config = dict_merge(self.config, loader(fp))
return self
except (
OSError,
tomllib.TOMLDecodeError,
json.decoder.JSONDecodeError,
) as exc:
raise ValueError(f"--conf={name}: {filename}: {exc}") from None
raise ValueError(f"--conf: named config '{name}' not found")

def update(self, value):
"""
Update current config with value using the following rules:
- If value contains one or more newlines, parse it as a JSON or
TOML string.
- Otherwise, if value contains an ``=``, then parse it as a dotted
key and value, e.g. ``resource.noverify=true``. The value (part
after the ``=``) will be parsed as JSON.
- Otherwise, if value ends in ``.toml`` or ``.json`` treat value as
a path and attempt to parse contents of file as TOML or JSON.
- Otherwise, read a named config from a standard config search path.
Configuration can be updated iteratively. The end result is available
in the ``config`` attribute.
"""
if self.config is None:
self.config = {}
if "\n" in value:
return self.update_string(value)
if "=" in value:
return self.update_keyval(value)
extension = pathlib.Path(value).suffix
if extension in (".toml", ".json"):
return self.update_file(value, extension)
return self.update_named_config(value)


class ConfAction(argparse.Action):
"""Handle batch/alloc --conf option"""

def __call__(self, parser, namespace, values, option_string=None):
conf = getattr(namespace, "conf", None)
if conf is None:
conf = BatchConfig()
setattr(namespace, "conf", conf)
conf.update(values)


class Xcmd:
"""Represent a Flux job with mutable command and option args"""

Expand Down Expand Up @@ -1595,6 +1718,17 @@ def add_batch_alloc_args(parser):
Add "batch"-specific resource allocation arguments to parser object
which deal in slots instead of tasks.
"""
parser.add_argument(
"--conf",
metavar="CONF",
default=BatchConfig(),
action=ConfAction,
help="Set configuration for a child Flux instance. CONF may be a "
+ "multiline string in JSON or TOML, a configuration key=value, a "
+ "path to a JSON or TOML file, or a configuration loaded by name "
+ "from a standard search path. This option may specified multiple "
+ "times, in which case the config is iteratively updated.",
)
parser.add_argument(
"--broker-opts",
metavar="OPTS",
Expand Down
1 change: 1 addition & 0 deletions src/bindings/python/flux/cli/batch.py
Expand Up @@ -116,6 +116,7 @@ def init_jobspec(self, args):
num_nodes=args.nodes,
broker_opts=base.list_split(args.broker_opts),
exclusive=args.exclusive,
conf=args.conf.config,
)

# Default output is flux-{{jobid}}.out
Expand Down
21 changes: 19 additions & 2 deletions src/bindings/python/flux/job/Jobspec.py
Expand Up @@ -897,6 +897,7 @@ def from_batch_command(
num_nodes=None,
broker_opts=None,
exclusive=False,
conf=None,
):
"""
Create a Jobspec describing a nested Flux instance controlled by
Expand Down Expand Up @@ -928,6 +929,11 @@ def from_batch_command(
individual nodes
broker_opts (iterable of `str`): options to pass to the new Flux
broker
conf (dict): optional broker configuration to pass to the
child instance brokers. If set, `conf` will be set in the
jobspec 'files' (RFC 37 File Archive) attribute as `conf.json`,
and broker_opts will be extended to add
`-c{{tmpdir}}/conf.json`
"""
if not script.startswith("#!"):
raise ValueError(f"{jobname} does not appear to start with '#!'")
Expand All @@ -940,6 +946,7 @@ def from_batch_command(
num_nodes=num_nodes,
broker_opts=broker_opts,
exclusive=exclusive,
conf=conf,
)
# Copy script contents into jobspec
jobspec.add_file("script", script, perms=0o700, encoding="utf-8")
Expand All @@ -956,6 +963,7 @@ def from_nest_command(
num_nodes=None,
broker_opts=None,
exclusive=False,
conf=None,
):
"""
Create a Jobspec describing a nested Flux instance controlled by
Expand All @@ -981,8 +989,15 @@ def from_nest_command(
individual nodes
broker_opts (iterable of `str`): options to pass to the new Flux
broker
"""
broker_opts = () if broker_opts is None else broker_opts
conf (dict): optional broker configuration to pass to the
child instance brokers. If set, `conf` will be set in the
jobspec 'files' (RFC 37 File Archive) attribute as `conf.json`,
and broker_opts will be extended to add
`-c{{tmpdir}}/conf.json`
"""
broker_opts = [] if broker_opts is None else broker_opts
if conf is not None:
broker_opts.append("-c{{tmpdir}}/conf.json")
jobspec = cls.from_command(
command=["flux", "broker", *broker_opts, *command],
num_tasks=num_slots,
Expand All @@ -993,4 +1008,6 @@ def from_nest_command(
)
jobspec.setattr_shell_option("per-resource.type", "node")
jobspec.setattr_shell_option("mpi", "none")
if conf is not None:
jobspec.add_file("conf.json", conf)
return jobspec
29 changes: 18 additions & 11 deletions src/bindings/python/flux/util.py
Expand Up @@ -987,6 +987,23 @@ def dict_merge(src, new):
return src


def xdg_searchpath(subdir=""):
"""
Build standard Flux config search path based on XDG specification
"""
# Start with XDG_CONFIG_HOME (or ~/.config) since it is the
# highest precedence:
confdirs = [os.getenv("XDG_CONFIG_HOME") or f"{Path.home()}/.config"]

# Append XDG_CONFIG_DIRS as colon separated path (or /etc/xdg)
# Note: colon separated paths are in order of precedence.
confdirs += (os.getenv("XDG_CONFIG_DIRS") or "/etc/xdg").split(":")

# Append "/flux" (with optional subdir) to all members of
# confdirs to build searchpath:
return [Path(directory, "flux", subdir) for directory in confdirs]


class UtilConfig:
"""
Very simple class for loading hierarchical configuration for Flux
Expand Down Expand Up @@ -1032,17 +1049,7 @@ def __init__(self, name, toolname=None, subcommand=None, initial_dict=None):
# specification. Later this will be reversed since we want
# to traverse the paths in _reverse_ precedence order in this
# implementation.
#
# Start with XDG_CONFIG_HOME (or ~/.config) since it is the
# highest precedence:
confdirs = [os.getenv("XDG_CONFIG_HOME") or f"{Path.home()}/.config"]

# Append XDG_CONFIG_DIRS as colon separated path (or /etc/xdg)
# Note: colon separated paths are in order of precedence.
confdirs += (os.getenv("XDG_CONFIG_DIRS") or "/etc/xdg").split(":")

# Append "/flux" to all members of confdirs to build searchpath:
self.searchpath = [Path(directory, "flux") for directory in confdirs]
self.searchpath = xdg_searchpath()

# Reorder searchpath into reverse precedence order
self.searchpath.reverse()
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Expand Up @@ -209,6 +209,7 @@ TESTSCRIPTS = \
t2713-python-cli-bulksubmit.t \
t2714-python-cli-batch.t \
t2715-python-cli-cancel.t \
t2716-python-cli-batch-conf.t \
t2800-jobs-cmd.t \
t2800-jobs-recursive.t \
t2800-jobs-instance-info.t \
Expand Down

0 comments on commit 54865cf

Please sign in to comment.