In [1]:
from pathlib import Path
import subprocess as sp
import xarray as xr
import numpy as np
import multiprocessing as mp

import gzip

In [2]:
DOE_VARS = ['runoff', 'prcp', 'pres', 'rhum', 'srad', 'tmax', 'tmin', 'wind', 'PRMS_runoff', 'VIC4_runoff']

In [3]:
# Directory templates for the source archives used in this notebook.
# TC:   `{}` is filled with the RGIS variable name (see get_tc).
TC = '/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/{}/TerraClimate/2m30s/Monthly'
# NCEP: first `{}` is the RGIS variable name, second is the capitalised
#       time-step directory, e.g. "Daily" or "Monthly" (see get_ncep).
NCEP = '/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/{}/NCEP/60min/{}'
# DOE:  directory that get_doe globs for the raw per-variable netCDF files.
DOE = '/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw'

def get_tc(variable, year=None):
    """Return the sorted TerraClimate monthly gdbc archives for ``variable``.

    Args:
        variable: RGIS variable name substituted into the ``TC`` template.
        year: if given, only files whose name ends in ``_mTS<year>.gdbc.gz``
            are returned; otherwise every ``_mTS*`` file is returned.

    Returns:
        Sorted list of ``Path`` objects (possibly empty).

    Raises:
        AssertionError: if the variable's archive directory does not exist.
    """
    archive_dir = Path(TC.format(variable))
    assert archive_dir.exists(), f"{variable} not found"

    # With no year requested, wildcard the year portion of the file name.
    year_part = '*' if year is None else year
    return sorted(archive_dir.glob(f'*_mTS{year_part}.gdbc.gz'))
    
def get_ncep(variable, time_step = 'daily', year=None):
    """Return the sorted NCEP gdbc archives for ``variable`` at ``time_step``.

    Args:
        variable: RGIS variable name substituted into the ``NCEP`` template.
        time_step: ``'daily'`` or ``'monthly'`` (case-insensitive). Selects
            both the archive sub-directory and the ``dTS``/``mTS`` file tag.
        year: if given, only that year's file is matched; otherwise all years.

    Returns:
        Sorted list of ``Path`` objects (possibly empty).

    Raises:
        ValueError: if ``time_step`` is not ``'daily'`` or ``'monthly'``.
        AssertionError: if the archive directory does not exist.
    """
    file_tags = {'daily': 'dTS', 'monthly': 'mTS'}

    # Validate before touching the filesystem: previously an unknown time step
    # either tripped the misleading "<variable> not found" assertion or left
    # the file tag unbound and crashed with UnboundLocalError.
    step = time_step.lower()
    if step not in file_tags:
        raise ValueError(
            f"time_step must be one of {sorted(file_tags)}, got {time_step!r}"
        )

    vdir = Path(NCEP.format(variable, step.capitalize()))
    assert vdir.exists(), f"{variable} not found"

    tstr = file_tags[step]
    year_part = '*' if year is None else year
    return sorted(vdir.glob(f'*_{tstr}{year_part}.gdbc.gz'))
    
def get_doe(variable, year=None):
    """Return the sorted raw DOE netCDF files for ``variable``.

    Files are matched in the ``DOE`` directory with the pattern
    ``*_<variable>_*.nc``. When ``year`` is given, the list is narrowed to
    files whose name contains ``str(year)`` as a substring.
    """
    matches = sorted(Path(DOE).glob(f'*_{variable}_*.nc'))

    if year is not None:
        year_token = str(year)
        matches = [path for path in matches if year_token in path.name]

    return matches

In [4]:
get_doe('srad')[0:5]

[PosixPath('/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw/DaymetV4_VIC4_srad_1980.nc'),
 PosixPath('/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw/DaymetV4_VIC4_srad_1981.nc'),
 PosixPath('/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw/DaymetV4_VIC4_srad_1982.nc'),
 PosixPath('/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw/DaymetV4_VIC4_srad_1983.nc'),
 PosixPath('/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw/DaymetV4_VIC4_srad_1984.nc')]

In [5]:
get_doe('prcp', year=2000)

[PosixPath('/asrc/ecr/fabio/NREL_WaterSecurity/DOE9505/raw/DaymetV4_VIC4_prcp_2000.nc')]

