# earthaccess authentication on a distributed Dask cluster

In [1]:
import os
import logging

# Disable HDF5 file locking in this process (set before the HDF5-backed imports below).
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"

import h5netcdf
import h5py
import xarray as xr

import fsspec
import earthaccess

# Dask imports are unconditional so later cells (delayed, dask.visualize,
# dask.compute, ...) never depend on whether a client already existed.
import dask
import dask.array as da
from dask import delayed
from dask.distributed import Client

earthaccess.login()

# Cluster configuration:
#   True  -> start a LocalCluster on this machine
#   False -> connect to the scheduler at `distributed_cluster_url`
local_cluster = True
distributed_cluster_url = "192.168.0.101:8786"

# Start a cluster only if this kernel does not already have a client,
# so re-running this cell does not spawn a second cluster.
if "client" not in globals():
    if local_cluster:
        # manual num of workers:
        # client = Client(n_workers=8, threads_per_worker=1, silence_logs=logging.ERROR)
        client = Client(threads_per_worker=1, silence_logs=logging.ERROR)
    else:
        client = Client(distributed_cluster_url)

# Record library versions for reproducibility.
for library in (xr, h5netcdf, h5py, fsspec, earthaccess):
    print(f"{library.__name__} v{library.__version__}")

Enter your Earthdata Login username:  earthaccess
Enter your Earthdata password:  ········


xarray v2024.3.0
h5netcdf v1.3.0
h5py v3.11.0
fsspec v2024.3.1.post11+g69f4fe8b
earthaccess v0.9.0.post70.dev0+774aea6


In [2]:
# Display the Dask client summary (scheduler address, dashboard link, workers).
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 31.28 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42399,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 31.28 GiB

0,1
Comm: tcp://127.0.0.1:46199,Total threads: 1
Dashboard: http://127.0.0.1:44499/status,Memory: 7.82 GiB
Nanny: tcp://127.0.0.1:39105,
Local directory: /tmp/dask-scratch-space/worker-k7dxyf9u,Local directory: /tmp/dask-scratch-space/worker-k7dxyf9u

0,1
Comm: tcp://127.0.0.1:45123,Total threads: 1
Dashboard: http://127.0.0.1:34059/status,Memory: 7.82 GiB
Nanny: tcp://127.0.0.1:36277,
Local directory: /tmp/dask-scratch-space/worker-25k2jql0,Local directory: /tmp/dask-scratch-space/worker-25k2jql0

0,1
Comm: tcp://127.0.0.1:36017,Total threads: 1
Dashboard: http://127.0.0.1:46623/status,Memory: 7.82 GiB
Nanny: tcp://127.0.0.1:40317,
Local directory: /tmp/dask-scratch-space/worker-8aj09gpc,Local directory: /tmp/dask-scratch-space/worker-8aj09gpc

0,1
Comm: tcp://127.0.0.1:46277,Total threads: 1
Dashboard: http://127.0.0.1:36967/status,Memory: 7.82 GiB
Nanny: tcp://127.0.0.1:44773,
Local directory: /tmp/dask-scratch-space/worker-0ckcqxki,Local directory: /tmp/dask-scratch-space/worker-0ckcqxki


## Auth on a distributed cluster

Because separate processes and distributed workers don't share local variables, we need a way to pass them the credentials so that each worker's instance of earthaccess can authenticate and open our granules.

This is not optimal. I anticipate that we will start embedding the token in the results themselves, so that earthaccess can pick it up from there without us having to forward the credentials to the workers manually.

In [3]:
# Executed inside every worker process via client.run (see the call below).
def auth_env(auth):
    """Export Earthdata credentials into this process's environment.

    Parameters
    ----------
    auth : mapping
        Must provide ``"EARTHDATA_USERNAME"`` and ``"EARTHDATA_PASSWORD"``;
        each value is converted with ``str()`` before being stored.

    Also sets ``HDF5_USE_FILE_LOCKING=FALSE`` so the worker matches the
    client-side setting.
    """
    for key in ("EARTHDATA_USERNAME", "EARTHDATA_PASSWORD"):
        os.environ[key] = str(auth[key])
    os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
    
