/
kafka.go
102 lines (85 loc) · 2.7 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package event
import (
"encoding/json"
"fmt"
"strings"
"github.com/Shopify/sarama"
"github.com/spf13/viper"
"github.com/ovh/cds/engine/log"
"github.com/ovh/cds/sdk"
)
// producer is a package-level Kafka sync producer handle.
// NOTE(review): it is not referenced in the code visible here (initProducer
// declares a local variable of the same name that shadows it, and KafkaClient
// keeps its own producer field) — confirm whether the rest of the package
// still needs it.
var producer sarama.SyncProducer
// KafkaClient embeds the Kafka connection: the configuration it was
// initialized with and the sarama producer used to publish events.
type KafkaClient struct {
// options is the configuration stored by initialize.
options KafkaConfig
// producer is the sync producer created by initProducer; it is nil until
// initProducer has succeeded.
producer sarama.SyncProducer
}
// KafkaConfig handles all config to connect to Kafka.
type KafkaConfig struct {
// Enabled is not read by the code in this file.
// NOTE(review): presumably checked by the caller to decide whether the
// Kafka broker is used at all — confirm.
Enabled bool
// BrokerAddresses is a comma-separated list of broker addresses.
BrokerAddresses string
// User is used both as the SASL user and as the sarama client ID.
User string
// Password is the SASL password.
Password string
// Topic is the topic on which events are published.
Topic string
}
// initialize configures the client from options and connects the producer.
//
// options must be a KafkaConfig value (not a pointer) whose BrokerAddresses,
// User, Password and Topic are all non-empty; otherwise an error is returned.
// On success the receiver itself is returned as the Broker.
//
// NOTE(review): when the producer cannot be created, the broker addresses and
// user reported in the error are read from the viper keys
// "event_kafka_broker_addresses" and "event_kafka_user", not from options, so
// they may differ from the values that were actually used.
func (c *KafkaClient) initialize(options interface{}) (Broker, error) {
	kafkaConf, isKafkaConf := options.(KafkaConfig)
	if !isKafkaConf {
		return nil, fmt.Errorf("Invalid Kafka Initialization")
	}

	// Every connection setting is mandatory.
	mandatory := []string{
		kafkaConf.BrokerAddresses,
		kafkaConf.User,
		kafkaConf.Password,
		kafkaConf.Topic,
	}
	for _, value := range mandatory {
		if value == "" {
			return nil, fmt.Errorf("initKafka> Invalid Kafka Configuration")
		}
	}

	c.options = kafkaConf

	errInit := c.initProducer()
	if errInit == nil {
		return c, nil
	}

	brokers := viper.GetString("event_kafka_broker_addresses")
	user := viper.GetString("event_kafka_user")
	return nil, fmt.Errorf("initKafka> Error with init sarama:%s (newSyncProducer on %s user:%s)", errInit.Error(), brokers, user)
}
// close shuts down the Kafka producer if one was created. A failure to close
// is not returned to the caller; it is only logged as a warning.
func (c *KafkaClient) close() {
	if c.producer == nil {
		// Nothing was opened, nothing to release.
		return
	}

	errClose := c.producer.Close()
	if errClose == nil {
		return
	}
	log.Warning("closeKafka> Error while closing kafka producer:%s", errClose.Error())
}
// initProducer creates the sarama sync producer described by c.options and
// stores it in c.producer.
//
// The connection uses TLS and SASL, with c.options.User as both SASL user and
// client ID. c.options.BrokerAddresses is a comma-separated list; whitespace
// around each address and empty entries (e.g. from a trailing comma) are
// ignored. An error is returned when no usable address remains or when sarama
// fails to create the producer; c.producer is left untouched in that case.
func (c *KafkaClient) initProducer() error {
	config := sarama.NewConfig()
	config.Net.TLS.Enable = true
	config.Net.SASL.Enable = true
	config.Net.SASL.User = c.options.User
	config.Net.SASL.Password = c.options.Password
	config.ClientID = c.options.User
	// A sync producer needs successes to be reported back to it.
	config.Producer.Return.Successes = true

	// Tolerate "host1:9093, host2:9093" and trailing commas: an address with
	// surrounding blanks, or an empty one, can never be dialed.
	var brokers []string
	for _, addr := range strings.Split(c.options.BrokerAddresses, ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			brokers = append(brokers, addr)
		}
	}
	if len(brokers) == 0 {
		return fmt.Errorf("initKafka> Invalid Kafka Configuration: no broker address in %q", c.options.BrokerAddresses)
	}

	// Named syncProducer so it does not shadow the package-level producer.
	syncProducer, errp := sarama.NewSyncProducer(brokers, config)
	if errp != nil {
		return fmt.Errorf("initKafka> Error with init sarama:%s (newSyncProducer on %s user:%s)", errp.Error(), c.options.BrokerAddresses, c.options.User)
	}

	log.Info("initKafka> Kafka used at %s on topic:%s", c.options.BrokerAddresses, c.options.Topic)
	c.producer = syncProducer
	return nil
}
// sendEvent serializes event to JSON and publishes it synchronously on the
// configured Kafka topic.
//
// It returns an error when the producer has not been initialized, when the
// event cannot be marshalled, or when the producer fails to send the message.
// On success the partition and offset of the message are logged at debug
// level.
func (c *KafkaClient) sendEvent(event *sdk.Event) error {
	// Same guard as close: calling SendMessage on a nil producer would panic
	// if initialize was never called or did not succeed.
	if c.producer == nil {
		return fmt.Errorf("sendEvent> Kafka producer is not initialized")
	}

	data, errm := json.Marshal(event)
	if errm != nil {
		return errm
	}

	msg := &sarama.ProducerMessage{
		Topic: c.options.Topic,
		Value: sarama.ByteEncoder(data),
	}
	partition, offset, errs := c.producer.SendMessage(msg)
	if errs != nil {
		return errs
	}

	log.Debug("Event %+v sent to topic %s partition %d offset %d", event, c.options.Topic, partition, offset)
	return nil
}
// status reports the state of the Kafka broker. No check is performed: the
// answer is always "Kafka OK", on the premise that an initialized client means
// Kafka is reachable.
func (c *KafkaClient) status() string {
	const kafkaStatus = "Kafka OK"
	return kafkaStatus
}