From 6526adef506f21f0302b6cc08de83c56e70cc270 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Mon, 8 Sep 2025 17:33:26 -0400 Subject: [PATCH 01/11] add create admin topic interface --- kafka/topiccreator.go | 82 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 50a1ffce..5e2a179a 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -104,10 +104,90 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) }, nil } +func (c *TopicCreator) CreateAdminTopic(ctx context.Context, topic apmqueue.Topic) error { + fmt.Println("-> CreateAdminTopic") + + ctx, span := c.m.tracer.Start( + ctx, + "CreateAdminTopic", + trace.WithAttributes( + semconv.MessagingSystemKey.String("kafka"), + ), + ) + defer span.End() + + response, err := c.m.adminClient.CreateTopic( + ctx, + int32(c.partitionCount), + -1, // default.replication.factor + c.topicConfigs, + string(topic), + ) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf("failed to create kafka admin topic: %w", err) + } + loggerFields := []zap.Field{ + zap.Int("partition_count", c.partitionCount), + } + if len(c.origTopicConfigs) > 0 { + loggerFields = append(loggerFields, + zap.Reflect("topic_configs", c.origTopicConfigs), + ) + } + fmt.Println("-> responses:", response) + + var updateErrors []error + topicName := response.Topic + logger := c.m.cfg.Logger.With(loggerFields...) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) + } + + if err := response.Err; err != nil { + if errors.Is(err, kerr.TopicAlreadyExists) { + logger.Debug("kafka admin topic already exists", + zap.String("topic", topicName), + ) + span.AddEvent("kafka admin topic already exists", trace.WithAttributes( + semconv.MessagingDestinationKey.String(topicName), + )) + } else { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + updateErrors = append(updateErrors, fmt.Errorf( + "failed to create admin topic %q: %w", topicName, err, + )) + c.created.Add(context.Background(), 1, metric.WithAttributeSet( + attribute.NewSet( + semconv.MessagingSystemKey.String("kafka"), + attribute.String("outcome", "failure"), + attribute.String("topic", topicName), + ), + )) + } + } + + c.created.Add(context.Background(), 1, metric.WithAttributeSet( + attribute.NewSet( + semconv.MessagingSystemKey.String("kafka"), + attribute.String("outcome", "success"), + attribute.String("topic", topicName), + ), + )) + + logger.Info("created kafka admin topic", zap.String("topic", topicName)) + + return errors.Join(updateErrors...) +} + // CreateTopics creates one or more topics. // // Topics that already exist will be updated. func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error { + fmt.Println("-> apmqueue: CreateTopics") + // TODO(axw) how should we record topics? ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes( semconv.MessagingSystemKey.String("kafka"), @@ -247,6 +327,7 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi ) } } + if len(existingTopics) > 0 && len(c.topicConfigs) > 0 { alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) for k, v := range c.topicConfigs { @@ -284,5 +365,6 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi ) } } + return errors.Join(updateErrors...) } From ac534c3d5a5838cfca319e2d3626b3796614838a Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Mon, 8 Sep 2025 18:19:37 -0400 Subject: [PATCH 02/11] check if admin topic exists before creation --- cmd/queuebench/bench.go | 2 +- kafka/log_compacted_consumer_test.go | 2 +- kafka/topiccreator.go | 120 +++++++++++++++------------ kafka/topiccreator_test.go | 26 +++--- systemtest/infra_kafka.go | 2 +- systemtest/manager_test.go | 8 +- 6 files changed, 85 insertions(+), 75 deletions(-) diff --git a/cmd/queuebench/bench.go b/cmd/queuebench/bench.go index fdf74a4d..8ab1281f 100644 --- a/cmd/queuebench/bench.go +++ b/cmd/queuebench/bench.go @@ -115,7 +115,7 @@ func createTopics(ctx context.Context, mngr *kafka.Manager, cfg kafka.TopicCreat } for _, topic := range topics { - err = creator.CreateTopics(ctx, topic) + err = creator.CreateProjectTopics(ctx, topic) if err != nil { return fmt.Errorf("cannot create topics: %w", err) } diff --git a/kafka/log_compacted_consumer_test.go b/kafka/log_compacted_consumer_test.go index 182f6ccd..f084036f 100644 --- a/kafka/log_compacted_consumer_test.go +++ b/kafka/log_compacted_consumer_test.go @@ -118,7 +118,7 @@ func newLogCompactedFakeCluster(tb testing.TB, topic string, partitions int32) * require.NoError(tb, err) kadmClient := kadm.NewClient(c) - tresp, err := kadmClient.CreateTopic(context.Background(), partitions, 1, map[string]*string{ + tresp, err := kadmClient.CreateProjectTopic(context.Background(), partitions, 1, map[string]*string{ "cleanup.policy": kadm.StringPtr("compact"), }, topic) require.NoError(tb, err) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 5e2a179a..d3844da4 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -116,80 +116,90 @@ func (c *TopicCreator) CreateAdminTopic(ctx context.Context, topic apmqueue.Topi ) defer span.End() - response, err := c.m.adminClient.CreateTopic( - ctx, - int32(c.partitionCount), - -1, // default.replication.factor - c.topicConfigs, - string(topic), - ) + existing, err := c.m.adminClient.ListTopics(ctx) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf("failed to create kafka admin topic: %w", err) - } - loggerFields := []zap.Field{ - zap.Int("partition_count", c.partitionCount), - } - if len(c.origTopicConfigs) > 0 { - loggerFields = append(loggerFields, - zap.Reflect("topic_configs", c.origTopicConfigs), - ) + return fmt.Errorf("failed to list kafka topics: %w", err) } - fmt.Println("-> responses:", response) var updateErrors []error - topicName := response.Topic - logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldsFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) - } - - if err := response.Err; err != nil { - if errors.Is(err, kerr.TopicAlreadyExists) { - logger.Debug("kafka admin topic already exists", - zap.String("topic", topicName), - ) - span.AddEvent("kafka admin topic already exists", trace.WithAttributes( - semconv.MessagingDestinationKey.String(topicName), - )) - } else { + if !existing.Has(string(topic)) { + response, err := c.m.adminClient.CreateTopic( + ctx, + int32(c.partitionCount), + -1, // default.replication.factor + c.topicConfigs, + string(topic), + ) + if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - updateErrors = append(updateErrors, fmt.Errorf( - "failed to create admin topic %q: %w", topicName, err, - )) - c.created.Add(context.Background(), 1, metric.WithAttributeSet( - attribute.NewSet( - semconv.MessagingSystemKey.String("kafka"), - attribute.String("outcome", "failure"), - attribute.String("topic", topicName), - ), - )) + return fmt.Errorf("failed to create kafka admin topic: %w", err) } - } - c.created.Add(context.Background(), 1, metric.WithAttributeSet( - attribute.NewSet( - semconv.MessagingSystemKey.String("kafka"), - attribute.String("outcome", "success"), - attribute.String("topic", topicName), - ), - )) + loggerFields := []zap.Field{ + zap.Int("partition_count", c.partitionCount), + } + if len(c.origTopicConfigs) > 0 { + loggerFields = append(loggerFields, + zap.Reflect("topic_configs", c.origTopicConfigs), + ) + } + fmt.Println("-> responses:", response) - logger.Info("created kafka admin topic", zap.String("topic", topicName)) + topicName := response.Topic + logger := c.m.cfg.Logger.With(loggerFields...) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) + } + + if err := response.Err; err != nil { + if errors.Is(err, kerr.TopicAlreadyExists) { + logger.Debug("kafka admin topic already exists", + zap.String("topic", topicName), + ) + span.AddEvent("kafka admin topic already exists", trace.WithAttributes( + semconv.MessagingDestinationKey.String(topicName), + )) + } else { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + updateErrors = append(updateErrors, fmt.Errorf( + "failed to create admin topic %q: %w", topicName, err, + )) + c.created.Add(context.Background(), 1, metric.WithAttributeSet( + attribute.NewSet( + semconv.MessagingSystemKey.String("kafka"), + attribute.String("outcome", "failure"), + attribute.String("topic", topicName), + ), + )) + } + } + + c.created.Add(context.Background(), 1, metric.WithAttributeSet( + attribute.NewSet( + semconv.MessagingSystemKey.String("kafka"), + attribute.String("outcome", "success"), + attribute.String("topic", topicName), + ), + )) + + logger.Info("created kafka admin topic", zap.String("topic", topicName)) + } return errors.Join(updateErrors...) } -// CreateTopics creates one or more topics. +// CreateProjectTopics creates one or more topics. // // Topics that already exist will be updated. -func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error { - fmt.Println("-> apmqueue: CreateTopics") +func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmqueue.Topic) error { + fmt.Println("-> apmqueue: CreateProjectTopics") // TODO(axw) how should we record topics? - ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes( + ctx, span := c.m.tracer.Start(ctx, "CreateProjectTopics", trace.WithAttributes( semconv.MessagingSystemKey.String("kafka"), )) defer span.End() diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 919651e7..08d47139 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -59,7 +59,7 @@ func TestNewTopicCreator(t *testing.T) { }, "\n")) } -func TestTopicCreatorCreateTopics(t *testing.T) { +func TestTopicCreatorCreateProjectTopics(t *testing.T) { exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithSyncer(exp), @@ -86,12 +86,12 @@ func TestTopicCreatorCreateTopics(t *testing.T) { // Simulate a situation where topic1, topic4 exists, topic2 is invalid and // topic3 is successfully created. - var createTopicsRequest *kmsg.CreateTopicsRequest - cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { - createTopicsRequest = req.(*kmsg.CreateTopicsRequest) - return &kmsg.CreateTopicsResponse{ + var createTopicsRequest *kmsg.CreateProjectTopicsRequest + cluster.ControlKey(kmsg.CreateProjectTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + createTopicsRequest = req.(*kmsg.CreateProjectTopicsRequest) + return &kmsg.CreateProjectTopicsResponse{ Version: req.GetVersion(), - Topics: []kmsg.CreateTopicsResponseTopic{{ + Topics: []kmsg.CreateProjectTopicsResponseTopic{{ Topic: "name_space-topic1", ErrorCode: kerr.TopicAlreadyExists.Code, ErrorMessage: &kerr.TopicAlreadyExists.Message, @@ -154,18 +154,18 @@ func TestTopicCreatorCreateTopics(t *testing.T) { }, nil, true }) - err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") + err = c.CreateProjectTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+ `INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`, ) require.Len(t, createTopicsRequest.Topics, 4) - assert.Equal(t, []kmsg.CreateTopicsRequestTopic{{ + assert.Equal(t, []kmsg.CreateProjectTopicsRequestTopic{{ Topic: "name_space-topic1", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -173,7 +173,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { Topic: "name_space-topic2", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -181,7 +181,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { Topic: "name_space-topic3", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -189,7 +189,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { Topic: "name_space-topic4", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -290,7 +290,7 @@ func TestTopicCreatorCreateTopics(t *testing.T) { spans := exp.GetSpans() require.Len(t, spans, 1) - assert.Equal(t, "CreateTopics", spans[0].Name) + assert.Equal(t, "CreateProjectTopics", spans[0].Name) assert.Equal(t, codes.Error, spans[0].Status.Code) require.Len(t, spans[0].Events, 3) diff --git a/systemtest/infra_kafka.go b/systemtest/infra_kafka.go index 76fe1893..faa30865 100644 --- a/systemtest/infra_kafka.go +++ b/systemtest/infra_kafka.go @@ -167,7 +167,7 @@ func CreateKafkaTopics(ctx context.Context, t testing.TB, partitions int, topics }) require.NoError(t, err) - err = topicCreator.CreateTopics(ctx, topics...) + err = topicCreator.CreateProjectTopics(ctx, topics...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, manager.DeleteTopics( diff --git a/systemtest/manager_test.go b/systemtest/manager_test.go index 6be48bef..eee2ed78 100644 --- a/systemtest/manager_test.go +++ b/systemtest/manager_test.go @@ -31,7 +31,7 @@ import ( "github.com/elastic/apm-queue/v2/kafka" ) -func TestManagerCreateTopics(t *testing.T) { +func TestManagerCreateProjectTopics(t *testing.T) { // This test covers: // - Creating topics with a number of partitions // - Updating the partition count @@ -58,14 +58,14 @@ func TestManagerCreateTopics(t *testing.T) { } topics = SuffixTopics(topics...) // Create topics for the first time. - assert.NoError(t, creator.CreateTopics(ctx, topics...)) + assert.NoError(t, creator.CreateProjectTopics(ctx, topics...)) // New creator with increased partitions cfg.PartitionCount = 4 creator, err = manager.NewTopicCreator(cfg) require.NoError(t, err) // Update Partition count. - assert.NoError(t, creator.CreateTopics(ctx, topics...)) + assert.NoError(t, creator.CreateProjectTopics(ctx, topics...)) // New creator with increased retention cfg.TopicConfigs = map[string]string{ @@ -74,6 +74,6 @@ func TestManagerCreateTopics(t *testing.T) { creator, err = manager.NewTopicCreator(cfg) require.NoError(t, err) // Update topic configuration. - assert.NoError(t, creator.CreateTopics(ctx, topics...)) + assert.NoError(t, creator.CreateProjectTopics(ctx, topics...)) }) } From d2c819204b5eb66cff18a12d74f2011a996f7ae5 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Mon, 8 Sep 2025 20:55:05 -0400 Subject: [PATCH 03/11] refactor topic creation code --- kafka/topiccreator.go | 335 ++++++++++++++++++++++++------------------ 1 file changed, 189 insertions(+), 146 deletions(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index d3844da4..f4d4653f 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -104,9 +104,10 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) }, nil } +// CreateAdminTopic creates a single log compacted topic +// +// Topics that already exist will be ignored. func (c *TopicCreator) CreateAdminTopic(ctx context.Context, topic apmqueue.Topic) error { - fmt.Println("-> CreateAdminTopic") - ctx, span := c.m.tracer.Start( ctx, "CreateAdminTopic", @@ -123,85 +124,26 @@ func (c *TopicCreator) CreateAdminTopic(ctx context.Context, topic apmqueue.Topi return fmt.Errorf("failed to list kafka topics: %w", err) } - var updateErrors []error - if !existing.Has(string(topic)) { - response, err := c.m.adminClient.CreateTopic( - ctx, - int32(c.partitionCount), - -1, // default.replication.factor - c.topicConfigs, - string(topic), - ) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf("failed to create kafka admin topic: %w", err) - } + missingTopics, _, _ := c.categorizeTopics(existing, []string{string(topic)}) - loggerFields := []zap.Field{ - zap.Int("partition_count", c.partitionCount), - } - if len(c.origTopicConfigs) > 0 { - loggerFields = append(loggerFields, - zap.Reflect("topic_configs", c.origTopicConfigs), - ) - } - fmt.Println("-> responses:", response) - - topicName := response.Topic - logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldsFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) - } - - if err := response.Err; err != nil { - if errors.Is(err, kerr.TopicAlreadyExists) { - logger.Debug("kafka admin topic already exists", - zap.String("topic", topicName), - ) - span.AddEvent("kafka admin topic already exists", trace.WithAttributes( - semconv.MessagingDestinationKey.String(topicName), - )) - } else { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - updateErrors = append(updateErrors, fmt.Errorf( - "failed to create admin topic %q: %w", topicName, err, - )) - c.created.Add(context.Background(), 1, metric.WithAttributeSet( - attribute.NewSet( - semconv.MessagingSystemKey.String("kafka"), - attribute.String("outcome", "failure"), - attribute.String("topic", topicName), - ), - )) - } - } - - c.created.Add(context.Background(), 1, metric.WithAttributeSet( - attribute.NewSet( - semconv.MessagingSystemKey.String("kafka"), - attribute.String("outcome", "success"), - attribute.String("topic", topicName), - ), - )) - - logger.Info("created kafka admin topic", zap.String("topic", topicName)) + if err := c.createMissingTopics(ctx, span, missingTopics); err != nil { + return err } - return errors.Join(updateErrors...) + return nil } // CreateProjectTopics creates one or more topics. // // Topics that already exist will be updated. func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmqueue.Topic) error { - fmt.Println("-> apmqueue: CreateProjectTopics") - - // TODO(axw) how should we record topics? - ctx, span := c.m.tracer.Start(ctx, "CreateProjectTopics", trace.WithAttributes( - semconv.MessagingSystemKey.String("kafka"), - )) + ctx, span := c.m.tracer.Start( + ctx, + "CreateProjectTopics", + trace.WithAttributes( + semconv.MessagingSystemKey.String("kafka"), + ), + ) defer span.End() namespacePrefix := c.m.cfg.namespacePrefix() @@ -217,12 +159,31 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque return fmt.Errorf("failed to list kafka topics: %w", err) } - // missingTopics contains topics which need to be created. - missingTopics := make([]string, 0, len(topicNames)) - // updatePartitions contains topics which partitions' need to be updated. - updatePartitions := make([]string, 0, len(topicNames)) - // existingTopics contains the existing topics, used by AlterTopicConfigs. - existingTopics := make([]string, 0, len(topicNames)) + missingTopics, updatePartitions, existingTopics := c.categorizeTopics(existing, topicNames) + + if err := c.createMissingTopics(ctx, span, missingTopics); err != nil { + return err + } + + if err := c.updateTopicPartitions(ctx, span, updatePartitions); err != nil { + return err + } + + if err := c.alterTopicConfigs(ctx, span, existingTopics); err != nil { + return err + } + + return nil +} + +func (c *TopicCreator) categorizeTopics( + existing kadm.TopicDetails, + topicNames []string, +) (missingTopics, updatePartitions, existingTopics []string) { + missingTopics = make([]string, 0, len(topicNames)) + updatePartitions = make([]string, 0, len(topicNames)) + existingTopics = make([]string, 0, len(topicNames)) + for _, wantTopic := range topicNames { if !existing.Has(wantTopic) { missingTopics = append(missingTopics, wantTopic) @@ -234,6 +195,18 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque } } + return missingTopics, updatePartitions, existingTopics +} + +func (c *TopicCreator) createMissingTopics( + ctx context.Context, + span trace.Span, + missingTopics []string, +) error { + if len(missingTopics) == 0 { + return nil + } + responses, err := c.m.adminClient.CreateTopics(ctx, int32(c.partitionCount), -1, // default.replication.factor @@ -245,6 +218,9 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque span.SetStatus(codes.Error, err.Error()) return fmt.Errorf("failed to create kafka topics: %w", err) } + + namespacePrefix := c.m.cfg.namespacePrefix() + loggerFields := []zap.Field{ zap.Int("partition_count", c.partitionCount), } @@ -257,14 +233,14 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque var updateErrors []error for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) + logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldsFunc != nil { logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) } + if err := response.Err; err != nil { if errors.Is(err, kerr.TopicAlreadyExists) { - // NOTE(axw) apmotel currently does nothing with span events, - // hence we log as well as create a span event. logger.Debug("kafka topic already exists", zap.String("topic", topicName), ) @@ -287,6 +263,7 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque } continue } + c.created.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet( semconv.MessagingSystemKey.String("kafka"), @@ -294,87 +271,153 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque attribute.String("topic", topicName), ), )) + logger.Info("created kafka topic", zap.String("topic", topicName)) } - // Update the topic partitions. - if len(updatePartitions) > 0 { - updateResp, err := c.m.adminClient.UpdatePartitions(ctx, - c.partitionCount, - updatePartitions..., + if len(updateErrors) > 0 { + return errors.Join(updateErrors...) + } + + return nil +} + +func (c *TopicCreator) updateTopicPartitions( + ctx context.Context, + span trace.Span, + updatePartitions []string, +) error { + if len(updatePartitions) == 0 { + return nil + } + + updateResp, err := c.m.adminClient.UpdatePartitions( + ctx, + c.partitionCount, + updatePartitions..., + ) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf( + "failed to update partitions for kafka topics: %v: %w", + updatePartitions, err, + ) + } + + namespacePrefix := c.m.cfg.namespacePrefix() + + loggerFields := []zap.Field{ + zap.Int("partition_count", c.partitionCount), + } + if len(c.origTopicConfigs) > 0 { + loggerFields = append(loggerFields, + zap.Reflect("topic_configs", c.origTopicConfigs), ) - if err != nil { + } + + var updateErrors []error + for _, response := range updateResp.Sorted() { + topicName := strings.TrimPrefix(response.Topic, namespacePrefix) + + logger := c.m.cfg.Logger.With(loggerFields...) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) + } + + if errors.Is(response.Err, kerr.InvalidRequest) { + // If UpdatePartitions partition count isn't greater than the + // current number of partitions, each individual response + // returns `INVALID_REQUEST`. + continue + } + + if err := response.Err; err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf("failed to update partitions for kafka topics: %v: %w", - updatePartitions, err, - ) + updateErrors = append(updateErrors, fmt.Errorf( + "failed to update partitions for topic %q: %w", + topicName, err, + )) + continue } - for _, response := range updateResp.Sorted() { - topicName := strings.TrimPrefix(response.Topic, namespacePrefix) - logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldsFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) - } - if errors.Is(response.Err, kerr.InvalidRequest) { - // If UpdatePartitions partition count isn't greater than the - // current number of partitions, each individual response - // returns `INVALID_REQUEST`. - continue - } - if err := response.Err; err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - updateErrors = append(updateErrors, fmt.Errorf( - "failed to update partitions for topic %q: %w", - topicName, err, - )) - continue - } - logger.Info("updated partitions for kafka topic", - zap.String("topic", topicName), - ) - } + logger.Info( + "updated partitions for kafka topic", + zap.String("topic", topicName), + ) } - if len(existingTopics) > 0 && len(c.topicConfigs) > 0 { - alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) - for k, v := range c.topicConfigs { - alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) - } - alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, - alterCfg, existingTopics..., + if len(updateErrors) > 0 { + return errors.Join(updateErrors...) + } + + return nil +} + +func (c *TopicCreator) alterTopicConfigs( + ctx context.Context, + span trace.Span, + existingTopics []string, +) error { + if len(existingTopics) == 0 || len(c.topicConfigs) == 0 { + return nil + } + + alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) + for k, v := range c.topicConfigs { + alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) + } + + alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg, existingTopics...) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return fmt.Errorf( + "failed to update configuration for kafka topics: %v: %w", + existingTopics, err, + ) + } + + namespacePrefix := c.m.cfg.namespacePrefix() + + loggerFields := []zap.Field{ + zap.Int("partition_count", c.partitionCount), + } + if len(c.origTopicConfigs) > 0 { + loggerFields = append(loggerFields, + zap.Reflect("topic_configs", c.origTopicConfigs), ) - if err != nil { + } + + var updateErrors []error + for _, response := range alterResp { + topicName := strings.TrimPrefix(response.Name, namespacePrefix) + + logger := c.m.cfg.Logger.With(loggerFields...) + if c.m.cfg.TopicLogFieldsFunc != nil { + logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) + } + + if err := response.Err; err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf( - "failed to update configuration for kafka topics: %v:%w", - existingTopics, err, - ) + updateErrors = append(updateErrors, fmt.Errorf( + "failed to alter configuration for topic %q: %w", + topicName, err, + )) + continue } - for _, response := range alterResp { - topicName := strings.TrimPrefix(response.Name, namespacePrefix) - logger := c.m.cfg.Logger.With(loggerFields...) - if c.m.cfg.TopicLogFieldsFunc != nil { - logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...) - } - if err := response.Err; err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - updateErrors = append(updateErrors, fmt.Errorf( - "failed to alter configuration for topic %q: %w", - topicName, err, - )) - continue - } - logger.Info("altered configuration for kafka topic", - zap.String("topic", topicName), - ) - } + logger.Info( + "altered configuration for kafka topic", + zap.String("topic", topicName), + ) + } + + if len(updateErrors) > 0 { + return errors.Join(updateErrors...) } - return errors.Join(updateErrors...) + return nil } From 686814a1a02edc6a807670aa0f2858cf4fdf4edb Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Mon, 8 Sep 2025 20:59:33 -0400 Subject: [PATCH 04/11] fix global renaming --- kafka/log_compacted_consumer_test.go | 2 +- kafka/topiccreator_test.go | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/kafka/log_compacted_consumer_test.go b/kafka/log_compacted_consumer_test.go index f084036f..182f6ccd 100644 --- a/kafka/log_compacted_consumer_test.go +++ b/kafka/log_compacted_consumer_test.go @@ -118,7 +118,7 @@ func newLogCompactedFakeCluster(tb testing.TB, topic string, partitions int32) * require.NoError(tb, err) kadmClient := kadm.NewClient(c) - tresp, err := kadmClient.CreateProjectTopic(context.Background(), partitions, 1, map[string]*string{ + tresp, err := kadmClient.CreateTopic(context.Background(), partitions, 1, map[string]*string{ "cleanup.policy": kadm.StringPtr("compact"), }, topic) require.NoError(tb, err) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 08d47139..b86ecdc1 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -59,7 +59,7 @@ func TestNewTopicCreator(t *testing.T) { }, "\n")) } -func TestTopicCreatorCreateProjectTopics(t *testing.T) { +func TestTopicCreatorTopics(t *testing.T) { exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithSyncer(exp), @@ -86,12 +86,12 @@ func TestTopicCreatorCreateProjectTopics(t *testing.T) { // Simulate a situation where topic1, topic4 exists, topic2 is invalid and // topic3 is successfully created. - var createTopicsRequest *kmsg.CreateProjectTopicsRequest - cluster.ControlKey(kmsg.CreateProjectTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { - createTopicsRequest = req.(*kmsg.CreateProjectTopicsRequest) - return &kmsg.CreateProjectTopicsResponse{ + var createTopicsRequest *kmsg.CreateTopicsRequest + cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + createTopicsRequest = req.(*kmsg.CreateTopicsRequest) + return &kmsg.CreateTopicsResponse{ Version: req.GetVersion(), - Topics: []kmsg.CreateProjectTopicsResponseTopic{{ + Topics: []kmsg.CreateTopicsResponseTopic{{ Topic: "name_space-topic1", ErrorCode: kerr.TopicAlreadyExists.Code, ErrorMessage: &kerr.TopicAlreadyExists.Message, @@ -161,11 +161,11 @@ func TestTopicCreatorCreateProjectTopics(t *testing.T) { ) require.Len(t, createTopicsRequest.Topics, 4) - assert.Equal(t, []kmsg.CreateProjectTopicsRequestTopic{{ + assert.Equal(t, []kmsg.CreateTopicsRequestTopic{{ Topic: "name_space-topic1", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -173,7 +173,7 @@ func TestTopicCreatorCreateProjectTopics(t *testing.T) { Topic: "name_space-topic2", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -181,7 +181,7 @@ func TestTopicCreatorCreateProjectTopics(t *testing.T) { Topic: "name_space-topic3", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, @@ -189,7 +189,7 @@ func TestTopicCreatorCreateProjectTopics(t *testing.T) { Topic: "name_space-topic4", NumPartitions: 123, ReplicationFactor: -1, - Configs: []kmsg.CreateProjectTopicsRequestTopicConfig{{ + Configs: []kmsg.CreateTopicsRequestTopicConfig{{ Name: "retention.ms", Value: kmsg.StringPtr("123"), }}, From 02ed8151504b7b0fe82d41d224b34abb1d4b241d Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 9 Sep 2025 11:54:57 -0400 Subject: [PATCH 05/11] aggregate errors --- kafka/topiccreator.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index f4d4653f..cc9cfacf 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -161,19 +161,20 @@ func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmque missingTopics, updatePartitions, existingTopics := c.categorizeTopics(existing, topicNames) + var updateErrors []error if err := c.createMissingTopics(ctx, span, missingTopics); err != nil { - return err + updateErrors = append(updateErrors, err) } if err := c.updateTopicPartitions(ctx, span, updatePartitions); err != nil { - return err + updateErrors = append(updateErrors, err) } if err := c.alterTopicConfigs(ctx, span, existingTopics); err != nil { - return err + updateErrors = append(updateErrors, err) } - return nil + return errors.Join(updateErrors...) } func (c *TopicCreator) categorizeTopics( From a22b6cf59533838f61d9f7db39c5b814193cb2f7 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 9 Sep 2025 12:01:44 -0400 Subject: [PATCH 06/11] increase timeout for CI --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 8464ec6c..7a2a4c88 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -GO_TEST_TIMEOUT=180s +GO_TEST_TIMEOUT=240s GOTESTFLAGS= GO_TEST_COUNT=10 From 29a5de6be9df9c6f965c300c1fc89e1a77504f09 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 9 Sep 2025 17:16:35 -0400 Subject: [PATCH 07/11] test: add unit test for admin topic creation --- kafka/topiccreator.go | 23 +++- kafka/topiccreator_test.go | 265 ++++++++++++++++++++++++++++++++++++- 2 files changed, 281 insertions(+), 7 deletions(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index cc9cfacf..58f0ff2f 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -104,19 +104,24 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) }, nil } -// CreateAdminTopic creates a single log compacted topic +// CreateAdminTopics creates a single log compacted topic // // Topics that already exist will be ignored. -func (c *TopicCreator) CreateAdminTopic(ctx context.Context, topic apmqueue.Topic) error { +func (c *TopicCreator) CreateAdminTopics(ctx context.Context, topics ...apmqueue.Topic) error { ctx, span := c.m.tracer.Start( ctx, - "CreateAdminTopic", + "CreateAdminTopics", trace.WithAttributes( semconv.MessagingSystemKey.String("kafka"), ), ) defer span.End() + topicNames := make([]string, len(topics)) + for i, topic := range topics { + topicNames[i] = fmt.Sprintf("%s", topic) + } + existing, err := c.m.adminClient.ListTopics(ctx) if err != nil { span.RecordError(err) @@ -124,13 +129,18 @@ func (c *TopicCreator) CreateAdminTopic(ctx context.Context, topic apmqueue.Topi return fmt.Errorf("failed to list kafka topics: %w", err) } - missingTopics, _, _ := c.categorizeTopics(existing, []string{string(topic)}) + missingTopics, updatePartitions, _ := c.categorizeTopics(existing, topicNames) + var updateErrors []error if err := c.createMissingTopics(ctx, span, missingTopics); err != nil { - return err + updateErrors = append(updateErrors, err) } - return nil + if err := c.updateTopicPartitions(ctx, span, updatePartitions); err != nil { + updateErrors = append(updateErrors, err) + } + + return errors.Join(updateErrors...) } // CreateProjectTopics creates one or more topics. @@ -234,6 +244,7 @@ func (c *TopicCreator) createMissingTopics( var updateErrors []error for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) + fmt.Println(":: topicName:", topicName) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldsFunc != nil { diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index b86ecdc1..5a612640 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -19,6 +19,7 @@ package kafka import ( "context" + "sort" "strings" "testing" "time" @@ -59,7 +60,7 @@ func TestNewTopicCreator(t *testing.T) { }, "\n")) } -func TestTopicCreatorTopics(t *testing.T) { +func TestCreateProjectTopics(t *testing.T) { exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithSyncer(exp), @@ -336,3 +337,265 @@ func TestTopicCreatorTopics(t *testing.T) { }, }, metrictest.GatherInt64Metric(metrics)) } + +func TestCreateAdminTopics(t *testing.T) { + exp := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSyncer(exp), + ) + defer tp.Shutdown(context.Background()) + + mt := metrictest.New() + cluster, commonConfig := newFakeCluster(t) + core, observedLogs := observer.New(zapcore.DebugLevel) + commonConfig.Logger = zap.New(core) + commonConfig.TracerProvider = tp + + m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) + require.NoError(t, err) + t.Cleanup(func() { m.Close() }) + c, err := m.NewTopicCreator(TopicCreatorConfig{ + PartitionCount: 123, + TopicConfigs: map[string]string{ + "retention.ms": "123", + "cleanup.policy": "compact,delete", + }, + MeterProvider: mt.MeterProvider, + }) + require.NoError(t, err) + + // Simulate a situation where topic1 exists, topic2 is invalid, and topic3 is successfully created. + var createTopicsRequest *kmsg.CreateTopicsRequest + cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + createTopicsRequest = req.(*kmsg.CreateTopicsRequest) + return &kmsg.CreateTopicsResponse{ + Version: req.GetVersion(), + Topics: []kmsg.CreateTopicsResponseTopic{{ + Topic: "topic1", + ErrorCode: kerr.TopicAlreadyExists.Code, + ErrorMessage: &kerr.TopicAlreadyExists.Message, + }, { + Topic: "topic2", + ErrorCode: kerr.InvalidTopicException.Code, + ErrorMessage: &kerr.InvalidTopicException.Message, + }, { + Topic: "topic3", + TopicID: [16]byte{123}, + }}, + }, nil, true + }) + + // Topics 1 and 4 are already created, and their partition count is set to the appropriate value. + var createPartitionsRequest *kmsg.CreatePartitionsRequest + cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { + createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest) + res := kmsg.CreatePartitionsResponse{Version: req.GetVersion()} + for _, t := range createPartitionsRequest.Topics { + res.Topics = append(res.Topics, kmsg.CreatePartitionsResponseTopic{ + Topic: t.Topic, + }) + } + return &res, nil, true + }) + + // Allow some time for the ForceMetadataRefresh to run. + <-time.After(10 * time.Millisecond) + + // Simulate topic0 already exists in Kafka. + cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { + return &kmsg.MetadataResponse{ + Version: r.GetVersion(), + Topics: []kmsg.MetadataResponseTopic{{ + Topic: kmsg.StringPtr("topic0"), + TopicID: [16]byte{111}, + Partitions: []kmsg.MetadataResponseTopicPartition{{ + Partition: 0, + }}, + }}, + }, nil, true + }) + + err = c.CreateAdminTopics(context.Background(), "topic0", "topic1", "topic2", "topic3") + require.Error(t, err) + assert.EqualError(t, err, `failed to create topic "topic2": `+ + `INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`, + ) + + require.Len(t, createTopicsRequest.Topics, 3) + + expected := []kmsg.CreateTopicsRequestTopic{ + { + Topic: "topic1", + NumPartitions: 123, + ReplicationFactor: -1, + Configs: []kmsg.CreateTopicsRequestTopicConfig{ + { + Name: "cleanup.policy", + Value: kmsg.StringPtr("compact,delete"), + }, + { + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }, + }, + }, { + Topic: "topic2", + NumPartitions: 123, + ReplicationFactor: -1, + Configs: []kmsg.CreateTopicsRequestTopicConfig{ + { + Name: "cleanup.policy", + Value: kmsg.StringPtr("compact,delete"), + }, + { + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }, + }, + }, { + Topic: "topic3", + NumPartitions: 123, + ReplicationFactor: -1, + Configs: []kmsg.CreateTopicsRequestTopicConfig{ + { + Name: "cleanup.policy", + Value: kmsg.StringPtr("compact,delete"), + }, + { + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }, + }, + }, + } + + sortConfigs(t, expected) + sortConfigs(t, createTopicsRequest.Topics) + assert.Equal(t, expected, createTopicsRequest.Topics) + + // Ensure only `topic0` partitions are updated since it already exists. + assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{ + {Topic: "topic0", Count: 123}, + }, createPartitionsRequest.Topics) + + // Log assertions. + matchingLogs := observedLogs.FilterFieldKey("topic") + for _, ml := range matchingLogs.AllUntimed() { + t.Log(ml.Message) + } + + diff := cmp.Diff([]observer.LoggedEntry{{ + Entry: zapcore.Entry{ + Level: zapcore.DebugLevel, + LoggerName: "kafka", + Message: "kafka topic already exists", + }, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{ + "cleanup.policy": "compact,delete", + "retention.ms": "123", + }), + zap.String("topic", "topic1"), + }, + }, { + Entry: zapcore.Entry{ + Level: zapcore.InfoLevel, + LoggerName: "kafka", + Message: "created kafka topic", + }, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{ + "cleanup.policy": "compact,delete", + "retention.ms": "123", + }), + zap.String("topic", "topic3"), + }, + }, { + Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"}, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{ + "cleanup.policy": "compact,delete", + "retention.ms": "123", + }), + zap.String("topic", "topic0"), + }, + }}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool { + var ai, bi int + for i, v := range a.Context { + if v.Key == "topic" { + ai = i + break + } + } + for i, v := range b.Context { + if v.Key == "topic" { + bi = i + break + } + } + return a.Context[ai].String < b.Context[bi].String + })) + + if diff != "" { + t.Error(diff) + } + + spans := exp.GetSpans() + require.Len(t, spans, 1) + assert.Equal(t, "CreateAdminTopics", spans[0].Name) + assert.Equal(t, codes.Error, spans[0].Status.Code) + require.Len(t, spans[0].Events, 2) + + // Topic 1 already exists error + assert.Equal(t, "kafka topic already exists", spans[0].Events[0].Name) + assert.Equal(t, []attribute.KeyValue{ + semconv.MessagingDestinationKey.String("topic1"), + }, spans[0].Events[0].Attributes) + + // Topic 2 exception + assert.Equal(t, "exception", spans[0].Events[1].Name) + assert.Equal(t, []attribute.KeyValue{ + semconv.ExceptionTypeKey.String("*kerr.Error"), + semconv.ExceptionMessageKey.String( + "INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.", + ), + }, spans[0].Events[1].Attributes) + + rm, err := mt.Collect(context.Background()) + require.NoError(t, err) + + // Filter all other kafka metrics. + var metrics []metricdata.Metrics + for _, sm := range rm.ScopeMetrics { + if sm.Scope.Name == "github.com/elastic/apm-queue/kafka" { + metrics = sm.Metrics + break + } + } + + // Ensure only 1 topic was created, which also matches the number of spans. + assert.Equal(t, metrictest.Int64Metrics{ + {Name: "topics.created.count"}: { + {K: "topic", V: "topic2"}: 1, + {K: "topic", V: "topic3"}: 1, + {K: "messaging.system", V: "kafka"}: 2, + {K: "outcome", V: "failure"}: 1, + {K: "outcome", V: "success"}: 1, + }, + }, metrictest.GatherInt64Metric(metrics)) +} + +func sortConfigs(t *testing.T, topics []kmsg.CreateTopicsRequestTopic) { + t.Helper() + for i := range topics { + sort.Slice(topics[i].Configs, func(a, b int) bool { + return topics[i].Configs[a].Name < topics[i].Configs[b].Name + }) + } +} From 88a6e5f655a46d38025d01baf59ef1bee2bb64b8 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Tue, 9 Sep 2025 17:21:14 -0400 Subject: [PATCH 08/11] run linter --- kafka/topiccreator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 58f0ff2f..92f18f59 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -119,7 +119,7 @@ func (c *TopicCreator) CreateAdminTopics(ctx context.Context, topics ...apmqueue topicNames := make([]string, len(topics)) for i, topic := range topics { - topicNames[i] = fmt.Sprintf("%s", topic) + topicNames[i] = string(topic) } existing, err := c.m.adminClient.ListTopics(ctx) From 3a641bda8bc4ba0cb80e7bd9ad1024aec418495a Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Thu, 11 Sep 2025 11:37:37 -0400 Subject: [PATCH 09/11] delete cleanup policy on topic config alter --- cmd/queuebench/bench.go | 2 +- kafka/topiccreator.go | 50 +---- kafka/topiccreator_test.go | 375 ++++++++++--------------------------- systemtest/infra_kafka.go | 2 +- systemtest/manager_test.go | 8 +- 5 files changed, 108 insertions(+), 329 deletions(-) diff --git a/cmd/queuebench/bench.go b/cmd/queuebench/bench.go index 8ab1281f..fdf74a4d 100644 --- a/cmd/queuebench/bench.go +++ b/cmd/queuebench/bench.go @@ -115,7 +115,7 @@ func createTopics(ctx context.Context, mngr *kafka.Manager, cfg kafka.TopicCreat } for _, topic := range topics { - err = creator.CreateProjectTopics(ctx, topic) + err = creator.CreateTopics(ctx, topic) if err != nil { return fmt.Errorf("cannot create topics: %w", err) } diff --git a/kafka/topiccreator.go b/kafka/topiccreator.go index 92f18f59..80b2c42a 100644 --- a/kafka/topiccreator.go +++ b/kafka/topiccreator.go @@ -104,52 +104,13 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) }, nil } -// CreateAdminTopics creates a single log compacted topic -// -// Topics that already exist will be ignored. -func (c *TopicCreator) CreateAdminTopics(ctx context.Context, topics ...apmqueue.Topic) error { - ctx, span := c.m.tracer.Start( - ctx, - "CreateAdminTopics", - trace.WithAttributes( - semconv.MessagingSystemKey.String("kafka"), - ), - ) - defer span.End() - - topicNames := make([]string, len(topics)) - for i, topic := range topics { - topicNames[i] = string(topic) - } - - existing, err := c.m.adminClient.ListTopics(ctx) - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return fmt.Errorf("failed to list kafka topics: %w", err) - } - - missingTopics, updatePartitions, _ := c.categorizeTopics(existing, topicNames) - - var updateErrors []error - if err := c.createMissingTopics(ctx, span, missingTopics); err != nil { - updateErrors = append(updateErrors, err) - } - - if err := c.updateTopicPartitions(ctx, span, updatePartitions); err != nil { - updateErrors = append(updateErrors, err) - } - - return errors.Join(updateErrors...) -} - -// CreateProjectTopics creates one or more topics. +// CreateTopics creates one or more topics. // // Topics that already exist will be updated. -func (c *TopicCreator) CreateProjectTopics(ctx context.Context, topics ...apmqueue.Topic) error { +func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error { ctx, span := c.m.tracer.Start( ctx, - "CreateProjectTopics", + "CreateTopics", trace.WithAttributes( semconv.MessagingSystemKey.String("kafka"), ), @@ -244,7 +205,6 @@ func (c *TopicCreator) createMissingTopics( var updateErrors []error for _, response := range responses.Sorted() { topicName := strings.TrimPrefix(response.Topic, namespacePrefix) - fmt.Println(":: topicName:", topicName) logger := c.m.cfg.Logger.With(loggerFields...) if c.m.cfg.TopicLogFieldsFunc != nil { @@ -376,6 +336,10 @@ func (c *TopicCreator) alterTopicConfigs( return nil } + // Remove cleanup.policy if it exists, + // since this field cannot be altered. + delete(c.topicConfigs, "cleanup.policy") + alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs)) for k, v := range c.topicConfigs { alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v}) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 5a612640..28de0bfd 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -19,6 +19,7 @@ package kafka import ( "context" + "fmt" "sort" "strings" "testing" @@ -60,7 +61,7 @@ func TestNewTopicCreator(t *testing.T) { }, "\n")) } -func TestCreateProjectTopics(t *testing.T) { +func TestCreateTopics(t *testing.T) { exp := tracetest.NewInMemoryExporter() tp := sdktrace.NewTracerProvider( sdktrace.WithSyncer(exp), @@ -75,18 +76,20 @@ func TestCreateProjectTopics(t *testing.T) { m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) require.NoError(t, err) + t.Cleanup(func() { m.Close() }) + c, err := m.NewTopicCreator(TopicCreatorConfig{ PartitionCount: 123, TopicConfigs: map[string]string{ - "retention.ms": "123", + "retention.ms": "123", + "cleanup.policy": "compact,delete", }, MeterProvider: mt.MeterProvider, }) require.NoError(t, err) - // Simulate a situation where topic1, topic4 exists, topic2 is invalid and - // topic3 is successfully created. + // Simulate a situation where topic1, topic4 exists, topic2 is invalid and topic3 is successfully created. var createTopicsRequest *kmsg.CreateTopicsRequest cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createTopicsRequest = req.(*kmsg.CreateTopicsRequest) @@ -110,8 +113,8 @@ func TestCreateProjectTopics(t *testing.T) { }}, }, nil, true }) - // Topics 1 and 4 are already created, and their partition count is set to - // the appropriate value. + + // Topics 1 and 4 are already created, and their partition count is set to the appropriate value. var createPartitionsRequest *kmsg.CreatePartitionsRequest cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest) @@ -130,6 +133,7 @@ func TestCreateProjectTopics(t *testing.T) { alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest) res := kmsg.IncrementalAlterConfigsResponse{Version: req.GetVersion()} for _, r := range alterConfigsRequest.Resources { + fmt.Println(r) res.Resources = append(res.Resources, kmsg.IncrementalAlterConfigsResponseResource{ ResourceType: r.ResourceType, ResourceName: r.ResourceName, @@ -155,277 +159,17 @@ func TestCreateProjectTopics(t *testing.T) { }, nil, true }) - err = c.CreateProjectTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") + err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+ `INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`, ) require.Len(t, createTopicsRequest.Topics, 4) - assert.Equal(t, []kmsg.CreateTopicsRequestTopic{{ - Topic: "name_space-topic1", - NumPartitions: 123, - ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ - Name: "retention.ms", - Value: kmsg.StringPtr("123"), - }}, - }, { - Topic: "name_space-topic2", - NumPartitions: 123, - ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ - Name: "retention.ms", - Value: kmsg.StringPtr("123"), - }}, - }, { - Topic: "name_space-topic3", - NumPartitions: 123, - ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ - Name: "retention.ms", - Value: kmsg.StringPtr("123"), - }}, - }, { - Topic: "name_space-topic4", - NumPartitions: 123, - ReplicationFactor: -1, - Configs: []kmsg.CreateTopicsRequestTopicConfig{{ - Name: "retention.ms", - Value: kmsg.StringPtr("123"), - }}, - }}, createTopicsRequest.Topics) - - // Ensure only `topic0` partitions are updated since it already exists. - assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{ - {Topic: "name_space-topic0", Count: 123}, - }, createPartitionsRequest.Topics) - - // Ensure only the existing topic config is updated since it already exists. - assert.Len(t, alterConfigsRequest.Resources, 1) - assert.Equal(t, []kmsg.IncrementalAlterConfigsRequestResource{{ - ResourceType: kmsg.ConfigResourceTypeTopic, - ResourceName: "name_space-topic0", - Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{{ - Name: "retention.ms", - Value: kmsg.StringPtr("123"), - }}, - }}, alterConfigsRequest.Resources) - - // Log assertions. - matchingLogs := observedLogs.FilterFieldKey("topic") - for _, ml := range matchingLogs.AllUntimed() { - t.Log(ml.Message) - } - diff := cmp.Diff([]observer.LoggedEntry{{ - Entry: zapcore.Entry{ - Level: zapcore.DebugLevel, - LoggerName: "kafka", - Message: "kafka topic already exists", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic1"), - }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.DebugLevel, - LoggerName: "kafka", - Message: "kafka topic already exists", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic4"), - }, - }, { - Entry: zapcore.Entry{ - Level: zapcore.InfoLevel, - LoggerName: "kafka", - Message: "created kafka topic", - }, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic3"), - }, - }, { - Entry: zapcore.Entry{LoggerName: "kafka", Message: "updated partitions for kafka topic"}, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic0"), - }, - }, { - Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"}, - Context: []zapcore.Field{ - zap.String("namespace", "name_space"), - zap.Int("partition_count", 123), - zap.Any("topic_configs", map[string]string{"retention.ms": "123"}), - zap.String("topic", "topic0"), - }, - }}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool { - var ai, bi int - for i, v := range a.Context { - if v.Key == "topic" { - ai = i - break - } - } - for i, v := range b.Context { - if v.Key == "topic" { - bi = i - break - } - } - return a.Context[ai].String < b.Context[bi].String - })) - if diff != "" { - t.Error(diff) - } - - spans := exp.GetSpans() - require.Len(t, spans, 1) - assert.Equal(t, "CreateProjectTopics", spans[0].Name) - assert.Equal(t, codes.Error, spans[0].Status.Code) - require.Len(t, spans[0].Events, 3) - - // Topic 1 already exists error - assert.Equal(t, "kafka topic already exists", spans[0].Events[0].Name) - assert.Equal(t, []attribute.KeyValue{ - semconv.MessagingDestinationKey.String("topic1"), - }, spans[0].Events[0].Attributes) - - // Topic 2 exception - assert.Equal(t, "exception", spans[0].Events[1].Name) - assert.Equal(t, []attribute.KeyValue{ - semconv.ExceptionTypeKey.String("*kerr.Error"), - semconv.ExceptionMessageKey.String( - "INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.", - ), - }, spans[0].Events[1].Attributes) - - // Topic 4 already exists error. - assert.Equal(t, "kafka topic already exists", spans[0].Events[2].Name) - assert.Equal(t, []attribute.KeyValue{ - semconv.MessagingDestinationKey.String("topic4"), - }, spans[0].Events[2].Attributes) - - rm, err := mt.Collect(context.Background()) - require.NoError(t, err) - // Filter all other kafka metrics. - var metrics []metricdata.Metrics - for _, sm := range rm.ScopeMetrics { - if sm.Scope.Name == "github.com/elastic/apm-queue/kafka" { - metrics = sm.Metrics - break - } - } - // Ensure only 1 topic was created, which also matches the number of spans. - assert.Equal(t, metrictest.Int64Metrics{ - {Name: "topics.created.count"}: { - {K: "topic", V: "topic2"}: 1, - {K: "topic", V: "topic3"}: 1, - {K: "messaging.system", V: "kafka"}: 2, - {K: "outcome", V: "failure"}: 1, - {K: "outcome", V: "success"}: 1, - }, - }, metrictest.GatherInt64Metric(metrics)) -} - -func TestCreateAdminTopics(t *testing.T) { - exp := tracetest.NewInMemoryExporter() - tp := sdktrace.NewTracerProvider( - sdktrace.WithSyncer(exp), - ) - defer tp.Shutdown(context.Background()) - - mt := metrictest.New() - cluster, commonConfig := newFakeCluster(t) - core, observedLogs := observer.New(zapcore.DebugLevel) - commonConfig.Logger = zap.New(core) - commonConfig.TracerProvider = tp - - m, err := NewManager(ManagerConfig{CommonConfig: commonConfig}) - require.NoError(t, err) - t.Cleanup(func() { m.Close() }) - c, err := m.NewTopicCreator(TopicCreatorConfig{ - PartitionCount: 123, - TopicConfigs: map[string]string{ - "retention.ms": "123", - "cleanup.policy": "compact,delete", - }, - MeterProvider: mt.MeterProvider, - }) - require.NoError(t, err) - - // Simulate a situation where topic1 exists, topic2 is invalid, and topic3 is successfully created. - var createTopicsRequest *kmsg.CreateTopicsRequest - cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { - createTopicsRequest = req.(*kmsg.CreateTopicsRequest) - return &kmsg.CreateTopicsResponse{ - Version: req.GetVersion(), - Topics: []kmsg.CreateTopicsResponseTopic{{ - Topic: "topic1", - ErrorCode: kerr.TopicAlreadyExists.Code, - ErrorMessage: &kerr.TopicAlreadyExists.Message, - }, { - Topic: "topic2", - ErrorCode: kerr.InvalidTopicException.Code, - ErrorMessage: &kerr.InvalidTopicException.Message, - }, { - Topic: "topic3", - TopicID: [16]byte{123}, - }}, - }, nil, true - }) - - // Topics 1 and 4 are already created, and their partition count is set to the appropriate value. - var createPartitionsRequest *kmsg.CreatePartitionsRequest - cluster.ControlKey(kmsg.CreatePartitions.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { - createPartitionsRequest = req.(*kmsg.CreatePartitionsRequest) - res := kmsg.CreatePartitionsResponse{Version: req.GetVersion()} - for _, t := range createPartitionsRequest.Topics { - res.Topics = append(res.Topics, kmsg.CreatePartitionsResponseTopic{ - Topic: t.Topic, - }) - } - return &res, nil, true - }) - - // Allow some time for the ForceMetadataRefresh to run. - <-time.After(10 * time.Millisecond) - - // Simulate topic0 already exists in Kafka. - cluster.ControlKey(kmsg.Metadata.Int16(), func(r kmsg.Request) (kmsg.Response, error, bool) { - return &kmsg.MetadataResponse{ - Version: r.GetVersion(), - Topics: []kmsg.MetadataResponseTopic{{ - Topic: kmsg.StringPtr("topic0"), - TopicID: [16]byte{111}, - Partitions: []kmsg.MetadataResponseTopicPartition{{ - Partition: 0, - }}, - }}, - }, nil, true - }) - - err = c.CreateAdminTopics(context.Background(), "topic0", "topic1", "topic2", "topic3") - require.Error(t, err) - assert.EqualError(t, err, `failed to create topic "topic2": `+ - `INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic.`, - ) - - require.Len(t, createTopicsRequest.Topics, 3) expected := []kmsg.CreateTopicsRequestTopic{ { - Topic: "topic1", + Topic: "name_space-topic1", NumPartitions: 123, ReplicationFactor: -1, Configs: []kmsg.CreateTopicsRequestTopicConfig{ @@ -439,7 +183,7 @@ func TestCreateAdminTopics(t *testing.T) { }, }, }, { - Topic: "topic2", + Topic: "name_space-topic2", NumPartitions: 123, ReplicationFactor: -1, Configs: []kmsg.CreateTopicsRequestTopicConfig{ @@ -453,7 +197,21 @@ func TestCreateAdminTopics(t *testing.T) { }, }, }, { - Topic: "topic3", + Topic: "name_space-topic3", + NumPartitions: 123, + ReplicationFactor: -1, + Configs: []kmsg.CreateTopicsRequestTopicConfig{ + { + Name: "cleanup.policy", + Value: kmsg.StringPtr("compact,delete"), + }, + { + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }, + }, + }, { + Topic: "name_space-topic4", NumPartitions: 123, ReplicationFactor: -1, Configs: []kmsg.CreateTopicsRequestTopicConfig{ @@ -469,21 +227,40 @@ func TestCreateAdminTopics(t *testing.T) { }, } - sortConfigs(t, expected) - sortConfigs(t, createTopicsRequest.Topics) + sortTopicConfigs(t, expected) + sortTopicConfigs(t, createTopicsRequest.Topics) assert.Equal(t, expected, createTopicsRequest.Topics) // Ensure only `topic0` partitions are updated since it already exists. assert.Equal(t, []kmsg.CreatePartitionsRequestTopic{ - {Topic: "topic0", Count: 123}, + {Topic: "name_space-topic0", Count: 123}, }, createPartitionsRequest.Topics) + // Ensure the `cleanup.policy` field for `topic0` is deleted. + expectedAlter := []kmsg.IncrementalAlterConfigsRequestResource{ + { + ResourceType: kmsg.ConfigResourceTypeTopic, + ResourceName: "name_space-topic0", + Configs: []kmsg.IncrementalAlterConfigsRequestResourceConfig{ + { + Name: "retention.ms", + Value: kmsg.StringPtr("123"), + }, + }, + }, + } + + // Ensure only the existing topic config is updated since it already exists. + assert.Len(t, alterConfigsRequest.Resources, 1) + sortAlteredConfigs(t, expectedAlter) + sortAlteredConfigs(t, alterConfigsRequest.Resources) + assert.Equal(t, expectedAlter, alterConfigsRequest.Resources) + // Log assertions. matchingLogs := observedLogs.FilterFieldKey("topic") for _, ml := range matchingLogs.AllUntimed() { t.Log(ml.Message) } - diff := cmp.Diff([]observer.LoggedEntry{{ Entry: zapcore.Entry{ Level: zapcore.DebugLevel, @@ -499,6 +276,21 @@ func TestCreateAdminTopics(t *testing.T) { }), zap.String("topic", "topic1"), }, + }, { + Entry: zapcore.Entry{ + Level: zapcore.DebugLevel, + LoggerName: "kafka", + Message: "kafka topic already exists", + }, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{ + "cleanup.policy": "compact,delete", + "retention.ms": "123", + }), + zap.String("topic", "topic4"), + }, }, { Entry: zapcore.Entry{ Level: zapcore.InfoLevel, @@ -525,6 +317,17 @@ func TestCreateAdminTopics(t *testing.T) { }), zap.String("topic", "topic0"), }, + }, { + Entry: zapcore.Entry{LoggerName: "kafka", Message: "altered configuration for kafka topic"}, + Context: []zapcore.Field{ + zap.String("namespace", "name_space"), + zap.Int("partition_count", 123), + zap.Any("topic_configs", map[string]string{ + "cleanup.policy": "compact,delete", + "retention.ms": "123", + }), + zap.String("topic", "topic0"), + }, }}, matchingLogs.AllUntimed(), cmpopts.SortSlices(func(a, b observer.LoggedEntry) bool { var ai, bi int for i, v := range a.Context { @@ -541,16 +344,15 @@ func TestCreateAdminTopics(t *testing.T) { } return a.Context[ai].String < b.Context[bi].String })) - if diff != "" { t.Error(diff) } spans := exp.GetSpans() require.Len(t, spans, 1) - assert.Equal(t, "CreateAdminTopics", spans[0].Name) + assert.Equal(t, "CreateTopics", spans[0].Name) assert.Equal(t, codes.Error, spans[0].Status.Code) - require.Len(t, spans[0].Events, 2) + require.Len(t, spans[0].Events, 3) // Topic 1 already exists error assert.Equal(t, "kafka topic already exists", spans[0].Events[0].Name) @@ -567,9 +369,14 @@ func TestCreateAdminTopics(t *testing.T) { ), }, spans[0].Events[1].Attributes) + // Topic 4 already exists error. + assert.Equal(t, "kafka topic already exists", spans[0].Events[2].Name) + assert.Equal(t, []attribute.KeyValue{ + semconv.MessagingDestinationKey.String("topic4"), + }, spans[0].Events[2].Attributes) + rm, err := mt.Collect(context.Background()) require.NoError(t, err) - // Filter all other kafka metrics. var metrics []metricdata.Metrics for _, sm := range rm.ScopeMetrics { @@ -578,7 +385,6 @@ func TestCreateAdminTopics(t *testing.T) { break } } - // Ensure only 1 topic was created, which also matches the number of spans. assert.Equal(t, metrictest.Int64Metrics{ {Name: "topics.created.count"}: { @@ -591,7 +397,16 @@ func TestCreateAdminTopics(t *testing.T) { }, metrictest.GatherInt64Metric(metrics)) } -func sortConfigs(t *testing.T, topics []kmsg.CreateTopicsRequestTopic) { +func sortTopicConfigs(t *testing.T, topics []kmsg.CreateTopicsRequestTopic) { + t.Helper() + for i := range topics { + sort.Slice(topics[i].Configs, func(a, b int) bool { + return topics[i].Configs[a].Name < topics[i].Configs[b].Name + }) + } +} + +func sortAlteredConfigs(t *testing.T, topics []kmsg.IncrementalAlterConfigsRequestResource) { t.Helper() for i := range topics { sort.Slice(topics[i].Configs, func(a, b int) bool { diff --git a/systemtest/infra_kafka.go b/systemtest/infra_kafka.go index faa30865..76fe1893 100644 --- a/systemtest/infra_kafka.go +++ b/systemtest/infra_kafka.go @@ -167,7 +167,7 @@ func CreateKafkaTopics(ctx context.Context, t testing.TB, partitions int, topics }) require.NoError(t, err) - err = topicCreator.CreateProjectTopics(ctx, topics...) + err = topicCreator.CreateTopics(ctx, topics...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, manager.DeleteTopics( diff --git a/systemtest/manager_test.go b/systemtest/manager_test.go index eee2ed78..6be48bef 100644 --- a/systemtest/manager_test.go +++ b/systemtest/manager_test.go @@ -31,7 +31,7 @@ import ( "github.com/elastic/apm-queue/v2/kafka" ) -func TestManagerCreateProjectTopics(t *testing.T) { +func TestManagerCreateTopics(t *testing.T) { // This test covers: // - Creating topics with a number of partitions // - Updating the partition count @@ -58,14 +58,14 @@ func TestManagerCreateProjectTopics(t *testing.T) { } topics = SuffixTopics(topics...) // Create topics for the first time. - assert.NoError(t, creator.CreateProjectTopics(ctx, topics...)) + assert.NoError(t, creator.CreateTopics(ctx, topics...)) // New creator with increased partitions cfg.PartitionCount = 4 creator, err = manager.NewTopicCreator(cfg) require.NoError(t, err) // Update Partition count. - assert.NoError(t, creator.CreateProjectTopics(ctx, topics...)) + assert.NoError(t, creator.CreateTopics(ctx, topics...)) // New creator with increased retention cfg.TopicConfigs = map[string]string{ @@ -74,6 +74,6 @@ func TestManagerCreateProjectTopics(t *testing.T) { creator, err = manager.NewTopicCreator(cfg) require.NoError(t, err) // Update topic configuration. - assert.NoError(t, creator.CreateProjectTopics(ctx, topics...)) + assert.NoError(t, creator.CreateTopics(ctx, topics...)) }) } From 63536fed28f88711e622bc8aa933c7d00c9b3545 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Thu, 11 Sep 2025 13:14:12 -0400 Subject: [PATCH 10/11] minor refactor --- kafka/topiccreator_test.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index 28de0bfd..c05f68ff 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -79,16 +79,6 @@ func TestCreateTopics(t *testing.T) { t.Cleanup(func() { m.Close() }) - c, err := m.NewTopicCreator(TopicCreatorConfig{ - PartitionCount: 123, - TopicConfigs: map[string]string{ - "retention.ms": "123", - "cleanup.policy": "compact,delete", - }, - MeterProvider: mt.MeterProvider, - }) - require.NoError(t, err) - // Simulate a situation where topic1, topic4 exists, topic2 is invalid and topic3 is successfully created. var createTopicsRequest *kmsg.CreateTopicsRequest cluster.ControlKey(kmsg.CreateTopics.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { @@ -159,6 +149,16 @@ func TestCreateTopics(t *testing.T) { }, nil, true }) + c, err := m.NewTopicCreator(TopicCreatorConfig{ + PartitionCount: 123, + TopicConfigs: map[string]string{ + "retention.ms": "123", + "cleanup.policy": "compact,delete", + }, + MeterProvider: mt.MeterProvider, + }) + require.NoError(t, err) + err = c.CreateTopics(context.Background(), "topic0", "topic1", "topic2", "topic3", "topic4") require.Error(t, err) assert.EqualError(t, err, `failed to create topic "topic2": `+ @@ -377,6 +377,7 @@ func TestCreateTopics(t *testing.T) { rm, err := mt.Collect(context.Background()) require.NoError(t, err) + // Filter all other kafka metrics. var metrics []metricdata.Metrics for _, sm := range rm.ScopeMetrics { @@ -385,6 +386,7 @@ func TestCreateTopics(t *testing.T) { break } } + // Ensure only 1 topic was created, which also matches the number of spans. assert.Equal(t, metrictest.Int64Metrics{ {Name: "topics.created.count"}: { From 918921d88b8b6c71fb399825d21ac4fbdfabc018 Mon Sep 17 00:00:00 2001 From: Ruben van Staden Date: Thu, 11 Sep 2025 15:52:53 -0400 Subject: [PATCH 11/11] fix comment --- kafka/topiccreator_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/topiccreator_test.go b/kafka/topiccreator_test.go index c05f68ff..1e70cfd6 100644 --- a/kafka/topiccreator_test.go +++ b/kafka/topiccreator_test.go @@ -19,7 +19,6 @@ package kafka import ( "context" - "fmt" "sort" "strings" "testing" @@ -117,13 +116,12 @@ func TestCreateTopics(t *testing.T) { return &res, nil, true }) - // Since topic 1 and 4 already exist, their configuration is altered. + // Since topic 1 already exist, their configuration is altered. var alterConfigsRequest *kmsg.IncrementalAlterConfigsRequest cluster.ControlKey(kmsg.IncrementalAlterConfigs.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) { alterConfigsRequest = req.(*kmsg.IncrementalAlterConfigsRequest) res := kmsg.IncrementalAlterConfigsResponse{Version: req.GetVersion()} for _, r := range alterConfigsRequest.Resources { - fmt.Println(r) res.Resources = append(res.Resources, kmsg.IncrementalAlterConfigsResponseResource{ ResourceType: r.ResourceType, ResourceName: r.ResourceName,