Skip to content
59 changes: 59 additions & 0 deletions diffpy/snmf/subroutines.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,64 @@
import numdifftools


def lift_data(data_input, lift=1):
"""Lifts values of data_input

Adds 'lift' * the minimum value in data_input to data_input element-wise.

Parameters
----------
data_input: 2d array like
The matrix containing a series of signals to be decomposed. Has dimensions N x M where N is the length of each
signal and M is the number of signals.

lift: float
The factor representing how much to lift 'data_input'.

Returns
-------
2d array like
The matrix that contains data_input - (min(data_input) * lift).

"""
data_input = np.asarray(data_input)
return data_input + np.abs(np.min(data_input) * lift)


def initialize_arrays(number_of_components, number_of_moments, signal_length):
"""Generates the initial guesses for the weight, stretching, and component matrices

Calculates the initial guesses for the component matrix, stretching factor matrix, and weight matrix. The initial
guess for the component matrix is a random (signal_length) x (number_of_components) matrix where each element is
between 0 and 1. The initial stretching factor matrix is a random (number_of_components) x (number_of_moments)
matrix where each element is number slightly perturbed from 1. The initial weight matrix guess is a random
(number_of_components) x (number_of_moments) matrix where each element is between 0 and 1.

Parameters
----------
number_of_components: int
The number of component signals to obtain from the stretched nmf decomposition.

number_of_moments: int
The number of signals in the user provided dataset where each signal is at a different moment.

signal_length: int
The length of each signal in the user provided dataset.

Returns
-------
tuple of 2d arrays of floats
The tuple containing three elements: the initial component matrix guess, the initial stretching factor matrix
guess, and the initial weight factor matrix guess in that order.

"""
component_matrix_guess = np.random.rand(signal_length, number_of_components)
weight_matrix_guess = np.random.rand(number_of_components, number_of_moments)
stretching_matrix_guess = np.ones(number_of_components, number_of_moments) + np.random.randn(number_of_components,
number_of_moments) * 1e-3
return component_matrix_guess, weight_matrix_guess, stretching_matrix_guess


def objective_function(residual_matrix, stretching_factor_matrix, smoothness, smoothness_term, component_matrix,
sparsity):
"""Defines the objective function of the algorithm and returns its value.
Expand Down Expand Up @@ -80,6 +138,7 @@ def get_stretched_component(stretching_factor, component, signal_length):

def stretched_component_func(stretching_factor):
return np.interp(normalized_grid / stretching_factor, normalized_grid, component, left=0, right=0)

derivative_func = numdifftools.Derivative(stretched_component_func)
second_derivative_func = numdifftools.Derivative(derivative_func)

Expand Down
17 changes: 16 additions & 1 deletion diffpy/snmf/tests/test_subroutines.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pytest
import numpy as np
from diffpy.snmf.subroutines import objective_function, get_stretched_component, reconstruct_data, get_residual_matrix, \
update_weights_matrix
update_weights_matrix, initialize_arrays, lift_data

to = [
([[[1, 2], [3, 4]], [[5, 6], [7, 8]], 1e11, [[1, 2], [3, 4]], [[1, 2], [3, 4]], 1], 2.574e14),
Expand Down Expand Up @@ -134,3 +134,18 @@ def test_reconstruct_data(trd):
actual = reconstruct_data(trd[0][0], trd[0][1], trd[0][2], trd[0][3], trd[0][4], trd[0][5])
expected = trd[1]
np.testing.assert_allclose(actual, expected, rtol=1e-03)


tld = [(([[[1, -1, 1], [0, 0, 0], [2, 10, -3]], 1]), ([[4, 2, 4], [3, 3, 3], [5, 13, 0]])),
(([[[1, -1, 1], [0, 0, 0], [2, 10, -3]], 0]), ([[1, -1, 1], [0, 0, 0], [2, 10, -3]])),
(([[[1, -1, 1], [0, 0, 0], [2, 10, -3]], .5]), ([[2.5, .5, 2.5], [1.5, 1.5, 1.5], [3.5, 11.5, -1.5]])),
(([[[1, -1, 1], [0, 0, 0], [2, 10, -3]], -1]), ([[4, 2, 4], [3, 3, 3], [5, 13, 0]])),
(([[[0, 0, 0], [0, 0, 0], [0, 0, 0]], 100]), ([[0, 0, 0], [0, 0, 0], [0, 0, 0]])),
(([[[1.5, 2], [10.5, 1], [0.5, 2]], 1]), ([[2, 2.5], [11, 1.5], [1, 2.5]])),
(([[[-10, -10.5], [-12.2, -12.2], [0, 0]], 1]), ([[2.2, 1.7], [0, 0], [12.2, 12.2]])),
]
@pytest.mark.parametrize('tld', tld)
def test_lift_data(tld):
actual = lift_data(tld[0][0], tld[0][1])
expected = tld[1]
np.testing.assert_allclose(actual, expected)