# Writing OpenDAP Data to ZARR on S3

The objective demonstrated here is to take a dataset which is already published via an OpenDAP endpoint and write it to S3 in zarr format. The end goal is to make the given dataset 'cloud optimized' for workflows operating in cloud compute environment. 

## Preamble
This is all stuff we are going to need: 

In [None]:
import os
import logging
import configparser

logging.basicConfig(level=logging.INFO, force=True)

import xarray as xr
import dask
import fsspec
import rechunker
import zarr

In [None]:
import sys
print("Python     :", sys.version) 
from importlib.metadata import version as _ver
for m in ['numpy', 'xarray', 'dask', 'zarr', 'fsspec', 's3fs', 'rechunker']:
    print(f"{m:10s} : {_ver(m)}")

## AWS Credentials
Because we will be writing to an S3 object store, we need credentials.  This notebook will assume that the correct credentials are already stored in `~/.aws/credentials` . 

I am using a profile to write to the OSN storage device (profile name `osn-renci`). If you run this notebook and want to write elsewhere with other credentials, you may change the profile name and endpoint in the cell below: 

In [None]:
awsconfig = configparser.ConfigParser()
awsconfig.read(
    os.path.expanduser('~/.aws/credentials') 
    # default location... if yours is elsewhere, change this.
)
_profile_nm  = os.environ.get('AWS_PROFILE', 'osn-renci')
_endpoint = os.environ.get('AWS_S3_ENDPOINT', 'https://renc.osn.xsede.org')
# Set environment vars based on parsed awsconfig
try:
    os.environ['AWS_ACCESS_KEY_ID']     = awsconfig[_profile_nm]['aws_access_key_id']
    os.environ['AWS_SECRET_ACCESS_KEY'] = awsconfig[_profile_nm]['aws_secret_access_key']
    os.environ['AWS_S3_ENDPOINT']       = _endpoint
    os.environ['AWS_PROFILE'] = _profile_nm
    os.environ['AWS_DEFAULT_PROFILE'] = _profile_nm
except KeyError:
    logging.error("Problem parsing the AWS credentials file. ")

## Establish Souce and Output Location

This demonstration will use the PRISM (v2) dataset available from <https://cida.usgs.gov> as the example dataset.

In [None]:
# INPUT: 
OPENDAP_url = 'https://cida.usgs.gov/thredds/dodsC/prism_v2'

... which we will write to the OSN pod: 

In [None]:
# OUTPUT Location: 
workspace = f's3://rsignellbucket2/testing/prism/'

# OUTPUT Dataset Name:
FNAME = 'PRISM_v2.zarr'

# Instantiate a fsspec reference to the workspace: 
fsw = fsspec.filesystem('s3', 
    anon=False, 
    default_fill_cache=False, 
    skip_instance_cache=True, 
    client_kwargs={ 'endpoint_url': os.environ['AWS_S3_ENDPOINT'] },
) # this will take credentials from the environment variables, 
# as defined above. No need to specify profile or keys. The endpoint, 
# unfortunately is necessary to name explicitly.

outdir = workspace + FNAME
target_store = fsw.get_mapper(outdir)

try:
    if fsw.exists(workspace + FNAME):
        logging.warning("Removing existing file/folder: %s", fname)
        fsw.rm(workspace + fname, recursive=True)
except:
    pass

print("READY !!")

fsw.ls(workspace)


## Examine the Data

Now that we have everything set up, let's finally have a look at the data that we're working on

In [None]:
ds_in = xr.open_dataset(OPENDAP_url, decode_times=False)
ds_in

This dataset holds four variables (setting aside `time_bnds` for the moment -- special case). The variables are `tmx`, `ppt`, and `tmn`, which are associated with three indices by which data values are located in space and time (`lat`, `lon`, and `time`).

Looking at the "Dimensions" line, you can see that each of these dimensions is quantified -- how many unique values are available in each dimension: 

* **lon** = 1405
* **lat** = 621
* **time** = 1512

We will need these numbers later, so take note. 

Let's look at one of the data variables to learn more about how it is presented by the OPeNDAP endpoint. 

In [None]:
ds_in.tmn

Note from the top line that this variable is indexed as a tuple in `(time, lat, lon)`.

Also note that in the data attributes, that `_ChunkSizes` gives the chunk sizes of the data, expressed as a tuple to match the dimensions.  This data is broken into chunks, each of which is 
* 1 timestep, 
* 23 latitude steps, and 
* 44 longitude steps. 

Gven that `tmn` is stored as a `float32` (4 bytes), each chunk is of size: 

In [None]:
bytes = 1 * 23 * 44 * 4
kbytes = bytes / (2**10)
mbytes = kbytes / (2**10)
print(f"TMN chunk size: {bytes=} ({kbytes=:.2f})({mbytes=:.4f})")

This is an **extremely** small chunk size, and not at all suitable for cloud storage.  

The good news is that we are not stuck with it. The opendap server is offering us its default chunking for network API requests, but this is configurable. We can change it to something more suitable.  We will definitely do this later. Take note.

## Compute the Preferred Chunking Plan

