Skip to content

Commit

Permalink
Parallelize Sampling of LDT (#744)
Browse files Browse the repository at this point in the history
* LPT tutorial render fix attempt

* lpt tutorial changes

* parallelize ldt

* parallelize ldt

* functioning parallelization

* remove print

* remove comments

* format fix

* add parallel test

* change seeds array

* add random_state param

* change workers to n_jobs

* change back to workers

* add missing imports

* remove unused warnings import

* allow for none in type checking

* update description for workers

* black

* try only generating N seeds

Co-authored-by: Benjamin Pedigo <benjamindpedigo@gmail.com>
  • Loading branch information
kareef928 and bdpedigo committed May 20, 2021
1 parent 4d97391 commit 23d147b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 13 deletions.
52 changes: 39 additions & 13 deletions graspologic/inference/latent_distribution_test.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
# Copyright (c) Microsoft Corporation and contributors.
# Licensed under the MIT License.

import warnings
from collections import namedtuple

import numpy as np
from hyppo.ksample import KSample
from scipy import stats
from sklearn.metrics.pairwise import PAIRED_DISTANCES, PAIRWISE_KERNEL_FUNCTIONS
from sklearn.utils import check_array
from sklearn.utils import check_array, check_random_state
from joblib import Parallel, delayed

from ..align import SeedlessProcrustes, SignFlips
from ..embed import AdjacencySpectralEmbed, select_dimension
from ..utils import fit_plug_in_variance_estimator, import_graph


_VALID_DISTANCES = list(PAIRED_DISTANCES.keys())
_VALID_KERNELS = list(PAIRWISE_KERNEL_FUNCTIONS.keys())
_VALID_KERNELS.append("gaussian") # can use hyppo's medial gaussian kernel too
Expand All @@ -31,7 +32,8 @@ def latent_distribution_test(
metric="euclidean",
n_components=None,
n_bootstraps=500,
workers=1,
random_state=None,
workers=None,
size_correction=True,
pooled=False,
align_type="sign_flips",
Expand Down Expand Up @@ -101,9 +103,23 @@ def latent_distribution_test(
Number of bootstrap iterations for the backend hypothesis test.
See :class:`hyppo.ksample.KSample` for more information.
workers : int (default=1)
random_state : {None, int, `~np.random.RandomState`, `~np.random.Generator`}
This parameter defines the object to use for drawing random
variates.
If `random_state` is ``None`` the `~np.random.RandomState` singleton is
used.
If `random_state` is an int, a new ``RandomState`` instance is used,
seeded with `random_state`.
If `random_state` is already a ``RandomState`` or ``Generator``
instance, then that object is used.
Default is None.
workers : int or None (default=None)
Number of workers to use. If more than 1, parallelizes the code.
Supply -1 to use all cores available to the Process.
Supply -1 to use all cores available. None is a marker for
'unset' that will be interpreted as ``workers=1`` (sequential execution) unless
the call is performed under a Joblib parallel_backend context manager that sets
another value for ``workers``. See :class:joblib.Parallel for more details.
size_correction : bool (default=True)
Ignored when the two graphs have the same number of vertices. The test
Expand Down Expand Up @@ -262,8 +278,8 @@ def latent_distribution_test(
raise ValueError(msg.format(n_bootstraps))

# check workers argument
if not isinstance(workers, int):
msg = "workers must be an int, not {}".format(type(workers))
if workers is not None and not isinstance(workers, (int, np.integer)):
msg = "workers must be an int or None, not {}".format(type(workers))
raise TypeError(msg)

# check size_correction argument
Expand Down Expand Up @@ -360,7 +376,9 @@ def latent_distribution_test(
Q = np.identity(X1_hat.shape[0])

if size_correction:
X1_hat, X2_hat = _sample_modified_ase(X1_hat, X2_hat, pooled=pooled)
X1_hat, X2_hat = _sample_modified_ase(
X1_hat, X2_hat, workers=workers, random_state=random_state, pooled=pooled
)

test_obj = KSample(test, compute_distkern=metric)

Expand Down Expand Up @@ -402,7 +420,7 @@ def _embed(A1, A2, n_components):
return X1_hat, X2_hat


def _sample_modified_ase(X, Y, pooled=False):
def _sample_modified_ase(X, Y, workers, random_state, pooled=False):
N, M = len(X), len(Y)

# return if graphs are same order, else ensure X the larger graph.
Expand All @@ -422,12 +440,20 @@ def _sample_modified_ase(X, Y, pooled=False):
else:
get_sigma = fit_plug_in_variance_estimator(X)
X_sigmas = get_sigma(X) * (N - M) / (N * M)

# increase the variance of X by sampling from the asy dist
X_sampled = np.zeros(X.shape)
# TODO may be parallelized, but requires keeping track of random state
for i in range(N):
X_sampled[i, :] = X[i, :] + stats.multivariate_normal.rvs(cov=X_sigmas[i])
rng = check_random_state(random_state)
X_sampled = np.asarray(
Parallel(n_jobs=workers)(
delayed(add_variance)(X[i, :], X_sigmas[i], r)
for i, r in zip(range(N), rng.randint(np.iinfo(np.int32).max, size=N))
)
)

# return the embeddings in the appropriate order
return (Y, X_sampled) if reverse_order else (X_sampled, Y)


def add_variance(X_orig, X_sigma, seed):
np.random.seed(seed)
return X_orig + stats.multivariate_normal.rvs(cov=X_sigma)
9 changes: 9 additions & 0 deletions tests/test_latentdistributiontest.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def test_bad_kwargs(self):
# check workers argument
with pytest.raises(TypeError):
latent_distribution_test(A1, A2, workers=0.5)
latent_distribution_test(A1, A2, workers="oops")
# check size_correction argument
with pytest.raises(TypeError):
latent_distribution_test(A1, A2, size_correction=0)
Expand Down Expand Up @@ -207,11 +208,19 @@ def test_SBM_dcorr(self):
A1 = sbm(2 * [b_size], B1)
A2 = sbm(2 * [b_size], B1)
A3 = sbm(2 * [b_size], B2)

# non-parallel test
ldt_null = latent_distribution_test(A1, A2)
ldt_alt = latent_distribution_test(A1, A3)
self.assertTrue(ldt_null[0] > 0.05)
self.assertTrue(ldt_alt[0] <= 0.05)

# parallel test
ldt_null = latent_distribution_test(A1, A2, workers=-1)
ldt_alt = latent_distribution_test(A1, A3, workers=-1)
self.assertTrue(ldt_null[0] > 0.05)
self.assertTrue(ldt_alt[0] <= 0.05)

def test_different_sizes_null(self):
np.random.seed(314)

Expand Down

0 comments on commit 23d147b

Please sign in to comment.