diff --git a/config/flipt.schema.cue b/config/flipt.schema.cue index a1595d98f1..f3137aa416 100644 --- a/config/flipt.schema.cue +++ b/config/flipt.schema.cue @@ -397,7 +397,9 @@ import "strings" topic: string bootstrap_servers: [...string] encoding?: *"protobuf" | "avro" - schema_registry?: string + schema_registry?: { + url: string + } | null require_tls?: bool | *false insecure_skip_tls?: bool | *false authentication?: { diff --git a/config/flipt.schema.json b/config/flipt.schema.json index 953f0c9917..3eb84401c3 100644 --- a/config/flipt.schema.json +++ b/config/flipt.schema.json @@ -1367,7 +1367,13 @@ "default": "protobuf" }, "schema_registry": { - "type": "string" + "type": ["object", "null"], + "additionalProperties": false, + "properties": { + "url": { + "type": "string" + } + } }, "require_tls": { "type": "boolean", diff --git a/go.work.sum b/go.work.sum index d0e4e81fcf..6707149b4d 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1,3 +1,4 @@ +cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg= cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4= cloud.google.com/go/accessapproval v1.7.6/go.mod h1:bdDCS3iLSLhlK3pu8lJClaeIVghSpTLGChl1Ihr9Fsc= cloud.google.com/go/accesscontextmanager v1.8.6/go.mod h1:rMC0Z8pCe/JR6yQSksprDc6swNKjMEvkfCbaesh+OS0= @@ -200,6 +201,7 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM= +github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/containerd/aufs v1.0.0/go.mod h1:kL5kd6KM5TzQjR79jljyi4olc1Vrx6XBlcyj3gNv2PU= github.com/containerd/btrfs/v2 v2.0.0/go.mod h1:swkD/7j9HApWpzl8OHfrHNxppPd9l44DFZdF94BUj9k= @@ -517,6 +519,7 @@ go.etcd.io/etcd/client/v3 v3.5.12/go.mod h1:tSbBCakoWmmddL+BKVAJHa9km+O/E+bumDe9 go.flipt.io/stew v0.0.0-20240109140408-33ad11ecef1c/go.mod h1:uCrn9WPz5WKYJ7+YXQ/mFO7vIp06PxkxHnPzOpcgMD0= go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1/go.mod h1:SNgMg+EgDFwmvSmLRTNKC5fegJjB7v23qTQ0XLGUNHk= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0/go.mod h1:27iA5uvhuRNmalO+iEUdVn5ZMj2qy10Mm+XRIpRmyuU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= @@ -587,6 +590,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240325203815-454cdb8f5daa/go. google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w= google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6/go.mod h1:10yRODfgim2/T8csjQsMPgZOMvtytXKTDRzH6HRGzRw= google.golang.org/genproto/googleapis/api v0.0.0-20240506185236-b8a5c65736ae/go.mod h1:FfiGhwUm6CJviekPrc0oJ+7h29e+DmWU6UtjX0ZvI7Y= +google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= google.golang.org/genproto/googleapis/bytestream v0.0.0-20240429193739-8cf5692501f6/go.mod h1:ULqtoQMxDLNRfW+pJbKA68wtIy1OiYjdIsJs3PMpzh8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240304161311-37d4d3c04a78/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= @@ -595,6 +599,7 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go. google.golang.org/genproto/googleapis/rpc v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/genproto/googleapis/rpc v0.0.0-20240610135401-a8a62080eff3/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= diff --git a/internal/config/audit.go b/internal/config/audit.go index 4d0913a00b..70e8298512 100644 --- a/internal/config/audit.go +++ b/internal/config/audit.go @@ -126,14 +126,14 @@ type LogSinkConfig struct { // KafkaSinkConfig contains fields that hold configuration for sending audits // to Kafka. type KafkaSinkConfig struct { - Enabled bool `json:"enabled,omitempty" mapstructure:"enabled" yaml:"enabled,omitempty"` - Topic string `json:"topic,omitempty" mapstructure:"topic" yaml:"topic,omitempty"` - BootstrapServers []string `json:"bootstrapServers,omitempty" mapstructure:"bootstrap_servers" yaml:"bootstrap_servers,omitempty"` - Encoding string `json:"encoding,omitempty" mapstructure:"encoding" yaml:"encoding,omitempty"` - Authentication *KafkaAuthentication `json:"-" mapstructure:"authentication" yaml:"-"` - SchemaRegistry string `json:"-" mapstructure:"schema_registry" yaml:"-"` - RequireTLS bool `json:"requireTLS,omitempty" mapstructure:"require_tls" yaml:"require_tls,omitempty"` - InsecureSkipTLS bool `json:"-" mapstructure:"insecure_skip_tls" yaml:"-"` + Enabled bool `json:"enabled,omitempty" mapstructure:"enabled" yaml:"enabled,omitempty"` + Topic string `json:"topic,omitempty" mapstructure:"topic" yaml:"topic,omitempty"` + BootstrapServers []string `json:"bootstrapServers,omitempty" mapstructure:"bootstrap_servers" yaml:"bootstrap_servers,omitempty"` + Encoding string `json:"encoding,omitempty" mapstructure:"encoding" yaml:"encoding,omitempty"` + Authentication *KafkaAuthenticationConfig `json:"-" mapstructure:"authentication" yaml:"-"` + SchemaRegistry *KafkaSchemaRegistryConfig `json:"-" mapstructure:"schema_registry" yaml:"-"` + RequireTLS bool `json:"requireTLS,omitempty" mapstructure:"require_tls" yaml:"require_tls,omitempty"` + InsecureSkipTLS bool `json:"-" mapstructure:"insecure_skip_tls" yaml:"-"` } func (c *KafkaSinkConfig) validate() error { @@ -148,8 +148,12 @@ func (c *KafkaSinkConfig) validate() error { return nil } -// KafkaAuthentication contains fields that hold auth configuration for Kafka. -type KafkaAuthentication struct { +type KafkaSchemaRegistryConfig struct { + URL string `json:"-" mapstructure:"url" yaml:"-"` +} + +// KafkaAuthenticationConfig contains fields that hold auth configuration for Kafka. +type KafkaAuthenticationConfig struct { Username string `json:"-" mapstructure:"username" yaml:"-"` Password string `json:"-" mapstructure:"password" yaml:"-"` } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 75d4accd6a..e91cefaa3c 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -983,11 +983,13 @@ func TestLoad(t *testing.T) { cfg.Audit.Sinks.Kafka.Topic = "audit-topic" cfg.Audit.Sinks.Kafka.BootstrapServers = []string{"kafka-srv1", "kafka-srv2"} cfg.Audit.Sinks.Kafka.Encoding = "protobuf" - cfg.Audit.Sinks.Kafka.Authentication = &KafkaAuthentication{ + cfg.Audit.Sinks.Kafka.Authentication = &KafkaAuthenticationConfig{ Username: "user", Password: "passwd", } - cfg.Audit.Sinks.Kafka.SchemaRegistry = "http://registry" + cfg.Audit.Sinks.Kafka.SchemaRegistry = &KafkaSchemaRegistryConfig{ + URL: "http://registry", + } cfg.Audit.Sinks.Kafka.RequireTLS = true cfg.Audit.Sinks.Kafka.InsecureSkipTLS = true return cfg diff --git a/internal/config/testdata/audit/valid_kafka.yml b/internal/config/testdata/audit/valid_kafka.yml index 22b9a27123..9ad08e5dce 100644 --- a/internal/config/testdata/audit/valid_kafka.yml +++ b/internal/config/testdata/audit/valid_kafka.yml @@ -10,7 +10,7 @@ audit: authentication: username: user password: passwd - schema_registry: http://registry + schema_registry: + url: http://registry require_tls: true insecure_skip_tls: true - diff --git a/internal/server/audit/kafka/kafka.go b/internal/server/audit/kafka/kafka.go index 0066badf19..98a5feb706 100644 --- a/internal/server/audit/kafka/kafka.go +++ b/internal/server/audit/kafka/kafka.go @@ -76,7 +76,7 @@ func NewSink(ctx context.Context, logger *zap.Logger, cfg config.KafkaSinkConfig return nil, err } encodeFn := encoder.Encode - if cfg.SchemaRegistry != "" { + if cfg.SchemaRegistry != nil { encodeFn, err = setupSchemaRegistryEncoder(ctx, cfg, encoder) if err != nil { return nil, err @@ -93,7 +93,7 @@ func NewSink(ctx context.Context, logger *zap.Logger, cfg config.KafkaSinkConfig // setupSchemaRegistryEncoder sets up the schema registry client, registers the schema, // and returns an encoding function. func setupSchemaRegistryEncoder(ctx context.Context, cfg config.KafkaSinkConfig, encoder Encoder) (encodingFn, error) { - rcl, err := sr.NewClient(sr.URLs(cfg.SchemaRegistry)) + rcl, err := sr.NewClient(sr.URLs(cfg.SchemaRegistry.URL)) if err != nil { return nil, err } diff --git a/internal/server/audit/kafka/kafka_test.go b/internal/server/audit/kafka/kafka_test.go index d33013db14..b0d0fb840d 100644 --- a/internal/server/audit/kafka/kafka_test.go +++ b/internal/server/audit/kafka/kafka_test.go @@ -79,7 +79,7 @@ func TestNewSinkAndSend(t *testing.T) { } adminCreateUser(t, fmt.Sprintf("http://%s:9644", srv)) bootstrapServers := []string{srv} - defaultAuth := &config.KafkaAuthentication{Username: "admin", Password: "password"} + defaultAuth := &config.KafkaAuthenticationConfig{Username: "admin", Password: "password"} e := audit.NewEvent( flipt.NewRequest(flipt.ResourceFlag, flipt.ActionCreate, flipt.WithSubject(flipt.SubjectRule)), &audit.Actor{ @@ -118,12 +118,16 @@ func TestNewSinkAndSend(t *testing.T) { BootstrapServers: bootstrapServers, Topic: topic, Encoding: tt.enc, - SchemaRegistry: tt.schemaRegistry, RequireTLS: true, //nolint:gosec InsecureSkipTLS: true, Authentication: defaultAuth, } + + if tt.schemaRegistry != "" { + cfg.SchemaRegistry = &config.KafkaSchemaRegistryConfig{URL: tt.schemaRegistry} + } + s, err := NewSink(context.Background(), zaptest.NewLogger(t), cfg) require.NoError(t, err) t.Cleanup(func() { @@ -162,7 +166,7 @@ func TestNewSinkAndSend(t *testing.T) { Encoding: encodingProto, InsecureSkipTLS: true, RequireTLS: true, - Authentication: &config.KafkaAuthentication{Username: "user", Password: ""}, + Authentication: &config.KafkaAuthenticationConfig{Username: "user", Password: ""}, }, "SCRAM-SHA-256 user and pass must be non-empty", },