forked from CaMeLCa5e/daily
-
Notifications
You must be signed in to change notification settings - Fork 0
/
apachekafka.py
88 lines (59 loc) · 2.43 KB
/
apachekafka.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// main wires the example together: it obtains its settings from resolveConfig,
// starts metrics reporting, launches numConsumers consumers one after another,
// and then blocks until an interrupt signal arrives, at which point every
// consumer is closed in the order it was started.
func main() {
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	// Subscribe to the interrupt before any consumer starts. The channel has
	// room for one signal, so an interrupt delivered during start-up is kept
	// until the receive further down.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	running := make([]*kafkaClient.Consumer, numConsumers)
	for idx := range running {
		running[idx] = startNewConsumer(*config, topic, consumerIdPattern, idx)
		// Pause ten seconds after every start, including the last one.
		time.Sleep(10 * time.Second)
	}

	<-interrupt
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for idx := range running {
		// Close hands back a channel; receiving from it blocks here before
		// the next consumer is closed.
		<-running[idx].Close()
	}
	fmt.Println("Successfully shut down all consumers")
}
func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
addr, err := net.ResolveTCPAddr('tcp', graphiteConnect)
if err != nil{
panic(err)
}
go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
Addr: addr,
Registry: metrics.DefaultRegistry,
FlushInterval: graphiteFlushInterval,
DurationUnit: time.Second,
Prefix: "metrics"
Precentiles: []float64{0.5, .75, .99, .999}
})
}
// startNewConsumer builds a consumer from its own copy of config and starts it
// on topic in a background goroutine.
//
// config is received by value, so the fields assigned here (Consumerid,
// Strategy and the two failure callbacks) are set on the local copy only.
// The consumer ID is produced by formatting consumerIdPattern with
// consumerIndex, so the pattern is expected to contain a single integer verb
// such as %d. The topic is mapped to config.NumConsumerFetchers when calling
// StartStatic.
//
// The consumer is returned immediately; StartStatic runs asynchronously and
// this function does not wait for it.
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback

	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go consumer.StartStatic(topics)

	return consumer
}
funct GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult{
consumeRate := metrics.NewRegisteredMeter (fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
return func(_*kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult{
kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
consumeRate.Mark(1)
return kafkaClient.NewSucessfulResult(id)
}
}
// FailedCallback is installed as the consumer's WorkerFailureCallback. It logs
// "FailedCallback" under the "main" tag and always answers with
// kafkaClient.DoNotCommitOffsetAndStop; the worker manager argument is not
// inspected.
//
// NOTE(review): going by the constant's name, this asks the consumer to stop
// without committing the offset — confirm against the library documentation.
func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	decision := kafkaClient.DoNotCommitOffsetAndStop
	kafkaClient.Info("main", "FailedCallback")

	return decision
}
// FailedAttemptCallback is the per-attempt failure handler used by this
// example. It logs "Failed attempt" under the "main" tag and always answers
// with kafkaClient.CommitOffsetAndStop; neither the task nor the worker result
// is inspected.
//
// NOTE(review): going by the constant's name, this asks the consumer to commit
// the offset and then stop — confirm against the library documentation.
func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	decision := kafkaClient.CommitOffsetAndStop
	kafkaClient.Info("main", "Failed attempt")

	return decision
}