This file is to repeat the partial least squares regression process in a paper[1].  
[1] Partial least squares regression and projection on latent structure regression (PLS Regression)

In [6]:
import numpy as np
import pandas as pd
import scipy as sp
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import mean_squared_error,r2_score

In [7]:
class PLSCJ():
    
    def __init__(self, n_components = None):
        
        self.PC = n_components
    
    def fit(self, X, Y, scale = True,ddof=1):
        
        
        
        self.X = X
        self.Y = Y
        
        E, F, self.x_mean, self.y_mean, self.x_std, self.y_std = self._center_scale(X, Y, scale,ddof)
        
        T, U, W, B, P, C = self._pls(E, F)      
        
        #X = TP'
        #Y = UQ'
        
        #Be aware that T, W, C is orthogonal; P, U, Q is not orthogonal.
        #Ybar is the estimated Y.
        #Ybar = TBC' Be noticed that this B is tranformed into diagonal matrix
        #Ybar = TQ'
        #In this calculation method, the loadings of y is not actually appeared.
        
        Q = np.dot(np.diag(B),C.T).T #Q = (BC')'
                
        self.x_scores_ = T
        self.y_scores_ = U
        self.x_weights_ = W
        self.y_weights_ = C
        self.x_loadings_ = P
        self.y_loadings_ = Q
        self.regression_weights_ = B
        
        
        #Ybar = TBC' = XBpls with Bpls = (P'+)BC' 
        #where P'+ is the Moore–Penrose pseudo-inverse of P'
        
        Bpls = np.dot(np.dot(sp.linalg.pinv2(P.T),np.diag(B)),C.T)
        self.Bpls_ = Bpls
        Bpls = (Bpls.T/self.x_std).T # np.dot(A/B,C) = np.dot(A,(C.T/B).T)
        self.coef_ = Bpls*self.y_std
        self.intercept = -(np.dot(self.x_mean,Bpls))*self.y_std+self.y_mean
        
    def _center_scale(self, X, Y, scale=True,ddof=1):

    # center
        E = X.copy()
        F = Y.copy()
        x_mean = X.mean(axis=0)
        x_std = X.std(axis=0, ddof=ddof)
        E -= x_mean
        y_mean = Y.mean(axis=0)
        y_std = Y.std(axis=0, ddof=ddof)
        F -= y_mean
    # scale
        if scale:
            E /= x_std
            F /= y_std
        else:
            x_std = np.ones(X.shape[1])
            y_std = np.ones(Y.shape[1])
        
        return E, F, x_mean, y_mean, x_std, y_std    
            
    def _niplas(self, E, F, u):
        
        #Note: the symbol ∝ means ‘to normalize the result of the operation’
        #Step 1. w ∝ E'u (estimate X weights).
        #Step 2. t ∝ Ew (estimate X factor scores).
        #Step 3. c ∝ F't (estimate Y weights). 
        #Step 4. u = Fc (estimate Y scores). Be aware that Y scores is not orthogonal.
        #b = t'u
        #p = E't
        #E = E − tp'
        #F = F − btc'
        
        #Blow is the original method to estimate the first vector
   
        w = np.dot(E.T,u)
        w = w/np.linalg.norm(w) #not equal to w/np.dot(u.T,u)
        t = np.dot(E,w)
        t = t/np.linalg.norm(t) #not equal to t/np.dot(w.T,w)
        c = np.dot(F.T,t)
        c = c/np.linalg.norm(c) #not equal to q/np.dot(t.T,t)
        u = np.dot(F,c)
        to = t
        conv = 1
        while conv > 10**-8:
            w = np.dot(E.T,u)
            w = w/np.linalg.norm(w)
            t = np.dot(E,w)
            t = t/np.linalg.norm(t)
            c = np.dot(F.T,t)
            c = c/np.linalg.norm(c) # is different with c/np.dot(t.T,t)
            u = np.dot(F,c)
            conv = abs(np.linalg.norm(to-t))/abs(np.linalg.norm(t))
            to = t
        b = np.dot(t.T,u)
        p = np.dot(E.T,t)
        E = E - np.dot(t,p.T)
        F = F - b*np.dot(t,c.T)
        
        #In sklearn's PLS regression, the F is not calculated by the regression weights b.
        #The b is equal to I for each steps here.
        #So the Fbar is estimated by TQ'. Where Q is calculated by Q = Yk'U.
        #Because the Ybar = TBC' = TQ', the C is equal to Q in the sklearn's PLS regression finally.
        
        return t, u, w, b, p, c, E, F
        
    def _pls(self, E, F):
    
        T = None 
        U = None
        P = None
        W = None
        B = None
        c = None
        
        PC_iter = 0
        observation,factor = E.shape 

        #u = F[:,0]
        #The first column of Y could be used as the first Y scores.
        #Because w ∝ E'u ∝ E'Fc ∝ E'FF't ∝ E'FF'Ew, 
        #the weight vector w is the first right singular vector of the matrix,
        #the first weight vector c is the left singular vector of S.
        #Below I use this method to speed up the convergence.
        
        
        S = np.dot(E.T,F)
        w,D,c = sp.linalg.svd(S)
        c = c.T
        w = w[:,0]
        c = c[:,0]       
        u = np.dot(F,c)
        u.shape = (observation,1)
        
        if type(self.PC) is int:
            
            if self.PC > factor or self.PC < 1:
                    raise ValueError('Invalid number of components: %d' %
                                     self.PC)
            else:
                
                factor = self.PC
                
                while PC_iter < factor:
                
                    if PC_iter is 0:
                    
                        T, U, W, B, P, C, E, F = self._niplas(E, F, u)
                
                    else:
                    
                        t, u, w, b, p, c, E, F = self._niplas(E, F, u)
                        T = np.column_stack((T,t))
                        U = np.column_stack((U,u))
                        W = np.column_stack((W,w))
                        P = np.column_stack((P,p))
                        B = np.column_stack((B,b))
                        C = np.column_stack((C,c))
                
                    PC_iter = len(T.T)    
        
        elif self.PC is None:
            
            while (np.linalg.norm(E)>(10**-15)) and (PC_iter <= factor):
                                
                if PC_iter is 0:
                    
                    T, U, W, B, P, C, E, F = self._niplas(E, F, u)
                
                else:
                    
                    t, u, w, b, p, c, E, F = self._niplas(E, F, u)
                    T = np.column_stack((T,t))
                    U = np.column_stack((U,u))
                    W = np.column_stack((W,w))
                    B = np.column_stack((B,b))
                    P = np.column_stack((P,p))
                    C = np.column_stack((C,c))
                
                PC_iter = len(T.T)
                          
        else:
            
            raise ValueError('Invalid number of components: %d' %
                             self.PC)
        
        #Because the shape of B is 2-dimensional like (1,3), but we need it reshaped as 1-dimensional like (3,)
        #to be conveniently used, such as, to be diagonalized by numpy.diag().
         
        return T, U, W, B[0,:], P, C
    
    def predict(self,X):
        
        return np.dot(X,self.coef_)+self.intercept
    
    def parameter(self):
        
        #Export parameters for testing.
        
        return self.x_scores_, self.y_scores_, self.x_weights_, self.y_weights_, self.x_loadings_, self.y_loadings_, self.regression_weights_

