# Adaptive-scheduler example

[Read the documentation](https://adaptive-scheduler.readthedocs.io/en/latest/#what-is-this) to see what this is all about.

## Step 1: define the simulation

Often one wants to sweep a continuous 1D or 2D space for multiple parameters. [Adaptive](http://adaptive.readthedocs.io) is the ideal tool for this. We define a simulation by creating several `adaptive` learners (e.g. `adaptive.Learner1D`).

We **need** to define the following variables:
* `learners` a list of learners
* `fnames` a list of file names, one for each learner

In [None]:
%%writefile learners_file.py

from functools import partial

import adaptive
from adaptive_scheduler.utils import combo_to_fname


def h(x, width=0.01, offset=0, burn_iterations=10):
    """Return ``x`` plus a Lorentzian peak of height 1 and half-width ``width`` at ``offset``.

    Before evaluating, the function deliberately wastes CPU time by computing
    the eigenvalues of ``burn_iterations`` random 1000x1000 matrices, so the
    example behaves like an expensive simulation. Pass ``burn_iterations=0``
    to skip this (e.g. for quick local checks).
    """
    # Imported inside the function as in the original example; presumably so the
    # function stays self-contained when it is shipped to worker processes.
    import numpy as np

    for _ in range(burn_iterations):  # Burn some CPU time just because
        np.linalg.eig(np.random.rand(1000, 1000))

    width_sq = width ** 2
    return x + width_sq / (width_sq + (x - offset) ** 2)


# Ten peak offsets from -0.5 to 0.4 (step 0.1), each combined with two widths.
offsets = [i / 10 - 0.5 for i in range(10)]

combos = adaptive.utils.named_product(offset=offsets, width=[0.01, 0.05])

# One Learner1D on [-1, 1] per parameter combination, with one file name per
# learner. `combo_to_fname` turns the combination into a file name under
# `data/`, instead of embedding the raw dict repr (spaces, quotes, braces and
# colons) in the path.
learners = []
fnames = []

for combo in combos:
    f = partial(h, **combo)
    learner = adaptive.Learner1D(f, bounds=(-1, 1))
    fnames.append(combo_to_fname(combo, folder="data"))
    learners.append(learner)

# Step 2 (option 1): the simple way

After defining the `learners` and `fnames` in a file (above), we can start running these learners.

We split up all learners into separate jobs; all you need to do is specify how many cores per job you want.

## Simple example

In [None]:
import adaptive_scheduler


def goal(learner):
    """Consider a learner finished once it holds more than 200 points."""
    return learner.npoints > 200


# Run every learner from `learners_file.py` in its own job with 12 cores,
# logging and saving every 30 time units (`log_interval` / `save_interval`).
run_manager = adaptive_scheduler.server_support.RunManager(
    goal=goal,
    learners_file="learners_file.py",
    cores_per_job=12,
    save_interval=30,
    log_interval=30,
)
run_manager.start()

In [None]:
# Show the scheduler's current queue as a table (presumably one row per job).
import pandas as pd

queue = adaptive_scheduler.slurm.queue()
df = pd.DataFrame(queue).T
df.head(5)

In [None]:
# Parse the job log files into a `pandas.DataFrame`.
# Log files are only written every `run_manager.log_interval`, so this
# yields nothing useful until that interval has passed at least once.
df = run_manager.parse_log_files()
df.head(5)

In [None]:
# Show the run manager's job database as a table.
database = run_manager.get_database()
df = pd.DataFrame(database)
df.head(5)

In [None]:
# Once the jobs have started and saved some data, load it and plot it.
import adaptive
from adaptive_scheduler.utils import load_parallel
from learners_file import combos, fnames, learners

adaptive.notebook_extension()
load_parallel(learners, fnames)  # read each learner's saved data from its file

# Wrap all learners in one BalancingLearner, labelled by their parameter
# combinations, so they can be plotted together.
learner = adaptive.BalancingLearner(learners, cdims=combos)
learner.plot()

## Extended example
Sometimes you cannot formulate your problem with Adaptive; instead, you just want to run a function over a sequence of parameters.

Surprisingly, this approach with a `SequenceLearner` [is slightly faster than `ipyparallel.Client.map`](https://github.com/python-adaptive/adaptive/pull/193#issuecomment-491062073).

In [None]:
%%writefile learners_file_sequence.py

import numpy as np

from adaptive_scheduler.sequence_learner import SequenceLearner # https://github.com/python-adaptive/adaptive/pull/193
from adaptive_scheduler.utils import split, combo_to_fname
from adaptive.utils import named_product


def g(combo, burn_iterations=5):
    """Return ``x**2 + y**2 + z**2`` for one parameter combination.

    ``combo`` may be a dict or an iterable of ``(key, value)`` pairs with the
    keys ``'x'``, ``'y'`` and ``'z'``. It is normalised with ``dict()``
    because the sequence learner passes the dicts as tuples.

    Before evaluating, ``burn_iterations`` random 1000x1000 eigenvalue
    problems are solved to mimic an expensive simulation. Pass
    ``burn_iterations=0`` to skip this (e.g. for quick local checks).
    """
    combo = dict(combo)  # the sequence learner passes dicts as tuples
    x, y, z = combo["x"], combo["y"], combo["z"]

    for _ in range(burn_iterations):  # Burn some CPU time just because
        np.linalg.eig(np.random.rand(1000, 1000))

    return x ** 2 + y ** 2 + z ** 2


combos = named_product(x=np.linspace(0, 10), y=np.linspace(-1, 1), z=np.linspace(-3, 3))

print(f"Length of combos: {len(combos)}.")


# We could run this as 1 job with N nodes, but we can also split it up in multiple jobs.
# This is desirable when you don't want to run a single job with 300 nodes for example.
njobs = 100
split_combos = list(split(combos, njobs))

print(f"Length of split_combos: {len(split_combos)} and length of split_combos[0]: {len(split_combos[0])}.")

# One SequenceLearner per chunk of combinations. Each chunk's data file is
# named after the first parameter combination in that chunk.
learners = [SequenceLearner(g, combos_part) for combos_part in split_combos]
fnames = [combo_to_fname(combos_part[0], folder="data") for combos_part in split_combos]

We now start the `RunManager` with a lot of arguments to showcase some of the options you can use to customize your run.

In [None]:
import sys
from functools import partial

import adaptive_scheduler

# Customise the job script: use ipyparallel, request exclusive nodes with a
# 24-hour time limit, and extend the PYTHONPATH of the jobs.
job_script = partial(
    adaptive_scheduler.slurm.make_job_script,
    executor_type="ipyparallel",
    extra_sbatch=["--exclusive", "--time=24:00:00"],
    extra_env_vars=["PYTHONPATH='my_dir:$PYTHONPATH'"],
)


def goal(learner):
    """Stop once the learner reports that it is done."""
    return learner.done()  # the standard goal for a SequenceLearner


run_manager2 = adaptive_scheduler.server_support.RunManager(
    goal=goal,
    cores_per_job=24,
    log_interval=10,
    save_interval=30,
    # retry failing points up to 5 times without raising in the runner
    runner_kwargs=dict(retries=5, raise_if_retries_exceeded=False),
    job_script_function=job_script,
    executor_type="ipyparallel",
    kill_on_error="srun: error:",  # cancel a job if this is inside a log
    log_file_folder="logs",
    learners_file="learners_file_sequence.py",  # the file that has `learners` and `fnames`
    job_name="example-sequence",  # this is used to generate unique job names
    db_fname="example-sequence.json",  # the database keeps track of job_id <-> (learner, is_done)
    start_job_manager_kwargs=dict(
        max_fails_per_job=10,  # the RunManager is cancelled after njobs * 10 fails
        max_simultaneous_jobs=300,  # limit the amount of simultaneous jobs
        # Use this kernel's interpreter instead of a hard-coded, user-specific
        # path. This assumes the same environment is reachable at this path on
        # the compute nodes; replace it if that is not the case.
        python_executable=sys.executable,
    ),
)

In [None]:
# Start `run_manager2`, configured in the previous cell.
run_manager2.start()

In [None]:
# Parse the sequence run's log files (nothing useful until `log_interval` has passed).
df = run_manager2.parse_log_files()
df.head(5)

In [None]:
from learners_file_sequence import learners, fnames, combos
import adaptive
from adaptive_scheduler.utils import load_parallel

load_parallel(learners, fnames)
# Concatenate every learner's results into one flat list, in learner order.
# A flat comprehension avoids `sum(..., [])`, which copies the growing list on
# every addition (quadratic in the total number of results).
result = [value for seq_learner in learners for value in seq_learner.result()]

# Step 2 (option 2): the manual way 

The `adaptive_scheduler.server_support.RunManager` above essentially does everything we do below.

## Create the Python script that is run on the nodes

In [None]:
# Make sure to use the headnode's address in the next cell
# (presumably the URL returned here is the one to put in `run_learner.py`).
from adaptive_scheduler import server_support
server_support.get_allowed_url()

In [None]:
%%writefile run_learner.py

import adaptive
from adaptive_scheduler import client_support
from mpi4py.futures import MPIPoolExecutor

from learners_file import learners, fnames

if __name__ == "__main__":  # ← use this, see warning @ https://bit.ly/2HAk0GG
    # Address of the database manager on the head node; this is the same URL
    # that is passed to `server_support.start_database_manager` in the notebook.
    url = "tcp://10.75.0.5:57101"
    # Ask the database manager which learner (and data file) this job should run.
    learner, fname = client_support.get_learner(url, learners, fnames)
    learner.load(fname)  # load the data saved in `fname` for this learner
    # NOTE(review): with `goal=None` the runner may never finish on its own
    # (confirm against the `adaptive.Runner` docs). In that case the job only
    # stops when it is killed, and `tell_done` below is never reached.
    runner = adaptive.Runner(
        learner, executor=MPIPoolExecutor(), shutdown_executor=True, goal=None
    )
    runner.start_periodic_saving(dict(fname=fname), interval=600)
    client_support.log_info(runner, interval=600)  # log info in the job output script
    runner.ioloop.run_until_complete(runner.task)  # wait until runner goal reached
    client_support.tell_done(url, fname)  # report to the database manager that this learner is done

## Create a new database

In [None]:
from adaptive_scheduler import server_support
from learners_file import fnames, learners

# JSON file in which the database keeps track of the jobs and their learners.
db_fname = "running.json"

In [None]:
# Initialise `db_fname` as an empty database for the learners' `fnames`.
server_support.create_empty_db(db_fname, fnames)

## Check the running learners in the database
All learners whose entry is `None` are either still `PENDING`, have reached their goal, or have not been scheduled.

In [None]:
# Show the current contents of the database stored in `db_fname`.
server_support.get_database(db_fname)

## Start the job scripts with the `job_manager` and `database_manager`

In [None]:
import asyncio
from adaptive_scheduler import server_support, slurm
from learners_file import learners, fnames

# One unique job name per learner.
job_names = [f"test-job-{index}" for index, _ in enumerate(learners)]

# The "database manager" listens on the head-node URL that `run_learner.py`
# connects to, and keeps its state in `db_fname`.
database_task = server_support.start_database_manager("tcp://10.75.0.5:57101", db_fname)

# The "job manager" launches the named jobs, each running `run_learner.py`
# with the given job script function.
job_task = server_support.start_job_manager(
    job_names,
    db_fname=db_fname,
    cores=2,
    interval=60,
    run_script="run_learner.py",  # optional
    job_script_function=slurm.make_job_script,  # optional
)

In [None]:
# Inspect the job manager task, e.g. to see a traceback if it crashed
# (assumes `job_task` is an `asyncio.Task` — TODO confirm).
job_task.print_stack()

In [None]:
# Inspect the database manager task, e.g. to see a traceback if it crashed
# (assumes `database_task` is an `asyncio.Task` — TODO confirm).
database_task.print_stack()

In [None]:
# Run this to STOP managing the database and jobs
from adaptive_scheduler import cancel_jobs

# Stop both manager tasks first, then cancel the submitted jobs themselves.
# These are separate statements rather than one tuple expression, because the
# calls are made only for their side effects.
job_task.cancel()
database_task.cancel()
cancel_jobs(job_names)