# -*- coding: utf-8 -*-
"""
Bottleneck Analytics GmbH
info@bottleneck-analytics.com
@author: Dr. Ramin Nikzad-Langerodi
"""
# Modules
import numpy as np
import scipy.linalg
import scipy.stats
from scipy.linalg import eigh
import scipy.spatial.distance as scd
from scipy.spatial import distance_matrix
import warnings
warnings.filterwarnings("ignore")


def dipals(x, y, xs, xt, A, l, heuristic=False, target_domain=0):
    '''
    (Multiple) domain-invariant partial least squares regression ((m)di-PLS) performs PLS regression
    using labeled source domain data x (=xs) and y and unlabeled target domain data (xt_1, ..., xt_k)
    with the goal of fitting an (invariant) model that generalizes over all domains.

    References:
    (1) Ramin Nikzad-Langerodi, Werner Zellinger, Edwin Lughofer, and Susanne Saminger-Platz
        "Domain-Invariant Partial Least Squares Regression" Analytical Chemistry 2018 90 (11),
        6693-6701 DOI: 10.1021/acs.analchem.8b00498
    (2) Ramin Nikzad-Langerodi, Werner Zellinger, Susanne Saminger-Platz and Bernhard Moser
        "Domain-Invariant Regression under Beer-Lambert's Law" In Proc. International Conference
        on Machine Learning and Applications, Boca Raton FL 2019.
    (3) Ramin Nikzad-Langerodi, Werner Zellinger, Susanne Saminger-Platz, Bernhard A. Moser,
        "Domain adaptation for regression under Beer-Lambert's law" Knowledge-Based Systems,
        2020 (210) DOI: 10.1016/j.knosys.2020.106447
    (4) Bianca Mikulasek, Valeria Fonseca Diaz, David Gabauer, Christoph Herwig, Ramin Nikzad-Langerodi,
        "Partial least squares regression with multiple domains" J. Chemometrics, 2023 (to appear).
        DOI: 10.13140/RG.2.2.23750.75845

    Parameters
    ----------
    x: numpy array (N,K)
        Labeled X data
    y: numpy array (N,1)
        Response variable
    xs: numpy array (N_S,K)
        Source domain data
    xt: numpy array (N_T,K) or list of z (N_z,K) numpy arrays
        Target domain data. If a list is passed, multiple target domains are
        considered in the optimization.
    A: int
        Number of latent variables
    l: int or numpy array (A x 1)
        Regularization parameter. Typically 0 < l < 1e10.
        If an array is passed, a different l is used for each LV.
    heuristic: bool
        If True, the regularization parameter is determined using a heuristic
        that gives equal weight to (i) fitting the output variable y and
        (ii) minimizing domain discrepancy. For details see ref. (3).
    target_domain: int
        If multiple target domains are passed, target_domain specifies for which of the
        target domains the model should apply. If target_domain=0, the model applies to
        the source domain; if target_domain=1, the model applies to the first target
        domain, etc.

    Returns
    -------
    b: numpy array (K,1)
        Regression vector (note: no offset is returned, i.e. yhat = x*b)
    T: numpy array (N,A)
        Training data projections (scores)
    Ts: numpy array (N_S,A)
        Source domain projections (scores)
    Tt: numpy array (N_T,A) or list of arrays
        Target domain projections (scores)
    W: numpy array (K,A)
        Weight matrix
    P: numpy array (K,A)
        Loadings matrix (training data)
    Ps: numpy array (K,A)
        Source domain loadings matrix
    Pt: numpy array (K,A) or list of arrays
        Target domain loadings matrix
    E: numpy array (N,K)
        Residuals of labeled X data
    Es: numpy array (N_S,K)
        Source domain residual matrix
    Et: numpy array (N_T,K)
        Target domain residual matrix
    Ey: numpy array (N,1)
        Response variable residuals
    C: numpy array (A,1)
        Regression vector such that y = T*C
    opt_l: numpy array (A,)
        The heuristically determined regularization parameter for each LV
        (if heuristic=True)
    discrepancy: numpy array (A,)
        Absolute difference between the variance of source and target domain projections
    '''
    # Get array dimensions
    (n, k) = np.shape(x)
    (ns, k) = np.shape(xs)

    # Initialize matrices
    Xt = xt
    if type(xt) is list:
        Pt = []
        Tt = []
        for z in range(len(xt)):
            Tti = np.zeros([np.shape(xt[z])[0], A])
            Pti = np.zeros([k, A])
            Pt.append(Pti)
            Tt.append(Tti)
    else:
        (nt, k) = np.shape(xt)
        Tt = np.zeros([nt, A])
        Pt = np.zeros([k, A])

    T = np.zeros([n, A])
    P = np.zeros([k, A])
    Ts = np.zeros([ns, A])
    Ps = np.zeros([k, A])
    W = np.zeros([k, A])
    C = np.zeros([A, 1])
    opt_l = np.zeros(A)
    discrepancy = np.zeros(A)
    I = np.eye(k)

    # Compute LVs
    for i in range(A):
        if type(l) is np.ndarray:  # Separate regularization parameter for each LV
            lA = l[i]
        elif isinstance(l, (int, float, np.integer, np.floating)):  # One regularization parameter for all LVs
            lA = l
        else:
            lA = l[0]
        # Compute domain-invariant weight vector
        w_pls = ((y.T@x)/(y.T@y))  # Ordinary PLS solution

        if lA != 0 or heuristic is True:  # In case of regularization
            if type(xt) is not list:
                # Convex relaxation of covariance difference matrix
                D = convex_relaxation(xs, xt)
            elif type(xt) is list:
                # Multiple target domains
                ndoms = len(xt)
                D = np.zeros([k, k])
                for z in range(ndoms):
                    d = convex_relaxation(xs, xt[z])
                    D = D + d
            else:
                print('xt must either be a matrix or list of (appropriately dimensioned) matrices')

            if heuristic is True:  # Regularization parameter heuristic, see ref. (3)
                w_pls = w_pls/np.linalg.norm(w_pls)
                gamma = (np.linalg.norm((x - y@w_pls))**2)/(w_pls@D@w_pls.T)
                opt_l[i] = gamma
                lA = gamma

            # di-PLS weight: w = w_pls(I + lA/(y'y)*D)^-1, obtained by solving the
            # (symmetric) linear system instead of forming the explicit inverse:
            # reg = np.linalg.inv(I + lA/(y.T@y)*D); w = w_pls@reg
            reg = I + lA/(y.T@y)*D
            w = scipy.linalg.solve(reg.T, w_pls.T, assume_a='sym').T  # ~10x faster than the previous computation via np.linalg.inv

            # Normalize w
            w = w/np.linalg.norm(w)

            # Absolute difference between variance of source and target domain projections
            discrepancy[i] = w@D@w.T
        else:
            if type(xt) is list:
                D = convex_relaxation(xs, xt[0])
            else:
                D = convex_relaxation(xs, xt)

            w = w_pls/np.linalg.norm(w_pls)
            discrepancy[i] = w@D@w.T
        # Compute scores
        t = x@w.T
        ts = xs@w.T
        if type(xt) is list:
            tt = []
            for z in range(len(xt)):
                tti = xt[z]@w.T
                tt.append(tti)
        else:
            tt = xt@w.T

        # Regress y on t
        c = (y.T@t)/(t.T@t)

        # Compute loadings
        p = (t.T@x)/(t.T@t)
        ps = (ts.T@xs)/(ts.T@ts)
        if type(xt) is list:
            pt = []
            for z in range(len(xt)):
                pti = (tt[z].T@xt[z])/(tt[z].T@tt[z])
                pt.append(pti)
        else:
            pt = (tt.T@xt)/(tt.T@tt)

        # Deflate X and y (Gram-Schmidt orthogonalization)
        x = x - t@p
        xs = xs - ts@ps
        if type(xt) is list:
            for z in range(len(xt)):
                xt[z] = xt[z] - tt[z]@pt[z]
        else:
            if np.sum(xt) != 0:  # Deflate target matrix only if not all zero
                xt = xt - tt@pt
        y = y - t*c
        # Store w, t, p, c
        W[:, i] = w
        T[:, i] = t.reshape(n)
        Ts[:, i] = ts.reshape(ns)
        P[:, i] = p.reshape(k)
        Ps[:, i] = ps.reshape(k)
        C[i] = c
        if type(xt) is list:
            for z in range(len(xt)):
                Pt[z][:, i] = pt[z].reshape(k)
                Tt[z][:, i] = tt[z].reshape(np.shape(xt[z])[0])
        else:
            Pt[:, i] = pt.reshape(k)
            Tt[:, i] = tt.reshape(nt)
    # Calculate regression vector
    if type(l) is np.ndarray and np.any(xt != 0):  # Multiple regularization parameters passed (one per LV)
        b = W@(np.linalg.inv(Pt.T@W))@C
    elif isinstance(l, (int, float, np.integer, np.floating)):
        if l == 0:
            b = W@(np.linalg.inv(Ps.T@W))@C
        elif np.any(xt != 0):
            b = W@(np.linalg.inv(Pt.T@W))@C
        else:
            b = W@(np.linalg.inv(Ps.T@W))@C
    elif np.any(xt != 0):
        if type(xt) is list:
            if target_domain == 0:
                b = W@(np.linalg.inv(Ps.T@W))@C
            else:
                b = W@(np.linalg.inv(Pt[target_domain-1].T@W))@C
        else:
            b = W@(np.linalg.inv(Pt.T@W))@C
    else:
        b = W@(np.linalg.inv(Ps.T@W))@C

    # Store residuals
    E = x
    Es = xs
    Et = xt
    Ey = y

    return b, T, Ts, Tt, W, P, Ps, Pt, E, Es, Et, Ey, C, opt_l, discrepancy
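
# --- Usage sketch (not part of the original library) -----------------------
# A minimal, hypothetical example of calling dipals: random source spectra, a
# linear response, and a slightly shifted target domain. All data below are
# synthetic assumptions; shapes follow the docstring (y as an (N,1) column).
def _example_dipals():
    rng = np.random.default_rng(0)
    xs = rng.standard_normal((50, 20))            # labeled source data
    y = xs@rng.standard_normal((20, 1))           # synthetic response
    xt = xs + 0.1*rng.standard_normal((50, 20))   # unlabeled, shifted target
    b, T, Ts, Tt, W, P, Ps, Pt, E, Es, Et, Ey, C, opt_l, disc = \
        dipals(xs.copy(), y.copy(), xs.copy(), xt.copy(), A=2, l=100)
    yhat = xs@b   # no intercept is returned; center x and y beforehand if needed
    return rmse(y, yhat)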


def convex_relaxation(xs, xt):
    '''
    Convex relaxation of the covariance difference.

    The convex relaxation computes the eigenvalue decomposition of the (symmetric) covariance
    difference matrix, inverts the signs of the negative eigenvalues and reconstructs it again.
    It can be shown that this relaxation corresponds to an upper bound on the covariance difference
    between source and target domain (see ref.).

    Reference:
    * Ramin Nikzad-Langerodi, Werner Zellinger, Susanne Saminger-Platz and Bernhard Moser
      "Domain-Invariant Regression under Beer-Lambert's Law" In Proc. International Conference
      on Machine Learning and Applications, Boca Raton FL 2019.

    Parameters
    ----------
    xs: numpy array (N_S x K)
        Source domain matrix
    xt: numpy array (N_T x K)
        Target domain matrix

    Returns
    -------
    D: numpy array (K x K)
        Relaxed covariance difference matrix
    '''
    # Preliminaries
    ns = np.shape(xs)[0]
    nt = np.shape(xt)[0]
    x = np.vstack([xs, xt])
    x = x - np.mean(x, 0)

    # Compute difference between source and target covariance matrices
    rot = (1/ns*xs.T@xs - 1/nt*xt.T@xt)

    # Convex relaxation: flip the signs of the negative eigenvalues
    w, v = eigh(rot)
    eigs = np.diag(np.abs(w))
    D = v@eigs@v.T

    return D
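
# --- Usage sketch (not part of the original library) -----------------------
# Quick property check of convex_relaxation on random data: the relaxed matrix
# D is symmetric and has non-negative eigenvalues (all signs flipped to
# positive), i.e. it upper-bounds the covariance difference as in the ref.
def _example_convex_relaxation():
    rng = np.random.default_rng(1)
    xs = rng.standard_normal((40, 10))
    xt = 2.0*rng.standard_normal((60, 10))   # target with inflated variance
    D = convex_relaxation(xs, xt)
    assert np.allclose(D, D.T)                            # symmetric
    assert np.all(eigh(D, eigvals_only=True) >= -1e-10)   # PSD up to round-off
    return D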


def gengaus(length, mu, sigma, mag, noise=0):
    '''
    Generate a spectrum-like Gaussian signal with random noise.

    Parameters
    ----------
    length: int
        Length of the signal (i.e. number of variables)
    mu: float
        Mean of the Gaussian signal
    sigma: float
        Standard deviation of the Gaussian
    mag: float
        Magnitude of the signal
    noise: float
        Amount of i.i.d. noise

    Returns
    -------
    signal: numpy array (length x 1)
        Generated Gaussian signal
    '''
    s = mag*scipy.stats.norm.pdf(np.arange(length), mu, sigma)
    n = noise*np.random.rand(length)
    signal = s + n

    return signal
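
# --- Usage sketch (not part of the original library) -----------------------
# Example: generate two spectrum-like signals of 200 variables; the second is
# shifted and noisier, mimicking a source/target domain pair. The parameter
# values are illustrative assumptions only.
def _example_gengaus():
    source = gengaus(200, mu=100, sigma=15, mag=1.0, noise=0.01)
    target = gengaus(200, mu=105, sigma=15, mag=1.0, noise=0.05)
    return source, target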


def hellipse(X, alpha=0.05):
    '''
    Confidence ellipse for 2D scatter plots (95% by default).

    Parameters
    ----------
    X: numpy array (N x 2)
        Scores matrix
    alpha: float
        Significance level (default = 0.05)

    Returns
    -------
    el: numpy array (2 x 100)
        x,y coordinates of ellipse points arranged in rows.
        To plot use plt.plot(el[0,:], el[1,:])
    '''
    # Means
    mean_all = np.zeros((2, 1))
    mean_all[0] = np.mean(X[:, 0])
    mean_all[1] = np.mean(X[:, 1])

    # Covariance matrix
    X = X[:, :2]
    comat_all = np.cov(np.transpose(X))

    # SVD
    U, S, V = np.linalg.svd(comat_all)

    # Confidence limit from the (1-alpha) quantile of the F-distribution
    # (Hotelling's T^2 scaling 2(N-1)/(N-2) for two dimensions)
    N = np.shape(X)[0]
    quant = 1 - alpha
    Conf = (2*(N-1))/(N-2)*scipy.stats.f.ppf(quant, 2, (N-2))

    # Evaluate confidence ellipse on (0, 2pi)
    el = np.zeros((2, 100))
    t = np.linspace(0, 2*np.pi, 100)
    sT = np.matmul(U, np.diag(np.sqrt(S*Conf)))
    for j in np.arange(100):
        el[:, j] = np.transpose(mean_all) + np.matmul(sT, np.array([np.cos(t[j]), np.sin(t[j])]))

    return el
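
# --- Usage sketch (not part of the original library) -----------------------
# Example: draw the confidence ellipse around correlated 2D scores with
# matplotlib (matplotlib is assumed to be installed; this module does not
# import it). The synthetic scores below are assumptions for illustration.
def _example_hellipse():
    import matplotlib.pyplot as plt
    rng = np.random.default_rng(2)
    T2 = rng.multivariate_normal([0, 0], [[2.0, 0.5], [0.5, 1.0]], size=200)
    el = hellipse(T2, alpha=0.05)
    plt.scatter(T2[:, 0], T2[:, 1], s=10)   # scores
    plt.plot(el[0, :], el[1, :])            # ellipse boundary
    plt.show()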


def rmse(y, yhat):
    '''
    Root mean squared error.

    Parameters
    ----------
    y: numpy array (N,1)
        Measured Y
    yhat: numpy array (N,1)
        Predicted Y

    Returns
    -------
    float
        The root mean squared error
    '''
    return np.sqrt(((y-yhat)**2).mean())
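
# --- Usage sketch (not part of the original library) -----------------------
# Tiny worked example: sqrt((0.1^2 + 0.1^2 + 0.2^2)/3) ~ 0.1414
def _example_rmse():
    y = np.array([1.0, 2.0, 3.0])
    yhat = np.array([1.1, 1.9, 3.2])
    return rmse(y, yhat)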