Skip to content

Commit

Permalink
added metrics gathering for enforcement + pre-enforcement in Abstract…
Browse files Browse the repository at this point in the history
…PersistenceSupervisor

* also added counter for "sudo_commands"

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 5, 2022
1 parent a98174c commit 6d5fbb6
Showing 1 changed file with 129 additions and 22 deletions.
Expand Up @@ -29,13 +29,20 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.base.model.signals.WithType;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.service.actors.ShutdownBehaviour;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOff;
import org.eclipse.ditto.base.service.config.supervision.ExponentialBackOffConfig;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithStashWithTimers;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.PreparedTimer;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.policies.enforcement.pre.PreEnforcerProvider;

Expand Down Expand Up @@ -72,6 +79,21 @@ public abstract class AbstractPersistenceSupervisor<E extends EntityId, S extend
*/
protected static final Duration DEFAULT_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);

private static final String ENFORCEMENT_TIMER = "enforcement";
private static final String ENFORCEMENT_TIMER_SEGMENT_PRE_ENFORCEMENT = "pre_enf";
private static final String ENFORCEMENT_TIMER_SEGMENT_ENFORCEMENT = "enf";
private static final String ENFORCEMENT_TIMER_SEGMENT_PROCESSING = "process";
private static final String ENFORCEMENT_TIMER_SEGMENT_RESPONSE_FILTER = "resp_filter";
private static final String ENFORCEMENT_TIMER_TAG_CHANNEL = "channel";
private static final String ENFORCEMENT_TIMER_TAG_RESOURCE = "resource";
private static final String ENFORCEMENT_TIMER_TAG_CATEGORY = "category";
private static final String ENFORCEMENT_TIMER_TAG_OUTCOME = "outcome";
private static final String ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL = "fail";
private static final String ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS = "success";

private static final Counter SUDO_COMMANDS_COUNTER = DittoMetrics.counter("sudo_commands");
private static final String SUDO_COMMAND_COUNTER_TAG_TYPE = "type";

protected static final String PERSISTENCE_ACTOR_NAME = "pa";

