# PSI/J-Python Getting Started Tutorial

<img src="./images/psij_overview.png" width="350"/>

PSI/J (Portable Submission Interface for Jobs) is an abstraction layer over cluster job schedulers. It allows your application to be written in a way that is (mostly) independent of the cluster(s) where it runs. PSI/J itself is a language-agnostic specification; PSI/J-Python is its Python implementation.



## Installation

In [None]:
%pip install git+https://github.com/ExaWorks/psij-python.git >/dev/null 2>&1
%pip show psij-python

## Overview 

When running a job, there are a number of things to specify:
- What is to be run, such as executable, arguments, environment, etc. ([JobSpec](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.job_spec.JobSpec))
- What resources are needed by the job, such as the number of nodes ([ResourceSpec](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.resource_spec.ResourceSpecV1))
- Various miscellaneous properties, such as the queue to submit the job to ([JobAttributes](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.job_attributes.JobAttributes))
- The mechanism through which to run the job, such as local/exec, SLURM, PBS, etc. ([JobExecutor](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.job_executor.JobExecutor))

We also need an object to keep track of all this information, as well as the state of the execution. This object is an instance of a [Job](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.job.Job).
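
As a rough sketch of how these pieces fit together, the function below builds a job description from a spec, resources, and attributes. It assumes `psij-python` is installed; the function name `build_job`, the queue name `debug`, and the five-minute duration are placeholder choices, not something this tutorial's environment defines.

```python
from datetime import timedelta


def build_job():
    """Assemble a Job from a spec, resources, and attributes (illustrative values)."""
    # Imported inside the function so that the sketch can be defined
    # even where psij-python has not been installed yet.
    from psij import Job, JobAttributes, JobSpec, ResourceSpecV1

    spec = JobSpec(
        executable='/bin/date',                     # what to run
        resources=ResourceSpecV1(process_count=1),  # what resources it needs
        attributes=JobAttributes(
            queue_name='debug',                     # hypothetical queue name
            duration=timedelta(minutes=5),          # requested run time
        ),
    )
    return Job(spec)                                # tracks the spec and the job's state
```

The fourth piece, the executor, only enters the picture at submission time, for example `JobExecutor.get_instance('local').submit(build_job())`.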

### Setup

Before we start, let us create a separate directory so that we don't overwrite each other's files:

In [None]:
import os
from tempfile import mkdtemp

os.makedirs('/tutorials/userdirs', exist_ok=True)
workdir = mkdtemp(prefix='userdir-', dir='/tutorials/userdirs')
os.chdir(workdir)
print(workdir)

### Basic Usage
Without further ado, let's create a simple job:

In [None]:
from pathlib import Path
from psij import Job, JobSpec

job = Job(JobSpec(executable='/bin/date', stdout_path=Path('the-date.txt')))

Easy. We created a job that runs `/bin/date` and stores the output in `the-date.txt`.
Now we need to run it. To do so, we need an *executor* that knows how to run jobs. We will use a simple fork/exec-based executor named `local`. On a real cluster, we would use something like `slurm` or `lsf`, but we are not doing this on a real cluster. Note, however, that in most cases simply changing `local` to the name of the scheduler used by the cluster is enough to run the job through the cluster scheduler.

In [None]:
from psij import JobExecutor

executor = JobExecutor.get_instance('local')

We can now tell the executor to run our job:

In [None]:
executor.submit(job)

The [submit()](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.job_executor.JobExecutor.submit) method **starts** the job asynchronously.
We would now like to see the result. However, before we can do so, we must ensure that the job has actually finished running. We can do so by [waiting](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.job.Job.wait) for it:

In [None]:
job.wait()

The `wait()` method returns the [JobStatus](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.JobStatus). Since nothing can possibly go wrong, we will assume that the job completed successfully and that there is no need to check the status to confirm it. Now, we can finally read the output:
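
In code that has to survive real failures, the returned status is worth a look. The following is a minimal sketch of such a check, assuming the status object exposes `state`, `exit_code`, and `message` attributes. The helper name `ensure_state` is hypothetical, and `FakeState` and the two sample statuses are stand-ins so that the sketch runs without PSI/J.

```python
from enum import Enum
from types import SimpleNamespace


def ensure_state(status, expected_state):
    """Return `status` if it is in `expected_state`, otherwise raise RuntimeError."""
    # Assumption: `status` has `state`, `exit_code` and `message` attributes.
    if status.state != expected_state:
        raise RuntimeError(
            'job ended in state %s (exit code %s): %s'
            % (status.state, status.exit_code, status.message)
        )
    return status


# Stand-ins for a real job state enum and real status objects.
class FakeState(Enum):
    COMPLETED = 1
    FAILED = 2


ok = SimpleNamespace(state=FakeState.COMPLETED, exit_code=0, message=None)
bad = SimpleNamespace(state=FakeState.FAILED, exit_code=1, message='boom')

ensure_state(ok, FakeState.COMPLETED)  # passes silently
try:
    ensure_state(bad, FakeState.COMPLETED)
except RuntimeError as e:
    print(e)                           # reports the unexpected state
```

With PSI/J, the same helper would be called as `ensure_state(job.wait(), JobState.COMPLETED)`, with `JobState` imported from `psij`.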

In [None]:
with open('the-date.txt') as f:
    print(f.read())

### Multiple Jobs
Our executor is stateless, which means that we can submit as many jobs as we want to it, at least in theory. In practice, computers have limited resources and there are only so many concurrent jobs that we can run, but hopefully we won't hit those limits today.

In [None]:
jobs = []
for i in range(10):
    job = Job(
        JobSpec(
            executable='/bin/echo', 
            arguments=['Hello from job %s' % i],
            stdout_path=Path('hello-%s.txt' % i)
        )
    )
    executor.submit(job)
    jobs.append(job)

If these jobs weren't so short, they would now be running in parallel. In fact, why not start a longer job:

In [None]:
long_job = Job(JobSpec(executable='/bin/sleep', arguments=['600']))
executor.submit(long_job)

Back to our previous jobs. In order to read their outputs, we must, again, ensure that they are done:

In [None]:
for i in range(10):
    jobs[i].wait()
    with open('hello-%s.txt' % i) as f:
        print(f.read())

What about our long job?

In [None]:
print(long_job.status)

Still running. The timestamp in the status is the instant when the job switched to the `ACTIVE` state. Moving on...

### Multi-process Jobs
So far we've run multiple independent jobs. But what if we wanted to run multiple copies of one job, presumably on multiple compute nodes (this is a Docker container, but we can pretend)?
We can tell PSI/J to do this using [ResourceSpecV1](https://exaworks.org/psij-python/#docs/.generated/psij.html/#psij.resource_spec.ResourceSpecV1). We also need to tell PSI/J to start our job a bit differently, so we'll make a short detour to talk about launchers.

Once a job's resources are allocated, a typical job scheduler will launch our job on one of the allocated compute nodes. Then, we'd invoke something like `mpirun` or `srun`, etc. to start all the job copies on the allocated resources. By default, PSI/J uses a custom launcher named `single`, which simply starts a single copy of the job on the lead node of the job. If we wanted to see multiple copies of the job without any of the fancy features offered by `mpirun` or `srun`, we could use PSI/J's `multiple` launcher, which we will do below.

In [None]:
from psij import ResourceSpecV1

mjob = Job(
        JobSpec(
            executable='/bin/date', 
            stdout_path=Path('multi-job-out.txt'),
            resources=ResourceSpecV1(process_count=4),
            launcher="multiple"
        )
    )

We informed PSI/J that we need four copies of our job. On a real scheduler, we could also request that these copies be distributed over multiple compute nodes, but this container gives us only one compute node, so we won't bother.

In [None]:
executor.submit(mjob)
mjob.wait()
with open('multi-job-out.txt') as f:
    print(f.read())
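
To make the idea behind the `multiple` launcher more concrete, here is a plain-Python sketch that starts several copies of one command side by side and gathers their output. It is not how PSI/J implements the launcher; it only imitates the observable behaviour described above. The function name `run_copies` is hypothetical, and the Python interpreter is used as a portable stand-in for `/bin/date`.

```python
import subprocess
import sys


def run_copies(command, count):
    """Start `count` copies of `command` concurrently and return their combined stdout."""
    # Start every copy first so that they run side by side...
    procs = [
        subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
        for _ in range(count)
    ]
    # ...then collect the output of each copy in turn.
    return ''.join(proc.communicate()[0] for proc in procs)


# Four copies of a tiny command that prints one line each.
output = run_copies([sys.executable, '-c', 'print("hello")'], 4)
print(output)
```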

### MPI Jobs

The previous example ran a multi-process job, which has its uses. More often, however, we want to run an MPI job. Assuming that the system has some form of MPI installed that comes with a generic `mpirun` tool, as this Docker container does, we can instruct PSI/J to launch MPI jobs. And, as the previous sentence hints, it is as simple as changing our launcher from `multiple` to `mpirun`.

But before that, we need a simple MPI executable. 

In [None]:
%%bash
cat <<'EOF' >hello.c
#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv) {
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    /* Each rank prints its own index. */
    printf("Hello from rank %d\n", rank);

    MPI_Finalize();
    return 0;  /* report success to mpirun */
}
EOF

Next, we need to compile it:

In [None]:
!mpicc hello.c -o hello

And now we can construct our job:

In [None]:
mpi_job = Job(
        JobSpec(
            executable='hello', 
            stdout_path=Path('mpi-job-out.txt'),
            resources=ResourceSpecV1(process_count=4),
            launcher="mpirun"
        )
    )

... and, as usual, submit it, wait for it, and display the output:

In [None]:
executor.submit(mpi_job)
mpi_job.wait()
with open('mpi-job-out.txt') as f:
    print(f.read())

And the long running job?

In [None]:
print(long_job)

Soon, soon...

### Callbacks

The examples above are more or less synchronous, in that we use `wait()` to suspend the current thread until a job completes. In real-life scenarios where scalability is needed, we would use callbacks instead. Let's implement a quick map/reduce-style workflow that estimates π with a Monte Carlo method.

The basic idea is to generate some random points on a square that encloses one quadrant of a circle. 

<img src="./images/pi.png" width="150"/>

Some points will fall outside the circle and some inside. As the number of points grows, the ratio of points inside the circle to the total number of points (all of which lie inside the square) approaches the ratio of the two areas: 

N<sub>circle</sub> / N<sub>total</sub> ≈ A<sub>circle</sub> / A<sub>square</sub> = (πr<sup>2</sup> / 4) / r<sup>2</sup>

Hence

π ≈ 4 N<sub>circle</sub> / N<sub>total</sub>
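
Before distributing the work over jobs, the formula can be sanity-checked in plain Python. The sketch below is a single-process stand-in, not part of the PSI/J workflow; the function name `estimate_pi`, the point count, and the seed are arbitrary choices.

```python
import math
import random


def estimate_pi(n_points, radius, seed=None):
    """Estimate π from `n_points` random integer points in a radius × radius square."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(n_points):
        # Integer coordinates in [0, radius).
        x = rng.randrange(radius)
        y = rng.randrange(radius)
        # Comparing squared distances avoids the square root.
        if x * x + y * y < radius * radius:
            inside += 1
    return 4 * inside / n_points


estimate = estimate_pi(100_000, 1000, seed=42)
print(estimate, abs(estimate - math.pi))
```

The statistical error of such an estimate shrinks only with the square root of the number of points, so a run with a few hundred points should be expected to be visibly off.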

We'll start with some boilerplate, the number of iterations, and the radius of the circle:

In [None]:
from threading import Lock
from psij import JobState
import math

N = 100
R = 1000

Then, we'll define a class that keeps track of our points and calculates π once we have all the points in, and we'll create an instance of it to hold actual results.

In [None]:
class Results:
    def __init__(self):
        self.n = 0
        self.inside = 0
        self._lock = Lock()
        
    def point_received(self, x, y):
        with self._lock:
            self.n += 1
            if math.sqrt(x * x + y * y) < R:
                self.inside += 1
            if self.n == N:
                print("π is %s" % (float(self.inside) / self.n * 4))

results = Results()

Then, we'll define a callback function that gets invoked every time a job changes status, and have it read the output and pass it to the `results` instance. The output will be in the form `x y`

In [None]:
def callback(job, status):
    if status.state == JobState.COMPLETED:
        with open(job.spec.stdout_path) as f:
            line = f.read().strip()
            tokens = line.split()
            results.point_received(int(tokens[0]), int(tokens[1]))

Unlike in previous cases, we now need to check the state of the job. That is because the full lifecycle of the job includes states such as `QUEUED` and `ACTIVE`, and the callback is invoked on all state changes.
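
The callback above trusts that the output file contains exactly two integers. If a job could produce empty or malformed output, a small defensive parser avoids an exception inside the callback. This is only a sketch; the helper name `parse_point` is hypothetical.

```python
def parse_point(text):
    """Parse 'x y' into a pair of ints, or return None if the text is malformed."""
    tokens = text.split()
    if len(tokens) != 2:
        return None
    try:
        return int(tokens[0]), int(tokens[1])
    except ValueError:
        return None


print(parse_point('12 345\n'))  # well-formed output
print(parse_point(''))          # empty output
```

A callback that skips malformed points would also have to account for them somewhere, since `Results` only prints its estimate once it has seen `N` points.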

Finally, we can create and submit our jobs:

In [None]:
for i in range(N):
    # Keyword arguments avoid relying on the positional order of JobSpec parameters.
    job = Job(JobSpec(name='echo', executable='/bin/bash',
                      arguments=['-c', 'echo $((RANDOM%{})) $((RANDOM%{}))'.format(R, R)],
                      stdout_path=Path('pi-x-y-%s.txt' % i)))
    job.set_job_status_callback(callback)
    executor.submit(job)

Notice that the main thread is free as soon as the last job is submitted.
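
In a notebook this is convenient, but a standalone script may need to block until every job has reported back. One common way to do that is a thread-safe countdown combined with a `threading.Event`. The sketch below uses plain threads as stand-ins for job callbacks, which may arrive on other threads; the class name `Countdown` is hypothetical.

```python
import threading


class Countdown:
    """Thread-safe counter that sets an event after `total` notifications."""

    def __init__(self, total):
        self._remaining = total
        self._lock = threading.Lock()
        self.done = threading.Event()

    def notify(self):
        with self._lock:
            self._remaining -= 1
            if self._remaining == 0:
                self.done.set()


countdown = Countdown(5)
# Each thread plays the role of one completed job reporting in.
workers = [threading.Thread(target=countdown.notify) for _ in range(5)]
for worker in workers:
    worker.start()

# Block the main thread until all five have reported (or ten seconds pass).
finished = countdown.done.wait(timeout=10)
print(finished)
```

In the π example, `Results.point_received` could set such an event when `self.n == N`, and the main thread could wait on it after submitting the last job.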

That's about it for this tutorial. Oh, the long running job should be done now.

In [None]:
print(long_job)

If not, we can stop it:

In [None]:
long_job.cancel()

OK, now we're really done, so it's cleanup time. And you know what they say, if all you have is a hammer...

In [None]:
os.chdir('/tutorials')
cleanup_job = Job(
    JobSpec(
        executable='/bin/rm',
        arguments=['-rf', workdir],
        directory=Path('/tutorials/userdirs') # the job's working directory will be /tutorials/userdirs
    )
)
executor.submit(cleanup_job)

Thank you!