Merge pull request #16 from gmrukwa/fixup/multiprocessing-data-passing
Fixup: multiprocessing data passing
gmrukwa committed Dec 10, 2019
2 parents d271321 + 7c2e6ee commit 2ddc600
Showing 13 changed files with 89 additions and 166 deletions.
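The core of this fixup is how arrays reach worker processes. Previously the data matrix was bound into the task function with functools.partial, so it was pickled and shipped to the pool with the tasks; now it is stored in a module-level registry under a uuid key, and only the key travels through the pool. Below is a minimal, self-contained sketch of that pattern. The names heavy_computation and run_parallel are illustrative, not part of divik, and the pattern assumes the fork start method (the Unix default): spawned workers would not inherit registry entries created after import.

from functools import partial
from multiprocessing import Pool
import uuid

import numpy as np

_DATA = {}  # module-level registry; forked workers inherit its contents


def heavy_computation(scale, data):
    # Workers receive only the short uuid key and resolve it locally.
    return float(np.sum(_DATA[data]) * scale)


def run_parallel(X, scales, processes=2):
    ref = str(uuid.uuid4())
    _DATA[ref] = X  # register once in the parent process
    task = partial(heavy_computation, data=ref)  # only the key is pickled
    with Pool(processes) as pool:
        results = pool.map(task, scales)
    del _DATA[ref]  # drop the parent-side reference when done
    return results


if __name__ == '__main__':
    print(run_parallel(np.ones((4, 3)), [1.0, 2.0, 3.0]))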
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
@@ -12,7 +12,7 @@ on:
 env:
   MAJOR: ${{ 2 }}
   MINOR: ${{ 2 }}
-  FIXUP: ${{ 0 }}
+  FIXUP: ${{ 1 }}
   PACKAGE_INIT_FILE: ${{ 'divik/__init__.py' }}
   DOCKER_REPO: ${{ 'gmrukwa/divik' }}
   IS_ALPHA: ${{ github.event_name == 'pull_request' }}
2 changes: 1 addition & 1 deletion divik/__init__.py
@@ -1,4 +1,4 @@
-__version__ = '2.2.0'
+__version__ = '2.2.1'
 
 from ._seeding import seeded
 from ._sklearn import DiviK
6 changes: 3 additions & 3 deletions divik/_cli/auto_kmeans.py
@@ -26,8 +26,8 @@ def make_segmentations_matrix(kmeans: km.AutoKMeans) -> np.ndarray:
     return np.hstack([e.labels_.reshape(-1, 1) for e in kmeans.estimators_])
 
 
-def make_scores_report(kmeans: km.AutoKMeans) -> pd.DataFrame:
-    picker = divik._score.make_picker(kmeans.method, kmeans.gap)
+def make_scores_report(kmeans: km.AutoKMeans, n_jobs: int = 1) -> pd.DataFrame:
+    picker = divik._score.make_picker(kmeans.method, n_jobs, kmeans.gap)
     return picker.report(kmeans.estimators_, kmeans.scores_)
 
 
@@ -62,7 +62,7 @@ def save(kmeans: km.AutoKMeans, destination: str, xy: np.ndarray=None):
                  visualization)
 
     logging.info("Saving scores.")
-    report = make_scores_report(kmeans)
+    report = make_scores_report(kmeans, n_jobs=-1)
     report.to_csv(fname('scores.csv'))
11 changes: 5 additions & 6 deletions divik/_divik.py
@@ -20,7 +20,6 @@
 from functools import partial
 import gc
 import logging as lg
-from multiprocessing import Pool
 from typing import List, Optional
 
 import numpy as np
@@ -102,8 +101,8 @@ def assemble(self):
 def divik(data: Data, selection: np.ndarray,
           fast_kmeans: km.AutoKMeans, full_kmeans: km.AutoKMeans,
           feature_selector: fs.StatSelectorMixin,
-          minimal_size: int, rejection_size: int, report: DivikReporter,
-          pool: Pool = None) -> Optional[DivikResult]:
+          minimal_size: int, rejection_size: int, report: DivikReporter) \
+        -> Optional[DivikResult]:
     subset = data[selection]
 
     if subset.shape[0] <= max(full_kmeans.max_clusters, minimal_size):
@@ -116,13 +115,13 @@ def divik(data: Data, selection: np.ndarray,
     report.filtered(filtered_data)
 
     report.stop_check()
-    fast_kmeans = clone(fast_kmeans).fit(filtered_data, pool=pool)
+    fast_kmeans = clone(fast_kmeans).fit(filtered_data)
     if fast_kmeans.fitted_ and fast_kmeans.n_clusters_ == 1:
         report.finished_for(subset.shape[0])
         return None
 
     report.processing(filtered_data)
-    clusterer = clone(full_kmeans).fit(filtered_data, pool=pool)
+    clusterer = clone(full_kmeans).fit(filtered_data)
     partition = clusterer.labels_
     _, counts = np.unique(partition, return_counts=True)
 
@@ -135,7 +134,7 @@ def divik(data: Data, selection: np.ndarray,
         divik, data=data, fast_kmeans=fast_kmeans,
         full_kmeans=full_kmeans, feature_selector=feature_selector,
         minimal_size=minimal_size, rejection_size=rejection_size,
-        report=report, pool=pool)
+        report=report)
     del subset
     del filtered_data
     gc.collect()
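With the pool argument gone, the recursive divik call no longer threads a shared Pool through every level; each AutoKMeans.fit manages its own workers. Calling code simplifies accordingly. A sketch under assumptions: AutoKMeans is importable from divik._kmeans as the repository layout suggests, and all constructor parameters other than max_clusters and n_jobs keep their defaults.

import numpy as np
from divik._kmeans import AutoKMeans  # import path assumed from this layout

X = np.random.random_sample((200, 10))
kmeans = AutoKMeans(max_clusters=5, n_jobs=-1)  # parallelism declared up front
kmeans.fit(X)  # previously: kmeans.fit(X, pool=pool)
print(kmeans.n_clusters_)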
24 changes: 13 additions & 11 deletions divik/_kmeans/_auto.py
@@ -1,6 +1,7 @@
 from functools import partial
 from multiprocessing import Pool
 import sys
+import uuid
 
 from sklearn.base import BaseEstimator, ClusterMixin, TransformerMixin
 from sklearn.utils.validation import check_is_fitted
@@ -11,8 +12,11 @@
 from divik._utils import get_n_jobs
 
 
+_DATA = {}
+
+
 def _fit_kmeans(*args, **kwargs):
-    data = kwargs.pop('data')
+    data = _DATA[kwargs.pop('data')]
     return KMeans(*args, **kwargs).fit(data)
@@ -140,7 +144,7 @@ def __init__(self, max_clusters: int, min_clusters: int = 1,
         self.gap = gap
         self.verbose = verbose
 
-    def fit(self, X, y=None, pool: Pool=None):
+    def fit(self, X, y=None):
         """Compute k-means clustering and estimate optimal number of clusters.
 
         Parameters
@@ -154,30 +158,28 @@ def fit(self, X, y=None):
         y : Ignored
             not used, present here for API consistency by convention.
-        pool: Pool
-            used for parallelization of computations reusing single pool
         """
-        fit_kmeans = partial(_fit_kmeans, data=X, distance=self.distance,
+        ref = str(uuid.uuid4())
+        _DATA[ref] = X
+        fit_kmeans = partial(_fit_kmeans, data=ref, distance=self.distance,
                              init=self.init, percentile=self.percentile,
                              max_iter=self.max_iter,
                              normalize_rows=self.normalize_rows)
         n_clusters = range(self.min_clusters, self.max_clusters + 1)
         if self.verbose:
             n_clusters = tqdm.tqdm(n_clusters, leave=False, file=sys.stdout)
 
-        method = make_picker(self.method, self.gap)
+        method = make_picker(self.method, self.n_jobs, self.gap)
 
         processes = get_n_jobs(self.n_jobs)
-        if processes == 1 or pool is None:
+        if processes == 1:
             self.estimators_ = [fit_kmeans(n_clusters=k) for k in n_clusters]
-            self.scores_ = method.score(X, self.estimators_)
-        elif pool is not None:
-            self.estimators_ = pool.map(fit_kmeans, n_clusters)
-            self.scores_ = method.score(X, self.estimators_, pool)
         else:
             with Pool(processes) as pool:
                 self.estimators_ = pool.map(fit_kmeans, n_clusters)
-                self.scores_ = method.score(X, self.estimators_, pool)
+        self.scores_ = method.score(X, self.estimators_)
+        del _DATA[ref]
 
         best = method.select(self.scores_)
7 changes: 3 additions & 4 deletions divik/_score/__init__.py
@@ -1,20 +1,19 @@
-from ._optimizer import Optimizer, ParameterValues
 from ._dunn import dunn, DunnPicker
 from ._gap import gap, GapPicker
 from ._picker import Picker
 
 
-def make_picker(method, gap=None):
+def make_picker(method, n_jobs: int = 1, gap=None):
     if method == 'dunn':
-        picker = DunnPicker()
+        picker = DunnPicker(n_jobs=n_jobs)
     elif method == 'gap':
         if gap is None:
             gap = {}
         max_iter = gap.get('max_iter', 10)
         seed = gap.get('seed', 0)
         trials = gap.get('trials', 10)
         correction = gap.get('correction', True)
-        picker = GapPicker(max_iter, seed, trials, correction)
+        picker = GapPicker(max_iter, seed, trials, correction, n_jobs=n_jobs)
     else:
        raise ValueError('Unknown quality measure {0}'.format(method))
     return picker
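Usage of the reworked factory reads as follows; the names come from this diff, while X and estimators stand in for a data matrix and a list of fitted divik KMeans models assumed to exist in the surrounding code.

from divik._score import make_picker

# n_jobs now rides on the picker itself instead of a Pool passed to score().
picker = make_picker('gap', n_jobs=-1,
                     gap={'max_iter': 10, 'trials': 20, 'correction': True})
scores = picker.score(X, estimators)  # pool handling is internal now
best = picker.select(scores)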
27 changes: 19 additions & 8 deletions divik/_score/_dunn.py
@@ -1,20 +1,22 @@
 from functools import partial
 from multiprocessing.pool import Pool
 from typing import List, Optional
+import uuid
 
 import numpy as np
 import pandas as pd
 
 from divik._distance import DistanceMetric, make_distance
 from divik._score._picker import Picker
-from divik._utils import Centroids, IntLabels, Data
+from divik._utils import Centroids, IntLabels, Data, get_n_jobs
 
 
 def dunn(data: Data, labels: IntLabels, centroids: Centroids,
          distance: DistanceMetric) -> float:
     if centroids.shape[0] == 1:
         return -np.inf
-    clusters = pd.DataFrame(data).groupby(labels).apply(lambda cluster: cluster.values)
+    clusters = pd.DataFrame(data).groupby(labels).apply(
+        lambda cluster: cluster.values)
     intercluster = distance(centroids, centroids)
     intercluster = np.min(intercluster[intercluster != 0])
     intracluster = np.max([
@@ -28,19 +30,28 @@ def dunn(data: Data, labels: IntLabels, centroids: Centroids,
 KMeans = 'divik.KMeans'
 
 
+_DATA = {}
+
+
 def _dunn(kmeans: KMeans, data: Data) -> float:
     distance = make_distance(kmeans.distance)
+    if isinstance(data, str):
+        data = _DATA[data]
     return dunn(data, kmeans.labels_, kmeans.cluster_centers_, distance)
 
 
 class DunnPicker(Picker):
-    def score(self, data: Data, estimators: List[KMeans], pool: Pool=None) \
-            -> np.ndarray:
-        score = partial(_dunn, data=data)
-        if pool:
-            scores = pool.map(score, estimators)
+    def score(self, data: Data, estimators: List[KMeans]) -> np.ndarray:
+        if self.n_jobs != 1:
+            ref = str(uuid.uuid4())
+            global _DATA
+            _DATA[ref] = data
+            score = partial(_dunn, data=ref)
+            with Pool(get_n_jobs(self.n_jobs)) as pool:
+                scores = pool.map(score, estimators)
+            del _DATA[ref]
         else:
-            scores = [score(estimator) for estimator in estimators]
+            scores = [_dunn(estimator, data) for estimator in estimators]
         return np.array(scores)
 
     def select(self, scores: np.ndarray) -> Optional[int]:
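get_n_jobs comes from divik._utils and its body is not part of this diff. A sklearn-style reading consistent with the call sites here (n_jobs=-1 meaning "all cores") might look like the sketch below; this is an assumption, not the actual divik implementation.

from multiprocessing import cpu_count


def get_n_jobs(n_jobs):
    """Translate an n_jobs declaration into a concrete process count (assumed)."""
    if n_jobs is None:
        n_jobs = 1
    if n_jobs < 0:
        # -1 -> all cores, -2 -> all but one, following sklearn conventions
        n_jobs = max(1, cpu_count() + 1 + n_jobs)
    return n_jobs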
48 changes: 25 additions & 23 deletions divik/_score/_gap.py
@@ -12,15 +12,15 @@
 from divik._distance import DistanceMetric, make_distance
 from divik._score._picker import Picker
 from divik._utils import Centroids, IntLabels, Data, SegmentationMethod, \
-    normalize_rows
+    context_if, get_n_jobs, normalize_rows
 from divik._seeding import seeded
 
 
 KMeans = 'divik.KMeans'
 
 
 def _dispersion(data: Data, labels: IntLabels, centroids: Centroids,
-                distance: DistanceMetric, normalize: bool=False) -> float:
+                distance: DistanceMetric, normalize: bool = False) -> float:
     if normalize:
         data = normalize_rows(data)
     clusters = pd.DataFrame(data).groupby(labels)
@@ -36,11 +36,12 @@ def _dispersion_of_random_sample(seed: int,
                                  ranges: np.ndarray,
                                  split: SegmentationMethod,
                                  distance: DistanceMetric,
-                                 normalize_rows: bool=False) -> float:
+                                 normalize_rows: bool = False) -> float:
     np.random.seed(seed)
     sample = np.random.random_sample(shape) * ranges + minima
     labels, centroids = split(sample)
-    dispersion = _dispersion(sample, labels, centroids, distance, normalize_rows)
+    dispersion = _dispersion(sample, labels, centroids, distance,
+                             normalize_rows)
     del sample
     gc.collect()
     return dispersion
@@ -50,8 +51,8 @@
 @seeded(wrapped_requires_seed=True)
 def gap(data: Data, labels: IntLabels, centroids: Centroids,
         distance: DistanceMetric, split: SegmentationMethod,
-        seed: int=0, n_trials: int = 100, pool: Pool=None,
-        return_deviation: bool = False, normalize_rows: bool=False) -> float:
+        seed: int = 0, n_trials: int = 100, pool: Pool = None,
+        return_deviation: bool = False, normalize_rows: bool = False) -> float:
     minima = np.min(data, axis=0)
     ranges = np.max(data, axis=0) - minima
     compute_dispersion = partial(_dispersion_of_random_sample,
@@ -62,7 +63,8 @@ def gap(data: Data, labels: IntLabels, centroids: Centroids,
                                  distance=distance,
                                  normalize_rows=normalize_rows)
     if pool is None:
-        dispersions = [compute_dispersion(i) for i in range(seed, seed + n_trials)]
+        dispersions = [compute_dispersion(i)
+                       for i in range(seed, seed + n_trials)]
     else:
         dispersions = pool.map(compute_dispersion, range(seed, seed + n_trials))
     reference = _dispersion(data, labels, centroids, distance, normalize_rows)
@@ -95,27 +97,27 @@ def _fast_kmeans(kmeans: KMeans, max_iter: int = 10) -> SegmentationMethod:
 
 class GapPicker(Picker):
     def __init__(self, max_iter: int = 10, seed: int = 0, n_trials: int = 10,
-                 correction: bool=True):
+                 correction: bool = True, n_jobs: int = 1):
+        super().__init__(n_jobs=n_jobs)
         self.max_iter = max_iter
         self.seed = seed
         self.n_trials = n_trials
         self.correction = correction
 
-    def score(self, data: Data, estimators: List[KMeans], pool: Pool=None) \
-            -> np.ndarray:
-        scores = [
-            gap(data=data,
-                labels=estimator.labels_,
-                centroids=estimator.cluster_centers_,
-                distance=make_distance(estimator.distance),
-                split=_fast_kmeans(estimator, self.max_iter),
-                seed=self.seed,
-                n_trials=self.n_trials,
-                pool=pool,
-                return_deviation=True,
-                normalize_rows=estimator.normalize_rows)
-            for estimator in estimators
-        ]
+    def score(self, data: Data, estimators: List[KMeans]) -> np.ndarray:
+        gap_ = partial(gap, data, seed=self.seed, n_trials=self.n_trials,
+                       return_deviation=True)
+        n_jobs = get_n_jobs(self.n_jobs)
+        with context_if(self.n_jobs != 1, Pool, n_jobs) as pool:
+            scores = [
+                gap_(labels=estimator.labels_,
+                     centroids=estimator.cluster_centers_,
+                     distance=make_distance(estimator.distance),
+                     split=_fast_kmeans(estimator, self.max_iter),
+                     pool=pool,
+                     normalize_rows=estimator.normalize_rows)
+                for estimator in estimators
+            ]
         return np.array(scores)
 
     def select(self, scores: np.ndarray) -> Optional[int]:
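context_if is likewise imported from divik._utils without its body appearing in this diff. A minimal sketch consistent with the call site context_if(self.n_jobs != 1, Pool, n_jobs) would enter the context only when the condition holds and yield None otherwise, so that gap(..., pool=None) falls back to its serial path; this is an assumed implementation, not divik's.

from contextlib import contextmanager


@contextmanager
def context_if(condition, context, *args, **kwargs):
    """Enter context(*args, **kwargs) when condition holds, else yield None."""
    if condition:
        with context(*args, **kwargs) as c:
            yield c
    else:
        yield None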
42 changes: 0 additions & 42 deletions divik/_score/_optimizer.py

This file was deleted.

7 changes: 4 additions & 3 deletions divik/_score/_picker.py
@@ -1,5 +1,4 @@
 from abc import ABCMeta, abstractmethod
-from multiprocessing.pool import Pool
 from typing import List, Optional
 
 import numpy as np
@@ -11,9 +10,11 @@
 
 
 class Picker(metaclass=ABCMeta):
+    def __init__(self, n_jobs: int = 1):
+        self.n_jobs = n_jobs
+
     @abstractmethod
-    def score(self, data: Data, estimators: List[KMeans], pool: Pool = None) \
-            -> np.ndarray:
+    def score(self, data: Data, estimators: List[KMeans]) -> np.ndarray:
         raise NotImplementedError
 
     @abstractmethod
