## RAPIDS + Dask examples on AI Notebooks

**This example is not an officially supported Google product, does not have an SLA/SLO, and should not be used in production.**


In [None]:
## Create the conda environment this notebook runs in.
## Run the commands below in a terminal, not in this cell: they are wrapped in a
## string literal, so executing the cell only evaluates that string.
'''
conda create -n rapids-0.17 -c rapidsai -c nvidia \
-c conda-forge -c dask -c defaults rapids-blazing=0.17 python=3.7 cudatoolkit=11.0  gcsfs=0.7.1 -y

conda activate rapids-0.17
'''

In [3]:
# Clone the rapids_AIP repository into the current working directory.
!git clone https://github.com/remylouisew/rapids_AIP.git

Cloning into 'rapids_AIP'...
remote: Enumerating objects: 155, done.[K
remote: Counting objects: 100% (155/155), done.[K
remote: Compressing objects: 100% (117/117), done.[K
remote: Total 155 (delta 39), reused 131 (delta 27), pack-reused 0[K
Receiving objects: 100% (155/155), 107.19 KiB | 2.19 MiB/s, done.
Resolving deltas: 100% (39/39), done.


In [8]:
import dask_cudf
import cupy as cp
import time
import gcsfs
import os, json
import subprocess
import pandas as pd
import time


# Bucket name, optionally followed by a folder path.
# Leave out the 'gs://' prefix and any trailing '/': the data-loading cell builds
# its path as 'gs://' + GCS_BUCKET + '/...'.
GCS_BUCKET = "bucket_name/optional_folder"  # replace with your GCS bucket name or GCS folder path

In [1]:
# Instantiate a LocalCUDACluster so that Dask worker processes are assigned to GPUs.
# No arguments are passed, so dask-cuda's defaults apply
# (per its docs, one worker per visible GPU — confirm for your dask-cuda version).

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

cluster = LocalCUDACluster()
# Connect this notebook session to the cluster.
client = Client(cluster)


In [10]:
# Read the CSV files from Google Cloud Storage into a dask_cudf DataFrame

# header=None below tells the reader not to take column names from the first row,
# so they are supplied explicitly: 'label' followed by 'feature01' .. 'feature28'.
colnames = ['label'] + ['feature%02d' % i for i in range(1, 29)]
# Path under GCS_BUCKET; the '*' wildcard selects every matching CSV file.
train_dir='gs://' + GCS_BUCKET + '/abcdefghij/*.csv'
# NOTE(review): chunksize=None presumably yields one partition per file — confirm against the dask_cudf docs.
df = dask_cudf.read_csv(train_dir, header=None, names=colnames, chunksize=None)

print("Number of partitions is", df.npartitions)

Number of partitions is 10


In [10]:
%%time
# Some basic functions using Dask

# Add a grouping column: feature02 rounded with round()'s default precision.
df["key"] = df.feature02.round()
# persist() starts the computation on the cluster and keeps the result there;
# wait() blocks until it has finished, so %%time covers the whole computation.
group_means = df.groupby("key").mean().persist()
wait(group_means);

group_means.head()

# group_means.compute() gathers the result into a single cuDF DataFrame (still on the GPU);
# chain .to_pandas() onto it to obtain a pandas DataFrame.
# A good workflow is to summarize your data using Dask, then convert the small result
# to pandas for plotting or other pandas functions.


CPU times: user 5.6 s, sys: 393 ms, total: 6 s
Wall time: 1min 8s


Unnamed: 0_level_0,label,feature01,feature02,feature03,feature04,feature05,feature06,feature07,feature08,feature09,...,feature19,feature20,feature21,feature22,feature23,feature24,feature25,feature26,feature27,feature28
key,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1.0,0.531377,0.985962,0.941141,0.000334,1.001279,0.000383,0.99182,0.245437,-0.000452,0.99971,...,0.163236,0.000175,0.999569,1.035395,1.025391,1.050612,1.022248,0.971953,1.039929,0.967645
-1.0,0.530892,0.985611,-0.941425,-0.000624,1.002881,0.00022,0.99224,-0.246486,0.000753,0.999167,...,-0.163185,-6.4e-05,0.998884,1.035382,1.025831,1.050893,1.022185,0.97194,1.040264,0.968021
0.0,0.536814,1.082443,0.000127,-0.000586,0.989486,0.000751,1.013883,0.000645,-4.5e-05,1.00309,...,0.000762,0.000192,0.992716,1.036274,1.028642,1.057083,0.982833,0.987475,1.019824,0.941772
2.0,0.509495,0.79653,1.825514,0.003485,1.007586,-0.000123,0.932562,0.488155,-0.000122,0.996964,...,0.327445,-0.001159,1.018579,1.025887,1.012906,1.034657,1.030662,0.942668,1.039756,0.974749
-2.0,0.510331,0.797193,-1.825494,0.001399,1.007824,-0.001201,0.932832,-0.487517,-0.001699,0.995276,...,-0.331606,-0.000372,1.020613,1.024566,1.012883,1.034479,1.030533,0.94133,1.039123,0.973977


In [None]:
# Shows the memory usage of your GPUs in real time.
# Better to execute in a terminal rather than in the notebook: dmon keeps printing
# samples until it is interrupted, so this cell occupies the kernel while it runs.
!nvidia-smi dmon

In [1]:
# Now restart the kernel so that you can instantiate a new LocalCUDACluster,
# this time one that will spill to host memory when the GPU memory limit is exceeded.

import dask_cudf
import cupy as cp
import argparse
import time
import gcsfs
import os, json
import subprocess
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait
from dask.utils import parse_bytes

# The kernel restart cleared every variable defined in the earlier cells, so the
# bucket must be defined again before the next cell builds its 'gs://' path.
GCS_BUCKET = "bucket_name/optional_folder"  # replace with your GCS bucket name or GCS folder path

cluster = LocalCUDACluster(
    # Restrict the cluster to GPU 0.
    CUDA_VISIBLE_DEVICES="0",
    # device_memory_limit sets how much GPU memory a worker may use before it
    # starts spilling data from the GPU to host memory; setting it explicitly
    # lets you control when that happens.
    device_memory_limit=parse_bytes("14GB"),
)
client = Client(cluster)
# Last expression of the cell: displays the client summary (scheduler, dashboard, workers).
client


0,1
Client  Scheduler: tcp://127.0.0.1:35483  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 63.33 GB


In [2]:
%%time
# Read in larger dataset (20GB) from GCS

# header=None is passed to read_csv, so column names are supplied explicitly:
# 'label' followed by 'feature01' .. 'feature28'.
colnames = ['label'] + ['feature%02d' % i for i in range(1, 29)]
# A '/' must separate GCS_BUCKET from the object pattern; without it the bucket or
# folder name runs straight into 'abcdefghi*'. rstrip('/') keeps the path valid
# even if GCS_BUCKET was entered with a trailing slash.
# The '*' wildcards select every matching folder and CSV file.
train_dir = 'gs://' + GCS_BUCKET.rstrip('/') + '/abcdefghi*/*.csv'
df = dask_cudf.read_csv(train_dir, header=None, names=colnames, chunksize=None)

print("Number of partitions is", df.npartitions)

# Run the dask functions, which will require nearly double the memory available on the GPU
df["key"] = df.feature02.round()
# persist() starts the computation on the cluster; wait() blocks until it has
# finished, so %%time covers the whole computation.
group_means = df.groupby("key").mean().persist()
wait(group_means);

group_means.head()

Number of partitions is 30
CPU times: user 5.99 s, sys: 1.11 s, total: 7.1 s
Wall time: 3min 15s


Unnamed: 0_level_0,label,feature01,feature02,feature03,feature04,feature05,feature06,feature07,feature08,feature09,...,feature19,feature20,feature21,feature22,feature23,feature24,feature25,feature26,feature27,feature28
key,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1.0,0.531377,0.985962,0.941141,0.000334,1.001279,0.000383,0.99182,0.245437,-0.000452,0.99971,...,0.163236,0.000175,0.999569,1.035395,1.025391,1.050612,1.022248,0.971953,1.039929,0.967645
-1.0,0.530892,0.985611,-0.941425,-0.000624,1.002881,0.00022,0.99224,-0.246486,0.000753,0.999167,...,-0.163185,-6.4e-05,0.998884,1.035382,1.025831,1.050893,1.022185,0.97194,1.040264,0.968021
2.0,0.509495,0.79653,1.825514,0.003485,1.007586,-0.000123,0.932562,0.488155,-0.000122,0.996964,...,0.327445,-0.001159,1.018579,1.025887,1.012906,1.034657,1.030662,0.942668,1.039756,0.974749
-2.0,0.510331,0.797193,-1.825494,0.001399,1.007824,-0.001201,0.932832,-0.487517,-0.001699,0.995276,...,-0.331606,-0.000372,1.020613,1.024566,1.012883,1.034479,1.030533,0.94133,1.039123,0.973977
0.0,0.536814,1.082443,0.000127,-0.000586,0.989486,0.000751,1.013883,0.000645,-4.5e-05,1.00309,...,0.000762,0.000192,0.992716,1.036274,1.028642,1.057083,0.982833,0.987475,1.019824,0.941772
