Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GO_TEST_TIMEOUT=180s
GO_TEST_TIMEOUT=240s
GOTESTFLAGS=
GO_TEST_COUNT=10

Expand Down
267 changes: 189 additions & 78 deletions kafka/topiccreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error)
//
// Topics that already exist will be updated.
func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topic) error {
// TODO(axw) how should we record topics?
ctx, span := c.m.tracer.Start(ctx, "CreateTopics", trace.WithAttributes(
semconv.MessagingSystemKey.String("kafka"),
))
ctx, span := c.m.tracer.Start(
ctx,
"CreateTopics",
trace.WithAttributes(
semconv.MessagingSystemKey.String("kafka"),
),
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the TODO comment, since I think this is redundant now.

defer span.End()

namespacePrefix := c.m.cfg.namespacePrefix()
Expand All @@ -127,12 +130,32 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
return fmt.Errorf("failed to list kafka topics: %w", err)
}

// missingTopics contains topics which need to be created.
missingTopics := make([]string, 0, len(topicNames))
// updatePartitions contains topics which partitions' need to be updated.
updatePartitions := make([]string, 0, len(topicNames))
// existingTopics contains the existing topics, used by AlterTopicConfigs.
existingTopics := make([]string, 0, len(topicNames))
missingTopics, updatePartitions, existingTopics := c.categorizeTopics(existing, topicNames)

var updateErrors []error
if err := c.createMissingTopics(ctx, span, missingTopics); err != nil {
updateErrors = append(updateErrors, err)
}

if err := c.updateTopicPartitions(ctx, span, updatePartitions); err != nil {
updateErrors = append(updateErrors, err)
}

if err := c.alterTopicConfigs(ctx, span, existingTopics); err != nil {
updateErrors = append(updateErrors, err)
}

return errors.Join(updateErrors...)
}

func (c *TopicCreator) categorizeTopics(
existing kadm.TopicDetails,
topicNames []string,
) (missingTopics, updatePartitions, existingTopics []string) {
missingTopics = make([]string, 0, len(topicNames))
updatePartitions = make([]string, 0, len(topicNames))
existingTopics = make([]string, 0, len(topicNames))

for _, wantTopic := range topicNames {
if !existing.Has(wantTopic) {
missingTopics = append(missingTopics, wantTopic)
Expand All @@ -144,6 +167,18 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
}
}

return missingTopics, updatePartitions, existingTopics
}

func (c *TopicCreator) createMissingTopics(
ctx context.Context,
span trace.Span,
missingTopics []string,
) error {
if len(missingTopics) == 0 {
return nil
}

responses, err := c.m.adminClient.CreateTopics(ctx,
int32(c.partitionCount),
-1, // default.replication.factor
Expand All @@ -155,6 +190,9 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to create kafka topics: %w", err)
}

namespacePrefix := c.m.cfg.namespacePrefix()

loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
Expand All @@ -167,14 +205,14 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
var updateErrors []error
for _, response := range responses.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)

logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldsFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
}

if err := response.Err; err != nil {
if errors.Is(err, kerr.TopicAlreadyExists) {
// NOTE(axw) apmotel currently does nothing with span events,
// hence we log as well as create a span event.
logger.Debug("kafka topic already exists",
zap.String("topic", topicName),
)
Expand All @@ -197,92 +235,165 @@ func (c *TopicCreator) CreateTopics(ctx context.Context, topics ...apmqueue.Topi
}
continue
}

c.created.Add(context.Background(), 1, metric.WithAttributeSet(
attribute.NewSet(
semconv.MessagingSystemKey.String("kafka"),
attribute.String("outcome", "success"),
attribute.String("topic", topicName),
),
))

logger.Info("created kafka topic", zap.String("topic", topicName))
}

