In [1]:
import numpy as np
import pandas as pd
import xarray as xr
import xagg as xa
import geopandas as gp
import rioxarray
import dask
from os.path import isfile
from dask.distributed import Queue

import pyproj
pyproj.datadir.set_data_dir('/storage/home/d/dcl5300/work/ENVS/cropswitching/share/proj')

import warnings
warnings.simplefilter("ignore", RuntimeWarning) # Ignore invalid arcsin() in EDD calculation

dask.config.set({'array.slicing.split_large_chunks': False})

<dask.config.set at 0x2b64db7909d0>

## Dask (cluster)

In [2]:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores=1, resource_spec='pmem=20GB', memory='20GB', walltime='12:30:00')

In [3]:
print(cluster.job_script())

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -A kaf26_c_g_sc_default
#PBS -l pmem=20GB
#PBS -l walltime=12:30:00
#PBS -e /gpfs/scratch/dcl5300/
#PBS -o /gpfs/scratch/dcl5300/

/storage/work/dcl5300/ENVS/cropswitching/bin/python -m distributed.cli.dask_worker tcp://10.102.201.209:46858 --nthreads 1 --memory-limit 18.63GiB --name dummy-name --nanny --death-timeout 60 --local-directory /gpfs/scratch/dcl5300 --protocol tcp://



In [4]:
cluster.scale(jobs=40)  # ask for jobs

In [5]:
from dask.distributed import Client
client = Client(cluster)

In [6]:
client

0,1
Connection method: Cluster object,Cluster type: PBSCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.102.201.209:46858,Workers: 0
Dashboard: /proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


## Function definitions

In [7]:
# Get unique fips codes
def get_unique_fips():
    yield_path = '../data/historical/yields/'

    ufips = np.array([])
    for crop in ['maize', 'cotton', 'soy', 'rice', 'sorghum', 'barley', 'spring_wheat', 'winter_wheat']:
        tmp = pd.read_csv(yield_path + crop + '_all.csv')
        tmp['fips'] = tmp['fips'].astype(str).str.zfill(5)
        ufips = np.append(ufips, tmp['fips'].unique())
    
    ufips = np.unique(ufips)
    
    return ufips

In [8]:
# Degree day function
def above_threshold_each(mins, maxs, threshold):
    """Use a sinusoidal approximation to estimate the number of Growing
    Degree-Days above a given threshold, using daily minimum and maximum
    temperatures.
    mins and maxs are numpy arrays; threshold is in the same units."""

    """
    Code from James Rising (https://github.com/jrising/research-common/blob/master/python/gdd.py)
    """

    # Determine crossing points, as a fraction of the day
    plus_over_2 = (mins + maxs)/2
    minus_over_2 = (maxs - mins)/2
    two_pi = 2*np.pi
    # d0s is the times of crossing above; d1s is when cross below
    d0s = np.arcsin((threshold - plus_over_2) / minus_over_2) / two_pi
    d1s = .5 - d0s

    # If always above or below threshold, set crossings accordingly
    aboves = mins >= threshold
    belows = maxs <= threshold

    d0s[aboves] = 0
    d1s[aboves] = 1
    d0s[belows] = 0
    d1s[belows] = 0

    # Calculate integral
    F1s = -minus_over_2 * np.cos(2*np.pi*d1s) / two_pi + plus_over_2 * d1s
    F0s = -minus_over_2 * np.cos(2*np.pi*d0s) / two_pi + plus_over_2 * d0s
    return F1s - F0s - threshold * (d1s - d0s)

# ufunc for dask
def edd_ufunc_annual(tasmin, tasmax, threshold):
    return xr.apply_ufunc(above_threshold_each,
                          tasmin, tasmax, threshold,
                          dask = 'allowed')

