In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%matplotlib inline
from matplotlib import pyplot as plt

In [3]:
import sys
import os
from subprocess import call 
import pathlib

In [4]:
import numpy as np
import pandas as pd
import xarray as xr

In [5]:
from dask.distributed import Client
from dask.diagnostics import ProgressBar

### parameters for papermill

In [6]:
region = 'NZ'
ndays = 30
roll = 15
climatology = [1991, 2020]
quantiles = [0.1, 0.25, 0.5, 0.75, 0.9]

### create cluster

In [7]:
client = Client(
    n_workers=6, threads_per_worker=4, memory_limit="8GB", local_directory="./dask"
)

In [8]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 24,Total memory: 44.70 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:39835,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 24
Started: Just now,Total memory: 44.70 GiB

0,1
Comm: tcp://127.0.0.1:45129,Total threads: 4
Dashboard: http://127.0.0.1:38855/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:34859,
Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-ffxnx4lr,Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-ffxnx4lr

0,1
Comm: tcp://127.0.0.1:46671,Total threads: 4
Dashboard: http://127.0.0.1:40013/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:46683,
Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-w1g41cll,Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-w1g41cll

0,1
Comm: tcp://127.0.0.1:46015,Total threads: 4
Dashboard: http://127.0.0.1:43621/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:38779,
Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-cpkb2kwf,Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-cpkb2kwf

0,1
Comm: tcp://127.0.0.1:38551,Total threads: 4
Dashboard: http://127.0.0.1:39353/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:36317,
Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-k2f69fga,Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-k2f69fga

0,1
Comm: tcp://127.0.0.1:44949,Total threads: 4
Dashboard: http://127.0.0.1:34381/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:36973,
Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-m871fa7v,Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-m871fa7v

0,1
Comm: tcp://127.0.0.1:40407,Total threads: 4
Dashboard: http://127.0.0.1:33375/status,Memory: 7.45 GiB
Nanny: tcp://127.0.0.1:45651,
Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-w8rj887c,Local directory: /home/nicolasf/operational/OISST_indices/notebooks/dask/dask-worker-space/worker-w8rj887c


In [9]:
sys.path.append('../code/')

In [10]:
import src

In [11]:
opath = pathlib.Path(f'/media/nicolasf/END19101/data/OISST/daily/{region}')

In [12]:
lfiles = list(opath.glob("sst.day.mean.????.v2.nc"))

In [13]:
lfiles.sort()

In [14]:
lfiles[0]

PosixPath('/media/nicolasf/END19101/data/OISST/daily/NZ/sst.day.mean.1981.v2.nc')

In [15]:
lfiles[-1]

PosixPath('/media/nicolasf/END19101/data/OISST/daily/NZ/sst.day.mean.2022.v2.nc')

In [16]:
dset = xr.open_mfdataset(lfiles, parallel=True, combine_attrs="drop_conflicts", compat="override", coords=['time'])

In [17]:
dset

Unnamed: 0,Array,Chunk
Bytes,328.54 MiB,8.04 MiB
Shape,"(14952, 80, 72)","(366, 80, 72)"
Count,126 Tasks,42 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 328.54 MiB 8.04 MiB Shape (14952, 80, 72) (366, 80, 72) Count 126 Tasks 42 Chunks Type float32 numpy.ndarray",72  80  14952,

Unnamed: 0,Array,Chunk
Bytes,328.54 MiB,8.04 MiB
Shape,"(14952, 80, 72)","(366, 80, 72)"
Count,126 Tasks,42 Chunks
Type,float32,numpy.ndarray


### need to start a first of January 

In [18]:
# dset = dset.sel(time=slice('1982-01-01',None))

### if regions is 'Ninos' or 'IOD', then first calculates the regional averages

In [19]:
if region == 'Ninos':
    
    dset['sst'] = src.calculates_ninos(dset['sst'], nino='all')
    
elif region == 'IOD': 
    
    dset['sst']= src.calculates_IOD_nodes(dset['sst'], IOD_node='all')

In [20]:
dset

Unnamed: 0,Array,Chunk
Bytes,328.54 MiB,8.04 MiB
Shape,"(14952, 80, 72)","(366, 80, 72)"
Count,126 Tasks,42 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 328.54 MiB 8.04 MiB Shape (14952, 80, 72) (366, 80, 72) Count 126 Tasks 42 Chunks Type float32 numpy.ndarray",72  80  14952,

Unnamed: 0,Array,Chunk
Bytes,328.54 MiB,8.04 MiB
Shape,"(14952, 80, 72)","(366, 80, 72)"
Count,126 Tasks,42 Chunks
Type,float32,numpy.ndarray


