# ERA-Interim monthly preprocessing

Aggregate the ERA-Interim data into 12 monthly files, each containing all years for that calendar month plus a buffer of +/- 15 days.

Based on code from Ryan Currier:
/glade/u/home/currierw/make_monthly_erai_dataset.ipynb


In [1]:
import glob, os
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import multiprocessing as mp
import psutil

In [2]:
# Earlier locations, kept for reference:
#   path_in  = '/glade/u/home/currierw/scratch/erai/convection/erai/'
#   path_out = '/glade/u/home/currierw/scratch/erai/convection/erai/clipped_by_month_convert_SST/'

# Input directory: the ERAi files, in chronological order.
path_in = '/glade/scratch/gutmann/icar/forcing/erai_conus/monthly'

# Output directory: the 12 per-month files are written here.
path_out = '/glade/scratch/bkruyt/erai/erai_greatlakes_month/'


In [3]:
# Open the WPS geogrid file; its lon/lat extent defines the region the ERA data is cropped to.
GEO = xr.open_dataset('/glade/work/bkruyt/WPS/ICAR_domains/great_lakes_6km/geo_em.d01.nc')

# Quick-look plot of the domain terrain (disabled):
# plt.pcolormesh(GEO.XLONG_M.isel(Time=0), GEO.XLAT_M.isel(Time=0), GEO.HGT_M.isel(Time=0) )

# Report the domain extent as: lon_min lon_max lat_min lat_max
print(
    GEO.XLONG_M.values.min(),
    GEO.XLONG_M.values.max(),
    GEO.XLAT_M.values.min(),
    GEO.XLAT_M.values.max(),
)

-95.161194 -70.338806 39.514442 50.168922


## Load originals and crop

In [4]:
%%time
# Open every input file matching erai-*.nc as a single dataset, combined by coordinates:
files = sorted(glob.glob(path_in + '/erai-*.nc'))
dsERAi = xr.open_mfdataset(files, combine='by_coords')

# Alternative (fixed) crop window, not used here:
# lat_bnds, lon_bnds = [25, 56], [-130, -96] # W-Conus

# Crop window: extent of the WPS domain in GEO, widened by `buff` degrees on each side.
buff = 1.0  # 0.5
lat_bnds = [
    GEO.XLAT_M.values.min() - buff,
    GEO.XLAT_M.values.max() + buff,
]
lon_bnds = [
    GEO.XLONG_M.values.min() - buff,
    GEO.XLONG_M.values.max() + buff,
]
print(lat_bnds, lon_bnds)

# Subset to the crop window, then load the result into memory.
# Be mindful about memory: uncropped this is ~750 GB.
dsERAi_clip = dsERAi.sel(
    lat=slice(lat_bnds[0], lat_bnds[1]),
    lon=slice(lon_bnds[0], lon_bnds[1]),
)
dsERAi_clip = dsERAi_clip.load()

# Report the extent actually selected as: lat_min lat_max lon_min lon_max
print(
    dsERAi_clip.lat.values.min(),
    dsERAi_clip.lat.values.max(),
    dsERAi_clip.lon.values.min(),
    dsERAi_clip.lon.values.max(),
)

[38.514442443847656, 51.168922424316406] [-96.16119384765625, -69.33880615234375]
38.94728 50.87706 -95.62491 -69.60928
CPU times: user 3min 37s, sys: 7min 1s, total: 10min 39s
Wall time: 32min 16s


### Calculate water vapor mixing ratio, air temperature, and convective precipitation

In [5]:
# Convert / derive variables in place on the cropped dataset.
# NOTE: this cell is not idempotent - running it twice converts qv and cp twice.

# qv -> qv / (1 - qv), relabelled as a mixing ratio
# (presumably the input qv is specific humidity - TODO confirm).
dsERAi_clip['qv'] = (
    dsERAi_clip['qv'] / (1 - dsERAi_clip['qv'])
).assign_attrs({'long_name': 'Water Vapor Mixing Ratio', 'units': 'kg kg**-1'})