In [8]:
sWine = pd.Series(['1','2','3','4','5'],)
data_Y = {'Hedonic':[14,10,8,2,6],
          'Goes with Meat':[7,7,5,4,2],
          'Goes with Dessert':[8,6,5,7,4]}
dY = pd.DataFrame(data_Y,columns=['Hedonic','Goes with Meat','Goes with Dessert'],index=sWine)
naY = np.array(dY,dtype='float64')

data_X = {'Price':[7,4,10,16,13],
          'Sugar':[7,3,5,7,3],
          'Alchol':[13,14,12,11,10],
          'Acidity':[7,7,5,3,3]}
dX = pd.DataFrame(data_X,columns=['Price','Sugar','Alchol','Acidity'],index=sWine)
naX = np.array(dX,dtype='float64')

data_T = {'t1':[0.4538,0.5399,0,-0.4304,-0.5633],
          't2':[-0.4662,0.4940,0,-0.5327,0.5049],
          't3':[0.5716,-0.4631,0,-0.5301,0.4217]}
dT = pd.DataFrame(data_T,columns=['t1','t2','t3'],index=sWine)
naT = np.array(dT)

data_U = {'u1':[1.9451,0.9347,-0.2327,-0.9158,-1.7313],
          'u2':[-0.7611,0.5305,0.6084,-1.1575,0.7797],
          'u3':[0.6191,-0.5388,0.0823,-0.6139,0.4513]}
