In [None]:
# PIPS_sim_to_nc.ipynb: creates intermediate netCDF files from model output and PIPS data for use in the
# Parsivel simulator and other analyses/comparisons
%load_ext autoreload
%autoreload 2
import glob
import sys
import os
import inspect
from datetime import datetime, timedelta
import pytz as pytz
import numpy as np
import numpy.random as random
import pandas as pd
import xarray as xr
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.ticker as ticker
import matplotlib.dates as dates
from mpl_toolkits.axes_grid1 import ImageGrid, make_axes_locatable, host_subplot
from mpl_toolkits.basemap import Basemap
import pyPIPS.utils as utils
import pyPIPS.thermolib as thermo
import pyPIPS.DSDlib as dsd
import pyPIPS.disdrometer_module as dis
import pyPIPS.plotmodule as PIPSplot
import pyPIPS.simulator as sim
import pyPIPS.radarmodule as pyPIPSradar
import pyPIPS.PIPS as pips
import pyPIPS.io as pips_io
from pyCRMtools.modules import plotmodule as plotmod
from pyCRMtools.modules import utils as CRMutils
from pyCRMtools.pycaps import arps_read
from pyCRMtools.pycaps import pycaps_fields
from pyCRMtools.pycaps import calvars_radar as radar
from joblib import Parallel, delayed
%matplotlib inline

In [None]:
# Define dictionaries, keyed by case date (i.e. '060509', '060709', '060909', '033116'), to store parameters related
# to NEXRAD radar data, disdrometer data, and model output, respectively

# Case we are looking at right now. Should only have to change this up here and then execute all the cells below
# to generate the appropriate analysis
casedate = '033116'

# Import the file containing the dictionaries needed to gather the radar, disdrometer, and model data.
sys.path.append('/Users/dawson29/Dropbox/Projects/VORTEXSE/vortexse_enkf_dsd_study/configs/2016_IOP3')

from PIPSsim_1km_dicts import *

init_radar_dict = init_radar_dict[casedate]
init_dis_dict = init_dis_dict[casedate]
init_model_dict = init_model_dict[casedate]

In [None]:
# Set up the model information
modelname = 'ARPS'
# From desired start and end times (UTC) get a range of datetimes and corresponding range of times in
# seconds since model initial time
timestamp_model_init = init_model_dict['timestamp_model_init']  # Start time of model corresponding to 0 s
datetime_model_init = datetime.strptime(timestamp_model_init, '%Y%m%d%H%M%S')

timestamp_start = init_model_dict['timestamp_model_start']  # Start time of desired time window
timestamp_stop = init_model_dict['timestamp_model_stop']  # Stop time of desired time window
datetime_start = datetime.strptime(timestamp_start, '%Y%m%d%H%M%S')
datetime_stop = datetime.strptime(timestamp_stop, '%Y%m%d%H%M%S')
tintv = init_model_dict['model_dt']  # Interval in seconds for model output
tintv_mean = init_model_dict['model_dt_mean'] # Interval in seconds for ensemble mean analysis

datetime_range = CRMutils.get_datetime_range(datetime_start, datetime_stop, tintv)
trange = CRMutils.modeltimes_from_datetimes(datetime_range, datetime_start=datetime_model_init)

datetime_range_mean = CRMutils.get_datetime_range(datetime_start, datetime_stop, tintv_mean)
trange_mean = CRMutils.modeltimes_from_datetimes(datetime_range_mean, datetime_start=datetime_model_init)

#basedir = '/Volumes/scr_fast/Projects/VORTEXSE/simulations/ARPS/2016_IOP3/3DVAR/1km0331163DVARCA00005min180_3km030015min540'
filetype = 'history'
fileformat = init_model_dict['fileformat']
expname = '1km453x453_newse'
basedir = init_model_dict['basedirname']
num_members = 36
nproc_x = 15
nproc_y = 6

# Tell the arps_read module what the processor numbers are.
# Yes, I know this is a bad way to do this through globals. I'll fix it eventually.
# arps_read.nproc_x = nproc_x
# arps_read.nproc_y = nproc_y

