# Collect AORC v1.0 at lat/lon Points


In [2]:
%pip install dask[distributed] zarr xarray pandas s3fs kerchunk scikit-learn -q

Note: you may need to restart the kernel to use updated packages.


In [2]:
import dask
import zarr
import numpy
import xarray
import pyproj
import pandas
from s3fs import S3FileSystem
from dask.distributed import Client, progress
from kerchunk.combine import MultiZarrToZarr
from sklearn.metrics import pairwise_distances_argmin


import pickle
from datetime import datetime, timedelta


import aorc1

Initiate the Dask client. This will enable us to parallelize our computations.

In [2]:
# use a try accept loop so we only instantiate the client
# if it doesn't already exist.
try:
    print(client.dashboard_link)
except:    
    # The client should be customized to your workstation resources.
    client = Client(n_workers=8) # per worker
    print(client.dashboard_link)


/user/castronova/proxy/8787/status


## Load AORC V1.0 from AWS

In [3]:
%%time
ds = aorc1.load_aorc_dataset('2010', '01', '01')

24
CPU times: user 4.77 s, sys: 217 ms, total: 4.98 s
Wall time: 7.61 s


In [38]:
training_path = "../Snow-Extrapolation/data/RegionTrain_SCA.pkl"
with open(training_path, 'rb') as f:
    region_train = pickle.load(f)

pts = []
key = 'N_Sierras'
region_train[key]['pt'] = list(zip(region_train[key].Long, region_train[key].Lat))
#region_train[key].loc[region_train[key].pt.unique()
#pts.extend(list(region_train[key]['pt'].unique()))


In [50]:
region_train[key].pt.unique()

array([(-121.9394134663883, 41.16664739027599),
       (-120.6188899987326, 39.675880337476684),
       (-120.87940143112729, 39.786416508865145),
       (-121.78669986808801, 40.78005174338435),
       (-120.17871550951399, 41.99314916228401),
       (-121.3195759203458, 39.81402285325959)], dtype=object)

In [48]:
len(region_train[key].index.unique())

6

In [59]:
# open locations
#df_meta = pandas.read_csv('../Snow-Extrapolation/data/PreProcessed/ground_measures_metadata.csv')
training_path = "../Snow-Extrapolation/data/RegionTrain_SCA.pkl"

with open(training_path, 'rb') as f:
    region_train = pickle.load(f)

pts = []
regions = []
for key in ['N_Sierras', 'S_Sierras_Low', 'S_Sierras_High']:
    region_train[key]['pt'] = list(zip(region_train[key].Long, region_train[key].Lat))
    region_train[key]['region'] = key
    region_train = region_train[key].pt.unique()
    
    regions.append(region_train[key])
    
    pts.extend()
    
#pts = list(set(pts))

In [70]:
len(regions[0].drop_duplicates(subset='pt'))

6

In [75]:
regions[0].region.unique()

array(['N_Sierras'], dtype=object)

In [71]:
len(regions[1].drop_duplicates(subset='pt'))

569

In [76]:
regions[1].region.unique()

array(['S_Sierras_Low'], dtype=object)

In [72]:
len(regions[2].drop_duplicates(subset='pt'))

324

In [77]:
regions[2].region.unique()

array(['S_Sierras_High'], dtype=object)

In [5]:
lats = df_meta.latitude.values
lons = df_meta.longitude.values

In [6]:
# function to collect all indexes

@dask.delayed
def extract_dask(search_points, all_points, final_shape):
    index = pairwise_distances_argmin(X=search_points,
                                      Y=all_points)
    i0, j0 = numpy.unravel_index(index, (final_shape))
    return(i0, j0)
#    return ds.isel(x=j0, y=i0).squeeze()

In [7]:
%%time

points = numpy.array(list(zip(lons, lats)))

# batch index collection using dask
pt_groups = numpy.array_split(numpy.array(points), 100)
all_pts = numpy.c_[ds['lon'].values.ravel(), ds['lat'].values.ravel()]
final_shape = ds['lon'].shape


print('scattering...', end='', flush=True)
all_pts_scattered = client.scatter(all_pts)
print('done')

futures = []
for grp in pt_groups:
    futures.append(extract_dask(grp, all_pts_scattered, final_shape)) 
    

scattering...done
CPU times: user 420 ms, sys: 243 ms, total: 663 ms
Wall time: 669 ms


In [8]:
%%time
results = dask.compute(futures)

CPU times: user 1.63 s, sys: 207 ms, total: 1.84 s
Wall time: 10.6 s


In [9]:
# put the x,y coordinates for the matching cells into lists
i_locs = []
j_locs = []
for grp in results[0]:
    num_elements = len(grp[0])
    for idx in range(0, num_elements):
        i_locs.append(grp[0][idx])
        j_locs.append(grp[1][idx])

