# Dask cuML - kNN End-to-end 

This notebook assumes the following prerequisites:
- A CUDA-aware MPI installation
- `mpi4py` installed or built against that exact same Open MPI library


Once the prerequisites are in place:
- Start a Dask scheduler using `dask-scheduler --scheduler-file="cluster.json"`. __NOTE:__ This creates the file `cluster.json` in the current directory, so you don't need to keep track of ports manually when starting workers and clients.
- Start a set of workers using `mpirun --mca pml ob1 --mca osc ^ucx -np 2 dask-mpi --no-nanny --no-scheduler --nthreads 10 --memory-limit 3000000000 --scheduler-file cluster.json`

__NOTE:__ Currently, `dask-mpi` does not provide a way to specify custom environment variables when starting the workers, so `CUDA_VISIBLE_DEVICES` has to be set after the workers are already running. It must be set before any CUDA contexts are created, which means no CUDA-based libraries may be imported on the workers beforehand.

In [1]:
from dask.distributed import Client

In [2]:
# Run this if you are using an MPI-based cluster
client = Client(scheduler_file="cluster.json")

In [3]:
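# GPU device IDs to distribute across the workers
devs = [0, 1]
# Addresses of all workers currently connected to the scheduler
workers = list(client.has_what().keys())
# Use at most one worker per device
worker_devs = workers[0:min(len(devs), len(workers))]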

In [4]:
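def set_visible(i, n):
    # Rotate the device list so that device i comes first on this worker
    import os
    all_devices = list(range(n))
    vd = ",".join(map(str, all_devices[i:] + all_devices[:i]))
    os.environ["CUDA_VISIBLE_DEVICES"] = vd

# Pin one task to each worker so every worker gets its own device ordering
[client.submit(set_visible, dev, len(devs), workers=[worker]) for dev, worker in zip(devs, worker_devs)]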

[<Future: status: pending, key: set_visible-a5d0a72479eb2b4acea4ba65a6b102b8>,
 <Future: status: pending, key: set_visible-e084c15558e5a720e776ddcad4e4a773>]
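
The rotation performed by `set_visible` can be checked without a cluster. The sketch below pulls the string-building step into a standalone helper, `visible_devices` (a hypothetical name, not part of the notebook or of any library), and prints the value each worker would receive. Because CUDA numbers devices in the order they appear in `CUDA_VISIBLE_DEVICES`, the first entry is what each worker sees as device 0.

```python
def visible_devices(i, n):
    # Same rotation as set_visible, but returns the string instead of
    # writing it to the environment (hypothetical helper for illustration).
    all_devices = list(range(n))
    return ",".join(map(str, all_devices[i:] + all_devices[:i]))


# With two devices, as in this notebook:
for dev in [0, 1]:
    print(dev, "->", visible_devices(dev, 2))
# 0 -> 0,1
# 1 -> 1,0
```

Every worker still sees all devices; only the ordering differs, so each worker ends up with a different default device.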

In [5]:
import dask_cudf
import cudf
import numpy as np

from dask_cuml import knn as cumlKNN

In [6]:
X = cudf.DataFrame([('a', np.array([0, 1, 2, 3, 4], np.float32)),
                    ('b', np.array([5, 6, 7, 7, 8], np.float32))])

X_df = dask_cudf.from_cudf(X, chunksize=1).persist()

In [7]:
lr = cumlKNN.KNN()
lr.fit(X_df)

In [8]:
# Query the single nearest neighbor (k=1) of every row in X
g = lr.kneighbors(X, 1)

In [9]:
# Unpack the worker and the future that will hold the result
worker, f = g

In [10]:
# Block until the future resolves; D holds distances, I holds neighbor indices
D, I = f.result()

In [11]:
print(D)

     0
0  0.0
1  0.0
2  0.0
3  1.0
4  5.0


In [12]:
print(I)

     0
0    3
1    2
2    4
3    4
4    4
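
For a sanity check on small inputs, a brute-force nearest-neighbor search on the CPU can serve as a reference. The following is a minimal NumPy sketch, not the algorithm `dask_cuml` uses: the function name `brute_force_knn` is hypothetical, and the use of squared Euclidean distance is an assumption made for this illustration. It runs on the same five points as `X` above.

```python
import numpy as np


def brute_force_knn(data, queries, k):
    # Pairwise squared Euclidean distances, shape (n_queries, n_data).
    diffs = queries[:, None, :] - data[None, :, :]
    sq_dists = (diffs ** 2).sum(axis=2)
    # Indices of the k smallest distances per query row.
    idx = np.argsort(sq_dists, axis=1, kind="stable")[:, :k]
    dist = np.take_along_axis(sq_dists, idx, axis=1)
    return dist, idx


# Same values as columns 'a' and 'b' of X, held in host memory.
X_host = np.column_stack([
    np.array([0, 1, 2, 3, 4], np.float32),
    np.array([5, 6, 7, 7, 8], np.float32),
])

D_ref, I_ref = brute_force_knn(X_host, X_host, 1)
print(D_ref.ravel())
print(I_ref.ravel())
```

Since the query set equals the indexed set and `k` is 1, this reference reports each point as its own nearest neighbor at distance 0. Comparing against such a reference is one way to sanity-check the distributed result.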
