# Rechunk the kerchunked GOES dataset
* Rechunk a small piece (24 time steps) of kerchunked GOES data from Lucas Sterzinger's kerchunk tutorial
* Show that loading many small chunks for each dask task can significantly improve performance
* Show that using a cluster in the region where the data is located (using Coiled) can also improve performance  

In [None]:
import fsspec
import xarray as xr
from rechunker import rechunk
import zarr
import os

User-specific OSN parameters: endpoint, credentials, and bucket

In [None]:
# S3-compatible endpoint of the ESIP OSN bucket used throughout this notebook.
osn_endpoint_url = "https://ncsa.osn.xsede.org"

# dotenv file holding the AWS-style secrets required to write to the
# ESIP OSN "s3://esip" bucket.
osn_keys = "/shared/pangeo/nebari-setup/osn_keys.env"

# Zarr stores inside the "s3://esip" bucket: a scratch store for the
# intermediate data and the store that receives the rechunked result.
temp_url = "/esip/pangeo-unige/rsignell/goes/tmp.zarr"
target_url = "/esip/pangeo-unige/rsignell/goes/rechunked.zarr"

In [None]:
import warnings

from dotenv import load_dotenv

# Load the OSN S3 credentials from the dotenv file into os.environ; the
# cluster cells below read AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from there.
# load_dotenv() signals "nothing was loaded" only through its boolean return
# value, so check it instead of discarding it: a missing or empty file would
# otherwise go unnoticed until a later cell fails on missing credentials.
if not load_dotenv(osn_keys):
    warnings.warn(
        f"No environment variables were loaded from {osn_keys!r}; "
        "writing to the OSN bucket will fail unless the credentials are "
        "already present in the environment.",
        stacklevel=2,
    )

In [None]:
# Open the combined kerchunk reference file as a lazy, dask-backed dataset.
# chunks={} lets the backend pick its preferred chunking.
kerchunk_json = "https://ncsa.osn.xsede.org/esip/rsignell/testing/combined.json"

# The reference JSON itself is fetched over https; the data it points at is
# read anonymously over the S3 protocol.
reference_storage_options = {
    "remote_protocol": "s3",
    "remote_options": {"anon": True},
}

ds = xr.open_dataset(
    kerchunk_json,
    engine="kerchunk",
    chunks={},
    backend_kwargs={"storage_options": reference_storage_options},
)

In [None]:
# Inspect the encoding metadata xarray attached to SST when opening the dataset.
ds['SST'].encoding

In [None]:
# Display the lazily loaded SST variable (dimensions, dtype, dask chunking).
ds['SST']

In [None]:
# Target chunk lengths along t, y and x, each written as (multiplier * base).
# NOTE(review): 226 presumably matches the native y/x chunk length of the
# source files -- confirm against ds['SST'].encoding.
nt = 12 * 1
ny, nx = 4 * 226, 4 * 226

In [None]:
# Re-open the same kerchunk reference file, this time asking dask for chunks
# of nt x ny x nx so that each dask chunk spans the requested block size.
kerchunk_json = "https://ncsa.osn.xsede.org/esip/rsignell/testing/combined.json"

# The reference JSON itself is fetched over https; the data it points at is
# read anonymously over the S3 protocol.
reference_storage_options = {
    "remote_protocol": "s3",
    "remote_options": {"anon": True},
}
dask_chunks = {"t": nt, "x": nx, "y": ny}

ds = xr.open_dataset(
    kerchunk_json,
    engine="kerchunk",
    chunks=dask_chunks,
    backend_kwargs={"storage_options": reference_storage_options},
)

In [None]:
# Keep only the two variables that will be rechunked, then display the result.
wanted_vars = ['SST', 'DQF']
ds = ds[wanted_vars]
ds

In [None]:
# Display SST again to check the chunking after re-opening with explicit chunks.
ds['SST']

In [None]:
# Total size xarray reports for the selected variables, converted from bytes.
ds.nbytes/1e9   # GB (1e9 bytes)

In [None]:
# Authenticated (anon=False) S3 filesystem pointed at the OSN endpoint; used
# for every write in this notebook.  skip_instance_cache=True asks fsspec for
# a fresh instance instead of a cached one.
osn_client_kwargs = {'endpoint_url': osn_endpoint_url}
fs_write = fsspec.filesystem(
    's3',
    anon=False,
    skip_instance_cache=True,
    client_kwargs=osn_client_kwargs,
)

Remove temp and target zarr stores if they exist:

In [None]:
%%time
# Start from a clean slate: delete any temp/target store left over from a
# previous run.  Only "nothing to delete" is tolerated; credential,
# permission or network errors propagate so they are noticed here rather
# than later, while the rechunked data is being written.
for stale_url in (temp_url, target_url):
    try:
        fs_write.rm(stale_url, recursive=True)
    except FileNotFoundError:
        pass  # store does not exist yet -- nothing to clean up

In [None]:
# Wrap both bucket paths as key-value mappers so they can be used as zarr stores.
temp_store, target_store = (
    fs_write.get_mapper(store_url) for store_url in (temp_url, target_url)
)

Drop a bunch of coords we don't care about:

In [None]:
# Auxiliary coordinates that are not wanted in the rechunked output.
unwanted_coords = [
    'day_solar_zenith_angle',
    'night_solar_zenith_angle',
    'quantitative_local_zenith_angle',
    'retrieval_local_zenith_angle',
    'retrieval_solar_zenith_angle',
    'x_image',
    'y_image',
]

