In [8]:
import os
from dask.distributed import Client
from dask.distributed import LocalCluster
############### Dask cluster settings ###############
# clusterType: "local" starts a LocalCluster; any other value starts a SLURMCluster.
clusterType = "local"
# platform: "PPAN" takes the log directory from $TMPDIR; any other value uses
# the workstation path below and switches the workers to separate processes.
platform = 'ws'
numThreads = 1     # passed as threads_per_worker to LocalCluster
numWorkers = 4     # passed as n_workers to LocalCluster
numCores = 1       # passed as cores to SLURMCluster
mem = '4GB'        # passed as memory to SLURMCluster
processes = False  # overridden to True below when platform is not "PPAN"

############### Data analysis settings ###############
varname = 'tas'
datadir = '/data_cmip6/CMIP6/HighResMIP/NOAA-GFDL/GFDL-CM4C192/highresSST-future/r1i1p1f1/3hr/' + varname + '/gr3/v20180701/'
dashPort = ':1988'  # used as the dashboard_address of the cluster

if platform == "PPAN":
    # os.getenv returns None (it does not raise) when the variable is unset,
    # so the missing-TMPDIR case has to be checked explicitly.
    logdir = os.getenv('TMPDIR')
    if not logdir:
        raise SystemExit("Please check the platform settings and try again.")
else:
    logdir = '/local2/home/a1r/logs/'
    processes = True
        


In [9]:
# parallel_backend is taken from the standalone joblib package: the vendored
# sklearn.externals.joblib copy was deprecated and later removed from scikit-learn.
from joblib import parallel_backend
from sklearn.datasets import make_blobs
from sklearn.cluster import DBSCAN

In [10]:
X, y = make_blobs(n_samples = 150000, n_features = 2, centers = 2, cluster_std = 1.9)
model = DBSCAN(eps = 0.5, min_samples = 20)
%time model.fit(X)

CPU times: user 6 s, sys: 638 ms, total: 6.64 s
Wall time: 6.61 s


DBSCAN(algorithm='auto', eps=0.5, leaf_size=30, metric='euclidean',
    metric_params=None, min_samples=20, n_jobs=None, p=None)

In [11]:
# Instantiate the Dask cluster selected by clusterType and connect a client to it.
if clusterType == "local":
    from dask.distributed import LocalCluster

    # Options common to both LocalCluster variants below.
    local_kwargs = dict(
        silence_logs=False,
        processes=processes,
        dashboard_address=dashPort,
        local_directory=logdir,
    )
    try:
        numWorkers
    except NameError:
        # No worker count configured: fall back to LocalCluster's own defaults.
        cluster = LocalCluster(**local_kwargs)
    else:
        cluster = LocalCluster(
            n_workers=numWorkers,
            threads_per_worker=numThreads,
            **local_kwargs,
        )
else:
    from dask_jobqueue import SLURMCluster

    scheduler_options = {"dashboard_address": dashPort}
    # NOTE(review): logdir is wrapped in literal double quotes exactly as before;
    # confirm that dask_jobqueue expects pre-quoted paths for these two options.
    cluster = SLURMCluster(
        queue='batch',
        memory=mem,
        project='gfdl_f',
        cores=numCores,
        walltime='03:00:00',  # three hours; the previous '2:60:00' had an out-of-range minutes field
        scheduler_options=scheduler_options,
        log_directory='"' + logdir + '"',
        local_directory='"' + logdir + '"',
    )
    print("Cluster spun", cluster)

client = Client(cluster)


distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:37197
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:1988
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:46471'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:40990'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:35993'
distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:44066'
distributed.scheduler - INFO - Register tcp://127.0.0.1:43935
distributed.scheduler - INFO - Register tcp://127.0.0.1:36445
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:43935
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register tcp://127.0.0.1:38056
distributed.scheduler - INFO - Register tcp://127.0.0.1:44961
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36445
distributed.core - INFO - Star

In [4]:
# Display the connected Dask client as this cell's output.
client 

0,1
Client  Scheduler: tcp://127.0.0.1:45462  Dashboard: http://127.0.0.1:1988/status,Cluster  Workers: 4  Cores: 4  Memory: 16.54 GB


In [None]:
# Let the cluster adapt its size between one and four workers.
cluster.adapt(
    minimum=1,
    maximum=4,
)

## Test with Dask

In [12]:
from joblib  import parallel_backend,parallel
X, y = make_blobs(n_samples = 150000, n_features = 2, centers = 2, cluster_std = 1.9)
model = DBSCAN(eps = 0.5, min_samples = 20,n_jobs=-1)
with parallel_backend('dask'):
    %time model.fit(X)

CPU times: user 8.73 s, sys: 563 ms, total: 9.29 s
Wall time: 3.92 s


In [None]:
# Display the connected Dask client as this cell's output.
client

In [7]:
# Disconnect the client first, then stop the cluster, so the client is not
# left connected to a scheduler that has already shut down.
client.close()
cluster.close()

distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:44906'
distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:43788'
distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:42372'
distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:35149'
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:46813
distributed.core - INFO - Removing comms to tcp://127.0.0.1:46813
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:41504
distributed.core - INFO - Removing comms to tcp://127.0.0.1:41504
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:37325
distributed.core - INFO - Removing comms to tcp://127.0.0.1:37325
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:41241
distributed.core - INFO - Removing comms to tcp://127.0.0.1:41241
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
