-
-
Notifications
You must be signed in to change notification settings - Fork 550
/
piecewise_exponential_regression_fitter.py
141 lines (107 loc) · 5.16 KB
/
piecewise_exponential_regression_fitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
import autograd.numpy as np
from lifelines.utils import coalesce, _get_index, CensoringType
from lifelines.fitters import ParametricRegressionFitter
import pandas as pd
from lifelines.utils.safe_exp import safe_exp
class PiecewiseExponentialRegressionFitter(ParametricRegressionFitter):
    """
    A regression model in which the timeline is divided at the user-supplied
    ``breakpoints`` and each resulting interval gets its own exponential rate,
    with each rate a log-linear function of the covariates. One coefficient
    vector, ``lambda_i_``, is fitted per interval (including the final,
    unbounded interval past the last breakpoint).

    Parameters
    ----------
    breakpoints: iterable of floats
        positions to split the timeline at. Sorted internally; must be finite
        (do not include ``np.inf`` — the final unbounded interval is added
        automatically).
    alpha: float, optional (default=0.05)
        the level in the confidence intervals.
    penalizer: float, optional (default=0.0)
        if positive, penalizes the variance of each (non-intercept)
        coefficient across the per-interval parameter vectors, shrinking
        covariate effects towards a common value. See ``_add_penalty``.

    Examples
    ----------
    See blog post `here <https://dataorigami.net/blogs/napkin-folding/churn>`_.
    """

    # about 50% faster than BFGS
    _scipy_fit_method = "SLSQP"
    _scipy_fit_options = {"ftol": 1e-6, "maxiter": 200}

    def __init__(self, breakpoints, alpha=0.05, penalizer=0.0):
        super(PiecewiseExponentialRegressionFitter, self).__init__(alpha=alpha)

        breakpoints = np.sort(breakpoints)
        if len(breakpoints) and not (breakpoints[-1] < np.inf):
            raise ValueError("Do not add inf to the breakpoints.")

        # NOTE(review): the condition tests `< 0` but the message says
        # "greater than 0", so a breakpoint at exactly 0 is accepted —
        # confirm which behavior is intended.
        if len(breakpoints) and breakpoints[0] < 0:
            raise ValueError("First breakpoint must be greater than 0.")

        # Append +inf to close the final, unbounded interval. Consequently
        # n_breakpoints equals the number of intervals, i.e. the number of
        # fitted parameter vectors lambda_0_ .. lambda_{k}_.
        self.breakpoints = np.append(breakpoints, [np.inf])
        self.n_breakpoints = len(self.breakpoints)

        self.penalizer = penalizer
        self._fitted_parameter_names = ["lambda_%d_" % i for i in range(self.n_breakpoints)]

    def _add_penalty(self, params, neg_ll):
        # Add `penalizer * sum_j Var_i(beta_{i,j})` to the negative
        # log-likelihood: the variance of coefficient j taken across the
        # per-interval parameter vectors i, for every column except the last.
        params_stacked = np.stack(params.values())
        coef_penalty = 0

        if self.penalizer > 0:
            for i in range(params_stacked.shape[1] - 1):  # assuming the intercept col is the last column...
                coef_penalty = coef_penalty + (params_stacked[:, i]).var()

        return neg_ll + self.penalizer * coef_penalty

    def _cumulative_hazard(self, params, T, Xs):
        """Cumulative hazard H(T) under the piecewise-constant-rate model.

        Built with autograd-compatible numpy ops so it can be differentiated
        during fitting.
        """
        n = T.shape[0]
        T = T.reshape((n, 1))
        # M[j, i] = amount of time subject j spends in interval i: clamp each
        # breakpoint at T[j], then difference along the interval axis.
        M = np.minimum(np.tile(self.breakpoints, (n, 1)), T)
        M = np.hstack([M[:, tuple([0])], np.diff(M, axis=1)])
        # Per-interval hazard rate exp(-X . beta_i); safe_exp guards against
        # overflow during optimization.
        lambdas_ = np.array([safe_exp(-np.dot(Xs[param], params[param])) for param in self._fitted_parameter_names])
        # H(T) = sum over intervals of (time spent in interval) * (rate).
        return (M * lambdas_.T).sum(1)

    def _log_hazard(self, params, T, X):
        hz = self._hazard(params, T, X)
        # Clip away zeros before the log to avoid -inf / nan in gradients.
        hz = np.clip(hz, 1e-20, np.inf)
        return np.log(hz)

    def _prep_inputs_for_prediction_and_return_parameters(self, X):
        """Return exp(X . beta_i) per interval, shape (n_breakpoints, n_subjects)."""
        X = X.copy()

        if isinstance(X, pd.DataFrame):
            # Reorder columns to match the order used at training time.
            X = X[self.params_["lambda_0_"].index]

        return np.array([np.exp(np.dot(X, self.params_["lambda_%d_" % i])) for i in range(self.n_breakpoints)])

    def predict_cumulative_hazard(self, df, times=None, conditional_after=None):
        """
        Return the cumulative hazard rate of subjects in ``df`` at time points.

        Parameters
        ----------
        df: numpy array or DataFrame
            a (n,d) covariate numpy array or DataFrame. If a DataFrame, columns
            can be in any order. If a numpy array, columns must be in the
            same order as the training data.
        times: iterable, optional
            an iterable of increasing times to predict the cumulative hazard at. Default
            is the set of all durations (observed and unobserved). Uses a linear interpolation if
            points in time are not in the index.
        conditional_after: optional
            not implemented for this model; raises NotImplementedError if not None.

        Returns
        -------
        cumulative_hazard_ : DataFrame
            the cumulative hazard of individuals over the timeline
        """
        if conditional_after is not None:
            raise NotImplementedError()

        times = np.asarray(coalesce(times, self.timeline, np.unique(self.durations)))
        n = times.shape[0]
        times = times.reshape((n, 1))

        lambdas_ = self._prep_inputs_for_prediction_and_return_parameters(df)

        bp = self.breakpoints
        # M[t, i] = portion of [0, times[t]] that falls inside interval i.
        M = np.minimum(np.tile(bp, (n, 1)), times)
        M = np.hstack([M[:, tuple([0])], np.diff(M, axis=1)])

        # Rate in interval i is 1/lambda_i = exp(-X . beta_i), matching
        # _cumulative_hazard, so H = M @ (1/lambda).
        return pd.DataFrame(np.dot(M, (1 / lambdas_)), columns=_get_index(df), index=times[:, 0])

    @property
    def _ll_null(self):
        # Log-likelihood of the intercept-only ("null") model, used for
        # likelihood-ratio statistics. Cached on first access.
        if hasattr(self, "_ll_null_"):
            return self._ll_null_

        initial_point = np.zeros(len(self._fitted_parameter_names))

        # Strip the trailing +inf before handing breakpoints back to __init__,
        # which appends it again.
        model = self.__class__(breakpoints=self.breakpoints[:-1], penalizer=self.penalizer)
        regressors = {param_name: ["_intercept"] for param_name in self._fitted_parameter_names}
        if CensoringType.is_right_censoring(self):
            df = pd.DataFrame({"T": self.durations, "E": self.event_observed, "entry": self.entry, "_intercept": 1.0})
            model.fit_right_censoring(
                df, "T", "E", initial_point=initial_point, entry_col="entry", regressors=regressors
            )
        elif CensoringType.is_interval_censoring(self):
            df = pd.DataFrame(
                {
                    "lb": self.lower_bound,
                    "ub": self.upper_bound,
                    "E": self.event_observed,
                    "entry": self.entry,
                    "_intercept": 1.0,
                }
            )
            model.fit_interval_censoring(
                df, "lb", "ub", "E", initial_point=initial_point, entry_col="entry", regressors=regressors
            )
        if CensoringType.is_left_censoring(self):
            raise NotImplementedError()

        self._ll_null_ = model.log_likelihood_
        return self._ll_null_