/*
* Copyright 2019 InfAI (CC SES)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import (
	"context"
	"errors"
	"io"
	"log"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)
// ConsumerConfig bundles the settings used by NewConsumer to initialize a
// topic and create a kafka-go reader for it.
type ConsumerConfig struct {
// KafkaUrl is the broker address, used both for topic initialization and
// as the reader's broker.
KafkaUrl string
// GroupId is the kafka consumer-group id the reader joins.
GroupId string
// Topic is the topic to consume (created on start if missing).
Topic string
// MinBytes / MaxBytes bound the batch size requested from the broker
// (passed through to kafka.ReaderConfig).
MinBytes int
MaxBytes int
// MaxWait is the maximum time the broker may wait before answering a
// fetch, even if MinBytes is not reached.
MaxWait time.Duration
// TopicConfigMap maps topic names to the config entries applied when the
// topic is created via InitTopic.
TopicConfigMap map[string][]kafka.ConfigEntry
}
// NewConsumer ensures config.Topic exists, then starts a background goroutine
// that consumes the topic until ctx is cancelled.
//
// Each fetched message is passed to listener; the message is committed
// (synchronously, CommitInterval=0) only when listener returns nil, so a
// failing message is redelivered. Messages older than one hour are committed
// without being handled — a floodgate to prevent old messages from DOSing the
// consumer. Fatal fetch/commit errors are reported to errorhandler and stop
// the goroutine.
//
// Returns an error only if topic initialization fails; consume errors are
// delivered asynchronously via errorhandler.
func NewConsumer(ctx context.Context, config ConsumerConfig, listener func(topic string, msg []byte, time time.Time) error, errorhandler func(err error)) (err error) {
	log.Println("DEBUG: consume topic: \"" + config.Topic + "\"")
	err = InitTopic(config.KafkaUrl, config.TopicConfigMap, config.Topic)
	if err != nil {
		log.Println("ERROR: unable to create topic", err)
		return err
	}
	r := kafka.NewReader(kafka.ReaderConfig{
		CommitInterval:         0, //synchronous commits
		Brokers:                []string{config.KafkaUrl},
		GroupID:                config.GroupId,
		Topic:                  config.Topic,
		MinBytes:               config.MinBytes,
		MaxBytes:               config.MaxBytes,
		MaxWait:                config.MaxWait,
		Logger:                 log.New(io.Discard, "", 0),
		ErrorLogger:            log.New(os.Stdout, "[KAFKA-ERR]", log.LstdFlags),
		WatchPartitionChanges:  true,
		PartitionWatchInterval: time.Minute,
	})
	go func() {
		// Close the reader on every exit path; the original leaked it, which
		// leaves the consumer-group membership dangling until the session
		// times out on the broker.
		defer func() {
			if closeErr := r.Close(); closeErr != nil {
				log.Println("ERROR: while closing kafka reader ", config.Topic, closeErr)
			}
		}()
		for {
			select {
			case <-ctx.Done():
				log.Println("close kafka reader ", config.Topic)
				return
			default:
				m, err := r.FetchMessage(ctx)
				// errors.Is instead of ==: these sentinels may arrive wrapped.
				if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
					log.Println("close consumer for topic ", config.Topic)
					return
				}
				if err != nil {
					log.Println("ERROR: while consuming topic ", config.Topic, err)
					errorhandler(err)
					return
				}
				if time.Since(m.Time) > time.Hour { //floodgate to prevent old messages to DOS the consumer
					log.Println("WARNING: kafka message older than 1h: ", config.Topic, time.Since(m.Time))
					if err = r.CommitMessages(ctx, m); err != nil {
						log.Println("ERROR: while committing message ", config.Topic, err)
						errorhandler(err)
						return
					}
				} else {
					err = listener(m.Topic, m.Value, m.Time)
					if err != nil {
						// No commit: the message stays uncommitted and will be redelivered.
						log.Println("ERROR: unable to handle message (no commit)", err, m.Topic, string(m.Value))
					} else if err = r.CommitMessages(ctx, m); err != nil {
						log.Println("ERROR: while committing message ", config.Topic, err)
						errorhandler(err)
						return
					}
				}
			}
		}
	}()
	return err
}