forked from lovoo/goka
/
main.go
69 lines (61 loc) · 1.76 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)
// Settings shared by the emitter and the processor in this example.
var (
	// brokers is the broker address list handed to goka.NewEmitter and
	// goka.NewProcessor.
	brokers = []string{"localhost:9092"}
	// topic is the stream the emitter writes to and the processor reads from.
	topic = goka.Stream("example-stream")
	// group is the processor group name passed to goka.DefineGroup.
	group = goka.Group("example-group")
)
// runEmitter emits a single message to the example stream and returns.
//
// The emitter is always finished before the function returns or exits, and
// any error from finishing is logged. Failing to create the emitter or to emit
// the message terminates the program via log.Fatalf.
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}

	emitErr := emitter.EmitSync("some-key", "some-value")

	// Finish is called explicitly instead of being deferred: log.Fatalf exits
	// the process without running deferred calls, so a deferred Finish would
	// be skipped on exactly the path where emitting failed.
	if finishErr := emitter.Finish(); finishErr != nil {
		log.Printf("error finishing emitter: %v", finishErr)
	}

	if emitErr != nil {
		log.Fatalf("error emitting message: %v", emitErr)
	}
	fmt.Println("message emitted")
}
// runProcessor processes messages from the example stream until ctrl-c is
// pressed.
//
// For every message it increments a per-key counter kept in the group table
// and logs the key, the new counter and the message. SIGINT (ctrl-c) and
// SIGTERM cancel the context passed to the processor, so the processor is
// asked to stop instead of the process being killed while it is running.
// Failing to create or run the processor terminates the program via
// log.Fatalf.
func runProcessor() {
	// The process callback is invoked for each message delivered from the
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key. A nil value leaves the counter at zero.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table under
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// The channel is buffered so that a signal delivered before the goroutine
	// below starts receiving is not dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		select {
		case <-sigs:
			// Ask the processor to stop.
			cancel()
		case <-ctx.Done():
			// Run already returned; nothing left to cancel.
		}
	}()

	err = p.Run(ctx)

	// Clean up explicitly rather than with defer, because log.Fatalf below
	// exits without running deferred calls.
	signal.Stop(sigs)
	cancel()

	if err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}
// main runs the emitter first and then the processor.
func main() {
	// Emits one message and stops.
	runEmitter()

	// Keeps processing; press ctrl-c to stop.
	runProcessor()
}