/
linear.py
192 lines (151 loc) · 6.28 KB
/
linear.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import numpy as np
from m2cgen import ast
from m2cgen.assemblers import utils
from m2cgen.assemblers.base import ModelAssembler
class BaseLinearModelAssembler(ModelAssembler):
    """Common assembler for models of the form ``intercept + sum(coef * x)``.

    Subclasses provide the numbers through ``_get_coef`` and
    ``_get_intercept``; they may also override ``_final_transform`` to wrap
    each linear expression (the default leaves it untouched).
    """

    def assemble(self):
        """Return the AST for the wrapped model."""
        return self._build_ast()

    def _build_ast(self):
        """Build one linear expression per coefficient row.

        A single row yields that expression directly; any other number of
        rows yields an ``ast.VectorVal`` holding one expression per row.
        """
        weights = utils.to_2d_array(self._get_coef())
        biases = utils.to_1d_array(self._get_intercept())
        n_outputs = weights.shape[0]

        def output_expr(row):
            # Index-based access is kept on purpose: a too-short intercept
            # array must fail loudly rather than be silently truncated.
            linear = _linear_to_ast(weights[row], biases[row])
            return self._final_transform(linear)

        if n_outputs == 1:
            return output_expr(0)

        return ast.VectorVal([output_expr(row) for row in range(n_outputs)])

    def _final_transform(self, ast_to_transform):
        """Hook applied to every linear expression; identity by default."""
        return ast_to_transform

    def _get_intercept(self):
        """Return the model intercept(s); must be provided by subclasses."""
        raise NotImplementedError

    def _get_coef(self):
        """Return the model coefficients; must be provided by subclasses."""
        raise NotImplementedError
class SklearnLinearModelAssembler(BaseLinearModelAssembler):
    """Linear assembler reading ``coef_`` / ``intercept_`` from the model."""

    def _get_intercept(self):
        """Return ``model.intercept_``, or zeros when the attribute is absent.

        The zero fallback has one entry per element along the first axis of
        ``coef_`` and is always computed, even when ``intercept_`` exists.
        """
        leading_dim = self._get_coef().shape[0]
        zero_intercept = np.zeros(leading_dim)
        return getattr(self.model, "intercept_", zero_intercept)

    def _get_coef(self):
        """Return ``model.coef_`` unchanged."""
        return self.model.coef_
class StatsmodelsLinearModelAssembler(BaseLinearModelAssembler):
    """Linear assembler for statsmodels results objects.

    The fitted parameter vector holds the constant term (if any) at
    position ``const_idx``; that entry becomes the intercept and the
    remaining entries become the coefficients.
    """

    def __init__(self, model):
        """Store the model and the position of its constant term.

        Raises:
            ValueError: if the model reports a constant (``k_constant`` is
                truthy) but its position (``const_idx``) is unknown.
        """
        super(StatsmodelsLinearModelAssembler, self).__init__(model)
        const_idx = self.model.model.data.const_idx
        if const_idx is None and self.model.k_constant:
            raise ValueError("Unknown constant position")
        self.const_idx = const_idx

    def _get_intercept(self):
        """Return the constant term, or ``0.0`` when the model has none."""
        if not self.model.k_constant:
            return 0.0
        # ``params`` may be a label-indexed pandas Series when the model was
        # fitted on a DataFrame; ``const_idx`` is a position, so index a
        # plain ndarray to avoid label-based integer lookup.
        return np.asarray(self.model.params)[self.const_idx]

    def _get_coef(self):
        """Return all parameters except the constant term, as an ndarray."""
        params = np.asarray(self.model.params)
        if not self.model.k_constant:
            return params
        # Boolean mask that drops only the constant's position.
        keep = np.arange(len(params)) != self.const_idx
        return params[keep]
class ProcessMLEModelAssembler(BaseLinearModelAssembler):
    """Linear assembler that uses a zero intercept and a prefix of ``params``."""

    def _get_intercept(self):
        """Return a constant zero intercept."""
        return 0.0

    def _get_coef(self):
        """Return the first ``model.k_exog`` entries of ``model.params``."""
        # NOTE(review): presumably the leading k_exog parameters are the
        # mean-structure coefficients -- confirm against statsmodels.
        all_params = self.model.params
        return all_params[:self.model.k_exog]
