In [2]:
# basics
import os, glob, sys, path, datetime, pathlib, json
import numpy as np
import xarray as xr
xr.set_options(keep_attrs=True)
import pandas as pd
import scipy

# Dask
from dask_jobqueue import SLURMCluster
import multiprocessing
import dask
from dask.distributed import Client, progress
# Route the dashboard link through the JupyterHub proxy; dask substitutes
# {JUPYTERHUB_SERVICE_PREFIX} and {port} when it formats the link.
dask.config.set({'distributed.dashboard.link': '{JUPYTERHUB_SERVICE_PREFIX}/proxy/{port}/status'})

# Warnings
import warnings
warnings.filterwarnings(action='ignore')

# To save
import numcodecs

In [3]:
import Tools

In [40]:
# Get dataset - zonal band between -lat_interest and +lat_interest
def get_dataset(experiment, variable, lat_interest, exp_list_check, icondates=True, factor=None, new_units=None):
    """Open one variable of an experiment, restricted to a zonal latitude band.

    Parameters
    ----------
    experiment : str
        Key into the global ``EXPERIMENTS_DIC``; its entries 2 and 3 give the
        grid type ('native' or 'lonlat') and the chunks passed to ``Tools.get_data``.
    variable : str
        Name of the variable to extract.
    lat_interest : float
        Half-width of the band; latitudes in [-lat_interest, lat_interest] are kept.
    exp_list_check : list of str
        Files to open, forwarded to ``Tools.get_data``.
    icondates : bool, optional
        Forwarded to ``Tools.get_data``.
    factor : float, optional
        If given, the data are multiplied by this value (e.g. a unit conversion).
    new_units : str, optional
        If given, stored in the ``units`` attribute of the result.

    Returns
    -------
    xarray.DataArray
        The selected variable within the band.
    """
    native_grid, chunk_exp = EXPERIMENTS_DIC[experiment][2:4]
    native_grid = native_grid == 'native'
    # The grid returned here is not needed: the native branch opens its own cut grid.
    dataset, _ = Tools.get_data(exp_list_check, native_grid=native_grid, path_grid=None,
                                chunk=chunk_exp, icondates=icondates, open_mfdataset=True)
    if native_grid:
        # Native grid: the band is cut using a pre-computed grid file for this experiment.
        grid = xr.open_dataset(f'/work/mh0287/m300901/prepost_experiments/grids_cut/{experiment}_grid.zarr', chunks='auto')
        dataset = Tools.get_info_var_cut(dataset, grid, variable, lat_interest, type_file='2d')
        dataset = dataset[variable]
    else:
        dataset = dataset[variable]
        # Label slicing follows the coordinate order: a descending latitude axis
        # needs the bounds reversed, otherwise the selection comes back empty.
        lat_index = dataset.indexes['lat']
        if len(lat_index) > 1 and lat_index[0] > lat_index[-1]:
            lat_band = slice(lat_interest, -lat_interest)
        else:
            lat_band = slice(-lat_interest, lat_interest)
        dataset = dataset.sel(lat=lat_band)
    if factor is not None:
        # Out-of-place multiply: `*=` would write into the array shared with the
        # opened dataset. Re-attach attrs so they survive even without keep_attrs.
        dataset = (dataset * factor).assign_attrs(dataset.attrs)
    if new_units is not None:
        dataset.attrs['units'] = new_units
    return dataset

In [49]:
# Get the distribution of the desired variable
def analysis_distribution(ds_exp, flag_pre_compute=None, bot_val_interest=None, frac_interest=10, split_dates=False,
                          analysis_times=('3h', '6h', '12h', '1D')):
    """Histogram ``ds_exp`` after time-averaging it at several sampling frequencies.

    For every frequency in ``analysis_times`` the data are resampled in time
    (mean) and binned with width ``frac_interest``. Each frequency contributes
    two variables on its own ``<freq>_bins`` dimension: ``<freq>_bins`` (bin
    centres) and ``<freq>_pr`` (counts or relative frequencies).

    Parameters
    ----------
    ds_exp : xarray.DataArray
        Named data with a ``time`` dimension.
    flag_pre_compute : any, optional
        If not None, each resampled array is loaded with ``.compute()`` and
        binned with ``np.histogram``; otherwise it is binned lazily with
        ``groupby_bins``. Both paths use the same bin edges.
    bot_val_interest : float, optional
        Lower edge of the first bin. If None, the minimum rounded down to a
        multiple of 10 is used. If given, valid values that fall in no bin
        (normally those below it) are added to the first bin.
    frac_interest : float
        Bin width.
    split_dates : bool
        If True, store raw counts; otherwise normalise them to sum to 1.
    analysis_times : sequence of str
        Resampling frequencies. Drop '3h' for experiments without sub-3-hourly
        output (e.g. aquaplanet_06).

    Returns
    -------
    xarray.Dataset
    """
    per_frequency = []
    for analysis_time in analysis_times:
        print(f'Start of distribution analysis for {analysis_time} - flag {flag_pre_compute}', flush=True)
        ds_exp_sampling = ds_exp.resample(time=analysis_time).mean('time')
        if flag_pre_compute is not None:
            ds_exp_sampling = ds_exp_sampling.compute()
        # Range rounded outwards to multiples of 10.
        top_val = np.ceil(ds_exp_sampling.max().values / 1e1) * 1e1
        bot_val = np.floor(ds_exp_sampling.min().values / 1e1) * 1e1 if bot_val_interest is None else bot_val_interest
        array_interest = np.arange(bot_val, top_val + frac_interest, frac_interest)
        bin_centres = (array_interest[1:] + array_interest[:-1]) / 2
        # binning
        print(f'Binning Opeartion: BinRange [{array_interest[0]} - {array_interest[-1]}]', flush=True)
        if flag_pre_compute is None:
            # groupby_bins bins are open on the left; include_lowest keeps values
            # equal to the first edge (e.g. exact zeros when the minimum is 0).
            counts_per_bin = ds_exp_sampling.groupby_bins(ds_exp_sampling, array_interest, labels=bin_centres,
                                                          include_lowest=True).count().compute().values
            # Empty bins may come back as NaN; treat them as zero counts.
            counts_per_bin = np.nan_to_num(counts_per_bin)
        else:
            # Same edges as the lazy branch (previously range=[0, top_val] ignored bot_val).
            counts_per_bin, _ = np.histogram(ds_exp_sampling.values, bins=array_interest)
        if bot_val_interest is not None:
            print('Adding smaller lower bottom value counts into lower bottom count', flush=True)
            # count() skips NaN (e.g. empty resample periods) and, unlike .values,
            # does not materialise the whole lazy array just to read its size.
            negative_counts = int(ds_exp_sampling.count()) - counts_per_bin.sum()
            counts_per_bin[0] += negative_counts
        bins_dim = f'{analysis_time}_bins'
        counts = counts_per_bin if split_dates else counts_per_bin / counts_per_bin.sum()
        per_frequency.append(xr.Dataset({
            bins_dim: xr.DataArray(bin_centres, dims=(bins_dim,)),
            f'{analysis_time}_pr': xr.DataArray(counts, dims=(bins_dim,)),
        }))
    return xr.merge(per_frequency)

### Global Variables

In [32]:
# Per-experiment configuration. Entries are namedtuples, so positional access
# (e.g. `EXPERIMENTS_DIC[exp][2:4]`) and full tuple unpacking keep working,
# while the fields can also be read by name:
#   type_file     - output stream name used in the file glob pattern
#   date_interest - date prefix used in the file glob pattern
#   grid_type     - 'lonlat' (regular lon/lat grid) or 'native' (cell-indexed, see the 'ncells' chunk)
#   chunk_exp     - dask chunks used when opening the files
#   resolution    - nominal model resolution (units not stated; presumably km - TODO confirm)
#   left_char, right_char - character slice holding the date stamp (right_char - left_char = stamp width)
from collections import namedtuple

ExperimentConfig = namedtuple('ExperimentConfig', ['type_file', 'date_interest', 'grid_type', 'chunk_exp',
                                                   'resolution', 'left_char', 'right_char'])

EXPERIMENTS_DIC = {
    'aquaplanet_00': ExperimentConfig('atm_2d_ml', '198411', 'lonlat', {'time': 48, 'lon': 360, 'lat':180}, 160, 80, 88),
    'aquaplanet_01': ExperimentConfig('atm_2d_ml', '198411', 'lonlat', {'time': 48, 'lon': 450, 'lat':225}, 80, 80, 88),
    'aquaplanet_02': ExperimentConfig('atm_2d_ml', '198112', 'lonlat', {'time': 12, 'lon': 720, 'lat':360}, 40, 80, 88),
    'aquaplanet_02_re': ExperimentConfig('atm_2d_ml', '198112', 'lonlat', {'time': 12, 'lon': 720, 'lat':360}, 40, 86, 94),
    'aquaplanet_03': ExperimentConfig('atm_2d_ml', '197912', 'lonlat', {'time': 12, 'lon': 1200, 'lat':600}, 20, 80, 88),
    'aquaplanet_04': ExperimentConfig('atm_2d_ml', '197910', 'lonlat', {'time': 6, 'lon': 1200, 'lat':600}, 10, 80, 88),
    'aquaplanet_05': ExperimentConfig('dayavg_atm_2d_ml', '197906', 'native', {'time': 1, 'ncells': 5 * 4 ** 9}, 5, 80, 88), # daily data
    'aquaplanet_06': ExperimentConfig('dayavg_atm_2d_ml', '197903', 'native', {'time': 1, 'ncells': 5 * 4 ** 9}, 2.5, 86, 94), # daily data
    'aquaplanet_07': ExperimentConfig('dayavg_atm_2d_ml', '1979', 'native', {'time': 1, 'ncells': 5 * 4 ** 9}, 1, 82, 90), # daily data
}

### Temporary Directory

In [5]:
# Scratch location for dask worker temporary data and the Slurm job logs
local_directory = str(pathlib.Path('/scratch/m/m300901/tempfiles') / '46320')

In [6]:
# Start from an empty scratch directory so worker temporary data and logs from a
# previous session do not accumulate. Uses `local_directory` rather than repeating
# the path, and creates missing parent directories.
import shutil
shutil.rmtree(local_directory, ignore_errors=True)
os.makedirs(local_directory, exist_ok=True)

# Testing dask_jobqueue's SLURMCluster

Reference: https://jobqueue.dask.org/en/latest/configuration.html
- "cores" is the number of CPUs used per Slurm job. For example, one Levante node has 256 CPUs.
- "processes" specifies the number of dask workers in a single Slurm job; the cores are split evenly among them as threads.
- "memory" specifies the memory requested in a single Slurm job. For example, Levante nodes have 256, 512 or 1024 GB. The value is given in decimal units, so '256 GB' appears as `--mem=239G` (GiB) in the generated job script.
- "project" specifies the project charged for the consumed hours.
- "local_directory" is the location where workers put temporary data, if necessary.
- "walltime" specifies the reservation time.
- "job_extra" passes extra options to the Slurm dispatcher, in this case the locations of the output and error log files.

Thus, if you want a single Slurm job to run on exactly one Levante node, specify cores=256 and memory='256 GB', and choose processes according to the number of dask workers you want on that node.

In [9]:
cluster = SLURMCluster(
    queue='compute',
    project='bk1040',                 # account charged for the node hours
    walltime='01:00:00',
    cores=256,                        # one full Levante node per Slurm job
    processes=16,                     # 16 dask workers per job -> 256 / 16 = 16 threads each
    memory='256 GB',                  # rendered as --mem=239G in the job script
    local_directory=local_directory,
    # stdout and stderr of each job go to the same log file (%j = Slurm job id)
    job_extra=[f'-o {local_directory}/LOG_dask_%j.o',
               f'-e {local_directory}/LOG_dask_%j.o'],
)

In [10]:
# Inspect the sbatch script that will be submitted for every dask job
job_script_text = cluster.job_script()
print(job_script_text)

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p compute
#SBATCH -A bk1040
#SBATCH -n 1
#SBATCH --cpus-per-task=256
#SBATCH --mem=239G
#SBATCH -t 01:00:00
#SBATCH -o /scratch/m/m300901/tempfiles/46320/LOG_dask_%j.o
#SBATCH -e /scratch/m/m300901/tempfiles/46320/LOG_dask_%j.o

/sw/spack-levante/mambaforge-4.11.0-0-Linux-x86_64-sobz6z/bin/python -m distributed.cli.dask_worker tcp://136.172.121.47:39979 --nthreads 16 --nprocs 16 --memory-limit 14.90GiB --name dummy-name --nanny --death-timeout 60 --local-directory /scratch/m/m300901/tempfiles/46320 --protocol tcp://



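Dask distributes the load over all workers the cluster currently has, and the cluster can be scaled up or down at any time. Note that `cluster.scale(n)` counts dask *workers*, not nodes. Each Slurm job occupies one node and starts `processes` workers (16 here), so the requested worker count is rounded up to whole jobs. For example, `cluster.scale(8)` still launches a single node. Since every job reserves a full node, scaling may be problematic depending on the resources of your group.

To reserve a given number of nodes, scale by jobs (e.g. `cluster.scale(jobs=2)` for two nodes) and then connect a dask `Client` to the cluster.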

In [20]:
# scale(n) counts dask *workers*: with processes=16, any n <= 16 is rounded up to
# one Slurm job (one node), which is what scale(4) produced. Request whole jobs
# explicitly so the number of reserved nodes is obvious (jobs=2 -> 2 nodes).
cluster.scale(jobs=1)

In [21]:
# Attach a dask client to the SLURM cluster; subsequent dask computations run on it
client = Client(address=cluster)

In [22]:
# Display the client summary (dashboard link, workers, threads, memory)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: /user/m300901/levante-spawner-advanced//proxy/40233/status,

0,1
Dashboard: /user/m300901/levante-spawner-advanced//proxy/40233/status,Workers: 16
Total threads: 256,Total memory: 238.40 GiB

0,1
Comm: tcp://136.172.121.47:39979,Workers: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/40233/status,Total threads: 256
Started: 1 minute ago,Total memory: 238.40 GiB

0,1
Comm: tcp://136.172.118.93:39733,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/38637/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:37037,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-pgdc8ha4,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-pgdc8ha4

0,1
Comm: tcp://136.172.118.93:44021,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/40875/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:42891,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-_rj6ivs5,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-_rj6ivs5

0,1
Comm: tcp://136.172.118.93:40573,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/44701/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:43037,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-_tltqqes,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-_tltqqes

0,1
Comm: tcp://136.172.118.93:34913,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/38333/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:40525,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-343apsq7,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-343apsq7

0,1
Comm: tcp://136.172.118.93:39643,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/34929/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:38301,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-m68vp76e,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-m68vp76e

0,1
Comm: tcp://136.172.118.93:37009,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/45475/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:38215,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-gwn1_pam,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-gwn1_pam

0,1
Comm: tcp://136.172.118.93:38635,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/39079/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:45637,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-el2r79h1,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-el2r79h1

0,1
Comm: tcp://136.172.118.93:45651,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/39941/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:44607,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-ci74c9s1,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-ci74c9s1

0,1
Comm: tcp://136.172.118.93:34753,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/36033/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:37395,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-306yekxf,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-306yekxf

0,1
Comm: tcp://136.172.118.93:33357,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/41355/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:41953,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-8cfrljyg,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-8cfrljyg

0,1
Comm: tcp://136.172.118.93:35913,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/35917/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:45727,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-6zsmdp50,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-6zsmdp50

0,1
Comm: tcp://136.172.118.93:44537,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/36467/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:34627,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-sskuilfl,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-sskuilfl

0,1
Comm: tcp://136.172.118.93:34201,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/34967/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:46201,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-vl4zwxh_,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-vl4zwxh_

0,1
Comm: tcp://136.172.118.93:45233,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/38581/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:36849,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-mpe10hmh,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-mpe10hmh

0,1
Comm: tcp://136.172.118.93:40637,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/33777/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:42931,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-vir4sh60,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-vir4sh60

0,1
Comm: tcp://136.172.118.93:45677,Total threads: 16
Dashboard: /user/m300901/levante-spawner-advanced//proxy/44627/status,Memory: 14.90 GiB
Nanny: tcp://136.172.118.93:34029,
Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-20eefyjm,Local directory: /scratch/m/m300901/tempfiles/46320/dask-worker-space/worker-20eefyjm


In [58]:
# List this user's Slurm jobs (the dask-worker job should appear as RUNNING)
!squeue --me --format="%.18i %.9P %.30j %.8u %.8T %.10M %.9l %.6D %R"

             JOBID PARTITION                           NAME     USER    STATE       TIME TIME_LIMI  NODES NODELIST(REASON)
           2266881   compute                 apb.trialPdf07  m300901  PENDING       0:00     45:00      1 (Priority)
           2267444   compute             spawner-jupyterhub  m300901  RUNNING      54:05   2:00:00      1 l40305
           2267969   compute                    dask-worker  m300901  RUNNING      20:25   1:00:00      1 l30308


# Basic Test

In [43]:
# Experiment and variable to analyse
experiment, variable = 'aquaplanet_05', 'pr'
# Half-width of the zonal band: latitudes in [-lat_interest, lat_interest] are kept
lat_interest = 20
# Passed to Tools.get_basic_info_cut (presumably longitude extent and target resolution - TODO confirm)
lon_interest, res_des = 180, 0.1

In [44]:
# Unpack the configuration of the selected experiment
type_file, date_interest, native_grid, chunk_exp, resolution, left_char, right_char = EXPERIMENTS_DIC[experiment]
# Basic information for the lat/lon cut, derived from the band parameters
lat_window, lon_window, new_lat, new_lon = Tools.get_basic_info_cut(lat_interest, lon_interest, res_des)

In [45]:
# Paths: lon/lat description file for the band, and the experiment's output directory
work_root = pathlib.Path('/work/mh0287/m300901')
remap_file = work_root / 'prepost_experiments' / 'lonlat_descriptions' / f'lonlat_N{lat_interest}_S{lat_interest}.txt'
path_input = work_root / 'experiments' / experiment / 'someinfo'

In [46]:
# Get files (daily averages for aquaplanet_05, 06 and 07)
exp_list = sorted(str(f) for f in path_input.glob(f'{experiment}_{type_file}_{date_interest}*'))
# The glob guarantees every file *name* starts with this prefix, so the date stamp
# is sliced from the name. left_char/right_char are offsets into the full path and
# silently break when the directory (hence path length) changes; only their
# difference, the width of the date stamp, is used here.
date_prefix_len = len(f'{experiment}_{type_file}_')
date_width = right_char - left_char
date_list = [pathlib.Path(f).name[date_prefix_len:date_prefix_len + date_width] for f in exp_list]
unique_date = np.unique(date_list)

In [56]:
# Load the zonal band as a DataArray and wrap it into a Dataset keyed by the variable name
dataset = get_dataset(experiment, variable, lat_interest, exp_list,
                      icondates=True, factor=None, new_units=None).to_dataset(name=variable)

In [57]:
# Display the dask-backed dataset (shape, chunks, task count)
dataset

Unnamed: 0,Array,Chunk
Bytes,813.44 MiB,4.65 MiB
Shape,"(30, 7107910)","(1, 1219625)"
Count,1204 Tasks,240 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 813.44 MiB 4.65 MiB Shape (30, 7107910) (1, 1219625) Count 1204 Tasks 240 Chunks Type float32 numpy.ndarray",7107910  30,

Unnamed: 0,Array,Chunk
Bytes,813.44 MiB,4.65 MiB
Shape,"(30, 7107910)","(1, 1219625)"
Count,1204 Tasks,240 Chunks
Type,float32,numpy.ndarray


### Distribution of variable

In [59]:
# Lazy 5-day means, handed to Tools.run_in_dask together with the client
# (presumably evaluated on the cluster - the helper reports a progress bar)
five_day_mean = dataset.resample(time='5D').mean('time')
ds_sampling = Tools.run_in_dask(client, dataset, five_day_mean)

[########################################] | 100% Completed |  5.6s

In [60]:
# Display the 5-day-mean result
ds_sampling