-
Notifications
You must be signed in to change notification settings - Fork 25
/
classifier.go
464 lines (394 loc) · 13.5 KB
/
classifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
package classifier
import (
	"context"
	"fmt"
	"sort"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/open-policy-agent/opa/ast"
	"github.com/open-policy-agent/opa/rego"
	"google.golang.org/protobuf/types/known/wrapperspb"

	selectorv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/common/selector/v1"
	classificationv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/policy/language/v1"
	"github.com/fluxninja/aperture/pkg/log"
	"github.com/fluxninja/aperture/pkg/multimatcher"
	"github.com/fluxninja/aperture/pkg/policies/dataplane/resources/classifier/extractors"
	"github.com/fluxninja/aperture/pkg/selectors"
	"github.com/fluxninja/aperture/pkg/services"
)
// defaultPackageName is the rego package name used for the module generated
// from extractor-based rules; the module is queried as "data."+defaultPackageName.
const defaultPackageName = "fluxninja.classification.extractors"
// multiMatcherByControlPoint maps a control point ID to the MultiMatcher that
// holds the labelers registered for that control point.
type multiMatcherByControlPoint map[selectors.ControlPointID]*multimatcher.MultiMatcher[int, []*labeler]
// rules bundles the compiled matchers with the uncompiled rules they were
// built from, so that both views are always published together.
type rules struct {
	// MultiMatcherByControlPointID holds the compiled rules: one MultiMatcher
	// per ControlPointID.
	MultiMatcherByControlPointID multiMatcherByControlPoint
	// ReportedRules is the non-compiled form of the same rules, kept for
	// reporting.
	ReportedRules []ReportedRule
}
// CompiledRuleset is the compiled form of a Classifier proto.
type CompiledRuleset struct {
	// ControlPointID is the control point taken from the ruleset's selector.
	ControlPointID selectors.ControlPointID
	// Labelers are the compiled rules, each paired with the label selector
	// that guards it.
	Labelers []labelerWithSelector
	// ReportedRules are the original, uncompiled rules, kept for reporting.
	ReportedRules []ReportedRule
}
// labelerWithSelector pairs a labeler with the label-matching expression under
// which it is registered in a MultiMatcher.
type labelerWithSelector struct {
	// Labeler produces the flow labels.
	Labeler *labeler
	// LabelSelector is the expression passed to MultiMatcher.AddEntry for
	// this labeler.
	LabelSelector multimatcher.Expr
}
// labeler is used to create flow labels.
//
// A labeler comes in one of two variants:
//   - single-label: LabelName is non-empty and the query result becomes the
//     value of that label, or
//   - multi-label: LabelName is empty and the query result is a map whose
//     entries each become a label.
type labeler struct {
	// Query is a prepared rego query that takes an envoy authz request as its
	// input. Its result expression should be a single value when LabelName is
	// set, and a map[string]interface{} otherwise.
	Query rego.PreparedEvalQuery
	// LabelsFlags holds the flags of the created labels, keyed by label name
	// (multi-label variant).
	LabelsFlags map[string]LabelFlags
	// LabelName is the flow label the result is assigned to (single-label
	// variant).
	LabelName string
	// LabelFlags are the flags of the created label (single-label variant).
	LabelFlags LabelFlags
}
// Classifier receives classification policies and provides the Classify method.
type Classifier struct {
	// mu guards activeRulesets and nextRulesetID.
	mu sync.Mutex
	// activeRules stores a value of type rules: the combination of all
	// currently active rulesets. It is read without taking mu.
	activeRules atomic.Value
	// activeRulesets holds every registered ruleset by ID; protected by mu.
	activeRulesets map[rulesetID]CompiledRuleset
	// nextRulesetID is the ID handed to the next added ruleset; protected by mu.
	nextRulesetID rulesetID
}
// rulesetID identifies a ruleset registered in a Classifier. IDs are handed
// out sequentially by AddRules.
type rulesetID = uint64
// ReportedRule is a single rule together with the selector of its ruleset, the
// name of that ruleset and the label the rule is declared under.
type ReportedRule struct {
	// Selector is the selector of the ruleset the rule belongs to.
	Selector *selectorv1.Selector
	// Rule is the uncompiled rule.
	Rule *classificationv1.Rule
	// RulesetName is the name the ruleset was registered with.
	RulesetName string
	// LabelName is the key of the rule in the ruleset's rule map.
	LabelName string
}
// rulesetToReportedRules converts every rule of rs into a ReportedRule tagged
// with rulesetName and the ruleset's selector.
//
// The result is ordered by label name: rs.Rules is a map, and ranging over it
// directly would yield a different order on every call, making the reported
// rules unstable.
func rulesetToReportedRules(rs *classificationv1.Classifier, rulesetName string) []ReportedRule {
	labels := make([]string, 0, len(rs.Rules))
	for label := range rs.Rules {
		labels = append(labels, label)
	}
	sort.Strings(labels)

	out := make([]ReportedRule, 0, len(labels))
	for _, label := range labels {
		out = append(out, ReportedRule{
			RulesetName: rulesetName,
			LabelName:   label,
			Rule:        rs.Rules[label],
			Selector:    rs.Selector,
		})
	}
	return out
}
// FlowLabels maps flow label names to their values and per-label metadata.
type FlowLabels map[string]FlowLabelValue
// FlowLabelValue is the value of a flow label together with its flags.
type FlowLabelValue struct {
	// Value is the label's value rendered as a string.
	Value string
	// Flags carries the label's metadata.
	Flags LabelFlags
}
// LabelFlags are the flags attached to a flow label.
type LabelFlags struct {
	// Propagate tells whether the created label should be applied to the
	// whole flow (propagated in baggage).
	Propagate bool
	// Hidden tells whether the created label should be hidden from telemetry.
	Hidden bool
}
// ToPlainMap returns the flow labels as a plain map[string]string, keeping
// only each label's value and dropping its flags.
func (fl FlowLabels) ToPlainMap() map[string]string {
	plain := make(map[string]string, len(fl))
	for name := range fl {
		plain[name] = fl[name].Value
	}
	return plain
}
// labelFlagsFromRule derives the flags of a label from its rule. Propagate
// defaults to true when the rule does not set it.
func labelFlagsFromRule(rule *classificationv1.Rule) LabelFlags {
	var flags LabelFlags
	flags.Propagate = boolValueOrTrue(rule.GetPropagate())
	flags.Hidden = rule.GetHidden()
	return flags
}
// boolValueOrTrue unwraps an optional boolean, treating an absent (nil) value
// as true.
func boolValueOrTrue(bv *wrapperspb.BoolValue) bool {
	if bv != nil {
		return bv.Value
	}
	return true
}
// New creates a new flow Classifier with no active rulesets.
func New() *Classifier {
	c := new(Classifier)
	c.activeRulesets = make(map[rulesetID]CompiledRuleset)
	return c
}
// populateFlowLabels evaluates every labeler returned by mm.Match for
// labelsForMatching and writes the resulting labels into flowLabels.
//
// Each labeler's prepared rego query is evaluated against input. A labeler
// with a non-empty LabelName yields a single label; otherwise the query result
// must be a map and each of its entries becomes a label. A labeler whose
// evaluation fails, returns no result, or returns an unexpected shape is
// logged and skipped. When a query returns more than one result, a warning is
// logged and only the first result is used. Labels already present in
// flowLabels under the same name are overwritten.
func populateFlowLabels(ctx context.Context, flowLabels FlowLabels, mm *multimatcher.MultiMatcher[int, []*labeler], labelsForMatching selectors.Labels, input ast.Value) {
	for _, lb := range mm.Match(labelsForMatching.ToPlainMap()) {
		resultSet, err := lb.Query.Eval(ctx, rego.EvalParsedInput(input))
		if err != nil {
			// Attach the cause; without it a failed evaluation cannot be
			// diagnosed from the logs.
			log.Warn().Err(err).Msg("Rego: Evaluation failed")
			continue
		}
		if len(resultSet) == 0 {
			log.Warn().Msg("Rego: Empty resultSet")
			continue
		}
		if len(resultSet) > 1 {
			// Not fatal: fall through and use the first result.
			log.Warn().Msg("Rego: Ambiguous resultSet")
		}
		if len(resultSet[0].Expressions) != 1 {
			log.Warn().Msg("Rego: Expected exactly one expression")
			continue
		}
		expr := resultSet[0].Expressions[0]

		if lb.LabelName != "" {
			// single-label query
			flowLabels[lb.LabelName] = FlowLabelValue{
				Value: expr.String(),
				Flags: lb.LabelFlags,
			}
			continue
		}

		// multi-label query
		variables, isMap := expr.Value.(map[string]interface{})
		if !isMap {
			log.Error().Msg("Rego: Expression's not a map (bug)")
			continue
		}
		for key, value := range variables {
			flowLabels[key] = FlowLabelValue{
				Value: fmt.Sprint(value),
				Flags: lb.LabelsFlags[key],
			}
		}
	}
}
// Classify takes rego input, performs classification and returns a map of flow
// labels.
//
// labelsForMatching are additional labels used for selector matching. The
// matchers registered for the control point with an empty service name are
// applied first (presumably the catch-all entry; confirm against the selectors
// package), followed by the matchers of every service in svcs, in order.
// Services without registered labelers are skipped. The returned error is
// currently always nil.
func (c *Classifier) Classify(
	ctx context.Context,
	svcs []services.ServiceID,
	labelsForMatching selectors.Labels,
	direction selectors.TrafficDirection,
	input ast.Value,
) (FlowLabels, error) {
	flowLabels := make(FlowLabels)

	active, loaded := c.activeRules.Load().(rules)
	if !loaded {
		// No rules have been activated yet.
		return flowLabels, nil
	}

	controlPoint := selectors.ControlPoint{
		Traffic: direction,
	}

	emptyServiceID := selectors.ControlPointID{
		ServiceID: services.ServiceID{
			Service: "",
		},
		ControlPoint: controlPoint,
	}
	if matcher, found := active.MultiMatcherByControlPointID[emptyServiceID]; found {
		populateFlowLabels(ctx, flowLabels, matcher, labelsForMatching, input)
	}

	// TODO (krdln): update prometheus metrics upon classification errors.

	for _, svc := range svcs {
		serviceCPID := selectors.ControlPointID{
			ServiceID:    svc,
			ControlPoint: controlPoint,
		}
		matcher, found := active.MultiMatcherByControlPointID[serviceCPID]
		if found {
			populateFlowLabels(ctx, flowLabels, matcher, labelsForMatching, input)
			continue
		}
		log.Trace().Str("controlPoint", serviceCPID.String()).Msg("No labelers for controlPoint")
	}

	return flowLabels, nil
}
// ActiveRules returns the uncompiled rules which are currently active, or nil
// when no rules have been activated yet.
func (c *Classifier) ActiveRules() []ReportedRule {
	if current, ok := c.activeRules.Load().(rules); ok {
		return current.ReportedRules
	}
	return nil
}
// AddRules compiles a ruleset and adds it to the active rules.
//
// The name is used only for reporting. A compilation error is returned
// unchanged, and in that case the active rules are left untouched.
//
// To retract the rules, call Drop on the returned ActiveRuleset.
func (c *Classifier) AddRules(
	ctx context.Context,
	name string,
	classifier *classificationv1.Classifier,
) (ActiveRuleset, error) {
	ruleset, err := CompileRuleset(ctx, name, classifier)
	if err != nil {
		return ActiveRuleset{}, err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Rulesets are indexed by a generated ID rather than by the provided name:
	// * it is more robust if the caller provides non-unique names,
	// * when a file is modified, one approach is to first unload the old
	//   ruleset and then load the new one – a duplicated name is kind of
	//   expected in that case.
	// So the name is used only for reporting.
	handle := ActiveRuleset{classifier: c, id: c.nextRulesetID}
	c.nextRulesetID++

	c.activeRulesets[handle.id] = ruleset
	c.activateRulesets()
	return handle, nil
}
// ActiveRuleset is a handle to one of the currently active sets of rules.
type ActiveRuleset struct {
	// classifier is the Classifier the ruleset is registered in; nil for the
	// zero value.
	classifier *Classifier
	// id is the key of the ruleset in the classifier's activeRulesets map.
	id rulesetID
}
// Drop retracts all the rules belonging to the ruleset and re-publishes the
// remaining active rules. Calling Drop on a zero ActiveRuleset is a no-op.
func (rs ActiveRuleset) Drop() {
	owner := rs.classifier
	if owner == nil {
		return
	}

	owner.mu.Lock()
	defer owner.mu.Unlock()

	delete(owner.activeRulesets, rs.id)
	owner.activateRulesets()
}
// BadExtractor is an error occurring when an extractor is invalid. It is the
// same value as extractors.BadExtractor, re-exported from this package.
var BadExtractor = extractors.BadExtractor
// badRego is the concrete type behind the BadRego sentinel.
type badRego struct{}

// Error implements the error interface.
func (badRego) Error() string {
	return "failed to compile rego"
}

// BadRego is an error occurring when rego compilation fails.
var BadRego = badRego{}
// badSelector is the concrete type behind the BadSelector sentinel.
type badSelector struct{}

// Error implements the error interface.
func (badSelector) Error() string {
	return "invalid ruleset selector"
}

// BadSelector is an error occurring when a selector is invalid.
var BadSelector = badSelector{}
// badLabelName is the concrete type behind the BadLabelName sentinel.
type badLabelName struct{}

// Error implements the error interface.
func (badLabelName) Error() string {
	return "invalid label name"
}

// BadLabelName is an error occurring when a label name is invalid.
var BadLabelName = badLabelName{}
// CompileRuleset parses the ruleset's selector and compiles its rules.
//
// name is used for reporting and in error messages. A nil classifier, a
// classifier without a selector, or a selector rejected by selectors.FromProto
// yields an error wrapping BadSelector. Errors from compiling the rules are
// wrapped together with the ruleset name and the parsed selector.
func CompileRuleset(ctx context.Context, name string, classifier *classificationv1.Classifier) (CompiledRuleset, error) {
	// The generated getter is nil-safe, so a nil classifier is reported as a
	// missing selector instead of causing a nil-pointer dereference.
	if classifier.GetSelector() == nil {
		return CompiledRuleset{}, fmt.Errorf("%w: missing selector", BadSelector)
	}

	selector, err := selectors.FromProto(classifier.Selector)
	if err != nil {
		return CompiledRuleset{}, fmt.Errorf("%w: %v", BadSelector, err)
	}

	labelers, err := compileRules(ctx, selector.LabelMatcher, classifier.Rules)
	if err != nil {
		return CompiledRuleset{}, fmt.Errorf("failed to compile %q rules for %v: %w", name, selector, err)
	}

	cr := CompiledRuleset{
		ControlPointID: selector.ControlPointID,
		Labelers:       labelers,
		ReportedRules:  rulesetToReportedRules(classifier, name),
	}
	return cr, nil
}
// compileRules compiles a set of rules into a set of rego queries.
//
// Raw rego rules are compiled 1:1 into rego queries, while all high-level
// extractor-based rules are gathered and compiled into one shared rego query.
// Every resulting labeler is paired with labelSelector. Rules whose source is
// neither an extractor nor raw rego are not compiled.
//
// An error wrapping BadLabelName is returned for a label name containing '/',
// and an error wrapping BadRego when a raw rego rule fails to compile.
func compileRules(ctx context.Context, labelSelector multimatcher.Expr, labelRules map[string]*classificationv1.Rule) ([]labelerWithSelector, error) {
	log.Trace().Msg("Classifier.compileRules starting")

	var (
		compiled     []labelerWithSelector
		rawRegoCount int

		// Extractor-based rules are collected here first so that they can be
		// compiled together into a single rego query afterwards.
		extractorsByLabel = map[string]*classificationv1.Extractor{}
		// Flags for the labels created by extractors.
		flagsByLabel = map[string]LabelFlags{}
	)

	for labelName, rule := range labelRules {
		if strings.Contains(labelName, "/") {
			// '/' is forbidden in case multiple rules for the same label
			// are to be supported in the future:
			// labels:
			//   user/1: <snip>
			//   user/2: <snip>
			return nil, fmt.Errorf("%w: cannot contain '/'", BadLabelName)
		}

		switch source := rule.GetSource().(type) {
		case *classificationv1.Rule_Extractor:
			extractorsByLabel[labelName] = source.Extractor
			flagsByLabel[labelName] = labelFlagsFromRule(rule)

		case *classificationv1.Rule_Rego_:
			regoRule := source.Rego
			prepared, err := rego.New(
				rego.Query(regoRule.Query),
				rego.Module("tmp.rego", regoRule.Source),
			).PrepareForEval(ctx)
			if err != nil {
				log.Trace().Str("src", regoRule.Source).Str("query", regoRule.Query).
					Msg("Failed to prepare for eval")
				return nil, fmt.Errorf(
					"failed to compile raw rego module, label: %s, query: %s: %w: %v",
					labelName,
					regoRule.Query,
					BadRego,
					err,
				)
			}

			single := &labeler{
				Query:      prepared,
				LabelName:  labelName,
				LabelFlags: labelFlagsFromRule(rule),
			}
			compiled = append(compiled, labelerWithSelector{
				LabelSelector: labelSelector,
				Labeler:       single,
			})
			rawRegoCount++
		}
	}

	if len(extractorsByLabel) > 0 {
		regoSrc, err := extractors.CompileToRego(defaultPackageName, extractorsByLabel)
		if err != nil {
			return nil, fmt.Errorf("failed to compile extractors to rego: %w", err)
		}

		prepared, err := rego.New(
			rego.Query("data."+defaultPackageName),
			rego.Module("tmp.rego", regoSrc),
		).PrepareForEval(ctx)
		if err != nil {
			// Note: BadRego is deliberately not wrapped here – the rego
			// produced from extractors should always be valid, so a failure
			// is a bug rather than the user's fault.
			log.Trace().Str("src", regoSrc).Msg("Failed to prepare for eval")
			return nil, fmt.Errorf("(bug) failed to compile classification rules: %w", err)
		}

		multi := &labeler{
			Query:       prepared,
			LabelsFlags: flagsByLabel,
		}
		compiled = append(compiled, labelerWithSelector{
			LabelSelector: labelSelector,
			Labeler:       multi,
		})
	}

	log.Info().
		Int("modules", len(compiled)).
		Int("raw rego modules", rawRegoCount).
		Int("extractors", len(extractorsByLabel)).
		Msg("Compilation of rules finished")

	return compiled, nil
}
// activateRulesets recombines all registered rulesets and publishes the result
// as the active rules.
//
// It needs to be called with the mutex guarding activeRulesets (c.mu) held.
func (c *Classifier) activateRulesets() {
	combined := c.combineRulesets()
	c.activeRules.Store(combined)
	log.Info().Int("rulesets", len(c.activeRulesets)).Msg("Rules updated")
}
// combineRulesets merges all registered rulesets into a single rules value:
// one MultiMatcher per control point plus the concatenated reported rules.
//
// Rulesets are visited in ascending ID order, i.e. the order in which they
// were added. Ranging over the activeRulesets map directly would make the
// order of ReportedRules and the matcher keys vary from call to call.
//
// If adding an entry to a MultiMatcher fails, the error is logged and an empty
// rules value is returned. The method reads c.activeRulesets, so it must be
// called with c.mu held.
func (c *Classifier) combineRulesets() rules {
	combined := rules{
		MultiMatcherByControlPointID: make(multiMatcherByControlPoint),
		ReportedRules:                make([]ReportedRule, 0),
	}

	ids := make([]rulesetID, 0, len(c.activeRulesets))
	for id := range c.activeRulesets {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	// Next free key per control point, so that every AddEntry call on a given
	// MultiMatcher uses a unique key.
	controlPointKeys := make(map[selectors.ControlPointID]int)

	for _, id := range ids {
		ruleset := c.activeRulesets[id]
		combined.ReportedRules = append(combined.ReportedRules, ruleset.ReportedRules...)

		for _, entry := range ruleset.Labelers {
			mm, ok := combined.MultiMatcherByControlPointID[ruleset.ControlPointID]
			if !ok {
				mm = multimatcher.New[int, []*labeler]()
				combined.MultiMatcherByControlPointID[ruleset.ControlPointID] = mm
			}

			matcherID := controlPointKeys[ruleset.ControlPointID]
			controlPointKeys[ruleset.ControlPointID]++

			err := mm.AddEntry(matcherID, entry.LabelSelector, multimatcher.Appender(entry.Labeler))
			if err != nil {
				log.Error().Err(err).Msg("Failed to add entry to catchall multimatcher")
				return rules{}
			}
		}
	}

	return combined
}