In [None]:
!pip install swiftspec

In [None]:
!pip install zarr-swiftstore

In [None]:
import xarray as xr
import s3fs
import swiftspec
import zarr
import numpy as np
import dask.array as da
import xarray as xr
from dask.utils import parse_bytes
import math
import pandas as pd
import dask
from zarrswift import SwiftStore

# Read-only access with S3

In [None]:
# Anonymous (anon=True) S3 filesystem pointed at the object-store endpoint.
client_kwargs = dict(endpoint_url='https://object-store.cloud.muni.cz')

fs = s3fs.S3FileSystem(client_kwargs=client_kwargs, anon=True)

In [None]:
# List the entries under 'foss4g-data' through the anonymous S3 filesystem.
fs.ls('foss4g-data')

In [None]:
# Try to create 'mydir' under 'foss4g-data'.
# NOTE(review): `fs` was created with anon=True in a section about read-only
# access, so presumably this call is expected to fail or have no effect — confirm.
fs.mkdir('foss4g-data/mydir')

In [None]:
# List 'foss4g-data' again to see whether the mkdir call changed anything.
fs.ls('foss4g-data')

# Access with swiftspec

In [None]:
# Rebind `fs` to a swiftspec filesystem; from here on `fs` is no longer the
# S3 filesystem created earlier.
fs = swiftspec.SWIFTFileSystem()

In [None]:
import fsspec

# The object is a .zip archive, i.e. binary data: open it in binary mode
# (text mode would try to decode the bytes as text) and show only its size
# and leading bytes instead of dumping the whole payload into the output.
with fsspec.open("swift://object-store.cloud.muni.cz/swift/v1/pangeo-test/jupyterbook-html.zip", "rb") as f:
    payload = f.read()
print(len(payload), payload[:4])

In [None]:
# List a path through the swiftspec filesystem.
# NOTE(review): this path has no 'v1' segment, unlike the
# swift://.../swift/v1/pangeo-test/... URL opened earlier in this notebook,
# and it ends in 'none' — confirm that this is the intended path.
fs.ls("swift://object-store.cloud.muni.cz/swift/pangeo-test/none")

# Zarr swift store

In [None]:
import os

# Swift credentials are read from these two environment variables. Export them
# before starting the kernel, or replace the empty placeholders below.
# setdefault keeps any value already present in the environment instead of
# overwriting it with an empty string; the loop form also avoids echoing the
# token as the cell's output.
for _name in ("OS_STORAGE_URL", "OS_AUTH_TOKEN"):
    os.environ.setdefault(_name, "")

In [None]:
# Pre-authenticated Swift options, taken from the environment variables
# OS_STORAGE_URL and OS_AUTH_TOKEN.
auth = dict(
    preauthurl=os.environ["OS_STORAGE_URL"],
    preauthtoken=os.environ["OS_AUTH_TOKEN"],
)

In [None]:
# Small demo dataset: one 4 x 5 variable "foo" of uniform random values on
# labelled x / y coordinates.
ds = xr.Dataset(
    data_vars={"foo": (('x', 'y'), np.random.rand(4, 5))},
    coords={'x': [10, 20, 30, 40], 'y': [1, 2, 3, 4, 5]},
)

# Write it to the Swift container 'demo' under the prefix 'xarray-demo',
# replacing existing content (mode='w') and consolidating the metadata.
store = SwiftStore(container='demo', prefix='xarray-demo', storage_options=auth)
ds.to_zarr(store=store, mode='w', consolidated=True)

In [None]:
# Re-open the dataset from the same store using the consolidated metadata;
# this rebinds `ds` to the store-backed dataset.
ds = xr.open_zarr(store=store, consolidated=True)

# Performance test

## Dataset setup

