diff --git a/Makefile b/Makefile
index 8464ec6c..7a2a4c88 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-GO_TEST_TIMEOUT=180s
+GO_TEST_TIMEOUT=240s
 GOTESTFLAGS=
 GO_TEST_COUNT=10
diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go
index 50a1ffce..80b2c42a 100644
--- a/kafka/topiccreator.go
+++ b/kafka/topiccreator.go
@@ -108,10 +108,13 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
 //
 // Topics that already exist will be updated.
 func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error {
-    // TODO(axw) how should we record topics?
-    ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes(
-        semconv.MessagingSystemKey.String("kafka"),
-    ))
+    ctx, span := c.m.tracer.Start(
+        ctx,
+        "CreateTopics",
+        trace.WithAttributes(
+            semconv.MessagingSystemKey.String("kafka"),
+        ),
+    )
     defer span.End()
 
     namespacePrefix := c.m.cfg.namespacePrefix()
@@ -127,12 +130,32 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
         return fmt.Errorf("failed to list kafka topics: %w", err)
     }
 
-    // missingTopics contains topics which need to be created.
-    missingTopics := make([]string, 0, len(topicNames))
-    // updatePartitions contains topics which partitions' need to be updated.
-    updatePartitions := make([]string, 0, len(topicNames))
-    // existingTopics contains the existing topics, used by AlterTopicConfigs.
-    existingTopics := make([]string, 0, len(topicNames))
+    missingTopics, updatePartitions, existingTopics := c.categorizeTopics(existing, topicNames)
+
+    var updateErrors []error
+    if err := c.createMissingTopics(ctx, span, missingTopics); err != nil {
+        updateErrors = append(updateErrors, err)
+    }
+
+    if err := c.updateTopicPartitions(ctx, span, updatePartitions); err != nil {
+        updateErrors = append(updateErrors, err)
+    }
+
+    if err := c.alterTopicConfigs(ctx, span, existingTopics); err != nil {
+        updateErrors = append(updateErrors, err)
+    }
+
+    return errors.Join(updateErrors...)
+}
+
+func (c *TopicCreator) categorizeTopics(
+    existing kadm.TopicDetails,
+    topicNames []string,
+) (missingTopics, updatePartitions, existingTopics []string) {
+    missingTopics = make([]string, 0, len(topicNames))
+    updatePartitions = make([]string, 0, len(topicNames))
+    existingTopics = make([]string, 0, len(topicNames))
+
     for _, wantTopic := range topicNames {
         if !existing.Has(wantTopic) {
             missingTopics = append(missingTopics, wantTopic)
@@ -144,6 +167,18 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
         }
     }
 
+    return missingTopics, updatePartitions, existingTopics
+}
+
+func (c *TopicCreator) createMissingTopics(
+    ctx context.Context,
+    span trace.Span,
+    missingTopics []string,
+) error {
+    if len(missingTopics) == 0 {
+        return nil
+    }
+
     responses, err := c.m.adminClient.CreateTopics(ctx,
         int32(c.partitionCount),
         -1, // default.replication.factor
@@ -155,6 +190,9 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
         span.SetStatus(codes.Error, err.Error())
         return fmt.Errorf("failed to create kafka topics: %w", err)
     }
+
+    namespacePrefix := c.m.cfg.namespacePrefix()
+
     loggerFields := []zap.Field{
         zap.Int("partition_count", c.partitionCount),
     }
@@ -167,14 +205,14 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
     var updateErrors []error
     for _, response := range responses.Sorted() {
         topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
+
         logger := c.m.cfg.Logger.With(loggerFields...)
         if c.m.cfg.TopicLogFieldsFunc != nil {
             logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
         }
+
         if err := response.Err; err != nil {
             if errors.Is(err, kerr.TopicAlreadyExists) {
-                // NOTE(axw) apmotel currently does nothing with span events,
-                // hence we log as well as create a span event.
                 logger.Debug("kafka topic already exists",
                     zap.String("topic", topicName),
                 )
@@ -197,6 +235,7 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
             }
             continue
         }
+
         c.created.Add(context.Background(), 1, metric.WithAttributeSet(
             attribute.NewSet(
                 semconv.MessagingSystemKey.String("kafka"),
@@ -204,85 +243,157 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
                 attribute.String("topic", topicName),
             ),
         ))
+
         logger.Info("created kafka topic", zap.String("topic", topicName))
     }
-    // Update the topic partitions.
-    if len(updatePartitions) > 0 {
-        updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
-            c.partitionCount,
-            updatePartitions...,
+    if len(updateErrors) > 0 {
+        return errors.Join(updateErrors...)
+    }
+
+    return nil
+}
+
+func (c *TopicCreator) updateTopicPartitions(
+    ctx context.Context,
+    span trace.Span,
+    updatePartitions []string,
+) error {
+    if len(updatePartitions) == 0 {
+        return nil
+    }
+
+    updateResp, err := c.m.adminClient.UpdatePartitions(
+        ctx,
+        c.partitionCount,
+        updatePartitions...,
+    )
+    if err != nil {
+        span.RecordError(err)
+        span.SetStatus(codes.Error, err.Error())
+        return fmt.Errorf(
+            "failed to update partitions for kafka topics: %v: %w",
+            updatePartitions, err,
+        )
+    }
+
+    namespacePrefix := c.m.cfg.namespacePrefix()
+
+    loggerFields := []zap.Field{
+        zap.Int("partition_count", c.partitionCount),
+    }
+    if len(c.origTopicConfigs) > 0 {
+        loggerFields = append(loggerFields,
+            zap.Reflect("topic_configs", c.origTopicConfigs),
         )
-        if err != nil {
+    }
+
+    var updateErrors []error
+    for _, response := range updateResp.Sorted() {
+        topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
+
+        logger := c.m.cfg.Logger.With(loggerFields...)
+        if c.m.cfg.TopicLogFieldsFunc != nil {
+            logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
+        }
+
+        if errors.Is(response.Err, kerr.InvalidRequest) {
+            // If UpdatePartitions partition count isn't greater than the
+            // current number of partitions, each individual response
+            // returns `INVALID_REQUEST`.
+            continue
+        }
+
+        if err := response.Err; err != nil {
             span.RecordError(err)
             span.SetStatus(codes.Error, err.Error())
-            return fmt.Errorf("failed to update partitions for kafka topics: %v: %w",
-                updatePartitions, err,
-            )
+            updateErrors = append(updateErrors, fmt.Errorf(
+                "failed to update partitions for topic %q: %w",
+                topicName, err,
+            ))
+            continue
         }
-        for _, response := range updateResp.Sorted() {
-            topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
-            logger := c.m.cfg.Logger.With(loggerFields...)
-            if c.m.cfg.TopicLogFieldsFunc != nil {
-                logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
-            }
-            if errors.Is(response.Err, kerr.InvalidRequest) {
-                // If UpdatePartitions partition count isn't greater than the
-                // current number of partitions, each individual response
-                // returns `INVALID_REQUEST`.
-                continue
-            }
-            if err := response.Err; err != nil {
-                span.RecordError(err)
-                span.SetStatus(codes.Error, err.Error())
-                updateErrors = append(updateErrors, fmt.Errorf(
-                    "failed to update partitions for topic %q: %w",
-                    topicName, err,
-                ))
-                continue
-            }
-            logger.Info("updated partitions for kafka topic",
-                zap.String("topic", topicName),
-            )
-        }
+        logger.Info(
+            "updated partitions for kafka topic",
+            zap.String("topic", topicName),
+        )
     }
-    if len(existingTopics) > 0 && len(c.topicConfigs) > 0 {
-        alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
-        for k, v := range c.topicConfigs {
-            alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
-        }
-        alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
-            alterCfg, existingTopics...,
+
+    if len(updateErrors) > 0 {
+        return errors.Join(updateErrors...)
+    }
+
+    return nil
+}
+
+func (c *TopicCreator) alterTopicConfigs(
+    ctx context.Context,
+    span trace.Span,
+    existingTopics []string,
+) error {
+    if len(existingTopics) == 0 || len(c.topicConfigs) == 0 {
+        return nil
+    }
+
+    // Remove cleanup.policy if it exists,
+    // since this field cannot be altered.
+    delete(c.topicConfigs, "cleanup.policy")
+
+    alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
+    for k, v := range c.topicConfigs {
+        alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
+    }
+
+    alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg, existingTopics...)
+    if err != nil {
+        span.RecordError(err)
+        span.SetStatus(codes.Error, err.Error())
+        return fmt.Errorf(
+            "failed to update configuration for kafka topics: %v: %w",
+            existingTopics, err,
+        )
+    }
+
+    namespacePrefix := c.m.cfg.namespacePrefix()
+
+    loggerFields := []zap.Field{
+        zap.Int("partition_count", c.partitionCount),
+    }
+    if len(c.origTopicConfigs) > 0 {
+        loggerFields = append(loggerFields,
+            zap.Reflect("topic_configs", c.origTopicConfigs),
         )
-        if err != nil {
+    }
+
+    var updateErrors []error
+    for _, response := range alterResp {
+        topicName := strings.TrimPrefix(response.Name, namespacePrefix)
+
+        logger := c.m.cfg.Logger.With(loggerFields...)
+        if c.m.cfg.TopicLogFieldsFunc != nil {
+            logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
+        }
+
+        if err := response.Err; err != nil {
             span.RecordError(err)
             span.SetStatus(codes.Error, err.Error())
-            return fmt.Errorf(
-                "failed to update configuration for kafka topics: %v:%w",
-                existingTopics, err,
-            )
+            updateErrors = append(updateErrors, fmt.Errorf(
+                "failed to alter configuration for topic %q: %w",
+                topicName, err,
+            ))
+            continue
         }
-        for _, response := range alterResp {
-            topicName := strings.TrimPrefix(response.Name, namespacePrefix)
-            logger := c.m.cfg.Logger.With(loggerFields...)
-            if c.m.cfg.TopicLogFieldsFunc != nil {
-                logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
-            }
-            if err := response.Err; err != nil {
-                span.RecordError(err)
-                span.SetStatus(codes.Error, err.Error())
-                updateErrors = append(updateErrors, fmt.Errorf(
-                    "failed to alter configuration for topic %q: %w",
-                    topicName, err,
-                ))
-                continue
-            }
-            logger.Info("altered configuration for kafka topic",
-                zap.String("topic", topicName),
-            )
-        }
+        logger.Info(
+            "altered configuration for kafka topic",
+            zap.String("topic", topicName),
+        )
     }
-    return errors.Join(updateErrors...)
+
+    if len(updateErrors) > 0 {
+        return errors.Join(updateErrors...)
+    }
+
+    return nil
 }
diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go
index 919651e7..1e70cfd6 100644
--- a/kafka/topiccreator_test.go
+++ b/kafka/topiccreator_test.go
@@ -19,6 +19,7 @@ package kafka
 import (
     "context"
+    "sort"
     "strings"
     "testing"
     "time"
 
@@ -59,7 +60,7 @@ func TestNewTopicCreator(t *testing.T) {
     }, "\n"))
 }
 
-func TestTopicCreatorCreateTopics(t *testing.T) {
+func TestCreateTopics(t *testing.T) {
     exp := tracetest.NewInMemoryExporter()
     tp := sdktrace.NewTracerProvider(
         sdktrace.WithSyncer(exp),
@@ -74,18 +75,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
 
     m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
     require.NoError(t, err)
+    t.Cleanup(func() { m.Close() })
 
-    c, err := m.NewTopicCreator(TopicCreatorConfig{
-        PartitionCount: 123,
-        TopicConfigs: map[string]string{
-            "retention.ms": "123",
-        },
-        MeterProvider: mt.MeterProvider,
-    })
-    require.NoError(t, err)
-    // Simulate a situation where topic1, topic4 exists, topic2 is invalid and
-    // topic3 is successfully created.
+    // Simulate a situation where topic1 and topic4 exist, topic2 is invalid, and topic3 is successfully created.
     var createTopicsRequest *kmsg.CreateTopicsRequest
     cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
         createTopicsRequest = req.(*kmsg.CreateTopicsRequest)
@@ -109,8 +102,8 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         }},
     }, nil, true
     })
-    // Topics 1 and 4 are already created, and their partition count is set to
-    // the appropriate value.
+
+    // Topics 1 and 4 are already created, and their partition count is set to the appropriate value.
     var createPartitionsRequest *kmsg.CreatePartitionsRequest
     cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
         createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest)
@@ -123,7 +116,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         return &res, nil, true
     })
 
-    // Since topic 1 and 4 already exist, their configuration is altered.
+    // Since topic 1 already exists, its configuration is altered.
     var alterConfigsRequest *kmsg.IncrementalAlterConfigsRequest
     cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
         alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest)
@@ -154,6 +147,16 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         }, nil, true
     })
 
+    c, err := m.NewTopicCreator(TopicCreatorConfig{
+        PartitionCount: 123,
+        TopicConfigs: map[string]string{
+            "retention.ms": "123",
+            "cleanup.policy": "compact,delete",
+        },
+        MeterProvider: mt.MeterProvider,
+    })
+    require.NoError(t, err)
+
     err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4")
     require.Error(t, err)
     assert.EqualError(t, err, `failed to create topic "topic2": `+
@@ -161,55 +164,95 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
     )
 
     require.Len(t, createTopicsRequest.Topics, 4)
-    assert.Equal(t, []kmsg.CreateTopicsRequestTopic{{
-        Topic: "name_space-topic1",
-        NumPartitions: 123,
-        ReplicationFactor: -1,
-        Configs: []kmsg.CreateTopicsRequestTopicConfig{{
-            Name: "retention.ms",
-            Value: kmsg.StringPtr("123"),
-        }},
-    }, {
-        Topic: "name_space-topic2",
-        NumPartitions: 123,
-        ReplicationFactor: -1,
-        Configs: []kmsg.CreateTopicsRequestTopicConfig{{
-            Name: "retention.ms",
-            Value: kmsg.StringPtr("123"),
-        }},
-    }, {
-        Topic: "name_space-topic3",
-        NumPartitions: 123,
-        ReplicationFactor: -1,
-        Configs: []kmsg.CreateTopicsRequestTopicConfig{{
-            Name: "retention.ms",
-            Value: kmsg.StringPtr("123"),
-        }},
-    }, {
-        Topic: "name_space-topic4",
-        NumPartitions: 123,
-        ReplicationFactor: -1,
-        Configs: []kmsg.CreateTopicsRequestTopicConfig{{
-            Name: "retention.ms",
-            Value: kmsg.StringPtr("123"),
-        }},
-    }}, createTopicsRequest.Topics)
+
+    expected := []kmsg.CreateTopicsRequestTopic{
+        {
+            Topic: "name_space-topic1",
+            NumPartitions: 123,
+            ReplicationFactor: -1,
+            Configs: []kmsg.CreateTopicsRequestTopicConfig{
+                {
+                    Name: "cleanup.policy",
+                    Value: kmsg.StringPtr("compact,delete"),
+                },
+                {
+                    Name: "retention.ms",
+                    Value: kmsg.StringPtr("123"),
+                },
+            },
+        }, {
+            Topic: "name_space-topic2",
+            NumPartitions: 123,
+            ReplicationFactor: -1,
+            Configs: []kmsg.CreateTopicsRequestTopicConfig{
+                {
+                    Name: "cleanup.policy",
+                    Value: kmsg.StringPtr("compact,delete"),
+                },
+                {
+                    Name: "retention.ms",
+                    Value: kmsg.StringPtr("123"),
+                },
+            },
+        }, {
+            Topic: "name_space-topic3",
+            NumPartitions: 123,
+            ReplicationFactor: -1,
+            Configs: []kmsg.CreateTopicsRequestTopicConfig{
+                {
+                    Name: "cleanup.policy",
+                    Value: kmsg.StringPtr("compact,delete"),
+                },
+                {
+                    Name: "retention.ms",
+                    Value: kmsg.StringPtr("123"),
+                },
+            },
+        }, {
+            Topic: "name_space-topic4",
+            NumPartitions: 123,
+            ReplicationFactor: -1,
+            Configs: []kmsg.CreateTopicsRequestTopicConfig{
+                {
+                    Name: "cleanup.policy",
+                    Value: kmsg.StringPtr("compact,delete"),
+                },
+                {
+                    Name: "retention.ms",
+                    Value: kmsg.StringPtr("123"),
+                },
+            },
+        },
+    }
+
+    sortTopicConfigs(t, expected)
+    sortTopicConfigs(t, createTopicsRequest.Topics)
+    assert.Equal(t, expected, createTopicsRequest.Topics)
 
     // Ensure only `topic0` partitions are updated since it already exists.
     assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{
         {Topic: "name_space-topic0", Count: 123},
     }, createPartitionsRequest.Topics)
 
+    // Ensure the `cleanup.policy` config is not included in the alter request for `topic0`.
+    expectedAlter := []kmsg.IncrementalAlterConfigsRequestResource{
+        {
+            ResourceType: kmsg.ConfigResourceTypeTopic,
+            ResourceName: "name_space-topic0",
+            Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{
+                {
+                    Name: "retention.ms",
+                    Value: kmsg.StringPtr("123"),
+                },
+            },
+        },
+    }
+
     // Ensure only the existing topic config is updated since it already exists.
     assert.Len(t, alterConfigsRequest.Resources, 1)
-    assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{
-        ResourceType: kmsg.ConfigResourceTypeTopic,
-        ResourceName: "name_space-topic0",
-        Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{
-            Name: "retention.ms",
-            Value: kmsg.StringPtr("123"),
-        }},
-    }}, alterConfigsRequest.Resources)
+    sortAlteredConfigs(t, expectedAlter)
+    sortAlteredConfigs(t, alterConfigsRequest.Resources)
+    assert.Equal(t, expectedAlter, alterConfigsRequest.Resources)
 
     // Log assertions.
     matchingLogs := observedLogs.FilterFieldKey("topic")
@@ -225,7 +268,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         Context: []zapcore.Field{
             zap.String("namespace", "name_space"),
             zap.Int("partition_count", 123),
-            zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
+            zap.Any("topic_configs", map[string]string{
+                "cleanup.policy": "compact,delete",
+                "retention.ms": "123",
+            }),
             zap.String("topic", "topic1"),
         },
     }, {
@@ -237,7 +283,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         Context: []zapcore.Field{
             zap.String("namespace", "name_space"),
             zap.Int("partition_count", 123),
-            zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
+            zap.Any("topic_configs", map[string]string{
+                "cleanup.policy": "compact,delete",
+                "retention.ms": "123",
+            }),
             zap.String("topic", "topic4"),
         },
     }, {
@@ -249,7 +298,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         Context: []zapcore.Field{
             zap.String("namespace", "name_space"),
             zap.Int("partition_count", 123),
-            zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
+            zap.Any("topic_configs", map[string]string{
+                "cleanup.policy": "compact,delete",
+                "retention.ms": "123",
+            }),
             zap.String("topic", "topic3"),
         },
     }, {
@@ -257,7 +309,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         Context: []zapcore.Field{
             zap.String("namespace", "name_space"),
             zap.Int("partition_count", 123),
-            zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
+            zap.Any("topic_configs", map[string]string{
+                "cleanup.policy": "compact,delete",
+                "retention.ms": "123",
+            }),
            zap.String("topic", "topic0"),
         },
     }, {
@@ -265,7 +320,10 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         Context: []zapcore.Field{
             zap.String("namespace", "name_space"),
             zap.Int("partition_count", 123),
-            zap.Any("topic_configs", map[string]string{"retention.ms": "123"}),
+            zap.Any("topic_configs", map[string]string{
+                "cleanup.policy": "compact,delete",
+                "retention.ms": "123",
+            }),
             zap.String("topic", "topic0"),
         },
     }}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool {
@@ -317,6 +375,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
 
     rm, err := mt.Collect(context.Background())
     require.NoError(t, err)
 
+    // Filter all other kafka metrics.
     var metrics []metricdata.Metrics
     for _, sm := range rm.ScopeMetrics {
@@ -325,6 +384,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
             break
         }
     }
 
+    // Ensure only 1 topic was created, which also matches the number of spans.
     assert.Equal(t, metrictest.Int64Metrics{
         {Name: "topics.created.count"}: {
@@ -336,3 +396,21 @@ func TestTopicCreatorCreateTopics(t *testing.T) {
         },
     }, metrictest.GatherInt64Metric(metrics))
 }
+
+func sortTopicConfigs(t *testing.T, topics []kmsg.CreateTopicsRequestTopic) {
+    t.Helper()
+    for i := range topics {
+        sort.Slice(topics[i].Configs, func(a, b int) bool {
+            return topics[i].Configs[a].Name < topics[i].Configs[b].Name
+        })
+    }
+}
+
+func sortAlteredConfigs(t *testing.T, topics []kmsg.IncrementalAlterConfigsRequestResource) {
+    t.Helper()
+    for i := range topics {
+        sort.Slice(topics[i].Configs, func(a, b int) bool {
+            return topics[i].Configs[a].Name < topics[i].Configs[b].Name
+        })
+    }
+}
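
Reviewer note (illustrative, not part of the diff): the refactored CreateTopics now collects the errors from createMissingTopics, updateTopicPartitions, and alterTopicConfigs and returns them with errors.Join instead of aborting on the first failure. The standalone sketch below shows the error-aggregation behaviour this relies on; the error variable and topic names are invented for the example, and callers can still match individual causes with errors.Is.

package main

import (
    "errors"
    "fmt"
)

// errInvalidTopic stands in for a per-topic failure reported by the Kafka admin client.
var errInvalidTopic = errors.New("INVALID_TOPIC_EXCEPTION")

func main() {
    // Pretend per-topic results, one failing topic among several successes.
    results := map[string]error{
        "topic1": nil,
        "topic2": errInvalidTopic,
        "topic3": nil,
    }

    // Collect failures instead of returning on the first one, mirroring the
    // refactored create/update/alter helpers in the diff above.
    var updateErrors []error
    for topic, err := range results {
        if err != nil {
            updateErrors = append(updateErrors, fmt.Errorf("failed to create topic %q: %w", topic, err))
        }
    }

    joined := errors.Join(updateErrors...) // nil when no errors were collected
    fmt.Println(joined)
    fmt.Println(errors.Is(joined, errInvalidTopic)) // true: individual causes remain unwrappable
}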