In [None]:
# Load the ARPS grid
# Get file path for grdbas file (note that call to read_grid handles the reading of the individual patches)
# If the grdbas file doesn't exist, fall back to a history file
member = 1 # 0 is for ensemble mean
cycle = 'posterior'
member_dir, member_prefix = sim.get_ARPS_member_dir_and_prefix(member, cycle)
member_absdir = os.path.join(basedir, expname, member_dir)
trailer = ''
grdbas_path = arps_read.get_file_path(member_absdir, member_prefix, fileformat, filetype='grdbas')

patch_x = 1
patch_y = 1
grdbas_path_test = arps_read.add_patch_number(grdbas_path, patch_x, patch_y)

if not os.path.exists(grdbas_path_test):
    print("grdbas file doesn't exist, trying a history file!")
    grdbas_path = arps_read.get_file_path(member_absdir, member_prefix, fileformat, time=model_trange_sec[0], 
                                          filetype='history')
    grdbas_path_test = arps_read.add_patch_number(grdbas_path, patch_x, patch_y)

    print(grdbas_path_test)
    print(os.path.exists(grdbas_path_test))

# Read in grid information
grid_dict = arps_read.readarpsgrid(grdbas_path, nproc_x=nproc_x, nproc_y=nproc_y)
print(grid_dict.keys())

# Get map projection information and create a Basemap instance
# TODO: convert to use cartopy!

ctrlat, ctrlon, trulat1, trulat2, trulon = arps_read.readarpsmap(grdbas_path, nproc_x=nproc_x, nproc_y=nproc_y)

dx = grid_dict['dx']
dy = grid_dict['dy']
nx = grid_dict['nx']
ny = grid_dict['ny']

mapwidth = nx * dx
mapheight = ny * dy

bgmap = Basemap(projection='lcc', width=mapwidth, height=mapheight, lat_1=trulat1,
                lat_2=trulat2, lat_0=ctrlat, lon_0=ctrlon, resolution='h',
                area_thresh=10., suppress_ticks=False)

In [None]:
# Find coordinates of PIPS stations in the model
# Put the basemap instance into the grid_dict
grid_dict['bgmap'] = bgmap
dis_dict = sim.get_dis_locs_arps_real_grid(init_dis_dict, grid_dict)
coord_array = np.array(dis_dict['dmodcrdlist'])
dxlist = [i[0] for i in dis_dict['dmodloclist']]
dylist = [i[1] for i in dis_dict['dmodloclist']]
xc = grid_dict['xs']
yc = grid_dict['ys']
xe = grid_dict['x']
ye = grid_dict['y']
# Set model grid limits to center on the disdrometer locations 

Dxmin = min(dxlist)
Dxmax = max(dxlist)
Dymin = min(dylist)
Dymax = max(dylist)
gridlims = [Dxmin - 25000., Dxmax + 25000., Dymin - 25000., Dymax + 25000.]

ibgn = np.searchsorted(xc, gridlims[0])
iend = np.searchsorted(xc, gridlims[1]) + 1
jbgn = np.searchsorted(yc, gridlims[2])
jend = np.searchsorted(yc, gridlims[3]) + 1

print(gridlims)
print(ibgn, iend, jbgn, jend)

In [None]:
# Read in PIPS data
print(dis_dict.keys())

dis_dir = dis_dict['dis_dir']
dis_filenames = dis_dict['disfilenames']
dis_names = dis_dict['dis_names']

conv_df_dict = {}
parsivel_df_dict = {}
vd_matrix_da_dict = {}

for dis_filename, dis_name in zip(dis_filenames, dis_names):
    dis_filepath = os.path.join(dis_dir, dis_filename)
    print("Reading {}".format(dis_filepath))
    conv_df, parsivel_df, vd_matrix_da = pips_io.read_PIPS(dis_filepath, starttimestamp=timestamp_start,
                                                           stoptimestamp=timestamp_stop)
    # Calculate some additional thermodynamic quantities and add to the conventional data DataFrame
    conv_df = pips.calc_thermo(conv_df)
    conv_df_dict[dis_name] = conv_df
    parsivel_df_dict[dis_name] = parsivel_df
    vd_matrix_da_dict[dis_name] = vd_matrix_da

