In [1]:
import os
import glob
import numpy as np
import struct
import pandas as pd
import threading
import subprocess
import time
from tqdm import tqdm
from functools import wraps

hw_type = 'gpu'
DATASET_NAME = 'miracl-fp32-1024d-8M'
n_per_cluster = 1000 # Number of vectors per cluster
max_iter = 20 # Maximum number of iterations

In [2]:
def add_prefix(d, prefix):
    """
    Add prefix to dictionary keys.
    """
    return {f"{prefix}{k}": v for k, v in d.items()}
    
def monitor_resources(stop_event, log, hw_type, interval=0.25):
    """
    Monitor CPU and RAM usage until stop_event is set.

    interval: float
      Polling time in seconds.
    """
    import psutil
    import time
    
    if hw_type == 'gpu':
        import pynvml
        
        pynvml.nvmlInit()
        handle = pynvml.nvmlDeviceGetHandleByIndex(0) # Get device handle for GPU_0

    while not stop_event.is_set():
        cpu_util = psutil.cpu_percent(interval=interval)  # Measures over 1 second
        ram_util = psutil.virtual_memory().used/(1024**3)

        sys_util = {'timestamp': time.perf_counter(), 'cpu_util': cpu_util, 'ram_gb': ram_util}

        if hw_type == 'gpu':
            gpu_utilization = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
            gpu_mem_used_gb = pynvml.nvmlDeviceGetMemoryInfo(handle).used/(1024**3)
            gpu_util = {'gpu_util': gpu_utilization, 'vram_gb': gpu_mem_used_gb}

            # Append GPU telemetry to system telemetry
            sys_util = sys_util | gpu_util

        # print(sys_util)
        
        # Store system logs
        log.append(sys_util)
    
    if hw_type == 'gpu':
        pynvml.nvmlShutdown()
    
    return(log)

def summarize_telemetry(resource_log, hw_type):
    """
    Summarize telemetry data.

    resource_log: list of dict
      Resource monitoring logs stored as a list of dictionary.
    stage_prefix: str
      Prefix to add to output dictionary. Choose something like 'idx_build', 'vec_search'.
    hw_type: str
      Hardware used. Select 'cpu' or 'gpu'.
    """

    results_df = pd.DataFrame(resource_log)
    sys_results = {
                   'avg_cpu_util': results_df['cpu_util'].mean(), 
                   'max_cpu_util': results_df['cpu_util'].max(),
                   'max_ram_gb': results_df['ram_gb'].max()
                  }

    if hw_type == 'gpu':
        gpu_results = {'avg_gpu_util': results_df['gpu_util'].mean(), 
                   'max_gpu_util': results_df['gpu_util'].max(),
                   'max_vram_gb': results_df['vram_gb'].max()
                  }
        sys_results = sys_results | gpu_results

    return(sys_results)