# Push this client's Earthdata credentials to every worker; the output maps each
# worker address to auth_env's return value.
worker_auth = earthaccess.auth_environ()
client.run(auth_env, auth=worker_auth)

{'tcp://127.0.0.1:36017': None,
 'tcp://127.0.0.1:45123': None,
 'tcp://127.0.0.1:46199': None,
 'tcp://127.0.0.1:46277': None}

In [4]:
import random

# For each year 2014..2023, search the Nov -> Feb window and keep up to
# `max_samples` randomly chosen granules from it.
max_samples = 4
SAMPLE_SEED = 42  # fixed seed so "Restart & Run All" picks the same granules
rng = random.Random(SAMPLE_SEED)

granules = []
for year in range(2014, 2024):
    # NOTE(review): the end bound "YYYY-02" appears to resolve to the start of
    # February (93 granules found ~ 30 + 31 + 31 + 1 days), so February itself is
    # effectively excluded; adjust the end bound if it was meant to be included.
    results = earthaccess.search_data(
        short_name="MUR25-JPL-L4-GLOB-v04.2",
        temporal=(f"{year}-11", f"{year+1}-02"),
    )
    # random.sample raises ValueError if asked for more items than exist,
    # so cap the sample size at the number of results in this window.
    granules.extend(rng.sample(results, min(max_samples, len(results))))

len(granules)

Granules found: 93
Granules found: 93
Granules found: 93
Granules found: 93
Granules found: 93
Granules found: 93
Granules found: 93
Granules found: 93
Granules found: 92
Granules found: 93


40

In [None]:
# `results` still holds only the search results from the last iteration of the
# sampling loop (the 2023-11 -> 2024-02 window), not all years.
results

In [None]:
# Check which class earthaccess returns for each search result.
type(granules[0])

In [None]:
# Download the first ten sampled granules into ./data; `file` holds whatever
# earthaccess.download returns.
first_granules = granules[:10]
file = earthaccess.download(first_granules, local_path="./data")

In [None]:
# links = [g.data_links()[0] for g in granules]
# fs = earthaccess.get_fsspec_https_session()

In [None]:
# I/O tuning knobs for reading remote granules.
io_params = dict(
    # Keyword arguments passed to fsspec's `fs.open(...)` for read-ahead caching.
    fsspec_params=dict(
        # skip_instance_cache=True,
        cache_type="blockcache",  # or "first" with enough space
        block_size=2**20,  # 1 MiB; could be bigger
    ),
    # Settings intended for the h5py/h5netcdf layer.
    # NOTE(review): not passed to any reader in the cells shown here.
    h5py_params=dict(
        # only recent versions of xarray and h5netcdf allow this correctly
        driver_kwds=dict(
            page_buf_size=8 * 2**20,  # 8 MiB; this one only works in repacked files
            rdcc_nbytes=2**20,  # 1 MiB; this one is to read the chunks
        ),
    ),
)

# from pqdm.threads import pqdm

# def open_file(link):
#     f_h = fs.open(link, **io_params["fsspec_params"])
#     return f_h

# fileset = pqdm(links, open_file, n_jobs=16, disable=True)

In [5]:
%%time
earthaccess.login()
fileset = earthaccess.open(granules, smart_open=True)
ds = xr.open_mfdataset(fileset,
                       engine="h5netcdf",
                       lock=False,
                       data_vars=["analysed_sst"],
                       compat="override",
                       coords="minimal",
                       parallel=True)
ds

Opening 40 granules, approx size: 0.07 GB


QUEUEING TASKS | :   0%|          | 0/40 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/40 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/40 [00:00<?, ?it/s]

CPU times: user 6.31 s, sys: 724 ms, total: 7.03 s
Wall time: 2min 22s


Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 316.41 MiB 7.91 MiB Shape (40, 720, 1440) (1, 720, 1440) Dask graph 40 chunks in 81 graph layers Data type float64 numpy.ndarray",1440  720  40,

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 316.41 MiB 7.91 MiB Shape (40, 720, 1440) (1, 720, 1440) Dask graph 40 chunks in 81 graph layers Data type float64 numpy.ndarray",1440  720  40,

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,158.20 MiB,3.96 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 158.20 MiB 3.96 MiB Shape (40, 720, 1440) (1, 720, 1440) Dask graph 40 chunks in 81 graph layers Data type float32 numpy.ndarray",1440  720  40,