dU = pd.DataFrame(data_U,columns=['u1','u2','u3'],index=sWine)
naU = np.array(dU)

data_P = {'p1':[-1.8706,0.0468,1.9547,1.9874],
          'p2':[-0.6845,-1.9977,0.0283,0.0556],
          'p3':[-0.1796,0.0829,-0.4224,0.2170]}
dP = pd.DataFrame(data_P,columns=['p1','p2','p3'],index=['Price','Sugar','Alchol','Acidity'])
naP = np.array(dP)

data_W = {'w1':[-0.5137,0.2010,0.5705,0.6085],
          'w2':[-0.3379,-0.9400,-0.0188,0.0429],
          'w3':[-0.3492,0.1612,-0.8211,0.4218]}
dW = pd.DataFrame(data_W,columns=['w1','w2','w3'],index=['Price','Sugar','Alchol','Acidity'])
naW = np.array(dW)

In [9]:
X = naX
Y_true = naY

In [10]:
plscj = PLSCJ(n_components=3)
plscj.fit(X,Y_true)
r2_sum = 0
for i in range(0,3):
    Y_pred=np.dot(plscj.x_scores_[:,i].reshape(-1,1)*plscj.regression_weights_[i],plscj.y_weights_[:,i].reshape(-1,1).T)*naY.std(axis=0, ddof=1)+naY.mean(axis=0)
    r2_sum += round(r2_score(Y_true,Y_pred),3) 
    print('R2 for %d component: %g' %(i+1,round(r2_score(Y_true,Y_pred),3)))
print('R2 for all components: %g' %r2_sum) #Sum of above
print('R2 for all components: %g' %round(r2_score(Y_true,plscj.predict(X)),3)) #Calcuted from PLSRegres's 'predict' function.

R2 for 1 component: 0.633
R2 for 2 component: 0.221
R2 for 3 component: 0.104
R2 for all components: 0.958
R2 for all components: 0.958


In [11]:
pls = PLSRegression(n_components=3)
pls.fit(X,Y_true)
r2_sum = 0
for i in range(0,3):
    Y_pred=np.dot(pls.x_scores_[:,i].reshape(-1,1),pls.y_loadings_[:,i].reshape(-1,1).T)*naY.std(axis=0, ddof=1)+naY.mean(axis=0)
    r2_sum += round(r2_score(Y_true,Y_pred),3) 
    print('R2 for %d component: %g' %(i+1,round(r2_score(Y_true,Y_pred),3)))
print('R2 for all components: %g' %r2_sum) #Sum of above
print('R2 for all components: %g' %round(r2_score(Y_true,pls.predict(X)),3)) #Calcuted from PLSRegression's 'predict' function.

R2 for 1 component: 0.633
R2 for 2 component: 0.221
R2 for 3 component: 0.104
R2 for all components: 0.958
R2 for all components: 0.958


In [12]:
print('X:')
print(dX)
print('Actual Y:')
print(dY)
print('Estimated Y from sklearn PLSRegression:')
print(pd.DataFrame(pls.predict(X),columns=['Hedonic','Goes with Meat','Goes with Dessert'],index=sWine))
print('Estimated Y from PLSCJ:')
print(pd.DataFrame(plscj.predict(X),columns=['Hedonic','Goes with Meat','Goes with Dessert'],index=sWine))

X:
   Price  Sugar  Alchol  Acidity
1      7      7      13        7
2      4      3      14        7
3     10      5      12        5
4     16      7      11        3
5     13      3      10        3
Actual Y:
   Hedonic  Goes with Meat  Goes with Dessert
1       14               7                  8
2       10               7                  6
3        8               5                  5
4        2               4                  7
5        6               2                  4
Estimated Y from sklearn PLSRegression:
   Hedonic  Goes with Meat  Goes with Dessert
1     14.0             7.0               7.75
2     10.0             7.0               5.75
3      8.0             5.0               6.00
4      2.0             4.0               6.75
5      6.0             2.0               3.75
Estimated Y from PLSCJ:
   Hedonic  Goes with Meat  Goes with Dessert
1     14.0             7.0               7.75
2     10.0             7.0               5.75
3      8.0             5.0         

In [13]:
T,U,W,C,P,Q,B = plscj.parameter()
E = (X-X.mean(axis=0))/X.std(axis=0,ddof=1)
F = (Y_true-Y_true.mean(axis=0))/Y_true.std(axis=0,ddof=1)