Skip to content

Commit

Permalink
Merge pull request #35 from flux-framework/add/flux-option-flags
Browse files Browse the repository at this point in the history
adding support for flux option flags
  • Loading branch information
vsoch committed Dec 17, 2022
2 parents 60f9749 + 301cd74 commit d894600
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 9 deletions.
37 changes: 37 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import logging
import os
import re
import shlex

from pydantic import BaseSettings

logger = logging.getLogger(__name__)


def get_int_envar(key, default=None):
"""
Expand All @@ -24,6 +29,35 @@ def get_bool_envar(key, default=False):
return default if not os.environ.get(key) else not default


def get_option_flags(key, prefix="-o"):
"""
Wrapper around parse_option_flags to get from environment.
The function can then be shared to parse flags from the UI
in the same way.
"""
flags = os.environ.get(key) or {}
if not flags:
return flags
return parse_option_flags(flags, prefix)


def parse_option_flags(flags, prefix="-o"):
"""
Parse key value pairs (optionally with a prefix) from the environment.
"""
values = {}
for flag in shlex.split(flags):
if "=" not in flag:
logger.warning(f"Missing '=' in flag {flag}, cannot parse.")
continue
option, value = flag.split("=", 1)
if option.startswith(prefix):
option = re.sub(f"^{prefix}", "", option)
values[option] = value
return values


class Settings(BaseSettings):
"""
Basic settings and defaults for the Flux RESTFul API
Expand All @@ -41,6 +75,9 @@ class Settings(BaseSettings):
flux_token: str = os.environ.get("FLUX_TOKEN")
require_auth: bool = get_bool_envar("FLUX_REQUIRE_AUTH")

# Default server option flags
option_flags: dict = get_option_flags("FLUX_OPTION_FLAGS")

# If the user requests a launcher, be strict.
# We only allow nextflow and snakemake, sorry
known_launchers: list = ["nextflow", "snakemake"]
Expand Down
21 changes: 20 additions & 1 deletion app/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from fastapi import Request

import app.core.config as config
import app.library.flux as flux_cli


Expand All @@ -17,6 +18,7 @@ def __init__(self, request: Request):
self.cores_per_task: Optional[int] = None
self.gpus_per_task: Optional[int] = None
self.exclusive: Optional[bool] = False
self.option_flags: Optional[str] = None
self.is_launcher: Optional[bool] = False
self.exclusive: Optional[bool] = False

Expand All @@ -28,6 +30,7 @@ async def load_data(self):
self.num_nodes = form.get("num_nodes")
self.runtime = form.get("runtime") or 0
self.cores_per_task = form.get("cores_per_task")
self.option_flags = form.get("option_flags")
self.gpus_per_task = form.get("gpus_per_task")
self.exclusive = True if form.get("exclusive") == "on" else False
self.is_launcher = True if form.get("is_launcher") == "on" else False
Expand All @@ -40,7 +43,8 @@ def kwargs(self):
kwargs = {}
as_int = ["num_tasks", "num_nodes", "cores_per_task", "gpus_per_task"]
as_bool = ["exclusive", "is_launcher"]
for key in as_int + as_bool + ["command"]:
as_str = ["command"]
for key in as_int + as_bool + as_str:
if getattr(self, key, None) is not None:
value = getattr(self, key)

Expand All @@ -55,8 +59,23 @@ def kwargs(self):
kwargs[key] = value
else:
kwargs[key] = value

# Custom parsing (or just addition) of settings
kwargs["option_flags"] = self.get_option_flags()
return kwargs

def get_option_flags(self):
"""
Get option flags from the form, add on to server defaults.
"""
flags = config.settings.option_flags
option_flags = getattr(self, "option_flags", None)

# The user setting in the UI takes precedence over the server!
if option_flags not in [None, ""]:
flags.update(config.parse_option_flags(option_flags))
return flags

def is_valid(self):
"""
Determine if the form is valid (devoid of errors)
Expand Down
24 changes: 20 additions & 4 deletions app/library/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ def validate_submit_kwargs(kwargs, envars=None, runtime=None):
f"The server only has {settings.flux_nodes} nodes, you requested {num_nodes}"
)

# Make sure if option_flags defined, we don't have a -o prefix
option_flags = kwargs.get("option_flags") or {}
if not isinstance(option_flags, dict):
errors.append(
f"Please provide option args as a dictionary, type {type(option_flags)} is invalid."
)
else:
for option, _ in option_flags.items():
if "-o" in option:
errors.append(f"Please provide keys without -o, {option} is invalid.")

# If the user asks for gpus and we don't have any, no go
if "gpus_per_task" in kwargs and not settings.has_gpus:
errors.append("This server does not support gpus: gpus_per_task cannot be set.")
Expand All @@ -51,6 +62,7 @@ def prepare_job(kwargs, runtime=0, workdir=None, envars=None):
After validation, prepare the job (shared function).
"""
envars = envars or {}
option_flags = kwargs.get("option_flags") or {}

