Skip to content

Commit

Permalink
ARTEMIS-3311 - ensure visibility of error state on operation context …
Browse files Browse the repository at this point in the history
…callback registration, fix and test
  • Loading branch information
gtully committed May 24, 2021
1 parent 23a28dc commit 72c9cae
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,63 +130,62 @@ public void executeOnCompletion(IOCallback runnable) {

@Override
public void executeOnCompletion(final IOCallback completion, final boolean storeOnly) {
if (errorCode != -1) {
completion.onError(errorCode, errorMessage);
return;
}

boolean executeNow = false;

synchronized (this) {
final int UNDEFINED = Integer.MIN_VALUE;
int storeLined = UNDEFINED;
int pageLined = UNDEFINED;
int replicationLined = UNDEFINED;
if (storeOnly) {
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
}
} else {
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = (replicationLined = replicationLineUp.intValue());
minimalStore = (storeLined = storeLineUp.intValue());
minimalPage = (pageLined = pageLineUp.intValue());
}
}
//On the next branches each of them is been used
if (replicationLined == UNDEFINED) {
replicationLined = replicationLineUp.intValue();
storeLined = storeLineUp.intValue();
pageLined = pageLineUp.intValue();
}
// On this case, we can just execute the context directly

if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (executorsPending.get() == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
executeNow = true;
if (errorCode == -1) {
final int UNDEFINED = Integer.MIN_VALUE;
int storeLined = UNDEFINED;
int pageLined = UNDEFINED;
int replicationLined = UNDEFINED;
if (storeOnly) {
if (storeOnlyTasks == null) {
storeOnlyTasks = new LinkedList<>();
}
} else {
execute(completion);
if (tasks == null) {
tasks = new LinkedList<>();
minimalReplicated = (replicationLined = replicationLineUp.intValue());
minimalStore = (storeLined = storeLineUp.intValue());
minimalPage = (pageLined = pageLineUp.intValue());
}
}
} else {
if (storeOnly) {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
//On the next branches each of them is been used
if (replicationLined == UNDEFINED) {
replicationLined = replicationLineUp.intValue();
storeLined = storeLineUp.intValue();
pageLined = pageLineUp.intValue();
}
// On this case, we can just execute the context directly

if (replicationLined == replicated && storeLined == stored && pageLined == paged) {
// We want to avoid the executor if everything is complete...
// However, we can't execute the context if there are executions pending
// We need to use the executor on this case
if (executorsPending.get() == 0) {
// No need to use an executor here or a context switch
// there are no actions pending.. hence we can just execute the task directly on the same thread
executeNow = true;
} else {
execute(completion);
}
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);
tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
if (storeOnly) {
assert !storeOnlyTasks.isEmpty() ? storeOnlyTasks.peekLast().storeLined <= storeLined : true;
storeOnlyTasks.add(new StoreOnlyTaskHolder(completion, storeLined));
} else {
// ensure total ordering
assert validateTasksAdd(storeLined, replicationLined, pageLined);
tasks.add(new TaskHolder(completion, storeLined, replicationLined, pageLined));
}
}
}
}

if (executeNow) {
// Executing outside of any locks
// Executing outside of any locks
if (errorCode != -1) {
completion.onError(errorCode, errorMessage);
} else if (executeNow) {
completion.done();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -100,6 +101,70 @@ public void done() {
}
}

@Test
public void testErrorNotLostOnPageSyncError() throws Exception {

ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
ExecutorService pageSyncTimer = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());

class PageWriteErrorJob implements Runnable {
final OperationContextImpl operationContext;
PageWriteErrorJob(OperationContextImpl impl) {
impl.pageSyncLineUp();
operationContext = impl;
}

@Override
public void run() {
try {
operationContext.onError(10, "bla");
} finally {
operationContext.pageSyncDone();
}
}
}

try {
final int numJobs = 10000;
final CountDownLatch errorsOnLateRegister = new CountDownLatch(numJobs);

for (int i = 0; i < numJobs; i++) {
final OperationContextImpl impl = new OperationContextImpl(executor);

final CountDownLatch done = new CountDownLatch(1);

pageSyncTimer.execute(new PageWriteErrorJob(impl));
impl.executeOnCompletion(new IOCallback() {

@Override
public void onError(int errorCode, String errorMessage) {
errorsOnLateRegister.countDown();
done.countDown();
}

@Override
public void done() {
done.countDown();
}
});

done.await();
}

assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return errorsOnLateRegister.await(1, TimeUnit.SECONDS);
}
}));


} finally {
executor.shutdown();
pageSyncTimer.shutdown();
}
}

@Test
public void testCaptureExceptionOnExecutor() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
Expand Down

0 comments on commit 72c9cae

Please sign in to comment.