Unnamed: 0,Array,Chunk
Bytes,158.20 MiB,3.96 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 316.41 MiB 7.91 MiB Shape (40, 720, 1440) (1, 720, 1440) Dask graph 40 chunks in 81 graph layers Data type float64 numpy.ndarray",1440  720  40,

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 316.41 MiB 7.91 MiB Shape (40, 720, 1440) (1, 720, 1440) Dask graph 40 chunks in 81 graph layers Data type float64 numpy.ndarray",1440  720  40,

Unnamed: 0,Array,Chunk
Bytes,316.41 MiB,7.91 MiB
Shape,"(40, 720, 1440)","(1, 720, 1440)"
Dask graph,40 chunks in 81 graph layers,40 chunks in 81 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [6]:
# Open a single granule on its own for inspection. It is bound to a separate
# name so the multi-file dataset `ds` from the previous cell is not overwritten:
# later cells (ds.analysed_sst.persist(), the per-year groupby, std over time)
# need the full 40-granule time series.
ds_single = xr.open_dataset(fileset[0], engine="h5netcdf")
ds_single

In [10]:
# Inspect the fsspec block-cache statistics of the second opened file.
# NOTE(review): the recorded output reports a 102400-byte block size, not the
# 1 MiB "block_size" in io_params, so io_params does not appear to be what
# earthaccess.open used for these handles.
fileset[1].cache


        <BlockCache:
            block size  :   102400
            block count :   19
            file size   :   1895805
            cache hits  :   0
            cache misses:   0
            total requested bytes: 0>
        

In [None]:
# Persist analysed_sst on the cluster so later reductions reuse the loaded chunks.
sst = ds["analysed_sst"].persist()

In [None]:
# Draw the Dask task graph behind the persisted `sst` array.
dask.visualize(sst)

In [None]:
# Display the (lazy) analysed_sst variable of the combined dataset.
ds["analysed_sst"]

In [None]:
# Exploration leftover: prints the docstring of dask.persist; can be removed
# from a finished notebook.
help(dask.persist)

In [None]:
pip install boto3

In [None]:
# Re-persist analysed_sst (repeats an earlier identical persist cell) and display it.
sst = ds.analysed_sst.persist()
sst

In [None]:
# Equatorial box: 10S-10N, 150W-80W (negative longitudes are west).
eq_zone = sst.sel(lat=slice(-10, 10), lon=slice(-150, -80))

# Per-year standard deviation over time at every grid point, computed now.
# NOTE(review): "time.year" groups by calendar year, so each Nov -> Feb sampling
# window is split across two groups (Nov/Dec vs. Jan), and the first/last years
# may hold very few samples. Group by a winter-season label if per-winter
# variability is what is intended.
mzone = (
    eq_zone
    .groupby("time.year")
    .std(dim="time")
    .compute()
)
mzone

In [None]:
import matplotlib.pyplot as plt

# Average the per-year standard deviation over the equatorial box, leaving one
# value per year. (`df` is an xarray DataArray indexed by `year`, not a DataFrame.)
df = mzone.mean(dim=["lon", "lat"])
years = df.year.values
values = df.values

# Shading thresholds for the variability curve.
baseline = 0.5     # orange fill between the curve and this level where the curve is >= it
upper_bound = 1.0  # red fill between the curve and this level where the curve is >= it

# Use an explicit Figure/Axes rather than indexing into the list of Line2D
# that DataArray.plot() returns for 1-D data.
fig, ax = plt.subplots(figsize=(12, 4))
df.plot(ax=ax)

# One tick per year, labelled with the year itself.
ax.set_xticks(years)
ax.set_xticklabels(years)

