GEODE-7079: Prevent NPE During Queue Conflation (#3911)
* GEODE-7079: Prevent NPE During Queue Conflation

- Added tests.
- Fixed minor warnings.
- Use the cached region name during conflation, instead of resolving the actual region, so the queue processor doesn't need to wait for the region to be fully initialized.

Co-authored-by: Benjamin Ross <bross@pivotal.io>
2 people authored and gesterzhou committed Aug 15, 2019
1 parent 374eff7 commit 6f4bbbd
Showing 4 changed files with 374 additions and 76 deletions.
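The third bullet of the commit message carries the substance of the fix: conflation now keys off the region name that was cached on the queued event, rather than off the Region object itself, which may not exist yet while a persistent queue is being recovered. A minimal, self-contained sketch of that idea follows. QueuedEvent, ConflationKeys, and their method names are illustrative stand-ins, not Geode's actual internal types; the real change lives in the gateway-sender queue's conflation code, not in the test file shown in this diff.

// Illustrative sketch only: QueuedEvent and ConflationKeys are made-up names that
// stand in for Geode's internal event and queue types.
import org.apache.geode.cache.Region;

interface QueuedEvent {
  Object getKey();
  String getRegionPath();     // region name cached on the event when it was enqueued
  Region<?, ?> getRegion();   // may still be null while the region initializes after recovery
}

final class ConflationKeys {

  // Before the fix (conceptually): the Region object is dereferenced to build the
  // conflation key, which throws a NullPointerException when the queue has been
  // recovered from disk and dispatching starts before the region is available.
  static String keyFromRegion(QueuedEvent event) {
    return event.getRegion().getFullPath() + "#" + event.getKey();
  }

  // After the fix (conceptually): the cached region name is used instead, so the
  // queue processor never has to wait for the region to be fully initialized.
  static String keyFromCachedName(QueuedEvent event) {
    return event.getRegionPath() + "#" + event.getKey();
  }
}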
@@ -16,6 +16,7 @@

import static org.apache.geode.cache.RegionShortcut.PARTITION;
import static org.apache.geode.cache.RegionShortcut.REPLICATE;
import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
import static org.apache.geode.test.dunit.VM.getVM;
@@ -26,6 +27,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +59,7 @@
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
@@ -127,9 +130,9 @@ protected Properties getDistributedSystemProperties() {

@Test // serial, ReplicateRegion
public void testSerialAsyncEventQueueSize() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
@@ -146,9 +149,9 @@ public void testSerialAsyncEventQueueSize() {
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());

vm0.invoke(() -> waitForDispatcherToPause());
vm1.invoke(() -> waitForDispatcherToPause());
vm2.invoke(() -> waitForDispatcherToPause());
vm0.invoke(this::waitForDispatcherToPause);
vm1.invoke(this::waitForDispatcherToPause);
vm2.invoke(this::waitForDispatcherToPause);

vm0.invoke(() -> doPuts(replicateRegionName, 1000));

@@ -159,9 +162,9 @@

@Test // serial, ReplicateRegion
public void testReplicatedSerialAsyncEventQueue() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
@@ -186,9 +189,9 @@ public void testReplicatedSerialAsyncEventQueue() {

@Test // serial, conflation, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
@@ -205,9 +208,9 @@ public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());

vm0.invoke(() -> waitForDispatcherToPause());
vm1.invoke(() -> waitForDispatcherToPause());
vm2.invoke(() -> waitForDispatcherToPause());
vm0.invoke(this::waitForDispatcherToPause);
vm1.invoke(this::waitForDispatcherToPause);
vm2.invoke(this::waitForDispatcherToPause);

Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < 1000; i++) {
@@ -252,16 +255,19 @@ public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {

@Test // serial, persistent, conflation, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));

vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
@@ -283,7 +289,8 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart()
createCache();

createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);

createReplicateRegion(replicateRegionName, asyncEventQueueId);

@@ -298,7 +305,8 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart()

createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
createReplicateRegion(replicateRegionName, asyncEventQueueId);

// primary sender
@@ -315,16 +323,19 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart()
@Ignore("TODO: Disabled for 52351")
@Test // serial, persistent, ReplicateRegion
public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100));
false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));

vm0.invoke(() -> {
createReplicateRegion(replicateRegionName, asyncEventQueueId);
@@ -334,11 +345,11 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2()
vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));
vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId));

vm1.invoke(() -> waitForSenderToBecomePrimary());
vm1.invoke(this::waitForSenderToBecomePrimary);
vm1.invoke(() -> doPuts(replicateRegionName, 2000));

vm1.invoke(() -> waitForRegionQueuesToEmpty());
vm2.invoke(() -> waitForRegionQueuesToEmpty());
vm1.invoke(this::waitForRegionQueuesToEmpty);
vm2.invoke(this::waitForRegionQueuesToEmpty);

int vm1size = vm1.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
int vm2size = vm2.invoke(() -> ((Map<?, ?>) getSpyAsyncEventListener().getEventsMap()).size());
@@ -347,11 +358,65 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2()
assertThat(vm1size + vm2size).isGreaterThanOrEqualTo(2000);
}

@Test
// See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk
// and the processor started dispatching events before the actual region was available.
public void replicatedRegionWithPersistentSerialAsyncEventQueueAndConflationEnabledShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue()

    [Inline review] kirklund (Contributor), Aug 15, 2019: "Test name is a bit long ;) lol"
    [Inline review] jujoramos (Author), Aug 15, 2019: "Agreed!! :-P."
throws IOException {
// Custom Log File to manually search for exceptions.
File customLogFile = temporaryFolder.newFile("memberLog.log");
Properties dsProperties = getDistributedSystemProperties();
dsProperties.setProperty(ConfigurationProperties.LOG_FILE, customLogFile.getAbsolutePath());

// Create Region, AsyncEventQueue and Insert Some Entries.
vm0.invoke(() -> {
createCache();
// Large batch time interval and low batch size so no events are processed before the restart.
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 10,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, 120000);
createReplicateRegion(replicateRegionName, asyncEventQueueId);
doPuts(replicateRegionName, 5);
waitForAsyncEventQueueSize(5);
});

vm0.invoke(() -> {
// Restart the cache.
getCache().close();
cacheRule.createCache(dsProperties);

// Recover the queue from disk, reduce thresholds so processing starts right away.
SpyAsyncEventListener spyAsyncEventListener = new SpyAsyncEventListener();
createPersistentAsyncEventQueue(asyncEventQueueId, spyAsyncEventListener, true, 5,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
waitForSenderToBecomePrimary();

// Wait for the processors to start.
await().until(() -> {
Set<Thread> threads = Thread.getAllStackTraces().keySet();
return threads
.stream()
.filter(t -> t.getName().contains("Processor for GatewaySender_AsyncEventQueue"))
.allMatch(Thread::isAlive);
});

// Create the region, processing will continue and no NPE should be thrown anymore.
createReplicateRegion(replicateRegionName, asyncEventQueueId);
waitForRegionQueuesToEmpty();
assertThat(spyAsyncEventListener.getEventsMap().size()).isEqualTo(5);
});

Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line)
.as("Dispatcher shouldn't have thrown any errors while processing batches")
.doesNotContain("An Exception occurred. The dispatcher will continue.")
.doesNotContain("java.lang.NullPointerException"));
}

@Test // serial, PartitionedRegion
public void testPartitionedSerialAsyncEventQueue() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false,
100, dispatcherThreadCount, 100));
@@ -377,9 +442,9 @@ public void testPartitionedSerialAsyncEventQueue() {

@Test // serial, conflation, PartitionedRegion
public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true,
100, dispatcherThreadCount, 100));
@@ -396,9 +461,9 @@ public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
vm1.invoke(() -> getInternalGatewaySender().pause());
vm2.invoke(() -> getInternalGatewaySender().pause());

vm0.invoke(() -> waitForDispatcherToPause());
vm1.invoke(() -> waitForDispatcherToPause());
vm2.invoke(() -> waitForDispatcherToPause());
vm0.invoke(this::waitForDispatcherToPause);
vm1.invoke(this::waitForDispatcherToPause);
vm2.invoke(this::waitForDispatcherToPause);

Map<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < 1000; i++) {
@@ -451,16 +516,19 @@ public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
*/
@Test // persistent, PartitionedRegion
public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled() {
vm0.invoke(() -> createCache());
vm1.invoke(() -> createCache());
vm2.invoke(() -> createCache());
vm0.invoke(this::createCache);
vm1.invoke(this::createCache);
vm2.invoke(this::createCache);

vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));
vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(),
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100));
false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL));

vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16));
@@ -483,7 +551,8 @@ public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled_Restart()
createCache();

createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);

createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);

@@ -497,7 +566,8 @@ public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled_Restart()

createCache();
createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100,
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100);
createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100,
DEFAULT_BATCH_TIME_INTERVAL);
createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16);

// primary sender
@@ -600,11 +670,12 @@ private void createPersistentAsyncEventQueue(String asyncEventQueueId,
String diskStoreName,
boolean isDiskSynchronous,
int dispatcherThreads,
int maximumQueueMemory) {
int maximumQueueMemory,
int batchTimeInterval) {

assertThat(asyncEventQueueId).isNotEmpty();
assertThat(asyncEventListener).isNotNull();
assertThat(diskStoreName).isNotEmpty();

createDiskStore(diskStoreName, asyncEventQueueId);

AsyncEventQueueFactory asyncEventQueueFactory = getCache().createAsyncEventQueueFactory();
@@ -614,6 +685,7 @@ private void createPersistentAsyncEventQueue(String asyncEventQueueId,
asyncEventQueueFactory.setDiskSynchronous(isDiskSynchronous);
asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads);
asyncEventQueueFactory.setMaximumQueueMemory(maximumQueueMemory);
asyncEventQueueFactory.setBatchTimeInterval(batchTimeInterval);
asyncEventQueueFactory.setParallel(false);
asyncEventQueueFactory.setPersistent(true);

@@ -623,17 +695,18 @@ private void createPersistentAsyncEventQueue(String asyncEventQueueId,
private void addClosingCacheListener(String regionName, int closeAfterCreateKey) {
assertThat(regionName).isNotEmpty();

Region region = getCache().getRegion(regionName);
Region<Integer, Integer> region = getCache().getRegion(regionName);
assertNotNull(region);

CacheListenerAdapter cacheListener = new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
if ((Integer) event.getKey() == closeAfterCreateKey) {
getCache().close();
}
}
};
CacheListenerAdapter<Integer, Integer> cacheListener =
new CacheListenerAdapter<Integer, Integer>() {
@Override
public void afterCreate(EntryEvent event) {
if ((Integer) event.getKey() == closeAfterCreateKey) {
getCache().close();
}
}
};

region.getAttributesMutator().addCacheListener(cacheListener);
}
@@ -676,6 +749,7 @@ private int getTotalRegionQueueSize() {
return totalSize;
}

@SuppressWarnings("unchecked")
private void waitForAsyncEventListenerWithEventsMapSize(int expectedSize) {
await().untilAsserted(
() -> assertThat(getSpyAsyncEventListener().getEventsMap()).hasSize(expectedSize));
@@ -745,6 +819,7 @@ Map<K, V> getEventsMap() {
}

@Override
@SuppressWarnings("unchecked")
public boolean processEvents(List<AsyncEvent> events) {
for (AsyncEvent<K, V> event : events) {
eventsMap.put(event.getKey(), event.getDeserializedValue());

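For context only (not part of the commit): the scenario the new test exercises is a serial, persistent async event queue with batch conflation enabled, attached to a replicated region. Below is a hypothetical configuration sketch using Geode's public API; the queue id "exampleQueue", the disk-store name and directory, the region name, and the println listener are made up for illustration.

import java.io.File;
import java.util.List;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;

public class ConflatedPersistentQueueExample {
  public static void main(String[] args) {
    Cache cache = new CacheFactory().create();

    // Persistent disk store backing the queue; queued events survive a member restart.
    cache.createDiskStoreFactory()
        .setDiskDirs(new File[] {new File("example-queue-disk-dir")})
        .create("exampleQueueDiskStore");

    AsyncEventListener listener = new AsyncEventListener() {
      @Override
      public boolean processEvents(List<AsyncEvent> events) {
        events.forEach(e -> System.out.println(e.getKey() + " -> " + e.getDeserializedValue()));
        return true; // batch handled successfully
      }

      @Override
      public void close() {}
    };

    // Serial + persistent + batch conflation: the combination exercised by the new test.
    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
    factory.setParallel(false);
    factory.setPersistent(true);
    factory.setDiskStoreName("exampleQueueDiskStore");
    factory.setBatchConflationEnabled(true);
    factory.setBatchSize(100);
    AsyncEventQueue queue = factory.create("exampleQueue", listener);

    // Replicated region whose operations feed the queue.
    Region<Integer, Integer> region =
        cache.<Integer, Integer>createRegionFactory(RegionShortcut.REPLICATE)
            .addAsyncEventQueueId(queue.getId())
            .create("exampleRegion");

    region.put(1, 1);
    cache.close();
  }
}

Before GEODE-7079, restarting a member with events still sitting on such a queue could surface the NullPointerException during conflation that this commit fixes.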