In [5]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import scipy as sc

In [3]:
def _center_scale_xy(X, Y):
    """ Center X, Y and scale

    Returns
    -------
        X, Y, x_mean, y_mean, x_std, y_std
    """
    # center
    x_mean = X.mean(axis=0)
    X -= x_mean
    y_mean = Y.mean(axis=0)
    Y -= y_mean
    # scale
    x_std = X.std(axis=0, ddof=1)
    x_std[x_std == 0.0] = 1.0
    X /= x_std
    y_std = Y.std(axis=0, ddof=1)
    y_std[y_std == 0.0] = 1.0
    Y /= y_std
    return X, Y, x_mean, y_mean, x_std, y_std

In [6]:
def nipals(X, Y, max_iter=500, tol=1e-06,
                                 norm_y_weights=False):
    """Inner loop of the iterative NIPALS algorithm.
    Provides an alternative to the svd(X'Y); returns the first left and right
    singular vectors of X'Y.  See PLS for the meaning of the parameters.  It is
    similar to the Power method for determining the eigenvectors and
    eigenvalues of a X'Y.
    
    y_score - U
    y_weights - C
    x_score - T
    x_weights - W
    
    Return: vectors w, c
    """
    # 1: initialization of u
    u = Y[:, [0]]
# TEST IT!!!!
    t_old = 0
    ite = 1
    eps = np.finfo(X.dtype).eps
    # Inner loop of the Wold algo.
    while True:
        # w = X^T u / (u^T u)
        w = np.dot(X.T, u) / np.dot(u.T, u)
        
        # If u only has zeros w will only have zeros. In
        # this case add an epsilon to converge to a more acceptable
        # solution
        if np.dot(w.T, w) < eps:
            w += eps
        # w = w / |w|
        w /= (np.sqrt(np.dot(w.T, w)) + eps)
        
        # t = Xw
        t = np.dot(X, w)
        
        # c = Y^T t / (t^T t)
        c = np.dot(Y.T, t) / np.dot(t.T, t)
        
        # c = c / |c|
# ????? (TEST IT!!!)
        if norm_y_weights:
            c /= np.sqrt(np.dot(c.T, c)) + eps

        # u = Yc
# ????? (TEST IT!!!!)
        u = np.dot(Y, y_weights) / (np.dot(c.T, c) + eps)
        
        # 10: convergence of t
        t_diff = t - t_old
        if np.dot(t_diff.T, t_diff) < tol or Y.shape[1] == 1:
            break 
        if ite == max_iter:
            warnings.warn('Maximum number of iterations reached')
            break
        t_old = t
        ite += 1
    return w, c, ite

In [None]:
from sklearn.utils.extmath import svd_flip

from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin
from sklearn.utils import check_array, check_consistent_length
from sklearn.externals import six

import warnings
from abc import ABCMeta, abstractmethod
import numpy as np
from scipy import linalg
from sklearn.utils import arpack
from sklearn.utils.validation import check_is_fitted, FLOAT_DTYPES

pinv2_args = {'check_finite': False}


class _PLS(six.with_metaclass(ABCMeta), BaseEstimator, TransformerMixin,
           RegressorMixin):
    """
    Partial Least Squares (PLS)
    
    """

    @abstractmethod
    def __init__(self, n_components=2, deflation_mode="regression",\
                 algorithm="nipals", trans_function_name=None, trans_parameters=None,\
                 norm_y_weights=False, max_iter=500, tol=1e-06, copy=True):
        self.n_components = n_components
        self.deflation_mode = deflation_mode
        self.norm_y_weights = norm_y_weights
        self.algorithm = algorithm
        self.max_iter = max_iter
        self.tol = tol
        self.copy = copy
        self.trans_function_name = trans_function_name
        self.trans_parameters = trans_parameters
        

    def fit(self, X, Y):
        """
        Fit model to data.
        
        """

        # copy since this will contains the residuals (deflated) matrices
        check_consistent_length(X, Y)
        X = check_array(X, dtype=np.float64, copy=self.copy)
        Y = check_array(Y, dtype=np.float64, copy=self.copy, ensure_2d=False)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)

        n = X.shape[0]
        p = X.shape[1]
        q = Y.shape[1]

        if self.n_components < 1 or self.n_components > p:
            raise ValueError('Invalid number of components: %d' %
                             self.n_components)
        if self.algorithm not in nipals", "nipals_nonlinear"):
            raise ValueError("Got algorithm %s when only 'svd' "
                             "and 'nipals' are known" % self.algorithm)
        if self.algorithm == "svd" and self.mode == "B":
            raise ValueError('Incompatible configuration: mode B is not '
                             'implemented with svd algorithm')
        if self.deflation_mode not in ["canonical", "regression"]:
            raise ValueError('The deflation mode is unknown')
        # Scale (in place)
        X, Y, self.x_mean_, self.y_mean_, self.x_std_, self.y_std_ = (
            _center_scale_xy(X, Y))
        # Residuals (deflated) matrices
        Xk = X
        Yk = Y
        # Results matrices
        self.x_scores_ = np.zeros((n, self.n_components))
        self.y_scores_ = np.zeros((n, self.n_components))
        self.x_weights_ = np.zeros((p, self.n_components))
        self.y_weights_ = np.zeros((q, self.n_components))
        self.x_loadings_ = np.zeros((p, self.n_components))
        self.y_loadings_ = np.zeros((q, self.n_components))
        self.n_iter_ = []

        trans_parameters = np.ones([Y.shape[0], Y.shape[1]*2])*np.random.randn()
