Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fcma parallelize cv #176

Merged
merged 5 commits into from
Feb 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 108 additions & 48 deletions brainiak/fcma/voxelselector.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from . import fcma_extension
from . import cython_blas as blas
import logging
import pathos.multiprocessing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that you do not use other features of Pathos, could you please use the standalone Multiprocess package?
https://pypi.python.org/pypi/multiprocess
We are trying to remove the dependency on Pathos (see issue #149).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to @mjanderson09 , the standalone multiprocessing can only serialize methods at the root level, and the Pathos version is more flexible so that we can define a lambda function and use it in the map method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiprocess can serialize lambdas, as shown in the last example in its README:
https://github.com/uqfoundation/multiprocess#examples

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't make myself clear. It is not just about lambda functions. A lambda function without involving any other user-defined methods should probably be fine, but in my case, I have _cross_validation_for_one_voxel used in my lambda function. If I want to use the standalone multiprocessing, I will have to put _cross_validation_for_one_voxel outside the class and define it at the root level.


logger = logging.getLogger(__name__)

Expand All @@ -37,7 +38,7 @@


class VoxelSelector:
"""Correlation-based voxel selection component of FCMA
"""Correlation-based voxel selection component of FCMA.

Parameters
----------
Expand Down Expand Up @@ -69,7 +70,7 @@ class VoxelSelector:
If raw_data2 is specified, len(raw_data) MUST equal len(raw_data2),
the correlation will be computed as raw_data by raw_data2.

voxel_unit: int, default 100
voxel_unit: int, default 64
The number of voxels assigned to a worker each time

master_rank: int, default 0
Expand All @@ -81,7 +82,7 @@ def __init__(self,
num_folds,
raw_data,
raw_data2=None,
voxel_unit=100,
voxel_unit=64,
master_rank=0):
self.labels = labels
self.epochs_per_subj = epochs_per_subj
Expand Down Expand Up @@ -113,7 +114,7 @@ def __init__(self,
_TERMINATETAG = 1

def run(self, clf):
""" run correlation-based voxel selection in master-worker model
"""Run correlation-based voxel selection in master-worker model.

Sort the voxels based on the cross-validation accuracy
of their correlation vectors
Expand All @@ -140,7 +141,7 @@ def run(self, clf):
return results

def _master(self):
""" master node's operation
"""Master node's operation.

Assigning tasks to workers and collecting results from them

Expand Down Expand Up @@ -214,7 +215,7 @@ def _master(self):
return results

def _worker(self, clf):
""" worker node's operation
"""Worker node's operation.

Receiving tasks from the master to process and sending the result back

Expand All @@ -235,20 +236,20 @@ def _worker(self, clf):
status=status)
if status.Get_tag():
break
comm.send(self._voxelScoring(task, clf),
comm.send(self._voxel_scoring(task, clf),
dest=self.master_rank)

def _correlationComputation(self, task):
""" use BLAS API to do correlation computation (matrix multiplication)
def _correlation_computation(self, task):
"""Use BLAS API to do correlation computation (matrix multiplication).

Parameters
----------
task: tuple (start_voxel_id, num_assigned_voxels)
task: tuple (start_voxel_id, num_processed_voxels)
depicting the voxels assigned to compute

Returns
-------
corr: 3D array in shape [num_selected_voxels, num_epochs, num_voxels]
corr: 3D array in shape [num_processed_voxels, num_epochs, num_voxels]
the correlation values of all subjects in all epochs
for the assigned values, in row-major
corr[i, e, s + j] = corr[j, e, s + i]
Expand Down Expand Up @@ -280,22 +281,22 @@ def _correlationComputation(self, task):
)
return corr

def _correlationNormalization(self, corr):
""" within-subject normalization
def _correlation_normalization(self, corr):
"""Do within-subject normalization.

This method uses scipy.zscore to normalize the data,
but is much slower than its C++ counterpart.
It is doing in-place z-score.

Parameters
----------
corr: 3D array in shape [num_selected_voxels, num_epochs, num_voxels]
corr: 3D array in shape [num_processed_voxels, num_epochs, num_voxels]
the correlation values of all subjects in all epochs
for the assigned values, in row-major

Returns
-------
corr: 3D array in shape [num_selected_voxels, num_epochs, num_voxels]
corr: 3D array in shape [num_processed_voxels, num_epochs, num_voxels]
the normalized correlation values of all subjects in all epochs
for the assigned values, in row-major
"""
Expand All @@ -320,75 +321,130 @@ def _correlationNormalization(self, corr):
)
return corr

def _crossValidation(self, task, corr, clf):
""" voxelwise cross validation based on correlation vectors
def _prepare_for_cross_validation(self, corr, clf):
"""Prepare data for voxelwise cross validation.

If the classifier is sklearn.svm.SVC with precomputed kernel,
the kernel matrix of each voxel is computed, otherwise do nothing.

Parameters
----------
task: tuple (start_voxel_id, num_assigned_voxels)
depicting the voxels assigned to compute
corr: 3D array in shape [num_selected_voxels, num_epochs, num_voxels]
corr: 3D array in shape [num_processed_voxels, num_epochs, num_voxels]
the normalized correlation values of all subjects in all epochs
for the assigned values, in row-major
clf: classification function
the classifier to be used in cross validation

Returns
-------
results: list of tuple (voxel_id, accuracy)
the accuracy numbers of all voxels, in accuracy descending order
the length of array equals the number of assigned voxels
data: 3D numpy array
If using sklearn.svm.SVC with precomputed kernel,
it is in shape [num_processed_voxels, num_epochs, num_epochs];
otherwise it is the input argument corr,
in shape [num_processed_voxels, num_epochs, num_voxels]
"""
time1 = time.time()
(sv, e, av) = corr.shape
kernel_matrix = np.zeros((e, e), np.float32, order='C')
results = []
for i in range(sv):
if isinstance(clf, sklearn.svm.SVC) \
and clf.kernel == 'precomputed':
(num_processed_voxels, num_epochs, _) = corr.shape
if isinstance(clf, sklearn.svm.SVC) and clf.kernel == 'precomputed':
# kernel matrices should be computed
kernel_matrices = np.zeros((num_processed_voxels, num_epochs,
num_epochs),
np.float32, order='C')
for i in range(num_processed_voxels):
blas.compute_kernel_matrix('L', 'T',
e, self.num_voxels2,
num_epochs, self.num_voxels2,
1.0, corr,
i, self.num_voxels2,
0.0, kernel_matrix, e)
0.0, kernel_matrices[i, :, :],
num_epochs)
# shrink the values for getting more stable alpha values
# in SVM training iteration
num_digits = len(str(int(kernel_matrix[0, 0])))
num_digits = len(str(int(kernel_matrices[i, 0, 0])))
if num_digits > 2:
proportion = 10**(2-num_digits)
kernel_matrix *= proportion
data = kernel_matrix
else:
data = corr[i, :, :]
kernel_matrices[i, :, :] *= proportion
data = kernel_matrices
else:
data = corr
time2 = time.time()
logger.debug(
'cross validation data preparation takes %.2f s' %
(time2 - time1)
)
return data

