In [1]:
from dask_jobqueue import PBSCluster
from dask.distributed import Client, metrics, wait
# wait for jobs to arrive, depending on the queue, this may take some time
import dask.array as da
import dask.bag as db
import numpy as np
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, progress
import os
from matplotlib import use
use('agg')
os.environ['PROJ_LIB'] = '/home/zsherman/anaconda3/envs/cmac_env/share/proj/'
import pyart
import netCDF4
import tarfile
import tempfile
import shutil
from netCDF4 import num2date
import json
from time import strftime, sleep
import os
%matplotlib inline
import argparse
import datetime
import glob
import importlib
import subprocess
import time

from cmac import (cmac, get_cmac_values, quicklooks,
                  get_sounding_times, get_sounding_file_name)


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119



  if 'red' in spec:
  if 'red' in spec:
  return f(*args, **kwds)


In [2]:
def _ensure_group_writable_dir(path):
    """Create ``path`` when it is missing and make it group read/writable.

    The ``chmod`` runs only when this call found the directory missing, so
    directories that already existed are left untouched.
    """
    if not os.path.exists(path):
        # Several dask workers can get here for the same directory at once;
        # exist_ok keeps the loser of that race from raising FileExistsError.
        os.makedirs(path, exist_ok=True)
        # Argument list instead of a shell string, so spaces or shell
        # metacharacters in the path cannot change the command.
        subprocess.call(['chmod', '-R', 'g+rw', path])


def run_cmac_and_plotting(
    radar_file_path, sounding_times, config, clutter_file, out_radar, image_directory, bad_directory, meta_append,
    overwrite, dd_lobes):
    """Run CMAC on one radar file, then write the result and its quicklooks.

    Everything lives in one routine so that it can be mapped over a dask bag.

    NOTE: besides its arguments this function reads the notebook-level
    globals ``sonde_path`` and ``verbose``; both must be defined before the
    function is called.

    Parameters
    ----------
    radar_file_path : str
        Radar file handed to ``pyart.io.read``.
    sounding_times : sequence
        Sounding times; the one closest to the radar start time is used.
    config : str
        Configuration name passed to ``get_cmac_values``, ``cmac`` and
        ``quicklooks``.
    clutter_file : str or None
        Optional file whose ``xsapr_clutter`` field is copied onto the radar.
    out_radar : str or None
        Root of the output tree; ``None`` means the user's home directory.
    image_directory : str or None
        Root of the quicklook tree; ``None`` means the user's home directory.
    bad_directory : str or None
        Where files that raise ``TypeError`` on read are moved; ``None``
        means ``~/type_error_radars/``.
    meta_append : object
        Forwarded unchanged to ``cmac``.
    overwrite : bool
        When ``False`` an already existing output file is left alone.
    dd_lobes : bool
        Forwarded unchanged to ``quicklooks``.

    Returns
    -------
    None
    """
    cmac_config = get_cmac_values(config)

    try:
        radar = pyart.io.read(radar_file_path)
    except TypeError:
        # Unreadable file: move it out of the way and skip it.
        if bad_directory is None:
            path = os.path.expanduser('~') + '/' + 'type_error_radars/'
        else:
            path = bad_directory
        print(radar_file_path + ' has encountered TypeError!')
        _ensure_group_writable_dir(path)
        shutil.move(radar_file_path, path)
        return

    radar_start_date = netCDF4.num2date(radar.time['data'][0],
                                        radar.time['units'])
    date_str = "%04d%02d%02d" % (radar_start_date.year,
                                 radar_start_date.month,
                                 radar_start_date.day)
    time_str = "%02d%02d%02d" % (radar_start_date.hour,
                                 radar_start_date.minute,
                                 radar_start_date.second)

    save_name = cmac_config['save_name']
    # Output goes into a per-day directory (YYYYMMDD) in both branches. The
    # home-directory branch used to append the seconds field instead of the
    # day, scattering files over directories named after the scan second.
    if out_radar is None:
        the_path = os.path.expanduser('~') + '/' + date_str
    else:
        the_path = out_radar + '/' + date_str
    file_name = (the_path + '/' + save_name + '.'
                 + date_str + '.' + time_str + '.nc')

    # If overwrite is False and the cmac_radar file already exists, CMAC 2.0
    # is not run on the original radar file.
    if overwrite is False and os.path.exists(file_name):
        print(file_name + ' already exists.')
        return

    _ensure_group_writable_dir(the_path)

    # Load clutter files.
    if clutter_file is not None:
        clutter_file_path = clutter_file
        if verbose:
            print('## Loading clutter file ' + clutter_file_path)
        clutter = pyart.io.read(clutter_file_path)
        if verbose:
            print('## Reading dictionary...')
        clutter_field_dict = clutter.fields['xsapr_clutter']
        if verbose:
            print('## Adding clutter field..')
        radar.add_field(
            'xsapr_clutter', clutter_field_dict, replace_existing=True)
        del clutter

    # Retrieve closest sonde in time to the time of the radar file.
    sonde_name = cmac_config['sonde_name']
    closest_time = min(
        sounding_times, key=lambda d: abs(d - radar_start_date))
    sonde_file = get_sounding_file_name(
        sonde_path, sonde_name, closest_time)
    sonde = netCDF4.Dataset(sonde_file)

    try:
        # Running the cmac code to produce a cmac_radar object.
        cmac_radar = cmac(radar, sonde, config, flipped_velocity=False,
                          meta_append=meta_append,
                          verbose=verbose)
    except ValueError as error:
        # Best effort: report the file and move on to the next one.
        print(radar_file_path + ' has encountered ValueError: ' + str(error))
        return
    finally:
        # Close the sounding and drop the input radar on every exit path,
        # including exceptions other than ValueError.
        sonde.close()
        del radar

    # Produce the cmac_radar file from the cmac_radar object.
    pyart.io.write_cfradial(file_name, cmac_radar)
    print('## A CMAC radar object has been created at ' + file_name)

    # Quicklooks go into <root>/YYYYMMDD.HHMMSS; without an image_directory
    # the root falls back to the home directory instead of failing on
    # None + str.
    if image_directory is None:
        image_root = os.path.expanduser('~')
    else:
        image_root = image_directory
    img_directory = image_root + '/' + date_str + '.' + time_str
    _ensure_group_writable_dir(img_directory)

    # Producing all the cmac_radar quicklooks.
    quicklooks(cmac_radar, config,
               image_directory=img_directory,
               dd_lobes=dd_lobes)

    # Delete the cmac_radar object and move on to the next radar file.
    del cmac_radar
    return

