Skip to content

Commit

Permalink
Drop use of DelayedTask as deadlines aren't scheduled by the aggregate
Browse files Browse the repository at this point in the history
Drop use of DelayedTask as deadlines aren't scheduled by the aggregate
directly, but through a provided DeadlineManager

#220
  • Loading branch information
smcvb committed Jun 12, 2018
1 parent f6d2aac commit 2a230f2
Showing 1 changed file with 5 additions and 42 deletions.
Expand Up @@ -32,9 +32,11 @@
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -60,8 +62,7 @@ public class AnnotatedAggregate<T> extends AggregateLifecycle implements Aggrega

private final AggregateModel<T> inspector;
private final RepositoryProvider repositoryProvider;
private final PriorityQueue<DelayedTask> delayedTasks = new PriorityQueue<>();
private final AtomicInteger delayedTasksSequence = new AtomicInteger(0);
private final Queue<Runnable> delayedTasks = new LinkedList<>();
private final EventBus eventBus;
private T aggregateRoot;
private boolean applying = false;
Expand Down Expand Up @@ -453,8 +454,7 @@ protected <P> ApplyMore doApply(P payload, MetaData metaData) {
applying = false;
}
} else {
delayedTasks.add(DelayedTask.highPriorityTask(() -> publish(createMessage(payload, metaData)),
delayedTasksSequence.getAndIncrement()));
delayedTasks.add(() -> publish(createMessage(payload, metaData)));
}
return this;
}
Expand Down Expand Up @@ -499,7 +499,7 @@ public ApplyMore andThenApply(Supplier<?> payloadOrMessageSupplier) {
@Override
public ApplyMore andThen(Runnable runnable) {
if (applying || aggregateRoot == null) {
delayedTasks.add(DelayedTask.highPriorityTask(runnable, delayedTasksSequence.getAndIncrement()));
delayedTasks.add(runnable);
} else {
runnable.run();
}
Expand Down Expand Up @@ -560,41 +560,4 @@ public GenericDomainEventMessage<P> andMetaData(Map<String, ?> additionalMetaDat
}
}
}

private static class DelayedTask implements Runnable, Comparable<DelayedTask> {

private static final int HIGH_PRIORITY = 1;
private static final int LOW_PRIORITY = 10;

private final int priority;
private final int order;
private final Runnable delegate;

private DelayedTask(int priority, int order, Runnable delegate) {
this.priority = priority;
this.order = order;
this.delegate = delegate;
}

private static DelayedTask highPriorityTask(Runnable delegate, int order) {
return new DelayedTask(HIGH_PRIORITY, order, delegate);
}

private static DelayedTask lowPriorityTask(Runnable delegate, int order) {
return new DelayedTask(LOW_PRIORITY, order, delegate);
}

@Override
public void run() {
delegate.run();
}

@Override
public int compareTo(DelayedTask o) {
if (priority == o.priority) {
return Integer.compare(order, o.order);
}
return Integer.compare(priority, o.priority);
}
}
}

0 comments on commit 2a230f2

Please sign in to comment.