/
svm.py
220 lines (174 loc) · 7.18 KB
/
svm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import numpy as np
from m2cgen import ast
from m2cgen.assemblers import utils
from m2cgen.assemblers.base import ModelAssembler
class BaseSVMModelAssembler(ModelAssembler):
    """Shared assembly logic for SVM-style models.

    Turns a fitted model's support vectors, kernel and coefficients into
    an AST expression. Framework-specific accessors (gamma, output size,
    coefficients, intercepts) are supplied by subclasses.
    """

    def __init__(self, model):
        super().__init__(model)

        kernel_funs = self._get_supported_kernels()
        kernel_name = model.kernel
        if kernel_name not in kernel_funs:
            raise ValueError(
                "Unsupported kernel type '{}'".format(kernel_name))
        self._kernel_fun = kernel_funs[kernel_name]

        gamma_value = self._get_gamma()
        self._gamma_expr = ast.NumVal(gamma_value)
        # Keep -gamma as a reusable subexpression (used by the RBF kernel).
        self._neg_gamma_expr = utils.sub(
            ast.NumVal(0), ast.NumVal(gamma_value), to_reuse=True)

        self._output_size = self._get_output_size()

    def assemble(self):
        """Return the assembled AST for the whole model."""
        if self._output_size <= 1:
            return self._assemble_single_output()
        return self._assemble_multi_class_output()

    def _assemble_single_output(self, idx=0):
        """Assemble `intercept + sum_k(coef[k] * kernel(sv[k], x))`."""
        vectors = self.model.support_vectors_
        weights = self._get_single_coef(idx)
        bias = self._get_single_intercept(idx)

        weighted_kernels = []
        for kernel_expr, weight in zip(self._apply_kernel(vectors), weights):
            weighted_kernels.append(
                utils.mul(kernel_expr, ast.NumVal(weight)))

        return utils.apply_op_to_expressions(
            ast.BinNumOpType.ADD, ast.NumVal(bias), *weighted_kernels)

    def _apply_kernel(self, support_vectors, to_reuse=False):
        """Build one kernel expression per support vector."""
        exprs = []
        for vector in support_vectors:
            expr = self._kernel_fun(vector)
            expr.to_reuse = to_reuse
            exprs.append(expr)
        return exprs

    def _get_supported_kernels(self):
        """Map kernel name -> bound kernel-building method."""
        return {
            "linear": self._linear_kernel,
            "poly": self._poly_kernel,
            "rbf": self._rbf_kernel,
            "sigmoid": self._sigmoid_kernel,
        }

    def _get_gamma(self):
        """Return the numeric gamma value of the model."""
        raise NotImplementedError

    def _get_output_size(self):
        """Return how many outputs the assembled expression yields."""
        raise NotImplementedError

    def _assemble_multi_class_output(self):
        """Assemble the expression for a multi-output model."""
        raise NotImplementedError

    def _get_single_coef(self, idx=0):
        """Return the per-support-vector coefficients for output `idx`."""
        raise NotImplementedError

    def _get_single_intercept(self, idx=0):
        """Return the intercept for output `idx`."""
        raise NotImplementedError
