-
Notifications
You must be signed in to change notification settings - Fork 88
/
recon_error_pca_svd.py
106 lines (89 loc) · 4.77 KB
/
recon_error_pca_svd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Author:MaXiao
# E-mail:maxiaoscut@aliyun.com
# Github:https://github.com/Albertsr
import numpy as np
from numpy import linalg
class PCA_Via_SVD:
def __init__(self, matrix, n_components=None):
self.matrix = matrix
self.n_components = matrix.shape[1] if n_components==None else n_components
U, sigma, Vh = self.svd_matrix()
# cov_eigvalue : eigenvalues of covariance matrix
cov_eigvalue = np.square(sigma) / (self.matrix.shape[0] - 1)
self.components_ = Vh[:n_components, ]
self.explained_variance_ = cov_eigvalue[:n_components]
self.Vh = Vh
def scale_matrix(self):
def scale_vector(vector):
delta = vector - np.mean(vector)
std = np.std(vector, ddof=0)
return delta / std
matrix_scaled = np.apply_along_axis(arr=self.matrix, func1d=scale_vector, axis=0)
return matrix_scaled
# Singular value decomposition of scaled matrix
def svd_matrix(self):
U, sigma, Vh = linalg.svd(self.scale_matrix())
assert len(sigma) == min(self.matrix.shape)
return U, sigma, Vh
# Obtaining projection matrix Q through matrix V
def implement_pca(self):
# Q : The transpose of the first n_components row vectors of Vh
Q = self.Vh[:self.n_components, :].T
pca_result = np.dot(self.scale_matrix(), Q)
assert pca_result.shape[1] == self.n_components
return pca_result
class PCA_Recon_Error(PCA_Via_SVD):
def __init__(self, matrix, contamination=0.01, n_components=None):
super(PCA_Recon_Error, self).__init__(matrix, n_components)
self.contamination = contamination
def get_ev_ratio(self):
pca_result = self.implement_pca()
eigenvalues = self.explained_variance_
# ev_ratio is the cumulative proportion of eigenvalues and the weight of
# reconstruction error corresponding to different number of principal components
ev_ratio = np.cumsum(eigenvalues) / np.sum(eigenvalues)
return ev_ratio
# using different numbers of principal components to generate a series of reconstruction matrices
def reconstruct_matrix(self):
# the parameter recon_pc_num is the number of top principal components used in the reconstruction matrix.
def reconstruct(recon_pc_num):
instance = PCA_Via_SVD(self.matrix, n_components=recon_pc_num)
recon_matrix = np.dot(instance.implement_pca(), instance.Vh[:recon_pc_num, :])
assert_description = 'The shape of the reconstruction matrix should be equal to that of the initial matrix.'
assert np.all(recon_matrix.shape == self.matrix.shape), assert_description
return recon_matrix
# generating a series of reconstruction matrices
col = self.matrix.shape[1]
recon_matrices = [reconstruct(i) for i in range(1, col+1)]
# randomly select two reconstruction matrices to verify that they are different
i, j = np.random.choice(range(col), size=2, replace=False)
description = 'The reconstruction matrices generated by different number of principal components are different.'
assert not np.all(recon_matrices[i] == recon_matrices[j]), description
return recon_matrices
# calculate the final anomaly score
def get_anomaly_score(self):
# calculate the modulus of a vector
def compute_vector_length(vector):
square_sum = np.square(vector).sum()
return np.sqrt(square_sum)
# calculate the anomaly score generated by a single reconstruction matrix for all samples
def compute_sub_score(recon_matrix, ev):
delta_matrix = self.scale_matrix() - recon_matrix
score = np.apply_along_axis(compute_vector_length, axis=1, arr=delta_matrix) * ev
return score
ev_ratio = self.get_ev_ratio()
reconstruct_matrices = self.reconstruct_matrix()
# summarize the anomaly scores generated by all reconstruction matrices
anomaly_scores = list(map(compute_sub_score, reconstruct_matrices, ev_ratio))
return np.sum(anomaly_scores, axis=0)
# returns indices with the highest anomaly score based on a specific contamination
def get_anomaly_indices(self):
indices_desc = np.argsort(-self.get_anomaly_score())
anomaly_num = int(np.ceil(len(self.matrix) * self.contamination))
anomaly_indices = indices_desc[:anomaly_num]
return anomaly_indices
# returns 1 if the prediction is an anomaly, otherwise returns 0
def predict(self):
anomaly_indices = self.get_anomaly_indices()
pred_result = np.isin(range(len(self.matrix)), anomaly_indices).astype(int)
return pred_result