forked from lovoo/goka
/
main.go
67 lines (61 loc) · 1.7 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"syscall"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/collector"
"github.com/lovoo/goka/examples/3-messaging/detector"
"github.com/lovoo/goka/examples/3-messaging/filter"
"github.com/lovoo/goka/examples/3-messaging/translator"
"golang.org/x/sync/errgroup"
)
var (
brokers = []string{"localhost:9092"}
runFilter = flag.Bool("filter", false, "run filter processor")
runCollector = flag.Bool("collector", false, "run collector processor")
runTranslator = flag.Bool("translator", false, "run translator processor")
runBlocker = flag.Bool("blocker", false, "run blocker processor")
runDetector = flag.Bool("detector", false, "run detector processor")
broker = flag.String("broker", "localhost:9092", "boostrap Kafka broker")
)
func main() {
flag.Parse()
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)
if *runCollector {
log.Println("starting collector")
grp.Go(collector.Run(ctx, brokers))
}
if *runFilter {
log.Println("starting filter")
grp.Go(filter.Run(ctx, brokers))
}
if *runBlocker {
log.Println("starting blocker")
grp.Go(blocker.Run(ctx, brokers))
}
if *runDetector {
log.Println("starting detector")
grp.Go(detector.Run(ctx, brokers))
}
if *runTranslator {
log.Println("starting translator")
grp.Go(translator.Run(ctx, brokers))
}
// Wait for SIGINT/SIGTERM
waiter := make(chan os.Signal, 1)
signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)
select {
case <-waiter:
case <-ctx.Done():
}
cancel()
if err := grp.Wait(); err != nil {
log.Println(err)
}
log.Println("done")
}