Skip to content

Commit

Permalink
[BACKPORT] Fix re-execution of completed task when killing the owner …
Browse files Browse the repository at this point in the history
…node
  • Loading branch information
tkountis committed Mar 6, 2017
1 parent c5d82f2 commit 291856d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import com.hazelcast.nio.Bits;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.BackupAwareOperation;
import com.hazelcast.spi.Notifier;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.WaitNotifyKey;

import java.io.IOException;

public class PutResultOperation extends AbstractDurableExecutorOperation implements Notifier {
public class PutResultOperation
extends AbstractDurableExecutorOperation
implements Notifier, BackupAwareOperation {

private int sequence;

Expand Down Expand Up @@ -57,6 +61,11 @@ public WaitNotifyKey getNotifiedKey() {
return new DurableExecutorWaitNotifyKey(name, uniqueId);
}

@Override
public Operation getBackupOperation() {
return new PutResultOperation(name, sequence, result);
}

@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.executor.ExecutorServiceTestSupport;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.TestHazelcastInstanceFactory;
Expand All @@ -30,6 +31,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -153,6 +155,33 @@ public void testRetrieve_WhenOwnerMemberDown() throws Exception {
assertTrue(future.get());
}

@Test
public void testSingleExecution_WhenMigratedAfterCompletion_WhenOwnerMemberKilled() throws Exception {
String name = randomString();
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(3);
HazelcastInstance instance1 = factory.newHazelcastInstance();
HazelcastInstance instance2 = factory.newHazelcastInstance();
factory.newHazelcastInstance();
String key = generateKeyOwnedBy(instance1);

String runCounterName = "runCount";
IAtomicLong runCount = instance2.getAtomicLong(runCounterName);

DurableExecutorService executorService = instance1.getDurableExecutorService(name);
IncrementAtomicLongRunnable task = new IncrementAtomicLongRunnable(runCounterName);
DurableExecutorServiceFuture future = executorService.submitToKeyOwner(task, key);

future.get(); // Wait for it to finish

instance1.getLifecycleService().terminate();

executorService = instance2.getDurableExecutorService(name);
Future<Object> newFuture = executorService.retrieveResult(future.getTaskId());
newFuture.get(); // Make sure its completed

assertEquals(1, runCount.get());
}

@Test
public void testRetrieve_WhenResultOverwritten() throws Exception {
String name = randomString();
Expand Down

0 comments on commit 291856d

Please sign in to comment.