## Cal-Adapt Analytics Engine Data processing script for DWR

This script downloads climate data for pr, tasmax, and tasmin, processes the data to a monthly time step, weights each subbasin of interest by area and by flow, and combines those results into an average monthly value for the entire watershed. The area- and flow-weighted results are then processed into a 30-year rolling average of monthly values for pr, tasmax, tasmin, and tasave.
The output format of processed climate data are listed below:
* Each subbasin is exported to CSV without the weights, with the file name format {source_id}_{experiment_id}_{member_id}_{basin_id}-19.csv, where basin_id is zero-padded to two digits.  
* The area- and flow-weighted values are exported to CSV files in separate directories with the file name format {source_id}_{experiment_id}_{member_id}_19{weight_type}Weighted.csv.
* The monthly rolling average results are exported to CSV files in separate directories based on the input weighting. The file name format is {source_id}_{member_id}_{experiment_id}_30yrAve.csv.
Note: Every historical realization (member_id) must have a matching realization in each SSP of the same GCM; otherwise the 30yrAve postprocessing will throw an error. SSP realizations without a matching historical run produce no 30yrAve output. 

In [1]:
import csv
from io import StringIO
import json
import os
import tempfile
import zipfile

import pandas as pd
import intake
from shapely.geometry import Point, Polygon
import geopandas as gpd
import numpy as np
import xarray as xr
import dask
import panel as pn
#from dask.distributed import progress
#from dask.distributed import Client
#from climakitae.cluster import Cluster

# Enable panel's notebook extension.
pn.extension()
# Called without a ``with`` block, this sets the option globally for the
# session: variable attributes such as "units" are kept through arithmetic
# and reductions in every cell below.
xr.set_options(keep_attrs=True)
#dask.config.set({"array.slicing.split_large_chunks": True})

<xarray.core.options.set_options at 0x7fcf53f6d940>

In [2]:
# VARIABLES
# Use these coordinates to clip around the watershed of interest.
# latitude = [34.775317,42.432494]
# longitude = [-123.097421,-117.980799]
bbox = {
    "maxy": 42.432494,
    "miny": 34.775317,
    "minx": -123.097421,
    "maxx": -117.980799,
}

# Run list / output zip pair to process. Keep exactly ONE pair uncommented:
# if several pairs are live, every assignment but the last is overwritten and
# only the final run list is silently processed.
#run_list_path = "data/GCM_Run_List_1-43.csv"
#file_zip = "GCM_1-43.zip"

#run_list_path = "data/GCM_Run_List_1-19.csv"
#file_zip = "GCM_1-19.zip"

#run_list_path = "data/GCM_Run_List_20-43.csv"
#file_zip = "GCM_20-43.zip"

#run_list_path = "data/GCM_Run_List_44-82.csv"
#file_zip = "GCM_44-82.zip"

#run_list_path = "data/GCM_Run_List_83-100.csv"
#file_zip = "GCM_83-100.zip"

run_list_path = "data/GCM_Run_List_Add_23-47.csv"
file_zip = "GCM_Add_23-47.zip"

#run_list_path = "data/GCM_Run_List_101-117.csv"
#file_zip = "GCM_101-117.zip"

#run_list_path = "data/GCM_Run_List_Test_Rolling.csv"
#file_zip = "GCM_Rolling_Test.zip"

basin_weights_csv = "data/Basin_Weights.csv"  # per-basin flow/area weights
esm_datastore = "https://cadcat.s3.amazonaws.com/cae-collection.json"  # intake-esm catalog
output_folder = "outputs"
mask_path = "mask/mask.npy"  # region mask attached on (lat, lon)
# Top-level directories inside the output zip.
dir_area_weighted = 'AREA_WEIGHTED_CENTRALVALLEY'
dir_flow_weighted = 'FLOW_WEIGHTED_CENTRALVALLEY'
dir_non_weighted = 'NON_WEIGHTED_CENTRALVALLEY'
dir_individual = 'INDIVIDUAL_BASIN_LOCA2'
dir_area_weighted_rolling = 'AREA_WEIGHTED_30_YEAR_ROLLING_AVE_CENTRALVALLEY'
dir_flow_weighted_rolling = 'FLOW_WEIGHTED_30_YEAR_ROLLING_AVE_CENTRALVALLEY'


