From 3f71b28726b19cbf9c49c59cb626e99113e21776 Mon Sep 17 00:00:00 2001 From: nabarun Date: Mon, 4 Apr 2016 11:07:16 -0700 Subject: [PATCH 1/2] GEODE-1018: Added a listener to sleep the receiver on AfterCreate event to solve timing issue. * Added a listener in the receiver VM to sleep for a duration on AfterCreate event. This will make sure that the transmission is not completed by the time the receiver is shut down. * The region entry mismatch mentioned in the GEODE-1018 was because the receiver and sender were started before the persistent region was created. This was solved in GEODE-1062 --- .../internal/cache/wan/WANTestBase.java | 17 +++++++++ .../serial/SerialWANPropogationDUnitTest.java | 36 +++++++------------ 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index 66854510ee2e..d77df206e192 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java @@ -1355,6 +1355,23 @@ public static void createCacheInVMsAsync(Integer locatorPort, VM... vms) { } } + public static void addListenerToSleepAfterCreateEvent(int milliSeconds) { + cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator() + .addCacheListener(new CacheListenerAdapter() { + @Override + public void afterCreate(final EntryEvent event) { + try { + Thread.sleep(milliSeconds); + } + catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + } + + public static void createCache(Integer locPort){ createCache(false, locPort); } diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java index 55f1ac194870..d656e98ad95e 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java @@ -782,54 +782,42 @@ public void testReplicatedSerialPropagationWithRemoteReceiverRestarted_SenderRec // senders are created on local site. Batch size is kept to a high (170) so // there will be less number of exceptions (occur during dispatchBatch) in // the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, true, null, true )); + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 350, false, true, null, true )); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 350, false, true, null, true)); // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR_1", null, isOffHeap() )); - + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); + vm2.invoke(() -> WANTestBase.addListenerToSleepAfterCreateEvent(2000)); // start the senders on local site startSenderInVMs("ln", vm4, vm5); // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap())); // start puts in RR_1 in another thread AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); // close cache in remote site. This will automatically kill the remote // receivers. - Wait.pause(2000); vm2.invoke(() -> WANTestBase.closeCache()); - // vm3.invoke(() -> WANTestBase.closeCache()); - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } + + inv1.join(); // verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 8000 )); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); vm2.invoke(() -> WANTestBase.createReceiver( nyPort )); - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); + vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); - vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); + vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats(1, 1)); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 8000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); } public void testReplicatedSerialPropagationWithRemoteSiteBouncedBack_ReceiverPersistent() From 3d9fc9686f745ee5c9ded3e37114e992dd7c7e1c Mon Sep 17 00:00:00 2001 From: nabarun Date: Mon, 4 Apr 2016 14:33:12 -0700 Subject: [PATCH 2/2] GEODE-1018: Removed stacktrace and new runtime exception. Replace with currentThread.interrupt() --- .../com/gemstone/gemfire/internal/cache/wan/WANTestBase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java index d77df206e192..afc534b455f9 100644 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java @@ -1364,8 +1364,7 @@ public void afterCreate(final EntryEvent event) { Thread.sleep(milliSeconds); } catch (InterruptedException e) { - e.printStackTrace(); - throw new RuntimeException(e); + Thread.currentThread().interrupt(); } } });