In [1]:
import os
import sys
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Register the working directory on the module search path (only if it is
# not already listed) so that our library can be imported from it.
if sys.path.count(os.getcwd()) == 0:
    sys.path.append(os.getcwd())

# Import our custom K-means library
from kmeans_scalability.models import (
    SparkMLKMeansModel,
    LocalNumPyKMeans,
    CustomRDDKMeansModel,
    CustomDataFrameKMeansModel,
    OptimizedDataFrameKMeansModel
)

# Point both the Spark worker and driver Python at the interpreter running
# this kernel, to avoid environment mismatches between them.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

print(f"Python: {sys.version}")
print("Custom library imported successfully")

Python: 3.11.14 (main, Dec  9 2025, 18:59:10) [MSC v.1944 64 bit (AMD64)]
Custom library imported successfully


In [2]:
# Keep Spark's scratch files in a dedicated folder under the working directory
spark_temp_dir = os.path.join(os.getcwd(), "spark_temp")
os.makedirs(spark_temp_dir, exist_ok=True)

# Build (or reuse) a local-mode session: driver and executor memory are both
# set to 8g and Spark's local scratch dir is redirected to the folder above.
spark = (
    SparkSession.builder
    .appName("KMeans-SizeUp-Benchmark")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.local.dir", spark_temp_dir)
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")

Spark Version: 3.5.7
Master: local[*]


In [3]:
# Load the parquet data generated in the previous notebook.
# 'embeddings_temp.parquet' is expected to exist from a prior run of
# spark_kmeans_scalability.ipynb.
parquet_path = "embeddings_temp.parquet"

if not os.path.exists(parquet_path):
    # Fail fast: every later cell needs `df`, so printing and carrying on would
    # only resurface as a confusing NameError further down the notebook.
    raise FileNotFoundError(
        f"Error: {parquet_path} not found. Please run the data generation steps in spark_kmeans_scalability.ipynb first."
    )

print(f"Loading data from {parquet_path}...")
df_raw = spark.read.parquet(parquet_path)

# Every column except the row identifiers ('id' / 'index') is treated as a feature.
# NOTE(review): the original comment said the features are columns 0 to 127 - confirm.
feature_cols = [c for c in df_raw.columns if c not in ('id', 'index')]

# Pack the feature columns into a single vector column named "features"
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df = assembler.transform(df_raw).select("features")

# Cache for performance during repeated sampling; the count() below is the
# first action run on the cached frame.
df.cache()
print(f"Total rows: {df.count():,}")
print("Data loaded and cached.")

Loading data from embeddings_temp.parquet...
Total rows: 1,879,040
Data loaded and cached.


In [4]:
# Hyperparameters shared by every implementation in the comparison
k = 4
max_iter = 10
seed = 42

# Display label -> model instance, registered one at a time in listing order
models_to_test = {}
models_to_test["Local NumPy (Baseline)"] = LocalNumPyKMeans(k=k, max_iter=max_iter, seed=seed)
models_to_test["Spark MLlib"] = SparkMLKMeansModel(k=k, max_iter=max_iter, seed=seed)
models_to_test["Custom RDD"] = CustomRDDKMeansModel(k=k, max_iter=max_iter, seed=seed)
models_to_test["Custom DataFrame (UDF)"] = CustomDataFrameKMeansModel(k=k, max_iter=max_iter, seed=seed)
# Uncomment if you want to test the optimized version too
# models_to_test["Custom DataFrame (Optimized)"] = OptimizedDataFrameKMeansModel(k=k, max_iter=max_iter, seed=seed)

# Echo the line-up so the run log records what was benchmarked
print("Benchmarking the following models:")
for name in models_to_test.keys():
    print(f" - {name}")

Benchmarking the following models:
 - Local NumPy (Baseline)
 - Spark MLlib
 - Custom RDD
 - Custom DataFrame (UDF)


In [5]:
# Sampling fractions for the SizeUp test, smallest first.
# Note: For very large datasets, start smaller [0.01, 0.05, 0.1] to avoid waiting too long
test_fractions = [0.1, 0.25, 0.5, 0.75, 1.0]

# Row-count ceiling above which the sample is NOT pulled onto the driver for the
# local model. The value is unchanged from the original inline literal, which is
# large enough to act as "no limit"; lower it if driver memory is a concern.
LOCAL_ROW_LIMIT = 20_000_000_000_000

# Only collect to the driver when a local model will actually consume the data,
# so dropping the local baseline from models_to_test also drops the collect().
needs_local_data = any("Local" in name for name in models_to_test)

# One dict per successful (fraction, model) run; rebuilt on every execution of this cell
results = []

print("Starting SizeUp Scalability Benchmark...")
print("="*80)

for fraction in test_fractions:
    # 1. Sample the data (fixed seed, without replacement)
    print(f"\nSampling fraction: {fraction:.2%}")
    sample_df = df.sample(withReplacement=False, fraction=fraction, seed=seed)
    rows_count = sample_df.count()
    print(f"  Rows: {rows_count:,}")

    # 2. Collect the rows for the local model ONCE per fraction, before fit() is
    #    called, so collection is not part of the local model's fit call.
    #    WARNING: this only works if the sample fits in driver memory.
    local_data = None
    if needs_local_data:
        try:
            if rows_count < LOCAL_ROW_LIMIT:
                local_data = sample_df.collect()
            else:
                print("  Skipping Local NumPy for this size (limit exceeded)")
        except Exception as e:
            # Best effort: a failed collect only disables the local model for this fraction
            print(f"  Could not collect local data: {e}")
            local_data = None

    # 3. Fit every model on this sample and record its self-reported timing
    for model_name, model_instance in models_to_test.items():
        print(f"  -> Running {model_name}...", end=" ", flush=True)

        try:
            # Local models get the collected rows; all others get the Spark DataFrame
            if "Local" in model_name:
                if local_data is None:
                    print("SKIPPED (Data too large)")
                    continue
                input_data = local_data
            else:
                input_data = sample_df

            # Train; the model exposes its own timing via `training_time`
            model_instance.fit(input_data)

            # `training_cost` may be missing or None on some models; record 0 then
            results.append({
                "Fraction": fraction,
                "Samples": rows_count,
                "Model": model_name,
                "Time": model_instance.training_time,
                "WSSSE": getattr(model_instance, 'training_cost', 0) or 0
            })
            print(f"Done ({model_instance.training_time:.2f}s)")

        except Exception as e:
            # Keep the benchmark going: a failing model produces no result row
            print(f"FAILED: {e}")

print("\n" + "="*80)
print("Benchmark Complete.")

Starting SizeUp Scalability Benchmark...

Sampling fraction: 10.00%


  Rows: 187,887
  -> Running Local NumPy (Baseline)... Training Local NumPy K-means with k=4...
Training completed in 7.96 seconds
WSSSE: 54143693.71
Done (7.96s)
  -> Running Spark MLlib... Training Spark ML K-means with k=4...
Training completed in 10.29 seconds
Within Set Sum of Squared Errors (WSSSE): 54141873.50
Done (10.29s)
  -> Running Custom RDD... Training Custom RDD K-means with k=4...
Training completed in 444.20 seconds
Done (444.20s)
  -> Running Custom DataFrame (UDF)... Training Custom DataFrame K-means (UDF) with k=4...




Training completed in 330.61 seconds
Done (330.61s)

Sampling fraction: 25.00%
  Rows: 469,447
  -> Running Local NumPy (Baseline)... Training Local NumPy K-means with k=4...
Training completed in 35.63 seconds
WSSSE: 135798292.18
Done (35.63s)
  -> Running Spark MLlib... Training Spark ML K-means with k=4...
Training completed in 19.10 seconds
Within Set Sum of Squared Errors (WSSSE): 135184739.51
Done (19.10s)
  -> Running Custom RDD... Training Custom RDD K-means with k=4...
Training completed in 468.91 seconds
Done (468.91s)
  -> Running Custom DataFrame (UDF)... Training Custom DataFrame K-means (UDF) with k=4...
Training completed in 502.54 seconds
Done (502.54s)

Sampling fraction: 50.00%
  Rows: 939,656
  -> Running Local NumPy (Baseline)... Training Local NumPy K-means with k=4...
Training completed in 56.46 seconds
WSSSE: 270564706.51
Done (56.46s)
  -> Running Spark MLlib... Training Spark ML K-means with k=4...
Training completed in 32.31 seconds
Within Set Sum of Squared E

: 

: 

In [None]:
# Tabulate the benchmark records. Columns are named explicitly so the frame
# keeps the expected schema even when `results` is empty (no run succeeded).
results_df = pd.DataFrame(
    results, columns=["Fraction", "Samples", "Model", "Time", "WSSSE"]
)

if results_df.empty:
    # Nothing to reshape; previously this case ended in a KeyError from pivot()
    print("No benchmark results recorded - run the benchmark cell first.")
    pivot_df = pd.DataFrame()
else:
    # Display raw results
    print(results_df)

    # Samples x Model matrix of training times, convenient for plotting
    pivot_df = results_df.pivot(index='Samples', columns='Model', values='Time')
    print("\nTime Matrix (seconds):")
    print(pivot_df)

In [None]:
# Plot scalability results: training time against sample count, one line per model.
# The empty check comes first because the column lookups below raise on a
# frame that has no benchmark rows.
if results_df.empty:
    print("No benchmark results to plot.")
else:
    fig, ax = plt.subplots(figsize=(12, 8))

    # One colour per model, taken from the tab10 colormap
    models = results_df['Model'].unique()
    colors = plt.cm.tab10(np.linspace(0, 1, len(models)))

    for i, model_name in enumerate(models):
        subset = results_df[results_df['Model'] == model_name].sort_values('Samples')
        ax.plot(subset['Samples'], subset['Time'], 'o-', linewidth=2, label=model_name, color=colors[i])

    # Ideal linear scale-up reference, anchored at the fastest model's time on
    # the smallest sample. The line is proportional (no intercept):
    #   t_ideal(n) = t_base * (n / n_base)
    min_samples = results_df['Samples'].min()
    baseline_time = results_df[results_df['Samples'] == min_samples]['Time'].min()
    ideal_x = np.sort(results_df['Samples'].unique())
    ideal_y = baseline_time * (ideal_x / min_samples)
    ax.plot(ideal_x, ideal_y, 'k--', linewidth=1.5, alpha=0.7, label='Ideal Linear Scale-up')

    ax.set_title('K-Means Scalability: Training Time vs Data Size', fontsize=16, fontweight='bold')
    ax.set_xlabel('Number of Samples', fontsize=12)
    ax.set_ylabel('Training Time (seconds)', fontsize=12)
    ax.legend(fontsize=10)
    ax.grid(True, linestyle='--', alpha=0.3)

    # Log-Log scale option (useful if Local model explodes)
    # ax.set_xscale('log')
    # ax.set_yscale('log')

    fig.tight_layout()
    plt.show()

In [None]:
# Optional: Cleanup Spark Session
# spark.stop()