# K-Nearest Neighbors classifier on the MNIST data set - Multi-machine execution with Dask & CUDA on GPUs

## Task 1 - Load the MNIST data set

In [1]:
from keras.datasets import mnist

# MNIST comes pre-split into training and testing sets (60000/10000).
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Number of samples (rows) in each split.
training_size, testing_size = x_train.shape[0], x_test.shape[0]
print(f"train size : {training_size}, testing size : {testing_size}")

train size : 60000, testing size : 10000


## Task 2 - Pre-processing of the data set

In [2]:
# Rescale pixel values so the result is normalized between 0 and 1.
# NOTE: this cell rebinds x_train/x_test, so re-running it without re-running
# the loading cell would divide by 255 a second time.
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255
print(x_test.shape)
print(x_train.shape)

# Flatten each image into one feature row: (n_samples, 28, 28) -> (n_samples, 784).
# The row count is taken from the arrays themselves instead of hard-coding
# 60000/10000, so this also works on subsets or other splits.
x_train = x_train.reshape(len(x_train), -1)
x_test = x_test.reshape(len(x_test), -1)

(10000, 28, 28)
(60000, 28, 28)


### Initialize the Dask client

In [3]:
import sys
from dask.distributed import Client

# Make the project-local Utils package (one directory up) importable.
sys.path.append('../')
from Utils.Utils import Utils

# 10.x.x.x is the virtual IP of the Dask scheduler within the Kubernetes cluster;
# Utils presumably looks it up from the cluster environment (not visible here).
SCHEDULER_PORT = 8786
scheduler_ip = Utils().get_scheduler_ip()
client = Client(f"{scheduler_ip}:{SCHEDULER_PORT}", timeout=3)
client

0,1
Connection method: Direct,
Dashboard: http://10.42.0.38:8787/status,

0,1
Comm: tcp://10.42.0.38:8786,Workers: 4
Dashboard: http://10.42.0.38:8787/status,Total threads: 4
Started: 2 minutes ago,Total memory: 18.63 GiB

0,1
Comm: tcp://10.42.0.40:42913,Total threads: 1
Dashboard: http://10.42.0.40:39813/status,Memory: 4.66 GiB
Nanny: tcp://10.42.0.40:45185,
Local directory: /rapids/dask-worker-space/worker-infl810w,Local directory: /rapids/dask-worker-space/worker-infl810w
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 7.80 GiB
Tasks executing: 0,Tasks in memory: 3
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 662.74 MiB,Spilled bytes: 0 B
Read bytes: 285.8895785529004 B,Write bytes: 1.10 kiB

0,1
Comm: tcp://10.42.0.39:32957,Total threads: 1
Dashboard: http://10.42.0.39:42725/status,Memory: 4.66 GiB
Nanny: tcp://10.42.0.39:35085,
Local directory: /rapids/dask-worker-space/worker-bv90o8vu,Local directory: /rapids/dask-worker-space/worker-bv90o8vu
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 7.80 GiB
Tasks executing: 0,Tasks in memory: 3
Tasks ready: 0,Tasks in flight: 0
CPU usage: 0.0%,Last seen: Just now
Memory usage: 661.51 MiB,Spilled bytes: 0 B
Read bytes: 286.575980070112 B,Write bytes: 1.09 kiB

0,1
Comm: tcp://10.42.0.41:40719,Total threads: 1
Dashboard: http://10.42.0.41:41533/status,Memory: 4.66 GiB
Nanny: tcp://10.42.0.41:41709,
Local directory: /rapids/dask-worker-space/worker-i8xp_n63,Local directory: /rapids/dask-worker-space/worker-i8xp_n63
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 7.80 GiB
Tasks executing: 0,Tasks in memory: 3
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 661.35 MiB,Spilled bytes: 0 B
Read bytes: 286.17603258229786 B,Write bytes: 1.09 kiB

