Skip to content

Commit

Permalink
Add MetricsAlertGauge to handle throttle events for in flight message…
Browse files Browse the repository at this point in the history
… gauge

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Mar 4, 2022
1 parent 8abf9d5 commit dd08357
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.eclipse.ditto.connectivity.service.messaging.InboundMappingSink.ExternalMessageWithSender;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.CounterKey;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.MetricAlertRegistry;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
Expand All @@ -65,7 +67,6 @@
public abstract class BaseConsumerActor extends AbstractActorWithTimers {

private static final String TIMER_ACK_HANDLING = "connectivity_ack_handling";
private static final String COUNTER_ACK_HANDLING = "connectivity_ack_handling_counter";

protected final String sourceAddress;
protected final Source source;
Expand All @@ -76,6 +77,7 @@ public abstract class BaseConsumerActor extends AbstractActorWithTimers {
protected final ConnectivityStatusResolver connectivityStatusResolver;

private final Sink<Object, ?> inboundMappingSink;
private final ConnectivityConfig connectivityConfig;
private final AcknowledgementConfig acknowledgementConfig;

@Nullable private ResourceStatus resourceStatus;
Expand All @@ -95,6 +97,7 @@ protected BaseConsumerActor(final Connection connection,
this.connectivityStatusResolver = checkNotNull(connectivityStatusResolver, "connectivityStatusResolver");
resetResourceStatus();

this.connectivityConfig = connectivityConfig;
acknowledgementConfig = connectivityConfig.getAcknowledgementConfig();

inboundMonitor = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig)
Expand Down Expand Up @@ -135,7 +138,9 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.getName())
.start();
final var ackCounter = DittoMetrics.gauge(COUNTER_ACK_HANDLING)
final var ackCounter = MetricAlertRegistry.getMetricsAlertGaugeOrDefault(
CounterKey.of(connectionId, null, null, sourceAddress), MetricAlertRegistry.COUNTER_ACK_HANDLING,
connectionType, connectivityConfig)
.tag(TracingTags.CONNECTION_ID, connectionId.toString())
.tag(TracingTags.CONNECTION_TYPE, connectionType.toString())
.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.Objects;
import java.util.Optional;

import javax.annotation.Nullable;

Expand All @@ -25,11 +26,11 @@
/**
* Identifies a counter by connection id and address and, where present, by metric type and metric direction.
*/
final class CounterKey {
public final class CounterKey {

private final ConnectionId connectionId;
private final MetricType metricType;
private final MetricDirection metricDirection;
@Nullable private final MetricType metricType;
@Nullable private final MetricDirection metricDirection;
private final String address;

/**
Expand All @@ -40,29 +41,35 @@ final class CounterKey {
* @param metricDirection the metricDirection
* @param address the address
*/
private CounterKey(final ConnectionId connectionId, final MetricType metricType,
final MetricDirection metricDirection, final String address) {
private CounterKey(final ConnectionId connectionId,
@Nullable final MetricType metricType,
@Nullable final MetricDirection metricDirection,
final String address) {

this.connectionId = checkNotNull(connectionId, "connectionId");
this.metricType = checkNotNull(metricType, "metricType");
this.metricDirection = checkNotNull(metricDirection, "metricDirection");
this.metricType = metricType;
this.metricDirection = metricDirection;
this.address = checkNotNull(address, "address");
}

static CounterKey of(final ConnectionId connectionId, final MetricType metricType,
final MetricDirection metricDirection, final String address) {
/**
 * Creates a new {@code CounterKey} from the given values.
 *
 * @param connectionId the connection ID; checked for non-null by the constructor.
 * @param metricType the metric type, may be {@code null}.
 * @param metricDirection the metric direction, may be {@code null}.
 * @param address the address; checked for non-null by the constructor.
 * @return the new key.
 */
public static CounterKey of(final ConnectionId connectionId,
        @Nullable final MetricType metricType,
        @Nullable final MetricDirection metricDirection,
        final String address) {

    final CounterKey counterKey = new CounterKey(connectionId, metricType, metricDirection, address);
    return counterKey;
}

/**
 * Returns the ID of the connection this key refers to.
 *
 * @return the connection ID.
 */
ConnectionId getConnectionId() {
    return this.connectionId;
}

MetricType getMetricType() {
return metricType;
Optional<MetricType> getMetricType() {
return Optional.ofNullable(metricType);
}

MetricDirection getMetricDirection() {
return metricDirection;
Optional<MetricDirection> getMetricDirection() {
return Optional.ofNullable(metricDirection);
}

String getAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -26,13 +28,17 @@
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.KamonGauge;

import akka.japi.Pair;

/**
* Registry to keep track of and update existing {@code MetricsAlerts}.
*/
final class MetricAlertRegistry {
public final class MetricAlertRegistry {

public static final String COUNTER_ACK_HANDLING = "connectivity_ack_handling_counter";

/**
* Defines which measurement window is used to detect throttling i.e. what is the maximum allowed messages per
Expand All @@ -50,19 +56,23 @@ final class MetricAlertRegistry {
Pair.apply(ConnectionType.AMQP_10, MetricsAlert.Key.CONSUMED_INBOUND), getThrottledAlert(),
Pair.apply(ConnectionType.KAFKA, MetricsAlert.Key.MAPPED_INBOUND), getThrottledAlert());

private static final Map<Pair<ConnectionType, String>, MetricsAlertFactory> GAUGE_ALERT_DEFINITIONS = Map.of(
Pair.apply(ConnectionType.AMQP_10, COUNTER_ACK_HANDLING), getThrottledAlert(),
Pair.apply(ConnectionType.KAFKA, COUNTER_ACK_HANDLING), getThrottledAlert());

private static final ConcurrentMap<Pair<ConnectionType, CounterKey>, MetricsAlert> ALERTS =
new ConcurrentHashMap<>();

private final Map<Pair<ConnectionType, MetricsAlert.Key>, MetricsAlertFactory> customAlerts = new HashMap<>();

private static MetricsAlertFactory getThrottledAlert() {
return (source, connectionType, config) -> {
return (source, connectionType, config, isGauge) -> {
// target counter is INBOUND + THROTTLED
final CounterKey target = CounterKey.of(source.getConnectionId(), MetricType.THROTTLED,
MetricDirection.INBOUND, source.getAddress());

return new ThrottledMetricsAlert(THROTTLING_DETECTION_WINDOW,
calculateThrottlingLimitFromConfig(connectionType, config),
calculateThrottlingLimitFromConfig(connectionType, config, isGauge),
() -> ConnectivityCounterRegistry.lookup(target));
};
}
Expand All @@ -76,32 +86,49 @@ private static MetricsAlertFactory getThrottledAlert() {
*/
void registerCustomAlert(final ConnectionType connectionType, final MetricsAlert.Key key,
        final MetricsAlertFactory metricsAlertFactory) {

    // Custom factories are stored under the same (connection type, alert key) pair that getAlert uses for lookup.
    final Pair<ConnectionType, MetricsAlert.Key> lookupKey = Pair.apply(connectionType, key);
    customAlerts.put(lookupKey, metricsAlertFactory);
}

@Nullable
MetricsAlert getAlert(final CounterKey counterKey, final ConnectionType connectionType,
final ConnectivityConfig connectivityConfig) {

checkNotNull(counterKey.getMetricDirection(), "metricDirection");
checkNotNull(counterKey.getMetricType(), "metricType");
return Optional.ofNullable(ALERTS.get(Pair.apply(connectionType, counterKey)))
.or(() -> MetricsAlert.Key.from(counterKey.getMetricDirection(), counterKey.getMetricType())
.or(() -> MetricsAlert.Key.from(counterKey.getMetricDirection().get(), counterKey.getMetricType().get())
.map(key -> Optional.ofNullable(ALERT_DEFINITIONS.get(Pair.apply(connectionType, key)))
.orElse(customAlerts.get(Pair.apply(connectionType, key))))
.map(creator -> ALERTS.computeIfAbsent(Pair.apply(connectionType, counterKey),
mk -> creator.create(counterKey, connectionType, connectivityConfig))))
mk -> creator.create(counterKey, connectionType, connectivityConfig, false))))
.orElse(null);
}


/**
 * Returns a gauge with the given name. If an alert definition is registered for the connection type and gauge
 * name, the gauge is a {@code MetricsAlertGauge} wrapping a newly created alert; otherwise it is a plain
 * {@code KamonGauge}.
 *
 * @param counterKey the key passed to the alert factory.
 * @param gaugeName the name of the gauge.
 * @param connectionType the connection type.
 * @param connectivityConfig the connectivity config passed to the alert factory.
 * @return the gauge.
 */
public static Gauge getMetricsAlertGaugeOrDefault(final CounterKey counterKey,
        final String gaugeName,
        final ConnectionType connectionType,
        final ConnectivityConfig connectivityConfig) {

    return Optional.ofNullable(GAUGE_ALERT_DEFINITIONS.get(Pair.apply(connectionType, gaugeName)))
            .map(alertFactory -> alertFactory.create(counterKey, connectionType, connectivityConfig, true))
            .map(alert -> MetricsAlertGauge.newGauge(gaugeName, alert, THROTTLING_DETECTION_WINDOW))
            // orElseGet: only build the fallback gauge when no alert gauge was created
            .orElseGet(() -> KamonGauge.newGauge(gaugeName));
}

private static long calculateThrottlingLimitFromConfig(final ConnectionType connectionType,
final ConnectivityConfig config) {
final ConnectivityConfig config, final boolean isGauge) {

switch (connectionType) {
case AMQP_10:
final ConnectionThrottlingConfig amqp10ThrottlingConfig =
config.getConnectionConfig().getAmqp10Config().getConsumerConfig().getThrottlingConfig();
return perInterval(amqp10ThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution());
return perInterval(amqp10ThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution(), isGauge);
case KAFKA:
final ConnectionThrottlingConfig kafkaThrottlingConfig =
config.getConnectionConfig().getKafkaConfig().getConsumerConfig().getThrottlingConfig();
return perInterval(kafkaThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution());
return perInterval(kafkaThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution(), isGauge);
case AMQP_091:
case HTTP_PUSH:
case MQTT:
Expand All @@ -112,7 +139,19 @@ private static long calculateThrottlingLimitFromConfig(final ConnectionType conn
}
}

private static long perInterval(final ConnectionThrottlingConfig throttlingConfig, final Duration resolution) {
/**
 * Computes the throttling limit for either a gauge or a connection metric.
 *
 * @param throttlingConfig the throttling config to derive the limit from.
 * @param resolution the resolution of the measurement window; only used for connection metrics.
 * @param isGauge whether the limit is calculated for a gauge.
 * @return the limit.
 */
private static long perInterval(final ConnectionThrottlingConfig throttlingConfig, final Duration resolution,
        final boolean isGauge) {

    return isGauge
            ? perIntervalForGauge(throttlingConfig)
            : perIntervalForConnectionMetric(throttlingConfig, resolution);
}

private static long perIntervalForConnectionMetric(final ConnectionThrottlingConfig throttlingConfig, final Duration resolution) {
final double tolerance = throttlingConfig.getThrottlingDetectionTolerance();
final Duration interval = throttlingConfig.getInterval();
// calculate factor to adjust the limit to the given resolution
Expand All @@ -124,4 +163,11 @@ private static long perInterval(final ConnectionThrottlingConfig throttlingConfi
return (long) (limitAdjustedToResolution * (1 - tolerance));
}

private static long perIntervalForGauge(final ConnectionThrottlingConfig throttlingConfig) {
    // The max in-flight value is used as-is, i.e. it is not scaled to any time window; only the configured
    // detection tolerance is deducted from it.
    final double toleratedShare = 1 - throttlingConfig.getThrottlingDetectionTolerance();
    return (long) (throttlingConfig.getMaxInFlight() * toleratedShare);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ public interface MetricsAlertFactory {
/**
* Creates a new instance of a metrics alert.
*
* @param counterKey the key that identifies a counter
* @param connectionType the connection type
* @param connectivityConfig the connectivity config
* @return the new metrics alert
* @param counterKey the key that identifies a counter.
* @param connectionType the connection type.
* @param connectivityConfig the connectivity config.
* @param isGauge whether the alert is a gauge or not.
* @return the new metrics alert.
*/
MetricsAlert create(final CounterKey counterKey, final ConnectionType connectionType,
final ConnectivityConfig connectivityConfig);
final ConnectivityConfig connectivityConfig, final boolean isGauge);
}
Loading

0 comments on commit dd08357

Please sign in to comment.