In [14]:
from dask_gateway import Gateway
from dask_gateway import GatewayCluster
from dask.distributed import Client

import socket
import pprint
import numpy
import pystare

import os
import sys

import boto3

In [2]:
def get_s3_keys(bucket, s3_client, prefix = ''):
    """
    Generate the keys in an S3 bucket.

    Calls ``s3_client.list_objects_v2`` repeatedly, feeding each response's
    ``'NextContinuationToken'`` back as ``ContinuationToken`` until a
    response arrives without one, and yields the ``'Key'`` of every entry
    in each response's ``'Contents'`` list.

    :param bucket: Name of the S3 bucket.
    :param s3_client: Object providing ``list_objects_v2(**kwargs)``; the
        notebook passes the result of ``boto3.client('s3')``.
    :param prefix: Only fetch keys that start with this prefix (optional).
        A non-string value (for example ``None``) disables prefix
        filtering: no ``Prefix`` is sent and every key is yielded.
    :return: Generator yielding one key per listed object.
    """
    # Decide once whether a prefix applies, so the request parameter and the
    # client-side startswith() check can never disagree.
    filter_by_prefix = isinstance(prefix, str)

    kwargs = {'Bucket': bucket}
    if filter_by_prefix:
        kwargs['Prefix'] = prefix

    while True:
        resp = s3_client.list_objects_v2(**kwargs)

        # A response without 'Contents' is reported and ends the listing.
        # Only this lookup is treated as "empty"; a malformed entry inside
        # 'Contents' is no longer misreported as an empty response.
        if 'Contents' not in resp:
            print('Empty response from s3 for bucket %s with prefix %s'%(bucket,prefix))
            break

        for obj in resp['Contents']:
            key = obj['Key']
            if not filter_by_prefix or key.startswith(prefix):
                yield key

        # No continuation token means this was the last page.
        if 'NextContinuationToken' not in resp:
            break
        kwargs['ContinuationToken'] = resp['NextContinuationToken']
            


In [3]:
# Create the gateway handle with no explicit arguments, then display it.
gateway = Gateway()
gateway

Gateway<http://10.100.222.95:8000/services/dask-gateway>

In [7]:
# gateway.close()

In [4]:
# Fetch the cluster options object from the gateway and display it
# (it renders as a "Cluster Options" widget in the recorded output).
options = gateway.cluster_options()
options