In [None]:
# Read in needed fields from the ARPS history dumps
# Just for one member now. Later will parallelize this and put it in a script.
# and interpolate them to the PIPS locations, building up a time series.

def read_ensemble(f, member_list, f_args=None, f_kwargs=None, iterate_over='member_list',
                  process_parallel=True, n_jobs=5, verbose=0):
    """
    Reads multiple runs either in serial or parallel (using joblib). TODO. This is duplicating some
    functionality written by T. Supinie for pycaps (util.run_concurrently). Maybe just use that
    instead?

    Parameters
    ----------
    f : callable
        A function used to read in data from a particular run. It must have a positional argument
        that will be iterated over, the name of which is given by "iterate_over"
    member_list : list
        List of member numbers to read
    args : tuple
        List of positional arguments to pass to f
    iterate_over : str
        The name of the positional argument in args to iterate over
    process_parallel : bool
        Whether to process the runs in parallel or not (using joblib)
    n_jobs : int
        Number of jobs to run in parallel
    verbose : int
        verbosity level passed on to joblib.Parallel
    kwargs : dict
        List of keyword arguments to pass to f
    Returns
    -------
    list
        A list of updated run dictionaries

    """
    # The following avoids this gotcha:
    # https://pythonconquerstheuniverse.wordpress.com/2012/02/15/mutable-default-arguments/
    if f_args is None:
        f_args = ()
    if f_kwargs is None:
        f_kwargs = {}
    # Get a list of the function argument names, and find the one that we want to iterate over
    f_argnames = inspect.getargspec(f).args
    try:
        arg_to_iterate = f_argnames.index(iterate_over)
    except ValueError:
        print("{} not found in the function argument list! Stopping!".format(iterate_over))
        return

    # Define a wrapper function to replace the value of the iterated argument in f with a new one
    # Not sure if this is the most elegant way to do this, but it works and lets us use the
    # joblib.Parallel function with list comprehension for the "val" argument
    def g(f, val):
        new_args = tuple(val if i == arg_to_iterate else f_args[i] for i in range(len(f_args)))
        return f(*new_args, **f_kwargs)

    if process_parallel:
        runs = Parallel(n_jobs=n_jobs, verbose=verbose)(delayed(g)(f, val) for val in member_list)
    else:
        runs = []
        for val in member_list:
            runs.append(g(f, val))
            print("Reading files for member {:d}".format(val))
    return runs


def read_member_data(basedir, expname, member, cycle, fileformat, time_range, varnames, filetype='history', 
                     ibgn=None, iend=None, jbgn=None, jend=None, klvls=[1], nproc_x=1, nproc_y=1):
    print("Loading member #{:d}".format(member))
    # Get member run prefix
    member_dir, member_prefix = sim.get_ARPS_member_dir_and_prefix(member, cycle)
    member_absdir = os.path.join(basedir, expname, member_dir)
    vardict_list = []
    # all_files_exist_list = []
    for time in time_range:
        print("Loading time ", time) 
        # Get member file path
        filepath = arps_read.get_file_path(member_absdir, member_prefix, fileformat, time=time, 
                                           filetype='history')
#         all_files_exist = True
#         for ip in range(1, nproc_x + 1):
#             for jp in range(1, nproc_y + 1):
#                 filenamepatch = filepath + "_%03d" % ip + "%03d" % jp
#                 if not os.path.exists(filenamepatch):
#                     print("File {} does not exist!".format(filenamepatch))
#                     all_files_exist = False
#         all_files_exist_list.append(all_files_exist)
        # Get variables from file
        vardict = arps_read.read_hdfvars(filepath, varnames, ibgn=ibgn, jbgn=jbgn, iend=iend, jend=jend,
                                         klvls=klvls, nproc_x=nproc_x, nproc_y=nproc_y)
        vardict_list.append(vardict)
    
    return vardict_list  #, all_files_exist_list


print(ibgn, iend, jbgn, jend)
print(init_model_dict.keys())
varnames = ['p', 'pt', 'qv', 'u', 'v', 'qr', 'nr', 'zr']
member_list = range(1, 2)
klvls=[1]