In [6]:
get_ncep('Precipitation', time_step = 'daily', year=2000)

[PosixPath('/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/Precipitation/NCEP/60min/Daily/Global_Precipitation_NCEP_60min_dTS2000.gdbc.gz')]

In [7]:
get_ncep('Precipitation', time_step='monthly', year=2000)

[PosixPath('/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/Precipitation/NCEP/60min/Monthly/Global_Precipitation_NCEP_60min_mTS2000.gdbc.gz')]

## Convert DOE netcdf to RGIS gdbc

The prcp and temperature variables are missing a day of data on leap years, so we'll need to handle that as part of this process. 

I am assuming that Feb 29th is left out, rather than December 31st. 

In [8]:
import subprocess as sp
from calendar import isleap
import tempfile
from datetime import date, timedelta

In [9]:
def insert_feb29(ds, var_name):
    """Return a copy of ``ds`` with one extra time step and an interpolated Feb 29.

    The time axis is extended by one label (last value + 1), every value
    labelled March 1 or later is moved one step later, and the Feb 29 slot is
    filled with the mean of the values at positions 58 and 59 of the input.

    Args:
        ds: dataset with a ``time`` coordinate.
            NOTE(review): assumes ``time`` holds plain numbers meaning
            "days since 1980-01-01" (not decoded datetimes), that the labels
            are contiguous, and that position 0 is January 1 -- TODO confirm.
        var_name: name of the data variable to shift and fill. Only this
            variable is modified; any other variables are left as reindexed.
            NOTE(review): the ``.loc[t,:,:]`` indexing assumes the variable is
            3-D with ``time`` as its first dimension -- TODO confirm.

    Returns:
        The reindexed dataset with ``var_name`` updated.
    """
    #TODO: Assert unit of date
    # time is in "days since 1980-01-01"
    new_time = ds.time.data
    # Extend the axis by one label: last existing time value + 1.
    new_time = np.append(new_time, [ds.time.data[-1] + 1,])

    # leaves time[-1] with no data, need to shift values forward
    reindexed = ds.reindex({'time': new_time}, method=None)

    # Positional index 58 is the 59th step; if the series starts on Jan 1 that
    # is Feb 28.
    feb28 = reindexed[var_name].isel(time=58)

    # Position 59 is the step right after it; per the notebook's assumption
    # that Feb 29 was dropped, this holds the March 1 data.
    march1 = reindexed[var_name].isel(time=59)
    
    # interpolate value
    # (computed before the loop below overwrites positions 59 onward)
    feb29 = (feb28 + march1)/2

    # Walk the axis from the last label backwards so each slot is overwritten
    # only after its old value has been copied to the following slot.
    for t in new_time[::-1]:
        # convert from "days since 1980"
        dt = date(1980, 1, 1) + timedelta(int(t))
        # everything from march 1st on shifted forward one index
        if dt >= date(dt.year, 3, 1):
            reindexed[var_name].loc[t,:,:] = reindexed[var_name].loc[t-1,:,:]
        # feb29 interp value takes place of old march1
        # NOTE: date(year, 2, 29) raises ValueError for a non-leap year, so
        # this function is only usable on leap-year data.
        if dt == date(dt.year, 2, 29):
            reindexed[var_name].loc[t,:,:] = feb29
    
    return reindexed

In [10]:
def _nc_to_rgis(nc: Path, year: int, output_path: Path):
    """Wrap terminal command:
        netcdf2rgis <netcdf> | grdDateLayers -e day -Y <year> - <output_gdbc>

    Args:
        nc: netCDF file to convert.
        year: value passed to ``grdDateLayers -Y``.
        output_path: gdbc file that ``grdDateLayers`` writes.

    Returns:
        ``output_path``, unchanged.

    Raises:
        subprocess.CalledProcessError: if either command exits non-zero.
    """
    # Build argv lists directly: splitting a formatted string on whitespace
    # breaks any path that contains a space.
    netcdf2rgis = ["netcdf2rgis", str(nc)]
    grdDateLayers = ["grdDateLayers", "-e", "day", "-Y", str(year), "-", str(output_path)]

    # Popen as a context manager guarantees the producer is waited on even if
    # the consumer raises.
    with sp.Popen(netcdf2rgis, stdout=sp.PIPE) as producer:
        try:
            sp.check_output(grdDateLayers, stdin=producer.stdout)
        finally:
            # Drop the parent's copy of the pipe so the producer sees a closed
            # reader (and can exit) if the consumer stopped early.
            producer.stdout.close()

    # The consumer's status is checked by check_output; check the producer too
    # so a failed netCDF read is not mistaken for a successful conversion.
    if producer.returncode != 0:
        raise sp.CalledProcessError(producer.returncode, netcdf2rgis)

    return output_path
    