# T = theta * (p / 100000) ** (2/7), labelled as air temperature in K.
# Somehow this introduces a few NaN's, although there aren't any in theta or p.
dsERAi_clip['T'] = (
    dsERAi_clip['theta'] * ((dsERAi_clip['p'] / 100000) ** (2 / 7))
).assign_attrs({'long_name': 'Air Temperature', 'units': 'K'})

# cp: per-6-hour-time-step amount -> rate per second (6 h = 21600 s).
dsERAi_clip['cp'] = (
    dsERAi_clip['cp'] / 21600
).assign_attrs({'long_name': 'Convective Precipitation', 'units': 'mm/s','Note':'Converted from mm/time step (6 hours) to mm/s by dividing by 21600'})



### Remove NaNs

In [None]:
# Alternative that was tried first: drop every time step that contains a NaN.
# Left disabled because it makes the time axis discontinuous.
# vrs=['theta','T']
# for var in vrs:
#     nan_idx = np.isnan(dsERAi_clip[var].values)
#     nan_idx+=nan_idx
#
# nan_indices=nan_idx.astype(bool)
# print('NaNs found at times ', dsERAi_clip.time[nan_indices[:,0,0,0]].values ,"\n")
#
# dsERAi_clip = dsERAi_clip.sel(time=~nan_indices[:,0,0,0])

# Preferred: fill the NaNs by interpolating along time, so the time axis stays continuous.
for var in ('T', 'theta'):
    dsERAi_clip[var] = dsERAi_clip[var].interpolate_na(dim='time')

### Create monthly files (serial)

In [7]:
# Build one file per calendar month. Each file holds every time step of that
# month over all years in dsERAi_clip, plus a buffer of n_buf time steps before
# and after each occurrence of the month.
#
# Method: label each time step with its month, zero out every label that is not
# the current month, locate the indices where the label changes (month
# boundaries), and relabel n_buf steps on either side of the boundaries as the
# current month. The relabelled array is then used as a groupby key.

# 15 days of buffer; assumes 4 time steps per day (6-hourly data).
n_buf = 15*4

for month_idx in range(1,13):

    month_grouped = dsERAi_clip['time'].dt.month
    month_grouped[month_grouped != month_idx] = 0  # set month to zero for all but the current month
    idx = np.where(np.diff(month_grouped.values) != 0)[0]  # the time-indices where the month changes
    n_idx = len(idx)

    for i in range(n_idx):
        # Previous time steps. The start is clipped at 0 (a negative start would be
        # interpreted as "from the end"), and a scalar is assigned so that a window
        # truncated by the start/end of the record cannot cause a shape mismatch.
        month_grouped[max(idx[i]-n_buf+1, 0):idx[i]+1] = month_idx

        if i == 0 and month_idx == 1:
            # forward time steps after the first boundary of month 1
            month_grouped[idx[i]+1:idx[i]+n_buf+1] = month_idx
        elif i == n_idx-1 and month_idx == 12:
            print('executed')  # leave as is: dont go forward
        elif i < n_idx-1:
            # forward time steps after the next boundary
            month_grouped[idx[i+1]+1:idx[i+1]+n_buf+1] = month_idx

    # All time indices now labelled with the current month (month + buffers):
    dsGroup = dsERAi_clip.groupby(month_grouped).groups
    ds_month = dsERAi_clip.isel(time=dsGroup[month_idx])

    # validate: print first and last date in the selection
    try:
        print("   ", ds_month.time.values[0].astype('datetime64[D]') ,  ds_month.time.values[-1].astype('datetime64[D]') )
    except (AttributeError, TypeError):  # if calendar is noleap the above will fail and we use:
        print("   ", ds_month.indexes['time'].to_datetimeindex().values.min().astype('datetime64[D]'), 
                     ds_month.indexes['time'].to_datetimeindex().values.max().astype('datetime64[D]'))

    # Check if +/- 15 day buffer worked out:
    sel_months = ds_month.time.dt.month.values  # month of every selected time step

    if month_idx==1 and np.any(sel_months==12): 
        print('   month ',12,' ',(sel_months==12).sum(),'times in ',month_idx)
    elif np.any(sel_months==month_idx-1): 
        print('   month ',month_idx-1,' ',(sel_months==month_idx-1).sum(), 'times in ',month_idx)

    if np.any(sel_months==month_idx): 
        print('   month ',month_idx,' in ',month_idx)

    if month_idx!=12 and np.any(sel_months==month_idx+1): 
        print('   month ',month_idx+1,' ',(sel_months==month_idx+1).sum(),' times in ',month_idx)
    elif month_idx==12 and np.any(sel_months==1): 
        print('   month ',1,' ',(sel_months==1).sum(),'times in ',month_idx)

    # check if NaNs are really gone:
    for var in ['T','theta']:
        print( '   nans in ' +var+': ' + str(np.isnan( ds_month[var].values).sum()) )

    # print mem usage:
    # The % field (index 2) gives % of the node, not of the memory allocated,
    # so the absolute amount (index 3, converted to GB) is printed instead.
    print('   * * *   RAM Used (GB):', psutil.virtual_memory()[3]/1000000000, '   * * *   ')

    # save to 1 file per month:
    out_file = path_out + 'erai_' + str(month_idx).zfill(2) + '.nc'
    ds_month.to_netcdf(out_file,
                       encoding={'time':{'units': "days since 1900-01-01 00:00:00"}})
    print('month '+str(month_idx)+" saved to " + out_file +' \n')
    

    1979-01-01 2019-02-15
   * * *   RAM Used (GB): 187.265990656    * * *   

