Skip to content

Commit

Permalink
got rid of blocking the Akka dispatcher thread completely
Browse files Browse the repository at this point in the history
  • Loading branch information
thjaeckle committed Jun 28, 2023
1 parent 50202df commit 42a8ee3
Showing 1 changed file with 16 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand Down Expand Up @@ -68,6 +67,7 @@
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryTimedOut;
import akka.persistence.SaveSnapshotFailure;
Expand Down Expand Up @@ -348,6 +348,8 @@ protected void becomeCreatedHandler() {
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.match(PersistEventAsync.class, persistEventAsync ->
persistAndApplyEvent((E) persistEventAsync.event, persistEventAsync.handler))
.build())
.orElse(matchAnyAfterInitialization());

Expand Down Expand Up @@ -545,32 +547,19 @@ protected void persistAndApplyEvent(final E event, final BiConsumer<E, S> handle
}
}

private record PersistEventAsync<
E extends EventsourcedEvent<? extends E>,
S extends Jsonifiable.WithFieldSelectorAndPredicate<JsonField>>(E event, BiConsumer<E, S> handler) {};

/**
* Persist an event, modify actor state by the event strategy, then invoke the handler.
*
* @param event the event to persist and apply.
* @param handler what happens afterwards.
*/
protected void persistAndApplyEventAsync(final CompletionStage<E> event, final BiConsumer<E, S> handler) {

event.thenAccept(completedEvent -> {
final E modifiedEvent = modifyEventBeforePersist(completedEvent);
if (modifiedEvent.getDittoHeaders().isDryRun()) {
handler.accept(modifiedEvent, entity);
} else {
persistEventAsync(modifiedEvent, persistedEvent -> {
// after the event was persisted, apply the event on the current actor state
applyEvent(persistedEvent);
handler.accept(persistedEvent, entity);
});
}
}).toCompletableFuture()
.orTimeout(MAX_ASYNC_EVENT_PRE_PROCESSING_TIMEOUT.getSeconds(), TimeUnit.SECONDS)
.join(); // this blocks the Akka dispatcher thread, but is needed so that Akka persistence persist()
// is invoked in the correct context
// as most of the provided `CompletionStage<E> event` are already completed futures, this only
// is of relevance for asynchronously preprocessed events to persist, like e.g. CreateThing
// being enhanced by a WoT based Thing JSON skeleton
Patterns.pipe(event.thenApply(e -> new PersistEventAsync<>(e, handler)), getContext().getDispatcher())
.to(getSelf());
}

/**
Expand Down Expand Up @@ -640,6 +629,8 @@ private Receive createDeletedBehavior() {
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.match(PersistEventAsync.class, persistEventAsync ->
persistAndApplyEvent((E) persistEventAsync.event, persistEventAsync.handler))
.build())
.orElse(matchAnyWhenDeleted());
}
Expand Down Expand Up @@ -790,16 +781,8 @@ private long getNextRevisionNumber() {
return getRevisionNumber() + 1;
}

private void persistEvent(final E event, final Consumer<E> handler) {
persistEvent(event, handler, false);
}

private void persistEventAsync(final E event, final Consumer<E> handler) {
persistEvent(event, handler, true);
}

@SuppressWarnings("unchecked")
private void persistEvent(final E event, final Consumer<E> handler, final boolean async) {
private void persistEvent(final E event, final Consumer<E> handler) {
final var l = log.withCorrelationId(event);
l.debug("Persisting Event <{}>.", event.getType());

Expand All @@ -810,17 +793,10 @@ private void persistEvent(final E event, final Consumer<E> handler, final boolea
.tag(SpanTagKey.SIGNAL_TYPE.getTagForValue(event.getType()))
.start();

if (async) {
persistAsync(
event.setDittoHeaders(DittoHeaders.of(persistOperationSpan.propagateContext(event.getDittoHeaders()))),
persistedEvent -> handlePersistedEvent(handler, l, persistOperationSpan, persistedEvent)
);
} else {
persist(
event.setDittoHeaders(DittoHeaders.of(persistOperationSpan.propagateContext(event.getDittoHeaders()))),
persistedEvent -> handlePersistedEvent(handler, l, persistOperationSpan, persistedEvent)
);
}
persist(
event.setDittoHeaders(DittoHeaders.of(persistOperationSpan.propagateContext(event.getDittoHeaders()))),
persistedEvent -> handlePersistedEvent(handler, l, persistOperationSpan, persistedEvent)
);
}

private void handlePersistedEvent(final Consumer<E> handler, final DittoDiagnosticLoggingAdapter l,
Expand Down

0 comments on commit 42a8ee3

Please sign in to comment.