forked from sodafoundation/strato
/
kafka.go
81 lines (72 loc) · 2.01 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package kafka
import (
"errors"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/opensds/multi-cloud/s3/pkg/helper"
"github.com/opensds/multi-cloud/s3/pkg/messagebus/types"
)
// Kafka bundles a confluent-kafka-go producer with the channel used to signal
// that the background event-handling goroutine has finished.
type Kafka struct {
	// producer is the underlying Kafka client. The methods on Kafka treat a
	// nil producer as "not created" and refuse to operate on it.
	producer *kafka.Producer
	// doneChan is closed by the goroutine launched in Start once the
	// producer's event channel has been drained; Close waits on it.
	doneChan chan int
}
// Start launches the background goroutine that consumes the producer's event
// channel. For every delivery report it forwards the delivery result to the
// caller-supplied error channel (if one was attached as the message's Opaque
// value) and logs the outcome. Other event types are logged and skipped.
//
// The goroutine runs until the producer's event channel is closed, at which
// point it closes doneChan so that Close can stop waiting.
//
// It returns an error when the Kafka value was not fully initialised (nil
// producer or nil doneChan); closing a nil doneChan at shutdown would
// otherwise panic.
func (kf *Kafka) Start() error {
	if kf.producer == nil || kf.doneChan == nil {
		return errors.New("Kafka sender is not created correctly.")
	}

	go func() {
		defer close(kf.doneChan)

		for e := range kf.producer.Events() {
			m, ok := e.(*kafka.Message)
			if !ok {
				log.Infof("skip event: %v", e)
				continue
			}

			// Opaque optionally carries the sender's result channel. An
			// interface holding a typed-nil channel is itself non-nil and
			// still matches the assertion, and a send on a nil channel blocks
			// forever, so a nil channel must be skipped explicitly or one
			// goroutine would leak per message.
			if errCh, isChan := m.Opaque.(chan error); isChan && errCh != nil {
				// Deliver from a separate goroutine so a slow or absent
				// receiver cannot stall the event loop.
				go func(c chan error, err error) {
					c <- err
				}(errCh, m.TopicPartition.Error)
			}

			if m.TopicPartition.Error != nil {
				log.Errorf("failed to send message to topic[%s] [%d] at offset [%v] with err: %v", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset, m.TopicPartition.Error)
				continue
			}
			log.Infof("succeed to send message to topic[%s] [%d] at offset [%v]", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}
	}()

	return nil
}
// Flush asks the producer to deliver all outstanding messages, waiting at most
// timeout (passed straight through to the producer's Flush, which takes
// milliseconds in the confluent-kafka-go API).
//
// It returns an error if the producer has not been created, or if events are
// still outstanding when the producer's Flush returns, so callers can tell a
// complete flush from a timed-out one.
func (kf *Kafka) Flush(timeout int) error {
	if kf.producer == nil {
		return errors.New("Kafka sender is not created correctly yet.")
	}

	// The producer reports how many events were still unflushed when it gave
	// up; a non-zero count means the flush did not complete.
	if remaining := kf.producer.Flush(timeout); remaining > 0 {
		return fmt.Errorf("kafka flush incomplete after %d ms: %d event(s) still outstanding", timeout, remaining)
	}
	return nil
}
// Close shuts the sender down: it flushes the producer, closes it, and then
// waits for doneChan, which the goroutine launched by Start closes when it
// exits. A Kafka value without a producer is left untouched.
//
// NOTE(review): nothing visible in this file other than Start's goroutine
// closes doneChan, so calling Close without a prior Start would appear to
// block on the final receive — confirm against the constructor/callers.
func (kf *Kafka) Close() {
	// Upper bound handed to the producer's Flush before closing it
	// (milliseconds per the confluent-kafka-go API).
	const closeFlushTimeout = 300000

	if kf.producer != nil {
		kf.producer.Flush(closeFlushTimeout)
		kf.producer.Close()
		// Wait for the event-handling goroutine to finish.
		<-kf.doneChan
	}
}
// AsyncSend queues msg on the producer's produce channel and returns without
// waiting for delivery. The message's ErrChan is attached as the Kafka
// message's Opaque value so the delivery result can be routed back to the
// sender by the event loop started in Start.
//
// It returns an error if the producer has not been created, or if msg is nil,
// has a nil Value, or has an empty Topic. A nil return only means the message
// was queued, not that it was delivered.
func (kf *Kafka) AsyncSend(msg *types.Message) error {
	if kf.producer == nil {
		return errors.New("Kafka is not created correctly yet.")
	}
	// Checking msg itself first avoids a nil-pointer dereference; a nil msg is
	// reported through the same "invalid" error as a malformed one.
	if msg == nil || msg.Value == nil || msg.Topic == "" {
		return fmt.Errorf("input message[%v] is invalid.", msg)
	}

	// Leave the key nil when none was supplied rather than sending an empty,
	// non-nil key.
	var key []byte
	if msg.Key != "" {
		key = []byte(msg.Key)
	}

	// Copy the topic so the queued Kafka message does not point into the
	// caller's struct, which the caller may reuse or modify after this
	// function returns while the producer still holds the message.
	topic := msg.Topic

	kf.producer.ProduceChannel() <- &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            key,
		Value:          msg.Value,
		Opaque:         msg.ErrChan,
	}
	return nil
}