# Generate the flux job
command = kwargs["command"]
Expand All @@ -59,12 +71,16 @@ def prepare_job(kwargs, runtime=0, workdir=None, envars=None):
print(f"⭐️ Command being submit: {command}")

# Delete command from the kwargs (we added because is required and validated that way)
del kwargs["command"]
# From the command line API client is_launcher won't be here, in the UI it will.
for key in ["command", "option_flags", "is_launcher"]:
if key in kwargs:
del kwargs[key]

# From the command line API client this won't be here, in the UI it will.
if "is_launcher" in kwargs:
del kwargs["is_launcher"]
# Assemble the flux job!
fluxjob = flux.job.JobspecV1.from_command(command, **kwargs)
for option, value in option_flags.items():
print(f"⭐️ Setting shell option: {option}={value}")
fluxjob.setattr_shell_option(option, value)

print(f"⭐️ Workdir provided: {workdir}")
if workdir is not None:
Expand Down
7 changes: 4 additions & 3 deletions app/routers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.templating import Jinja2Templates

import app.core.config as config
import app.library.flux as flux_cli
import app.library.helpers as helpers
import app.library.launcher as launcher
from app.core.config import settings
from app.library.auth import alert_auth, check_auth

# Print (hidden message) to give status of auth
alert_auth()
router = APIRouter(
prefix="/v1",
tags=["jobs"],
dependencies=[Depends(check_auth)] if settings.require_auth else [],
dependencies=[Depends(check_auth)] if config.settings.require_auth else [],
responses={404: {"description": "Not found"}},
)
no_auth_router = APIRouter(prefix="/v1", tags=["jobs"])
Expand Down Expand Up @@ -155,8 +155,9 @@ async def submit_job(request: Request):
# Optional arguments
as_int = ["num_tasks", "cores_per_task", "gpus_per_task", "num_nodes"]
as_bool = ["exclusive"]
as_is = ["option_flags"]

for optional in as_int + as_bool:
for optional in as_int + as_bool + as_is:
if optional in payload and payload[optional]:
if optional in as_bool:
kwargs[optional] = bool(payload[optional])
Expand Down
1 change: 1 addition & 0 deletions clients/python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and **Merged pull requests**. Critical items to know are:
The versions coincide with releases on pip. Only major versions will be released as tags on Github.

## [0.0.x](https://github.com/flux-framework/flux-restful-api/tree/main) (0.0.x)
- support for adding option flags to submit (0.0.15)
- support for `is_launcher` parameter to indicate a launcher should be used instead (0.0.14)
- support for streaming job output (0.0.13)
- ensure logs end with one newline! (0.0.12)
Expand Down
3 changes: 3 additions & 0 deletions clients/python/flux_restful_client/main/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def submit(self, command, **kwargs):
cores_per_task (int): Number of cores per task (default to 1)
gpus_per_task (int): Number of gpus per task (defaults to None)
num_nodes (int): Number of nodes (defaults to None)
option_flags (dict): Option flags (as dict, defaults to {})
exclusive (bool): is the job exclusive? (defaults to False)
is_launcher (bool): the command should be submit to a launcher.
This is currently supported for snakemake and nextflow.
Expand All @@ -225,6 +226,7 @@ def submit(self, command, **kwargs):
"num_tasks",
"cores_per_task",
"gpus_per_task",
"option_flags",
"num_nodes",
"exclusive",
"is_launcher",
Expand All @@ -237,6 +239,7 @@ def submit(self, command, **kwargs):
data[optional] = kwargs[optional]

