In [1]:
## import libraries
import os, sys
import yaml
import xarray as xr
import pandas as pd
# path_to_repo = '/home/dnash/repos/SEAK_AR_impacts/'
# sys.path.append(path_to_repo+'modules')
# import GEFSv12_funcs as gefs

In [2]:
## settings for the precipitation cells below
variable = 'prec'  # used to build the output directory and the output file name
year = 2000
date = 20000101  # fills the input directory and GRIB file names (looks like YYYYMMDD - confirm)

In [3]:
## Load the apcp GRIB files for `date` (control + perturbed members), subset
## the domain, and turn the accumulated `tp` field into per-step amounts.
path_to_data = "/expanse/lustre/scratch/dnash/temp_project/downloaded/GEFSv12_reforecast/{0}/".format(date)

## read the control (GRIB dataType 'cf')
fname = "apcp_sfc_{0}00_c00.grib2".format(date)
dsa = xr.open_dataset(path_to_data+fname, engine='cfgrib', filter_by_keys={'dataType': 'cf'})
dsa = dsa.expand_dims("number") ## expand the dim so we can concat with the other data

## read the ensemble members (GRIB dataType 'pf'), one file per member
fname_pattern = "apcp_sfc_{0}00_p*.grib2".format(date)
dsb = xr.open_mfdataset(path_to_data+fname_pattern, engine='cfgrib',
                        concat_dim="number", combine='nested', filter_by_keys={'dataType': 'pf'})

## concat into single ds
ds = xr.concat([dsa, dsb], dim='number', coords='minimal', compat='override')

## fix lons: convert longitude coordinates from 0-359 to -180-179
ds = ds.assign_coords({"longitude": (((ds.longitude + 180) % 360) - 180)})
## after the wrap the axis runs 0..180 then -180..0; sort it so the label slice
## below works on a monotonic index instead of relying on both bounds being
## exact grid labels
ds = ds.sortby("longitude")

## subset to N. America
## NOTE(review): the original note gave the box as [0, 70, 180, 295], but the
## eastern bound used here is -60 (i.e. 300E) - confirm which is intended
ds = ds.sel(latitude=slice(70, 0), longitude=slice(-179.5, -60.))

## convert accumulated precipitation to per-step amounts
## (no division by the step length is applied, so values are per step, not per hour)
## lowercase 'h' is used because the uppercase hour alias is deprecated in pandas >= 2.2
ts_3hr = pd.timedelta_range(start='0 day', periods=57, freq='3h')
ts_6hr = pd.timedelta_range(start='0 day', periods=29, freq='6h')
tp = ds.tp ## pull out tp
prec_3hr = tp.sel(step=ts_3hr[1::2]) ## steps 3h, 9h, 15h, ...: taken as-is
tp2 = tp.diff(dim='step') ## difference between consecutive steps
## the differenced values are only kept at the 6-hourly steps (6h, 12h, ...);
## presumably tp there is a 6-hour accumulation, so subtracting the preceding
## 3h value leaves the last 3 hours - verify against the data documentation
prec_6hr = tp2.sel(step=ts_6hr[1:])
new_prec = prec_3hr.combine_first(prec_6hr) # interleave the two sets of steps
ds = ds.drop_vars(["tp"]) # get rid of old tp (accumulated variable)
ds = xr.merge([ds, new_prec]) # merge dataset with new tp


ds

In [None]:
## Write the processed dataset to scratch, then copy the file to the project space.
import shutil  # required by shutil.copy below; no earlier cell imports it

## save data to netCDF file
print('Writing {0} to netCDF ....'.format(date))
path_to_data = '/expanse/lustre/scratch/dnash/temp_project/mclimate/{0}/'.format(variable)

fname = '{0}_{1}.nc'.format(date, variable)
## .load() pulls the data into memory before writing
ds.load().to_netcdf(path=path_to_data + fname, mode = 'w', format='NETCDF4')

