Skip to content

Commit

Permalink
made metric gathering mor stable in the way that a started timer is a…
Browse files Browse the repository at this point in the history
…lways finished with an outcome

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 6, 2022
1 parent b00b8d7 commit 9e03585
Showing 1 changed file with 76 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.Function;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -402,7 +403,8 @@ protected void becomeTwinSignalProcessingAwaiting() {
},
m -> {
log.withCorrelationId(m instanceof WithDittoHeaders w ? w : null)
.debug("stashing during 'becomeTwinSignalProcessingAwaiting': <{}>", m.getClass().getSimpleName());
.debug("stashing during 'becomeTwinSignalProcessingAwaiting': <{}>",
m.getClass().getSimpleName());
stash();
}
)
Expand Down Expand Up @@ -560,10 +562,10 @@ private void enforceAndForwardToTargetActor(final Object message) {

Patterns.pipe(
enforceSignalAndForwardToTargetActor((S) signal, sender)
.handle((response, throwable) -> {
handleSignalEnforcementResponse(response, throwable, signal, sender);
return Control.PROCESS_NEXT_TWIN_MESSAGE;
}),
.handle((response, throwable) -> {
handleSignalEnforcementResponse(response, throwable, signal, sender);
return Control.PROCESS_NEXT_TWIN_MESSAGE;
}),
getContext().getDispatcher()
).pipeTo(getSelf(), getSelf());
}
Expand Down Expand Up @@ -644,59 +646,82 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S signal, final ActorRef sender) {

if (null != enforcerChild) {
final StartedTrace trace = DittoTracing
.trace(signal, signal.getType())
final StartedTrace trace = DittoTracing.trace(signal, signal.getType())
.correlationId(signal.getDittoHeaders().getCorrelationId().orElse(null))
.start();
final S tracedSignal = DittoTracing.propagateContext(trace.getContext(), signal);
final StartedTimer rootTimer = createTimer(tracedSignal);
final StartedTimer preEnforcementTimer = rootTimer.startNewSegment(
ENFORCEMENT_TIMER_SEGMENT_PRE_ENFORCEMENT);
return preEnforcer.apply(tracedSignal).thenCompose(preEnforcedSignal -> {
if (preEnforcedSignal instanceof Command<?> command) {
final StartedTimer preEnforcementTimer =
rootTimer.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_PRE_ENFORCEMENT);
return preEnforcer.apply(tracedSignal)
.whenComplete((result, error) -> {
trace.mark("pre_enforced");
if (result instanceof Command<?> command) {
// update category which could have been changed by preEnforcer
final String category = command.getCategory().name().toLowerCase();
rootTimer.tag(ENFORCEMENT_TIMER_TAG_CATEGORY, category);
preEnforcementTimer.tag(ENFORCEMENT_TIMER_TAG_CATEGORY, category);
}
preEnforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop();
trace.mark("pre_enforced");
final StartedTimer enforcementTimer = rootTimer.startNewSegment(
ENFORCEMENT_TIMER_SEGMENT_ENFORCEMENT);
stopTimer(preEnforcementTimer).accept(result, error);
})
.thenCompose(preEnforcedSignal -> {
final StartedTimer enforcementTimer = rootTimer
.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_ENFORCEMENT);
return askEnforcerChild(preEnforcedSignal)
.thenCompose(this::modifyEnforcerActorEnforcedSignalResponse)
.thenCompose(enforcedCommand -> enforcerResponseToTargetActor(
rootTimer.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_PROCESSING),
enforcementTimer,
trace,
preEnforcedSignal.getDittoHeaders(),
enforcedCommand,
sender
))
.thenCompose(targetActorResponse ->
filterTargetActorResponseViaEnforcer(targetActorResponse, rootTimer, trace)
)
.whenComplete((result, error) -> {
rootTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, error != null ?
ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL :
ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop();
if (null != error) {
trace.fail(error);
}
trace.finish();
}
);
}
);
trace.mark("enforced");
stopTimer(enforcementTimer).accept(result, error);
})
.thenCompose(enforcedCommand -> {
final StartedTimer processingTimer =
rootTimer.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_PROCESSING);
return enforcerResponseToTargetActor(preEnforcedSignal.getDittoHeaders(),
enforcedCommand, sender)
.whenComplete((result, error) -> {
trace.mark("processed");
stopTimer(processingTimer).accept(result, error);
});
})
.thenCompose(targetActorResponse -> {
final StartedTimer responseFilterTimer =
rootTimer.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_RESPONSE_FILTER);
return filterTargetActorResponseViaEnforcer(targetActorResponse)
.whenComplete((result, error) -> {
trace.mark("response_filtered");
responseFilterTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME,
error != null ? ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL :
ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS).stop();
if (null != error) {
trace.fail(error);
}
});
});
}).whenComplete((result, error) -> {
if (null != error) {
trace.fail(error);
}
trace.finish();
stopTimer(rootTimer).accept(result, error);
});
} else {
log.withCorrelationId(signal)
.error("Could not enforce signal because enforcerChild was not present");
log.withCorrelationId(signal).error("Could not enforce signal because enforcerChild was not present");
return CompletableFuture.completedStage(null);
}
}

