In [1]:
import numpy as np
import pandas as pd
import xarray as xr
import xagg as xa
import shapely
import geopandas as gp
import rioxarray
import dask
from os.path import isfile
from dask.distributed import Queue

from utils import state_alphas

import warnings
warnings.simplefilter("ignore", RuntimeWarning) # Ignore invalid arcsin() in EDD calculation

dask.config.set({'array.slicing.split_large_chunks': False})

<dask.config.set at 0x2b68f66d8580>

## Dask (cluster)

In [33]:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=1, resource_spec='pmem=20GB', memory='20GB', walltime='00:10:00')

In [34]:
# print(cluster.job_script())

In [35]:
cluster.scale(jobs=25)  # ask for jobs

In [36]:
from dask.distributed import Client
client = Client(cluster)

In [37]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.102.201.239:36912,Workers: 0
Dashboard: /proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Function definitions

In [None]:
# Degree day function
def above_threshold_each(mins, maxs, threshold):
    """Use a sinusoidal approximation to estimate the number of Growing
    Degree-Days above a given threshold, using daily minimum and maximum
    temperatures.
    mins and maxs are numpy arrays; threshold is in the same units."""

    """
    Code from James Rising (https://github.com/jrising/research-common/blob/master/python/gdd.py)
    """

    # Determine crossing points, as a fraction of the day
    plus_over_2 = (mins + maxs)/2
    minus_over_2 = (maxs - mins)/2
    two_pi = 2*np.pi
    # d0s is the times of crossing above; d1s is when cross below
    d0s = np.arcsin((threshold - plus_over_2) / minus_over_2) / two_pi
    d1s = .5 - d0s

    # If always above or below threshold, set crossings accordingly
    aboves = mins >= threshold
    belows = maxs <= threshold

    d0s[aboves] = 0
    d1s[aboves] = 1
    d0s[belows] = 0
    d1s[belows] = 0

    # Calculate integral
    F1s = -minus_over_2 * np.cos(2*np.pi*d1s) / two_pi + plus_over_2 * d1s
    F0s = -minus_over_2 * np.cos(2*np.pi*d0s) / two_pi + plus_over_2 * d0s
    return F1s - F0s - threshold * (d1s - d0s)

# ufunc for dask
def edd_ufunc_annual(tasmin, tasmax, threshold):
    return xr.apply_ufunc(above_threshold_each,
                          tasmin, tasmax, threshold,
                          dask = 'allowed')

In [8]:
def calculate_predictors(ds_tmin, ds_tmax, ds_prcp, T_thresh, product_name, timeperiod):
    """
    Calculates local climate variables:
        tavg: average temperature
        GDD: growing degree days
        EDD: extreme degree days
        prcp: precipitation sum (mm)
    """
    # Calculate daily EDD
    EDD = edd_ufunc_annual(ds_tmin, ds_tmax, threshold = T_thresh[1])
    
    # Calculate daily GDD
    GDD = edd_ufunc_annual(ds_tmin, ds_tmax, threshold = T_thresh[0])
    GDD = GDD - EDD
    
    # Combine
    ds_out = xr.combine_by_coords([EDD.to_dataset(name='EDD'),
                              GDD.to_dataset(name='GDD')])
    
    # Annual degree days
    ds_out = ds_out.resample(day='Y').sum().compute()
    
    # Annual tavg
    ds_tavg = (ds_tmax + ds_tmin) / 2.
    ds_out['tavg'] = ds_tavg.resample(day='Y').mean().compute()

    # Precip annual total
    ds_out['prcp'] = ds_prcp.resample(day='Y').sum().compute()
    ds_out['prcp2'] = (ds_prcp**2).resample(day='Y').sum().compute()
    
    # Tidy
    ds_out.attrs['NOTE1'] = 'Degree Days calculated as in DOI: 10.1111/agec.12315 supplementary material with threshold 30C. Author: David Lafferty - University of Illinois (davidcl2@illinois.edu)'
    ds_out.attrs['NOTE2'] = 'For original netcdf files see: https://www.climatologylab.org/gridmet.html'
    
    # Save
    ds_out.to_netcdf('../input_data/' + product_name + '_weather_variables_' + timeperiod + '.nc')

