forked from lovoo/goka
/
filter.go
55 lines (48 loc) · 1.34 KB
/
filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package filter
import (
"strings"
"github.com/lovoo/goka"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/translator"
)
var (
group goka.Group = "message_filter"
)
func shouldDrop(ctx goka.Context) bool {
v := ctx.Join(blocker.Table)
return v != nil && v.(*blocker.BlockValue).Blocked
}
func filter(ctx goka.Context, msg interface{}) {
if shouldDrop(ctx) {
return
}
m := translate(ctx, msg.(*messaging.Message))
ctx.Emit(messaging.ReceivedStream, m.To, m)
}
func translate(ctx goka.Context, m *messaging.Message) *messaging.Message {
words := strings.Split(m.Content, " ")
for i, w := range words {
if tw := ctx.Lookup(translator.Table, w); tw != nil {
words[i] = tw.(string)
}
}
return &messaging.Message{
From: m.From,
To: m.To,
Content: strings.Join(words, " "),
}
}
func Run(brokers []string) {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), filter),
goka.Output(messaging.ReceivedStream, new(messaging.MessageCodec)),
goka.Join(blocker.Table, new(blocker.BlockValueCodec)),
goka.Lookup(translator.Table, new(translator.ValueCodec)),
)
if p, err := goka.NewProcessor(brokers, g); err != nil {
panic(err)
} else if err = p.Start(); err != nil {
panic(err)
}
}