# Dask for Beginners Cheat Sheet: Sample Code

(c) 2020 NVIDIA, Blazing SQL

Distributed under Apache License 2.0

### Imports

In [1]:
import numpy as np
import cupy as cp

import cudf
import dask_cudf
import dask.distributed
from dask.delayed import delayed
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

### Sample DataFrame

In [2]:
# Ten sample rows with an int, a float, a datetime (one missing), two single-letter
# string columns, a word column (one missing) and a free-text column (one missing).
# Long text values use implicit string concatenation so that no newline or source
# indentation leaks into the data itself.
df = cudf.DataFrame(
    [
          (39, 6.88, np.datetime64('2020-10-08T12:12:01'), 'C', 'D', 'data'
            , 'RAPIDS.ai is a suite of open-source libraries that allow you to run your end to end data science and analytics pipelines on GPUs.')
        , (11, 4.21, None,                                 'A', 'D', 'cuDF'
            , 'cuDF is a Python GPU DataFrame (built on the Apache Arrow columnar memory format)')
        , (31, 4.71, np.datetime64('2020-10-10T09:26:43'), 'U', 'D', 'memory'
            , 'cuDF allows for loading, joining, aggregating, filtering, and otherwise manipulating tabular data using a DataFrame style API.')
        , (40, 0.93, np.datetime64('2020-10-11T17:10:00'), 'P', 'B', 'tabular'
            , ('If your workflow is fast enough on a single GPU or your data comfortably fits in memory on '
               'a single GPU, you would want to use cuDF.'))
        , (33, 9.26, np.datetime64('2020-10-15T10:58:02'), 'O', 'D', 'parallel'
            , ('If you want to distribute your workflow across multiple GPUs or have more data than you can fit '
               'in memory on a single GPU you would want to use Dask-cuDF'))
        , (42, 4.21, np.datetime64('2020-10-01T10:02:23'), 'U', 'C', 'GPUs'
            , 'BlazingSQL provides a high-performance distributed SQL engine in Python')
        , (36, 3.01, np.datetime64('2020-09-30T14:36:26'), 'T', 'D', None
            , 'BlazingSQL is built on the RAPIDS GPU data science ecosystem')
        , (38, 6.44, np.datetime64('2020-10-10T08:34:36'), 'X', 'B', 'csv'
            , 'BlazingSQL lets you ETL raw data directly into GPU memory as a GPU DataFrame (GDF)')
        , (17, 5.28, np.datetime64('2020-10-09T08:34:40'), 'P', 'D', 'dataframes'
            , 'Dask is a flexible library for parallel computing in Python')
        , (10, 8.28, np.datetime64('2020-10-03T03:31:21'), 'W', 'B', 'python'
            , None)
    ]
    , columns = ['number', 'float_number', 'datetime', 'letter', 'category', 'word', 'string']
)

## Cluster and client setup

In [3]:
# One worker pinned to GPU 0 running a single thread; RMM is initialised with
# managed memory enabled and a 20GB pool.
cluster = LocalCUDACluster(
    CUDA_VISIBLE_DEVICES="0",
    n_workers=1,
    threads_per_worker=1,
    rmm_managed_memory=True,
    rmm_pool_size="20GB",
)

client = Client(cluster)
client  # repr shows the scheduler address, dashboard link and worker resources

0,1
Client  Scheduler: tcp://127.0.0.1:44401  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 67.48 GB


# DataFrame

#### dask_cudf.from_cudf

In [4]:
# Split the 10-row cudf frame into 2 partitions, then peek at the first rows.
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.head(5)

