# flox composited_stats_BRAN2020

Date: 22 April, 2024

Author = {"name": "Thomas Moore", "affiliation": "CSIRO", "email": "thomas.moore@csiro.au", "orcid": "0000-0003-3930-1946"}

### BRAN2020 is over 50TB of `float32` data over nearly 9000 `netcdf` file assests in total.

#### required packages

In [4]:
import xarray as xr
import pandas as pd
import numpy as np
import pandas as pd
import datetime
from dask.distributed import Client, LocalCluster
import dask
from flox.xarray import xarray_reduce
import flox
import flox.xarray

#### start a local Dask client

In [5]:
with dask.config.set({"distributed.scheduler.worker-saturation": 1.0,
                      "distributed.nanny.pre-spawn-environ.MALLOC_TRIM_THRESHOLD_": 0}):
    cluster=LocalCluster(n_workers=28,processes=True)
    client = Client(cluster)



In [6]:
print(client)

<Client: 'tcp://127.0.0.1:37087' processes=28 threads=28, memory=251.18 GiB>


In [7]:
var_name = 'mld_chunk4month'

In [8]:
zarr_path = '/scratch/es60/ard/reanalysis/BRAN2020/ARD/'
path_dict = {'mld_chunk4time':'BRAN2020-daily-mld-chunk4time-v04042024.zarr',
                 'mld_chunk4month':'BRAN2020-daily-mld-chunk4month_clim-22042023.zarr',
                 'mld_chunk4space':'BRAN2020-daily-mld-v04042024.zarr'}
results_path = '/g/data/es60/users/thomas_moore/clim_demo_results/daily/draft_delivery/'
results_file = 'BRAN2020_clim_demo_'+var_name+'.nc'

In [9]:
depth_dict = {'eta_t':None,'mld':None,'temp':'st_ocean'}
lon_dict = {'eta_t':'xt_ocean','mld':'xt_ocean','temp':'xt_ocean'}
lat_dict = {'eta_t':'yt_ocean','mld':'yt_ocean','temp':'yt_ocean'}
time_dim = 'Time'



collection_path = zarr_path + path_dict[var_name]
# load BRAN data
ds = xr.open_zarr(collection_path,consolidated=True)

In [10]:
ds

Unnamed: 0,Array,Chunk
Bytes,224.06 GiB,278.09 MiB
Shape,"(11138, 1500, 3600)","(27, 1500, 1800)"
Dask graph,826 chunks in 2 graph layers,826 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 224.06 GiB 278.09 MiB Shape (11138, 1500, 3600) (27, 1500, 1800) Dask graph 826 chunks in 2 graph layers Data type float32 numpy.ndarray",3600  1500  11138,

Unnamed: 0,Array,Chunk
Bytes,224.06 GiB,278.09 MiB
Shape,"(11138, 1500, 3600)","(27, 1500, 1800)"
Dask graph,826 chunks in 2 graph layers,826 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [22]:
test_ds = ds.isel(Time=slice(0,3000))
test_ds

Unnamed: 0,Array,Chunk
Bytes,60.35 GiB,278.09 MiB
Shape,"(3000, 1500, 3600)","(27, 1500, 1800)"
Dask graph,224 chunks in 3 graph layers,224 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 60.35 GiB 278.09 MiB Shape (3000, 1500, 3600) (27, 1500, 1800) Dask graph 224 chunks in 3 graph layers Data type float32 numpy.ndarray",3600  1500  3000,

Unnamed: 0,Array,Chunk
Bytes,60.35 GiB,278.09 MiB
Shape,"(3000, 1500, 3600)","(27, 1500, 1800)"
Dask graph,224 chunks in 3 graph layers,224 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [23]:
def calc_flox_monthclim_stats(ds,var_name,time_dim='time',method='map-reduce'):
    mean = xarray_reduce(ds, time_dim+'.month', func="nanmean",method=method).rename({var_name:'mean_'+var_name})
    std = xarray_reduce(ds, time_dim+'.month', func="nanstd",method=method).rename({var_name:'std_'+var_name})
    max = xarray_reduce(ds, time_dim+'.month', func="argmax",method=method).rename({var_name:'max_'+var_name})
    min = xarray_reduce(ds, time_dim+'.month', func="argmin",method=method).rename({var_name:'min_'+var_name})
    stats_ds = xr.merge([mean,std,max,min])
    return stats_ds

In [24]:
def calc_median_monthclim(ds,var_name,skipna_flag=True,time_dim='time'):
    median_ds = ds.groupby(time_dim+'.month').median(skipna=skipna_flag).rename({var_name:'median_'+var_name})
    return median_ds