In [9]:
# Calculate predictors: EDD conditioned on SM difference from mean (Haqiqi 21)
def get_predictors_EDD_SM_H21(dsTmin, dsTmax, dsP, dsSM, model, T_thresh, crop):
    # DataArray
    dsSM = dsSM['soilMoist1']
    
    # Soil moisture difference
    sm_mean_current = dsSM.chunk(dict(time=-1, lat=20, lon=20)).mean(dim='time').compute()
    sm_diff_current = (dsSM - sm_mean_current)
    
    sm_mean_hist = xr.open_dataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/Livneh/quantiles/Livneh_VIC_sm_mean_4-9.nc')
    sm_diff_hist = (dsSM - sm_mean_hist['soilMoist1'])
    
    # Calculate daily EDD
    EDD = edd_ufunc_annual(dsTmin['tasmin'], dsTmax['tasmax'], threshold = T_thresh[1])
    
    # Calculate daily GDD
    GDD = edd_ufunc_annual(dsTmin['tasmin'], dsTmax['tasmax'], threshold = T_thresh[0])
    GDD = GDD - EDD
    
    # Combine
    DD_SM = xr.combine_by_coords([EDD.to_dataset(name='EDD'),
                              GDD.to_dataset(name='GDD'),
                              dsP['pr'].to_dataset(name = 'prcp')])
    
    # Annual degree days without SM
    DD_SM = DD_SM.resample(time='Y').sum().compute()
    
    # EDD/SM variables based on historical threshold
    DD_SM['EDD_SM_75_below_HIST'] = EDD.where(sm_diff_hist < -7.5).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_25_75_below_HIST'] = EDD.where((sm_diff_hist > -7.5) & (sm_diff_hist <= -2.5)).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_0_25_norm_HIST'] = EDD.where((sm_diff_hist > -2.5) & (sm_diff_hist < 2.5)).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_25_75_above_HIST'] = EDD.where((sm_diff_hist >= 2.5) & (sm_diff_hist < 7.5)).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_75_above_HIST'] = EDD.where(sm_diff_hist >= 7.5).resample(time='Y').sum().compute()
    
    # EDD/SM variables based on current threshold
    DD_SM['EDD_SM_75_below_CUR'] = EDD.where(sm_diff_current < -7.5).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_25_75_below_CUR'] = EDD.where((sm_diff_current > -7.5) & (sm_diff_current <= -2.5)).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_0_25_norm_CUR'] = EDD.where((sm_diff_current > -2.5) & (sm_diff_current < 2.5)).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_25_75_above_CUR'] = EDD.where((sm_diff_current >= 2.5) & (sm_diff_current < 7.5)).resample(time='Y').sum().compute()
    DD_SM['EDD_SM_75_above_CUR'] = EDD.where(sm_diff_current >= 7.5).resample(time='Y').sum().compute()
    
    # Standard SM metrics
    DD_SM['SM_mean'] = dsSM.resample(time='Y').mean().compute()
    DD_SM['SM_mean2'] = DD_SM['SM_mean']**2
    # SM weekly max/min
    DD_SM['SM_week_max'] = dsSM.resample(time='W').mean().resample(time='Y').max().compute()
    DD_SM['SM_week_min'] = dsSM.resample(time='W').mean().resample(time='Y').min().compute()
    
    # Get squares
    DD_SM['prcp2'] = DD_SM['prcp']**2
    
    # Tidy
    DD_SM.attrs['NOTE1'] = 'Degree Days calculated as in DOI: 10.1111/agec.12315 supplementary material with threshold 29C. Author: David Lafferty - University of Illinois (davidcl2@illinois.edu). Date: Nov 2021'
    DD_SM.attrs['NOTE2'] = 'For original netcdf files see: http://loca.ucsd.edu/'
    DD_SM['SM_mean'].attrs['units'] = 'mm'
    DD_SM['SM_week_max'].attrs['units'] = 'mm'
    DD_SM['SM_week_min'].attrs['units'] = 'mm'
    DD_SM['prcp'].attrs['units'] = 'mm'
    
    # Save 
    DD_SM.to_netcdf('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model + 
                    '/' + model + '_VIC_' + crop + '_H21_RCP45_30-50.nc')

In [10]:
# # Calculate predictors: EDD and SM monthly values
# def get_predictors_EDD_SM_intra(dsT, dsSM, model, T_thresh, crop):
#     # DataArray
#     dsSM = dsSM['soilMoist1']
    
#     # Calculate daily EDD
#     EDD = edd_ufunc_annual(dsT['Tmin'], dsT['Tmax'], threshold = T_thresh[1])
    
#     # Calculate daily GDD
#     GDD = edd_ufunc_annual(dsT['Tmin'], dsT['Tmax'], threshold = T_thresh[0])
#     GDD = GDD - EDD
    
#     # Combine
#     DD_SM = xr.combine_by_coords([EDD.to_dataset(name='EDD'),
#                               GDD.to_dataset(name='GDD'),
#                               dsT['Prec'].to_dataset(name = 'prcp')])
    
#     # Monthly degree days
#     DD_SM = DD_SM.resample(time='M').sum().compute()
    
#     # Monthly SM
#     DD_SM['SM_mean'] = dsSM.resample(time='M').mean().compute()
    
#     # Weekly SM
#     DD_SM['SM_week_max'] = dsSM.resample(time='W').mean().resample(time='M').max().compute()
#     DD_SM['SM_week_min'] = dsSM.resample(time='W').mean().resample(time='M').min().compute()
    
#     # Get squares
#     DD_SM['SM_mean2'] = DD_SM['SM_mean']**2
#     DD_SM['prcp2'] = DD_SM['prcp']**2
    
