Merge pull request #6277 from noahnovsak/kmeans-dask
Dask: KMeans
markotoplak committed Oct 29, 2023
2 parents 9887af3 + c046e49 commit 3e1c079
Showing 5 changed files with 190 additions and 80 deletions.
3 changes: 2 additions & 1 deletion Orange/clustering/clustering.py
@@ -1,5 +1,6 @@
import numpy as np
import scipy.sparse
import dask.array as da

from Orange.data import Table, Instance
from Orange.data.table import DomainTransformationError
@@ -20,7 +21,7 @@ def fix_dim(x):
return x[0] if one_d else x

one_d = False
if isinstance(data, np.ndarray):
if isinstance(data, (np.ndarray, da.Array)):
one_d = data.ndim == 1
prediction = self.predict(np.atleast_2d(data))
elif isinstance(data, scipy.sparse.csr_matrix) or \
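
The one-line isinstance() change above is what lets an already fitted clustering model be called on a dask array through the same __call__ path as a numpy array. A minimal sketch, assuming dask is installed (iris data and chunk sizes are illustrative only; with an in-memory projector the dask input is simply materialized at predict time):

    import dask.array as da
    import numpy as np

    from Orange.clustering.kmeans import KMeans
    from Orange.data import Table

    iris = Table("iris")
    model = KMeans(n_clusters=3).get_model(iris)

    dense_labels = model(iris.X)                                 # existing np.ndarray branch
    lazy_labels = model(da.from_array(iris.X, chunks=(50, 4)))   # new da.Array branch
    # same projector, same data, so the two calls must agree
    np.testing.assert_array_equal(np.asarray(dense_labels), np.asarray(lazy_labels))
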
26 changes: 26 additions & 0 deletions Orange/clustering/kmeans.py
@@ -1,5 +1,8 @@
import warnings
from typing import Union

import numpy as np
import dask.array as da
import sklearn.cluster

from Orange.clustering.clustering import Clustering, ClusteringModel
@@ -17,6 +20,8 @@ def __init__(self, projector):
self.k = projector.get_params()["n_clusters"]

def predict(self, X):
if isinstance(X, da.Array):
X = X.rechunk({0: "auto", 1: -1})
return self.projector.predict(X)


@@ -37,6 +42,27 @@ def __init__(self, n_clusters=8, init='k-means++', n_init=10, max_iter=300,
preprocessors, {k: v for k, v in vars().items()
if k != "compute_silhouette_score"})

def fit(self, X: Union[np.ndarray, da.Array], y: np.ndarray = None):
params = self.params.copy()
__wraps__ = self.__wraps__
if isinstance(X, da.Array):
try:
import dask_ml.cluster

del params["n_init"]
if params["init"] != "k-means||":
warnings.warn(f"Initializing with {params['init']} defaults"
f" to sklearn. Using k-means|| instead.")
params["init"] = "k-means||"

X = X.rechunk({0: "auto", 1: -1})
__wraps__ = dask_ml.cluster.KMeans

except ImportError:
warnings.warn("dask_ml is not installed. Using sklearn instead.")

return self.__returns__(__wraps__(**params).fit(X))


if __name__ == "__main__":
d = Table("iris")
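
A sketch of how the new fit() dispatch behaves for a dask input, assuming dask is installed (the random data is illustrative). With dask_ml available, n_init is dropped, init is coerced to "k-means||" (with a warning when another init was requested), and the updated tests assert that the label type matches the input type; without dask_ml, a warning is emitted and the array is handed to scikit-learn directly.

    import warnings

    import dask.array as da
    import numpy as np

    from Orange.clustering.kmeans import KMeans

    X = da.from_array(np.random.RandomState(0).rand(1000, 4), chunks=(250, 4))

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        model = KMeans(n_clusters=3).fit(X)   # default init='k-means++' is coerced for dask_ml

    print(type(model).__name__)               # KMeansModel on either code path
    print([str(w.message) for w in caught])   # shows which coercion/fallback warning fired
    print(type(model.labels))                 # dask array with dask_ml, ndarray with the sklearn fallback
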
83 changes: 50 additions & 33 deletions Orange/tests/test_clustering_kmeans.py
@@ -11,37 +11,46 @@
from Orange.clustering.kmeans import KMeans, KMeansModel
from Orange.data import Table, Domain, ContinuousVariable
from Orange.data.table import DomainTransformationError
from Orange.tests.test_dasktable import with_dasktable


class TestKMeans(unittest.TestCase):
def setUp(self):
self.kmeans = KMeans(n_clusters=2)
self.iris = Orange.data.Table('iris')

def test_kmeans(self):
c = self.kmeans(self.iris)
@with_dasktable
def test_kmeans(self, prepare_table):
iris = prepare_table(self.iris)
c = self.kmeans(iris)
# First 20 iris belong to one cluster
self.assertEqual(np.ndarray, type(c))
self.assertEqual(len(self.iris), len(c))
self.assertEqual(1, len(set(c[:20].ravel())))
self.assertEqual(type(iris.X), type(c))
self.assertEqual(len(iris), len(c))
self.assertEqual(1, len(np.unique(np.asarray(c[:20]))))

def test_kmeans_parameters(self):
@with_dasktable
def test_kmeans_parameters(self, prepare_table):
kmeans = KMeans(n_clusters=10, max_iter=10, random_state=42, tol=0.001,
init='random')
c = kmeans(self.iris)
self.assertEqual(np.ndarray, type(c))
self.assertEqual(len(self.iris), len(c))

def test_predict_table(self):
c = self.kmeans(self.iris)
self.assertEqual(np.ndarray, type(c))
self.assertEqual(len(self.iris), len(c))

def test_predict_numpy(self):
c = self.kmeans.fit(self.iris.X)
iris = prepare_table(self.iris)
c = kmeans(iris)
self.assertEqual(type(iris.X), type(c))
self.assertEqual(len(iris), len(c))

@with_dasktable
def test_predict_table(self, prepare_table):
iris = prepare_table(self.iris)
c = self.kmeans(iris)
self.assertEqual(type(iris.X), type(c))
self.assertEqual(len(iris), len(c))

@with_dasktable
def test_predict_numpy(self, prepare_table):
iris = prepare_table(self.iris)
c = self.kmeans.fit(iris.X)
self.assertEqual(KMeansModel, type(c))
self.assertEqual(np.ndarray, type(c.labels))
self.assertEqual(len(self.iris), len(c.labels))
self.assertEqual(type(iris.X), type(c.labels))
self.assertEqual(len(iris), len(c.labels))

def test_predict_sparse_csc(self):
with self.iris.unlocked():
@@ -57,21 +66,25 @@ def test_predict_spares_csr(self):
self.assertEqual(np.ndarray, type(c))
self.assertEqual(len(self.iris), len(c))

def test_model(self):
c = self.kmeans.get_model(self.iris)
@with_dasktable
def test_model(self, prepare_table):
iris = prepare_table(self.iris)
c = self.kmeans.get_model(iris)
self.assertEqual(KMeansModel, type(c))
self.assertEqual(len(self.iris), len(c.labels))
self.assertEqual(len(iris), len(c.labels))

c1 = c(self.iris)
c1 = c(iris)
# prediction of the model must be same since data are same
np.testing.assert_array_almost_equal(c.labels, c1)

def test_model_np(self):
@with_dasktable
def test_model_np(self, prepare_table):
"""
Test with numpy array as an input in model.
"""
c = self.kmeans.get_model(self.iris)
c1 = c(self.iris.X)
iris = prepare_table(self.iris)
c = self.kmeans.get_model(iris)
c1 = c(iris.X)
# prediction of the model must be same since data are same
np.testing.assert_array_almost_equal(c.labels, c1)

@@ -93,12 +106,14 @@ def test_model_sparse_csr(self):
# prediction of the model must be same since data are same
np.testing.assert_array_almost_equal(c.labels, c1)

def test_model_instance(self):
@with_dasktable
def test_model_instance(self, prepare_table):
"""
Test with instance as an input in model.
"""
c = self.kmeans.get_model(self.iris)
c1 = c(self.iris[0])
iris = prepare_table(self.iris)
c = self.kmeans.get_model(iris)
c1 = c(iris[0])
# prediction of the model must be same since data are same
self.assertEqual(c1, c.labels[0])

@@ -107,20 +122,22 @@ def test_model_list(self):
Test with list as an input in model.
"""
c = self.kmeans.get_model(self.iris)
c1 = c(self.iris.X.tolist())
c1 = c(np.asarray(self.iris.X).tolist())
# prediction of the model must be same since data are same
np.testing.assert_array_almost_equal(c.labels, c1)

# example with a list of only one data item
c1 = c(self.iris.X.tolist()[0])
c1 = c(np.asarray(self.iris.X).tolist()[0])
# prediction of the model must be same since data are same
np.testing.assert_array_almost_equal(c.labels[0], c1)

def test_model_bad_datatype(self):
@with_dasktable
def test_model_bad_datatype(self, prepare_table):
"""
Check model with data-type that is not supported.
"""
c = self.kmeans.get_model(self.iris)
iris = prepare_table(self.iris)
c = self.kmeans.get_model(iris)
self.assertRaises(TypeError, c, 10)

def test_model_data_table_domain(self):
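
The tests above are rewritten around with_dasktable from Orange.tests.test_dasktable, which injects a prepare_table callable so the same assertions run against an in-memory Table and a dask-backed one. The decorator below is only an illustrative sketch of that pattern, not the real implementation; make_dask_table is a hypothetical placeholder for the actual conversion helper.

    import functools

    make_dask_table = None  # hypothetical placeholder; the real helper builds an Orange DaskTable


    def with_dasktable_sketch(test):
        """Run a decorated test once per backend, passing the table-preparation callable in."""
        @functools.wraps(test)
        def wrapper(self):
            backends = [("numpy", lambda table: table)]
            if make_dask_table is not None:
                backends.append(("dask", make_dask_table))
            for name, prepare_table in backends:
                with self.subTest(backend=name):
                    test(self, prepare_table)
        return wrapper
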
30 changes: 25 additions & 5 deletions Orange/widgets/unsupervised/owkmeans.py
@@ -13,6 +13,7 @@
from Orange.clustering import KMeans
from Orange.clustering.kmeans import KMeansModel
from Orange.data import Table, Domain, DiscreteVariable, ContinuousVariable
from Orange.data.dask import DaskTable
from Orange.data.util import get_unique_names, array_equal
from Orange.preprocess import Normalize
from Orange.preprocess.impute import ReplaceUnknowns
@@ -135,6 +136,7 @@ class Warning(widget.OWWidget.Warning):

INIT_METHODS = (("Initialize with KMeans++", "k-means++"),
("Random initialization", "random"))
DASK_METHODS = (("Initialize with KMeans||", "k-means||"),)

resizing_enabled = False

@@ -170,7 +172,7 @@ def __init__(self):
self.__task = None # type: Optional[Task]

layout = QGridLayout()
bg = gui.radioButtonsInBox(
self.radiobox = bg = gui.radioButtonsInBox(
self.controlArea, self, "optimize_k", orientation=layout,
box="Number of Clusters", callback=self.update_method,
)
@@ -186,7 +188,7 @@ def __init__(self):

layout.addWidget(
gui.appendRadioButton(bg, "From", addToLayout=False), 2, 1)
ftobox = gui.hBox(None)
self.ftobox = ftobox = gui.hBox(None)
ftobox.layout().setContentsMargins(0, 0, 0, 0)
layout.addWidget(ftobox, 2, 2)
gui.spin(
@@ -293,6 +295,12 @@ def _compute_clustering(data, k, init, n_init, max_iter, random_state):
random_state=random_state, preprocessors=[]
).get_model(data)

if isinstance(data, DaskTable):
# just skip silhouettes for now
model.silhouette_samples = None
model.silhouette = np.nan
return model

if data.X.shape[0] <= SILHOUETTE_MAX_SAMPLES:
model.silhouette_samples = silhouette_samples(data.X, model.labels)
model.silhouette = np.mean(model.silhouette_samples)
@@ -501,9 +509,7 @@ def preproces(self, data):
self.Warning.no_sparse_normalization()
else:
data = Normalize()(data)
for preprocessor in KMeans.preprocessors: # use same preprocessors than
data = preprocessor(data)
return data
return KMeans().preprocess(data) # why?

def send_data(self):
if self.optimize_k:
@@ -584,13 +590,27 @@ def set_data(self, data):
self.controls.normalize.setDisabled(
bool(self.data) and sp.issparse(self.data.X))

if type(data) is not type(old_data):
self.setup_controls(isinstance(self.data, DaskTable))

# Do not needlessly recluster the data if X hasn't changed
if old_data and self.data and array_equal(self.data.X, old_data.X):
if self.auto_commit:
self.send_data()
else:
self.invalidate(unconditional=True)

def setup_controls(self, is_dask):
self.ftobox.setDisabled(is_dask)
self.radiobox.buttons[1].setDisabled(is_dask)
self.optimize_k = not is_dask and self.optimize_k
self.INIT_METHODS = OWKMeans.DASK_METHODS \
if is_dask else OWKMeans.INIT_METHODS
self.controls.smart_init.clear()
self.controls.smart_init.addItems([t[0] for t in self.INIT_METHODS])
self.smart_init = 0
self.controls.n_init.setDisabled(is_dask)

def send_report(self):
# False positives (Setting is not recognized as int)
# pylint: disable=invalid-sequence-index
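
Restated outside the widget code, the policy this diff applies to dask inputs: only k-means|| initialization is offered, the "From … To" k-optimization sweep and n_init are disabled, and silhouette scoring is skipped (NaN) for now. A compact sketch of that mapping (the helper name is made up):

    INIT_METHODS = (("Initialize with KMeans++", "k-means++"),
                    ("Random initialization", "random"))
    DASK_METHODS = (("Initialize with KMeans||", "k-means||"),)


    def kmeans_widget_policy(is_dask: bool) -> dict:
        """Hypothetical summary of OWKMeans.setup_controls / _compute_clustering for dask data."""
        return {
            "init_methods": DASK_METHODS if is_dask else INIT_METHODS,
            "optimize_k_allowed": not is_dask,   # "From ... To" sweep disabled for DaskTable
            "n_init_allowed": not is_dask,       # dask_ml drops n_init
            "silhouette": "skipped (np.nan)" if is_dask else "computed",
        }


    print(kmeans_widget_policy(is_dask=True))
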