def get_telemetry(func):
    """Aquire CPU and GPU metrics during function calls."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        resource_log = []
        stop_event = threading.Event()
        monitor_thread = threading.Thread(target=monitor_resources, 
                                                args=(stop_event, resource_log, hw_type))
        start_time = time.perf_counter()
        
        # Start monitoring resources
        monitor_thread.start()
    
        # Run function
        result = func(*args, **kwargs)
    
        # Signal the monitor to stop and wait for it to finish
        stop_event.set()
        elapsed_time = time.perf_counter() - start_time
        monitor_thread.join()
    
        # Process telemetry for index build stage
        telemetry = summarize_telemetry(resource_log, hw_type)
        telemetry['duration_sec'] = elapsed_time
        return (telemetry, result)
    return wrapper

In [3]:
@get_telemetry
def load_bin(base_path, dtype):
    """
    Load embedding vectors from a binary file written by save_bin.

    Parameters:
    - base_path: Path to file (e.g. "/output/base.u8bin")
    - dtype: numpy dtype used when saving (e.g. np.float32, np.uint8, etc.)

    Returns:
    - embeddings: numpy array of shape (num_vectors, num_dimensions)
    """
    with open(base_path, 'rb') as f:
        # Read header: 2 unsigned int32, little-endian
        header = f.read(8)
        num_vectors, num_dimensions = struct.unpack('<II', header)

        # Read remaining data as flat array
        embeddings = np.fromfile(f, dtype=dtype, count=num_vectors * num_dimensions)

    return embeddings.reshape((num_vectors, num_dimensions))

@get_telemetry
def scaler_fit_transform(X):
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    return(X)
    
# Read cuvs-bench packed data
base_path = f'./datasets/{DATASET_NAME}/base.fbin'

if hw_type == 'cpu':
    import faiss
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import train_test_split

    algorithm = 'faiss-cpu_kmeans'
    telem, X = load_bin(base_path, 'float32')
elif hw_type == 'gpu':
    import cuml
    import cupy as cp
    from cuml.preprocessing import StandardScaler
    from cuml.model_selection import train_test_split
    import cuvs.cluster.kmeans as cuvs_kmeans
    from cuvs.cluster.kmeans import cluster_cost

    algorithm = 'cuvs_kmeans'
    telem, X_npy = load_bin(base_path, 'float32')
    X = cp.array(X_npy)
else:
    raise ValueError(f"hw_type '{hw_type}' unknown. Use hw_type of 'cpu' or 'gpu'.")

telem_load_data = add_prefix(telem, 'load_data_')
print(f'Data shape: {X.shape} \n')
print(telem_load_data)

# Get dataset metadata
n_rows = len(X)
dim = X.shape[1]
X_dtype = str(X[0].dtype)

# # Apply standard scaler
# telem, X = scaler_fit_transform(X)
# telem_standard_scaler = add_prefix(telem, 'standard_scaler_')

# print(telem_standard_scaler)

Data shape: (7999994, 1024) 

{'load_data_avg_cpu_util': 1.5333333333333332, 'load_data_max_cpu_util': 1.6, 'load_data_max_ram_gb': 78.31558227539062, 'load_data_avg_gpu_util': 0.0, 'load_data_max_gpu_util': 0, 'load_data_max_vram_gb': 62.1982421875, 'load_data_duration_sec': 8.203247316996567}


In [4]:
@get_telemetry
def train_kmeans(X, n_clusters, hw_type, time_delay):
    """
    Train KMeans model.
    """
    time.sleep(time_delay)
    if hw_type == 'cpu':
        # https://github.com/facebookresearch/faiss/wiki/Faiss-building-blocks:-clustering,-PCA,-quantization
        ncentroids = n_clusters # Number of centroids
        niter = max_iter
        verbose = True
        d = X.shape[1] # Number of dimensions
        faiss_kmeans = faiss.Kmeans(d, ncentroids, niter=niter, verbose=verbose, gpu=False)
        
        # Train FAISS-CPU kmeans model:
        faiss_kmeans.train(X)
        model = faiss_kmeans
    elif hw_type == 'gpu':
        # https://docs.rapids.ai/api/cuvs/nightly/python_api/cluster_kmeans/
        # Balanced k_means by setting hierarchical=True
        cuvs_kmeans_hierarchical = False

        if cuvs_kmeans_hierarchical:
            cuvs_kmeans_hierarchical_params = {'hierarchical': True, 'hierarchical_n_iters': max_iter}
        else:
            cuvs_kmeans_hierarchical_params = {}
        cuvs_kmeans_params = cuvs_kmeans.KMeansParams(n_clusters=n_clusters, 
                                                      max_iter=None, 
                                                      **cuvs_kmeans_hierarchical_params
                                                     )
        
        # Train cuVS kmeans model:
        centroids, inertia, n_iter = cuvs_kmeans.fit(cuvs_kmeans_params, X)
        model = {'params': cuvs_kmeans_params, 'centroids': centroids, 
                 'inertia': inertia, 'n_iter': n_iter}
    return(model)

@get_telemetry
def predict_kmeans(model, X, hw_type, time_delay):
    """
    Apply KMeans model to assign cluster labels.
    """
    time.sleep(time_delay)
    if hw_type == 'cpu':
        D, labels = model.index.search(X, 1)
        # D contains the squared L2 distances.
        
        # Flatten labels since we're only using first cluster assignments.
        labels =  labels.flatten()
        inertia = np.sum(D)
    elif hw_type == 'gpu':
        labels, inertia = cuvs_kmeans.predict(model['params'], X, model['centroids'])

        # inertia: sum of squared distances of samples to their closest cluster center

    results = {'labels': labels, 'inertia': inertia}
    return(results)

# Issue when function runs faster than polling rate (0.25s). 
# Need to slow function execution by adding an internal delay and reversing it. 
time_delay = 0.25

telem_store = []
for n_clusters in [10, 100, 1000]:
    telem, kmeans_model = train_kmeans(X, n_clusters, hw_type, time_delay)
    telem['duration_sec'] = telem['duration_sec'] - time_delay
    telem_kmeans_train = add_prefix(telem, 'kmeans_train_')
    
    telem, kmeans_result = predict_kmeans(kmeans_model, X, hw_type, time_delay)
    telem['duration_sec'] = telem['duration_sec'] - time_delay
    telem_kmeans_predict = add_prefix(telem, 'kmeans_predict_')
    
    # Get cluster labels
    labels = kmeans_result['labels']

    telem_stages = telem_load_data | telem_kmeans_train | telem_kmeans_predict #| telem_silhouette_score

    # telem_stages = telem_load_data | telem_standard_scaler | telem_kmeans_train | telem_kmeans_predict #| telem_silhouette_score
    metadata = {
        'dataset': DATASET_NAME,
        'n_rows': n_rows,
        'dimension': dim,
        'dtype': X_dtype,
        'hw_type': hw_type,
        'algorithm': algorithm, 
        'num_clusters': n_clusters,
        'inertia': kmeans_result['inertia']
    }
    
    telem_store.append(metadata | telem_stages)

# Write results to disk
telem_store_df = pd.DataFrame(telem_store)
telem_store_df.to_csv(f'./results/{hw_type}_{DATASET_NAME}.csv',index=False)

CuvsException: parallel_for failed: cudaErrorIllegalAddress: an illegal memory access was encountered

In [None]:
from cuvs.cluster.kmeans import cluster_cost

inertia = cluster_cost(X, kmeans_model['centroids'])
inertia

In [None]:
break

In [None]:
# Read all csv files in directory and merge them
results_csv = glob.glob("./results/*.csv")
results_csv.sort()

merged_results = pd.concat([pd.read_csv(fn) for fn in results_csv]).reset_index(drop=True)
merged_results.to_csv('merged_kmeans_results.csv', float_format='%.3f', index=False)

In [None]:
# UMAP projections
from cuml.manifold.umap import UMAP
 
# Running default UMAP. Runs with NN Descent if data has more than 50K points
umap = UMAP(n_neighbors=16)

start_time = time.perf_counter()
emb  = umap.fit_transform(X_test)
umap_fit_transform_time = time.perf_counter() - start_time

print(f'UMAP fit + transform time: {umap_fit_transform_time:0.2f} s')

In [None]:
# Plot UMAP projection with cluster colorized