# Introduction 
In this lab you will learn how to submit jobs to the CEES queuing system. CEES uses a queuing system called [PBS](https://en.wikipedia.org/wiki/Portable_Batch_System). The basic idea of a queuing system is to enable "fair" use of a shared resource. When you want to run a job on the cluster, you create a small file that describes your job. This file contains information such as how many cores and nodes your job needs, how to run the job, where to store the job's stdout and stderr, and whether to email you when the job completes.

You must also specify a queue to submit the job to. The CEES cluster has two overlapping types of queues. *Named* queues belong to specific users, and jobs in them can run for an unlimited length of time. All nodes that are not currently being used by the named queues are part of the *default* queue. Everyone who has access to the cluster can submit jobs to the default queue. The catch is that default queue jobs are limited to two hours.
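As a rough illustration of the job description file, the sketch below assembles one as a Python string. The `#PBS` directives are the ones the `jobCreator` class later in this lab writes; the helper name `pbsScriptText`, the job name, the paths, and the command are placeholder assumptions, not values the cluster requires.

```python
def pbsScriptText(name, queue, nodes, cores, stdout, stderr, commands):
    """Assemble the text of a PBS job script (illustrative helper, not part of the lab code)"""
    lines = [
        "#!/bin/bash",
        "#PBS -N %s" % name,                         # job name shown in the queue
        "#PBS -q %s" % queue,                        # queue to submit to
        "#PBS -l nodes=%d:ppn=%d" % (nodes, cores),  # number of nodes and cores per node
        "#PBS -o %s" % stdout,                       # where to store stdout
        "#PBS -e %s" % stderr,                       # where to store stderr
        "cd $PBS_O_WORKDIR",                         # start in the directory the job was submitted from
    ]
    lines.extend(commands)
    return "\n".join(lines) + "\n"

# Placeholder values, assumed for illustration only
script = pbsScriptText("demo_job", "default", 1, 1,
                       "/tmp/demo.out", "/tmp/demo.err", ["hostname"])
print(script)
```

Everything after the `#PBS` lines is an ordinary shell script, so the commands can be anything you would type at the prompt.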

When a job is submitted, PBS recalculates every job's priority. Priority is based on factors such as how many jobs you have run in the last three weeks and the size of the named queue you belong to. When enough resources become available for the highest-priority job, that job is started.

# Getting started
This lab has to be run on the cluster. At this stage you should have an account on the CEES cluster.

First you will have to add this lab to your git repository (check the Git assignment on how to do this). Then open a terminal on your local computer and log into the `cees-rcf` cluster by typing:

```ssh username@cees-rcf.stanford.edu```

It will prompt you for your Stanford password. 

Jupyter is already installed on the CEES servers, so all you have to do is add its path to your environment variables:

* In your `~/.bashrc` file add this line:
```
export PATH=/opt/anaconda3/bin:$PATH
```
Remember to type `source ~/.bashrc` to load the changes.

In order to run the notebook server remotely, but use a local browser to interact with the notebook, follow these extra steps:

* Create a Jupyter config file:

```jupyter notebook --generate-config```

* Add the following lines to the config file:
```python
c = get_config()
c.NotebookApp.open_browser = False
c.NotebookApp.port = <REMOTE-PORT-NUMBER>
c.NotebookApp.password = ''
```
Replace `<REMOTE-PORT-NUMBER>` with the port number of your choice.


* Run the Jupyter notebook inside a `screen` session:
```bash
screen -R jupyter-notebook
jupyter notebook
```

* Port forwarding:
On your local machine, use ssh to map a local port to the remote port:

```ssh -N -f -L <LOCAL-PORT-NUMBER>:localhost:<REMOTE-PORT-NUMBER> username@cees-rcf.stanford.edu```

* Access `http://localhost:<LOCAL-PORT-NUMBER>` from your local browser

# Running a parallel job

In this lab you will build a Python class that submits and monitors jobs through PBS. We will begin by creating a generic job class. For any job we will define four methods:
 - `preJob`: How to prepare a job to be run
 - `checkJobFinishedCorrectly`: Check if a job finished correctly
 - `returnJobCommand`: Return a string containing how to run the job
 - `postJob`: Stuff to do after the job is completed

In [None]:
class myJob:
    """A generalized class for describing a parallel job"""
    def __init__(self,tag):
        """Initialize a job"""
        self.tag = tag
    
    def preJob(self):
        """How to prepare a job to be run"""
        #By default we don't need to do anything
    
    def checkJobFinishedCorrectly(self):
        """A routine to see if a job finished correctly"""
        return True
        
    def returnJobCommand(self):
        """Return a string containing how to run the job, stdout, stderr"""
        
    def postJob(self):
        """Stuff I need to do after a job has run"""
     
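To make the contract of these four methods concrete, here is a minimal sketch of a subclass. The `echoJob` class and its output file name are hypothetical. The base class is repeated in compact form only so that the sketch runs on its own. It assumes `returnJobCommand` returns a `(command, stdout path, stderr path)` tuple, which is how the runner classes later in this lab unpack it.

```python
import os
import tempfile

class myJob:
    """Compact copy of the base class, repeated so this sketch runs on its own"""
    def __init__(self, tag):
        self.tag = tag
    def preJob(self):
        pass
    def checkJobFinishedCorrectly(self):
        return True
    def returnJobCommand(self):
        pass
    def postJob(self):
        pass

class echoJob(myJob):
    """Hypothetical job that writes its tag to a file using echo"""
    def __init__(self, tag, outdir):
        myJob.__init__(self, tag)
        self.stdout = os.path.join(outdir, "echo.%s.out" % tag)

    def preJob(self):
        # Remove stale output so an old file cannot be mistaken for success
        if os.path.exists(self.stdout):
            os.remove(self.stdout)

    def returnJobCommand(self):
        # (command, stdout path, stderr path); None means "do not redirect"
        return "echo %s" % self.tag, self.stdout, None

    def checkJobFinishedCorrectly(self):
        # Treat a non-empty output file as success
        return os.path.exists(self.stdout) and os.path.getsize(self.stdout) > 0

job = echoJob("demo", tempfile.gettempdir())
job.preJob()
command, stdo, stde = job.returnJobCommand()
print(command, stdo, stde)
```

Checking for a non-empty output file is only one possible success test; a real job might instead parse its output or inspect an exit code.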

We can also create a generic class for running a parallel job. We will initialize the class by creating a dictionary of jobs it needs to run. We define the function `runJobs`, which runs all the parallel tasks, making sure that no more than `maxJobsRunning` are running simultaneously and waiting `sleepTime` seconds between job starts. It also defines how to check that jobs are finished.

The class also defines:
  - `startJob`:  How to start a given job
  - `checkJobsRunning`: A function to check to see if a job is running
  - `allJobsFinished`: What to do after all jobs are finished

In [None]:
import time

class runParallelJob:
    """A generalized base class for running a series of jobs in parallel"""
    def __init__(self, tags):
        """Initialization of the base class
        tags is a dictionary where tag->job"""
        self.tags = tags
        self.jobsRunning = {}
        self.failed = {}

    def startJob(self, key, command, stdo, stde):
        """How to start a job, given its command, stdout file, and stderr file"""
        #Needs to be overwritten
        raise Exception("Need to override how to startJob")
  
    def checkJobsRunning(self):
        """Check to see what jobs are running """ 
        #Force this to be overwritten
        raise Exception("checkJobsRunning must be overwritten")
    
    def checkJobsFinished(self, finished): 
        """Check to see if the jobs finished correctly"""
        for job in finished:
            if not self.jobsRunning[job].checkJobFinishedCorrectly():
                #check to see if the job failed before
                if job not in self.failed:
                    self.failed[job] = 0
                #update the count of failed job
                self.failed[job] += 1
                #if the job has failed more than twice give up on it
                if self.failed[job] > 2:
                    print("Giving up on %s"%job)
                #otherwise put the job back in the queue to run it again
                else:
                    self.tags[job] = self.jobsRunning[job]
            del self.jobsRunning[job]
      
    def allJobsFinished(self):
        """What to do when all the jobs are finished"""

    def runJobs(self,sleepTime,maxJobsRunning):
        """Run a series of parallel jobs"""
        while len(self.tags) > 0 or len(self.jobsRunning) >0:
            jobsFinished = self.checkJobsRunning()
            self.checkJobsFinished(jobsFinished)
            if len(self.jobsRunning) < maxJobsRunning and len(self.tags) > 0:
                key, job = self.tags.popitem()
                print("Starting job ", key)
                self.jobsRunning[key] = job
                c, o, e = job.returnJobCommand()
                self.startJob(key, c, o, e)
            time.sleep(sleepTime)
        self.allJobsFinished()
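The throttling logic in `runJobs` can be seen in isolation in the sketch below. It is a standalone simplification, not the lab's class. `runThrottled`, `start`, and `pollFinished` are hypothetical names, and the "jobs" are just counters that finish after a fixed number of polls. Retries are left out.

```python
import time

def runThrottled(pending, start, pollFinished, maxJobsRunning, sleepTime):
    """Start jobs from pending while keeping at most maxJobsRunning active.
    Returns the largest number of jobs that were ever running at once."""
    running = {}
    peak = 0
    while pending or running:
        # Drop jobs that finished since the last pass
        for tag in pollFinished():
            del running[tag]
        # Start at most one new job per pass, as runJobs does
        if len(running) < maxJobsRunning and pending:
            tag, job = pending.popitem()
            running[tag] = job
            start(tag, job)
        peak = max(peak, len(running))
        time.sleep(sleepTime)
    return peak

# Fake jobs for illustration: each one "runs" for a fixed number of polls
remaining = {}

def start(tag, polls):
    remaining[tag] = polls

def pollFinished():
    done = []
    for tag in list(remaining):
        remaining[tag] -= 1
        if remaining[tag] <= 0:
            done.append(tag)
            del remaining[tag]
    return done

peak = runThrottled({"a": 2, "b": 1, "c": 3}, start, pollFinished,
                    maxJobsRunning=2, sleepTime=0.0)
print("peak concurrency:", peak)
```

Because only one job is started per pass, `sleepTime` also sets the minimum delay between consecutive job starts.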


Below is an example implementation of the parallel job class. In this case all the jobs are run on a single node.

In [None]:
import subprocess

class singleNodeParallel(runParallelJob):
    """Run many jobs simultaneously on a single node"""
    def __init__(self, tags):
        runParallelJob.__init__(self, tags)
        self.processPoll = {}

    def startJob(self, key, command, stdo, stde):
        #Only redirect a stream when a file name was given;
        #passing None lets the child process inherit our stdout/stderr
        ofile = open(stdo, "w") if stdo else None
        efile = open(stde, "w") if stde else None
        self.processPoll[key] = subprocess.Popen(command, stdout=ofile, stderr=efile, shell=True)

    def checkJobsRunning(self):
        finished = []
        for job, p in list(self.processPoll.items()):
            if p.poll() is not None:
                finished.append(job)
                del self.processPoll[job] 
        return finished    

We can use a simple program that estimates $\pi$ to demonstrate our `singleNodeParallel` class. This simple script estimates $\pi$ by randomly choosing points in the unit square and calculating what fraction of them fall inside the unit circle.
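The factor of 4 in the script follows from an area argument. For a point $(x, y)$ drawn uniformly from the unit square $[0, 1)^2$, the probability of landing inside the unit circle is the ratio of the quarter-disk area to the square's area:

$$
P\left(x^2 + y^2 \le 1\right) = \frac{\pi/4}{1} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4\,\frac{N_{\text{inside}}}{N}
$$

Here $N$ is the number of trials (`ntrys` in the script) and $N_{\text{inside}}$ is the number of points that satisfy $x^2 + y^2 \le 1$.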

In [None]:
%%writefile piEstimate.py
import random
import sys

def calcPi(ntrys):
    inside = 0
    for i in range(ntrys):
        x = random.random()
        y = random.random()
        #Compare the squared distance to 1, which avoids a square root
        radiusSquared = x * x + y * y
        if radiusSquared <= 1:
            inside += 1
    return 4. * inside / ntrys

print(calcPi(int(sys.argv[1])))

We can then create a class inherited from our `myJob` class for running `piEstimate`.

In [None]:
import os 
class piCalc(myJob):
    def __init__(self, nestimate, tag):
        #The base class stores the tag for us
        myJob.__init__(self, tag)
        self.nestimate = nestimate
    
    def returnJobCommand(self):
        stdo = "/tmp/%s.%d"%(os.environ["USER"], self.tag)
        command = "python piEstimate.py %d "%(self.nestimate)
        return command, stdo, None

Next we can inherit from our `singleNodeParallel` class to run the job in parallel.

In [None]:
class piParallel(singleNodeParallel):
    def __init__(self, nestimate, njobs):
        jobs = {}
        self.nc = njobs
        for i in range(njobs):
            jobs[i] = piCalc(nestimate, i)
        singleNodeParallel.__init__(self, jobs)
    
    def allJobsFinished(self):
        tot = []
        for i in range(self.nc):
            #Each job wrote its estimate as the first line of its stdout file
            with open("/tmp/%s.%d"%(os.environ["USER"], i)) as f:
                tot.append(float(f.readline().strip()))
        #Average the individual estimates
        print(sum(tot) / self.nc)

We can then run our `piParallel` class. In this case we are going to run at most two jobs at a time so we don't overwhelm the head node.

In [None]:
job = piParallel(10000000, 2)
job.runJobs(.5, 2)

# Working with the grid engine

As mentioned earlier, you submit jobs to the grid engine by writing small scripts. Let's begin by creating a class that can write one of these shell scripts.

In [None]:
class jobCreator:
    """Class for creating job scripts for PBS"""
    def __init__(self, **kw):
        """Initialize the job creator class and default most parameters"""
        self.initializeDefaultParams()
        self.overrideDefaultParams(kw)
      
    def createJobScript(self, fileout, **kw):
        """We can override any parameters then write out file"""
        self.overrideDefaultParams(kw)
        self.writeFile(fileout)
    
    def initializeDefaultParams(self):
        """Setup some defaults"""
        self.params = {}
        self.params["name"] = os.environ["USER"] + "_" + "awesomeness" #name of the job in the queue
        self.params["nodes"] = 1
        self.params["cores"] = 1
        self.params["queue"] = "default"
        self.params["stdout"] = "/data/cees/%s/stdout"%os.environ["USER"]
        self.params["stderr"] = "/data/cees/%s/stderr"%os.environ["USER"]

    def overrideDefaultParams(self, lst):
        """Override the defaults"""
        for k,v in lst.items():
            self.params[k] = v
          
    def writeFile(self, fileout):
        """Write job file"""
        if "commands" not in self.params:
            raise Exception("Must specify commands in either initialization or createJobScript")
        try:
            f = open(fileout, "w")
        except OSError:
            raise Exception("Could not open %s"%fileout)
        f.write("#!/bin/bash\n")
        f.write("#PBS -N %s\n"%self.params["name"])
        f.write("#PBS -q %s\n"%self.params["queue"])
        f.write("#PBS -l nodes=%d:ppn=%d\n"%(int(self.params["nodes"]), int(self.params["cores"])))
        f.write("#PBS -e %s\n"%self.params["stderr"])
        f.write("#PBS -o %s\n"%self.params["stdout"])
        if "mail" in self.params:
            #Address to send job notifications to
            f.write("#PBS -M %s\n"%self.params["mail"])
        if type(self.params["commands"]) is list:
            for c in self.params["commands"]:
                f.write("%s\n"%c)
        elif type(self.params["commands"]) is str:
            f.write("cd $PBS_O_WORKDIR\n")
            f.write("%s\n"%self.params["commands"])
        else:
            f.close()
            raise Exception("Commands must be a list or string")
        f.close()

# Assignment

Your job is to write a `pbsParallelJob` class that inherits from `runParallelJob`. The class must be able to submit and monitor jobs. When you are ready to test your class, have each `piEstimate` job test 10000000 points. Run a total of 50 jobs, running up to 10 jobs simultaneously. To help you with your task, you will find two functions below: one submits a job to the PBS engine and the other gets the status of the jobs currently in the queue.

In [None]:
import subprocess

def submitJob(script):
    """Submit job return job number"""
    out = subprocess.check_output(["qsub", script])
    return out.decode("utf-8").split(".")[0]

In [None]:
import subprocess
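def returnJobStatus(queue):
    """Return a dictionary mapping job id -> "queued", "running", or "finished"
    for the jobs that qstat lists in the given queue"""
    lines = subprocess.check_output(["qstat"]).decode("utf-8").split("\n")
    status = {}
    #Skip the two header lines (slicing is also safe when qstat prints nothing)
    for line in lines[2:]:
        parts = line.split()
        #We need at least six columns: the state is column 5 and the queue is column 6
        if len(parts) > 5:
            ids = parts[0].split(".")
            if parts[5] == queue:
                #Anything that is neither queued nor running is reported as finished
                status[ids[0]] = "finished"
                if len(ids) > 1:
                    if parts[4] == "Q":
                        status[ids[0]] = "queued"
                    elif parts[4] == "R":
                        status[ids[0]] = "running"
    return status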
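To see what `returnJobStatus` expects from `qstat` without touching the cluster, the sketch below applies the same parsing to a hard-coded sample. The sample text, the server name `cees-mgmt`, the job names, and the users are made up for illustration. Real `qstat` output on the cluster may differ in spacing or columns, so treat the layout as an assumption. `parseQstat` is a hypothetical helper, not part of the lab code.

```python
# Made-up sample in the column layout the parser assumes:
# job id, name, user, time used, state, queue
SAMPLE_QSTAT = """\
Job ID                    Name             User            Time Use S Queue
------------------------- ---------------- --------------- -------- - -----
101.cees-mgmt             pi_0             alice           00:00:12 R default
102.cees-mgmt             pi_1             alice                  0 Q default
103.cees-mgmt             pi_2             alice           00:00:30 C default
104.cees-mgmt             other            bob             00:01:00 R sep
"""

def parseQstat(text, queue):
    """Map job id -> status for the jobs listed in the given queue"""
    status = {}
    # The first two lines are the header and its underline
    for line in text.split("\n")[2:]:
        parts = line.split()
        if len(parts) < 6 or parts[5] != queue:
            continue
        jobId = parts[0].split(".")[0]  # "101.cees-mgmt" -> "101"
        if parts[4] == "Q":
            status[jobId] = "queued"
        elif parts[4] == "R":
            status[jobId] = "running"
        else:
            status[jobId] = "finished"
    return status

result = parseQstat(SAMPLE_QSTAT, "default")
print(result)
```

The job id extracted here has the same form as the value returned by `submitJob`, so the two can be matched directly when monitoring jobs.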

# Extra credit (required for SEPers)

What you have developed at this stage will run jobs on a single queue. If you really want to maximize the amount of work you can get done, you will submit jobs to multiple queues (the default plus a named queue). You want to follow this strategy because the default queue represents more potential resources, but your priority is higher on your named queue. Make a new class, `ultimatePbsParallelJob`, that runs on multiple queues. Have your script submit jobs on your named queue until a job is waiting. Once a job is waiting on the named queue, have it submit jobs on the default queue until a job is waiting there. Continue to monitor the queues, trying to keep three jobs waiting in both queues until all jobs have completed.