#### In this notebook, we show how to use dask-glm with RAPIDS dask-cudf on multiple GPUs.
dask-glm offers 3 estimators:
- LinearRegression
- LogisticRegression
- PoissonRegression

and 5 solvers:
- admm,
- gradient_descent,
- newton,
- lbfgs,
- proximal_grad

Currently, all 3 estimators and 3 of the 5 solvers (`gradient_descent`, `newton` and `proximal_grad`) work seamlessly with `dask-cudf`. To use `lbfgs`, please see [dask/dask-glm#89](https://github.com/dask/dask-glm/pull/89).

In this demo, we use `LogisticRegression` to solve a binary classification problem on multiple GPUs.

**Install instructions:**
- conda create -n dask_glm python=3.7
- conda activate dask_glm
- conda install -c rapidsai -c nvidia -c conda-forge -c defaults rapids=0.15 python=3.7 cudatoolkit=10.1
- git clone https://github.com/dask/dask-glm
- cd dask-glm
- pip install -e .

We use CUDA 10.1 in this demo.

### Dask-glm with dask-cudf on GPU

In [1]:
import dask.dataframe as dd
from dask_glm.estimators import LogisticRegression
import time
import dask_glm
import dask
import numpy as np
import cupy as cp

import dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask.array as da

print('dask', dask.__version__)
print('dask_glm', dask_glm.__version__)
print('cupy', cp.__version__)
print('dask_cudf', dask_cudf.__version__)

dask 2.30.0+5.ga2235ae8
dask_glm 0.2.1.dev41+g52da624.d20201010
cupy 7.8.0
dask_cudf 0.15.0


In [2]:
cluster = LocalCUDACluster()
client = Client(cluster)
client

Client: Scheduler: tcp://127.0.0.1:41853, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 270.39 GB


We use 4xV100 GPUs in this demo.

In [3]:
%%time
# download data from:
# https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz

fname = '../../data/HIGGS.csv'
colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
# dask_cudf reads the CSV into cuDF (GPU) partitions instead of pandas ones
dask_df = dask_cudf.read_csv(fname, header=None, names=colnames)

ddf = dask_df.repartition(npartitions=4) # set it to the number of GPUs for best performance
ddf.head()

CPU times: user 1.95 s, sys: 496 ms, total: 2.44 s
Wall time: 5.66 s


Unnamed: 0,label,feature-01,feature-02,feature-03,feature-04,feature-05,feature-06,feature-07,feature-08,feature-09,...,feature-19,feature-20,feature-21,feature-22,feature-23,feature-24,feature-25,feature-26,feature-27,feature-28
0,1.0,0.869293,-0.635082,0.22569,0.32747,-0.689993,0.754202,-0.248573,-1.092064,0.0,...,-0.010455,-0.045767,3.101961,1.35376,0.979563,0.978076,0.920005,0.721657,0.988751,0.876678
1,1.0,0.907542,0.329147,0.359412,1.49797,-0.31301,1.095531,-0.557525,-1.58823,2.173076,...,-1.13893,-0.000819,0.0,0.30222,0.833048,0.9857,0.978098,0.779732,0.992356,0.798343
2,1.0,0.798835,1.470639,-1.635975,0.453773,0.425629,1.104875,1.282322,1.381664,0.0,...,1.128848,0.900461,0.0,0.909753,1.10833,0.985692,0.951331,0.803252,0.865924,0.780118
3,0.0,1.344385,-0.876626,0.935913,1.99205,0.882454,1.786066,-1.646778,-0.942383,0.0,...,-0.678379,-1.360356,0.0,0.946652,1.028704,0.998656,0.728281,0.8692,1.026736,0.957904
4,1.0,1.105009,0.321356,1.522401,0.882808,-1.205349,0.681466,-1.070464,-0.921871,0.0,...,-0.373566,0.113041,0.0,0.755856,1.361057,0.98661,0.838085,1.133295,0.872245,0.808487


In [4]:
%%time
print('number of samples:', len(ddf))
print('number of columns (label + features):', ddf.shape[1])

number of samples: 11000000
number of columns (label + features): 29
CPU times: user 162 ms, sys: 17.7 ms, total: 180 ms
Wall time: 2.21 s


In [5]:
%%time
print(f"mean target:{ddf['label'].mean().compute():.3f}")

mean target:0.530
CPU times: user 303 ms, sys: 39.8 ms, total: 343 ms
Wall time: 2.47 s


**So there are 11M samples and 28 numerical features** (the count of 29 printed above includes the `label` column). The target column `label` is binary and roughly balanced, with a mean of 0.530.
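Because the labels are 0/1, their mean is the fraction of positive samples, and a classifier that always predicts the majority class would reach that fraction as its accuracy. The small sketch below works this out as a reference point for reading training accuracy; the helper name `majority_baseline` is made up for illustration and is not part of dask-glm.

```python
def majority_baseline(positive_fraction):
    """Accuracy of always predicting the more frequent class of a binary target."""
    return max(positive_fraction, 1.0 - positive_fraction)

# Mean of the 0/1 `label` column, as printed by the cell above.
mean_target = 0.530

baseline = majority_baseline(mean_target)
print(f"majority-class baseline accuracy: {baseline:.3f}")
```

Any trained model should be compared against this baseline rather than against 0.5.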

In [6]:
%%time
print(len(ddf), ddf.npartitions)
y = ddf['label']
X = ddf[ddf.columns.difference(['label'])]

X

11000000 4
CPU times: user 157 ms, sys: 42.4 ms, total: 199 ms
Wall time: 2.29 s


Dask DataFrame Structure (npartitions=4): 28 columns, feature-01 through feature-28, all float64; values are lazy and shown as ...


Unfortunately, `dask-glm` does not accept a `dask.dataframe` at the moment, so we have to convert the data to dask arrays.

In [7]:
%%time
# convert dask dataframes to dask arrays
Xa = X.values
ya = y.values

Xa

CPU times: user 9.39 ms, sys: 1.03 ms, total: 10.4 ms
Wall time: 8.3 ms


Unnamed: 0,Array,Chunk
Bytes,unknown,unknown
Shape,"(nan, 28)","(nan, 28)"
Count,42 Tasks,4 Chunks
Type,float64,numpy.ndarray


We make two observations:
- the chunk type is incorrect: it is reported as `numpy.ndarray` but should be `cupy.ndarray`.
- the chunk shape is `nan`, i.e. the number of rows in each chunk is unknown.

Both of these lead to errors when we feed the data to `dask-glm`.

**Fix the chunk type**

In [8]:
%%time
# Make the chunk type cupy by overwriting the array metadata
# (note: `_meta` is a private dask attribute)
Xa._meta = cp.asarray(Xa._meta)
ya._meta = cp.asarray(ya._meta)

CPU times: user 130 µs, sys: 98 µs, total: 228 µs
Wall time: 203 µs


**Compute the chunk shape**

In [9]:
%%time
Xa.compute_chunk_sizes()
ya.compute_chunk_sizes()
Xa

CPU times: user 344 ms, sys: 42.1 ms, total: 386 ms
Wall time: 4.55 s


Unnamed: 0,Array,Chunk
Bytes,2.46 GB,658.50 MB
Shape,"(11000000, 28)","(2939749, 28)"
Count,42 Tasks,4 Chunks
Type,float64,cupy.ndarray
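The sizes in the array summary follow directly from the shapes, since each `float64` element takes 8 bytes. The sketch below redoes that arithmetic for the full array and for the chunk shown; the helper name `nbytes` is chosen for illustration, and the units are assumed to be decimal (10^9 bytes per GB), which is what matches the printed figures.

```python
def nbytes(rows, cols, itemsize=8):
    """Memory footprint in bytes of a dense rows x cols array (float64 by default)."""
    return rows * cols * itemsize

total = nbytes(11_000_000, 28)   # full array
chunk = nbytes(2_939_749, 28)    # the chunk shape shown in the summary

print(f"array: {total / 1e9:.2f} GB")   # array: 2.46 GB
print(f"chunk: {chunk / 1e6:.2f} MB")   # chunk: 658.50 MB
```

Knowing the per-chunk size up front helps check that a partition fits comfortably in the memory of a single GPU.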


In [10]:
%%time

lr = LogisticRegression(fit_intercept=True, solver="newton")
lr.fit(Xa, ya)
yp = lr.predict(Xa)
acc = (yp==ya).mean().compute()
print('Training ACC %.3f'%acc)

Training ACC 0.642
CPU times: user 2.57 s, sys: 358 ms, total: 2.93 s
Wall time: 18.7 s


**It took under 20 seconds (18.7 s wall time) to train and predict on 11M samples!** As we will see below, this is **more than an 8x speedup** over the CPU solution.
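For intuition about what `solver="newton"` does, here is a minimal single-machine NumPy sketch of Newton's method for unregularized logistic regression. It is not dask-glm's implementation: the function name `newton_logistic`, the synthetic data and the stopping rule are assumptions made for illustration. Each iteration builds the gradient and Hessian of the logistic loss and solves a least-squares system to obtain the update step.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def newton_logistic(X, y, max_iter=50, tol=1e-8):
    """Illustrative Newton solver for logistic regression with an intercept."""
    # Append a column of ones so the last coefficient acts as the intercept.
    Xb = np.hstack([X, np.ones((X.shape[0], 1))])
    beta = np.zeros(Xb.shape[1])
    for _ in range(max_iter):
        p = sigmoid(Xb @ beta)
        grad = Xb.T @ (p - y)                          # gradient of the negative log-likelihood
        hess = (Xb * (p * (1.0 - p))[:, None]).T @ Xb  # Hessian: X^T diag(p(1-p)) X
        step, *_ = np.linalg.lstsq(hess, grad, rcond=None)
        beta = beta - step
        if np.linalg.norm(step) < tol:
            break
    return beta

# Synthetic binary classification data (hypothetical, for demonstration only).
rng = np.random.default_rng(0)
X = rng.normal(size=(2000, 3))
true_beta = np.array([1.5, -2.0, 0.5, 0.25])  # last entry is the intercept
y = (rng.random(2000) < sigmoid(X @ true_beta[:3] + true_beta[3])).astype(float)

beta_hat = newton_logistic(X, y)
pred = (sigmoid(X @ beta_hat[:3] + beta_hat[3]) > 0.5).astype(float)
acc = (pred == y).mean()
print("estimated coefficients:", beta_hat)
print("training accuracy: %.3f" % acc)
```

Newton's method typically needs only a handful of iterations, but each one requires a full pass over the data to form the gradient and Hessian, which is the part that benefits from running on several GPUs.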

### Run dask-glm with dask on CPU

In [11]:
%%time

fname = '../../data/HIGGS.csv'
colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
# By default dask dataframe uses pandas as data handling backend
dask_df = dd.read_csv(fname, header=None, names=colnames)
dask_df = dask_df.repartition(npartitions=4)

print(len(dask_df), dask_df.npartitions)
y = dask_df['label']
X = dask_df[dask_df.columns.difference(['label'])]

Xa = X.values
ya = y.values

Xa.compute_chunk_sizes()
ya.compute_chunk_sizes()
Xa

11000000 4
CPU times: user 3.36 s, sys: 498 ms, total: 3.85 s
Wall time: 1min 2s


Unnamed: 0,Array,Chunk
Bytes,2.46 GB,628.00 MB
Shape,"(11000000, 28)","(2803564, 28)"
Count,138 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [12]:
%%time

lr = LogisticRegression(fit_intercept=True, solver="newton")
lr.fit(Xa, ya)
yp = lr.predict(Xa)
acc = (yp==ya).mean().compute()
print('Training ACC %.3f'%acc)

  step, _, _, _ = np.linalg.lstsq(hess, grad)


Training ACC 0.642
CPU times: user 9.16 s, sys: 1.19 s, total: 10.4 s
Wall time: 2min 43s
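
The speedup quoted for the GPU run can be checked from the two wall times reported by the `%%time` cells for fit plus predict (18.7 s on GPU, 2 min 43 s on CPU). A small sketch of the arithmetic, with variable names chosen for illustration:

```python
# Wall times copied from the notebook outputs of the two fit + predict cells.
gpu_wall_s = 18.7          # 4 GPUs, dask-cudf backed arrays
cpu_wall_s = 2 * 60 + 43   # CPU workers, pandas backed arrays

speedup = cpu_wall_s / gpu_wall_s
print(f"fit + predict speedup: {speedup:.1f}x")  # fit + predict speedup: 8.7x
```

Both runs reach the same training accuracy (0.642), so the comparison is between equivalent models. These are single measurements on one machine, so the ratio should be read as indicative rather than exact.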
