In [None]:
import numpy as np
import time
from sklearn.datasets import load_digits
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import pairwise_distances, accuracy_score
from sklearn.model_selection import train_test_split
from multiprocessing import Pool, cpu_count
import pandas as pd

In [None]:
# --- Global variables for worker processes ---
# Module-level slots shared by the functions below: init_worker() assigns them
# inside each pool worker, knn_batch() reads them, and parallel_knn() forwards
# their current values to the pool as initializer arguments.
# Both stay None until something rebinds them at module scope.
_global_data = None  # reference samples that knn_batch measures distances against
_global_y = None  # labels that knn_batch indexes with the neighbour positions

In [None]:
def init_worker(data, y):
    """Store the reference set in this process's module-level globals.

    Used as the ``initializer`` of the ``Pool`` created in ``parallel_knn``,
    so each worker receives the arrays once rather than with every task.

    Args:
        data: Reference samples; bound to ``_global_data``.
        y: Labels belonging to ``data``; bound to ``_global_y``.
    """
    global _global_data, _global_y
    _global_data, _global_y = data, y

def knn_batch(args):
    """Return the labels of the k nearest reference samples for each query.

    Args:
        args: A ``(query_batch, k)`` tuple, packed into a single argument
            because ``Pool.map`` hands each call exactly one item.

    Returns:
        ``_global_y`` indexed by the neighbour positions: one row per query,
        one column per neighbour, ordered from closest to farthest.
    """
    batch, n_neighbors = args
    # Distance from every query row to every row of the worker-local dataset.
    distance_matrix = pairwise_distances(batch, _global_data)
    # Sort each row by distance and keep the column indices of the first k.
    nearest = distance_matrix.argsort(axis=1)[:, :n_neighbors]
    return _global_y[nearest]

def parallel_knn(queries, k, n_jobs=4):
    """Find k-nearest-neighbour labels for `queries` using a process pool.

    The reference set is NOT passed in: it is read from the module globals
    ``_global_data`` / ``_global_y`` and shipped to every worker once through
    ``init_worker``. Callers must bind those globals at module scope first.

    Args:
        queries: 2D array of query samples, one row per query.
        k: Number of neighbours to look up per query.
        n_jobs: Maximum number of worker processes / query batches.

    Returns:
        2D array with one row per query holding the labels of its nearest
        neighbours (closest first), as produced by ``knn_batch``.

    Raises:
        ValueError: If ``n_jobs`` is smaller than 1.
        RuntimeError: If the module globals have not been set.

    NOTE(review): under the ``spawn`` start method, workers must be able to
    import ``knn_batch`` and ``init_worker``; functions that only exist in an
    interactive session may not be importable there — confirm on the target
    platform or move them into a ``.py`` module.
    """
    if n_jobs < 1:
        raise ValueError("n_jobs must be a positive integer")
    # Fail here with a clear message instead of an opaque TypeError raised
    # inside a worker when it tries to index None.
    if _global_data is None or _global_y is None:
        raise RuntimeError(
            "_global_data and _global_y must be set at module scope before calling parallel_knn"
        )

    n_queries = len(queries)
    if n_queries == 0:
        # Nothing to look up: skip the pool and keep the usual 2D result shape.
        labels = np.asarray(_global_y)
        return np.empty((0, min(k, len(labels))), dtype=labels.dtype)

    # Never create more batches than queries: np.array_split would otherwise
    # produce empty batches, which the distance computation rejects.
    n_batches = min(n_jobs, n_queries)
    query_batches = np.array_split(queries, n_batches)
    args = [(batch, k) for batch in query_batches]
    with Pool(processes=n_batches, initializer=init_worker, initargs=(_global_data, _global_y)) as pool:
        results = pool.map(knn_batch, args)
    # Each result is (batch_size, k); stacking restores the original query order.
    return np.vstack(results)

def load_cifar_csv(csv_path='C:/Users/Admin/Desktop/pdcproj/cifar10_spark/cifar10.csv'):
    """Read a CSV file and split it into scaled features and labels.

    Args:
        csv_path: Path of the CSV file; it must contain a ``label`` column.

    Returns:
        ``(X, y)`` where ``X`` holds every column except ``label`` as float32
        divided by 255, and ``y`` holds the ``label`` column unchanged.
    """
    frame = pd.read_csv(csv_path)
    # Everything except the label column is treated as a feature.
    pixels = frame.drop('label', axis=1).values.astype('float32')
    labels = frame['label'].values
    # presumably 8-bit intensities, hence the 255 divisor — confirm against the data
    return pixels / 255.0, labels