### remove the 29th of Feb

In [21]:
standard_calendar = dset.time

In [22]:
dset = dset.convert_calendar('noleap')

### calculates the rolling averages if needed 

In [23]:
if ndays > 1: 
    
    dset['sst'] = dset['sst'].rolling({'time':ndays}, center=False, min_periods=ndays).mean('time')
    
    dset = dset.isel(time=slice(ndays+1,None))

In [24]:
dset

Unnamed: 0,Array,Chunk
Bytes,655.27 MiB,16.04 MiB
Shape,"(14911, 80, 72)","(365, 80, 72)"
Count,1431 Tasks,42 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 655.27 MiB 16.04 MiB Shape (14911, 80, 72) (365, 80, 72) Count 1431 Tasks 42 Chunks Type float64 numpy.ndarray",72  80  14911,

Unnamed: 0,Array,Chunk
Bytes,655.27 MiB,16.04 MiB
Shape,"(14911, 80, 72)","(365, 80, 72)"
Count,1431 Tasks,42 Chunks
Type,float64,numpy.ndarray


### now expands the dataset along a dummy dimension 

In [25]:
dset_roll = dset[['sst']].copy()

In [26]:
dset_roll = dset_roll.rolling({'time':roll}, center=True, min_periods=roll).construct(window_dim='roll')

### selects the climatological period 

In [27]:
clim = dset_roll.sel(time=slice(str(climatology[0]), str(climatology[1])))

In [28]:
clim

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,240.60 MiB
Shape,"(10950, 80, 72, 15)","(365, 80, 72, 15)"
Count,1766 Tasks,31 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.05 GiB 240.60 MiB Shape (10950, 80, 72, 15) (365, 80, 72, 15) Count 1766 Tasks 31 Chunks Type float64 numpy.ndarray",10950  1  15  72  80,

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,240.60 MiB
Shape,"(10950, 80, 72, 15)","(365, 80, 72, 15)"
Count,1766 Tasks,31 Chunks
Type,float64,numpy.ndarray


### transpose, then rechunks 

In [29]:
if region == 'Ninos': 
    
    clim = clim.transpose(*['time','roll','nino'])
    clim = clim.chunk({'time':-1, 'roll':-1, 'nino':len(clim['nino'])})
    
elif region == 'IOD': 

    clim = clim.transpose(*['time','roll','IOD'])
    clim = clim.chunk({'time':-1, 'roll':-1, 'IOD':len(clim['IOD'])})
    
else: 
    
    clim = clim.transpose(*['time','roll','lat','lon'])
    clim = clim.chunk({'time':-1, 'roll':-1, 'lat':2, 'lon':20})

In [30]:
clim

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,50.13 MiB
Shape,"(10950, 15, 80, 72)","(10950, 15, 2, 20)"
Count,3001 Tasks,160 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 7.05 GiB 50.13 MiB Shape (10950, 15, 80, 72) (10950, 15, 2, 20) Count 3001 Tasks 160 Chunks Type float64 numpy.ndarray",10950  1  72  80  15,

Unnamed: 0,Array,Chunk
Bytes,7.05 GiB,50.13 MiB
Shape,"(10950, 15, 80, 72)","(10950, 15, 2, 20)"
Count,3001 Tasks,160 Chunks
Type,float64,numpy.ndarray


In [31]:
clim_q = clim.groupby(clim.time.dt.dayofyear).quantile(quantiles, dim=['time','roll'])

In [32]:
clim_ave = clim.groupby(clim.time.dt.dayofyear).mean(dim=['time','roll'])

In [33]:
clim_std = clim.groupby(clim.time.dt.dayofyear).std(dim=['time','roll'])

In [34]:
with ProgressBar(): 
    clim_q = clim_q.compute()
    clim_ave = clim_ave.compute()
    clim_std = clim_std.compute()

In [35]:
clim_q = clim_q.rename({'sst':'quantiles'})

In [36]:
clim_ave

In [37]:
clim_q['average'] = clim_ave['sst']

In [38]:
clim_q

In [39]:
clim_q['std'] = clim_std['sst']

In [40]:
opath = pathlib.Path(f'../outputs/{region}')

In [41]:
opath.mkdir(exist_ok=True)

### saves to ZARR

In [42]:
clim_q.to_zarr(opath.joinpath(f'{region}_OISST_{ndays}days_climatology_{roll}_window.zarr'))

<xarray.backends.zarr.ZarrStore at 0x7f074f44c9e0>