### Notebook to identify grid level drought events
#### Baseline = 1980 to 2016

In [3]:
%who

Interactive namespace is empty.


In [4]:
! squeue -u ad9701

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             56613       vdi sys/dash   ad9701  R    8:18:01      1 ood-vn25


In [5]:
# set up a cluster
from dask.distributed import Client,Scheduler
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=4,memory="31GB",walltime='04:00:00') #'01:30:00')
client = Client(cluster)
cluster.scale(cores=4)
client

  from distributed.utils import tmpfile


0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: /proxy/8787/status,

0,1
Dashboard: /proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.0.128.153:46495,Workers: 0
Dashboard: /proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [6]:
%%time

import xarray as xr
import numpy as np
import pandas as pd
import glob

projdir = '/g/data/w97/ad9701/drought_2017to2020/drought_breakProb/awra/'
out_dir = projdir + 'sm_droughts/'
prefix = "events"
perc_drought_start = 0.1     # this is the thershold percentile to identify dry soil moisture conditions
perc_drought_end = 0.3

awra_dir = '/g/data/fj8/BoM/AWRA/DATA/SCHEDULED-V6/processed/values/day/'
file_names = 'sm_[1-2]*.nc'

# the Tinderbox drought region
lat_slice = slice(-20, -44)
lon_slice = slice(135, 154)

# startyr_list = list(range(1911, 2022, 1))  # performing calculations for each year using the full dataset to avoid memory problems
startyr_list = list(range(1981, 2022, 1))

# identifying drought based on an array created for two thresholds
# lowFlag for start of drought (10th perc) & highFlag for the end of drought (30th perc)
def identify_drought(x, prevDrght = None, lowFlag = -100, highFlag = 100, btwFlag = 0):
    '''x is a numpy array. 
    -100 (lowFlag) corresponds to values less than 10th perc, 
    +100 (highFlag) corresponds to greater than 30th perc, 
    0 (btwFlag) is between 10 & 30 where drought depends on the days leading upto the specific day
    function returns a 0 & 1 array, where 1 corresponds to drought
    prevDrght indicates whether the last day from the previous time sclice was in drought or not (0 or 1)
    ''' 
    y = np.zeros(len(x), dtype = int)
    droughtInd = np.where(x == lowFlag)       # days at which the grid is in drought (sm < drought threshold)
    y[droughtInd] = 1
    if (prevDrght == 1) & (x[0] != highFlag): # if the previous day is in drought & first day from this time slice did not exceed highFlag
        y[0] = 1
    btwInd = np.where(x == btwFlag)[0]        # days at which the grid may be in drought depending on the days leading upto that day

    if len(btwInd) > 0:                       # if such days exist
        for i in btwInd:
            if i != 0:                        # skip the first day
                if y[i-1] == 1:               # if the previous day was in drought the btwDay would also be in drought
                    y[i] = 1
    return(y)

# # Testing the function
# x = np.array([0, 0, 100, -100, -100, 0, 0, 0, 100, 0])    
# y = identify_drought(x, prevDrght = 0)
# print(x)
# print(y)

def create_filepath(ds, prefix='filename', root_path="."):
    """
    Generate a filepath when given an xarray dataset or dataarray
    """
    start = ds.time[0].dt.strftime("%Y-%m").data
    end = ds.time[-1].dt.strftime("%Y-%m").data
    filepath = f'{root_path}/{prefix}_{start}_{end}.nc'
    return filepath