In [3]:
# Inputs for the CACTI CSAPR2 PPI run.
# radar_path: a directory of radar files, or a text file listing one radar
# file per line (the next cell handles both cases).
radar_path = '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/'
# sonde_path: passed to get_sounding_times / get_sounding_file_name.
sonde_path = '/lustre/or-hydra/cades-arm/proj-shared/CACTI-sounding-testing/'
# config: configuration name passed to get_cmac_values.
config = 'cacti_csapr2_ppi'

In [4]:
# Look up the sounding name that belongs to the selected configuration.
cmac_config = get_cmac_values(config)
sonde_name = cmac_config['sonde_name']

# radar_path is either a directory of radar files or a text file that lists
# one radar file path per line.
if os.path.isdir(radar_path):
    # os.path.join keeps the pattern inside the directory even when
    # radar_path has no trailing slash (plain concatenation would then match
    # sibling entries that merely share the prefix).
    radar_files = glob.glob(os.path.join(radar_path, '*'))
elif os.path.isfile(radar_path):
    with open(radar_path) as f:
        # Skip blank lines (e.g. a trailing newline) so that no empty path is
        # handed to the radar reader later on.
        radar_files = [line.strip() for line in f if line.strip()]
else:
    raise IOError('The specified radar path does not exist!')

sounding_times = get_sounding_times(sonde_path, sonde_name)

In [5]:
radar_files

