# Calculate monthly means from hourly data

In [1]:
%matplotlib inline
from dask.distributed import Client
import dask
import xarray as xr
import os

# Spawn a `dask` cluster

In [2]:
from dask_jobqueue import SLURMCluster
ncpu=24
cluster = SLURMCluster(cores=ncpu,
        processes=ncpu,
        memory="116.16GB",
        walltime="00:10:00",
        project='ucb164_summit1',
        queue="shas-testing") # Or use shas
cluster.scale(jobs=1)

from dask.distributed import Client
client = Client(cluster)

In [3]:
client

0,1
Client  Scheduler: tcp://10.225.6.110:45456  Dashboard: http://10.225.6.110:8787/status,Cluster  Workers: 24  Cores: 24  Memory: 116.16 GB


# Calculate monthly means with `dask` by formulating as an embarrassingly parallel problem. 

In [4]:
# Define source and target directories for IO NetCDF files
src_directory = "/scratch/summit/erke2265/MERRA2/"
tgt_directory = "/scratch/summit/erke2265/MERRA2_monthly/"

# Clear target directory
!rm /scratch/summit/erke2265/MERRA2_monthly/*


def calc_monthly_mean(src_directory, tgt_directory, filename):
    '''
    Calculates and saves monthly mean of NetCDF file.
    '''
    
    # Define source and target file paths
    src_path = src_directory + filename
    tgt_path = tgt_directory + filename.replace("hourly", "monthly")  
    
    # Calculate and save monthly mean. 
    ds = xr.open_dataset(src_path)
    ds_sorted = ds.sortby('time', ascending=True)
    monthly_mean = ds_sorted.groupby('time.month').mean()
    monthly_mean.to_netcdf(path=tgt_path)

# Perform monthly mean calculations
# Initialize instructions dictionary 
results_interim = {}

# Loop through each hourly NetCDF file and save delayed dask instructions
for filename in os.listdir(src_directory):
    results_interim[filename] = dask.delayed(calc_monthly_mean)(src_directory, tgt_directory, filename)

# Convert instructions dictionary to list and compute in parallel with dask. 
results_scheduled = dask.delayed(list)(results_interim)
results = results_scheduled.compute()

# Shutdown `dask` cluster

In [5]:
client.shutdown()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
