## Scaling out k-means to multiple GPUs when the data is large

In [1]:
import pandas as pd
import cudf

# Data generation.
from sklearn.datasets import make_blobs

# Local compute.
from sklearn.cluster import KMeans as cpuKMeans
from cuml.cluster import KMeans as gpuKMeans

# Distributed compute.
import dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
from dask_ml.cluster import KMeans as daskKMeans
from cuml.dask.cluster.kmeans import KMeans as mnmgKMeans

# Comparing results.
import cupy
import matplotlib.pyplot as plt
from sklearn.metrics import adjusted_rand_score
from tqdm.notebook import tqdm

import matplotlib.pyplot as plt
# Render figures inline in this notebook. Not needed when using interactive graphics.
%matplotlib inline

import gcsfs
fs = gcsfs.GCSFileSystem(cache_timeout = 0)

## Generate data

In [2]:
# Generate a large dataset.

def generate_data(
    file_name,
    n_samples = 1000000,
    n_features = 2,
    n_clusters = 6,
    random_state = 1,
    output_dir = "gs://shakdemo-hyperplane/data/synthetic_data",
):
    """Generate one synthetic clustering dataset and write it out as CSV.

    Points and their ground-truth cluster ids come from ``make_blobs``; the
    result is written to ``{output_dir}/{file_name}.csv`` without the index.

    Parameters
    ----------
    file_name : object
        Stem of the output file; it is interpolated with ``str.format``
        semantics, so integers such as ``0`` work.
    n_samples : int
        Number of rows to generate.
    n_features : int
        Number of feature columns. With two features the columns are named
        ``x`` and ``y``; otherwise they are named ``x0``, ``x1``, ...
    n_clusters : int
        Number of blob centers passed to ``make_blobs``.
    random_state : int
        Seed passed to ``make_blobs``. With the default, every call produces
        the same points regardless of ``file_name``; pass a distinct seed per
        file to get distinct data.
    output_dir : str
        Directory (or bucket prefix) the CSV is written under.

    Returns
    -------
    pandas.DataFrame
        The feature columns plus a ``label`` column.
    """
    input_data, input_labels = make_blobs(
        n_samples=n_samples,
        n_features=n_features,
        centers=n_clusters,
        random_state=random_state,
    )

    # Keep the historical 'x'/'y' names for the 2-D case; any other width gets
    # generated names instead of failing on a column-count mismatch.
    if n_features == 2:
        columns = ['x', 'y']
    else:
        columns = [f"x{i}" for i in range(n_features)]

    # Save the data for CPU compute.
    data_cpu = pd.DataFrame(input_data, columns=columns)
    data_cpu['label'] = input_labels
    data_cpu.to_csv(f"{output_dir}/{file_name}.csv", index=False)

    return data_cpu
    

In [3]:
# Single trial run: writes "0.csv" and displays the returned DataFrame.
generate_data(0)

Unnamed: 0,x,y,label
0,-1.089090,3.117198,0
1,-3.409946,3.208246,0
2,-1.344134,4.759056,5
3,-0.676370,3.235765,0
4,-1.334046,3.632215,5
...,...,...,...
999995,-0.955715,3.086061,0
999996,-5.532552,-1.514448,3
999997,-2.204251,0.435175,4
999998,-1.886550,1.660001,5


In [4]:
# Write 50 files named 0.csv ... 49.csv.
# generate_data is called with its defaults each time, and its make_blobs seed
# is fixed, so every file contains the same points; the loop scales the data
# volume, not its variety.
for i in tqdm(range(50)):
    generate_data(i)

In [46]:
# Report the local GPU model, driver version and current memory usage.
!nvidia-smi 

Wed Feb 16 17:03:47 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.119.04   Driver Version: 450.119.04   CUDA Version: 11.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  A100-SXM4-40GB      Off  | 00000000:00:04.0 Off |                    0 |
| N/A   31C    P0    49W / 400W |   3108MiB / 40537MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [47]:
!echo "Threads/core: $(nproc --all)"

Threads/core: 12


In [48]:
# Report host RAM and swap in human-readable units.
!free -h

              total        used        free      shared  buff/cache   available
Mem:           83Gi       4.1Gi        70Gi        11Mi       8.4Gi        78Gi
Swap:            0B          0B          0B


## Get file list

In [4]:
## initiate GCP storage access
import gcsfs
fs = gcsfs.GCSFileSystem(cache_timeout = 0)
file_path = "gs://shakdemo-hyperplane/data/synthetic_data"
# Keep only the CSV objects: the raw listing may also contain other objects
# under the prefix, which would break the pd.read_csv loop further down.
# Sorting makes the read order (and therefore the row order) stable across runs.
file_list = sorted(path for path in fs.ls(file_path) if path.endswith(".csv"))
print(f"number of files {len(file_list)}")

number of files 50


## CPU local

In [5]:
# Read every listed CSV into pandas and stack them into one frame with a
# fresh 0..n-1 index.
data_cpu = pd.concat(
    [pd.read_csv(f"gs://{file}") for file in tqdm(file_list)],
    ignore_index = True,
)
print(data_cpu.shape)

# In-memory footprint, converted from bytes by three divisions by 1024.
data_cpu_size = data_cpu.memory_usage(deep = True).sum()/1024/1024/1024
print(f'data size {data_cpu_size :.2f} G')

  0%|          | 0/50 [00:00<?, ?it/s]

