**Step 3: Create year-long files of daily tas, tasmax, tasmin, dtr, and total_precip.**

In [1]:
%matplotlib inline
import xarray as xr
import numpy as np 
import matplotlib.pyplot as plt
import os
import pandas as pd
from datetime import datetime, timedelta, date

import dask
import dask.array as dda
import dask.distributed as dd

# rhodium-specific kubernetes cluster configuration
import rhg_compute_tools.kubernetes as rhgk

In [10]:
# Start a cluster through the Rhodium helper and keep both handles:
# `client` is used further down to send work to the workers, `cluster`
# to scale and shut the cluster down.
client, cluster = rhgk.get_standard_cluster()
# Ask for 30 workers.
cluster.scale(30)

In [11]:
# Show the client summary (scheduler address, dashboard link, worker resources).
client

0,1
Client  Scheduler: gateway://traefik-impactlab-hub-dask-gateway.impactlab-hub:80/impactlab-hub.dcbf119d795d457da558e2655cff0e86  Dashboard: /services/dask-gateway/clusters/impactlab-hub.dcbf119d795d457da558e2655cff0e86/status,Cluster  Workers: 30  Cores: 30  Memory: 362.39 GB


In [1]:
# Shut the cluster down.
# NOTE(review): this cell sits above the processing cells, but
# `execute_daily_file_creation` needs a live `client` -- run it only after
# all of the file-creation jobs have finished.
cluster.close()

Functions to create yearlong files of daily data

In [2]:
def calc_daily_era5_average(spec):
    '''
    Average one variable of a single netCDF file over its `time` dimension.

    spec(tuple): (filepath, timestep, variable). `timestep` is unpacked
        but not used by this function.

    Returns the mean over `time` of `variable` from the file at `filepath`.
    '''
    path, _timestep, name = spec
    with xr.open_dataset(path) as hourly:
        daily_mean = hourly[name].mean('time')
    return daily_mean

def calc_daily_era5_tmax(spec):
    '''
    Take the maximum of one variable of a single netCDF file over its
    `time` dimension.

    spec(tuple): (filepath, timestep, variable). `timestep` is unpacked
        but not used by this function.

    Returns the maximum over `time` of `variable` from the file at `filepath`.
    '''
    path, _timestep, name = spec
    with xr.open_dataset(path) as hourly:
        daily_max = hourly[name].max('time')
    return daily_max

def calc_daily_era5_tmin(spec):
    '''
    Take the minimum of one variable of a single netCDF file over its
    `time` dimension.

    spec(tuple): (filepath, timestep, variable). `timestep` is unpacked
        but not used by this function.

    Returns the minimum over `time` of `variable` from the file at `filepath`.
    '''
    path, _timestep, name = spec
    with xr.open_dataset(path) as hourly:
        daily_min = hourly[name].min('time')
    return daily_min
    
def calc_daily_dinural_temp_range(spec):
    '''
    Compute the diurnal temperature range (DTR) of a single netCDF file:
    the maximum over `time` minus the minimum over `time`.

    spec(tuple): (filepath, timestep, variable). `timestep` is unpacked
        but not used by this function.

    Returns max - min over `time` of `variable` from the file at `filepath`.
    '''
    path, _timestep, name = spec
    with xr.open_dataset(path) as hourly:
        temps = hourly[name]
        daily_range = temps.max('time') - temps.min('time')
    return daily_range
    
def calc_daily_total_precip(spec):
    '''
    calculate daily sum of precipitation

    spec(tuple): (filepaths, timestep, variable). `filepaths` holds the
        files to combine along their coordinates (`create_jobs` supplies
        the file for the target day followed by the file for the next
        day). `timestep` is unpacked but not used by this function.

    Returns the first daily bin of the resampled sum of `variable`.
    '''
    filepaths, _timestep, variable = spec
    # `concat_dim` only applies to combine='nested'. With combine='by_coords'
    # it has no effect, and recent xarray releases raise a ValueError when
    # both are passed, so it is left out here.
    with xr.open_mfdataset(filepaths, combine='by_coords') as ds:
        # Rolling by -1 moves the first timestep to the end; the `[:-1]`
        # below then discards it, so the series starts at the second
        # timestep of the first file.
        data_rolled = ds[variable].roll(time=-1, roll_coords=True)
        # closed='right' makes each daily bin include the sample that falls
        # exactly on its right edge.
        # NOTE(review): assumes each file holds hourly samples starting at
        # 00:00, so that sample is 00:00 of the following day -- confirm.
        da_resampled = data_rolled[:-1].resample(time='1D', closed='right').sum(dim='time')
        # Only the first bin (the target day) is complete; later bins are
        # partial and are dropped.
        return da_resampled[0]