# `where=` uses the named thresholds, so changing baseline/upper_bound keeps
# the shading consistent with the fill levels.
ax.fill_between(years, values, baseline, where=(values >= baseline),
                color="orange", alpha=0.4, interpolate=True)
ax.fill_between(years, values, upper_bound, where=(values >= upper_bound),
                color="red", alpha=0.4, interpolate=True)
ax.set_title("Equatorial-box mean of per-year SST standard deviation")
plt.show()

In [None]:
# Lazy per-year standard deviation of SST; nothing is computed until it is plotted/computed.
computation = sst.groupby("time.year").std(dim="time")
# Render the optimized task graph to vis.png.
dask.visualize(computation, filename="vis.png", optimize_graph=True)

In [None]:
%%time
from dask.distributed import performance_report
import gc

with performance_report(filename="dask-pi-earthaccess-smart-2.html"):
    computation.plot.pcolormesh(
        x="lon",
        y="lat",
        col="year",
        col_wrap=4,
        aspect=2,
        robust=True,
        cmap="viridis",
        cbar_kwargs={"label": "Raspberry Pi generated plot"},
    )
client.run(gc.collect)

In [None]:
# Re-render the optimized task graph of `computation` to vis.png (same call as
# the cell that defines `computation`).
dask.visualize(computation, optimize_graph=True, filename="vis.png")

In [None]:
# Per-pixel standard deviation of analysed_sst across all time steps, plotted.
ds["analysed_sst"].std(dim="time").plot()

In [None]:
import gc

def sstmean_1file(gran_info_single, fs=None):
    """Return the mean of ``analysed_sst`` over one granule.

    Intended to be wrapped with ``dask.delayed`` so each call runs as one task.

    Parameters
    ----------
    gran_info_single :
        One earthaccess search result; the first entry of its
        ``data_links()`` is opened.
    fs : fsspec filesystem, optional
        Filesystem used to open the link. When omitted, an HTTPS session is
        created with ``earthaccess.get_fsspec_https_session()`` inside the call.

    Returns
    -------
    float
        ``analysed_sst.mean().item()`` for that granule.
    """
    if fs is None:
        # The only definition of a global `fs` in the cells shown is commented
        # out, so build the session here instead of relying on one.
        # NOTE(review): assumes earthaccess in the worker process can
        # authenticate (e.g. from the EARTHDATA_* env vars set by auth_env).
        fs = earthaccess.get_fsspec_https_session()
    link = gran_info_single.data_links()[0]
    # Context managers close the dataset and the remote file handle even on error.
    with fs.open(link, **io_params["fsspec_params"]) as fileobj:
        # engine="h5netcdf" matches the other cells that open these granules.
        with xr.open_dataset(fileobj, engine="h5netcdf") as data:
            # .item() materialises the scalar before the file is closed.
            mean = data["analysed_sst"].mean().item()
    # Explicitly reclaim memory in this process before returning.
    gc.collect()
    return mean

In [None]:
# Process several granules in parallel using Dask:
# wrapping the reducer makes each call return a lazy task instead of running it.
sstmean_1file_parallel = delayed(sstmean_1file)
# NOTE(review): `results` is the search-result list from the last iteration of
# the sampling loop; the compute cell below rebinds `results` to the computed
# means, so re-running this cell afterwards would build tasks over floats.
tasks = list(map(sstmean_1file_parallel, results))

In [None]:
# Draw the task graph for the per-granule delayed tasks.
dask.visualize(tasks)

In [None]:
%%time
results = da.compute(*tasks)

In [None]:
import matplotlib.pyplot as plt

# One point per computed task value, plotted in task order.
# NOTE(review): the x axis is the position in `results` (task index), not a time
# coordinate, despite the 'Time' label.
ts_fig, ts_ax = plt.subplots(figsize=(8, 6))
ts_ax.plot(results)
ts_ax.set_xlabel('Time')
ts_ax.set_ylabel('Value')
ts_ax.set_title('Time Series Plot')
plt.show()

In [None]:
import gc
# Run a garbage-collection pass in every worker process; the output maps each
# worker address to its gc.collect() return value.
client.run(gc.collect)