# Pack the args and kwargs for the call to the parallel reader wrapper function
args = (basedir, expname, member, cycle, fileformat, trange, varnames)
kwargs = {'filetype': filetype, 'ibgn': ibgn, 'iend': iend, 'jbgn': jbgn, 'jend': jend, 'klvls': klvls,
          'nproc_x': nproc_x, 'nproc_y': nproc_y}

# Set up the parallel read of all the model members
runs_list = read_ensemble(read_member_data, member_list, f_args=args, f_kwargs=kwargs, iterate_over='member',
                          process_parallel=False, n_jobs=1, verbose=10)

In [1]:
for vardict_list in runs_list:
    xc_patch = xc[ibgn:iend+1]
    yc_patch = yc[jbgn:jend+1]
    coord_dict = {'time': model_dict['datetime_range'],
                  'yc': ('yc', yc_patch),
                  'xc': ('xc', xc_patch)}
    # First, create a dict of lists out of the above list of dicts
    vardict_combined = CRMutils.make_dict_of_lists(vardict_list)
    # Set things up for creating the xr Dataset
    for varname, var in vardict_combined.items():
        var_arr = np.array(var).T.squeeze()
        var_arr = np.rollaxis(var_arr, 2, 0)
        # Trim variables down to just the patch we want to work with
        var_arr_patch = var_arr[:, jbgn:jend+1, ibgn:iend+1]
        vardict_combined[varname] = (['time', 'yc', 'xc'], var_arr_patch)

    # Create an xarray Dataset out of the variable dictionary
    var_ds = xr.Dataset(vardict_combined, coords=coord_dict)

    # Compute raw model DSD paramters and add them to the model Dataset
    rhor = 1000.
    cr = np.pi / 6. * rhor
    var_ds['rho'] = thermo.calrho(var_ds['p'], var_ds['pt'], var_ds['qv'])

    # Shape parameter
    var_ds['alphar'] = dsd.solve_alpha(var_ds['rho'], cr, var_ds['qr'], var_ds['nr'], var_ds['zr'])
    # var_ds['alphar'] = var_ds['alphar'].interpolate_na()
    # Intercept parameter
    var_ds['N0r'] = dsd.calc_N0_gamma(var_ds['rho'], var_ds['qr'], var_ds['nr'], cr, var_ds['alphar'])
    # Slope parameter
    var_ds['lamdar'] = dsd.calc_lamda_gamma(var_ds['rho'], var_ds['qr'], var_ds['nr'], cr, var_ds['alphar'])

    # Try computing ND and logND here

    mid_diameters_da = vd_matrix_da_dict['PIPS1A']['diameter']
    # Broadcast DSD parameter DataArrays to get everyone on the same dimensional page
    mid_diameters_da, N0r_model_da, lamdar_model_da, alphar_model_da = \
        xr.broadcast(mid_diameters_da, var_ds['N0r'], var_ds['lamdar'] , var_ds['alphar'])

    # Transpose these DataArrays to get time as the first dimension
    mid_diameters_da = mid_diameters_da.transpose('time', 'diameter_bin', 'yc', 'xc')
    N0r_model_da = N0r_model_da.transpose('time', 'diameter_bin', 'yc', 'xc')
    lamdar_model_da = lamdar_model_da.transpose('time', 'diameter_bin', 'yc', 'xc')
    alphar_model_da = alphar_model_da.transpose('time', 'diameter_bin', 'yc', 'xc')

    ND_model = dsd.calc_binned_DSD_from_params(N0r_model_da, lamdar_model_da, alphar_model_da, mid_diameters_da)
    ND_model = ND_model.fillna(0.0)
    logND_model = np.log10(ND_model)
    logND_model = logND_model.where(logND_model > -np.inf)
    #logND_model = logND_model.fillna(0.0)
    #logND_model = logND_model.where(logND_model > -1.0)

    var_ds['ND'] = ND_model
    var_ds['logND'] = logND_model

    print(var_ds)
    # Save Dataset to nc file
    savedir = '/depot/dawson29/data/Projects/vortexse_enkf_dsd_study/data/nc'
    if not os.path.exists(savedir):
        os.makedirs(savedir)
    filename = "{}_fields.nc".format(member_prefix)
    filepath = os.path.join(savedir, filename)
    var_ds.to_netcdf(filepath)

NameError: name 'runs_list' is not defined