def _do_cross_validation(self, clf, data, task):
"""Run voxelwise cross validation based on correlation vectors.

clf: classification function
the classifier to be used in cross validation
data: 3D numpy array
If using sklearn.svm.SVC with precomputed kernel,
it is in shape [num_processed_voxels, num_epochs, num_epochs];
otherwise it is the input argument corr,
in shape [num_processed_voxels, num_epochs, num_voxels]
task: tuple (start_voxel_id, num_processed_voxels)
depicting the voxels assigned to compute

Returns
-------
results: list of tuple (voxel_id, accuracy)
the accuracy numbers of all voxels, in accuracy descending order
the length of array equals the number of assigned voxels
"""
time1 = time.time()

def _cross_validation_for_one_voxel(vid, num_folds,
subject_data, labels):
# no shuffling in cv
skf = model_selection.StratifiedKFold(n_splits=self.num_folds,
skf = model_selection.StratifiedKFold(n_splits=num_folds,
shuffle=False)
scores = model_selection.cross_val_score(clf, data,
y=self.labels,
scores = model_selection.cross_val_score(clf, subject_data,
y=labels,
cv=skf, n_jobs=1)
results.append((i + task[0], scores.mean()))
logger.debug(
'cross validation for voxel %d is done' %
(i + task[0])
vid
)
return (vid, scores.mean())

if isinstance(clf, sklearn.svm.SVC) and clf.kernel == 'precomputed':
inlist = [(i + task[0], self.num_folds, data[i, :, :],
self.labels) for i in range(task[1])]

pool = pathos.multiprocessing.ProcessingPool(None)
results = list(pool.map(
lambda x: _cross_validation_for_one_voxel
(x[0], x[1], x[2], x[3]),
inlist))
else:
results = []
for i in range(task[1]):
result = _cross_validation_for_one_voxel(i + task[0],
self.num_folds,
data[i, :, :],
self.labels)
results.append(result)
time2 = time.time()
logger.debug(
'cross validation for %d voxels, takes %.2f s' %
(task[1], (time2 - time1))
)
return results

def _voxelScoring(self, task, clf):
""" voxel selection processing done in the worker node
def _voxel_scoring(self, task, clf):
"""The voxel selection process done in the worker node.

Take the task in,
do analysis on voxels specified by the task (voxel id, num_voxels)
It is a three-stage pipeline consisting of:
1. correlation computation
2. within-subject normalization
3. voxelwise cross validaion
3. voxelwise cross validation

Parameters
----------
task: tuple (start_voxel_id, num_assigned_voxels),
task: tuple (start_voxel_id, num_processed_voxels),
depicting the voxels assigned to compute
clf: classification function
the classifier to be used in cross validation
Expand All @@ -401,9 +457,9 @@ def _voxelScoring(self, task, clf):
"""
time1 = time.time()
# correlation computation
corr = self._correlationComputation(task)
corr = self._correlation_computation(task)
# normalization
# corr = self._correlationNormalization(corr)
# corr = self._correlation_normalization(corr)
time3 = time.time()
fcma_extension.normalization(corr, self.epochs_per_subj)
time4 = time.time()
Expand All @@ -414,7 +470,11 @@ def _voxelScoring(self, task, clf):
)

# cross validation
results = self._crossValidation(task, corr, clf)
data = self._prepare_for_cross_validation(corr, clf)
if isinstance(clf, sklearn.svm.SVC) and clf.kernel == 'precomputed':
# to save memory so that the process can be forked
del corr
results = self._do_cross_validation(clf, data, task)
time2 = time.time()
logger.info(
'in rank %d, task %d takes %.2f s' %
Expand Down
2 changes: 1 addition & 1 deletion tests/fcma/test_voxel_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_voxel_selection():
vs = VoxelSelector(labels, 4, 2, fake_raw_data, voxel_unit=1)
# test scipy normalization
fake_corr = prng.rand(1, 4, 5).astype(np.float32)
fake_corr = vs._correlationNormalization(fake_corr)
fake_corr = vs._correlation_normalization(fake_corr)
if MPI.COMM_WORLD.Get_rank() == 0:
expected_fake_corr = [[[1.19203866, 0.18862808, -0.54350245,
-1.18334889, -0.16860008],
Expand Down