# Add AORC Precipitation to Prediction Datasets

- Tony Castronova <acastronova@cuahsi.org>
- Irene Garousi-Nejad <igarousi@cuahsi.org>


In [None]:
%pip install dask[distributed] zarr xarray pandas s3fs kerchunk scikit-learn -q

In [None]:
import os
import dask
import zarr
import numpy
import xarray
import pyproj
import pandas
from s3fs import S3FileSystem
from dask.distributed import Client, progress
from kerchunk.combine import MultiZarrToZarr
from sklearn.metrics import pairwise_distances_argmin

import dask.bag as db  

import pickle
from datetime import datetime, timedelta

from glob import glob

import aorc1

Initiate the Dask client. This will enable us to parallelize our computations.

In [None]:
# use a try accept loop so we only instantiate the client
# if it doesn't already exist.
try:
    print(client.dashboard_link)
except:    
    # The client should be customized to your workstation resources.
    client = Client(n_workers=20)
    print(client.dashboard_link)


In [None]:
@dask.delayed
def extract_dask(search_points, all_points, final_shape):
    index = pairwise_distances_argmin(X=search_points,
                                      Y=all_points)
    i0, j0 = numpy.unravel_index(index, (final_shape))
    return(j0, i0)


@dask.delayed
def get_data_dask(i_locs, j_locs, year='2010', month='01', day='01'):
    ds = aorc1.load_aorc_dataset(year, month, day)
    precip = ds.isel(x=i_locs, y=j_locs).squeeze().RAINRATE
    
    
    with open(f'{year}{month}{day}.pkl', 'wb') as f:
        pickle.dump(precip.values, f)
    
    return datetime(int(year), int(month), int(day)),


def get_data_daskbag(args):
    i_locs = args[0]
    j_locs = args[1]
    dt = args[2]
    path = args[3]
    
    # get the date parts
    month = f'{dt.month:02}'    
    day = f'{dt.day:02}'
    year = f'{dt.year:04}'
    
    # if os.path.exists(f'{path}/{year}{month}{day}.csv'):
    #     return f'{path}/{year}{month}{day}.csv'

    ds = aorc1.load_aorc_dataset(year, month, day)
    
    precip = ds.isel(x=i_locs, y=j_locs) #.squeeze().RAINRATE
    precip = precip.RAINRATE.groupby('time.dayofyear').sum() * 24 * 3600
    pcp_df = precip.to_dataframe().reset_index()
    
    pcp_df['date'] = dt
    pcp_df = pcp_df[['lat', 'lon', 'RAINRATE', 'date']]
    pcp_df.lat = round(pcp_df.lat, 6)  # rounding so they match later 
    pcp_df.lon = round(pcp_df.lon, 6)  # rounding so they match later
    pcp_df.rename(columns={'RAINRATE': 'RAINRATE [mm]'}, inplace=True)
    pcp_df.set_index('date', inplace=True)
    pcp_df.to_csv(f'{path}/{year}{month}{day}.csv')
    
    return f'{path}/{year}{month}{day}.csv'

## Load AORC V1.0 from AWS

In [None]:
%%time

# get all of the lat/lon locations in AORC

ds = aorc1.load_aorc_dataset('2010', '01', '01')
all_pts = numpy.c_[ds['lon'].values.ravel(), ds['lat'].values.ravel()]
all_lats = ds['lat'].values
all_lons = ds['lon'].values
final_shape = ds['lon'].shape

In [None]:
training_path = "/home/jovyan/Snow-Extrapolation/data/RegionTrain_SCA.pkl"
with open(training_path, 'rb') as f:
    region_train = pickle.load(f)

pts = []
locs = []
for key in region_train.keys():
    region_train[key]['pt'] = list(zip(region_train[key].Long, region_train[key].Lat))
    locs.extend([*region_train[key].pt.unique()])

print(f'Number of training points = {len(locs)}')

## Collect AORC Precip

In [None]:
prediction_path = 'Predictions/Hold_Out_Year/Predictions/'
files = glob(f'{prediction_path}/*.pkl')

with open(files[0], 'rb') as f:
    dat = pickle.load(f)
    
pts = []
locs = []
for key in ['N_Sierras', 'S_Sierras_Low', 'S_Sierras_High']:
    dat[key]['pt'] = list(zip(dat[key].Long, dat[key].Lat))
    locs.extend([*dat[key].pt.unique()])

print(f'Number of testing points = {len(locs)}')

In [None]:
%%time

# batch index collection using dask
number_of_groups = 100
pt_groups = numpy.array_split(numpy.array(locs), number_of_groups)

print('scattering...', end='', flush=True)
all_pts_scattered = client.scatter(all_pts)
print('done')

futures = []
for grp in pt_groups:
    futures.append(extract_dask(grp, all_pts_scattered, final_shape)) 

print('finding aorc lats/lons...', end='', flush=True)
results = dask.compute(futures)
print('done')

