From 6f4bbbd96bcecdb82cf7753ce1dae9fa6baebf9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Jos=C3=A9=20Ramos?= Date: Wed, 14 Aug 2019 21:38:31 -0300 Subject: [PATCH] GEODE-7079: Prevent NPE During Queue Conflation (#3911) * GEODE-7079: Prevent NPE During Queue Conflation - Added tests. - Fixed minor warnings. - Use the cached region name when doing conflation instead of the actual region so the processor doesn't need to wait for the actual region to be fully initialized. Co-authored-by: Benjamin Ross --- .../AsyncEventListenerDistributedTest.java | 195 +++++++++++----- .../AbstractGatewaySenderEventProcessor.java | 4 +- ...lGatewaySenderEventProcessorJUnitTest.java | 35 +++ .../serial/SerialWANConflationDUnitTest.java | 216 ++++++++++++++++-- 4 files changed, 374 insertions(+), 76 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java index e3350815163a..09cab0e6373f 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDistributedTest.java @@ -16,6 +16,7 @@ import static org.apache.geode.cache.RegionShortcut.PARTITION; import static org.apache.geode.cache.RegionShortcut.REPLICATE; +import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.dunit.VM.getCurrentVMNum; import static org.apache.geode.test.dunit.VM.getVM; @@ -26,6 +27,7 @@ import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; +import java.nio.file.Files; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -57,6 +59,7 @@ import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory; import org.apache.geode.cache.asyncqueue.internal.InternalAsyncEventQueue; import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.internal.cache.wan.InternalGatewaySender; @@ -127,9 +130,9 @@ protected Properties getDistributedSystemProperties() { @Test // serial, ReplicateRegion public void testSerialAsyncEventQueueSize() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, dispatcherThreadCount, 100)); @@ -146,9 +149,9 @@ public void testSerialAsyncEventQueueSize() { vm1.invoke(() -> getInternalGatewaySender().pause()); vm2.invoke(() -> getInternalGatewaySender().pause()); - vm0.invoke(() -> waitForDispatcherToPause()); - vm1.invoke(() -> waitForDispatcherToPause()); - vm2.invoke(() -> waitForDispatcherToPause()); + vm0.invoke(this::waitForDispatcherToPause); + vm1.invoke(this::waitForDispatcherToPause); + vm2.invoke(this::waitForDispatcherToPause); vm0.invoke(() -> doPuts(replicateRegionName, 1000)); @@ -159,9 +162,9 @@ public void testSerialAsyncEventQueueSize() { @Test // serial, ReplicateRegion public void testReplicatedSerialAsyncEventQueue() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, dispatcherThreadCount, 100)); @@ -186,9 +189,9 @@ public void testReplicatedSerialAsyncEventQueue() { @Test // serial, conflation, ReplicateRegion public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 100, dispatcherThreadCount, 100)); @@ -205,9 +208,9 @@ public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() { vm1.invoke(() -> getInternalGatewaySender().pause()); vm2.invoke(() -> getInternalGatewaySender().pause()); - vm0.invoke(() -> waitForDispatcherToPause()); - vm1.invoke(() -> waitForDispatcherToPause()); - vm2.invoke(() -> waitForDispatcherToPause()); + vm0.invoke(this::waitForDispatcherToPause); + vm1.invoke(this::waitForDispatcherToPause); + vm2.invoke(this::waitForDispatcherToPause); Map keyValues = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -252,16 +255,19 @@ public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() { @Test // serial, persistent, conflation, ReplicateRegion public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100)); + true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100)); + true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100)); + true, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm0.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId)); vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId)); @@ -283,7 +289,8 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart() createCache(); createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, - createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100); + createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL); createReplicateRegion(replicateRegionName, asyncEventQueueId); @@ -298,7 +305,8 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart() createCache(); createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, - createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100); + createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL); createReplicateRegion(replicateRegionName, asyncEventQueueId); // primary sender @@ -315,16 +323,19 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart() @Ignore("TODO: Disabled for 52351") @Test // serial, persistent, ReplicateRegion public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100)); + false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100)); + false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100)); + false, 100, createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm0.invoke(() -> { createReplicateRegion(replicateRegionName, asyncEventQueueId); @@ -334,11 +345,11 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() vm1.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId)); vm2.invoke(() -> createReplicateRegion(replicateRegionName, asyncEventQueueId)); - vm1.invoke(() -> waitForSenderToBecomePrimary()); + vm1.invoke(this::waitForSenderToBecomePrimary); vm1.invoke(() -> doPuts(replicateRegionName, 2000)); - vm1.invoke(() -> waitForRegionQueuesToEmpty()); - vm2.invoke(() -> waitForRegionQueuesToEmpty()); + vm1.invoke(this::waitForRegionQueuesToEmpty); + vm2.invoke(this::waitForRegionQueuesToEmpty); int vm1size = vm1.invoke(() -> ((Map) getSpyAsyncEventListener().getEventsMap()).size()); int vm2size = vm2.invoke(() -> ((Map) getSpyAsyncEventListener().getEventsMap()).size()); @@ -347,11 +358,65 @@ public void testReplicatedSerialAsyncEventQueueWithPersistenceEnabled_Restart2() assertThat(vm1size + vm2size).isGreaterThanOrEqualTo(2000); } + @Test + // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk + // and the processor started dispatching events before the actual region was available. + public void replicatedRegionWithPersistentSerialAsyncEventQueueAndConflationEnabledShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue() + throws IOException { + // Custom Log File to manually search for exceptions. + File customLogFile = temporaryFolder.newFile("memberLog.log"); + Properties dsProperties = getDistributedSystemProperties(); + dsProperties.setProperty(ConfigurationProperties.LOG_FILE, customLogFile.getAbsolutePath()); + + // Create Region, AsyncEventQueue and Insert Some Entries. + vm0.invoke(() -> { + createCache(); + // Large batch time interval and low batch size so no events are processed before the restart. + createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 10, + createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, 120000); + createReplicateRegion(replicateRegionName, asyncEventQueueId); + doPuts(replicateRegionName, 5); + waitForAsyncEventQueueSize(5); + }); + + vm0.invoke(() -> { + // Restart the cache. + getCache().close(); + cacheRule.createCache(dsProperties); + + // Recover the queue from disk, reduce thresholds so processing starts right away. + SpyAsyncEventListener spyAsyncEventListener = new SpyAsyncEventListener(); + createPersistentAsyncEventQueue(asyncEventQueueId, spyAsyncEventListener, true, 5, + createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL); + waitForSenderToBecomePrimary(); + + // Wait for the processors to start. + await().until(() -> { + Set threads = Thread.getAllStackTraces().keySet(); + return threads + .stream() + .filter(t -> t.getName().contains("Processor for GatewaySender_AsyncEventQueue")) + .allMatch(Thread::isAlive); + }); + + // Create the region, processing will continue and no NPE should be thrown anymore. + createReplicateRegion(replicateRegionName, asyncEventQueueId); + waitForRegionQueuesToEmpty(); + assertThat(spyAsyncEventListener.getEventsMap().size()).isEqualTo(5); + }); + + Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line) + .as("Dispatcher shouldn't have thrown any errors while processing batches") + .doesNotContain("An Exception occurred. The dispatcher will continue.") + .doesNotContain("java.lang.NullPointerException")); + } + @Test // serial, PartitionedRegion public void testPartitionedSerialAsyncEventQueue() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, dispatcherThreadCount, 100)); @@ -377,9 +442,9 @@ public void testPartitionedSerialAsyncEventQueue() { @Test // serial, conflation, PartitionedRegion public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), true, 100, dispatcherThreadCount, 100)); @@ -396,9 +461,9 @@ public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() { vm1.invoke(() -> getInternalGatewaySender().pause()); vm2.invoke(() -> getInternalGatewaySender().pause()); - vm0.invoke(() -> waitForDispatcherToPause()); - vm1.invoke(() -> waitForDispatcherToPause()); - vm2.invoke(() -> waitForDispatcherToPause()); + vm0.invoke(this::waitForDispatcherToPause); + vm1.invoke(this::waitForDispatcherToPause); + vm2.invoke(this::waitForDispatcherToPause); Map keyValues = new HashMap<>(); for (int i = 0; i < 1000; i++) { @@ -451,16 +516,19 @@ public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() { */ @Test // persistent, PartitionedRegion public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled() { - vm0.invoke(() -> createCache()); - vm1.invoke(() -> createCache()); - vm2.invoke(() -> createCache()); + vm0.invoke(this::createCache); + vm1.invoke(this::createCache); + vm2.invoke(this::createCache); vm0.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100)); + false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm1.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100)); + false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm2.invoke(() -> createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), - false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100)); + false, 100, createDiskStoreName(asyncEventQueueId), false, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL)); vm0.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16)); vm1.invoke(() -> createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16)); @@ -483,7 +551,8 @@ public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled_Restart() createCache(); createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, - createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100); + createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL); createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16); @@ -497,7 +566,8 @@ public void testPartitionedSerialAsyncEventQueueWithPersistenceEnabled_Restart() createCache(); createPersistentAsyncEventQueue(asyncEventQueueId, new SpyAsyncEventListener(), false, 100, - createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100); + createDiskStoreName(asyncEventQueueId), true, dispatcherThreadCount, 100, + DEFAULT_BATCH_TIME_INTERVAL); createPartitionedRegion(partitionedRegionName, asyncEventQueueId, 0, 16); // primary sender @@ -600,11 +670,12 @@ private void createPersistentAsyncEventQueue(String asyncEventQueueId, String diskStoreName, boolean isDiskSynchronous, int dispatcherThreads, - int maximumQueueMemory) { + int maximumQueueMemory, + int batchTimeInterval) { + assertThat(asyncEventQueueId).isNotEmpty(); assertThat(asyncEventListener).isNotNull(); assertThat(diskStoreName).isNotEmpty(); - createDiskStore(diskStoreName, asyncEventQueueId); AsyncEventQueueFactory asyncEventQueueFactory = getCache().createAsyncEventQueueFactory(); @@ -614,6 +685,7 @@ private void createPersistentAsyncEventQueue(String asyncEventQueueId, asyncEventQueueFactory.setDiskSynchronous(isDiskSynchronous); asyncEventQueueFactory.setDispatcherThreads(dispatcherThreads); asyncEventQueueFactory.setMaximumQueueMemory(maximumQueueMemory); + asyncEventQueueFactory.setBatchTimeInterval(batchTimeInterval); asyncEventQueueFactory.setParallel(false); asyncEventQueueFactory.setPersistent(true); @@ -623,17 +695,18 @@ private void createPersistentAsyncEventQueue(String asyncEventQueueId, private void addClosingCacheListener(String regionName, int closeAfterCreateKey) { assertThat(regionName).isNotEmpty(); - Region region = getCache().getRegion(regionName); + Region region = getCache().getRegion(regionName); assertNotNull(region); - CacheListenerAdapter cacheListener = new CacheListenerAdapter() { - @Override - public void afterCreate(EntryEvent event) { - if ((Integer) event.getKey() == closeAfterCreateKey) { - getCache().close(); - } - } - }; + CacheListenerAdapter cacheListener = + new CacheListenerAdapter() { + @Override + public void afterCreate(EntryEvent event) { + if ((Integer) event.getKey() == closeAfterCreateKey) { + getCache().close(); + } + } + }; region.getAttributesMutator().addCacheListener(cacheListener); } @@ -676,6 +749,7 @@ private int getTotalRegionQueueSize() { return totalSize; } + @SuppressWarnings("unchecked") private void waitForAsyncEventListenerWithEventsMapSize(int expectedSize) { await().untilAsserted( () -> assertThat(getSpyAsyncEventListener().getEventsMap()).hasSize(expectedSize)); @@ -745,6 +819,7 @@ Map getEventsMap() { } @Override + @SuppressWarnings("unchecked") public boolean processEvents(List events) { for (AsyncEvent event : events) { eventsMap.put(event.getKey(), event.getDeserializedValue()); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 3d5405ea88c9..698bc7ce9b02 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -783,7 +783,7 @@ public List conflate(List events) { if (gsEvent.shouldBeConflated()) { // The event should be conflated. Create the conflation key // (comprised of the event's region, key and the operation). - ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(), + ConflationKey key = new ConflationKey(gsEvent.getRegionPath(), gsEvent.getKeyToConflate(), gsEvent.getOperation()); // Get the entry at that key @@ -799,7 +799,7 @@ public List conflate(List events) { } else { // The event should not be conflated (create or destroy). Add it to // the map. - ConflationKey key = new ConflationKey(gsEvent.getRegion().getFullPath(), + ConflationKey key = new ConflationKey(gsEvent.getRegionPath(), gsEvent.getKeyToConflate(), gsEvent.getOperation(), gsEvent.getShadowKey()); conflatedEventsMap.put(key, gsEvent); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java index 7b0cfb293134..e6abad59ffea 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java @@ -15,7 +15,10 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.util.ArrayList; @@ -106,6 +109,38 @@ public void validateBatchConflationWithBatchContainingDuplicateConflatableEvents assertThat(gsei2.getShadowKey()).isEqualTo(lastUpdateShadowKey); } + // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk + // and the processor started dispatching events before the actual region was available. + @Test + public void verifyBatchConflationWithNullEventRegionDoesNowThrowException() + throws Exception { + AbstractGatewaySenderEventProcessor processor = + ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender); + + List events = new ArrayList(); + + LocalRegion lr = mock(LocalRegion.class); + when(lr.getFullPath()).thenReturn("/dataStoreRegion"); + when(lr.getCache()).thenReturn(this.cache); + + // Create two events for the same key, so that conflation will be needed. Mock the getRegion() + // value to return as null so we will hit the NPE if + // it is referenced. + GatewaySenderEventImpl gsei1 = + spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.CREATE, + "Object_13964", "Object_13964_1", 100, 27709)); + doReturn(null).when(gsei1).getRegion(); + + GatewaySenderEventImpl gsei2 = + spy(ParallelGatewaySenderHelper.createGatewaySenderEvent(lr, Operation.UPDATE, + "Object_13964", "Object_13964_2", 101, 27822)); + doReturn(null).when(gsei2).getRegion(); + + events.add(gsei1); + events.add(gsei2); + assertThatCode(() -> processor.conflate(events)).doesNotThrowAnyException(); + } + @Test public void validateBatchConflationWithBatchContainingDuplicateNonConflatableEvents() throws Exception { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java index a3f2f305efbf..df3a19db71cb 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java @@ -14,23 +14,149 @@ */ package org.apache.geode.internal.cache.wan.serial; +import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_TIME_INTERVAL; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import org.junit.Rule; import org.junit.Test; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskStore; +import org.apache.geode.cache.DiskStoreFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.Scope; +import org.apache.geode.cache.wan.GatewayEventFilter; +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.cache.wan.GatewaySenderFactory; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory; import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; public class SerialWANConflationDUnitTest extends WANTestBase { + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + + private void createCacheWithLogFile(Integer locPort, String logFile) { + WANTestBase test = new WANTestBase(); + Properties props = test.getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + String logLevel = System.getProperty(LOG_LEVEL, "info"); + props.setProperty(LOG_LEVEL, logLevel); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + props.setProperty(LOG_FILE, logFile); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + } + + private File createDirectory(String name) { + assertThat(name).isNotEmpty(); + + File directory = new File(temporaryFolder.getRoot(), name); + if (!directory.exists()) { + try { + return temporaryFolder.newFolder(name); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + return directory; + } + + private GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, + boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, + boolean isPersistent, GatewayEventFilter filter, int numDispatchers, + GatewaySender.OrderPolicy policy, int socketBufferSize, int batchTimeInterval) { + InternalGatewaySenderFactory gateway = + (InternalGatewaySenderFactory) cache.createGatewaySenderFactory(); + gateway.setParallel(isParallel); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setBatchConflationEnabled(isConflation); + gateway.setDispatcherThreads(numDispatchers); + gateway.setOrderPolicy(policy); + gateway.setLocatorDiscoveryCallback(new MyLocatorCallback()); + gateway.setSocketBufferSize(socketBufferSize); + gateway.setBatchTimeInterval(batchTimeInterval); + + if (filter != null) { + eventFilter = filter; + gateway.addGatewayEventFilter(filter); + } + + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); + } else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + + return gateway; + } + + private void createSender(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, + int batchTimeInterval) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + File persistentDirectory = createDirectory(dsName + "_disk_" + VM.getCurrentVMNum()); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] {persistentDirectory}; + GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, + batchSize, isConflation, isPersistent, filter, numDispatcherThreadsForTheRun, + GatewaySender.DEFAULT_ORDER_POLICY, + GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE, batchTimeInterval); + gateway.create(dsName, remoteDsId); + + } finally { + exln.remove(); + } + } + + private void waitForEventQueueSize(int expectedQueueSize) { + await().untilAsserted(() -> { + Set senders = cache.getGatewaySenders(); + Optional sender = + senders.stream().filter(s -> s.getId().equals("ln")).findFirst(); + assertThat(sender.isPresent()).isTrue(); + Set queues = ((AbstractGatewaySender) sender.get()).getQueues(); + int totalEvents = queues.stream().mapToInt(RegionQueue::size).sum(); + assertThat(totalEvents).isEqualTo(expectedQueueSize); + }); + } + @Test - public void testSerialPropagationPartitionRegionBatchConflation() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + public void testSerialPropagationPartitionRegionBatchConflation() { + Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); @@ -59,7 +185,7 @@ public void testSerialPropagationPartitionRegionBatchConflation() throws Excepti vm7.invoke(() -> pauseSender("ln")); - final Map keyValues = new HashMap(); + final Map keyValues = new HashMap<>(); for (int i = 1; i <= 10; i++) { for (int j = 1; j <= 10; j++) { @@ -92,9 +218,9 @@ public void testSerialPropagationPartitionRegionBatchConflation() throws Excepti } @Test - public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + public void testSerialPropagationPartitionRegionConflationDuringEnqueue() { + Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); @@ -123,7 +249,7 @@ public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws vm7.invoke(() -> pauseSender("ln")); - final Map keyValues = new HashMap(); + final Map keyValues = new HashMap<>(); for (int i = 1; i <= 10; i++) { for (int j = 1; j <= 10; j++) { @@ -134,8 +260,8 @@ public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws ArrayList v4List = (ArrayList) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20)); - assertTrue("After conflation during enqueue, there should be only 20 events", - v4List.get(0) == 20); + assertEquals("After conflation during enqueue, there should be only 20 events", 20, + (int) v4List.get(0)); vm4.invoke(() -> resumeSender("ln")); vm5.invoke(() -> resumeSender("ln")); @@ -150,13 +276,75 @@ public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws ArrayList v7List = (ArrayList) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0)); - assertTrue("No events in secondary queue stats since it's serial sender", - (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0); - assertTrue("Total queued events should be 100", - (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100); + assertEquals("No events in secondary queue stats since it's serial sender", 0, + (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10))); + assertEquals("Total queued events should be 100", 100, + (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2))); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); } + @Test + // See GEODE-7079: a NullPointerException was thrown whenever the queue was recovered from disk + // and the processor started dispatching events before the actual region was available. + public void persistentSerialGatewayWithConflationShouldNotLooseEventsNorThrowNullPointerExceptionsWhenMemberIsRestartedWhileEventsAreStillOnTheQueue() + throws IOException { + Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createReplicatedRegion(getTestMethodName(), null, Scope.DISTRIBUTED_ACK, + DataPolicy.PERSISTENT_REPLICATE, isOffHeap())); + createReceiverInVMs(vm2); + + // Create Region, associate gateway and insert some entries. + vm4.invoke(() -> { + createCache(lnPort); + createReplicatedRegion(getTestMethodName(), "ln", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, + isOffHeap()); + + // Large batch time interval and low batch size so no events are processed before the restart. + createSender("ln", 2, false, 100, 10, true, true, null, 120000); + + Region region = cache.getRegion(getTestMethodName()); + for (int i = 0; i < 5; i++) { + region.put(i, i); + } + waitForEventQueueSize(5); + }); + vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0)); + + // Custom Log File to manually search for exceptions. + File customLogFile = temporaryFolder.newFile("memberLog.log"); + + vm4.invoke(() -> { + // Restart the cache. + cache.close(); + createCacheWithLogFile(lnPort, customLogFile.getAbsolutePath()); + + // Recover the queue from disk, reduce batch thresholds so processing starts right away. + createSender("ln", 2, false, 100, 5, true, true, null, DEFAULT_BATCH_TIME_INTERVAL); + waitForSenderToBecomePrimary("ln"); + + // Wait for the processors to start. + await().until(() -> { + Set threads = Thread.getAllStackTraces().keySet(); + return threads + .stream() + .filter(t -> t.getName().contains("Processor for GatewaySender_ln")) + .allMatch(Thread::isAlive); + }); + + // Create the region, processing will continue and no NPE should be thrown anymore. + createReplicatedRegion(getTestMethodName(), "ln", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, + isOffHeap()); + }); + vm2.invoke(() -> validateRegionSize(getTestMethodName(), 5)); + + Files.lines(customLogFile.toPath()).forEach((line) -> assertThat(line) + .as("Dispatchers shouldn't have thrown any errors while processing batches") + .doesNotContain("An Exception occurred. The dispatcher will continue.") + .doesNotContain("java.lang.NullPointerException")); + } }