Skip to content

Commit

Permalink
add @nullable annotation and extended some javadoc;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Nov 24, 2021
1 parent 193e164 commit 834e1be
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
import org.slf4j.Logger;

/**
* This registry holds loggers for the connectivity service. The loggers are identified by the connection ID, a {@link
* org.eclipse.ditto.connectivity.model.LogType}, a {@link org.eclipse.ditto.connectivity.model.LogCategory} and an
* address. The public methods of this class should not throw exceptions since this can lead to crashing connections.
* This registry holds loggers for the connectivity service. The loggers are identified by the connection ID,
* a {@link org.eclipse.ditto.connectivity.model.LogType}, a {@link org.eclipse.ditto.connectivity.model.LogCategory}
* and an address. The public methods of this class should not throw exceptions since this can lead to crashing connections.
*/
public final class ConnectionLoggerRegistry implements ConnectionMonitorRegistry<ConnectionLogger> {

Expand Down Expand Up @@ -182,6 +182,7 @@ private List<LogEntry> restrictMaxLogEntriesLength(final List<LogEntry> original
currentSize = sizeWithNextEntry;
}
Collections.reverse(restrictedLogs);

return restrictedLogs;
}

Expand All @@ -195,6 +196,7 @@ static boolean isActiveForConnection(final ConnectionId connectionId) {
final boolean muted = streamLoggers(connectionId)
.filter(MuteableConnectionLogger.class::isInstance)
.anyMatch(logger -> ((MuteableConnectionLogger) logger).isMuted());

return !muted;
}

