Fix concurrency issues in KafkaConsumerMetricsRegistry
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
DerSchwilk committed Nov 9, 2021
1 parent a97a0d0 · commit 2986ac9
Showing 1 changed file with 23 additions and 6 deletions.
@@ -44,7 +44,8 @@ public final class KafkaConsumerMetricsRegistry {
     private static final ThreadSafeDittoLogger LOGGER =
             DittoLoggerFactory.getThreadSafeLogger(KafkaConsumerMetricsRegistry.class);
 
-    private final AtomicReference<Set<NewConsumer>> rememberForInit = new AtomicReference<>(new HashSet<>());
+    private final AtomicReference<Set<NewConsumer>> rememberForRegister = new AtomicReference<>(new HashSet<>());
+    private final AtomicReference<Set<CacheKey>> rememberForDeregister = new AtomicReference<>(new HashSet<>());
 
     private KafkaConsumerMetricsRegistry(final Duration metricCollectingInterval) {
         metricsMap = new HashMap<>();
@@ -78,7 +79,7 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingControl
 
         LOGGER.debug("Registering new consumer for metric reporting: <{}:{}>", connectionId, streamName);
         // No way to check whether consumerControl is ready, thus waiting for interval till next metric reporting.
-        rememberForInit.getAndUpdate(set -> {
+        rememberForRegister.getAndUpdate(set -> {
             set.add(new NewConsumer(connectionId, streamName, consumerControl));
             return set;
         });
@@ -92,7 +93,13 @@ void registerConsumer(final ConnectionId connectionId, final Consumer.DrainingControl
      */
     void deregisterConsumer(final ConnectionId connectionId, final String streamName) {
         LOGGER.debug("De-registering consumer for metric reporting: <{}:{}>", connectionId, streamName);
-        metricsMap.remove(new CacheKey(connectionId, streamName));
+        // Since new consumers are registered at a fixed interval, de-registering has to work the same way.
+        // If a consumer were de-registered directly, i.e. before the next registration interval is reached,
+        // this would cause concurrency issues.
+        rememberForDeregister.getAndUpdate(set -> {
+            set.add(new CacheKey(connectionId, streamName));
+            return set;
+        });
     }
 
     private void scheduleMetricReporting(final Duration metricCollectingInterval) {
@@ -105,12 +112,13 @@ private void reportMetrics() {
         LOGGER.debug("Reporting metrics for Kafka consumer streams. <{}> consumer streams registered",
                 metricsMap.size());
 
-        createNewKafkaConsumerMetrics();
+        registerNewKafkaConsumerMetrics();
+        deregisterKafkaConsumerMetrics();
         metricsMap.forEach((cacheKey, kafkaConsumerMetrics) -> kafkaConsumerMetrics.reportMetrics());
     }
 
-    private void createNewKafkaConsumerMetrics() {
-        rememberForInit.getAndUpdate(
+    private void registerNewKafkaConsumerMetrics() {
+        rememberForRegister.getAndUpdate(
                 set -> {
                     set.forEach(newConsumer -> metricsMap.put(
                             new CacheKey(newConsumer.connectionId, newConsumer.streamName),
@@ -120,6 +128,15 @@ private void createNewKafkaConsumerMetrics() {
                 });
     }
 
+    private void deregisterKafkaConsumerMetrics() {
+        rememberForDeregister.getAndUpdate(
+                set -> {
+                    set.forEach(cacheKey -> metricsMap.remove(
+                            new CacheKey(cacheKey.connectionId, cacheKey.streamName)));
+                    return new HashSet<>();
+                });
+    }
+
     private static final class CacheKey {
 
         private final ConnectionId connectionId;
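For readers outside the Ditto codebase, the pattern this commit adopts can be shown in a compact, runnable sketch. This is a simplified illustration, not Ditto code: IntervalRegistrySketch, the String keys (standing in for CacheKey), the Runnable values (standing in for KafkaConsumerMetrics), and the ScheduledExecutorService (standing in for the registry's own scheduling) are all hypothetical stand-ins. The idea is that callers on arbitrary threads only record their intent in AtomicReference-held buffer sets, while the single scheduled reporting thread later drains both buffers, so it is the only thread that ever touches the plain HashMap.

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Minimal sketch of the defer-and-drain pattern from the commit, with
 * hypothetical stand-ins for the Ditto types.
 */
public final class IntervalRegistrySketch {

    // Thread-safe buffers for pending registrations/de-registrations.
    private final AtomicReference<Set<String>> rememberForRegister = new AtomicReference<>(new HashSet<>());
    private final AtomicReference<Set<String>> rememberForDeregister = new AtomicReference<>(new HashSet<>());

    // Only ever accessed from the single scheduled reporting thread.
    private final Map<String, Runnable> metricsMap = new HashMap<>();

    void registerConsumer(final String key) {
        // Called from arbitrary threads: only record the intent to register.
        rememberForRegister.getAndUpdate(set -> {
            set.add(key);
            return set;
        });
    }

    void deregisterConsumer(final String key) {
        // Removal is deferred the same way, so register/deregister cannot race on metricsMap.
        rememberForDeregister.getAndUpdate(set -> {
            set.add(key);
            return set;
        });
    }

    void reportMetrics() {
        // Runs on the single scheduled thread: drain both buffers, then report.
        rememberForRegister.getAndUpdate(set -> {
            set.forEach(key -> metricsMap.put(key, () -> System.out.println("reporting " + key)));
            return new HashSet<>(); // swap in a fresh, empty buffer
        });
        rememberForDeregister.getAndUpdate(set -> {
            set.forEach(metricsMap::remove);
            return new HashSet<>();
        });
        metricsMap.values().forEach(Runnable::run);
    }

    public static void main(final String[] args) throws InterruptedException {
        final IntervalRegistrySketch registry = new IntervalRegistrySketch();
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final Duration interval = Duration.ofSeconds(1);
        scheduler.scheduleAtFixedRate(registry::reportMetrics,
                interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);

        registry.registerConsumer("connection-1:stream-a"); // picked up at the next tick
        Thread.sleep(2500);
        registry.deregisterConsumer("connection-1:stream-a"); // removed at the next tick
        Thread.sleep(1500);
        scheduler.shutdown();
    }
}

One caveat when copying this pattern: the javadoc of AtomicReference.getAndUpdate recommends a side-effect-free update function, because the function may be re-applied if the underlying compare-and-set fails under contention. The commit (and the sketch above) mutates the buffered set inside getAndUpdate, which works here because re-adding the same element, or re-draining the same entries into the map, is effectively idempotent.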