# Generalized K-Means Clustering Tutorial

This notebook demonstrates the massivedatascience-clusterer library for PySpark.

## Features
- Multiple Bregman divergences (Squared Euclidean, KL, Itakura-Saito, etc.)
- Weighted clustering
- Quality metrics (WCSS, BCSS, Calinski-Harabasz, Davies-Bouldin)
- Model persistence
- Full Spark ML Pipeline integration

## Setup

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
import numpy as np
import matplotlib.pyplot as plt

from massivedatascience.clusterer import GeneralizedKMeans

# Build (or reuse) the Spark session used throughout the tutorial, with the
# Spark web UI switched off.
session_builder = SparkSession.builder.appName("ClusteringTutorial")
session_builder = session_builder.config("spark.ui.enabled", "false")
spark = session_builder.getOrCreate()

# Limit Spark's own logging to warnings and above.
spark.sparkContext.setLogLevel("WARN")

print(f"Spark version: {spark.version}")

## Example 1: Basic Clustering with Squared Euclidean Distance

In [None]:
# Nine 2-D points arranged as three tight groups of three.
points = [
    (0.0, 0.0), (0.5, 0.5), (0.5, -0.5),
    (5.0, 5.0), (5.5, 5.0), (5.0, 5.5),
    (10.0, 0.0), (10.5, 0.0), (10.0, 0.5),
]

# One single-column row per point; the column holds a dense feature vector.
rows = [(Vectors.dense([x, y]),) for x, y in points]
data = spark.createDataFrame(rows, ["features"])

data.show()

In [None]:
# Configure the estimator: 3 clusters, squared Euclidean divergence,
# at most 20 iterations, fixed seed.
estimator_params = {
    "k": 3,
    "divergence": "squaredEuclidean",
    "maxIter": 20,
    "seed": 42,
}
kmeans = GeneralizedKMeans(**estimator_params)

# Fit the estimator on the sample data.
model = kmeans.fit(data)

# Report the fitted model: cluster count, then one line per center.
print(f"Number of clusters: {model.numClusters}")
print(f"\nCluster centers:")
for idx, centroid in enumerate(model.clusterCenters()):
    print(f"  Cluster {idx}: {centroid}")

In [None]:
# Run the fitted model over the training data and show each point next to
# the cluster index it was assigned.
predictions = model.transform(data)
shown_columns = ["features", "prediction"]
predictions.select(*shown_columns).show()

In [None]:
# Visualize clusters: one scatter series per cluster, plus the fitted centers.
# The data set is tiny, so pulling it to the driver as pandas is fine here.
predictions_pd = predictions.toPandas()

plt.figure(figsize=(10, 6))
for cluster_id in range(model.numClusters):
    cluster_data = predictions_pd[predictions_pd.prediction == cluster_id]
    if cluster_data.empty:
        # A cluster with no assigned points yields a 1-D empty array, and the
        # 2-D column slicing below would raise IndexError; nothing to draw.
        continue
    features = np.array(cluster_data.features.tolist())
    plt.scatter(features[:, 0], features[:, 1], label=f"Cluster {cluster_id}", s=100)

# Plot cluster centers.
# np.asarray normalizes the result to a 2-D array so the column slicing works
# whether clusterCenters() returns an ndarray or a list of per-cluster arrays
# (the latter is what Spark ML's own KMeansModel returns).
centers = np.asarray(model.clusterCenters())
plt.scatter(centers[:, 0], centers[:, 1], c='red', marker='X', s=300,
            edgecolors='black', linewidths=2, label='Centers')

plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('K-Means Clustering with Squared Euclidean Distance')
plt.legend()
plt.grid(True)
plt.show()

## Example 2: Clustering Probability Distributions with KL Divergence

In [None]:
# Document word distributions (each a 5-entry probability vector),
# grouped by the kind of document they represent.
technical_docs = [
    [0.5, 0.3, 0.1, 0.05, 0.05],
    [0.6, 0.2, 0.1, 0.05, 0.05],
    [0.55, 0.25, 0.1, 0.05, 0.05],
]
sports_docs = [
    [0.1, 0.1, 0.5, 0.2, 0.1],
    [0.05, 0.15, 0.6, 0.15, 0.05],
]
food_docs = [
    [0.05, 0.1, 0.1, 0.35, 0.4],
    [0.1, 0.05, 0.15, 0.3, 0.4],
]

# Row order is technical, sports, food; each row is a single dense vector.
prob_data = spark.createDataFrame(
    [(Vectors.dense(dist),) for dist in technical_docs + sports_docs + food_docs],
    ["features"],
)

print("Probability distributions:")
prob_data.show(truncate=False)

In [None]:
# Cluster the distributions under KL divergence instead of Euclidean distance.
# NOTE(review): `smoothing` presumably keeps the divergence finite when a
# vector has zero entries — confirm against the library documentation.
kl_params = {
    "k": 3,
    "divergence": "kl",
    "smoothing": 1e-10,
    "maxIter": 30,
    "seed": 42,
}
kmeans_kl = GeneralizedKMeans(**kl_params)

model_kl = kmeans_kl.fit(prob_data)

print(f"Number of clusters: {model_kl.numClusters}")
print(f"\nCluster centers (probability distributions):")
for idx, centroid in enumerate(model_kl.clusterCenters()):
    # Print the total mass of each center alongside the center itself.
    total_mass = np.sum(centroid)
    print(f"  Cluster {idx}: {centroid}")
    print(f"    Sum: {total_mass:.6f}")

In [None]:
# Assign every distribution to a cluster of the KL model and display the
# untruncated vectors beside their assignments.
predictions_kl = model_kl.transform(prob_data)
kl_columns = ["features", "prediction"]
predictions_kl.select(*kl_columns).show(truncate=False)