In [25]:
def calc_quantile_monthclim(ds,var_name,skipna_flag=True,time_dim='time',q_list=[0.05,0.95]):
    quant = ds.groupby(time_dim+'.month').quantile(q_list,skipna=skipna_flag,dim=time_dim).astype(np.float32)
    quant_ds = xr.merge([quant.isel(quantile=0).reset_coords(drop=True).rename({var_name:'quantile_05_'+var_name}),quant.isel(quantile=1).reset_coords(drop=True).rename({var_name:'quantile_95_'+var_name})])
    return quant_ds

In [26]:
flox_stats_ds = calc_flox_monthclim_stats(test_ds,'mld',time_dim='Time')
flox_stats_ds.nbytes/1e9

1.555240896

In [27]:
median_ds = calc_median_monthclim(test_ds,'mld',time_dim='Time')
median_ds.nbytes/1e9

0.259240896

In [28]:
quant_ds = calc_quantile_monthclim(test_ds,'mld',time_dim='Time')
quant_ds.nbytes/1e9

ValueError: Aggregation nanquantile is only supported for `method='blockwise'`, but the chunking is not right.

In [None]:
%%time
quant_ds = quant_ds.compute()

In [None]:
%%time
median_ds = median_ds.compute()

In [29]:
%%time
flox_stats_ds = flox_stats_ds.compute()

