In [1]:
%matplotlib inline

# adding project dirs to path so code may be referenced from the notebook
import sys
sys.path.insert(0, '..')

In [2]:
import gc
import os
import config
import utils
import importlib
import dask
import xarray as xr
from typing import Iterable, Union

import pandas as pd

from datetime import datetime, timedelta
from dask.distributed import Client, LocalCluster

import importlib
import grid_to_parquet
importlib.reload(grid_to_parquet)
importlib.reload(utils)

from rasterio.io import MemoryFile

In [3]:
# Load the USGS gage table, then pull out the NWM feature ids it references
usgs = utils.get_usgs_gages()
feature_id_column = usgs["nwm_feature_id"]
nwm_feature_id_filter = feature_id_column.to_list()

In [4]:
# Start a local dask cluster with default settings and attach a client to it
cluster = LocalCluster()
client = Client(address=cluster)
# Bare trailing expression so the notebook renders the cluster summary
cluster

0,1
Dashboard: /user/mgdenno/proxy/8787/status,Workers: 4
Total threads: 16,Total memory: 58.87 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:36273,Workers: 4
Dashboard: /user/mgdenno/proxy/8787/status,Total threads: 16
Started: Just now,Total memory: 58.87 GiB

0,1
Comm: tcp://127.0.0.1:35211,Total threads: 4
Dashboard: /user/mgdenno/proxy/46777/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:37055,
Local directory: /tmp/dask-worker-space/worker-sl7xps7z,Local directory: /tmp/dask-worker-space/worker-sl7xps7z

0,1
Comm: tcp://127.0.0.1:45353,Total threads: 4
Dashboard: /user/mgdenno/proxy/37445/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:41379,
Local directory: /tmp/dask-worker-space/worker-0b8w6614,Local directory: /tmp/dask-worker-space/worker-0b8w6614

0,1
Comm: tcp://127.0.0.1:36719,Total threads: 4
Dashboard: /user/mgdenno/proxy/35381/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:36385,
Local directory: /tmp/dask-worker-space/worker-5ux_x17c,Local directory: /tmp/dask-worker-space/worker-5ux_x17c

0,1
Comm: tcp://127.0.0.1:33753,Total threads: 4
Dashboard: /user/mgdenno/proxy/37685/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:36439,
Local directory: /tmp/dask-worker-space/worker-u851nl0e,Local directory: /tmp/dask-worker-space/worker-u851nl0e


In [5]:
def get_dataset(
        blob_name: str,
        use_cache: bool = True,
        nwm_feature_id_filter: Union[Iterable, None] = None
) -> xr.Dataset:
    """Retrieve a blob from the data service as xarray.Dataset.

    Based largely on OWP HydroTools.

    Parameters
    ----------
    blob_name: str, required
        Name of blob to retrieve.
    use_cache: bool, default True
        If cache should be used.
        If True, checks to see if file is in cache, and
        if fetched from remote will save to cache.
    nwm_feature_id_filter: Iterable or None, default None
        Feature ids to keep. When None or empty, the full dataset is
        returned. If the selection fails (for example an id is not present
        in the dataset), a warning is issued and the full dataset is
        returned instead.

    Returns
    -------
    ds : xarray.Dataset
        The data stored in the blob, optionally subset on ``feature_id``.

    """
    # Local import: `warnings` is not among the notebook's top-level imports.
    import warnings

    nc_filepath = os.path.join(utils.get_cache_dir(), blob_name)
    utils.make_parent_dir(nc_filepath)

    if use_cache and os.path.exists(nc_filepath):
        # Get dataset from cache
        ds = xr.load_dataset(
            nc_filepath,
            engine='h5netcdf',
        )
    else:
        # Get raw bytes
        raw_bytes = grid_to_parquet.get_blob(blob_name)
        # Create Dataset; load_dataset reads everything into memory, so the
        # in-memory file can be released as soon as it returns.
        with MemoryFile(raw_bytes) as memfile:
            ds = xr.load_dataset(
                memfile,
                engine='h5netcdf',
            )
        if use_cache:
            # Cache the full (unfiltered) dataset: the cache path depends on
            # blob_name only, so it must not depend on the filter.
            ds.to_netcdf(
                nc_filepath,
                engine='h5netcdf',
            )

    # `is None` rather than truthiness: array-like filters have no
    # unambiguous truth value.
    if nwm_feature_id_filter is None:
        return ds

    try:
        feature_ids = list(nwm_feature_id_filter)
        if not feature_ids:
            # An empty filter means "no filtering".
            return ds
        return ds.sel(feature_id=feature_ids)
    except Exception as exc:
        # Best effort: fall back to the unfiltered dataset, but say why.
        warnings.warn(f"Invalid feature_id_filter: {exc!r}")
        return ds
        

