In [1]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client, progress, metrics
# wait for jobs to arrive, depending on the queue, this may take some time
import dask.array as da
import dask.bag as db
import numpy as np
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
import os
import pyart
import tarfile
import tempfile
import shutil
from netCDF4 import num2date
import json
from time import strftime, sleep

%matplotlib inline


## You are using the Python ARM Radar Toolkit (Py-ART), an open source
## library for working with weather radar data. Py-ART is partly
## supported by the U.S. Department of Energy as part of the Atmospheric
## Radiation Measurement (ARM) Climate Research Facility, an Office of
## Science user facility.
##
## If you use this software to prepare a publication, please cite:
##
##     JJ Helmus and SM Collis, JORS 2016, doi: 10.5334/jors.119



In [2]:
experiment_dir = '/lcrc/group/earthscience/radar/xsapr_sgp/'
formatted_subdir = 'scanning_experiment'
unformatted_subdir = 'scanning_experiment_raw'

unformatted_dir = os.path.join(experiment_dir, unformatted_subdir)
formatted_dir = os.path.join(experiment_dir, formatted_subdir)

all_files = os.listdir(unformatted_dir)
all_fqdn = [os.path.join(unformatted_dir, this_file) for this_file in all_files]


In [11]:
def manage_tarfile(path_and_file, 
                   experiment_location='/lcrc/group/earthscience/radar/xsapr_sgp/scanning_experiment'):
    def examine(fh_like):
        radar = pyart.io.read(fh_like)
        time_start = num2date(radar.time['data'][0], radar.time['units'])
        time_end = num2date(radar.time['data'][-1], radar.time['units'])
        stype = radar.scan_type
        nsweeps = radar.nsweeps
        tgates = float(radar.ngates*radar.nrays)
        zdat = radar.fields['reflectivity']['data']
        z0 = float(len(np.where(zdat > 0.)[0]))/tgates
        z10 = float(len(np.where(zdat > 10.)[0]))/tgates
        z40 = float(len(np.where(zdat > 40.)[0]))/tgates
        rdict = {'time_start' : time_start,
                'time_end' : time_end,
                 'scan_type' : stype,
                 'nsweeps' : nsweeps,
                 'z0' : z0,
                 'z10' : z10,
                 'z40' : z40,
                'expr' : radar.metadata['sigmet_task_name'].lower().strip().decode("utf-8")}

        return rdict

    def site_from_name(name):
        fullname = name.split('.')[0]
        site = fullname[-2::]
        return site

    def file_formatter(stime, site, scanmode, base, expr):
        #base/year/monthday
        
        mday = stime.strftime('%m%d')
        odir = os.path.join(base,
                            expr.lower(),
                            scanmode,
                            stime.strftime('%Y'),
                            mday)
        fname1 = 'sgpxsapr' + scanmode + site + stime.strftime('.%Y%m%d.%H%M%S')
        return odir, fname1

    top_level = os.path.split(experiment_location)[0]
    tarobj = tarfile.open(path_and_file)
    site = site_from_name(path_and_file)
    members = tarobj.getmembers()
    status = []
    for member in members:
        try:
            radar_info = examine(tarobj.extractfile(member))
            odir_radars, file_name_begin = file_formatter(radar_info['time_start'], 
                                                   site, 
                                                   radar_info['scan_type'],
                                                   experiment_location,
                                                   radar_info['expr'])

            odir_json, file_name_begin = file_formatter(radar_info['time_start'], 
                                                   site, 
                                                   radar_info['scan_type'],
                                                   os.path.join(top_level, 'summary'),
                                                   radar_info['expr'])

            try:
                if not os.path.exists(odir_radars):
                    os.makedirs(odir_radars)

                if not os.path.exists(odir_json):
                    os.makedirs(odir_json)
            except: #just wait and try again..
                sleep(1)
                if not os.path.exists(odir_radars):
                    os.makedirs(odir_radars)

                if not os.path.exists(odir_json):
                    os.makedirs(odir_json)

            fullpath = os.path.join(odir_radars, file_name_begin+'.iris')

            json_dict = {}
            strconv_keys = ['z0', 'z10', 'z40', 'nsweeps']
            for key in strconv_keys:
                json_dict.update({key : str(radar_info[key])})

            json_dict.update({'start_time' : radar_info['time_start'].strftime('%Y%m%d-%H:%M:%S'),
                             'end_time' : radar_info['time_end'].strftime('%Y%m%d-%H:%M:%S')})

            json_dict.update({'original_name' : member.name,
                             'full_path' : fullpath})

            r = json.dumps(json_dict)
            loaded_r = json.loads(r)
            with open(os.path.join(odir_json, file_name_begin+'.json'), 'w') as outfile:
                json.dump(json_dict, outfile)

            #The actuall writing
            fh = tarobj.extractfile(member)

            shutil.copyfileobj(fh, open(fullpath, 'wb'))
            fh.close()
            status.append(member.name+':OK')
        except IndexError:
            status.append(member.name+':NotOK')

        
    return status
       