2024-04-22 16:46:57,172 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/g/data/es60/users/thomas_moore/miniconda3/envs/pangeo_streamjoy/lib/python3.10/site-packages/distributed/comm/tcp.py", line 225, in read
    frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/g/data/es60/users/thomas_moore/miniconda3/envs/pangeo_streamjoy/lib/python3.10/site-packages/distributed/worker.py", line 1252, in heartbeat
    response = await retry_operation(
  File "/g/data/es60/users/thomas_moore/miniconda3/envs/pangeo_streamjoy/lib/python3.10/site-packages/distributed/utils_comm.py", line 455, in retry_operation
    return await retry(
  File "/g/data/es60/users/thomas_moore/miniconda3/envs/pangeo_streamjoy/lib/python3.10/site-packages/distributed

In [None]:
clim = ds.groupby('Time.month').median(skipna=True)

In [None]:
clim = xarray_reduce(ds, 'Time.month', func="nanmedian",method='cohorts')
clim

In [None]:
%%time
clim = clim.compute()

In [None]:
clim.mld.mean('month').plot()

In [None]:
time_coords = ds['Time.month']  # Replace 'time' with the actual name of your time dimension if different

# Check for duplicates in the time coordinate
duplicates = time_coords.to_index().duplicated()

# Print out the duplicate times
if duplicates.any():
    print("Duplicate times found:", time_coords[duplicates].values)
else:
    print("No duplicate times found.")

# workflow

In [None]:
%%time

    
## make masks for ENSO composites

### load ONI data
ONI_DF = pd.read_csv('/g/data/xv83/users/tm4888/data/ENSO/NCAR_ONI.csv')
ONI_DF.set_index('datetime',inplace=True)
ONI_DF.index = pd.to_datetime(ONI_DF.index)
###
el_nino_threshold = 0.5
la_nina_threshold = -0.5
el_nino_threshold_months = ONI_DF["ONI"].ge(el_nino_threshold)
la_nina_threshold_months = ONI_DF["ONI"].le(la_nina_threshold)
ONI_DF = pd.concat([ONI_DF, el_nino_threshold_months.rename('El Nino threshold')], axis=1)
ONI_DF = pd.concat([ONI_DF, la_nina_threshold_months.rename('La Nina threshold')], axis=1)
ONI_DF = pd.concat([ONI_DF, el_nino_threshold_months.diff().ne(0).cumsum().rename('El Nino event group ID')], axis=1)
ONI_DF = pd.concat([ONI_DF, la_nina_threshold_months.diff().ne(0).cumsum().rename('La Nina event group ID')], axis=1)
#
El_Nino_Series = ONI_DF.groupby('El Nino event group ID')['ONI'].filter(lambda x: len(x) >= 5,dropna=False).where(ONI_DF['El Nino threshold'] == True)
ONI_DF = pd.concat([ONI_DF, El_Nino_Series.rename('El Nino')], axis=1)
La_Nina_Series = ONI_DF.groupby('La Nina event group ID')['ONI'].filter(lambda x: len(x) >= 5,dropna=False).where(ONI_DF['La Nina threshold'] == True)
ONI_DF = pd.concat([ONI_DF, La_Nina_Series.rename('La Nina')], axis=1)
#
ONI_DF_BRANtime = ONI_DF['1993-01':'2023-06']
ONI_DF_BRANtime['El Nino LOGICAL'] = ONI_DF_BRANtime['El Nino'].notnull()
ONI_DF_BRANtime['La Nina LOGICAL'] = ONI_DF_BRANtime['La Nina'].notnull()
# shift back from middle of month
ONI_DF_BRANtime.index += pd.Timedelta(-14, 'd')
# modify end value for upsample
ONI_DF_BRANtime.loc[pd.to_datetime('2023-07-01 00:00:00')] = 'NaN'
#upsample
ONI_DF_BRANtime = ONI_DF_BRANtime.resample('D').ffill()
#drop last dummy date
ONI_DF_BRANtime = ONI_DF_BRANtime[:-1]
#

### run var on what variable
#var_name = 'temp'
var_name = 'mld'
#var_name = 'eta_t'

# Write log file
log_path = '/scratch/es60/ard/reanalysis/BRAN2020/ARD/logs/'
log_file = log_path + f'log_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")}_{var_name}_stats.txt'
with open(log_file, 'a') as f:
    f.write(f'{var_name} start processing . . .\n')

#
zarr_path = '/scratch/es60/ard/reanalysis/BRAN2020/ARD/'
path_dict = {'eta_t':'BRAN2020-daily-eta_t-chunk4time-v14032024.zarr',
                 'mld':'BRAN2020-daily-mld-chunk4month_clim-22042023.zarr',
                 'temp':'BRAN2020-daily-temp-chunk4time-v07022024.zarr'}
results_path = '/g/data/es60/users/thomas_moore/clim_demo_results/daily/draft_delivery/'
results_file = 'BRAN2020_clim_demo_'+var_name+'.nc'

########### vvvvvv for testing
#path_dict = {'eta_t':'error',
#                 'mld':'coarsened_tests/BRAN2020_mld_chunked4time_COARSENED.zarr',
#                 'temp':'error'}
#results_path = '/g/data/es60/users/thomas_moore/clim_demo_results/daily/draft_delivery/COARSENED/'
#results_file = 'BRAN2020_clim_demo_'+var_name+'_COARSENED.nc'
##### ^^^^^ for testing 

depth_dict = {'eta_t':None,'mld':None,'temp':'st_ocean'}
lon_dict = {'eta_t':'xt_ocean','mld':'xt_ocean','temp':'xt_ocean'}
lat_dict = {'eta_t':'yt_ocean','mld':'yt_ocean','temp':'yt_ocean'}
time_dim = 'Time'

collection_path = zarr_path + path_dict[var_name]
# load BRAN data
ds = xr.open_zarr(collection_path,consolidated=True)

#
### ENSO masks
El_Nino_mask = ONI_DF_BRANtime['El Nino LOGICAL']
El_Nino_mask = El_Nino_mask.to_xarray()
El_Nino_mask = El_Nino_mask.rename({'datetime':'Time'})
sync_Time = ds.Time
El_Nino_mask['Time'] = sync_Time
#
La_Nina_mask = ONI_DF_BRANtime['La Nina LOGICAL']
La_Nina_mask = La_Nina_mask.to_xarray()
La_Nina_mask = La_Nina_mask.rename({'datetime':'Time'})
sync_Time = ds.Time
La_Nina_mask['Time'] = sync_Time
#
ONI_DF_BRANtime['Neutral LOGICAL'] = (ONI_DF_BRANtime['El Nino LOGICAL'] == False) & (ONI_DF_BRANtime['La Nina LOGICAL'] == False)
neutral_mask = ONI_DF_BRANtime['La Nina LOGICAL']
neutral_mask = neutral_mask.to_xarray()
neutral_mask = neutral_mask.rename({'datetime':'Time'})
sync_Time = ds.Time
neutral_mask['Time'] = sync_Time

### calculate "all time" stats

clim_ds = xr.merge([ds.groupby(time_dim+'.month').mean(dim=time_dim,skipna=True).rename({var_name:'mean_'+var_name}),
                      ds.groupby(time_dim+'.month').min(dim=time_dim,skipna=True).rename({var_name:'min_'+var_name}),
                      ds.groupby(time_dim+'.month').max(dim=time_dim,skipna=True).rename({var_name:'max_'+var_name}),
                      ds.groupby(time_dim+'.month').std(dim=time_dim,skipna=True).rename({var_name:'std_'+var_name}),
                      ds.groupby(time_dim+'.month').median(dim=time_dim,skipna=True).rename({var_name:'median_'+var_name})
])
quant = ds.groupby(time_dim+'.month').quantile([0.05,0.95],skipna=True,dim=time_dim).astype(np.float32)
quant_ds = xr.merge([quant.isel(quantile=0).reset_coords(drop=True).rename({var_name:'quantile_05_'+var_name}),quant.isel(quantile=1).reset_coords(drop=True).rename({var_name:'quantile_95_'+var_name})])
result_ds = xr.merge([clim_ds,quant_ds])

#### mask out data

El_Nino_ds = ds.where(El_Nino_mask)
La_Nina_ds = ds.where(La_Nina_mask)
neutral_ds = ds.where(neutral_mask)

##### El Nino calc
clim_El_Nino_ds = xr.merge([El_Nino_ds.groupby(time_dim+'.month').mean(dim=time_dim,skipna=True).rename({var_name:'mean_'+'el_nino_'+var_name}),
                      El_Nino_ds.groupby(time_dim+'.month').min(dim=time_dim,skipna=True).rename({var_name:'min_'+'el_nino_'+var_name}),
                      El_Nino_ds.groupby(time_dim+'.month').max(dim=time_dim,skipna=True).rename({var_name:'max_'+'el_nino_'+var_name}),
                      El_Nino_ds.groupby(time_dim+'.month').std(dim=time_dim,skipna=True).rename({var_name:'std_'+'el_nino_'+var_name}),
                      El_Nino_ds.groupby(time_dim+'.month').median(dim=time_dim,skipna=True).rename({var_name:'median_'+'el_nino_'+var_name})
])
quant_El_Nino = El_Nino_ds.groupby(time_dim+'.month').quantile([0.05,0.95],skipna=True,dim=time_dim).astype(np.float32)
quant_El_Nino_ds = xr.merge([quant_El_Nino.isel(quantile=0).reset_coords(drop=True).rename({var_name:'quantile_05_'+'el_nino_'+var_name}),quant_El_Nino.isel(quantile=1).reset_coords(drop=True).rename({var_name:'quantile_95_'+'el_nino_'+var_name})])
result_El_Nino_ds = xr.merge([clim_El_Nino_ds,quant_El_Nino_ds])

#### La Nina calc
clim_La_Nina_ds = xr.merge([La_Nina_ds.groupby(time_dim+'.month').mean(dim=time_dim,skipna=True).rename({var_name:'mean_'+'la_nina_'+var_name}),
                      La_Nina_ds.groupby(time_dim+'.month').min(dim=time_dim,skipna=True).rename({var_name:'min_'+'la_nina_'+var_name}),
                      La_Nina_ds.groupby(time_dim+'.month').max(dim=time_dim,skipna=True).rename({var_name:'max_'+'la_nina_'+var_name}),
                      La_Nina_ds.groupby(time_dim+'.month').std(dim=time_dim,skipna=True).rename({var_name:'std_'+'la_nina_'+var_name}),
                      La_Nina_ds.groupby(time_dim+'.month').median(dim=time_dim,skipna=True).rename({var_name:'median_'+'la_nina_'+var_name})
])
quant_La_Nina = La_Nina_ds.groupby(time_dim+'.month').quantile([0.05,0.95],skipna=True,dim=time_dim).astype(np.float32)
quant_La_Nina_ds = xr.merge([quant_La_Nina.isel(quantile=0).reset_coords(drop=True).rename({var_name:'quantile_05_'+'la_nina_'+var_name}),quant_La_Nina.isel(quantile=1).reset_coords(drop=True).rename({var_name:'quantile_95_'+'la_nina_'+var_name})])
result_La_Nina_ds = xr.merge([clim_La_Nina_ds,quant_La_Nina_ds])
#### neutral calc
clim_neutral_ds = xr.merge([neutral_ds.groupby(time_dim+'.month').mean(dim=time_dim,skipna=True).rename({var_name:'mean_'+'neutral_'+var_name}),
                      neutral_ds.groupby(time_dim+'.month').min(dim=time_dim,skipna=True).rename({var_name:'min_'+'neutral_'+var_name}),
                      neutral_ds.groupby(time_dim+'.month').max(dim=time_dim,skipna=True).rename({var_name:'max_'+'neutral_'+var_name}),
                      neutral_ds.groupby(time_dim+'.month').std(dim=time_dim,skipna=True).rename({var_name:'std_'+'neutral_'+var_name}),
                      neutral_ds.groupby(time_dim+'.month').median(dim=time_dim,skipna=True).rename({var_name:'median_'+'neutral_'+var_name})
])
quant_neutral = neutral_ds.groupby(time_dim+'.month').quantile([0.05,0.95],skipna=True,dim=time_dim).astype(np.float32)
quant_neutral_ds = xr.merge([quant_neutral.isel(quantile=0).reset_coords(drop=True).rename({var_name:'quantile_05_'+'neutral_'+var_name}),quant_neutral.isel(quantile=1).reset_coords(drop=True).rename({var_name:'quantile_95_'+'neutral_'+var_name})])
result_neutral_ds = xr.merge([clim_neutral_ds,quant_neutral_ds])
#
result_ds = xr.merge([result_ds,result_El_Nino_ds,result_La_Nina_ds,result_neutral_ds])

result_ds.to_netcdf(results_path+results_file,engine='netcdf4')

with open(log_file, 'a') as f:
    f.write(f'{var_name} .... finished processing\n DONE!\n')


# $The$ $End$

In [None]:
client.shutdown()