In [1]:
import warnings
# Install a global "ignore" filter so warnings do not clutter cell output.
# NOTE(review): this hides every warning category, including deprecation and
# runtime warnings that may matter -- consider narrowing the filter.
warnings.filterwarnings('ignore')

# Looking at Point Data (CHRTOUT)

## Load the Modules

In [34]:
from dask.distributed import Client, LocalCluster
import xarray as xr
import os
import time
from IPython.display import display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [3]:
# Account name used to build the per-user scratch path for the Zarr stores.
# Read it from the environment so the notebook works for whoever runs it;
# fall back to the original account when USER is unset or empty.
USER = os.environ.get("USER") or "x-arnav1710"
# Ask xarray to use its HTML display style for objects shown in the notebook.
xr.set_options(display_style="html")

<xarray.core.options.set_options at 0x7f9a3d69c0d0>

In [4]:
# Start a local Dask cluster with 16 workers and connect a client to it.
# NOTE(review): the worker count is hard-coded -- confirm it matches the
# cores available on the machine running this notebook.
cluster = LocalCluster(n_workers=16)
client = Client(cluster)
# Leave `client` as the last expression so the notebook renders its summary.
client

0,1
Client  Scheduler: tcp://127.0.0.1:39823  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 16  Cores: 16  Memory: 117.19 GiB


In [5]:
# Close the distributed client created above.
# NOTE(review): this runs before any of the benchmark cells, so the timings
# below presumably do not use the 16-worker cluster; `cluster` itself is not
# closed here -- confirm both are intended.
client.close()

## Variable Description

**CHRTOUT: Point Type (including Reach ID)**

feature_id: Reach ID
<br>
streamflow: River Flow (m3 s-1)
<br>
q_lateral: Runoff into channel reach (m3 s-1)
<br>
velocity: River Velocity (m s-1)
<br>
qSfcLatRunoff: Runoff from terrain routing (m3 s-1)
<br>
qBucket: Flux from ground water bucket (m3 s-1)
<br>
qBtmVertRunoff: Runoff from bottom of soil to ground water bucket (m3)

### Init Test Cases
Let's first initialize some basic test cases that we would expect researchers to run, for example the maximum velocity over a range of 100 points.

In [113]:
def maxVelocity(data, sliceStart, sliceEnd):
    """Return the maximum velocity over a range of reaches.

    Parameters
    ----------
    data : object supporting ``.sel(feature_id=...)``
        In this notebook, the dataset returned by ``xr.open_zarr``.
    sliceStart, sliceEnd :
        Bounds of the ``feature_id`` slice passed to ``data.sel``.

    Returns
    -------
    The ``.values`` of ``velocity`` reduced with ``.max()`` over the subset.
    Accessing ``.values`` forces the computation, so the call is safe to time.
    """
    subset_data = data.sel(feature_id=slice(sliceStart, sliceEnd))
    # The original computed this value but dropped it, so callers got None.
    return subset_data["velocity"].max().values

In [114]:
# Benchmark registry: each entry is [display name, callable]. The callable is
# invoked as callable(data, sliceStart, sliceEnd) by the timing helper.
tests = [["Max Velocity Test", maxVelocity]]

In [115]:
# Chunking schemas to benchmark. Each entry is [store name, chunk sizes per
# dimension]; the goal is a layout that makes querying any feature of the
# point data fast.
schemas = [
    ["Everything100", dict(time=100, feature_id=100)],
]

In [120]:
# This benchmark only looks at point (CHRTOUT) data.

# Researchers are expected to query specific reaches rather than the whole
# domain, so each test runs against a subset of the data.

# Number of times each test is repeated; the reported time is the average.
REPEAT_FOR = 5