def nc_to_rgis(nc: Path, year: int, output_path: Path):
    """Perform netcdf conversion to gdbc (rgis format). If leap year data is missing Feb 29, fill in.

    For a leap ``year`` whose file has fewer than 366 time steps, the first
    data variable is patched with ``insert_feb29``, written to a temporary
    netCDF, and that temporary file is converted instead of the original.

    Args:
        nc: source netCDF file.
        year: calendar year of the data in ``nc``.
        output_path: destination gdbc file.

    Returns:
        ``output_path`` as returned by ``_nc_to_rgis``.
    """
    if not isleap(year):
        return _nc_to_rgis(nc, year, output_path)

    # Context managers close the dataset and remove the temporary directory
    # even when the conversion raises; previously both could leak.
    with xr.open_dataset(nc) as ds:
        if len(ds.time) < 366:
            var_name = list(ds.data_vars.keys())[0]
            patched = insert_feb29(ds, var_name)

            # temporarily store new netcdf with feb29 added
            with tempfile.TemporaryDirectory(dir='/tmp', prefix="rgispy_") as tmp_dir:
                tmp_nc = Path(tmp_dir).joinpath(nc.name)
                # Written while the source dataset is still open, in case any
                # of its variables are still lazily loaded.
                patched.to_netcdf(tmp_nc)
                return _nc_to_rgis(tmp_nc, year, output_path)

    # Leap year that already has all 366 days: convert the file as-is.
    return _nc_to_rgis(nc, year, output_path)

In [11]:
# DOE files that already are converted to gdbc and have missing leap year data filled in
DOE_PREPROCESSED = Path('/home/shared/doe95/raw_gdbc')

In [12]:
RERUN = False

# This takes a while, no need to run it again unless I messed up something
if RERUN:
    POOL = mp.Pool(processes=4)
    try:
        jobs = []
        for v in DOE_VARS:
            for y in range(1980, 2020):
                nc = get_doe(v, year=y)[0]

                out_name = nc.name.split('.')[0] + '.gdbc.gz'
                out_path = DOE_PREPROCESSED.joinpath(out_name)

                # Keep the handle: discarding it would silently drop any
                # exception raised inside the worker.
                jobs.append(POOL.apply_async(nc_to_rgis, args=(nc, y, out_path)))
    finally:
        # Always shut the pool down, even if queuing a job failed.
        POOL.close()
        POOL.join()

    # All workers are done; re-raise the first worker failure, if any.
    for job in jobs:
        job.get()

In [13]:
def get_doe_preprocessed(variable, year=None):
    """Return sorted preprocessed DOE gdbc files for ``variable``.

    These are the files under ``DOE_PREPROCESSED`` (already converted to gdbc
    with missing leap-year data filled in), matched with the pattern
    ``*_<variable>_*.gdbc.gz``. When ``year`` is given, only files whose name
    contains ``str(year)`` are kept.
    """
    matches = sorted(Path(DOE_PREPROCESSED).glob(f'*_{variable}_*.gdbc.gz'))

    if year is not None:
        year_token = str(year)
        matches = [path for path in matches if year_token in path.name]

    return matches

In [14]:
get_doe_preprocessed('prcp', year=2001)

[PosixPath('/home/shared/doe95/raw_gdbc/DaymetV4_VIC4_prcp_2001.gdbc.gz')]

## grdCalculate Final Data