In [3]:
def create_jobs(year, variable):
    '''
    Build the per-day job specs for one year of hourly ERA-5 files.

    year(int): calendar year to process
    variable(str): 'pr' selects the hourly precipitation files; any other
        value selects the hourly files under the 'tas' directory

    Returns [JOBS, dt_index_full]:
        JOBS(list): one (filepath, 'hourly', file_var) tuple per day of
            `year`. For 'pr', `filepath` is a (day, next_day) tuple of
            two paths; otherwise it is a single path.
        dt_index_full(DatetimeIndex): daily index for `year`. For 'pr' it
            also contains January 1 of the following year, so it is one
            entry longer than JOBS.
    '''
    is_precip = variable == 'pr'
    if is_precip:
        director_var, file_var, file_prefix = 'pr', 'tp', 'total_precip'
        # each precip job pairs a day with the day after it, so the index
        # has to run through January 1 of the following year
        last_day = '{}-01-01'.format(year + 1)
    else:
        director_var, file_var, file_prefix = 'tas', 't2m', 't2m'
        last_day = '{}-12-31'.format(year)

    # make list of daily datetime indices, this includes leap years
    dt_index_full = pd.date_range(start='{}-01-01'.format(year), end=last_day, freq='D')

    hourly_dir = '/gcs/impactlab-data/climate/source_data/ERA-5/{}/hourly/netcdf/F320/'.format(director_var)

    # one hourly file per day, named <prefix>_<YYYY>_<MM>_<DD>.nc
    daily_paths = [os.path.join(hourly_dir, '{}_{}.nc'.format(file_prefix, stamp))
                   for stamp in dt_index_full.strftime('%Y_%m_%d')]

    if is_precip:
        # consecutive (day, next_day) pairs; the last date only ever
        # appears as a "next day"
        filepaths = list(zip(daily_paths[:-1], daily_paths[1:]))
    else:
        filepaths = daily_paths

    JOBS = [(filepath, 'hourly', file_var) for filepath in filepaths]
    return [JOBS, dt_index_full]

def save_yearlong_dailydata_file(directory, year, ds, var):
    '''
    Write one year of daily data for a single variable to a netCDF file.

    directory(str): folder that receives the file
    year(str or int): year, used twice in the file name
    ds: data stored under the name `var` in a newly created Dataset
    var(str): variable name; also used in the file name and in the
        `source` attribute
    '''
    # NOTE(review): `source` is built from `var`, while `create_jobs` reads
    # every non-'pr' variable from the 'tas' hourly directory -- confirm the
    # recorded path is the intended provenance.
    metadata = {
        'author': 'Meredith Fish',
        'contact': 'meredith.fish@rutgers.edu',
        'project': ('global downscaling [ERA-5]'),
        'source': ('impactlab-data/climate/source-data/ERA-5/{}/hourly/netcdf/'.format(var)),
        'created': str(date.today()),
    }
    yearly = xr.Dataset({var: ds}, attrs=metadata)

    out_path = os.path.join(directory, '%s_daily_%s-%s.nc' % (var, year, year))
    # mode='w' replaces any existing file at that path
    yearly.to_netcdf(out_path, mode='w')

