In [1]:
import h5py
import numpy as np
import pandas as pd
from astropy.time import Time
import os
import vaex
import time
import pickle
import dask

In [2]:
from dask_gateway import Gateway
gateway = Gateway()

  from distributed.utils import LoopRunner, format_bytes


In [3]:
options = gateway.cluster_options()
options.worker_specification = '8CPU, 32GB'

In [4]:
cluster = gateway.new_cluster(options)
cluster.scale(16)

In [5]:
client = cluster.get_client(set_as_default=True)

In [7]:
client

0,1
Connection method: Cluster object,Cluster type: dask_gateway.GatewayCluster
Dashboard: /services/dask-gateway/clusters/prod.164905564264470eb0701c9580d6ba60/status,


Downloader Code
---------------

This code is for workers to download data files, and read them into memory

In [8]:
def gps2dyr(time):
    """Converts GPS time to datetime (can also do decimal years)."""
    return Time(time, format='gps').datetime

In [9]:
def read_atl06(fname, cycle):
    """Read one ATL06 file and output 6 reduced files. 
    
    Extract variables of interest and separate the ATL06 file 
    into each beam (ground track) and ascending/descending orbits.
    """

    # Each beam is a group
    group = ['/gt1l', '/gt1r', '/gt2l', '/gt2r', '/gt3l', '/gt3r']
    
    # Loop trough beams
    dataframes = []
    
    with h5py.File(fname, 'r') as fi:
        # Check which ground tracks are present in this file
        gtracks = sorted(['/'+k for k in fi.keys() if k.startswith('gt')])
    
        for k, g in enumerate(gtracks): 
            # Read in data for a single beam
            data = {}
            # this is our unique key (per beam)
            data['id'] = fi[g+'/land_ice_segments/segment_id'][:]
            npts = len(data['id'])
            # Load vars into memory (include as many as you want)
            data['lat'] = fi[g+'/land_ice_segments/latitude'][:]
            data['lon'] = fi[g+'/land_ice_segments/longitude'][:]
            
            data['slope_y'] = fi[g+'/land_ice_segments/fit_statistics/dh_fit_dy'][:]
            data['slope_x'] = fi[g+'/land_ice_segments/fit_statistics/dh_fit_dx'][:]
            data['slope_x_sigma'] = fi[g+'/land_ice_segments/fit_statistics/dh_fit_dx_sigma'][:]

            data['h_li'] = fi[g+'/land_ice_segments/h_li'][:]
            data['s_li'] = fi[g+'/land_ice_segments/h_li_sigma'][:]
            data['q_flag'] = fi[g+'/land_ice_segments/atl06_quality_summary'][:]
            data['s_fg'] = fi[g+'/land_ice_segments/fit_statistics/signal_selection_source'][:]
            data['snr'] = fi[g+'/land_ice_segments/fit_statistics/snr_significance'][:]
            data['h_rb'] = fi[g+'/land_ice_segments/fit_statistics/h_robust_sprd'][:]
            data['bsnow_conf'] = fi[g+'/land_ice_segments/geophysical/bsnow_conf'][:]
            
            data['cloud_flg_asr'] = fi[g+'/land_ice_segments/geophysical/cloud_flg_asr'][:]
            data['cloud_flg_atm'] = fi[g+'/land_ice_segments/geophysical/cloud_flg_atm'][:]
            data['msw_flag'] = fi[g+'/land_ice_segments/geophysical/msw_flag'][:]
            data['fbsnow_h'] = fi[g+'/land_ice_segments/geophysical/bsnow_h'][:]
            data['bsnow_od'] = fi[g+'/land_ice_segments/geophysical/bsnow_od'][:]
            data['layer_flag'] = fi[g+'/land_ice_segments/geophysical/layer_flag'][:]
            data['bckgrd'] = fi[g+'/land_ice_segments/geophysical/bckgrd'][:]
            data['e_bckgrd'] = fi[g+'/land_ice_segments/geophysical/e_bckgrd'][:]
            data['n_fit_photons'] = fi[g+'/land_ice_segments/fit_statistics/n_fit_photons'][:]
            data['w_surface_window_final'] = fi[g+'/land_ice_segments/fit_statistics/w_surface_window_final'][:]
            
            delta_t = fi[g+'/land_ice_segments/delta_time'][:]     # for time conversion
            t_ref = fi['/ancillary_data/atlas_sdp_gps_epoch'][:]     # single value
            
            # Time in GPS seconds (secs since Jan 5, 1980)
            t_gps = t_ref + delta_t

            # GPS sec to datetime
            data['t_year'] = gps2dyr(t_gps)
            data['cycle'] = np.ones(npts, dtype=np.int8)*cycle
            data['track'] = np.repeat(g[1:], npts)
            
            # Make a dataframe out of our data dict and store it.

            dataframes.append(vaex.from_dict(data))
        if len(dataframes) > 0:
            result = vaex.concat(dataframes).to_pandas_df()
            return result

In [10]:
def get_thing(thing, user='grigsbye', passw='Namku666', delayMax=199):
    delay = np.random.randint(0, delayMax)
    time.sleep(delay*0.01)
    preamble = "wget --http-user=" + user + " --http-password=" + passw 
    middle = ' --load-cookies mycookies.txt --no-check-certificate --auth-no-challenge -r --reject "index.html*" -np -e robots=off --show-progress=off --cut-dirs=6 ' 
    cmmd = preamble + middle + thing + " -P /tmp/"
    os.system(cmmd)
    lfilepath = '/tmp/n5eil01u.ecs.nsidc.org' + thing[-40:]
    res = read_atl06(lfilepath, cycle=int(3))
    return res

In [11]:
with open("test.txt", "rb") as fp:
    iceFiles11 = pickle.load(fp)

In [13]:
reslist0 = []
#for ice2 in masterList[::2]:
for ice0 in iceFiles11[::2]:
    reslist0.append(client.submit(get_thing, ice0, retries=1000))

In [14]:
dask.sizeof.getsizeof(reslist0[6].result())

100529848

In [17]:
%%time
#stuff1 = vaex.concat(list(filter(None, swarm.gather(reslist0, errors='skip', direct=True))))
stuff1 = list(filter(None, client.gather(reslist0, errors='skip', direct=True)))

CommClosedError: in <TLS (closed) ConnectionPool.gather local=tls://192.168.57.107:57296 remote=gateway://traefik-prod-dask-gateway.prod:80/prod.164905564264470eb0701c9580d6ba60>: Stream is closed