In [41]:
import numpy as np
import cupy as cp
import torch
import time

In [2]:
def benchmark(fn, *args, func_type = "cupy", n=1000, device='cuda'):    
    # Warm-up
    for _ in range(10):
        _ = fn(*args)
    if func_type == "torch" and device == 'cuda':
        torch.cuda.synchronize()
    if func_type=="cupy":
        cp.cuda.stream.get_current_stream().synchronize()
    # Timed run
    start = time.time()
    for _ in range(n):
        _ = fn(*args)
    end = time.time()
    if func_type == "torch" and device == 'cuda':
        torch.cuda.synchronize()
    if func_type=="cupy":
        cp.cuda.stream.get_current_stream().synchronize()
    return end - start

In [3]:
@torch.jit.script
def torch_add(a,b):
    return a + b

In [4]:
vec_add =r"""                  
extern "C" __global__
void vector_add(const float* a, const float* b, float* c, int n) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx < n) {
        c[idx] = a[idx] + b[idx];
    }
}
"""
add_kernel=cp.RawKernel(vec_add, 'vector_add')
def cupy_add(a,b):
        n = a.shape[0]
        threads_per_block = min(n, 1024)  # Max threads per block
        blocks_per_grid = 1  # Single block
        y = cp.empty_like(a)  # Pre-allocate output
        add_kernel((blocks_per_grid,), (threads_per_block,), (a, b, y, n))
        return y

In [5]:
def compare(fn1, fn2):
    for n in [64,16384,65536,262144,4194304]:
        print("----------")
        print("dim =", n)
        x1= cp.arange(n).astype(cp.float32)
        x2 = cp.arange(n).astype(cp.float32)
        cupy_time = benchmark(fn1,x1,x2)
        print("cupy:",cupy_time)
        del x1
        del x2
        x1= torch.arange(n, device="cuda", dtype=torch.float32)
        x2= torch.arange(n, device="cuda", dtype=torch.float32)
        torch_time = benchmark(fn2, x1,x2, func_type="torch")
        print("torch:", torch_time)
        print("Speedup:", torch_time/cupy_time)
        del x1
        del x2

In [6]:
compare(cupy_add, torch_add)

----------
dim = 64
cupy: 0.025129318237304688
torch: 0.015166282653808594
Speedup: 0.6035294117647059
----------
dim = 16384
cupy: 0.026528358459472656
torch: 0.01592850685119629
Speedup: 0.6004331883380667
----------
dim = 65536
cupy: 0.02687692642211914
torch: 0.01564788818359375
Speedup: 0.5822052692273574
----------
dim = 262144
cupy: 0.03642463684082031
torch: 0.015377521514892578
Speedup: 0.42217363983871814
----------
dim = 4194304
cupy: 0.037996530532836914
torch: 0.026365995407104492
Speedup: 0.6939053391813966


In [6]:
dot_product_code = """
extern "C" __global__
void dot_product(const float* x, const float* y, float* result, int n) {
    __shared__ float shared_mem[256];
    
    int tid = threadIdx.x;
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    
    float temp = 0;
    while (i < n) {
        temp += x[i] * y[i];
        i += blockDim.x * gridDim.x;
    }
    
    shared_mem[tid] = temp;
    __syncthreads();
    
    // Parallel reduction in shared memory
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            shared_mem[tid] += shared_mem[tid + s];
        }
        __syncthreads();
    }
    
    if (tid == 0) {
        atomicAdd(result, shared_mem[0]);
    }
}
"""
dot_product= cp.RawKernel(dot_product_code, "dot_product")

def cupy_dot(a,b):
    n = a.shape[0]
    threads_per_block = 256
    blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
    result = cp.zeros(1, dtype=cp.float32)
    dot_product(
        (blocks_per_grid,), 
        (threads_per_block,), 
        (a, b, result, n)
    )
    return result

In [7]:
@torch.jit.script
def torch_dot(X: torch.Tensor,Y: torch.Tensor) -> torch.Tensor:
    if X.dim() > 1 and Y.dim() > 1:
        return torch.sum(X*Y, dim=-1)
    return X @ Y

In [9]:
compare(cupy_dot, torch_dot)

