Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3017] Remove producer metrics when tenant is deleted #3038

Merged
merged 1 commit into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -331,11 +331,12 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
final KafkaProducerFactory<String, Buffer> factory = CachingKafkaProducerFactory.sharedFactory(vertx);
factory.setMetricsSupport(kafkaClientMetricsSupport);

telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(factory, kafkaTelemetryConfig,
telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(vertx, factory, kafkaTelemetryConfig,
protocolAdapterProperties.isDefaultsEnabled(), tracer));
eventSenderProvider.setClient(new KafkaBasedEventSender(factory, kafkaEventConfig,
eventSenderProvider.setClient(new KafkaBasedEventSender(vertx, factory, kafkaEventConfig,
protocolAdapterProperties.isDefaultsEnabled(), tracer));
commandResponseSenderProvider.setClient(new KafkaBasedCommandResponseSender(
vertx,
factory,
kafkaCommandResponseConfig,
tracer));
Expand Down
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -87,14 +87,20 @@ protected MessagingClientProviders messagingClientProviders(
final KafkaProducerFactory<String, Buffer> factory = CachingKafkaProducerFactory.sharedFactory(vertx);
factory.setMetricsSupport(kafkaClientMetricsSupport);

telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(factory, kafkaTelemetryConfig(),
adapterProperties.isDefaultsEnabled(), tracer));
telemetrySenderProvider.setClient(new KafkaBasedTelemetrySender(
vertx,
factory,
kafkaTelemetryConfig(),
adapterProperties.isDefaultsEnabled(),
tracer));
eventSenderProvider.setClient(new KafkaBasedEventSender(
vertx,
factory,
kafkaEventConfig(),
adapterProperties.isDefaultsEnabled(),
tracer));
commandResponseSenderProvider.setClient(new KafkaBasedCommandResponseSender(
vertx,
factory,
kafkaCommandResponseConfig(),
tracer));
Expand Down
4 changes: 4 additions & 0 deletions clients/command-kafka/pom.xml
Expand Up @@ -42,6 +42,10 @@
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-kafka-common</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-notification</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-registry</artifactId>
Expand Down
Expand Up @@ -14,14 +14,19 @@

import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.producer.AbstractKafkaBasedMessageSender;
import org.eclipse.hono.client.kafka.producer.KafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.KafkaProducerHelper;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.util.DownstreamMessageProperties;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RegistrationAssertion;
Expand All @@ -31,7 +36,9 @@
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaProducer;

/**
* A Kafka based client for sending command response messages downstream via a Kafka cluster.
Expand All @@ -43,16 +50,31 @@ public class KafkaBasedCommandResponseSender extends AbstractKafkaBasedMessageSe
/**
* Creates a new Kafka-based command response sender.
*
* @param vertx The vert.x instance to use.
* @param producerFactory The factory to use for creating Kafka producers.
* @param producerConfig The Kafka producer configuration properties to use.
* @param tracer The OpenTracing tracer.
* @throws NullPointerException if any of the parameters are {@code null}.
*/
public KafkaBasedCommandResponseSender(
final Vertx vertx,
final KafkaProducerFactory<String, Buffer> producerFactory,
final MessagingKafkaProducerConfigProperties producerConfig,
final Tracer tracer) {
super(producerFactory, CommandConstants.COMMAND_RESPONSE_ENDPOINT, producerConfig, tracer);

NotificationEventBusSupport.registerConsumer(vertx, TenantChangeNotification.TYPE,
notification -> {
if (LifecycleChange.DELETE.equals(notification.getChange())) {
producerFactory.getProducer(CommandConstants.COMMAND_RESPONSE_ENDPOINT)
.ifPresent(producer -> removeTenantTopicBasedProducerMetrics(producer, notification.getTenantId()));
}
});
}

private void removeTenantTopicBasedProducerMetrics(final KafkaProducer<String, Buffer> producer, final String tenantId) {
final HonoTopic topic = new HonoTopic(HonoTopic.Type.COMMAND_RESPONSE, tenantId);
KafkaProducerHelper.removeTopicMetrics(producer, Stream.of(topic.toString()));
}

@Override
Expand Down
Expand Up @@ -12,7 +12,11 @@
*/
package org.eclipse.hono.client.command.kafka;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import static com.google.common.truth.Truth.assertThat;

Expand All @@ -28,7 +32,10 @@
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.kafka.test.KafkaClientUnitTestHelper;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.test.TracingMockSupport;
import org.eclipse.hono.test.VertxMockSupport;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessagingType;
Expand All @@ -44,6 +51,7 @@
import io.opentracing.noop.NoopSpan;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
Expand Down Expand Up @@ -96,7 +104,7 @@ void testIfValidCommandResponseKafkaRecordIsSent(final VertxTestContext ctx, fin
final var mockProducer = KafkaClientUnitTestHelper.newMockProducer(true);
final var factory = CachingKafkaProducerFactory
.testFactory(vertx, (n, c) -> KafkaClientUnitTestHelper.newKafkaProducer(mockProducer));
final var sender = new KafkaBasedCommandResponseSender(factory, kafkaProducerConfig, tracer);
final var sender = new KafkaBasedCommandResponseSender(vertx, factory, kafkaProducerConfig, tracer);
final TenantObject tenant = TenantObject.from(tenantId);
tenant.setResourceLimits(new ResourceLimits().setMaxTtlCommandResponse(10L));

Expand Down Expand Up @@ -161,4 +169,27 @@ void testIfValidCommandResponseKafkaRecordIsSent(final VertxTestContext ctx, fin
ctx.completeNow();
}));
}

