Skip to content

Commit

Permalink
EntityTaskScheduler polishing:
Browse files Browse the repository at this point in the history
* added unit tests testing EntityTaskScheduler and also EdgeCommandForwraderActor "ordering" aspect
* added metric name for EntityTaskScheduler counters
* removed WARN log which would get triggered by tasks scheduled basically at the same time
* added some more debug logs

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Aug 18, 2022
1 parent d3bae0e commit 4d95655
Show file tree
Hide file tree
Showing 10 changed files with 397 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionIds;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint;
import org.eclipse.ditto.internal.utils.extension.DittoExtensionPoint.ExtensionId.ExtensionIdConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
Expand All @@ -33,7 +33,7 @@

public final class SignalTransformers implements DittoExtensionPoint, SignalTransformer {

private static final Logger LOGGER = LoggerFactory.getLogger(SignalTransformers.class);
private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(SignalTransformers.class);
private static final String SIGNAL_TRANSFORMERS = "signal-transformers";
private final List<SignalTransformer> transformers;

Expand Down Expand Up @@ -63,9 +63,11 @@ public CompletionStage<Signal<?>> apply(final Signal<?> signal) {
}
return prior.whenComplete((result, error) -> {
if (error != null) {
LOGGER.debug("Error happened during signal transforming.", error);
LOGGER.withCorrelationId(signal)
.debug("Error happened during signal transforming.", error);
} else {
LOGGER.debug("Signal transforming of <{}> resulted in <{}>.", signal, result);
LOGGER.withCorrelationId(signal)
.debug("Signal transforming of <{}> resulted in <{}>.", signal, result);
}
});
}
Expand Down
10 changes: 10 additions & 0 deletions edge/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@
<artifactId>MutabilityDetector</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class EdgeCommandForwarderActor extends AbstractActor {
*/
public static final String ACTOR_NAME = "edgeCommandForwarder";

private static final Duration FALLBACK_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(60);

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final ActorRef pubSubMediator;
Expand All @@ -95,7 +98,7 @@ private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegi
aggregatorProxyActor = getContext().actorOf(ThingsAggregatorProxyActor.props(pubSubMediator),
ThingsAggregatorProxyActor.ACTOR_NAME);
taskScheduler =
getContext().actorOf(EntityTaskScheduler.props(), EntityTaskScheduler.ACTOR_NAME);
getContext().actorOf(EntityTaskScheduler.props(ACTOR_NAME), EntityTaskScheduler.ACTOR_NAME);
}

/**
Expand Down Expand Up @@ -182,14 +185,30 @@ private void forwardToThings(final Signal<?> thingSignal) {
}));
}

private void scheduleTask(final Signal<?> signal, Supplier<CompletionStage<Void>> csSupplier) {
private void scheduleTask(final Signal<?> signal,
final Supplier<CompletionStage<Void>> signalTransformationCsSupplier) {

if (signal instanceof WithEntityId withEntityId) {
final EntityId entityId = withEntityId.getEntityId();
log.withCorrelationId(signal)
.debug("Scheduling signal transformation task for entityId <{}>", entityId);
scheduleTaskForEntity(
new EntityTaskScheduler.Task<>(withEntityId.getEntityId(), csSupplier),
new EntityTaskScheduler.Task<>(entityId, signalTransformationCsSupplier),
signal.getDittoHeaders()
).whenComplete((aVoid, throwable) ->
log.withCorrelationId(signal)
.debug("Scheduled task for entityId <{}> completed successfully: <{}>",
entityId, throwable == null)
);
} else {
csSupplier.get();
log.withCorrelationId(signal)
.debug("Scheduling signal transformation task without entity");
signalTransformationCsSupplier.get()
.whenComplete((aVoid, throwable) ->
log.withCorrelationId(signal)
.debug("Scheduled task without entityId completed successfully: <{}>",
throwable == null)
);
}
}

Expand All @@ -209,11 +228,14 @@ private CompletionStage<Signal<?>> applySignalTransformation(final Signal<?> sig

private CompletionStage<Void> scheduleTaskForEntity(final EntityTaskScheduler.Task<Void> task,
final DittoHeaders dittoHeaders) {
return Patterns.ask(taskScheduler, task, dittoHeaders.getTimeout().orElse(Duration.ofSeconds(60)))

return Patterns.ask(taskScheduler, task, dittoHeaders.getTimeout().orElse(FALLBACK_LOCAL_ASK_TIMEOUT))
.thenApply(result -> {
if (result instanceof EntityTaskScheduler.TaskResult<?> taskResult) {
return taskResult;
} else {
log.withCorrelationId(dittoHeaders)
.error("Did not receive expected TaskResult from TaskScheduler, was: <{}>", result);
throw DittoInternalErrorException.newBuilder()
.dittoHeaders(dittoHeaders)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.edge.service.dispatching;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -40,23 +42,32 @@ final class EntityTaskScheduler extends AbstractActor {

static final String ACTOR_NAME = "entity-task-scheduler";

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

/**
* Remembers running tasks for a certain entity ID. May contain an already completed future.
*/
private final Map<EntityId, CompletionStage<?>> taskCsPerEntityId;
private final DittoDiagnosticLoggingAdapter log;
private final Counter scheduledTasks;
private final Counter completedTasks;