#     # Tidy
#     DD_SM.attrs['NOTE1'] = 'Degree Days calculated as in DOI: 10.1111/agec.12315 supplementary material with threshold 29C. Author: David Lafferty - University of Illinois (davidcl2@illinois.edu). Date: Aug 2021'
#     DD_SM.attrs['NOTE2'] = 'For original netcdf files see: http://loca.ucsd.edu/'
#     DD_SM['SM_mean'].attrs['units'] = 'mm'
#     DD_SM['SM_week_max'].attrs['units'] = 'mm'
#     DD_SM['SM_week_min'].attrs['units'] = 'mm'
#     DD_SM['prcp'].attrs['units'] = 'mm'
    
#     # Save 
#     DD_SM.to_netcdf('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model + 
#                     '/' + model + '_VIC_' + crop + '_intra_predictors.nc')

In [11]:
# Aggregation function for dask delayed
def func_agg(fips, model, predictors, crop, rcp, timeframe):
    weights_path = '/gpfs/group/kaf26/default/dcl5300/cropland_data_layer/county_weights/'
    
    try:
        # Shapefile and county bounds
        gdf = gp.read_file('/storage/work/dcl5300/bcsd_maize_scripts/tools/plotting_tools/counties_contig_plot.shp')
        gdf = gdf.to_crs('WGS 84')
        gdf_fips = gdf.query('fips == "' + fips + '"')
        xmin, ymin, xmax, ymax = gdf_fips.geometry.total_bounds
        
        # Predictor vars
        path = '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/'
        ds = xr.open_dataset(path + model + '/' + model + '_VIC_' + crop + '_' + predictors + '_' + 
                             rcp + '_' + timeframe + '.nc')
        ds['lon'] = ds['lon'] - 360.
        ds = ds.where((ds.lat >= ymin) & (ds.lat <= ymax) & (ds.lon >= xmin) & (ds.lon <= xmax), drop=True)
        
        # Read in weights
        ds_weights = xr.open_dataset(weights_path + fips + '_08-20.nc')
        ds_weights = ds_weights.coarsen(x=10, y=10, boundary='pad').mean()

        # Aggregate
        weightmap = xa.pixel_overlaps(ds, gdf_fips, weights = ds_weights.cropland_avg)
        aggregated = xa.aggregate(ds, weightmap).to_dataset().to_dataframe().reset_index().drop(columns = ['pix_idx'])
        
        # For a few counties the weighting results in all values being zero
        if len(aggregated.query('GDD == 0.0')) > 0.0:
            weightmap = xa.pixel_overlaps(ds, gdf_fips)
            aggregated = xa.aggregate(ds, weightmap).to_dataset().to_dataframe().reset_index().drop(columns = ['pix_idx'])    
        
        # Otherwise workers will store for each county 
        del ds
        del ds_weights
        del gdf
        del gdf_fips
        del weightmap
        
        return aggregated
    
    except Exception as e:
        print("{} / {} Failed : {}".format(model, fips, str(e)))

In [12]:
def get_models():
    models_SM = !ls /gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models/
    models_T = !ls /gpfs/group/kzk10/default/public/LOCA/
    models = [model for model in models_SM if model in models_T]
    models.remove('download.pbs')
    return models

## Maize

In [16]:
# # Get historical mean
# dsSM = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/Livneh/raw/*.nc', chunks='auto', parallel=True)
# dsSM = dsSM.rename({'Time':'time', 'Lat':'lat', 'Lon':'lon'})
# dsSM = dsSM.sel(time=dsSM.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
# dsSM['lon'] = dsSM['lon'] + 360.

# dsSM_mean = dsSM['soilMoist1'].mean(dim='time').compute()

# dsSM_mean.to_netcdf('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/Livneh/quantiles/Livneh_VIC_sm_mean_4-9.nc')

In [13]:
%%time
# Haqiqi, RCP 4.5, 2030-2050
models = get_models()

