Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-8302: Fixed 'events not queued conflated' stats when group-tran… #5313

Merged
merged 7 commits into from
Jul 10, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface GatewaySenderFactory {

/**
* Indicates whether all VMs need to distribute events to remote site. In this case only the
* events originating in a particular VM will be in dispatched in order.
* events originating in a particular VM will be dispatched in order.
*
* @param isParallel boolean to indicate whether distribution policy is parallel
*/
Expand All @@ -39,8 +39,9 @@ public interface GatewaySenderFactory {
* Indicates whether events belonging to the same transaction must be
* delivered inside the same batch, i.e. they cannot be spread across different
* batches.
* Can only be enabled on serial gateway senders with just one dispatcher
* thread or on parallel gateway senders.
* <code>groupTransactionEvents</code> can be enabled only on parallel gateway senders
* or on serial gateway senders with just one dispatcher thread.
* It cannot be enabled if batch conflation is enabled.
*
* @param groupTransactionEvents boolean to indicate whether events from
* the same transaction must be delivered inside
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
private final Set<Long> extraPeekedIds = ConcurrentHashMap.newKeySet();

/**
* Contains the set of peekedIds that were peeked to complete a transaction
* inside a batch when groupTransactionEvents is set and that have
* been sent in a batch but have not yet been removed.
*/
private final Set<Long> extraPeekedIdsSentNotRemoved = ConcurrentHashMap.newKeySet();

/**
* The name of the <code>Region</code> backing this queue
*/
Expand Down Expand Up @@ -315,10 +322,12 @@ public void remove() throws CacheException {
return;
}
Long key = peekedIds.remove();
extraPeekedIds.remove(key);
boolean isExtraPeeked = extraPeekedIds.remove(key);
try {
// Increment the head key
updateHeadKey(key.longValue());
if (!isExtraPeeked) {
updateHeadKey(key.longValue());
}
removeIndex(key);
// Remove the entry at that key with a callback arg signifying it is
// a WAN queue so that AbstractRegionEntry.destroy can get the value
Expand All @@ -338,7 +347,26 @@ public void remove() throws CacheException {
}

boolean wasEmpty = this.lastDispatchedKey == this.lastDestroyedKey;
this.lastDispatchedKey = key;
if (!isExtraPeeked) {
this.lastDispatchedKey = key;
// Remove also the extraPeekedIds right after this one so that
// they do not stay in the secondary's queue forever
long tmpKey = key;
while (extraPeekedIdsSentNotRemoved.contains(tmpKey = inc(tmpKey))) {
extraPeekedIdsSentNotRemoved.remove(tmpKey);
this.lastDispatchedKey = tmpKey;
updateHeadKey(tmpKey);
}
} else {
extraPeekedIdsSentNotRemoved.add(key);
// Remove if previous key was already dispatched so that it does
// not stay in the secondary's queue forever
long tmpKey = dec(key);
if (this.lastDispatchedKey == tmpKey) {
this.lastDispatchedKey = key;
updateHeadKey(key);
}
}
if (wasEmpty) {
synchronized (this) {
notifyAll();
Expand Down Expand Up @@ -700,11 +728,18 @@ private long inc(long value) {
return val;
}

private long dec(long value) {
long val = value - 1;
val = val == -1 ? MAXIMUM_KEY - 1 : val;
return val;
}


/**
* Clear the list of peeked keys. The next peek will start again at the head key.
*/
public void resetLastPeeked() {
this.peekedIds.clear();
peekedIds.clear();
extraPeekedIds.clear();
lastPeekedId.set(-1);
}
Expand All @@ -715,7 +750,7 @@ public void resetLastPeeked() {
*/
private Long getCurrentKey() {
long currentKey;
if (lastPeekedId.equals(-1)) {
if (lastPeekedId.get() == -1) {
currentKey = getHeadKey();
} else {
currentKey = inc(lastPeekedId.get());
Expand Down Expand Up @@ -784,7 +819,10 @@ public KeyAndEventPair peekAhead() throws CacheException {
logger.trace("{}: Trying head key + offset: {}", this, currentKey);
}
currentKey = inc(currentKey);
if (this.stats != null) {
// When mustGroupTransactionEvents is true, conflation cannot be enabled.
// Therefore, if we reach here, it would not be due to a conflated event
// but rather to an extra peeked event already sent.
if (!mustGroupTransactionEvents() && this.stats != null) {
this.stats.incEventsNotQueuedConflated();
}
}
Expand All @@ -794,7 +832,7 @@ public KeyAndEventPair peekAhead() throws CacheException {
}

if (object != null) {
this.peekedIds.add(currentKey);
peekedIds.add(currentKey);
lastPeekedId.set(currentKey);
return new KeyAndEventPair(currentKey, object);
}
Expand Down
2 changes: 1 addition & 1 deletion geode-docs/reference/topics/cache_xml.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ Configures a gateway sender to distribute region events to another <%=vars.produ
<tr>
<td>group-transaction-events</td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true.</p>
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
<td>false</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ create gateway-sender --id=value --remote-distributed-system-id=value
<tr>
<td><span class="keyword parmname">\-\-group-transaction-events</span></td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gatewaysenders with the <code class="ph codeph">parallel</code> attribute set to true.</p>
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
</td>
<td>false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ Gateway senders and receivers are defined at startup in the member caches. A sit
Geode ensures that all copies of a region eventually reach a consistent state on all members and clients that host the region, including Geode members that distribute region events across a WAN.

Events are sent in batches from gateway senders to receivers. In order to avoid inconsistencies due
to partial reception of the events belonging to a transaction, you can configure gateway senders to
to partial reception of the events belonging to a transaction, you can configure gateway senders
using the `group-transaction-events` property to
ensure that events belonging to the same transaction are sent together in the same batch.
<b>Note:</b> This setting is supported only on serial senders with just one dispatcher
thread, or on parallel senders. In addition, the regions to which the transaction events belong must
be replicated by the same set of gateway senders that also have this setting enabled.
In order to use transaction event grouping:

- The `group-transaction-events` setting is supported only on serial senders with just one dispatcher thread, or on parallel senders.
- The regions to which the transaction events belong must be replicated by the same set of gateway senders that also have this setting enabled.
- This setting cannot be enabled if `enable-batch-conflation` is in effect.

By default, potential WAN conflicts are resolved using a timestamp mechanism. You can optionally install a custom conflict resolver to apply custom logic when determining whether to apply a potentially conflicting update received over a WAN.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ public ResultModel preExecution(GfshParseResult parseResult) {
Boolean groupTransactionEvents =
(Boolean) parseResult
.getParamValue(CliStrings.CREATE_GATEWAYSENDER__GROUPTRANSACTIONEVENTS);
Boolean batchConflationEnabled =
(Boolean) parseResult
.getParamValue(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION);

if (dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy == null) {
return ResultModel.createError(
Expand All @@ -297,6 +300,11 @@ public ResultModel preExecution(GfshParseResult parseResult) {
"Serial Gateway Sender cannot be created with --group-transaction-events when --dispatcher-threads is greater than 1.");
}

if (groupTransactionEvents && batchConflationEnabled) {
return ResultModel.createError(
"Gateway Sender cannot be created with both --group-transaction-events and --enable-batch-conflation.");
}

return ResultModel.createInfo("");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ public void testInvalidGroupTransactionEventsDueToSerialAndMoreThanOneThread() {
"Serial Gateway Sender cannot be created with --group-transaction-events when --dispatcher-threads is greater than 1");
}

@Test
public void testInvalidGroupTransactionEventsDueToConflationEnabled() {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
cliFunctionResult = new CliFunctionResult("member",
CliFunctionResult.StatusState.OK, "cliFunctionResult");
functionResults.add(cliFunctionResult);
gfsh.executeAndAssertThat(command,
"create gateway-sender --member=xyz --id=1 --remote-distributed-system-id=1 " +
"--group-transaction-events --enable-batch-conflation --order-policy=THREAD")
.statusIsError().containsOutput(
"Gateway Sender cannot be created with both --group-transaction-events and --enable-batch-conflation");
}

@Test
public void testFunctionArgs() {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
Expand Down Expand Up @@ -277,8 +290,7 @@ public void booleanArgumentsShouldBeSetAsTrueWhenSpecifiedWithoutValue() {
+ " --manual-start"
+ " --disk-synchronous"
+ " --enable-persistence"
+ " --enable-batch-conflation"
+ " --group-transaction-events")
+ " --enable-batch-conflation")
.statusIsSuccess();
verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());

Expand All @@ -289,8 +301,23 @@ public void booleanArgumentsShouldBeSetAsTrueWhenSpecifiedWithoutValue() {
assertThat(argsArgumentCaptor.getValue().isDiskSynchronous()).isTrue();
assertThat(argsArgumentCaptor.getValue().isPersistenceEnabled()).isTrue();
assertThat(argsArgumentCaptor.getValue().isBatchConflationEnabled()).isTrue();
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isTrue();
}

@Test
public void groupTransactionEventsShouldBeSetAsTrueWhenSpecifiedWithoutValue() {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
cliFunctionResult =
new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult");
functionResults.add(cliFunctionResult);
gfsh.executeAndAssertThat(command,
"create gateway-sender --member=xyz --id=testGateway --remote-distributed-system-id=1"
+ " --parallel"
+ " --group-transaction-events")
.statusIsSuccess();
verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());

assertThat(argsArgumentCaptor.getValue().getId()).isEqualTo("testGateway");
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ public void testPartitionedParallelPropagationHA() throws Exception {

vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
assertEquals(1, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4));
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
// events not queued conflated:
assertEquals(0, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
}

@Test
Expand Down Expand Up @@ -605,6 +607,8 @@ public void testPRParallelPropagationWithBatchRedistWithGroupTransactionEventsSe
assertEquals(2, (int) v4List.get(4));
// batches redistributed:
assertTrue("Batch was not redistributed", (v4List.get(5)) > 0);
// events not queued conflated:
assertEquals(0, (int) v4List.get(7));
}

@Test
Expand Down Expand Up @@ -782,19 +786,19 @@ public void testParallelPropagationHAWithGroupTransactionEvents() throws Excepti
int batchSize = 9;
boolean groupTransactionEvents = true;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ public void testPartitionedSerialPropagationHA() throws Exception {
inv.join();
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ public void testReplicatedSerialPropagationWithGroupTransactionEventsSendsBatche

vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, entries, entries));
vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 1, true));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));

// wait until queue is empty
vm5.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(0) == 0));

vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, 0, 0));
vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0, true));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down Expand Up @@ -354,13 +356,15 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv

vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, entries, entries));
vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 2, true, true));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));

// wait until queue is empty
vm5.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(0) == 0));

vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, 0, 0));
vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0, true));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ public void test_create_SerialGatewaySender_ThrowsException_when_GroupTransactio
"SerialGatewaySender NYSender cannot be created with group transaction events set to true when dispatcher threads is greater than 1");
}

@Test
public void test_create_GatewaySender_ThrowsException_when_GroupTransactionEvents_isTrue_and_BatchConflation_is_enabled() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setBatchConflationEnabled(true);
fact.setGroupTransactionEvents(true);
assertThatThrownBy(() -> fact.create("NYSender", 2))
.isInstanceOf(GatewaySenderException.class)
.hasMessageContaining(
"GatewaySender NYSender cannot be created with both group transaction events set to true and batch conflation enabled");
}

/**
* Test to validate that sender with same Id can not be added to cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ public GatewaySender create(String id, int remoteDSId) {
}
}
}
if (this.attrs.mustGroupTransactionEvents() && this.attrs.isBatchConflationEnabled()) {
throw new GatewaySenderException(
String.format(
"GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled",
id));
}

if (this.attrs.isParallel()) {
if ((this.attrs.getOrderPolicy() != null)
Expand Down