# CAFE60_reduce_write_zarr

**Date:** <br>
16 March 2022 <br>
**Background:** <br>
Issue - https://github.com/Thomas-Moore-Creative/NCI-CAFE-ARD/issues/1 <br>
**Author(s):**<br>
Thomas Moore<br>

## We are using NCI OOD as the platform for data processing
### OOD documentation
https://opus.nci.org.au/display/DAE/Setting+up+a+Dask+Cluster+on+OOD

In [1]:
Author1 = {"name": "Thomas Moore", "affiliation": "CSIRO", "email": "thomas.moore@csiro.au", "orcid": "0000-0003-3930-1946"}

In [2]:
import xarray as xr
import numpy as np
import xrft
import xesmf as xe
import scipy
import matplotlib.pyplot as plt
import datetime
import pandas as pd
import matplotlib.dates as mdates
from matplotlib.dates import DateFormatter
import os
import re
import cartopy.crs as ccrs
import proplot as pplt
from rechunker import rechunk
%config Completer.use_jedi = False

## import helper

In [3]:
import importlib.util
spec = importlib.util.spec_from_file_location("helper", "/g/data/v14/tm4888/code/helper-py/helper_tools.py")
helper = importlib.util.module_from_spec(spec)
spec.loader.exec_module(helper)
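The cell above loads `helper_tools.py` by file path instead of putting its directory on `sys.path`. Below is a minimal, reusable sketch of the same three `importlib` steps with a guard for a missing spec. `load_module_from_path` and `demo_helper.py` are hypothetical names used only for illustration.

```python
import importlib.util
import os
import tempfile
from types import ModuleType


def load_module_from_path(name: str, path: str) -> ModuleType:
    """Import a Python source file that is not on sys.path and return the module object."""
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        # No spec could be built for this path (e.g. an unrecognised file suffix)
        raise ImportError(f"cannot load {name!r} from {path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file's top-level code inside the new module
    return module


# Usage: write a throwaway module to a temporary directory and load it by path.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo_helper.py")
    with open(demo_path, "w") as f:
        f.write("def double(x):\n    return 2 * x\n")
    demo = load_module_from_path("demo_helper", demo_path)
    result = demo.double(21)

print(result)  # 42
```

As in the original cell, the module is not registered in `sys.modules`, so a second call re-executes the file.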

## OOD cluster

In [4]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=2,processes=1,memory="47GB",walltime='03:00:00')
client = Client(cluster)
cluster.scale(cores=24)
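A quick sizing check for the cluster request above, as a sketch. It assumes that with `processes=1` each SLURM job runs a single worker with `cores` threads, so `cluster.scale(cores=24)` should translate to roughly `ceil(24 / 2)` jobs, and that dask reads `memory="47GB"` as decimal gigabytes. `jobs_needed` is a hypothetical helper, not part of `dask_jobqueue`.

```python
import math

# Per-job settings mirroring the SLURMCluster call above
cores_per_job = 2
memory_per_job_bytes = 47e9  # "47GB" taken as decimal gigabytes
target_cores = 24            # what cluster.scale(cores=...) asks for


def jobs_needed(target_cores: int, cores_per_job: int) -> int:
    """Single-worker jobs needed to reach at least target_cores (assumes processes=1)."""
    return math.ceil(target_cores / cores_per_job)


n_jobs = jobs_needed(target_cores, cores_per_job)
memory_per_worker_gib = memory_per_job_bytes / 2**30  # dask reports memory in GiB
total_memory_gib = n_jobs * memory_per_worker_gib

print(n_jobs)                           # 12
print(round(memory_per_worker_gib, 2))  # 43.77
print(round(total_memory_gib, 2))
```

The 43.77 GiB per worker matches the memory shown for each worker in the cluster output at the end of this notebook.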



In [5]:
client

| Property | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | dask_jobqueue.SLURMCluster |
| Dashboard | /proxy/8787/status |
| Scheduler comm | tcp://10.0.128.155:46319 |
| Started | Just now |
| Workers | 0 |
| Total threads | 0 |
| Total memory | 0 B |


# Load in CAFE60 zarr collection

In [33]:
ds_CAFE60 = xr.open_zarr('/g/data/xv83/dcfp/CAFE60v1/ocean_month.zarr.zip',consolidated=True)

In [34]:
ds_CAFE60.nbytes/1e9

30376.664081592

# 30 TB!
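`Dataset.nbytes` adds up the in-memory size of every variable (coordinates included), so dividing by `1e9` gives decimal gigabytes: 30376.66 GB is about 30 TB. The dask reprs further down report binary units (GiB, MiB) instead. A small sketch of the arithmetic for one depth-resolved float32 field, using the `(490, 50, 300, 360)` shape that shows up when the reduced store is read back:

```python
from math import prod


def array_nbytes(shape, itemsize=4):
    """Bytes held by a dense array of this shape (itemsize=4 for float32)."""
    return prod(shape) * itemsize


# 490 months x 50 depth levels x 300 x 360 horizontal grid points
n = array_nbytes((490, 50, 300, 360))
print(n / 1e9)    # decimal GB, same convention as ds.nbytes / 1e9
print(n / 2**30)  # binary GiB, same convention as the dask repr
```

The GiB figure rounds to the 9.86 GiB reported for each 4-D variable in the read-back output.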

# Drop variables not in use

In [35]:
# drop_vars replaces the deprecated Dataset.drop for removing variables
ds_CAFE60 = ds_CAFE60.drop_vars(['age_global','cfc_11','cfc_12','eta_t','neutral_diffusion_salt',
                                 'neutral_diffusion_temp','neutral_gm_salt','neutral_gm_temp','salt_sponge_tend',
                                 'salt_vdiff_impl','temp_sponge_tend','temp_vdiff_impl','tx_trans_gm','ty_trans_gm',
                                 'wt','sw_ocean'])

In [36]:
ds_CAFE60.nbytes/1e9

9125.125649192

# 9 TB!

# Reduce the time period to start in 1980

In [37]:
ds_CAFE60 = ds_CAFE60.isel(time=slice(240,730))
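`isel` selects by position, so which dates `slice(240, 730)` covers depends on where the monthly record starts. A sketch of the index-to-date mapping, **assuming** the record begins in January 1960 with no gaps (that start date is an assumption here; check `ds_CAFE60.time[0]` before relying on it). `month_index_to_year_month` is a hypothetical helper. Selecting by label with `.sel(time=slice(...))` would avoid hard-coding positions.

```python
def month_index_to_year_month(i: int, start_year: int = 1960, start_month: int = 1):
    """Map a 0-based monthly time index to (year, month), assuming a gap-free record."""
    total = start_year * 12 + (start_month - 1) + i
    return total // 12, total % 12 + 1


first, stop = 240, 730  # isel(time=slice(240, 730)); stop is exclusive
print(stop - first)                         # 490 monthly steps
print(month_index_to_year_month(first))     # (1980, 1)
print(month_index_to_year_month(stop - 1))  # (2020, 10)
```

The 490 steps agree with the time length of the reduced store read back later.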

In [38]:
ds_CAFE60.nbytes/1e9

6116.705295264

# 6 TB!

# Reduce to ensemble mean!

In [39]:
ds_CAFE60 = ds_CAFE60.mean(dim = 'ensemble', keep_attrs=True, skipna=True)



In [41]:
ds_CAFE60.nbytes/1e9

63.71569488
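The drop from about 6116.7 GB to 63.7 GB is a factor of roughly 96, consistent with collapsing the ensemble dimension (the member count can be confirmed with `ds_CAFE60.sizes["ensemble"]` on the dataset before averaging). With `skipna=True`, NaN members are left out of each grid point's mean rather than turning it into NaN. A pure-Python sketch of that per-point behaviour, on hypothetical values:

```python
import math


def nanmean(values):
    """Mean that ignores NaNs, like mean(skipna=True); all-NaN input gives NaN."""
    finite = [v for v in values if not math.isnan(v)]
    return sum(finite) / len(finite) if finite else math.nan


# Hypothetical values at one grid point across four ensemble members
members = [1.0, math.nan, 2.0, 3.0]
print(nanmean(members))  # 2.0
```

The all-NaN case (e.g. land points) is where numpy typically emits the kind of runtime warning seen when this cell ran.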

# 63 GB (phew!)


# Rechunk along time to avoid many small chunks

In [42]:
ds_CAFE60 = ds_CAFE60.chunk({'time':490})
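`.chunk({'time': 490})` only changes the time chunking, so every other dimension keeps its existing chunks. For the 4-D fields the read-back output shows a depth chunk of 3, presumably carried over from the source store. A sketch of the resulting chunk arithmetic:

```python
import math

time_len, depth_len, ny, nx = 490, 50, 300, 360
depth_chunk = 3  # depth chunking left unchanged by .chunk({'time': 490})
itemsize = 4     # float32

chunk_bytes = time_len * depth_chunk * ny * nx * itemsize
n_chunks = math.ceil(depth_len / depth_chunk)

print(chunk_bytes / 2**20)  # MiB per chunk
print(n_chunks)             # chunks per 4-D variable
```

This gives about 605.62 MiB per chunk and 17 chunks per variable, matching the read-back repr.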

# Can we write the reduced dataset to zarr at this size?

In [44]:
%%time
ds_CAFE60.to_zarr('/g/data/xv83/users/tm4888/data/CAFE/CAFE60_reduced_ocean_month.zarr',consolidated=True)

CPU times: user 33min 8s, sys: 1min 37s, total: 34min 46s
Wall time: 53min 7s


<xarray.backends.zarr.ZarrStore at 0x7f1b0b44fba0>
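Because every step above is lazy, `to_zarr` runs the whole pipeline: reading the zipped source store, the ensemble mean, rechunking, compression and the write itself. The wall time is therefore an end-to-end figure, not disk bandwidth, and the on-disk size will differ from `nbytes` because zarr compresses chunks. A back-of-the-envelope rate from the numbers printed above:

```python
# Figures copied from the outputs above
reduced_gb = 63.71569488    # ds_CAFE60.nbytes / 1e9 after the ensemble mean
wall_seconds = 53 * 60 + 7  # "Wall time: 53min 7s"

rate_mb_per_s = reduced_gb * 1e3 / wall_seconds
print(round(rate_mb_per_s, 1))  # 20.0
```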

# Read back in and test

In [45]:
ds_test = xr.open_zarr('/g/data/xv83/users/tm4888/data/CAFE/CAFE60_reduced_ocean_month.zarr',consolidated=True)

In [47]:
ds_test

| Shape | Chunk shape | Bytes | Chunk bytes | Tasks | Chunks | Type |
|---|---|---|---|---|---|---|
| (490, 50, 300, 360) | (490, 3, 300, 360) | 9.86 GiB | 605.62 MiB | 18 | 17 | float32, numpy.ndarray |
| (490, 300, 360) | (490, 300, 360) | 201.87 MiB | 201.87 MiB | 2 | 1 | float32, numpy.ndarray |
| (490, 50, 300, 360) | (490, 3, 300, 360) | 9.86 GiB | 605.62 MiB | 18 | 17 | float32, numpy.ndarray |
| (490, 50, 300, 360) | (490, 3, 300, 360) | 9.86 GiB | 605.62 MiB | 18 | 17 | float32, numpy.ndarray |
| (490, 50, 300, 360) | (490, 3, 300, 360) | 9.86 GiB | 605.62 MiB | 18 | 17 | float32, numpy.ndarray |
| (490, 50, 300, 360) | (490, 3, 300, 360) | 9.86 GiB | 605.62 MiB | 18 | 17 | float32, numpy.ndarray |
| (490, 50, 300, 360) | (490, 3, 300, 360) | 9.86 GiB | 605.62 MiB | 18 | 17 | float32, numpy.ndarray |


# The End

# Break glass in case of emergency ⇓

In [33]:
client.restart()

| Property | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | dask_jobqueue.SLURMCluster |
| Dashboard | /proxy/8787/status |
| Scheduler comm | tcp://10.0.128.155:40685 |
| Started | 23 minutes ago |
| Workers | 10 |
| Total threads | 20 |
| Total memory | 437.70 GiB |

Each worker has 2 threads and 43.77 GiB of memory; local directories are under `/local/v14/tm4888/tmp/dask-worker-space/`.

| Worker comm | Nanny | Dashboard | Local directory |
|---|---|---|---|
| tcp://10.0.128.11:38387 | tcp://10.0.128.11:36529 | /proxy/37249/status | `worker-av0qpmgg` |
| tcp://10.0.128.26:43439 | tcp://10.0.128.26:33217 | /proxy/34759/status | `worker-v1p364q_` |
| tcp://10.0.128.25:38961 | tcp://10.0.128.25:42783 | /proxy/43289/status | `worker-exndf19y` |
| tcp://10.0.128.16:39407 | tcp://10.0.128.16:46005 | /proxy/46079/status | `worker-gaffrw46` |
| tcp://10.0.128.17:38783 | tcp://10.0.128.17:35163 | /proxy/45017/status | `worker-s65u191t` |
| tcp://10.0.128.20:45923 | tcp://10.0.128.20:38277 | /proxy/42209/status | `worker-dgbqx4jv` |
| tcp://10.0.128.23:43611 | tcp://10.0.128.23:37379 | /proxy/37421/status | `worker-k469g5mk` |
| tcp://10.0.128.21:45507 | tcp://10.0.128.21:44939 | /proxy/38125/status | `worker-zd5kkrdj` |
| tcp://10.0.128.18:37029 | tcp://10.0.128.18:44037 | /proxy/42923/status | `worker-sq5gcqph` |
| tcp://10.0.128.24:33513 | tcp://10.0.128.24:37845 | /proxy/35883/status | `worker-km66m6cb` |


In [49]:
client.shutdown()