month 1 saved to /glade/scratch/bkruyt/erai/erai_conus_bymonth/erai--01.nc
    1979-01-17 2019-03-15
   * * *   RAM Used (GB): 186.612678656    * * *   

month 2 saved to /glade/scratch/bkruyt/erai/erai_conus_bymonth/erai--02.nc
    1979-02-14 2019-04-15
   * * *   RAM Used (GB): 187.385483264    * * *   

month 3 saved to /glade/scratch/bkruyt/erai/erai_conus_bymonth/erai--03.nc
    1979-03-17 2019-05-15
   * * *   RAM Used (GB): 187.099099136    * * *   

month 4 saved to /glade/scratch/bkruyt/erai/erai_conus_bymonth/erai--04.nc
    1979-04-16 2019-06-15
   * * *   RAM Used (GB): 187.669413888    * * *   

month 5 saved to /glade/scratch/bkruyt/erai/erai_conus_bymonth/erai--05.nc
    1979-05-17 2019-07-15
   * * *   RAM Used (GB): 187.0998528    * * *   

month 6 saved to /glade/scratch/bkruyt/erai/erai_conus_bymonth/erai--06.nc
    1979-06-16 2019-08-15
   * * *   RAM Used (GB): 187.675348992    * * *   


### Create monthly files (parallel)

Same function as before, but in parallel. If the dsERAi_clip dataset can be loaded (.load()) into memory, this parallel processing is not really needed. Kept here for whoever wants to play with it. 


In [6]:
# import psutil