#         y_tilde = trans_function(self.trans_function_name, Y, trans_parameters)
        
        # NIPALS algo: outer loop, over components
        for k in range(self.n_components):
            if np.all(np.dot(Yk.T, Yk) < np.finfo(np.double).eps):
                # Yk constant
                warnings.warn('Y residual constant at iteration %s' % k)
                break
            # 1) weights estimation (inner loop)
            # -----------------------------------
            if self.algorithm == "nipals":
                x_weights, y_weights, n_iter_ = \
                    _nipals_twoblocks_inner_loop(\
                             X=Xk, Y=Yk, max_iter=self.max_iter,\
                             tol=self.tol, norm_y_weights=self.norm_y_weights,)
                self.n_iter_.append(n_iter_)
            if self.algorithm == "nipals_nonlinear":
                x_weights, y_weights, trans_parameters, y_tilde, n_iter_ = \
                    _nipals_twoblocks_inner_loop_nonlinear(\
                             X=Xk, Y=Yk, max_iter=self.max_iter,\
                             tol=self.tol, norm_y_weights=self.norm_y_weights,\
                             trans_function_name=self.trans_function_name,\
                             trans_parameters=trans_parameters)
                self.n_iter_.append(n_iter_)
            # Forces sign stability of x_weights and y_weights
            # Sign undeterminacy issue from svd if algorithm == "svd"
            # and from platform dependent computation if algorithm == 'nipals'
            x_weights, y_weights = svd_flip(x_weights, y_weights.T)
            y_weights = y_weights.T
            # compute scores
            x_scores = np.dot(Xk, x_weights)
            if self.norm_y_weights:
                y_ss = 1
            else:
                y_ss = np.dot(y_weights.T, y_weights) + np.finfo(np.double).eps
            if self.algorithm=="nipals":
                y_scores = np.dot(Yk, y_weights) / y_ss
            if self.algorithm=="nipals_nonlinear":
                y_scores = np.dot(y_tilde, y_weights) / y_ss
            
            # test for null variance
            if np.dot(x_scores.T, x_scores) < np.finfo(np.double).eps:
                warnings.warn('X scores are null at iteration %s' % k)
                break
            # 2) Deflation (in place)
            # - regress Xk's on x_score
            x_loadings = np.dot(Xk.T, x_scores) / (np.dot(x_scores.T, x_scores) + \
                                                   np.finfo(np.double).eps)
            # - subtract rank-one approximations to obtain remainder matrix
            Xk -= np.dot(x_scores, x_loadings.T)
