Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process events with identical TrackingToken together in the PooledStreamingEventProcessor #2275

Merged
merged 8 commits into from Jul 7, 2022
Expand Up @@ -34,8 +34,11 @@
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand Down Expand Up @@ -781,15 +784,19 @@ private void coordinateWorkPackages() throws InterruptedException {
fetched < WorkPackage.BUFFER_SIZE && isSpaceAvailable() && eventStream.hasNextAvailable();
fetched++) {
TrackedEventMessage<?> event = eventStream.nextAvailable();
offerEventToWorkPackages(event);
lastScheduledToken = event.trackingToken();

// Make sure all subsequent events with the same token as the last are added as well.
// These are the result of upcasting and should always be processed in the same batch.
while (eventStream.peek()
.filter(e -> lastScheduledToken.equals(e.trackingToken()))
.isPresent()) {
offerEventToWorkPackages(eventStream.nextAvailable());
// These are the result of upcasting and should always be scheduled in one go.
if (eventsEqualingLastScheduledToken()) {
List<TrackedEventMessage<?>> events = new ArrayList<>();
events.add(event);
while (eventsEqualingLastScheduledToken()) {
events.add(eventStream.nextAvailable());
}
offerEventsToWorkPackages(events);
} else {
offerEventToWorkPackages(event);
}
}

Expand All @@ -803,6 +810,12 @@ private void coordinateWorkPackages() throws InterruptedException {
.forEach(WorkPackage::scheduleWorker);
}

private boolean eventsEqualingLastScheduledToken() {
return eventStream.peek()
.filter(e -> lastScheduledToken.equals(e.trackingToken()))
.isPresent();
}

private void offerEventToWorkPackages(TrackedEventMessage<?> event) {
boolean anyScheduled = false;
for (WorkPackage workPackage : workPackages.values()) {
Expand All @@ -817,6 +830,22 @@ private void offerEventToWorkPackages(TrackedEventMessage<?> event) {
}
}

private void offerEventsToWorkPackages(List<TrackedEventMessage<?>> events) {
boolean anyScheduled = false;
for (WorkPackage workPackage : workPackages.values()) {
boolean scheduled = workPackage.scheduleEvents(Collections.unmodifiableList(events));
anyScheduled = anyScheduled || scheduled;
}
if (!anyScheduled) {
events.forEach(event -> {
ignoredMessageHandler.accept(event);
if (!eventFilter.canHandleTypeOf(event)) {
eventStream.skipMessagesWithPayloadTypeOf(event);
}
});
}
}

private void scheduleImmediateCoordinationTask() {
scheduleCoordinationTask(0);
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.axonframework.eventhandling.pooled;

import org.axonframework.common.Assert;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
Expand All @@ -36,6 +37,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -121,15 +124,77 @@ private WorkPackage(Builder builder) {
this.lastClaimExtension = System.currentTimeMillis();
}

/**
* Schedule a collection of {@link TrackedEventMessage TrackedEventMessages} for processing by this work package.
* <p>
* Only use this method if the {@link TrackingToken TrackingTokens} of every event are equal, as those events should
* be handled within a single transaction. This scenario presents itself whenever an event is upcasted into
* <em>several instances</em>. When tokens differ between events please use
* {@link #scheduleEvent(TrackedEventMessage)}.
* <p>
* Will disregard the given {@code events} if their {@code TrackingTokens} are covered by the previously scheduled
* event.
* <p>
* <b>Threading note:</b> This method is and should only to be called by the {@link Coordinator} thread of a {@link
* PooledStreamingEventProcessor}.
*
* @param events The events to schedule for work in this work package.
* @return {@code True} if this {@link WorkPackage} scheduled one of the events for execution, otherwise
* {@code false}.
*/
public boolean scheduleEvents(List<TrackedEventMessage<?>> events) {
if (events.isEmpty()) {
// cannot schedule an empty events list
return false;
}

Assert.isTrue(allTokensMatch(events), () -> "All tokens should match when invoking this method.");

if (events.stream().allMatch(this::shouldNotSchedule)) {
if (logger.isTraceEnabled()) {
events.forEach(event -> logger.trace(
"Ignoring event [{}] with position [{}] for work package [{}]. "
+ "The last token [{}] covers event's token [{}].",
event.getIdentifier(), event.trackingToken().position().orElse(-1), segment.getSegmentId(),
lastDeliveredToken, event.trackingToken()
));
}
return false;
}

boolean canHandleAny = events.stream()
.map(event -> {
boolean canHandle = canHandle(event);
processingQueue.add(new ProcessingEntry(event, canHandle));
smcvb marked this conversation as resolved.
Show resolved Hide resolved
return canHandle;
})
.reduce((canHandleAccumulation, canHandle) -> canHandleAccumulation || canHandle)
smcvb marked this conversation as resolved.
Show resolved Hide resolved
.orElse(false);

// we can pick the first entry's token, as all are certain to be identical
lastDeliveredToken = events.get(0).trackingToken();
// the worker must always be scheduled to ensure claims are extended
scheduleWorker();

return canHandleAny;
}

private boolean allTokensMatch(List<TrackedEventMessage<?>> events) {
smcvb marked this conversation as resolved.
Show resolved Hide resolved
TrackingToken expectedToken = events.get(0).trackingToken();
return events.stream()
.map(TrackedEventMessage::trackingToken)
.allMatch(token -> Objects.equals(expectedToken, token));
}

/**
* Schedule a {@link TrackedEventMessage} for processing by this work package. Will immediately disregard the given
* {@code event} if its {@link TrackingToken} is covered by the previously scheduled event.
* <p>
* <b>Threading note:</b> This method is and should only to be called by the {@link Coordinator} thread of a {@link
* PooledStreamingEventProcessor}
* PooledStreamingEventProcessor}.
*
* @param event the event to schedule for work in this work package
* @return {@code true} if this {@link WorkPackage} scheduled the event for execution, otherwise {@code false}
* @param event The event to schedule for work in this work package.
* @return {@code True} if this {@link WorkPackage} scheduled the event for execution, otherwise {@code false}.
*/
public boolean scheduleEvent(TrackedEventMessage<?> event) {
if (shouldNotSchedule(event)) {
Expand All @@ -152,24 +217,17 @@ public boolean scheduleEvent(TrackedEventMessage<?> event) {
}

/**
* The given {@code event} should not be scheduled if:
* <ol>
* <li>The {@link TrackedEventMessage#trackingToken()} {@link TrackingToken#covers(TrackingToken)} the last delivered token, and</li>
* <li>if the last delivered token does not equal the given {@code event's} token.</li>
* </ol>
* The given {@code event} should not be scheduled if the {@link TrackedEventMessage#trackingToken()}
* {@link TrackingToken#covers(TrackingToken)} the last delivered token.
* <p>
* The first validation ensures events that this work package already covered are ignored. The second validation
* ensures that subsequent events with the exact same token <b>are</b> included. The last step is necessary to
* include events that are the result of a one-to-many upcaster.
* This validation ensures events that this work package already covered are ignored.
*
* @param event The event to validate whether it should be scheduled yes or no.
* @return {@code true} if the given {@code event} should not be scheduled, {@code false} otherwise.
*/
private boolean shouldNotSchedule(TrackedEventMessage<?> event) {
// Null check is done to solve potential NullPointerException.
return lastDeliveredToken != null
&& lastDeliveredToken.covers(event.trackingToken())
&& !lastDeliveredToken.equals(event.trackingToken());
return lastDeliveredToken != null && lastDeliveredToken.covers(event.trackingToken());
}

private boolean canHandle(TrackedEventMessage<?> event) {
Expand Down Expand Up @@ -225,14 +283,13 @@ public void scheduleWorker() {
private void processEvents() throws Exception {
List<TrackedEventMessage<?>> eventBatch = new ArrayList<>();
while (!isAbortTriggered() && eventBatch.size() < batchSize && !processingQueue.isEmpty()) {
ProcessingEntry entry = processingQueue.poll();
lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.eventMessage().trackingToken());
if (entry.canHandle()) {
eventBatch.add(
entry.eventMessage()
.withTrackingToken(lastConsumedToken)
);
}
consumeEntry(eventBatch);
}

// Make sure all subsequent events with the same token (if non-null) as the last are added as well.
// These are the result of upcasting and should always be processed in the same batch.
while (eventsEqualingLastConsumedTokenRemain()) {
consumeEntry(eventBatch);
}

if (!eventBatch.isEmpty()) {
Expand All @@ -257,6 +314,24 @@ private void processEvents() throws Exception {
}
}

private void consumeEntry(List<TrackedEventMessage<?>> eventBatch) {
ProcessingEntry entry = processingQueue.poll();
assert entry != null;
smcvb marked this conversation as resolved.
Show resolved Hide resolved
lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.eventMessage().trackingToken());
if (entry.canHandle()) {
eventBatch.add(
entry.eventMessage()
.withTrackingToken(lastConsumedToken)
);
}
}

private boolean eventsEqualingLastConsumedTokenRemain() {
return Optional.ofNullable(processingQueue.peek())
.filter(entry -> entry.eventMessage().trackingToken().equals(lastConsumedToken))
.isPresent();
}

private void extendClaim() {
logger.debug("Work Package [{}]-[{}] will extend its token claim.", name, segment.getSegmentId());
tokenStore.extendClaim(name, segment.getSegmentId());
Expand Down