This cell loads the Dask client for faster processing. If a client is already running, do not run this cell.

In [3]:
#Load a Dask cluster for faster computing.
#Note: this takes a while to start, but in the long run processing should be faster when compute is called.
#cluster = Cluster()
#cluster.adapt(minimum=0, maximum=16)
#cluster.adapt(minimum=0, maximum=30)
#client = cluster.get_client()

#Get client link.
#client

In [4]:
def get_region_dict(weights_csv=None):
    """Read the per-basin flow and area weights from a CSV file.

    Parameters
    ----------
    weights_csv : str or path-like, optional
        CSV with the columns ``ID``, ``Regions``, ``Flow Ratio`` and
        ``Area Ratio``. Defaults to the notebook-level ``basin_weights_csv``.

    Returns
    -------
    dict
        Maps the integer basin ``ID`` to a dict with keys ``region_name``
        (str), ``flow_ratio`` (float) and ``area_ratio`` (float). If an ID
        appears on more than one row, the last row wins.
    """
    if weights_csv is None:
        weights_csv = basin_weights_csv
    region_dict = {}
    # newline="" is what the csv module expects so that quoted fields
    # containing line breaks are parsed correctly.
    with open(weights_csv, "r", newline="") as csv_file:
        for row in csv.DictReader(csv_file):
            region_dict[int(row["ID"])] = {
                "region_name": row["Regions"],
                "flow_ratio": float(row["Flow Ratio"]),
                "area_ratio": float(row["Area Ratio"]),
            }
    return region_dict

In [5]:
# removed 99: {'region_name': 'Diversion from Echo Lake', 'weighting_factor': 0.0},
# DEPRECATED
# region_dict = {
#     0:  {'region_name': 'Goose Lake', 'weighting_factor': 0.0},
#     11: {'region_name': 'Westside Streams of SJR', 'weighting_factor': 0.002758980030193925},
#     16: {'region_name': 'Other Rim Inflows of Sac', 'weighting_factor': 0.06520559638738632},
#     9:  {'region_name': 'Eastside Streams of Delta', 'weighting_factor': 0.047054700553417206},
#     17: {'region_name': 'Other Rim Inflows of SJR', 'weighting_factor': 0.008974390104413033},
#     5:  {'region_name': 'Upper Stanislaus River', 'weighting_factor': 0.03673909977078438},
#     8:  {'region_name': 'Lake Millerton', 'weighting_factor': 0.05571430176496506},
#     10: {'region_name': 'Westside Streams of Sac', 'weighting_factor': 0.0789882019162178},
#     12: {'region_name': 'Valley Floor of Sac', 'weighting_factor': 0.06745839864015579},
#     18: {'region_name': 'Lower Yuba-Bear Rim Inflow', 'weighting_factor': 0.018660200759768486},
#     14: {'region_name': 'Tulare Basin', 'weighting_factor': 0.0},
#     1:  {'region_name': 'Lake Shasta', 'weighting_factor': 0.1778690069913864},
#     15: {'region_name': 'Lake Trinity', 'weighting_factor': 0.04051230102777481},
#     2:  {'region_name': 'Upper Feather River', 'weighting_factor': 0.13809999823570251},
#     13: {'region_name': 'Valley Floor of SJR', 'weighting_factor': 0.008356500416994095},
#     3:  {'region_name': 'Upper Yuba River', 'weighting_factor': 0.07005230337381363},
#     19: {'region_name': 'Delta', 'weighting_factor': 0.026663200929760933},
#     4:  {'region_name': 'Upper American River', 'weighting_factor': 0.08627369999885559},
#     6:  {'region_name': 'Upper Tuolumne River', 'weighting_factor': 0.05876690149307251},
#     7:  {'region_name': 'Upper Merced River', 'weighting_factor': 0.030512800440192223}
# }

In [6]:
def get_model_params(run_list_path):
    """Load the GCM run list.

    The CSV header row supplies the keys; each following row describes one
    model run (activity_id, source_id, experiment_id, member_id, ...).

    Returns
    -------
    list of dict
        One ``{column: value}`` dict per data row, in file order, with the
        values as read by ``csv.DictReader`` (strings).
    """
    with open(run_list_path, "r") as run_list_file:
        return list(csv.DictReader(run_list_file))

