From 3eca98ccc518161ce0ea45a4b8a9d76324566d1d Mon Sep 17 00:00:00 2001 From: noahnovsak Date: Fri, 12 May 2023 15:07:46 +0200 Subject: [PATCH 1/3] dask compatible kmeans --- Orange/clustering/kmeans.py | 21 +++++++++++++++++ Orange/widgets/unsupervised/owkmeans.py | 30 ++++++++++++++++++++----- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/Orange/clustering/kmeans.py b/Orange/clustering/kmeans.py index 97ba7d0e8a1..72e61545118 100644 --- a/Orange/clustering/kmeans.py +++ b/Orange/clustering/kmeans.py @@ -1,5 +1,8 @@ import warnings +from typing import Union +import numpy as np +import dask.array as da import sklearn.cluster from Orange.clustering.clustering import Clustering, ClusteringModel @@ -37,6 +40,24 @@ def __init__(self, n_clusters=8, init='k-means++', n_init=10, max_iter=300, preprocessors, {k: v for k, v in vars().items() if k != "compute_silhouette_score"}) + def fit(self, X: Union[np.ndarray, da.Array], y: np.ndarray = None): + params = self.params.copy() + __wraps__ = self.__wraps__ + if isinstance(X, da.Array): + try: + import dask_ml.cluster + + del params["n_init"] + assert params["init"] == "k-means||" + + X = X.rechunk({0: "auto", 1: -1}) + __wraps__ = dask_ml.cluster.KMeans + + except ImportError: + warnings.warn("dask_ml is not installed. Using sklearn instead.") + + return self.__returns__(__wraps__(**params).fit(X)) + if __name__ == "__main__": d = Table("iris") diff --git a/Orange/widgets/unsupervised/owkmeans.py b/Orange/widgets/unsupervised/owkmeans.py index 4f2511b705f..e0ede34dd35 100644 --- a/Orange/widgets/unsupervised/owkmeans.py +++ b/Orange/widgets/unsupervised/owkmeans.py @@ -13,6 +13,7 @@ from Orange.clustering import KMeans from Orange.clustering.kmeans import KMeansModel from Orange.data import Table, Domain, DiscreteVariable, ContinuousVariable +from Orange.data.dask import DaskTable from Orange.data.util import get_unique_names, array_equal from Orange.preprocess import Normalize from Orange.preprocess.impute import ReplaceUnknowns @@ -135,6 +136,7 @@ class Warning(widget.OWWidget.Warning): INIT_METHODS = (("Initialize with KMeans++", "k-means++"), ("Random initialization", "random")) + DASK_METHODS = (("Initialize with KMeans||", "k-means||"),) resizing_enabled = False @@ -170,7 +172,7 @@ def __init__(self): self.__task = None # type: Optional[Task] layout = QGridLayout() - bg = gui.radioButtonsInBox( + self.radiobox = bg = gui.radioButtonsInBox( self.controlArea, self, "optimize_k", orientation=layout, box="Number of Clusters", callback=self.update_method, ) @@ -186,7 +188,7 @@ def __init__(self): layout.addWidget( gui.appendRadioButton(bg, "From", addToLayout=False), 2, 1) - ftobox = gui.hBox(None) + self.ftobox = ftobox = gui.hBox(None) ftobox.layout().setContentsMargins(0, 0, 0, 0) layout.addWidget(ftobox, 2, 2) gui.spin( @@ -293,6 +295,12 @@ def _compute_clustering(data, k, init, n_init, max_iter, random_state): random_state=random_state, preprocessors=[] ).get_model(data) + if isinstance(data, DaskTable): + # just skip silhouettes for now + model.silhouette_samples = None + model.silhouette = np.nan + return model + if data.X.shape[0] <= SILHOUETTE_MAX_SAMPLES: model.silhouette_samples = silhouette_samples(data.X, model.labels) model.silhouette = np.mean(model.silhouette_samples) @@ -501,9 +509,7 @@ def preproces(self, data): self.Warning.no_sparse_normalization() else: data = Normalize()(data) - for preprocessor in KMeans.preprocessors: # use same preprocessors than - data = preprocessor(data) - return data + return KMeans().preprocess(data) # why? def send_data(self): if self.optimize_k: @@ -584,6 +590,9 @@ def set_data(self, data): self.controls.normalize.setDisabled( bool(self.data) and sp.issparse(self.data.X)) + if type(data) is not type(old_data): + self.setup_controls(isinstance(self.data, DaskTable)) + # Do not needlessly recluster the data if X hasn't changed if old_data and self.data and array_equal(self.data.X, old_data.X): if self.auto_commit: @@ -591,6 +600,17 @@ def set_data(self, data): else: self.invalidate(unconditional=True) + def setup_controls(self, is_dask): + self.ftobox.setDisabled(is_dask) + self.radiobox.buttons[1].setDisabled(is_dask) + self.optimize_k = not is_dask and self.optimize_k + self.INIT_METHODS = OWKMeans.DASK_METHODS \ + if is_dask else OWKMeans.INIT_METHODS + self.controls.smart_init.clear() + self.controls.smart_init.addItems([t[0] for t in self.INIT_METHODS]) + self.smart_init = 0 + self.controls.n_init.setDisabled(is_dask) + def send_report(self): # False positives (Setting is not recognized as int) # pylint: disable=invalid-sequence-index From 13389266538e1a0da4b2a4be4130125a386d2d0a Mon Sep 17 00:00:00 2001 From: noahnovsak Date: Tue, 20 Jun 2023 12:59:13 +0200 Subject: [PATCH 2/3] add widget tests --- .../unsupervised/tests/test_owkmeans.py | 128 ++++++++++++------ 1 file changed, 87 insertions(+), 41 deletions(-) diff --git a/Orange/widgets/unsupervised/tests/test_owkmeans.py b/Orange/widgets/unsupervised/tests/test_owkmeans.py index dd4b122960a..68bd21b8e16 100644 --- a/Orange/widgets/unsupervised/tests/test_owkmeans.py +++ b/Orange/widgets/unsupervised/tests/test_owkmeans.py @@ -4,6 +4,7 @@ import numpy as np import scipy.sparse as sp +import dask.array as da from AnyQt.QtCore import Qt from AnyQt.QtWidgets import QRadioButton @@ -11,9 +12,11 @@ import Orange.clustering from Orange.data import Table, Domain +from Orange.data.dask import DaskTable from Orange.widgets import gui from Orange.widgets.tests.base import WidgetTest from Orange.widgets.unsupervised.owkmeans import OWKMeans, ClusterTableModel +from Orange.tests.test_dasktable import temp_dasktable class TestClusterTableModel(unittest.TestCase): @@ -47,6 +50,7 @@ def setUp(self): OWKMeans, stored_settings={"auto_commit": False, "version": 2} ) # type: OWKMeans self.data = Table("heart_disease") + self.housing = Table("housing") def tearDown(self): self.widget.onDeleteWidget() @@ -206,13 +210,15 @@ def test_centroids_on_output(self): widget.k = 4 self.send_signal(widget.Inputs.data, self.data) self.commit_and_wait() - widget.clusterings[widget.k].labels = np.array([0] * 100 + [1] * 203).flatten() - widget.clusterings[widget.k].silhouette_samples = np.arange(303) / 303 + l = len(self.data) + widget.clusterings[widget.k].labels = np.array([0] * (l // 3) + + [1] * (l - l // 3)).flatten() + widget.clusterings[widget.k].silhouette_samples = np.arange(l) / l widget.send_data() out = self.get_output(widget.Outputs.centroids) np.testing.assert_array_almost_equal( - np.array([[0, np.mean(np.arctan(np.arange(100) / 303)) / np.pi + 0.5], - [1, np.mean(np.arctan(np.arange(100, 303) / 303)) / np.pi + 0.5], + np.array([[0, np.mean(np.arctan(np.arange(l // 3) / l)) / np.pi + 0.5], + [1, np.mean(np.arctan(np.arange(l // 3, l) / l)) / np.pi + 0.5], [2, 0], [3, 0]]), out.metas.astype(float)) self.assertEqual(out.name, "heart_disease centroids") @@ -220,12 +226,11 @@ def test_centroids_domain_on_output(self): widget = self.widget widget.optimize_k = False widget.k = 4 - heart_disease = Table("heart_disease") - heart_disease.name = Table.name # untitled - self.send_signal(widget.Inputs.data, heart_disease) + self.data.name = Table.name # untitled + self.send_signal(widget.Inputs.data, self.data) self.commit_and_wait() - in_attrs = heart_disease.domain.attributes + in_attrs = self.data.domain.attributes out = self.get_output(widget.Outputs.centroids) out_attrs = out.domain.attributes out_names = {attr.name for attr in out_attrs} @@ -316,7 +321,7 @@ def test_select_best_row(self): widget.k_from, widget.k_to = 2, 6 widget.optimize_k = True widget.normalize = False - self.send_signal(self.widget.Inputs.data, Table("housing"), wait=5000) + self.send_signal(self.widget.Inputs.data, self.housing, wait=5000) self.commit_and_wait() widget.update_results() # for housing dataset without normalization, @@ -324,7 +329,7 @@ def test_select_best_row(self): self.assertEqual(widget.selected_row(), 1) self.widget.controls.normalize.toggle() - self.send_signal(self.widget.Inputs.data, Table("housing"), wait=5000) + self.send_signal(self.widget.Inputs.data, self.housing, wait=5000) self.commit_and_wait() widget.update_results() # for housing dataset with normalization, @@ -471,27 +476,23 @@ def test_invalidate_clusterings_cancels_jobs(self): self.assertEqual(widget.clusterings, {}) - def test_do_not_recluster_on_same_data(self): - """Do not recluster data points when targets or metas change.""" - + def dummy_data(self): # Prepare some dummy data - x = np.eye(5) + x1, x2 = np.eye(5), np.tri(5) y1, y2 = np.ones((5, 1)), np.ones((5, 2)) meta1, meta2 = np.ones((5, 1)), np.ones((5, 2)) - table1 = Table.from_numpy( - domain=Domain.from_numpy(X=x, Y=y1, metas=meta1), - X=x, Y=y1, metas=meta1, - ) + table1 = Table.from_numpy(None, X=x1, Y=y1, metas=meta1) # X is same, should not cause update - table2 = Table.from_numpy( - domain=Domain.from_numpy(X=x, Y=y2, metas=meta2), - X=x, Y=y2, metas=meta2, - ) + table2 = Table.from_numpy(None, X=x1, Y=y2, metas=meta2) # X is different, should cause update - table3 = table1.copy() - with table3.unlocked(): - table3.X[:, 0] = 1 + table3 = Table.from_numpy(None, X=x2, Y=y2, metas=meta2) + + return table1, table2, table3 + + def test_do_not_recluster_on_same_data(self): + """Do not recluster data points when targets or metas change.""" + table1, table2, table3 = self.dummy_data() with patch.object(self.widget.commit, 'now') as commit: self.send_signal(self.widget.Inputs.data, table1) @@ -509,18 +510,14 @@ def test_do_not_recluster_on_same_data(self): def test_correct_smart_init(self): # due to a bug where wrong init was passed to _compute_clustering self.send_signal(self.widget.Inputs.data, self.data[::10], wait=5000) - self.widget.smart_init = 0 self.widget.clusterings = {} - with patch.object(self.widget, "_compute_clustering", - wraps=self.widget._compute_clustering) as compute: - self.commit_and_wait() - self.assertEqual(compute.call_args[1]['init'], "k-means++") - self.widget.invalidate() # reset caches - self.widget.smart_init = 1 - with patch.object(self.widget, "_compute_clustering", - wraps=self.widget._compute_clustering) as compute: - self.commit_and_wait() - self.assertEqual(compute.call_args[1]['init'], "random") + for i, method in enumerate(self.widget.INIT_METHODS): + self.widget.smart_init = i + with patch.object(self.widget, "_compute_clustering", + wraps=self.widget._compute_clustering) as compute: + self.commit_and_wait() + self.assertEqual(compute.call_args[1]['init'], method[1]) + self.widget.invalidate() # reset caches def test_always_same_cluster(self): """The same random state should always return the same clusters""" @@ -535,11 +532,9 @@ def assert_all_same(l): for a1, a2 in zip(l, l[1:]): np.testing.assert_equal(a1, a2) - self.widget.smart_init = 0 - assert_all_same([cluster() for _ in range(5)]) - - self.widget.smart_init = 1 - assert_all_same([cluster() for _ in range(5)]) + for i, _ in enumerate(self.widget.INIT_METHODS): + self.widget.smart_init = i + assert_all_same([cluster() for _ in range(5)]) def test_error_no_attributes(self): domain = Domain([]) @@ -566,5 +561,56 @@ def test_saved_selection(self): self.assertEqual(self.widget.selected_row(), w.selected_row()) +class TestOWKMeansOnDask(TestOWKMeans): + def setUp(self): + super().setUp() + self.data = temp_dasktable(Table("heart_disease")[:50]) + self.housing = temp_dasktable(Table("housing")) + + def dummy_data(self): + # Prepare some dummy data + x1, x2 = da.eye(5), da.tri(5) + y1, y2 = da.ones((5, 1)), da.ones((5, 2)) + meta1, meta2 = np.ones((5, 1)), np.ones((5, 2)) + domain1 = Domain.from_numpy(X=np.asarray(x1), Y=np.asarray(y1), metas=meta1) + domain2 = Domain.from_numpy(X=np.asarray(x2), Y=np.asarray(y2), metas=meta2) + + table1 = DaskTable.from_arrays(domain=domain1, X=x1, Y=y1, metas=meta1) + # X is same, should not cause update + table2 = DaskTable.from_arrays(domain=domain2, X=x1, Y=y2, metas=meta2) + # X is different, should cause update + table3 = DaskTable.from_arrays(domain=domain2, X=x2, Y=y2, metas=meta2) + + return table1, table2, table3 + + @unittest.skip("optimization is disabled for dask tables") + def test_select_best_row(self): + super().test_select_best_row() + + @unittest.skip("optimization is disabled for dask tables") + def test_from_to_table(self): + super().test_from_to_table() + + @unittest.skip("optimization is disabled for dask tables") + def test_use_cache(self): + super().test_use_cache() + + @unittest.skip("optimization is disabled for dask tables") + def test_optimization_fails(self): + super().test_optimization_fails() + + @unittest.skip("optimization is disabled for dask tables") + def test_optimization_report_display(self): + super().test_optimization_report_display() + + @unittest.skip("optimization is disabled for dask tables") + def test_saved_selection(self): + super().test_saved_selection() + + @unittest.skip("optimization is disabled for dask tables") + def test_no_data_hides_main_area(self): + super().test_no_data_hides_main_area() + + if __name__ == "__main__": unittest.main() From c046e49db38ca8b11928f572b557f3d96d6c69cc Mon Sep 17 00:00:00 2001 From: noahnovsak Date: Wed, 21 Jun 2023 12:53:55 +0200 Subject: [PATCH 3/3] add kmeans tests --- Orange/clustering/clustering.py | 3 +- Orange/clustering/kmeans.py | 7 ++- Orange/tests/test_clustering_kmeans.py | 83 ++++++++++++++++---------- 3 files changed, 58 insertions(+), 35 deletions(-) diff --git a/Orange/clustering/clustering.py b/Orange/clustering/clustering.py index a7598a03629..b9c9f427c3d 100644 --- a/Orange/clustering/clustering.py +++ b/Orange/clustering/clustering.py @@ -1,5 +1,6 @@ import numpy as np import scipy.sparse +import dask.array as da from Orange.data import Table, Instance from Orange.data.table import DomainTransformationError @@ -20,7 +21,7 @@ def fix_dim(x): return x[0] if one_d else x one_d = False - if isinstance(data, np.ndarray): + if isinstance(data, (np.ndarray, da.Array)): one_d = data.ndim == 1 prediction = self.predict(np.atleast_2d(data)) elif isinstance(data, scipy.sparse.csr_matrix) or \ diff --git a/Orange/clustering/kmeans.py b/Orange/clustering/kmeans.py index 72e61545118..e940538854a 100644 --- a/Orange/clustering/kmeans.py +++ b/Orange/clustering/kmeans.py @@ -20,6 +20,8 @@ def __init__(self, projector): self.k = projector.get_params()["n_clusters"] def predict(self, X): + if isinstance(X, da.Array): + X = X.rechunk({0: "auto", 1: -1}) return self.projector.predict(X) @@ -48,7 +50,10 @@ def fit(self, X: Union[np.ndarray, da.Array], y: np.ndarray = None): import dask_ml.cluster del params["n_init"] - assert params["init"] == "k-means||" + if params["init"] != "k-means||": + warnings.warn(f"Initializing with {params['init']} defaults" + f" to sklearn. Using k-means|| instead.") + params["init"] = "k-means||" X = X.rechunk({0: "auto", 1: -1}) __wraps__ = dask_ml.cluster.KMeans diff --git a/Orange/tests/test_clustering_kmeans.py b/Orange/tests/test_clustering_kmeans.py index 1ab9043e964..f1642480f7f 100644 --- a/Orange/tests/test_clustering_kmeans.py +++ b/Orange/tests/test_clustering_kmeans.py @@ -11,6 +11,7 @@ from Orange.clustering.kmeans import KMeans, KMeansModel from Orange.data import Table, Domain, ContinuousVariable from Orange.data.table import DomainTransformationError +from Orange.tests.test_dasktable import with_dasktable class TestKMeans(unittest.TestCase): @@ -18,30 +19,38 @@ def setUp(self): self.kmeans = KMeans(n_clusters=2) self.iris = Orange.data.Table('iris') - def test_kmeans(self): - c = self.kmeans(self.iris) + @with_dasktable + def test_kmeans(self, prepare_table): + iris = prepare_table(self.iris) + c = self.kmeans(iris) # First 20 iris belong to one cluster - self.assertEqual(np.ndarray, type(c)) - self.assertEqual(len(self.iris), len(c)) - self.assertEqual(1, len(set(c[:20].ravel()))) + self.assertEqual(type(iris.X), type(c)) + self.assertEqual(len(iris), len(c)) + self.assertEqual(1, len(np.unique(np.asarray(c[:20])))) - def test_kmeans_parameters(self): + @with_dasktable + def test_kmeans_parameters(self, prepare_table): kmeans = KMeans(n_clusters=10, max_iter=10, random_state=42, tol=0.001, init='random') - c = kmeans(self.iris) - self.assertEqual(np.ndarray, type(c)) - self.assertEqual(len(self.iris), len(c)) - - def test_predict_table(self): - c = self.kmeans(self.iris) - self.assertEqual(np.ndarray, type(c)) - self.assertEqual(len(self.iris), len(c)) - - def test_predict_numpy(self): - c = self.kmeans.fit(self.iris.X) + iris = prepare_table(self.iris) + c = kmeans(iris) + self.assertEqual(type(iris.X), type(c)) + self.assertEqual(len(iris), len(c)) + + @with_dasktable + def test_predict_table(self, prepare_table): + iris = prepare_table(self.iris) + c = self.kmeans(iris) + self.assertEqual(type(iris.X), type(c)) + self.assertEqual(len(iris), len(c)) + + @with_dasktable + def test_predict_numpy(self, prepare_table): + iris = prepare_table(self.iris) + c = self.kmeans.fit(iris.X) self.assertEqual(KMeansModel, type(c)) - self.assertEqual(np.ndarray, type(c.labels)) - self.assertEqual(len(self.iris), len(c.labels)) + self.assertEqual(type(iris.X), type(c.labels)) + self.assertEqual(len(iris), len(c.labels)) def test_predict_sparse_csc(self): with self.iris.unlocked(): @@ -57,21 +66,25 @@ def test_predict_spares_csr(self): self.assertEqual(np.ndarray, type(c)) self.assertEqual(len(self.iris), len(c)) - def test_model(self): - c = self.kmeans.get_model(self.iris) + @with_dasktable + def test_model(self, prepare_table): + iris = prepare_table(self.iris) + c = self.kmeans.get_model(iris) self.assertEqual(KMeansModel, type(c)) - self.assertEqual(len(self.iris), len(c.labels)) + self.assertEqual(len(iris), len(c.labels)) - c1 = c(self.iris) + c1 = c(iris) # prediction of the model must be same since data are same np.testing.assert_array_almost_equal(c.labels, c1) - def test_model_np(self): + @with_dasktable + def test_model_np(self, prepare_table): """ Test with numpy array as an input in model. """ - c = self.kmeans.get_model(self.iris) - c1 = c(self.iris.X) + iris = prepare_table(self.iris) + c = self.kmeans.get_model(iris) + c1 = c(iris.X) # prediction of the model must be same since data are same np.testing.assert_array_almost_equal(c.labels, c1) @@ -93,12 +106,14 @@ def test_model_sparse_csr(self): # prediction of the model must be same since data are same np.testing.assert_array_almost_equal(c.labels, c1) - def test_model_instance(self): + @with_dasktable + def test_model_instance(self, prepare_table): """ Test with instance as an input in model. """ - c = self.kmeans.get_model(self.iris) - c1 = c(self.iris[0]) + iris = prepare_table(self.iris) + c = self.kmeans.get_model(iris) + c1 = c(iris[0]) # prediction of the model must be same since data are same self.assertEqual(c1, c.labels[0]) @@ -107,20 +122,22 @@ def test_model_list(self): Test with list as an input in model. """ c = self.kmeans.get_model(self.iris) - c1 = c(self.iris.X.tolist()) + c1 = c(np.asarray(self.iris.X).tolist()) # prediction of the model must be same since data are same np.testing.assert_array_almost_equal(c.labels, c1) # example with a list of only one data item - c1 = c(self.iris.X.tolist()[0]) + c1 = c(np.asarray(self.iris.X).tolist()[0]) # prediction of the model must be same since data are same np.testing.assert_array_almost_equal(c.labels[0], c1) - def test_model_bad_datatype(self): + @with_dasktable + def test_model_bad_datatype(self, prepare_table): """ Check model with data-type that is not supported. """ - c = self.kmeans.get_model(self.iris) + iris = prepare_table(self.iris) + c = self.kmeans.get_model(iris) self.assertRaises(TypeError, c, 10) def test_model_data_table_domain(self):