Skip to content

Commit

Permalink
Make connection metric alerting connection type dependant
Browse files Browse the repository at this point in the history
In effort to activate throttling metric alerting for Kafka after mapping the metric alerting now matches the connectionType of the corresponding connection. This enables the differentiation i.e. between AMQP and Kafka throttle alerting.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Dec 20, 2021
1 parent 04a63f6 commit 69f4fed
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.FilteredTopic;
import org.eclipse.ditto.connectivity.model.MetricDirection;
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.SshTunnel;
Expand Down Expand Up @@ -103,7 +101,6 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLoggerRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.ConnectivityCounterRegistry;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.ThrottledLoggerMetricsAlert;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceActor;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelActor;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
Expand Down Expand Up @@ -190,8 +187,8 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta
private final int subscriptionIdPrefixLength;
private final String actorUUID;
private final ProtocolAdapter protocolAdapter;
private final ConnectivityCounterRegistry connectionCounterRegistry;
private final ConnectionLoggerRegistry connectionLoggerRegistry;
protected final ConnectivityCounterRegistry connectionCounterRegistry;
protected final ConnectionLoggerRegistry connectionLoggerRegistry;
protected final ConnectionLogger connectionLogger;
protected final ConnectivityStatusResolver connectivityStatusResolver;
private final boolean dryRun;
Expand All @@ -215,7 +212,7 @@ protected BaseClientActor(final Connection connection,
final ActorSystem system = getContext().getSystem();
final Config config = system.settings().config();
final Config withOverwrites = connectivityConfigOverwrites.withFallback(config);
this.connectivityConfig = ConnectivityConfig.of(withOverwrites);
connectivityConfig = ConnectivityConfig.of(withOverwrites);
this.connectionActor = connectionActor;
// this is retrieve via the extension for each baseClientActor in order to not pass it as constructor arg
// as all constructor arguments need to be serializable as the BaseClientActor is started behind a cluster
Expand Down Expand Up @@ -243,9 +240,6 @@ protected BaseClientActor(final Connection connection,
connectionLoggerRegistry = ConnectionLoggerRegistry.fromConfig(monitoringConfig.logger());
connectionLoggerRegistry.initForConnection(connection);
connectionLogger = connectionLoggerRegistry.forConnection(connection.getId());
connectionCounterRegistry.registerAlertFactory(MetricType.THROTTLED, MetricDirection.INBOUND,
ThrottledLoggerMetricsAlert.getFactory(
address -> connectionLoggerRegistry.forInboundThrottled(connection, address)));
connectionCounterRegistry.initForConnection(connection);
clientGauge = DittoMetrics.gauge("connection_client")
.tag("id", connectionId.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.MetricDirection;
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
Expand All @@ -65,6 +68,7 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.RecoverSession;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.ThrottledLoggerMetricsAlert;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
Expand Down Expand Up @@ -133,6 +137,11 @@ private AmqpClientActor(final Connection connection,
recoverSessionOnConnectionRestored = isRecoverSessionOnConnectionRestoredEnabled(connection);
clientAskTimeout = connectionConfig.getClientActorAskTimeout();
initialConsumerResourceStatusAskTimeout = amqp10Config.getInitialConsumerStatusAskTimeout();

connectionCounterRegistry.registerAlertFactory(ConnectionType.AMQP_10, MetricType.THROTTLED,
MetricDirection.INBOUND,
ThrottledLoggerMetricsAlert.getFactory(
address -> connectionLoggerRegistry.forInboundThrottled(connection, address)));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
import org.eclipse.ditto.connectivity.model.MetricType;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
Expand All @@ -40,6 +42,7 @@
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics.ThrottledLoggerMetricsAlert;
import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey;

import com.typesafe.config.Config;
Expand Down Expand Up @@ -88,6 +91,11 @@ private KafkaClientActor(final Connection connection,

this(connection, proxyActor, connectionActor, DefaultKafkaPublisherActorFactory.getInstance(), dittoHeaders,
connectivityConfigOverwrites);

connectionCounterRegistry.registerAlertFactory(ConnectionType.KAFKA, MetricType.THROTTLED,
MetricDirection.INBOUND,
ThrottledLoggerMetricsAlert.getFactory(
address -> connectionLoggerRegistry.forInboundThrottled(connection, address)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ public void resetForConnection(final Connection connection) {
/**
* Register a custom alert for the given parameters that is created by the given alert factory.
*/
public void registerAlertFactory(final MetricType metricType,
public void registerAlertFactory(final ConnectionType connectionType, final MetricType metricType,
final MetricDirection metricDirection, final MetricsAlertFactory factory) {

MetricsAlert.Key.from(metricDirection, metricType)
.ifPresent(alertKey -> alertRegistry.registerCustomAlert(alertKey, factory));
.ifPresent(alertKey -> alertRegistry.registerCustomAlert(connectionType, alertKey, factory));
}

private void initCounter(final Connection connection, final MetricDirection metricDirection, final String address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.monitoring.metrics;

import java.time.Duration;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -27,6 +27,8 @@
import org.eclipse.ditto.connectivity.service.config.ConnectionThrottlingConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;

import akka.japi.Pair;

/**
* Registry to keep track and update existing {@code MetricsAlerts}.
*/
Expand All @@ -44,40 +46,47 @@ final class MetricAlertRegistry {
* An alert can be registered for a combination of MetricType and MetricDirection e.g. CONSUMED + INBOUND.
* These alerts will be instantiated using the registered Creator and passed to created SlidingWindowCounters.
*/
private static final Map<MetricsAlert.Key, MetricsAlertFactory> alertDefinitions = Map.of(
MetricsAlert.Key.CONSUMED_INBOUND,
(source, connectionType, config) -> {
// target counter is INBOUND + THROTTLED
final CounterKey target = CounterKey.of(source.getConnectionId(), MetricType.THROTTLED,
MetricDirection.INBOUND, source.getAddress());
private static final Map<Pair<ConnectionType, MetricsAlert.Key>, MetricsAlertFactory> ALERT_DEFINITIONS = Map.of(
Pair.apply(ConnectionType.AMQP_10, MetricsAlert.Key.CONSUMED_INBOUND), getThrottledAlert(),
Pair.apply(ConnectionType.KAFKA, MetricsAlert.Key.MAPPED_INBOUND), getThrottledAlert());

private static final ConcurrentMap<Pair<ConnectionType, CounterKey>, MetricsAlert> ALERTS =
new ConcurrentHashMap<>();

return new ThrottledMetricsAlert(THROTTLING_DETECTION_WINDOW,
calculateThrottlingLimitFromConfig(connectionType, config),
() -> ConnectivityCounterRegistry.lookup(target));
}
);
private final Map<Pair<ConnectionType, MetricsAlert.Key>, MetricsAlertFactory> customAlerts = new HashMap<>();

private static final ConcurrentMap<CounterKey, MetricsAlert> alerts = new ConcurrentHashMap<>();
private static MetricsAlertFactory getThrottledAlert() {
return (source, connectionType, config) -> {
// target counter is INBOUND + THROTTLED
final CounterKey target = CounterKey.of(source.getConnectionId(), MetricType.THROTTLED,
MetricDirection.INBOUND, source.getAddress());

private final Map<MetricsAlert.Key, MetricsAlertFactory> customAlerts = new EnumMap<>(MetricsAlert.Key.class);
return new ThrottledMetricsAlert(THROTTLING_DETECTION_WINDOW,
calculateThrottlingLimitFromConfig(connectionType, config),
() -> ConnectivityCounterRegistry.lookup(target));
};
}

/**
* Registers an alert with a custom MetricsAlertFactory.
*
* @param connectionType the type of the connection the alert is applicable for.
* @param key the alert key
* @param metricsAlertFactory the factory used to instantiate the alert
*/
void registerCustomAlert(final MetricsAlert.Key key, final MetricsAlertFactory metricsAlertFactory) {
customAlerts.put(key, metricsAlertFactory);
void registerCustomAlert(final ConnectionType connectionType, final MetricsAlert.Key key,
final MetricsAlertFactory metricsAlertFactory) {
customAlerts.put(Pair.apply(connectionType, key), metricsAlertFactory);
}

@Nullable
MetricsAlert getAlert(final CounterKey counterKey, final ConnectionType connectionType,
final ConnectivityConfig connectivityConfig) {
return Optional.ofNullable(alerts.get(counterKey))
return Optional.ofNullable(ALERTS.get(Pair.apply(connectionType, counterKey)))
.or(() -> MetricsAlert.Key.from(counterKey.getMetricDirection(), counterKey.getMetricType())
.map(key -> Optional.ofNullable(alertDefinitions.get(key)).orElse(customAlerts.get(key)))
.map(creator -> alerts.computeIfAbsent(counterKey,
.map(key -> Optional.ofNullable(ALERT_DEFINITIONS.get(Pair.apply(connectionType, key)))
.orElse(customAlerts.get(Pair.apply(connectionType, key))))
.map(creator -> ALERTS.computeIfAbsent(Pair.apply(connectionType, counterKey),
mk -> creator.create(counterKey, connectionType, connectivityConfig))))
.orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ interface MetricsAlert {
enum Key {

CONSUMED_INBOUND(MetricDirection.INBOUND, MetricType.CONSUMED),
MAPPED_INBOUND(MetricDirection.INBOUND, MetricType.MAPPED),
THROTTLED_INBOUND(MetricDirection.INBOUND, MetricType.THROTTLED);

private final MetricDirection metricDirection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void verifyConnectionLogIsAddedWhenSourceIsThrottled() {

final ConnectivityCounterRegistry counterRegistry =
ConnectivityCounterRegistry.newInstance(mockConnectivityConfig(1, Duration.ofSeconds(10)));
counterRegistry.registerAlertFactory(THROTTLED, INBOUND,
counterRegistry.registerAlertFactory(CONNECTION_TYPE, THROTTLED, INBOUND,
ThrottledLoggerMetricsAlert.getFactory(address -> logger));
counterRegistry.initForConnection(connection);

Expand Down

0 comments on commit 69f4fed

Please sign in to comment.