In [None]:
def timeseries(
    chunk_per_worker=5,
    chunk_size="128 MB",
    num_nodes=12,
    worker_per_node=4,
    chunking_scheme=None,
    lat=320,
    lon=384,
    start="1980-01-01",
    freq="1H",
    nan=False,
):
    """Create a synthetic xarray dataset filled with random data.

    The dataset is sized to hold roughly
    ``chunk_size * num_nodes * worker_per_node * chunk_per_worker`` bytes of
    float64 values; the number of time steps is derived from that total and
    from the ``lat`` / ``lon`` grid size.

    Parameters
    ----------
    chunk_per_worker : int
        Number of chunks placed on each worker. See the dask best practices
        on chunking: a good chunk size is around 100 MB, and each worker can
        hold many chunks.
    chunk_size : str
        Target chunk size, e.g. ``"128 MB"`` or ``"64 MiB"`` (parsed with
        ``dask.utils.parse_bytes``).
    num_nodes : int
        Number of compute nodes.
    worker_per_node : int
        Number of dask workers per node.
    chunking_scheme : str or None
        ``'temporal'`` chunks along the time dimension only (each chunk spans
        the full lon/lat grid); ``'spatial'`` chunks along lon and lat only
        (each chunk spans all time steps). With ``None`` the chunk sizes are
        determined automatically, using ``chunk_size`` as the target. Any
        other value emits a warning and is treated like ``None``.
    lat : int
        Number of latitude values.
    lon : int
        Number of longitude values.
    start : datetime (or datetime-like string)
        Start of the time series.
    freq : str
        Time series frequency, e.g. ``'2s'``, ``'1H'`` or ``'12W'``.
    nan : bool
        Whether to include NaN in the generated data.

    Returns
    -------
    xarray.Dataset
        Dataset with one dask-backed float64 variable ``sst`` of dimensions
        ``(time, lon, lat)`` and a ``history`` attribute.

    Examples
    --------
    >>> ds = timeseries(chunk_size='128MB', chunk_per_worker=5,
    ...                 chunking_scheme='spatial', lat=500, lon=600)
    >>> ds.sst.dims
    ('time', 'lon', 'lat')
    >>> ds.sst.shape
    (12800, 600, 500)
    """

    dt = np.dtype("f8")
    itemsize = dt.itemsize
    chunk_size = parse_bytes(chunk_size)
    # Total payload in bytes, then in number of float64 elements.
    total_bytes = chunk_size * num_nodes * worker_per_node * chunk_per_worker
    size = total_bytes / itemsize
    # Enough time steps over the (lon, lat) grid to reach the target size.
    timesteps = math.ceil(size / (lat * lon))
    shape = (timesteps, lon, lat)
    if chunking_scheme == "temporal":
        # Number of whole (lon, lat) slices needed to reach one chunk_size.
        x = math.ceil(chunk_size / (lon * lat * itemsize))
        chunks = (x, lon, lat)
    elif chunking_scheme == "spatial":
        # Square lon/lat tile that, over all time steps, reaches one chunk_size.
        x = math.ceil(math.sqrt(chunk_size / (timesteps * itemsize)))
        chunks = (timesteps, x, x)
    else:
        if chunking_scheme is not None:
            # Keep the historical fallback to automatic chunking, but make a
            # misspelled scheme visible instead of silently ignoring it.
            import warnings

            warnings.warn(
                f"Unknown chunking_scheme {chunking_scheme!r}; expected "
                "'temporal', 'spatial' or None. Falling back to automatic "
                "chunking.",
                stacklevel=2,
            )
        chunks = "auto"

    lats = xr.DataArray(np.linspace(start=-90, stop=90, num=lat), dims=["lat"])
    lons = xr.DataArray(np.linspace(start=-180, stop=180, num=lon), dims=["lon"])
    times = xr.DataArray(pd.date_range(start=start, freq=freq, periods=timesteps), dims=["time"])
    if chunks == "auto":
        # Automatic chunking takes its target size from the dask config, so
        # set it to the requested chunk_size while the array is created.
        with dask.config.set({"array.chunk-size": chunk_size}):
            random_data = randn(shape=shape, chunks=chunks, nan=nan)
    else:
        random_data = randn(shape=shape, chunks=chunks, nan=nan)
    ds = xr.DataArray(
        random_data,
        dims=["time", "lon", "lat"],
        coords={"time": times, "lon": lons, "lat": lats},
        name="sst",
        attrs={"units": "baz units", "description": "a description"},
    ).to_dataset()
    ds.attrs = {"history": "created for compute benchmarking"}

    return ds


