# Overview

This notebook illustrates how to use Prefect to schedule and keep track of tasks running at NERSC.

In this particular case:
  - We launch 3 independent jobs on Perlmutter (each one outputs its own random number).
  - We then launch a final job that depends on the previous ones (it prints all the random numbers).

Prefect is used to track the jobs, and to handle asynchronicity.

In [None]:
from sfapi_client import Client
from sfapi_client.compute import Machine
import prefect
from pathlib import Path

In [None]:
# Create Perlmutter job scripts

def random_number_job_script( random_seed ):
    """
    Build a Slurm batch script for Perlmutter that echoes a random number.

    The number is drawn here, while the script text is being built (not by
    the job itself): an integer between 1 and 100 inclusive, taken from a
    generator seeded with `random_seed`. The same seed therefore always
    produces the same script. The `#SBATCH -o` directive sends the job's
    output to `sfapi_demo_{random_seed}.txt`.

    Parameters
    ----------
    random_seed
        Seed for the random number generator; it is also embedded in the
        name of the job's output file.

    Returns
    -------
    str
        The full text of the batch script.
    """
    import random

    # Use a private generator instead of `random.seed(...)` so that building
    # a script does not reseed the process-wide RNG used by other code.
    # `random.Random(seed)` yields the same sequence as the module-level
    # functions after `random.seed(seed)`, so the drawn number is unchanged.
    rng = random.Random(random_seed)
    drawn_number = rng.randint(1, 100)

    return f"""#!/bin/bash
#SBATCH -C cpu
#SBATCH -q shared
#SBATCH -N 1
#SBATCH -c 1
#SBATCH -t 1
#SBATCH -o /global/u2/r/rlehe/sfapi_demo_{random_seed}.txt

echo "Completed run {drawn_number}"
"""

# Batch script for the final job: print the output of every random-number job.
#
# - Plain string literal: the text has no placeholders, so no f-prefix is needed.
# - The `cat` pattern uses the same absolute directory as the `#SBATCH -o`
#   directives, so it does not depend on the job's working directory.
# - The `[0-9]*` part matches the numeric-seed files (e.g. `sfapi_demo_3.txt`)
#   but not `sfapi_demo_print_all.txt`, which is this job's own output file
#   and must not be fed back into `cat`.
print_all_script = """#!/bin/bash
#SBATCH -C cpu
#SBATCH -q shared
#SBATCH -N 1
#SBATCH -c 1
#SBATCH -t 1
#SBATCH -o /global/u2/r/rlehe/sfapi_demo_print_all.txt

cat /global/u2/r/rlehe/sfapi_demo_[0-9]*.txt
"""

In [None]:
# Connect to Perlmutter with the SFAPI
# The private key is read from `priv_key.pem` in the current working directory.
key = Path("./priv_key.pem")
# NOTE(review): the client is created once at module level and never
# explicitly closed here — confirm whether that matters for this notebook.
client = Client(key=key)
# Handle for the Perlmutter machine; `launch_script_on_perlmutter` uses it
# to submit jobs.
perlmutter = client.compute(Machine.perlmutter)

@prefect.task
def launch_script_on_perlmutter( script ):
    """
    Submit `script` to Perlmutter through the SFAPI and block until the
    resulting job has completed.

    The function is wrapped with `prefect.task`, so every call is tracked
    by Prefect. Nothing is returned.

    NOTE(review): the value returned by `complete()` is discarded, so this
    task does not itself check whether the job succeeded — confirm that
    this is intended.
    """
    # Submit the job, then wait (blocking) on the returned job handle.
    perlmutter.submit_job(script).complete()

@prefect.flow
def run_jobs_serial():
    """
    Prefect flow that runs the demo jobs strictly one after another.

    The three random-number jobs (seeds 0, 3 and 7) are launched in turn,
    followed by the job that prints all their outputs. Each launch is a
    direct call to the task, so the next one starts only after the
    previous call has returned.
    """
    for seed in (0, 3, 7):
        launch_script_on_perlmutter(random_number_job_script(seed))
    launch_script_on_perlmutter(print_all_script)

@prefect.flow
def run_jobs_async():
    """
    Prefect flow that launches the random-number jobs asynchronously.

    The three random-number jobs (seeds 0, 3 and 7) are handed to Prefect
    with `.submit()`. The flow then waits for all of them before launching
    the final job that prints their outputs.
    """
    seeds = (0, 3, 7)
    pending = [
        launch_script_on_perlmutter.submit(random_number_job_script(seed))
        for seed in seeds
    ]
    # The final job depends on the previous ones: wait for each of them.
    for future in pending:
        future.result()
    launch_script_on_perlmutter(print_all_script)

In [None]:
# Launch the four jobs one after another and track them with Prefect
run_jobs_serial()

In [None]:
# Launch the three random-number jobs asynchronously, then the final
# print job once they have finished, and track them with Prefect
run_jobs_async()