# Template für KNN Klassifizierung und Regression | Multi-Node/Multi-GPU

***

<b>[Bild]</b>

In diesem Abschnitt wird erkläre, wie man den KMeans Algorithmus auf Worker skaliert und zum Laufen bringt. Dazu nutzen wir Dask und Rapids. Dask bietet uns ein Dashboard wo wir Einzelheiten verfolgen können. Dask-Cuda lässt sich gut mit PyTorch und anderen Frameworks kombinieren, ohne die Clustereinstellungen groß zu ändern.

In dem Nootebook für das Setup wird erklärt, wie man selber sowas auf seinem eigenen PC/Workstation/Cluster installiert. Es gibt auch andere Möglichkeiten das zu installieren. Hier nutzen wir ein Cluster Setup mit Nodes, installiert mittels Docker.   

<u>Hinweis:</u><br>
Wenn es mal hängt, muss ggf. der Cluster, Worker oder das Notebook neu gestartet werden. Sowas passiert, wenn z.B. eine Exception vorkommt.   

<br>

Dask: https://www.dask.org | https://docs.dask.org/en/stable/ <br>
Rapids: https://rapids.ai <br> 

# 1. Rapids und Dask

<img src="./pictures/raspids_oss.PNG"  width="825px;" hight="725px;">
(Bild: (https://on-demand.gputechconf.com/gtcdc/2018/pdf/dc8256-rapids-the-platform-inside-and-out.pdf))

Rapids bietet uns eine Sammlung von Libraries, die uns bei vielen Dingen unterstützen kann. Eine dieser Libraries, die wir nutzen werden, nennt sich cuDF. cuDF sind Cuda Dataframes, die erstmal nur auf einer GPU laufen. Die Idee dahinter ist, dass wir <u>nah zu dasselbe machen</u> wie bei Pandas (Panda DataFrame), aber nur auf einer GPU. Dadurch können viele Berechnungen viel schneller durchgeführt werden. Beispielsweise das Vorverarbeiten von Daten vor dem Trainieren, oder anderweitige Zwecke. Die API ist nah zu identisch wie bei Pandas. 

Rapids funktioniert <u>nur</u> mit <u>Nvidia Grafikkarten</u>.

Rapids ist OpenSource, die API Beschreibungen der Libraries verlinken den dahinterliegenden Code. Der Code enthählt selber nochmal Kommentare und erklärungen.

Rapids entwickelt sich und es kommen Updates sowie Erweiterungen. Algorithmen, die noch nicht (Single-/Multi-Node/ GPU) verfügbar sind, können noch in den neueren Versionen auftauchen. Das gilt besonders für Multi-Node/Multi-GPU Implementierungen der Algorithmen. Es kann sein das manche Multi-Node/Multi-GPU Algorithmen sich am Ende nicht zusammenfassen lassen können, oder das Speichern/Laden sowie die Verwendung dieser auf GPU/CPU eingeschränkt sind. Andere Multi-Node/Multi-GPU Algorithmen können so wie in diesem Template hier verwendet werden. Einge andere können sich in der Anwendung unterscheiden. 


Floating-Point Berechnungen mit cuDF werden auf der GPU parallel ausgeführt, also ist die Reihenfolge nicht deterministisch. Die Ergebnisse können von Pandas abweichen.
a + b ist nicht gleich b + a. Das kann z.B. bei data.sum() vorkommen.
cuDF unterstützt auch verschiedene Datentypen wie int32, int64, float32, float64. Die ganze Liste ist hier zu finden.

Es gibt noch Dask-cuDF, diese nutzt man unter anderem, um große Daten zu Partitionieren. Die API ist Dask-ähnlich. 

Eine weitere Library von Rapids ist cuML, welche auch cuDF nutzt. Diese enthält verschiedene supervised und unsupervsed Algorithmen. Der Fokus hier liegt dabei, dass die Algorithmen möglichst auf einer GPU laufen. Ein Teil dieser Algorithmen kann mittels eines Clusters (cuML nutzt dafür ein Dask Cluster) auf verschiedene Nodes und GPUs skalieren. 

Die API von cuML ist fast genauso aufgebaut wie bei 
Sklearn. <br> Die Release Notes von cuML sind hier zu finden (https://github.com/rapidsai/cuml/releases).
  

Was wir neben Rapids und Dask noch nutzen können ist CuPy [5]. CuPy ist wie Numpy, nur läuft es unter auf GPU. CuPy wird hier nicht detailliert erklärt.

<br> 

cuDF Cuda DataFrames Dok.:  https://docs.rapids.ai/api/cudf/stable/ <br>
cuDF API: https://docs.rapids.ai/api/cudf/stable/api_docs/api/cudf.dataframe/ <br>
cuDF Floating-Point Berechnung: https://docs.rapids.ai/api/cudf/stable/user_guide/pandas-comparison/#floating-point-computation        <br>
cuDF Datentypen: https://docs.rapids.ai/api/cudf/stable/user_guide/data-types/ | https://docs.rapids.ai/api/cudf/stable/user_guide/io/ <br>

cuML GitHub: https://github.com/rapidsai/cuml  <br>
cuML Dok.: https://docs.rapids.ai/api/cuml/stable/ <br>

Dask-cuDF: https://docs.rapids.ai/api/dask-cudf/stable/

CuPy: https://cupy.dev



Was wir nutzen:<br>
cuDF: DataFrames deren API nahezu identisch mit Pandas ist. Also quasi Pandas auf GPU.<br>
dask_cudf: Dask bietet uns DataFrames, die verteilt sind. Mit dask_cudf läuft das ganze auf GPU. Also quasi Dask auf GPU. <br>
cupy: wie Numpy und Scipy aber auf GPU.

<img src="./pictures/df.PNG"  width="725px;" hight="725px;">

Wir teilen das Training in 4 Bereiche auf. Hier nutzen wir MNSG.

<img src="./pictures/rapids_flynn.PNG"  width="725px;" hight="725px;">

Das Bild zeigt in welchen Bereichen man welche DataFrames nutzen kann. Pandas liegt dabei nicht im Fokus.

<img src="./pictures/zuordnung.PNG"  width="1050px;" hight="1050px;">

Da wir mit Dask-cuDF und Dask große Daten lesen und partitionieren können, ist auch der Einsatz im SNSG Bereich möglich. cuDF unterstützt das lesen mehrere Dateien (bis jetzt)  nicht, so das man Dask oder Dask-cuDF nutzen muss, wenn man globstings nutzt, um alle CVS Dateien in einem Verzeichnis zu lesen.

Es gibt auch die Möglichkeit cuDF als Backend für Dask zu nutzen.

<br>

cuDF Backend für Dask: https://docs.rapids.ai/api/dask-cudf/stable/#dataframe-creation-from-on-disk-formats

<b>[Bild Konvertierungen]</b>

<u>Hinweis:</u><br>
Variablen die GPU Speicher nutzen und nicht mehr gebraucht werden, sollten mittels `del` gelöscht werden. Wenn nach dem Löschen immer noch Speicher belegt ist, muss der Kernel des Notebooks neugestartet werden. 

In [1]:
# Imports 
import cupy as   cp
import cudf
import cuml
from cuml.dask.common import utils as dask_utils
import dask_cudf
import dask

import numpy as np
from numpy import genfromtxt

import pandas as pd
import os
import time

Da dier API wie Pandas aufgebaut ist, wird cuDF im detail nicht erklärt. Bei interesse kann man sich die API durchlesen (https://docs.rapids.ai/api/cudf/stable/user_guide/pandas-comparison/).  

In [2]:
# cuDF und Panda series 
s_cuda = cudf.Series([1, 2, cudf.NA])
s_panda = pd.Series([1, 2, pd.NA])

print(f"Pandas:\n{s_cuda}\n")
print(f"Rapids:\n{s_panda}")

Pandas:
0       1
1       2
2    <NA>
dtype: int64

Rapids:
0       1
1       2
2    <NA>
dtype: object


Wir können DataFrames auf verschiedenste Arten erstellen. Unten befinden sich Beispiele wie man DataFrames erstellt. Mehr dazu finden man in der API Beschreibung [3, 6].

Es gibt noch viele weitere Gemeinsamkeiten. Für unsere Zwecke reicht das erstmal.

## 1.1 Start Dask-Cluster (Mit Docker)

Pro GPU wird genau ein Prozess für einen Worker gestartet. Die Worker-ID wird hochgezählt. <br>

<u>Hinweise:</u><br>
Die MNMG Versionen der Algorithmen können auch auf SNSG laufen.

<img src="./pictures/rapids_cluster2.PNG">

In [3]:
# Ein Modul, das uns unterstützt.
import rapids_tools as rapids_tools    

Rapids-tools 1.0
Umgebungsvariablen werden gesetzt
NCCL_SOCKET_NTHREADS: 4
NCCL_NSOCKS_PERTHREAD: 2


In [4]:
## Erstelle Client 
# rapids_tools wird mit dem Cluster geteilt, um Umgebungsvariablen zu setzen.
client = rapids_tools.create_dask_client()  # Gebe IP an. Standard:  127.0.0.1:8786 (Scheduler ist da wo auch Jupyter läuft)
client

Client IP: 127.0.0.1:8786


0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://149.201.182.203:8786,Workers: 3
Dashboard: http://149.201.182.203:8787/status,Total threads: 3
Started: Just now,Total memory: 187.58 GiB

0,1
Comm: tcp://149.201.182.188:45677,Total threads: 1
Dashboard: http://149.201.182.188:43543/status,Memory: 62.53 GiB
Nanny: tcp://149.201.182.188:43021,
Local directory: /tmp/dask-worker-space/worker-npf9vmld,Local directory: /tmp/dask-worker-space/worker-npf9vmld
GPU: Quadro RTX 5000,GPU memory: 16.00 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 10.0%,Last seen: Just now
Memory usage: 410.73 MiB,Spilled bytes: 0 B
Read bytes: 913.6321534718728 B,Write bytes: 1.46 kiB

0,1
Comm: tcp://149.201.182.203:40997,Total threads: 1
Dashboard: http://149.201.182.203:33187/status,Memory: 62.53 GiB
Nanny: tcp://149.201.182.203:43993,
Local directory: /tmp/dask-worker-space/worker-f00rmecn,Local directory: /tmp/dask-worker-space/worker-f00rmecn
GPU: Quadro RTX 5000,GPU memory: 16.00 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 6.0%,Last seen: Just now
Memory usage: 410.82 MiB,Spilled bytes: 0 B
Read bytes: 63.86 kiB,Write bytes: 106.57 kiB

0,1
Comm: tcp://149.201.182.205:33081,Total threads: 1
Dashboard: http://149.201.182.205:37131/status,Memory: 62.53 GiB
Nanny: tcp://149.201.182.205:38497,
Local directory: /tmp/dask-worker-space/worker-z65v4bd4,Local directory: /tmp/dask-worker-space/worker-z65v4bd4
GPU: Quadro RTX 5000,GPU memory: 16.00 GiB
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 6.0%,Last seen: Just now
Memory usage: 410.32 MiB,Spilled bytes: 0 B
Read bytes: 1.15 kiB,Write bytes: 1.59 kiB


Wenn wir eigene Python Module erstellen, können wir diese auch mit dem Cluster teilen.

Beim Ausführen sollte der Status "OK" ausgegeben werden. Wenn die Worker oder der Cluster neugestartet werden, müssen die Module ggf. neu geteilt werden.

In [5]:
# Lade Datei hoch. Gibt Status aus... 
client.upload_file('rapids_tools.py')  # Nur als Beispiel

{'tcp://149.201.182.188:45677': {'status': 'OK'},
 'tcp://149.201.182.203:40997': {'status': 'OK'},
 'tcp://149.201.182.205:33081': {'status': 'OK'}}

In [6]:
# Zeige Schlüssel. Die Ports ändern sich. 
client.has_what().keys()

dict_keys(['tcp://149.201.182.188:35905', 'tcp://149.201.182.203:38543', 'tcp://149.201.182.205:39657'])

In [7]:
# Starte alle Worker neu
client.restart_workers( client.has_what().keys() )
client.has_what().keys()

dict_keys(['tcp://149.201.182.188:33351', 'tcp://149.201.182.203:46815', 'tcp://149.201.182.205:37759'])

In [8]:
# Startet client neu.
#client.restart()

# 2. Daten

## 2.1 Daten generieren

Als Anfang erstellen wir Daten. Daten die auf dem Host sind, müssen erst verteilt werden.

cuML bietet uns Möglichkeiten Datensets zu erstellen.

In [7]:
from cuml.datasets  import make_blobs      # Klassifikation    
from cuml.datasets  import make_regression # Regression

from cuml import datasets  
from cuml.model_selection import train_test_split

Ein Weg an Daten zu kommen, ist Datensets zu nutzen, die schon mit den Frameworks kommen. 

Das können wir nutzen, um den Aufbau des Codes zu testen und zu überprüfen, ob die Installation von Rapids und die Dask-Worker funktionieren.

Es gibt einige Parameter, die wir eingeben können, um Daten so zu generieren wie wir es wollen. Um es einfach zu halten, nutzen wir nur wenige Parameter.

<br>

Generiere Single-GPU Daten: https://docs.rapids.ai/api/cuml/stable/api/#dataset-generation-single-gpu

In [34]:
# Trainings- und Testgröße

train_size = 2000
test_size  = 100

n_samples  = train_size + test_size

# Für Klassifizierung
X_c, y_c = datasets.make_blobs(n_samples=n_samples)
# Für Regression 
X_r, y_r = datasets.make_regression(n_samples=n_samples)

# Teile Daten auf
X_train_c, X_test_c, y_train_c, y_test_c = train_test_split( X_c, y_c, train_size=0.70, random_state=5)
X_train_r, X_test_r, y_train_r, y_test_r = train_test_split( X_r, y_r, train_size=0.70, random_state=5)

Jetzt können wir uns anschauen was gemacht wurde, so könnten unsere Trainingsdaten auch aussehen.

In [10]:
print(f"Daten Klassifizierung:\nX: {type(X_c)} y: {type(y_c)} \n X[0]: {X_c[0]}  \n y[0]: {y_c[0]} \n Länge X, y: {len(X_c)}, {len(y_c)}")

Daten Klassifizierung:
X: <class 'cupy.ndarray'> y: <class 'cupy.ndarray'> 
 X[0]: [-4.390422 -7.993816]  
 y[0]: 0.0 
 Länge X, y: 2100, 2100


In [11]:
print(f"Daten Regression:\nX: {type(X_r)} y: {type(y_r)} \n X[0]: {X_r[0]}  \n y[0]: {y_r[0]} \n Länge X, y: {len(X_r)}, {len(y_r)}")

Daten Regression:
X: <class 'cupy.ndarray'> y: <class 'cupy.ndarray'> 
 X[0]: [0.27291918 0.6470641 ]  
 y[0]: [34.67788] 
 Länge X, y: 2100, 2100


Die erstellten Daten sind in CuPy Format. Für das Trainieren müssen die Daten im Dask-cuDF Format sein. Alternativ können die Daten im Format Dask Array sein, mit CuPy als Backend (Siehe fit() https://docs.rapids.ai/api/cuml/stable/api/#id43).

## 2.2 Dask und Dask_cuDF

Schauen wir uns den Fall an, das die Daten <u>erst verteilt werden</u> müssen. 

In [12]:
# X_c ist auf dem Host und nicht Partitioniert.
X_c

array([[-4.390422 , -7.993816 ],
       [-1.7899303, -8.560305 ],
       [ 6.878088 ,  3.2281156],
       ...,
       [ 7.9013543,  3.418171 ],
       [-3.146109 , -8.004911 ],
       [-4.038967 , -7.6395087]], dtype=float32)

Bei Verteilen oder Laden der Daten mit HDFS, können wir eine Chunkgröße angeben oder die Anzahl der Partitionen. Die Worker übernehmen eine Anzahl der Partitionen für das Trainieren.

X ist ein cupy. Wir brauchen ein dask_cuDF oder ein Dask Array mit cupy als Backend. 

Erst wandeln wir das Array X in ein DataFrame um. Dann werden wir X partitionieren.

In [35]:
## cupy -> cuDF
X_cudf_c = cudf.DataFrame(X_train_c).astype(np.float32)
y_cudf_c = cudf.Series(y_train_c).astype('int32')
print(f"X Type: {type(X_cudf_c)} \nX ist:\n {X_cudf_c}\ny Type: {type(y_cudf_c)}\ny ist:\n{y_cudf_c}")

X Type: <class 'cudf.core.dataframe.DataFrame'> 
X ist:
               0         1
0      2.842662  3.105909
1      3.469578  2.136864
2      9.238327 -3.773556
3      9.248910 -1.065306
4      7.330943 -4.446175
...         ...       ...
1465  10.862763 -0.531786
1466   8.759447 -4.912135
1467   2.829170  3.579678
1468   1.950749  3.262400
1469   9.195857 -1.191773

[1470 rows x 2 columns]
y Type: <class 'cudf.core.series.Series'>
y ist:
0       2
1       2
2       0
3       1
4       0
       ..
1465    1
1466    0
1467    2
1468    2
1469    1
Length: 1470, dtype: int32


Danach werden wir das DataFrame Partitionieren.

In [36]:
# Dasselbe bei für _r, ohne astype(np.float32)
X_cudf_r = cudf.DataFrame(X_train_r)
y_cudf_r = cudf.Series(y_train_r)

In [37]:
workers = client.has_what().keys()
n_workers = len(workers)
print(f"nworker: {n_workers}")

nworker: 3


In [38]:
# cuDF -> dask_cuDF
X_dask_c = dask_cudf.from_cudf( X_cudf_c, npartitions=n_workers )  
y_dask_c = dask_cudf.from_cudf( y_cudf_c, npartitions=n_workers )

X_dask_c, y_dask_c = dask_utils.persist_across_workers(client, [X_dask_c, y_dask_c], workers=client.has_what().keys())  # Halte Ergebnis im Speicher 

In [39]:
# Für _r auch
X_dask_r = dask_cudf.from_cudf( X_cudf_r, npartitions=n_workers ) 
y_dask_r = dask_cudf.from_cudf( y_cudf_r, npartitions=n_workers )

X_dask_r, y_dask_r = dask_utils.persist_across_workers(client, [X_dask_r, y_dask_r], workers=client.has_what().keys())  # Halte Ergebnis im Speicher 

In [15]:
print(f"dask_X_c: {type(X_dask_c)}\n{X_dask_c}")

dask_X_c: <class 'dask_cudf.core.DataFrame'>
<dask_cudf.DataFrame | 3 tasks | 3 npartitions>


In [16]:
X_dask_c

Unnamed: 0_level_0,0,1
npartitions=3,Unnamed: 1_level_1,Unnamed: 2_level_1
0,float32,float32
560,...,...
1120,...,...
1679,...,...


In [20]:
## Damit sehen wir was der Cluster hat.
# Jeder hat ein Teil des DataFrames. Jeder Worker nutzt die eigenen Partitionen. 
client.has_what()

Worker,Key count,Key list
tcp://149.201.182.188:33351,4,"Expand  ('from_cudf-427dd0072f4a48c130c282242402dc99', 1)  ('from_cudf-a33c3ddacdcb428237dc320102bb7c71', 1)  ('from_cudf-10f10ff11cf8696158b024e810bfbef0', 1)  ('from_cudf-ec2bd4900dbe7e4b6dafc3178d92b077', 1)"
tcp://149.201.182.203:46815,4,"Expand  ('from_cudf-427dd0072f4a48c130c282242402dc99', 0)  ('from_cudf-a33c3ddacdcb428237dc320102bb7c71', 0)  ('from_cudf-10f10ff11cf8696158b024e810bfbef0', 0)  ('from_cudf-ec2bd4900dbe7e4b6dafc3178d92b077', 0)"
tcp://149.201.182.205:37759,4,"Expand  ('from_cudf-427dd0072f4a48c130c282242402dc99', 2)  ('from_cudf-a33c3ddacdcb428237dc320102bb7c71', 2)  ('from_cudf-10f10ff11cf8696158b024e810bfbef0', 2)  ('from_cudf-ec2bd4900dbe7e4b6dafc3178d92b077', 2)"

0
"('from_cudf-427dd0072f4a48c130c282242402dc99', 1)"
"('from_cudf-a33c3ddacdcb428237dc320102bb7c71', 1)"
"('from_cudf-10f10ff11cf8696158b024e810bfbef0', 1)"
"('from_cudf-ec2bd4900dbe7e4b6dafc3178d92b077', 1)"

0
"('from_cudf-427dd0072f4a48c130c282242402dc99', 0)"
"('from_cudf-a33c3ddacdcb428237dc320102bb7c71', 0)"
"('from_cudf-10f10ff11cf8696158b024e810bfbef0', 0)"
"('from_cudf-ec2bd4900dbe7e4b6dafc3178d92b077', 0)"

0
"('from_cudf-427dd0072f4a48c130c282242402dc99', 2)"
"('from_cudf-a33c3ddacdcb428237dc320102bb7c71', 2)"
"('from_cudf-10f10ff11cf8696158b024e810bfbef0', 2)"
"('from_cudf-ec2bd4900dbe7e4b6dafc3178d92b077', 2)"


So siet der Flow aus, wenn wir Daten als Host erzeugen und dann verteilen.

<img src="./pictures/part.PNG"  width="925px;" hight="925px;">

Zusammeengefasst: <br>
Wenn wir Daten als Host erstellen (z.B. um den Aufbau zu testen), müssen die Daten Partitioniert werden. Das verteilen geht gut mit <u>geringen Datenmengen</u>.

Nach dem Verteilen hat jeder Worker ein Teil dieser Daten. Diese Teile sind Partitionen.

<img src="./pictures/part2.PNG"  width="925px;" hight="925px;">

## 2.3 Daten sind schon Lokal verfügbar

Mit Dask-cuDF können wir Daten direkt in die GPU laden und Daten schreiben. Es gibt viele Parameter, die angegeben werden können (https://docs.rapids.ai/api/dask-cudf/stable/api/#creating-and-storing-dataframes). 

Wenn die Daten nicht in den Speicher passen, können diese partitioniert werden, indem wir die Daten mit Dask-cuDF laden.

<img src="./pictures/rapids_chunk.PNG">

Wir können Daten mit Dask laden und daraus ein Dask-cuDF erstellen.

Intern nutzt jeder Worker cuDF um die Daten zu lesen. Es können verschiedene Quellen angegeben werden wie HDFs, S3, http, ... (https://docs.dask.org/en/stable/how-to/connect-to-remote-data.html)

Wir können auch direkt Dask-cuDF nutzen um Daten zu laden. Das geht genau so wie bei Dask. Das schreiben der Daten mit Dask-cuDF folg auch der gleichen API.

Jeder Worker sollte die gleichen Pfade sehen können.

Das erstellte Dask-cuDF kann dann zum trainieren genutzt werden. 

<br>

Rapids Dask-cuDF Creating and storing DataFrames:  https://docs.rapids.ai/api/dask-cudf/stable/api/ <br>
Reading Larger than Memory CSVs with RAPIDS and Dask: https://medium.com/rapids-ai/reading-larger-than-memory-csvs-with-rapids-and-dask-e6e27dfa6c0f


<u>Hinweis zu HDFS:</u><br>
Das lesen von vielen Dateien oder einer großen Datei bringt keinen Vorteile der Datenlokalität mehr, siehe (https://stackoverflow.com/questions/54565738/does-dask-communicate-with-hdfs-to-optimize-for-data-locality)<br>
Das kann sich noch ändern. 
 

# 3. Multi-Node/Multi-GPU Training: Dask und Dask_cuDF

Wie bereits erwähnt, ist das Interface fast genau so wie bei sklearn. Wenn wir jetzt Daten haben und diese noch verarbeiten wollen, muss darauf geachtet werden, dass die Datentypen am Ende richtig sind. 

Für das Model gibt es einige Parameter, die wir einstellen können, dieser Auszug zeigt einige Parameter:  <br>


KNN Klassifikation:<br>
`n_neighbors=5: Default number of neighbors to query `<br>
`metric=euclidean: Distance metric to use.` <br>
Für Regression: <br>
`n_neighbors=5: Default number of neighbors to query `<br>
`algorithm=auto: The query algorithm to use. Valid options are: (rbc | brute | ivfflat | ivfpq)` <br>
`metric=euclidean: Distance metric to use.` <br>


<br>

Sklearn KNNt: <br>
https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsClassifier.html <br>
https://scikit-learn.org/stable/modules/generated/sklearn.neighbors.KNeighborsRegressor.html<br>
cuML MNMG KNN: https://docs.rapids.ai/api/cuml/stable/api/#id44

In [17]:
# Imortiere Multi-Node RM
from cuml.dask.neighbors import KNeighborsClassifier as multi_KNeighborsClassifier
from cuml.dask.neighbors import KNeighborsRegressor  as multi_KNeighborsRegressor

Jetzt kann man wie üblich vorgehen. Das Model Trainieren, Speichern, Laden und Evaluieren.

Wir können das Dashboard von Dask öffnen und sehen unter GPU die Auslastung (wenn es nicht zu schnell fertig ist).

In [40]:
# Erstelle Model | RM Klassifizierung 
multi_knn_c = multi_KNeighborsClassifier() 
                     
# Training  
%time multi_knn_c.fit(X_dask_c, y_dask_c)

CPU times: user 18.3 ms, sys: 4.58 ms, total: 22.8 ms
Wall time: 51.5 ms


<cuml.dask.neighbors.kneighbors_classifier.KNeighborsClassifier at 0x7f20e2465c00>

In [41]:
# Erstelle Model | RM Regressor 
multi_knn_r = multi_KNeighborsRegressor()
                     
# Training  
# - Input Type
%time multi_knn_r.fit(X_dask_r, y_dask_r)

CPU times: user 5.64 ms, sys: 363 µs, total: 6.01 ms
Wall time: 15.6 ms


<cuml.dask.neighbors.kneighbors_regressor.KNeighborsRegressor at 0x7f20e2467100>

Das Model ist momentan noch verteilt, wir können trotzdem Predictions machen. Dafür müssen wir die Daten wieder verteilen, wenn diese nicht schon lokal vorliegen.

In [42]:
# Regression 
to_predict_r = X_test_r
print(f"Länge: {len(to_predict_r)} \nTyp: {type(to_predict_r)}\n {to_predict_r}")

Länge: 630 
Typ: <class 'cupy.ndarray'>
 [[ 0.2746206   0.31240636]
 [ 0.61967766 -0.54332435]
 [ 0.7652495  -1.0946572 ]
 ...
 [ 0.16508104 -0.24708588]
 [-0.62052864  2.9190931 ]
 [-0.20805861  0.23506299]]


Das cupy Array müssen wir in ein cuDF, dann in ein dask_cuDF umwandeln.

In [43]:
# Regression 
to_predict__dask_r = dask_cudf.from_cudf(cudf.DataFrame( to_predict_r ), npartitions=n_workers)

In [44]:
# Regression 
pred_futures = multi_knn_r.predict(to_predict__dask_r)
print(pred_futures)

<dask_cudf.Series | 6 tasks | 3 npartitions>


Die futures wurden erstellt. Um die Berechnung zu starten schreiben wir `.compute()`

In [45]:
%time pred_futures__MultiNodeOrig_results_r = pred_futures.compute()

CPU times: user 10.3 ms, sys: 0 ns, total: 10.3 ms
Wall time: 24.9 ms


In [46]:
# Index of table | result
pred_futures__MultiNodeOrig_results_r.head(10)

0     33.609367
1     18.313051
2      0.440034
3     43.838165
4    103.260300
5      4.718654
6    129.210129
7     -9.480383
8    -73.922844
9     56.577709
dtype: float32

Dasselbe ist auch mit dem Klassifizierer möglich.

In [47]:
# Klassifizierung
to_predict_c = X_test_c
print(f"Länge: {len(to_predict_c)} \nTyp: {type(to_predict_c)}\n {to_predict_c} \n")
# Klassifizierung
to_predict__dask_c = dask_cudf.from_cudf(cudf.DataFrame( to_predict_c ), npartitions=n_workers)
pred_futures_c = multi_knn_c.predict(to_predict__dask_c)
# Klassifizierung
%time pred_futures__MultiNodeOrig_results_c = pred_futures_c.compute()
pred_futures__MultiNodeOrig_results_c.head(10)

Länge: 630 
Typ: <class 'cupy.ndarray'>
 [[ 7.415387  -3.141217 ]
 [ 8.788737  -0.3654628]
 [ 8.828011  -6.480409 ]
 ...
 [10.903368  -1.6943322]
 [ 3.9670722  2.3996148]
 [ 7.96042   -5.3838677]] 

CPU times: user 37.1 ms, sys: 4 ms, total: 41.1 ms
Wall time: 65 ms


0    0
1    1
2    0
3    2
4    0
5    2
6    2
7    0
8    0
9    0
dtype: int32

jetzt können wir Metriken anwenden.

In [48]:
# Auch mit cuML möglich
#   cuml.metrics.accuracy_score()    # GPU
#   cuml.metrics.r2_score()          # GPU

from sklearn.metrics import accuracy_score   # CPU
from sklearn.metrics import r2_score         # CPU

In [49]:
# Eval KNN Klassifizierer mit sklearn
score__c      = accuracy_score( y_test_c.get(), pred_futures__MultiNodeOrig_results_c.to_pandas() )

# Eval KNN Regressor mit sklearn
score__r = r2_score( y_test_r.get(), pred_futures__MultiNodeOrig_results_r.to_pandas() )

print(f"score__c: {score__c} \nscore__r: {score__r}")

score__c: 0.9873015873015873 
score__r: 0.9952111731724724


# 3. Speichern

Es gibt kein lokales Model, was zusammengefasst und gespeichert werden kann (kann sich ändern).

Siehe https://github.com/rapidsai/cuml/issues/4591

In [29]:
multi_knn_r.__doc__

"\n    Multi-node Multi-GPU K-Nearest Neighbors Regressor Model.\n\n    K-Nearest Neighbors Regressor is an instance-based learning technique,\n    that keeps training samples around for prediction, rather than trying\n    to learn a generalizable set of model parameters.\n\n    Parameters\n    ----------\n    n_neighbors : int (default=5)\n        Default number of neighbors to query\n    batch_size: int (optional, default 2000000)\n        Maximum number of query rows processed at once. This parameter can\n        greatly affect the throughput of the algorithm. The optimal setting\n        of this value will vary for different layouts and index to query\n        ratios, but it will require `batch_size * n_features * 4` bytes of\n        additional memory on each worker hosting index partitions.\n    handle : cuml.Handle\n        Specifies the cuml.handle that holds internal CUDA state for\n        computations in this model. Most importantly, this specifies the CUDA\n        stream t

In [50]:
# Kein internes Model
multi_knn_r.__dict__

{'client': <Client: 'tcp://149.201.182.203:8786' processes=3 threads=3, memory=187.58 GiB>,
 'verbose': False,
 'kwargs': {'verbose': False},
 'internal_model': None,
 'streams_per_handle': 0,
 'data_handler': <cuml.dask.common.input_utils.DistributedDataHandler at 0x7f20e2483490>,
 'n_outputs': 1,
 'datatype': 'cudf'}

## 3.1 Multi-Node/-Multi-GPU Predictions

Derzeit gibt es keine Möglichkeit das MNMG Model zu speichern. Mit dem Cluster können wir das SNSG Model laden und die Predictions verteilen.

Lade das SNSG Model. Um die Predictions zu verteilen kann man.:<br>
1) Jedem Worker das Model geben und die Daten hineinladen.



### 3.1.1 Details

Das gespeicherte Model befindet sich auf dem Host oder wurde schon verteilt. In dem Fall, dass sich das Model auf dem Host befindet, muss es an jeweilige Worker verteilt werden.

Für ein Beispiel trainieren wir ein SNSG KNN und speichern/laden es.

In [51]:
from cuml.neighbors import KNeighborsClassifier as single_KNNClassifier  # Single-Node / Single GPU 
from cuml.neighbors import KNeighborsRegressor  as single_KNNRegressor   # Single-Node / Single GPU

In [75]:
single_knn_c = single_KNNClassifier(algorithm='brute') # brute um es nach Sklearn zu überführen
                     
%time single_knn_c.fit(X_train_c, y_train_c)

CPU times: user 0 ns, sys: 17.3 ms, total: 17.3 ms
Wall time: 16.4 ms


KNeighborsClassifier()

In [76]:
import pickle
pickle.dump(single_knn_c, open("template_data_KNN/cuml_SingleNode_KNN_c_model.pkl", "wb"))

loaded_model_c = pickle.load(open("template_data_KNN/cuml_SingleNode_KNN_c_model.pkl", "rb"))

Mit Dask können wir das Model an alle Worker senden.

In [77]:
futures = client.scatter(loaded_model_c, broadcast=True)

In [78]:
# Future sollte den status finished angeben
futures

Jetzt können wir eine Funktion schreiben die uns die Predictions liefert. Wir können Lokale Daten nehmen oder erst wieder Daten für die Prediction verteilen.

In [79]:
to_predict__dask = dask_cudf.from_cudf(cudf.DataFrame( X_test_c ), npartitions=n_workers)
to_predict__dask

Unnamed: 0_level_0,0,1
npartitions=3,Unnamed: 1_level_1,Unnamed: 2_level_1
0,float32,float32
210,...,...
420,...,...
629,...,...


<img src="./pictures/pred1.PNG"  width="925px;" hight="925px;">

<img src="./pictures/pred2.PNG"  width="325px;" hight="225px;" >

Mit Dask Delayed sagen wir, dass diese Funktion parallel ausgeführt werden soll. In der Funktion können wir verteilt Daten lesen und bearbeiten, bevor wir die Predictions machen. 

<br>

Dask Delayed: https://docs.dask.org/en/stable/delayed.html

In [80]:
# Länge der Testdaten
len(X_test_c)

630

Wir haben die Daten als Dask-cuDF gelesen. Wenn wir das als Host machen, müssen wir in einer Schleife die Partitionen übergeben.

In [81]:
@dask.delayed
def dask_predict(test_data, part):  # Jeder Worker führt das aus. 
    
    # Das wird in der Console ausgegeben. Damit sehen wir das alles passt, und jeder worker seine Partitionen nimmt. 
    print(f"type: {type(test_data)} länge: {len(test_data)}\nHead:\n{test_data.head(2)}\nPartition: {part}")

    predictions = loaded_model_c.predict(test_data) 

    return predictions

Wir müssen die Anzahl der Partitionen angeben. Die Worker werden die eigenen Partitionen bearbeiten.

In [82]:
results = []
parts = 3   # Anzahl Partitionen

for part in range(parts):
    fut = dask_predict( to_predict__dask.get_partition(part), part )
    results.append( fut )
    
delayed_results = dask.compute(*results)

print(f"Type: {type(delayed_results)}\n")

Type: <class 'tuple'>



In [83]:
# Mache daraus ein Numpy Array.
deleayed_pred_c = []
for i in range( len(delayed_results) ):
    deleayed_pred_c = np.append(deleayed_pred_c, np.asarray( delayed_results[i].to_numpy()) )
type(deleayed_pred_c)

numpy.ndarray

Eine andere Möglichkeit ist, dass die Worker die Daten lokal lesen und ggf. auch für sich diese Partitionieren. 

Es können wieder veschiedene Quellen angegeben werden. Für ein einfaches kleines Beispiel, schreibt jeder Worker eine CSV Datei die später gelesen wird. 



In [84]:
## Diese Datei wird im lokalen Verzeichnis geschrieben.
np.savetxt('csv_file.csv', cp.asnumpy( X_test_c ), delimiter=",", header="A,B") # Sonst werden die Daten als header genommen 

In [85]:
## Damit schreibt jeder Worker diese Datei.
@dask.delayed    # Jeder Worker führt das aus. 
def dask_write(test_data):
    print("Write file")
    np.savetxt('csv_file.csv', np.asarray(test_data, dtype=np.float32), delimiter=",", header="A,B")
    
futs = []
for i in range(3):
    futs.append(dask_write( cp.asnumpy( X_test_c ) ))
dask.compute(*futs)

(None, None, None)

jetzt können wir diese CSV Datei laden und für das Testen nutzen.
Am Ende bekommen wir wieder ein Tupel. Diesemal werden alle Daten genutzt.

In [86]:
@dask.delayed
def dask_predict_2():

    #test_data = cudf.read_csv("csv_file.csv")       # Lese lokale Daten
    #test_data = test_data.astype(dtype='float32')   # Muss float32 sein, oder ändere IO
    
    # oder- nutze Dask-cuDF um lokale Daten zu partitionieren. 
    test_data = dask_cudf.read_csv("csv_file.csv",  blocksize="5KB")       # Lese lokale Daten, gebe Blocksize an
    test_data = test_data.astype(dtype='float32')                          # Muss float32 sein, oder ändere IO
    
    print(f"Data loaded, len: {len(test_data)}")
    
    predictions = loaded_model_c.predict(test_data)
    return predictions

In [89]:
futs = []
for i in range(3):
    futs.append(dask_predict_2())

delayed_results = dask.compute(*futs)
delayed_results

(array([0., 1., 0., 2., 0., 2., 2., 0., 0., 0., 2., 2., 2., 2., 0., 1., 1.,
        0., 1., 2., 1., 2., 0., 1., 2., 0., 0., 0., 0., 1., 2., 1., 0., 1.,
        2., 0., 2., 0., 0., 2., 1., 1., 0., 1., 2., 1., 2., 1., 0., 2., 1.,
        2., 1., 1., 2., 0., 1., 0., 2., 0., 1., 0., 0., 1., 2., 0., 0., 1.,
        0., 2., 0., 0., 1., 0., 1., 1., 1., 2., 2., 1., 1., 2., 0., 1., 1.,
        1., 2., 1., 2., 2., 1., 0., 2., 0., 2., 1., 1., 0., 2., 0., 0., 0.,
        2., 0., 1., 2., 0., 2., 0., 1., 0., 0., 2., 1., 1., 1., 0., 0., 2.,
        0., 2., 0., 0., 0., 0., 0., 0., 2., 1., 1., 0., 1., 1., 2., 0., 2.,
        0., 1., 1., 0., 2., 1., 1., 1., 2., 1., 0., 2., 1., 2., 1., 1., 1.,
        0., 0., 1., 2., 0., 0., 2., 2., 1., 1., 2., 0., 2., 0., 2., 2., 1.,
        0., 0., 2., 1., 2., 2., 2., 0., 1., 2., 0., 1., 0., 2., 2., 1., 2.,
        2., 0., 1., 1., 0., 1., 2., 2., 1., 1., 1., 2., 2., 1., 0., 2., 1.,
        2., 2., 1., 2., 1., 0., 0., 2., 0., 0., 2., 1., 2., 2., 2., 0., 2.,
        0., 

Man sieht das alle Worker alle Predictions gemacht haben.

In [92]:
# Mache daraus ein Numpy Array.
deleayed_pred_c2 = []
for i in range( 1 ):
    deleayed_pred_c2 = np.append(deleayed_pred_c2, np.asarray( delayed_results[i].get()) )
type(deleayed_pred_c2)

numpy.ndarray

In [93]:
rapids_tools.arrays_equal(deleayed_pred_c,  deleayed_pred_c2 )

Gebe 10 Zeilen aus, die ungleich sind


In [95]:
# Eval KNN Klassifizierer mit sklearn
score__c      = accuracy_score( y_test_c.get(), deleayed_pred_c  )

score__c2      = accuracy_score( y_test_c.get(), deleayed_pred_c2 )

print(f"score__c: {score__c} \nscore__r: {score__c2}")

score__c: 0.9873015873015873 
score__r: 0.9873015873015873


In [96]:
client.shutdown()