### Example of how to use joblib `Parallel` to speed up calculations by running them concurrently (here with the `threading` backend, i.e. several threads within one process)

In [1]:
# Render matplotlib figures inside the notebook.
%matplotlib inline

In [13]:
import numpy as np
import netCDF4 as nc
import time
from datetime import datetime, timedelta
import functools
import sys
import dask
import dask.distributed
import matplotlib.pyplot as plt
import netCDF4
import numpy
sys.path.append("/ocean/brogalla/GEOTRACES/analysis-brogalla/modules")
import evaltools

# Library for running on multiple cores:
from joblib import Parallel

In [18]:
def function_timer(func):
    """Decorate ``func`` so that every call prints its elapsed time.

    The duration is printed in seconds as ``"<elapsed>s"``; the wrapped
    function's return value is passed through unchanged.

    Parameters
    ----------
    func : callable
        The function to time.

    Returns
    -------
    callable
        A wrapper carrying ``func``'s name and docstring (``functools.wraps``).
    """
    @functools.wraps(func)
    def wrapper_function_timer(*args, **kwargs):
        # perf_counter is monotonic and high-resolution, so the measurement is
        # not disturbed by system clock adjustments the way time.time() can be.
        t_start = time.perf_counter()
        return_value = func(*args, **kwargs)
        t_end = time.perf_counter()
        print(f"{t_end - t_start}s")
        return return_value

    return wrapper_function_timer

### Elise's calculation:

In [28]:
# Index window used to slice the last two axes of the mesh-mask and tracer
# arrays: rows j0:j1 and columns i0:i1.
j0, j1 = 230, 470
i0, i1 = 0, 200

In [29]:
with netCDF4.Dataset("/ocean/eolson/MEOPAR/NEMO-forcing/grid/mesh_mask201702_noLPE.nc") as mesh:
    # Read both fields over the same window while the file is still open;
    # numpy.copy hands back plain in-memory ndarrays.
    tmask, e3t0 = (
        numpy.copy(mesh.variables[var_name][:, :, j0:j1, i0:i1])
        for var_name in ('tmask', 'e3t_0')
    )

In [32]:
# Index the "ptrc_T" model output files of the testmuZ run covering
# 2015-06-01 to 2015-06-09 (remaining positional arguments are passed
# through to evaltools unchanged).
flistmuZ = evaltools.index_model_files(
    datetime(2015, 6, 1),
    datetime(2015, 6, 9),
    "/data/eolson/results/MEOPAR/SS36runs/CedarRuns/testmuZ/",
    "long",
    10,
    "ptrc_T",
    1,
)

first file starts on  2015-05-31 00:00:00


In [20]:
# NOTE(review): calc_depth_avgs is defined in a later cell; on a fresh kernel,
# run that cell before this one. Wrapping the call in function_timer
# reproduces the elapsed-time printout captured below (the plain definition
# prints nothing).
diatoms_muZ_Int, uZ_muZ_Int = function_timer(calc_depth_avgs)(flistmuZ, tmask, e3t0)

167.27834725379944s


My way of calculating it:

In [None]:
def calc_depth_avgs(flist, tmask, e3t0):
    """Time-mean of the ``tmask * e3t0``-weighted sums of diatoms and microzooplankton.

    Each file in ``flist['paths']`` is opened. Its ``diatoms`` and
    ``microzooplankton`` variables are read over the window
    ``[:, :, j0:j1, i0:i1]``, multiplied by ``tmask * e3t0`` and summed over
    axis 1 (presumably depth). The per-time-step sums from all files are
    stacked along axis 0 and then averaged over that axis.

    Parameters
    ----------
    flist : pandas.DataFrame
        File index with a ``'paths'`` column, e.g. the output of
        ``evaltools.index_model_files``.
    tmask, e3t0 : numpy.ndarray
        Mask and (presumably) cell-thickness arrays cut to the same
        ``j0:j1, i0:i1`` window; they must broadcast against the tracer arrays.

    Returns
    -------
    diatom_int, microzoo_int : numpy.ndarray
        Time-averaged weighted sums (the first two axes of the 4-D product
        reduced away).

    Raises
    ------
    ValueError
        If ``flist`` lists no files.

    Notes
    -----
    Uses the notebook-level window indices ``j0, j1, i0, i1``. Files may hold
    any number of time steps; every time step is weighted equally.
    """
    # Loop-invariant weighting, computed once instead of once per file.
    weights = tmask * e3t0
    diatom_sums = []
    microzoo_sums = []
    for path in flist['paths']:
        # The context manager guarantees each netCDF file is closed.
        with netCDF4.Dataset(path) as ds:
            diatoms = ds.variables['diatoms'][:, :, j0:j1, i0:i1]
            uZ = ds.variables['microzooplankton'][:, :, j0:j1, i0:i1]
            # asarray keeps the same plain-ndarray values the original
            # version copied into its preallocated buffer.
            diatom_sums.append(numpy.asarray(numpy.sum(weights * diatoms, 1)))
            microzoo_sums.append(numpy.asarray(numpy.sum(weights * uZ, 1)))

    if not diatom_sums:
        raise ValueError("flist does not contain any files")

    # Concatenating along time, rather than writing into a buffer sized for
    # exactly 24*10 steps per file, means no uninitialized entries can enter
    # the mean, whatever each file's length or the DataFrame's index.
    diatom_int = numpy.mean(numpy.concatenate(diatom_sums, 0), 0)
    microzoo_int = numpy.mean(numpy.concatenate(microzoo_sums, 0), 0)
    return diatom_int, microzoo_int

In [None]:
# joblib worker: hand one file to the main calculation and return its result.
def joblib_solver(main_calc, fileU):
    """Run ``main_calc(fileU)`` and pass its return value straight back."""
    return main_calc(fileU)

# Build the job list: one job per file. Each job is a
# (function, positional_args, keyword_args) tuple, which mirrors what
# joblib.delayed builds and is what Parallel unpacks and calls.
# NOTE(review): main_calc and gridU_files are defined in later cells; this
# cell fails on a fresh kernel unless those cells have been run first.
joblist = [
    (joblib_solver, [main_calc, fileU], {})
    for fileU in gridU_files
]

In [None]:
# Run the jobs with up to ncores workers at once (8 here). With
# backend='threading' the workers are threads inside this process, which
# helps most when the work releases the GIL (e.g. file I/O, many numpy calls).
# NOTE(review): confirm the netCDF/HDF5 library build tolerates concurrent
# reads from several threads.
ncores = 8
with Parallel(n_jobs=ncores, backend='threading') as parallel:
    results = parallel(joblist)

The main calculation for this example:

In [3]:
def main_calc(filenameU, folder='/data/brogalla/ANHA12/'):
    """Load ``vozocrtx`` from one gridU file and square it element-wise.

    Stand-in for a heavier per-file computation.

    Parameters
    ----------
    filenameU : str
        File name inside ``folder``.
    folder : str, optional
        Directory holding the file (defaults to the original ANHA12 data
        directory; a trailing slash is optional).

    Returns
    -------
    numpy.ndarray
        ``vozocrtx * vozocrtx``.
    """
    import os

    # Close the netCDF file as soon as the data is in memory (np.array reads
    # the whole variable before the with-block exits).
    with nc.Dataset(os.path.join(folder, filenameU)) as file_u:
        u_vel = np.array(file_u.variables['vozocrtx'])

    # Whatever larger computation I want to do (I usually end up calling another function)
    calc = np.multiply(u_vel, u_vel)

    return calc

In [13]:
# Files to process: one ANHA12-EXH006 gridU file every five days,
# from 2015-01-05 through 2015-02-09.
gridU_files = [
    'ANHA12-EXH006_y2015m01d05_gridU.nc',
    'ANHA12-EXH006_y2015m01d10_gridU.nc',
    'ANHA12-EXH006_y2015m01d15_gridU.nc',
    'ANHA12-EXH006_y2015m01d20_gridU.nc',
    'ANHA12-EXH006_y2015m01d25_gridU.nc',
    'ANHA12-EXH006_y2015m01d30_gridU.nc',
    'ANHA12-EXH006_y2015m02d04_gridU.nc',
    'ANHA12-EXH006_y2015m02d09_gridU.nc',
]

Time the calculation without threading:

In [14]:
# Serial baseline: process the files one after another.
# perf_counter is a monotonic, high-resolution clock suited to benchmarking.
start = time.perf_counter()

for fileU in gridU_files:
    calc = main_calc(fileU)

end = time.perf_counter()
print('Calculation took: ', end - start)

Calculation took:  42.54135608673096


In [15]:
# joblib worker: hand one file to the main calculation and return its result.
# NOTE(review): this re-defines joblib_solver from an earlier cell.
def joblib_solver(main_calc, fileU):
    """Run ``main_calc(fileU)`` and pass its return value straight back."""
    return main_calc(fileU)

# Build the job list: one job per file. Each job is a
# (function, positional_args, keyword_args) tuple, which mirrors what
# joblib.delayed builds and is what Parallel unpacks and calls.
joblist = [
    (joblib_solver, [main_calc, fileU], {})
    for fileU in gridU_files
]

In [16]:
# perf_counter is a monotonic, high-resolution clock suited to benchmarking.
start = time.perf_counter()

# Run the jobs with up to ncores workers at once (8 here). With
# backend='threading' the workers are threads inside this process, which
# helps most when the work releases the GIL (e.g. file I/O, many numpy calls).
# NOTE(review): confirm the netCDF/HDF5 library build tolerates concurrent
# reads from several threads.
ncores = 8
with Parallel(n_jobs=ncores, backend='threading') as parallel:
    results = parallel(joblist)

end = time.perf_counter()
print('Calculation took: ', end - start)

Calculation took:  29.805630207061768


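If your calculation function returns multiple variables, `results` is a list with one tuple per job; the easiest way to regroup it into one sequence per returned variable is to zip the results at the end with `zip(*results)`: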

In [40]:
# Regroup per-job results: if each job returned a tuple (a, b, ...),
# results_zip holds one tuple of all the a's, one of all the b's, and so on.
# list() materializes zip's one-shot iterator so it can be indexed and
# iterated more than once (a bare zip object is silently empty after one pass).
results_zip = list(zip(*results))