Example command line version:
```sh
network="/asrc/ecr/balazs/GHAAS2/RGISarchive/CONUS/Network/HydroSTN30/03min/Static/CONUS_Network_HydroSTN30_03min_Static.gdbn.gz"
nrel="/asrc/ecr/danielv/kubestorage/shared-rw/doe95/prcp_2000.gdbc.gz"
terra="/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/Precipitation/TerraClimate/2m30s/Monthly/Global_Precipitation_TerraClimate_2m30s_mTS2000.gdbc.gz"
ncep_daily="/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/Precipitation/NCEP/60min/Daily/Global_Precipitation_NCEP_60min_dTS2000.gdbc.gz"
ncep_monthly="/asrc/ecr/balazs/GHAAS2/RGISarchive/Global/Precipitation/NCEP/60min/Monthly/Global_Precipitation_NCEP_60min_mTS2000.gdbc.gz"

grdCalculate -x $network \
    -c "$nrel == nodata ? $ncep_daily * $terra / $ncep_monthly : $nrel" > nrel_prcp_2000.gdbc.gz
```

In [15]:
conus_03min_gdbn = Path('/asrc/ecr/balazs/GHAAS2/RGISarchive/CONUS/Network/HydroSTN30/03min/Static/CONUS_Network_HydroSTN30_03min_Static.gdbn.gz')
conus_2m30s_gdbn = Path('/asrc/ecr/balazs/GHAAS2/RGISarchive/CONUS/Network/HydroSTN30/2m30s/Static/CONUS_Network_HydroSTN30_2m30s_Static.gdbn.gz')

In [27]:
def grdCalculate(expression:str, out_buffer, network:Path=conus_03min_gdbn, interpolate='surface', title=None, subject=None, domain=None, verbose=False):
    """Run the ``grdCalculate`` command-line tool and return the completed process.

    Args:
        expression: expression string passed with ``-c``.
        out_buffer: forwarded to ``subprocess.run`` as ``stdout`` (for example
            ``subprocess.PIPE`` or an open binary file).
        network: path passed with ``-x``.
        interpolate: value passed with ``-n``.
        title: optional value passed with ``-t``.
        subject: optional value passed with ``-u``.
        domain: optional value passed with ``-d``.
        verbose: if true, print the space-joined command before running it.

    Returns:
        The ``subprocess.CompletedProcess`` from ``subprocess.run``. The exit
        status is not checked here.
    """
    # Every option value is its own argv element. No shell is involved, so
    # wrapping a value in quotes (as the old string-split code did) would hand
    # the literal quote characters to grdCalculate, and a value containing
    # spaces would be split into several arguments.
    cmd = ['grdCalculate', '-x', str(network), '-n', str(interpolate)]

    for flag, value in (('-t', title), ('-u', subject), ('-d', domain)):
        if value is not None:
            cmd += [flag, str(value)]

    cmd += ['-c', expression]

    if verbose:
        print(' '.join(cmd))

    return sp.run(cmd, stdout=out_buffer)

def save_grdCalculate(expression:str, output_gz: Path, **kwargs):
    """Run ``grdCalculate`` and write its stdout, gzip-compressed, to ``output_gz``.

    Args:
        expression: expression string forwarded to ``grdCalculate``.
        output_gz: destination file; missing parent directories are created.
        **kwargs: forwarded unchanged to ``grdCalculate``.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero. Nothing
            is written in that case.
    """
    # exist_ok avoids a FileExistsError when parallel workers create the same
    # directory at once. (The previous code also misspelled the variable name
    # here, raising NameError whenever the directory was missing.)
    output_gz.parent.mkdir(parents=True, exist_ok=True)

    ps = grdCalculate(expression, sp.PIPE, **kwargs)
    # Fail loudly instead of saving a valid-looking archive of a failed run.
    ps.check_returncode()

    with gzip.open(output_gz, "wb") as f:
        f.write(ps.stdout)

In [30]:
nrel_prcp_2000 = get_doe_preprocessed('prcp', year=2000)[0]
terra_prcp_monthly_2000 = get_tc('Precipitation', year=2000)[0]
ncep_prcp_daily_2000 = get_ncep('Precipitation', time_step='daily', year=2000)[0]
ncep_prcp_monthly_2000 = get_ncep('Precipitation', time_step='monthly', year=2000)[0]

In [31]:
expression = f"{nrel_prcp_2000} == nodata ? {ncep_prcp_daily_2000} * {terra_prcp_monthly_2000} / {ncep_prcp_monthly_2000} : {nrel_prcp_2000}"