----------
dim = 64
cupy: 0.04149889945983887
torch: 0.035501956939697266
Speedup: 0.8554915287345095
----------
dim = 16384
cupy: 0.037999629974365234
torch: 0.035997629165649414
Speedup: 0.9473152551731061
----------
dim = 65536
cupy: 0.037999868392944336
torch: 0.036501169204711914
Speedup: 0.9605604110852475
----------
dim = 262144
cupy: 0.05249929428100586
torch: 0.03749871253967285
Speedup: 0.7142707926502512
----------
dim = 4194304
cupy: 0.03749966621398926
torch: 0.053003549575805664
Speedup: 1.4134405696665289


In [8]:
cupy_dot(cp.array([1,2,3], dtype=cp.float32), cp.array([1,2,3], dtype=cp.float32))

array([14.], dtype=float32)

In [10]:
distance_l2 = """
extern "C" __global__
void distance(const float* x, const float* y, float* result, int n) {
    __shared__ float shared_mem[256];
    
    int tid = threadIdx.x;
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    
    float temp = 0;
    while (i < n) {
        float diff = x[i] - y[i];
        temp += diff * diff;
        i += blockDim.x * gridDim.x;
    }
    
    shared_mem[tid] = temp;
    __syncthreads();
    
    // Parallel reduction in shared memory
    #pragma unroll
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            shared_mem[tid] += shared_mem[tid + s];
        }
        __syncthreads();
    }
    
    if (tid == 0) {
        atomicAdd(result, shared_mem[0]);
        // Only one thread in the entire grid should compute the sqrt
        __threadfence();  // Ensure all atomicAdds complete
        if (blockIdx.x == 0 && threadIdx.x == 0) {
            *result = sqrtf(*result);  // sqrtf for float
        }
    }
}
"""
l2= cp.RawKernel(distance_l2, "distance")

def cupy_l2(a,b):
    n = a.shape[0]
    threads_per_block = 256
    blocks_per_grid = (n + threads_per_block - 1) // threads_per_block
    result = cp.zeros(1, dtype=cp.float32)
    l2(
        (blocks_per_grid,), 
        (threads_per_block,), 
        (a, b, result, n)
    )
    return result

In [11]:
@torch.jit.script
def torch_l2(X,Y):
    return (X-Y).norm(2)

In [11]:
print(cupy_l2(cp.array([0,1,2,3,4], dtype=cp.float32), cp.array([0,1,2,0,0], dtype=cp.float32)))

[5.]


In [12]:
compare(cupy_l2, torch_l2)

----------
dim = 64
cupy: 0.03850197792053223
torch: 0.04799962043762207
Speedup: 1.2466793403885095
----------
dim = 16384
cupy: 0.03949856758117676
torch: 0.048999786376953125
Speedup: 1.2405459078041154
----------
dim = 65536
cupy: 0.04100179672241211
torch: 0.05150032043457031
Speedup: 1.256050333189901
----------
dim = 262144
cupy: 0.04299807548522949
torch: 0.05500030517578125
Speedup: 1.2791341136808485
----------
dim = 4194304
cupy: 0.038001298904418945
torch: 0.08900141716003418
Speedup: 2.3420625011763674


In [13]:
d_l2_batch = """
extern "C" __global__
void batch_distance(const float* x, const float* y, float* output, const int N, const int D) {
    extern __shared__ float shared_mem[];
    
    int tid = threadIdx.x;
    int i = blockIdx.x;  // Each block handles one vector x[i]
    int offset = i * D;  // Starting index for x[i] in memory
    
    float temp = 0.0f;
    int j = tid;        // Thread-local index for vector components
    
    // Grid-stride loop across vector components
    while (j < D) {
        float diff = x[offset + j] - y[j];
        temp += diff * diff;
        j += blockDim.x;  // Jump by threads-per-block
    }
    
    shared_mem[tid] = temp;
    __syncthreads();
    
    // Parallel reduction within the block
    for (int s = blockDim.x / 2; s > 0; s >>= 1) {
        if (tid < s) {
            shared_mem[tid] += shared_mem[tid + s];
        }
        __syncthreads();
    }
    
    // Write final result for this vector
    if (tid == 0) {
        output[i] = sqrtf(shared_mem[0]);
    }
}
"""
l2_b= cp.RawKernel(d_l2_batch, "batch_distance")