## Prerequisite: tidy US shapefiles

In [8]:
# County shapefile
county_gdf = gp.read_file('../input_data/us_county_shp/cb_2018_us_county_20m.shp')
county_gdf.rename(columns={'GEOID':'fips', 'STATEFP':'state'}, inplace=True)
county_gdf = county_gdf[~county_gdf['state'].isin(['02', '15', '60', '66', '69', '72', '78'])] # remove non-contiguous

county_gdf[['fips', 'state', 'geometry']].to_file('../input_data/us_county_shp/us_county_shape.shp')

In [13]:
# State shapefile
state_gdf = gp.read_file('../input_data/us_state_shp/cb_2018_us_state_20m.shp')
state_gdf.rename(columns={'STATEFP':'state'}, inplace=True)
state_gdf = state_gdf[~state_gdf['state'].isin(['02', '15', '60', '66', '69', '72', '78'])] # remove non-contiguous

state_gdf[['state', 'geometry']].to_file('../input_data/us_state_shp/us_state_shape.shp')

# GMFD

## Calculate Midwest average temperature

In [7]:
# Midwest shapefile
gdf = gp.read_file('../input_data/us_state_shp/us_state_shape.shp')
gdf_mw = gdf[gdf['state'].isin(state_alphas)]

In [26]:
%%time
# Read GMFD
ds = xr.open_mfdataset('../input_data/gmfd_raw/*.nc', chunks='auto', parallel=True)
# ds = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/gmfd_tas/*.nc', chunks='auto', parallel=True)

# Subset to US
ds['lon'] = ds['lon'] - 360.
ds = ds.where((ds.lat > 20) & (ds.lat < 60) & (ds.lon < -60) & (ds.lon > -130), drop=True)