Expand All @@ -210,6 +212,7 @@ public static boolean isLoggingExpired(final ConnectionId connectionId, final In
if (enabledUntil == null || timestamp.isAfter(enabledUntil)) {
LOGGER.withMdcEntry(MDC_CONNECTION_ID, connectionId)
.debug("Logging for connection <{}> expired.", connectionId);

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.time.Clock;

import javax.annotation.Nullable;

import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
Expand All @@ -27,10 +29,11 @@
*/
final class ConnectionMetricsCounterFactory {

private static final MeasurementWindow[] DEFAULT_WINDOWS =
{MeasurementWindow.ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION,
MeasurementWindow.ONE_HOUR_WITH_ONE_MINUTE_RESOLUTION,
MeasurementWindow.ONE_DAY_WITH_ONE_HOUR_RESOLUTION};
private static final MeasurementWindow[] DEFAULT_WINDOWS = {
MeasurementWindow.ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION,
MeasurementWindow.ONE_HOUR_WITH_ONE_MINUTE_RESOLUTION,
MeasurementWindow.ONE_DAY_WITH_ONE_HOUR_RESOLUTION
};

/**
* Create a new instance of a DefaultConnectionMetricsCounter from the given arguments.
Expand All @@ -41,7 +44,7 @@ final class ConnectionMetricsCounterFactory {
* @param connectionType the connection type required to build a metrics counter
* @param address the monitored address
* @param clock the clock to be used
* @param connectivityConfig the connectivity config required to read throttling limits from the config
* @param metricsAlert the metricsAlert
*/
static DefaultConnectionMetricsCounter create(
final MetricType metricType,
Expand All @@ -50,7 +53,7 @@ static DefaultConnectionMetricsCounter create(
final ConnectionType connectionType,
final String address,
final Clock clock,
final MetricsAlert metricsAlert) {
@Nullable final MetricsAlert metricsAlert) {
final Counter metricsCounter = metricsCounter(connectionId, connectionType, metricType, metricDirection);
final SlidingWindowCounter counter;
if (MetricType.THROTTLED == metricType) {
Expand All @@ -73,6 +76,7 @@ static DefaultConnectionMetricsCounter create(
.measurementWindows(DEFAULT_WINDOWS)
.build();
}

return new DefaultConnectionMetricsCounter(metricDirection, address, metricType, counter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.eclipse.ditto.connectivity.service.messaging.monitoring.ConnectionMonitorRegistry;

/**
* This registry holds counters for the connectivity service. The counters are identified by the connection id, a {@link
* MetricType}, a {@link MetricDirection} and an address.
* This registry holds counters for the connectivity service. The counters are identified by the connection id,
* a {@link MetricType}, a {@link MetricDirection} and an address.
*/
public final class ConnectivityCounterRegistry implements ConnectionMonitorRegistry<ConnectionMetricsCounter> {

Expand Down Expand Up @@ -146,6 +146,7 @@ public ConnectionMetricsCounter getCounter(
final String address) {
final ConnectionId connectionId = connection.getId();
final ConnectionType connectionType = connection.getConnectionType();

return getCounter(CLOCK_UTC, connectionId, connectionType, metricType, metricDirection, address);
}

Expand All @@ -168,6 +169,7 @@ ConnectionMetricsCounter getCounter(
final String address) {
final CounterKey key = CounterKey.of(connectionId, metricType, metricDirection, address);
final MetricsAlert alert = alertRegistry.getAlert(key, connectionType, connectivityConfig);

return counters.computeIfAbsent(key,
m -> ConnectionMetricsCounterFactory.create(metricType, metricDirection, connectionId, connectionType,
address, clock, alert));
Expand Down Expand Up @@ -255,7 +257,6 @@ public ConnectionMetricsCounter forResponseAcknowledged(final Connection connect

private static Stream<DefaultConnectionMetricsCounter> streamFor(final ConnectionId connectionId,
final MetricDirection metricDirection) {

return counters.entrySet()
.stream()
.filter(e -> e.getKey().getConnectionId().equals(connectionId))
Expand All @@ -276,6 +277,7 @@ private static Map<String, AddressMetric> aggregateMetrics(final ConnectionId co
? ConnectivityModelFactory.newAddressMetric(metric, measurements)
: ConnectivityModelFactory.newAddressMetric(measurements);
}));

return addressMetrics;
}

Expand Down Expand Up @@ -353,6 +355,7 @@ public ConnectionMetrics aggregateConnectionMetrics(
final SourceMetrics sourceMetrics, final TargetMetrics targetMetrics) {
final AddressMetric fromSources = mergeAllMetrics(sourceMetrics.getAddressMetrics().values());
final AddressMetric fromTargets = mergeAllMetrics(targetMetrics.getAddressMetrics().values());

return ConnectivityModelFactory.newConnectionMetrics(fromSources, fromTargets);
}

Expand All @@ -361,6 +364,7 @@ private static AddressMetric mergeAllMetrics(final Collection<AddressMetric> met
for (AddressMetric metric : metrics) {
result = mergeAddressMetric(result, metric);
}

return result;
}

Expand All @@ -386,6 +390,7 @@ merged, latest(
measurementB.getLastMessageAt().orElse(null)
));
}));

return ConnectivityModelFactory.newAddressMetric(new HashSet<>(result.values()));
}

Expand All @@ -401,6 +406,7 @@ public static Map<String, AddressMetric> mergeAddressMetricMap(final Map<String,
final Map<String, AddressMetric> second) {
final Map<String, AddressMetric> result = new HashMap<>(first);
second.forEach((k, v) -> result.merge(k, v, ConnectivityCounterRegistry::mergeAddressMetric));

return result;
}

Expand All @@ -414,6 +420,7 @@ private static Map<Duration, Long> mergeMeasurements(final Map<Duration, Long> m
final Map<Duration, Long> measurementB) {
final Map<Duration, Long> result = new HashMap<>(measurementA);
measurementB.forEach((k, v) -> result.merge(k, v, Long::sum));

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ public void triggerAction(final long timestamp, final long value) {
metricsAlert.triggerAction(timestamp, value);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@ final class MetricAlertRegistry {
MeasurementWindow.ONE_MINUTE_WITH_TEN_SECONDS_RESOLUTION;

/**
* An alert can be registered for a combination of MetricType and MetricDirection e.g. CONSUMED + INBOUND. These
* alerts will be instantiated using the registered Creator and passed to created SlidingWindowCounters.
* An alert can be registered for a combination of MetricType and MetricDirection e.g. CONSUMED + INBOUND.
* These alerts will be instantiated using the registered Creator and passed to created SlidingWindowCounters.
*/
private static final Map<MetricsAlert.Key, MetricsAlertFactory> alertDefinitions = Map.of(
MetricsAlert.Key.CONSUMED_INBOUND,
(source, connectionType, config) -> {
// target counter is INBOUND + THROTTLED
final CounterKey target = CounterKey.of(source.getConnectionId(), MetricType.THROTTLED,
MetricDirection.INBOUND, source.getAddress());

return new ThrottledMetricsAlert(THROTTLING_DETECTION_WINDOW,
calculateThrottlingLimitFromConfig(connectionType, config),
() -> ConnectivityCounterRegistry.lookup(target));
}
);

private static final ConcurrentMap<CounterKey, MetricsAlert> alerts = new ConcurrentHashMap<>();

private final Map<MetricsAlert.Key, MetricsAlertFactory> customAlerts = new EnumMap<>(MetricsAlert.Key.class);
Expand Down Expand Up @@ -91,9 +93,9 @@ private static long calculateThrottlingLimitFromConfig(final ConnectionType conn
final ConnectionThrottlingConfig kafkaThrottlingConfig =
config.getConnectionConfig().getKafkaConfig().getConsumerConfig().getThrottlingConfig();
return perInterval(kafkaThrottlingConfig, THROTTLING_DETECTION_WINDOW.getResolution());
case MQTT:
case AMQP_091:
case HTTP_PUSH:
case MQTT:
case MQTT_5:
default:
// effectively no limit
Expand All @@ -108,6 +110,7 @@ private static long perInterval(final ConnectionThrottlingConfig throttlingConfi
final double factor = (double) resolution.toMillis() / interval.toMillis();
final int limit = throttlingConfig.getLimit();
final double limitAdjustedToResolution = limit * factor;

// apply the configured tolerance to the resulting limit
return (long) (limitAdjustedToResolution * (1 - tolerance));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public final class ThrottledLoggerMetricsAlert implements MetricsAlert {
private final CounterKey counterKey;

/**
* Returns an instance of {@code MetricsAlertFactory}.
*
* @param loggerSupplier the supplier to retrieve the logger to write the throttled log entry
* @return the {@code MetricsAlertFactory} that creates new ThrottledLoggerMetricsAlert instances
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ public String toString() {
", targetMeasurementWindow=" + targetMeasurementWindow +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,30 @@ public void testThrottlingCounterRegistered() {
recordInboundMessageAndCheckThrottled(counterRegistry, connection, consumedInbound, true);
}

@Test
public void verifyConnectionLogIsAddedWhenSourceIsThrottled() {
final Connection connection = TestConstants.createConnection();
final ConnectionLogger logger = mock(ConnectionLogger.class);

final ConnectivityCounterRegistry counterRegistry =
ConnectivityCounterRegistry.newInstance(mockConnectivityConfig(1, Duration.ofSeconds(10)));
counterRegistry.registerAlertFactory(THROTTLED, INBOUND,
ThrottledLoggerMetricsAlert.getFactory(address -> logger));
counterRegistry.initForConnection(connection);

final ConnectionMetricsCounter consumedInbound =
counterRegistry.getCounter(connection, CONSUMED, INBOUND, TestConstants.Sources.AMQP_SOURCE_ADDRESS);

recordInboundMessageAndCheckThrottled(counterRegistry, connection, consumedInbound, true);

Mockito.verify(logger)
.failure(ArgumentMatchers.argThat((ArgumentMatcher<String>) argument -> {
assertThat(argument).contains("Throttling event occurred", "throttled", "inbound",
TestConstants.Sources.AMQP_SOURCE_ADDRESS);
return true;
}));
}

private void recordInboundMessageAndCheckThrottled(final ConnectivityCounterRegistry counterRegistry,
final Connection connection,
final ConnectionMetricsCounter consumedInbound, final boolean expectThrottled) {
Expand Down Expand Up @@ -257,31 +281,8 @@ private static ConnectivityConfig mockConnectivityConfig(final int limit, final
when(consumerConfig.getThrottlingConfig()).thenReturn(throttlingConfig);
when(throttlingConfig.getLimit()).thenReturn(limit);
when(throttlingConfig.getInterval()).thenReturn(interval);

return connectivityConfig;
}

@Test
public void verifyConnectionLogIsAddedWhenSourceIsThrottled() {
final Connection connection = TestConstants.createConnection();
final ConnectionLogger logger = mock(ConnectionLogger.class);

final ConnectivityCounterRegistry counterRegistry =
ConnectivityCounterRegistry.newInstance(mockConnectivityConfig(1,
Duration.ofSeconds(10)));
counterRegistry.registerAlertFactory(THROTTLED, INBOUND,
ThrottledLoggerMetricsAlert.getFactory(address -> logger));
counterRegistry.initForConnection(connection);

final ConnectionMetricsCounter consumedInbound =
counterRegistry.getCounter(connection, CONSUMED, INBOUND, TestConstants.Sources.AMQP_SOURCE_ADDRESS);

recordInboundMessageAndCheckThrottled(counterRegistry, connection, consumedInbound, true);

Mockito.verify(logger)
.failure(ArgumentMatchers.argThat((ArgumentMatcher<String>) argument -> {
assertThat(argument).contains("Throttling event occurred", "throttled", "inbound",
TestConstants.Sources.AMQP_SOURCE_ADDRESS);
return true;
}));
}
}

0 comments on commit 834e1be

Please sign in to comment.