Given what we know about this data, we can apply some cloud storage principles to form a strategy for how best to chunk the data when we write it to S3. Broadly, we need to specify chunk **size** and chunk **shape**. 

### Size
it's a balance.. here are some constraints: 
* Files Too Big -- In a zarr dataset, each chunk is stored as a separate file. If we 
  need data from a chunk, no matter how little or how much, that file gets 
  opened, decompressed, and the whole thing read in. Large chunk sizes means that there may be 
  a lot of data transferred in situations when only a subset of that chunk's 
  data is needed. ("sharding" is coming in zarr v3, but for now, this is how it works). 
* Files Too Small -- If the chunk size is too small, the time it takes to read and 
  decompress the data for each chunk can become comparable to the latency of S3 (typically 10-100ms). We want the reads to take at least a second or so, so that the latency is not a significant part of the overall timing. 
  
As a general rule, aim for chunk sizes between 10 and 200MB, depending on shape
and expected read pattern (see below). Expect 100ms latency for each separate
chunk that a process needs to open.


### Shape

The preferred shape of each chunk will depend on the read pattern for future analyses. For some datasets, this will be very apparent (NWIS gages, for example -- it very likely will be consumed along the time dimension most of the time).  For datasets where there is no clear preference, we can try to chunk based on likely read patterns, but allow for other patterns without too much of a performance penalty. 
Let's see how we might do this for our sample dataset.  This data will likely be used in one of two dominant read patterns: 

* Time series for a given location (or small spatial extent)
  * As a special case -- is it likely that time series will be subset (by year? month?)
* Full extent for a given point in time. 
  * As a special case -- are specific study areas more used than others


#### TIME dimension