# Validate the data first.
print(data)
jsonschema.validate(data, schema=schemas.job_submit_schema)
result = self.do_request("jobs/submit", "POST", data=data)
if result.status_code == 404:
Expand Down
4 changes: 4 additions & 0 deletions clients/python/flux_restful_client/main/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
"type": ["number", "null"],
"description": "number of nodes for the job.",
},
"option_flags": {
"type": "object",
"description": "option flags (dict) or key value pairs for flux.",
},
"exclusive": {
"type": ["boolean", "null"],
"description": "ask for exclusive nodes for the job.",
Expand Down
18 changes: 18 additions & 0 deletions clients/python/flux_restful_client/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import time

import jsonschema
import pytest
from flux_restful_client.main import get_client


Expand Down Expand Up @@ -77,6 +79,22 @@ def test_job_output():
assert "pancakes 🥞️🥞️🥞️\n" in lines["Output"]


def test_option_flags():
"""
Test adding valid and invalid option flags
"""
client = get_client()

with pytest.raises(jsonschema.ValidationError):
client.submit("sleep 1", option_flags="invalid format")

# The server should reject a key with -o
result = client.submit("sleep 1", option_flags={"-ompi": "noodles"})
assert "Errors" in result
assert result["Errors"]
assert "keys without -o" in result["Errors"][0]


def test_job_query():
"""
Test endpoint to query jobs
Expand Down
2 changes: 1 addition & 1 deletion clients/python/flux_restful_client/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.14"
__version__ = "0.0.15"
AUTHOR = "Vanessa Sochat"
EMAIL = "vsoch@users.noreply.github.com"
NAME = "flux-restful-client"
Expand Down
24 changes: 24 additions & 0 deletions docs/getting_started/developer-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,30 @@ The following variables are available (with their defaults):
|FLUX_USER| The username to require for Basic Auth (if `FLUX_REQUIRE_AUTH` is set) | unset |
|FLUX_HAS_GPU | GPUs are available for the user to request | unset |
|FLUX_NUMBER_NODES| The number of nodes available in the cluster | 1 |
|FLUX_OPTION_FLAGS | Option flags to give to flux, in the same format you'd give on the command line | unset |

### Flux Option Flags

Option flags can be set server-wide or on the fly by a user in the interface
(or restful API). An option set by a user will over-ride the server setting.
An example setting a server-level option flags is below:

```bash
export FLUX_OPTION_FLAGS="-ompi=openmpi@5"
```

This would be translated to:

```python
fluxjob = flux.job.JobspecV1.from_command(command, **kwargs)
fluxjob.setattr_shell_option("mpi", "openmpi@5")
```

And note that you can easily set more than one:

```bash
export FLUX_OPTION_FLAGS="-ompi=openmpi@5 -okey=value"
```

## Code Linting

Expand Down
5 changes: 5 additions & 0 deletions templates/jobs/submit.html
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ <h3>Submit a Job</h3>
<input type="number" min="1" max="{{num_nodes}}" name="num_nodes" class="form-control" id="num_nodes" aria-describedby="num_nodesHelp">
<div id="num_nodesHelp" class="form-text">Number of nodes to request for the job, defaults to 1 (optional).</div>
</div>
<div class="mb-3">
<label for="workdir" class="form-label">Option Flags</label>
<input class="form-control" {% if form.option_flags %}value="{{ form.option_flags }}"{% endif %} name="option_fags" id="option_flags" aria-describedby="optionFlagsHelp">
<div id="optionFlagsHelp" class="form-text">One off option flags, space separated (e.g.,) -ompi=openmpi@5 (optional).</div>
</div>
<div class="mb-3 form-check">
<input type="checkbox" class="form-check-input" name="exclusive" {% if form.exclusive %}checked{% endif %} id="exclusive" aria-describedby="exclusiveHelp">
<label class="form-check-label" for="exclusive">Exclusive</label>
Expand Down
39 changes: 39 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,45 @@ def test_cancel_job():
# TODO we don't have way to actually verify that cancel happened


def test_submit_option_flags():
"""
Test that option flags are parsed.
"""
# Submit a job with invalid flags
response = client.post(
"/v1/jobs/submit",
json={"command": "sleep 1", "option_flags": "invalid"},
headers=headers,
)
assert response.status_code == 400
result = response.json()
errors = result.get("Errors")
assert errors
assert "is invalid" in errors[0]

# Another invalid pattern
response = client.post(
"/v1/jobs/submit",
json={"command": "sleep 1", "option_flags": {"-ompi": "nope"}},
headers=headers,
)
assert response.status_code == 400
result = response.json()
errors = result.get("Errors")
assert errors
assert "keys without -o" in errors[0]

# Valid
response = client.post(
"/v1/jobs/submit",
json={"command": "sleep 1", "option_flags": {"ompi": "openmpi@5"}},
headers=headers,
)
assert response.status_code == 200
result = response.json()
assert "id" in result


def test_job_output():
"""
Test endpoint to retrieve list of job output
Expand Down

0 comments on commit d894600

Please sign in to comment.