Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix for issue hazelcast#8261 . Makes sure that the CleanupResourcesTa…
…sk only sets the future value after the connection is closed.
  • Loading branch information
ihsandemir committed Jul 18, 2016
1 parent e9a59a0 commit c23081d
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 2 deletions.
Expand Up @@ -53,6 +53,7 @@ public class ClientConnection implements Connection {
@Probe
protected final int connectionId;
private final AtomicBoolean live = new AtomicBoolean(true);
private final AtomicBoolean closeCompeleted = new AtomicBoolean(false);
private final ILogger logger;

private final AtomicInteger pendingPacketCount = new AtomicInteger(0);
Expand Down Expand Up @@ -242,6 +243,12 @@ public void close(String reason, Throwable cause) {
} else {
logger.finest(message);
}

closeCompeleted.set(true);
}

public boolean isCloseCompleted() {
return closeCompeleted.get();
}

protected void innerClose() throws IOException {
Expand Down
Expand Up @@ -204,6 +204,10 @@ public void run() {
continue;
}

if (!connection.isAlive() && connection.isCloseCompleted()) {
continue;
}

if (connection.getPendingPacketCount() != 0) {
long closedTime = connection.getClosedTime();
long elapsed = System.currentTimeMillis() - closedTime;
Expand Down
Expand Up @@ -7,6 +7,8 @@
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
Expand All @@ -16,10 +18,18 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.internal.cluster.impl.AdvancedClusterStateTest.changeClusterStateEventually;
import static com.hazelcast.test.HazelcastTestSupport.assertClusterSizeEventually;
import static com.hazelcast.test.HazelcastTestSupport.randomMapName;
import static com.hazelcast.test.HazelcastTestSupport.warmUpPartitions;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
Expand All @@ -34,9 +44,9 @@ public class ClientClusterStateTest {
@Before
public void before() {
factory = new TestHazelcastFactory();
instances = factory.newInstances(new Config(), 3);
instances = factory.newInstances(new Config(), 4);
for (HazelcastInstance instance : instances) {
assertClusterSizeEventually(3, instance);
assertClusterSizeEventually(4, instance);
}
instance = instances[instances.length - 1];
}
Expand Down Expand Up @@ -116,4 +126,67 @@ public void testClient_canExecuteOperations_whenClusterState_goesBackToActive_fr
IMap<Object, Object> map = client.getMap(randomMapName());
map.put(1, 1);
}

@Test
public void testClusterShutdownDuringMapPutAll() {
HazelcastInstance client = factory.newHazelcastClient();
final IMap<Object, Object> map = client.getMap("abc");

final HashMap values = new HashMap<Double, Double>();
for (int i = 0; i < 1000; i++) {
double value = Math.random();
values.put(value, value);
}

final int numThreads = 10;
final CountDownLatch threadsFinished = new CountDownLatch(numThreads);
final CountDownLatch threadsStarted = new CountDownLatch(numThreads);

ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < numThreads; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
ILogger logger = Logger.getLogger(getClass());
boolean finished = false;
threadsStarted.countDown();
logger.info("PutAll thread started");
while (!finished) {
try {
map.putAll(values);
Thread.sleep(100);
} catch (IllegalStateException e) {
logger.warning("Expected exception for Map putAll during cluster shutdown:", e);
finished = true;
} catch (InterruptedException e) {
// do nothing
}
}
threadsFinished.countDown();
logger.info("PutAll thread finishing. Current finished thread count is:" + (numThreads - threadsFinished
.getCount()));
}
});
}

try {
assertTrue("All threads could not be started", threadsStarted.await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
fail("All threads could not be started due to InterruptedException. Could not start " + threadsStarted.getCount()
+ " threads out of " + numThreads);
}

instance.getCluster().shutdown();

executor.shutdown();

try {
assertTrue("All threads could not be finished", threadsFinished.await(2, TimeUnit.MINUTES));
} catch (InterruptedException e) {
fail("All threads could not be finished due to InterruptedException. Could not finish " + threadsFinished.getCount()
+ " threads out of " + numThreads);
} finally {
executor.shutdownNow();
}
}
}

0 comments on commit c23081d

Please sign in to comment.