In [None]:
import pyart
#imports
import os # Code for dealing with the file system
import numpy as np
from matplotlib import pyplot as plt
from dask.distributed import Client, progress, metrics, LocalCluster
from netCDF4 import num2date
import tarfile, json, shutil

from time import strftime, sleep

import pwd
import grp


%matplotlib inline


In [None]:
# nexrad HAS011340903

In [2]:
def manage_tarfile(path_and_file, 
                   experiment_location='/lustre/or-hydra/cades-arm/proj-shared/scanning_collab'):
    def examine(fh_like):
        
        radar = pyart.io.read(fh_like)
        time_start = num2date(radar.time['data'][0], radar.time['units'])
        time_end = num2date(radar.time['data'][-1], radar.time['units'])
        stype = radar.scan_type
        nsweeps = radar.nsweeps
        tgates = float(radar.ngates*radar.nrays)
        zdat = radar.fields['reflectivity']['data']
        z0 = float(len(np.where(zdat > 0.)[0]))/tgates
        z10 = float(len(np.where(zdat > 10.)[0]))/tgates
        z40 = float(len(np.where(zdat > 40.)[0]))/tgates
        rdict = {'time_start' : time_start,
                'time_end' : time_end,
                 'scan_type' : stype,
                 'nsweeps' : nsweeps,
                 'z0' : z0,
                 'z10' : z10,
                 'z40' : z40,
                'expr' : radar.metadata['sigmet_task_name'].lower().strip().decode("utf-8")}

        return rdict

    def site_from_name(name):
        fullname = name.split('.')[0]
        site = fullname[-2::]
        return site

    def file_formatter(stime, site, scanmode, base, expr):
        #base/year/monthday
        
        mday = stime.strftime('%m%d')
        odir = os.path.join(base,
                            expr.lower(),
                            scanmode,
                            stime.strftime('%Y'),
                            mday)
        fname1 = 'sgpxsapr' + scanmode + site + stime.strftime('.%Y%m%d.%H%M%S')
        return odir, fname1
    
    uid = pwd.getpwnam("scollis").pw_uid
    gid = grp.getgrnam("users").gr_gid
    
    top_level = os.path.split(experiment_location)[0]
    tarobj = tarfile.open(path_and_file)
    site = site_from_name(path_and_file)
    members = tarobj.getmembers()
    status = []
    for member in members:
        try:
            radar_info = examine(tarobj.extractfile(member))
            odir_radars, file_name_begin = file_formatter(radar_info['time_start'], 
                                                   site, 
                                                   radar_info['scan_type'],
                                                   experiment_location,
                                                   radar_info['expr'])

            odir_json, file_name_begin = file_formatter(radar_info['time_start'], 
                                                   site, 
                                                   radar_info['scan_type'],
                                                   os.path.join(experiment_location, 'summary'),
                                                   radar_info['expr'])

            try:
                if not os.path.exists(odir_radars):
                    os.makedirs(odir_radars)
                    #os.chown(odir_radars, uid, gid)
                    #os.chmod(odir_radars, 777)

                if not os.path.exists(odir_json):
                    os.makedirs(odir_json)
                    #os.chown(odir_json, uid, gid)
                    #os.chmod(odir_json, 777)
            except: #just wait and try again..
                sleep(1)
                if not os.path.exists(odir_radars):
                    os.makedirs(odir_radars)
                    #os.chmod(odir_radars, 777)
                    #os.chown(odir_radars, uid, gid)

                if not os.path.exists(odir_json):
                    os.makedirs(odir_json)
                    #os.chown(odir_json, uid, gid)
                    #os.chmod(odir_json, 777)

            fullpath = os.path.join(odir_radars, file_name_begin+'.iris')

            json_dict = {}
            strconv_keys = ['z0', 'z10', 'z40', 'nsweeps']
            for key in strconv_keys:
                json_dict.update({key : str(radar_info[key])})

            json_dict.update({'start_time' : radar_info['time_start'].strftime('%Y%m%d-%H:%M:%S'),
                             'end_time' : radar_info['time_end'].strftime('%Y%m%d-%H:%M:%S')})

            json_dict.update({'original_name' : member.name,
                             'full_path' : fullpath})

            r = json.dumps(json_dict)
            loaded_r = json.loads(r)
            with open(os.path.join(odir_json, file_name_begin+'.json'), 'w') as outfile:
                json.dump(json_dict, outfile)
            
            #os.chown(os.path.join(odir_json, file_name_begin+'.json'), uid, gid)
            #os.chmod(os.path.join(odir_json, file_name_begin+'.json'), 777)

            #The actuall writing
            fh = tarobj.extractfile(member)

            shutil.copyfileobj(fh, open(fullpath, 'wb'))
            fh.close()
            #os.chown(fullpath, uid, gid)
            #os.chmod(fullpath, 777)
            status.append(member.name+':OK')
        except IndexError:
            status.append(member.name+':NotOK')

        
    return status

In [3]:
# Connect to the dask scheduler at this address; work is submitted through `client`.
SCHEDULER_ADDRESS = 'arm-jupyter.ornl.gov:5555'
client = Client(SCHEDULER_ADDRESS)

In [4]:
# Display the client summary (scheduler address, dashboard link, worker count).
client

0,1
Client  Scheduler: tcp://arm-jupyter.ornl.gov:5555  Dashboard: http://arm-jupyter.ornl.gov:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [5]:
# Input tarballs live in stage_dir/unformatted_subdir; experiment_dir and
# formatted_subdir name an output tree (formatted_dir).
experiment_dir = '/lustre/or-hydra/cades-arm/proj-shared/scanning4/'
stage_dir = '/lustre/or-hydra/cades-arm/proj-shared/data_transfer/sgp/'
formatted_subdir = 'formatted'
unformatted_subdir = 'sgpxsaprsecI5.00'

unformatted_dir = os.path.join(stage_dir, unformatted_subdir)
formatted_dir = os.path.join(experiment_dir, formatted_subdir)

# Sorted so the task list is reproducible (os.listdir order is arbitrary);
# only regular files are kept because os.listdir also returns
# sub-directories, which tarfile.open cannot read.
all_files = sorted(os.listdir(unformatted_dir))
all_fqdn = [path
            for path in (os.path.join(unformatted_dir, this_file) for this_file in all_files)
            if os.path.isfile(path)]

In [6]:
# Submit one manage_tarfile task per staged tarball; `future` holds the dask
# futures, each resolving to that tarball's list of status strings.
# NOTE(review): experiment_location is not passed, so output goes to
# manage_tarfile's default location, not experiment_dir/formatted_dir from
# the configuration cell -- confirm this is intended.
future = client.map(manage_tarfile, all_fqdn)

In [7]:
# Live progress bar (rendered as a widget) for the submitted futures.
progress(future)

VBox()

In [153]:
# Block until all tasks finish and collect their status lists.
# NOTE(review): the following summary cell gathers again, so this cell is redundant.
my_data = client.gather(future)

In [154]:
# Summarise the per-member status strings ('<name>:OK' / '<name>:NotOK')
# returned by manage_tarfile.
my_data = client.gather(future)
flat_list = [item for sublist in my_data for item in sublist]
print(len(flat_list))

# Classify by the ':NotOK' suffix rather than a substring test, so a member
# whose name merely contains 'NotOK' is not counted as a failure.
ff = [item for item in flat_list if item.endswith(':NotOK')]
ok_items = [item for item in flat_list if not item.endswith(':NotOK')]
SE_good = [item for item in ok_items if 'XSE' in item]
succeeded = len(ok_items)
failed = len(ff)

print(succeeded)
print(failed)

3686
3686
0