### COPY FROM LUSTRE TO MAIN SPACE
path_to_final_data = '/expanse/nfs/cw3e/cwp140/preprocessed/GEFSv12_reforecast/{0}/'.format(variable)
print('Copying preprocessed data...')
inname = path_to_data + fname
outname = path_to_final_data + fname
print('... {0} to {1}'.format(inname, outname))
shutil.copy(inname, outname) # copy file over to data folder

In [None]:
## quick-look plot of tp at one grid point for one ensemble member
## (isel takes positional indices along each dimension, not coordinate values)
ds.tp.isel(latitude=12, longitude=200, number=4).plot()

In [None]:
######################################################################
# Filename:    preprocess_GEFSv12_reforecast.py
# Author:      Deanna Nash dnash@ucsd.edu
# Description: Script to take downloaded GEFSv12 reforecast u, v, and spfh data for each day, preprocess IVT data and save as single netCDF file
# https://registry.opendata.aws/noaa-gefs-reforecast/ (data link)
#
######################################################################

## import libraries
import os, sys
import yaml
import xarray as xr
import numpy as np

path_to_repo = '/home/dnash/repos/SEAK_AR_impacts/'
sys.path.append(path_to_repo+'modules')
import GEFSv12_funcs as gefs

## Job settings: data root, and the year/date pulled from the YAML job table.
# path_to_data = '/expanse/nfs/cw3e/cwp140/'
path_to_data = '/expanse/lustre/scratch/dnash/temp_project/'

config_file = 'config_2.yaml' # this is the config file name
job_info = 'job_492' # this is the job name

## read the file inside a context manager so the handle is closed afterwards
with open(config_file) as config_stream:
    config = yaml.load(config_stream, Loader=yaml.SafeLoader)
ddict = config[job_info] # pull the job info from the dict

year = ddict['year']
date = ddict['date']
variable = 'ivt' ## can be 'ivt', 'freezing_level', or 'uv1000'

## Load the fields needed for `variable`, one 8-step chunk at a time
## (range(0, 8, 8) currently yields the single chunk starting at 0).
for st in range(0, 8, 8):
    print(st, st+8)
    start, stop = st, st+8

    if variable == 'ivt':
        print('Loading u, v, and q data ....')
        varname_lst = ['ugrd', 'vgrd', 'spfh']
        ds_lst = []
        for varname in varname_lst:
            ## read the variable, then keep the first 8 step positions
            ds = gefs.read_and_regrid_prs_var(varname, date, year, start, stop).isel(step=slice(0, 8))
            ds_lst.append(ds)
        ## ds_lst is left unmerged here; a later cell adds surface pressure and merges

    if variable == 'uv1000':
        print('Loading u and v data ....')
        varname_lst = ['ugrd_pres', 'vgrd_pres']
        ds_lst = []
        for varname in varname_lst:
            ## read the variable, pick the 1000 level, keep the first 8 step positions
            ds = (
                gefs.read_sfc_var(varname, date, year, start, stop)
                .sel(isobaricInhPa=1000.)
                .isel(step=slice(0, 8))
            )
            ds_lst.append(ds)
        ds = xr.merge(ds_lst) # merge u and v into single ds



In [None]:
## display the list of per-variable datasets built by the loading loop
ds_lst

In [None]:
import pandas as pd
from datetime import timedelta
import numpy as np
## Build, for every year 2000-2019, the daily dates within +/- 45 days of
## the centre date (mon/day), then flatten them into one array.
mon = 11
day = 17
half_window = timedelta(days=45)  # days kept on each side of the centre date

date_lst = []
for yr in range(2000, 2020):
    center_date = pd.to_datetime('{0}-{1}-{2}'.format(yr, mon, day))
    ## daily range from 45 days before to 45 days after the centre date (inclusive)
    dates = pd.date_range(center_date - half_window, center_date + half_window, freq='1D')
    date_lst.append(dates)

## one flat array holding every date from every year's window
final_lst = np.concatenate(date_lst)