for model in models:
    # check if model is already done
    model_dir = '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model
    fname = model_dir + '/' + model + '_VIC_maize_H21_RCP45_30-50.nc'
    if isfile(fname): 
        continue
    # else
    !mkdir $model_dir
    
    # Temperature & Precip
    which_r = '/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/'
    which_r = !ls $which_r
    dsTmin = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmin/*.nc', chunks='auto', parallel=True)
    dsTmax = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmax/*.nc', chunks='auto', parallel=True)
    dsP = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/pr/*.nc', chunks='auto', parallel=True)
    
    # Otherwise does not align with SM
    dsTmin['time'] = dsTmin.time.dt.floor('D')
    dsTmax['time'] = dsTmax.time.dt.floor('D')
    dsP['time'] = dsP.time.dt.floor('D')
    
    # Soil moisture
    dsSM = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models/' + model + '/vic_output.rcp45.netcdf/*.nc', chunks='auto', parallel=True)
    # Need same grid and coords
    dsSM = dsSM.rename({'Time':'time', 'Lat':'lat', 'Lon':'lon'})
    dsSM['lon'] = dsSM['lon'] + 360.
    
    # Future period
    dsSM = dsSM.sel(time=dsSM.time.dt.year.isin(np.arange(2030,2051)))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.year.isin(np.arange(2030,2051)))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.year.isin(np.arange(2030,2051)))
    dsP = dsP.sel(time=dsP.time.dt.year.isin(np.arange(2030,2051)))

    # Corn growing season (following Ortiz-Bobea ERL)
    dsSM = dsSM.sel(time=dsSM.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsP = dsP.sel(time=dsP.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    
    # Calculate predictors
    get_predictors_EDD_SM_H21(dsTmin, dsTmax, dsP, dsSM, model, [10.0 + 273.0, 29.0 + 273.0], 'maize')

mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/GFDL-ESM2M': File exists
CPU times: user 39min 27s, sys: 1min 50s, total: 41min 17s
Wall time: 2h 15min 21s


In [31]:
# %%time
# # Get soil moisture quantiles over entire historical time period (1950-2013)
# # get_SM_quantiles(dsSM, 'Livneh', ['10', '30', '70', '90'], '4-9')

# # Calculate predictors (EDD conditioned on quantiles)
# get_predictors_EDD_SM_Q(dsT, dsSM, 'Livneh', [10.0, 29.0], 'maize', '4-9') # bounds from Schenker-Roberts 2009 (I checked lower bound on Powerpoint)

In [16]:
# %%time
# # Calculate predictors (intra-seasonal monthly)
# get_predictors_EDD_SM_intra(dsT, dsSM, 'Livneh', [10.0, 29.0], 'maize')

### Aggregate to county level

In [12]:
# Get unique fips codes
ufips = get_unique_fips()

In [13]:
# Get models
models = get_models()

In [14]:
%%time
# Parallelize
for model in models:
    # check if already done
    fname = '../data/future/predictors/maize/' + model + '_maize_H21_RCP45_30-50.csv'
    if isfile(fname): 
        continue
    # else
    delayed_res = []
    for fips in ufips:
        tmp_agg = dask.delayed(func_agg)(fips, model, 'H21', 'maize', 'RCP45', '30-50')
        delayed_res.append(tmp_agg)
    
    # Run
    res = dask.compute(*delayed_res)

    # To dataframe
    df_res = pd.concat(res)
    df_res['year'] = df_res['time'].dt.year
    df_res.drop(columns = 'time', inplace=True)

    df_res.to_csv('../data/future/predictors/maize/' + model + '_maize_H21_RCP45_30-50.csv', index=False)

CPU times: user 56min 26s, sys: 1min 34s, total: 58min 1s
Wall time: 2h 33min 47s


In [37]:
# %%time
# # Parallelize
# delayed_res = []
# for fips in ufips:
#     tmp_agg = dask.delayed(func_agg)(fips, 'Livneh', 'intra_predictors', 'maize')
#     delayed_res.append(tmp_agg)
    
# # Run
# res = dask.compute(*delayed_res)

CPU times: user 2min 25s, sys: 5.36 s, total: 2min 30s
Wall time: 15min 23s


In [49]:
# # Monthly predictors need some organization:
# # To dataframe
# df_res = pd.concat(res)
# df_res['year'] = df_res['time'].dt.year
# df_res['month'] = df_res['time'].dt.month
# df_res.drop(columns = 'time', inplace=True)

# # Subset growing season
# months = [4,5,6,7,8,9]
# df_res_season = df_res[df_res.month.isin(months)]

# # Predictors
# predictor_vars = df_res_season.columns[1:-2]

# # Final product
# df_final = df_res_season[['fips' ,'year']].copy()

# # Loop through each month, predictor
# for month in months:
#     # Temporary df with single month
#     temp = df_res_season.query('month == ' + str(month)).copy()
    
#     # Rename each predictor to include monthly suffix
#     for var in predictor_vars:
#         temp.rename(columns = {var : var + '_' + str(month)}, inplace=True)
        
#     # Merge
#     df_final = pd.merge(df_final, temp.drop(columns = 'month'), how='inner', on = ['fips', 'year'])
    
# # There are many duplicate rows, not sure why
# df_final.drop_duplicates(inplace=True)
# # Save
# df_final.to_csv('../data/processed/predictors/Livneh/Livneh_maize_intra_predictors.csv', index=False)

## Soy

In [15]:
%%time
# Haqiqi, RCP 4.5, 2030-2050
models = get_models()

for model in models:
    # check if model is already done
    model_dir = '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model
    fname = model_dir + '/' + model + '_VIC_soy_H21_RCP45_30-50.nc'
    if isfile(fname): 
        continue
    # else
    # Temperature & Precip
    which_r = '/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/'
    which_r = !ls $which_r
    dsTmin = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmin/*.nc', chunks='auto', parallel=True)
    dsTmax = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmax/*.nc', chunks='auto', parallel=True)
    dsP = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/pr/*.nc', chunks='auto', parallel=True)
    
    # Otherwise does not align with SM
    dsTmin['time'] = dsTmin.time.dt.floor('D')
    dsTmax['time'] = dsTmax.time.dt.floor('D')
    dsP['time'] = dsP.time.dt.floor('D')
    
    # Soil moisture
    dsSM = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models/' + model + '/vic_output.rcp45.netcdf/*.nc', chunks='auto', parallel=True)
    # Need same grid and coords
    dsSM = dsSM.rename({'Time':'time', 'Lat':'lat', 'Lon':'lon'})
    dsSM['lon'] = dsSM['lon'] + 360.
    
    # Future period
    dsSM = dsSM.sel(time=dsSM.time.dt.year.isin(np.arange(2030,2051)))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.year.isin(np.arange(2030,2051)))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.year.isin(np.arange(2030,2051)))
    dsP = dsP.sel(time=dsP.time.dt.year.isin(np.arange(2030,2051)))

    # Corn growing season (following Ortiz-Bobea ERL)
    dsSM = dsSM.sel(time=dsSM.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsP = dsP.sel(time=dsP.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    
    # Calculate predictors
    get_predictors_EDD_SM_H21(dsTmin, dsTmax, dsP, dsSM, model, [10.0 + 273.0, 30.0 + 273.0], 'soy')

mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/ACCESS1-0': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/ACCESS1-3': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/CCSM4': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/CESM1-BGC': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/CESM1-CAM5': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/CMCC-CM': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/CMCC-CMS': File exists
mkdir: cannot create directory '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_p

### Aggregate to county level

In [16]:
# Get unique fips codes
ufips = get_unique_fips()

In [17]:
# Get models
models = get_models()

In [18]:
%%time
# Parallelize
for model in models:
    # check if already done
    fname = '../data/future/predictors/soy/' + model + '_soy_H21_RCP45_30-50.csv'
    if isfile(fname): 
        continue
    # else
    delayed_res = []
    for fips in ufips:
        tmp_agg = dask.delayed(func_agg)(fips, model, 'H21', 'soy', 'RCP45', '30-50')
        delayed_res.append(tmp_agg)
    
    # Run
    res = dask.compute(*delayed_res)

    # To dataframe
    df_res = pd.concat(res)
    df_res['year'] = df_res['time'].dt.year
    df_res.drop(columns = 'time', inplace=True)

    df_res.to_csv('../data/future/predictors/soy/' + model + '_soy_H21_RCP45_30-50.csv', index=False)

CPU times: user 46min 45s, sys: 1min 17s, total: 48min 3s
Wall time: 2h 26min 29s


In [22]:
# %%time
# # Parallelize
# delayed_res = []
# for fips in ufips:
#     tmp_agg = dask.delayed(func_agg)(fips, 'Livneh', 'intra_predictors' ,'soy')
#     delayed_res.append(tmp_agg)
    
# # Run
# res = dask.compute(*delayed_res)

# # Monthly predictors need some organization:

# # To dataframe
# df_res = pd.concat(res)
# df_res['year'] = df_res['time'].dt.year
# df_res['month'] = df_res['time'].dt.month
# df_res.drop(columns = 'time', inplace=True)

# # Subset growing season
# months = [4,5,6,7,8,9]
# df_res_season = df_res[df_res.month.isin(months)]

# # Predictors
# predictor_vars = df_res_season.columns[1:-2]

# # Final product
# df_final = df_res_season[['fips' ,'year']].copy()

# # Loop through each month, predictor
# for month in months:
#     # Temporary df with single month
#     temp = df_res_season.query('month == ' + str(month)).copy()
    
#     # Rename each predictor to include monthly suffix
#     for var in predictor_vars:
#         temp.rename(columns = {var : var + '_' + str(month)}, inplace=True)
        
#     # Merge
#     df_final = pd.merge(df_final, temp.drop(columns = 'month'), how='inner', on = ['fips', 'year'])
    
# # There are many duplicate rows, not sure why
# df_final.drop_duplicates(inplace=True)
# # Save
# df_final.to_csv('../data/processed/predictors/Livneh/Livneh_soy_intra_predictors.csv', index=False)

CPU times: user 2min 49s, sys: 7.15 s, total: 2min 56s
Wall time: 16min 4s


## Sorghum

In [None]:
##### Same as soy ########

# Growing season: Ortiz-Bobea ERL
# Bounds: https://iopscience.iop.org/article/10.1088/1748-9326/5/1/014010

## Cotton

In [17]:
%%time
# Haqiqi, RCP 4.5, 2030-2050
models = get_models()

for model in models:
    # check if model is already done
    model_dir = '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model
    fname = model_dir + '/' + model + '_VIC_cotton_H21_RCP45_30-50.nc'
    if isfile(fname): 
        continue
    # else
    print(model)
    # Temperature & Precip
    which_r = '/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/'
    which_r = !ls $which_r
    dsTmin = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmin/*.nc', chunks='auto', parallel=True)
    dsTmax = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmax/*.nc', chunks='auto', parallel=True)
    dsP = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/pr/*.nc', chunks='auto', parallel=True)
    
    # Otherwise does not align with SM
    dsTmin['time'] = dsTmin.time.dt.floor('D')
    dsTmax['time'] = dsTmax.time.dt.floor('D')
    dsP['time'] = dsP.time.dt.floor('D')
    
    # Soil moisture
    dsSM = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models/' + model + '/vic_output.rcp45.netcdf/*.nc', chunks='auto', parallel=True)
    # Need same grid and coords
    dsSM = dsSM.rename({'Time':'time', 'Lat':'lat', 'Lon':'lon'})
    dsSM['lon'] = dsSM['lon'] + 360.
    
    # Future period
    dsSM = dsSM.sel(time=dsSM.time.dt.year.isin(np.arange(2030,2051)))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.year.isin(np.arange(2030,2051)))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.year.isin(np.arange(2030,2051)))
    dsP = dsP.sel(time=dsP.time.dt.year.isin(np.arange(2030,2051)))

    # Corn growing season (following Ortiz-Bobea ERL)
    dsSM = dsSM.sel(time=dsSM.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    dsP = dsP.sel(time=dsP.time.dt.month.isin([4, 5, 6, 7, 8, 9]))
    
    # Calculate predictors
    get_predictors_EDD_SM_H21(dsTmin, dsTmax, dsP, dsSM, model, [15.0 + 273.0, 32.0 + 273.0], 'cotton')

MRI-CGCM3
NorESM1-M
inmcm4
CPU times: user 10min 56s, sys: 21.4 s, total: 11min 18s
Wall time: 22min 46s


### Aggregate to county level

In [14]:
# Get unique fips codes
ufips = get_unique_fips()

In [15]:
# Get models
models = get_models()

In [21]:
%%time
# Parallelize
for model in models:
    # check if already done
    fname = '../data/future/predictors/cotton/' + model + '_cotton_H21_RCP45_30-50.csv'
    if isfile(fname): 
        continue
    # else
    print(model)
    delayed_res = []
    for fips in ufips:
        tmp_agg = dask.delayed(func_agg)(fips, model, 'H21', 'cotton', 'RCP45', '30-50')
        delayed_res.append(tmp_agg)
    
    # Run
    res = dask.compute(*delayed_res)

    # To dataframe
    df_res = pd.concat(res)
    df_res['year'] = df_res['time'].dt.year
    df_res.drop(columns = 'time', inplace=True)

    df_res.to_csv('../data/future/predictors/cotton/' + model + '_cotton_H21_RCP45_30-50.csv', index=False)

MRI-CGCM3
NorESM1-M
inmcm4
CPU times: user 5min 47s, sys: 7.32 s, total: 5min 54s
Wall time: 15min 45s


In [None]:
# Bio2: Mean 'diurnal' range
bio2 = (ds_monthly['Tmax'] - ds_monthly['Tmin']).mean(dim='time')

In [None]:
# Bio4: Temperature seasonality (SD)
bio4 = (ds_monthly['Tmax'] + ds_monthly['Tmin']) / 2.
bio4 = bio4.resample(time='Y').std().mean(dim='time')

In [None]:
# Bio12: Annual (total) precipitation
bio12 = ds['Prec'].resample(time='Y').sum().mean(dim='time').compute() / 100.

In [None]:
# Bio7: Annual temperature range
bio7 = ds_monthly['Tmax'].resample(time='Y').max() - ds_monthly['Tmin'].resample(time='Y').min()
bio7 = bio7.mean(dim='time')

In [None]:
# Bio3: Isothermality (bio2 / bio7)
bio3 = 100 * bio2 / bio7

In [None]:
# Bio15: Precip CV
bio15 = ds_monthly['Prec'].resample(time='Y').mean() / ds_monthly['Prec'].resample(time='Y').std()
bio15 = bio15.mean(dim='time')

In [290]:
# Save
bio_all = xr.merge([bio1.rename('bio_1'),
                       bio2.rename('bio_2'),
                       bio3.rename('bio_3'),
                       bio4.rename('bio_4'),
                       bio7.rename('bio_7'),
                       bio12.rename('bio_12'),
                       bio15.rename('bio_15')])

bio_all.attrs['NOTE'] = 'https://www.worldclim.org/data/bioclim.html'
bio_all.to_netcdf('/gpfs/group/kaf26/default/dcl5300/bioclim_all.nc')

## Barley

In [13]:
%%time
# Haqiqi, RCP 4.5, 2030-2050
models = get_models()

for model in models:
    # check if model is already done
    model_dir = '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model
    fname = model_dir + '/' + model + '_VIC_barley_H21_RCP45_30-50.nc'
    if isfile(fname): 
        continue
    # else
    print(model)
    # Temperature & Precip
    which_r = '/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/'
    which_r = !ls $which_r
    dsTmin = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmin/*.nc', chunks='auto', parallel=True)
    dsTmax = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmax/*.nc', chunks='auto', parallel=True)
    dsP = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/pr/*.nc', chunks='auto', parallel=True)
    
    # Otherwise does not align with SM
    dsTmin['time'] = dsTmin.time.dt.floor('D')
    dsTmax['time'] = dsTmax.time.dt.floor('D')
    dsP['time'] = dsP.time.dt.floor('D')
    
    # Soil moisture
    dsSM = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models/' + model + '/vic_output.rcp45.netcdf/*.nc', chunks='auto', parallel=True)
    # Need same grid and coords
    dsSM = dsSM.rename({'Time':'time', 'Lat':'lat', 'Lon':'lon'})
    dsSM['lon'] = dsSM['lon'] + 360.
    
    # Future period
    dsSM = dsSM.sel(time=dsSM.time.dt.year.isin(np.arange(2030,2051)))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.year.isin(np.arange(2030,2051)))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.year.isin(np.arange(2030,2051)))
    dsP = dsP.sel(time=dsP.time.dt.year.isin(np.arange(2030,2051)))

    # Growing season (following USDA NASS)
    dsSM = dsSM.sel(time=dsSM.time.dt.month.isin([4, 5, 6, 7, 8]))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.month.isin([4, 5, 6, 7, 8]))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.month.isin([4, 5, 6, 7, 8]))
    dsP = dsP.sel(time=dsP.time.dt.month.isin([4, 5, 6, 7, 8]))
    
    # Calculate predictors
    get_predictors_EDD_SM_H21(dsTmin, dsTmax, dsP, dsSM, model, [0.0 + 273.0, 15.0 + 273.0], 'barley')

ACCESS1-0
ACCESS1-3
CCSM4
CESM1-BGC
CESM1-CAM5
CMCC-CM
CMCC-CMS
CNRM-CM5
CSIRO-Mk3-6-0
CanESM2
EC-EARTH
FGOALS-g2
GFDL-CM3
GFDL-ESM2G
GFDL-ESM2M
GISS-E2-H
GISS-E2-R
HadGEM2-AO
HadGEM2-CC
HadGEM2-ES
IPSL-CM5A-LR
IPSL-CM5A-MR
MIROC-ESM
MIROC-ESM-CHEM
MIROC5
MPI-ESM-LR
MPI-ESM-MR
MRI-CGCM3
NorESM1-M
inmcm4
CPU times: user 50min 19s, sys: 2min 49s, total: 53min 8s
Wall time: 3h 37min 6s


### Aggregate to county level

In [14]:
# Get unique fips codes
ufips = get_unique_fips()

In [15]:
# Get models
models = get_models()

In [16]:
%%time
# Parallelize
for model in models:
    # check if already done
    fname = '../data/future/predictors/barley/' + model + '_barley_H21_RCP45_30-50.csv'
    if isfile(fname): 
        continue
    # else
    print(model)
    delayed_res = []
    for fips in ufips:
        tmp_agg = dask.delayed(func_agg)(fips, model, 'H21', 'barley', 'RCP45', '30-50')
        delayed_res.append(tmp_agg)
    
    # Run
    res = dask.compute(*delayed_res)

    # To dataframe
    df_res = pd.concat(res)
    df_res['year'] = df_res['time'].dt.year
    df_res.drop(columns = 'time', inplace=True)

    df_res.to_csv('../data/future/predictors/barley/' + model + '_barley_H21_RCP45_30-50.csv', index=False)

ACCESS1-0
ACCESS1-3
CCSM4
CESM1-BGC
CESM1-CAM5
CMCC-CM
CMCC-CMS
CNRM-CM5
CSIRO-Mk3-6-0
CanESM2
EC-EARTH
FGOALS-g2
GFDL-CM3
GFDL-ESM2G
GFDL-ESM2M
GISS-E2-H
GISS-E2-R
HadGEM2-AO
HadGEM2-CC
HadGEM2-ES
IPSL-CM5A-LR
IPSL-CM5A-MR
MIROC-ESM
MIROC-ESM-CHEM
MIROC5
MPI-ESM-LR
MPI-ESM-MR
MRI-CGCM3
NorESM1-M
inmcm4
CPU times: user 25min 30s, sys: 1min 2s, total: 26min 33s
Wall time: 2h 33min 41s


## Spring wheat

In [17]:
%%time
# Haqiqi, RCP 4.5, 2030-2050
models = get_models()

for model in models:
    # check if model is already done
    model_dir = '/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models_predictors/' + model
    fname = model_dir + '/' + model + '_VIC_springwheat_H21_RCP45_30-50.nc'
    if isfile(fname): 
        continue
    # else
    print(model)
    # Temperature & Precip
    which_r = '/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/'
    which_r = !ls $which_r
    dsTmin = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmin/*.nc', chunks='auto', parallel=True)
    dsTmax = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/tasmax/*.nc', chunks='auto', parallel=True)
    dsP = xr.open_mfdataset('/gpfs/group/kzk10/default/public/LOCA/' + model + '/16th/rcp45/' + which_r[0] + '/pr/*.nc', chunks='auto', parallel=True)
    
    # Otherwise does not align with SM
    dsTmin['time'] = dsTmin.time.dt.floor('D')
    dsTmax['time'] = dsTmax.time.dt.floor('D')
    dsP['time'] = dsP.time.dt.floor('D')
    
    # Soil moisture
    dsSM = xr.open_mfdataset('/gpfs/group/kaf26/default/dcl5300/LOCA_VIC_soilMoisture/models/' + model + '/vic_output.rcp45.netcdf/*.nc', chunks='auto', parallel=True)
    # Need same grid and coords
    dsSM = dsSM.rename({'Time':'time', 'Lat':'lat', 'Lon':'lon'})
    dsSM['lon'] = dsSM['lon'] + 360.
    
    # Future period
    dsSM = dsSM.sel(time=dsSM.time.dt.year.isin(np.arange(2030,2051)))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.year.isin(np.arange(2030,2051)))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.year.isin(np.arange(2030,2051)))
    dsP = dsP.sel(time=dsP.time.dt.year.isin(np.arange(2030,2051)))

    # Corn growing season (following Ortiz-Bobea ERL)
    dsSM = dsSM.sel(time=dsSM.time.dt.month.isin([4, 5, 6, 7, 8]))
    dsTmin = dsTmin.sel(time=dsTmin.time.dt.month.isin([4, 5, 6, 7, 8]))
    dsTmax = dsTmax.sel(time=dsTmax.time.dt.month.isin([4, 5, 6, 7, 8]))
    dsP = dsP.sel(time=dsP.time.dt.month.isin([4, 5, 6, 7, 8]))
    
    # Calculate predictors
    get_predictors_EDD_SM_H21(dsTmin, dsTmax, dsP, dsSM, model, [5.0 + 273.0, 26.0 + 273.0], 'springwheat')

ACCESS1-0
ACCESS1-3
CCSM4
CESM1-BGC
CESM1-CAM5
CMCC-CM
CMCC-CMS
CNRM-CM5
CSIRO-Mk3-6-0
CanESM2
EC-EARTH
FGOALS-g2
GFDL-CM3
GFDL-ESM2G
GFDL-ESM2M
GISS-E2-H
GISS-E2-R
HadGEM2-AO
HadGEM2-CC
HadGEM2-ES
IPSL-CM5A-LR
IPSL-CM5A-MR
MIROC-ESM
MIROC-ESM-CHEM
MIROC5
MPI-ESM-LR
MPI-ESM-MR
MRI-CGCM3
NorESM1-M
inmcm4
CPU times: user 48min 59s, sys: 3min 13s, total: 52min 13s
Wall time: 3h 19min 59s


### Aggregate to county level

In [18]:
# Get unique fips codes
ufips = get_unique_fips()

In [19]:
# Get models
models = get_models()

In [20]:
%%time
# Parallelize
for model in models:
    # check if already done
    fname = '../data/future/predictors/springwheat/' + model + '_springwheat_H21_RCP45_30-50.csv'
    if isfile(fname): 
        continue
    # else
    print(model)
    delayed_res = []
    for fips in ufips:
        tmp_agg = dask.delayed(func_agg)(fips, model, 'H21', 'springwheat', 'RCP45', '30-50')
        delayed_res.append(tmp_agg)
    
    # Run
    res = dask.compute(*delayed_res)

    # To dataframe
    df_res = pd.concat(res)
    df_res['year'] = df_res['time'].dt.year
    df_res.drop(columns = 'time', inplace=True)

    df_res.to_csv('../data/future/predictors/cotton/' + model + '_springwheat_H21_RCP45_30-50.csv', index=False)

ACCESS1-0
ACCESS1-3
CCSM4
CESM1-BGC
CESM1-CAM5
CMCC-CM
CMCC-CMS
CNRM-CM5
CSIRO-Mk3-6-0
CanESM2
EC-EARTH
FGOALS-g2
GFDL-CM3
GFDL-ESM2G
GFDL-ESM2M
GISS-E2-H
GISS-E2-R
HadGEM2-AO
HadGEM2-CC
HadGEM2-ES
IPSL-CM5A-LR
IPSL-CM5A-MR
MIROC-ESM
MIROC-ESM-CHEM
MIROC5
MPI-ESM-LR
MPI-ESM-MR
MRI-CGCM3
NorESM1-M
inmcm4
CPU times: user 25min 27s, sys: 1min 4s, total: 26min 32s
Wall time: 2h 29min 5s