In [20]:
# function to collect variables for time range


@dask.delayed
def get_data_dask(i_locs, j_locs, year='2010', month='01', day='01'):
    ds = aorc1.load_aorc_dataset(year, month, day)
    precip = ds.isel(x=i_locs, y=j_locs).squeeze().RAINRATE
    
    
    with open(f'{year}{month}{day}.pkl', 'wb') as f:
        pickle.dump(precip.values, f)
    
    return datetime(int(year), int(month), int(day)),
#            'precip': precip.values}


def get_data_daskbag(args):
    i_locs = args[0]
    j_locs = args[1]
    dt = args[2]
    
    # get the date parts
    month = f'{dt.month:02}'    
    day = f'{dt.day:02}'
    year = f'{dt.year:04}'

    ds = aorc1.load_aorc_dataset(year, month, day)
    
    precip = ds.isel(x=i_locs, y=j_locs) #.squeeze().RAINRATE
    precip = precip.RAINRATE.groupby('time.dayofyear').sum() * 24 * 3600
    pcp_df = precip.to_dataframe().reset_index()
    
    pcp_df['date'] = dt
    pcp_df = pcp_df[['lat', 'lon', 'RAINRATE', 'date']]
    pcp_df.rename(columns={'RAINRATE': 'RAINRATE [mm]'}, inplace=True)
    pcp_df.set_index('date', inplace=True)
    pcp_df.to_csv(f'{year}{month}{day}.csv')
    
    return f'{year}{month}{day}.csv'


In [21]:
%%time 

# isolate the lat/lon points that we're interested in
ind_x = xarray.DataArray(i_locs, dims=["pt"])
ind_y = xarray.DataArray(j_locs, dims=["pt"])


CPU times: user 232 µs, sys: 0 ns, total: 232 µs
Wall time: 238 µs


In [22]:
%%time

# batch variable collection

t = datetime(2010,1,1)
et = datetime(2010,2,1)

input_params = []
while t <= et:
    input_params.append([ind_x, ind_y, t])
    t += timedelta(days=1)
    
import dask.bag as db  
b = db.from_sequence(input_params, npartitions=6)
b = b.map(get_data_daskbag)


CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 3.63 ms


In [23]:
%%time

results = b.compute()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


CPU times: user 14.2 s, sys: 1.51 s, total: 15.7 s
Wall time: 2min 39s


In [None]:
results

In [14]:
d = pandas.read_pickle('20100101.pkl')

In [29]:
sub_ds = ds.isel(x=ind_x, y=ind_y)
pt_lats = sub_ds.lat.values
pt_lons = sub_ds.lon.values
lat_ravel = pt_lats.ravel()
lon_ravel = pt_lons.ravel()
#d_ravel = d[0].ravel()

# dats = []
# for day in days:
#     fname = f'201001{day}.pkl'
#     data = df.read_pickle(fname)
    
#     dats.append(pandas.read_pickle(fname))

Unnamed: 0_level_0,lat,lon,RAINRATE
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2010-01-01,53.489871,-109.476319,0.000000e+00
2010-01-01,53.693732,-103.210843,0.000000e+00
2010-01-01,54.082276,-105.437942,0.000000e+00
2010-01-01,53.642236,-102.003555,0.000000e+00
2010-01-01,53.574999,-100.285731,0.000000e+00
...,...,...,...
2010-01-01,48.479689,-115.341881,0.000000e+00
2010-01-01,49.960336,-120.380652,0.000000e+00
2010-01-01,46.588421,-104.825212,0.000000e+00
2010-01-01,50.073268,-120.448658,0.000000e+00


In [31]:
for i in range(0, 10):
    print(f'{lon_ravel[i]}, {lat_ravel[i]} = {d_ravel[i]}')

-109.47631928348011, 53.48987108815394 = 0.0
-103.24115304529501, 53.92343906099967 = 0.0
-105.38869134717802, 53.80741879893348 = 0.0
-102.0387665206436, 53.973365590696204 = 5.683185008820146e-06
-100.3172504890106, 54.026214976099574 = 0.0
-110.04235388253495, 53.43571256120676 = 0.0
-102.0387665206436, 53.973365590696204 = 5.683185008820146e-06
-101.49144125895054, 53.9925415336226 = 0.0
-107.44705815264058, 53.66353852972689 = 0.0
-108.8936754652265, 53.54300208500296 = 0.0


In [59]:
# find AORC files on AWS
s3 = S3FileSystem(anon=True)
files = s3.glob(f'{BUCKET}{year}/{year}{month}{day}*')  