class StatsmodelsGLMModelAssembler(StatsmodelsLinearModelAssembler):
    """Assembler for statsmodels GLM results.

    The linear predictor is wrapped with the inverse of the family's link
    function, selected by the link object's class name.
    """

    def _final_transform(self, ast_to_transform):
        """Wrap ``ast_to_transform`` with the inverse of the model's link.

        The link is identified by the lower-cased class name of
        ``model.model.family.link``. Both the snake_case spellings
        (``inverse_power``, ``inverse_squared``) and the CamelCase spellings
        (``InversePower``, ``InverseSquared``) are accepted.

        Raises:
            ValueError: if the link class name is not in the supported table.
        """
        link_function = type(self.model.model.family.link).__name__
        link_function_lower = link_function.lower()
        supported_inversed_functions = {
            "logit": self._logit_inversed,
            "power": self._power_inversed,
            "inverse_power": self._inverse_power_inversed,
            # CamelCase class name ``InversePower`` lower-cases to this key.
            "inversepower": self._inverse_power_inversed,
            "sqrt": self._sqrt_inversed,
            "inverse_squared": self._inverse_squared_inversed,
            # CamelCase class name ``InverseSquared`` lower-cases to this key.
            "inversesquared": self._inverse_squared_inversed,
            "identity": self._identity_inversed,
            "log": self._log_inversed,
            "cloglog": self._cloglog_inversed,
            "negativebinomial": self._negativebinomial_inversed,
            "nbinom": self._negativebinomial_inversed,
        }
        if link_function_lower not in supported_inversed_functions:
            raise ValueError(
                "Unsupported link function '{}'".format(link_function))
        fun = supported_inversed_functions[link_function_lower]
        return fun(ast_to_transform)

    def _logit_inversed(self, ast_to_transform):
        """Build ``1 / (1 + exp(0 - x))``."""
        return utils.div(
            ast.NumVal(1.0),
            utils.add(
                ast.NumVal(1.0),
                ast.ExpExpr(
                    utils.sub(
                        ast.NumVal(0.0),
                        ast_to_transform))))

    def _power_inversed(self, ast_to_transform):
        """Build the inverse of a power link, i.e. ``x ** (1 / power)``.

        Powers 1, -1, 2 and -2 are routed to dedicated, simpler expressions.
        A ``power`` of 0 is not handled and fails with ``ZeroDivisionError``
        when ``1 / power`` is evaluated.
        """
        power = self.model.model.family.link.power
        if power == 1:
            return self._identity_inversed(ast_to_transform)
        elif power == -1:
            return self._inverse_power_inversed(ast_to_transform)
        elif power == 2:
            return ast.SqrtExpr(ast_to_transform)
        elif power == -2:
            return self._inverse_squared_inversed(ast_to_transform)
        elif power < 0:  # some languages may not support negative exponent
            return utils.div(
                ast.NumVal(1.0),
                ast.PowExpr(ast_to_transform, ast.NumVal(1 / -power)))
        else:
            return ast.PowExpr(ast_to_transform, ast.NumVal(1 / power))

    def _inverse_power_inversed(self, ast_to_transform):
        """Build ``1 / x``."""
        return utils.div(ast.NumVal(1.0), ast_to_transform)

    def _sqrt_inversed(self, ast_to_transform):
        """Build ``x ** 2``."""
        return ast.PowExpr(ast_to_transform, ast.NumVal(2))

    def _inverse_squared_inversed(self, ast_to_transform):
        """Build ``1 / sqrt(x)``."""
        return utils.div(ast.NumVal(1.0), ast.SqrtExpr(ast_to_transform))

    def _identity_inversed(self, ast_to_transform):
        """Return the expression unchanged."""
        return ast_to_transform

    def _log_inversed(self, ast_to_transform):
        """Build ``exp(x)``."""
        return ast.ExpExpr(ast_to_transform)

    def _cloglog_inversed(self, ast_to_transform):
        """Build ``1 - exp(0 - exp(x))``."""
        return utils.sub(
            ast.NumVal(1.0),
            ast.ExpExpr(
                utils.sub(
                    ast.NumVal(0.0),
                    ast.ExpExpr(ast_to_transform))))

    def _negativebinomial_inversed(self, ast_to_transform):
        """Build ``-1 / (alpha * (1 - exp(0 - x)))``.

        ``alpha`` is read from the link object (``family.link.alpha``).
        """
        return utils.div(
            ast.NumVal(-1.0),
            utils.mul(
                ast.NumVal(self.model.model.family.link.alpha),
                utils.sub(
                    ast.NumVal(1.0),
                    ast.ExpExpr(
                        utils.sub(
                            ast.NumVal(0.0),
                            ast_to_transform)))))
class StatsmodelsModelAssemblerSelector(ModelAssembler):
    """Pick the concrete statsmodels assembler from the underlying model type.

    The choice is made on the class name of ``model.model``; ``assemble``
    then delegates to the chosen assembler.
    """

    def __init__(self, model):
        """Create the delegate assembler for ``model``.

        Raises:
            NotImplementedError: if the underlying model class name is not
                one of GLM, GLS, GLSAR, OLS or WLS.
        """
        underlying_model = type(model.model).__name__

        assembler_by_name = {
            "GLM": StatsmodelsGLMModelAssembler,
            "GLS": StatsmodelsLinearModelAssembler,
            "GLSAR": StatsmodelsLinearModelAssembler,
            "OLS": StatsmodelsLinearModelAssembler,
            "WLS": StatsmodelsLinearModelAssembler,
        }
        assembler_cls = assembler_by_name.get(underlying_model)
        if assembler_cls is None:
            raise NotImplementedError(
                "Model '{}' is not supported".format(underlying_model))

        self.assembler = assembler_cls(model)

    def assemble(self):
        """Return the AST produced by the selected assembler."""
        return self.assembler.assemble()
def _linear_to_ast(coef, intercept):
    """Build the AST for ``intercept + sum(feature[i] * coef[i])``.

    The intercept is the first operand, followed by one
    ``FeatureRef(i) * NumVal(coef[i])`` product per coefficient, all combined
    with the ADD operator.
    """
    operands = [ast.NumVal(intercept)]
    for position, weight in enumerate(coef):
        feature_term = utils.mul(ast.FeatureRef(position), ast.NumVal(weight))
        operands.append(feature_term)

    return utils.apply_op_to_expressions(ast.BinNumOpType.ADD, *operands)