PRECIP_OUT = Path('/home/shared/doe95/CONUS/Precipitation/NRELDOE9505/03min/Daily')
precip_2000 = PRECIP_OUT.joinpath('CONUS_Precipitation_NRELDOE9505_03min_dTS2000.gdbc.gz')

In [28]:
# This takes a while
save_grdCalculate(expression, precip_2000, network=conus_03min_gdbn)

## Final Function

In [18]:
# data goes here
OUTPUT_ROOT = Path('/home/shared/doe95/')

In [19]:
# For each NREL variable: the RGIS archive variable name, and whether NCEP
# and TerraClimate inputs are looked up for it.
VAR_CONV = dict(
    prcp=dict(rgis='Precipitation', ncep=True, terra=True),  # cannot be negative
    pres=dict(rgis='AirPressure', ncep=True, terra=False),
    rhum=dict(rgis='Humidity-Relative', ncep=True, terra=False),
    srad=dict(rgis='Radiation-ShortWave-Downwelling', ncep=True, terra=True),
    tmax=dict(rgis='AirTemperature', ncep=True, terra=True),
    tmin=dict(rgis='AirTemperature', ncep=True, terra=True),
    wind=dict(rgis='WindSpeed', ncep=True, terra=True),  # cannot be negative
    PRMS_runoff=dict(rgis='Runoff', ncep=False, terra=True),
    VIC4_runoff=dict(rgis='Runoff', ncep=False, terra=True),
)

# Building blocks shared by the grdCalculate expression templates below.
# NCEP daily value rescaled by the TerraClimate / NCEP monthly ratio.
_DOWNSCALED = "{ncep_daily} * {terra_monthly} / {ncep_monthly}"
# Same, but yields 0.0 where the NCEP monthly value is not positive.
_GUARDED_DOWNSCALED = "({ncep_monthly} > 0 ? " + _DOWNSCALED + " : 0.0)"
# Use the fallback (%s) only where the NREL grid is nodata.
_FILL_NODATA = "{nrel} == nodata ? %s : {nrel}"

EXPR = {
    'prcp': _FILL_NODATA % _GUARDED_DOWNSCALED,
    'pres': _FILL_NODATA % "{ncep_daily}",
    'srad': _FILL_NODATA % _DOWNSCALED,
    'wind': _FILL_NODATA % _GUARDED_DOWNSCALED,
    'tmin': "{nrel_tmin} == nodata ? " + _DOWNSCALED + " : ( {nrel_tmin} + {nrel_tmax} ) / 2",

    # TODO relative humidity
}

In [20]:
def get_network(res):
    """Return the path of the CONUS HydroSTN30 network file for resolution ``res``.

    ``res`` is inserted into both the directory and the file name
    (e.g. ``'03min'``). An ``AssertionError`` is raised if the resulting
    file does not exist.
    """
    file_name = f'CONUS_Network_HydroSTN30_{res}_Static.gdbn.gz'
    cand = Path(f'/asrc/ecr/balazs/GHAAS2/RGISarchive/CONUS/Network/HydroSTN30/{res}/Static/' + file_name)
    assert cand.exists()
    return cand

