Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process events with identical TrackingToken together in the PooledStreamingEventProcessor #2275

Merged
merged 8 commits into from Jul 7, 2022
Expand Up @@ -34,8 +34,11 @@
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand Down Expand Up @@ -781,15 +784,19 @@ private void coordinateWorkPackages() throws InterruptedException {
fetched < WorkPackage.BUFFER_SIZE && isSpaceAvailable() && eventStream.hasNextAvailable();
fetched++) {
TrackedEventMessage<?> event = eventStream.nextAvailable();
offerEventToWorkPackages(event);
lastScheduledToken = event.trackingToken();

// Make sure all subsequent events with the same token as the last are added as well.
// These are the result of upcasting and should always be processed in the same batch.
while (eventStream.peek()
.filter(e -> lastScheduledToken.equals(e.trackingToken()))
.isPresent()) {
offerEventToWorkPackages(eventStream.nextAvailable());
// These are the result of upcasting and should always be scheduled in one go.
if (eventsEqualingLastScheduledToken()) {
List<TrackedEventMessage<?>> events = new ArrayList<>();
events.add(event);
while (eventsEqualingLastScheduledToken()) {
events.add(eventStream.nextAvailable());
}
offerEventsToWorkPackages(events);
} else {
offerEventToWorkPackages(event);
}
}

Expand All @@ -803,6 +810,12 @@ private void coordinateWorkPackages() throws InterruptedException {
.forEach(WorkPackage::scheduleWorker);
}

private boolean eventsEqualingLastScheduledToken() {
return eventStream.peek()
.filter(e -> lastScheduledToken.equals(e.trackingToken()))
.isPresent();
}

private void offerEventToWorkPackages(TrackedEventMessage<?> event) {
boolean anyScheduled = false;
for (WorkPackage workPackage : workPackages.values()) {
Expand All @@ -817,6 +830,22 @@ private void offerEventToWorkPackages(TrackedEventMessage<?> event) {
}
}

private void offerEventsToWorkPackages(List<TrackedEventMessage<?>> events) {
boolean anyScheduled = false;
for (WorkPackage workPackage : workPackages.values()) {
boolean scheduled = workPackage.scheduleEvents(Collections.unmodifiableList(events));
anyScheduled = anyScheduled || scheduled;
}
if (!anyScheduled) {
events.forEach(event -> {
ignoredMessageHandler.accept(event);
if (!eventFilter.canHandleTypeOf(event)) {
eventStream.skipMessagesWithPayloadTypeOf(event);
}
});
}
}

private void scheduleImmediateCoordinationTask() {
scheduleCoordinationTask(0);
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package org.axonframework.eventhandling.pooled;

import org.axonframework.common.Assert;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
Expand All @@ -36,6 +37,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -121,15 +123,80 @@ private WorkPackage(Builder builder) {
this.lastClaimExtension = System.currentTimeMillis();
}

/**
* Schedule a collection of {@link TrackedEventMessage TrackedEventMessages} for processing by this work package.
* <p>
* Only use this method if the {@link TrackingToken TrackingTokens} of every event are equal, as those events should
* be handled within a single transaction. This scenario presents itself whenever an event is upcasted into
* <em>several instances</em>. When tokens differ between events please use
* {@link #scheduleEvent(TrackedEventMessage)}.
* <p>
* Will disregard the given {@code events} if their {@code TrackingTokens} are covered by the previously scheduled
* event.
* <p>
* <b>Threading note:</b> This method is and should only to be called by the {@link Coordinator} thread of a {@link
* PooledStreamingEventProcessor}.
*
* @param events The events to schedule for work in this work package.
* @return {@code True} if this {@link WorkPackage} scheduled one of the events for execution, otherwise
* {@code false}.
*/
public boolean scheduleEvents(List<TrackedEventMessage<?>> events) {
if (events.isEmpty()) {
// cannot schedule an empty events list
return false;
}
assertEqualTokens(events);

if (events.stream().allMatch(this::shouldNotSchedule)) {
if (logger.isTraceEnabled()) {
events.forEach(event -> logger.trace(
"Ignoring event [{}] with position [{}] for work package [{}]. "
+ "The last token [{}] covers event's token [{}].",
event.getIdentifier(), event.trackingToken().position().orElse(-1), segment.getSegmentId(),
lastDeliveredToken, event.trackingToken()
));
}
return false;
}

BatchProcessingEntry batchProcessingEntry = new BatchProcessingEntry();
boolean canHandleAny = events.stream()
.map(event -> {
boolean canHandle = canHandle(event);
batchProcessingEntry.add(new DefaultProcessingEntry(event, canHandle));
return canHandle;
})
.reduce(Boolean::logicalOr)
.orElse(false);

processingQueue.add(batchProcessingEntry);
lastDeliveredToken = batchProcessingEntry.trackingToken();
// the worker must always be scheduled to ensure claims are extended
scheduleWorker();

return canHandleAny;
}

private void assertEqualTokens(List<TrackedEventMessage<?>> events) {
TrackingToken expectedToken = events.get(0).trackingToken();
Assert.isTrue(
events.stream()
.map(TrackedEventMessage::trackingToken)
.allMatch(token -> Objects.equals(expectedToken, token)),
() -> "All tokens should match when scheduling multiple events in one go."
);
}

/**
* Schedule a {@link TrackedEventMessage} for processing by this work package. Will immediately disregard the given
* {@code event} if its {@link TrackingToken} is covered by the previously scheduled event.
* <p>
* <b>Threading note:</b> This method is and should only to be called by the {@link Coordinator} thread of a {@link
* PooledStreamingEventProcessor}
* PooledStreamingEventProcessor}.
*
* @param event the event to schedule for work in this work package
* @return {@code true} if this {@link WorkPackage} scheduled the event for execution, otherwise {@code false}
* @param event The event to schedule for work in this work package.
* @return {@code True} if this {@link WorkPackage} scheduled the event for execution, otherwise {@code false}.
*/
public boolean scheduleEvent(TrackedEventMessage<?> event) {
if (shouldNotSchedule(event)) {
Expand All @@ -143,7 +210,7 @@ public boolean scheduleEvent(TrackedEventMessage<?> event) {
event.getIdentifier(), event.trackingToken().position().orElse(-1), segment.getSegmentId());

boolean canHandle = canHandle(event);
processingQueue.add(new ProcessingEntry(event, canHandle));
processingQueue.add(new DefaultProcessingEntry(event, canHandle));
lastDeliveredToken = event.trackingToken();
// the worker must always be scheduled to ensure claims are extended
scheduleWorker();
Expand All @@ -152,24 +219,17 @@ public boolean scheduleEvent(TrackedEventMessage<?> event) {
}

/**
* The given {@code event} should not be scheduled if:
* <ol>
* <li>The {@link TrackedEventMessage#trackingToken()} {@link TrackingToken#covers(TrackingToken)} the last delivered token, and</li>
* <li>if the last delivered token does not equal the given {@code event's} token.</li>
* </ol>
* The given {@code event} should not be scheduled if the {@link TrackedEventMessage#trackingToken()}
* {@link TrackingToken#covers(TrackingToken)} the last delivered token.
* <p>
* The first validation ensures events that this work package already covered are ignored. The second validation
* ensures that subsequent events with the exact same token <b>are</b> included. The last step is necessary to
* include events that are the result of a one-to-many upcaster.
* This validation ensures events that this work package already covered are ignored.
*
* @param event The event to validate whether it should be scheduled yes or no.
* @return {@code true} if the given {@code event} should not be scheduled, {@code false} otherwise.
*/
private boolean shouldNotSchedule(TrackedEventMessage<?> event) {
// Null check is done to solve potential NullPointerException.
return lastDeliveredToken != null
&& lastDeliveredToken.covers(event.trackingToken())
&& !lastDeliveredToken.equals(event.trackingToken());
return lastDeliveredToken != null && lastDeliveredToken.covers(event.trackingToken());
}

private boolean canHandle(TrackedEventMessage<?> event) {
Expand Down Expand Up @@ -226,15 +286,13 @@ private void processEvents() throws Exception {
List<TrackedEventMessage<?>> eventBatch = new ArrayList<>();
while (!isAbortTriggered() && eventBatch.size() < batchSize && !processingQueue.isEmpty()) {
ProcessingEntry entry = processingQueue.poll();
lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.eventMessage().trackingToken());
if (entry.canHandle()) {
eventBatch.add(
entry.eventMessage()
.withTrackingToken(lastConsumedToken)
);
}
lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.trackingToken());
entry.addToBatch(eventBatch, lastConsumedToken);
}

// Make sure all subsequent events with the same token (if non-null) as the last are added as well.
// These are the result of upcasting and should always be processed in the same batch.

if (!eventBatch.isEmpty()) {
logger.debug("Work Package [{}]-[{}] is processing a batch of {} events.",
segment.getSegmentId(), name, eventBatch.size());
Expand Down Expand Up @@ -570,27 +628,81 @@ WorkPackage build() {
}
}

/**
* Marker interface defining a unit of work containing one or more event messages to be processed by this work
* package.
*/
private interface ProcessingEntry {

/**
* Return the position of this processing entry.
*
* @return The position of this processing entry.
*/
TrackingToken trackingToken();

/**
* Add this entry's events to the {@code eventBatch}. The events should reference the {@code wrappedToken} for
* correctly handling token progression.
*
* @param eventBatch The list of events to add this entry's events to.
* @param wrappedToken The wrapped token to attach to all events of this entry.
*/
void addToBatch(List<TrackedEventMessage<?>> eventBatch, TrackingToken wrappedToken);
}

/**
* Container of a {@link TrackedEventMessage} and {@code boolean} whether the given {@code eventMessage} can be
* handled in this package. The combination constitutes to a processing entry the {@link WorkPackage} should
* ingest.
*/
private static class ProcessingEntry {
private static class DefaultProcessingEntry implements ProcessingEntry {

private final TrackedEventMessage<?> eventMessage;
private final boolean canHandle;

public ProcessingEntry(TrackedEventMessage<?> eventMessage, boolean canHandle) {
public DefaultProcessingEntry(TrackedEventMessage<?> eventMessage, boolean canHandle) {
this.eventMessage = eventMessage;
this.canHandle = canHandle;
}

public TrackedEventMessage<?> eventMessage() {
return eventMessage;
@Override
public TrackingToken trackingToken() {
return eventMessage.trackingToken();
}

@Override
public void addToBatch(List<TrackedEventMessage<?>> eventBatch, TrackingToken wrappedToken) {
if (canHandle) {
eventBatch.add(eventMessage.withTrackingToken(wrappedToken));
}
}
}

/**
* Container of a batch of {@link ProcessingEntry ProcessingEntries}. These entries are grouped together since they
* should be handled within a single batch by the work package.
*/
private static class BatchProcessingEntry implements ProcessingEntry {

private final List<ProcessingEntry> processingEntries;

public BatchProcessingEntry() {
this.processingEntries = new ArrayList<>();
}

public void add(ProcessingEntry processingEntry) {
processingEntries.add(processingEntry);
}

@Override
public TrackingToken trackingToken() {
return processingEntries.get(0).trackingToken();
}

public boolean canHandle() {
return canHandle;
@Override
public void addToBatch(List<TrackedEventMessage<?>> eventBatch, TrackingToken wrappedToken) {
processingEntries.forEach(entry -> entry.addToBatch(eventBatch, wrappedToken));
}
}
}