Skip to content

Commit

Permalink
Updated with Kirk's first review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogpz committed Dec 2, 2021
1 parent 4c8672a commit ee9b459
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.VM.getVM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -62,7 +59,11 @@
import org.apache.geode.internal.cache.CustomerIDPartitionResolver;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
Expand Down Expand Up @@ -193,7 +194,6 @@ public void tearDown() {

protected Properties createLocatorConfig(int systemId, int locatorPort, int remoteLocatorPort) {
Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
config.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + ']');
Expand Down Expand Up @@ -259,7 +259,6 @@ protected GatewayReceiverFactory createGatewayReceiverFactory(int receiverPort)

protected Properties createServerConfig(int locatorPort) {
Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "localhost[" + locatorPort + ']');
return config;
}
Expand Down Expand Up @@ -288,10 +287,8 @@ protected boolean isRunning(GatewaySender sender) {

protected void validateRegionSize(String regionName, final int regionSize) {
final Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR + regionName);
assertNotNull(r);
if (regionSize != r.keySet().size()) {
await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize));
}
assertThat(r).isNotNull();
await().untilAsserted(() -> assertThat(r.keySet().size()).isEqualTo(regionSize));
}

protected List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
Expand All @@ -310,7 +307,7 @@ protected List<Integer> getSenderStats(String senderId, final int expectedQueueS
await()
.untilAsserted(() -> assertThat(regionQueue.size()).isEqualTo(expectedQueueSize));
}
ArrayList<Integer> stats = new ArrayList<>();
List<Integer> stats = new ArrayList<>();
stats.add(statistics.getEventQueueSize());
stats.add(statistics.getEventsReceived());
stats.add(statistics.getEventsQueued());
Expand Down Expand Up @@ -340,26 +337,30 @@ protected GatewaySender getGatewaySender(String senderId) {
return sender;
}