(50000000, 3)
data size 1.12 G


In [6]:
# Plot the raw data and labels.
samples_frac = 0.01
df_sample = data_cpu.sample(frac = samples_frac)
# fig = plt.figure(figsize=(16,10))
# plt.scatter(df_sample['x'], df_sample['y'], c=df_sample['label'], s=20, cmap='viridis')
df_sample.shape

(500000, 3)

In [7]:
%%time
# Instantiate, train and predict.
kmeans_cpu = cpuKMeans(init="k-means++",
                       n_clusters=6,
                       random_state=0)
kmeans_cpu.fit(df_sample[['x','y']])
labels_cpu = kmeans_cpu.predict(df_sample[['x','y']])

CPU times: user 11.7 s, sys: 7.78 s, total: 19.5 s
Wall time: 13 s


## GPU local

In [8]:
%%time
# Build a cuDF DataFrame from the full pandas frame (timed separately from the
# k-means fit in the next cell).
data_gpu = cudf.DataFrame(data_cpu)

CPU times: user 2.44 s, sys: 512 ms, total: 2.95 s
Wall time: 4.47 s


In [9]:
%%time
kmeans_gpu = gpuKMeans(init="k-means||",
                       n_clusters=6,
                       random_state=0)
kmeans_gpu.fit(data_gpu[['x','y']])
labels_gpu = kmeans_gpu.predict(data_gpu[['x','y']])

CPU times: user 8.32 s, sys: 1.39 s, total: 9.72 s
Wall time: 9.92 s


## Distributed Dask with GPU

In [10]:
## spin up a remote dask cluster
# NOTE(review): initialize_cluster is a Hyperplane helper whose implementation
# is not shown in this notebook; the keyword meanings are inferred from their
# names (two workers, one GPU and 12 threads each) -- confirm against its docs.
from hyperplane import notebook_common as nc

# `client` is used below to inspect workers and persist data; `cluster` is
# closed in the final cell.
client, cluster = nc.initialize_cluster(
    num_workers = 2,
    ngpus = 1,
    nprocs=1,
    nthreads=12,
    ram_gb_per_proc=80,
    cores_per_worker=12,
    scheduler_ram = "4Gi",
    scheduler_cpu = "1500m",
    node_selector = '',
    
)

👉 Hyperplane: selecting worker node pool
👉 Hyperplane: selecting scheduler node pool
Creating scheduler pod on cluster. This may take some time.
👉 Hyperplane: spinning up a dask cluster with a scheduler as a standalone container.
👉 Hyperplane: In a few minutes you'll be able to access the dashboard at https://shakdemo.hyperplane.dev/dask-cluster-21fe2d3c-c4a0-4b40-9736-98fea4c1492e/status
👉 Hyperplane: to get logs from all workers, do `cluster.get_logs()`


In [32]:
# Show each connected worker's address and thread count to confirm the cluster is up.
client.nthreads()

{'tcp://10.0.111.3:38443': 12, 'tcp://10.0.112.3:34785': 12}

In [12]:
%%time
import dask_cudf
# NOTE: this rebinds file_path, which earlier held the bucket directory used to
# build file_list, to a glob matching every CSV under that directory.
file_path = "gs://shakdemo-hyperplane/data/synthetic_data/*.csv"
ddf = dask_cudf.read_csv(file_path)
# Peek at the first two rows to check the columns.
ddf.head(2)

CPU times: user 464 ms, sys: 107 ms, total: 571 ms
Wall time: 9.92 s


Unnamed: 0,x,y,label
0,-1.08909,3.117198,0
1,-3.409946,3.208246,0


In [49]:
# Number of partitions the distributed frame is split into.
ddf.npartitions

50

In [14]:
# persist() returns immediately while the workers load the data in the
# background. Block until loading has finished so that later timed cells
# measure the k-means work rather than the CSV reads.
ddf = client.persist(ddf)
wait(ddf);

In [None]:
# Total row count: take the length of each partition, gather the per-partition
# lengths, and add them up.
ddf.map_partitions(len).compute().sum()

50000000

In [50]:
from cuml.dask.cluster.kmeans import KMeans as mnmgKMeans

In [52]:
%%time

# Setup the Dask task graph.

# Instantiate, train and predict.
kmeans_mnmg = mnmgKMeans(init="k-means||",
                         n_clusters=6,
                         random_state=0)

kmeans_mnmg.fit(ddf[['x','y']])
kmeans_mnmg_df = kmeans_mnmg.predict(ddf[['x','y']])

# Execute the Dask task graph.
labels_mnmg = kmeans_mnmg_df.compute()

CPU times: user 331 ms, sys: 88.9 ms, total: 419 ms
Wall time: 3.96 s


In [18]:
# Display the output.
print('MNMG k-means labels:')
print(labels_mnmg.value_counts())

MNMG k-means labels:
1    15767950
3     9194100
2     8314450
0     7281950
5     5425000
4     4016550
dtype: int32


In [24]:
# Disconnect the client first so it is not left pointing at a scheduler that
# no longer exists, then tear down the remote cluster.
# NOTE(review): assumes `client` is a dask.distributed Client, as its use of
# nthreads()/persist() above suggests.
client.close()
cluster.close()