-
Notifications
You must be signed in to change notification settings - Fork 19
/
filter.go
77 lines (62 loc) · 2 KB
/
filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package streams
import (
"bytes"
"github.com/coreos/etcd/mvcc/mvccpb"
kitlog "github.com/go-kit/kit/log"
)
// Filter is the signature of a stream stage: it takes a logger and a
// receive-only channel of key-values and returns a receive-only channel of
// key-values. DedupeFilter and RevisionFilter in this file have this signature.
type Filter func(kitlog.Logger, <-chan *mvccpb.KeyValue) <-chan *mvccpb.KeyValue
// Tap returns a new unbuffered channel that relays every value received from
// in, calling op with each value just before it is forwarded. The returned
// channel is closed once in has been closed and drained.
func Tap(in <-chan *mvccpb.KeyValue, op func(*mvccpb.KeyValue)) <-chan *mvccpb.KeyValue {
	tapped := make(chan *mvccpb.KeyValue)

	relay := func() {
		for {
			item, open := <-in
			if !open {
				break
			}
			// Observe first, then hand the same pointer downstream.
			op(item)
			tapped <- item
		}
		close(tapped)
	}
	go relay()

	return tapped
}
// DedupeFilter creates a new channel from `in` that emits an event only when its
// value differs from the value most recently emitted for the same key. The first
// event observed for a key is always emitted, even when its value is empty.
// Suppressed events are logged with event "value_unchanged". The returned
// channel is closed after `in` has been closed and drained.
func DedupeFilter(logger kitlog.Logger, in <-chan *mvccpb.KeyValue) <-chan *mvccpb.KeyValue {
	out := make(chan *mvccpb.KeyValue)
	lastValues := map[string][]byte{}

	go func() {
		for kv := range in {
			key := string(kv.Key)

			// Check presence explicitly: bytes.Equal treats a nil slice and an
			// empty slice as equal, so without the `seen` guard the first event
			// for a key carrying an empty value would be dropped as unchanged.
			previousValue, seen := lastValues[key]
			if seen && bytes.Equal(previousValue, kv.Value) {
				withKv(logger, kv).Log("event", "value_unchanged")
				continue
			}

			out <- kv
			lastValues[key] = kv.Value
		}

		logger.Log("event", "close", "msg", "in channel closed, closing out")
		close(out)
	}()

	return out
}
// RevisionFilter wraps `in` with a channel that forwards an event only when its
// ModRevision is strictly greater than the last revision forwarded for the same
// key (keys not yet seen compare against zero). Events that fail this check are
// logged with event "stale_revision" and dropped. The returned channel is closed
// after `in` has been closed and drained.
func RevisionFilter(logger kitlog.Logger, in <-chan *mvccpb.KeyValue) <-chan *mvccpb.KeyValue {
	filtered := make(chan *mvccpb.KeyValue)
	newest := make(map[string]int64)

	go func() {
		for event := range in {
			key := string(event.Key)
			seen := newest[key]

			if event.ModRevision > seen {
				filtered <- event
				newest[key] = event.ModRevision
				continue
			}

			// Not newer than what was already forwarded for this key.
			withKv(logger, event).Log("event", "stale_revision", "previous", seen)
		}

		logger.Log("event", "close", "msg", "in channel closed, closing out")
		close(filtered)
	}()

	return filtered
}
// withKv returns a logger derived from logger with the "key" and "revision"
// fields set from kv's Key and ModRevision.
func withKv(logger kitlog.Logger, kv *mvccpb.KeyValue) kitlog.Logger {
	key := string(kv.Key)
	revision := kv.ModRevision
	return kitlog.With(logger, "key", key, "revision", revision)
}