protected void doPutsInsideTransactions(String regionName, Map<Object, Object> keyValues,
int eventsPerTransaction) {
Region<Object, Object> r = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
int eventInTransaction = 0;
CacheTransactionManager cacheTransactionManager =
cacheRule.getCache().getCacheTransactionManager();
for (Object key : keyValues.keySet()) {
if (eventInTransaction == 0) {
cacheTransactionManager.begin();
}
r.put(key, keyValues.get(key));
if (++eventInTransaction == eventsPerTransaction) {
cacheTransactionManager.commit();
eventInTransaction = 0;
}
protected void doTxPuts(String regionName, final long putsPerTransaction,
final long transactions) {
doTxPuts(regionName, putsPerTransaction, transactions, 0);
}

protected void doTxPuts(String regionName, final long putsPerTransaction,
final long transactions, long initialKeyId) {
Region<Object, Object> region = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
CacheTransactionManager mgr = cacheRule.getCache().getCacheTransactionManager();
for (int i = 0; i < transactions; i++) {
long keyId = initialKeyId + (i * putsPerTransaction);
doOneTxWithPuts(region, mgr, putsPerTransaction, keyId);
}
if (eventInTransaction != 0) {
cacheTransactionManager.commit();
}

private void doOneTxWithPuts(Region<Object, Object> region, CacheTransactionManager mgr,
long putsPerTransaction, long initialKeyId) {
mgr.begin();
for (int j = 0; j < putsPerTransaction; j++) {
long key = initialKeyId + j;
String value = "Value_" + key;
region.put(key, value);
}
mgr.commit();
}

protected void checkGatewayReceiverStats(int processBatches, int eventsReceived,
Expand All @@ -386,35 +387,42 @@ protected void checkGatewayReceiverStats(int processBatches, int eventsReceived,
}

protected void doTxPutsWithRetryIfError(String regionName, final long putsPerTransaction,
final long transactions, long offset) {
Region<Object, Object> r = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
long keyOffset = offset * ((putsPerTransaction + (10 * transactions)) * 100);
long j;
final long transactions, long initialKeyId) {
Region<Object, Object> region = cacheRule.getCache().getRegion(Region.SEPARATOR + regionName);
CacheTransactionManager mgr = cacheRule.getCache().getCacheTransactionManager();
for (int i = 0; i < transactions; i++) {
boolean done = false;
do {
long keyId = initialKeyId + (i * putsPerTransaction);
doOneTxWithPutsWithRetryIfError(region, mgr, putsPerTransaction, keyId);
}
}

private void doOneTxWithPutsWithRetryIfError(Region<Object, Object> region,
CacheTransactionManager mgr, long putsPerTransaction, long initialKeyId) {
while (true) {
try {
mgr.begin();
for (int j = 0; j < putsPerTransaction; j++) {
long key = initialKeyId + j;
String value = "Value_" + key;
region.put(key, value);
}
mgr.commit();
return;
} catch (TransactionException ignore) {
} catch (IllegalStateException ignore) {
try {
mgr.begin();
for (j = 0; j < putsPerTransaction; j++) {
long key = keyOffset + ((j + (10L * i)) * 100);
String value = "Value_" + key;
r.put(key, value);
}
mgr.commit();
done = true;
} catch (TransactionException ignore) {
} catch (IllegalStateException ignore) {
try {
mgr.rollback();
} catch (Exception ignored) {
}
mgr.rollback();
} catch (Exception ignored) {
}
} while (!done);
}
}
}

public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
createCustomerOrderShipmentPartitionedRegion(senderId, 0);
}

public void createCustomerOrderShipmentPartitionedRegion(String senderId, int redundantCopies) {
RegionFactory<Object, Object> fact =
cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
if (senderId != null) {
Expand All @@ -423,10 +431,12 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) {

PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
paf.setRedundantCopies(redundantCopies);
fact.setPartitionAttributes(paf.create());
fact.create(customerRegionName);

paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundantCopies);
paf.setColocatedWith(customerRegionName)
.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
Expand All @@ -437,6 +447,7 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
fact.create(orderRegionName);

paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundantCopies);
paf.setColocatedWith(orderRegionName)
.setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
fact = cacheRule.getCache().createRegionFactory(RegionShortcut.PARTITION);
Expand All @@ -447,33 +458,62 @@ public void createCustomerOrderShipmentPartitionedRegion(String senderId) {
fact.create(shipmentRegionName);
}

public void doOrderAndShipmentPutsInsideTransactions(Map<Object, Object> keyValues,
int eventsPerTransaction) {
Region<Object, Object> orderRegion = cacheRule.getCache().getRegion(orderRegionName);
Region<Object, Object> shipmentRegion = cacheRule.getCache().getRegion(shipmentRegionName);
assertNotNull(orderRegion);
assertNotNull(shipmentRegion);
int eventInTransaction = 0;
public void doOrderAndShipmentPutsInsideTransactions(int customerId, int eventsPerTransaction,
int transactions) {
doOrderAndShipmentPutsInsideTransactions(customerId, eventsPerTransaction, transactions, false);
}

public void doOrderAndShipmentPutsInsideTransactions(int customerId, int eventsPerTransaction,
int transactions, boolean retryIfError) {
CacheTransactionManager cacheTransactionManager =
cacheRule.getCache().getCacheTransactionManager();
for (Object key : keyValues.keySet()) {
if (eventInTransaction == 0) {
cacheTransactionManager.begin();
}
Region<Object, Object> r;
if (key instanceof OrderId) {
r = orderRegion;
for (int i = 0; i < transactions; i++) {
int keyId = i * eventsPerTransaction;
if (retryIfError) {
doOneTxOrderAndShipmentPutsWithRetryIfError(cacheTransactionManager, keyId,
eventsPerTransaction, customerId);
} else {
r = shipmentRegion;
doOneTxOrderAndShipmentPuts(cacheTransactionManager, keyId, eventsPerTransaction,
customerId);
}
r.put(key, keyValues.get(key));
if (++eventInTransaction == eventsPerTransaction) {
}
}

private void doOneTxOrderAndShipmentPuts(
CacheTransactionManager cacheTransactionManager, int keyId, int eventsPerTransaction,
int customerId) {
cacheTransactionManager.begin();
doOneOrderAndShipmentPuts(keyId, eventsPerTransaction, customerId);
cacheTransactionManager.commit();
}

private void doOneTxOrderAndShipmentPutsWithRetryIfError(
CacheTransactionManager cacheTransactionManager, int keyId, int eventsPerTransaction,
int customerId) {
while (true) {
try {
cacheTransactionManager.begin();
doOneOrderAndShipmentPuts(keyId, eventsPerTransaction, customerId);
cacheTransactionManager.commit();
eventInTransaction = 0;
break;
} catch (TransactionException exception) {
} catch (IllegalStateException exception) {
try {
cacheTransactionManager.rollback();
} catch (Exception ignored) {
}
}
}
if (eventInTransaction != 0) {
cacheTransactionManager.commit();
}

private void doOneOrderAndShipmentPuts(int keyId, int eventsPerTransaction, int customerId) {
Region<Object, Object> orderRegion = cacheRule.getCache().getRegion(orderRegionName);
Region<Object, Object> shipmentRegion = cacheRule.getCache().getRegion(shipmentRegionName);
OrderId orderId = new OrderId(keyId, new CustId(customerId));
orderRegion.put(orderId, new Order());
for (int i = 0; i < eventsPerTransaction - 1; i++) {
ShipmentId shipmentId = new ShipmentId(keyId + i, orderId);
shipmentRegion.put(shipmentId, new Shipment());
}
}

Expand All @@ -498,15 +538,15 @@ protected void checkGatewayReceiverStatsHA(int processBatches, int eventsReceive

protected void putGivenKeyValues(String regionName, Map<?, ?> keyValues) {
Region<Object, Object> r = cacheRule.getCache().getRegion(SEPARATOR + regionName);
assertNotNull(r);
assertThat(r).isNotNull();
for (Object key : keyValues.keySet()) {
r.put(key, keyValues.get(key));
}
}

protected void checkConflatedStats(String senderId, final int eventsConflated) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
assertThat(statistics.getEventsNotQueuedConflated()).isEqualTo(eventsConflated);
}

protected GatewaySenderStats getGatewaySenderStats(String senderId) {
Expand All @@ -517,12 +557,9 @@ protected GatewaySenderStats getGatewaySenderStats(String senderId) {
protected void validateGatewaySenderQueueAllBucketsDrained(final String senderId) {
GatewaySender sender = getGatewaySender(senderId);
final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
await().untilAsserted(() -> {
assertThat(abstractSender.getEventQueueSize()).isEqualTo(0);
});
await().untilAsserted(() -> {
assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0);
});
await().untilAsserted(() -> assertThat(abstractSender.getEventQueueSize()).isEqualTo(0));
await()
.untilAsserted(() -> assertThat(abstractSender.getSecondaryEventQueueSize()).isEqualTo(0));
}

public static void setNumDispatcherThreadsForTheRun(int numThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junitparams.Parameters;
Expand All @@ -29,8 +31,6 @@
import org.apache.geode.internal.cache.execute.data.CustId;
import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.WanTest;
Expand Down Expand Up @@ -62,46 +62,37 @@ public void testPartitionedRegionPropagationWithGroupTransactionEventsAndMixOfEv
}

int customers = 4;

int transactionsPerCustomer = 1000;
final Map<Object, Object> keyValuesInTransactions = new HashMap<>();
for (int custId = 0; custId < customers; custId++) {
for (int i = 0; i < transactionsPerCustomer; i++) {
CustId custIdObject = new CustId(custId);
OrderId orderId = new OrderId(i, custIdObject);
ShipmentId shipmentId1 = new ShipmentId(i, orderId);
ShipmentId shipmentId2 = new ShipmentId(i + 1, orderId);
ShipmentId shipmentId3 = new ShipmentId(i + 2, orderId);
keyValuesInTransactions.put(orderId, new Order());
keyValuesInTransactions.put(shipmentId1, new Shipment());
keyValuesInTransactions.put(shipmentId2, new Shipment());
keyValuesInTransactions.put(shipmentId3, new Shipment());
}
}

int ordersPerCustomerNotInTransactions = 1000;

final Map<Object, Object> keyValuesNotInTransactions = new HashMap<>();
for (int custId = 0; custId < customers; custId++) {
for (int i = 0; i < ordersPerCustomerNotInTransactions; i++) {
CustId custIdObject = new CustId(custId);
OrderId orderId = new OrderId(i + transactionsPerCustomer * customers, custIdObject);
OrderId orderId =
new OrderId(i + ordersPerCustomerNotInTransactions * customers, custIdObject);
keyValuesNotInTransactions.put(orderId, new Order());
}
}

// eventsPerTransaction is 1 (orders) + 3 (shipments)
int eventsPerTransaction = 4;
AsyncInvocation<Void> putsInTransactionsInvocation =
londonServer1VM.invokeAsync(
() -> doOrderAndShipmentPutsInsideTransactions(keyValuesInTransactions,
eventsPerTransaction));
List<AsyncInvocation<Void>> putsInTransactionsInvocationList = new ArrayList<>(customers);
for (int i = 0; i < customers; i++) {
final int customerId = i;
putsInTransactionsInvocationList.add(
londonServer1VM.invokeAsync(
() -> doOrderAndShipmentPutsInsideTransactions(customerId, eventsPerTransaction,
transactionsPerCustomer)));
}

AsyncInvocation<Void> putsNotInTransactionsInvocation =
londonServer2VM.invokeAsync(
() -> putGivenKeyValues(orderRegionName, keyValuesNotInTransactions));

putsInTransactionsInvocation.await();
for (AsyncInvocation<Void> putsInTransactionInvocation : putsInTransactionsInvocationList) {
putsInTransactionInvocation.await();
}
putsNotInTransactionsInvocation.await();

int entries =
Expand Down

0 comments on commit ee9b459

Please sign in to comment.