# Distributed K-Means

`LocalCUDACluster` starts a Dask cluster on a single machine with one worker process mapped to each GPU. This arrangement is called one-process-per-GPU (OPG).

In [1]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

In [2]:
cluster = LocalCUDACluster(threads_per_worker=1)
client = Client(cluster)

## Generate Data
We can generate a `dask_cudf.DataFrame` of synthetic data drawn from several blob-shaped clusters using `cuml.dask.datasets.make_blobs`, with one partition per Dask worker.

In [3]:
from cuml.dask.datasets import make_blobs

In [4]:
n_samples = 1000000
n_features = 2
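# client.has_what() maps each worker to the keys it holds, so its length
# is the number of workers; we create one partition per worker.
n_total_partitions = len(client.has_what())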

In [5]:
X, y = make_blobs(n_samples,
                  n_features,
                  centers=5, 
                  n_parts=n_total_partitions,
                  cluster_std=0.1, 
                  verbose=True)

Generating 1000000 samples across 1 partitions on 1 workers (total=1000000 samples)


## Fit & Predict

In [6]:
from cuml.dask.cluster.kmeans import KMeans

In [7]:
%%time
kmeans = KMeans(client=client,
                init="k-means||",
                n_clusters=5,
                random_state=100)

kmeans.fit(X)

CPU times: user 1.84 s, sys: 303 ms, total: 2.14 s
Wall time: 3.16 s


<cuml.dask.cluster.kmeans.KMeans at 0x7fe7d1e08250>
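
For intuition about what `fit` computes, the following is a minimal single-process sketch of Lloyd's k-means iterations in pure Python. It is not how cuML implements distributed K-Means: the function name `lloyd_kmeans`, the toy points, and the fixed starting centroids are all illustrative assumptions, and the naive initialization stands in for the `k-means||` scheme used above.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def nearest(point, centroids):
    """Index of the centroid closest to `point` (ties go to the lowest index)."""
    return min(range(len(centroids)),
               key=lambda j: squared_distance(point, centroids[j]))


def lloyd_kmeans(points, initial_centroids, n_iter=10):
    """Run a fixed number of Lloyd iterations from the given centroids."""
    centroids = [tuple(c) for c in initial_centroids]
    labels = [0] * len(points)
    for _ in range(n_iter):
        # Assignment step: attach each point to its nearest centroid.
        labels = [nearest(p, centroids) for p in points]
        # Update step: move each centroid to the mean of its members.
        for j in range(len(centroids)):
            members = [p for p, label in zip(points, labels) if label == j]
            if members:  # an empty cluster keeps its previous centroid
                centroids[j] = tuple(sum(axis) / len(members)
                                     for axis in zip(*members))
    return centroids, labels


# Two small, well-separated toy blobs (hypothetical data).
points = [(0.0, 0.0), (0.1, 0.2), (0.2, 0.1),
          (5.0, 5.0), (5.1, 4.9), (4.9, 5.2)]

# Deliberately poor start: both initial centroids lie in the first blob.
centroids, labels = lloyd_kmeans(points, initial_centroids=points[:2])
print(labels)
```

Even with both starting centroids in the same blob, the two toy blobs separate after a few iterations. On harder data the starting centroids matter much more, which is the motivation for initialization schemes such as `k-means||`.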

In [8]:
%%time
preds = kmeans.predict(X)

CPU times: user 122 ms, sys: 17.3 ms, total: 139 ms
Wall time: 253 ms


In [9]:
preds.tail()

999995    3
999996    3
999997    3
999998    3
999999    3
dtype: int32

## Results

In [10]:
from cuml.metrics import adjusted_rand_score

In [11]:
labels_true = y.compute().values
labels_pred = preds.compute().values

In [12]:
adjusted_rand_score(labels_true, labels_pred)

1.0
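
The adjusted Rand index compares two labelings by counting pairs of samples that are grouped together or apart in both, corrected for chance, so it does not depend on which integer each cluster happens to receive. A score of 1.0 means the predicted clusters match the generated ones exactly, up to a renaming of labels. The pure-Python sketch below illustrates that property; `adjusted_rand_index` is a hypothetical helper written for this example, not cuML's implementation.

```python
from collections import Counter
from math import comb


def adjusted_rand_index(labels_true, labels_pred):
    """Adjusted Rand index from the contingency table of two labelings."""
    if len(labels_true) != len(labels_pred):
        raise ValueError("label sequences must have the same length")
    total_pairs = comb(len(labels_true), 2)
    if total_pairs == 0:
        return 1.0  # fewer than two samples: nothing to disagree on

    # Pairs of samples that share a contingency-table cell, row, or column.
    same_cell = sum(comb(c, 2)
                    for c in Counter(zip(labels_true, labels_pred)).values())
    same_true = sum(comb(c, 2) for c in Counter(labels_true).values())
    same_pred = sum(comb(c, 2) for c in Counter(labels_pred).values())

    expected = same_true * same_pred / total_pairs  # agreement expected by chance
    max_index = (same_true + same_pred) / 2
    if max_index == expected:
        return 1.0  # degenerate case, e.g. a single cluster in both labelings
    return (same_cell - expected) / (max_index - expected)


# Same grouping, different label names: still a perfect score.
renamed = adjusted_rand_index([0, 0, 1, 1, 2, 2], [2, 2, 0, 0, 1, 1])

# A grouping that cuts across the true clusters scores below zero.
crossed = adjusted_rand_index([0, 0, 1, 1], [0, 1, 0, 1])
print(renamed, crossed)
```

This is why the comparison above works even though K-Means assigns cluster ids arbitrarily and they need not match the ids produced by `make_blobs`.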