In [1]:
import xarray as xr
import pandas as pd
import numpy as np

In [2]:
%%time

def preprocess(ds):
    """Trim one opened dataset before it is combined with the others.

    Removes the ``uIVT``, ``vIVT`` and ``IWV`` variables, then keeps only the
    cells whose ``lat`` label falls in the 23..52 slice and whose ``lon`` label
    falls in the -127..-110 slice.

    Parameters
    ----------
    ds : object providing ``drop_vars`` and ``sel``
        Presumably an ``xarray.Dataset`` for a single file, since this
        function is handed to ``xr.open_mfdataset`` as ``preprocess``.

    Returns
    -------
    The reduced dataset returned by ``sel``.
    """
    unused_vars = ["uIVT", "vIVT", "IWV"]
    lat_band = slice(23., 52.)
    lon_band = slice(-127., -110.)
    return ds.drop_vars(unused_vars).sel(lat=lat_band, lon=lon_band)

# First and last day (both included) of the period to load.
start_date = '2022-10-01'
end_date = '2023-09-30'

# Reference point, only needed by the optional single-point selection below.
# NOTE(review): (60, -140) lies outside the lat/lon box kept by preprocess,
# so a nearest-neighbour lookup would land on the edge of the box -- confirm.
y = 60.
x = -140.

# One row per calendar day, with the year plus zero-padded month/day strings
# split out for building the file names.
dates = pd.date_range(start=start_date, end=end_date, freq='1D')
df = pd.DataFrame(data={"date": dates})
df['day'] = df['date'].dt.day.map("{:02}".format)
df['month'] = df['date'].dt.month.map("{:02}".format)
df['year'] = df['date'].dt.year

# One daily ERA5 IVT file per row, stored in a per-year directory.
filenames = [
    '/data/downloaded/Reanalysis/ERA5/IVT/{0}/ERA5_IVT_{0}{1}{2}.nc'.format(year, month, day)
    for year, month, day in zip(df['year'], df['month'], df['day'])
]

# Open all daily files as one dataset; preprocess trims each file as it is opened.
era = xr.open_mfdataset(filenames, combine='by_coords', preprocess=preprocess)

# To work on a single grid point instead of the whole box, swap in:
# ds = era.sel(lat=y, lon=x, method='nearest')
ds = era
ds

CPU times: user 5.13 s, sys: 4.81 s, total: 9.94 s
Wall time: 3min 29s


Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 539.55 MiB 1.48 MiB Shape (8760, 117, 69) (24, 117, 69) Dask graph 365 chunks in 1096 graph layers Data type float64 numpy.ndarray",69  117  8760,

Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [11]:
%%time
## compute duration of IVT >= 250.
# Flag every time step / grid cell whose IVT meets the 250 threshold.
AR = xr.where(ds.IVT >= 250, 1, 0)
a = AR != 0 # True wherever the threshold is met

# get the temporal resolution in hours from the first two time stamps
# (assumes the time axis is evenly spaced -- TODO confirm)
t = ds['time'].isel(time=1) - ds['time'].isel(time=0)
nhrs = t.values.astype('timedelta64[h]') # convert to hours

## running length (in time steps) of the current above-threshold spell
# The count has to accumulate along `time` only. A bare `cumsum()` accumulates
# over every dimension in turn, which folds neighbouring lat/lon cells into the
# count and makes the difference below meaningless for gridded data.
csum = a.cumsum(dim='time')
# Count reached at the most recent below-threshold step (0 before the first one).
baseline = csum.where(~a).ffill(dim='time').fillna(0).astype(int)
# Steps since that reset: 0 outside a spell, 1, 2, 3, ... inside one.
tmp = csum - baseline
duration = tmp*nhrs.astype(int) # steps -> hours
duration = duration.rename("duration")
duration = duration.compute()
ds = xr.merge([ds, duration])
ds

CPU times: user 14.3 s, sys: 43.3 s, total: 57.5 s
Wall time: 1h 2min 32s


Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 539.55 MiB 1.48 MiB Shape (8760, 117, 69) (24, 117, 69) Dask graph 365 chunks in 1096 graph layers Data type float64 numpy.ndarray",69  117  8760,

Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [12]:
%%time

## compute preliminary rank
# Lower IVT bound of preliminary ranks 1..5. The bands are contiguous, so the
# rank of a cell equals the number of bounds its IVT meets:
#   >= 250 -> 1, >= 500 -> 2, >= 750 -> 3, >= 1000 -> 4, >= 1250 -> 5.
# Counting in one pass replaces five separately masked copies that had to be
# stitched back together with a conflict-checking merge.
rank_bounds = (250., 500., 750., 1000., 1250.)
n_met = sum((ds.IVT >= bound).astype(int) for bound in rank_bounds)

# Cells below the first bound (or with missing IVT) meet no bound; they are
# stored as NaN rather than 0, which also makes the result floating point.
prelim_rank = n_met.where(n_met > 0).rename("prelim_rank").to_dataset()
prelim_rank = prelim_rank.compute()
## put into ds
ds = xr.merge([ds, prelim_rank])

ds

CPU times: user 37.8 s, sys: 1min 40s, total: 2min 18s
Wall time: 1h 44min 38s


Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 539.55 MiB 1.48 MiB Shape (8760, 117, 69) (24, 117, 69) Dask graph 365 chunks in 1096 graph layers Data type float64 numpy.ndarray",69  117  8760,

Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [13]:
%%time
## compute final rank
# Shift the preliminary rank according to the running duration (hours):
#   duration < 24        -> one rank lower
#   24 <= duration < 48  -> unchanged
#   duration >= 48       -> one rank higher
# Each piece is NaN outside its own duration band, so the pieces never disagree
# on a cell and can be combined with a 'no_conflicts' merge.
# NOTE(review): with prelim_rank in 1..5 the result spans 0..6 -- confirm
# whether it should be limited to a narrower range.
hours = ds.duration
base = ds.prelim_rank

rank24 = (base - 1).where(hours < 24.)
rank48 = (base + 1).where(hours >= 48.)
rank0 = base.where((hours >= 24.) & (hours < 48.))

pieces = [rank24, rank48, rank0]
rank = xr.merge([piece.rename('rank') for piece in pieces], compat='no_conflicts')
ds = xr.merge([ds, rank])
ds

CPU times: user 3.25 s, sys: 4.39 s, total: 7.64 s
Wall time: 7.69 s


Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 539.55 MiB 1.48 MiB Shape (8760, 117, 69) (24, 117, 69) Dask graph 365 chunks in 1096 graph layers Data type float64 numpy.ndarray",69  117  8760,

Unnamed: 0,Array,Chunk
Bytes,539.55 MiB,1.48 MiB
Shape,"(8760, 117, 69)","(24, 117, 69)"
Dask graph,365 chunks in 1096 graph layers,365 chunks in 1096 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [14]:
# Write the dataset to a single NetCDF4 file; mode 'w' replaces any file
# already present at that path.
path_to_data = '/data/projects/Comet/cwp140/'
rel_path = 'preprocessed/ARScale_ERA5/ERA5_ARScale_WY2023.nc'
fname_out = path_to_data + rel_path
ds.to_netcdf(path=fname_out, mode='w', format='NETCDF4')