In [7]:
def get_dataset(esm_datastore, model_params):
    """Open one model run from the ESM catalog and return it at monthly steps.

    The catalog is searched for the ``pr``, ``tasmax`` and ``tasmin`` variables
    of the run described by ``model_params``. The matching dataset is sliced
    to ``model_params['start_year']``..``model_params['end_year']`` and then
    aggregated from daily to monthly values.

    Parameters
    ----------
    esm_datastore : str
        Location of the intake-esm catalog JSON.
    model_params : dict
        One row of the run list (see ``get_model_params``).

    Returns
    -------
    xarray.Dataset
    """
    catalog = intake.open_esm_datastore(esm_datastore)
    selection = catalog.search(
        activity_id=model_params["activity_id"],
        institution_id=model_params["institution_id"],
        table_id=model_params["table_id"],
        variable_id=["pr", "tasmax", "tasmin"],
        experiment_id=model_params["experiment_id"],
        grid_label=model_params["grid_label"],
        member_id=model_params["member_id"],
        source_id=model_params["source_id"],
    )
    datasets = selection.to_dataset_dict(storage_options={"anon": True})

    # intake-esm keys the returned dict as
    # activity_id.institution_id.source_id.experiment_id.table_id.grid_label
    key_fields = (
        "activity_id",
        "institution_id",
        "source_id",
        "experiment_id",
        "table_id",
        "grid_label",
    )
    dataset_key = ".".join("{}".format(model_params[field]) for field in key_fields)

    in_window = slice_by_time_years_dataset(
        datasets[dataset_key], model_params["start_year"], model_params["end_year"]
    )
    return convert_daily_to_monthly_dataset(in_window)

In [8]:
def add_mask_to_dataset(mask_path, ds):
    """Attach the array stored in ``mask_path`` to ``ds`` as the ``mask`` coordinate.

    The array is loaded from a NumPy ``.npy`` file and assigned over the
    ``('lat', 'lon')`` dimensions. ``ds`` is modified in place and also
    returned. Presumably the mask holds integer region IDs that
    ``get_df_map_mask`` compares against basin IDs. TODO: confirm.
    """
    # allow_pickle=True permits object arrays; only load trusted mask files.
    with open(mask_path, "rb") as mask_file:
        region_mask = np.load(mask_file, allow_pickle=True)
    ds.coords["mask"] = (("lat", "lon"), region_mask)
    return ds

In [9]:
def convert_daily_to_monthly_dataset(ds):
    """Aggregate daily ``pr``/``tasmin``/``tasmax`` to monthly values.

    * ``pr`` is converted from kg/m2/s to mm/day (86400 x kg/m2/s) and then
      summed over each month, giving mm/mon.
    * ``tasmin`` and ``tasmax`` are averaged over each month and converted
      from K to degC (C = K - 273.15).

    The input dataset is left unmodified.

    Returns
    -------
    xarray.Dataset
        ``pr`` (mm/mon), ``tasmin`` and ``tasmax`` (degC) on a monthly
        ``time`` axis.
    """
    # Build a new DataArray rather than assigning back into ``ds``: writing
    # ``ds['pr']`` in place rescales the caller's dataset, so converting the
    # same dataset twice would multiply precipitation by 86400 twice.
    pr_daily = ds["pr"] * 86400
    pr_daily.attrs["units"] = "mm/day"
    # NOTE(review): newer pandas/xarray may warn that the "M" alias is
    # deprecated in favour of "ME"; confirm against the installed versions.
    ds_precip = pr_daily.resample(time="M").sum()
    ds_precip.attrs["units"] = "mm/mon"

    ds_temp = ds[["tasmin", "tasmax"]].resample(time="M").mean() - 273.15
    ds_temp.tasmin.attrs["units"] = "degC"
    ds_temp.tasmax.attrs["units"] = "degC"

    # Merge precipitation and temperature back into one dataset.
    return xr.merge([ds_precip, ds_temp])

In [10]:
def slice_by_time_years_dataset(ds, startyear, endyear):
    """Return ``ds`` restricted to the years ``startyear`` through ``endyear``.

    Both bounds are passed to ``ds.sel`` as strings, i.e. a label-based
    (partial-date) slice on ``time``.
    """
    year_window = slice(str(startyear), str(endyear))
    return ds.sel(time=year_window)

