From ee9b4591ae600feed99d279626bc95d60ef687f5 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Thu, 2 Dec 2021 06:52:38 +0100 Subject: [PATCH] Updated with Kirk's first review comments --- .../txgrouping/TxGroupingBaseDUnitTest.java | 191 +++++++++++------- .../TxGroupingPartitionedRegionDUnitTest.java | 39 ++-- ...reateTxGroupingGatewaySenderDUnitTest.java | 2 +- .../parallel/TxGroupingParallelDUnitTest.java | 142 ++++--------- .../serial/TxGroupingSerialDUnitTest.java | 98 ++++----- .../CommonTxGroupingGatewaySenderFactory.java | 4 +- .../TxGroupingParallelGatewaySenderQueue.java | 63 ++++-- .../TxGroupingSerialGatewaySenderQueue.java | 59 ++++-- .../geode/internal/cache/wan/WANTestBase.java | 47 ----- 9 files changed, 289 insertions(+), 356 deletions(-) diff --git a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java index 4bf839eaa223..85b02878f171 100644 --- a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java +++ b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingBaseDUnitTest.java @@ -17,15 +17,12 @@ import static org.apache.geode.cache.Region.SEPARATOR; import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR; import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.dunit.VM.getVM; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import java.io.File; import java.io.IOException; @@ -62,7 +59,11 @@ import org.apache.geode.internal.cache.CustomerIDPartitionResolver; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.execute.data.CustId; +import org.apache.geode.internal.cache.execute.data.Order; import org.apache.geode.internal.cache.execute.data.OrderId; +import org.apache.geode.internal.cache.execute.data.Shipment; +import org.apache.geode.internal.cache.execute.data.ShipmentId; import org.apache.geode.internal.cache.tier.sockets.CacheServerStats; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.GatewayReceiverStats; @@ -193,7 +194,6 @@ public void tearDown() { protected Properties createLocatorConfig(int systemId, int locatorPort, int remoteLocatorPort) { Properties config = new Properties(); - config.setProperty(MCAST_PORT, "0"); config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId)); config.setProperty(LOCATORS, "localhost[" + locatorPort + ']'); config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + ']'); @@ -259,7 +259,6 @@ protected GatewayReceiverFactory createGatewayReceiverFactory(int receiverPort) protected Properties createServerConfig(int locatorPort) { Properties config = new Properties(); - config.setProperty(MCAST_PORT, "0"); config.setProperty(LOCATORS, "localhost[" + locatorPort + ']'); return config; } @@ -288,10 +287,8 @@ protected boolean isRunning(GatewaySender sender) { protected void validateRegionSize(String regionName, final int regionSize) { final Region r = cacheRule.getCache().getRegion(SEPARATOR + regionName); - assertNotNull(r); - if (regionSize != r.keySet().size()) { - await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize)); - } + assertThat(r).isNotNull(); + await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize)); } protected List getSenderStats(String senderId, final int expectedQueueSize) { @@ -310,7 +307,7 @@ protected List getSenderStats(String senderId, final int expectedQueueS await() .untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize)); } - ArrayList stats = new ArrayList<>(); + List stats = new ArrayList<>(); stats.add(statistics.getEventQueueSize()); stats.add(statistics.getEventsReceived()); stats.add(statistics.getEventsQueued()); @@ -340,26 +337,30 @@ protected GatewaySender getGatewaySender(String senderId) { return sender; } - protected void doPutsInsideTransactions(String regionName, Map keyValues, - int eventsPerTransaction) { - Region r = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - int eventInTransaction = 0; - CacheTransactionManager cacheTransactionManager = - cacheRule.getCache().getCacheTransactionManager(); - for (Object key : keyValues.keySet()) { - if (eventInTransaction == 0) { - cacheTransactionManager.begin(); - } - r.put(key, keyValues.get(key)); - if (++eventInTransaction == eventsPerTransaction) { - cacheTransactionManager.commit(); - eventInTransaction = 0; - } + protected void doTxPuts(String regionName, final long putsPerTransaction, + final long transactions) { + doTxPuts(regionName, putsPerTransaction, transactions, 0); + } + + protected void doTxPuts(String regionName, final long putsPerTransaction, + final long transactions, long initialKeyId) { + Region region = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName); + CacheTransactionManager mgr = cacheRule.getCache().getCacheTransactionManager(); + for (int i = 0; i < transactions; i++) { + long keyId = initialKeyId + (i * putsPerTransaction); + doOneTxWithPuts(region, mgr, putsPerTransaction, keyId); } - if (eventInTransaction != 0) { - cacheTransactionManager.commit(); + } + + private void doOneTxWithPuts(Region region, CacheTransactionManager mgr, + long putsPerTransaction, long initialKeyId) { + mgr.begin(); + for (int j = 0; j < putsPerTransaction; j++) { + long key = initialKeyId + j; + String value = "Value_" + key; + region.put(key, value); } + mgr.commit(); } protected void checkGatewayReceiverStats(int processBatches, int eventsReceived, @@ -386,35 +387,42 @@ protected void checkGatewayReceiverStats(int processBatches, int eventsReceived, } protected void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction, - final long transactions, long offset) { - Region r = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName); - long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100); - long j; + final long transactions, long initialKeyId) { + Region region = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName); CacheTransactionManager mgr = cacheRule.getCache().getCacheTransactionManager(); for (int i = 0; i < transactions; i++) { - boolean done = false; - do { + long keyId = initialKeyId + (i * putsPerTransaction); + doOneTxWithPutsWithRetryIfError(region, mgr, putsPerTransaction, keyId); + } + } + + private void doOneTxWithPutsWithRetryIfError(Region region, + CacheTransactionManager mgr, long putsPerTransaction, long initialKeyId) { + while (true) { + try { + mgr.begin(); + for (int j = 0; j < putsPerTransaction; j++) { + long key = initialKeyId + j; + String value = "Value_" + key; + region.put(key, value); + } + mgr.commit(); + return; + } catch (TransactionException ignore) { + } catch (IllegalStateException ignore) { try { - mgr.begin(); - for (j = 0; j < putsPerTransaction; j++) { - long key = keyOffset + ((j + (10L * i)) * 100); - String value = "Value_" + key; - r.put(key, value); - } - mgr.commit(); - done = true; - } catch (TransactionException ignore) { - } catch (IllegalStateException ignore) { - try { - mgr.rollback(); - } catch (Exception ignored) { - } + mgr.rollback(); + } catch (Exception ignored) { } - } while (!done); + } } } public void createCustomerOrderShipmentPartitionedRegion(String senderId) { + createCustomerOrderShipmentPartitionedRegion(senderId, 0); + } + + public void createCustomerOrderShipmentPartitionedRegion(String senderId, int redundantCopies) { RegionFactory fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION); if (senderId != null) { @@ -423,10 +431,12 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) { PartitionAttributesFactory paf = new PartitionAttributesFactory(); paf.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); + paf.setRedundantCopies(redundantCopies); fact.setPartitionAttributes(paf.create()); fact.create(customerRegionName); paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundantCopies); paf.setColocatedWith(customerRegionName) .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION); @@ -437,6 +447,7 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) { fact.create(orderRegionName); paf = new PartitionAttributesFactory(); + paf.setRedundantCopies(redundantCopies); paf.setColocatedWith(orderRegionName) .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION); @@ -447,33 +458,62 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) { fact.create(shipmentRegionName); } - public void doOrderAndShipmentPutsInsideTransactions(Map keyValues, - int eventsPerTransaction) { - Region orderRegion = cacheRule.getCache().getRegion(orderRegionName); - Region shipmentRegion = cacheRule.getCache().getRegion(shipmentRegionName); - assertNotNull(orderRegion); - assertNotNull(shipmentRegion); - int eventInTransaction = 0; + public void doOrderAndShipmentPutsInsideTransactions(int customerId, int eventsPerTransaction, + int transactions) { + doOrderAndShipmentPutsInsideTransactions(customerId, eventsPerTransaction, transactions, false); + } + + public void doOrderAndShipmentPutsInsideTransactions(int customerId, int eventsPerTransaction, + int transactions, boolean retryIfError) { CacheTransactionManager cacheTransactionManager = cacheRule.getCache().getCacheTransactionManager(); - for (Object key : keyValues.keySet()) { - if (eventInTransaction == 0) { - cacheTransactionManager.begin(); - } - Region r; - if (key instanceof OrderId) { - r = orderRegion; + for (int i = 0; i < transactions; i++) { + int keyId = i * eventsPerTransaction; + if (retryIfError) { + doOneTxOrderAndShipmentPutsWithRetryIfError(cacheTransactionManager, keyId, + eventsPerTransaction, customerId); } else { - r = shipmentRegion; + doOneTxOrderAndShipmentPuts(cacheTransactionManager, keyId, eventsPerTransaction, + customerId); } - r.put(key, keyValues.get(key)); - if (++eventInTransaction == eventsPerTransaction) { + } + } + + private void doOneTxOrderAndShipmentPuts( + CacheTransactionManager cacheTransactionManager, int keyId, int eventsPerTransaction, + int customerId) { + cacheTransactionManager.begin(); + doOneOrderAndShipmentPuts(keyId, eventsPerTransaction, customerId); + cacheTransactionManager.commit(); + } + + private void doOneTxOrderAndShipmentPutsWithRetryIfError( + CacheTransactionManager cacheTransactionManager, int keyId, int eventsPerTransaction, + int customerId) { + while (true) { + try { + cacheTransactionManager.begin(); + doOneOrderAndShipmentPuts(keyId, eventsPerTransaction, customerId); cacheTransactionManager.commit(); - eventInTransaction = 0; + break; + } catch (TransactionException exception) { + } catch (IllegalStateException exception) { + try { + cacheTransactionManager.rollback(); + } catch (Exception ignored) { + } } } - if (eventInTransaction != 0) { - cacheTransactionManager.commit(); + } + + private void doOneOrderAndShipmentPuts(int keyId, int eventsPerTransaction, int customerId) { + Region orderRegion = cacheRule.getCache().getRegion(orderRegionName); + Region shipmentRegion = cacheRule.getCache().getRegion(shipmentRegionName); + OrderId orderId = new OrderId(keyId, new CustId(customerId)); + orderRegion.put(orderId, new Order()); + for (int i = 0; i < eventsPerTransaction - 1; i++) { + ShipmentId shipmentId = new ShipmentId(keyId + i, orderId); + shipmentRegion.put(shipmentId, new Shipment()); } } @@ -498,7 +538,7 @@ protected void checkGatewayReceiverStatsHA(int processBatches, int eventsReceive protected void putGivenKeyValues(String regionName, Map keyValues) { Region r = cacheRule.getCache().getRegion(SEPARATOR + regionName); - assertNotNull(r); + assertThat(r).isNotNull(); for (Object key : keyValues.keySet()) { r.put(key, keyValues.get(key)); } @@ -506,7 +546,7 @@ protected void putGivenKeyValues(String regionName, Map keyValues) { protected void checkConflatedStats(String senderId, final int eventsConflated) { GatewaySenderStats statistics = getGatewaySenderStats(senderId); - assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated()); + assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated); } protected GatewaySenderStats getGatewaySenderStats(String senderId) { @@ -517,12 +557,9 @@ protected GatewaySenderStats getGatewaySenderStats(String senderId) { protected void validateGatewaySenderQueueAllBucketsDrained(final String senderId) { GatewaySender sender = getGatewaySender(senderId); final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; - await().untilAsserted(() -> { - assertThat(abstractSender.getEventQueueSize()).isEqualTo(0); - }); - await().untilAsserted(() -> { - assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0); - }); + await().untilAsserted(() -> assertThat(abstractSender.getEventQueueSize()).isEqualTo(0)); + await() + .untilAsserted(() -> assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0)); } public static void setNumDispatcherThreadsForTheRun(int numThreads) { diff --git a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java index 09d8d487c987..d49e8c3e1c29 100644 --- a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java +++ b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/TxGroupingPartitionedRegionDUnitTest.java @@ -17,7 +17,9 @@ import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import junitparams.Parameters; @@ -29,8 +31,6 @@ import org.apache.geode.internal.cache.execute.data.CustId; import org.apache.geode.internal.cache.execute.data.Order; import org.apache.geode.internal.cache.execute.data.OrderId; -import org.apache.geode.internal.cache.execute.data.Shipment; -import org.apache.geode.internal.cache.execute.data.ShipmentId; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.junit.categories.WanTest; @@ -62,46 +62,37 @@ public void testPartitionedRegionPropagationWithGroupTransactionEventsAndMixOfEv } int customers = 4; - int transactionsPerCustomer = 1000; - final Map keyValuesInTransactions = new HashMap<>(); - for (int custId = 0; custId < customers; custId++) { - for (int i = 0; i < transactionsPerCustomer; i++) { - CustId custIdObject = new CustId(custId); - OrderId orderId = new OrderId(i, custIdObject); - ShipmentId shipmentId1 = new ShipmentId(i, orderId); - ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); - ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); - keyValuesInTransactions.put(orderId, new Order()); - keyValuesInTransactions.put(shipmentId1, new Shipment()); - keyValuesInTransactions.put(shipmentId2, new Shipment()); - keyValuesInTransactions.put(shipmentId3, new Shipment()); - } - } - int ordersPerCustomerNotInTransactions = 1000; final Map keyValuesNotInTransactions = new HashMap<>(); for (int custId = 0; custId < customers; custId++) { for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) { CustId custIdObject = new CustId(custId); - OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject); + OrderId orderId = + new OrderId(i + ordersPerCustomerNotInTransactions * customers, custIdObject); keyValuesNotInTransactions.put(orderId, new Order()); } } // eventsPerTransaction is 1 (orders) + 3 (shipments) int eventsPerTransaction = 4; - AsyncInvocation putsInTransactionsInvocation = - londonServer1VM.invokeAsync( - () -> doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions, - eventsPerTransaction)); + List> putsInTransactionsInvocationList = new ArrayList<>(customers); + for (int i = 0; i < customers; i++) { + final int customerId = i; + putsInTransactionsInvocationList.add( + londonServer1VM.invokeAsync( + () -> doOrderAndShipmentPutsInsideTransactions(customerId, eventsPerTransaction, + transactionsPerCustomer))); + } AsyncInvocation putsNotInTransactionsInvocation = londonServer2VM.invokeAsync( () -> putGivenKeyValues(orderRegionName, keyValuesNotInTransactions)); - putsInTransactionsInvocation.await(); + for (AsyncInvocation putsInTransactionInvocation : putsInTransactionsInvocationList) { + putsInTransactionInvocation.await(); + } putsNotInTransactionsInvocation.await(); int entries = diff --git a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/cli/commands/CreateTxGroupingGatewaySenderDUnitTest.java b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/cli/commands/CreateTxGroupingGatewaySenderDUnitTest.java index e0f4abd86d2b..c69d226c509a 100644 --- a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/cli/commands/CreateTxGroupingGatewaySenderDUnitTest.java +++ b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/cli/commands/CreateTxGroupingGatewaySenderDUnitTest.java @@ -36,7 +36,7 @@ public class CreateTxGroupingGatewaySenderDUnitTest { @Rule public GfshCommandRule gfsh = new GfshCommandRule(); - MemberVM locator; + private MemberVM locator; @Before public void before() throws Exception { diff --git a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/parallel/TxGroupingParallelDUnitTest.java b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/parallel/TxGroupingParallelDUnitTest.java index 10b9458c7f56..27fbf6893157 100644 --- a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/parallel/TxGroupingParallelDUnitTest.java +++ b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/parallel/TxGroupingParallelDUnitTest.java @@ -15,8 +15,8 @@ package org.apache.geode.internal.cache.wan.txgrouping.parallel; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.HashMap; @@ -36,10 +36,6 @@ import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.execute.data.CustId; import org.apache.geode.internal.cache.execute.data.Customer; -import org.apache.geode.internal.cache.execute.data.Order; -import org.apache.geode.internal.cache.execute.data.OrderId; -import org.apache.geode.internal.cache.execute.data.Shipment; -import org.apache.geode.internal.cache.execute.data.ShipmentId; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; import org.apache.geode.internal.cache.wan.txgrouping.TxGroupingBaseDUnitTest; import org.apache.geode.internal.util.ArrayUtils; @@ -72,25 +68,6 @@ public void testPRParallelPropagationWithVsWithoutGroupTransactionEvents( }); } - final Map custKeyValue = new HashMap<>(); - int intCustId = 1; - CustId custId = new CustId(intCustId); - custKeyValue.put(custId, new Customer()); - londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, custKeyValue)); - - int transactions = 3; - final Map keyValues = new HashMap<>(); - for (int i = 0; i < transactions; i++) { - OrderId orderId = new OrderId(i, custId); - ShipmentId shipmentId1 = new ShipmentId(i, orderId); - ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); - ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); - keyValues.put(orderId, new Order()); - keyValues.put(shipmentId1, new Shipment()); - keyValues.put(shipmentId2, new Shipment()); - keyValues.put(shipmentId3, new Shipment()); - } - // 3 transactions of 4 events each are sent so that the batch would // initially contain the first 2 transactions complete and the first // 2 events of the last transaction (10 entries). @@ -101,13 +78,13 @@ public void testPRParallelPropagationWithVsWithoutGroupTransactionEvents( // remaining 2 events of the last transaction are added to the second batch // which makes that 2 batches will be sent, one with 10 events and // one with 2. + int transactions = 3; int eventsPerTransaction = 4; - londonServer1VM.invoke(() -> doOrderAndShipmentPutsInsideTransactions(keyValues, - eventsPerTransaction)); + londonServer1VM.invoke( + () -> doOrderAndShipmentPutsInsideTransactions(0, eventsPerTransaction, transactions)); - int entries = (transactions * eventsPerTransaction) + 1; + int entries = transactions * eventsPerTransaction; - londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, 1)); londonServer1VM.invoke(() -> validateRegionSize(orderRegionName, transactions)); londonServer1VM.invoke(() -> validateRegionSize(shipmentRegionName, transactions * 3)); @@ -135,7 +112,7 @@ public void testPRParallelPropagationWithVsWithoutGroupTransactionEvents( @Test @Parameters({"true", "false"}) public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithCompleteTransactions_SeveralClients( - boolean isBatchesRedistributed) { + boolean isBatchesRedistributed) throws InterruptedException { newYorkServerVM.invoke("create New York server", () -> { startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort, !isBatchesRedistributed); createCustomerOrderShipmentPartitionedRegion(null); @@ -152,48 +129,32 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC }); } - int clients = 4; - int transactions = 300; // batchSize is 10. Each transaction will contain 1 order + 3 shipments = 4 events. // As a result, all batches will contain extra events to complete the // transactions it will deliver. int shipmentsPerTransaction = 3; + int eventsPerTransaction = shipmentsPerTransaction + 1; + int transactions = 300; - final List> customerData = new ArrayList<>(clients); - for (int intCustId = 0; intCustId < clients; intCustId++) { + int clients = 4; + for (int i = 0; i < clients; i++) { final Map custKeyValue = new HashMap<>(); - CustId custId = new CustId(intCustId); - custKeyValue.put(custId, new Customer()); - customerData.add(new HashMap<>()); + custKeyValue.put(new CustId(i), new Customer()); londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, custKeyValue)); - - for (int i = 0; i < transactions; i++) { - OrderId orderId = new OrderId(i, custId); - customerData.get(intCustId).put(orderId, new Order()); - for (int j = 0; j < shipmentsPerTransaction; j++) { - customerData.get(intCustId).put(new ShipmentId(i + j, orderId), new Shipment()); - } - } } List> asyncInvocations = new ArrayList<>(clients); - int eventsPerTransaction = shipmentsPerTransaction + 1; for (int i = 0; i < clients; i++) { final int intCustId = i; AsyncInvocation asyncInvocation = londonServer1VM.invokeAsync(() -> doOrderAndShipmentPutsInsideTransactions( - customerData.get(intCustId), - eventsPerTransaction)); + intCustId, eventsPerTransaction, transactions)); asyncInvocations.add(asyncInvocation); } - try { - for (AsyncInvocation asyncInvocation : asyncInvocations) { - asyncInvocation.await(); - } - } catch (InterruptedException e) { - fail("Interrupted"); + for (AsyncInvocation asyncInvocation : asyncInvocations) { + asyncInvocation.await(); } londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, clients)); @@ -244,15 +205,12 @@ public void testPRParallelPropagationWithGroupTransactionEventsWithIncompleteTra // it will be impossible for the batches to have complete transactions as some // events for a transaction will be handled by one dispatcher thread and some other events by // another thread. - final Map keyValue = new HashMap<>(); - int entries = 30; - for (int i = 0; i < entries; i++) { - keyValue.put(i, i); - } - int entriesPerTransaction = 3; + int transactions = 10; + int entries = transactions * entriesPerTransaction; + londonServer1VM - .invoke(() -> doPutsInsideTransactions(REGION_NAME, keyValue, entriesPerTransaction)); + .invoke(() -> doTxPuts(REGION_NAME, 3, 10)); londonServer1VM.invoke(() -> validateRegionSize(REGION_NAME, entries)); @@ -300,25 +258,6 @@ public void testPRParallelPropagationWithVsWithoutGroupTransactionEventsWithBatc createCustomerOrderShipmentPartitionedRegion(null); }); - final Map custKeyValue = new HashMap<>(); - int intCustId = 1; - CustId custId = new CustId(intCustId); - custKeyValue.put(custId, new Customer()); - londonServer1VM.invoke(() -> putGivenKeyValues(customerRegionName, custKeyValue)); - - int transactions = 6; - final Map keyValues = new HashMap<>(); - for (int i = 0; i < transactions; i++) { - OrderId orderId = new OrderId(i, custId); - ShipmentId shipmentId1 = new ShipmentId(i, orderId); - ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId); - ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId); - keyValues.put(orderId, new Order()); - keyValues.put(shipmentId1, new Shipment()); - keyValues.put(shipmentId2, new Shipment()); - keyValues.put(shipmentId3, new Shipment()); - } - // 6 transactions of 4 events each are sent with batch size = 10 // - With group transaction events: // The first batch would initially contain the first 2 transactions complete and the first @@ -329,19 +268,19 @@ public void testPRParallelPropagationWithVsWithoutGroupTransactionEventsWithBatc // second batch which will contain 12 events too. // - Without group-transaction-events 3 batches will be sent. 2 // with 10 events and one with 4. + int transactions = 6; + int eventsPerTransaction = 4; int expectedBatchesSent; if (groupTransactionEvents) { expectedBatchesSent = 2; } else { expectedBatchesSent = 3; } - int eventsPerTransaction = 4; - londonServer1VM.invoke(() -> doOrderAndShipmentPutsInsideTransactions(keyValues, - eventsPerTransaction)); + londonServer1VM.invoke( + () -> doOrderAndShipmentPutsInsideTransactions(0, eventsPerTransaction, transactions)); - int entries = (transactions * eventsPerTransaction) + 1; + int entries = transactions * eventsPerTransaction; - londonServer1VM.invoke(() -> validateRegionSize(customerRegionName, 1)); londonServer1VM.invoke(() -> validateRegionSize(orderRegionName, transactions)); londonServer1VM.invoke(() -> validateRegionSize(shipmentRegionName, transactions * 3)); @@ -374,37 +313,43 @@ public void testPRParallelPropagationWithVsWithoutGroupTransactionEventsWithBatc public void testParallelPropagationHAWithGroupTransactionEvents() throws Exception { newYorkServerVM.invoke("create New York server", () -> { startServerWithReceiver(newYorkLocatorPort, newYorkReceiverPort); - createPartitionedRegion(REGION_NAME, null); + createPartitionedRegion(orderRegionName, null); + createPartitionedRegion(shipmentRegionName, null); }); - int batchSize = 9; + int batchSize = 11; int redundantCopies = 3; for (VM server : londonServersVM) { server.invoke("create London server " + server.getId(), () -> { startServerWithSender(server.getId(), londonLocatorPort, newYorkId, newYorkName, true, true, batchSize, redundantCopies); - createPartitionedRegion(REGION_NAME, newYorkName, redundantCopies); + createCustomerOrderShipmentPartitionedRegion(newYorkName, redundantCopies); GatewaySender sender = cacheRule.getCache().getGatewaySender(newYorkName); await().untilAsserted(() -> assertThat(isRunning(sender)).isTrue()); }); } - int putsPerTransaction = 2; + // putsPerTransaction = 1 order + 3 shipments + int putsPerTransaction = 4; int transactions = 1000; AsyncInvocation asyncPutInvocation = londonServer2VM.invokeAsync( - () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, transactions, 0)); + () -> doOrderAndShipmentPutsInsideTransactions(0, putsPerTransaction, transactions, + true)); newYorkServerVM.invoke(() -> await() - .untilAsserted(() -> assertThat(getRegionSize(REGION_NAME)).isGreaterThan(40))); + .untilAsserted(() -> assertThat(getRegionSize(shipmentRegionName)).isGreaterThan(40))); AsyncInvocation killServerInvocation = londonServer1VM.invokeAsync(() -> cacheRule.getCache().close()); + asyncPutInvocation.await(); killServerInvocation.await(); int entries = transactions * putsPerTransaction; newYorkServerVM - .invoke(() -> validateRegionSize(REGION_NAME, transactions * putsPerTransaction)); + .invoke(() -> validateRegionSize(shipmentRegionName, transactions * 3)); + newYorkServerVM + .invoke(() -> validateRegionSize(orderRegionName, transactions)); List londonServerStats = getSenderStats(newYorkName, 0, (VM[]) ArrayUtils.remove(londonServersVM, 0)); @@ -426,8 +371,9 @@ public void testParallelPropagationHAWithGroupTransactionEvents() throws Excepti assertThat(londonServerStats.get(5)).isEqualTo(0); // batchesReceived is equal to numberOfEntries/(batchSize+1) - // As transactions are 2 events long, for each batch it will always be necessary to - // add one more entry to the 9 events batch in order to have complete transactions in the batch. + // As transactions are 4 events long, for each batch it will always be necessary to + // add one more entry to the 11 events batch in order to have complete transactions in the + // batch. int batchesReceived = (entries) / (batchSize + 1); newYorkServerVM.invoke(() -> checkGatewayReceiverStatsHA(batchesReceived, entries, entries)); } @@ -454,19 +400,13 @@ private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated(String } protected void validateGatewaySenderQueueAllBucketsDrained(final String senderId) { - IgnoredException exp = - IgnoredException.addIgnoredException(RegionDestroyedException.class.getName()); - IgnoredException exp1 = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); - try { + try (IgnoredException ignoredE1 = addIgnoredException(RegionDestroyedException.class); + IgnoredException ignoredE2 = addIgnoredException(ForceReattemptException.class)) { GatewaySender sender = getGatewaySender(senderId); final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; await().untilAsserted(() -> assertThat(abstractSender.getEventQueueSize()).isEqualTo(0)); await().untilAsserted( () -> assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0)); - } finally { - exp.remove(); - exp1.remove(); } } diff --git a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/serial/TxGroupingSerialDUnitTest.java b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/serial/TxGroupingSerialDUnitTest.java index 190412d6ecb5..24878ba8ee1b 100644 --- a/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/serial/TxGroupingSerialDUnitTest.java +++ b/geode-wan-txgrouping/src/distributedTest/java/org/apache/geode/internal/cache/wan/txgrouping/serial/TxGroupingSerialDUnitTest.java @@ -18,9 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import junitparams.Parameters; import org.junit.Test; @@ -63,12 +61,6 @@ public void testReplicatedSerialPropagationWithVsWithoutGroupTransactionEvents( }); } - final Map keyValues = new HashMap<>(); - int entries = 12; - for (int i = 0; i < entries; i++) { - keyValues.put(i, i + "_Value"); - } - // 4 transactions of 3 events each are sent so that the first batch // would initially contain the first 3 transactions complete and the first // event of the next transaction (10 entries). @@ -79,10 +71,12 @@ public void testReplicatedSerialPropagationWithVsWithoutGroupTransactionEvents( // events of the third transaction are added to the next batch which makes // that the 2 batches are sent. One with 10 events and another one // with 2 events. + final int transactions = 4; + final int eventsPerTransaction = 3; + final int entries = transactions * eventsPerTransaction; int expectedBatchesSent = groupTransactionEvents ? 1 : 2; - int eventsPerTransaction = 3; - londonServer2VM.invoke(() -> doPutsInsideTransactions(REGION_NAME, keyValues, - eventsPerTransaction)); + londonServer2VM + .invoke(() -> doTxPuts(REGION_NAME, eventsPerTransaction, transactions)); newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries)); @@ -125,26 +119,16 @@ public void testReplicatedSerialPropagationWithGroupTransactionEventsSendsBatche int clients = 2; int eventsPerTransaction = batchSize + 1; - int entriesPerInvocation = eventsPerTransaction * 200; - - final List> data = new ArrayList<>(clients); - for (int clientId = 0; clientId < clients; clientId++) { - final Map keyValues = new HashMap<>(); - for (int i = entriesPerInvocation * clientId; i < entriesPerInvocation - * (clientId + 1); i++) { - keyValues.put(i, i + "_Value"); - } - data.add(keyValues); - } - - int entries = entriesPerInvocation * clients; + int transactions = 200; + int entries = eventsPerTransaction * transactions * clients; + int entriesPerInvocation = (entries) / clients; List> putAsyncInvocations = new ArrayList<>(clients); for (int i = 0; i < clients; i++) { final int index = i; AsyncInvocation asyncInvocation = - londonServer1VM.invokeAsync(() -> doPutsInsideTransactions(REGION_NAME, data.get(index), - eventsPerTransaction)); + londonServer1VM.invokeAsync(() -> doTxPuts(REGION_NAME, + eventsPerTransaction, transactions, index * entriesPerInvocation)); putAsyncInvocations.add(asyncInvocation); } @@ -183,23 +167,6 @@ public void testReplicatedSerialPropagationWithVsWithoutGroupTransactionEventsWi }); } - final Map keyValues = new HashMap<>(); - int entries = 24; - for (int i = 0; i < entries; i++) { - keyValues.put(i, i + "_Value"); - } - int eventsPerTransaction = 3; - londonServer2VM.invoke(() -> doPutsInsideTransactions(REGION_NAME, keyValues, - eventsPerTransaction)); - - // wait for batches to be redistributed and then start the receiver - londonServer1VM.invoke(() -> await() - .untilAsserted(() -> assertThat(getSenderStats(newYorkName, -1).get(5)).isGreaterThan(0))); - - newYorkServerVM.invoke(this::startReceiver); - - newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries)); - // 8 transactions of 3 events each are sent. // - With group-transaction-events // The first batch would initially contain the first 3 transactions complete @@ -210,8 +177,23 @@ public void testReplicatedSerialPropagationWithVsWithoutGroupTransactionEventsWi // second batch which will contain 12 events too. // - Without group-transaction-events 3 batches are sent, 2 with 10 events // and one with 4. + final int transactions = 8; + final int eventsPerTransaction = 3; + final int entries = transactions * eventsPerTransaction; int expectedBatchesSent = groupTransactionEvents ? 2 : 3; + londonServer2VM + .invoke(() -> doTxPuts(REGION_NAME, eventsPerTransaction, transactions)); + + // wait for batches to be redistributed and then start the receiver + londonServer1VM.invoke(() -> await() + .untilAsserted(() -> assertThat(getSenderStats(newYorkName, -1).get(5)).isGreaterThan(0))); + + newYorkServerVM.invoke(this::startReceiver); + + newYorkServerVM.invoke(() -> validateRegionSize(REGION_NAME, entries)); + + newYorkServerVM .invoke(() -> checkGatewayReceiverStats(expectedBatchesSent, entries, entries, true)); @@ -250,7 +232,8 @@ public void testReplicatedSerialPropagationHAWithGroupTransactionEvents() throws () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, transactions, 0)); AsyncInvocation putsInvocation2 = londonServer4VM.invokeAsync( - () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, transactions, 1)); + () -> doTxPutsWithRetryIfError(REGION_NAME, putsPerTransaction, transactions, + putsPerTransaction * transactions)); newYorkServerVM.invoke(() -> await() .untilAsserted(() -> assertThat(getRegionSize(REGION_NAME)).isGreaterThan(40))); @@ -284,17 +267,16 @@ private void checkQueuesAreEmptyAndOnlyCompleteTransactionsAreReplicated( boolean isBatchesRedistributed) { // Wait for sender queues to be empty List> londonServersStats = new ArrayList(londonServersVM.length); - int i = 0; for (VM londonServer : londonServersVM) { londonServersStats.add(londonServer.invoke(() -> getSenderStats(newYorkName, 0))); } - int queueSize = londonServersStats.stream().map(x -> x.get(0)).reduce(0, (y, z) -> y + z); + int queueSize = londonServersStats.stream().map(x -> x.get(0)).reduce(0, Integer::sum); assertThat(queueSize).isEqualTo(0); // batches redistributed: int batchesRedistributed = - londonServersStats.stream().map(x -> x.get(5)).reduce(0, (y, z) -> y + z); + londonServersStats.stream().map(x -> x.get(5)).reduce(0, Integer::sum); if (isBatchesRedistributed) { assertThat(batchesRedistributed).isGreaterThan(0); } else { @@ -344,23 +326,19 @@ private void checkStats_Failover(String senderId, final int eventsReceived) { + statistics.getUnprocessedEventsRemovedByPrimary())).isEqualTo(eventsReceived); } - private Boolean killPrimarySender(String senderId) { - IgnoredException ignoredException1 = IgnoredException.addIgnoredException("Could not connect"); - IgnoredException ignoredException2 = - IgnoredException.addIgnoredException(CacheClosedException.class.getName()); - IgnoredException ignoredException3 = - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); - try { + private boolean killPrimarySender(String senderId) { + try (IgnoredException ignoredException1 = + IgnoredException.addIgnoredException("Could not connect"); + IgnoredException ignoredException2 = + IgnoredException.addIgnoredException(CacheClosedException.class.getName()); + IgnoredException ignoredException3 = + IgnoredException.addIgnoredException(ForceReattemptException.class.getName())) { AbstractGatewaySender sender = (AbstractGatewaySender) getGatewaySender(senderId); if (sender.isPrimary()) { cacheRule.getCache().close(); - return Boolean.TRUE; + return true; } - return Boolean.FALSE; - } finally { - ignoredException1.remove(); - ignoredException2.remove(); - ignoredException3.remove(); + return false; } } } diff --git a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/CommonTxGroupingGatewaySenderFactory.java b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/CommonTxGroupingGatewaySenderFactory.java index dd68338e3352..3d94408e3dec 100644 --- a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/CommonTxGroupingGatewaySenderFactory.java +++ b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/CommonTxGroupingGatewaySenderFactory.java @@ -23,8 +23,8 @@ import org.apache.geode.internal.cache.wan.GatewaySenderException; import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes; -public abstract class CommonTxGroupingGatewaySenderFactory { - public static void validate(final @NotNull GatewaySenderTypeFactory factory, +public interface CommonTxGroupingGatewaySenderFactory { + static void validate(final @NotNull GatewaySenderTypeFactory factory, final @NotNull MutableGatewaySenderAttributes attributes) { if (attributes.isBatchConflationEnabled()) { throw new GatewaySenderException( diff --git a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/parallel/TxGroupingParallelGatewaySenderQueue.java b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/parallel/TxGroupingParallelGatewaySenderQueue.java index 19c6afe17757..487ec0b238f6 100644 --- a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/parallel/TxGroupingParallelGatewaySenderQueue.java +++ b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/parallel/TxGroupingParallelGatewaySenderQueue.java @@ -54,7 +54,7 @@ protected void postProcessBatch(final @NotNull PartitionedRegion partitionedRegi return; } - final Map incompleteTransactionIdsInBatch = + Map incompleteTransactionIdsInBatch = getIncompleteTransactionsInBatch(batch); if (incompleteTransactionIdsInBatch.isEmpty()) { return; @@ -62,27 +62,8 @@ protected void postProcessBatch(final @NotNull PartitionedRegion partitionedRegi int retries = 0; while (true) { - for (Iterator> iter = - incompleteTransactionIdsInBatch.entrySet().iterator(); iter.hasNext();) { - Map.Entry pendingTransaction = iter.next(); - TransactionId transactionId = pendingTransaction.getKey(); - int bucketId = pendingTransaction.getValue(); - List events = - peekEventsWithTransactionId(partitionedRegion, bucketId, transactionId); - for (Object object : events) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; - batch.add(event); - peekedEvents.add(event); - if (logger.isDebugEnabled()) { - logger.debug( - "Peeking extra event: {}, bucketId: {}, isLastEventInTransaction: {}, batch size: {}", - event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size()); - } - if (event.isLastEventInTransaction()) { - iter.remove(); - } - } - } + peekAndAddEventsToBatchToCompleteTransactions( + partitionedRegion, batch, incompleteTransactionIdsInBatch); if (incompleteTransactionIdsInBatch.size() == 0 || retries >= sender.getRetriesToGetTransactionEventsFromQueue()) { break; @@ -101,6 +82,44 @@ protected void postProcessBatch(final @NotNull PartitionedRegion partitionedRegi } } + private void peekAndAddEventsToBatchToCompleteTransactions( + @NotNull PartitionedRegion partitionedRegion, @NotNull List batch, + Map incompleteTransactionIdsInBatch) { + for (Iterator> incompleteTransactionsIter = + incompleteTransactionIdsInBatch.entrySet().iterator(); incompleteTransactionsIter + .hasNext();) { + Map.Entry pendingTransaction = incompleteTransactionsIter.next(); + TransactionId transactionId = pendingTransaction.getKey(); + int bucketId = pendingTransaction.getValue(); + + List events = + peekEventsWithTransactionId(partitionedRegion, bucketId, transactionId); + + addEventsToBatch(batch, bucketId, events); + + for (Object event : events) { + if (((GatewaySenderEventImpl) event).isLastEventInTransaction()) { + incompleteTransactionsIter.remove(); + } + } + } + } + + private void addEventsToBatch( + @NotNull List batch, + int bucketId, List events) { + for (Object object : events) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl) object; + batch.add(event); + peekedEvents.add(event); + if (logger.isDebugEnabled()) { + logger.debug( + "Peeking extra event: {}, bucketId: {}, isLastEventInTransaction: {}, batch size: {}", + event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size()); + } + } + } + protected List peekEventsWithTransactionId(PartitionedRegion prQ, int bucketId, TransactionId transactionId) throws CacheException { List objects; diff --git a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java index 2e411f9827ef..bbca1c67b35f 100644 --- a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java +++ b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java @@ -80,28 +80,8 @@ protected void postProcessBatch(final List> batch, final long l int retries = 0; while (true) { - for (Iterator iter = incompleteTransactionIdsInBatch.iterator(); iter - .hasNext();) { - TransactionId transactionId = iter.next(); - List keyAndEventPairs = - peekEventsWithTransactionId(transactionId, lastKey); - if (keyAndEventPairs.size() > 0 - && ((GatewaySenderEventImpl) (keyAndEventPairs.get(keyAndEventPairs.size() - 1)).event) - .isLastEventInTransaction()) { - for (KeyAndEventPair object : keyAndEventPairs) { - GatewaySenderEventImpl event = (GatewaySenderEventImpl) object.event; - batch.add(event); - peekedIds.add(object.key); - extraPeekedIds.add(object.key); - if (logger.isDebugEnabled()) { - logger.debug( - "Peeking extra event: {}, isLastEventInTransaction: {}, batch size: {}", - event.getKey(), event.isLastEventInTransaction(), batch.size()); - } - } - iter.remove(); - } - } + peekAndAddEventsToBatchToCompleteTransactions(batch, lastKey, + incompleteTransactionIdsInBatch); if (incompleteTransactionIdsInBatch.size() == 0 || retries >= sender.getRetriesToGetTransactionEventsFromQueue()) { break; @@ -120,6 +100,41 @@ protected void postProcessBatch(final List> batch, final long l } } + private void peekAndAddEventsToBatchToCompleteTransactions(List> batch, + long lastKey, Set incompleteTransactionIdsInBatch) { + for (Iterator incompleteTransactionsIter = + incompleteTransactionIdsInBatch.iterator(); incompleteTransactionsIter.hasNext();) { + TransactionId transactionId = incompleteTransactionsIter.next(); + List keyAndEventPairs = + peekEventsWithTransactionId(transactionId, lastKey); + if (lastEventInTransactionPresent(keyAndEventPairs)) { + addEventsToBatch(batch, keyAndEventPairs); + incompleteTransactionsIter.remove(); + } + } + } + + private boolean lastEventInTransactionPresent(List keyAndEventPairs) { + return keyAndEventPairs.size() > 0 + && ((GatewaySenderEventImpl) (keyAndEventPairs.get(keyAndEventPairs.size() - 1)).event) + .isLastEventInTransaction(); + } + + private void addEventsToBatch(List> batch, + List keyAndEventPairs) { + for (KeyAndEventPair object : keyAndEventPairs) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl) object.event; + batch.add(event); + peekedIds.add(object.key); + extraPeekedIds.add(object.key); + if (logger.isDebugEnabled()) { + logger.debug( + "Peeking extra event: {}, isLastEventInTransaction: {}, batch size: {}", + event.getKey(), event.isLastEventInTransaction(), batch.size()); + } + } + } + private Set getIncompleteTransactionsInBatch(List> batch) { Set incompleteTransactionsInBatch = new HashSet<>(); for (Object object : batch) { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 30f1c54b2570..a98d2d710fe0 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -2349,53 +2349,6 @@ public static void putGivenKeyValue(String regionName, Map keyValues) { } } - public static void doOrderAndShipmentPutsInsideTransactions(Map keyValues, - int eventsPerTransaction) { - Region orderRegion = cache.getRegion(orderRegionName); - Region shipmentRegion = cache.getRegion(shipmentRegionName); - int eventInTransaction = 0; - CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager(); - for (Object key : keyValues.keySet()) { - if (eventInTransaction == 0) { - cacheTransactionManager.begin(); - } - Region r; - if (key instanceof OrderId) { - r = orderRegion; - } else { - r = shipmentRegion; - } - r.put(key, keyValues.get(key)); - if (++eventInTransaction == eventsPerTransaction) { - cacheTransactionManager.commit(); - eventInTransaction = 0; - } - } - if (eventInTransaction != 0) { - cacheTransactionManager.commit(); - } - } - - public static void doPutsInsideTransactions(String regionName, Map keyValues, - int eventsPerTransaction) { - Region r = cache.getRegion(Region.SEPARATOR + regionName); - int eventInTransaction = 0; - CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager(); - for (Object key : keyValues.keySet()) { - if (eventInTransaction == 0) { - cacheTransactionManager.begin(); - } - r.put(key, keyValues.get(key)); - if (++eventInTransaction == eventsPerTransaction) { - cacheTransactionManager.commit(); - eventInTransaction = 0; - } - } - if (eventInTransaction != 0) { - cacheTransactionManager.commit(); - } - } - public static void destroyRegion(String regionName) { destroyRegion(regionName, -1); }