diff --git a/kafka/manager.go b/kafka/manager.go
index 1f64cf9e..9da79cdf 100644
--- a/kafka/manager.go
+++ b/kafka/manager.go
@@ -277,6 +277,15 @@ func (m *Manager) MonitorConsumerLag(topicConsumers []apmqueue.TopicConsumer) (m
 		}
 		for partition, lag := range partitions {
 			if lag.Err != nil {
+				if lag.Err == kerr.UnknownTopicOrPartition {
+					logger.Debug("error getting consumer group lag",
+						zap.String("group", l.Group),
+						zap.String("topic", topic),
+						zap.Int32("partition", partition),
+						zap.Error(lag.Err),
+					)
+					continue
+				}
 				logger.Warn("error getting consumer group lag",
 					zap.String("group", l.Group),
 					zap.String("topic", topic),
diff --git a/kafka/manager_test.go b/kafka/manager_test.go
index e6b24842..c341c86f 100644
--- a/kafka/manager_test.go
+++ b/kafka/manager_test.go
@@ -771,6 +771,134 @@ func TestListTopics(t *testing.T) {
 	assert.Equal(t, []string{"name_space-mytopic", "name_space-topic1", "name_space-topic3"}, topics)
 }
 
+func TestUnknownTopicOrPartition(t *testing.T) {
+	testCases := []struct {
+		desc     string
+		logLevel zapcore.Level
+	}{
+		{
+			desc:     "log UNKNOWN_TOPIC_OR_PARTITION error for consumer lag",
+			logLevel: zapcore.DebugLevel,
+		},
+		{
+			desc:     "ignore UNKNOWN_TOPIC_OR_PARTITION error for consumer lag",
+			logLevel: zapcore.WarnLevel,
+		},
+	}
+	for _, tc := range testCases {
+		reader := metric.NewManualReader()
+		mp := metric.NewMeterProvider(metric.WithReader(reader))
+		defer mp.Shutdown(context.Background())
+
+		cluster, commonConfig := newFakeCluster(t)
+		core, observedLogs := observer.New(tc.logLevel)
+
+		commonConfig.Logger = zap.New(core)
+		commonConfig.MeterProvider = mp
+
+		m, err := NewManager(ManagerConfig{CommonConfig: commonConfig})
+		require.NoError(t, err)
+		t.Cleanup(func() { m.Close() })
+
+		registration, err := m.MonitorConsumerLag([]apmqueue.TopicConsumer{
+			{
+				Topic:    "topic",
+				Consumer: "consumer",
+			},
+		})
+		require.NoError(t, err)
+		t.Cleanup(func() { registration.Unregister() })
+
+		var describeGroupsRequest *kmsg.DescribeGroupsRequest
+		cluster.ControlKey(kmsg.DescribeGroups.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+			describeGroupsRequest = req.(*kmsg.DescribeGroupsRequest)
+			return &kmsg.DescribeGroupsResponse{
+				Version: describeGroupsRequest.Version,
+				Groups: []kmsg.DescribeGroupsResponseGroup{
+					{
+						Group:        "consumer",
+						ProtocolType: "consumer",
+						Members: []kmsg.DescribeGroupsResponseGroupMember{{
+							MemberAssignment: (&kmsg.ConsumerMemberAssignment{
+								Version: 2,
+								Topics: []kmsg.ConsumerMemberAssignmentTopic{{
+									Topic:      "name_space-topic",
+									Partitions: []int32{1},
+								}},
+							}).AppendTo(nil),
+						}},
+					},
+				},
+			}, nil, true
+		})
+
+		var offsetFetchRequest *kmsg.OffsetFetchRequest
+		cluster.ControlKey(kmsg.OffsetFetch.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+			offsetFetchRequest = req.(*kmsg.OffsetFetchRequest)
+			return &kmsg.OffsetFetchResponse{
+				Version: offsetFetchRequest.Version,
+				Groups: []kmsg.OffsetFetchResponseGroup{{
+					Group: "consumer",
+					Topics: []kmsg.OffsetFetchResponseGroupTopic{{
+						Topic: "name_space-topic",
+						Partitions: []kmsg.OffsetFetchResponseGroupTopicPartition{{
+							Partition: 1,
+							Offset:    100,
+						}},
+					}},
+				}},
+			}, nil, true
+		})
+
+		var listOffsetsRequest *kmsg.ListOffsetsRequest
+		cluster.ControlKey(kmsg.ListOffsets.Int16(), func(req kmsg.Request) (kmsg.Response, error, bool) {
+			cluster.KeepControl()
+			listOffsetsRequest = req.(*kmsg.ListOffsetsRequest)
+			return &kmsg.ListOffsetsResponse{
+				Version: listOffsetsRequest.Version,
+				Topics: []kmsg.ListOffsetsResponseTopic{{
+					Topic: "name_space-topic",
+					Partitions: []kmsg.ListOffsetsResponseTopicPartition{{
+						Partition: 1,
+						Offset:    200,
+						ErrorCode: kerr.UnknownTopicOrPartition.Code,
+					}},
+				}},
+			}, nil, true
+		})
+
+		rm := metricdata.ResourceMetrics{}
+		err = reader.Collect(context.Background(), &rm)
+		require.NoError(t, err)
+
+		matchingLogs := observedLogs.FilterFieldKey("group")
+		actual := matchingLogs.AllUntimed()
+
+		if tc.logLevel == zapcore.DebugLevel {
+			expected := []observer.LoggedEntry{
+				{
+					Entry: zapcore.Entry{
+						Level:      zapcore.DebugLevel,
+						LoggerName: "kafka",
+						Message:    "error getting consumer group lag",
+					},
+					Context: []zapcore.Field{
+						zap.String("namespace", "name_space"),
+						zap.String("group", "consumer"),
+						zap.String("topic", "topic"),
+						zap.Int32("partition", 1),
+						zap.Error(kerr.UnknownTopicOrPartition),
+					},
+				},
+			}
+			assert.Len(t, actual, 1)
+			assert.Equal(t, expected, actual)
+		} else {
+			assert.Empty(t, actual)
+		}
+	}
+}
+
 func newFakeCluster(t testing.TB) (*kfake.Cluster, CommonConfig) {
 	cluster, err := kfake.NewCluster(
 		// Just one broker to simplify dealing with sharded requests.