In [11]:
def trim_dataset_to_bbox(ds, bbox):
    """Clip ``ds`` to a lon/lat bounding box using the ``.rio`` accessor.

    ``bbox`` must provide ``minx``, ``miny``, ``maxx`` and ``maxy``. Before
    clipping, ``lon``/``lat`` are registered as the spatial dims and the CRS
    is set to EPSG:4326, both in place on ``ds``. The clipped dataset is
    returned.
    """
    # NOTE(review): rioxarray is never imported in this file; the ``.rio``
    # accessor only exists if rioxarray has been imported somewhere.
    ds.rio.set_spatial_dims(x_dim="lon", y_dim="lat", inplace=True)
    ds.rio.write_crs("EPSG:4326", inplace=True)

    # Subset the data to the watershed box.
    corners = {side: bbox[side] for side in ("minx", "miny", "maxx", "maxy")}
    return ds.rio.clip_box(**corners)

In [12]:
def get_output_file_name_monthly(model_params, end_part):
    """Return ``{source_id}_{experiment_id}_{member_id}_{end_part}.csv``."""
    source = model_params['source_id']
    experiment = model_params['experiment_id']
    member = model_params['member_id']
    return f"{source!s}_{experiment!s}_{member!s}_{end_part!s}.csv"

In [13]:
def load_dataset_with_mask(esm_datastore_in, model_params_in, mask_path, bbox):
    """Load one model run, attach the region mask and clip it to ``bbox``.

    Parameters
    ----------
    esm_datastore_in : str
        Location of the intake-esm catalog.
    model_params_in : dict
        One run-list row describing the model run to load.
    mask_path : str
        ``.npy`` file with the region mask laid out on (lat, lon).
    bbox : dict
        ``minx``/``miny``/``maxx``/``maxy`` bounds used for clipping.

    Returns
    -------
    xarray.Dataset
        Monthly dataset carrying a ``mask`` coordinate, clipped to ``bbox``.
    """
    # Use the arguments, not same-named notebook globals; reading the
    # globals only works when a caller happens to have bound them.
    ds = get_dataset(esm_datastore_in, model_params_in)
    ds = add_mask_to_dataset(mask_path, ds)
    return trim_dataset_to_bbox(ds, bbox)
    

In [14]:
def get_df_map_mask(id_region, ds, use_full_mask=False):
    """Spatially average ``pr``/``tasmin``/``tasmax`` over one masked region.

    Parameters
    ----------
    id_region : int
        Value of ``ds.mask`` selecting the region's grid cells. With
        ``use_full_mask`` the selection is inverted (every cell whose mask
        differs from ``id_region``). Passing ``id_region=-1`` then selects
        the whole domain, assuming -1 never occurs in the mask.
        NOTE(review): confirm that assumption.
    ds : xarray.Dataset
        Monthly dataset with a ``mask`` coordinate on (lat, lon).
    use_full_mask : bool, optional
        Invert the mask selection as described above.

    Returns
    -------
    pandas.DataFrame
        One row per time step with columns ``time``, ``Year`` (str),
        ``Month`` (int), ``Pr (mm)``, ``Tasmax (degC)``, ``Tasmin (degC)``
        and ``Tave (degC)``, where Tave is the mean of Tasmax and Tasmin.
    """
    if use_full_mask:
        map_data = ds.where(ds.mask != id_region)
    else:
        map_data = ds.where(ds.mask == id_region)

    # Cells outside the selection become NaN under ``where`` and are skipped.
    results_precip = map_data.pr.mean(['lat', 'lon'], skipna=True)
    results_precip.attrs["units"] = 'mm/mon'

    results_tasmin = map_data.tasmin.mean(['lat', 'lon'], skipna=True)
    results_tasmin.attrs["units"] = 'degC'

    results_tasmax = map_data.tasmax.mean(['lat', 'lon'], skipna=True)
    results_tasmax.attrs["units"] = 'degC'

    df = xr.merge([results_precip, results_tasmax, results_tasmin]).to_pandas()
    # ``spatial_ref`` is presumably the CRS coordinate added by rio.write_crs
    # in trim_dataset_to_bbox; it is not a data column, so drop it if present.
    df = df.drop(columns='spatial_ref', errors='ignore')

    df['Year'] = df.index.strftime('%Y')
    df['Month'] = df.index.month
    df['Tave (degC)'] = df[['tasmax', 'tasmin']].mean(axis=1)
    df = df.rename(
        columns={'pr': 'Pr (mm)', 'tasmax': 'Tasmax (degC)', 'tasmin': 'Tasmin (degC)'}
    )

    # Select by name rather than by position so the layout does not depend on
    # the column order xr.merge/to_pandas happen to produce.
    ordered = df[['Year', 'Month', 'Pr (mm)', 'Tasmax (degC)', 'Tasmin (degC)', 'Tave (degC)']]
    return ordered.reset_index()
    