In [None]:
# put the x,y coordinates for the matching cells into lists
i_locs = []
j_locs = []
for grp in results[0]:
    num_elements = len(grp[0])
    for idx in range(0, num_elements):
        i_locs.append(grp[0][idx])
        j_locs.append(grp[1][idx])

In [None]:
# create mapping between nsm lat/lon and aorc i,j 
data = []
for i in range(0, len(i_locs)):
    aorc_lat = round(all_lats[j_locs[i]][i_locs[i]], 6)
    aorc_lon = round(all_lons[j_locs[i]][i_locs[i]], 6)
    data.append([locs[i][0], locs[i][1], i_locs[i], j_locs[i], aorc_lon, aorc_lat])

headers = ['nsm_long', 'nsm_lat', 'aorc_i', 'aorc_j', 'aorc_lon', 'aorc_lat'] 

loc_map = pandas.DataFrame(columns=headers, data=data)

In [None]:
%%time


if not os.path.exists('data_preds'):
    os.mkdir('data_preds')
    
# batch variable collection
t = datetime(2018,9,21)
et = datetime(2019,6,30)

# create dataarrays for subsetting aorc pointwise
ind_x = xarray.DataArray(i_locs, dims=["pt"])
ind_y = xarray.DataArray(j_locs, dims=["pt"])

input_params = []
while t <= et:
    if not os.path.exists(f"data_preds/{t.strftime('%Y%m%d')}.csv"):
        input_params.append([ind_x, ind_y, t, 'data_preds'])
    t += timedelta(days=1)
    
b = db.from_sequence(input_params, npartitions=25)
b = b.map(get_data_daskbag)

In [None]:
%%time

results = b.compute()

In [None]:
# we don't need dask anymore, so release all memory consumed by it
client.cluster.close()

## Summarize Precipitation and Save to Prediction Files

In [None]:
# load aorc precip from CSVs into a dataframe
#df_precip = pandas.concat([pandas.read_csv(f) for f in results])

df_precip = pandas.concat([pandas.read_csv(f) for f in glob('data_preds/*.csv')])

df_precip.date = pandas.to_datetime(df_precip.date)
df_precip.set_index('date', inplace=True)

# compute weekly precip
df_precip = df_precip.groupby(['lat', 'lon', pandas.Grouper(freq='W-Tue')]).sum().reset_index()

# rename columns
df_precip.rename(columns={'RAINRATE [mm]':'aorc-precip-weekly-mm', 'lat':'aorc_lat', 'lon':'aorc_lon'}, inplace=True)

In [None]:
# adding aorc lat/lon so we can join into the training dataset
df_precip = pandas.merge(df_precip, loc_map, how='left',
             left_on=['aorc_lon', 'aorc_lat'],
             right_on=['aorc_lon', 'aorc_lat'])

In [None]:
# compute shifted precip
for idx, _ in df_precip.groupby(['nsm_long', 'nsm_lat']):
    d = df_precip.loc[(df_precip.nsm_long == idx[0]) & (df_precip.nsm_lat == idx[1])]['aorc-precip-weekly-mm'].shift(1)
    df_precip.loc[(df_precip.nsm_long == idx[0]) & (df_precip.nsm_lat == idx[1]), 'Prev_aorc-precip-weekly-mm'] = d 

## TODO reset accumulation every year
# # compute cumulative precip 
# for idx, _ in df_precip.groupby(['nsm_long', 'nsm_lat']):
#     d = df_precip.loc[(df_precip.nsm_long == idx[0]) & (df_precip.nsm_lat == idx[1])]['aorc-precip-weekly-mm'].cumsum()
#     df_precip.loc[(df_precip.nsm_long == idx[0]) & (df_precip.nsm_lat == idx[1]), 'cum-precip-mm'] = d 


In [None]:
df_precip

In [None]:
# save results to the prediction files

prediction_path = '/Predictions/Hold_Out_Year/Predictions/'
files = glob(f'{prediction_path}/*.pkl')

for file in files:
    date = datetime.strptime(file.split('_')[-1].split('.')[0], '%Y-%m-%d')
    with open(file, 'rb') as f:
        regions_df = pickle.load(f)
    
    df_precip_by_date = df_precip.loc[df_precip.date == date]
    
    for key in ['N_Sierras', 'S_Sierras_Low', 'S_Sierras_High']:

        region = regions_df[key]
        region.reset_index(inplace=True)
        region = pandas.merge(region, df_precip_by_date[['aorc-precip-weekly-mm', 'Prev_aorc-precip-weekly-mm', 'nsm_long', 'nsm_lat']],
                              how='left', left_on=['Long', 'Lat'],
                              right_on=['nsm_long', 'nsm_lat']).set_index('cell_id')
        region.drop(['nsm_long', 'nsm_lat'], inplace=True, axis=1)
        
        # save this back to the training dataset
        regions_df[key] = region
        
    with open(file, 'wb') as f:
        pickle.dump(regions_df, f)