protected final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
Expand Down Expand Up @@ -479,6 +501,9 @@ private void forwardSudoCommandToChildIfAvailable(final SudoCommand<?> sudoComma
sudoCommand);
unhandled(sudoCommand);
} else {
SUDO_COMMANDS_COUNTER
.tag(SUDO_COMMAND_COUNTER_TAG_TYPE, sudoCommand.getType())
.increment();
persistenceActorChild.forward(sudoCommand, getContext());
}
} else {
Expand All @@ -494,6 +519,13 @@ private void forwardDittoSudoToChildIfAvailable(final WithDittoHeaders withDitto
withDittoHeaders);
unhandled(withDittoHeaders);
} else {
if (withDittoHeaders instanceof WithType withType) {
SUDO_COMMANDS_COUNTER
.tag(SUDO_COMMAND_COUNTER_TAG_TYPE, withType.getType())
.increment();
} else {
SUDO_COMMANDS_COUNTER.increment();
}
persistenceActorChild.forward(withDittoHeaders, getContext());
}
} else {
Expand Down Expand Up @@ -595,11 +627,9 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
* All signals are treated in the following way:
* <ul>
* <li>they are sent to the enforcer child which enforces/applies authorization of the signal</li>
* <li>after successful enforcement, they are optionally modified in {@link #modifyEnforcerActorEnforcedSignalResponse(Object)}</li>
* <li>afterwards, the enforced signal is sent to the persistence actor child in
* {@link #enforcerResponseToTargetActor(org.eclipse.ditto.base.model.headers.DittoHeaders, Object, akka.actor.ActorRef)}</li>
* <li>the persistence actor's response is handled in
* {@link #filterTargetActorResponseViaEnforcer(org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceSupervisor.EnforcedSignalAndTargetActorResponse)}
* <li>after successful enforcement, they are optionally modified in {@code #modifyEnforcerActorEnforcedSignalResponse(...)}</li>
* <li>afterwards, the enforced signal is sent to the persistence actor child in {@code #enforcerResponseToTargetActor(...)}</li>
* <li>the persistence actor's response is handled in {@code #filterTargetActorResponseViaEnforcer(...)}
* where the enforcer applies optionally filtering of the response</li>
* <li>the result is returned in the CompletionStage</li>
* </ul>
Expand All @@ -612,15 +642,39 @@ private void handleSignalEnforcementResponse(@Nullable final Object response,
protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S signal, final ActorRef sender) {

if (null != enforcerChild) {
return preEnforcer.apply(signal).thenCompose(preEnforcedSignal ->
askEnforcerChild(preEnforcedSignal)
.thenCompose(this::modifyEnforcerActorEnforcedSignalResponse)
.thenCompose(enforcedCommand -> enforcerResponseToTargetActor(
preEnforcedSignal.getDittoHeaders(),
enforcedCommand,
sender
))
.thenCompose(this::filterTargetActorResponseViaEnforcer)
final StartedTimer rootTimer = createTimer(signal);
final StartedTimer preEnforcementTimer = rootTimer.startNewSegment(
ENFORCEMENT_TIMER_SEGMENT_PRE_ENFORCEMENT);
return preEnforcer.apply(signal).thenCompose(preEnforcedSignal -> {
if (preEnforcedSignal instanceof Command<?> command) {
// update category which could have been changed by preEnforcer
final String category = command.getCategory().name().toLowerCase();
rootTimer.tag(ENFORCEMENT_TIMER_TAG_CATEGORY, category);
preEnforcementTimer.tag(ENFORCEMENT_TIMER_TAG_CATEGORY, category);
}
preEnforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop();
final StartedTimer enforcementTimer = rootTimer.startNewSegment(
ENFORCEMENT_TIMER_SEGMENT_ENFORCEMENT);
return askEnforcerChild(preEnforcedSignal)
.thenCompose(this::modifyEnforcerActorEnforcedSignalResponse)
.thenCompose(enforcedCommand -> enforcerResponseToTargetActor(
rootTimer.startNewSegment(ENFORCEMENT_TIMER_SEGMENT_PROCESSING),
enforcementTimer,
preEnforcedSignal.getDittoHeaders(),
enforcedCommand,
sender
))
.thenCompose(targetActorResponse ->
filterTargetActorResponseViaEnforcer(targetActorResponse, rootTimer)
)
.whenComplete((result, error) -> rootTimer
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, error != null ?
ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL :
ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop()
);
}
);
} else {
log.withCorrelationId(signal)
Expand All @@ -629,40 +683,65 @@ protected CompletionStage<Object> enforceSignalAndForwardToTargetActor(final S s
}
}

private StartedTimer createTimer(final WithDittoHeaders withDittoHeaders) {
final PreparedTimer expiringTimer = DittoMetrics.timer(ENFORCEMENT_TIMER);

withDittoHeaders.getDittoHeaders().getChannel().ifPresent(channel ->
expiringTimer.tag(ENFORCEMENT_TIMER_TAG_CHANNEL, channel)
);
if (withDittoHeaders instanceof WithResource withResource) {
expiringTimer.tag(ENFORCEMENT_TIMER_TAG_RESOURCE, withResource.getResourceType());
}
if (withDittoHeaders instanceof Command<?> command) {
expiringTimer.tag(ENFORCEMENT_TIMER_TAG_CATEGORY, command.getCategory().name().toLowerCase());
}
return expiringTimer.start();
}

private CompletionStage<EnforcedSignalAndTargetActorResponse> enforcerResponseToTargetActor(
final StartedTimer processingTimer,
final StartedTimer enforcementTimer,
final DittoHeaders dittoHeaders,
@Nullable final Object enforcerResponse,
final ActorRef sender
) {
final ActorRef sender) {

if (null == persistenceActorChild) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL).stop();
throw getUnavailableExceptionBuilder(entityId)
.dittoHeaders(dittoHeaders)
.build();
} else if (enforcerResponse instanceof Signal<?> enforcedSignal) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS).stop();
log.withCorrelationId(enforcedSignal)
.debug("Received enforcedSignal from enforcerChild, forwarding to target actor: {}",
enforcedSignal);
return askTargetActor(enforcedSignal, shouldSendResponse(enforcedSignal), sender)
.thenCompose(response ->
modifyTargetActorCommandResponse(enforcedSignal, response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response)
new EnforcedSignalAndTargetActorResponse(enforcedSignal, response, processingTimer)
);
} else if (enforcerResponse instanceof DistributedPubWithMessage distributedPubWithMessage) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS).stop();
return askTargetActor(distributedPubWithMessage,
distributedPubWithMessage.signal().getDittoHeaders().isResponseRequired(), sender
)
.thenCompose(response ->
modifyTargetActorCommandResponse(distributedPubWithMessage.signal(), response))
.thenApply(response ->
new EnforcedSignalAndTargetActorResponse(distributedPubWithMessage.signal(), response)
new EnforcedSignalAndTargetActorResponse(distributedPubWithMessage.signal(), response,
processingTimer)
);
} else if (enforcerResponse instanceof DittoRuntimeException dre) {
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL).stop();
log.withCorrelationId(dittoHeaders)
.debug("Received DittoRuntimeException as response from enforcerChild: {}", dre);
throw dre;
} else {
return CompletableFuture.completedStage(new EnforcedSignalAndTargetActorResponse(null, null));
enforcementTimer.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL).stop();
return CompletableFuture.completedStage(
new EnforcedSignalAndTargetActorResponse(null, null, processingTimer)
);
}
}