In [15]:
def get_weighted_dataframe(df_in, weight):
    """Scale the climate value columns of ``df_in`` by ``weight``.

    ``Pr (mm)``, ``Tasmax (degC)``, ``Tasmin (degC)`` and ``Tave (degC)``
    are multiplied by ``weight``; all other columns (e.g. ``time``, ``Year``,
    ``Month``) are left as they are.

    ``df_in`` is modified in place and also returned, so pass a copy if the
    unweighted values are still needed.
    """
    # Use the ``weight`` argument. Reading a global ``weighting_factor``
    # ignores the parameter and fails outside the notebook loop.
    for column in ('Pr (mm)', 'Tasmax (degC)', 'Tasmin (degC)', 'Tave (degC)'):
        df_in[column] = df_in[column] * weight
    return df_in
    

In [16]:
def get_sum_dataframes(df_in, df_to_add):
    """Add the weighted value columns of ``df_to_add`` into ``df_in``.

    Each of ``Pr (mm)``, ``Tasmax (degC)``, ``Tasmin (degC)`` and
    ``Tave (degC)`` is summed row by row, aligned on the index. A value
    missing on only one side is added to 0 (``fill_value=0``). ``df_in`` is
    updated in place and returned; other columns are untouched.
    """
    for column in ('Pr (mm)', 'Tasmax (degC)', 'Tasmin (degC)', 'Tave (degC)'):
        df_in[column] = df_in[column].add(df_to_add[column], fill_value=0)
    return df_in

In [17]:
# Value columns averaged by the rolling-window computation.
_ROLLING_VALUE_COLUMNS = ['Pr (mm)', 'Tasmax (degC)', 'Tasmin (degC)', 'Tave (degC)']


def _split_runs_by_experiment(dict_df_weighted_all):
    """Nest runs as [source_id][experiment_id][member_id] -> DataFrame; return (historical, other)."""
    historical = {}
    others = {}
    for file_name, frame in dict_df_weighted_all.items():
        # File names look like {source_id}_{experiment_id}_{member_id}_...
        parts = file_name.split('_')
        bucket = historical if 'historical' in file_name else others
        bucket.setdefault(parts[0], {}).setdefault(parts[1], {})[parts[2]] = frame
    return historical, others


def _rolling_monthly_means(df_all, average_over_years, start_year, end_year):
    """Per-month means over each window [y, y + average_over_years) for y in range(start_year, end_year)."""
    if start_year >= end_year:
        raise ValueError(
            "start_year (%s) must be less than end_year (%s)" % (start_year, end_year)
        )
    years = df_all['Year'].astype(int)  # convert once, not once per window
    frames = []
    for current_year in range(start_year, end_year):
        window_end = current_year + average_over_years
        window = df_all[(years >= current_year) & (years < window_end)]
        monthly = (
            window.groupby(window.Month, as_index=False, sort=True)[_ROLLING_VALUE_COLUMNS]
            .mean()
            .reset_index()
        )
        # The column label says "30y" for output compatibility, whatever the window length.
        monthly.insert(loc=0, column="Year (30y start)", value=current_year)
        monthly.insert(loc=0, column="Year Range", value='%s-%s' % (current_year, window_end - 1))
        frames.append(monthly)
    # One concat instead of re-concatenating the growing result every window;
    # the helper 'index' column added by reset_index() is dropped.
    return pd.concat(frames, axis=0).drop('index', axis=1)


