# Lesson: Data Exploration

## About 
This notebook shows a user how to load data using the HyTEST `intake` catalog and `dask`, explore that data using `xarray`, and plot that data using `hvplot`.

In [None]:
# load libraries
import intake
import xarray as xr

## using `intake`
The HyTEST catalog is structured to be compatible with the Python `intake` [package](https://intake.readthedocs.io/en/latest/index.html) and facilitates reading the data into this notebook and others in this training course. The intake catalog is stored as a yaml file, which is easy to parse using other programming languages (even if there is no equivalent to the intake package in that programming language). For an in-depth tutorial, please see the [Pangeo intake tutorial](http://gallery.pangeo.io/repos/pangeo-data/pangeo-tutorial-gallery/intake.html). The intake catalog serves a temporary purpose in our HyTEST repository, and we hope this can be replaced with SpatioTemporal Asset Catalogs [(STAC)](https://www.youtube.com/watch?v=Ugazf5bWsGE) in the near future. To read more about the HyTEST intake catalogs, please view the [hytest repo](https://github.com/hytest-org/hytest/tree/main/dataset_catalog).
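Because the catalog is plain YAML, it can be read without `intake`. The sketch below parses a small, made-up catalog string with PyYAML (`yaml.safe_load`) and maps each source name to its `urlpath`. It assumes the intake v1 layout, where a top-level `sources` mapping holds entries with `args.urlpath`. The entries shown are hypothetical, and the real HyTEST file may use additional or different fields.

```python
import yaml  # PyYAML, which intake itself depends on

# A tiny, hypothetical catalog in the intake v1 YAML layout (top-level "sources" mapping).
# These entries are illustrative only; the real HyTEST catalog's fields may differ.
CATALOG_YAML = """
sources:
  example-streamflow-cloud:
    driver: zarr
    description: hypothetical cloud copy of a streamflow dataset
    args:
      urlpath: s3://example-bucket/streamflow.zarr
      storage_options:
        requester_pays: false
  example-streamflow-onprem:
    driver: zarr
    args:
      urlpath: /path/to/streamflow.zarr
"""


def list_sources(catalog_text):
    """Return {source name: urlpath} parsed from intake-style YAML text."""
    catalog = yaml.safe_load(catalog_text)
    return {
        name: entry.get("args", {}).get("urlpath")
        for name, entry in catalog["sources"].items()
    }


sources = list_sources(CATALOG_YAML)
print(sorted(sources))
```

To read the real catalog this way, you could download the YAML text from the raw GitHub URL used in the next code cell (for example with `urllib.request.urlopen`) and pass it to `list_sources`. You may need to adjust the keys if the file's layout differs.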

##### Since we are working on the cloud, we will open a cloud-native dataset using `intake`.

In [None]:
# open the hytest data intake catalog
hytest_cat = intake.open_catalog(r"https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")

# list all the datasets in the catalog
list(hytest_cat)

We see some acronyms of modeling applications (e.g., 'nwm', 'nhm', 'conus404') with the suffix 'cloud' or 'onprem'; this suffix designates the storage location of the data. To view the full filepaths and URLs behind each data source, please see the yaml file on the [hytest repo](https://github.com/hytest-org/hytest/blob/main/dataset_catalog/hytest_intake_catalog.yml).
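Because the storage location is encoded in the name, you can filter the catalog listing by suffix. The helper below is a hypothetical convenience function, not part of `intake` or HyTEST. It assumes names end in `-cloud` or `-onprem` as described above. The example names are illustrative: only the first appears in this notebook.

```python
def group_by_location(names):
    """Group catalog entry names by their storage-location suffix ('-cloud' or '-onprem')."""
    groups = {"cloud": [], "onprem": [], "other": []}
    for name in names:
        if name.endswith("-cloud"):
            groups["cloud"].append(name)
        elif name.endswith("-onprem"):
            groups["onprem"].append(name)
        else:
            groups["other"].append(name)  # e.g., nested catalogs or other naming schemes
    return groups


# Illustrative names; in the notebook you would pass list(hytest_cat) instead.
example = [
    "nwm21-streamflow-usgs-gages-cloud",
    "nwm21-streamflow-usgs-gages-onprem",
    "example-sub-catalog",
]
print(group_by_location(example))
```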

In [None]:
# choose a dataset from the above list
dataset = "nwm21-streamflow-usgs-gages-cloud"

In [None]:
# and view the metadata
hytest_cat[dataset]

In some cases, `requester_pays` will be set to `true`. If so, you will need to set up your AWS (Amazon Web Services) credentials to load the data from S3 object storage. Please see this [notebook](https://github.com/hytest-org/hytest/blob/main/environment_set_up/Help_AWS_Credentials.ipynb) for assistance. The good news is that `requester_pays` is set to `false` for this particular dataset, so we can continue without AWS credentials.
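A quick way to see whether credentials are available as environment variables is to check for the variable names that the cluster cell later in this notebook forwards to the Dask workers. This is a sketch with a hypothetical helper name. AWS tools can also read credentials from a shared credentials file or an instance role, so a missing variable here does not necessarily mean access will fail. `AWS_SESSION_TOKEN` is only needed for temporary credentials.

```python
import os

# The same variable names the Dask Gateway cell forwards to its workers.
AWS_ENV_VARS = [
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
    "AWS_DEFAULT_REGION",
]


def missing_aws_env_vars(env=None):
    """Return the AWS credential variables that are unset or empty in `env` (default: os.environ)."""
    env = os.environ if env is None else env
    return [name for name in AWS_ENV_VARS if not env.get(name)]


missing = missing_aws_env_vars()
if missing:
    print("Not set:", ", ".join(missing))
else:
    print("All AWS credential environment variables are set.")
```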

##### What if your data is not in the catalog?

You can load data directly from a directory on your local machine, an HPC system, or your cloud file structure.

We can also access data directly from S3 object storage. Let's try streamflow from the [National Water Model](https://registry.opendata.aws/nwm-archive/) v2.0.

In [None]:
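# load fsspec; fsspec handles the file access to S3 (the s3fs package must also be installed)
import fsspec

# identify the S3 URL of the NWM v2.0 retrospective Zarr store
url = "s3://noaa-nwm-retro-v2-zarr-pds/"

# generate a pseudo file system with fsspec; anon=True because this public bucket needs no credentials
fs = fsspec.filesystem("s3", anon=True)
mapper = fs.get_mapper(url)
# open lazily (chunks={} gives dask-backed arrays); a new name avoids overwriting `ds` used later
ds_nwm = xr.open_dataset(mapper, engine="zarr", chunks={})
ds_nwm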



## using `dask`

To load this data, we will start a parallel cluster using the Python package `dask` (see the in-depth tutorial [here](http://gallery.pangeo.io/repos/pangeo-data/pangeo-tutorial-gallery/dask.html)). Dask parallelism makes use of 'clusters' of workers, each of which is given some task to do. Cluster configurations vary widely, depending on the task and the hardware available. Dask is extremely useful when loading large amounts of data into the notebook. Because the workers read and process chunks of the data in parallel, loading can be much faster, especially when accessing data from the cloud. For a tutorial on `dask` bags, see [here](https://github.com/hytest-org/hytest/blob/main/essential_reading/Parallel_Dask.ipynb).
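Dask's speed-up comes from splitting work into many small, independent tasks (one per chunk) and then combining the partial results. As a sketch of that idea only, the example below swaps in Python's standard-library `concurrent.futures` thread pool for a Dask cluster. Each task returns a `(sum, count)` pair for one chunk, and a final step combines the pairs into a mean. Dask builds and schedules similar per-chunk task graphs for you across many workers. The thread pool here illustrates the structure, not a real speed-up, since pure-Python sums are limited by the GIL.

```python
from concurrent.futures import ThreadPoolExecutor


def chunked(values, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [values[i:i + size] for i in range(0, len(values), size)]


def partial_sum(chunk):
    """Per-chunk task: return (sum, count) so the partial results can be combined later."""
    return sum(chunk), len(chunk)


def parallel_mean(values, chunk_size=1000, workers=4):
    """Mean computed as independent per-chunk tasks followed by a single combine step."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = list(pool.map(partial_sum, chunked(values, chunk_size)))
    total = sum(s for s, _ in parts)
    count = sum(n for _, n in parts)
    return total / count


data = list(range(10_001))
print(parallel_mean(data))  # 5000.0
```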

In [None]:
# load libraries
import logging
import os

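Users need to set up AWS credentials before initializing a cluster. The cell below copies any AWS credential environment variables into each worker's environment, so that the workers can access S3 object storage, including write access where your credentials allow it.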

In [None]:
try:
    from dask_gateway import Gateway
except ImportError:
    logging.error(
        "Unable to import Dask Gateway.  Are you running in a cloud compute environment?\n"
    )
    raise
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION"] = "1.0"

gateway = Gateway()
_options = gateway.cluster_options()
_options.conda_environment = (
    "users/users-pangeo"  ##<< this is the conda environment we use on nebari.
)
_options.profile = "Medium Worker"
_env_to_add = {}
aws_env_vars = [
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
    "AWS_DEFAULT_REGION",
]
for _e in aws_env_vars:
    if _e in os.environ:
        _env_to_add[_e] = os.environ[_e]
_options.environment_vars = _env_to_add
cluster = gateway.new_cluster(_options)  ##<< create cluster via the dask gateway
cluster.adapt(minimum=2, maximum=30)  ##<< Sets scaling parameters.

client = cluster.get_client()

print(
    "The 'cluster' object can be used to adjust cluster behavior.  i.e. 'cluster.adapt(minimum=10)'"
)
print(
    "The 'client' object can be used to directly interact with the cluster.  i.e. 'client.submit(func)' "
)
print(f"The link to view the client dashboard is:\n>  {client.dashboard_link}")

Note: HyTEST has helper scripts to assist with [cluster initialization](https://github.com/hytest-org/hytest/tree/main/environment_set_up), and a user can run a command like `%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb` when running the notebooks in that repo. See the other `Start_Dask_Cluster...ipynb` notebooks in that folder.
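If you are not on a system with Dask Gateway (for example, on a laptop), a local cluster is a common alternative. The sketch below uses `dask.distributed.LocalCluster` with illustrative sizes (2 workers, 2 threads each). It also sets `processes=False`, so the workers run as threads in the notebook's process. These settings are assumptions to adjust for your machine, and the HyTEST helper notebooks may configure things differently. Run it instead of the Gateway cell, not in addition to it, since it reuses the names `cluster` and `client`. As with the Gateway cluster, `client.dashboard_link` gives the dashboard URL.

```python
from dask.distributed import Client, LocalCluster

# Illustrative sizes only; tune n_workers/threads_per_worker to your machine.
# processes=False runs the workers as threads inside this Python process.
cluster = LocalCluster(n_workers=2, threads_per_worker=2, processes=False)
client = Client(cluster)

# Quick check that the workers accept and run tasks.
future = client.submit(sum, [1, 2, 3])
result = future.result()
print(result)  # 6

# When finished, release the resources:
# client.close(); cluster.close()
```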

### load dataset with `dask` and `xarray`

In [None]:
%%time
ds = hytest_cat[dataset].to_dask()

In [None]:
# let's view this dataset
ds

From examining the xarray dataset above, we have dimensions of 7994 gage_ids and 367,439 time slices. So what is the timestep? You can click the data icon (the stacked-disks symbol) next to the `time` coordinate to examine its values, or you can call them out explicitly.
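Besides clicking through the repr, you can compute the timestep directly by differencing the `time` coordinate. The sketch below builds a small synthetic dataset so that it runs on its own. The start date and values are made up. With the real data, you would apply the last two statements to `ds` instead of `demo`.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic stand-in for `ds`: 48 hourly timestamps (the start date is arbitrary).
time = pd.date_range("2000-01-01", periods=48, freq=pd.Timedelta(hours=1))
demo = xr.Dataset({"streamflow": ("time", np.arange(48.0))}, coords={"time": time})

# Difference consecutive timestamps and list the distinct step sizes.
steps = demo.time.diff("time")
unique_steps = np.unique(steps.values)
print(unique_steps)  # expect a single one-hour step for hourly data
```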

In [None]:
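# view the time coordinate (its values are shown in the output)
ds.time
# other datetime components are accessed the same way:
# ds['time.month']   # month of each timestep
# ds['time.year']    # year of each timestep
# ds['time.season']  # season label ('DJF', 'MAM', 'JJA', 'SON') of each timestep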

In [None]:
# select a single year (2005) using the datetime accessor
ds.sel(time=ds.time.dt.year.isin([2005]))
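The year selection above uses a boolean mask built from the `.dt` datetime accessor. xarray also supports partial-string selection on a datetime index, which gives the same result more concisely. The sketch checks this on synthetic hourly data spanning 2004 to 2006 (placeholder values) and also lists the derived `time.season` labels.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic hourly data spanning 2004-2006 (values are placeholders).
time = pd.date_range("2004-01-01", "2006-12-31 23:00", freq=pd.Timedelta(hours=1))
demo = xr.Dataset({"streamflow": ("time", np.ones(time.size))}, coords={"time": time})

# Boolean selection on the datetime accessor...
by_mask = demo.sel(time=demo.time.dt.year.isin([2005]))
# ...and the equivalent partial-string (label) selection.
by_label = demo.sel(time="2005")

print(by_mask.time.size, by_label.time.size)  # 8760 8760 (2005 is not a leap year)

# Derived components, e.g. the meteorological season of each timestep.
seasons = np.unique(demo["time.season"].values)
print(seasons)
```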

In [None]:
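## censoring data: replace 0 with 0.001 (e.g., so that log10 stays finite)
q_censored = ds.streamflow.where(ds.streamflow != 0, 0.001)
## check for NaNs and Infs (lazy counts; call .compute() to evaluate)
n_nan, n_inf = ds.streamflow.isnull().sum(), (abs(ds.streamflow) == float("inf")).sum()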

We see that our timesteps are hourly, and that our metadata lacks any information about the timezone. This is a good example of why it's important to preserve the metadata from your source data.

In the dataset, we also have several data variables (streamflow and velocity), along with coordinates of elevation, gage_id, latitude, longitude, and stream order. The dimensions of the streamflow and velocity variables are time and gage_id.
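To make this layout concrete, the sketch below builds a tiny, made-up dataset with the same structure. It has two data variables on `(time, gage_id)` and per-gage coordinates on `gage_id`. The gage IDs and values are hypothetical, and naming the stream-order coordinate `order` is an assumption. The sketch then shows positional versus label-based access to a gage and its coordinates.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Made-up dataset mirroring the layout described above.
gages = ["USGS-00000001", "USGS-00000002", "USGS-00000003"]  # hypothetical IDs
time = pd.date_range("2000-01-01", periods=4, freq=pd.Timedelta(hours=1))
demo = xr.Dataset(
    data_vars={
        "streamflow": (("time", "gage_id"), np.arange(12.0).reshape(4, 3)),
        "velocity": (("time", "gage_id"), np.full((4, 3), 0.5)),
    },
    coords={
        "time": time,
        "gage_id": gages,
        "elevation": ("gage_id", [120.0, 35.5, 410.0]),
        "latitude": ("gage_id", [45.0, 44.1, 43.2]),
        "longitude": ("gage_id", [-70.0, -71.5, -72.3]),
        "order": ("gage_id", [3, 1, 5]),  # assumed name for stream order
    },
)

# Positional indexing returns the first gage ID together with its per-gage coordinates...
first = demo.gage_id[0]
print(first.item(), first.elevation.values, first.order.values)
# ...while label-based selection uses the ID itself.
print(demo.elevation.sel(gage_id="USGS-00000002").values)
```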

In [None]:
# What's the first gage? Positional (indexing-style) selection.
ds.gage_id[0]

What's the elevation of the first gage? Elevation is a coordinate. 

In [None]:
# elevation of the first gage, read from the per-gage `elevation` coordinate
ds.gage_id[0].elevation.values

##### Question for user: What's the stream order of the first gage? Order is a coordinate. 

In [None]:
##### fill in your thoughts here! #####







#######################################

Solution below!

In [None]:
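# solution: stream order is a per-gage coordinate (check the coordinate list in the repr above if the name differs)
ds.gage_id[0].order.values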

Let's use `dask` to average streamflow for the first gage in our dataset (01030350).

Use `sel` to select that gage by its ID.

In [None]:
ds0 = ds['streamflow'].sel(gage_id = 'USGS-01030350').mean('time')
ds0.compute()

Let's use `dask` to average streamflow and velocity for EACH GAGE in the dataset (n = 7994). Then we can view the workers performing tasks in real-time using the link that was initialized and supplied to us when we set up our cluster. 

The task stream is a view of which tasks have been running on each thread of each worker. Each row visible in the task stream subwindow is a thread, and each rectangle represents an individual task. 

In [None]:
# ds.plot style first
# hvplot style next, just show one gage

In [None]:
# average over the time dimension to get one descriptive statistic (the mean) per gage
ds1 = ds.mean("time")
ds1.compute()

We now have one mean streamflow and velocity value for each gage in the dataset! But what if we only wanted to average from the year 2000 to 2005? 

In [None]:
# a date-only end label includes every hour of 2005-12-31
ds2 = ds.sel(time=slice('2000-01-01', '2005-12-31'))
ds3 = ds2.mean("time")
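When slicing with datetime strings, the precision of the end label matters. A label that includes a time of day (`'2005-12-31 00:00'`) stops at that hour, whereas a date-only label (`'2005-12-31'`) covers the whole day. The sketch below demonstrates this on synthetic hourly data with placeholder values.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic hourly series covering the last two days of 2005 (values are placeholders).
time = pd.date_range("2005-12-30", "2005-12-31 23:00", freq=pd.Timedelta(hours=1))
demo = xr.DataArray(np.ones(time.size), coords={"time": time}, dims="time")

# An end label with an explicit time stops at midnight on Dec 31...
with_hour = demo.sel(time=slice("2005-12-30", "2005-12-31 00:00"))
# ...while a date-only end label includes every hour of Dec 31.
date_only = demo.sel(time=slice("2005-12-30", "2005-12-31"))

print(with_hour.time.size, date_only.time.size)  # 25 48
```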


In [None]:
import numpy as np
# numpy ufuncs work on xarray objects and stay lazy on dask-backed data
ds3['logQ'] = np.log10(ds3.streamflow)
ds3

In [None]:
# ds3.logQ.data.visualize()  # draw the dask task graph for logQ (requires graphviz)

In [None]:
# smallest mean streamflow; a value of 0 would give -inf in logQ
ds3.streamflow.values.min()
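If any gage has a mean streamflow of exactly 0, `log10` returns `-inf` for it, which can distort or break the histogram. The synthetic sketch below shows the effect and one option: replacing zeros with 0.001 before taking the log, the censoring idea noted earlier in the notebook. The flows are made up, and the 0.001 floor is a choice for illustration, not a recommendation.

```python
import numpy as np
import xarray as xr

# Made-up mean flows for four gages; one gage has a mean of exactly zero.
q = xr.DataArray([0.0, 0.5, 12.0, 1000.0], dims="gage_id")

with np.errstate(divide="ignore"):  # silence the divide-by-zero warning from log10(0)
    raw_log = np.log10(q)           # log10(0) -> -inf

censored_log = np.log10(q.where(q != 0, 0.001))  # replace 0 with 0.001 first

print(raw_log.values)       # approximately -inf, -0.301, 1.079, 3.0
print(censored_log.values)  # approximately -3.0, -0.301, 1.079, 3.0
```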

In [None]:
import hvplot.xarray  # adds the .hvplot accessor to xarray objects
ds3.logQ.hvplot.hist(bins=50)
# ds.hvplot.hist(y='streamflow', bins=50, rasterize=True)

In [None]:
# dask bag intro.

## using `hvplot`, plot streamflow!

We will see more about the `hvplot` Python package and its capabilities in the next segment of the tutorial, but for now we wanted to show how one might plot a histogram and a hydrograph from a national model.

In [None]:
# import relevant libraries; hvplot.xarray adds the .hvplot accessor to xarray objects
import hvplot.xarray

In [None]:
# monthly time series: resample the hourly 2000-2005 subset to monthly means (lazy)
ds2.resample(time="MS").mean()
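To turn an hourly record into a monthly time series, one option is `resample` with a monthly frequency followed by an aggregation such as `mean`. The sketch uses three months of synthetic hourly data with placeholder values. `"MS"` labels each month by its first day. On the real data, the same call on `ds2` stays lazy until computed.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Synthetic hourly streamflow for Jan-Mar 2000 (values are placeholders).
time = pd.date_range("2000-01-01", "2000-03-31 23:00", freq=pd.Timedelta(hours=1))
q = xr.DataArray(np.arange(time.size, dtype=float), coords={"time": time}, dims="time")

# Resample hourly values to monthly means; "MS" labels each month by its first day.
monthly = q.resample(time="MS").mean()
print(monthly.time.dt.strftime("%Y-%m").values)
print(monthly.values)
```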

Let's select streamflow for a single gage and load it into memory; for tutorial purposes we will use six years of data (2000 through 2005).

In [None]:
# one gage, 2000 through 2005; .load() reads the selection into memory
ds2 = ds.sel(gage_id='USGS-01030350', time=slice('2000-01-01', '2005-12-31')).load()
ds2

In [None]:
ds2.streamflow.plot()

In [None]:
import hvplot.xarray
ds2.streamflow.hvplot(x='time', grid = True)

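Setting `rasterize=True` in `hvplot` is useful for large plots (more than about 100 x 200 points), such as maps. The data are aggregated onto a fixed grid of pixels before being sent to the browser, which avoids blowing out memory.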
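Rasterizing means aggregating many data points onto a fixed grid of pixels before drawing, so the plot's size depends on the grid rather than on the number of points. The sketch below illustrates the idea with NumPy's `histogram2d` on a million random points and a 100 x 200 grid. It is not how `hvplot` or Datashader is implemented internally. It only shows one common kind of aggregation they perform, a count of points per cell.

```python
import numpy as np

rng = np.random.default_rng(0)
n_points = 1_000_000
x = rng.normal(size=n_points)
y = rng.normal(size=n_points)

# Aggregate the points onto a fixed 100 x 200 grid (rows follow y, columns follow x);
# each cell holds the number of points that fall inside it.
counts, y_edges, x_edges = np.histogram2d(y, x, bins=(100, 200))

print(counts.shape)  # (100, 200): 20,000 cells to draw instead of 1,000,000 points
```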

In [None]:
## maybe add intro to gene's plotting demo

When working in the cloud, it's important to shut down your cluster when you are finished so that its resources are made available to others.

In [None]:
client.close()
cluster.shutdown()

The End.