Skip to content

Commit

Permalink
[Kafka output] Add config validation for topic and topics fields
Browse files Browse the repository at this point in the history
This commit adds configuration validation for `topic` and `topics`
fields from the Kafka output. Validation is performed for both cases:
standalone Beat and running under Elastic-Agent. The latter does not
support dynamic topics, hence setting `topics` is rejected by the
validation.
  • Loading branch information
belimawr committed Feb 19, 2024
1 parent 01833d2 commit de3d736
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Upgrade to elastic-agent-libs v0.7.3 and golang.org/x/crypto v0.17.0. {pull}37544[37544]
- Make more selective the Pod autodiscovery upon node and namespace update events. {issue}37338[37338] {pull}37431[37431]
- Upgrade go-sysinfo from 1.12.0 to 1.13.1. {pull}37996[37996]
- Kafka output now validates the `topics` and `topic` configuration values {pull}38058[38058]

*Auditbeat*

Expand Down
20 changes: 20 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -77,6 +78,11 @@ type kafkaConfig struct {
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
Queue config.Namespace `config:"queue"`

// Currently only used for validation. Those values are later
// unpacked into temporary structs whenever they're necessary.
Topic string `config:"topic"`
Topics []string `config:"topics"`
}

type metaConfig struct {
Expand Down Expand Up @@ -169,6 +175,20 @@ func (c *kafkaConfig) Validate() error {
return fmt.Errorf("compression_level must be between 0 and 9")
}
}

if c.Topic == "" && len(c.Topics) == 0 {
return errors.New("either 'topic' or 'topics' must be defined")
}

// When running under Elastic-Agent we do not support dynamic topic
// selection, so `topics` is not supported and `topic` is treated as an
// plain string
if management.UnderAgent() {
if len(c.Topics) != 0 {
return errors.New("'topics' is not supported when running under Elastic-Agent")
}
}

return nil
}

Expand Down
38 changes: 37 additions & 1 deletion libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,26 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestConfigAcceptValid(t *testing.T) {
tests := map[string]mapstr.M{
"default config is valid": mapstr.M{},
"lz4 with 0.11": mapstr.M{
"compression": "lz4",
"version": "0.11",
"topic": "foo",
},
"lz4 with 1.0": mapstr.M{
"compression": "lz4",
"version": "1.0.0",
"topic": "foo",
},
"Kerberos with keytab": mapstr.M{
"topic": "foo",
"kerberos": mapstr.M{
"auth_type": "keytab",
"username": "elastic",
Expand All @@ -52,6 +55,7 @@ func TestConfigAcceptValid(t *testing.T) {
},
},
"Kerberos with user and password pair": mapstr.M{
"topic": "foo",
"kerberos": mapstr.M{
"auth_type": "password",
"username": "elastic",
Expand Down Expand Up @@ -89,6 +93,8 @@ func TestConfigInvalid(t *testing.T) {
"realm": "ELASTIC",
},
},
// The default config does not set `topic` nor `topics`.
"No topics or topic provided": mapstr.M{},
}

for name, test := range tests {
Expand All @@ -104,6 +110,36 @@ func TestConfigInvalid(t *testing.T) {
}
}

func TestInvalidConfigUnderElasticAgent(t *testing.T) {
oldUnderAgent := management.UnderAgent()
t.Cleanup(func() {
// Restore the previous value
management.SetUnderAgent(oldUnderAgent)
})

management.SetUnderAgent(true)
tests := map[string]mapstr.M{
"topics is provided": mapstr.M{
"topics": []string{"foo", "bar"},
},
// The default config does not set `topic` not `topics`.
"No topics or topic provided": mapstr.M{},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
c := config.MustNewConfigFrom(test)
c.SetString("hosts", 0, "localhost")

Check failure on line 133 in libbeat/outputs/kafka/config_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `c.SetString` is not checked (errcheck)

Check failure on line 133 in libbeat/outputs/kafka/config_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `c.SetString` is not checked (errcheck)
_, err := readConfig(c)
if err == nil {
t.Fatalf("Can create test configuration from invalid input")
}
})
}

}

func TestBackoffFunc(t *testing.T) {
testutil.SeedPRNG(t)
tests := map[int]backoffConfig{
Expand Down

0 comments on commit de3d736

Please sign in to comment.