VBox(children=(HTML(value='<h2>Cluster Options</h2>'), GridBox(children=(HTML(value="<p style='font-weight: bo…

In [5]:
# Request a new cluster using the options object from the previous cell,
# then display the resulting cluster.
cluster = gateway.new_cluster(cluster_options=options)
cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

In [6]:
# Snapshot the gateway's cluster list; later cells iterate over this
# snapshot, so it can go stale once clusters are started or stopped.
clusters = gateway.list_clusters()
clusters

[ClusterReport<name=daskhub.58d2ddab00e2474ab92770b30e05bce9, status=RUNNING>]

In [12]:
# gateway_uri = "gateway://traefik-dhub-dask-gateway.daskhub:80/daskhub."+gateway.list_clusters()[0].name.split('.')[1]; gateway_uri

In [7]:
# Alternative kept for reference: pass an explicitly built gateway URI
# (see the commented-out `gateway_uri` cell) instead of using the default.
# client = cluster.get_client(gateway_uri);
# Obtain a client from `cluster`; all later cells submit work through it.
# NOTE(review): the recorded output of this cell reports a numpy version
# mismatch (client 1.19.2 vs scheduler/workers 1.19.1) -- consider aligning
# the environments.
client = cluster.get_client();


+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| numpy   | 1.19.2 | 1.19.1    | 1.19.1  |
+---------+--------+-----------+---------+


In [41]:
client

0,1
Client  Scheduler: gateway://traefik-dhub-dask-gateway.daskhub:80/daskhub.b0e0145223444e15a42df978c2d9216a  Dashboard: /services/dask-gateway/clusters/daskhub.b0e0145223444e15a42df978c2d9216a/status,Cluster  Workers: 1  Cores: 2  Memory: 4.29 GB


In [8]:
# Smoke test: build a 1000x1000 array of normally distributed random values
# split into 50x50 chunks and compute its mean (expected to be close to 0).
# NOTE(review): presumably this runs on the gateway cluster through the
# client created above -- confirm on the dashboard.
import dask.array as da
a = da.random.normal(size=(1000,1000),chunks=(50,50))
a.mean().compute()

0.0002658398139231805

In [9]:
client.nthreads()

{'tls://192.168.26.198:44633': 2}

In [10]:
# Dask helpers
def slam(client,action,data,partition_factor=1.5,dbg=0):
    """
    Split ``data`` into contiguous shards and map ``action`` over them.

    The number of shards is ``int(partition_factor * total_threads)``, where
    ``total_threads`` is the sum of the values of ``client.nthreads()``.
    The shards are sent with ``client.scatter`` and ``action`` is applied to
    each one with ``client.map``.

    :param client: Object providing ``nthreads()``, ``scatter(list)`` and
        ``map(func, iterable)``; the notebook passes its Dask client.
    :param action: Callable invoked once per shard.
    :param data: Sliceable sequence supporting ``len()`` (list, numpy array, ...).
    :param partition_factor: Shards to create per available thread.
    :param dbg: When greater than 0, print sharding diagnostics.
    :return: Whatever ``client.map`` returns (one entry per shard).
    """
    n_threads = sum(client.nthreads().values())
    if dbg>0: # a
        print('slam: np = %i'%n_threads)

    divisor = 1.0*partition_factor*n_threads
    n_shards = int(divisor)
    if n_shards < 1:
        # No workers reported yet, or a partition_factor too small to give
        # even one shard: fall back to a single shard holding all of `data`
        # instead of failing on an empty bounds list.
        shard_bounds = [0]
    else:
        shard_bounds = [int(i*len(data)/divisor) for i in range(n_shards)]

    # The computed bounds are only the shard starts; close the last shard at
    # the end of the data.
    if shard_bounds[-1] != len(data):
        if dbg>0: # b
            print('a sb[-1]: ',shard_bounds[-1],len(data))
        shard_bounds = shard_bounds + [len(data)]
        if dbg>0: # c
            print('sb: ',shard_bounds)

    data_shards = [data[lo:hi] for lo, hi in zip(shard_bounds[:-1], shard_bounds[1:])]
    if dbg>0: # d
        print('ds len:        ',len(data_shards))
        if data_shards:
            print('ds item len:   ',len(data_shards[0]))
            print('ds type:       ',type(data_shards[0]))
            try:
                print('ds dtype:      ',data_shards[0].dtype)
            except AttributeError:
                # Plain Python sequences have no dtype; nothing to report.
                pass

    big_future = client.scatter(data_shards)
    results    = client.map(action,big_future)
    return results


In [38]:
def action(data_shard):
    """
    Probe the environment of the process executing this function.

    Each element of ``data_shard`` selects one probe; unrecognised values
    are ignored. One tuple is appended to the result per recognised element:

    * 0 -- working directory and ``os.environ``
    * 1 -- working directory and its directory listing
    * 2 -- hostname
    * 3 -- contents of ``.bashrc`` in the working directory
    * 4 -- the ``DASK_PARENT`` and ``DASK_GATEWAY_WORKER_NAME`` variables
    * 5 -- placeholder (pystare check currently disabled)
    * 6 -- ``os.environ``
    * 7 -- the first 10 keys of the ``daskhub-data`` S3 bucket

    :param data_shard: Iterable of probe selectors.
    :return: List of result tuples, in input order.
    :raises KeyError: For selector 4 when either environment variable is unset.
    :raises OSError: For selector 3 when ``.bashrc`` cannot be opened.
    """
    # Imported here so the function carries its own dependency when it is
    # shipped to another process.
    from itertools import islice

    ret = []
    for d in data_shard:
        # NOTE(review): selectors 0 and 6 return the full process
        # environment; anything gathered from them ends up in notebook
        # output, so clear those outputs before sharing.
        if d == 0:
            ret.append((d,'touched',os.getcwd(),os.environ))
        elif d == 1:
            ret.append((d,'touched',os.getcwd(),os.listdir()))
        elif d == 2:
            ret.append((d,'touched',socket.gethostname()))
        elif d == 3:
            filename='.bashrc'
            with open(filename, 'r') as file:
                contents = file.read()
            ret.append((d,'read .bashrc',contents))
        elif d == 4:
            ret.append((d,os.environ['DASK_PARENT'],os.environ['DASK_GATEWAY_WORKER_NAME']))
        elif d == 5:
            # ret.append((d,pystare.from_latlon([30],[30],4)))
            pass
        elif d == 6:
            ret.append((d,os.environ))

        # Here's what we really need to work.
        elif d == 7:
            s3_client = boto3.client('s3')
            keys = get_s3_keys('daskhub-data',s3_client,prefix='')
            # list.append takes exactly one argument, so the pair has to be
            # wrapped in a tuple. islice stops after 10 keys instead of
            # listing the whole bucket first.
            ret.append((d,list(islice(keys, 10))))

    return ret

In [49]:
# Sanity check from the notebook process itself: list the bucket with a
# boto3 client and pretty-print the first 10 keys.
s3_client = boto3.client('s3')
keys = get_s3_keys('daskhub-data',s3_client,prefix='')
pprint.pprint(list(keys)[0:10])

['GESDISC/MERRA2/2019/MERRA2_400.tavg1_2d_slv_Nx.20191203.nc4',
 'GESDISC/MERRA2/2019/MERRA2_400.tavg1_2d_slv_Nx.20191222.nc4',
 'GESDISC/MERRA2/2019/MERRA2_400.tavg1_2d_slv_Nx.20191226.nc4',
 'MODAPS/CATALOGS/MOD05_L2/2019/356/catalogue.sqlite',
 'MODAPS/MOD05/MOD05_L2.A2019336.2315.061.2019337071952.hdf',
 'MODAPS/MOD05/MOD05_L2.A2019336.2315.061.2019337071952_stare.nc',
 'MODAPS/MOD05/MOD05_L2.A2019336.2320.061.2019337072008.hdf',
 'MODAPS/MOD05/MOD05_L2.A2019336.2320.061.2019337072008_stare.nc',
 'MODAPS/MOD05/MOD05_L2.A2019336.2325.061.2019337072403.hdf',
 'MODAPS/MOD05/MOD05_L2.A2019336.2325.061.2019337072403_stare.nc']


In [29]:
# Work items handed to `action`: one 2, one 0, then eight 3s.
data = numpy.array([2, 0] + [3]*8)
data

array([2, 0, 3, 3, 3, 3, 3, 3, 3, 3])

In [39]:
# Shard `data` and apply `action` to each shard via the client; `results`
# holds what slam returns (the recorded output below shows three futures).
results = slam(client,action,data)

In [40]:
results

[<Future: finished, type: builtins.list, key: action-f9be749f42cb134583faad75e4445135>,
 <Future: finished, type: builtins.list, key: action-cb6d4baae054af18a4e71cb0c9906cf1>,
 <Future: finished, type: builtins.list, key: action-6b77c139cbbd777ac6187cbdffa43d4b>]

In [41]:
# Collect the per-shard return values of `action` and display them.
# NOTE(review): the recorded output includes the worker's full environment
# (internal addresses, credential file paths) -- clear it before sharing.
client.gather(results)
# print(results[0].result()[0][2])

[[(2, 'touched', 'dask-worker-58d2ddab00e2474ab92770b30e05bce9-pvmpz'),
  (0,
   'touched',
   '/home/jovyan',
   environ{'CONDA_SHLVL': '1',
           'LC_ALL': 'C.UTF-8',
           'CONDA_EXE': '/srv/conda/bin/conda',
           'PROXY_API_PORT': 'tcp://10.100.182.208:8001',
           'DASK_GATEWAY_CLUSTER_NAME': 'daskhub.58d2ddab00e2474ab92770b30e05bce9',
           'DASK_GATEWAY_API_TOKEN': '/etc/dask-credentials/api-token',
           'PROXY_API_PORT_8001_TCP_PORT': '8001',
           'LANG': 'C.UTF-8',
           'API_DHUB_DASK_GATEWAY_PORT_8000_TCP_PORT': '8000',
           'HUB_PORT_8081_TCP_PROTO': 'tcp',
           'DASK_GATEWAY_API_URL': 'http://api-dhub-dask-gateway.daskhub:8000/api',
           'HUB_SERVICE_HOST': '10.100.33.175',
           'HOSTNAME': 'dask-worker-58d2ddab00e2474ab92770b30e05bce9-pvmpz',
           'PROXY_HTTP_PORT_8000_TCP_ADDR': '10.100.222.95',
           'PANGEO_ENV': 'base-notebook',
           'DASK_GATEWAY_TLS_CERT': '/etc/dask-credentials/dask

In [8]:
cluster.shutdown()

In [9]:
cluster.close()

In [30]:
gateway.list_clusters()

[ClusterReport<name=daskhub.b2d9f9ce32914d2f9cf4af83932d3f66, status=RUNNING>,
 ClusterReport<name=daskhub.b7c48cdfbb234366a318f580154f05a6, status=RUNNING>]

In [32]:
clusters


[ClusterReport<name=daskhub.b2d9f9ce32914d2f9cf4af83932d3f66, status=RUNNING>,
 ClusterReport<name=daskhub.b7c48cdfbb234366a318f580154f05a6, status=RUNNING>]

In [35]:
# Connect to every cluster in the `clusters` snapshot and shut it down.
# `cluster` is rebound on each pass and ends up referring to the last one.
for i in clusters:
    print(i)
    cluster = gateway.connect(i.name)
    cluster.shutdown()

ClusterReport<name=daskhub.b2d9f9ce32914d2f9cf4af83932d3f66, status=RUNNING>
ClusterReport<name=daskhub.b7c48cdfbb234366a318f580154f05a6, status=RUNNING>


Exception in callback None()
handle: <Handle cancelled>
Traceback (most recent call last):
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", line 1417, in _do_ssl_handshake
    self.socket.do_handshake()
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/ssl.py", line 1139, in do_handshake
    self._sslobj.do_handshake()
OSError: [Errno 0] Error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", line 711, in _handle_events
    self._handle_read()
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", line 1498, in _handle_read
    self._do_ssl_handshake()
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", line 1449, in _do_ssl_handshake
    return self.close(exc_info=err)
  File "/home/jovyan/users_c

In [43]:
cluster = gateway.connect(clusters[0].name)

In [44]:
# Shut the connected cluster down, then close the local cluster object.
cluster.shutdown()
cluster.close()

In [10]:
gateway.close()

In [45]:
gateway.list_clusters()

[]

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
Exception in callback None()
handle: <Handle cancelled>
Traceback (most recent call last):
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", line 1417, in _do_ssl_handshake
    self.socket.do_handshake()
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/ssl.py", line 1139, in do_handshake
    self._sslobj.do_handshake()
OSError: [Errno 0] Error

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", line 711, in _handle_events
    self._handle_read()
  File "/home/jovyan/users_conda_envs/stare-1/lib/python3.7/site-packages/tornado/iostream.py", 

In [None]:
gateway