Skip to content

Commit

Permalink
add optional MetricsAlert to SlidingWindowCounter that triggers a giv…
Browse files Browse the repository at this point in the history
…en action in case a given condition is met, implement ThrottledMetricsAlert that detect if a metric has reached a given threshold, use the ThrottledMetricsAlert to monitor the consumed inbound counters and record threshold exceedance in the new THROTTLED metrics counter, fix DefaultConnectionThrottling config loading the corrct values from the given config

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Sep 23, 2021
1 parent 711a867 commit 8592249
Show file tree
Hide file tree
Showing 27 changed files with 845 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public enum MetricType {
/**
* Counts inbound/outbound messages that were acknowledged.
*/
ACKNOWLEDGED("acknowledged", MetricDirection.INBOUND, MetricDirection.OUTBOUND);
ACKNOWLEDGED("acknowledged", MetricDirection.INBOUND, MetricDirection.OUTBOUND),

/**
* Signals if the connection was throttled i.e. 1 = throttled, 0 not throttled.
*/
THROTTLED("throttled", MetricDirection.INBOUND);

private final String name;
private final List<MetricDirection> possibleMetricDirections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ final class DefaultConnectionThrottlingConfig implements ConnectionThrottlingCon
private final ThrottlingConfig throttlingConfig;
private final double maxInFlightFactor;

private DefaultConnectionThrottlingConfig(final ScopedConfig config) {
throttlingConfig = ThrottlingConfig.of(config);
private DefaultConnectionThrottlingConfig(final ScopedConfig config, final ThrottlingConfig throttlingConfig) {
maxInFlightFactor = config.getPositiveDoubleOrThrow(ConfigValue.MAX_IN_FLIGHT_FACTOR);
this.throttlingConfig = throttlingConfig;
if (maxInFlightFactor < 1.0) {
throw new DittoConfigError(MessageFormat.format(
"The double value at <{0}> must be >= 1.0 but it was <{1}>!",
Expand All @@ -45,8 +45,9 @@ private DefaultConnectionThrottlingConfig(final ScopedConfig config) {
}

static ConnectionThrottlingConfig of(final Config config) {
final ThrottlingConfig throttlingConfig = ThrottlingConfig.of(config);
return new DefaultConnectionThrottlingConfig(
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()));
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConfigValue.values()), throttlingConfig);
}

@Override
Expand All @@ -66,14 +67,10 @@ public double getMaxInFlightFactor() {

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DefaultConnectionThrottlingConfig)) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultConnectionThrottlingConfig that = (DefaultConnectionThrottlingConfig) o;
return maxInFlightFactor == that.maxInFlightFactor &&
return Double.compare(that.maxInFlightFactor, maxInFlightFactor) == 0 &&
Objects.equals(throttlingConfig, that.throttlingConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ protected BaseClientActor(final Connection connection,
connectionContext = DittoConnectionContext.of(connection, staticConnectivityConfig);

final var monitoringConfig = staticConnectivityConfig.getMonitoringConfig();
connectionCounterRegistry = ConnectivityCounterRegistry.newInstance();
connectionCounterRegistry = ConnectivityCounterRegistry.newInstance(staticConnectivityConfig);
connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
connectionLoggerRegistry.initForConnection(connection);
connectionCounterRegistry.initForConnection(connection);
Expand Down Expand Up @@ -377,6 +377,7 @@ public void postStop() {

@Override
public void onConnectivityConfigModified(final ConnectivityConfig modifiedConfig) {
connectionCounterRegistry.onConnectivityConfigModified(connection, modifiedConfig);
final var modifiedContext = connectionContext.withConnectivityConfig(modifiedConfig);
if (hasInboundMapperConfigChanged(modifiedConfig)) {
logger.debug("Config changed for InboundMappingProcessor, recreating it.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ protected BaseConsumerActor(final Connection connection,

acknowledgementConfig = connectivityConfig.getAcknowledgementConfig();

inboundMonitor = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig.getMonitoringConfig())
inboundMonitor = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig)
.forInboundConsumed(connection, sourceAddress);

inboundAcknowledgedMonitor =
DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig.getMonitoringConfig())
DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig)
.forInboundAcknowledged(connection, sourceAddress);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected BasePublisherActor(final Connection connection,
final MonitoringLoggerConfig loggerConfig = monitoringConfig.logger();
this.connectionLogger = ConnectionLogger.getInstance(connection.getId(), loggerConfig);
this.connectivityStatusResolver = checkNotNull(connectivityStatusResolver, "connectivityStatusResolver");
connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(monitoringConfig);
connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig);
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(connection);
responsePublishedMonitor = connectionMonitorRegistry.forResponsePublished(connection);
responseAcknowledgedMonitor = connectionMonitorRegistry.forResponseAcknowledged(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ private InboundDispatchingSink(
ConnectivityPlaceholders.newConnectionIdPlaceholder(),
connection.getId());

final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();

connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(monitoringConfig);
connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig);
responseMappedMonitor = connectionMonitorRegistry.forResponseMapped(connection);
toErrorResponseFunction = DittoRuntimeExceptionToErrorResponseFunction.of(limitsConfig.getHeadersMaxSize());
acknowledgementConfig = connectivityConfig.getConnectionConfig().getAcknowledgementConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,10 @@ private OutboundMappingProcessorActor(final ActorRef clientActor,
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());

final DittoConnectivityConfig connectivityConfig = DittoConnectivityConfig.of(dittoScoped);
final MonitoringConfig monitoringConfig = connectivityConfig.getMonitoringConfig();
mappingConfig = connectivityConfig.getMappingConfig();
final LimitsConfig limitsConfig = DefaultLimitsConfig.of(dittoScoped);

connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(monitoringConfig);
connectionMonitorRegistry = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig);
responseDispatchedMonitor = connectionMonitorRegistry.forResponseDispatched(this.connection);
responseDroppedMonitor = connectionMonitorRegistry.forResponseDropped(this.connection);
responseMappedMonitor = connectionMonitorRegistry.forResponseMapped(this.connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static OutboundMappingSettings of(final ConnectionContext connectionContext,
messageMapperFactory.registryOf(DittoMessageMapper.CONTEXT, mappingDefinition);

final ConnectionMonitorRegistry<ConnectionMonitor> connectionMonitorRegistry =
DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig.getMonitoringConfig());
DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig);
final SignalFilter signalFilter = SignalFilter.of(connection, connectionMonitorRegistry);

final AcknowledgementConfig acknowledgementConfig = connectivityConfig.getAcknowledgementConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.ditto.connectivity.service.messaging.monitoring;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;

/**
* Registry that provides monitors for the different use cases inside a connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringConfig;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLoggerRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.ConnectivityCounterRegistry;
Expand All @@ -44,15 +45,16 @@ private DefaultConnectionMonitorRegistry(final ConnectionLoggerRegistry connecti

/**
* Builds a new {@code DefaultConnectionMonitorRegistry} from a configuration.
* @param config the configuration to use.
* @param connectivityConfig the configuration to use.
* @return a new instance of {@code DefaultConnectionMonitorRegistry}.
* @throws java.lang.NullPointerException if {@code config} is null.
* @throws java.lang.NullPointerException if {@code connectivityConfig} is null.
*/
public static DefaultConnectionMonitorRegistry fromConfig(final MonitoringConfig config) {
checkNotNull(config);
public static DefaultConnectionMonitorRegistry fromConfig(final ConnectivityConfig connectivityConfig) {
checkNotNull(connectivityConfig);

final ConnectionLoggerRegistry loggerRegistry = ConnectionLoggerRegistry.fromConfig(config.logger());
final ConnectivityCounterRegistry counterRegistry = ConnectivityCounterRegistry.newInstance();
final ConnectionLoggerRegistry loggerRegistry =
ConnectionLoggerRegistry.fromConfig(connectivityConfig.getMonitoringConfig().logger());
final ConnectivityCounterRegistry counterRegistry = ConnectivityCounterRegistry.newInstance(connectivityConfig);

return new DefaultConnectionMonitorRegistry(loggerRegistry, counterRegistry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MonitoringLoggerConfig;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ public interface ConnectionMetricsCounter {
*/
void recordFailure();

/**
* Record a successful operation.
*/
void recordSuccess(long ts);

/**
* Record a failed operation.
*/
void recordFailure(long ts);

/**
* @return the metricType of this collector.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.service.config.ThrottlingConfig;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;

/**
* Factory class for
* {@link org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.ConnectionMetricsCounter}s.
*/
final class ConnectionMetricsCounterFactory {

private static final MeasurementWindow[] DEFAULT_WINDOWS =
{MeasurementWindow.ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION,
MeasurementWindow.ONE_HOUR_WITH_ONE_MINUTE_RESOLUTION,
MeasurementWindow.ONE_DAY_WITH_ONE_HOUR_RESOLUTION};

/**
* Defines which measurement window is used to detect throttling i.e. what is the maximum allowed messages per
* interval. The throttling limits from ConnectivityConfig must be adjusted to the resolution of this window
* (see {@link #calculateThrottlingLimitFromConfig}).
*/
private static final MeasurementWindow THROTTLING_DETECTION_WINDOW =
MeasurementWindow.ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION;

/**
* An alert can be registered for a combination of MetricType and MetricDirection e.g. CONSUMED + INBOUND. These
* alerts will be instantiated using the registered Creator and used with newly created SlidingWindowCounters.
*/
private static final Map<MetricsAlert.Key, AlertsCreator> alerts = Map.of(
MetricsAlert.Key.CONSUMED_INBOUND,
(connectionId, connectionType, address, config) -> {
final ConnectivityCounterRegistry.MapKey
target = new ConnectivityCounterRegistry.MapKey(connectionId, MetricType.THROTTLED,
MetricDirection.INBOUND, address);
return new ThrottledMetricsAlert(THROTTLING_DETECTION_WINDOW,
calculateThrottlingLimitFromConfig(connectionType, config),
() -> ConnectivityCounterRegistry.lookup(target));
}
);

private ConnectionMetricsCounterFactory() {}

static DefaultConnectionMetricsCounter create(
final MetricType metricType,
final MetricDirection metricDirection, final ConnectionId connectionId,
final ConnectionType connectionType,
final String address,
final Clock clock, final ConnectivityConfig connectivityConfig) {
final Counter metricsCounter = metricsCounter(connectionId, connectionType, metricType, metricDirection);
final SlidingWindowCounter counter;
if (MetricType.THROTTLED == metricType) {
counter = SlidingWindowCounter.newBuilder(metricsCounter).clock(clock)
// we need to record for every minute of the last 24h if throttling occurred
.recordingMeasurementWindows(MeasurementWindow.ONE_DAY_WITH_ONE_MINUTE_RESOLUTION)
// reporting windows are the same as for the other metrics (1m, 1h, 1d)
.reportingMeasurementWindows(MeasurementWindow.ONE_MINUTE_WITH_ONE_MINUTE_RESOLUTION,
MeasurementWindow.ONE_HOUR_WITH_ONE_MINUTE_RESOLUTION,
MeasurementWindow.ONE_DAY_WITH_ONE_MINUTE_RESOLUTION)
.maximumPerSlot(1L)
.build();
} else {
final MetricsAlert metricsAlert = resolveOptionalAlert(metricDirection, address, metricType, connectionId,
connectionType, connectivityConfig);
counter = SlidingWindowCounter.newBuilder(metricsCounter)
.clock(clock)
.metricsAlert(metricsAlert)
.measurementWindows(DEFAULT_WINDOWS)
.build();
}
return new DefaultConnectionMetricsCounter(metricDirection, address, metricType, counter);
}

private static Counter metricsCounter(final ConnectionId connectionId,
final ConnectionType connectionType,
final MetricType metricType,
final MetricDirection metricDirection) {

return DittoMetrics.counter("connection_messages")
.tag("id", connectionId.toString())
.tag("type", connectionType.getName())
.tag("category", metricType.getName())
.tag("direction", metricDirection.getName());
}

@Nullable
private static MetricsAlert resolveOptionalAlert(final MetricDirection metricDirection, final String address,
final MetricType metricType, final ConnectionId connectionId, final ConnectionType connectionType,
final ConnectivityConfig connectivityConfig) {
return MetricsAlert.Key.from(metricDirection, metricType)
.map(alerts::get)
.map(c -> c.create(connectionId, connectionType, address, connectivityConfig))
.orElse(null);
}

private static int calculateThrottlingLimitFromConfig(final ConnectionType connectionType,
final ConnectivityConfig config) {
switch (connectionType) {
case AMQP_10:
final ConnectionThrottlingConfig amqp10ThrottlingConfig =
config.getConnectionConfig().getAmqp10Config().getConsumerConfig().getThrottlingConfig();
return perInterval(amqp10ThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution());
case KAFKA:
final ConnectionThrottlingConfig kafkaThrottlingConfig =
config.getConnectionConfig().getKafkaConfig().getConsumerConfig().getThrottlingConfig();
return perInterval(kafkaThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution());
case MQTT:
case AMQP_091:
case HTTP_PUSH:
case MQTT_5:
default:
// effectively no limit
return Integer.MAX_VALUE;
}
}

private static int perInterval(final ThrottlingConfig throttlingConfig, final Duration resolution) {
final Duration interval = throttlingConfig.getInterval();
final float factor = (float) resolution.toMillis() / interval.toMillis();
final int limit = throttlingConfig.getLimit();
return Math.round(limit * factor);
}

/**
* Creator interface for MetricsAlerts which are stored in the map of existing {@link #alerts}.
*/
@FunctionalInterface
interface AlertsCreator {

/**
* Create a new instantiation of a metrics alert.
*
* @param connectionId the connection id
* @param connectionType the connection type
* @param address the address
* @param connectivityConfig the connectivity config
* @return the new metrics alert
*/
MetricsAlert create(final ConnectionId connectionId, final ConnectionType connectionType, final String address,
final ConnectivityConfig connectivityConfig);
}
}
Loading

0 comments on commit 8592249

Please sign in to comment.