In [None]:
# 1512 time steps.... what happens if we chunk in twelves (i.e. a year at a time)
print("We need {} chunks.".format(1512 // 12))

In this case, a user could get an entire year of this monthly data as a single chunk.  If they wanted a full time series across the entire dataset, they would need to read 126 chunks. 

So this is where the judgement call gets made -- which is the more likely read pattern for time?  Year-by-year, or the whole time set (or some sequence of a few years). In this case, I think it is more likely to want more than just one year's data.  A happy medium for chunk size is 6 years of data per chunk: 

In [None]:
test_chunk_size = 12*6
print("TIME chunking: {} chunks of size {}".format(1512 / test_chunk_size, test_chunk_size))


This pattern means only 21 chunks instead of the original 126 for a full time series in a given location. The max latency (using 200ms per read as the theoretical expectation) to read all of that data on a single worker is: 

In [None]:
print("Expected latency in seconds: ", (21 * 200) * 0.001)

Note that for cluster-aware analyses, multiple chunks can be read at the same time. Total wall-clock time will be reduced in that case. 

#### SPACE

We're realy chunking in dimensions -- and there are two dimensions to this dataset which contribute to "space": `lat` and `lon`.  These can have separate chunk sizes. The question to ask is whether future users of this data will want square "tiles" in space, or will they want equal numbers of longitude and latitude?  (That is, is it important that the North-South extent be broken into the same number of chunks as the East-West extent?).  I'll be breaking this into square tiles, so there will be more `lon` chunks than `lat` chunks: 

In [None]:
# The size of the dataset: 
lon=1405
lat=621
test_chunk_size = lat // 4 # split the smaller of the two dimensions into 4 chunks
print("LON chunking: {} chunks of size {}".format(lon / test_chunk_size, test_chunk_size))
print("LAT chunking: {} chunks of size {}".format(lat / test_chunk_size, test_chunk_size))

It is important to note that we have **just over** a round number of chunks.  This means that the last chunk in the given dimension will be extremely thin. In the case of that latitude chunk size, the extra 0.006 of a chunk means that the last chunk is only one `lat` observation. This all but guarantees that two chunks are needed for a small spatial extent near the "end" of the dimension. Ideally, we would want partial chunks to be at least half the size of the standard chunk.  The bigger that 'remainder' fraction, the better. 

Let's adjust numbers a little: 

In [None]:
test_chunk_size = 160
print("LON chunking: {} chunks of size {}".format(lon / test_chunk_size, test_chunk_size))
print("LAT chunking: {} chunks of size {}".format(lat / test_chunk_size, test_chunk_size))

With this pattern, the "remainder" latitude chunk will be 141 (125 for the last longitude chunk).  All others will be a square 160. 

The entire spatial extent for a single time step can be read in 36 chunks, with this pattern. That feels a little high to me, given that this dataset will likely be taken at full extent.  Let's go a little bigger: 


In [None]:
test_chunk_size = 354
print("LON chunking: {} chunks of size {}".format(lon / test_chunk_size, test_chunk_size))
print("LAT chunking: {} chunks of size {}".format(lat / test_chunk_size, test_chunk_size))

This is not *quite* as good, in terms of full-chunk remainders, but the whole extent can be had in only 8 chunks. 

Note, that if were really confident that most analyses wanted the full extent, we would just put the whole lat/lon dimensions into single chunks each. This would ensure (and **require**) that we read the entire extent any time we wanted any **part** of the extent for a given timestep.  Our poor time-series analysis would then be stuck reading the entire dataset to get all time values for a single location. `:sadface:`

### Total Chunk Size
Now that we have a rough idea of the chunk dimensions, let's compute its size in bytes.  This will tell us if we've hit our target of between 10 and 200Mb per chunk.  More importantly, it will tell us if we will overwhelm the OpenDAP server -- the server can only give us 500MB at a time. Chunks should really be smaller than this (which we want anyway, but this 500MB limit is a hard cut-off). 

In [None]:
#       lat   lon  time  float32
bytes = 354 * 354 * 72 * 4
kbytes = bytes / (2**10)
mbytes = kbytes / (2**10)
print(f"TMN chunk size: {bytes=} ## {kbytes=:.2f} ## {mbytes=:.4f}")

We're looking really good for size.  Maybe even a bit low.  But we're in the (admitedly broad) range of 10-200. 

### Making the chunk plan
Now that we know how we want to chunk the data, we need to give that information to the API which will read the data from its OpenDAP endpoint: 

In [None]:
ds_in = xr.open_dataset(OPENDAP_url, decode_times=False, chunks={'time': 72, 'lon': 354, 'lat': 354})
ds_in

Looking more closely at the `tmn` variable: 

In [None]:
ds_in.tmn

**NOTE** that what it is giving us for `_ChunkSizes` in the attributes is irrelevant now.  We've specifically asked the dask interface to request this data according to the chunk pattern specified -- and revealed in the graphical display.  Dask will make specific OPeNDAP requests *per chunk* using appropriate query parameters to the server. 

Because this chunk pattern can be provided by the server, and it is a reasonable pattern for object storage in S3, we do **not** need to add the complexity of `rechunker`. We can just have the zarr driver write it out according to the same plan


In [None]:
chunk_plan = {
    'ppt':{'time': 72, 'lon': 354, 'lat': 354},    
    'tmx':{'time': 72, 'lon': 354, 'lat': 354},    
    'tnm':{'time': 72, 'lon': 354, 'lat': 354},
    'time_bnds': {'time': 1, 'tbnd': 2},
    'lat': (621,),
    'lon': (1405,),
    'time': (1512,),
    'tbnd': (2,)
}

Note that the coordinate variables (`lat`, `lon`, etc) are stored as single-chunk stripes of data. The index will always be needed in its entirity, so we chunk it such that it reads with one chunk. 

Also note -- I gave the full chunk plan **AS IF** we were going to instruct at tool like `rechunker` to chunk in a particular way.  This pattern is already the shape our data is in, because that's how we asked for it from OPeNDAP.  

## Ready to Write

OK... finally we are ready to write out our data. Note that the input data is not in memory, but we pretend that it is.  This is one of the many advantages of using dask arrays.  It will fetch the necessary chunks when the data in them is needed. And the good news about that is that Dask is capable of doing these operations *in parallel* and *without hand-holding*.  We don't have to design a parallel workflow: Dask will sort it out.  BUT... to take advantage of that parallelism, we need to start up a cluster: 

### Start Dask Cluster

In [None]:
from dask.distributed import progress, performance_report
try:
    from dask_gateway import Gateway
except ImportError:
    logging.error("Unable to import Dask Gateway.  Are you running in a cloud compute environment?\n")
    raise
os.environ['DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION'] = "1.0"

gateway = Gateway()
_options = gateway.cluster_options()
_options.conda_environment='users/users-pangeo'  ##<< this is the conda environment we use on nebari.
_options.profile = 'Medium Worker'
_env_to_add={}
aws_env_vars=['AWS_ACCESS_KEY_ID',
              'AWS_SECRET_ACCESS_KEY',
              'AWS_SESSION_TOKEN',
              'AWS_DEFAULT_REGION',
              'AWS_S3_ENDPOINT']
for _e in aws_env_vars:
    if _e in os.environ:
        _env_to_add[_e] = os.environ[_e]
_options.environment_vars = _env_to_add    
cluster = gateway.new_cluster(_options)          ##<< create cluster via the dask gateway
cluster.adapt(minimum=10, maximum=30)             ##<< Sets scaling parameters. 
client = cluster.get_client()

print("The 'cluster' object can be used to adjust cluster behavior.  i.e. 'cluster.adapt(minimum=10)'")
print("The 'client' object can be used to directly interact with the cluster.  i.e. 'client.submit(func)' ")
print(f"The link to view the client dashboard is:\n>  {client.dashboard_link}")

## to_zarr()

In [None]:
%%time
with performance_report('./reports/OpenDAP_to_S3-perfreport.html'):
    ds_in.to_zarr(target_store, mode='w')

## Verify
To make sure that we really wrote the whole thing to S3, let's sample the data for some simple plots: 

In [None]:
new_ds = xr.open_dataset(target_store, engine='zarr', chunks={})
new_ds

In [None]:
new_ds.ppt

In [None]:
import hvplot.xarray
new_ds.ppt.sel(lon=-75, lat=41.1, method='nearest').load().plot()

In [None]:
new_ds.tmx.sel(time="1970-01").load().hvplot(x='lon', y='lat', rasterize=True, geo=True, tiles='OSM' )

## Close down cluster

In [None]:
client.close(); cluster.close()