In [12]:
flist = manage_tarfile('/lcrc/group/earthscience/radar/xsapr_sgp/scanning_experiment_raw/sgpxsaprsecI4.00.20180701.000010.raw.tar')


In [13]:
flist

['XSE180701000010.RAWNJ0L:OK',
 'XSE180701000512.RAWNJ0M:OK',
 'XSE180701001012.RAWNJ0N:OK',
 'XSE180701001512.RAWNJ0P:OK',
 'XSE180701002014.RAWNJ0R:OK',
 'XSE180701002514.RAWNJ0S:OK',
 'XSE180701003013.RAWNJ0T:OK',
 'XSE180701003513.RAWNJ0U:OK',
 'XSE180701004014.RAWNJ0V:OK',
 'XSE180701004515.RAWNJ0W:OK',
 'XSE180701005014.RAWNJ0X:OK',
 'XSE180701005513.RAWNJ0Y:OK']

In [14]:
cluster = SLURMCluster(cores=36, project='acpc', walltime='3:00:00', 
                       job_cpu=36, memory='128GB', diagnostics_port=8787)
cluster.scale(5)         # Ask for ten workers
client = Client(cluster)  # Connect this local process to remote workers





In [17]:
cluster

VBox(children=(HTML(value='<h2>SLURMCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    …

In [16]:
client

0,1
Client  Scheduler: tcp://10.70.128.7:43766  Dashboard: http://10.70.128.7:8787/status,Cluster  Workers: 5  Cores: 180  Memory: 640.00 GB


In [18]:
future = client.map(manage_tarfile, all_fqdn)



In [19]:
progress(future)

VBox()

object.__init__() takes no parameters
This is deprecated in traitlets 4.2.This error will be raised in a future release of traitlets.
  super(Widget, self).__init__(**kwargs)
object.__init__() takes no parameters
This is deprecated in traitlets 4.2.This error will be raised in a future release of traitlets.
  super(Widget, self).__init__(**kwargs)


In [20]:
my_data = client.gather(future)


In [24]:
flat_list = [item for sublist in my_data for item in sublist]

In [34]:
print(len(flat_list))
succeeded = 0
failed = 0
ff = []
SE_good = []
for item in flat_list:
    if 'NotOK' in item:
        failed +=  1
        ff.append(item)
    else:
        succeeded += 1
        if 'XSE'in item:
            SE_good.append(item)

print(succeeded)
print(failed)

115098
114637
461


In [31]:
ff.sort()
print(ff)

['XSE180623134537.RAWGD8M:NotOK', 'XSE180623135154.RAWGD8N:NotOK', 'XSE180623135812.RAWGD8P:NotOK', 'XSE180623140429.RAWGD8R:NotOK', 'XSE180623141046.RAWGD8S:NotOK', 'XSE180623141703.RAWGD8T:NotOK', 'XSE180623142320.RAWGD8U:NotOK', 'XSE180623142936.RAWGD8V:NotOK', 'XSE180623143554.RAWGD8W:NotOK', 'XSE180623144212.RAWGD8X:NotOK', 'XSE180623144829.RAWGD8Y:NotOK', 'XSE180623145446.RAWGD8Z:NotOK', 'XSE180623150103.RAWGD90:NotOK', 'XSE180623150720.RAWGD91:NotOK', 'XSE180623151337.RAWGD92:NotOK', 'XSE180623151954.RAWGD93:NotOK', 'XSE180623152612.RAWGD94:NotOK', 'XSE180623153227.RAWGD95:NotOK', 'XSE180623153845.RAWGD96:NotOK', 'XSE180623154502.RAWGD97:NotOK', 'XSE180623155119.RAWGD98:NotOK', 'XSE180623155736.RAWGD99:NotOK', 'XSE180623160353.RAWGD9A:NotOK', 'XSE180623161011.RAWGD9B:NotOK', 'XSE180623161629.RAWGD9C:NotOK', 'XSE180623162245.RAWGD9D:NotOK', 'XSE180623162902.RAWGD9E:NotOK', 'XSE180623163519.RAWGD9F:NotOK', 'XSE180623164136.RAWGD9G:NotOK', 'XSE180623164753.RAWGD9H:NotOK', 'XSE18062

In [35]:
print(SE_good)

['sgpxsaprI4.00.20180408.201001.mnt.XSE180408191437.RAWUE7E.maint:OK', 'sgpxsaprI4.00.20180408.201001.mnt.XSE180408192737.RAWUE7J.maint:OK', 'sgpxsaprI4.00.20180408.201001.mnt.XSE180408194038.RAWUE7N.maint:OK', 'sgpxsaprI4.00.20180408.201001.mnt.XSE180408195337.RAWUE7T.maint:OK', 'sgpxsaprI4.00.20180408.201001.mnt.XSE180408200637.RAWUE7X.maint:OK', 'XSE180805090607.RAWHUB1:OK', 'XSE180805091623.RAWHUB5:OK', 'XSE180805092622.RAWHUB9:OK', 'XSE180805093612.RAWHUBD:OK', 'XSE180805094615.RAWHUBH:OK', 'XSE180805095616.RAWHUBM:OK', 'XSE180803140709.RAWHTCS:OK', 'XSE180803141700.RAWHTCW:OK', 'XSE180803142652.RAWHTD0:OK', 'XSE180803143645.RAWHTD4:OK', 'XSE180803144627.RAWHTD8:OK', 'XSE180803145623.RAWHTDC:OK', 'XSE180601070928.RAWFM6R:OK', 'XSE180601071912.RAWFM6V:OK', 'XSE180601073050.RAWFM6Z:OK', 'XSE180601074031.RAWFM73:OK', 'XSE180601075155.RAWFM77:OK', 'XSE180801130002.RAWHSA3:OK', 'XSE180801131002.RAWHSA7:OK', 'XSE180801132002.RAWHSAB:OK', 'XSE180801133002.RAWHSAF:OK', 'XSE180801134002.RA

In [33]:
print(ff[0])
print(ff[-1])

XSE180623134537.RAWGD8M:NotOK
XSE180716125314.RAWHNYE:NotOK


In [50]:
import datetime
from datetime import timedelta
tstart = datetime.datetime(2018, 6, 23)
strlist = []
for i in range(30):
    dt = timedelta(days=i)
    datetm = (tstart + dt).strftime('%y%m%d')
    strlist.append(datetm)


for datestris in  strlist:
    nbad = 0
    for itis in ff:
        if datestris in itis:
            nbad += 1
    print(datestris, ': ', nbad )
    
    

180623 :  98
180624 :  229
180625 :  127
180626 :  0
180627 :  0
180628 :  0
180629 :  0
180630 :  0
180701 :  0
180702 :  0
180703 :  0
180704 :  0
180705 :  0
180706 :  0
180707 :  0
180708 :  0
180709 :  0
180710 :  0
180711 :  0
180712 :  0
180713 :  0
180714 :  0
180715 :  0
180716 :  7
180717 :  0
180718 :  0
180719 :  0
180720 :  0
180721 :  0
180722 :  0


### Error for later examination

```---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-146-9752318f2f1f> in <module>()
----> 1 flist = manage_tarfile('/lcrc/group/earthscience/radar/xsapr_sgp/scanning_experiment_raw/sgpxsaprsecI4.00.20180625.090057.raw.tar')

<ipython-input-145-fc335f3d3308> in manage_tarfile(path_and_file, experiment_location)
     44     for member in members:
     45         #try:
---> 46         radar_info = examine(tarobj.extractfile(member))
     47         odir_radars, file_name_begin = file_formatter(radar_info['time_start'], 
     48                                                site,

<ipython-input-145-fc335f3d3308> in examine(fh_like)
      2                    experiment_location='/lcrc/group/earthscience/radar/xsapr_sgp/scanning_experiment'):
      3     def examine(fh_like):
----> 4         radar = pyart.io.read(fh_like)
      5         time_start = num2date(radar.time['data'][0], radar.time['units'])
      6         time_end = num2date(radar.time['data'][-1], radar.time['units'])

/homes/scollis/anaconda/envs/dask/lib/python3.6/site-packages/pyart/io/auto_read.py in read(filename, use_rsl, **kwargs)
    132             return read_rsl(filename, **kwargs)
    133         else:
--> 134             return read_sigmet(filename, **kwargs)
    135     if filetype == "UF":
    136         if use_rsl:

/homes/scollis/anaconda/envs/dask/lib/python3.6/site-packages/pyart/io/sigmet.py in read_sigmet(filename, field_names, additional_metadata, file_field_names, exclude_fields, time_ordered, full_xhdr, noaa_hh_hdr, debug, ignore_xhdr, ignore_sweep_start_ms, **kwargs)
    428         type_mask = task_config['task_dsp_info']['current_data_type_mask']
    429         htype = type_mask['extended_header_type']
--> 430         if full_xhdr and htype == 2 and sigmet_data['XHDR_FULL'][0, 3] == 136:
    431             noaa_hh_hdr = True
    432         else:

IndexError: index 0 is out of bounds for axis 0 with size 0```

In [None]:
from datetime import timedelta