Skip to content

Commit

Permalink
Make EntityTaskResultSequentializer a EntityTaskScheduler
Browse files Browse the repository at this point in the history
* It now provides the possibility to wait with a subsequent task running
  until the previous one is completed.
  In this concere example the forwarding of a subsequent signal always happens
  after the forwarding of a previous signal is completed

Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 17, 2022
1 parent 7af3b71 commit d3bae0e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
Expand Down Expand Up @@ -94,7 +95,7 @@ private EdgeCommandForwarderActor(final ActorRef pubSubMediator, final ShardRegi
aggregatorProxyActor = getContext().actorOf(ThingsAggregatorProxyActor.props(pubSubMediator),
ThingsAggregatorProxyActor.ACTOR_NAME);
taskScheduler =
getContext().actorOf(EntityTaskResultSequentializer.props(), EntityTaskResultSequentializer.ACTOR_NAME);
getContext().actorOf(EntityTaskScheduler.props(), EntityTaskScheduler.ACTOR_NAME);
}

/**
Expand Down Expand Up @@ -159,29 +160,41 @@ public Receive createReceive() {

private void forwardToThings(final Signal<?> thingSignal) {
final ActorRef sender = getSender();
applySignalTransformation(thingSignal, sender)
.thenAccept(transformed -> {
log.withCorrelationId(transformed)
.info("Forwarding thing signal with ID <{}> and type <{}> to 'things' shard region",
transformed instanceof WithEntityId withEntityId ? withEntityId.getEntityId() :
null,
transformed.getType());
final CompletionStage<Signal<?>> signalTransformationCs = applySignalTransformation(thingSignal, sender);

if (!Signal.isChannelLive(transformed) &&
!Signal.isChannelSmart(transformed) &&
transformed instanceof Command<?> command &&
isIdempotent(command)) {
askWithRetryCommandForwarder.forwardCommand(command,
shardRegions.things(),
sender);
} else {
shardRegions.things().tell(transformed, sender);
}
});
scheduleTask(thingSignal, () -> signalTransformationCs.thenAccept(transformed -> {
log.withCorrelationId(transformed)
.info("Forwarding thing signal with ID <{}> and type <{}> to 'things' shard region",
transformed instanceof WithEntityId withEntityId ? withEntityId.getEntityId() :
null,
transformed.getType());

if (!Signal.isChannelLive(transformed) &&
!Signal.isChannelSmart(transformed) &&
transformed instanceof Command<?> command &&
isIdempotent(command)) {
askWithRetryCommandForwarder.forwardCommand(command,
shardRegions.things(),
sender);
} else {
shardRegions.things().tell(transformed, sender);
}
}));
}

private void scheduleTask(final Signal<?> signal, Supplier<CompletionStage<Void>> csSupplier) {
if (signal instanceof WithEntityId withEntityId) {
scheduleTaskForEntity(
new EntityTaskScheduler.Task<>(withEntityId.getEntityId(), csSupplier),
signal.getDittoHeaders()
);
} else {
csSupplier.get();
}
}

private CompletionStage<Signal<?>> applySignalTransformation(final Signal<?> signal, final ActorRef sender) {
final CompletionStage<Signal<?>> signalTransformationCs = signalTransformer.apply(signal)
return signalTransformer.apply(signal)
.whenComplete((transformed, error) -> {
if (error != null) {
final var dre = DittoRuntimeException.asDittoRuntimeException(error,
Expand All @@ -192,22 +205,13 @@ private CompletionStage<Signal<?>> applySignalTransformation(final Signal<?> sig
sender.tell(dre, ActorRef.noSender());
}
});

if (signal instanceof WithEntityId withEntityId) {
return scheduleTaskForEntity(
new EntityTaskResultSequentializer.Task<>(withEntityId.getEntityId(), signalTransformationCs),
signal.getDittoHeaders()
);
} else {
return signalTransformationCs;
}
}

private CompletionStage<Signal<?>> scheduleTaskForEntity(final EntityTaskResultSequentializer.Task<Signal<?>> task,
private CompletionStage<Void> scheduleTaskForEntity(final EntityTaskScheduler.Task<Void> task,
final DittoHeaders dittoHeaders) {
return Patterns.ask(taskScheduler, task, dittoHeaders.getTimeout().orElse(Duration.ofSeconds(60)))
.thenApply(result -> {
if (result instanceof EntityTaskResultSequentializer.TaskResult<?> taskResult) {
if (result instanceof EntityTaskScheduler.TaskResult<?> taskResult) {
return taskResult;
} else {
throw DittoInternalErrorException.newBuilder()
Expand All @@ -216,28 +220,25 @@ private CompletionStage<Signal<?>> scheduleTaskForEntity(final EntityTaskResultS
}
})
.thenCompose(taskResult -> {
if (taskResult.result() instanceof Signal<?> resultSignal) {
return CompletableFuture.completedStage(resultSignal);
} else if (taskResult.error() != null) {
if (taskResult.error() != null) {
return CompletableFuture.failedStage(taskResult.error());
} else {
// Should never happen. Either a Signal result or an error is expected
return CompletableFuture.failedStage(DittoInternalErrorException.newBuilder()
.dittoHeaders(dittoHeaders)
.build());
return CompletableFuture.completedStage(null);
}
});
}

private void forwardToThingsAggregatorProxy(final Command<?> command) {
final ActorRef sender = getSender();
applySignalTransformation(command, sender)
.thenAccept(transformed -> aggregatorProxyActor.tell(transformed, sender));
final CompletionStage<Signal<?>> signalTransformationCs = applySignalTransformation(command, sender);
scheduleTask(command,
() -> signalTransformationCs.thenAccept(transformed -> aggregatorProxyActor.tell(transformed, sender)));
}

private void forwardToPolicies(final PolicyCommand<?> policyCommand) {
final ActorRef sender = getSender();
applySignalTransformation(policyCommand, sender)
final CompletionStage<Signal<?>> signalTransformationCs = applySignalTransformation(policyCommand, sender);
scheduleTask(policyCommand, () -> signalTransformationCs
.thenAccept(transformed -> {
final PolicyCommand<?> transformedPolicyCommand = (PolicyCommand<?>) transformed;
log.withCorrelationId(transformedPolicyCommand)
Expand All @@ -251,14 +252,14 @@ private void forwardToPolicies(final PolicyCommand<?> policyCommand) {
} else {
shardRegions.policies().tell(transformedPolicyCommand, sender);
}
});

}));
}

private void forwardToConnectivity(final Command<?> connectivityCommand) {
if (connectivityCommand instanceof WithEntityId withEntityId) {
final ActorRef sender = getSender();
applySignalTransformation(connectivityCommand, sender)
final var signalTransformationCs = applySignalTransformation(connectivityCommand, sender);
scheduleTask(connectivityCommand, () -> signalTransformationCs
.thenAccept(transformed -> {
final Command<?> transformedConnectivityCommand = (Command<?>) transformed;
log.withCorrelationId(transformedConnectivityCommand)
Expand All @@ -268,7 +269,7 @@ private void forwardToConnectivity(final Command<?> connectivityCommand) {

// don't retry connectivity commands
shardRegions.connections().tell(transformedConnectivityCommand, sender);
});
}));
} else {
log.withCorrelationId(connectivityCommand)
.error("Could not forward ConnectivityCommand not implementing WithEntityId to 'connections' " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import javax.annotation.Nullable;

Expand All @@ -32,10 +33,10 @@

/**
* This class allows chaining futures related for a single entity.
* This means that you can be sure that previous tasks for the entity are completed when you're reiving the TaskResult
* This means that you can be sure that previous tasks for the entity are completed when you're receiving the TaskResult
* as response.
*/
final class EntityTaskResultSequentializer extends AbstractActor {
final class EntityTaskScheduler extends AbstractActor {

static final String ACTOR_NAME = "entity-task-scheduler";

Expand All @@ -47,15 +48,15 @@ final class EntityTaskResultSequentializer extends AbstractActor {
private final Counter scheduledTasks;
private final Counter completedTasks;

private EntityTaskResultSequentializer() {
private EntityTaskScheduler() {
taskCsPerEntityId = new HashMap<>();
log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
scheduledTasks = DittoMetrics.counter("scheduled_tasks");
completedTasks = DittoMetrics.counter("completed_tasks");
}

static Props props() {
return Props.create(EntityTaskResultSequentializer.class);
return Props.create(EntityTaskScheduler.class);
}

@Override
Expand Down Expand Up @@ -98,7 +99,7 @@ private void taskComplete(final TaskComplete taskComplete) {
}

/**
* Schedule an enforcement task based on previous completion stage of a task for an entity.
* Schedule a task based on previous completion stage of a task for an entity.
* Informs self about completion by sending TaskComplete to self when the scheduled task has been completed.
*
* @param previousTaskCompletion in-flight tasks for the same entity.
Expand All @@ -109,12 +110,12 @@ private void taskComplete(final TaskComplete taskComplete) {
private CompletionStage<?> scheduleTaskAfter(final CompletionStage<?> previousTaskCompletion, final Task<?> task) {
return previousTaskCompletion
.exceptionally(error -> null) //future tasks should ignore failures of previous tasks
.thenCompose(lastResult -> task.startedStage()
.thenCompose(lastResult -> task.taskRunner().get()
.whenComplete((result, error) ->
self().tell(new TaskComplete(task.entityId()), ActorRef.noSender())));
}

record Task<R>(EntityId entityId, CompletionStage<R> startedStage) {}
record Task<R>(EntityId entityId, Supplier<CompletionStage<R>> taskRunner) {}

record TaskResult<R>(@Nullable R result, @Nullable Throwable error) {}

Expand Down

0 comments on commit d3bae0e

Please sign in to comment.