Skip to content

Commit

Permalink
Reworked tagging to be more correct.
Browse files Browse the repository at this point in the history
* More correct means that tag keys and values now are enforced to be non-null and non-blank.
* Introduced package 'tag' in ditto-internal-utils-metrics for custom Ditto tag types.
* Introduced `TagSet` which is meant to replace `Map<String, String>` for tags. Working with `Map` is more cumbersome.
* Replaced `SpanTags` by `SpanTagKey` which allows to create `Tag`s for well-known keys in a typesafe manner.
* Renamed `TraceUriGenerator` to `TraceInformationGenerator` to reflect its actual purpose.
* Reduced API of `StoppedTimer`, `TraceInformation`, `TaggableMetricsInstrument` and `TaggedMetricInstrument` to the actually required functionality.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Oct 20, 2022
1 parent 2ee61dd commit 0323c01
Show file tree
Hide file tree
Showing 55 changed files with 1,937 additions and 1,019 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTags;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;

import akka.NotUsed;
import akka.actor.AbstractActorWithTimers;
Expand Down Expand Up @@ -132,16 +133,16 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
final ActorRef responseCollector) {

final var ackTimer = DittoMetrics.timer(TIMER_ACK_HANDLING)
.tag(SpanTags.CONNECTION_ID, connectionId.toString())
.tag(SpanTags.CONNECTION_TYPE, connectionType.getName())
.tag(SpanTagKey.CONNECTION_ID.getTagForValue(connectionId.toString()))
.tag(SpanTagKey.CONNECTION_TYPE.getTagForValue(connectionType.getName()))
.start();
final var ackCounter =
MetricAlertRegistry.getMetricsAlertGaugeOrDefault(CounterKey.of(connectionId, sourceAddress),
MetricAlertRegistry.COUNTER_ACK_HANDLING,
connectionType,
connectivityConfig)
.tag(SpanTags.CONNECTION_ID, connectionId.toString())
.tag(SpanTags.CONNECTION_TYPE, connectionType.toString())
.tag(SpanTagKey.CONNECTION_ID.getTagForValue(connectionId.toString()))
.tag(SpanTagKey.CONNECTION_TYPE.getTagForValue(connectionType.toString()))
.increment();

final var acknowledgeableExternalMessage = acknowledgeableMessage.getMessage();
Expand All @@ -166,7 +167,8 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
if (output != null) {
final List<CommandResponse<?>> failedResponses = output.getFailedResponses();
if (output.allExpectedResponsesArrived() && failedResponses.isEmpty()) {
ackTimer.tag(SpanTags.ACK_SUCCESS, true).stop();
ackTimer.tag(getAckSuccessTag(true));
ackTimer.stop();
ackCounter.decrement();
acknowledgeableMessage.settle();
} else {
Expand All @@ -175,9 +177,9 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
someFailedResponseRequiresRedelivery(failedResponses);
log().debug("Rejecting [redeliver={}] due to failed responses <{}>. " +
"ResponseCollector=<{}>", shouldRedeliver, failedResponses, responseCollector);
ackTimer.tag(SpanTags.ACK_SUCCESS, false)
.tag(SpanTags.ACK_REDELIVER, shouldRedeliver)
.stop();
ackTimer.tag(getAckSuccessTag(false));
ackTimer.tag(getAckRedeliverTag(shouldRedeliver));
ackTimer.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(shouldRedeliver);
}
Expand All @@ -189,25 +191,26 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
// Redeliver and pray this unexpected error goes away
log().debug("Rejecting [redeliver=true] due to error <{}>. " +
"ResponseCollector=<{}>", rootCause, responseCollector);
ackTimer.tag(SpanTags.ACK_SUCCESS, false)
.tag(SpanTags.ACK_REDELIVER, true)
.stop();
ackTimer.tag(getAckSuccessTag(false));
ackTimer.tag(getAckRedeliverTag(true));
ackTimer.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(true);
return null;
});
if (dittoRuntimeException != null) {
if (isConsideredSuccess(dittoRuntimeException)) {
ackTimer.tag(SpanTags.ACK_SUCCESS, true).stop();
ackTimer.tag(getAckSuccessTag(true));
ackTimer.stop();
ackCounter.decrement();
acknowledgeableMessage.settle();
} else {
final var shouldRedeliver = requiresRedelivery(dittoRuntimeException.getHttpStatus());
log().debug("Rejecting [redeliver={}] due to error <{}>. ResponseCollector=<{}>",
shouldRedeliver, dittoRuntimeException, responseCollector);
ackTimer.tag(SpanTags.ACK_SUCCESS, false)
.tag(SpanTags.ACK_REDELIVER, shouldRedeliver)
.stop();
ackTimer.tag(getAckSuccessTag(false));
ackTimer.tag(getAckRedeliverTag(shouldRedeliver));
ackTimer.stop();
ackCounter.decrement();
acknowledgeableMessage.reject(shouldRedeliver);
}
Expand All @@ -220,6 +223,14 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable
});
}