In [14]:
def execute_daily_file_creation(year, variable):
    '''
    uses `create_jobs` to create the JOBS files.
    calculate the metric based on variable of interest.
    save netcdf file.

    year(int): year to process
    variable(str): one of 'dtr', 'tas', 'tasmax', 'tasmin', 'pr'

    Uses the notebook-level dask `client` to run one task per daily job.
    Raises ValueError if `variable` is not one of the supported names.
    '''
    calculators = {
        'dtr': calc_daily_dinural_temp_range,
        'tas': calc_daily_era5_average,
        'tasmax': calc_daily_era5_tmax,
        'tasmin': calc_daily_era5_tmin,
        'pr': calc_daily_total_precip,
    }
    # fail early with a clear message instead of an unbound `calc` later on
    if variable not in calculators:
        raise ValueError(
            "unknown variable {!r}; expected one of {}".format(variable, sorted(calculators)))
    calc = calculators[variable]

    [JOBS, dt_index_full] = create_jobs(year, variable)

    save_directory = '/gcs/impactlab-data/climate/source_data/ERA-5/{}/daily/netcdf/v1.2'.format(variable)

    # One task per job spec. `client.submit(calc, JOBS)` would make a single
    # call calc(JOBS) with the whole list, which cannot be unpacked as one
    # (filepath, timestep, variable) spec.
    futures = client.map(calc, JOBS)
    da_list = client.gather(futures)

    # concatenate DataArrays in list
    da_year = xr.concat(da_list, dim='time')
    # add datetime index
    if variable == 'pr':
        # for 'pr' the index has one more entry than there are jobs
        da_year['time'] = dt_index_full[:-1]
    else:
        da_year['time'] = dt_index_full

    # save file
    save_yearlong_dailydata_file(save_directory, year, da_year, variable)

**Execute file creation**

Variable options: pr, dtr, tas, tasmax, tasmin (each one matches the name of its directory under `ERA-5/`)

In [2]:
# Variable to process in this run.
var = 'tas'

# Create one daily file per year for 1994 through 2015 (np.arange excludes 2016).
for i_yr in np.arange(1994,2016):
    execute_daily_file_creation(i_yr, var)

The precipitation v1.3 files were created with the versions of the functions defined below.

In [6]:
def execute_daily_file_creation_precip(year, variable):
    '''
    Build one year of daily precipitation totals, one day at a time in
    the notebook process, and return them as a Dataset.

    year(int): year to process
    variable(str): passed to `create_jobs`; 'pr' for precipitation

    Returns a Dataset holding the year of daily values as a single chunk
    spanning the full time, latitude and longitude extents.
    '''
    jobs, dt_index_full = create_jobs(year, variable)

    # not used inside this function; the caller hands the output directory
    # to the save function itself
    save_directory = '/gcs/impactlab-data/climate/source_data/ERA-5/{}/daily/netcdf/v1.3'.format(variable)

    # one daily total per job, computed sequentially
    daily_totals = [calc_daily_total_precip(job) for job in jobs]

    # stack the daily fields along a new time axis and label it
    da_year = xr.concat(daily_totals, dim='time')
    time_index = dt_index_full[:-1] if variable == 'pr' else dt_index_full
    da_year['time'] = time_index

    # a single chunk covering every dimension completely
    full_extent = {
        'time': len(da_year.time),
        'latitude': len(da_year['latitude']),
        'longitude': len(da_year['longitude']),
    }
    da_year = da_year.chunk(full_extent)

    return da_year.to_dataset()

In [10]:
def save_yearlong_dailydata_file_precip(directory, year, ds, var):
    '''
    Stamp a Dataset with provenance attributes and write it to netCDF.

    directory(str): folder that receives the file
    year(str or int): year, used twice in the file name
    ds(Dataset): data to write; its attributes are updated in place
    var(str): variable name, used in the file name and `source` attribute
    '''
    out_path = os.path.join(directory, '%s_daily_%s-%s.nc' % (var, year, year))

    # merged into the attributes of the dataset that was passed in
    ds.attrs.update({
        'author': 'Meredith Fish',
        'contact': 'meredith.fish@rutgers.edu',
        'project': ('global downscaling [ERA-5]'),
        'source': ('impactlab-data/climate/source-data/ERA-5/{}/hourly/netcdf/'.format(var)),
        'created': str(date.today()),
    })

    # mode='w' replaces any existing file at that path
    ds.to_netcdf(out_path, mode='w')

In [3]:
# Precipitation run: variable name and the output directory for the v1.3 files.
var = 'pr'
directory = '/gcs/impactlab-data/climate/source_data/ERA-5/pr/daily/netcdf/v1.3'

# For each year 1994 through 2015 (np.arange excludes 2016): build the daily
# dataset, then write it to `directory`.
for i_yr in np.arange(1994,2016):
    ds = execute_daily_file_creation_precip(i_yr, var)
    save_yearlong_dailydata_file_precip(directory, i_yr, ds, var) 