-
Notifications
You must be signed in to change notification settings - Fork 23
/
kafka.go
174 lines (155 loc) · 5 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package resultlog
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
"github.com/caraml-dev/mlp/api/pkg/instrumentation/metrics"
"github.com/caraml-dev/turing/engines/router/missionctl/config"
"github.com/caraml-dev/turing/engines/router/missionctl/errors"
"github.com/caraml-dev/turing/engines/router/missionctl/instrumentation"
"github.com/caraml-dev/turing/engines/router/missionctl/log/resultlog/proto/turing"
)
const (
	// kafkaConnectTimeoutMs is the timeout (in milliseconds) passed to
	// GetMetadata when verifying broker/topic connectivity at logger init.
	kafkaConnectTimeoutMs = 1000
)
// kafkaProducer minimally defines the functionality used by the KafkaLogger,
// for producing messages to a Kafka topic (useful for mocking in tests).
type kafkaProducer interface {
	// GetMetadata queries the broker(s) for metadata on the given topic with
	// the given timeout (ms); used at init to verify (and implicitly create)
	// the topic.
	GetMetadata(*string, bool, int) (*kafka.Metadata, error)
	// Produce asynchronously publishes a message; the delivery report is
	// delivered on the supplied event channel.
	Produce(*kafka.Message, chan kafka.Event) error
}
// KafkaLogger logs the result log data to the configured Kafka topic
type KafkaLogger struct {
	// serializationFormat selects how log entries are encoded (JSON or Protobuf)
	serializationFormat config.SerializationFormat
	// topic is the Kafka topic that log entries are produced to
	topic string
	// producer is the (mockable) Kafka producer used to publish messages
	producer kafkaProducer
}
// NewKafkaLogger creates a new KafkaLogger from the given Kafka configuration.
// It builds the underlying producer and verifies connectivity to the broker(s)
// for the configured topic before returning.
func NewKafkaLogger(cfg *config.KafkaConfig) (*KafkaLogger, error) {
	// Set up the underlying Kafka producer
	producer, err := newKafkaProducer(cfg)
	if err != nil {
		return nil, err
	}

	// Test that we are able to query the broker on the topic. If the topic
	// does not already exist on the broker, this should create it.
	if _, err = producer.GetMetadata(&cfg.Topic, false, kafkaConnectTimeoutMs); err != nil {
		return nil, errors.Wrapf(err,
			"Error Querying topic %s from Kafka broker(s)", cfg.Topic)
	}

	// All checks passed; assemble and return the logger
	logger := &KafkaLogger{
		serializationFormat: cfg.SerializationFormat,
		topic:               cfg.Topic,
		producer:            producer,
	}
	return logger, nil
}
// newKafkaProducer creates a confluent-kafka-go producer from the given
// configuration (bootstrap brokers, max message size, compression type).
func newKafkaProducer(cfg *config.KafkaConfig) (kafkaProducer, error) {
	producer, err := kafka.NewProducer(
		&kafka.ConfigMap{
			"bootstrap.servers": cfg.Brokers,
			"message.max.bytes": cfg.MaxMessageBytes,
			"compression.type":  cfg.CompressionType})
	if err != nil {
		return nil, errors.Wrapf(err, "Error initializing Kafka Producer")
	}
	// err is guaranteed nil at this point; return a literal nil rather than
	// the (spent) err variable for clarity.
	return producer, nil
}
// writeToKafka serializes the given message in the configured serialization
// format (the request ID and timestamp form the Protobuf key) and produces it
// to the configured Kafka topic, blocking until the broker acknowledges
// delivery or reports an error.
func (l *KafkaLogger) writeToKafka(
	message proto.Message,
	turingReqID string,
	timestamp *timestamppb.Timestamp) error {
	var err error
	// Measure time taken to marshal the data and write the log to the kafka
	// topic. err is captured by the closure so the deferred metric reports the
	// final status of this call.
	defer metrics.Glob().MeasureDurationMs(
		instrumentation.TuringComponentRequestDurationMs,
		map[string]func() string{
			"status": func() string {
				return metrics.GetStatusString(err == nil)
			},
			"component": func() string {
				return "kafka_marshal_and_write"
			},
			"traffic_rule": func() string { return "" },
		},
	)()

	// Format Kafka Message according to the configured serialization format
	var keyBytes, valueBytes []byte
	switch l.serializationFormat {
	case config.JSONSerializationFormat:
		valueBytes, err = newJSONKafkaLogEntry(message)
	case config.ProtobufSerializationFormat:
		keyBytes, valueBytes, err = newProtobufKafkaLogEntry(message, turingReqID, timestamp)
	default:
		// Unknown format, we wouldn't hit this since the config is checked at
		// initialization, but handle it.
		return errors.Newf(errors.BadConfig, "Unknown Serialization format %s", l.serializationFormat)
	}
	if err != nil {
		return err
	}

	// Produce Message; the delivery report is sent on deliveryChan
	deliveryChan := make(chan kafka.Event, 1)
	defer close(deliveryChan)
	err = l.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &l.topic,
			Partition: kafka.PartitionAny},
		Value: valueBytes,
		Key:   keyBytes,
	}, deliveryChan)
	if err != nil {
		return err
	}

	// Get delivery response
	event := <-deliveryChan
	msg, ok := event.(*kafka.Message)
	if !ok {
		// Defensive: a non-message event on the channel would previously have
		// caused a panic on the unchecked type assertion.
		err = errors.Newf(errors.BadResponse, "Unexpected Kafka event: %v", event)
		return err
	}
	if msg.TopicPartition.Error != nil {
		// Note: no trailing newline in the error string (Go convention)
		err = errors.Newf(errors.BadResponse,
			"Delivery failed: %v", msg.TopicPartition.Error)
		return err
	}
	return nil
}
// write publishes the given Turing result log entry to the configured Kafka
// topic, keyed by the entry's request ID and event timestamp.
func (l *KafkaLogger) write(turLogEntry *turing.TuringResultLogMessage) error {
	reqID := turLogEntry.TuringReqId
	ts := turLogEntry.EventTimestamp
	return l.writeToKafka(turLogEntry, reqID, ts)
}
// newJSONKafkaLogEntry converts a given TuringResultLogEntry to bytes, for
// writing to a Kafka topic in JSON format. JSON messages carry no Kafka key,
// so only value bytes are produced.
func newJSONKafkaLogEntry(message proto.Message) ([]byte, error) {
	// The original err check followed by a naked return was redundant; the
	// marshaller's result can be returned directly.
	return protoJSONMarshaller.Marshal(message)
}
// newProtobufKafkaLogEntry converts a given TuringResultLogEntry to the
// Protobuf format and marshals it, for writing to a Kafka topic. It returns
// the marshalled Kafka key (request ID + event timestamp) and the marshalled
// message value.
func newProtobufKafkaLogEntry(
	message proto.Message,
	turingReqID string,
	eventTimestamp *timestamppb.Timestamp,
) (keyBytes []byte, valueBytes []byte, err error) {
	// Create the Kafka key identifying this log entry
	key := &turing.TuringResultLogKey{
		TuringReqId:    turingReqID,
		EventTimestamp: eventTimestamp,
	}
	// Marshal the key and the message; explicit returns (rather than the
	// original naked return) make the success path unambiguous.
	if keyBytes, err = proto.Marshal(key); err != nil {
		return nil, nil, errors.Wrapf(err, "Unable to marshal log entry key")
	}
	if valueBytes, err = proto.Marshal(message); err != nil {
		return nil, nil, errors.Wrapf(err, "Unable to marshal log entry value")
	}
	return keyBytes, valueBytes, nil
}