private static Tag getAckSuccessTag(final boolean success) {
return Tag.of("ditto.ack.success", success);
}

private static Tag getAckRedeliverTag(final boolean success) {
return Tag.of("ditto.ack.redeliver", success);
}

private ExternalMessage addSourceAndReplyTarget(final ExternalMessage message) {
return ExternalMessageFactory.newExternalMessageBuilder(message)
.withSource(source)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTags;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.ExpressionResolver;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
Expand Down Expand Up @@ -476,7 +476,7 @@ private SendingOrDropped publishToGenericTarget(final ExpressionResolver resolve
SpanOperationName.of(connection.getConnectionType() + "_publish")
)
.connectionId(connection.getId())
.tag(SpanTags.CONNECTION_TYPE, connection.getConnectionType().toString())
.tag(SpanTagKey.CONNECTION_TYPE.getTagForValue(connection.getConnectionType()))
.start();
final var mappedMessageWithTraceContext =
mappedMessage.withHeaders(startedSpan.propagateContext(mappedMessage.getHeaders()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTags;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
import org.eclipse.ditto.protocol.Adaptable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -83,16 +84,20 @@ private static StartedTimer startNewTimer(
final CharSequence messageDirection
) {
return DittoMetrics.timer(TIMER_NAME)
.tag(SpanTags.CONNECTION_ID, connectionId.toString())
.tag(SpanTags.CONNECTION_TYPE, connectionType.getName())
.tag(SpanTagKey.CONNECTION_ID.getTagForValue(connectionId))
.tag(SpanTagKey.CONNECTION_TYPE.getTagForValue(connectionType.getName()))
.tag(DIRECTION_TAG_NAME, messageDirection.toString())
.onExpiration(expiredTimer -> {
LOGGER.warn("Mapping timer expired. This should not happen. Timer: <{}>", expiredTimer);
expiredTimer.tag(SpanTags.MAPPING_SUCCESS, false);
expiredTimer.tag(getMappingSuccessTag(false));
})
.start();
}

private static Tag getMappingSuccessTag(final boolean success) {
return Tag.of(SpanTagKey.KEY_PREFIX + "mapping.success", success);
}

/**
* @param connectionId ID of the connection.
* @param connectionType the type of the connection.
Expand Down Expand Up @@ -210,10 +215,12 @@ <T> T protocol(final Supplier<T> supplier) {
private static <T> T timed(final StartedTimer startedTimer, final Supplier<T> supplier) {
try {
final var result = supplier.get();
startedTimer.tag(SpanTags.MAPPING_SUCCESS, true).stop();
startedTimer.tag(getMappingSuccessTag(true));
startedTimer.stop();
return result;
} catch (final Exception ex) {
startedTimer.tag(SpanTags.MAPPING_SUCCESS, false).stop();
startedTimer.tag(getMappingSuccessTag(false));
startedTimer.stop();
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Instant;
import java.util.Map;
import java.util.Optional;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag;
import org.eclipse.ditto.internal.utils.metrics.instruments.tag.TagSet;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.KamonGauge;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,25 +96,19 @@ public Long get() {
}

@Override
public Gauge tag(final String key, final String value) {
final var taggedDelegee = delegee.tag(key, value);
return new MetricsAlertGauge(taggedDelegee, metricsAlert, measurementWindow);
public MetricsAlertGauge tag(final Tag tag) {
return new MetricsAlertGauge(delegee.tag(tag), metricsAlert, measurementWindow);
}

@Override
public Gauge tags(final Map<String, String> tags) {
public MetricsAlertGauge tags(final TagSet tags) {
final var taggedDelegee = delegee.tags(tags);
return new MetricsAlertGauge(taggedDelegee, metricsAlert, measurementWindow);
}

@Override
public Optional<String> getTag(final String key) {
return delegee.getTag(key);
}

@Override
public Map<String, String> getTags() {
return delegee.getTags();
public TagSet getTagSet() {
return delegee.getTagSet();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import java.time.Duration;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.HttpStatusCodeOutOfRangeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.gateway.api.GatewayServiceUnavailableException;
Expand All @@ -28,7 +32,7 @@
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StoppedTimer;
import org.eclipse.ditto.internal.utils.tracing.TraceUtils;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTags;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
import org.slf4j.Logger;

import akka.http.javadsl.model.ContentTypes;
Expand Down Expand Up @@ -83,7 +87,11 @@ public Route handleRequestTimeout(final CharSequence correlationId, final Suppli
final Supplier<Route> innerWithTimer = () -> Directives.mapResponse(response -> {
final int statusCode = response.status().intValue();
if (timer.isRunning()) {
final StoppedTimer stoppedTimer = timer.tag(SpanTags.STATUS_CODE, statusCode).stop();
@Nullable final var httpStatus = tryToGetResponseHttpStatus(response, logger);
if (null != httpStatus) {
timer.tag(SpanTagKey.HTTP_STATUS.getTagForValue(httpStatus));
}
final var stoppedTimer = timer.stop();
logger.debug("Finished timer <{}> with status <{}>.", timer, statusCode);
checkDurationWarning(stoppedTimer, logger);
}
Expand All @@ -96,21 +104,49 @@ public Route handleRequestTimeout(final CharSequence correlationId, final Suppli
));
}

@Nullable
private static HttpStatus tryToGetResponseHttpStatus(
final HttpResponse httpResponse,
final ThreadSafeDittoLogger logger
) {
try {
return getResponseHttpStatus(httpResponse);
} catch (final HttpStatusCodeOutOfRangeException e) {
logger.info("Failed to get {} for HTTP response: {}", HttpStatus.class.getSimpleName(), e.getMessage());
return null;
}
}

private static HttpStatus getResponseHttpStatus(final HttpResponse httpResponse)
throws HttpStatusCodeOutOfRangeException {

final var statusCode = httpResponse.status();
return HttpStatus.getInstance(statusCode.intValue());
}

private static void checkDurationWarning(final StoppedTimer mutableTimer, final Logger logger) {
final Duration duration = mutableTimer.getDuration();
final String requestPath = mutableTimer.getTag(SpanTags.REQUEST_PATH);
final var duration = mutableTimer.getDuration();

if (requestPath != null && requestPath.contains("/search/things")) {
if (isThingsSearchRequest(mutableTimer)) {
if (SEARCH_WARN_TIMEOUT_MS.minus(duration).isNegative()) {
logger.warn("Encountered slow search which took over <{}> ms: <{}> ms!", SEARCH_WARN_TIMEOUT_MS.toMillis(),
logger.warn("Encountered slow search which took over <{}> ms: <{}> ms!",
SEARCH_WARN_TIMEOUT_MS.toMillis(),
duration.toMillis());
}
} else if (HTTP_WARN_TIMEOUT_MS.minus(duration).isNegative()) {
logger.warn("Encountered slow HTTP request which took over <{}> ms: <{}> ms!",
HTTP_WARN_TIMEOUT_MS.toMillis(), duration.toMillis());
HTTP_WARN_TIMEOUT_MS.toMillis(),
duration.toMillis());
}
}

private static boolean isThingsSearchRequest(final StoppedTimer stoppedTimer) {
final var stoppedTimerTagSet = stoppedTimer.getTagSet();
return stoppedTimerTagSet.getTagValue(SpanTagKey.REQUEST_URI.toString())
.filter(requestUriString -> requestUriString.contains("/search/things"))
.isPresent();
}

private HttpResponse doHandleRequestTimeout(final CharSequence correlationId,
final RequestContext requestContext,
final StartedTimer timer,
Expand All @@ -124,20 +160,21 @@ private HttpResponse doHandleRequestTimeout(final CharSequence correlationId,

/* We have to log and create a trace here because the RequestResultLoggingDirective won't be called by akka
in case of a timeout */
final int statusCode = cre.getHttpStatus().getCode();
final var httpStatus = cre.getHttpStatus();

final String requestMethod = request.method().name();
final String requestUri = request.getUri().toRelative().toString();
logger.warn("Request <{} {}> timed out after <{}>!", requestMethod, requestUri,
httpConfig.getRequestTimeout());
logger.info("Status code of request <{} {}> was <{}>.", requestMethod, requestUri, statusCode);
logger.info("Status code of request <{} {}> was <{}>.", requestMethod, requestUri, httpStatus.getCode());
final String rawRequestUri = getRawRequestUri(request);
logger.debug("Raw request URI was <{}>.", rawRequestUri);

if (timer.isRunning()) {
timer.tag(SpanTags.STATUS_CODE, statusCode)
.stop();
logger.debug("Finished mutable timer <{}> after a request timeout with status <{}>", timer, statusCode);
timer.tag(SpanTagKey.HTTP_STATUS.getTagForValue(httpStatus));
timer.stop();
logger.debug("Finished mutable timer <{}> after a request timeout with status <{}>", timer,
httpStatus.getCode());
} else {
logger.warn("Wanted to stop() timer which was already stopped indicating that a requestTimeout" +
" was detected where it should not have been");
Expand All @@ -148,7 +185,7 @@ private HttpResponse doHandleRequestTimeout(final CharSequence correlationId,
* called by akka in case of a timeout.
*/
return HttpResponse.create()
.withStatus(statusCode)
.withStatus(httpStatus.getCode())
.withEntity(ContentTypes.APPLICATION_JSON, ByteString.fromString(cre.toJsonString()));
}

Expand Down
Loading

0 comments on commit 0323c01

Please sign in to comment.