Skip to content

Commit

Permalink
[Kafka output] Add config validation for topic and topics fields (#38058
Browse files Browse the repository at this point in the history
)

This commit adds configuration validation for `topic` and `topics`
fields from the Kafka output. Validation is performed for both cases:
standalone Beat and running under Elastic-Agent. The latter does not
support dynamic topics, hence setting `topics` is rejected by the
validation.

When running under Elastic-Agent the topic is validated using a regexp
taken from the Kafka source code:
https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33

(cherry picked from commit 8289144)
  • Loading branch information
belimawr authored and mergify[bot] committed Mar 18, 2024
1 parent 41625cf commit 4442583
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Upgrade go-sysinfo from 1.12.0 to 1.13.1. {pull}37996[37996]
- Make `range` condition work with numeric values as strings. {pull}38080[38080]
- Allow users to configure number of output workers (for outputs that support workers) with either `worker` or `workers`. {pull}38257[38257]
- Kafka output now validates the `topics` and `topic` configuration values. {pull}38058[38058]

*Auditbeat*

Expand Down
30 changes: 30 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math"
"math/rand"
"regexp"
"strings"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -77,6 +79,11 @@ type kafkaConfig struct {
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
Queue config.Namespace `config:"queue"`

// Currently only used for validation. Those values are later
// unpacked into temporary structs whenever they're necessary.
Topic string `config:"topic"`
Topics []any `config:"topics"`
}

type metaConfig struct {
Expand All @@ -102,6 +109,11 @@ var compressionModes = map[string]sarama.CompressionCodec{
"snappy": sarama.CompressionSnappy,
}

// validTopicRegExp matches a non-empty topic name made up exclusively of ASCII
// letters, digits, '.', '_' and '-'. It is used to validate `topic` when
// running under Elastic-Agent. The pattern is taken from the Kafka source code:
// https://github.com/apache/kafka/blob/a126e3a622f2b7142f3543b9dbee54b6412ba9d8/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L33
var validTopicRegExp = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
Expand Down Expand Up @@ -169,6 +181,24 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}

if c.Topic == "" && len(c.Topics) == 0 {
return errors.New("either 'topic' or 'topics' must be defined")
}

// When running under Elastic-Agent we do not support dynamic topic
// selection, so `topics` is not supported and `topic` is treated as a
// plain string
if management.UnderAgent() {
if len(c.Topics) != 0 {
return errors.New("'topics' is not supported when running under Elastic-Agent")
}

if !validTopicRegExp.MatchString(c.Topic) {
return fmt.Errorf("topic '%s' is invalid, it must match '[a-zA-Z0-9._-]'", c.Topic)
}
}

return nil
}

Expand Down
95 changes: 92 additions & 3 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,26 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestConfigAcceptValid(t *testing.T) {
tests := map[string]mapstr.M{
"default config is valid": mapstr.M{},
"lz4 with 0.11": mapstr.M{
"compression": "lz4",
"version": "0.11",
"topic": "foo",
},
"lz4 with 1.0": mapstr.M{
"compression": "lz4",
"version": "1.0.0",
"topic": "foo",
},
"Kerberos with keytab": mapstr.M{
"topic": "foo",
"kerberos": mapstr.M{
"auth_type": "keytab",
"username": "elastic",
Expand All @@ -52,6 +55,7 @@ func TestConfigAcceptValid(t *testing.T) {
},
},
"Kerberos with user and password pair": mapstr.M{
"topic": "foo",
"kerberos": mapstr.M{
"auth_type": "password",
"username": "elastic",
Expand All @@ -67,7 +71,9 @@ func TestConfigAcceptValid(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
c := config.MustNewConfigFrom(test)
c.SetString("hosts", 0, "localhost")
if err := c.SetString("hosts", 0, "localhost"); err != nil {
t.Fatalf("could not set 'hosts' on config: %s", err)
}
cfg, err := readConfig(c)
if err != nil {
t.Fatalf("Can not create test configuration: %v", err)
Expand All @@ -89,13 +95,17 @@ func TestConfigInvalid(t *testing.T) {
"realm": "ELASTIC",
},
},
// The default config does not set `topic` nor `topics`.
"No topics or topic provided": mapstr.M{},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
c := config.MustNewConfigFrom(test)
c.SetString("hosts", 0, "localhost")
if err := c.SetString("hosts", 0, "localhost"); err != nil {
t.Fatalf("could not set 'hosts' on config: %s", err)
}
_, err := readConfig(c)
if err == nil {
t.Fatalf("Can create test configuration from invalid input")
Expand All @@ -104,6 +114,84 @@ func TestConfigInvalid(t *testing.T) {
}
}

// TestConfigUnderElasticAgent exercises readConfig while
// management.UnderAgent() reports true. In that mode the Kafka output
// validation rejects `topics` and requires `topic` to be a non-empty string
// containing only ASCII letters, digits, '.', '_' and '-'.
func TestConfigUnderElasticAgent(t *testing.T) {
	// SetUnderAgent changes package-level state in `management`, so the
	// previous value is restored once the test (and its subtests) finish.
	oldUnderAgent := management.UnderAgent()
	t.Cleanup(func() {
		management.SetUnderAgent(oldUnderAgent)
	})

	management.SetUnderAgent(true)

	tests := []struct {
		name        string
		cfg         mapstr.M
		expectError bool
	}{
		{
			// Covers every character accepted by the validation: the full
			// lower and upper case alphabets, all digits, '-', '_' and '.'.
			name: "topic with all valid characters",
			cfg: mapstr.M{
				"topic": "abcdefghijklmnopqrstuvwxyz-ABCDEFGHIJKLMNOPQRSTUVWXYZ_0123456789.",
			},
		},
		{
			name: "topics is provided",
			cfg: mapstr.M{
				"topics": []string{"foo", "bar"},
			},
			expectError: true,
		},
		{
			name: "topic cannot contain whitespace",
			cfg: mapstr.M{
				"topic": "foo bar",
			},
			expectError: true,
		},
		{
			name: "topic cannot contain symbols outside '._-'",
			cfg: mapstr.M{
				"topic": "foo + bar",
			},
			expectError: true,
		},
		{
			name: "topic with invalid characters from dynamic topic selection",
			cfg: mapstr.M{
				"topic": "%{event.field}",
			},
			expectError: true,
		},

		// The default config does not set `topic` nor `topics`.
		{
			name:        "empty config is invalid",
			cfg:         mapstr.M{},
			expectError: true,
		},
	}

	for _, test := range tests {
		test := test
		t.Run(test.name, func(t *testing.T) {
			c := config.MustNewConfigFrom(test.cfg)
			// `hosts` is set on every case so that only the topic settings
			// under test differ between cases.
			if err := c.SetString("hosts", 0, "localhost"); err != nil {
				t.Fatalf("could not set 'hosts' on config: %v", err)
			}

			_, err := readConfig(c)

			if test.expectError && err == nil {
				t.Fatalf("invalid configuration must not be created")
			}

			if !test.expectError && err != nil {
				t.Fatalf("could not create config: %v", err)
			}
		})
	}
}

func TestBackoffFunc(t *testing.T) {
testutil.SeedPRNG(t)
tests := map[int]backoffConfig{
Expand Down Expand Up @@ -178,6 +266,7 @@ func TestTopicSelection(t *testing.T) {

for name, test := range cases {
t.Run(name, func(t *testing.T) {
test := test
selector, err := buildTopicSelector(config.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
Expand Down

0 comments on commit 4442583

Please sign in to comment.