## Parallel Computing with cluster-helper

<img src="https://computing.llnl.gov/tutorials/parallel_comp/images/nodesNetwork.gif">

A node is like a computer within a much bigger computer!

**Install ipython-cluster-helper**:

* Clone repo from https://github.com/roryk/ipython-cluster-helper to your home directory
* Activate your CML Anaconda environment (e.g. <code>source activate environmentname</code>)
* Navigate to the ipython-cluster-helper directory and type <code>python setup.py install</code>

(This installation can sometimes take a while.)

### Basic cluster-helper usage

In [1]:
# Basic cluster helper usage.

import cluster_helper.cluster

# We define a function to run as a job on each node.
# This function takes a parameter from an iterable of parameter inputs.
# NOTE!  Every instance of this function starts with an empty variable space,
# as each run on each node is starting in its own Python instance.
# This means that you need to import any needed libraries INSIDE the function,
# and any data needed from the launching Python program needs to either be
# loaded from a file inside the function, or passed in as part of the parameter.
def squared(x):
    return x**2

from pathlib import Path
myhomedir = str(Path.home())
# If you are on a cluster other than Rhino, then parameters such as
# scheduler and queue will likely need adjusting to the values for that cluster.
with cluster_helper.cluster.cluster_view(scheduler="sge", queue="RAM.q", num_jobs=10,
      cores_per_job=1, profile=myhomedir + '/.ipython/') as view:
    
    # 'map' applies a function to each value within an iterable.
    res = view.map(squared, range(0, 10))

10 Engines running
Sending a shutdown signal to the controller and engines.
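
The comments in the cell above note that every job starts in its own Python instance with an empty variable space. The following is a minimal sketch of a job function written with that in mind. It runs locally, using the built-in `map` as a stand-in for `view.map`, so it can be tried without a cluster. The function name `vector_norm` and its parameter layout are hypothetical and chosen only for illustration.

```python
def vector_norm(params):
    # Imports go inside the function, because on the cluster each job
    # runs in a fresh Python instance with no inherited imports.
    import math

    # All inputs travel in the parameter; nothing is read from globals.
    values, scale = params
    return scale * math.sqrt(sum(v * v for v in values))


# Each parameter bundles everything one job needs.
param_list = [([3.0, 4.0], 1.0), ([6.0, 8.0], 0.5)]

# Local stand-in for view.map(vector_norm, param_list).
results = list(map(vector_norm, param_list))
print(results)  # [5.0, 5.0]
```

Testing a job function this way before submitting it can catch a missing import or an accidental dependence on a notebook variable, provided the test is run in a fresh kernel.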


## cluster-helper tips

* cluster-helper behaves as if a fresh Python notebook were started for each job, so a job does not inherit your workspace's variables or import statements. **Give each job everything it needs to complete!**
* Jobs are still subject to memory limitations, so you may need to **break up large processes into smaller chunks.** For example, each job could correspond to analyzing one session, instead of one subject. 
* The cluster-helper memory parameter does not work. Supposedly, this will be fixed eventually. If absolutely necessary, you may specify more cores per job to functionally increase the allotted memory. But mind your total core count!
* It is often useful to save the output of each job in a dedicated directory, and sometimes useful to save intermediate values to aid in debugging or later nonparallel analyses. The Python "os" library can be helpful here. 
* **Be respectful!** There are only so many cores available to the entire Kahana lab and our collaborators across the country. 
* **Limit typical jobs to 100 cores or fewer**. Heavy usage leaves fewer resources for other users and, because disk resources are shared, can actually slow down all jobs. Please ask for permission before using more.
* You can always use the '**qdel**' command in Terminal, followed by your job number, to kill any of your old jobs that may be wasting rhino's resources. 
* Use the '**qstat**' command in Terminal to see cluster usage information.
* Each rhino2 node has ~128 GB of memory and ~40 cores. 
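
Two of the tips above, breaking work into smaller chunks and saving each job's output in a dedicated directory, can be sketched with the standard library alone. The helper names `chunk` and `job_output_path`, the session labels, and the directory name are all hypothetical; this is one possible arrangement, not part of cluster-helper.

```python
import os
import tempfile


def chunk(items, chunk_size):
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


def job_output_path(output_dir, job_label):
    """Create output_dir if needed and return a per-job result path."""
    os.makedirs(output_dir, exist_ok=True)
    return os.path.join(output_dir, f'result_{job_label}.npy')


# Seven hypothetical sessions, split so that each job handles at most three.
sessions = [f'sess{i}' for i in range(7)]
session_chunks = chunk(sessions, 3)
print(session_chunks)

# A temporary directory is used here; on the cluster this would be a
# directory on the shared file system.
output_dir = os.path.join(tempfile.gettempdir(), 'cluster_demo_results')
print(job_output_path(output_dir, 'chunk0'))
```

Each element of `session_chunks` could then be passed as one job parameter, with the job writing its result to the path returned by `job_output_path`.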


### Simpler Cluster Helper usage

See the separate ClusterRun.py file in this repository for example code for more conveniently calling cluster-helper.  It will be imported directly from that file in the following example.  As long as you follow the polite usage etiquette described above, you should feel free to customize this to your own needs.  It is helpful to follow the general principles shown here, such as saving computational results for each job directly to disk and logging exceptions.

In [2]:
from ClusterRun import ClusterRun

def squared(x):
    try:
        import numpy as np
    
        res = x**2
        
        # You can directly return results, but saving them in separate files is
        # better practice for large jobs.
        np.save('squared_result_'+str(x)+'.npy', res)
        
        # We return True for success
        return True
    
    except Exception as e:
        # To diagnose programming errors or data issues, you will probably want
        # to write unhandled exceptions to a log file.
        # Try to handle more exceptions closer to where they happen, so that few
        # get here to the top level.
        
        import traceback
        np.savetxt('squared_error_'+str(x)+'.txt', \
                   [str(x), traceback.format_exc()], \
                   fmt='%s')
        return False
    

parameters = range(0, 10)
ClusterRun(squared, parameters)

10 Engines running
Sending a shutdown signal to the controller and engines.


[True, True, True, True, True, True, True, True, True, True]

In [3]:
# To reload the saved results of the parallel run.
import numpy as np
for x in parameters:
    print(np.load('squared_result_'+str(x)+'.npy', allow_pickle=True))

0
1
4
9
16
25
36
49
64
81


### Handling Errors

A slightly more convenient way to integrate parallel executions with a notebook is to use ClusterChecked, which prints a diagnostic and raises an exception if any job reports a failure.  This prevents your notebook from continuing execution when output data from your parallel jobs was not successfully created or updated.

In [4]:
from ClusterRun import ClusterChecked

def TryToLoadSubject(sub):
  try:
    from CMLLoad import CMLLoad

    # Remember to update this to where your CMLExamples data is located.
    load = CMLLoad('./CMLExamples')
    df = load.Index()
    df_select = df[(df['subject']==sub)]
    df_sess = df_select.iloc[0]
    eeg_ptsa = load.LoadPTSA(df_sess, 0, 1600)

    return True
  
  except Exception as e:
    # To diagnose programming errors or data issues, you will probably want
    # to write unhandled exceptions to a log file.
    # Try to handle more exceptions closer to where they happen, so that few
    # get here to the top level.

    import traceback
    import numpy as np
    np.savetxt('trytoload_error_'+str(sub)+'.txt', \
               [str(sub), traceback.format_exc()], \
               fmt='%s')
    return False

sub_list = ['R1111M', 'Ricanttypeohno']
ClusterChecked(TryToLoadSubject, sub_list)

2 Engines running
Sending a shutdown signal to the controller and engines.
Error on job parameters:
  Ricanttypeohno


RuntimeError: 1 of 2 jobs failed!

If the input parameter is not sufficient to diagnose the problem, which it usually isn't, then inspecting the log file saved as trytoload_error_\*.txt will reveal where the issue happened.  In this case it is an IndexError from attempting to select index 0 of a df_select that is empty, because no rows match the requested subject.
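
When many jobs fail, opening the log files one by one gets tedious. The following sketch gathers every file matching the `trytoload_error_*.txt` naming pattern used above and prints its contents. The helper name `collect_error_logs` is hypothetical, and the demo writes a placeholder log into a temporary directory so that the snippet runs on its own; the placeholder text is not real traceback output.

```python
import glob
import os
import tempfile


def collect_error_logs(directory, pattern='trytoload_error_*.txt'):
    """Return {filename: contents} for every error log matching pattern."""
    logs = {}
    for path in sorted(glob.glob(os.path.join(directory, pattern))):
        with open(path) as f:
            logs[os.path.basename(path)] = f.read()
    return logs


# Demo with a stand-in log file written to a temporary directory.
demo_dir = tempfile.mkdtemp()
demo_log = os.path.join(demo_dir, 'trytoload_error_Ricanttypeohno.txt')
with open(demo_log, 'w') as f:
    f.write('Ricanttypeohno\n(placeholder for the saved traceback)\n')

for name, text in collect_error_logs(demo_dir).items():
    print(f'--- {name} ---')
    print(text)
```

In practice, `directory` would be wherever the jobs wrote their logs, which in the example above is the working directory.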

## Solving the challenges of parallel execution

To pass large amounts of data around, np.save and np.load are the most convenient functions to use, provided all nodes share a file system.  This works both for passing data into a parallel function and for bringing result data out.  For other types of data, such as configuration values, a convenient Settings class is provided in the ClusterRun file.  The following illustrates a good structure for a typical real-world analysis run in parallel.  Note the role of the Settings class, and the role of the improved logging system, which also records session information for any session that raises an error.

In [5]:
from ClusterRun import ClusterChecked,Settings
settings = Settings()
settings.exp_list = ['FR1']
settings.logfile = 'recallrate.txt'
settings.Save('recallrate.pkl')

def RecallRate(sub):
  try:
    from ClusterRun import Settings,SetLogger,DFRLabel
    settings = Settings.Load('recallrate.pkl')
    LogErr = SetLogger(logfile=settings.logfile, suffix=sub)
    
    import numpy as np
    from CMLLoad import CMLLoad
    # Remember to update this to where your CMLExamples data is located.
    load = CMLLoad('./CMLExamples')
    df = load.Index()
    df = df[df['subject']==sub]
    
    total_words = 0
    total_recalled = 0
    sess_count = 0
    for df_sess in df.itertuples():
      try:
        if df_sess._asdict()['experiment'] not in settings.exp_list:
          continue
        evs = load.Load(df_sess, 'events')
        word_evs = evs[evs['type']=='WORD']
        total_recalled += sum(word_evs['recalled'])
        total_words += len(word_evs)
        sess_count += 1
      except Exception as e:
        # Log the exception to a subject-labeled filename,
        # along with a label of subject, experiment, and session.
        LogErr(e, DFRLabel(df_sess))
    
    # Save the result.
    np.save('recall_'+sub+'.npy', [total_recalled, total_words, sess_count])
    return True
  except Exception as e:
    LogErr(e)
    return False

from CMLLoad import CMLLoad
# Remember to update this to where your CMLExamples data is located.
load = CMLLoad('./CMLExamples')
df = load.Index()
# Run on all subjects.
sub_list = sorted(set(df['subject']))
ClusterChecked(RecallRate, sub_list)

24 Engines running
Sending a shutdown signal to the controller and engines.
All 24 jobs successful.
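
The Settings class used above lives in ClusterRun.py, which is not reproduced here. As a rough sketch of the underlying idea only, the following hypothetical `SimpleSettings` class stores arbitrary attributes and round-trips them through a pickle file, so that values set in the launching notebook can be reloaded inside each job. It mirrors the `Save` and `Load` calls seen above but is an assumption about the approach, not the actual implementation.

```python
import os
import pickle
import tempfile


class SimpleSettings:
    """Hypothetical stand-in: an attribute container that pickles to disk."""

    def Save(self, filename):
        # Store the instance attributes as a plain dictionary.
        with open(filename, 'wb') as f:
            pickle.dump(self.__dict__, f)

    @classmethod
    def Load(cls, filename):
        # Rebuild an instance from the saved attribute dictionary.
        obj = cls()
        with open(filename, 'rb') as f:
            obj.__dict__.update(pickle.load(f))
        return obj


# In the launching program: set values and save them once.
demo_path = os.path.join(tempfile.mkdtemp(), 'demo_settings.pkl')
demo_settings = SimpleSettings()
demo_settings.exp_list = ['FR1']
demo_settings.Save(demo_path)

# Inside a job: reload the same values from disk.
loaded = SimpleSettings.Load(demo_path)
print(loaded.exp_list)  # ['FR1']
```

Passing settings through a file keeps the job's parameter small (here, just the subject code) while still giving every job the same configuration.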


In [6]:
import numpy as np
for sub in sub_list:
  total_recalled,total_words,sess_count = np.load('recall_'+sub+'.npy')
  if sess_count==0:
    print(f'{sub} had no sessions from {settings.exp_list}')
  else:
    perc = 100.0*total_recalled / total_words
    print(f'{sub}: {perc:.1f}% recall')

LTP093 had no sessions from ['FR1']
LTP123 had no sessions from ['FR1']
LTP210 had no sessions from ['FR1']
LTP246 had no sessions from ['FR1']
R1060M: 30.4% recall
R1065J: 33.6% recall
R1108J had no sessions from ['FR1']
R1111M: 53.9% recall
R1189M: 37.2% recall
R1236J had no sessions from ['FR1']
R1292E: 29.6% recall
R1332M: 38.0% recall
R1350D: 30.6% recall
R1354E: 29.8% recall
R1361C: 30.2% recall
R1375C: 27.6% recall
R1377M: 47.1% recall
R1378T: 29.2% recall
R1380D: 42.9% recall
R1383J: 29.5% recall
R1385E: 33.8% recall
R1390M: 22.4% recall
R1391T: 30.3% recall
R1401J: 19.9% recall


**Exercise: Write a parallel function that returns the number of (bipolar) electrodes for every subject in the RAM example dataset. Run with 5 jobs and 1 core per job.**