/**
* Verifies that the sender registers itself for notifications of the type {@link TenantChangeNotification}.
*/
@Test
public void testThatNotificationConsumerIsRegistered() {
final EventBus eventBus = mock(EventBus.class);
final Vertx vertx = mock(Vertx.class);
when(vertx.eventBus()).thenReturn(eventBus);

final Span span = TracingMockSupport.mockSpan();
final Tracer tracer = TracingMockSupport.mockTracer(span);
final var mockProducer = KafkaClientUnitTestHelper.newMockProducer(true);
final var factory = CachingKafkaProducerFactory
.testFactory(vertx, (n, c) -> KafkaClientUnitTestHelper.newKafkaProducer(mockProducer));
@SuppressWarnings("unused")
final var sender = new KafkaBasedCommandResponseSender(vertx, factory, kafkaProducerConfig, tracer);

verify(eventBus).consumer(eq(NotificationEventBusSupport.getEventBusAddress(TenantChangeNotification.TYPE)),
VertxMockSupport.anyHandler());

verifyNoMoreInteractions(eventBus);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -207,12 +207,7 @@ private Handler<Throwable> getExceptionHandler(final String producerName, final
};
}

/**
* Gets an existing producer.
*
* @param producerName The name to look up the producer.
* @return The producer or {@code null} if the cache does not contain the name.
*/
@Override
public Optional<KafkaProducer<K, V>> getProducer(final String producerName) {
return Optional.ofNullable(activeProducers.get(producerName));
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -14,6 +14,7 @@
package org.eclipse.hono.client.kafka.producer;

import java.time.Duration;
import java.util.Optional;

import org.apache.kafka.clients.CommonClientConfigs;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
Expand All @@ -33,6 +34,14 @@
*/
public interface KafkaProducerFactory<K, V> {

/**
* Gets an existing producer for sending data to Kafka, if one was already created with the given producer name.
*
* @param producerName The name to identify the producer.
* @return An existing producer or an empty Optional if no such producer exists.
*/
Optional<KafkaProducer<K, V>> getProducer(String producerName);

/**
* Gets a producer for sending data to Kafka.
* <p>
Expand Down
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hono.client.kafka.producer;

import java.lang.reflect.Field;
import java.util.Objects;
import java.util.stream.Stream;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.kafka.client.producer.KafkaProducer;

/**
* Utility methods for working with Kafka Producers.
*/
public final class KafkaProducerHelper {

private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerHelper.class);

private KafkaProducerHelper() {
}

/**
* Removes topic-related metrics in the given Kafka producer.
*
* @param kafkaProducer The Kafka producer to use.
* @param topics The topics for which to remove the metrics.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
public static void removeTopicMetrics(final KafkaProducer<?, ?> kafkaProducer, final Stream<String> topics) {
Objects.requireNonNull(kafkaProducer);
Objects.requireNonNull(topics);
final Metrics metrics = getInternalMetricsObject(kafkaProducer.unwrap());
if (metrics != null) {
topics.forEach(topic -> {
metrics.removeSensor("topic." + topic + ".records-per-batch");
metrics.removeSensor("topic." + topic + ".bytes");
metrics.removeSensor("topic." + topic + ".compression-rate");
metrics.removeSensor("topic." + topic + ".record-retries");
metrics.removeSensor("topic." + topic + ".record-errors");
});
}
}

private static Metrics getInternalMetricsObject(final Producer<?, ?> producer) {
if (producer instanceof org.apache.kafka.clients.producer.KafkaProducer) {
try {
final Field field = org.apache.kafka.clients.producer.KafkaProducer.class.getDeclaredField("metrics");
field.setAccessible(true);
return (Metrics) field.get(producer);
} catch (final Exception e) {
LOG.warn("failed to get metrics object", e);
}
}
return null;
}
}
6 changes: 5 additions & 1 deletion clients/telemetry-kafka/pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2021 Contributors to the Eclipse Foundation
Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
Expand Down Expand Up @@ -33,6 +33,10 @@
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-telemetry</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-notification</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client</artifactId>
Expand Down