# from create_monthly_files_prl:
def month_func(month_idx, ds=dsERAi_clip): 
    """Write the file for one calendar month, selected from the entire dataset.

    Selects every time step of `ds` that falls in month `month_idx`, plus a
    buffer of 15*4 time steps before and after each occurrence of the month,
    and writes the selection to path_out + 'erai_<MM>.nc'. Meant to be called
    in parallel, once per month.

    Parameters
    ----------
    month_idx : int
        Calendar month to extract (1-12).
    ds : xarray Dataset with a 'time' coordinate
        Dataset to select from. The default is bound when this function is
        defined, so re-run this cell after re-creating dsERAi_clip.

    Returns
    -------
    None. The result is written to disk; progress is printed.
    """

    print("\n    ----- month ", month_idx, "------")

    # 15 days of buffer; assumes 4 time steps per day (6-hourly data).
    n_buf = 15*4

    month_grouped = ds['time'].dt.month
    month_grouped[month_grouped != month_idx] = 0   # if not the current month, set to zero
    idx = np.where(np.diff(month_grouped.values) != 0)[0]  # the (time) indices where the month changes
    n_idx = len(idx)

    for i in range(n_idx):
        # Previous time steps. The start is clipped at 0 (a negative start would be
        # interpreted as "from the end"), and a scalar is assigned so that a window
        # truncated by the start/end of the record cannot cause a shape mismatch.
        month_grouped[max(idx[i]-n_buf+1, 0):idx[i]+1] = month_idx

        if i == 0 and month_idx == 1:
            # forward time steps after the first boundary of month 1
            month_grouped[idx[i]+1:idx[i]+n_buf+1] = month_idx
        elif i == n_idx-1 and month_idx == 12:
            print('executed')  # leave as is: dont go forward
        elif i < n_idx-1:
            # forward time steps after the next boundary
            month_grouped[idx[i+1]+1:idx[i+1]+n_buf+1] = month_idx

    # All time indices now labelled with the current month (month + buffers):
    dsGroup = ds.groupby(month_grouped).groups
    ds_month = ds.isel(time=dsGroup[month_idx])

    # validate: print first and last date in the selection
    try:
        print("   ", ds_month.time.values[0].astype('datetime64[D]') ,  ds_month.time.values[-1].astype('datetime64[D]') )
    except (AttributeError, TypeError):  # if calendar is noleap the above will fail and we use:
        print("   ", ds_month.indexes['time'].to_datetimeindex().values.min().astype('datetime64[D]'), 
                     ds_month.indexes['time'].to_datetimeindex().values.max().astype('datetime64[D]'))

    # print mem usage:
    # The % field (index 2) gives % of the node, not of the memory allocated,
    # so the absolute amount (index 3, converted to GB) is printed instead.
    print('   * * *   RAM Used (GB):', psutil.virtual_memory()[3]/1000000000, '   * * *   \n')

    # save the monthly file to disk:
    out_file = path_out + 'erai_' + str(month_idx).zfill(2) + '.nc'
    ds_month.to_netcdf(out_file,
                       encoding={'time':{'units': "days since 1900-01-01 00:00:00"}}) 
    print('month '+str(month_idx)+" saved to " + out_file)

In [None]:
# Number of worker processes. 12 (one per month) is probably too many;
# 2-4 might work - check the memory usage reported by the serial run first.
Nprocs = 4

# Run month_func once for each month 1..12, spread over the worker pool
# (parallel equivalent of: for month_idx in range(1,13)).
with mp.Pool(processes=Nprocs) as pool:
    pool.map(month_func, range(1, 13))

            # Original code:

## Calculate water vapor mixing ratio, air temperature, and convective precipitation (units)

Original code from Ryan

In [None]:
# Legacy workflow: convert the input files one (year, month) at a time and write
# one converted file per (year, month) to path_out. Months for which anything
# fails (e.g. no input files found) are reported and skipped.

# lat_bnds, lon_bnds = [25, 56], [-130, -96] # W-Conus

# Crop window for the Great Lakes domain (+0.5 deg).
# Latitude bounds come from XLAT_M and longitude bounds from XLONG_M.
buff=0.5
lat_bnds = [ GEO.XLAT_M.values.min()-buff, GEO.XLAT_M.values.max()+buff]
lon_bnds = [ GEO.XLONG_M.values.min()-buff, GEO.XLONG_M.values.max()+buff]
print(lat_bnds, lon_bnds)

years=np.arange(1979,2020,1)
months=np.arange(1,13,1)

