In [1]:
#import dask
#dask.config.config

## Running Dask on Summit via Ipython Terminal
You will need 2 terminals and a browser for this lab
___
#### In terminal 1 
1. Log in to Summit.
2. Activate the conda environment:
   ```
   module load ibm-wml-ce/1.7.0-1
   conda activate wmlce17-ornl
   ```

3. Launch IPython:<br>
`ipython`


#### In terminal 2
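1. Forward an SSH port from the login node to your laptop. Here XXXX should be an unused port on the system; this notebook uses 3761 as the example, which is also the dashboard port configured for the cluster below.<br> Pay attention to making sure the right login node is used: loginYY must be the login node on which ipython (terminal 1) is running.

   `ssh -N -L XXXX:loginYY.summit.olcf.ornl.gov:XXXX  userid@summit.olcf.ornl.gov`

   e.g.

   `ssh -N -L 3761:login4.summit.olcf.ornl.gov:3761  vanstee@summit.olcf.ornl.gov`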


## Dask on Summit

In [1]:
import sys
# dask_jobqueue enables interoperability with batch-scheduled clusters (like LSF)
from dask_jobqueue import LSFCluster

In [4]:
# Launcher prefix placed in front of the Python executable for every worker job
dask_worker_prefix = "jsrun -n1 -a1 -g0 -c2"

cluster = LSFCluster(
    # --- LSF job request ---
    project="VEN201",
    walltime="00:30",
    job_extra=["-nnodes 1"],          # extra #BSUB line added to the job header
    header_skip=["-R", "-n ", "-M"],  # header lines containing these are dropped
    use_stdin=False,
    # --- worker sizing ---
    cores=8,
    processes=1,
    memory="4 GB",
    # --- networking ---
    interface="ib0",
    scheduler_options={"dashboard_address": ":3761"},
    # --- worker launch command: "<jsrun prefix> <current python>" ---
    python=" ".join((dask_worker_prefix, sys.executable)),
)

## Let's see what is sent to LSF

In [5]:
# Show the batch script (the #BSUB header plus the dask-worker command) generated for a worker job
print(cluster.job_script())

#!/usr/bin/env bash

#BSUB -J dask-worker
#BSUB -P VEN201
#BSUB -W 00:30
#BSUB -nnodes 1

/ccs/home/vanstee/.conda/envs/powerai-ornl/bin/python -m distributed.cli.dask_worker tcp://10.41.0.32:36525 --nthreads 8 --memory-limit 4.00GB --name name --nanny --death-timeout 60 --interface ib0



In [6]:
from dask.distributed import Client
# Attach a client to the scheduler owned by `cluster`; work is submitted through it
client = Client(cluster)

In [7]:
# Display the client summary: scheduler address, dashboard URL, and worker/core/memory totals
client

0,1
Client  Scheduler: tcp://10.41.0.32:36525  Dashboard: http://10.41.0.32:3762/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [8]:
# Ask the cluster to scale to 4; open another terminal and run `bjobs` to watch the worker jobs.
cluster.scale(4)
# It can take a couple of minutes before the jobs start and the workers appear.

In [None]:
# Re-display the client to see the workers that have joined so far.
client
# Example of what this looked like in an IPython terminal session:
#In [11]: client
#Out[11]: <Client: 'tcp://10.41.0.34:37579' processes=2 threads=16, memory=8.00 GB>

In [12]:
watch !bjobs

JOBID   USER       STAT   SLOTS    QUEUE       START_TIME    FINISH_TIME   JOB_NAME                      
376497  vanstee    RUN    43       batch       Sep 29 16:23  Sep 29 16:53  dask-worker                   
376498  vanstee    RUN    43       batch       Sep 29 16:23  Sep 29 16:53  dask-worker                   


## Simple NumPy example ...

In [14]:
import dask.array as da
# 25 M element array (5000 x 5000), split into 400 chunks of 250 x 250
x = da.random.random([5000,5000], chunks=[250,250])


In [18]:
# Ask the cluster to scale to 8 for the array work
cluster.scale(8)

In [15]:
# Trigger computation of the random array and keep the result; rebinding `x`
# makes later expressions start from the persisted collection.
x = x.persist()
# Display the array summary (bytes, shape, chunking)
x

Unnamed: 0,Array,Chunk
Bytes,200.00 MB,500.00 kB
Shape,"(5000, 5000)","(250, 250)"
Count,400 Tasks,400 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 200.00 MB 500.00 kB Shape (5000, 5000) (250, 250) Count 400 Tasks 400 Chunks Type float64 numpy.ndarray",5000  5000,

Unnamed: 0,Array,Chunk
Bytes,200.00 MB,500.00 kB
Shape,"(5000, 5000)","(250, 250)"
Count,400 Tasks,400 Chunks
Type,float64,numpy.ndarray


In [16]:
# Define the expression on the dask array; the values are only produced when
# compute() or persist() is called on `y`.
y = x.T ** x - x.mean()

In [17]:
# Note: the result of y.compute() is not kept on the cluster ...
# each call triggers the whole computation again.
print(y.compute())
print(y.compute())

# Now pin it in memory ... and re-run.
# persist() returns a NEW collection backed by the computed results; it has to
# be assigned back to `y`. A bare `y.persist()` throws that collection away, so
# the following compute() would recompute everything from scratch.
y = y.persist()
print(y.compute())

Unnamed: 0,Array,Chunk
Bytes,200.00 MB,500.00 kB
Shape,"(5000, 5000)","(250, 250)"
Count,400 Tasks,400 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 200.00 MB 500.00 kB Shape (5000, 5000) (250, 250) Count 400 Tasks 400 Chunks Type float64 numpy.ndarray",5000  5000,

Unnamed: 0,Array,Chunk
Bytes,200.00 MB,500.00 kB
Shape,"(5000, 5000)","(250, 250)"
Count,400 Tasks,400 Chunks
Type,float64,numpy.ndarray


In [None]:
# Persist vs compute: https://distributed.dask.org/en/latest/memory.html
# Use compute when the return value is small and you want to feed the result into other analyses.
# Use persist (similar to cache in Spark) to trigger computation and pin the results in memory.
# Follow-on actions still build task graphs, but only back to this point, because they reuse the values calculated by persist.

## Simple Pandas example with our Lending Club data ...



In [None]:
# Explicit column dtypes for the lending-club CSV: every listed column is
# float64 except the two text columns, 'desc' and 'id', overridden below.
dtype = dict.fromkeys(
    [
        'acc_now_delinq', 'acc_open_past_24mths', 'all_util', 'avg_cur_bal',
        'chargeoff_within_12_mths', 'collections_12_mths_ex_med',
        'delinq_2yrs', 'delinq_amnt',
        'desc',
        'fico_range_high', 'fico_range_low', 'funded_amnt', 'funded_amnt_inv',
        'id',
        'inq_fi', 'inq_last_12m', 'inq_last_6mths',
        'last_fico_range_high', 'last_fico_range_low',
        'loan_amnt', 'max_bal_bc',
        'mo_sin_old_rev_tl_op', 'mo_sin_rcnt_rev_tl_op', 'mo_sin_rcnt_tl',
        'mort_acc',
        'num_accts_ever_120_pd', 'num_actv_bc_tl', 'num_actv_rev_tl',
        'num_bc_sats', 'num_bc_tl', 'num_il_tl', 'num_op_rev_tl',
        'num_rev_accts', 'num_rev_tl_bal_gt_0', 'num_sats',
        'num_tl_30dpd', 'num_tl_90g_dpd_24m', 'num_tl_op_past_12m',
        'open_acc', 'open_acc_6m', 'open_act_il',
        'open_il_12m', 'open_il_24m', 'open_rv_12m', 'open_rv_24m',
        'policy_code', 'pub_rec', 'pub_rec_bankruptcies',
        'revol_bal', 'tax_liens',
        'tot_coll_amt', 'tot_cur_bal', 'tot_hi_cred_lim',
        'total_acc', 'total_bal_ex_mort', 'total_bal_il', 'total_bc_limit',
        'total_cu_tl', 'total_il_high_credit_limit', 'total_rev_hi_lim',
    ],
    'float64',
)
# Updating keys that already exist keeps their position in the dict.
dtype.update({'desc': 'object', 'id': 'object'})

In [None]:
# dummy data for demo...
!cp ../Tabular/ldata2016.csv.gz ./
!gunzip ./ldata2016.csv.gz

# import dask
import dask.dataframe as dd
ddf = dd.read_csv("./dask-tutorial/ldata2016.csv", blocksize=15e6,dtype=dtype) # , compression="gzip")
#
#ddf = ddf.repartition(npartitions=5)
ddf

# Standard operations example
filtered_df = ddf[ddf["loan_amnt"] > 15000]
answer = filtered_df.compute()
#compare 
len(answer)
len(ddf)


print(ddf.columns)
# ok, lets count NaNs ..
ddf.isna().sum().compute()

# well dask doesnt do well with NaNs,  let just do a few colums ..
ddf_small = ddf[[ 'id', 'loan_amnt', 'funded_amnt','revol_bal','dti']]

# Check NaNs 
ddf_small.isna().sum().compute()

ddf_small.describe().compute()

# correlation
ddf.corr().compute()

# Do one join.. cartesian ?
merge(ddf_small, ddf_small,on='id')# [['loan_amnt', 'funded_amnt']]