In [1]:
print("STATUS CHECK")

STATUS CHECK


In [2]:
from distributed import Client
client = Client()

In [3]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:34994  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 9  Cores: 72  Memory: 405.33 GB


In [4]:
import dask
from dask.diagnostics import ProgressBar
import dask.multiprocessing
import numpy as np 
import xarray as xr
from datetime import date

import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


# Calculate and save the quantiles from large data

This notebook walks through the calculation of quantiles. It's based on a fairly large dataset (1979--2018 daily values on a 0.5° grid). To increase the sample size for each calendar day, we apply a window centered on the day and extending 7 days before and after. 

The main "trick" applied here is to convert from xarray to numpy array for all the calculation (much faster based on testing). The dataset is then converted back to a DataArray, and the final resulting dataset is saved to a netCDF file.

In [5]:
METHOD = "DOY"  # [SIMPLE, DOY]
OUTPUT = f"/project/amp/brianpm/TemperatureExtremes/CPC_tmax_dayofyear_quantiles_15daywindow_c{date.today().strftime('%Y%m%d')}.nc"
logging.info(f"YOUR OUTPUT FILE WILL BE NAMED: {OUTPUT}")

INFO:root:YOUR OUTPUT FILE WILL BE NAMED: /project/amp/brianpm/TemperatureExtremes/CPC_tmax_dayofyear_quantiles_15daywindow_c20190621.nc


In [6]:
%%time
gpat = "/project/amp/jcaron/CPC_Tminmax/tmax.*.nc"
ds = xr.open_mfdataset(gpat, decode_cf=False)
ds = xr.decode_cf(ds)
tmax = ds['tmax']
logging.info("Loaded the tmax DataArray (likely lazy load)")

time_ndx = tmax.dims.index('time')

if 'long_name' in tmax.attrs:
    var_long_name = tmax.attrs['long_name']
else:
    var_lon_name = tmax.name

if 'units' in tmax.attrs:
    var_units = tmax.attrs['units']
else:
    logging.warning("NO UNITS ATTACHED TO VARIABLE.")
    var_units = "N/A"
    
out_file_info = f"Data derived from glob patter {gpat}, resulting in data set with {len(ds['time'])} time slices."

INFO:root:Loaded the tmax DataArray (likely lazy load)


CPU times: user 1.14 s, sys: 215 ms, total: 1.36 s
Wall time: 6.18 s


In [7]:
%%time
tmax_np = tmax.values
logging.info("Convert to numpy array.")

INFO:root:Convert to numpy array.


CPU times: user 14.7 s, sys: 36.9 s, total: 51.6 s
Wall time: 1min 4s


In [8]:
# Use modulo operator (i.e., remainder) to deal with circular index.
# https://stackoverflow.com/questions/8951020/pythonic-circular-list
def get_window_indices(thing, current, look_back, look_ahead):
    """Given an iterable, thing, return the values of thing in circular slice. Go back look_back steps and
       go ahead look_ahead steps.
    """
    N = len(thing)-1
    window_inds = np.arange(-1*(look_back), look_ahead+1)
    result = []
    for w in window_inds:
        result.append(thing[(current + w) % N])
    return np.array(result)  # these are the values of thing

In [9]:
def get_our_quants(data):
        return np.nanquantile(data, [.01, .05, .1, .25, .5, .75, .9, .95, .99], axis=0, overwrite_input=False, interpolation='linear', keepdims=False)


In [10]:
#
# metadata / coordinates / define quantile
#
lat = ds['lat']
lon = ds['lon']
time = ds['time']
quantile = [.01, .05, .1, .25, .5, .75, .9, .95, .99]


In [16]:
%%time
if METHOD == "SIMPLE":
    #
    # SIMPLE APPROACH -- calculate quantiles at each spatial location
    #
    quants = np.nanquantile(tmax_np, [.01, .05, .1, .25, .5, .75, .9, .95, .99], axis=time_ndx, overwrite_input=False, interpolation='linear', keepdims=False)

    logging.info("Quantiles Calculated.")
    quants_xr = xr.DataArray(quants, coords={"quantile":quantile, "lat":lat, "lon":lon}, dims=("quantile","lat","lon"))
    quants_xr.name = "quantile"
    quants_xr.attrs["long_name"] = f"quantiles of {var_long_name}"


CPU times: user 6 µs, sys: 2 µs, total: 8 µs
Wall time: 15.3 µs


In [12]:
# DAY-OF-YEAR METHOD
# The inefficiency of xarray dealing with the data makes this part a little more elaborate
# because we effectively have to do a manual "groupby" operation.
# - get the indices for each day of year
# --> Make a dictionary with keys that are day of year and values that are the data

