In [1]:
#!/usr/bin/env python3
# inagler 12/09/23

import numpy as np
import xarray as xr
import glob
import pop_tools
import dask.distributed

path = '/Data/gfi/share/ModData/CESM2_LENS2/ocean/monthly/vvel/'
files = glob.glob(path + '*.nc')

grid_name = 'POP_gx1v7'

region_defs = {
    'NorthAtlantic':[{'match': {'REGION_MASK': [6]}, 'bounds': {'TLAT': [-20.0, 66.0]}}],
    'LabradorSea': [{'match': {'REGION_MASK': [8]}, 'bounds': {'TLAT': [45.0, 66.0]}}],
    'MediterraneanSea': [{'match': {'REGION_MASK': [7]}}]
}
mask3d = pop_tools.region_mask_3d(grid_name, region_defs=region_defs, mask_name='North Atlantic')
mask3d = mask3d.sum('region')

len_time = 3012
dept_time_series_maxi = np.zeros((len_time, len(files)))
dept_time_series_rapi = np.zeros((len_time, len(files)))
dept_time_series_spgy = np.zeros((len_time, len(files)))

print('initialization complete')

OMP: Info #276: omp_set_nested routine deprecated, please use omp_set_max_active_levels instead.


initialization complete


In [2]:
def depth_MOC(ds):
    overturning_depth = (ds.VVEL * ds.dz * ds.DXU).sum(dim='nlon').cumsum(dim='z_t') * 1e-6
    maxi = overturning_depth.isel(z_t=slice(27, 51)).max(dim=['nlat','z_t']).values
    rapi = overturning_depth.isel(nlat=274, z_t=slice(27, 51)).max(dim='z_t').values
    spgy = overturning_depth.isel(nlat=345, z_t=slice(27, 51)).max(dim='z_t').values
    return maxi, rapi, spgy

@dask.delayed
def process_file(file):
    ds = xr.open_dataset(file).where(mask3d == 1)
    
    # Update units to SI units
    ds['VVEL'] *= 1e-2
    ds['dz'] *= 1e-2
    ds['z_t'] *= 1e-2
    ds['z_w_top'] *= 1e-2
    ds['z_w_bot'] *= 1e-2
    ds['DXU'] *= 1e-2
    ds['VVEL'].attrs['units'] = 'm/s'
    ds['dz'].attrs['units'] = 'm'
    ds['z_t'].attrs['units'] = 'm'
    ds['z_w_top'].attrs['units'] = 'm'
    ds['z_w_bot'].attrs['units'] = 'm'
    ds['DXU'].attrs['units'] = 'm'
    
    return depth_MOC(ds)

These lines below are responsible for parallelizing the processing of multiple files using Dask. Let's break down what each line does:

with `dask.distributed.Client()` as `client`:: This line creates a Dask client, which is used to manage distributed computing. When you create a Dask client, it connects to a Dask cluster (which can be local or distributed) and provides an interface for parallel and distributed computing. The with statement ensures that the client is properly closed when the block is exited.

`results = dask.compute(*[process_file(file) for filein files], scheduler='threads'`: In this line, a list comprehension is used to create a list of delayed objects by calling the process_file(file) function for each file in the files list. Each `process_file(file) call is wrapped in a Dask delayed object, which represents a computation that can be executed lazily and in parallel.
- `dask.compute` is then used to compute the results of all the delayed objects in parallel. The scheduler='threads' argument specifies that Dask should use a multi-threaded scheduler for parallel execution. This means that Dask will use multiple CPU threads to execute the delayed computations concurrently, which can speed up the processing of multiple 
- `he `*` operator is used to unpack the list of delayed objects and pass them as separate arguments to dask.compute.
The results of the computations are stored in the results variable, which will be  tuple containing the results f - each delayed computation.

In [4]:
# Use concurrent.futures to parallelize file processing
with dask.distributed.Client() as client:
    results = dask.compute(*[process_file(file) for file in files], scheduler='threads')

Perhaps you already have a cluster running?
Hosting the HTTP server on port 35807 instead
2023-09-15 14:57:27,190 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/home/innag3580/.conda/envs/cartopy/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/innag3580/.conda/envs/cartopy/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/innag3580/.conda/envs/cartopy/lib/python3.10/site-packages/distributed/comm/core.py", line 328, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  Fil

TypeError: Values of an IndexVariable are immutable and can not be modified inplace

In [None]:
# Unpack results
for i, result in enumerate(results):
    dept_time_series_maxi[:, i], dept_time_series_rapi[:, i], dept_time_series_spgy[:, i] = result
    print('file', i+1, '/', len(files), 'executed')

print('computation finished')

In [None]:
# Save time series to a single file
np.save("timeseries/maxi_dens_time_series.npy", dept_time_series_maxi)
np.save("timeseries/rapi_dens_time_series.npy", dept_time_series_rapi)
np.save("timeseries/spgy_dens_time_series.npy", dept_time_series_spgy)

print('saving successful')