# TO THINK HOW TO CHANGE TILDE Y
            if self.deflation_mode == "canonical":
                # - regress Yk's on y_score, then subtract rank-one approx.
                y_loadings = (np.dot(Yk.T, y_scores)
                              / np.dot(y_scores.T, y_scores))
                
                if self.algorithm=="nipals":
                    Yk -= np.dot(x_scores, y_loadings.T)
                if self.algorithm=="nipals_nonlinear":
                    y_tilde -= np.dot(y_scores, y_loadings.T)
                    Yk = inverse_trans_function(self.trans_function_name, y_tilde, trans_parameters)
            if self.deflation_mode == "regression":
                # - regress Yk's on x_score, then subtract rank-one approx.
                y_loadings = (np.dot(Yk.T, x_scores)
                              / np.dot(x_scores.T, x_scores))
                if self.algorithm=="nipals_nonlinear":
                    y_tilde -= np.dot(y_scores, y_loadings.T) #регрессия на \tilde{Y}???
                    # COMPUTE Yk AND INVERSE
                    Yk = inverse_trans_function(self.trans_function_name, y_tilde, trans_parameters)
                if self.algorithm=="nipals":
                    Yk -= np.dot(x_scores, y_loadings.T)
            # 3) Store weights, scores and loadings # Notation:
            self.x_scores_[:, k] = x_scores.ravel()  # T
            self.y_scores_[:, k] = y_scores.ravel()  # U
            self.x_weights_[:, k] = x_weights.ravel()  # W
            self.y_weights_[:, k] = y_weights.ravel()  # C
            self.x_loadings_[:, k] = x_loadings.ravel()  # P
            self.y_loadings_[:, k] = y_loadings.ravel()  # Q
        # Such that: X = TP' + Err and Y = UQ' + Err

        # 4) rotations from input space to transformed space (scores)
        # T = X W(P'W)^-1 = XW* (W* : p x k matrix)
        # U = Y C(Q'C)^-1 = YC* (W* : q x k matrix)
        self.x_rotations_ = np.dot(
            self.x_weights_,
            linalg.pinv2(np.dot(self.x_loadings_.T, self.x_weights_),
                         **pinv2_args))
        if Y.shape[1] > 1:
            self.y_rotations_ = np.dot(
                self.y_weights_,
                linalg.pinv2(np.dot(self.y_loadings_.T, self.y_weights_),
                             **pinv2_args))
        else:
            self.y_rotations_ = np.ones(1)

        if True or self.deflation_mode == "regression":
            # FIXME what's with the if?
            # Estimate regression coefficient
            # Regress Y on T
            # Y = TQ' + Err,
            # Then express in function of X
            # Y = X W(P'W)^-1Q' + Err = XB + Err
            # => B = W*Q' (p x q)
            self.coef_ = np.dot(self.x_rotations_, self.y_loadings_.T)
            self.coef_ = (1. / self.x_std_.reshape((p, 1)) * self.coef_ *
                          self.y_std_)
        self.trans_parameters = trans_parameters
        return self

    def transform(self, X, Y=None, copy=True):
        """Apply the dimension reduction learned on the train data.

        x_scores if Y is not given, (x_scores, y_scores) otherwise.
        """
        check_is_fitted(self, 'x_mean_')
        X = check_array(X, copy=copy, dtype=FLOAT_DTYPES)
        # Normalize
        X -= self.x_mean_
        X /= self.x_std_
        # Apply rotation
        x_scores = np.dot(X, self.x_rotations_)
        if Y is not None:
            Y = check_array(Y, ensure_2d=False, copy=copy, dtype=FLOAT_DTYPES)
            if Y.ndim == 1:
                Y = Y.reshape(-1, 1)
# G?
            Y -= self.y_mean_
            Y /= self.y_std_
# MB TILDE Y
            y_scores = np.dot(Y, self.y_rotations_)
            return x_scores, y_scores

        return x_scores

    def predict(self, X, copy=True):
        """Apply the dimension reduction learned on the train data.
        
        This call requires the estimation of a p x q matrix, which may
        be an issue in high dimensional space.
        """
        check_is_fitted(self, 'x_mean_')
        X = check_array(X, copy=copy, dtype=FLOAT_DTYPES)
        # Normalize
        X -= self.x_mean_
        X /= self.x_std_
        Ypred = np.dot(X, self.coef_)
        return Ypred + self.y_mean_

    def fit_transform(self, X, y=None, **fit_params):
        """
        Learn and apply the dimension reduction on the train data.

        x_scores if Y is not given, (x_scores, y_scores) otherwise.
        """
        return self.fit(X, y, **fit_params).transform(X, y)


class PLSRegression(_PLS):
    """
    PLS regression
    """

    def __init__(self, n_components=2,max_iter=500, tol=1e-06, copy=True, trans_function_name=None):
        super(PLSRegression, self).__init__(
            n_components=n_components,
            deflation_mode="regression", trans_function_name=trans_function_name,
            norm_y_weights=False, max_iter=max_iter, tol=tol, copy=copy)