### Asynchronous Bulk Job Submission

This notebook demonstrates different implementations of the same basic use-case: submitting a large number of jobs to _flux_.

The first option submits _jobspec_ files from a directory and waits for them to complete in any order. The next cell creates a `jobs` directory and populates it with 1024 simple _sleep_ job specs. Execute this cell only once: `mkdir` reports an error if the `jobs` directory already exists.

In [None]:
!mkdir jobs

!flux mini run --dry-run -n1 sleep 0 > jobs/0.json

!for i in $(seq 1 1023); do cp jobs/0.json jobs/${i}.json; done
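The same population step can be sketched in pure Python, which makes it safe to rerun. This is only an illustration: `replicate_jobspec` is a hypothetical helper, not part of Flux, and the demo writes a placeholder file rather than a real jobspec so that it runs without Flux installed.

```python
import shutil
import tempfile
from pathlib import Path


def replicate_jobspec(template, total):
    """Copy `template` until its directory holds `total` numbered jobspecs.

    The template itself counts as the first file, so `total - 1` copies
    named 1.json, 2.json, ... are written next to it.
    """
    template = Path(template)
    paths = [template]
    for i in range(1, total):
        dest = template.with_name(f"{i}.json")
        shutil.copyfile(template, dest)  # overwrites, so reruns are harmless
        paths.append(dest)
    return paths


# Demo in a throwaway directory. The file content is a placeholder,
# not a real Flux jobspec.
with tempfile.TemporaryDirectory() as tmp:
    jobs_dir = Path(tmp) / "jobs"
    jobs_dir.mkdir(parents=True, exist_ok=True)  # no error if it already exists
    template = jobs_dir / "0.json"
    template.write_text('{"placeholder": true}')
    created = replicate_jobspec(template, 8)
    n_on_disk = len(list(jobs_dir.glob("*.json")))
    print(len(created), n_on_disk)
```

In the notebook, the template would be the `jobs/0.json` file produced by the `--dry-run` command above, and `total` would be the number of jobs to submit.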

Now import the necessary modules.

In [None]:
import glob
import os
import time
import sys
import flux

from flux import job
from flux import constants

Create the `jobs` "queue" (a list that collects the ID of each submitted job), along with logging, progress-bar, and submission-callback functions.

In [None]:
jobs = []
label = "bulksubmit"

def log(s):
    print(label + ": " + s)


def progress(fraction, length=72, suffix=""):
    fill = int(round(length * fraction))
    bar = "\u2588" * fill + "-" * (length - fill)
    s = "\r|{0}| {1:.1f}% {2}".format(bar, 100 * fraction, suffix)
    sys.stdout.write(s)
    if fraction == 1.0:
        sys.stdout.write("\n")


def submit_cb(f):
    jobs.append(job.submit_get_id(f))
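To see what `progress` draws without writing to the terminal, the bar computation can be pulled out into a pure function. This is a sketch: `render_bar` is a hypothetical name, and the clamping of `fraction` to the range 0 to 1 is an addition that the original `progress` does not perform.

```python
def render_bar(fraction, length=72):
    """Return the bar string that `progress` would place between the pipes."""
    # Clamp so an out-of-range fraction cannot produce a malformed bar
    # (an addition; the notebook's `progress` does not do this).
    fraction = min(max(fraction, 0.0), 1.0)
    fill = int(round(length * fraction))
    return "\u2588" * fill + "-" * (length - fill)


# A quarter-full bar of width 8: two filled cells, six dashes.
print("|" + render_bar(0.25, length=8) + "|")
```

The bar always has exactly `length` characters, so each carriage-return redraw fully overwrites the previous one.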

Submit each of the job specs in the `jobs` directory asynchronously and then wait for all of them to complete in some arbitrary order.

In [None]:
h = flux.Flux()

t0 = time.perf_counter()

log("Starting...")
for file in glob.glob("jobs/*.json"):
    with open(file) as jobspec:
        job.submit_async(h, jobspec.read(), waitable=True).then(submit_cb)
        
if h.reactor_run() < 0:
    h.fatal_error("reactor start failed")

total = len(jobs)
dt = time.perf_counter() - t0
jps = len(jobs) / dt
log("submitted {0} jobs in {1:.2f}s. {2:.2f} job/s".format(total, dt, jps))

count = 0
while count < total:
    # wait for jobs to complete in any order
    job.wait(h)
    count = count + 1
    if count == 1:
        log("First job finished in about {0:.3f}s".format(time.perf_counter() - t0))
    suffix = "({0:.1f} job/s)".format(count / (time.perf_counter() - t0))
    progress(count / total, length=58, suffix=suffix)

dt = time.perf_counter() - t0
log("{0} jobs run in {1:.1f}s. {2:.1f} job/s".format(total, dt, total / dt))
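The wait loop above counts completions but discards whatever `job.wait` returns, so a failed job looks the same as a successful one. The sketch below tallies failures instead. It assumes that each wait result exposes `jobid`, `success`, and `errstr` fields, which is my understanding of the Flux Python bindings but should be checked against the installed version. `drain` and `FakeWaitResult` are hypothetical names, and the demo uses canned results so that it runs without Flux.

```python
from collections import namedtuple

# Stand-in with the field names assumed for flux.job.wait() results.
FakeWaitResult = namedtuple("FakeWaitResult", "jobid success errstr")


def drain(wait_one, total):
    """Call `wait_one()` `total` times and return (n_ok, failures).

    `failures` is a list of (jobid, errstr) pairs for unsuccessful jobs.
    """
    failures = []
    for _ in range(total):
        result = wait_one()
        if not result.success:
            failures.append((result.jobid, result.errstr))
    return total - len(failures), failures


# Canned results: the second job is a simulated failure.
canned = iter(
    [
        FakeWaitResult(1, True, ""),
        FakeWaitResult(2, False, "simulated failure"),
        FakeWaitResult(3, True, ""),
    ]
)
n_ok, failed = drain(lambda: next(canned), 3)
print(n_ok, failed)
```

Under the same assumption, the notebook's loop could call `drain(lambda: job.wait(h), total)` and log the returned failure list after the summary line.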

The second option uses the `FluxExecutor` class to asynchronously submit job specs constructed from a command and wait for them to complete in any order.

In [None]:
import concurrent.futures as cf

from flux.job import FluxExecutor, JobspecV1

After importing the necessary modules, specify the command that each job will run (in this example, `sleep 0`), then use an instance of `FluxExecutor` to asynchronously submit a job spec created from that command a large number of times. Each call to `executor.submit` returns a future; the futures are collected in a list and used to wait for job completion.

In [None]:
njobs = 1024
command = ["/bin/sleep", "0"]

t0 = time.perf_counter()
label = "bulksubmit_executor"
with FluxExecutor() as executor:
    compute_jobspec = JobspecV1.from_command(command)
    futures = [executor.submit(compute_jobspec) for _ in range(njobs)]
    # wait for the jobid for each job, as a proxy for the job being submitted
    for fut in futures:
        fut.jobid()

    # all jobs submitted - print timings
    dt = time.perf_counter() - t0
    jps = njobs / dt
    # log() already prefixes the label, so it is not repeated in the message
    log("submitted {0} jobs in {1:.2f}s. {2:.2f} job/s".format(njobs, dt, jps))
    # wait for jobs to complete
    for i, _ in enumerate(cf.as_completed(futures)):
        if i == 0:
            log("First job finished in about {0:.3f}s".format(time.perf_counter() - t0))
        jps = (i + 1) / (time.perf_counter() - t0)
        progress((i + 1) / njobs, length=58, suffix=f"({jps:.1f} job/s)")
# print time summary
dt = time.perf_counter() - t0
log("{0} jobs run in {1:.1f}s. {2:.1f} job/s".format(njobs, dt, njobs / dt))
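Because the executor's futures work with `concurrent.futures.as_completed`, the completion loop can also inspect each outcome rather than only counting it. The following sketch classifies futures as they finish. It assumes that a future's result is the job's integer return code, which matches my reading of the `FluxExecutor` documentation but is stated here as an assumption. A `ThreadPoolExecutor` and the hypothetical `fake_job` function stand in for Flux so that the example runs anywhere.

```python
import concurrent.futures as cf
from collections import Counter


def tally(futures):
    """Classify futures as they complete: ok, nonzero, error, or cancelled."""
    counts = Counter()
    for fut in cf.as_completed(futures):
        if fut.cancelled():
            counts["cancelled"] += 1
        elif fut.exception() is not None:
            counts["error"] += 1
        elif fut.result() == 0:  # assumed: result is the job's return code
            counts["ok"] += 1
        else:
            counts["nonzero"] += 1
    return counts


def fake_job(code):
    """Stand-in for a job: return `code`, or raise if `code` is None."""
    if code is None:
        raise RuntimeError("simulated submission failure")
    return code


with cf.ThreadPoolExecutor(max_workers=4) as pool:
    demo_futures = [pool.submit(fake_job, c) for c in (0, 0, 1, None)]
    demo_counts = tally(demo_futures)

print(dict(demo_counts))
```

With `FluxExecutor`, the same `tally` function could be passed the `futures` list from the cell above in place of the bare `as_completed` loop, at the cost of losing the inline progress bar unless one is added inside the loop.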
