-
Notifications
You must be signed in to change notification settings - Fork 214
/
EntityTaskScheduler.java
137 lines (114 loc) · 5.39 KB
/
EntityTaskScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.edge.service.dispatching;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
/**
* This class allows chaining futures related for a single entity.
* This means that you can be sure that previous tasks for the entity are completed when you're receiving the TaskResult
* as response.
*/
final class EntityTaskScheduler extends AbstractActor {
static final String ACTOR_NAME = "entity-task-scheduler";
private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
/**
* Remembers running tasks for a certain entity ID. May contain an already completed future.
*/
private final Map<EntityId, CompletionStage<?>> taskCsPerEntityId;
private final Counter scheduledTasks;
private final Counter completedTasks;
@SuppressWarnings("unused")
private EntityTaskScheduler(final String metricsNameTag) {
taskCsPerEntityId = new HashMap<>();
scheduledTasks = DittoMetrics.counter("scheduled_tasks")
.tag("name", metricsNameTag);
completedTasks = DittoMetrics.counter("completed_tasks")
.tag("name", metricsNameTag);
}
/**
* Creates Akka configuration object for this actor.
*
* @param metricsNameTag a name tag to include in the gathered counters/metrics of the actor.
* @return the Akka configuration Props object.
*/
static Props props(final String metricsNameTag) {
return Props.create(EntityTaskScheduler.class, checkNotNull(metricsNameTag, "metricsNameTag"));
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(Task.class, this::scheduleTask)
.match(TaskComplete.class, this::taskComplete)
.matchAny(message -> log.warning("UnknownMessage <{}>", message))
.build();
}
private void scheduleTask(final Task<?> task) {
final ActorRef sender = sender();
final CompletionStage<?> taskCs = taskCsPerEntityId.compute(task.entityId(), (entityId, previousTaskCS) -> {
final CompletionStage<?> previous =
previousTaskCS != null ? previousTaskCS : CompletableFuture.completedStage(null);
return scheduleTaskAfter(previous, task);
});
scheduledTasks.increment();
if (sender != null && sender != getContext().system().deadLetters()) {
taskCs.whenComplete((result, error) -> {
final TaskResult<?> taskResult = new TaskResult<>(result, error);
sender.tell(taskResult, ActorRef.noSender());
});
}
}
private void taskComplete(final TaskComplete taskComplete) {
taskCsPerEntityId.compute(taskComplete.entityId(), (entityId, previousTaskCs) -> {
if (previousTaskCs == null || previousTaskCs.toCompletableFuture().isDone()) {
// no pending task was existing or it was already done/deleted
return null;
} else {
return previousTaskCs;
}
});
completedTasks.increment();
}
/**
* Schedule a task based on previous completion stage of a task for an entity.
* Informs self about completion by sending TaskComplete to self when the scheduled task has been completed.
*
* @param previousTaskCompletion in-flight tasks for the same entity.
* @param task the task to schedule.
* @return the next in-flight task, including the scheduled task.
* Completes after all previous tasks are completed as well.
*/
private CompletionStage<?> scheduleTaskAfter(final CompletionStage<?> previousTaskCompletion, final Task<?> task) {
return previousTaskCompletion
.exceptionally(error -> null) // future tasks should ignore failures of previous tasks
.thenCompose(lastResult -> task.taskRunner().get()
.whenComplete((result, error) ->
self().tell(new TaskComplete(task.entityId()), ActorRef.noSender())));
}
record Task<R>(EntityId entityId, Supplier<CompletionStage<R>> taskRunner) {}
record TaskResult<R>(@Nullable R result, @Nullable Throwable error) {}
private record TaskComplete(EntityId entityId) {}
}