# PROTOTYPE
# This works. But don't actually run it unless we want to work on each day completely individually.
# See below where we apply similar approach for a given time window size.
# doy = set(time.dt.dayofyear.values)
# doy_dict = {day: tmax_np[time.dt.dayofyear == day, ...] for day in doy}

In [11]:
%%time
# Now we use our alternative to grouping to get good sampling.
# If we have around 50 years of data, but we want to calculate the 90th percentile, we have a couple of options:
# (1) Just do what we can and try to calculate confidence intervals for the quantiles we produce,
# (2) Increase sample size by using surrounding days (as in Perkins&Alexander),
# (3) DO BOTH!
# The issue is simply that we want a reasonably confident estimate of the quantile.

# Let's start by doing Option2 because it is now easy with the code we have so far.
# If we have 50 years and we want a better estimate of quantiles, taking a 3-day window increases N to 150
# a 5 day window gets us to 250, which seems better.
# Crossing across the year boundaries is dealt with effciently with our get_window_indices function.

tday = time.dt.dayofyear.values # day of year for every time.
doy = set(tday)  # kind of silly, but will deal with 365 or 366 day year
doy_list = list(doy) # need a version that can be indexed.
doy_dict = dict()
Ndays = len(doy_list)-1
for i, day in enumerate(doy_list):
    use_days = get_window_indices(doy_list, i, 7, 7) # Follows Perkins & Alexander
    use_inds = np.concatenate([np.nonzero(tday == j)[0] for j in use_days])
    doy_dict[day] = tmax_np[use_inds, ...]

# the "use_inds" line does quite a lot:
# uses np.nonzero to get the indices for each calendar day that has been identified, 
# but that returns a tuple, so we just take the first element,
# wrap in list comprehension to do for all the identified days,
# concatenate the result. Should produce a 1d array with all the indices needed.

# doy_dict -- integer day of year key followed by array of values within the time window.

CPU times: user 56.7 s, sys: 1min 7s, total: 2min 4s
Wall time: 2min 11s


From Perkins & Alexander:

        CTX90pct—The threshold is the calendar day 90th percentile of Tmax, based on a 15-day window. 
        That is, there is a different percentile value for each day of the year (thereby accounting for
        the seasonal cycle), where the window is centered on the day in question. Using a moving window
        accounts for temporal dependence while producing a reasonable sample size to calculate a
        realistic percentile value. The thresholds are calculated for each time period and grid box separately.

In [27]:
with ProgressBar():
#     results = {i:dask.delayed(get_our_quants)(i) for i in doy_dict}
    lazy_results = [dask.delayed(get_our_quants)(i) for i in doy_dict]




In [13]:
# This didn't work!
# import dask.bag as db
# b = db.from_sequence(list(doy_dict.values()), npartitions=366)
# b = b.map(get_our_quants)
# %time results_bag = b.compute()

In [14]:

# GETTING MEMORY ISSUE WITH MULTIPROCESS/MULTIPROCESSING

# num_processors = 24
# p=Pool(processes=num_processors)
# logging.info("Pool instantiated, start calculation.")
# doy_quants = p.map(lambda x: get_our_quants(doy_dict[x]), doy_dict)
# # a list that shoudl be in the right order
# # doy_quants[day] has shape (9, 360, 720) ==> nquantiles, lat, lon
# pool.close()


In [30]:
with ProgressBar():
    results_comp = dask.compute(*lazy_results, scheduler='processes')

# run it: 
# futures = dask.persist(*lazy_results)

OSError: [Errno 12] Cannot allocate memory

In [29]:
type(results_comp)

tuple

In [22]:
%%time
doy_quants = results
xr_das = {}
for i in doy_quants:
    xr_das[i] = xr.DataArray(doy_quants[i], coords={"quantile":quantile, "lat":lat, "lon":lon}, dims=("quantile", "lat", "lon"))

xr_output = xr.concat(xr_das, dim="time")
xr_output['time'].values = np.array(doy_list)
xr_output['time']['units'] = 'day-of-year'
xr_output.name = tmax.name
xr_output.attrs['long_name'] = var_lon_name
xr_output.attrs['units'] = var_units

# SAVE OUTPUT
xr_output.to_netcdf(OUTPUT, format='NETCDF4', encoding={tmax.name: {"zlib": True, "_FillValue": None}})
logging.info("All done.")





KeyError: 'quantile'

In [25]:
doy_quants[1].compute()

AxisError: axis 0 is out of bounds for array of dimension 0

In [2]:
ssss = {"one":1, "two":2, "three":3}
for key , value in ssss.items():
    print(key, " :: ", value)

one  ::  1
two  ::  2
three  ::  3


In [None]:
from multiprocess import Process, Manager
def f(d, x):
    d[x] = x**2
    
manager = Manager()
d = manager.dict()
job = [Process(target=f, args=(d, i)) for i in range(5)]
A = [p.start() for p in job]
B = [p.join() for p in job]