## Example 3: Quality Metrics and Finding Optimal K

In [None]:
import pandas as pd

# Test different values of k, recording the quality metrics of each fit.
# The same DataFrame is fitted once per k, so it is cached for the sweep.
data_cached = data.cache()

results = []
k_values = range(2, 7)

try:
    for k in k_values:
        kmeans_test = GeneralizedKMeans(k=k, maxIter=20, seed=42)
        model_test = kmeans_test.fit(data_cached)
        summary = model_test.summary

        results.append({
            'k': k,
            'wcss': summary.wcss,
            'bcss': summary.bcss,
            'ch_index': summary.calinskiHarabaszIndex,
            'db_index': summary.daviesBouldinIndex
        })
finally:
    # Release the cached data even if a fit fails. This only drops the cache;
    # the DataFrame stays usable and is recomputed on demand.
    data_cached.unpersist()

# Display results: one row per k, one column per metric.
results_df = pd.DataFrame(results)
print(results_df)

In [None]:
# Visualize quality metrics on a 2x2 grid, one metric per panel.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# One entry per panel, in row-major order:
# (results_df column, matplotlib format string, y-axis label, panel title).
panels = [
    ('wcss', 'bo-', 'WCSS',
     'Within-Cluster Sum of Squares (Lower is Better)'),
    ('bcss', 'go-', 'BCSS',
     'Between-Cluster Sum of Squares (Higher is Better)'),
    ('ch_index', 'mo-', 'Calinski-Harabasz Index',
     'Calinski-Harabasz Index (Higher is Better)'),
    ('db_index', 'ro-', 'Davies-Bouldin Index',
     'Davies-Bouldin Index (Lower is Better)'),
]

# axes.flat walks the grid row by row, matching the order of `panels`.
for ax, (column, fmt, y_label, title) in zip(axes.flat, panels):
    ax.plot(results_df['k'], results_df[column], fmt, linewidth=2, markersize=8)
    ax.set_xlabel('Number of Clusters (k)')
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True)

plt.tight_layout()
plt.show()

# Pick the k whose row has the largest Calinski-Harabasz index.
best_row = results_df['ch_index'].idxmax()
best_k = results_df.loc[best_row, 'k']
print(f"\nOptimal k by Calinski-Harabasz Index: {best_k}")

## Example 4: Weighted Clustering

In [None]:
# Create data with weights
# Per-point weights: the two points near the origin count for little, the
# three points near (5, 5) count for a lot.
LOW_WEIGHT = 0.1
HIGH_WEIGHT = 10.0

weighted_points = [
    ([0.0, 0.0], LOW_WEIGHT),
    ([0.5, 0.5], LOW_WEIGHT),
    ([5.0, 5.0], HIGH_WEIGHT),
    ([5.5, 5.0], HIGH_WEIGHT),
    ([5.0, 5.5], HIGH_WEIGHT),
]

# Each row pairs a dense feature vector with its weight.
weighted_data = spark.createDataFrame(
    [(Vectors.dense(coords), weight) for coords, weight in weighted_points],
    ["features", "weight"],
)

print("Weighted data:")
weighted_data.show()

In [None]:
# Fit a 2-cluster model that reads per-row weights from the "weight" column.
weighted_params = {
    "k": 2,
    "weightCol": "weight",
    "maxIter": 20,
    "seed": 42,
}
kmeans_weighted = GeneralizedKMeans(**weighted_params)

model_weighted = kmeans_weighted.fit(weighted_data)

print("Cluster centers (weighted):")
for idx, centroid in enumerate(model_weighted.clusterCenters()):
    print(f"  Cluster {idx}: {centroid}")

print("\nNote: High-weight points pull cluster centers toward them")

## Example 5: Model Persistence

In [None]:
import tempfile
import shutil
from massivedatascience.clusterer import GeneralizedKMeansModel

# Round-trip the model from Example 1 through disk and check that the
# reloaded copy predicts the same clusters.
temp_dir = tempfile.mkdtemp()
try:
    # Save model
    model_path = f"{temp_dir}/tutorial_model"

    print(f"Saving model to: {model_path}")
    model.write().overwrite().save(model_path)

    # Load model
    loaded_model = GeneralizedKMeansModel.load(model_path)
    print(f"Loaded model with {loaded_model.numClusters} clusters")

    # Verify predictions match (compared row by row, in collect() order)
    original_preds = [row.prediction for row in model.transform(data).collect()]
    loaded_preds = [row.prediction for row in loaded_model.transform(data).collect()]

    print(f"Predictions match: {original_preds == loaded_preds}")
finally:
    # Clean up the scratch directory even if saving, loading or predicting
    # raised, so a failed run does not leave it behind.
    shutil.rmtree(temp_dir)

## Cleanup

In [None]:
# Stop the Spark session created in Setup; cells that use `spark` will not
# work after this until the Setup cell is run again.
spark.stop()

## Summary

This tutorial covered:
1. Basic clustering with Squared Euclidean distance
2. Clustering probability distributions with KL divergence
3. Using quality metrics to find optimal k
4. Weighted clustering for important points
5. Model persistence (save/load)

### Next Steps
- Try other divergences: `itakuraSaito`, `generalizedI`, `logisticLoss`
- Experiment with initialization modes: `random`, `k-means||`
- Use assignment strategies: `auto`, `broadcast`
- Integrate with Spark ML Pipelines
- Apply to real-world datasets

### Documentation
- GitHub: https://github.com/massivedatascience/generalized-kmeans-clustering
- Examples: see the `python/examples/` directory in the repository