def cupy_l2_b(x,y):
    N, D = x.shape
    threads_per_block = 256  # Can experiment with 128-1024
    blocks_per_grid = N
    
    # Output array (one distance per vector)
    output = cp.zeros(N, dtype=cp.float32)
    
    # Calculate required shared memory (1 float per thread)
    shared_mem_size = threads_per_block * cp.dtype(cp.float32).itemsize
    
    # Launch kernel
    l2_b(
        (blocks_per_grid,),       # One block per vector
        (threads_per_block,),     # Threads per block
        (x, y, output, N, D),     # Arguments
        shared_mem=shared_mem_size
    )
    
    return output

In [14]:
N, D = 2, 3
x = cp.random.rand(N, D).astype(cp.float32)
y = cp.random.rand(D).astype(cp.float32)

distances = cupy_l2_b(x, y)
print(distances)  # (10000,)
print(x, y)

[0.576986  0.4369282]
[[0.0802488  0.40076688 0.5116273 ]
 [0.60504085 0.45253175 0.46711785]] [0.529375   0.10946937 0.7269046 ]


In [15]:
def add_parameters(code, params):
    for k, v in params.items():
        code = '#define ' + k + ' ' + str(v) + '\n' + code
    return code

In [16]:
knn_code = r'''
    extern "C" __global__
    void knn_topk_kernel(const float* distances, int* indices, const int K, const int N) {
        int tid = threadIdx.x + blockIdx.x * blockDim.x;
        
        if (tid == 0) {
            float* temp_dist = new float[N];  // Copy of the distances
            for (int i = 0; i < N; i++) {
                temp_dist[i] = distances[i];
            }

            for (int k = 0; k < K; k++) {
                float min_dist = 1e10f;
                int min_idx = -1;

                for (int i = 0; i < N; i++) {
                    if (temp_dist[i] < min_dist) {
                        min_dist = temp_dist[i];
                        min_idx = i;
                    }
                }

                if (min_idx != -1) {
                    indices[k] = min_idx;  // Store the index of the nearest neighbor
                    temp_dist[min_idx] = 1e10f;  // Mark it as selected
                }
            }

            delete[] temp_dist;  // Free memory
        }
    }

    '''
knn = cp.RawKernel(knn_code, "knn_topk_kernel")
def cupy_knn(N, D, A, X, K, device=torch.device("cuda")):    
    # Step 1: Calculate the distances using the first kernel
    distances = cp.zeros(N, dtype=cp.float32)
    indices = cp.zeros(K, dtype=cp.int32)  # Store K nearest indices

    # Configure block and grid sizes for the distance calculation kernel
    block_size = 256
    grid_size = (N + block_size - 1) // block_size
    threads_per_block = 256  # Can experiment with 128-1024
    blocks_per_grid = N
    shared_mem_size = threads_per_block * cp.dtype(cp.float32).itemsize
    # Run the kernel to calculate the distances
    l2_b(
        (blocks_per_grid,),       # One block per vector
        (threads_per_block,),     # Threads per block
        (A, X, distances, N, D),     # Arguments
        shared_mem=shared_mem_size
    )

    # Step 2: Apply the second kernel to find the K nearest neighbors by distance
    grid_size_topk = (K + block_size - 1) // block_size
    knn((grid_size_topk,), (block_size,), (distances, indices, K, N))

    return indices  # Return the indices of the K closest neighbors