def openDataSet(reg = ""):
    """Open the CHRTOUT output files as one dataset concatenated along time.

    Parameters
    ----------
    reg : str, optional
        Glob fragment inserted between a leading ``*`` and the
        ``.CHRTOUT_DOMAIN1`` suffix to restrict which files are opened.
        The default ``""`` matches every CHRTOUT file in the directory.

    Returns
    -------
    The dataset returned by ``xr.open_mfdataset``.
    """
    dirpath = "/anvil/projects/x-cis220065/x-cybergis/compute/WRFHydro-Example-Output/CHRTOUT/"
    pattern = dirpath + "*" + reg + ".CHRTOUT_DOMAIN1"
    # `parallel` is a boolean flag; the original passed the string "True",
    # which only worked because any non-empty string is truthy.
    dataset = xr.open_mfdataset(pattern,
                                engine="netcdf4", combine="nested",
                                concat_dim="time", parallel=True)
    return dataset

def chunkForScheme(data, dimension, chunkScheme):
    """Rechunk one entry of ``data`` in place and return ``data``.

    ``dimension`` is the key looked up in ``data`` (the notebook passes the
    variable name "velocity"); ``chunkScheme`` is forwarded unchanged to that
    entry's ``.chunk`` method. The rechunked entry replaces the original one
    on the same ``data`` object, which is then returned.
    """
    rechunked = data[dimension].chunk(chunkScheme)
    data[dimension] = rechunked
    return data

def commitToZarr(data, filePath):
    """Write ``data`` to a Zarr store at ``filePath`` using write mode "w".

    Returns nothing; the result of ``data.to_zarr`` is discarded.
    """
    write_options = {"mode": "w"}
    data.to_zarr(filePath, **write_options)

def getTime(data, test, sliceStart=3199274, sliceEnd=10038154, repeat=None):
    """Return the average time, in seconds, of one run of a test.

    Parameters
    ----------
    data :
        Object handed to the test callable as its first argument.
    test : sequence
        A ``[name, callable]`` entry; ``test[1]`` is called as
        ``callable(data, sliceStart, sliceEnd)``.
    sliceStart, sliceEnd : optional
        Bounds forwarded to the test callable. The defaults are the values
        that were previously hard-coded.
    repeat : int, optional
        Number of repetitions to average over. Defaults to the module-level
        ``REPEAT_FOR`` when omitted.

    Raises
    ------
    ValueError
        If ``repeat`` is smaller than 1 (the average would be undefined).
    """
    if repeat is None:
        repeat = REPEAT_FOR
    if repeat < 1:
        raise ValueError("repeat must be at least 1")

    query = test[1]
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # with system clock adjustments and is too coarse for short runs.
    start = time.perf_counter()
    for _ in range(repeat):
        query(data, sliceStart, sliceEnd)
    end = time.perf_counter()
    tot = (end - start) / repeat
    return tot

def chunkAndTest(chunkOn):
    """Build a Zarr store per chunking schema, then time every test on each.

    For each entry in ``schemas`` a store is written under the user's scratch
    directory if it does not already exist; existing stores are reused as-is.
    Every entry in ``tests`` is then timed against every store.

    Parameters
    ----------
    chunkOn : str
        Name of the entry in the dataset to rechunk (forwarded to
        ``chunkForScheme``).

    Returns
    -------
    list of float
        Average run times in seconds, ordered by schema and then by test.
    """
    def _freshSource():
        # Open the raw files at most once per kernel session, and only when a
        # store actually has to be built. Hand out a copy so that rechunking
        # for one schema never stacks on top of another schema's chunking.
        if 'cached_dataset' not in globals():
            globals()['cached_dataset'] = openDataSet("104*")
        return globals()['cached_dataset'].copy()

    storePaths = [f"/anvil/scratch/{USER}/{schema[0]}.zarr" for schema in schemas]

    for schema, filePath in zip(schemas, storePaths):
        if not os.path.exists(filePath):
            chunked = chunkForScheme(_freshSource(), chunkOn, schema[1])
            commitToZarr(chunked, filePath)

    times = []
    for filePath in storePaths:
        # Opening the store does not depend on the test, so do it once per schema.
        data = xr.open_zarr(filePath)
        for test in tests:
            times.append(getTime(data, test))
    return times

In [121]:
test_res = chunkAndTest("velocity")
test_res

[0.010930156707763672]