Skip to content

Commit

Permalink
Fix errors related to filtering based on user-specified consumer groups filters (#14406)
Browse files Browse the repository at this point in the history

* Add extra test case

* Move consumer group validation to config class

* Move validation of correct consumer group configuration to Config class

Since it's a configuration concern, that's where it seems to belong,
and the behavior would thus be consistent with where other
`ConfigurationError`s are raised.

* Make sure we don't return metrics for consumer groups that don't exist

* Separate partition filtering step

* Separate filtering based on `consumer_groups` to its own function

* Use consistent logic for all the types of filtering

* Separate all filtering to a separate method

* Fix early skip when filtering by regexes

* Change looping order to simplify and avoid unnecessary work

* Simplify filtering functions

* Deduplicate filtered partitions and remove unnecessary option checking

* Rename exact match filtering function to better reflect what it does

* Fix wording on test id

Co-authored-by: Ilia Kurenkov <ilia.kurenkov@datadoghq.com>

---------

Co-authored-by: Ilia Kurenkov <ilia.kurenkov@datadoghq.com>
Co-authored-by: Andrew Zhang <andrew.zhang@datadoghq.com>
  • Loading branch information
3 people committed Apr 19, 2023
1 parent e40cd40 commit 85ab590
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 101 deletions.
173 changes: 72 additions & 101 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,28 @@ def get_consumer_offsets(self):
return consumer_offsets

def _get_consumer_groups(self):
# Collects consumer group ids by asking the Kafka client to list consumer groups and
# reading the `group_id` of every entry in the `valid` part of the result.
# NOTE(review): this text is a rendered diff whose +/- markers were lost. The version
# guarded by the `if`/`elif` on the config options (which could return the configured
# `_consumer_groups` directly) and the unconditional version after it are presumably
# the removed and the added side of the same method -- confirm against the repository.
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_regex:
# Get all consumer groups
consumer_groups = []
consumer_groups_future = self.kafka_client.list_consumer_groups()
self.log.debug('MONITOR UNLISTED CG FUTURES: %s', consumer_groups_future)
try:
list_consumer_groups_result = consumer_groups_future.result()
self.log.debug('MONITOR UNLISTED FUTURES RESULT: %s', list_consumer_groups_result)

consumer_groups.extend(
valid_consumer_group.group_id for valid_consumer_group in list_consumer_groups_result.valid
)
except Exception as e:
self.log.error("Failed to collect consumer groups: %s", e)
return consumer_groups
elif self.config._consumer_groups:
return self.config._consumer_groups
# Get all consumer groups
consumer_groups = []
consumer_groups_future = self.kafka_client.list_consumer_groups()
self.log.debug('MONITOR UNLISTED CG FUTURES: %s', consumer_groups_future)
try:
list_consumer_groups_result = consumer_groups_future.result()
self.log.debug('MONITOR UNLISTED FUTURES RESULT: %s', list_consumer_groups_result)

consumer_groups.extend(
valid_consumer_group.group_id for valid_consumer_group in list_consumer_groups_result.valid
)
# A failure is only logged; whatever was collected so far (possibly nothing) is returned.
except Exception as e:
self.log.error("Failed to collect consumer groups: %s", e)
return consumer_groups

def _get_consumer_offset_futures(self, consumer_groups):
topics = self.kafka_client.list_topics(timeout=self.config._request_timeout)
# {(consumer_group, topic, partition): offset}
topic_metadata = self.kafka_client.list_topics(timeout=self.config._request_timeout).topics
topics = {
topic: list(topic_metadata[topic].partitions.keys())
for topic in topic_metadata
if topic not in KAFKA_INTERNAL_TOPICS
}

for consumer_group in consumer_groups:
self.log.debug('CONSUMER GROUP: %s', consumer_group)
Expand All @@ -186,91 +187,61 @@ def _get_consumer_offset_futures(self, consumer_groups):
)[consumer_group]

def _get_topic_partitions(self, topics, consumer_group):
# Yields one TopicPartition per (topic, partition) pair to collect for `consumer_group`.
# NOTE(review): rendered diff without +/- markers. The two lines iterating
# `topics.topics` and checking KAFKA_INTERNAL_TOPICS presumably belong to the removed
# version; the `topics.items()` loop treats `topics` as a mapping of topic -> partitions.
for topic in topics.topics:
if topic in KAFKA_INTERNAL_TOPICS:
for topic, partitions in topics.items():
self.log.debug('CONFIGURED TOPICS: %s', topic)

# With monitor_unlisted_consumer_groups set, every partition is kept as is;
# otherwise the partitions are narrowed down by _filter_partitions.
if self.config._monitor_unlisted_consumer_groups:
filtered_partitions = partitions
else:
filtered_partitions = self._filter_partitions(consumer_group, topic, partitions)

for partition in filtered_partitions:
topic_partition = TopicPartition(topic, partition)
self.log.debug("TOPIC PARTITION: %s", topic_partition)
yield topic_partition

def _filter_partitions(self, consumer_group, topic, partitions):
# Combines, with `|`, the partitions selected by the regex-based filtering and the
# partitions selected by the exact-match filtering for this consumer group and topic.
return (
self._filter_partitions_with_regex(consumer_group, topic, partitions)
| self._filter_partitions_with_exact_match(consumer_group, topic, partitions)
) # fmt: skip

def _filter_partitions_with_regex(self, consumer_group, topic, partitions):
# Returns the set of partitions of `topic` to collect for `consumer_group` based on
# `self.config._consumer_groups_compiled_regex`, which is iterated here as
# {consumer-group regex: {topic regex: partitions}} (both regexes are used via `.match`).
# NOTE(review): this span is a rendered diff whose +/- markers were lost. The lines that
# yield TopicPartition objects or (topic, partition) tuples, and the nested
# `_get_regex_filtered_topic_partitions` definition, presumably belong to the removed
# implementation; the set-returning lines are the new method -- confirm against the
# repository before relying on the exact statement order shown here.
partitions_to_collect = set()

