/
kafkaproducer.go
160 lines (135 loc) · 4.88 KB
/
kafkaproducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package kafkaproducer
import (
"errors"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
log "github.com/sirupsen/logrus"
"github.com/digitalocean/firebolt"
"github.com/digitalocean/firebolt/fbcontext"
"github.com/digitalocean/firebolt/util"
)
// KafkaProducer is a firebolt node for producing messages onto a Kafka topic.
type KafkaProducer struct {
	fbcontext.ContextAware
	producer MessageProducer // underlying kafka client, behind an interface so tests can mock it
	topic    string          // default destination topic from node config; a ProduceRequest may override per-message
	stopChan chan bool       // created in Setup; NOTE(review): never read or written elsewhere in this file — confirm before removing
}
// MessageProducer is an interface extracted from 'kafka.Producer' to make this mockable
// generated from this dir with 'mockery -name messageProducer -inpkg .' in case that interface changes
type MessageProducer interface {
	// ProduceChannel returns the channel messages are written to for async production.
	ProduceChannel() chan *kafka.Message
	// Events returns the channel of delivery reports, stats, and errors from the client.
	Events() chan kafka.Event
	// Flush waits up to timeoutMs for queued/in-flight messages to be delivered,
	// returning the number of events still outstanding.
	Flush(timeoutMs int) int
	// Close shuts down the underlying client.
	Close()
}
// Setup creates the underlying Kafka producer client and events receiver, leaving it ready to handle events.
//
// Recognized config keys: "brokers" (required), "topic" (default destination topic,
// may be overridden per-message by a ProduceRequest), plus any librdkafka overrides
// applied by util.ApplyLibrdkafkaConf.
func (k *KafkaProducer) Setup(config map[string]string) error {
	configMap, err := k.buildConfigMap(config)
	if err != nil {
		return err
	}

	log.WithField("kafkabrokers", config["brokers"]).Info("creating kafka producer")
	p, err := kafka.NewProducer(configMap)
	if err != nil {
		log.WithError(err).Error("failed to create kafka producer")
		// wrap with context so callers can still reach the client error via errors.Unwrap/Is/As
		return fmt.Errorf("kafkaproducer: creating kafka producer: %w", err)
	}
	log.Info("created kafka producer")

	k.producer = p
	k.topic = config["topic"]
	k.stopChan = make(chan bool) // NOTE(review): stopChan is never consumed in this file — confirm it is needed

	// the receiver goroutine exits on its own when the producer is closed and
	// its Events channel is drained/closed by the client
	go k.startEventsReceiver()
	log.Info("started kafka events receiver")
	return nil
}
// buildConfigMap validates the node config and assembles the librdkafka
// configuration, starting from sensible producer defaults and then layering
// any user-supplied librdkafka overrides on top.
func (k *KafkaProducer) buildConfigMap(config map[string]string) (*kafka.ConfigMap, error) {
	if err := k.checkConfig(config); err != nil {
		return nil, err
	}

	// default kafka producer config
	cm := &kafka.ConfigMap{
		"bootstrap.servers":            config["brokers"],
		"statistics.interval.ms":       60000,
		"queue.buffering.max.messages": 50000,
		"queue.buffering.max.kbytes":   256000,
		"queue.buffering.max.ms":       3000,
		"log.connection.close":         false,
		"socket.keepalive.enable":      true,
		"compression.codec":            "snappy",
	}

	// user overrides from node config win over the defaults above
	if err := util.ApplyLibrdkafkaConf(config, cm); err != nil {
		return nil, err
	}
	return cm, nil
}
// checkConfig validates the node configuration, returning an error when a
// required value is missing.
func (k *KafkaProducer) checkConfig(config map[string]string) error {
	if config["brokers"] == "" {
		// %q (rather than %s) makes an empty or whitespace-only value visible in the message
		return fmt.Errorf("kafkaproducer: missing or invalid value for config 'brokers': %q", config["brokers"])
	}
	return nil
}
// Process sends a single event `msg` to the configured Kafka topic.
func (k *KafkaProducer) Process(event *firebolt.Event) (*firebolt.Event, error) {
	// start with a type assertion because :sad-no-generics:
	request, ok := event.Payload.(firebolt.ProduceRequest)
	if !ok {
		return nil, errors.New("kafkaproducer: failed type assertion for conversion to ProduceRequest")
	}

	// a per-message topic on the request takes precedence over the node config topic
	topic := request.Topic()
	if topic == "" {
		topic = k.topic
	}
	if topic == "" {
		return nil, errors.New("kafkaproducer: missing topic name in both node config and ProduceRequest")
	}

	k.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          request.Message(),
	})
	return nil, nil
}
// Produce produces a single client-constructed kafka.Message to the configured Kafka topic.
// Delivery is asynchronous: the send may block if the client's internal queue is full,
// and delivery results arrive later on the Events channel.
func (k *KafkaProducer) Produce(msg *kafka.Message) {
	log.WithField("topic", k.topic).Debug("kafkaproducer: placing message on producechannel")
	produceChan := k.producer.ProduceChannel()
	produceChan <- msg
}
// Shutdown stops the underlying Kafka producer client, flushing buffered
// records first (bounded by the flush timeout in stop). Always returns nil.
func (k *KafkaProducer) Shutdown() error {
	k.stop()
	return nil
}
// Running in a goroutine, asynchronously report on producer events.
// The loop exits when the producer's Events channel is closed (i.e. after
// the client is closed in stop).
func (k *KafkaProducer) startEventsReceiver() {
	for e := range k.producer.Events() {
		switch ev := e.(type) {
		case *kafka.Message:
			// delivery report: per-message success or failure
			if ev.TopicPartition.Error != nil {
				log.WithField("node_id", k.ID).WithField("producer_error", ev.TopicPartition.Error).Debug("produce failed")
			} else {
				log.WithField("node_id", k.ID).WithField("kafka_topic", *ev.TopicPartition.Topic).
					WithField("kafka_partition", ev.TopicPartition.Partition).
					WithField("partition_offset", ev.TopicPartition.Offset).
					Debug("produced message successfully")
			}
		case *kafka.Stats:
			// emitted every statistics.interval.ms (60s per buildConfigMap defaults)
			log.WithField("node_id", k.ID).WithField("kafka_producer_stats", ev).Debug("librdkafka producer stats")
		default:
			log.WithField("node_id", k.ID).WithField("kafka_producer_event", ev).Warn("received unexpected kafka producer event")
		}
	}
}
// stop closes the underlying kafka producer after flushing any unwritten records.
// Flushing is bounded by a 5s timeout; any records still undelivered after that
// are lost when Close is called, so the count is surfaced as a warning rather
// than silently discarded.
func (k *KafkaProducer) stop() {
	log.WithField("node_id", k.ID).Info("stopping kafka producer")
	if remaining := k.producer.Flush(5000); remaining > 0 {
		log.WithField("node_id", k.ID).WithField("unflushed_events", remaining).
			Warn("kafka producer closing with unflushed events after flush timeout")
	}
	k.producer.Close()
}
// Receive handles a message from another node or an external source.
// The kafka producer node does not accept inbound messages, so this always
// returns an error.
func (k *KafkaProducer) Receive(msg fbcontext.Message) error {
	return errors.New("kafkaproducer: message not supported")
}