In [21]:
def nrel_to_rgisarchive(nrel_var, year, res, output_dir=Path('/home/shared/doe95')):
    """Gap-fill one year of one NREL variable and save it as a gdbc.gz archive.

    The variable's ``EXPR`` template is filled with the paths of the
    preprocessed NREL grid and of whichever NCEP / TerraClimate grids
    ``VAR_CONV`` says are available, then evaluated with ``save_grdCalculate``
    on the CONUS network of resolution ``res``.

    Args:
        nrel_var: NREL variable key. It must have an entry in both
            ``VAR_CONV`` and ``EXPR``. ``'tmin'`` is a special case whose
            expression combines the ``tmin`` and ``tmax`` grids.
        year: year to process.
        res: network resolution string understood by ``get_network``.
        output_dir: root under which
            ``CONUS/<rgis_var>/NRELDOE9505/<res>/Daily`` is created.

    Returns:
        Path of the written ``.gdbc.gz`` file.

    Raises:
        KeyError: if ``nrel_var`` has no expression template or no
            ``VAR_CONV`` entry, or the template needs an input that is not
            available for this variable.
        IndexError: if an expected input file for ``year`` is not found.
    """
    # Fail fast with a clear message. Previously a variable without a template
    # only failed (also with KeyError) after all the archive lookups had run.
    if nrel_var not in EXPR:
        raise KeyError(f"no grdCalculate expression defined for {nrel_var!r}")

    network_gdbn = get_network(res)

    conv = VAR_CONV[nrel_var]
    rgis_var = conv['rgis']

    # Collect only the inputs expected to exist; str.format ignores any that a
    # given template does not reference.
    inputs = {}
    if conv['terra']:
        inputs['terra_monthly'] = get_tc(rgis_var, year=year)[0]

    if conv['ncep']:
        inputs['ncep_daily'] = get_ncep(rgis_var, time_step='daily', year=year)[0]
        inputs['ncep_monthly'] = get_ncep(rgis_var, time_step='monthly', year=year)[0]

    # temperature special case (tmin + tmax)/2
    if nrel_var == 'tmin':
        inputs['nrel_tmin'] = get_doe_preprocessed('tmin', year=year)[0]
        inputs['nrel_tmax'] = get_doe_preprocessed('tmax', year=year)[0]
    else:
        inputs['nrel'] = get_doe_preprocessed(nrel_var, year=year)[0]

    expression = EXPR[nrel_var].format(**inputs)

    outdir = output_dir.joinpath(f'CONUS/{rgis_var}/NRELDOE9505/{res}/Daily')
    # exist_ok: several pool workers may try to create the same directory at
    # the same time; a separate exists() check followed by mkdir() can race.
    outdir.mkdir(parents=True, exist_ok=True)

    outname = f"CONUS_{rgis_var}_NRELDOE9505_{res}_dTS{year}.gdbc.gz"
    outpath = outdir.joinpath(outname)

    save_grdCalculate(expression, outpath, network=network_gdbn, title=outname.split('.')[0], domain='CONUS', subject=rgis_var, verbose=False)
    return outpath

In [119]:
#nrel_to_rgisarchive('srad', 1980, '2m30s')

In [None]:
def nrel_async(nrel_vars, years, reses, processes=4):
    """Run ``nrel_to_rgisarchive`` for every (variable, year, resolution) combination in a process pool.

    Args:
        nrel_vars: iterable of NREL variable keys.
        years: iterable of years.
        reses: iterable of network resolution strings.
        processes: number of worker processes (defaults to the previously
            hard-coded 4).

    Each result is printed, tab-prefixed, in submission order. An exception
    raised in a worker is re-raised here when its result is fetched.
    """
    with mp.Pool(processes) as pool:
        pending = [
            pool.apply_async(nrel_to_rgisarchive, (v, y, res))
            for v in nrel_vars
            for y in years
            for res in reses
        ]

        # Fetch inside the `with` block: leaving it terminates the pool.
        for job in pending:
            print('\t', job.get())
    
nrel_async(['srad',], [1980,], ['03min',])

In [105]:
def nrel_precip(year, res, output_dir=Path('/home/shared/doe95')):
    """Gap-fill one year of NREL precipitation and save it as a gdbc.gz archive.

    Steps:
      1. Look up the CONUS network for ``res`` and the NREL, TerraClimate and
         NCEP (daily and monthly) precipitation files for ``year``.
      2. Create ``<output_dir>/CONUS/Precipitation/NRELDOE9505/<res>/Daily``
         if it does not exist.
      3. Build the grdCalculate expression: where the NREL grid is nodata, use
         the NCEP daily value scaled by TerraClimate / NCEP monthly (or 0.0
         when the NCEP monthly value is not positive); otherwise keep NREL.
      4. Run grdCalculate and save the result as
         ``CONUS_Precipitation_NRELDOE9505_<res>_dTS<year>.gdbc.gz``.
    """
    network_gdbn = get_network(res)

    sources = {
        'nrel': get_doe_preprocessed('prcp', year=year)[0],
        'terra_monthly': get_tc('Precipitation', year=year)[0],
        'ncep_daily': get_ncep('Precipitation', time_step='daily', year=year)[0],
        'ncep_monthly': get_ncep('Precipitation', time_step='monthly', year=year)[0],
    }

    target_dir = output_dir / f'CONUS/Precipitation/NRELDOE9505/{res}/Daily'
    if not target_dir.exists():
        target_dir.mkdir(parents=True)

    template = "{nrel} == nodata ? ({ncep_monthly} > 0 ? {ncep_daily} * {terra_monthly} / {ncep_monthly} : 0.0) : {nrel}"
    expression = template.format(**sources)
    target = target_dir / f"CONUS_Precipitation_NRELDOE9505_{res}_dTS{year}.gdbc.gz"

    save_grdCalculate(expression, target, network=network_gdbn)