for consumer_group_regex, topic_filters in self.config._consumer_groups_compiled_regex.items():
if not consumer_group_regex.match(consumer_group):
continue

self.log.debug('CONFIGURED TOPICS: %s', topic)
# No topics specified means we collect all topics and partitions
if not topic_filters:
return set(partitions)

partitions = list(topics.topics[topic].partitions.keys())
for topic_regex, topic_partitions in topic_filters.items():
if not topic_regex.match(topic):
continue

if self.config._monitor_unlisted_consumer_groups:
for partition in partitions:
topic_partition = TopicPartition(topic, partition)
self.log.debug("TOPIC PARTITION: %s", topic_partition)
yield topic_partition

elif self.config._consumer_groups_regex:
for filtered_topic_partition in self._get_regex_filtered_topic_partitions(
consumer_group, topic, partitions
):
topic_partition = TopicPartition(filtered_topic_partition[0], filtered_topic_partition[1])
self.log.debug("TOPIC PARTITION: %s", topic_partition)
yield topic_partition

if self.config._consumer_groups:
for partition in partitions:
# Get all topic-partition combinations allowed based on config
# if topics is None => collect all topics and partitions for the consumer group
# if partitions is None => collect all partitions from the consumer group's topic
if self.config._consumer_groups.get(consumer_group):
if (
self.config._consumer_groups[consumer_group]
and topic not in self.config._consumer_groups[consumer_group]
):
self.log.debug(
"Partition %s skipped because the topic %s is not in the consumer_group.",
partition,
topic,
)
continue
if (
self.config._consumer_groups[consumer_group].get(topic)
and partition not in self.config._consumer_groups[consumer_group][topic]
):
self.log.debug(
"Partition %s skipped because it is not defined in the consumer group for the topic %s",
partition,
topic,
)
continue

topic_partition = TopicPartition(topic, partition)
self.log.debug("TOPIC PARTITION: %s", topic_partition)
yield topic_partition

def _get_regex_filtered_topic_partitions(self, consumer_group, topic, partitions):
for partition in partitions:
# Do a regex filtering here for consumer groups
for consumer_group_compiled_regex in self.config._consumer_groups_compiled_regex:
if not consumer_group_compiled_regex.match(consumer_group):
return

consumer_group_topics_regex = self.config._consumer_groups_compiled_regex.get(
consumer_group_compiled_regex
)

# If topics is empty, return all combinations of topic and partition
if not consumer_group_topics_regex:
yield (topic, partition)

# Do a regex filtering here for topics
for topic_regex in consumer_group_topics_regex:
if not topic_regex.match(topic):
self.log.debug(
"Partition %s skipped because the topic %s is not in the consumer_group.", partition, topic
)
continue
# No partitions specified means we collect all
if not topic_partitions:
return set(partitions)

if (
consumer_group_topics_regex.get(topic_regex)
and partition not in consumer_group_topics_regex[topic_regex]
):
self.log.debug(
"Partition %s skipped because it is not defined in the consumer group for the topic %s",
partition,
topic,
)
continue
# Remember the partitions listed for this matching topic filter.
partitions_to_collect.update(topic_partitions)

# Keep only the collected partitions that are also present in the `partitions` argument.
return partitions_to_collect.intersection(partitions)

def _filter_partitions_with_exact_match(self, consumer_group, topic, partitions):
# Returns the set of partitions of `topic` to collect for `consumer_group` based on
# `self.config._consumer_groups`, which is indexed here as
# [consumer_group][topic] -> partitions.
# A consumer group that is not configured selects nothing.
if consumer_group not in self.config._consumer_groups:
return set()

# No topics specified means we allow all topics and partitions
if not self.config._consumer_groups[consumer_group]:
return set(partitions)

# Topics are configured for this group, but not this one: select nothing.
if topic not in self.config._consumer_groups[consumer_group]:
return set()

# No partitions specified means we collect all
if not self.config._consumer_groups[consumer_group][topic]:
return set(partitions)

# NOTE(review): the `yield` line below looks like a removed line of the previous
# implementation left behind by the diff rendering (every other exit of this function
# returns a set) -- confirm against the repository.
yield (topic, partition)
# Keep only the configured partitions that are also present in the `partitions` argument.
return set(self.config._consumer_groups[consumer_group][topic]).intersection(partitions)
18 changes: 18 additions & 0 deletions kafka_consumer/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ def test_monitor_broker_highwatermarks(
2,
id="One consumer group, one topic, all partitions",
),
pytest.param(
{'consumer_groups': {'nonsense': {'marvel': None}}},
does_not_raise(),
0,
id="Nonexistent consumer group, resulting in no metrics",
),
pytest.param(
{'consumer_groups': {'my_consumer': {'marvel': [1]}}},
does_not_raise(),
Expand Down Expand Up @@ -305,6 +311,18 @@ def test_config(dd_run_check, check, kafka_instance, override, aggregator, expec
'',
id="Specified topic, monitor_unlisted_consumer_groups false",
),
pytest.param(
{
'consumer_groups': {},
'consumer_groups_regex': {'foo': {'bar': []}, 'my_consume*': {'dc': []}},
'monitor_unlisted_consumer_groups': False,
},
2,
2,
2,
'',
id="Specified topic with an extra nonmatching consumer group regex, monitor_unlisted_consumer_groups false",
),
pytest.param(
{
'consumer_groups': {'my_consumer': {'marvel': []}},
Expand Down

0 comments on commit 85ab590

Please sign in to comment.