<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask_horizontal_no_pad.svg"
     width="30%"
     alt="Dask logo" />

# Parallel and Distributed Machine Learning

The material in this notebook is based on the open-source content from [Dask's tutorial repository](https://github.com/dask/dask-tutorial) and the [Machine learning notebook](https://github.com/coiled/data-science-at-scale/blob/master/3-machine-learning.ipynb) from Coiled's data-science-at-scale repository.

So far we have seen how Dask makes data analysis scalable with parallelization via Dask DataFrames. Let's now see how [Dask-ML](https://ml.dask.org/) allows us to do machine learning in a parallel and distributed manner. Note, machine learning is really just a special case of data analysis (one that automates analytical model building), so the 💪 Dask gains 💪 we've seen will apply here as well!

(If you'd like a refresher on the difference between parallel and distributed computing, [here's a good discussion on StackExchange](https://cs.stackexchange.com/questions/1580/distributed-vs-parallel-computing).)


## Types of scaling problems in machine learning

There are two main types of scaling challenges you can run into in your machine learning workflow: scaling the **size of your model** and scaling the **size of your data**. That is:

1. **CPU-bound problems**: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
2. **Memory-bound problems**: Data is larger than RAM, and sampling isn't an option.

Here's a handy diagram for visualizing these problems:

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/dimensions_of_scale.svg"
     width="60%"
     alt="scaling problems" />


In the bottom-left quadrant, your datasets are not too large (they fit comfortably in RAM) and your model is not too large either. When these conditions are met, you are much better off using something like scikit-learn, XGBoost, and similar libraries. You don't need to leverage multiple machines in a distributed manner with a library like Dask-ML. However, if you are in any of the other quadrants, distributed machine learning is the way to go.

Summarizing: 

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For large models, use `joblib` with the Dask backend (`joblib.parallel_backend("dask")`) and your favorite scikit-learn estimator.
* For large datasets, use `dask_ml` estimators.

## Scikit-Learn in five minutes

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/scikit_learn_logo_small.svg" 
     width="30%"
     alt="sklearn logo" />


In this section, we'll quickly run through a typical Scikit-Learn workflow:

* Load some data (in this case, we'll generate it)
* Import the Scikit-Learn module for our chosen ML algorithm
* Create an estimator for that algorithm and fit it with our data
* Inspect the learned attributes
* Check the accuracy of our model

Scikit-Learn has a nice, consistent API:

* You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the model's *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.
* You call `estimator.fit(X, y)` to train the estimator.
* Use `estimator` to inspect attributes, make predictions, etc. 

Here `X` is an array of *feature variables* (what you're using to predict) and `y` is an array of *target variables* (what we're trying to predict).
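
As a minimal sketch of this API on a different estimator, the snippet below separates hyperparameters (set at creation) from learned attributes (available after `fit`). The choice of `LogisticRegression`, the tiny dataset, and the names `X_demo`, `y_demo`, and `demo_estimator` are assumptions made for illustration only.

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

# Small synthetic dataset, used only for this illustration
X_demo, y_demo = make_classification(n_samples=200, n_features=4, random_state=0)

# Hyperparameters are passed when the estimator is created
demo_estimator = LogisticRegression(C=0.5, max_iter=200)
print(demo_estimator.get_params()["C"])  # the user-specified value

# Learned attributes (note the trailing underscore) exist only after fit
demo_estimator.fit(X_demo, y_demo)
print(demo_estimator.coef_.shape)

# The fitted estimator can now make predictions
predictions = demo_estimator.predict(X_demo[:5])
print(predictions)
```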

### Generate some random data

In [1]:
from sklearn.datasets import make_classification

# Generate data
X, y = make_classification(n_samples=10000, n_features=4, random_state=0)

**Refreshing some ML concepts**

- `X` is the samples matrix (or design matrix). The size of `X` is typically (`n_samples`, `n_features`), which means that samples are represented as rows and features are represented as columns.
- A "feature" (also called an "attribute") is a measurable property of the phenomenon we're trying to analyze. A feature for a dataset of employees might be their hire date, for example.
- `y` holds the target values, which are real numbers for regression tasks, or integers for classification (or any other discrete set of values). For unsupervised learning tasks, `y` does not need to be specified. `y` is usually a 1-D array where the `i`th entry corresponds to the target of the `i`th sample (row) of `X`.
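
A quick way to confirm these conventions is to look at the shapes directly. The sketch below regenerates the same data under the hypothetical names `X_check` and `y_check` so it can run on its own.

```python
from sklearn.datasets import make_classification

# Same call as above, stored under separate names for this check
X_check, y_check = make_classification(n_samples=10000, n_features=4, random_state=0)

print(X_check.shape)  # (n_samples, n_features): rows are samples, columns are features
print(y_check.shape)  # one target per sample
print(sorted(set(y_check.tolist())))  # the discrete class labels
```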

In [2]:
# Let's take a look at X
X[:8]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-1.90879217, -1.1602627 , -0.27364545, -0.82766028],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [-1.17047054,  0.02212382, -2.17376797, -0.13421976],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959],
       [ 1.68616989,  1.6329131 , -1.42072654,  1.04050557],
       [-0.93912893, -1.02270838,  1.10093827, -0.63714432]])

In [3]:
# Let's take a look at y
y[:8]

array([0, 0, 1, 0, 0, 0, 0, 1])

### Fitting an SVC

For this example, we will fit a [Support Vector Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html).

In [4]:
from sklearn.svm import SVC

estimator = SVC(random_state=0)
estimator.fit(X, y)

SVC(random_state=0)

We can inspect the learned attributes by taking a look at `support_vectors_`:

In [5]:
estimator.support_vectors_[:4]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959]])

And we check the accuracy:

In [6]:
estimator.score(X, y)

0.905

There are [3 different approaches](https://scikit-learn.org/0.15/modules/model_evaluation.html) to evaluate the quality of predictions of a model. One of them is the **estimator score method**. Estimators have a score method providing a default evaluation criterion for the problem they are designed to solve, which is discussed in each estimator's documentation.
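
The three approaches described in the linked scikit-learn guide (the estimator `score` method, the `scoring` parameter, and metric functions) can be sketched side by side. This example assumes a small synthetic dataset and a held-out test split, neither of which is part of the workflow above; the variable names are illustrative.

```python
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.svm import SVC

X_eval, y_eval = make_classification(n_samples=500, n_features=4, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X_eval, y_eval, random_state=0)

model = SVC(random_state=0).fit(X_train, y_train)

# 1. Estimator score method (mean accuracy for classifiers)
score_from_method = model.score(X_test, y_test)

# 2. Scoring parameter, as accepted by cross-validation tools
cv_scores = cross_val_score(SVC(random_state=0), X_eval, y_eval, cv=3, scoring="accuracy")

# 3. Metric functions from sklearn.metrics
score_from_metric = accuracy_score(y_test, model.predict(X_test))

print(score_from_method, score_from_metric, cv_scores)
```

For a classifier, the first and third approaches compute the same quantity, so the two scores should agree.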

### Hyperparameter Optimization

There are a few ways to learn the best *hyper*parameters while training. One is `GridSearchCV`.
As the name implies, this does a brute-force search over a grid of hyperparameter combinations. Scikit-learn provides tools to automatically find the best parameter combinations via cross-validation (which is the "CV" in `GridSearchCV`).

In [7]:
from sklearn.model_selection import GridSearchCV

In [8]:
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

# Brute-force search over a grid of hyperparameter combinations
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
[CV] END ................................C=0.001, kernel=rbf; total time=   3.0s
[CV] END ................................C=0.001, kernel=rbf; total time=   3.0s
[CV] END ...............................C=0.001, kernel=poly; total time=   1.2s
[CV] END ...............................C=0.001, kernel=poly; total time=   1.2s
[CV] END .................................C=10.0, kernel=rbf; total time=   0.8s
[CV] END .................................C=10.0, kernel=rbf; total time=   0.8s
[CV] END ................................C=10.0, kernel=poly; total time=   1.2s
[CV] END ................................C=10.0, kernel=poly; total time=   1.1s
CPU times: user 14.3 s, sys: 224 ms, total: 14.5 s
Wall time: 14.5 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

In [9]:
grid_search.best_params_, grid_search.best_score_

({'C': 10.0, 'kernel': 'rbf'}, 0.9086000000000001)
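
The "4 candidates, totalling 8 fits" line in the log follows from the grid itself: every combination of values is one candidate, and each candidate is fitted once per fold. A minimal sketch using scikit-learn's `ParameterGrid` to enumerate the candidates (the names `param_grid_demo` and `candidates` are illustrative):

```python
from sklearn.model_selection import ParameterGrid

param_grid_demo = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

# Every combination of hyperparameter values is one candidate
candidates = list(ParameterGrid(param_grid_demo))
n_folds = 2  # matches cv=2 above

for params in candidates:
    print(params)

total_fits = len(candidates) * n_folds
print(len(candidates), "candidates,", total_fits, "fits")
```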

## Compute Bound: Single-machine parallelism with Joblib

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/joblib_logo.svg" 
     alt="Joblib logo" 
     width="50%"/>

In this section we'll see how [Joblib](https://joblib.readthedocs.io/en/latest/) ("*a set of tools to provide lightweight pipelining in Python*") gives us parallelism on our laptop. Here's what our grid search graph would look like if we set up six training "jobs" in parallel:

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/unmerged_grid_search_graph.svg" 
     alt="grid search graph" 
     width="100%"/>

With Joblib, we can say that Scikit-Learn has *single-machine* parallelism.
Any Scikit-Learn estimator that can operate in parallel exposes an `n_jobs` keyword, which sets how many tasks to run in parallel. Specifying `n_jobs=-1` uses all available processors.

In [10]:
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
CPU times: user 2.31 s, sys: 104 ms, total: 2.41 s
Wall time: 6.67 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             n_jobs=-1,
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

Notice that the computation above is faster than before. If you are running this on Binder, you might not see a speed-up, because Binder instances tend to have only one core, which leaves no room for parallelism.
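
To see what Joblib does underneath `n_jobs`, here is a minimal sketch that uses Joblib directly, outside of Scikit-Learn. The toy task (`sqrt`) and the choice of two workers are assumptions for illustration.

```python
from math import sqrt

from joblib import Parallel, cpu_count, delayed

# Number of CPUs Joblib detects on this machine
print(cpu_count())

# Run ten independent tasks on two workers; results come back in input order
roots = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
print(roots)
```

Each `delayed(...)` call describes one task, in the same way that each (candidate, fold) pair is one task in the grid search.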

## Compute Bound: Multi-machine parallelism with Dask


In this section we'll see how Dask (plus Joblib and Scikit-Learn) gives us multi-machine parallelism. Here's what our grid search graph would look like if we allowed Dask to schedule our training "jobs" over multiple machines in our cluster:

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/merged_grid_search_graph.svg" 
     alt="merged grid search graph" 
     width="100%"/>
     
We can say that Dask can talk to Scikit-Learn (via Joblib) so that our *cluster* is used to train a model. 

If we run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, we would need a distributed cluster. For details on how to create a LocalCluster you can check the Dask documentation on [Single Machine: dask.distributed](https://docs.dask.org/en/latest/setup/single-distributed.html). 

Let's instantiate a Client with `n_workers=4`, which will give us a `LocalCluster`.

In [11]:
import dask.distributed

client = dask.distributed.Client(n_workers=4)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 50017 instead


| Client / cluster | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | `http://127.0.0.1:50017/status` |
| Workers | 4 |
| Total threads | 8 |
| Total memory | 16.00 GiB |
| Status | running |
| Using processes | True |
| Scheduler comm | `tcp://127.0.0.1:50018` |
| Started | Just now |

| Worker comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| `tcp://192.168.1.6:50029` | `http://192.168.1.6:50035/status` | `tcp://127.0.0.1:50023` | 2 | 4.00 GiB | `/Users/rpelgrim/Documents/git/dask-mini-tutorial/notebooks/dask-worker-space/worker-4qd77awe` |
| `tcp://192.168.1.6:50032` | `http://192.168.1.6:50034/status` | `tcp://127.0.0.1:50021` | 2 | 4.00 GiB | `/Users/rpelgrim/Documents/git/dask-mini-tutorial/notebooks/dask-worker-space/worker-62elp4pk` |
| `tcp://192.168.1.6:50031` | `http://192.168.1.6:50036/status` | `tcp://127.0.0.1:50024` | 2 | 4.00 GiB | `/Users/rpelgrim/Documents/git/dask-mini-tutorial/notebooks/dask-worker-space/worker-240uxqzl` |
| `tcp://192.168.1.6:50030` | `http://192.168.1.6:50033/status` | `tcp://127.0.0.1:50022` | 2 | 4.00 GiB | `/Users/rpelgrim/Documents/git/dask-mini-tutorial/notebooks/dask-worker-space/worker-96v_lecz` |


**Note:** In a live notebook, click on Cluster Info to see more details about the cluster, such as its configuration and other specs.

We can expand our problem by specifying more hyperparameters before training, and see how using `dask` as the backend can help us.

In [12]:
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    'kernel': ['rbf', 'poly', 'linear'],
    'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)

### Dask parallel backend

We can fit our estimator with multi-machine parallelism by quickly *switching to a Dask parallel backend* when using joblib. 

In [13]:
import joblib

In [14]:
%%time
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)

Fitting 2 folds for each of 36 candidates, totalling 72 fits
CPU times: user 3.58 s, sys: 339 ms, total: 3.92 s
Wall time: 24.6 s


**What just happened?**

Dask-ML developers worked with the Scikit-Learn and Joblib developers to implement a Dask parallel backend. So internally, scikit-learn now talks to Joblib, and Joblib talks to Dask, and Dask is what handles scheduling all of those tasks on multiple machines.
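
The same Joblib-to-Dask hand-off can be sketched without Scikit-Learn at all. The example below is illustrative only: it assumes a small in-process cluster (`processes=False`) to stay lightweight, and `square` and `squares_via_dask` are hypothetical helper names.

```python
import joblib
from dask.distributed import Client


def square(x):
    return x * x


def squares_via_dask(values):
    """Run square() over values, letting Dask schedule the Joblib tasks."""
    # Assumption: a tiny in-process cluster, just to keep the sketch light
    with Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None) as demo_client:
        # Joblib hands each task to the Dask scheduler instead of its own workers
        with joblib.parallel_backend("dask", client=demo_client):
            return joblib.Parallel()(joblib.delayed(square)(v) for v in values)


print(squares_via_dask(range(5)))
```

Swapping the in-process client for one connected to a real cluster would leave the Joblib code unchanged, which is the point of the backend mechanism.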

The best parameters and best score:

In [15]:
grid_search.best_params_, grid_search.best_score_

({'C': 10.0, 'kernel': 'rbf', 'shrinking': True}, 0.9086000000000001)

## Memory Bound: Single/Multi machine parallelism with Dask-ML

We have seen how to work with larger models, but sometimes you'll want to train on a larger-than-memory dataset. Dask-ML implements estimators that work well on Dask `Array`s and `DataFrame`s, which may be larger than your machine's RAM.

In [6]:
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

We'll make a small (random) dataset locally using Scikit-Learn.

In [17]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
    
centers[:4]

array([[ 1.00796679,  4.34582168,  2.15175661,  1.04337835, -1.82115164,
         2.81149666, -1.18757701,  7.74628882,  9.36761449, -2.20570731,
         5.71142324,  0.41084221,  1.34168817,  8.4568751 , -8.59042755,
        -8.35194302, -9.55383028,  6.68605157,  5.34481483,  7.35044606],
       [ 9.49283024,  6.1422784 , -0.97484846,  5.8604399 , -7.61126963,
         2.86555735, -7.25390288,  8.89609285,  0.33510318, -1.79181328,
        -4.66192239,  5.43323887, -0.86162507,  1.3705568 , -9.7904172 ,
         2.3613231 ,  2.20516237,  2.20604823,  8.76464833,  3.47795068],
       [-2.67206588, -1.30103177,  3.98418492, -8.88040428,  3.27735964,
         3.51616445, -5.81395151, -7.42287114, -3.73476887, -2.89520363,
         1.49435043, -1.35811028,  9.91250767, -7.86133474, -5.78975793,
        -6.54897163,  3.08083281, -5.18975209, -0.85563107, -5.06615534],
       [-6.85980599, -7.87144648,  3.33572279, -7.00394241, -5.97224874,
        -2.55638942,  6.36329802, -7.97988653,  

**Note**: The small dataset will be the template for our large random dataset.
We'll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is being generated on our workers. 

If you are not on Binder and your machine has 16 GB of RAM, you can set `n_samples_per_block=200_000`; the computation then takes around 10 minutes. On Binder, resources are limited and the problem below is big enough.
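
To get a feel for these sizes, the rough arithmetic can be sketched as below. It assumes 8 bytes per value (the `float64` arrays that `make_blobs` returns) and counts only the raw array, not any working memory; `dataset_size_gb` is a hypothetical helper.

```python
def dataset_size_gb(n_samples_per_block, n_blocks, n_features, bytes_per_value=8):
    """Approximate in-memory size of a dense array, in gigabytes (1e9 bytes)."""
    n_values = n_samples_per_block * n_blocks * n_features
    return n_values * bytes_per_value / 1e9


# Block sizes mentioned in this notebook, with 500 blocks and 20 features
for block_size in (15_000, 60_000, 200_000):
    print(block_size, dataset_size_gb(block_size, n_blocks=500, n_features=20), "GB")
```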

In [None]:
n_samples_per_block = 60_000  # on Binder, replace this with 15_000
n_blocks = 500

# Each delayed call generates one block of samples lazily; [0] keeps only the features
delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
# Wrap each lazy block as a Dask array; shape and dtype must be declared up front
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
X
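
The same pattern is easier to inspect at a much smaller scale. In the sketch below, `make_block` is a hypothetical stand-in for `make_blobs`, and the block sizes are chosen only so that the result is easy to check.

```python
import dask
import dask.array as da
import numpy as np


def make_block(seed, n_rows=5, n_cols=3):
    """Hypothetical stand-in for make_blobs: one small random block."""
    return np.random.default_rng(seed).normal(size=(n_rows, n_cols))


# Nothing is generated yet: each element is a lazy task
lazy_blocks = [dask.delayed(make_block)(seed) for seed in range(4)]

# Dask cannot inspect a delayed result, so shape and dtype are declared up front
block_arrays = [da.from_delayed(block, shape=(5, 3), dtype=np.float64)
                for block in lazy_blocks]
X_blocks = da.concatenate(block_arrays)

print(X_blocks.shape, X_blocks.chunks)

# Only now are the blocks actually generated
result = X_blocks.compute()
print(result.shape)
```

Each delayed call becomes one chunk of the final array, which is why the number of blocks and the block size together determine the chunking.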

### KMeans from Dask-ML

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you're familiar with Scikit-Learn, you'll feel at home with Dask-ML.

In [18]:
from dask_ml.cluster import KMeans

In [19]:
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [20]:
%time clf.fit(X)

CPU times: user 930 ms, sys: 378 ms, total: 1.31 s
Wall time: 2.37 s


KMeans(init_max_iter=3, oversampling_factor=10)

In [21]:
clf.labels_

| | Array | Chunk |
|---|---|---|
| Bytes | 39.06 kiB | 4.88 kiB |
| Shape | (10000,) | (1250,) |
| Count | 48 Tasks | 8 Chunks |
| Type | int32 | numpy.ndarray |


In [22]:
clf.labels_[:10].compute()

array([6, 2, 3, 6, 6, 7, 1, 0, 0, 2], dtype=int32)

In [23]:
client.close()

## Multi-machine parallelism in the cloud with Coiled

<br>
<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/Coiled-Logo_Horizontal_RGB_Black.png"
     alt="Coiled logo" 
     width="25%"/>
<br>

In this section we'll see how Coiled allows us to solve machine learning problems with multi-machine parallelism in the cloud.

Coiled, [among other things](https://coiled.io/product/), provides hosted and scalable Dask clusters. The biggest barriers to entry for doing machine learning at scale are "Do you have access to a cluster?" and "Do you know how to manage it?" Coiled solves both of those problems. 

We'll spin up a Coiled cluster (with 10 workers in this case), then instantiate a Dask Client to use with that cluster.

If you are running on your local machine rather than on Binder and want to give Coiled a try, you can sign up [here](https://cloud.coiled.io/login?redirect_uri=/) and get some free credits. If you installed the environment by following the steps in the repository's [README](https://github.com/coiled/dask-mini-tutorial/blob/main/README.md), you already have `coiled` installed. You just need to log in by following the steps on the [setup page](https://docs.coiled.io/user_guide/getting_started.html), and you will be ready to go.

To learn more about how to set up an environment, see the Coiled documentation on [Creating software environments](https://docs.coiled.io/user_guide/software_environment_creation.html). For now, you can use the environment we set up for this tutorial.

In [1]:
import coiled
from dask.distributed import Client

In [2]:
# Spin up a Coiled cluster with 10 workers
cluster = coiled.Cluster(n_workers=10, software="ncclementi/dask-mini-tutorial")

Found software environment build
Created FW rules: coiled-dask-rrpelgr71-63815-firewall
Created scheduler VM: coiled-dask-rrpelgr71-63815-scheduler (type: t3.medium, ip: ['44.192.10.127'])


In [4]:
client = Client(cluster)
client

| Client / cluster | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | coiled.Cluster |
| Dashboard | `http://44.192.10.127:8787` |
| Workers | 10 |
| Total threads | 20 |
| Total memory | 75.60 GiB |
| Scheduler comm | `tls://10.4.11.219:8786` |
| Scheduler dashboard | `http://10.4.11.219:8787/status` |
| Started | Just now |

| Worker comm | Dashboard | Nanny | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| `tls://10.4.26.225:34015` | `http://10.4.26.225:45433/status` | `tls://10.4.26.225:46283` | 2 | 7.56 GiB | `/dask-worker-space/worker-hi5l9lzw` |
| `tls://10.4.27.147:37177` | `http://10.4.27.147:45903/status` | `tls://10.4.27.147:34477` | 2 | 7.56 GiB | `/dask-worker-space/worker-31751d98` |
| `tls://10.4.21.187:37709` | `http://10.4.21.187:45889/status` | `tls://10.4.21.187:36021` | 2 | 7.56 GiB | `/dask-worker-space/worker-ez1hbu5d` |
| `tls://10.4.19.116:44083` | `http://10.4.19.116:40101/status` | `tls://10.4.19.116:43871` | 2 | 7.56 GiB | `/dask-worker-space/worker-gm5y_ast` |
| `tls://10.4.28.14:35445` | `http://10.4.28.14:45439/status` | `tls://10.4.28.14:43499` | 2 | 7.56 GiB | `/dask-worker-space/worker-tlxtmrgn` |
| `tls://10.4.19.175:36013` | `http://10.4.19.175:35343/status` | `tls://10.4.19.175:43829` | 2 | 7.56 GiB | `/dask-worker-space/worker-ondtx754` |
| `tls://10.4.20.55:45581` | `http://10.4.20.55:34567/status` | `tls://10.4.20.55:37401` | 2 | 7.56 GiB | `/dask-worker-space/worker-ukpvdrrv` |
| `tls://10.4.20.237:40083` | `http://10.4.20.237:42345/status` | `tls://10.4.20.237:37709` | 2 | 7.56 GiB | `/dask-worker-space/worker-bvg9jglc` |
| `tls://10.4.16.125:46355` | `http://10.4.16.125:34969/status` | `tls://10.4.16.125:41099` | 2 | 7.56 GiB | `/dask-worker-space/worker-4yfk80g2` |
| `tls://10.4.23.150:33905` | `http://10.4.23.150:35519/status` | `tls://10.4.23.150:38655` | 2 | 7.56 GiB | `/dask-worker-space/worker-dhr3z490` |


### Memory bound: Dask-ML

We can use Dask-ML estimators on the cloud to work with larger datasets.

In [7]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)


In [9]:
n_samples_per_block = 200_000
n_blocks = 500

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X_small.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)


In [10]:
X = X.persist()

In [11]:
from dask_ml.cluster import KMeans

In [12]:
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [13]:
%time clf.fit(X)

CPU times: user 4.02 s, sys: 241 ms, total: 4.26 s
Wall time: 55.3 s


KMeans(init_max_iter=3, oversampling_factor=10)

Computing the labels:

In [14]:
clf.labels_[:10].compute()

array([0, 4, 0, 5, 5, 5, 6, 4, 4, 7], dtype=int32)

In [15]:
client.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError


## Extra resources:

- [Dask-ML documentation](https://ml.dask.org/)
- [Getting started with Coiled](https://docs.coiled.io/user_guide/getting_started.html)