# Calculate average growing season temperature
ds = ds.sel(time=ds.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
ds = ds.resample(time='Y').mean().compute()

CPU times: user 15.3 s, sys: 578 ms, total: 15.9 s
Wall time: 52.2 s


In [30]:
# Clip to Midwest
ds = ds.rio.write_crs(gdf_mw.crs)
ds = ds.rio.set_spatial_dims(x_dim='lon', y_dim='lat')
ds_mw = ds.rio.clip(gdf_mw.unary_union)

  for index, item in enumerate(shapes):


In [44]:
# Get spatial average
ds_mw_mean = ds['tas'].mean(dim=['lat','lon'])

# Store
df_out = pd.DataFrame({'tavg_mw':ds_mw_mean.to_pandas()}).reset_index()
df_out['year'] = df_out['time'].dt.year
df_out['tavg_mw'] = df_out['tavg_mw'] - 273.15 # K -> C
df_out[['year', 'tavg_mw']].to_csv('../input_data/gmfd_midwest_tavg_1948-2016.csv', index=False)

# gridMET

## Calculate local variables at the grid point scale

In [9]:
yrs = np.arange(1979, 2021)
path = '../input_data/gridmet_raw/'

# path1 = '/gpfs/group/kzk10/default/public/UofI_MetData/raw/'
# path2 = '/gpfs/group/kaf26/default/dcl5300/gridMET_2019-2020/'

In [None]:
%%time
# Read in variables
ds_tmax = xr.open_mfdataset([path + 'tmmx_' + str(yr) + '.nc' for yr in yrs], chunks='auto', parallel=True)['air_temperature']
ds_tmin = xr.open_mfdataset([path + 'tmmn_' + str(yr) + '.nc' for yr in yrs], chunks='auto', parallel=True)['air_temperature']
ds_prcp = xr.open_mfdataset([path + 'pr_' + str(yr) + '.nc' for yr in yrs], chunks='auto', parallel=True)['precipitation_amount']

In [11]:
# %%time
# # Sort lat index and drop crs
# def preprocess(ds):
#     return ds.sortby('lat').drop_vars('crs')

# # Read in variables
# tmax_paths = [path1 + 'tmmx_' + str(yr) + '.nc' for yr in yrs[:-2]] + [path2 + 'tmmx_' + str(yr) + '.nc' for yr in yrs[-2:]]
# ds_tmax = xr.open_mfdataset(tmax_paths, preprocess=preprocess, chunks='auto', parallel=True)['air_temperature']

# tmin_paths = [path1 + 'tmmn_' + str(yr) + '.nc' for yr in yrs[:-2]] + [path2 + 'tmmn_' + str(yr) + '.nc' for yr in yrs[-2:]]
# ds_tmin = xr.open_mfdataset(tmin_paths, preprocess=preprocess, chunks='auto', parallel=True)['air_temperature']

# prcp_paths = [path1 + 'pr_' + str(yr) + '.nc' for yr in yrs[:-2]] + [path2 + 'pr_' + str(yr) + '.nc' for yr in yrs[-2:]]
# ds_prcp = xr.open_mfdataset(prcp_paths, preprocess=preprocess, chunks='auto', parallel=True)['precipitation_amount']

CPU times: user 7.86 s, sys: 734 ms, total: 8.59 s
Wall time: 1min 49s


In [12]:
# Maize/soy growing season: April - September
ds_tmax = ds_tmax.sel(day=ds_tmax.day.dt.month.isin([4, 5, 6, 7, 8, 9]))
ds_tmin = ds_tmin.sel(day=ds_tmin.day.dt.month.isin([4, 5, 6, 7, 8, 9]))
ds_prcp = ds_prcp.sel(day=ds_prcp.day.dt.month.isin([4, 5, 6, 7, 8, 9]))

In [13]:
%%time
# Calculate predictors
calculate_predictors(ds_tmin, ds_tmax, ds_prcp, [10.0 + 273.15, 30.0 + 273.15], 'gridmet', '1979-2020')

CPU times: user 4min 42s, sys: 15.8 s, total: 4min 58s
Wall time: 21min 53s


## Calculate Midwest average temperature

In [38]:
# Midwest shapefile
gdf = gp.read_file('../input_data/us_state_shp/us_state_shape.shp')
gdf_mw = gdf[gdf['state'].isin(state_alphas)]

In [39]:
yrs = np.arange(1979, 2021)
path = '../input_data/gridmet_raw/'

path1 = '/gpfs/group/kzk10/default/public/UofI_MetData/raw/'
path2 = '/gpfs/group/kaf26/default/dcl5300/gridMET_2019-2020/'

In [None]:
%%time
# Read in variables
ds_tmax = xr.open_mfdataset([path + 'tmmx_' + str(yr) + '.nc' for yr in yrs], chunks='auto', parallel=True)['air_temperature']
ds_tmin = xr.open_mfdataset([path + 'tmmn_' + str(yr) + '.nc' for yr in yrs], chunks='auto', parallel=True)['air_temperature']

In [40]:
%%time
def preprocess(ds):
    return ds.sortby('lat').drop_vars('crs')

# Read in variables
tmax_paths = [path1 + 'tmmx_' + str(yr) + '.nc' for yr in yrs[:-2]] + [path2 + 'tmmx_' + str(yr) + '.nc' for yr in yrs[-2:]]
ds_tmax = xr.open_mfdataset(tmax_paths, preprocess=preprocess, chunks='auto', parallel=True)['air_temperature']

tmin_paths = [path1 + 'tmmn_' + str(yr) + '.nc' for yr in yrs[:-2]] + [path2 + 'tmmn_' + str(yr) + '.nc' for yr in yrs[-2:]]
ds_tmin = xr.open_mfdataset(tmin_paths, preprocess=preprocess, chunks='auto', parallel=True)['air_temperature']

CPU times: user 8.62 s, sys: 731 ms, total: 9.35 s
Wall time: 1min 32s


In [46]:
# Get tas
ds = (ds_tmax + ds_tmin) / 2.

# Calculate average growing season temperature
ds = ds.sel(day=ds.day.dt.month.isin([4, 5, 6, 7, 8, 9]))
ds = ds.resample(day='Y').mean().compute()

In [48]:
# Clip to Midwest
ds = ds.rio.write_crs(gdf_mw.crs)
ds = ds.rio.set_spatial_dims(x_dim='lon', y_dim='lat')
ds_mw = ds.rio.clip(gdf_mw.unary_union)

  for index, item in enumerate(shapes):


In [49]:
# Get spatial average
ds_mw_mean = ds.mean(dim=['lat','lon'])

# Store
df_out = pd.DataFrame({'tavg_mw':ds_mw_mean.to_pandas()}).reset_index()
df_out['year'] = df_out['day'].dt.year
df_out['tavg_mw'] = df_out['tavg_mw'] - 273.15 # K -> C
df_out[['year', 'tavg_mw']].to_csv('../input_data/gridmet_midwest_tavg_1979-2020.csv', index=False)

## Aggregate to county level (no weighting)

In [7]:
# Get unique fips codes
maize_fips = pd.read_csv('../input_data/usda_maize_yields_1979-2020.csv')['fips'].astype(str).str.zfill(5).to_numpy()
soy_fips = pd.read_csv('../input_data/usda_soy_yields_1979-2020.csv')['fips'].astype(str).str.zfill(5).to_numpy()

gdf_fips = gp.read_file('/storage/home/dcl5300/work/lafferty-etal_inprep_tbd/workflow/model_construction/input_data/us_county_shp/us_county_shape.shp')['fips']

ufips = np.unique(np.append(maize_fips, soy_fips))
ufips = np.intersect1d(ufips, gdf_fips)

In [12]:
# Aggregation function for dask delayed
def func_agg(fips):
    """
    Aggregates local variables from gridmet to the county level without weighting
    """
    # Shapefile and county bounds
    # gdf = gp.read_file('../input_data/us_county_shp/us_county_shape.shp')
    gdf = gp.read_file('/storage/home/dcl5300/work/lafferty-etal_inprep_tbd/workflow/model_construction/input_data/us_county_shp/us_county_shape.shp')
    gdf = gdf.to_crs('WGS 84')
    gdf_fips = gdf[gdf['fips'] == fips]
    xmin, ymin, xmax, ymax = gdf_fips.geometry.total_bounds
    
    # Local variables
    # ds = xr.open_dataset('../input_data/gridmet_weather_variables_1979-2020.nc')
    ds = xr.open_dataset('/storage/home/dcl5300/work/lafferty-etal_inprep_tbd/workflow/model_construction/input_data/gridmet_weather_variables_1979-2020.nc')
    ds = ds.where((ds.lat >= ymin) & (ds.lat <= ymax) & (
            ds.lon >= xmin) & (ds.lon <= xmax), drop=True)
        
    # Aggregate
    weightmap = xa.pixel_overlaps(ds, gdf_fips)
    aggregated = xa.aggregate(ds, weightmap).to_dataframe()
    
    # Format
    vars_to_keep = ['EDD', 'GDD', 'tavg', 'prcp', 'prcp2', 'day']
    out = pd.DataFrame(aggregated.reset_index()[vars_to_keep])
    out['year'] = out['day'].dt.year
    out['fips'] = fips
    out['tavg'] = out['tavg'] - 273.15 # K -> C

    # Otherwise workers will store for each county
    del ds
    del gdf
    del gdf_fips
    del weightmap
    del aggregated
    
    return out[['fips', 'year', 'EDD', 'GDD', 'tavg', 'prcp', 'prcp2']]

In [9]:
%%time
# Parallelize
delayed_res = []
for fips in ufips:
    tmp_agg = dask.delayed(func_agg)(fips)
    delayed_res.append(tmp_agg)
    
# Run
res = dask.compute(*delayed_res)

# Store dataframe
df_res = pd.concat(res)
df_res.to_csv('../input_data/gridmet_county_weather_variables_1979-2020.csv', index=False)

CPU times: user 33.9 s, sys: 1.63 s, total: 35.5 s
Wall time: 3min 53s


## Aggregate to state level (weighted by irrigated acreage)

In [21]:
# Irrigated areas
maize_irr = pd.read_csv('../input_data/usda_maize_irrigated_acres_1997-2017.csv')
maize_irr['fips'] = maize_irr['fips'].astype(str).str.zfill(5)
maize_irr['state'] = maize_irr['state'].astype(str).str.zfill(2)

soy_irr = pd.read_csv('../input_data/usda_soy_irrigated_acres_1997-2017.csv')
soy_irr['fips'] = soy_irr['fips'].astype(str).str.zfill(5)
soy_irr['state'] = soy_irr['state'].astype(str).str.zfill(2)

In [22]:
##########################################################################################
# We assume the irrigated areas in 2012, 2017 are transferrable to 2013, 2018
# which is what we need for combining with the USDA Irrigation & Water Management Survey
##########################################################################################
maize_irr_shifted = maize_irr.copy()
maize_irr_shifted['year'] = maize_irr_shifted['year'] + 1
maize_irr_shifted = maize_irr_shifted[maize_irr_shifted['year'] >= 2013]

soy_irr_shifted = soy_irr.copy()
soy_irr_shifted['year'] = soy_irr_shifted['year'] + 1
soy_irr_shifted = soy_irr_shifted[soy_irr_shifted['year'] >= 2013]

In [24]:
# County local climate variables
gridmet_weather_vars = pd.read_csv('../input_data/gridmet_county_weather_variables_1979-2020.csv')
gridmet_weather_vars['fips'] = gridmet_weather_vars['fips'].astype(str).str.zfill(5)

In [27]:
# Merge
maize_irr_weather_vars = pd.merge(gridmet_weather_vars, maize_irr_shifted, on=['fips','year'])
soy_irr_weather_vars = pd.merge(gridmet_weather_vars, soy_irr_shifted, on=['fips','year'])

In [29]:
# Take weighted sum to state level 
def weighted_average(df, col):
    return pd.DataFrame({col : df.groupby(['state','year']).apply(lambda x: np.average(x[col], weights=x['irrigated_acreage']))})

Note that here we have averaged the weather variables to the county level and now are taking a weighted average of that to the state level, i.e. we are taking an average of averages. This will introduce some bias since the number of grid points in each county is unequal since counties are not of equal spatial size. Ideally we would account for this in the weighting for the second average but we choose to neglect since (1) the bias should be small since county sizes are typically the same order of magnitude and (2) the weighting we apply (irrigated acreage) is likely correlated with county size since larger counties are more likely to have larger irrigated acres.

In [30]:
# Maize
maize_irr_state = weighted_average(maize_irr_weather_vars, 'GDD')

for col in ['EDD', 'tavg', 'prcp', 'prcp2']:
    df_tmp = weighted_average(maize_irr_weather_vars, col)
    maize_irr_state = pd.merge(maize_irr_state, df_tmp, on=['state', 'year'])
    
# Soy
soy_irr_state = weighted_average(soy_irr_weather_vars, 'GDD')

for col in ['EDD', 'tavg', 'prcp', 'prcp2']:
    df_tmp = weighted_average(soy_irr_weather_vars, col)
    soy_irr_state = pd.merge(soy_irr_state, df_tmp, on=['state', 'year'])

In [32]:
# Store
maize_irr_state.to_csv('../input_data/gridmet_state_weather_variables_maize_irr_weighted.csv')
soy_irr_state.to_csv('../input_data/gridmet_state_weather_variables_soy_irr_weighted.csv')