Expand All @@ -677,30 +756,57 @@ protected boolean shouldSendResponse(final WithDittoHeaders withDittoHeaders) {
}

protected CompletionStage<Object> filterTargetActorResponseViaEnforcer(
final EnforcedSignalAndTargetActorResponse targetActorResponse) {
final EnforcedSignalAndTargetActorResponse targetActorResponse, final StartedTimer rootTimer) {

if (targetActorResponse.response() instanceof CommandResponse<?> commandResponse) {
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop();
final StartedTimer responseFilterTimer = rootTimer.startNewSegment(
ENFORCEMENT_TIMER_SEGMENT_RESPONSE_FILTER);

log.withCorrelationId(commandResponse)
.debug("Received CommandResponse from target actor, " +
"telling enforcerChild to apply response filtering: {}", commandResponse);
return askEnforcerChild(commandResponse);
return askEnforcerChild(commandResponse)
.whenComplete((result, error) ->
responseFilterTimer
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, error != null ?
ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL : ENFORCEMENT_TIMER_TAG_OUTCOME_SUCCESS)
.stop()
);
} else if (targetActorResponse.response() instanceof DittoRuntimeException dre) {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.debug("Received DittoRuntimeException as response from target actor: {}", dre);
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
throw dre;
} else if (targetActorResponse.response() instanceof Status.Success success) {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.info("Got success message from target actor: {}", success);
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
return CompletableFuture.completedStage(success);
} else if (targetActorResponse.response() instanceof AskTimeoutException askTimeoutException) {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.warning("Encountered ask timeout from target actor: {}", askTimeoutException.getMessage());
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
return CompletableFuture.completedStage(null);
} else if (targetActorResponse.response() instanceof Throwable throwable) {
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
return CompletableFuture.failedFuture(throwable);
} else {
log.withCorrelationId(targetActorResponse.enforcedSignal())
.warning("Unexpected response from target actor: {}", targetActorResponse);
targetActorResponse.processingTimer()
.tag(ENFORCEMENT_TIMER_TAG_OUTCOME, ENFORCEMENT_TIMER_TAG_OUTCOME_FAIL)
.stop();
return CompletableFuture.completedStage(null);
}
}
Expand Down Expand Up @@ -755,5 +861,6 @@ public enum Control {
}

private record EnforcedSignalAndTargetActorResponse(@Nullable Signal<?> enforcedSignal,
@Nullable Object response) {}
@Nullable Object response,
StartedTimer processingTimer) {}
}

0 comments on commit 6d5fbb6

Please sign in to comment.