def randn(shape, chunks=None, nan=False, seed=0):
    """Return a dask array of normal samples with mean 5 and standard deviation 3.

    Parameters
    ----------
    shape : tuple of int
        Shape of the generated array.
    chunks : tuple, str or None
        Chunk specification passed to the dask random generator.
    nan : bool
        If true, every negative sample is replaced by NaN.
    seed : int
        Seed of the dask ``RandomState`` used to draw the samples.
    """
    generator = da.random.RandomState(seed)
    noise = generator.standard_normal(shape, chunks=chunks)
    values = 5 + 3 * noise
    if not nan:
        return values
    # Mask out the negative part of the distribution.
    return da.where(values < 0, np.nan, values)

## Setup Dask

In [None]:
# Create a Dask Gateway client; no address or credentials are passed explicitly.
from dask_gateway import Gateway
gateway = Gateway()

In [None]:
# Start a new cluster through the gateway and scale it to 20 workers.
cluster = gateway.new_cluster()
cluster.scale(20)
# Display the cluster object.
cluster

In [None]:
# Connect a distributed client to the gateway cluster created above.
from dask.distributed import Client
client = Client(cluster)
# Display the client object.
client

## Create dataset and write with zarr

In [None]:
# Benchmark dataset chunked along time only, with chunks of about 64 MiB.
# With the function defaults (12 nodes x 4 workers per node) and 20 chunks
# per worker this requests about 64 MiB * 12 * 4 * 20 = 60 GiB of float64 data.
ds = timeseries(
    chunking_scheme='temporal',
    chunk_size='64 MiB',
    chunk_per_worker=20,
)
ds

In [None]:
# Target store for the benchmark dataset: container 'pangeo-test', prefix
# 'random-data', using the `auth` options. This rebinds `store`.
store = SwiftStore(container='pangeo-test', prefix='random-data', storage_options=auth)

In [None]:
# Empty the target store before writing.
# NOTE(review): this deletes whatever is currently stored under the prefix.
store.clear()

In [None]:
%%time
# Write the benchmark dataset to the Swift store; %%time reports how long the cell took.
ds.to_zarr(store)

## Write with small chunks (bad)

In [None]:
# Same generator with deliberately tiny chunks: 256 KB is smaller than one
# (lon, lat) slice of float64 at the default grid size (384 * 320 * 8 bytes),
# so every chunk holds a single time step.
ds_ios = timeseries(
    chunk_size="256 KB",
    chunking_scheme='temporal',
    chunk_per_worker=500,
)
ds_ios

In [None]:
# Separate store (prefix 'random-data-iops') for the small-chunk dataset,
# emptied before writing.
store_ios = SwiftStore(container='pangeo-test', prefix='random-data-iops', storage_options=auth)
store_ios.clear()

In [None]:
%%time
# Write the small-chunk dataset; %%time reports how long the cell took.
ds_ios.to_zarr(store_ios)

## Read with Zarr-swift

In [None]:
# Open the dataset from `store` and call .persist(), which triggers the read
# of the data rather than leaving it lazy.
# NOTE(review): assumes `store` is still the SwiftStore for 'random-data' — the
# S3 section further down rebinds the same name, so cell order matters.
ds_read = xr.open_zarr(store).persist()
ds_read

## Read with S3

In [None]:
# Recreate the anonymous S3 filesystem: `fs` was rebound to a swiftspec
# filesystem earlier in the notebook.
client_kwargs = dict(endpoint_url='https://object-store.cloud.muni.cz')
fs = s3fs.S3FileSystem(client_kwargs=client_kwargs, anon=True)

In [None]:
store = s3fs.S3Map(root='pangeo-test/random-data',
                   s3=fs,
                   check=False)

In [None]:
ds_read_s3 = xr.open_zarr(store).persist()
ds_read_s3

## Clean resources

In [None]:
cluster.shutdown()

In [None]:
client.close()