def get_monthly_rolling_ave(dict_df_weighted_all, average_over_years, append_name,
                            start_year=1950, end_year=2072):
    """Compute rolling multi-year monthly means for every GCM / SSP / member.

    Each historical run is concatenated with the SSP run(s) of the same GCM
    and member. For every window start year ``y`` in
    ``range(start_year, end_year)``, the values of years
    ``y .. y + average_over_years - 1`` are averaged per calendar month.

    Parameters
    ----------
    dict_df_weighted_all : dict
        ``{file_name: DataFrame}``. File names must look like
        ``{source_id}_{experiment_id}_{member_id}_...`` (split on ``_``).
        Runs whose name contains ``historical`` are treated as the
        historical period. Frames need ``time``, ``Year``, ``Month`` and the
        four value columns.
    average_over_years : int
        Window length in years.
    append_name : str
        Suffix for the output file names.
    start_year, end_year : int, optional
        First window start year (inclusive) and last (exclusive). With the
        defaults and 30-year windows, the last window is 2071-2100.

    Returns
    -------
    dict
        ``{"{source_id}_{member_id}_{experiment_id}_{append_name}.csv": DataFrame}``
        with columns ``Year Range``, ``Year (30y start)``, ``Month`` and the
        value columns, one row per month per window. SSP runs without a
        matching historical run produce no entry.

    Raises
    ------
    KeyError
        If a historical run's GCM has no SSP runs, or an SSP of that GCM
        lacks the historical run's member.
    ValueError
        If ``start_year >= end_year`` and there is at least one run pair.
    """
    historical, others = _split_runs_by_experiment(dict_df_weighted_all)
    dict_fil_rolling_df = {}
    for key_gcm, hist_experiments in historical.items():
        if key_gcm not in others:
            raise KeyError("No SSP runs found for GCM %r" % key_gcm)
        ssp_experiments = others[key_gcm]
        for hist_members in hist_experiments.values():
            for key_member, df_history in hist_members.items():
                for key_ssp, ssp_members in ssp_experiments.items():
                    if key_member not in ssp_members:
                        raise KeyError(
                            "%s %s has no %s run matching the historical run"
                            % (key_gcm, key_ssp, key_member)
                        )
                    df_all = pd.concat(
                        [df_history, ssp_members[key_member]], axis=0
                    ).drop('time', axis=1)
                    file_out = '%s_%s_%s_%s.csv' % (key_gcm, key_member, key_ssp, append_name)
                    dict_fil_rolling_df[file_out] = _rolling_monthly_means(
                        df_all, average_over_years, start_year, end_year
                    )
    return dict_fil_rolling_df
    

The loop goes through all GCMs and stores the individual-subbasin, area-weighted, and flow-weighted results in dictionaries, with the file name as the key and the resulting dataframe as the value. 

In [None]:
# For every run in the run list: load the monthly dataset, compute each
# sub-basin's spatial mean, and accumulate area- and flow-weighted totals
# across basins. Results go into dicts keyed by output file name.
# NOTE(review): the weighted totals are averages only if each ratio column
# in the basin weights CSV sums to 1 across regions. TODO: confirm.
# ``model_params`` and ``weighting_factor`` below are module-level names in
# this notebook cell.
region_dict = get_region_dict()
# main loop
all_model_params = get_model_params(run_list_path)

# Define output dicts (file name -> DataFrame).
results_dict = {}  # per-basin monthly values, without the time column
flow_weighted_results_dict = {}  # flow-weighted sum over all basins, per run
area_weighted_basin_results_dict = {}  # area-weighted sum over all basins, per run
for model_params in all_model_params:
    # Load the run, attach the region mask and clip it to bbox.
    ds = load_dataset_with_mask(esm_datastore, model_params, mask_path,bbox)
   
    # Same dotted key format intake-esm uses; only used for the progress message.
    key = "{}.{}.{}.{}.{}.{}".format(
        model_params['activity_id'],
        model_params['institution_id'],
        model_params['source_id'],
        model_params['experiment_id'],
        model_params['table_id'],
        model_params['grid_label'],)
    #Force load the dataset.
    print('Loading: %s'%key)
    ds = ds.compute()
        
    df_w = None  # running flow-weighted sum for this run
    df_a = None  # running area-weighted sum for this run
    df_nw = None  # NOTE(review): never used below
    for id_region, v in region_dict.items():     
        # Get this region's results
        df_n = get_df_map_mask(id_region,ds)
        # Zero-padded basin ID, e.g. "<source>_<experiment>_<member>_01-19.csv"
        output_filename = get_output_file_name_monthly(model_params, '%s-19'%'{:02d}'.format(id_region))

        df_out = df_n.drop('time' , axis=1)
        results_dict[output_filename] = df_out
                
        # Get Area Weighted dataframe (weighted on a copy so df_n stays unweighted)
        weighting_factor = v['area_ratio']
        df_weighted_a = get_weighted_dataframe(df_n.copy(deep=True),weighting_factor)
        if df_a is None:
            df_a = df_weighted_a.copy(deep=True)
        else:
            df_a = get_sum_dataframes(df_a,df_weighted_a)
            
        # Get Flow Weighted results 
        weighting_factor = v['flow_ratio']
        df_weighted = get_weighted_dataframe(df_n.copy(deep=True),weighting_factor)
        if df_w is None:
            df_w = df_weighted.copy(deep=True)
        else:
            df_w = get_sum_dataframes(df_w,df_weighted)
    print('Processed %s...'%get_output_file_name_monthly(model_params,'').replace('.csv',''))       
    #Add weighted dataframes to output (these still include the time column).
    output_filename = get_output_file_name_monthly(model_params, "19FlowWeighted")
    flow_weighted_results_dict[output_filename] = df_w
    output_filename = get_output_file_name_monthly(model_params, "19AreaWeighted")
    area_weighted_basin_results_dict[output_filename] = df_a