# Unable to hold the full dataset in memory and identify droughts. So looping over 1-year time chunks and identifying drought for each chunk
for i in range(0, len(startyr_list)):
    startyr_sel = startyr_list[i]
    time_slice = slice(str(startyr_sel) + '-01-01', str(startyr_sel) + '-12-31') #slice('1911-01-02','2020-03-29')

    # get the daily soil moisture dataset
    ds_temp = xr.open_mfdataset(awra_dir + file_names) #, chunks = {'latitude':40,'longitude':40})
    # converting the datatypes of SM to match P
    lat_new = np.float32(ds_temp['latitude'])
    lon_new = np.float32(ds_temp['longitude'])
    ds = ds_temp.rename({'latitude':'lat','longitude':'lon'}).assign_coords(lat = lat_new, lon = lon_new).sel(time = time_slice)
    da_sm_temp = ds['sm'].chunk(chunks = {'lat':50, 'lon': 50, 'time':-1})

    # get the reference quantiles used to identify drought
    ds_sm_perc = xr.open_dataset(projdir + 'sm_1980to2016_perc/sm_percentiles.nc')
    da_sm_low = ds_sm_perc['sm'].sel(quantile = perc_drought_start)
    da_sm_high = ds_sm_perc['sm'].sel(quantile = perc_drought_end)
    
    # select the threshold values corresponding to each day of the year from the soil moisture dataset
    ds_doy = np.array(ds['time.dayofyear'])   # day of the year in ds, the sm dataset
    ds_start_thresh = da_sm_low.sel(dayofyear = ds_doy)
    ds_end_thresh = da_sm_high.sel(dayofyear = ds_doy)

    # create a data array with high & low flags to identify when sm crosses the thersholds
    da_sm = da_sm_temp.reset_index('time').rename({'time':'dayofyear', 'time_':'time'}).assign_coords({'dayofyear': ds_doy})
    da_sm = da_sm.sel(lat = lat_slice, lon = lon_slice)
    
    da_sm_start_thresh = xr.where(da_sm <= ds_start_thresh, -100, 0).reset_index('dayofyear').rename({'dayofyear':'time','dayofyear_':'dayofyear'})
    da_sm_end_thresh = xr.where(da_sm >= ds_end_thresh, 100, 0).reset_index('dayofyear').rename({'dayofyear':'time','dayofyear_':'dayofyear'})
    da_sm_thresh = da_sm_start_thresh + da_sm_end_thresh

    # identify drought events
    # if condition is used to differentiate between the first time chunk (no previous day info available) and the next few time chunks
    # dask_gufunc_kwargs = {'allow_rechunk':True}
    if i >= 1:
        prev_events_file = ''.join(glob.glob(out_dir + prefix + '_' + str(startyr_list[i-1]) + '*.nc'))
        ds_drought_prev = xr.open_dataset(prev_events_file)
        da_prev = ds_drought_prev['sm_drought'].isel(time = -1) # select the lasy time step from the previous files
        da_drought = xr.apply_ufunc(
            identify_drought,                      # first the function
            da_sm_thresh,                          # function arg
            da_prev,
            input_core_dims = [["time"], []],      # list with one entry per arg, these are the dimensions not to be broadcast
            output_core_dims = [["time"]],         # dimensions of the output
            vectorize = True,                      # broadcast over non-core dimensions of the input object?
            dask = "parallelized",                 # enable dask?                     
            # dask_gufunc_kwargs=dask_gufunc_kwargs,
            output_dtypes = [int]
        )
    else:
        da_drought = xr.apply_ufunc(
            identify_drought,                      # first the function
            da_sm_thresh,                          # function arg
            input_core_dims = [["time"]],          # list with one entry per arg, these are the dimensions not to be broadcast
            output_core_dims = [["time"]],         # dimensions of the output
            vectorize = True,                      # broadcast over non-core dimensions of the input object?
            dask = "parallelized",                 # enable dask?                     
            # dask_gufunc_kwargs=dask_gufunc_kwargs,
            output_dtypes = [int]
        )
        
    da_drought = da_drought.rename('sm_drought').drop('dayofyear', errors = 'ignore')
    out_file = create_filepath(da_drought, prefix = prefix, root_path = out_dir)
    da_drought
    da_drought.to_netcdf(out_file)

cluster.scale(cores = 0)

CPU times: user 19min 15s, sys: 5min 22s, total: 24min 38s
Wall time: 1h 36min 59s


In [9]:
cluster.scale(cores = 0)