The code and examples that this tutorial is based on can be found here: https://github.com/flux-framework/flux-workflow-examples.git

# Creating Flux Instances

Flux includes a sub-command for bootstrapping an instance: `flux start`.  On an HPC system, you launch it the same way you would launch an MPI application; for example, on a Slurm cluster you would run `srun flux start`.  For local development and testing, you can start multiple broker ranks on a single node by passing the `--size=N` flag to `flux start`.  For example, to start a Flux session with 4 brokers on the local node:

In [1]:
!flux start --size=4 flux getattr size

4


# Submitting Jobs to Flux
## Submission CLI

To submit jobs to Flux, you can use the `flux mini` command, which has several sub-commands: `submit`, `run`, `bulksubmit`, `batch`, and `alloc`.  The `flux mini submit` command submits a job to Flux and prints out the jobid. 

In [2]:
!flux mini submit hostname

ƒASYbkbZ


The `flux mini run` command submits a job to Flux (like `flux mini submit`) and then attaches to the job (with `flux job attach`), printing the job's stdout/stderr to the terminal and exiting with the same exit code as the job:

In [3]:
!flux mini run /bin/false

flux-job: task(s) exited with exit code 1
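
Because `flux mini run` exits with the job's exit code, a calling script can branch on that status. The sketch below illustrates the pattern in Python with `subprocess`; the helper name `run_and_get_status` is hypothetical, and a child Python process stands in for the failing job so that the example runs without a Flux instance.

```python
import subprocess
import sys


def run_and_get_status(argv):
    """Run a command, let its output pass through, and return its exit code."""
    completed = subprocess.run(argv)
    return completed.returncode


# Stand-in for a failing job: a child Python process that exits with status 1.
# Inside a Flux instance, argv could instead be
# ["flux", "mini", "run", "/bin/false"].
failing = [sys.executable, "-c", "import sys; sys.exit(1)"]
status = run_and_get_status(failing)
print(f"exit status: {status}")
if status != 0:
    print("job failed; a workflow script could stop or retry here")
```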


`flux mini submit` and `flux mini run` also support many other useful flags:

In [4]:
!flux mini run -n4 --label-io --time-limit=5s --env-remove=LD_LIBRARY_PATH hostname
!flux mini run --help

1: 1edf4b264187
3: 1edf4b264187
0: 1edf4b264187
2: 1edf4b264187
usage: flux-mini run [-h] [-t FSD] [--urgency N] [--job-name NAME] [-o OPT]
                     [--setattr ATTR=VAL] [--env RULE] [--env-remove PATTERN]
                     [--env-file FILE] [--input FILENAME] [--output FILENAME]
                     [--error FILENAME] [-l] [--flags FLAGS] [--dry-run]
                     [-N N] [-n N] [-c N] [-g N] [-v]
                     ...

positional arguments:
  command                   Job command and arguments

optional arguments:
  -h, --help                show this help message and exit
  -t, --time-limit=FSD      Time limit in Flux standard duration, e.g. 2d,
                            1.5h
      --urgency=N           Set job urgency (0-31), hold=0, default=16,
                            expedite=31
      --job-name=NAME       Set an optional name for job to NAME
  -o, --setopt=OPT          Set shell option OPT. An optional value is
                            supported 
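
The `--time-limit` flag in the help text above takes a duration such as `5s`, `1.5h`, or `2d`. As a rough illustration of how such strings map to seconds, here is a minimal sketch. It assumes only the suffixes `s`, `m`, `h`, and `d`, and treats a bare number as seconds; the function `fsd_to_seconds` is hypothetical and is not Flux's own parser.

```python
# Seconds per unit suffix (assumed set of suffixes for this sketch).
_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0, "d": 86400.0}


def fsd_to_seconds(text):
    """Convert a duration string such as '5s' or '1.5h' to seconds."""
    text = text.strip()
    if not text:
        raise ValueError("empty duration")
    unit = text[-1]
    if unit in _UNIT_SECONDS:
        number, scale = text[:-1], _UNIT_SECONDS[unit]
    else:
        # No recognized suffix: interpret the whole string as seconds.
        number, scale = text, 1.0
    seconds = float(number) * scale  # raises ValueError on malformed input
    if seconds < 0:
        raise ValueError("duration must not be negative")
    return seconds


for example in ("5s", "1.5h", "2d"):
    print(example, "->", fsd_to_seconds(example), "seconds")
```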

The `flux mini bulksubmit` command makes it simple to submit the same executable many times with different arguments.  It uses the same syntax as GNU Parallel:

In [5]:
!flux mini bulksubmit --watch --wait echo {} ::: foo bar baz

ƒBBk6prK
ƒBBmap8f
ƒBBmap8g
foo
baz
bar
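
To make the `{}` and `:::` syntax concrete, the sketch below expands a command template into one command line per input. It only illustrates the substitution idea for a single `:::` input list; the helper names are hypothetical, and this is not how `flux mini bulksubmit` is implemented.

```python
import shlex


def split_on_separator(args, separator=":::"):
    """Split an argument list into (template, inputs) at the separator."""
    idx = args.index(separator)
    return args[:idx], args[idx + 1:]


def expand_template(template, inputs):
    """Return one command per input, replacing "{}" in each template word."""
    return [[word.replace("{}", value) for word in template] for value in inputs]


args = shlex.split("echo {} ::: foo bar baz")
template, inputs = split_on_separator(args)
for command in expand_template(template, inputs):
    print(shlex.join(command))
```

Each generated command corresponds to one submitted job, which is why the cell above prints three job IDs.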


Of course, Flux can launch more than just single-node, single-core jobs.  We can submit multiple heterogeneous jobs, and Flux will co-schedule the jobs while also ensuring no oversubscription of resources (e.g., cores).

Note: in this tutorial, we cannot assume that the host you are running on has multiple cores, so the examples below vary only the number of nodes per job.  Varying `cores-per-task` is also possible when the underlying hardware supports it (e.g., a multi-core node).

In [10]:
!flux mini submit --nodes=2 --ntasks=2 --cores-per-task=1 --job-name simulation sleep inf
!flux mini submit --nodes=1 --ntasks=1 --cores-per-task=1 --job-name analysis sleep inf

ƒefTJnFy
ƒeqeqqhZ


We can now list out the jobs in the queue with `flux jobs`, and we should see both jobs that we just submitted.

In [11]:
!flux jobs

       JOBID USER     NAME       ST NTASKS NNODES  RUNTIME NODELIST
    ƒeqeqqhZ fluxuser analysis    R      1      1   0.539s 1edf4b264187
    ƒefTJnFy fluxuser simulation  R      2      2   0.935s 1edf4b[264187,264187]


Since those jobs will never exit (and we didn't specify a time limit), let's kill them now and free up the resources.

In [12]:
!flux job killall -f
!flux jobs

flux-job: Terminated 2 jobs (0 errors)
       JOBID USER     NAME       ST NTASKS NNODES  RUNTIME NODELIST


We can use the `flux mini batch` command to easily create nested Flux instances.  When `flux mini batch` is invoked, Flux automatically creates a nested instance that spans the resources allocated to the job, and then runs the batch script passed to `flux mini batch` on rank 0 of the nested instance.  While a batch script is expected to launch parallel jobs using `flux mini run` or `flux mini submit` at this level, nothing prevents the script from submitting further nested batch jobs with `flux mini batch`, if desired.

Note: Flux also provides `flux mini alloc`, an interactive version of `flux mini batch`, but demonstrating it in a Jupyter notebook is difficult because the notebook lacks a pseudo-terminal.

In [13]:
!flux mini batch --nslots=2 --cores-per-slot=1 --nodes=2 ./sleep_batch.sh
!flux mini batch --nslots=2 --cores-per-slot=1 --nodes=2 ./sleep_batch.sh

ƒ2zxhErij
ƒ318V47eo


The contents of `sleep_batch.sh`:

``` bash 
    #!/bin/bash
  
    echo "Starting my batch job"
    echo "Print the resources allocated to this batch job"
    flux resource list

    echo "Use sleep to emulate a parallel program"
    echo "Run the program at a total of 2 processes each requiring"
    echo "1 core. These processes are equally spread across 2 nodes."
    flux mini run -N 2 -n 2 sleep 10
    flux mini run -N 2 -n 2 sleep 10
```

In [15]:
!flux jobs

# Copy the Job ID of one of the `flux mini batch`s here to examine the job's resources and output
JOBID="ƒ318V47eo"
!flux job info {JOBID} R
!flux job attach {JOBID}
!cat flux-{JOBID}.out

       JOBID USER     NAME       ST NTASKS NNODES  RUNTIME NODELIST
   ƒ318V47eo fluxuser sleep_batc  R      2      2   48.45s 1edf4b[264187,264187]
   ƒ2zxhErij fluxuser sleep_batc  R      2      2   48.83s 1edf4b[264187,264187]
{"version": 1, "execution": {"R_lite": [{"rank": "0-1", "children": {"core": "5"}}], "nodelist": ["1edf4b[264187,264187]"], "starttime": 1618379436, "expiration": 1618984236}}

0: stdout redirected to flux-ƒ318V47eo.out
0: stderr redirected to flux-ƒ318V47eo.out
Starting my batch job
Print the resources allocated to this batch job
     STATE NNODES   NCORES    NGPUS NODELIST
      free      2        2        0 1edf4b[264187,264187]
 allocated      0        0        0 
      down      0        0        0 
Use sleep to emulate a parallel program
Run the program at a total of 2 processes each requiring
1 core. These processes are equally spread across 2 nodes.
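
The `R` document printed by `flux job info` above encodes the allocation compactly: `rank` and `core` are ID sets such as `0-1` or `5`. The following sketch, which assumes only the simple comma-and-range form seen in this output, expands those sets and counts ranks and cores. The helpers `expand_idset` and `summarize` are hypothetical names.

```python
import json

# The R document from the output above, copied verbatim.
R_TEXT = """
{"version": 1, "execution": {"R_lite": [{"rank": "0-1", "children": {"core": "5"}}],
 "nodelist": ["1edf4b[264187,264187]"], "starttime": 1618379436,
 "expiration": 1618984236}}
"""


def expand_idset(text):
    """Expand an ID set such as '0-2,5' into [0, 1, 2, 5]."""
    ids = []
    for part in text.split(","):
        if "-" in part:
            low, high = part.split("-")
            ids.extend(range(int(low), int(high) + 1))
        else:
            ids.append(int(part))
    return ids


def summarize(resource_set):
    """Return (number of ranks, total number of cores) described by R_lite."""
    nranks = ncores = 0
    for entry in resource_set["execution"]["R_lite"]:
        ranks = expand_idset(entry["rank"])
        cores = expand_idset(entry["children"]["core"])
        nranks += len(ranks)
        ncores += len(ranks) * len(cores)
    return nranks, ncores


nranks, ncores = summarize(json.loads(R_TEXT))
print(f"{nranks} ranks, {ncores} cores")
```

The result agrees with the `flux resource list` output from inside the batch job, which reports 2 nodes and 2 cores.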


## Submission API
Flux also provides first-class Python bindings, which can be used to submit jobs programmatically. The following script demonstrates this with the `flux.job.submit()` call:

In [24]:
import os
import json
import flux
from flux.job import JobspecV1
from flux.job.JobID import JobID

In [18]:
f = flux.Flux() # connect to the running Flux instance
compute_jobreq = JobspecV1.from_command(
    command=["./compute.py", "120"], num_tasks=1, num_nodes=1, cores_per_task=1
) # construct a jobspec
compute_jobreq.cwd = os.path.expanduser("~/flux-workflow-examples/job-submit-api/") # set the CWD
print(JobID(flux.job.submit(f,compute_jobreq)).f58) # submit and print out the jobid (in f58 format)

ƒ5Ev8fUo9


In [19]:
!flux jobs

       JOBID USER     NAME       ST NTASKS NNODES  RUNTIME NODELIST
   ƒ5Ev8fUo9 fluxuser compute.py  R      1      1   5.644s 1edf4b264187


Under the hood, the `Jobspec` class is creating a YAML document that ultimately gets serialized as JSON and sent to Flux for ingestion, validation, queueing, scheduling, and eventually execution.  We can dump the raw JSON jobspec that is submitted, where we can see the exact resources requested and the task set to be executed on those resources.

In [29]:
print(compute_jobreq.dumps(indent=2))

{
  "resources": [
    {
      "type": "node",
      "count": 1,
      "with": [
        {
          "type": "slot",
          "count": 1,
          "with": [
            {
              "type": "core",
              "count": 1
            }
          ],
          "label": "task"
        }
      ]
    }
  ],
  "tasks": [
    {
      "command": [
        "./compute.py",
        "120"
      ],
      "slot": "task",
      "count": {
        "per_slot": 1
      }
    }
  ],
  "attributes": {
    "system": {
      "duration": 0,
      "cwd": "/home/fluxuser/flux-workflow-examples/job-submit-api/"
    }
  },
  "version": 1
}
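
One way to read the `resources` section is as a tree in which each level's `count` multiplies the counts beneath it. The sketch below walks that tree to total the cores and tasks a jobspec requests. It assumes every `count` is a plain integer, as in the dump above, and the helper names are hypothetical.

```python
# The resources and tasks sections from the dump above.
jobspec = {
    "resources": [
        {"type": "node", "count": 1, "with": [
            {"type": "slot", "count": 1, "label": "task", "with": [
                {"type": "core", "count": 1},
            ]},
        ]},
    ],
    "tasks": [
        {"command": ["./compute.py", "120"], "slot": "task",
         "count": {"per_slot": 1}},
    ],
}


def count_resources(resources, wanted, multiplier=1):
    """Total instances of resource type `wanted`, multiplying counts down the tree."""
    total = 0
    for resource in resources:
        n = multiplier * resource["count"]
        if resource["type"] == wanted:
            total += n
        total += count_resources(resource.get("with", []), wanted, n)
    return total


def count_tasks(spec):
    """Total tasks: number of slots times tasks per slot, summed over task entries."""
    slots = count_resources(spec["resources"], "slot")
    return sum(slots * task["count"]["per_slot"] for task in spec["tasks"])


print("cores:", count_resources(jobspec["resources"], "core"))
print("tasks:", count_tasks(jobspec))
```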


We can then replicate our previous example of submitting multiple heterogeneous jobs and testing that Flux co-schedules them.

In [32]:
compute_jobreq = JobspecV1.from_command(
    command=["./compute.py", "120"], num_tasks=4, num_nodes=2, cores_per_task=2
)
compute_jobreq.cwd = os.path.expanduser("~/flux-workflow-examples/job-submit-api/")
print(JobID(flux.job.submit(f, compute_jobreq)))

io_jobreq = JobspecV1.from_command(
    command=["./io-forwarding.py", "120"], num_tasks=1, num_nodes=1, cores_per_task=1
)
io_jobreq.cwd = os.path.expanduser("~/flux-workflow-examples/job-submit-api/")
print(JobID(flux.job.submit(f, io_jobreq)))

ƒ7rvUGZYK
ƒ7rvszN3q


In [33]:
!flux jobs

       JOBID USER     NAME       ST NTASKS NNODES  RUNTIME NODELIST
   ƒ7rvszN3q fluxuser io-forward  R      1      1   1.866s 1edf4b264187
   ƒ7rvUGZYK fluxuser compute.py  R      4      2   1.886s 1edf4b[264187,264187]
   ƒ7bXms1hZ fluxuser io-forward  R      1      1   36.81s 1edf4b264187
   ƒ7bXKBEdM fluxuser compute.py  R      4      2   36.83s 1edf4b[264187,264187]


We can use the `FluxExecutor` class to submit large numbers of jobs to Flux. It follows Python's `concurrent.futures` interface.  Example snippet from `~/flux-workflow-examples/async-bulk-job-submit/bulksubmit_executor.py`:

``` python 
with FluxExecutor() as executor:
    compute_jobspec = JobspecV1.from_command(args.command)
    futures = [executor.submit(compute_jobspec) for _ in range(args.njobs)]
    # wait for the jobid for each job, as a proxy for the job being submitted
    for fut in futures:
        fut.jobid()
    # all jobs submitted - print timings
```

In [34]:
# Submit a FluxExecutor based script.
%run ./flux-workflow-examples/async-bulk-job-submit/bulksubmit_executor.py -n200 /bin/sleep 0

bulksubmit_executor: submitted 200 jobs in 0.37s. 536.54job/s
bulksubmit_executor: First job finished in about 0.582s
|██████████████████████████████████████████████████████████| 100.0% (47.3 job/s)
bulksubmit_executor: Ran 200 jobs in 4.2s. 47.1 job/s
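
For readers unfamiliar with `concurrent.futures`, the same submit-then-collect pattern can be sketched with the standard library alone. This example deliberately swaps in `ThreadPoolExecutor` and trivial child processes in place of `FluxExecutor` and real jobs, so that it runs without a Flux instance; `run_job` is a hypothetical stand-in.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
import sys


def run_job(index):
    """Stand-in for one job: run a trivial child process, return (index, exit code)."""
    completed = subprocess.run([sys.executable, "-c", "pass"])
    return index, completed.returncode


with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit everything up front; each call returns a future immediately.
    futures = [executor.submit(run_job, i) for i in range(8)]
    # Collect results in completion order, not submission order.
    results = dict(fut.result() for fut in as_completed(futures))

failed = [i for i, returncode in sorted(results.items()) if returncode != 0]
print(f"ran {len(results)} jobs, {len(failed)} failed")
```

The key property shared with the snippet above is that submission does not block: the futures are created first and results are gathered afterwards.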


# Diving Deeper Into Flux's Internals

Flux uses hwloc to detect the resources on each node and then to populate its resource graph.  You can access the hwloc topology information that Flux collects with the `flux hwloc` subcommand:

In [43]:
!flux hwloc info
!flux hwloc topology | head

4 Machines, 28 Cores, 28 PUs
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE topology SYSTEM "hwloc.dtd">
<topology>
  <object type="Machine" os_index="0" cpuset="0x0000007f" complete_cpuset="0x0000007f" online_cpuset="0x0000007f" allowed_cpuset="0x0000007f" nodeset="0x00000001" complete_nodeset="0x00000001" allowed_nodeset="0x00000001">
    <info name="DMIProductName" value="BHYVE"/>
    <info name="DMIProductVersion" value="1.0"/>
    <info name="DMIChassisVendor" value=""/>
    <info name="DMIChassisType" value="2"/>
    <info name="DMIChassisVersion" value="1.0"/>
    <info name="DMIChassisAssetTag" value="None"/>


Flux can also bootstrap its resource graph from static input files, as in the case of a multi-user system instance set up by site administrators.  [More information on Flux's static resource configuration files](https://flux-framework.readthedocs.io/en/latest/adminguide.html#resource-configuration).  Flux provides a more standard interface for listing available resources that works regardless of the resource input source: `flux resource`.

In [10]:
# To view status of resources
!flux resource status
# To view scheduler's perspective on resources (allocated, free, etc)
!flux resource list

    STATUS NNODES RANKS           NODELIST
     avail      4 0-3             491ea08a8cee,491ea08a8cee,491ea08a8cee,491ea08a8cee
     STATE NNODES   NCORES    NGPUS NODELIST
      free      4       28        0 491ea08a8cee,491ea08a8cee,491ea08a8cee,491ea08a8cee
 allocated      0        0        0 
      down      0        0        0 


Flux has a command for controlling the queue within the `job-manager`: `flux queue`.  This includes disabling job submission, re-enabling it, waiting for the queue to become idle or empty, and checking the queue status:

In [4]:
!flux queue disable "maintenance outage"
!flux queue enable
!flux queue -h

flux-queue: Job submission is disabled: maintenance outage
flux-queue: Job submission is enabled
Usage: flux-queue [OPTIONS] COMMAND ARGS
  -h, --help             Display this message.

Common commands from flux-queue:
   enable          Enable job submission
   disable         Disable job submission
   start           Start scheduling
   stop            Stop scheduling
   status          Get queue status
   drain           Wait for queue to become empty.
   idle            Wait for queue to become idle.


Each Flux instance has a set of attributes that are set at startup that affect the operation of Flux, such as `rank`, `size`, and `local-uri` (the Unix socket usable for communicating with Flux).  Many of these attributes can be modified at runtime, such as `log-stderr-level` (1 logs only critical messages to stderr while 7 logs everything, including debug messages).

In [36]:
!flux getattr rank
!flux getattr size
!flux getattr local-uri
!flux setattr log-stderr-level 3
!flux lsattr -v

0
4
local:///tmp/flux-ULJeOm/local-0
broker.mapping                          (vector,(0,1,4))
broker.pid                              16
broker.quorum                           0-3
broker.rc1_path                         /etc/flux/rc1
broker.rc3_path                         /etc/flux/rc3
conf.connector_path                     /usr/lib/flux/connectors
conf.exec_path                          /usr/libexec/flux/cmd
conf.module_path                        /usr/lib/flux/modules
conf.pmi_library_path                   /usr/lib/flux/libpmi.so
conf.shell_initrc                       /etc/flux/shell/initrc.lua
conf.shell_pluginpath                   /usr/lib/flux/shell/plugins
config.path                             -
content.acct-dirty                      0
content.acct-entries                    12202
content.acct-size                       6087217
content.acct-valid                      12202
content.backing-module                  content-sqlite
content.backing-path                    /tmp

Services within a Flux instance are implemented by modules. To query and manage broker modules, use `flux module`.  Modules that we have already interacted with directly in this tutorial include `resource` (via `flux resource`), `job-ingest` (via `flux mini` and the Python API), `job-list` (via `flux jobs`), and `job-manager` (via `flux queue`), and we will interact with the `kvs` module in a few cells. For the most part, services are implemented by modules of the same name (e.g., `kvs` implements the `kvs` service and thus the `kvs.lookup` RPC).  In some circumstances, where multiple implementations of a service exist, a module of a different name implements a given service (e.g., in this instance, `sched-fluxion-qmanager` provides the `sched` service and thus `sched.alloc`, but in another instance `sched-simple` might provide the `sched` service).

In [37]:
!flux module list

Module                       Size Digest  Idle  S Service
job-ingest                1453656 AB27448 idle  R 
job-info                  1638128 E3A21EA idle  R 
job-exec                  1509296 ED8BF74 idle  R 
connector-local           1298368 5972E0B    0  R 
content-sqlite            1326664 368815B idle  R content-backing,kvs-checkpoint
cron                      1407496 7D62B82 idle  R 
job-manager               1722016 C480039 idle  R 
kvs                       1835336 9E19B98 idle  R 
barrier                   1312256 8402DD6 idle  R 
heartbeat                 1291720 16D0F76    1  R 
resource                  1706968 B5C4125 idle  R 
sched-fluxion-qmanager    6374080 BEDC833 idle  R sched
kvs-watch                 1528416 B7B3A25 idle  R 
job-list                  1710264 C3A95FB idle  R 
sched-fluxion-resource   31145912 80BC659 idle  R 


We can unload the Fluxion modules (the scheduler modules from flux-sched) and replace them with `sched-simple` (the scheduler built into flux-core) as a demonstration of this functionality:

In [38]:
!flux module unload sched-fluxion-qmanager
!flux module unload sched-fluxion-resource
!flux module load sched-simple
!flux module list

Module                       Size Digest  Idle  S Service
job-ingest                1453656 AB27448 idle  R 
job-info                  1638128 E3A21EA idle  R 
job-exec                  1509296 ED8BF74 idle  R 
connector-local           1298368 5972E0B    0  R 
content-sqlite            1326664 368815B idle  R content-backing,kvs-checkpoint
cron                      1407496 7D62B82 idle  R 
job-manager               1722016 C480039    0  R 
kvs                       1835336 9E19B98 idle  R 
barrier                   1312256 8402DD6 idle  R 
heartbeat                 1291720 16D0F76    0  R 
sched-simple              1579800 0E8E6EC    0  R sched
resource                  1706968 B5C4125    0  R 
kvs-watch                 1528416 B7B3A25 idle  R 
job-list                  1710264 C3A95FB idle  R 


We can now reload the Fluxion scheduler, but this time, let's pass some extra arguments to specialize our Flux instance.  In particular, let's populate our resource graph with nodes, sockets, and cores and limit the scheduling depth to 4.

In [40]:
!flux dmesg -C
!flux module unload sched-simple
!flux module load sched-fluxion-resource load-allowlist=node,socket,core
!flux module load sched-fluxion-qmanager queue-params=queue-depth=4
!flux module list
!flux dmesg | grep queue-depth

flux-module: broker.rmmod sched-simple: No such file or directory
flux-module: broker.insmod: sched-fluxion-resource module/service is in use
flux-module: broker.insmod: sched-fluxion-qmanager module/service is in use
Module                       Size Digest  Idle  S Service
sched-fluxion-qmanager    6374080 BEDC833   54  R sched
job-ingest                1453656 AB27448 idle  R 
job-info                  1638128 E3A21EA idle  R 
job-exec                  1509296 ED8BF74 idle  R 
sched-fluxion-resource   31145912 80BC659   54  R 
connector-local           1298368 5972E0B    0  R 
content-sqlite            1326664 368815B idle  R content-backing,kvs-checkpoint
cron                      1407496 7D62B82 idle  R 
job-manager               1722016 C480039   54  R 
kvs                       1835336 9E19B98 idle  R 
barrier                   1312256 8402DD6 idle  R 
heartbeat                 1291720 16D0F76    1  R 
resource                  1706968 B5C4125   55  R 
kvs-watch                 

The key-value store (KVS) is a core component of a Flux instance. The `flux kvs` command provides a utility to list and manipulate values of the KVS. Modules of Flux use the KVS to persistently store information and retrieve it later on (potentially after a restart of Flux).  One example of KVS use by Flux is the `resource` module, which stores the resource set `R` of the current Flux instance:

In [42]:
!flux kvs ls 
!flux kvs ls resource
!flux kvs get resource.R | jq

job         resource
R           eventlog
{
  "version": 1,
  "execution": {
    "R_lite": [
      {
        "rank": "0-3",
        "children": {
          "core": "0-6"
        }
      }
    ],
    "starttime": 0,
    "expiration": 0,
    "nodelist": [
      "1edf4b[264187,264187,264187,264187]"
    ]
  }
}


Flux provides a built-in mechanism for executing commands on nodes without requiring a job or resource allocation: `flux exec`.  System administrators typically use `flux exec` to run administrative commands and to load or unload modules across multiple ranks simultaneously.

In [44]:
!flux exec -r 2 flux getattr rank # only execute on rank 2
!flux exec flux getattr rank # execute on all ranks

2
0
1
2
3