--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


Loading: LOCA2.UCSD.HadGEM3-GC31-LL.historical.day.d03


  finalize=lambda sum_, count: sum_ / count,


Processed HadGEM3-GC31-LL_historical_r2i1p1f3_...

--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


Loading: LOCA2.UCSD.HadGEM3-GC31-LL.historical.day.d03


  finalize=lambda sum_, count: sum_ / count,


Processed HadGEM3-GC31-LL_historical_r3i1p1f3_...

--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


Loading: LOCA2.UCSD.HadGEM3-GC31-LL.ssp585.day.d03


  finalize=lambda sum_, count: sum_ / count,


Processed HadGEM3-GC31-LL_ssp585_r2i1p1f3_...

--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


Loading: LOCA2.UCSD.HadGEM3-GC31-LL.ssp585.day.d03


  finalize=lambda sum_, count: sum_ / count,


Processed HadGEM3-GC31-LL_ssp585_r3i1p1f3_...

--> The keys in the returned dictionary of datasets are constructed as follows:
	'activity_id.institution_id.source_id.experiment_id.table_id.grid_label'


Loading: LOCA2.UCSD.INM-CM5-0.historical.day.d03


  finalize=lambda sum_, count: sum_ / count,


In [None]:
# Write to output: per-basin, weighted, and 30-year rolling-average results
# all go into a single zip archive under ``output_folder``.
zip_path = os.path.join(output_folder, file_zip)
# ZipFile(mode="w") does not create missing parent directories.
os.makedirs(output_folder, exist_ok=True)

dict_rolling_flow_weighted = get_monthly_rolling_ave(flow_weighted_results_dict,30,'30yrAve')
dict_rolling_area_weighted = get_monthly_rolling_ave(area_weighted_basin_results_dict,30,'30yrAve')


def _write_csv_to_zip(zf, arcname, df):
    """Serialize ``df`` as CSV (without the index) into ``zf`` at ``arcname``."""
    text_stream = StringIO()
    df.to_csv(text_stream, index=False)
    zf.writestr(arcname, text_stream.getvalue())


with zipfile.ZipFile(zip_path, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
    for k, v in results_dict.items():
        _write_csv_to_zip(zf, dir_individual + '/' + k, v)
    # Weighted results: a Raw copy that keeps the ``time`` column plus the
    # main copy without it. drop() without inplace returns a new frame, so
    # the in-memory results keep ``time`` and this cell can be re-run (the
    # rolling averages above need that column).
    for out_dir, weighted in ((dir_flow_weighted, flow_weighted_results_dict),
                              (dir_area_weighted, area_weighted_basin_results_dict)):
        for k, v in weighted.items():
            _write_csv_to_zip(zf, out_dir + '/Raw/' + k, v)
            _write_csv_to_zip(zf, out_dir + '/' + k, v.drop('time', axis=1))
    for out_dir, rolling in ((dir_area_weighted_rolling, dict_rolling_area_weighted),
                             (dir_flow_weighted_rolling, dict_rolling_flow_weighted)):
        for k, v in rolling.items():
            _write_csv_to_zip(zf, out_dir + '/' + k, v)
    

In [None]:
#client.close()