In [None]:
## load in surface pressure
print('Loading surface pressure data ....')
ds_pres = gefs.read_sfc_var('pres_sfc', date, year, start, stop)
## NOTE: this appends in place, so re-running the cell adds ds_pres to ds_lst again
ds_lst.append(ds_pres)

ds = xr.merge(ds_lst) # merge everything in ds_lst (loaded variables + surface pressure) into single ds
## keep the 300-1000 levels, then reverse the order of the isobaricInhPa axis
## NOTE(review): the label slice(300, 1000) presumably relies on isobaricInhPa
## being stored in ascending order at this point - confirm
ds = ds.sel(isobaricInhPa=slice(300, 1000))
ds = ds.reindex(isobaricInhPa=ds.isobaricInhPa[::-1])

ds

In [None]:

## mask values below the surface: keep a level only where its pressure is
## lower than the surface pressure, everything else becomes NaN
print('Masking values below surface ....')
varlst = ['q', 'u', 'v']
surface_hpa = ds.sp/100.  # presumably Pa -> hPa to match isobaricInhPa - confirm
for varname in varlst:
    field = ds[varname]
    ds[varname] = field.where(field.isobaricInhPa < surface_hpa, drop=False)

## integrate to calculate IVT
print('Calculating IVT ....')
ds_IVT = gefs.calc_IVT_manual(ds) # calculate IVT
ds_IVT

In [None]:
## first and last forecast step of ds_IVT, expressed as whole hours
step_hours = ds_IVT.step.values.astype('timedelta64[h]')
one_hour = np.timedelta64(1, 'h')
start = int(step_hours[0] / one_hour)
stop = int(step_hours[-1] / one_hour)
print(start, stop)

In [None]:
# get info for saving file
start = ds_IVT.step.values[0].astype('timedelta64[h]')
stop = ds_IVT.step.values[-1].astype('timedelta64[h]')
start = int(start / np.timedelta64(1, 'h'))
stop = int(stop / np.timedelta64(1, 'h'))

## save IVT data to netCDF file
print('Writing {0} to netCDF ....'.format(date))
path_to_data = '/expanse/nfs/cw3e/cwp140/'
out_fname = path_to_data + 'preprocessed/GEFSv12_reforecast/ivt/{0}_ivt_F{1}_F{2}.nc'.format(date, start, stop) 
ds_IVT.load().to_netcdf(path=out_fname, mode = 'w', format='NETCDF4')

In [None]:
import xarray as xr
## open one file from the ivt/ output directory and display it
## (the name follows the <date>_ivt_F<start>_F<stop>.nc pattern used when saving)
path_to_data = '/expanse/nfs/cw3e/cwp140/'
out_fname = path_to_data + 'preprocessed/GEFSv12_reforecast/ivt/20050624_ivt_F123_F144.nc'
ds = xr.open_dataset(out_fname)
ds

In [None]:
## open a second ivt/ output file (date 20050623, same F123_F144 range) and display it
## NOTE: relies on path_to_data already being set by an earlier cell
out_fname = path_to_data + 'preprocessed/GEFSv12_reforecast/ivt/20050623_ivt_F123_F144.nc'
ds2 = xr.open_dataset(out_fname)
ds2

In [None]:
## import libraries
import os, sys
import yaml
import xarray as xr
import numpy as np

path_to_repo = '/cw3e/mead/projects/cwp140/scratch/dnash/repos/SEAK_AR_impacts/'
sys.path.append(path_to_repo+'modules')
import GEFSv12_funcs as gefs

## Job settings: data root, and the year/date pulled from the YAML job table.
path_to_data = '/cw3e/mead/projects/cwp140/scratch/dnash/data/'

config_file = 'config_1.yaml' # this is the config file name
job_info = 'job_154' # this is the job name

## read the file inside a context manager so the handle is closed afterwards
with open(config_file) as config_stream:
    config = yaml.load(config_stream, Loader=yaml.SafeLoader)
ddict = config[job_info] # pull the job info from the dict