private BiConsumer<Object, Throwable> stopTimer(final StartedTimer timerToStop) {
return (result, error) -> {
final String outcome = error != null ?
ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL :
ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS;
if (timerToStop.isRunning()) {
timerToStop.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, outcome)
.stop();
}
};
}

private StartedTimer createTimer(final WithDittoHeaders withDittoHeaders) {
final PreparedTimer expiringTimer = DittoMetrics.timer(ENFORCEMENT_TIMER);

Expand All @@ -713,54 +738,40 @@ private StartedTimer createTimer(final WithDittoHeaders withDittoHeaders) {
}

private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseToTargetActor(
final StartedTimer processingTimer,
final StartedTimer enforcementTimer,
final StartedTrace trace,
final DittoHeaders dittoHeaders,
@Nullable final Object enforcerResponse,
final ActorRef sender) {

if (null == persistenceActorChild) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL).stop();
trace.fail("unavailable");
throw getUnavailableExceptionBuilder(entityId)
return CompletableFuture.failedFuture(getUnavailableExceptionBuilder(entityId)
.dittoHeaders(dittoHeaders)
.build();
.build());
} else if (enforcerResponse instanceof Signal<?> enforcedSignal) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS).stop();
trace.mark("enforced");
log.withCorrelationId(enforcedSignal)
.debug("Received enforcedSignal from enforcerChild, forwarding to target actor: {}",
enforcedSignal);
return askTargetActor(enforcedSignal, shouldSendResponse(enforcedSignal), sender)
.thenCompose(response ->
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response, processingTimer)
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response)
);
} else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS).stop();
trace.mark("enforced");
return askTargetActor(distributedPubWithMessage,
distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender
)
.thenCompose(response ->
modifyTargetActorCommandResponse(distributedPubWithMessage.signal(), response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(distributedPubWithMessage.signal(), response,
processingTimer)
new EnforcedSignalAndTargetActorResponse(distributedPubWithMessage.signal(), response)
);
} else if (enforcerResponse instanceof DittoRuntimeException dre) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL).stop();
trace.fail(dre);
log.withCorrelationId(dittoHeaders)
.debug("Received DittoRuntimeException as response from enforcerChild: {}", dre);
throw dre;
return CompletableFuture.failedFuture(dre);
} else {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL).stop();
trace.fail("unknown_response");
return CompletableFuture.completedStage(
new EnforcedSignalAndTargetActorResponse(null, null, processingTimer)
new EnforcedSignalAndTargetActorResponse(null, null)
);
}
}
Expand All @@ -776,70 +787,30 @@ protected boolean shouldSendResponse(final WithDittoHeaders withDittoHeaders) {
}

protected CompletionStage<Object> filterTargetActorResponseViaEnforcer(
final EnforcedSignalAndTargetActorResponse targetActorResponse,
final StartedTimer rootTimer,
final StartedTrace trace) {
final EnforcedSignalAndTargetActorResponse targetActorResponse) {

if (targetActorResponse.response() instanceof CommandResponse<?> commandResponse) {
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop();
trace.mark("processed");
final StartedTimer responseFilterTimer = rootTimer.startNewSegment(
ENFORCEMENT_TIMER_SEGMENT_RESPONSE_FILTER);

log.withCorrelationId(commandResponse)
.debug("Received CommandResponse from target actor, " +
"telling enforcerChild to apply response filtering: {}", commandResponse);
return askEnforcerChild(commandResponse)
.whenComplete((result, error) -> {
responseFilterTimer
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, error != null ?
ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL : ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop();
if (null != error) {
trace.fail(error);
}
trace.mark("response_filtered");
}
);
return askEnforcerChild(commandResponse);
} else if (targetActorResponse.response() instanceof DittoRuntimeException dre) {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.debug("Received DittoRuntimeException as response from target actor: {}", dre);
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
trace.fail(dre).mark("processed");
throw dre;
return CompletableFuture.failedFuture(dre);
} else if (targetActorResponse.response() instanceof Status.Success success) {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.info("Got success message from target actor: {}", success);
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
trace.mark("processed");
return CompletableFuture.completedStage(success);
} else if (targetActorResponse.response() instanceof AskTimeoutException askTimeoutException) {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.warning("Encountered ask timeout from target actor: {}", askTimeoutException.getMessage());
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
trace.fail(askTimeoutException).mark("processed");
return CompletableFuture.completedStage(null);
} else if (targetActorResponse.response() instanceof Throwable throwable) {
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
trace.fail(throwable).mark("processed");
return CompletableFuture.failedFuture(throwable);
} else {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.warning("Unexpected response from target actor: {}", targetActorResponse);
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
trace.fail("unknown_response").mark("processed");
return CompletableFuture.completedStage(null);
}
}
Expand Down Expand Up @@ -894,6 +865,5 @@ public enum Control {
}

private record EnforcedSignalAndTargetActorResponse(@Nullable Signal<?> enforcedSignal,
@Nullable Object response,
StartedTimer processingTimer) {}
@Nullable Object response) {}
}

0 comments on commit 9e03585

Please sign in to comment.