/
consumer.go
86 lines (68 loc) · 1.85 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package kafka
import (
"context"
"fmt"
"os"
"path/filepath"
"emperror.dev/errors"
"github.com/banzaicloud/allspark/internal/platform/log"
kafka "github.com/segmentio/kafka-go"
)
type Consumer struct {
reader *kafka.Reader
dialer *kafka.Dialer
logger log.Logger
BootstrapServer string `mapstructure:"bootstrap_server"`
Topic string `mapstructure:"topic"`
ConsumerGroup string `mapstructure:"consumer_group"`
}
func NewConsumer(bootStrapServer string, topic string, consumerGroup string, logger log.Logger) *Consumer {
dialer := kafka.DefaultDialer
progName := filepath.Base(os.Args[0])
hostName, _ := os.Hostname()
dialer.ClientID = fmt.Sprintf("%s@%s", progName, hostName)
consumer := &Consumer{
BootstrapServer: bootStrapServer,
Topic: topic,
ConsumerGroup: consumerGroup,
logger: logger,
dialer: dialer,
}
consumer, _ = consumer.Validate()
return consumer
}
func (c *Consumer) Consume(ctx context.Context) (*kafka.Message, error) {
if c.reader == nil {
c.reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{c.BootstrapServer},
Topic: c.Topic,
GroupID: c.ConsumerGroup,
Logger: c.logger,
Dialer: c.dialer,
})
}
// the `ReadMessage` method blocks until we receive the next event
message, err := c.reader.ReadMessage(ctx)
if err != nil {
if err := c.reader.Close(); err != nil {
return nil, errors.WrapIf(err, "failed to close kafka reader")
}
return nil, errors.WrapIf(err, "could not read kafka message")
}
return &message, nil
}
func (c *Consumer) SetLogger(log log.Logger) {
c.logger = log
}
func (c *Consumer) Close() error {
return c.reader.Close()
}
func (c *Consumer) Validate() (*Consumer, error) {
if c.ConsumerGroup == "" {
c.ConsumerGroup = "allspark-consumer-group"
}
if c.Topic == "" {
c.Topic = "example-topic"
}
return c, nil
}