# Read CAFE dataset directly from AWS datastore

The CAFE dataset produced by CSIRO is stored as an open dataset on AWS. The data can be read directly from the AWS store in Python, and this notebook shows how.

The data is organised by realm: `atmos_hybrid`, `atmos_isobaric`, `ice`, `land`, `ocean`, `ocean_bgc`, `ocean_force`, `ocean_scalar`.

The list of variables in each realm is given in the [CAFE documentation](https://data.csiro.au/dap/ws/v2/collections/49803/support/4029).

The AWS object store is called S3. Several Python packages have been developed to access it. Here we are going to use `s3fs`.

In [1]:
import s3fs
import xarray as xr
import climtas.nci
import kerchunk.hdf
import kerchunk.combine
import dask
import ujson
import fsspec
from glob import glob
from tqdm import tqdm

## Start with an example

First, you will need to open an anonymous connection to the filesystem. The `anon=True` option requests anonymous access, so no AWS credentials are needed.

In [2]:
fs = s3fs.S3FileSystem(anon=True)

Let's find the path to one of the CAFE files using [the AWS Explorer](https://cafe60-reanalysis-dataset-aws-open-data.s3.amazonaws.com/index.html) and see how to open it. You can see the path in the top banner of the Explorer.

Xarray can open the file for you, but instead of the usual file path you need to pass a file object, as returned by `fs.open()`.

In [3]:
file_path = "cafe60-reanalysis-dataset-aws-open-data/atmos_isobaric/temp.atmos_isobaric.daily.CAFE60.19600101-19600131.nc"
file_obj = fs.open(file_path)   # We use fs.open() as the file is on S3
ds = xr.open_dataset(file_obj)
ds

You can then use the data as usual, but operations will take longer than with local files:

In [4]:
ds["temp"].mean(dim="time")

## Generalisation
From the path to the CAFE file used previously, we see that all the files follow this naming pattern:

`"cafe60-reanalysis-dataset-aws-open-data/<realm>/<varname>.<realm>.<temporal resolution>.CAFE60.<starttime>-<endtime>.nc"`
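The pattern can be checked offline by splitting a path back into its components. The sketch below is only an illustration: `parse_cafe_path` and `REALMS` are hypothetical names, not part of any CAFE tooling, and it assumes variable names contain no dots.

```python
# Realms as listed at the top of this notebook
REALMS = ("atmos_hybrid", "atmos_isobaric", "ice", "land",
          "ocean", "ocean_bgc", "ocean_force", "ocean_scalar")


def parse_cafe_path(path):
    """Split '<bucket>/<realm>/<file name>' into its components (hypothetical helper)."""
    bucket, realm, fname = path.split("/")
    if realm not in REALMS:
        raise ValueError(f"Unknown realm: {realm}")

    # Expected file name: <varname>.<realm>.<time_res>.CAFE60.<start>-<end>.nc
    parts = fname.split(".")
    if len(parts) != 6 or parts[1] != realm or parts[3] != "CAFE60" or parts[5] != "nc":
        raise ValueError(f"File name does not follow the CAFE pattern: {fname}")

    start, end = parts[4].split("-")
    return {"bucket": bucket, "realm": realm, "varname": parts[0],
            "time_res": parts[2], "start": start, "end": end}


info = parse_cafe_path(
    "cafe60-reanalysis-dataset-aws-open-data/atmos_isobaric/"
    "temp.atmos_isobaric.daily.CAFE60.19600101-19600131.nc"
)
print(info)
```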

With this information, we can write a function that will return the paths to all the files for a given variable, realm and temporal resolution.

In [5]:
def find_cafe_files(realm, varname, time_res):
    """Return a list of all the files for a given variable and temporal resolution in a realm.

    realm: str, one of the realms for the CAFE variables
    varname: str, name of one of the CAFE variables
    time_res: 'daily'|'month', temporal resolution for the variable"""

    root = "cafe60-reanalysis-dataset-aws-open-data"
    path = f"{root}/{realm}"

    # Include time_res in the pattern so that only the requested resolution is matched
    my_list = fs.glob(f"{path}/{varname}.{realm}.{time_res}.*")

    # Stop and return an error message if no files found.
    assert len(my_list) != 0, "There is no such variable in that realm or for that temporal resolution. Please check the CAFE documentation."
    return my_list

In [6]:
temp_files = find_cafe_files("atmos_isobaric","temp","daily")

Let's try to read in 100 and then 250 files and see how long it takes. This will tell us whether this simple approach can work in this case.

In [7]:
%%time
# Open all the files and then read them with open_mfdataset
temp_files_ob = [ fs.open(tt) for tt in temp_files ]
ds = xr.open_mfdataset(temp_files_ob[0:100],
                       combine='nested',
                       concat_dim='time', 
                       join='override',
                       coords='minimal',
                       compat='override',
                       chunks={"time":1}, 
                      )

CPU times: user 56.7 s, sys: 4.74 s, total: 1min 1s
Wall time: 2min 22s


In [None]:
%%time
# Open all the files and then read them with open_mfdataset
temp_files_ob = [ fs.open(tt) for tt in temp_files ]
ds = xr.open_mfdataset(temp_files_ob[0:250],
                       combine='nested',
                       concat_dim='time', 
                       join='override',
                       coords='minimal',
                       compat='override',
                       chunks={"time":1}, 
                      )


Reading 250 files actually takes more than 2.5 times as long as reading 100 files, so the cost grows faster than linearly with the number of files. This tells us we can't use this simple approach to read the whole time series of 726 files.
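To get a feel for the numbers, the 100-file timing can be extrapolated to the full time series. This is a rough sketch that assumes linear scaling. Since the measured scaling is worse than linear, the result should be read as an optimistic lower bound, not a prediction.

```python
# Wall time measured above for opening 100 files: 2 min 22 s
wall_100_files = 2 * 60 + 22   # seconds
n_files_total = 726            # number of files in the whole time series

# Optimistic estimate: assume the cost per file stays constant
linear_estimate = wall_100_files * n_files_total / 100

print(f"Lower bound for opening all files: {linear_estimate / 60:.1f} min")
```

Even under this favourable assumption, just opening the dataset would take more than a quarter of an hour, before any computation.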

## Let's try to optimise this
Cloud object stores are much better suited to a format called Zarr than to netCDF. Luckily, it's possible to make a netCDF file look like a Zarr store without copying the data. Let's see if it speeds things up. We can use what was done here: https://gist.github.com/rsignell-usgs/ef435a53ac530a2843ce7e1d59f96e22

For this, we need to save some information about the chunking of the netCDF files in JSON files. The `kerchunk` package was created especially for this purpose. Those JSON files only need to be created once and can be shared between users. Generating them is much faster when parallelised with Dask, so we start a Dask client first.

In [9]:
client = climtas.nci.Client()
client

Dask client summary: `distributed.LocalCluster` with 16 workers (1 thread and 2.81 GiB of memory each), 16 threads and 44.92 GiB of memory in total, status running, dashboard at `/node/ood-vn17/6784/proxy/8787/status`.


In [None]:
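# Options used to open the remote netCDF files
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

def gen_json(u, json_dir):
    """Write the kerchunk reference JSON for the netCDF file at URL `u` into `json_dir`."""
    with fs.open(u, **so) as infile:
        # Chunks smaller than inline_threshold bytes are stored in the JSON itself
        h5chunks = kerchunk.hdf.SingleHdf5ToZarr(infile, u, inline_threshold=300)
        # The file name is the last component of the URL
        fname = u.split('/')[-1]
        outf = f'{json_dir}/{fname}.json'
        with open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())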

In [None]:
# Let's define the URLs and a path to save the JSON files
urls = ["s3://" + f for f in temp_files]
json_path = "/g/data/w35/ccc561/CAFE60/json"

In [None]:
%%time
dask.compute(*[dask.delayed(gen_json)(u, json_path) for u in urls], retries=10);

Now that we have encoded the information we need in JSON files, we can use them to read in the entire dataset. We first need to create a mapping between each JSON file and the data on S3. Then we use `xarray.open_mfdataset()` on the list of mappings. Note that we need to specify the `zarr` engine for reading the data.
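For orientation, here is a hand-written sketch of what such a reference file contains, following the kerchunk "version 1" layout as I understand it: each key is a Zarr key, and each value is either inline content (a string) or a `[url, offset, length]` triple pointing at a byte range in the original netCDF file. The offsets, lengths and trimmed metadata below are made up for illustration, and `summarise_refs` is a hypothetical helper.

```python
import json

nc_url = ("s3://cafe60-reanalysis-dataset-aws-open-data/atmos_isobaric/"
          "temp.atmos_isobaric.daily.CAFE60.19600101-19600131.nc")

# Illustrative only: real files hold many more keys and complete metadata
example_refs = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',                      # inline metadata
        "temp/.zarray": '{"chunks": [1, 96, 7, 31, 49]}',     # inline, trimmed
        "temp/0.0.0.0.0": [nc_url, 30000, 1200000],           # made-up byte range
        "temp/0.0.0.0.1": [nc_url, 1230000, 1180000],         # made-up byte range
    },
}


def summarise_refs(refs):
    """Count inline entries and entries pointing at remote byte ranges."""
    remote = sum(isinstance(v, list) for v in refs["refs"].values())
    return {"inline": len(refs["refs"]) - remote, "remote": remote}


# The structure is plain JSON, so it survives a dump/load round trip
round_tripped = json.loads(json.dumps(example_refs))
print(summarise_refs(round_tripped))
```

The `reference://` mapper used below resolves each remote entry to a ranged read on S3, which is why `remote_protocol` and `remote_options` have to be supplied.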

In [11]:
# Get a list of the json files we need
json_path = "/g/data/w35/ccc561/CAFE60/json"
json_list = sorted(glob(f"{json_path}/temp*.json"))

In [12]:
# Get the mapping
m_list = []
for js in tqdm(json_list):
    with open(js) as f:
        m_list.append(fsspec.get_mapper("reference://",
                                fo=ujson.load(f),
                                remote_protocol="s3",
                                remote_options={"anon":True}))

100%|██████████| 726/726 [00:09<00:00, 78.28it/s] 


In [15]:
%%time
# Open the data with the zarr engine from the mapping objects m_list
ds1 = xr.open_mfdataset(m_list,
                        combine="nested",
                        concat_dim="time",
                        engine="zarr",
                        coords="minimal",
                        data_vars="minimal",
                        compat="override",
                        parallel=True
                       )

CPU times: user 20.7 s, sys: 3.45 s, total: 24.1 s
Wall time: 26.5 s


We get a fair number of warnings, but all the files for the time series are opened in less than a minute.

In [16]:
ds1

Dask array summaries from the `ds1` output:

| | Array | Chunk |
|---|---|---|
| Bytes | 2.10 TiB | 3.89 MiB |
| Shape | (22097, 96, 21, 90, 144) | (1, 96, 7, 31, 49) |
| Count | 1193964 Tasks | 596619 Chunks |
| Type | float32 | numpy.ndarray |

| | Array | Chunk |
|---|---|---|
| Bytes | 345.27 kiB | 16 B |
| Shape | (22097, 2) | (1, 2) |
| Count | 44920 Tasks | 22097 Chunks |
| Type | timedelta64[ns] | numpy.ndarray |

Because the data has to be fetched from the cloud for each computation, it is slower than working with local data. Below is an example of the time it takes to compute the ensemble mean over a little under half of the time series (the first 10,000 time steps), at one level, roughly over Australia.

In [17]:
temp1 = ds1["temp"].isel(level=0,time=slice(0,10000)).sel(lon=slice(120,150), lat=slice(-45,-5))
temp1_mean = temp1.mean(dim="ensemble")
temp1_mean

| | Array | Chunk |
|---|---|---|
| Bytes | 9.16 MiB | 528 B |
| Shape | (10000, 20, 12) | (1, 12, 11) |
| Count | 1403964 Tasks | 40000 Chunks |
| Type | float32 | numpy.ndarray |


In [18]:
%%time
temp1_mean.load()

CPU times: user 8min 53s, sys: 1min 19s, total: 10min 13s
Wall time: 12min 38s
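A rough calculation helps explain this timing. It assumes that each of the 40,000 result chunks needs exactly one stored chunk of shape (1, 96, 7, 31, 49), and that a stored chunk must be read in full even when only a few values are used. The sizes are uncompressed. The compressed size actually transferred from S3 is not known here and is probably smaller.

```python
import math

stored_chunk = (1, 96, 7, 31, 49)   # chunk shape reported for ds1
n_result_chunks = 40000             # chunk count reported for temp1_mean
itemsize = 4                        # bytes per float32 value
wall_seconds = 12 * 60 + 38         # wall time of temp1_mean.load()

# Uncompressed volume of all the stored chunks that have to be read
chunk_bytes = math.prod(stored_chunk) * itemsize
touched_gib = n_result_chunks * chunk_bytes / 2**30

# Size of the final result, shape (10000, 20, 12)
result_mib = 10000 * 20 * 12 * itemsize / 2**20

print(f"Uncompressed data touched: {touched_gib:.1f} GiB")
print(f"Size of the result: {result_mib:.2f} MiB")
print(f"Effective rate: {touched_gib * 1024 / wall_seconds:.0f} MiB/s (uncompressed)")
```

Under these assumptions, the chunks read are several orders of magnitude larger than the result, so the chunk layout, not the size of the selection, dominates the cost.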


## Conclusion
We can store the JSON files for the required variables centrally on ua8. Users can then access the data in this way:

In [None]:
from glob import glob
import fsspec
import xarray as xr
import ujson

# Get a list of the json files we need
json_path = "/g/data/w35/ccc561/CAFE60/json"  # Update the path to the central location
json_list = sorted(glob(f"{json_path}/temp*.json"))  # Update the pattern to the variable you need.

# Get the mapping
m_list = []
for js in json_list:
    with open(js) as f:
        m_list.append(fsspec.get_mapper("reference://",
                                fo=ujson.load(f),
                                remote_protocol="s3",
                                remote_options={"anon":True}))
ds1 = xr.open_mfdataset(m_list,
                        combine="nested",
                        concat_dim="time",
                        engine="zarr",
                        coords="minimal",
                        data_vars="minimal",
                        compat="override",
                        parallel=True
                       )
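
One detail to watch when adapting the pattern to another variable: `temp*.json` also matches any variable whose name merely starts with `temp`. The sketch below uses a hypothetical helper, `list_reference_files`, that matches on `<varname>.` instead and fails early when nothing is found. It is demonstrated on a temporary directory with empty placeholder files, not on the real JSON store.

```python
import os
import tempfile
from glob import glob


def list_reference_files(json_dir, varname):
    """Return the sorted reference JSON files for one variable (hypothetical helper)."""
    # File names start with '<varname>.', so the dot keeps look-alike names out
    pattern = os.path.join(json_dir, f"{varname}.*.json")
    files = sorted(glob(pattern))
    if not files:
        raise FileNotFoundError(f"No reference files match {pattern}")
    return files


# Demonstration with placeholder files; 'temp_other' is a made-up variable name
with tempfile.TemporaryDirectory() as tmp:
    for name in ("temp.atmos_isobaric.daily.CAFE60.19600101-19600131.nc.json",
                 "temp_other.atmos_isobaric.daily.CAFE60.19600101-19600131.nc.json"):
        open(os.path.join(tmp, name), "w").close()
    found = [os.path.basename(f) for f in list_reference_files(tmp, "temp")]

print(found)
```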