In [6]:
def nwm_to_parquet(blob_name, use_cache: bool = False):
    """Convert the streamflow in a single NWM NetCDF blob to a parquet file.

    The blob is fetched with ``get_dataset``, its ``reference_time``,
    ``time`` and ``streamflow`` variables are flattened into a DataFrame,
    and the result is written to
    ``<config.PARQUET_CACHE_DIR>/<blob_name>.parquet``.

    ToDo: add way to filter which features are written

    Parameters
    ----------
    blob_name: str, required
        Name of blob to convert. The second "/"-separated component is used
        as the value of the ``configuration`` column.
    use_cache: bool, default False
        Passed through to ``get_dataset``.

    Returns
    -------
    None
        The output is the parquet file written to disk.

    Raises
    ------
    IndexError
        If ``blob_name`` has no second "/"-separated component.
    """
    # Take the configuration from the blob path instead of hard-coding it,
    # so blobs from other configurations are labelled correctly.
    configuration = blob_name.split("/")[1]

    # Get xr.Dataset
    ds = get_dataset(blob_name, use_cache)
    try:
        # Convert to DataFrame with tidy column names, sorted per feature
        df = (
            ds[['reference_time', 'time', 'streamflow']]
            .to_dataframe()
            .reset_index()
            .rename(columns={
                'time': 'value_time',
                'feature_id': 'nwm_feature_id'
            })
            .sort_values(
                by=['nwm_feature_id', 'value_time'],
                ignore_index=True
            )
        )

        # Constant descriptive columns, stored as categoricals
        df = df.assign(
            configuration=configuration,
            measurement_unit='m3/s',
            variable_name='streamflow'
        ).astype({
            'configuration': 'category',
            'measurement_unit': 'category',
            'variable_name': 'category'
        })

        # Save as parquet file
        parquet_filepath = os.path.join(
            config.PARQUET_CACHE_DIR, f"{blob_name}.parquet"
        )
        utils.make_parent_dir(parquet_filepath)
        df.to_parquet(parquet_filepath)
    finally:
        # This should not be needed, but without it memory usage grows.
        # Done in `finally` so a failed conversion does not skip the cleanup.
        ds.close()
        del ds
        gc.collect()

In [7]:
# Ingest window settings
start_dt = datetime(year=2022, month=11, day=18)  # 00Z is the first forecast of the date
td = timedelta(hours=6)  # step between consecutive forecast reference times
ingest_days = 30
# Currently a single forecast; the full window would be ingest_days * 4
number_of_forecasts = 1

In [None]:
print(datetime.now())
# Walk the forecast reference times; for each one, list its channel_rt blobs
# and convert them to parquet as dask delayed tasks.
for cycle_index in range(number_of_forecasts):
    configuration = "medium_range_mem1"
    reference_time = start_dt + cycle_index * td
    ref_time_str = reference_time.strftime("%Y%m%dT%HZ")

    print(f"Start download of {ref_time_str}")

    blob_list = grid_to_parquet.list_blobs_forcing(
        configuration=configuration,
        reference_time=ref_time_str,
        must_contain="channel_rt",
    )

    # One delayed task per blob (i.e. per timestep)
    tasks = [
        dask.delayed(nwm_to_parquet)(blob_name, use_cache=True)
        for blob_name in blob_list
    ]

    # nwm_to_parquet writes its own parquet file and returns nothing, so
    # there are no results to gather here.
    # NOTE: an earlier (disabled) variant concatenated the per-timestep
    # frames and wrote a single parquet file per reference time under
    # config.MEDIUM_RANGE_1_PARQUET.
    dask.compute(*tasks)

print(datetime.now())

2023-02-08 03:14:58.816022
Start download of 20221118T00Z