In [None]:
# --- Main script ---
def _time_parallel_vs_sklearn(X_train, y_train, X_test, k, n_jobs):
    """Time one parallel_knn run and one sklearn predict on the same split."""
    # parallel_knn reads the reference set from the module globals, so they
    # must be rebound at module scope; without the `global` statement these
    # assignments would only create locals and the workers would receive None.
    global _global_data, _global_y
    _global_data, _global_y = X_train, y_train

    # Parallel KNN (the timed region includes creating the process pool,
    # because parallel_knn builds a new Pool on every call).
    start = time.perf_counter()
    parallel_knn(X_test, k, n_jobs=n_jobs)
    parallel_time = time.perf_counter() - start

    # Sklearn KNN (fit happens before the timer starts; only predict is timed).
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(X_train, y_train)
    start = time.perf_counter()
    knn.predict(X_test)
    sklearn_time = time.perf_counter() - start

    return parallel_time, sklearn_time


def _plot_timings(ax, results, xlabel, title):
    """Draw the parallel and sklearn timing curves of one benchmark on `ax`."""
    arr = np.array(results)
    ax.plot(arr[:, 0], arr[:, 1], 'o-', label='Parallel')
    ax.plot(arr[:, 0], arr[:, 2], 's-', label='Sklearn')
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Prediction Time (s)')
    ax.set_title(title)
    ax.legend()


def benchmark_knn(X, y, k=5, n_jobs=4):
    """Compare prediction time of parallel_knn and sklearn's KNeighborsClassifier.

    Two sweeps are run and plotted side by side:
      1. varying the test fraction (0.05 to 0.5 in 10 steps) of a stratified split;
      2. varying the training-set size (10% to 100% in 10 steps of the training
         part of a fixed 80/20 stratified split).

    Side effect: rebinds the module globals ``_global_data`` / ``_global_y``
    for every measurement; they keep the last training set after returning.

    Args:
        X: Feature matrix, one row per sample.
        y: Labels for ``X`` (also used for stratification).
        k: Number of neighbours used by both implementations.
        n_jobs: Worker count forwarded to ``parallel_knn``.

    Returns:
        ``(test_fraction_results, dataset_size_results)``: two lists of
        ``(x_value, parallel_time, sklearn_time)`` tuples, where ``x_value`` is
        the test fraction or the number of training samples respectively.
    """
    # Imported first so a missing plotting backend fails before the long
    # benchmark runs rather than after it; the name `plt` is not otherwise
    # bound in the visible part of this file.
    import matplotlib.pyplot as plt

    test_sizes = np.linspace(0.05, 0.5, 10)
    train_fracs = np.linspace(0.1, 1.0, 10)

    # --- Test fraction benchmark ---
    test_fraction_results = []
    for test_frac in test_sizes:
        X_train, X_test, y_train, _ = train_test_split(
            X, y, test_size=test_frac, stratify=y, shuffle=True, random_state=42
        )
        parallel_time, sklearn_time = _time_parallel_vs_sklearn(X_train, y_train, X_test, k, n_jobs)
        test_fraction_results.append((test_frac, parallel_time, sklearn_time))

    # --- Dataset size benchmark ---
    dataset_size_results = []
    X_full_train, X_test, y_full_train, _ = train_test_split(
        X, y, test_size=0.2, stratify=y, shuffle=True, random_state=42
    )
    for frac in train_fracs:
        # Take a leading slice of the (already shuffled) training part.
        n_samples = int(len(X_full_train) * frac)
        X_train = X_full_train[:n_samples]
        y_train = y_full_train[:n_samples]
        parallel_time, sklearn_time = _time_parallel_vs_sklearn(X_train, y_train, X_test, k, n_jobs)
        dataset_size_results.append((len(X_train), parallel_time, sklearn_time))

    # --- Plotting ---
    fig, (ax_frac, ax_size) = plt.subplots(1, 2, figsize=(14, 6))
    _plot_timings(
        ax_frac, test_fraction_results,
        'Test Set Fraction', 'Effect of Test Set Size on Prediction Time',
    )
    _plot_timings(
        ax_size, dataset_size_results,
        'Training Set Size', 'Effect of Training Set Size on Prediction Time',
    )
    fig.tight_layout()
    plt.show()

    return test_fraction_results, dataset_size_results

# Load data and run benchmark
# The guard keeps worker processes started with the spawn method (which
# re-import the main module) from re-running the whole benchmark when this
# code is executed as a script. In a notebook kernel __name__ is "__main__",
# so the cell still runs and the names below stay at module level.
if __name__ == "__main__":
    X, y = load_cifar_csv()
    test_fraction_results, dataset_size_results = benchmark_knn(X, y)