In [None]:
nrel_precip(2000, '03min')

In [73]:
def nrel_temp(year, res, output_dir=Path('/home/shared/doe95')):
    """Gap-fill one year of NREL air temperature and save it as a gdbc.gz archive.

    Steps:
      1. Look up the CONUS network for ``res``, the NREL ``tmin`` and ``tmax``
         files, and the TerraClimate and NCEP (daily and monthly)
         AirTemperature files for ``year``.
      2. Create ``<output_dir>/CONUS/AirTemperature/NRELDOE9505/<res>/Daily``
         if it does not exist.
      3. Build the grdCalculate expression: where ``tmin`` is nodata, use the
         NCEP daily value scaled by TerraClimate / NCEP monthly; otherwise use
         ``(tmin + tmax) / 2``.
      4. Run grdCalculate with ``interpolate='flat'`` (printing the command)
         and save the result as
         ``CONUS_AirTemperature_NRELDOE9505_<res>_dTS<year>.gdbc.gz``.
    """
    network_gdbn = get_network(res)

    sources = {
        'nrel_tmin': get_doe_preprocessed('tmin', year=year)[0],
        'nrel_tmax': get_doe_preprocessed('tmax', year=year)[0],
        'terra_monthly': get_tc('AirTemperature', year=year)[0],
        'ncep_daily': get_ncep('AirTemperature', time_step='daily', year=year)[0],
        'ncep_monthly': get_ncep('AirTemperature', time_step='monthly', year=year)[0],
    }

    target_dir = output_dir / f'CONUS/AirTemperature/NRELDOE9505/{res}/Daily'
    if not target_dir.exists():
        target_dir.mkdir(parents=True)

    template = "{nrel_tmin} == nodata ? {ncep_daily} * {terra_monthly} / {ncep_monthly} : ( {nrel_tmin} + {nrel_tmax} ) / 2"
    expression = template.format(**sources)
    target = target_dir / f"CONUS_AirTemperature_NRELDOE9505_{res}_dTS{year}.gdbc.gz"

    save_grdCalculate(expression, target, network=network_gdbn, interpolate='flat', verbose=True)

In [78]:
nrel_temp(2000, '03min', output_dir=Path('/home/shared/doe95/temp'))

grdCalculate -x /asrc/ecr/balazs/GHAAS2/RGISarchive/CONUS/Network/HydroSTN30/03min/Static/CONUS_Network_HydroSTN30_03min_Static.gdbn.gz -n flat -c /home/shared/doe95/raw_gdbc/DaymetV4_VIC4_tmin_2000.gdbc.gz == nodata ? /asrc/ecr/balazs/GHAAS2/RGISarchive/Global/AirTemperature/NCEP/60min/Daily/Global_AirTemperature_NCEP_60min_dTS2000.gdbc.gz * /asrc/ecr/balazs/GHAAS2/RGISarchive/Global/AirTemperature/TerraClimate/2m30s/Monthly/Global_AirTemperature_TerraClimate_2m30s_mTS2000.gdbc.gz / /asrc/ecr/balazs/GHAAS2/RGISarchive/Global/AirTemperature/NCEP/60min/Monthly/Global_AirTemperature_NCEP_60min_mTS2000.gdbc.gz : ( /home/shared/doe95/raw_gdbc/DaymetV4_VIC4_tmin_2000.gdbc.gz + /home/shared/doe95/raw_gdbc/DaymetV4_VIC4_tmax_2000.gdbc.gz ) / 2


Vapor Pressure -> Relative Humidity

https://www.engineeringtoolbox.com/relative-humidity-air-d_687.html

https://www.engineeringtoolbox.com/water-vapor-saturation-pressure-air-d_689.html

pres < - > AirPressure

srad < - > Radiation-ShortWave-Downwelling

wind < - > WindSpeed