-
Notifications
You must be signed in to change notification settings - Fork 3
/
sink.go
97 lines (83 loc) · 2.04 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package kafkasink
import (
"context"
"github.com/arquivei/goduck/pipeline"
"github.com/arquivei/foundationkit/errors"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
// kafkaPusher is the sink implementation returned by MustNew; its Store
// method publishes messages through the wrapped confluent-kafka-go producer.
type kafkaPusher struct {
// producer is the Kafka client used to send messages. MustNew creates it and
// the close function returned by MustNew shuts it down.
producer *kafka.Producer
}
// SinkMessage is the input for the Kafka Sink. Store type-asserts every
// message it receives to this type and publishes each one as a single Kafka
// message.
type SinkMessage struct {
// Topic is the name of the Kafka topic the message is produced to.
Topic string
// Key is sent as the Kafka message key.
Key []byte
// Value is sent as the Kafka message payload.
Value []byte
}
// MustNew builds a pipeline sink that publishes messages to Kafka.
//
// The producer connects to brokers using SASL/PLAIN credentials (username and
// password) over a plaintext transport, compresses with gzip and uses the
// "murmur2_random" partitioner.
//
// It panics when any argument is empty or when the underlying Kafka producer
// cannot be created. Besides the sink, it returns a function that closes the
// producer; call it once the sink is no longer needed.
func MustNew(
	brokers string,
	username string,
	password string,
) (pipeline.Sink, func()) {
	// Arguments are validated in declaration order so the first missing one
	// is the one reported.
	switch {
	case brokers == "":
		panic("missing kafka brokers")
	case username == "":
		panic("missing kafka username")
	case password == "":
		panic("missing kafka password")
	}

	kafkaProducer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": brokers,
		"compression.codec": "gzip",
		"partitioner":       "murmur2_random",
		"sasl.mechanisms":   "PLAIN",
		"sasl.password":     password,
		"sasl.username":     username,
		"security.protocol": "sasl_plaintext",
	})
	if err != nil {
		panic(err)
	}

	// The producer's own Close method doubles as the cleanup function.
	return &kafkaPusher{producer: kafkaProducer}, kafkaProducer.Close
}
// storeError is a constant-message error for failures detected by Store
// itself rather than reported by the Kafka client.
type storeError string

// Error implements the error interface.
func (e storeError) Error() string { return string(e) }

const (
	// errUnexpectedMessageType is returned when a message handed to Store is
	// not a SinkMessage.
	errUnexpectedMessageType = storeError("unexpected sink message type: expected kafkasink.SinkMessage")
	// errUnexpectedDeliveryEvent is returned when the delivery channel yields
	// something other than a *kafka.Message.
	errUnexpectedDeliveryEvent = storeError("unexpected event on delivery channel: expected *kafka.Message")
)

// Store publishes all messages to Kafka and waits for their delivery reports.
//
// Every element of messages must be a SinkMessage. All messages are validated
// before anything is produced, so an invalid element results in an error and
// no message being sent (previously this panicked).
//
// Store returns nil only after every message has been acknowledged. It
// returns an error when:
//   - a message is not a SinkMessage;
//   - the producer refuses to enqueue a message;
//   - a delivery report carries an error;
//   - ctx is done before all delivery reports have arrived.
//
// When Store returns early, messages that were already enqueued may still be
// delivered by the producer, so callers that retry should expect duplicates.
func (p *kafkaPusher) Store(ctx context.Context, messages ...pipeline.SinkMessage) error {
	const op = errors.Op("kafkasink.kafkaPusher.Store")

	sdkMessages := make([]*kafka.Message, len(messages))
	for i, m := range messages {
		message, ok := m.(SinkMessage)
		if !ok {
			return errors.E(op, errUnexpectedMessageType, errors.SeverityFatal)
		}
		// message is a fresh variable on every iteration, so taking the
		// address of its Topic field is safe.
		sdkMessages[i] = &kafka.Message{
			Key:   message.Key,
			Value: message.Value,
			TopicPartition: kafka.TopicPartition{
				Topic:     &message.Topic,
				Partition: kafka.PartitionAny,
			},
		}
	}

	// The channel is buffered with one slot per message so the producer can
	// always hand over its delivery reports, even if Store returns early and
	// stops reading.
	deliveryChan := make(chan kafka.Event, len(sdkMessages))
	for _, msg := range sdkMessages {
		if err := p.producer.Produce(msg, deliveryChan); err != nil {
			return errors.E(op, err, errors.SeverityRuntime)
		}
	}

	// Wait for exactly one delivery report per produced message, giving up as
	// soon as the caller cancels the context.
	for range sdkMessages {
		select {
		case <-ctx.Done():
			return errors.E(op, ctx.Err(), errors.SeverityRuntime)
		case e := <-deliveryChan:
			report, ok := e.(*kafka.Message)
			if !ok {
				return errors.E(op, errUnexpectedDeliveryEvent, errors.SeverityRuntime)
			}
			if report.TopicPartition.Error != nil {
				return errors.E(op, report.TopicPartition.Error, errors.SeverityRuntime)
			}
		}
	}

	return nil
}