Skip to content

Commit

Permalink
Add support for static and random routing keys in kafka output (influ…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Jean-Louis Dupond committed Apr 22, 2019
1 parent 7a79ac2 commit d163c03
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 19 deletions.
7 changes: 7 additions & 0 deletions plugins/outputs/kafka/README.md
Expand Up @@ -50,6 +50,13 @@ This plugin writes to a [Kafka Broker](http://kafka.apache.org/07/quickstart.htm
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"

## Static routing key. Used when no routing_tag is set or as a fallback
## when the tag specified in routing tag is not found. If set to "random",
## a random value will be generated for each message.
## ex: routing_key = "random"
## routing_key = "telegraf"
# routing_key = ""

## CompressionCodec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
Expand Down
55 changes: 36 additions & 19 deletions plugins/outputs/kafka/kafka.go
Expand Up @@ -10,6 +10,7 @@ import (
tlsint "github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
uuid "github.com/satori/go.uuid"

"github.com/Shopify/sarama"
)
Expand All @@ -22,24 +23,16 @@ var ValidTopicSuffixMethods = []string{

type (
Kafka struct {
// Kafka brokers to send metrics to
Brokers []string
// Kafka topic
Topic string
// Kafka client id
ClientID string `toml:"client_id"`
// Kafka topic suffix option
TopicSuffix TopicSuffix `toml:"topic_suffix"`
// Routing Key Tag
RoutingTag string `toml:"routing_tag"`
// Compression Codec Tag
Brokers []string
Topic string
ClientID string `toml:"client_id"`
TopicSuffix TopicSuffix `toml:"topic_suffix"`
RoutingTag string `toml:"routing_tag"`
RoutingKey string `toml:"routing_key"`
CompressionCodec int
// RequiredAcks Tag
RequiredAcks int
// MaxRetry Tag
MaxRetry int
// Max Message Bytes
MaxMessageBytes int `toml:"max_message_bytes"`
RequiredAcks int
MaxRetry int
MaxMessageBytes int `toml:"max_message_bytes"`

Version string `toml:"version"`

Expand Down Expand Up @@ -116,6 +109,13 @@ var sampleConfig = `
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
## Static routing key. Used when no routing_tag is set or as a fallback
## when the tag specified in routing tag is not found. If set to "random",
## a random value will be generated for each message.
## ex: routing_key = "random"
## routing_key = "telegraf"
# routing_key = ""
## CompressionCodec represents the various compression codecs recognized by
## Kafka in messages.
## 0 : No compression
Expand Down Expand Up @@ -273,6 +273,22 @@ func (k *Kafka) Description() string {
return "Configuration for the Kafka server to send metrics to"
}

func (k *Kafka) routingKey(metric telegraf.Metric) string {
if k.RoutingTag != "" {
key, ok := metric.GetTag(k.RoutingTag)
if ok {
return key
}
}

if k.RoutingKey == "random" {
u := uuid.NewV4()
return u.String()
}

return k.RoutingKey
}

func (k *Kafka) Write(metrics []telegraf.Metric) error {
msgs := make([]*sarama.ProducerMessage, 0, len(metrics))
for _, metric := range metrics {
Expand All @@ -285,8 +301,9 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
Topic: k.GetTopicName(metric),
Value: sarama.ByteEncoder(buf),
}
if h, ok := metric.GetTag(k.RoutingTag); ok {
m.Key = sarama.StringEncoder(h)
key := k.routingKey(metric)
if key != "" {
m.Key = sarama.StringEncoder(key)
}
msgs = append(msgs, m)
}
Expand Down
59 changes: 59 additions & 0 deletions plugins/outputs/kafka/kafka_test.go
Expand Up @@ -2,7 +2,10 @@ package kafka

import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -96,3 +99,59 @@ func TestValidateTopicSuffixMethod(t *testing.T) {
require.NoError(t, err, "Topic suffix method used should be valid.")
}
}

func TestRoutingKey(t *testing.T) {
tests := []struct {
name string
kafka *Kafka
metric telegraf.Metric
check func(t *testing.T, routingKey string)
}{
{
name: "static routing key",
kafka: &Kafka{
RoutingKey: "static",
},
metric: func() telegraf.Metric {
m, _ := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
return m
}(),
check: func(t *testing.T, routingKey string) {
require.Equal(t, "static", routingKey)
},
},
{
name: "random routing key",
kafka: &Kafka{
RoutingKey: "random",
},
metric: func() telegraf.Metric {
m, _ := metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
)
return m
}(),
check: func(t *testing.T, routingKey string) {
require.Equal(t, 36, len(routingKey))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
key := tt.kafka.routingKey(tt.metric)
tt.check(t, key)
})
}
}

0 comments on commit d163c03

Please sign in to comment.