private EntityTaskScheduler() {
@SuppressWarnings("unused")
private EntityTaskScheduler(final String metricsNameTag) {
taskCsPerEntityId = new HashMap<>();
log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
scheduledTasks = DittoMetrics.counter("scheduled_tasks");
completedTasks = DittoMetrics.counter("completed_tasks");
scheduledTasks = DittoMetrics.counter("scheduled_tasks")
.tag("name", metricsNameTag);
completedTasks = DittoMetrics.counter("completed_tasks")
.tag("name", metricsNameTag);
}

static Props props() {
return Props.create(EntityTaskScheduler.class);
/**
* Creates Akka configuration object for this actor.
*
* @param metricsNameTag a name tag to include in the gathered counters/metrics of the actor.
* @return the Akka configuration Props object.
*/
static Props props(final String metricsNameTag) {
return Props.create(EntityTaskScheduler.class, checkNotNull(metricsNameTag, "metricsNameTag"));
}

@Override
Expand All @@ -69,6 +80,7 @@ public Receive createReceive() {
}

private void scheduleTask(final Task<?> task) {

final ActorRef sender = sender();
final CompletionStage<?> taskCs = taskCsPerEntityId.compute(task.entityId(), (entityId, previousTaskCS) -> {
final CompletionStage<?> previous =
Expand All @@ -84,12 +96,10 @@ private void scheduleTask(final Task<?> task) {
}

private void taskComplete(final TaskComplete taskComplete) {

taskCsPerEntityId.compute(taskComplete.entityId(), (entityId, previousTaskCs) -> {
if (previousTaskCs == null) {
log.warning("PreviousTaskCs must never be null on task completion. We at least expect the cs for " +
"the task which is completed by this message.");
return null;
} else if (previousTaskCs.toCompletableFuture().isDone()) {
if (previousTaskCs == null || previousTaskCs.toCompletableFuture().isDone()) {
// no pending task was existing or it was already done/deleted
return null;
} else {
return previousTaskCs;
Expand All @@ -108,8 +118,9 @@ private void taskComplete(final TaskComplete taskComplete) {
* Completes after all previous tasks are completed as well.
*/
private CompletionStage<?> scheduleTaskAfter(final CompletionStage<?> previousTaskCompletion, final Task<?> task) {

return previousTaskCompletion
.exceptionally(error -> null) //future tasks should ignore failures of previous tasks
.exceptionally(error -> null) // future tasks should ignore failures of previous tasks
.thenCompose(lastResult -> task.taskRunner().get()
.whenComplete((result, error) ->
self().tell(new TaskComplete(task.entityId()), ActorRef.noSender())));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttribute;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

/**
* Unit tests for {@link EdgeCommandForwarderActor}.
*/
public final class EdgeCommandForwarderActorTest {

public static final PolicyId POLICY_ID = PolicyId.of("foo:bar");
public static final ThingId THING_ID = ThingId.of("foo:bar");
@Nullable private static ActorSystem actorSystem;

@BeforeClass
public static void init() {
actorSystem = ActorSystem.create("AkkaTestSystem", ConfigFactory.load("test"));
}

@AfterClass
public static void tearDown() {
if (actorSystem != null) {
TestKit.shutdownActorSystem(actorSystem);
}
}

@Test
public void ensureCommandOrderIsMaintainedForSlowSignalTransformations() {
assert actorSystem != null;
new TestKit(actorSystem) {{
final ShardRegions shardRegionsMock = Mockito.mock(ShardRegions.class);
final TestProbe thingsProbe = new TestProbe(actorSystem);
Mockito.when(shardRegionsMock.things()).thenReturn(thingsProbe.ref());

final Props props = EdgeCommandForwarderActor.props(getRef(), shardRegionsMock);
final ActorRef underTest = actorSystem.actorOf(props);

final CreateThing createThing = CreateThing.of(Thing.newBuilder()
.setId(THING_ID)
.setPolicyId(POLICY_ID)
.build(),
null,
DittoHeaders.newBuilder().correlationId("cid-1-create").build()
);
final ModifyAttribute modifyAttribute = ModifyAttribute.of(THING_ID,
JsonPointer.of("foo"),
JsonValue.of(42),
DittoHeaders.newBuilder().correlationId("cid-2-modify").build()
);

underTest.tell(createThing, getRef());
underTest.tell(modifyAttribute, getRef());

thingsProbe.expectMsg(createThing);
thingsProbe.expectMsg(modifyAttribute);
}};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.signaltransformer.SignalTransformer;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;

import com.typesafe.config.Config;

import akka.actor.ActorSystem;

/**
* Signal transformer used for {@link EdgeCommandForwarderActorTest} to artificially delay certain commands in their
* signal transformation.
*/
public final class EdgeCommandForwarderActorTestSignalTransformer implements SignalTransformer {

private static final Duration CREATE_THING_TRANSFORMATION_DURATION = Duration.ofMillis(200);

EdgeCommandForwarderActorTestSignalTransformer(final ActorSystem actorSystem, final Config config) {
}

@Override
public CompletionStage<Signal<?>> apply(final Signal<?> signal) {
if (signal instanceof CreateThing) {
return new CompletableFuture<Signal<?>>()
.completeOnTimeout(signal, CREATE_THING_TRANSFORMATION_DURATION.toMillis(), TimeUnit.MILLISECONDS);
} else {
return CompletableFuture.completedStage(signal);
}
}

}
Loading

0 comments on commit 4d95655

Please sign in to comment.