## Working with Huge Datasets in Dask

In [1]:
import dask

In [7]:

from sklearn.datasets import make_classification

# Synthetic classification data: 10,000 rows x 4 features, seeded so the outputs below reproduce.
X, y = make_classification(
    n_samples=10000,
    n_features=4,
    random_state=0,
)
X[:8]  # peek at the first rows instead of dumping the whole array

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-1.90879217, -1.1602627 , -0.27364545, -0.82766028],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [-1.17047054,  0.02212382, -2.17376797, -0.13421976],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959],
       [ 1.68616989,  1.6329131 , -1.42072654,  1.04050557],
       [-0.93912893, -1.02270838,  1.10093827, -0.63714432]])

In [2]:
from sklearn.svm import SVC


In [3]:
# Baseline: an SVC with default hyper-parameters (only the seed is fixed), fit on the full dataset.
estimator = SVC(random_state=0)
estimator.fit(X, y)

SVC(random_state=0)

In [4]:

# First few support vectors selected by the baseline fit.
estimator.support_vectors_[:4]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959]])

In [5]:

# Same model with a tiny C (very strong regularisation) and the shrinking heuristic turned off,
# to contrast its support vectors and accuracy with the default fit.
estimator = SVC(C=1e-5, random_state=0, shrinking=False)
estimator.fit(X, y)
estimator.support_vectors_[:4]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [-1.17047054,  0.02212382, -2.17376797, -0.13421976]])

In [6]:
# Score of the heavily regularised model, measured on the same data it was fit on
# (the result shown is close to 0.5, i.e. the model has effectively not learned the data).
estimator.score(X, y)


0.5007

## Grid search with 1 worker

In [7]:
from sklearn.model_selection import GridSearchCV


In [8]:
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
[CV] C=0.001, kernel=rbf .............................................