class SklearnSVMModelAssembler(BaseSVMModelAssembler):
    """Assembler for SVM models fitted with scikit-learn."""

    def _get_gamma(self):
        # Reads the private `_gamma` attribute, where scikit-learn stores
        # the resolved numeric value (the public `gamma` may be a string
        # such as "scale"/"auto") — see the sklearn SVC docs.
        return self.model._gamma

    def _get_output_size(self):
        # Regressors and binary classifiers produce one decision value.
        # Multi-class SVC/NuSVC reports the class count so that
        # assemble() dispatches to _assemble_multi_class_output().
        output_size = 1
        if type(self.model).__name__ in {"SVC", "NuSVC"}:
            n_classes = len(self.model.n_support_)
            if n_classes > 2:
                output_size = n_classes
        return output_size

    def _assemble_multi_class_output(self):
        """Assemble one-vs-one decision expressions for multi-class SVC/NuSVC.

        Returns an ast.VectorVal with one entry per class pair (i, j),
        i < j, each entry being
        `intercept + sum(weight * kernel)` over the support vectors of
        both classes in the pair.
        """
        support_vectors = self.model.support_vectors_
        coef = self.model.dual_coef_
        intercept = self.model.intercept_

        n_support = self.model.n_support_
        n_support_len = len(n_support)

        # Kernel expressions are shared across class pairs, hence reusable.
        kernel_exprs = self._apply_kernel(support_vectors, to_reuse=True)

        # The n_support[i] support vectors of class i occupy a contiguous
        # slice of support_vectors_; precompute each [start, end) range.
        support_ranges = []
        for i in range(n_support_len):
            range_start = sum(n_support[:i])
            range_end = range_start + n_support[i]
            support_ranges.append((range_start, range_end))

        # One-vs-one decisions.
        decisions = []
        for i in range(n_support_len):
            for j in range(i + 1, n_support_len):
                # NOTE(review): the row indexing below follows
                # scikit-learn's dual_coef_ layout (shape
                # [n_classes - 1, n_SV]): for the (i, j) pair, class-j
                # support vectors take their weights from row i and
                # class-i support vectors from row j - 1 — confirm
                # against the sklearn multi-class SVM documentation.
                kernel_weight_mul_ops = [
                    utils.mul(kernel_exprs[k], ast.NumVal(coef[i][k]))
                    for k in range(*support_ranges[j])
                ]
                kernel_weight_mul_ops.extend([
                    utils.mul(kernel_exprs[k], ast.NumVal(coef[j - 1][k]))
                    for k in range(*support_ranges[i])
                ])

                # intercept_ is ordered the same way the (i, j) pairs are
                # generated here, so len(decisions) indexes the current pair.
                decision = utils.apply_op_to_expressions(
                    ast.BinNumOpType.ADD,
                    ast.NumVal(intercept[len(decisions)]),
                    *kernel_weight_mul_ops
                )
                decisions.append(decision)

        # TODO convert One-vs-one decisions to One-vs-rest
        return ast.VectorVal(decisions)

    def _get_single_coef(self, idx=0):
        # Per-support-vector dual coefficients for output `idx`.
        return self.model.dual_coef_[idx]

    def _get_single_intercept(self, idx=0):
        return self.model.intercept_[idx]

    def _rbf_kernel(self, support_vector):
        """RBF kernel: exp(-gamma * ||sv - x||^2)."""
        elem_wise = [
            ast.PowExpr(
                utils.sub(ast.NumVal(support_element), ast.FeatureRef(i)),
                ast.NumVal(2)
            )
            for i, support_element in enumerate(support_vector)
        ]
        kernel = utils.apply_op_to_expressions(ast.BinNumOpType.ADD,
                                               *elem_wise)
        # _neg_gamma_expr is the reusable -gamma built in __init__.
        kernel = utils.mul(self._neg_gamma_expr, kernel)
        return ast.ExpExpr(kernel)

    def _sigmoid_kernel(self, support_vector):
        """Sigmoid kernel: tanh(gamma * <sv, x> + coef0)."""
        kernel = self._linear_kernel_with_gamma_and_coef(support_vector)
        return ast.TanhExpr(kernel)

    def _poly_kernel(self, support_vector):
        """Polynomial kernel: (gamma * <sv, x> + coef0) ** degree."""
        kernel = self._linear_kernel_with_gamma_and_coef(support_vector)
        return ast.PowExpr(kernel, ast.NumVal(self.model.degree))

    def _linear_kernel(self, support_vector):
        """Linear kernel: the dot product <sv, x>."""
        elem_wise = [
            utils.mul(ast.NumVal(support_element), ast.FeatureRef(i))
            for i, support_element in enumerate(support_vector)
        ]
        return utils.apply_op_to_expressions(ast.BinNumOpType.ADD, *elem_wise)

    def _linear_kernel_with_gamma_and_coef(self, support_vector):
        """Shared core of the sigmoid/poly kernels: gamma * <sv, x> + coef0."""
        kernel = self._linear_kernel(support_vector)
        kernel = utils.mul(self._gamma_expr, kernel)
        return utils.add(kernel, ast.NumVal(self.model.coef0))
class LightningSVMModelAssembler(SklearnSVMModelAssembler):
    """Assembler for SVM models fitted with the `lightning` library."""

    def _get_supported_kernels(self):
        """Extend the inherited kernel map with lightning's cosine kernel."""
        kernel_map = dict(super()._get_supported_kernels())
        kernel_map["cosine"] = self._cosine_kernel
        return kernel_map

    def _get_gamma(self):
        # lightning exposes gamma as a plain public attribute.
        return self.model.gamma

    def _get_output_size(self):
        """One output for binary problems, one per class otherwise."""
        n_classes = len(self.model.classes_)
        return n_classes if n_classes > 2 else 1

    def _assemble_multi_class_output(self):
        """Assemble one single-output expression per class (one-vs-rest)."""
        n_classes = self.model.classes_.shape[0]
        per_class_exprs = [
            self._assemble_single_output(class_idx)
            for class_idx in range(n_classes)
        ]
        return ast.VectorVal(per_class_exprs)

    def _get_single_coef(self, idx=0):
        # Per-class weights live in `coef_`.
        return self.model.coef_[idx]

    def _get_single_intercept(self, idx=0):
        # The assembled expression uses a fixed zero intercept.
        return 0.0

    def _cosine_kernel(self, support_vector):
        """Cosine-similarity kernel: <sv, x> / (||sv|| * ||x||).

        Both norms are guarded: a zero support-vector norm is replaced at
        assembly time, a zero feature norm at evaluation time.
        """
        sv_norm = np.linalg.norm(support_vector)
        if sv_norm == 0.0:
            sv_norm = 1.0  # avoid dividing an all-zero support vector

        squared_features = [
            utils.mul(ast.FeatureRef(i), ast.FeatureRef(i))
            for i in range(len(support_vector))
        ]
        feature_norm = ast.SqrtExpr(
            utils.apply_op_to_expressions(ast.BinNumOpType.ADD,
                                          *squared_features),
            to_reuse=True)
        # Substitute 1.0 when the input vector is all zeros at runtime.
        safe_feature_norm = ast.IfExpr(
            utils.eq(feature_norm, ast.NumVal(0.0)),
            ast.NumVal(1.0),
            feature_norm)

        dot_product = self._linear_kernel(support_vector / sv_norm)
        return utils.div(dot_product, safe_feature_norm)