// Update the topic partitions.
if len(updatePartitions) > 0 {
updateResp, err := c.m.adminClient.UpdatePartitions(ctx,
c.partitionCount,
updatePartitions...,
if len(updateErrors) > 0 {
return errors.Join(updateErrors...)
}

return nil
}

func (c *TopicCreator) updateTopicPartitions(
ctx context.Context,
span trace.Span,
updatePartitions []string,
) error {
if len(updatePartitions) == 0 {
return nil
}

updateResp, err := c.m.adminClient.UpdatePartitions(
ctx,
c.partitionCount,
updatePartitions...,
)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update partitions for kafka topics: %v: %w",
updatePartitions, err,
)
}

namespacePrefix := c.m.cfg.namespacePrefix()

loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
if len(c.origTopicConfigs) > 0 {
loggerFields = append(loggerFields,
zap.Reflect("topic_configs", c.origTopicConfigs),
)
if err != nil {
}

var updateErrors []error
for _, response := range updateResp.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)

logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldsFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
}

if errors.Is(response.Err, kerr.InvalidRequest) {
// If UpdatePartitions partition count isn't greater than the
// current number of partitions, each individual response
// returns `INVALID_REQUEST`.
continue
}

if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf("failed to update partitions for kafka topics: %v: %w",
updatePartitions, err,
)
updateErrors = append(updateErrors, fmt.Errorf(
"failed to update partitions for topic %q: %w",
topicName, err,
))
continue
}
for _, response := range updateResp.Sorted() {
topicName := strings.TrimPrefix(response.Topic, namespacePrefix)
logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldsFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
}

if errors.Is(response.Err, kerr.InvalidRequest) {
// If UpdatePartitions partition count isn't greater than the
// current number of partitions, each individual response
// returns `INVALID_REQUEST`.
continue
}
if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to update partitions for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("updated partitions for kafka topic",
zap.String("topic", topicName),
)
}
logger.Info(
"updated partitions for kafka topic",
zap.String("topic", topicName),
)
}
if len(existingTopics) > 0 && len(c.topicConfigs) > 0 {
alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}
alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx,
alterCfg, existingTopics...,

if len(updateErrors) > 0 {
return errors.Join(updateErrors...)
}

return nil
}

func (c *TopicCreator) alterTopicConfigs(
ctx context.Context,
span trace.Span,
existingTopics []string,
) error {
if len(existingTopics) == 0 || len(c.topicConfigs) == 0 {
return nil
}

// Remove cleanup.policy if it exists,
// since this field cannot be altered.
delete(c.topicConfigs, "cleanup.policy")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from the code refactoring, this is really the only logic change in this PR. cc @marclop


alterCfg := make([]kadm.AlterConfig, 0, len(c.topicConfigs))
for k, v := range c.topicConfigs {
alterCfg = append(alterCfg, kadm.AlterConfig{Name: k, Value: v})
}

alterResp, err := c.m.adminClient.AlterTopicConfigs(ctx, alterCfg, existingTopics...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v: %w",
existingTopics, err,
)
}

namespacePrefix := c.m.cfg.namespacePrefix()

loggerFields := []zap.Field{
zap.Int("partition_count", c.partitionCount),
}
if len(c.origTopicConfigs) > 0 {
loggerFields = append(loggerFields,
zap.Reflect("topic_configs", c.origTopicConfigs),
)
if err != nil {
}

var updateErrors []error
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)

logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldsFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
}

if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return fmt.Errorf(
"failed to update configuration for kafka topics: %v:%w",
existingTopics, err,
)
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
for _, response := range alterResp {
topicName := strings.TrimPrefix(response.Name, namespacePrefix)
logger := c.m.cfg.Logger.With(loggerFields...)
if c.m.cfg.TopicLogFieldsFunc != nil {
logger = logger.With(c.m.cfg.TopicLogFieldsFunc(topicName)...)
}

if err := response.Err; err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
updateErrors = append(updateErrors, fmt.Errorf(
"failed to alter configuration for topic %q: %w",
topicName, err,
))
continue
}
logger.Info("altered configuration for kafka topic",
zap.String("topic", topicName),
)
}
logger.Info(
"altered configuration for kafka topic",
zap.String("topic", topicName),
)
}
return errors.Join(updateErrors...)

if len(updateErrors) > 0 {
return errors.Join(updateErrors...)
}

return nil
}
Loading