-
Notifications
You must be signed in to change notification settings - Fork 77
/
utils_eval.py
executable file
·142 lines (107 loc) · 4.2 KB
/
utils_eval.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
'''
This provide time-dependent Concordance index and Brier Score:
- Use weighted_c_index and weighted_brier_score, which are the unbiased estimates.
See equations and descriptions eq. (11) and (12) of the following paper:
- C. Lee, W. R. Zame, A. Alaa, M. van der Schaar, "Temporal Quilting for Survival Analysis", AISTATS 2019
'''
import numpy as np
from lifelines import KaplanMeierFitter
### C(t)-INDEX CALCULATION
def c_index(Prediction, Time_survival, Death, Time):
'''
This is a cause-specific c(t)-index
- Prediction : risk at Time (higher --> more risky)
- Time_survival : survival/censoring time
- Death :
> 1: death
> 0: censored (including death from other cause)
- Time : time of evaluation (time-horizon when evaluating C-index)
'''
N = len(Prediction)
A = np.zeros((N,N))
Q = np.zeros((N,N))
N_t = np.zeros((N,N))
Num = 0
Den = 0
for i in range(N):
A[i, np.where(Time_survival[i] < Time_survival)] = 1
Q[i, np.where(Prediction[i] > Prediction)] = 1
if (Time_survival[i]<=Time and Death[i]==1):
N_t[i,:] = 1
Num = np.sum(((A)*N_t)*Q)
Den = np.sum((A)*N_t)
if Num == 0 and Den == 0:
result = -1 # not able to compute c-index!
else:
result = float(Num/Den)
return result
### BRIER-SCORE
def brier_score(Prediction, Time_survival, Death, Time):
N = len(Prediction)
y_true = ((Time_survival <= Time) * Death).astype(float)
return np.mean((Prediction - y_true)**2)
# result2[k, t] = brier_score_loss(risk[:, k], ((te_time[:,0] <= eval_horizon) * (te_label[:,0] == k+1)).astype(int))
##### WEIGHTED C-INDEX & BRIER-SCORE
def CensoringProb(Y, T):
T = T.reshape([-1]) # (N,) - np array
Y = Y.reshape([-1]) # (N,) - np array
kmf = KaplanMeierFitter()
kmf.fit(T, event_observed=(Y==0).astype(int)) # censoring prob = survival probability of event "censoring"
G = np.asarray(kmf.survival_function_.reset_index()).transpose()
G[1, G[1, :] == 0] = G[1, G[1, :] != 0][-1] #fill 0 with ZoH (to prevent nan values)
return G
### C(t)-INDEX CALCULATION: this account for the weighted average for unbaised estimation
def weighted_c_index(T_train, Y_train, Prediction, T_test, Y_test, Time):
'''
This is a cause-specific c(t)-index
- Prediction : risk at Time (higher --> more risky)
- Time_survival : survival/censoring time
- Death :
> 1: death
> 0: censored (including death from other cause)
- Time : time of evaluation (time-horizon when evaluating C-index)
'''
G = CensoringProb(Y_train, T_train)
N = len(Prediction)
A = np.zeros((N,N))
Q = np.zeros((N,N))
N_t = np.zeros((N,N))
Num = 0
Den = 0
for i in range(N):
tmp_idx = np.where(G[0,:] >= T_test[i])[0]
if len(tmp_idx) == 0:
W = (1./G[1, -1])**2
else:
W = (1./G[1, tmp_idx[0]])**2
A[i, np.where(T_test[i] < T_test)] = 1. * W
Q[i, np.where(Prediction[i] > Prediction)] = 1. # give weights
if (T_test[i]<=Time and Y_test[i]==1):
N_t[i,:] = 1.
Num = np.sum(((A)*N_t)*Q)
Den = np.sum((A)*N_t)
if Num == 0 and Den == 0:
result = -1 # not able to compute c-index!
else:
result = float(Num/Den)
return result
# this account for the weighted average for unbaised estimation
def weighted_brier_score(T_train, Y_train, Prediction, T_test, Y_test, Time):
G = CensoringProb(Y_train, T_train)
N = len(Prediction)
W = np.zeros(len(Y_test))
Y_tilde = (T_test > Time).astype(float)
for i in range(N):
tmp_idx1 = np.where(G[0,:] >= T_test[i])[0]
tmp_idx2 = np.where(G[0,:] >= Time)[0]
if len(tmp_idx1) == 0:
G1 = G[1, -1]
else:
G1 = G[1, tmp_idx1[0]]
if len(tmp_idx2) == 0:
G2 = G[1, -1]
else:
G2 = G[1, tmp_idx2[0]]
W[i] = (1. - Y_tilde[i])*float(Y_test[i])/G1 + Y_tilde[i]/G2
y_true = ((T_test <= Time) * Y_test).astype(float)
return np.mean(W*(Y_tilde - (1.-Prediction))**2)