In [17]:
def knn_cupy_optimized_kernel(N, D, A, X, K, device=torch.device("cuda")):
    kernel_code = '''
    extern "C" __global__
    void knn_kernel(const float* A, const float* X, float* distances, int N, int D) {
        int idx = threadIdx.x + blockIdx.x * blockDim.x;
        if (idx < N) {
            float sum = 0.0f;
            for (int d = 0; d < D; d++) {
                float diff = A[idx * D + d] - X[d];
                sum += diff * diff;
            }
            distances[idx] = sqrtf(sum);  // L2 distance
        }
    }
    extern "C" __global__
    void knn_topk_kernel(const float* distances, int* indices, int K, int N) {
        int tid = threadIdx.x + blockIdx.x * blockDim.x;
        
        if (tid == 0) {  
            float* temp_dist = new float[N];  // Copy of the distances
            for (int i = 0; i < N; i++) {
                temp_dist[i] = distances[i];
            }

            for (int k = 0; k < K; k++) {
                float min_dist = 1e10f;
                int min_idx = -1;

                for (int i = 0; i < N; i++) {
                    if (temp_dist[i] < min_dist) {
                        min_dist = temp_dist[i];
                        min_idx = i;
                    }
                }

                if (min_idx != -1) {
                    indices[k] = min_idx;  // Store the index of the nearest neighbor
                    temp_dist[min_idx] = 1e10f;  // Mark it as selected
                }
            }

            delete[] temp_dist;  // Free memory
        }
    }

    '''
    
    mod = cp.RawModule(code=kernel_code)
    func_knn = mod.get_function('knn_kernel')
    func_topk = mod.get_function('knn_topk_kernel')

    # Step 1: Calculate the distances using the first kernel
    distances = cp.zeros(N, dtype=cp.float32)
    indices = cp.zeros(K, dtype=cp.int32)  # Store K nearest indices

    # Configure block and grid sizes for the distance calculation kernel
    block_size = 256
    grid_size = (N + block_size - 1) // block_size
    
    # Run the kernel to calculate the distances
    func_knn((grid_size,), (block_size,), (A, X, distances, N, D))

    # Step 2: Apply the second kernel to find the K nearest neighbors by distance
    grid_size_topk = (K + block_size - 1) // block_size
    func_topk((grid_size_topk,), (block_size,), (distances, indices, K, N))

    return indices  # Return the indices of the K closest neighbors

In [18]:
N, D = 100,10000
K = 3
A = cp.random.random((N,D), dtype=cp.float32)
X = cp.random.random((K,D), dtype=cp.float32)

cupy_knn(N,D,A,X,K)

array([39, 44,  4], dtype=int32)

In [19]:
l2_closest_kernel = """
extern "C" __global__
void closest_index(const float* x, float* y, int* output, const int N, const int M, const int D) {
    extern __shared__ char shared_mem[];
    float* shared_dist = (float*)shared_mem;
    int* shared_idx = (int*)(shared_dist + blockDim.x);

    int tid = threadIdx.x;
    int i = blockIdx.x;  // Each block handles one vector x[i]
    int num_threads = blockDim.x;

    float min_dist = 99999999.0;
    int min_index = -1;

    // Grid-stride loop over M y vectors
    for (int j = tid; j < M; j += num_threads) {
        float sum_sq = 0.0f;
        for (int k = 0; k < D; ++k) {
            float diff = x[i * D + k] - y[j * D + k];
            sum_sq += diff * diff;
        }
        if (sum_sq < min_dist) {
            min_dist = sum_sq;
            min_index = j;
        }
    }

    // Store local min into shared memory
    shared_dist[tid] = min_dist;
    shared_idx[tid] = min_index;
    __syncthreads();

    // Parallel reduction to find the global min
    for (int s = num_threads / 2; s > 0; s >>= 1) {
        if (tid < s) {
            if (shared_dist[tid + s] < shared_dist[tid]) {
                shared_dist[tid] = shared_dist[tid + s];
                shared_idx[tid] = shared_idx[tid + s];
            }
        }
        __syncthreads();
    }

    // Write the result
    if (tid == 0) {
        output[i] = shared_idx[0];
    }
}
"""

closest_kernel = cp.RawKernel(l2_closest_kernel, "closest_index")

def cupy_closest_index(x, y):
    N, D = x.shape
    M, D_y = y.shape
    assert D == D_y, "Dimensions of x and y must match"
    
    threads_per_block = 256  # Use a power of two for optimal reduction
    blocks_per_grid = N
    
    output = cp.zeros(N, dtype=cp.int32)
    
    # Calculate shared memory size: floats + ints per thread
    shared_mem_size = (threads_per_block * cp.dtype(cp.float32).itemsize +
                       threads_per_block * cp.dtype(cp.int32).itemsize)
    
    closest_kernel(
        (blocks_per_grid,),
        (threads_per_block,),
        (x, y, output, N, M, D),
        shared_mem=shared_mem_size
    )
    
    return output