[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.


[CV] .............................. C=0.001, kernel=rbf, total=   4.1s
[CV] C=0.001, kernel=rbf .............................................


[Parallel(n_jobs=1)]: Done   1 out of   1 | elapsed:    4.0s remaining:    0.0s


[CV] .............................. C=0.001, kernel=rbf, total=   4.4s
[CV] C=0.001, kernel=poly ............................................
[CV] ............................. C=0.001, kernel=poly, total=   2.4s
[CV] C=0.001, kernel=poly ............................................
[CV] ............................. C=0.001, kernel=poly, total=   2.0s
[CV] C=10.0, kernel=rbf ..............................................
[CV] ............................... C=10.0, kernel=rbf, total=   1.0s
[CV] C=10.0, kernel=rbf ..............................................
[CV] ............................... C=10.0, kernel=rbf, total=   0.9s
[CV] C=10.0, kernel=poly .............................................
[CV] .............................. C=10.0, kernel=poly, total=   2.1s
[CV] C=10.0, kernel=poly .............................................
[CV] .............................. C=10.0, kernel=poly, total=   1.9s


[Parallel(n_jobs=1)]: Done   8 out of   8 | elapsed:   18.8s finished


Wall time: 22.3 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

## Grid Search with all workers

In [9]:
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done   3 out of   8 | elapsed:    5.1s remaining:    8.6s
[Parallel(n_jobs=-1)]: Done   8 out of   8 | elapsed:    7.2s remaining:    0.0s
[Parallel(n_jobs=-1)]: Done   8 out of   8 | elapsed:    7.2s finished


Wall time: 10.8 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             n_jobs=-1,
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

## Grid Search with Dask

In [2]:
import joblib
import dask.distributed

# Create a Dask distributed client with default settings (the repr below shows it running a
# local scheduler on 127.0.0.1). Once a client exists, joblib's "dask" backend can dispatch work to it.
c = dask.distributed.Client()

In [3]:
# Rich repr of the client: scheduler address, dashboard link, and worker/core/memory totals.
c

0,1
Client  Scheduler: tcp://127.0.0.1:63082  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 17.01 GB


In [12]:

# Finer sweep over C only; every other hyper-parameter stays as configured on `estimator`.
# For a larger search on a real cluster, extend the grid with e.g.
#   'kernel': ['rbf', 'poly', 'linear'],
#   'shrinking': [True, False],
param_grid = {'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0]}

grid_search = GridSearchCV(estimator, param_grid, cv=5, n_jobs=-1, verbose=2)

In [13]:
%%time
# Send joblib's parallel work (used by GridSearchCV for n_jobs) to the Dask cluster.
# scatter=[X, y] pre-distributes the training data to the workers rather than
# embedding it in every task.
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)

Fitting 5 folds for each of 6 candidates, totalling 30 fits


[Parallel(n_jobs=-1)]: Using backend DaskDistributedBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  30 out of  30 | elapsed:   35.8s finished


Wall time: 41.3 s


In [14]:
# Winning hyper-parameters from the 5-fold search and their mean cross-validated score.
grid_search.best_params_, grid_search.best_score_

({'C': 10.0}, 0.9119000000000002)

## Loading a Dataset That Can't Fit Directly into Memory

In [5]:
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

n_centers = 12
n_features = 20

# Small labelled sample, used only to estimate one centroid per blob.
X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

# Row k is the mean of every sample labelled k, giving a (n_centers, n_features) array.
centers = np.stack([X_small[y_small == label].mean(axis=0) for label in range(n_centers)])

centers[:4]

array([[ 1.00796679,  4.34582168,  2.15175661,  1.04337835, -1.82115164,
         2.81149666, -1.18757701,  7.74628882,  9.36761449, -2.20570731,
         5.71142324,  0.41084221,  1.34168817,  8.4568751 , -8.59042755,
        -8.35194302, -9.55383028,  6.68605157,  5.34481483,  7.35044606],
       [ 9.49283024,  6.1422784 , -0.97484846,  5.8604399 , -7.61126963,
         2.86555735, -7.25390288,  8.89609285,  0.33510318, -1.79181328,
        -4.66192239,  5.43323887, -0.86162507,  1.3705568 , -9.7904172 ,
         2.3613231 ,  2.20516237,  2.20604823,  8.76464833,  3.47795068],
       [-2.67206588, -1.30103177,  3.98418492, -8.88040428,  3.27735964,
         3.51616445, -5.81395151, -7.42287114, -3.73476887, -2.89520363,
         1.49435043, -1.35811028,  9.91250767, -7.86133474, -5.78975793,
        -6.54897163,  3.08083281, -5.18975209, -0.85563107, -5.06615534],
       [-6.85980599, -7.87144648,  3.33572279, -7.00394241, -5.97224874,
        -2.55638942,  6.36329802, -7.97988653,  

In [17]:
n_samples_per_block = 200_000
n_blocks = 500

# One lazy make_blobs call per chunk, each with its own seed; [0] keeps the features and drops the labels.
delayeds = [
    dask.delayed(make_blobs)(
        n_samples=n_samples_per_block,
        centers=centers,
        n_features=n_features,
        random_state=seed,
    )[0]
    for seed in range(n_blocks)
]

# Wrap each delayed chunk as a dask array of known shape/dtype, then stack them row-wise.
arrays = [
    da.from_delayed(block, shape=(n_samples_per_block, n_features), dtype='float64')
    for block in delayeds
]
X = da.concatenate(arrays)
X

Unnamed: 0,Array,Chunk
Bytes,16.00 GB,32.00 MB
Shape,"(100000000, 20)","(200000, 20)"
Count,2000 Tasks,500 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 16.00 GB 32.00 MB Shape (100000000, 20) (200000, 20) Count 2000 Tasks 500 Chunks Type float64 numpy.ndarray",20  100000000,

Unnamed: 0,Array,Chunk
Bytes,16.00 GB,32.00 MB
Shape,"(100000000, 20)","(200000, 20)"
Count,2000 Tasks,500 Chunks
Type,float64,numpy.ndarray


In [9]:
from dask_ml.cluster import KMeans

# Fit as many clusters as blobs were generated (n_centers). dask-ml's KMeans defaults to
# n_clusters=8, which would not match the data. init_max_iter / oversampling_factor tune the
# k-means|| initialisation; random_state makes the fit reproducible.
clf = KMeans(
    n_clusters=n_centers,
    init_max_iter=3,
    oversampling_factor=10,
    random_state=0,
)
clf.fit(X)

## Visualization with Vaex

In [12]:
import vaex

INFO:MainThread:matplotlib.font_manager:generated new fontManager


In [13]:
# Open the 2015 yellow-taxi trip file (HDF5 hosted on S3; ~146M rows per the output below).
# NOTE(review): `anon=true` presumably requests anonymous (unsigned) S3 access — confirm in vaex docs.
df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5?anon=true')

In [15]:
import vaex
import warnings

print(f'number of rows: {df.shape[0]:,}')
print(f'number of columns: {df.shape[1]}')

# Bounding box (degrees) of the area to plot; these constants drive the plot limits below,
# so changing them actually changes the view.
long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90

# 2-D density of pickup locations; f="log1p" compresses the large dynamic range of counts.
# Warnings are silenced only for this call, not for the rest of the session.
with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    df.plot(
        df.pickup_longitude,
        df.pickup_latitude,
        f="log1p",
        limits=[[long_min, long_max], [lat_min, lat_max]],
        show=True,
    )

number of rows: 146,112,991
number of columns: 18


KeyboardInterrupt: 

<Figure size 432x288 with 0 Axes>