for year in years:
    for month in months:

        stamp = str(year).zfill(2)+'-'+str(month).zfill(2)  # e.g. '1979-01'

        try:
            files=sorted(glob.glob(path_in+'/erai-'+stamp+'*'))
            # print(files)
            dsERAi=xr.open_mfdataset(files,combine='by_coords') 

            # Cropping is disabled in this legacy cell; to enable it use:
            # dsERAi_clip=dsERAi.sel(lat=slice(*lat_bnds),lon=slice(*lon_bnds))
            dsERAi_clip=dsERAi

            # qv -> qv / (1 - qv), relabelled as a mixing ratio
            dsERAi_clip['qv']=(dsERAi_clip['qv']/(1-dsERAi_clip['qv']))
            dsERAi_clip['qv']=dsERAi_clip['qv'].assign_attrs({'long_name': 'Water Vapor Mixing Ratio', 'units': 'kg kg**-1'})

            # T = theta * (p / 100000) ** (2/7)
            dsERAi_clip['T'] = dsERAi_clip['theta']*((dsERAi_clip['p']/100000)**(2/7))
            dsERAi_clip['T']=dsERAi_clip['T'].assign_attrs({'long_name': 'Air Temperature', 'units': 'K'})

            # cp: per-6-hour-time-step amount -> rate per second
            dsERAi_clip['cp']=dsERAi_clip['cp']/21600
            dsERAi_clip['cp']=dsERAi_clip['cp'].assign_attrs({'long_name': 'Convective Precipitation', 'units': 'mm/s','Note':'Converted from mm/time step (6 hours) to mm/s by dividing by 21600'})

            dsERAi_clip.to_netcdf(path_out+'erai-'+stamp+'.nc', engine='netcdf4')
            print('finished: '+stamp)

        except Exception as err:
            # Best effort: keep going with the next month, but say why this one was
            # skipped. Exception (not a bare except) so that an interrupt still stops
            # the loop.
            print('skipping: '+stamp)
            print('   reason: '+repr(err))

## add 15 days before/after:
(original code from Ryan)

In [None]:
    # Archived original of the "+/- 15 days" monthly split, kept for reference.
    # NOTE(review): this fragment is indented one level and uses `ds`, `directory`,
    # `modLs` and `z`, none of which are defined in this notebook - presumably it was
    # lifted from inside a function or loop elsewhere; it will not run as a cell as is.
    for month in range(1,13):

        month_idx=month
        # Month number of every time step; everything but the current month is set to 0.
        month_grouped=ds['time'].dt.month
        month_grouped[month_grouped!=month_idx]=0
        # Indices where the (zeroed) month label changes, i.e. the month boundaries.
        idx=np.where(np.diff(month_grouped.values)!=0)[0]
        for i in range(len(idx)):
            # Relabel the 15*4 time steps up to and including the boundary as the current month.
            month_grouped[idx[i]-(15*4)+1:idx[i]+1]=month_idx*np.ones(15*4) # previous time steps
            if i==0 and month_idx==1:
                # First boundary of month 1: also relabel the 15*4 steps after it.
                month_grouped[idx[i]+1:idx[i]+(15*4)+1]=month_idx*np.ones(15*4) # forward time step
            elif i == len(idx)-1 and month_idx==12:
                print('executed')
                month_grouped=month_grouped # leave as is: dont go forward
            elif i < len(idx)-1:
                # Earlier variant (15 steps, wrapped in try/except), disabled:
#                 try:
#                     month_grouped[idx[i+1]+1:idx[i+1]+16]=month_idx*np.ones(15)
#                 except:
#                     month_grouped=month_grouped                  
                # Relabel the 15*4 steps after the NEXT boundary as the current month.
                month_grouped[idx[i+1]+1:idx[i+1]+(15*4)+1]=month_idx*np.ones(15*4)

        # Group time indices by the relabelled month, select the current month's
        # group (month + buffers) and write it to one file.
        dsGroup=ds.groupby(month_grouped).groups
        ds_month=ds.isel(time=dsGroup[month_idx])
        ds_month.to_netcdf(directory+modLs[z]+'_'+str(month_idx).zfill(2)+'.nc')