In [20]:
cupy_closest_index(A, X)

array([0, 1, 0, 0, 0, 1, 0, 2, 1, 2, 2, 2, 2, 0, 1, 0, 1, 1, 1, 0, 1, 2,
       2, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 2, 1, 0, 1, 0, 0, 1, 0, 0, 2,
       0, 2, 1, 2, 2, 0, 0, 2, 1, 1, 0, 1, 0, 1, 2, 1, 0, 1, 1, 2, 1, 1,
       1, 1, 1, 1, 0, 0, 2, 1, 1, 0, 1, 0, 2, 1, 2, 2, 0, 1, 1, 0, 1, 1,
       0, 2, 1, 0, 1, 2, 0, 1, 1, 1, 1, 2], dtype=int32)

In [21]:
A[:3], X

(array([[0.08890758, 0.41564733, 0.28502268, ..., 0.7871353 , 0.12632202,
         0.51791453],
        [0.6050231 , 0.29184753, 0.51308817, ..., 0.25733826, 0.8296473 ,
         0.82374144],
        [0.62811095, 0.6549251 , 0.12396739, ..., 0.19293967, 0.45274127,
         0.55706614]], dtype=float32),
 array([0.06871646, 0.4052225 , 0.8851587 , ..., 0.4769703 , 0.4420246 ,
        0.53318596], dtype=float32))

In [17]:
def torch_knn(N,D,A,X,K):        
    def g(A_prime): 
        def f(Y):
            return torch_l2(X,Y)
        distance = torch.vmap(f)(A_prime)
        _, indices = distance.sort()
        return indices[:K]
    if A.shape[0] != N:
        return torch.vmap(g)(A)
    return g(A)
    

In [23]:
def compare2(fn1, fn2):
    for n in [64,16384,65536]:
        print("----------")
        d=1000
        k=3
        print("dim =", d, "n=", n)        
        x1= torch.rand((n,d), device="cuda", dtype=torch.float32)
        x2= torch.rand(d, device="cuda", dtype=torch.float32)
        torch_time = benchmark(fn2,n,d,x1,x2,k,func_type="torch")
        print("torch:", torch_time)        
        del x1
        del x2
        x1 = cp.random.random((n, d)).astype(cp.float32)
        x2 = cp.random.random(d).astype(cp.float32)
        cupy_time = benchmark(fn1,n,d,x1,x2,k)
        print("cupy:",cupy_time)
        print("Speedup:", torch_time/cupy_time)
        del x1
        del x2

In [18]:
def compare3(fn1,fn2):
    for n in [64,16384,65536]:
        print("----------")
        d=1000
        k=3
        print("dim =", d, "n=", n)
        x1 = cp.random.random((n, d)).astype(cp.float32)
        x2 = cp.random.random(d).astype(cp.float32)
        cupy_time = benchmark(fn1,n,d,x1,x2,k)
        print("cupy:",cupy_time)
        cupy_time_2 = benchmark(fn2,n,d,x1,x2,k)
        print("cupy 2:", cupy_time_2)
        print("Speedup:", cupy_time_2/cupy_time)
        del x1
        del x2

In [25]:
compare2(cupy_knn, torch_knn)

----------
dim = 1000 n= 64


torch: 0.22306132316589355
cupy: 0.0513768196105957
Speedup: 4.34167246739988
----------
dim = 1000 n= 16384
torch: 0.5670590400695801
cupy: 0.056891441345214844
Speedup: 9.967387477998491
----------
dim = 1000 n= 65536
torch: 2.1910436153411865
cupy: 0.05539703369140625
Speedup: 39.55164147500344


In [19]:
compare3(cupy_knn,knn_cupy_optimized_kernel)

