-
Notifications
You must be signed in to change notification settings - Fork 2
/
policy_filters.go
179 lines (147 loc) · 4.51 KB
/
policy_filters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package ebpftracer
import (
"errors"
"log/slog"
"time"
"github.com/castai/kvisor/pkg/ebpftracer/events"
"github.com/castai/kvisor/pkg/ebpftracer/types"
"github.com/castai/kvisor/pkg/logging"
"github.com/cespare/xxhash"
"github.com/elastic/go-freelru"
"github.com/samber/lo"
"golang.org/x/time/rate"
)
// Sentinel results returned by the event filters in this file.
var (
	// FilterPass is the nil error a filter returns to let an event through.
	FilterPass error

	// FilterErrRateLimit is returned when a rate limiter refuses the event.
	FilterErrRateLimit = errors.New("rate limit")

	// FilterErrEmptyDNSResponse is returned for DNS events whose payload has no answers.
	FilterErrEmptyDNSResponse = errors.New("empty dns response")

	// FilterErrDNSDuplicateDetected is returned for DNS events whose question
	// domain is already present in the deduplication cache.
	FilterErrDNSDuplicateDetected = errors.New("dns duplicate detected")
)
// GlobalPreEventFilterGenerator wraps a single pre event filter in a generator.
// Every invocation of the returned generator hands back that same filter
// instance, so any state the filter holds is shared by all callers. Use it
// when filtering should apply globally rather than per cgroup.
func GlobalPreEventFilterGenerator(filter PreEventFilter) PreEventFilterGenerator {
	var shared PreEventFilterGenerator = func() PreEventFilter {
		return filter
	}
	return shared
}
// GlobalEventFilterGenerator wraps a single event filter in a generator.
// Every invocation of the returned generator hands back that same filter
// instance, so any state the filter holds is shared by all callers. Use it
// when filtering should apply globally rather than per cgroup.
func GlobalEventFilterGenerator(filter EventFilter) EventFilterGenerator {
	var shared EventFilterGenerator = func() EventFilter {
		return filter
	}
	return shared
}
// FilterAnd combines several filter generators into one. Each invocation of
// the returned generator calls every given generator once, in order, and
// produces a filter that runs the resulting filters sequentially. The first
// non-nil error short-circuits the chain and is returned as is; if every
// filter passes, the result is FilterPass.
func FilterAnd(filtersGenerators ...EventFilterGenerator) EventFilterGenerator {
	// lo.Map passes the element index as well; it is not needed here.
	instantiate := func(gen EventFilterGenerator, _ int) EventFilter {
		return gen()
	}

	return func() EventFilter {
		chain := lo.Map(filtersGenerators, instantiate)

		return func(event *types.Event) error {
			for i := range chain {
				err := chain[i](event)
				if err != nil {
					return err
				}
			}
			return FilterPass
		}
	}
}
// PreRateLimit creates a pre event filter generator that limits the number of
// events that will be processed according to the given policy. Each invocation
// of the generator builds its own rate limiter, so filters obtained from
// separate invocations are throttled independently. The filter returns
// FilterErrRateLimit when the limiter refuses an event and FilterPass otherwise.
func PreRateLimit(spec RateLimitPolicy) PreEventFilterGenerator {
	return func() PreEventFilter {
		limiter := newRateLimiter(spec)

		// The event context is not inspected; only the arrival rate matters.
		return func(_ *types.EventContext) error {
			if !limiter.Allow() {
				return FilterErrRateLimit
			}
			return FilterPass
		}
	}
}
// RateLimit creates an event filter generator that limits the number of
// events that will be processed according to the given policy. Each invocation
// of the generator builds its own rate limiter, so filters obtained from
// separate invocations are throttled independently. The filter returns
// FilterErrRateLimit when the limiter refuses an event and FilterPass otherwise.
func RateLimit(spec RateLimitPolicy) EventFilterGenerator {
	return func() EventFilter {
		limiter := newRateLimiter(spec)

		// The event itself is not inspected; only the arrival rate matters.
		return func(_ *types.Event) error {
			if !limiter.Allow() {
				return FilterErrRateLimit
			}
			return FilterPass
		}
	}
}
// newRateLimiter builds a rate.Limiter from the given policy.
//
// A non-zero Interval takes precedence: the limit is derived from it via
// rate.Every and the burst is forced to 1, ignoring spec.Burst. Otherwise
// spec.Rate is used as the limit, and a zero spec.Burst is replaced by 1.
func newRateLimiter(spec RateLimitPolicy) *rate.Limiter {
	if spec.Interval != 0 {
		return rate.NewLimiter(rate.Every(spec.Interval), 1)
	}

	burst := spec.Burst
	if burst == 0 {
		burst = 1
	}
	return rate.NewLimiter(rate.Limit(spec.Rate), burst)
}
// FilterEmptyDnsAnswers creates a filter generator whose filters reject DNS
// events that carry no answers, returning FilterErrEmptyDNSResponse for them.
//
// Events with a different event ID, events whose args are not
// types.NetPacketDNSBaseArgs, and DNS events with a nil payload (which are
// additionally logged as a warning through l) are all passed through.
func FilterEmptyDnsAnswers(l *logging.Logger) EventFilterGenerator {
	// The filter keeps no per-invocation state, so one closure serves every
	// generator call.
	dropEmpty := func(event *types.Event) error {
		if event.Context.EventID != events.NetPacketDNSBase {
			return FilterPass
		}

		args, isDNS := event.Args.(types.NetPacketDNSBaseArgs)
		if !isDNS {
			return FilterPass
		}

		payload := args.Payload
		switch {
		case payload == nil:
			l.Warn("retreived invalid event for event type dns")
			return FilterPass
		case len(payload.Answers) == 0:
			return FilterErrEmptyDNSResponse
		default:
			return FilterPass
		}
	}

	return func() EventFilter {
		return dropEmpty
	}
}
// hashStringXXHASH hashes s with xxhash and converts the 64-bit sum to uint32,
// which keeps only its low 32 bits. It is used as the hash callback for the
// freelru cache in this file.
// More hash functions can be found at https://github.com/elastic/go-freelru/blob/main/bench/hash.go
func hashStringXXHASH(s string) uint32 {
	sum := xxhash.Sum64String(s)
	return uint32(sum)
}
// DeduplicateDnsEvents creates a filter generator whose filters reject DNS
// events for a question domain that is still present in the filter's cache,
// returning FilterErrDNSDuplicateDetected for them.
//
// Each invocation of the generator builds its own freelru cache from size and
// sets ttl as the lifetime of its entries, so filters from separate
// invocations deduplicate independently. The generator panics if the cache
// cannot be constructed.
//
// Events with a different event ID, events whose args are not
// types.NetPacketDNSBaseArgs, and DNS events with a nil payload (which are
// additionally logged as a warning through l) are all passed through.
func DeduplicateDnsEvents(l *logging.Logger, size uint32, ttl time.Duration) EventFilterGenerator {
	// Only key presence matters, so the cached value carries no data.
	type cacheValue struct{}

	return func() EventFilter {
		seen, err := freelru.New[string, cacheValue](size, hashStringXXHASH)
		if err != nil {
			// An error is only ever returned on configuration issues. Nothing
			// sensible can be done here besides panicking and surfacing the
			// problem to the user.
			panic(err)
		}
		seen.SetLifetime(ttl)

		return func(event *types.Event) error {
			if event.Context.EventID != events.NetPacketDNSBase {
				return FilterPass
			}

			args, isDNS := event.Args.(types.NetPacketDNSBaseArgs)
			if !isDNS {
				return FilterPass
			}

			if args.Payload == nil {
				l.Warn("received invalid event for event type dns")
				return FilterPass
			}

			domain := args.Payload.DNSQuestionDomain
			if !seen.Contains(domain) {
				// First sighting: remember the domain and let the event through.
				seen.Add(domain, cacheValue{})
				return FilterPass
			}

			// Check the level first so the field logger is only built when
			// debug output is actually enabled.
			if l.IsEnabled(slog.LevelDebug) {
				l.WithField("cachekey", domain).Debug("dropping DNS event")
			}
			return FilterErrDNSDuplicateDetected
		}
	}
}