# drop=True removes them outright instead of demoting them to data variables.
ds = ds.reset_coords(unwanted_coords, drop=True)

In [None]:
# Select which Dask cluster backend the following cells start.  Each backend
# cell is guarded by `if cluster_type == ...`, so only the matching one runs.
# Supported values: 'Coiled', 'Gateway', 'Nebari'.
cluster_type = 'Nebari'

# Fail fast on a typo: with an unrecognised value every backend cell would be
# skipped silently, leaving `client`, `cluster` and `max_mem` undefined.
if cluster_type not in ('Coiled', 'Gateway', 'Nebari'):
    raise ValueError(
        f"Unknown cluster_type {cluster_type!r}; "
        "expected 'Coiled', 'Gateway' or 'Nebari'"
    )

In [None]:
if cluster_type == 'Coiled':
    import coiled

    # Forward the OSN S3 credentials found in this process's environment to
    # the Coiled workers.
    credential_names = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    env_vars = {name: os.environ[name] for name in credential_names}

    # Cluster definition, collected in one place before launching.
    coiled_settings = dict(
        region="us-east-1",
        worker_options={"nthreads": 2},
        compute_purchase_option="spot_with_fallback",
        n_workers=24,
        environ=env_vars,
        software='rechunk',
        workspace='pangeo',
    )
    cluster = coiled.Cluster(**coiled_settings)
    client = cluster.get_client()

    # Per-worker memory budget passed to rechunker later on.
    max_mem = '11GB'

In [None]:
if cluster_type == 'Gateway':
    from dask_gateway import Gateway
    from IPython.display import display

    gateway = Gateway()  # instantiate Dask gateway with its default configuration

    # Show the cluster options available on this deployment.  An expression
    # nested inside an `if` block is not auto-displayed by the notebook (only
    # a cell's last top-level expression is), so display it explicitly.
    display(gateway.cluster_options())

In [None]:
if cluster_type == 'Gateway':
    # Forward the OSN S3 credentials found in this process's environment to
    # the Dask Gateway workers.
    credential_names = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    env_vars = {name: os.environ[name] for name in credential_names}

    # Configure the cluster before it is created.
    options = gateway.cluster_options()
    options.conda_environment = 'global/global-pangeo'  # comment out for Daskhub or Planetary Computer
    options.profile = 'Small Worker'  # Small workers have 8GB RAM
    options.environment_vars = env_vars

    # Launch the cluster and attach a client to it.
    cluster = gateway.new_cluster(options)
    client = cluster.get_client()

    # Fixed-size cluster.  Adaptive alternative that was tried:
    #   cluster.adapt(minimum=2, maximum=20)
    n_gateway_workers = 24
    cluster.scale(n_gateway_workers)

    # Per-worker memory budget passed to rechunker later on.
    max_mem = '6GB'

In [None]:
if cluster_type == 'Nebari':
    import sys
    import os

    # Make the site-specific helper module importable, then load it.
    nebari_lib_dir = os.path.join(
        os.environ['HOME'], 'shared', 'pangeo', 'nebari-setup', 'lib'
    )
    sys.path.append(nebari_lib_dir)
    import nebari_tools as nbt

    # NOTE(review): the profile is named 'osn-esip' but the endpoint is an AWS
    # regional one, while the stores are written via the OSN endpoint --
    # confirm this combination is intended.
    aws_profile = 'osn-esip'
    aws_region = 'us-west-2'
    endpoint_url = f's3.{aws_region}.amazonaws.com'

    nbt.set_credentials(
        profile=aws_profile,
        region=aws_region,
        endpoint_url=endpoint_url,
    )
    worker_max = 24

    # Cluster settings, collected in one place before the helper is called.
    cluster_settings = dict(
        profile=aws_profile,
        worker_max=worker_max,
        region=aws_region,
        use_existing_cluster=True,
        adaptive_scaling=False,
        wait_for_cluster=True,
        worker_profile='Normal Worker',
        propagate_env=True,
    )
    client, cluster = nbt.start_dask_cluster(**cluster_settings)

    # Per-worker memory budget passed to rechunker later on.
    max_mem = '6GB'  # 75% of 8GB/worker

In [None]:
# Both variables are rechunked to the same nt x ny x nx target chunks.
chunk_plan = {
    var_name: {'t': nt, 'y': ny, 'x': nx}
    for var_name in ('DQF', 'SST')
}

In [None]:
# Display the dataset that is about to be handed to rechunker as the source.
ds

In [None]:
%%time
# Build the rechunking plan; the plan is executed in a separate step below.
array_plan = rechunk(
    source=ds,
    target_chunks=chunk_plan,
    max_mem=max_mem,
    target_store=target_store,
    temp_store=temp_store,
    executor='dask',
)

In [None]:
%%time
# Execute the plan built above.
# NOTE(review): retries=10 is presumably forwarded to Dask so that failed
# tasks are retried up to 10 times -- confirm against the rechunker docs.
array_plan.execute(retries=10)

In [None]:
# Consolidate the target store's zarr metadata so that it can be opened
# without listing every array's metadata object individually.
zarr.convenience.consolidate_metadata(target_store)

Try opening the resulting rechunked dataset:

In [None]:
# Re-open the freshly written target store lazily; chunks={} keeps the
# store's own chunking for the dask arrays.
ds_rechunked = xr.open_dataset(target_store, engine='zarr', chunks={})

In [None]:
# Display the rechunked SST variable to verify its chunking.
ds_rechunked.SST

In [None]:
# Release resources: disconnect the client first, then shut the cluster down.
client.close()
cluster.shutdown()