----------
dim = 1000 n= 64
cupy: 0.052050113677978516
cupy 2: 0.06259870529174805
Speedup: 1.202662220471431
----------
dim = 1000 n= 16384
cupy: 0.055298805236816406
cupy 2: 0.05983686447143555
Speedup: 1.0820643269811159
----------
dim = 1000 n= 65536
cupy: 0.0556178092956543
cupy 2: 0.058794498443603516
Speedup: 1.0571164018895909


In [79]:
def cupy_kmeans(N,D,A,K, max_iters=100, tol=1e-4, random_state=None):
    """
    K-Means clustering using CuPy and a custom CUDA kernel for the assignment step.
    
    Args:
        data (cupy.ndarray): Input data of shape (N, D).
        n_clusters (int): Number of clusters.
        max_iters (int): Maximum number of iterations.
        tol (float): Tolerance for convergence checking.
        random_state (int): Seed for random number generation.
    
    Returns:
        centroids (cupy.ndarray): Final centroids of shape (n_clusters, D).
        assignments (cupy.ndarray): Indices of closest clusters for each data point.
    """
    # Ensure data is float32 for CUDA kernel compatibility

    # Initialize centroids by randomly selecting data points
    rng = cp.random.RandomState(seed=random_state)
    indices = rng.choice(N, size=K, replace=False)
    centroids = A[indices].copy()
    prev_centroids = cp.empty_like(centroids)
    assignments = cp.zeros(N, dtype=cp.int32)

    for _ in range(max_iters):
        prev_centroids = centroids  # Save current centroids for convergence check
        assignments = cupy_closest_index(A, centroids)

        mask = cp.zeros((N, K), cp.float32)
        mask[cp.arange(N), assignments] = 1.0
        counts = mask.sum(axis=0)
        sums = cp.matmul(mask.T, A)
        new_centroids = sums/cp.expand_dims(counts, 1)
        centroid_shift = cp.linalg.norm(new_centroids - centroids, axis=1).max()
        if centroid_shift <= tol:
            break
        
        centroids = new_centroids

    return centroids, assignments

In [80]:
N, D = 100,10000
K = 3
A = np.random.random((N,D))
A_cp = cp.array(A, dtype=cp.float32)
cupy_kmeans(N,D,A_cp,K)

(array([[0.51815706, 0.39126608, 0.4591676 , ..., 0.55887026, 0.5351173 ,
         0.4687856 ],
        [0.48987067, 0.4051168 , 0.45776987, ..., 0.4972301 , 0.51798433,
         0.6031835 ],
        [0.38664937, 0.5449114 , 0.45332503, ..., 0.537874  , 0.34532708,
         0.52378136]], dtype=float32),
 array([1, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 2, 1, 1, 2,
        0, 1, 0, 0, 0, 1, 0, 0, 2, 0, 1, 0, 2, 0, 1, 2, 0, 0, 1, 1, 2, 0,
        0, 1, 0, 1, 0, 1, 1, 0, 2, 0, 2, 0, 0, 1, 1, 1, 2, 2, 1, 0, 0, 2,
        1, 1, 1, 0, 1, 0, 1, 1, 2, 2, 0, 0, 0, 1, 1, 1, 2, 1, 1, 1, 2, 0,
        1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0], dtype=int32))

In [92]:
def cupy_ann(N, D, A, X, K, assignments=None, means=None):
    if assignments is None or means is None:
        means, assignments = cupy_kmeans(N, D, A, 5)
    
    # Compute distances from means to X (shape: n_means x N)
    distances = cupy_l2_b(X, means)
    
    # Determine the nearest 2 means for each data point
    nearest_k = 2
    indices = cp.empty((N, nearest_k), dtype=cp.int32)
    block_size = 256
    grid_size_topk = (N + block_size - 1) // block_size
    knn((grid_size_topk,), (block_size,), (distances, indices, nearest_k, N))
    
    # Create mask where assignment is one of the top 2 nearest means
    mask = indices[assignments]
    original_indices = cp.nonzero(mask)[0]
    filtered = A[original_indices]
    
    # Compute approximate KNN on the filtered data
    approx_knn = cupy_knn(filtered.shape[0], D, filtered, X, K)
    
    # Map indices back to the original dataset
    return original_indices[approx_knn]
 


In [102]:
cupy_ann(N,D,A_cp,X,K)

array([21, 96, 61])