['/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181010.154503.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181005.151506.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181006.170003.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181010.063003.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181008.203003.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181009.151503.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181005.131503.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181002.110005.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-ingested/corcsapr2cfrppiM1.a1.20181003.180003.nc',
 '/lustre/or-hydra/cades-arm/proj-shared/CACTI-csapr2-i

In [6]:
sounding_times

[datetime.datetime(2018, 10, 2, 16, 1),
 datetime.datetime(2018, 10, 14, 16, 1),
 datetime.datetime(2018, 10, 9, 20, 0),
 datetime.datetime(2018, 10, 2, 12, 43),
 datetime.datetime(2018, 9, 28, 12, 22),
 datetime.datetime(2018, 10, 5, 16, 0),
 datetime.datetime(2018, 10, 5, 12, 0),
 datetime.datetime(2018, 10, 1, 12, 50),
 datetime.datetime(2018, 10, 1, 11, 59),
 datetime.datetime(2018, 10, 17, 0, 0),
 datetime.datetime(2018, 10, 1, 19, 59),
 datetime.datetime(2018, 10, 14, 12, 0),
 datetime.datetime(2018, 10, 3, 23, 57),
 datetime.datetime(2018, 10, 10, 11, 59),
 datetime.datetime(2018, 10, 10, 0, 0),
 datetime.datetime(2018, 10, 10, 19, 59),
 datetime.datetime(2018, 9, 28, 12, 56),
 datetime.datetime(2018, 10, 1, 15, 58),
 datetime.datetime(2018, 10, 9, 16, 0),
 datetime.datetime(2018, 10, 9, 12, 0),
 datetime.datetime(2018, 10, 8, 0, 0),
 datetime.datetime(2018, 10, 8, 19, 59),
 datetime.datetime(2018, 10, 15, 0, 0),
 datetime.datetime(2018, 10, 16, 20, 0),
 datetime.datetime(2018, 

In [7]:
# Run settings consumed by run_cmac_and_plotting (defined above).
# None -> unreadable radar files are moved to ~/type_error_radars/.
bad_directory = None
# Root directory for the CMAC output files (one YYYYMMDD subdirectory per day).
out_radar = '/lustre/or-hydra/cades-arm/proj-shared/cacticsapr2cmacppi.c1'
# Root directory for the quicklook images.
image_directory = '/lustre/or-hydra/cades-arm/proj-shared/cacticsapr2cmacppi.c1.png'
# Forwarded unchanged to cmac().
meta_append = 'config'
# True -> existing output files are regenerated.
overwrite = True
# None -> no clutter field is added to the radar.
clutter_file = None
# Read as a global by run_cmac_and_plotting, not passed as an argument.
verbose = False
# Forwarded unchanged to quicklooks().
dd_lobes = False

In [8]:
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
#                    walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])
# Job template: cores=16 split over processes=16, 6 h walltime, 'batch' queue.
cluster1 = PBSCluster(processes=16, cores=16, walltime='6:00:00', queue='batch')
cluster1.scale(2*16)         # Ask for 2*16 = 32 workers (the client summary below reports 32)
client1 = Client(cluster1)  # Connect this local process to remote workers

  return f(*args, **kwds)
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


In [9]:
cluster1

VBox(children=(HTML(value='<h2>PBSCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .d…

In [10]:
client1

0,1
Client  Scheduler: tcp://10.23.216.82:45154  Dashboard: http://10.23.216.82:57364/status,Cluster  Workers: 32  Cores: 32  Memory: 540.16 GB


In [11]:
def com(proj_lib='/home/zsherman/anaconda3/envs/cmac_env/share/proj/'):
    """Set ``PROJ_LIB`` in the current process and check that Basemap imports.

    Meant to be sent to every worker with ``client.run(com)``.

    Parameters
    ----------
    proj_lib : str, optional
        Directory stored in ``os.environ['PROJ_LIB']``. Defaults to the path
        that was previously hard-coded, so ``com()`` behaves as before.

    Returns
    -------
    str
        The value of ``PROJ_LIB`` after the update.
    """
    os.environ['PROJ_LIB'] = proj_lib
    # Imported only as a check; the name itself is unused. Presumably Basemap
    # needs PROJ_LIB at import time -- TODO confirm.
    from mpl_toolkits.basemap import Basemap  # noqa: F401
    return os.environ['PROJ_LIB']
client1.run(com)

{'tcp://10.23.216.184:33689': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:34727': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:37648': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:39140': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:39796': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:40689': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:41185': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:42933': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:43478': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:48689': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:50137': '/home/zsherman/anaconda3/envs/cmac_env/share/proj/',
 'tcp://10.23.216.184:51351': '/home/zsherman/anaconda3/envs/cmac_env/share/

In [12]:
# Lazy dask bag with one element per radar file; nothing runs until compute().
the_bag = db.from_sequence(radar_files)


def the_function(radar_file):
    """Process one radar file with the settings chosen in the cells above."""
    # Pass the `config` variable rather than repeating its literal value, so
    # the CMAC run cannot drift from the configuration selected in the setup
    # cell.
    return run_cmac_and_plotting(
        radar_file, sounding_times=sounding_times, config=config,
        clutter_file=clutter_file, out_radar=out_radar, image_directory=image_directory,
        bad_directory=bad_directory, meta_append=meta_append,
        overwrite=overwrite, dd_lobes=dd_lobes)


# NOTE: despite its name this is a lazy dask Bag, not a list of futures.
futures1 = the_bag.map(the_function)

In [13]:
# NOTE: Bag.compute() is a blocking call -- this cell does not return until
# every radar file has been processed (the recorded run was stopped with a
# KeyboardInterrupt). It does not run in the background.
futures1.compute()

KeyboardInterrupt: 

In [14]:
cluster1.stop_all_jobs()

In [None]:
# Inputs for the SGP XSAPR I5 PPI run; these overwrite the variables used by
# the previous run.
# radar_path: directory searched recursively, or a text file of radar paths.
radar_path = '/lustre/or-hydra/cades-arm/proj-shared/cmac_sgpxsaprppiI5.00/201810/'
# sonde_path: passed to get_sounding_times / get_sounding_file_name.
sonde_path = '/lustre/or-hydra/cades-arm/proj-shared/sgpsondewnpnC1.b1/'
# config: configuration name passed to get_cmac_values.
config = 'xsapr_i5_ppi'

In [None]:
cmac_config = get_cmac_values(config)

# x_compass is the tag that is searched for in the radar file names.
x_compass = cmac_config['x_compass']
sonde_name = cmac_config['sonde_name']

# radar_path is either a directory that is searched recursively or a text
# file that lists one radar file path per line.
if os.path.isdir(radar_path):
    radar_files = glob.glob(radar_path + '/**/*' + x_compass
                            + '*', recursive=True)
elif os.path.isfile(radar_path):
    with open(radar_path) as f:
        # Skip blank lines so that no empty path reaches the radar reader.
        radar_files = [line.strip() for line in f if line.strip()]
else:
    raise IOError('The specified radar path does not exist!')

# Pass sonde_name, matching the CACTI cell earlier in this notebook (the only
# call form that was actually executed here) and get_sounding_file_name.
sounding_times = get_sounding_times(sonde_path, sonde_name)

# Get dates of radar files from the file name. The tag is located in the base
# name only, so an occurrence of x_compass in a directory name cannot shift
# the slice. Assumes the 12 characters starting 3 past the tag are
# yymmddHHMMSS -- TODO confirm against the actual file naming.
radar_times = []
for file_name in radar_files:
    base_name = os.path.basename(file_name)
    where_x = base_name.find(x_compass)
    radar_times.append(
        datetime.datetime.strptime(base_name[where_x+3:where_x+15],
                                   '%y%m%d%H%M%S'))

In [None]:
# Run settings consumed by run_cmac_and_plotting for the XSAPR I5 run.
# None -> unreadable radar files are moved to ~/type_error_radars/.
bad_directory = None
# Root directory for the CMAC output files.
out_radar = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprcmacsurI5.c1'
# Root directory for the quicklook images.
image_directory = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprcmacsurI5.c1.png'
# Forwarded unchanged to cmac().
meta_append = 'config'
# False -> radar files whose output already exists are skipped.
overwrite = False
# None -> no clutter field is added to the radar.
clutter_file = None
# Read as a global by run_cmac_and_plotting, not passed as an argument.
verbose = False
# Forwarded unchanged to quicklooks().
dd_lobes = False

In [None]:
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
#                    walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])
# Job template: cores=16 split over processes=16, 4 h walltime, default queue.
cluster2 = PBSCluster(processes=16, cores=16, walltime='4:00:00')
cluster2.scale(16*16)         # Ask for 16*16 = 256 workers
client2 = Client(cluster2)  # Connect this local process to remote workers

In [None]:
print(cluster2.job_script())

In [None]:
cluster2

In [None]:
client2

In [None]:
def com(proj_lib='/home/zsherman/anaconda3/envs/cmac_env/share/proj/'):
    """Set ``PROJ_LIB`` in the current process and check that Basemap imports.

    Meant to be sent to every worker with ``client.run(com)``.

    Parameters
    ----------
    proj_lib : str, optional
        Directory stored in ``os.environ['PROJ_LIB']``. Defaults to the path
        that was previously hard-coded, so ``com()`` behaves as before.

    Returns
    -------
    str
        The value of ``PROJ_LIB`` after the update.
    """
    os.environ['PROJ_LIB'] = proj_lib
    # Imported only as a check; the name itself is unused. Presumably Basemap
    # needs PROJ_LIB at import time -- TODO confirm.
    from mpl_toolkits.basemap import Basemap  # noqa: F401
    return os.environ['PROJ_LIB']
client2.run(com)

In [None]:
# Lazy dask bag with one element per radar file; nothing runs until compute().
the_bag = db.from_sequence(radar_files)


def the_function(radar_file):
    """Process one radar file with the settings chosen in the cells above."""
    # Pass the `config` variable rather than repeating its literal value, so
    # the CMAC run cannot drift from the configuration selected in the setup
    # cell.
    return run_cmac_and_plotting(
        radar_file, sounding_times=sounding_times, config=config,
        clutter_file=clutter_file, out_radar=out_radar, image_directory=image_directory,
        bad_directory=bad_directory, meta_append=meta_append,
        overwrite=overwrite, dd_lobes=dd_lobes)


# NOTE: despite its name this is a lazy dask Bag, not a list of futures.
futures2 = the_bag.map(the_function)

In [None]:
# NOTE: Bag.compute() is a blocking call -- this cell does not return until
# every radar file has been processed. It does not run in the background and
# shows no progress display by itself.
futures2.compute()

In [None]:
cluster2.stop_all_jobs()

In [None]:
# Inputs for the SGP XSAPR I6 sector run; these overwrite the variables used
# by the previous run.
# radar_path: directory searched recursively, or a text file of radar paths.
radar_path = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprsecI6.00/201808/'
# sonde_path: passed to get_sounding_times / get_sounding_file_name.
sonde_path = '/lustre/or-hydra/cades-arm/proj-shared/sgpsondewnpnC1.b1/'
# config: configuration name passed to get_cmac_values.
config = 'xsapr_i6_sec'

In [None]:
cmac_config = get_cmac_values(config)

# x_compass is the tag that is searched for in the radar file names.
x_compass = cmac_config['x_compass']
sonde_name = cmac_config['sonde_name']

# radar_path is either a directory that is searched recursively or a text
# file that lists one radar file path per line.
if os.path.isdir(radar_path):
    radar_files = glob.glob(radar_path + '/**/*' + x_compass
                            + '*', recursive=True)
elif os.path.isfile(radar_path):
    with open(radar_path) as f:
        # Skip blank lines so that no empty path reaches the radar reader.
        radar_files = [line.strip() for line in f if line.strip()]
else:
    raise IOError('The specified radar path does not exist!')

# Pass sonde_name, matching the CACTI cell earlier in this notebook (the only
# call form that was actually executed here) and get_sounding_file_name.
sounding_times = get_sounding_times(sonde_path, sonde_name)

# Get dates of radar files from the file name. The tag is located in the base
# name only, so an occurrence of x_compass in a directory name cannot shift
# the slice. Assumes the 12 characters starting 3 past the tag are
# yymmddHHMMSS -- TODO confirm against the actual file naming.
radar_times = []
for file_name in radar_files:
    base_name = os.path.basename(file_name)
    where_x = base_name.find(x_compass)
    radar_times.append(
        datetime.datetime.strptime(base_name[where_x+3:where_x+15],
                                   '%y%m%d%H%M%S'))

In [None]:
# Run settings consumed by run_cmac_and_plotting for the XSAPR I6 run.
# None -> unreadable radar files are moved to ~/type_error_radars/.
bad_directory = None
# Root directory for the CMAC output files.
out_radar = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprcmacsecI6.c1'
# Root directory for the quicklook images.
image_directory = '/lustre/or-hydra/cades-arm/proj-shared/sgpxsaprcmacsecI6.c1.png'
# Forwarded unchanged to cmac().
meta_append = 'config'
# False -> radar files whose output already exists are skipped.
overwrite = False
# None -> no clutter field is added to the radar.
clutter_file = None
# Read as a global by run_cmac_and_plotting, not passed as an argument.
verbose = False
# Forwarded unchanged to quicklooks().
dd_lobes = False

In [None]:
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
#                    walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])
# Job template: cores=36 split over processes=36, 9 h walltime; the scheduler
# file path is passed through to PBSCluster.
# NOTE(review): scale(10) is fewer workers than one job's 36 processes --
# confirm that ten workers (rather than ten jobs) is what is intended.
cluster3 = PBSCluster(processes=36, cores=36, walltime='09:00:00',
                      scheduler_file='/home/zsherman/scheduler.json')
cluster3.scale(10)         # Ask for ten workers
client3 = Client(cluster3)  # Connect this local process to remote workers

In [None]:
cluster3

In [None]:
client3

In [None]:
def com(proj_lib='/home/zsherman/anaconda3/envs/cmac_env/share/proj/'):
    """Set ``PROJ_LIB`` in the current process and check that Basemap imports.

    Meant to be sent to every worker with ``client.run(com)``.

    Parameters
    ----------
    proj_lib : str, optional
        Directory stored in ``os.environ['PROJ_LIB']``. Defaults to the path
        that was previously hard-coded, so ``com()`` behaves as before.

    Returns
    -------
    str
        The value of ``PROJ_LIB`` after the update.
    """
    os.environ['PROJ_LIB'] = proj_lib
    # Imported only as a check; the name itself is unused. Presumably Basemap
    # needs PROJ_LIB at import time -- TODO confirm.
    from mpl_toolkits.basemap import Basemap  # noqa: F401
    return os.environ['PROJ_LIB']
client3.run(com)

In [None]:
# Lazy dask bag with one element per radar file; nothing runs until compute().
the_bag = db.from_sequence(radar_files)


def the_function(radar_file):
    """Process one radar file with the settings chosen in the cells above."""
    # Pass the `config` variable rather than repeating its literal value, so
    # the CMAC run cannot drift from the configuration selected in the setup
    # cell.
    return run_cmac_and_plotting(
        radar_file, sounding_times=sounding_times, config=config,
        clutter_file=clutter_file, out_radar=out_radar, image_directory=image_directory,
        bad_directory=bad_directory, meta_append=meta_append,
        overwrite=overwrite, dd_lobes=dd_lobes)


# NOTE: despite its name this is a lazy dask Bag, not a list of futures.
futures3 = the_bag.map(the_function)

In [None]:
# NOTE: Bag.compute() is a blocking call -- this cell does not return until
# every radar file has been processed. It does not run in the background and
# shows no progress display by itself.
futures3.compute()

In [None]:
# Wrap-up for the run above: report where the quicklooks went, open the
# output trees to the group, and shut the cluster down.
if image_directory is None:
    print('## Quicklooks have been saved in your home directory.')
else:
    print('## Quicklooks have been saved to ' + image_directory)
    # Only chmod when there is a directory to chmod; the unconditional call
    # failed on None + str right after the branch above had handled None.
    subprocess.call(['chmod', '-R', 'g+rw', image_directory])

if out_radar is not None:
    subprocess.call(['chmod', '-R', 'g+rw', out_radar])
print('##')

# `bt` (the batch start time) is not assigned in any cell of this notebook,
# so the elapsed time is reported only when an earlier cell has defined it.
if 'bt' in globals():
    print('## CMAC 2.0 Completed in ' + str(time.time() - bt) + ' s')
else:
    print('## CMAC 2.0 Completed')

# `client` is not created until the tar-file section further down; the client
# that ran the computation directly above is client3.
# NOTE(review): confirm client3 is the cluster intended to be shut down here.
client3.shutdown()

In [None]:
def manage_tarfile(path_and_file,
                   experiment_location='/lustre/or-hydra/cades-arm/proj-shared/dask_test/formatted'):
    """Unpack the radar files of one tar archive into a sorted directory tree.

    Each member is read with ``pyart.io.read``, summarised, and then copied
    to ``<experiment_location>/<expr>/<scan_type>/<YYYY>/<mmdd>/`` with an
    ``.iris`` suffix. A JSON summary with the same base name is written under
    ``<parent of experiment_location>/summary/...``.

    Parameters
    ----------
    path_and_file : str
        Path of the tar archive. The last two characters of its base name
        (before the first ``.``) are used as the site code.
    experiment_location : str, optional
        Root directory for the extracted radar files.

    Returns
    -------
    list of str
        One ``'<member name>:OK'`` or ``'<member name>:NotOK'`` entry per
        file member; ``NotOK`` marks members whose handling raised
        ``IndexError``. Members without file data (e.g. directories) are
        skipped.
    """
    def examine(fh_like):
        """Read one radar file object and summarise it in a dict."""
        radar = pyart.io.read(fh_like)
        time_start = num2date(radar.time['data'][0], radar.time['units'])
        time_end = num2date(radar.time['data'][-1], radar.time['units'])
        tgates = float(radar.ngates*radar.nrays)
        zdat = radar.fields['reflectivity']['data']

        def fraction_above(threshold):
            # Share of all gates whose reflectivity exceeds the threshold.
            return float(len(np.where(zdat > threshold)[0]))/tgates

        return {'time_start': time_start,
                'time_end': time_end,
                'scan_type': radar.scan_type,
                'nsweeps': radar.nsweeps,
                'z0': fraction_above(0.),
                'z10': fraction_above(10.),
                'z40': fraction_above(40.),
                'expr': radar.metadata['sigmet_task_name'].lower().strip().decode("utf-8")}

    def site_from_name(name):
        """Return the two-character site code from the archive name."""
        # Work on the base name so that a '.' in a directory name cannot
        # truncate the string before the file name is reached.
        fullname = os.path.basename(name).split('.')[0]
        return fullname[-2:]

    def file_formatter(stime, site, scanmode, base, expr):
        """Build (output directory, file name stem) for one radar file."""
        #base/expr/scanmode/year/monthday
        odir = os.path.join(base,
                            expr.lower(),
                            scanmode,
                            stime.strftime('%Y'),
                            stime.strftime('%m%d'))
        fname1 = 'sgpxsapr' + scanmode + site + stime.strftime('.%Y%m%d.%H%M%S')
        return odir, fname1

    os.environ['PROJ_LIB'] = '/home/zsherman/anaconda3/envs/cmac_env/share/proj/'
    top_level = os.path.split(experiment_location)[0]
    site = site_from_name(path_and_file)
    status = []
    # The context manager closes the archive even when a member fails.
    with tarfile.open(path_and_file) as tarobj:
        for member in tarobj.getmembers():
            source = tarobj.extractfile(member)
            if source is None:
                # Directories and other members without data cannot be read.
                continue
            try:
                with source:
                    radar_info = examine(source)
                odir_radars, file_name_begin = file_formatter(
                    radar_info['time_start'], site, radar_info['scan_type'],
                    experiment_location, radar_info['expr'])
                odir_json, file_name_begin = file_formatter(
                    radar_info['time_start'], site, radar_info['scan_type'],
                    os.path.join(top_level, 'summary'), radar_info['expr'])

                # exist_ok makes concurrent creation by another worker
                # harmless, replacing the former bare-except sleep-and-retry.
                os.makedirs(odir_radars, exist_ok=True)
                os.makedirs(odir_json, exist_ok=True)

                fullpath = os.path.join(odir_radars, file_name_begin+'.iris')

                json_dict = {key: str(radar_info[key])
                             for key in ('z0', 'z10', 'z40', 'nsweeps')}
                json_dict.update({'start_time' : radar_info['time_start'].strftime('%Y%m%d-%H:%M:%S'),
                                  'end_time' : radar_info['time_end'].strftime('%Y%m%d-%H:%M:%S')})
                json_dict.update({'original_name' : member.name,
                                  'full_path' : fullpath})

                with open(os.path.join(odir_json, file_name_begin+'.json'), 'w') as outfile:
                    json.dump(json_dict, outfile)

                # The actual writing: re-open the member from its start and
                # copy it; both handles are closed (and the output flushed)
                # when the block exits.
                with tarobj.extractfile(member) as fh, open(fullpath, 'wb') as out_fh:
                    shutil.copyfileobj(fh, out_fh)
                status.append(member.name+':OK')
            except IndexError:
                status.append(member.name+':NotOK')
    return status

In [None]:
# Trial run on the first archive only.
# NOTE(review): `all_fqdn` is not defined in any visible cell of this
# notebook; it has to hold the tar archive paths before this cell can run.
flist = manage_tarfile(all_fqdn[0])

In [None]:
#cluster = PBSCluster(name='dask-worker', memory='270GB', cores=36, processes=6, interface='ib0', queue='high_mem', project='arm',
#                    walltime='00:30:00')#, job-extra=['-W group_list=cades-arm'])
# Only `processes` is set here; the remaining job settings presumably come
# from the jobqueue configuration shown below -- TODO confirm.
cluster = PBSCluster(processes=18)
cluster.scale(4)         # Ask for 4 workers
client = Client(cluster)  # Connect this local process to remote workers

```yaml
jobqueue:
  pbs:
    name: dask-worker
    cores: 36
    memory: 270GB
    processes: 6
    interface: ib0
    local-directory: $localscratch
    queue: high_mem # Can also select batch or gpu_ssd
    project: arm
    walltime: 00:30:00 #Adjust this to job size
    job-extra: ['-W group_list=cades-arm']
```

In [None]:
cluster

In [None]:
client

In [None]:
def com(proj_lib='/home/zsherman/anaconda3/envs/cmac_env/share/proj/'):
    """Set ``PROJ_LIB`` in the current process and check that Basemap imports.

    Meant to be sent to every worker with ``client.run(com)``.

    Parameters
    ----------
    proj_lib : str, optional
        Directory stored in ``os.environ['PROJ_LIB']``. Defaults to the path
        that was previously hard-coded, so ``com()`` behaves as before.

    Returns
    -------
    str
        The value of ``PROJ_LIB`` after the update.
    """
    os.environ['PROJ_LIB'] = proj_lib
    # Imported only as a check; the name itself is unused. Presumably Basemap
    # needs PROJ_LIB at import time -- TODO confirm.
    from mpl_toolkits.basemap import Basemap  # noqa: F401
    return os.environ['PROJ_LIB']
client.run(com)

In [None]:
future = client.map(manage_tarfile, all_fqdn)

In [None]:
# The import cell binds `progress` from dask.diagnostics, where that name is
# a submodule rather than a callable, so calling it raises TypeError. Futures
# returned by Client.map need the progress bar from dask.distributed.
from dask.distributed import progress
progress(future)

In [None]:
# Collect the per-archive status lists from the workers and flatten them into
# a single list of "<member>:OK" / "<member>:NotOK" strings.
my_data = client.gather(future)
flat_list = []
for sublist in my_data:
    flat_list.extend(sublist)
print(len(flat_list))

# ff holds the failed members; SE_good holds the successful members whose
# status string contains 'XSE'.
ff = [entry for entry in flat_list if 'NotOK' in entry]
SE_good = [entry for entry in flat_list
           if 'NotOK' not in entry and 'XSE' in entry]
failed = len(ff)
succeeded = len(flat_list) - failed

print(succeeded)
print(failed)