year = ddict['year']
date = ddict['date']
variable = 'ivt' ## can be 'ivt', 'freezing_level', or 'prec'

In [None]:
import glob
varname_lst = ['ugrd', 'vgrd', 'spfh']
path_to_data = '/cw3e/mead/projects/cwp140/scratch/dnash/data/downloads/GEFSv12_reforecast/{0}/'.format(date) 
    
# list the spfh (varname_lst[2]) files whose names contain "pres_abv700mb" for this date
# NOTE(review): this step was previously labelled "below 700 mb - 0.25 degree",
# which does not match the "abv700mb" pattern - confirm which file set is intended
fname_lst = glob.glob(path_to_data+"{0}_pres_abv700mb_{1}00_*.grib2".format(varname_lst[2], date))
print(fname_lst)

# alternative pattern (files named "<var>_pres_<date>00*"), currently disabled
# fname_lst = glob.glob(path_to_data+"{0}_pres_{1}00*.grib2".format(varname_lst[2], date))

In [None]:
def preprocess(ds, start, stop):
    '''Return ``ds`` cut down to positions ``start`` (inclusive) to ``stop``
    (exclusive) along its ``step`` dimension.'''
    step_window = slice(start, stop)
    return ds.isel(step=step_window)
    
def fix_GEFSv12_open_mfdataset(fname, start, stop):
    '''Open a set of files one by one and concatenate them along "number".

    Files whose ``step`` axis is shorter than the longest one found are first
    reindexed onto the longest file's coordinates, so that every member has the
    same steps before the ``step`` window is cut out and the members are joined.

    Parameters
    ----------
    fname : str or sequence of str
        Glob pattern matching the files to open, or an explicit list of paths.
    start, stop : int
        Positional bounds of the ``step`` window kept from every file
        (forwarded to ``preprocess``).

    Returns
    -------
    xarray.Dataset
        The per-file datasets concatenated along the "number" dimension.

    Raises
    ------
    ValueError
        If no files are found for ``fname``.
    '''
    if isinstance(fname, (str, bytes)):
        ## glob returns files in arbitrary order; sort for a reproducible member order
        list_of_files = sorted(glob.glob(fname))
    else:
        list_of_files = list(fname)
    if not list_of_files:
        raise ValueError('no files found for {0!r}'.format(fname))

    ds_lst = []
    for fi in list_of_files:
        ds = xr.open_dataset(fi)
        ## when a file holds more than one `time`, keep only the first
        if ds['time'].size > 1:
            ds = ds.isel(time=0)
        ds_lst.append(ds)

    ## the (first) file with the longest step axis is the template for the rest
    max_ds = max(ds_lst, key=lambda member: member.step.size)
    max_size = max_ds.step.size
    max_time = max_ds.valid_time.values

    ## bring every member onto the template's steps, then cut the step window
    new_ds_lst = []
    for tmp in ds_lst:
        if tmp.step.size < max_size:
            ## NOTE(review): method='nearest' fills added steps from the nearest
            ## existing step rather than leaving them NaN - confirm this is intended
            new_ds = tmp.reindex_like(max_ds, method='nearest', fill_value=np.nan)
            new_ds = new_ds.assign_coords(valid_time=("step", max_time))
        else:
            ## already full length: use this member itself (not a leftover
            ## variable from the opening loop)
            new_ds = tmp
        new_ds_lst.append(preprocess(new_ds, start, stop))

    return xr.concat(new_ds_lst, dim="number")

In [None]:
## The helper defined in this notebook is fix_GEFSv12_open_mfdataset, and it
## globs its first argument itself, so it is given the glob pattern for the
## "abv700mb" spfh files rather than the already-expanded fname_lst.
abv700_pattern = path_to_data+"{0}_pres_abv700mb_{1}00_*.grib2".format(varname_lst[2], date)
ds_above = fix_GEFSv12_open_mfdataset(abv700_pattern, 72, 80)  # keep step positions 72-79
ds_above