# Experiment writing to a zarr store on the JASMIN s3 object store.

* Working with `s3fs`, not with `fsspec`
* ORIG: Cannot write large files (3D zoom level 10) - run out of mem.
* UPDATED: can write large files - need to chunk these properly first though.

In [1]:
from pathlib import Path

import fsspec
import s3fs
import xarray as xr
import zarr

In [51]:
# Good chunking is necessary to get this saving without crashing the notebook.
dsOLR = xr.open_mfdataset(
    '/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/toa_outgoing_longwave_flux/hpz10/glm.n2560_RAL3p3.apvera_20200120T00.toa_outgoing_longwave_flux.hpz10.nc',
    chunks={'time': 1, 'cell': 12582912 / 12},
)

  dsOLR = xr.open_mfdataset(


In [2]:
s3cfg = dict([l.split(' = ') for l in Path('/home/users/mmuetz/.s3cfg').read_text().split('\n') if l])

In [3]:
# Internal access allowed in notebooks - no https and s3-ext.
jasmin_s3 = s3fs.S3FileSystem(
    anon=False, secret=s3cfg['secret_key'],
    key=s3cfg['access_key'],
    client_kwargs={'endpoint_url': 'http://hackathon-o.s3.jc.rl.ac.uk'}
)
jasmin_s3.ls('sim-data')

['sim-data/DYAMOND3_example_data']

In [54]:
store= s3fs.S3Map(root='s3://sim-data/DYAMOND3_example_data/healpix/toa_outgoing_longwave_flux/hpz10/glm.n2560_RAL3p3.apvera_20200120T00.toa_outgoing_longwave_flux.hpz10.nc', s3=jasmin_s3, check=False)


In [55]:
task = dsOLR.to_zarr(store=store)

In [4]:
# Good chunking is necessary to get this saving without crashing the notebook.
dsT = xr.open_mfdataset(
    '/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/air_temperature/hpz10/glm.n2560_RAL3p3.apverd_20200120T??.air_temperature.hpz10.zarr',
    chunks={'time': 1, 'pressure': 5, 'cell': 12582912 / 12},
)

  _set_context_ca_bundle_path(ca_bundle_path)
  dsT = xr.open_mfdataset(
  dsT = xr.open_mfdataset(


In [5]:
store= s3fs.S3Map(root='s3://sim-data/DYAMOND3_example_data/healpix/air_temperature/hpz10/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz10.zarr', s3=jasmin_s3, check=False)

In [6]:
dsT.nbytes / 1e9

22.749905148

In [7]:
# Stopped after 17m, transfered 4.3/~10G. Sure this was slower than last time...
dsT.to_zarr(store=store)

KeyboardInterrupt: 

In [None]:
22.75e3 / 10

In [8]:
dsT = xr.open_dataset('/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/air_temperature/hpz0/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz0.nc')


  _set_context_ca_bundle_path(ca_bundle_path)
  dsT = xr.open_dataset('/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/air_temperature/hpz0/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz0.nc')


In [9]:
dsT

In [2]:
s3cfg = dict([l.split(' = ') for l in Path('/home/users/mmuetz/.s3cfg').read_text().split('\n') if l])

In [3]:
# Internal access allowed in notebooks - no https and s3-ext.
jasmin_s3 = s3fs.S3FileSystem(
    anon=False, secret=s3cfg['secret_key'],
    key=s3cfg['access_key'],
    client_kwargs={'endpoint_url': 'http://hackathon-o.s3.jc.rl.ac.uk'}
)
jasmin_s3.ls('data')

['data/20200101T0000Z_ph2496.pp',
 'data/hpz0',
 'data/hpz10',
 'data/zarr_example.zarr']

In [4]:
store= s3fs.S3Map(root='s3://data/hpz0/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz0.zarr', s3=jasmin_s3, check=False)

In [None]:
task = dsT.to_zarr(store=store)

In [14]:
dsT2 = xr.open_zarr(store=store)

  dsT2 = xr.open_zarr(store=store)


In [15]:
dsT2

Unnamed: 0,Array,Chunk
Bytes,40 B,40 B
Shape,"(5,)","(5,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,timedelta64[ns] numpy.ndarray,timedelta64[ns] numpy.ndarray
"Array Chunk Bytes 40 B 40 B Shape (5,) (5,) Dask graph 1 chunks in 2 graph layers Data type timedelta64[ns] numpy.ndarray",5  1,

Unnamed: 0,Array,Chunk
Bytes,40 B,40 B
Shape,"(5,)","(5,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,timedelta64[ns] numpy.ndarray,timedelta64[ns] numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,11.72 kiB,11.72 kiB
Shape,"(5, 25, 12)","(5, 25, 12)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 11.72 kiB 11.72 kiB Shape (5, 25, 12) (5, 25, 12) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",12  25  5,

Unnamed: 0,Array,Chunk
Bytes,11.72 kiB,11.72 kiB
Shape,"(5, 25, 12)","(5, 25, 12)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [17]:
dsT2.air_temperature.sum().compute()

In [25]:
# Try writing a much larger dataset (zoom level 10).
dsT = xr.open_dataset('/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/air_temperature/hpz10/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz10.nc')


  dsT = xr.open_dataset('/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/air_temperature/hpz10/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz10.nc')


In [20]:
# Runs out of mem with 24G, even though compute=False.
store= s3fs.S3Map(root='s3://data/hpz10/glm.n2560_RAL3p3.apverd_20200120T00.4.air_temperature.hpz10.zarr', s3=jasmin_s3, check=False)
#task = dsT.to_zarr(store=store, compute=False)

In [17]:
# Good chunking is necessary to get this saving without crashing the notebook.
dsT = xr.open_mfdataset(
    '/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/air_temperature/hpz10/glm.n2560_RAL3p3.apverd_20200120T??.air_temperature.hpz10.nc',
    chunks={'time': 1, 'pressure': 5, 'cell': 12582912 / 12},
)

  dsT = xr.open_mfdataset(
  dsT = xr.open_mfdataset(


In [18]:
dsT.chunks

Frozen({'time': (1, 1, 1, 1, 1, 1, 1, 1, 1), 'pressure': (5, 5, 5, 5, 5), 'cell': (1048576, 1048576, 1048576, 1048576, 1048576, 1048576, 1048576, 1048576, 1048576, 1048576, 1048576, 1048576)})

In [21]:
# I don't think this is necessary with good chunking... but might be helpful when we are saving much more data because it should allow for 
# parallel writes from different proces.
task = dsT.to_zarr(store=store, compute=False)

In [24]:
dsT = dsT.drop_vars(['time', 'forecast_period', 'forecast_reference_time'])

In [26]:
dsT.isel(time=[0]).to_zarr(store=store, region='auto')

<xarray.backends.zarr.ZarrStore at 0x7fb74b349ab0>

  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)
  c = np.asarray(c)


In [28]:
dsT.isel(time=slice(1, None)).to_zarr(store=store, region='auto')

<xarray.backends.zarr.ZarrStore at 0x7fb74b3cb520>

In [29]:
dsT.isel(time=0, pressure=-1).air_temperature.mean().compute()

In [30]:
dsTz10 = xr.open_zarr(store=store)

  dsTz10 = xr.open_zarr(store=store)


In [35]:
dsTz10.air_temperature.mean(dim=['pressure', 'cell']).compute()

In [39]:
dsTz10.isel(time=0, pressure=0).nbytes / 1e9

0.20132662

In [40]:
# Good chunking is necessary to get this saving without crashing the notebook.
dsOLR = xr.open_mfdataset(
    '/gws/nopw/j04/hrcm/mmuetz/DYAMOND3_example_data/healpix/toa_outgoing_longwave_flux/hpz10/glm.n2560_RAL3p3.apvera_20200120T00.toa_outgoing_longwave_flux.hpz10.nc',
    chunks={'time': 1, 'cell': 12582912 / 12},
)

  dsOLR = xr.open_mfdataset(


In [41]:
store= s3fs.S3Map(root='s3://data/DYAMOND3_example_data/healpix/toa_outgoing_longwave_flux/hpz10/glm.n2560_RAL3p3.apvera_20200120T00.toa_outgoing_longwave_flux.hpz10.nc', s3=jasmin_s3, check=False)


In [42]:
task = dsOLR.to_zarr(store=store)

In [44]:
dsOLR.nbytes / 1e9

1.308623048

In [45]:
dsOLR2 = xr.open_zarr(store=store)

  dsOLR2 = xr.open_zarr(store=store)


In [48]:
dsOLR2.load()

In [49]:
dsOLR2.toa_outgoing_longwave_flux.mean()

In [50]:
jasmin_s3 = s3fs.S3FileSystem(
    anon=True,
    client_kwargs={'endpoint_url': 'http://hackathon-o.s3.jc.rl.ac.uk'}
)
jasmin_s3.ls('s3://data/DYAMOND3_example_data/healpix/toa_outgoing_longwave_flux/hpz10/glm.n2560_RAL3p3.apvera_20200120T00.toa_outgoing_longwave_flux.hpz10.nc/.zattrs')

PermissionError: Access Denied

## experiment with `fsspec`

* Basic access to s3 store works...
* But, I cannot get this working with `xarray` reading from a store that I've set up using `fsspec`

In [5]:
fs = fsspec.filesystem(
    's3',
    key=s3cfg['access_key'],
    secret=s3cfg['secret_key'],
    client_kwargs={'endpoint_url': 'http://hackathon-o.s3.jc.rl.ac.uk'},  # Works locally withing JASMIN/Notebook service.
)

In [6]:
fs.ls('data')

['data/20200101T0000Z_ph2496.pp', 'data/zarr_example.zarr']

In [19]:
s3_store = fsspec.get_mapper(
    "s3://data/hpz0/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz0.zarr",
    storage_options={
        'key': s3cfg['access_key'],
        'secret': s3cfg['secret_key'],        
        "client_kwargs": {"endpoint_url": "http://hackathon-o.s3.jc.rl.ac.uk"}  # Adjust for your S3-compatible storage
    }
)

In [23]:
s3_store = fsspec.get_mapper("s3://data/hpz0/glm.n2560_RAL3p3.apverd_20200120T00.air_temperature.hpz0.zarr", fs=fs)

In [24]:
dsT3 = xr.open_zarr(s3_store)

TypeError: AioSession.__init__() got an unexpected keyword argument 'fs'