0,1
Comm: tcp://10.42.0.42:36065,Total threads: 1
Dashboard: http://10.42.0.42:45765/status,Memory: 4.66 GiB
Nanny: tcp://10.42.0.42:42191,
Local directory: /rapids/dask-worker-space/worker-gsnalgg9,Local directory: /rapids/dask-worker-space/worker-gsnalgg9
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 7.80 GiB
Tasks executing: 0,Tasks in memory: 3
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 658.77 MiB,Spilled bytes: 0 B
Read bytes: 285.7498337293972 B,Write bytes: 1.09 kiB


### GPU work - Split into 4 chunks

In [4]:
import dask.array as da

# Split every array into `number_of_chunks` row-blocks. This matches the 4 workers
# listed in the client summary above.
number_of_chunks = 4

# Dask chunk sizes must be whole numbers. Plain `/` gives a float that is rejected
# whenever the size is not evenly divisible. Integer ceiling division always yields
# at most `number_of_chunks` chunks; only the last one may be smaller.
train_chunk_rows = -(-training_size // number_of_chunks)
test_chunk_rows = -(-testing_size // number_of_chunks)
n_features = x_train.shape[1]  # 784 after flattening in the pre-processing cell

# samples: chunk along rows only, keep every feature column in the same block
x_train_dask = da.from_array(x_train, chunks=(train_chunk_rows, n_features))
x_test_dask = da.from_array(x_test, chunks=(test_chunk_rows, n_features))

# labels: 1-D, chunked the same way as the sample rows
y_train_dask = da.from_array(y_train, chunks=(train_chunk_rows,))
y_test_dask = da.from_array(y_test, chunks=(test_chunk_rows,))

In [5]:
%%time
from dask.diagnostics import visualize
from dask.distributed import performance_report
from cuml.dask.neighbors import KNeighborsClassifier
import gc
import ctypes
import psutil

model = KNeighborsClassifier(n_neighbors = 5)

with performance_report("reports/KNN GPU 4 chunks.html"):
    model.fit(x_train_dask, y_train_dask)
    score = model.score(x_test_dask, y_test_dask)
    print(f"score : {score}")

score : 0.9688
CPU times: user 1.48 s, sys: 483 ms, total: 1.97 s
Wall time: 9.87 s


### GPU work - RAPIDS AI default (1 big chunk)

In [6]:
import dask.array as da

# Wrap the NumPy arrays without an explicit `chunks=` argument, i.e. with Dask's
# default chunking.
# NOTE(review): from_array defaults to chunks="auto", which sizes blocks from the
# `array.chunk-size` config. The training matrix may therefore not be a single
# chunk; check `x_train_dask.numblocks` to confirm the "1 big chunk" assumption.

# samples to dask arrays
x_train_dask, x_test_dask = (da.from_array(arr) for arr in (x_train, x_test))

# labels to dask arrays
y_train_dask, y_test_dask = (da.from_array(arr) for arr in (y_train, y_test))

In [7]:
%%time
from dask.diagnostics import visualize
from dask.distributed import performance_report
from cuml.dask.neighbors import KNeighborsClassifier
import gc
import ctypes
import psutil

model = KNeighborsClassifier(n_neighbors = 5)

with performance_report("reports/KNN GPU 1 chunk.html"):
    model.fit(x_train_dask, y_train_dask)
    score = model.score(x_test_dask, y_test_dask)
    print(f"score : {score}")

score : 0.9627
CPU times: user 192 ms, sys: 41.8 ms, total: 234 ms
Wall time: 1.11 s


### Clean up unmanaged memory

In [8]:
%%time
def trim_memory() -> int:
    import ctypes
    import os
    import psutil
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)
client.run(trim_memory)


CPU times: user 0 ns, sys: 2.96 ms, total: 2.96 ms
Wall time: 6.87 ms


{'tcp://10.42.0.39:32957': 1,
 'tcp://10.42.0.40:42913': 1,
 'tcp://10.42.0.41:40719': 1,
 'tcp://10.42.0.42:36065': 1}