Merge remote-tracking branch 'francisco-dlp/fix_lazy_decomposition' into epsic_workshop
ericpre committed Mar 22, 2018
2 parents d8d9039 + cba3f20 commit 227dd43
Showing 3 changed files with 115 additions and 119 deletions.
112 changes: 60 additions & 52 deletions hyperspy/_signals/lazy.py
@@ -18,22 +18,21 @@

import logging
from functools import partial
import warnings

import numpy as np
import math
import dask.array as da
import dask.delayed as dd
from dask import threaded
from dask.diagnostics import ProgressBar
from itertools import product

from ..signal import BaseSignal
from ..misc.utils import underline, multiply, dummy_context_manager
from ..misc.utils import multiply, dummy_context_manager
from ..external.progressbar import progressbar
from ..external.astroML.histtools import dasky_histogram
from ..defaults_parser import preferences
from ..docstrings.signal import (ONE_AXIS_PARAMETER, OUT_ARG)
from hyperspy.misc.array_tools import _requires_linear_rebin
from hyperspy.exceptions import VisibleDeprecationWarning

_logger = logging.getLogger(__name__)

@@ -284,9 +283,6 @@ def rebin(self, new_shape=None, scale=None, crop=False, out=None):
def __array__(self, dtype=None):
return self.data.__array__(dtype=dtype)

def _unfold(self, *args):
raise lazyerror

def _make_sure_data_is_contiguous(self, log=None):
self._make_lazy(rechunk=True)

@@ -606,28 +602,29 @@ def _block_iterator(self,
self.axes_manager.signal_shape[::-1])

def decomposition(self,
output_dimension,
normalize_poissonian_noise=False,
algorithm='PCA',
output_dimension=None,
signal_mask=None,
navigation_mask=None,
get=threaded.get,
num_chunks=None,
reproject=True,
bounds=True,
bounds=False,
**kwargs):
"""Perform Incremental (Batch) decomposition on the data, keeping n
significant components.

Parameters
----------
output_dimension : int
the number of significant components to keep
normalize_poissonian_noise : bool
If True, scale the SI to normalize Poissonian noise
algorithm : str
One of ('PCA', 'ORPCA', 'ONMF'). By default ('PCA') IncrementalPCA
from scikit-learn is run.
One of ('svd', 'PCA', 'ORPCA', 'ONMF'). The default is 'svd',
dask's lazy SVD decomposition.
output_dimension : int
The number of significant components to keep. If None, all
components are kept (only valid with the 'svd' algorithm).
get : dask scheduler
the dask scheduler to use for computations;
default `dask.threaded.get`
@@ -643,12 +640,6 @@ def decomposition(self,
decomposition.
reproject : bool
Reproject data on the learnt components (factors) after learning.
bounds : {tuple, bool}
The (min, max) values of the data to normalize before learning.
If tuple (min, max), those values will be used for normalization.
If True, extremes will be looked up (expensive), default.
If False, no normalization is done (learning may be very slow).
If normalize_poissonian_noise is True, this cannot be True.
**kwargs
passed to the partial_fit/fit functions.
@@ -677,6 +668,11 @@ def decomposition(self,
"""
if bounds:
msg = (
"The `bounds` keyword is deprecated and will be removed "
"in v2.0. Since version > 1.3 this has no effect.")
warnings.warn(msg, VisibleDeprecationWarning)
explained_variance = None
explained_variance_ratio = None
_al_data = self._data_aligned_with_axes
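For orientation, a minimal usage sketch of the new signature and deprecation behaviour (synthetic data; the variable `s` and the shapes are illustrative, not part of the commit):

    import numpy as np
    import hyperspy.api as hs

    # A small synthetic lazy signal stands in for real data.
    s = hs.signals.Signal1D(np.random.random((32, 32, 100))).as_lazy()

    s.decomposition()  # new default: lazy SVD from dask, keeps all components
    s.decomposition(algorithm='PCA', output_dimension=10)  # IncrementalPCA path
    # s.decomposition(algorithm='ORPCA')  # ValueError: output_dimension required
    # s.decomposition(bounds=True)        # VisibleDeprecationWarning, no effect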
@@ -689,6 +685,9 @@ def decomposition(self,
if blocksize / output_dimension < num_chunks:
num_chunks = np.ceil(blocksize / output_dimension)
blocksize *= num_chunks
if algorithm != "svd" and output_dimension is None:
raise ValueError("With the %s the output_dimension "
"must be specified" % algorithm)

# LEARN
if algorithm == 'PCA':
@@ -709,16 +708,12 @@ def decomposition(self,
batch_size = kwargs.pop('batch_size', None)
obj = ONMF(output_dimension, **kwargs)
method = partial(obj.fit, batch_size=batch_size)

else:
elif algorithm != "svd":
raise ValueError('algorithm not known')

original_data = self.data
try:
if normalize_poissonian_noise:
if bounds is True:
bounds = False
# warnings.warn?
data = self._data_aligned_with_axes
ndim = self.axes_manager.navigation_dimension
sdim = self.axes_manager.signal_dimension
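The lines elided between this hunk and the next build `coeff` from the row and column count sums. Below is a standalone numpy sketch of that weighting, assuming the usual Keenan & Kotula scheme; the construction is an assumption, since only `raG`, `rbH` and the division by `coeff` appear in the diff:

    import numpy as np

    # Assumed Poissonian-noise weighting on data unfolded to
    # (navigation, signal).
    data = np.random.poisson(5, size=(400, 250)).astype(float)
    aG = data.sum(axis=1)   # total counts per navigation pixel
    bH = data.sum(axis=0)   # total counts per signal channel
    raG = np.sqrt(aG)
    rbH = np.sqrt(bH)
    coeff = raG[:, np.newaxis] * rbH[np.newaxis, :]
    weighted = data / coeff  # what the learning step sees
    # After learning, the diff rescales the results back:
    # factors *= rbH[:, np.newaxis]; loadings *= raG[:, np.newaxis]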
@@ -752,36 +747,43 @@ def decomposition(self,
data = data / coeff
self.data = data

# normalize the data for learning algs:
if bounds:
if bounds is True:
_min, _max = da.compute(self.data.min(), self.data.max())
else:
_min, _max = bounds
self.data = (self.data - _min) / (_max - _min)

# LEARN
this_data = []
try:
for chunk in progressbar(
self._block_iterator(
flat_signal=True,
get=get,
signal_mask=signal_mask,
navigation_mask=navigation_mask),
total=nblocks,
leave=True,
desc='Learn'):
this_data.append(chunk)
if len(this_data) == num_chunks:
if algorithm == "svd":
reproject = False
from dask.array.linalg import svd
try:
self._unfolded4decomposition = self.unfold()
# TODO: implement masking
U, S, V = svd(self.data)
factors = V.T
explained_variance = S ** 2 / self.data.shape[0]
loadings = U * S
finally:
if self._unfolded4decomposition is True:
self.fold()
self._unfolded4decomposition = False
else:
this_data = []
try:
for chunk in progressbar(
self._block_iterator(
flat_signal=True,
get=get,
signal_mask=signal_mask,
navigation_mask=navigation_mask),
total=nblocks,
leave=True,
desc='Learn'):
this_data.append(chunk)
if len(this_data) == num_chunks:
thedata = np.concatenate(this_data, axis=0)
method(thedata)
this_data = []
if len(this_data):
thedata = np.concatenate(this_data, axis=0)
method(thedata)
this_data = []
if len(this_data):
thedata = np.concatenate(this_data, axis=0)
method(thedata)
except KeyboardInterrupt:
pass
except KeyboardInterrupt:
pass

# GET ALREADY CALCULATED RESULTS
if algorithm == 'PCA':
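As a sanity check on the new "svd" branch above, here is the same factor/loading convention in plain numpy (synthetic shapes; dask.array.linalg.svd returns the equivalent triplet for tall-and-skinny arrays):

    import numpy as np

    # Data unfolded to (navigation, signal), as in the diff.
    X = np.random.random((500, 64))
    U, S, V = np.linalg.svd(X, full_matrices=False)
    factors = V.T                        # one column per component
    loadings = U * S
    explained_variance = S ** 2 / X.shape[0]
    # loadings @ factors.T == U @ diag(S) @ V, i.e. the data is recovered.
    assert np.allclose(loadings @ factors.T, X)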
@@ -853,12 +855,18 @@ def post(a): return np.concatenate(a, axis=1).T
target = self.learning_results
target.decomposition_algorithm = algorithm
target.output_dimension = output_dimension
target._object = obj
if algorithm != "svd":
target._object = obj
target.factors = factors
target.loadings = loadings
target.explained_variance = explained_variance
target.explained_variance_ratio = explained_variance_ratio

# Rescale the results if the noise was normalized
if normalize_poissonian_noise is True:
target.factors = target.factors * rbH.ravel()[:, np.newaxis]
target.loadings = target.loadings * raG.ravel()[:, np.newaxis]

def transpose(self, *args, **kwargs):
res = super().transpose(*args, **kwargs)
res._make_lazy(rechunk=True)
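Taken together, the changed path can be exercised end to end; a sketch under the same assumptions as above (synthetic data, illustrative shapes):

    import numpy as np
    import hyperspy.api as hs

    # Poisson-distributed synthetic data; real data would be loaded lazily.
    data = np.random.poisson(10, size=(16, 16, 200)).astype(float)
    s = hs.signals.Signal1D(data).as_lazy()

    # The default algorithm is now 'svd'; the Poissonian normalization
    # is undone on the results afterwards, as in the last hunk.
    s.decomposition(normalize_poissonian_noise=True)

    lr = s.learning_results
    print(lr.decomposition_algorithm)            # 'svd'
    print(lr.factors.shape, lr.loadings.shape)   # (200, n) and (256, n)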