# json_list = []
#     for f in files:
#         parts = f.split('/')
#         parts[0] += '.s3.amazonaws.com'
#         parts.insert(0, 'https:/')
#         new_name = '/'.join(parts)
#         json_list.append(new_name)


In [60]:
files

['ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010100.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010101.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010102.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010103.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010104.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010105.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010106.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/2010/2010010107.LDASIN_DOMAIN1.json',
 'ciroh-nwm-zarr

In [58]:
len(s3.glob(f'{BUCKET}{year}/{year}*'))

8760

In [None]:
fs = fsspec.filesystem('s3', anon=True)

flist = fs.glob("s3://noaa-goes16/ABI-L2-SSTF/2020/210/*/*.nc")

Note that the method we're using will associate grid cell with the watershed that it overlaps the most with. There are more advanced ways to create a mapping using various interpolation methods that will distribute values cells across all watershed boundaries that they intersect with. This is left as a future exercize. 

In [None]:
figure, ax = plt.subplots(figsize=(10,7))


# plot the gridded catchment mapping
ds.cat.plot()

## create a discrete color mapping such that each catchment 
## is represented by a single color
# cmap = colors.ListedColormap(['green', 'lightskyblue', 'cyan', 'red', 'navy'])
# bounds = [catchment_ids[0]] + [c+0.9 for c in catchment_ids]
# norm = colors.BoundaryNorm(bounds, cmap.N)
# ds.cat.plot(cmap=cmap, norm=norm, ax=ax)

# preview map geometries
gdf.iloc[:].plot(ax=ax, linewidth=2, edgecolor='k', facecolor='None');


## Compute basin-averaged forcing data

Define functions that will be used to perform basin averages on the AORC data. These functions leverage `dask` to parallelize the computation.

In [None]:
# call once per catchment
# distribute zonal stats to sub processes
def perform_zonal_computation(ds, cat_id):

    # subset by catchment id
    ds_catchment = ds.where(ds.cat==cat_id, drop=True)
#    ds_catchement_future = client.scatter(ds_catchment, broadcast=True)
    
    delayed = []
    # loop over variables   
    for variable in ['LWDOWN', 'PSFC',
                     'Q2D', 'RAINRATE', 'SWDOWN',
                     'T2D', 'U2D', 'V2D']:
                
        delay = dask.delayed(compute_zonal_mean)(ds_catchment[variable], variable)
        delayed.append(delay)
        
    res = dask.compute(*delayed)
    
    # combine outputs (list of dicts) into a single dict.
    res = {k: v for d in res for k, v in d.items()}
    
    # return results
    return {f'cat-{int(cat_id)}': res}

def compute_zonal_mean(ds, variable):
    return {variable: ds.mean(dim=['x','y']).values}

Slice the data to the temporal period of our choice.

In [None]:
# define the start and end time of the data we want to use
start_time = f'{year}-01-01 00:00'
end_time = f'{year}-01-10 00:00'

# isolate the desired time period of our data
ds_subset = ds.sortby('time').sel(time=slice(start_time, end_time))

print(f'The dataset contains {len(ds_subset.time)} timesteps')

Let's rechunk our data now that we have many fewer elements.

In [None]:
ds_subset = ds_subset.chunk(chunks={'time': 1000})

In [None]:
ds_subset.chunks

Drop all data that we don't need. The goal here is to make the dataset as small as possible before we start running computations on the data.

In [None]:
# drop unused coordinates
ds_subset = ds_subset.drop(['lat','lon'])

Tell `dask` to perform the subsetting computations on the data now. That way when we process the zonal statistics, the entire dataset won't need to be moved around. This will save a considerable amount of processing in future steps time.

In [None]:
%%time 
ds_subset = ds_subset.compute()

In [None]:
ds_subset

Scatter the dataset to the cluster so all workers will have access to it. This is good practice and especially necessary if working on a large dataset.

In [None]:
%%time
scattered_ds = client.scatter(ds_subset, broadcast=True)

Build a list of `delayed` tasks. This will not execute the computation.

In [None]:
%%time
delayed = []

# loop over each catchment in our domain
# create delayed tasks to compute zonal mean
for cat_id in catchment_ids:
    delay = dask.delayed(perform_zonal_computation)(scattered_ds, cat_id)
    delayed.append(delay)

Invoke the computation using `dask.compute`.

In [None]:
%%time 

# run the computation
results = dask.compute(*delayed)

Save the basin averaged meteorological data in the format expected by `ngen`.

The summarized AORC variables need to be mapped to the `ngen` model that we'll be using. The following table illustrates the mapping.

|AORC Variable Name|NGEN Variable Name|Description|
|---|---|---|
| LWDOWN   | DLWRF_surface         | Surface downward long-wave radiation flux (W m-2) 
| PSFC     | PRES_surface          | Surface Pressure (Pa)
| Q2D      | SPFH_2maboveground    | 2-m Specific Humidity (kg kg-1)
| RAINRATE | ---                   | precipitation_flux (mm s^-1)
| SWDOWN   | DSWRF_surface         | Surface downward short-wave radiation flux (W m-2)
| T2D      | TMP_2maboveground     | 2-m Air Temperature (K)
| U2D      | UGRD_10maboveground   | 10-m U-component of wind (m s-1)
| V2D      | VGRD_10maboveground   | 10-m V-component of wind (m s-1)
| ---      | APCP_surface          | Surface precipitation (kg/m^2)

Note: our `ngen` model is expecting shortwave and longwave radiation at a height of 0 meters above ground whereas the AORC data has values are 2 meters above ground.

References: [tshirt_c.h](https://github.com/NOAA-OWP/ngen/blob/f2725dfbb52f3af5083ce927e69733edbf059f57/models/tshirt/include/tshirt_c.h#L52), [sample forcing csv](https://github.com/NOAA-OWP/ngen/blob/master/data/forcing/cat-27_2015-12-01%2000_00_00_2015-12-30%2023_00_00.csv)

## Save data as csv files

In [None]:

# compute the date range for our data using start and end times
# that were used in the subsetting process.
dates = pandas.date_range(start_time, end_time, freq="60min")

# save the zonal means for each catchment
for dat in results:
    for cat in dat:
        df = pandas.DataFrame({k:list(v) for k,v in dat[cat].items()})
        df.fillna(0., inplace=True)
        
        # convert rainrate from mm/s to kg/m2
        # mm/s - mm/hr = df.RAINRATE * 3600
        # since the timestep is one hour, this is effectively the total rain in mm.
        # 1 mm of rainfall is equal to 1kg/m2 so our conversion is:
        # NOTE: we should only be considering the fraction of liquid precip which can
        #       be computed using LQFRAC. However LQFRAC is zero for our data which 
        #       does not seem correct, so we'll assume that all precip is liquid. This
        #       is something that needs to be revisited.
        df['APCP_surface'] = df.RAINRATE * 3600

        # rename columns to match the variable names expected by the ngen t-shirt model
        df.rename(columns={
            'LWDOWN'   : 'DLWRF_surface',
            'PSFC'     : 'PRES_surface',
            'Q2D'      : 'SPFH_2maboveground',
            'SWDOWN'   : 'DSWRF_surface',
            'T2D'      : 'TMP_2maboveground',
            'U2D'      : 'UGRD_10maboveground',
            'V2D'      : 'VGRD_10maboveground',
            'RAINRATE' : 'precip_rate',
        },
                  inplace=True)
               
        # add the time index
        df['time'] = dates
        df.set_index('time', inplace=True)


        # write to file
        with open(f'{wb_id}/forcings/{cat}.csv', 'w') as f:
            # Note: saving "precip_rate" because this column exists in the example 
            #       forcing files. It's not clear if this is being used or not.
            df.to_csv(f,
                      columns = ['APCP_surface',
                                 'DLWRF_surface',
                                 'DSWRF_surface',
                                 'PRES_surface',
                                 'SPFH_2maboveground',
                                 'TMP_2maboveground',
                                 'UGRD_10maboveground',
                                 'VGRD_10maboveground',
                                 'precip_rate'])
            

In [None]:
# check the number of catchments. 
print(gdf.shape[0])
print(len(results))

# If these are not equal, run the following code cell.

Here is an example showing why some catchments are missing in the results.

<img src="./figures/missing_catchment_example.png">

In [None]:
computed_catchments = [list(r.keys())[0] for r in results]
for cat_id in gdf['cat'].values:
    known_catchment = f'cat-{int(cat_id)}'
    if known_catchment not in computed_catchments:
        print(f'Creating Synthetic Forcing for {known_catchment}')
        synthetic_df = pandas.DataFrame(0, index=df.index, columns=['APCP_surface',
                                                                    'DLWRF_surface',
                                                                    'PRES_surface',
                                                                    'SPFH_2maboveground',
                                                                    'DSWRF_surface',
                                                                    'TMP_2maboveground',
                                                                    'UGRD_10maboveground',
                                                                    'VGRD_10maboveground',
                                                                    'precip_rate'])
        # write to file
        with open(f'{wb_id}/forcings/{known_catchment}.csv', 'w') as f:
            df.to_csv(f,
                      columns = ['APCP_surface',
                                 'DLWRF_surface',
                                 'DSWRF_surface',
                                 'PRES_surface',
                                 'SPFH_2maboveground',
                                 'TMP_2maboveground',
                                 'UGRD_10maboveground',
                                 'VGRD_10maboveground',
                                 'precip_rate'])
            
        