Unnamed: 0,number,float_number,datetime,letter,category,word,string
0,39,6.88,2020-10-08 12:12:01,C,D,data,RAPIDS.ai is a suite of open-source libraries ...
1,11,4.21,,A,D,cuDF,cuDF is a Python GPU DataFrame (built on the A...
2,31,4.71,2020-10-10 09:26:43,U,D,memory,"cuDF allows for loading, joining, aggregating,..."
3,40,0.93,2020-10-11 17:10:00,P,B,tabular,If your workflow is fast enough on a single GP...
4,33,9.26,2020-10-15 10:58:02,O,D,parallel,If you want to distribute your workflow across...


In [5]:
# Fix the number of rows per partition instead of the partition count:
# 10 rows at 2 rows per chunk.
ddf = dask_cudf.from_cudf(df, chunksize=2)
ddf.npartitions

5

#### dask_cudf.DataFrame.map_partitions

In [6]:
def process_frame(df):
    """Return a copy of one partition with an extra ``num_inc`` column.

    ``num_inc`` is ``number + 10``. The input partition is not modified:
    functions passed to ``map_partitions`` should not mutate their input,
    because Dask may keep and reuse that same partition object.

    Parameters
    ----------
    df : DataFrame-like
        One partition with a numeric ``number`` column.

    Returns
    -------
    DataFrame-like
        A new frame with all original columns plus ``num_inc``.
    """
    return df.assign(num_inc=df['number'] + 10)
    
# head() reads only the first partition by default, so only that partition's 2 rows appear.
ddf.map_partitions(process_frame).head()

Unnamed: 0,number,float_number,datetime,letter,category,word,string,num_inc
0,39,6.88,2020-10-08 12:12:01,C,D,data,RAPIDS.ai is a suite of open-source libraries ...,49
1,11,4.21,,A,D,cuDF,cuDF is a Python GPU DataFrame (built on the A...,21


In [7]:
def multiply(a, b, mult):
    """Row-wise kernel for ``apply_rows``: store ``a[i] * b[i]`` in ``mult[i]``.

    ``a`` and ``b`` are the input columns and ``mult`` is the pre-allocated
    output column. The caller maps frame columns onto these argument names
    through ``incols``/``outcols``. Only the first ``min(len(a), len(b))``
    rows are written.
    """
    for row in range(min(len(a), len(b))):
        mult[row] = a[row] * b[row]

def process_frame_mul(df):
    """Return ``number * float_number`` for one partition, as a column named ``mult``.

    The ``multiply`` row kernel runs through ``apply_rows``. The frame it
    returns includes the ``mult`` output column, and only that column is
    handed back.
    """
    column_roles = {'number': 'a', 'float_number': 'b'}
    result_columns = {'mult': np.float64}
    with_mult = df.apply_rows(
        multiply,
        incols=column_roles,
        outcols=result_columns,
        kwargs={},
    )
    return with_mult['mult']

# The mapped function returns a single column, so the result is a Series named `mult`.
ddf.map_partitions(process_frame_mul).head()

0    268.32
1     46.31
Name: mult, dtype: float64

In [8]:
def divide(a, div, b):
    """Row-wise kernel for ``apply_rows``: store ``a[i] / b`` in ``div[i]``.

    Parameters
    ----------
    a : array-like
        Input column (mapped from ``col_a`` by ``process_frame_div``).
    div : array-like
        Pre-allocated output column, filled in place.
    b : scalar
        Divisor, supplied through ``apply_rows``'s ``kwargs``.
    """
    for i, aa in enumerate(a):
        div[i] = aa / b


def process_frame_div(df, col_a, val_divide):
    """Return ``df[col_a] / val_divide`` for one partition, as a column named ``div``.

    Parameters
    ----------
    df : DataFrame
        One partition (presumably a cudf.DataFrame, since ``apply_rows`` is used).
    col_a : str
        Name of the numeric column to divide.
    val_divide : float
        Divisor applied to every row.
    """
    # The kernel is named `divide` rather than `multiply`. A second `multiply`
    # would shadow the multiplication kernel that process_frame_mul looks up
    # by name when it is called.
    df = df.apply_rows(
        divide,
        incols={col_a: 'a'},
        outcols={'div': np.float64},
        kwargs={'b': val_divide},
    )
    return df['div']

# map_partitions forwards the extra keyword arguments to process_frame_div,
# which runs once per partition.
ddf['div_number'] = ddf.map_partitions(process_frame_div, col_a='number', val_divide=10.0)
ddf['div_float'] = ddf.map_partitions(process_frame_div, col_a='float_number', val_divide=5.0)
ddf.head()

Unnamed: 0,number,float_number,datetime,letter,category,word,string,div_number,div_float
0,39,6.88,2020-10-08 12:12:01,C,D,data,RAPIDS.ai is a suite of open-source libraries ...,3.9,1.376
1,11,4.21,,A,D,cuDF,cuDF is a Python GPU DataFrame (built on the A...,1.1,0.842


In [11]:
# Same columns as before, but the extra arguments are bound inside a lambda
# instead of being forwarded by map_partitions.
ddf['div_number'] = ddf.map_partitions(lambda part: process_frame_div(part, col_a='number', val_divide=10.0))
ddf['div_float'] = ddf.map_partitions(lambda part: process_frame_div(part, col_a='float_number', val_divide=5.0))

#### dask_cudf.DataFrame.compute

In [9]:
# A lazy collection's repr shows only its structure (partition boundaries and dtypes); nothing is computed.
ddf

Unnamed: 0_level_0,number,float_number,datetime,letter,category,word,string,div_number,div_float
npartitions=5,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
0,int64,float64,datetime64[s],object,object,object,object,float64,float64
2,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
8,...,...,...,...,...,...,...,...,...
9,...,...,...,...,...,...,...,...,...


In [51]:
# compute() executes the whole graph and returns every row as one in-memory frame on the client.
ddf.compute()

Unnamed: 0,number,float_number,datetime,letter,category,word,string,div_number,div_float
0,39,6.88,2020-10-08 12:12:01,C,D,data,RAPIDS.ai is a suite of open-source libraries ...,3.9,1.376
1,11,4.21,,A,D,cuDF,cuDF is a Python GPU DataFrame (built on the A...,1.1,0.842
2,31,4.71,2020-10-10 09:26:43,U,D,memory,"cuDF allows for loading, joining, aggregating,...",3.1,0.942
3,40,0.93,2020-10-11 17:10:00,P,B,tabular,If your workflow is fast enough on a single GP...,4.0,0.186
4,33,9.26,2020-10-15 10:58:02,O,D,parallel,If you want to distribute your workflow across...,3.3,1.852
5,42,4.21,2020-10-01 10:02:23,U,C,GPUs,BlazingSQL provides a high-performance distrib...,4.2,0.842
6,36,3.01,2020-09-30 14:36:26,T,D,,BlazingSQL is built on the RAPIDS GPU data sci...,3.6,0.602
7,38,6.44,2020-10-10 08:34:36,X,B,csv,BlazingSQL lets you ETL raw data directly into...,3.8,1.288
8,17,5.28,2020-10-09 08:34:40,P,D,dataframes,Dask is a flexible library for parallel comput...,1.7,1.056
9,10,8.28,2020-10-03 03:31:21,W,B,python,,1.0,1.656


#### client.compute

In [53]:
# client.compute() returns a Future immediately; the work proceeds on the cluster in the background.
computation = client.compute(ddf)

In [58]:
# result() blocks until the Future has finished and brings its data back to the client.
computation.result().head()

Unnamed: 0,number,float_number,datetime,letter,category,word,string,div_number,div_float
0,39,6.88,2020-10-08 12:12:01,C,D,data,RAPIDS.ai is a suite of open-source libraries ...,3.9,1.376
1,11,4.21,,A,D,cuDF,cuDF is a Python GPU DataFrame (built on the A...,1.1,0.842
2,31,4.71,2020-10-10 09:26:43,U,D,memory,"cuDF allows for loading, joining, aggregating,...",3.1,0.942
3,40,0.93,2020-10-11 17:10:00,P,B,tabular,If your workflow is fast enough on a single GP...,4.0,0.186
4,33,9.26,2020-10-15 10:58:02,O,D,parallel,If you want to distribute your workflow across...,3.3,1.852


In [61]:
# Same as above, but the graph is explicitly optimised and restricted in where it may run.
# NOTE(review): workers='0' presumably refers to the worker named/addressed "0";
# confirm against client.scheduler_info()['workers'].
computation = client.compute(ddf, workers='0', optimize_graph=True)
computation.result().head()

Unnamed: 0,number,float_number,datetime,letter,category,word,string,div_number,div_float
0,39,6.88,2020-10-08 12:12:01,C,D,data,RAPIDS.ai is a suite of open-source libraries ...,3.9,1.376
1,11,4.21,,A,D,cuDF,cuDF is a Python GPU DataFrame (built on the A...,1.1,0.842
2,31,4.71,2020-10-10 09:26:43,U,D,memory,"cuDF allows for loading, joining, aggregating,...",3.1,0.942
3,40,0.93,2020-10-11 17:10:00,P,B,tabular,If your workflow is fast enough on a single GP...,4.0,0.186
4,33,9.26,2020-10-15 10:58:02,O,D,parallel,If you want to distribute your workflow across...,3.3,1.852


#### dask_cudf.DataFrame.persist

In [62]:
# persist() returns a NEW collection backed by in-memory results; the original
# `ddf` stays lazy, so rebind it to keep (and reuse) the persisted version.
ddf = ddf.persist()
ddf

Unnamed: 0_level_0,number,float_number,datetime,letter,category,word,string,div_number,div_float
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
0,int64,float64,datetime64[s],object,object,object,object,float64,float64
5,...,...,...,...,...,...,...,...,...
9,...,...,...,...,...,...,...,...,...


# Delayed

#### dask.delayed.delayed

In [12]:
def delayed_task(n):
    """Build a small cudf.DataFrame of random values and a scaled copy.

    Parameters
    ----------
    n : int
        Number of rows.

    Returns
    -------
    cudf.DataFrame
        Column ``random`` (values from ``cp.random.rand``) and column
        ``rand_scaled`` equal to ``random * 3``.
    """
    df = cudf.DataFrame({'random': cp.random.rand(n)})
    df['rand_scaled'] = df['random'] * 3
    return df

# Wrap the plain function with delayed() and call it twice. Each call becomes its own
# task with a distinct key, and nothing runs until it is handed to client.compute().
tasks = list(map(delayed(delayed_task), [10, 10]))
computation = client.compute(tasks, optimize_graph=True)
computation

[<Future: pending, key: delayed_task-d25d078f-ac98-4b93-908f-cbe572f55a17>,
 <Future: pending, key: delayed_task-78524eac-968f-46fe-97bc-4c0d01b47d5a>]

In [13]:
@delayed
def delayed_task(n):
    """Lazily build a cudf.DataFrame of random values and a scaled copy.

    Because of the ``@delayed`` decorator, calling this returns a Delayed
    task instead of running the body.

    Parameters
    ----------
    n : int
        Number of rows.

    Returns
    -------
    Delayed
        Resolves to a cudf.DataFrame with columns ``random`` (from
        ``cp.random.rand``) and ``rand_scaled`` (``random * 3``).
    """
    df = cudf.DataFrame({'random': cp.random.rand(n)})
    df['rand_scaled'] = df['random'] * 3
    return df

# The decorated function already returns Delayed objects, so call it directly.
tasks = [delayed_task(10), delayed_task(10)]
computation = client.compute(tasks, optimize_graph=True)
computation

[<Future: pending, key: delayed_task-de252d78-fa6f-47bd-b949-b40e9a9c8256>,
 <Future: pending, key: delayed_task-dd6740e4-4b89-4114-8dda-22e11dc65050>]

In [70]:
# Fetch both finished frames in one call, then stack them on the client.
cudf.concat(client.gather(computation)).head()

Unnamed: 0,random,rand_scaled
0,0.126264,0.378792
1,0.200792,0.602375
2,0.73275,2.198249
3,0.156914,0.470743
4,0.598553,1.795659


#### dask_cudf.DataFrame.to_delayed

In [14]:
def process_frame_delayed(df):
    """Return the ``number`` column shifted up by 10.

    Here it is called on the Delayed partitions from ``ddf.to_delayed()``,
    so the indexing and addition build lazy tasks instead of running.
    """
    shifted = df['number'] + 10
    return shifted
    
# Apply the plain function to every Delayed partition, then reassemble the
# resulting Delayed Series into a dask_cudf collection.
ddf_delayed_add = dask_cudf.from_delayed(
    list(map(process_frame_delayed, ddf.to_delayed()))
)

In [15]:
# The delayed graph only executes here; the result is a Series named `number`.
ddf_delayed_add.compute()

0    49
1    21
2    41
3    50
4    43
5    52
6    46
7    48
8    27
9    20
Name: number, dtype: int64

#### dask_cudf.from_delayed

In [18]:
def process_frame_delayed(df, divide):
    """Return ``(number + 10) / divide`` for one partition (or Delayed partition)."""
    return (df['number'] + 10) / divide
    
# Build one lazy division task per partition and stitch them into a collection.
ddf_delayed_div = dask_cudf.from_delayed(
    [process_frame_delayed(part, divide=10.0) for part in ddf.to_delayed()]
)

ddf_delayed_div.head()

0    4.9
1    2.1
2    4.1
3    5.0
4    4.3
Name: number, dtype: float64

# Futures

#### client.persist

In [19]:
# client.persist returns a NEW collection whose partitions are held in cluster
# memory; rebind `ddf` so later cells reuse them instead of recomputing.
ddf = client.persist(ddf)
ddf

Unnamed: 0_level_0,number,float_number,datetime,letter,category,word,string
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,int64,float64,datetime64[s],object,object,object,object
5,...,...,...,...,...,...,...
9,...,...,...,...,...,...,...


#### client.submit

In [54]:
def first_computation(df):
    """Return the ``number`` column of ``df`` plus 10."""
    shifted = df['number'] + 10
    return shifted

def second_computation(result):
    """Return ``result`` divided by 10.0."""
    divisor = 10.0
    return result / divisor

# submit() schedules a function call on a worker and returns a Future immediately.
# Passing computation_1 into the second submit chains the tasks: second_computation
# receives the first task's result, not the Future object.
# NOTE(review): `ddf` is passed as a lazy collection, so the results appear to be
# lazy too (they need .compute() afterwards).
computation_1 = client.submit(first_computation, ddf)
computation_2 = client.submit(second_computation, computation_1)

In [55]:
# Displaying a Future shows its status and key without waiting for it.
computation_1

In [56]:
# result() waits for the chained task; its value is still a lazy Series here, hence compute().
computation_2.result().compute()

0    4.9
1    2.1
2    4.1
3    5.0
4    4.3
5    5.2
6    4.6
7    4.8
8    2.7
9    2.0
Name: number, dtype: float64

#### dask.distributed.wait

In [78]:
computation = client.compute(tasks, optimize_graph=True)

# wait() blocks until every Future in `computation` has finished, so the
# collection below is only created once all computations are done.
dask.distributed.wait(computation)

results = dask_cudf.from_delayed(computation)
results.head()

Unnamed: 0,random,rand_scaled
0,0.126264,0.378792
1,0.200792,0.602375
2,0.73275,2.198249
3,0.156914,0.470743
4,0.598553,1.795659


#### dask.distributed.as_completed

In [85]:
computation = client.compute(tasks, optimize_graph=True)

# Futures are yielded in the order they finish, not the order they were submitted;
# with_results=True hands each finished Future over together with its result.
for part, frame in dask.distributed.as_completed(computation, with_results=True):
    print(frame)

     random  rand_scaled
0  0.126264     0.378792
1  0.200792     0.602375
2  0.732750     2.198249
3  0.156914     0.470743
4  0.598553     1.795659
5  0.785085     2.355256
6  0.592523     1.777569
7  0.237564     0.712691
8  0.351258     1.053773
9  0.544141     1.632422
     random  rand_scaled
0  0.634716     1.904149
1  0.233698     0.701095
2  0.040242     0.120725
3  0.261856     0.785568
4  0.910627     2.731881
5  0.597355     1.792064
6  0.843589     2.530766
7  0.829203     2.487609
8  0.864137     2.592410
9  0.278736     0.836207


#### Future.result

In [110]:
def first_computation(df):
    """Add 10 to the ``number`` column of ``df`` and return the result."""
    number = df['number']
    return number + 10

# `ddf` is a lazy collection, so this Future resolves to a lazy Series (materialised with .compute() below).
computation_1 = client.submit(first_computation, ddf)

In [113]:
# result() returns the lazy Series produced on the worker; compute() then yields the values.
computation_1.result().compute()

0    49
1    21
2    41
3    50
4    43
5    52
6    46
7    48
8    27
9    20
Name: number, dtype: int64

#### Future.done

In [114]:
# done() is a non-blocking check of whether the task has finished.
print(computation_1.done())

True


#### client.gather

In [116]:
# gather() fetches the Future's result to the client (like result()); it is a lazy Series here, hence compute().
client.gather(computation_1).compute()

0    49
1    21
2    41
3    50
4    43
5    52
6    46
7    48
8    27
9    20
Name: number, dtype: int64

#### client.scatter

In [120]:
# Bring the finished result back to the client and materialise it...
gathered = client.gather(computation_1)
data = gathered.compute()
# ...then send the concrete data back out to the cluster; scatter() returns a handle to it.
distributed = client.scatter(data)

#### client.cancel

In [121]:
# Cancel the Future; the scheduler may then release its task and any data it holds.
computation_1.cancel()