Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-8302: Fixed 'events not queued conflated' stats when group-tran… #5313

Merged
merged 7 commits into from
Jul 10, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ public class SerialGatewaySenderQueue implements RegionQueue {
*/
private final AtomicLong tailKey = new AtomicLong();

private final Deque<Long> peekedIds = new LinkedBlockingDeque<Long>();

/**
* Last key peeked from the queue excluding the keys peeked
* to complete transactions when group-transaction-events is enabled.
* Contains the peekedIds but not including those that were peeked to complete a transaction
* inside a batch when groupTransactionEvents is set.
*/
private final AtomicLong lastPeekedId = new AtomicLong(-1);

private final Deque<Long> peekedIds = new LinkedBlockingDeque<Long>();
private final Deque<Long> peekedIdsWithoutExtra = new LinkedBlockingDeque<Long>();

/**
* Contains the set of peekedIds that were peeked to complete a transaction
Expand Down Expand Up @@ -315,7 +315,9 @@ public void remove() throws CacheException {
return;
}
Long key = peekedIds.remove();
extraPeekedIds.remove(key);
if (!extraPeekedIds.remove(key)) {
peekedIdsWithoutExtra.remove();
} ;
try {
// Increment the head key
updateHeadKey(key.longValue());
Expand Down Expand Up @@ -704,9 +706,9 @@ private long inc(long value) {
* Clear the list of peeked keys. The next peek will start again at the head key.
*/
public void resetLastPeeked() {
this.peekedIds.clear();
peekedIds.clear();
peekedIdsWithoutExtra.clear();
extraPeekedIds.clear();
lastPeekedId.set(-1);
}

/**
Expand All @@ -715,10 +717,14 @@ public void resetLastPeeked() {
*/
private Long getCurrentKey() {
long currentKey;
if (lastPeekedId.equals(-1)) {
if (peekedIdsWithoutExtra.isEmpty()) {
currentKey = getHeadKey();
} else {
currentKey = inc(lastPeekedId.get());
Long lastPeek = peekedIdsWithoutExtra.peekLast();
if (lastPeek == null) {
return null;
}
currentKey = inc(lastPeek.longValue());
}
return currentKey;
}
Expand Down Expand Up @@ -774,7 +780,8 @@ public KeyAndEventPair peekAhead() throws CacheException {
// does not save anything since GatewayBatchOp needs to GatewayEventImpl
// in object form.
while (before(currentKey, getTailKey())) {
if (!extraPeekedIds.contains(currentKey)) {
boolean currentKeyAlreadyPeeked = extraPeekedIds.contains(currentKey);
if (!currentKeyAlreadyPeeked) {
object = getObjectInSerialSenderQueue(currentKey);
if (object != null) {
break;
Expand All @@ -784,7 +791,7 @@ public KeyAndEventPair peekAhead() throws CacheException {
logger.trace("{}: Trying head key + offset: {}", this, currentKey);
}
currentKey = inc(currentKey);
if (this.stats != null) {
if (!currentKeyAlreadyPeeked && this.stats != null) {
this.stats.incEventsNotQueuedConflated();
}
}
Expand All @@ -794,8 +801,8 @@ public KeyAndEventPair peekAhead() throws CacheException {
}

if (object != null) {
this.peekedIds.add(currentKey);
lastPeekedId.set(currentKey);
peekedIds.add(currentKey);
peekedIdsWithoutExtra.add(currentKey);
return new KeyAndEventPair(currentKey, object);
}
return null;
Expand Down
2 changes: 1 addition & 1 deletion geode-docs/reference/topics/cache_xml.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ Configures a gateway sender to distribute region events to another <%=vars.produ
<tr>
<td>group-transaction-events</td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true.</p>
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
<td>false</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ create gateway-sender --id=value --remote-distributed-system-id=value
<tr>
<td><span class="keyword parmname">\-\-group-transaction-events</span></td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gatewaysenders with the <code class="ph codeph">parallel</code> attribute set to true.</p>
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gatewaysenders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
</td>
<td>false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ensure that events belonging to the same transaction are sent together in the sa
<b>Note:</b> This setting is supported only on serial senders with just one dispatcher
thread, or on parallel senders. In addition, the regions to which the transaction events belong must
be replicated by the same set of gateway senders that also have this setting enabled.
Furthermore, this setting cannot be enabled if batch conflation is enabled.

By default, potential WAN conflicts are resolved using a timestamp mechanism. You can optionally install a custom conflict resolver to apply custom logic when determining whether to apply a potentially conflicting update received over a WAN.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,9 @@ public ResultModel preExecution(GfshParseResult parseResult) {
Boolean groupTransactionEvents =
(Boolean) parseResult
.getParamValue(CliStrings.CREATE_GATEWAYSENDER__GROUPTRANSACTIONEVENTS);
Boolean batchConflationEnabled =
(Boolean) parseResult
.getParamValue(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION);

if (dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy == null) {
return ResultModel.createError(
Expand All @@ -297,6 +300,11 @@ public ResultModel preExecution(GfshParseResult parseResult) {
"Serial Gateway Sender cannot be created with --group-transaction-events when --dispatcher-threads is greater than 1.");
}

if (groupTransactionEvents && batchConflationEnabled) {
return ResultModel.createError(
"Gateway Sender cannot be created with --group-transaction-events and --enable-batch-conflation.");
}

return ResultModel.createInfo("");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ public void testInvalidGroupTransactionEventsDueToSerialAndMoreThanOneThread() {
"Serial Gateway Sender cannot be created with --group-transaction-events when --dispatcher-threads is greater than 1");
}

@Test
public void testInvalidGroupTransactionEventsDueToConflationEnabled() {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
cliFunctionResult = new CliFunctionResult("member",
CliFunctionResult.StatusState.OK, "cliFunctionResult");
functionResults.add(cliFunctionResult);
gfsh.executeAndAssertThat(command,
"create gateway-sender --member=xyz --id=1 --remote-distributed-system-id=1 " +
"--group-transaction-events --enable-batch-conflation --order-policy=THREAD")
.statusIsError().containsOutput(
"Gateway Sender cannot be created with --group-transaction-events and --enable-batch-conflation");
}

@Test
public void testFunctionArgs() {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
Expand Down Expand Up @@ -277,8 +290,7 @@ public void booleanArgumentsShouldBeSetAsTrueWhenSpecifiedWithoutValue() {
+ " --manual-start"
+ " --disk-synchronous"
+ " --enable-persistence"
+ " --enable-batch-conflation"
+ " --group-transaction-events")
+ " --enable-batch-conflation")
.statusIsSuccess();
verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());

Expand All @@ -289,8 +301,23 @@ public void booleanArgumentsShouldBeSetAsTrueWhenSpecifiedWithoutValue() {
assertThat(argsArgumentCaptor.getValue().isDiskSynchronous()).isTrue();
assertThat(argsArgumentCaptor.getValue().isPersistenceEnabled()).isTrue();
assertThat(argsArgumentCaptor.getValue().isBatchConflationEnabled()).isTrue();
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isTrue();
}

@Test
public void groupTransactionEventsShouldBeSetAsTrueWhenSpecifiedWithoutValue() {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
cliFunctionResult =
new CliFunctionResult("member", CliFunctionResult.StatusState.OK, "cliFunctionResult");
functionResults.add(cliFunctionResult);
gfsh.executeAndAssertThat(command,
"create gateway-sender --member=xyz --id=testGateway --remote-distributed-system-id=1"
+ " --parallel"
+ " --group-transaction-events")
.statusIsSuccess();
verify(command).executeAndGetFunctionResult(any(), argsArgumentCaptor.capture(), any());

assertThat(argsArgumentCaptor.getValue().getId()).isEqualTo("testGateway");
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ public void testPartitionedParallelPropagationHA() throws Exception {

vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ public void testPRParallelPropagationWithGroupTransactionEventsSendsBatchesWithC
assertEquals(1, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4));
// batches redistributed:
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5));
// events not queued conflated:
assertEquals(0, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7));
}

@Test
Expand Down Expand Up @@ -605,6 +607,8 @@ public void testPRParallelPropagationWithBatchRedistWithGroupTransactionEventsSe
assertEquals(2, (int) v4List.get(4));
// batches redistributed:
assertTrue("Batch was not redistributed", (v4List.get(5)) > 0);
// events not queued conflated:
assertEquals(0, (int) v4List.get(7));
}

@Test
Expand Down Expand Up @@ -782,19 +786,19 @@ public void testParallelPropagationHAWithGroupTransactionEvents() throws Excepti
int batchSize = 9;
boolean groupTransactionEvents = true;
vm4.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));
vm5.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));
vm6.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));
vm7.invoke(
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, true, false, null, true,
() -> WANTestBase.createSender("ln", 2, true, 100, batchSize, false, false, null, true,
groupTransactionEvents,
-1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ public void testPartitionedSerialPropagationHA() throws Exception {
inv.join();
vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ public void testReplicatedSerialPropagationWithGroupTransactionEventsSendsBatche

vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, entries, entries));
vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 1, true));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));

// wait until queue is empty
vm5.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(0) == 0));

vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, 0, 0));
vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0, true));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down Expand Up @@ -354,13 +356,15 @@ public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEv

vm4.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, entries, entries));
vm4.invoke(() -> WANTestBase.checkBatchStats("ln", 2, true, true));
vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));

// wait until queue is empty
vm5.invoke(() -> await()
.until(() -> WANTestBase.getSenderStats("ln", -1).get(0) == 0));

vm5.invoke(() -> WANTestBase.checkQueueStats("ln", 0, entries, 0, 0));
vm5.invoke(() -> WANTestBase.checkBatchStats("ln", 0, true));
vm5.invoke(() -> WANTestBase.checkConflatedStats("ln", 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ public void test_create_SerialGatewaySender_ThrowsException_when_GroupTransactio
"SerialGatewaySender NYSender cannot be created with group transaction events set to true when dispatcher threads is greater than 1");
}

@Test
public void test_create_GatewaySender_ThrowsException_when_GroupTransactionEvents_isTrue_and_BatchConflation_is_enabled() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setBatchConflationEnabled(true);
fact.setGroupTransactionEvents(true);
assertThatThrownBy(() -> fact.create("NYSender", 2))
.isInstanceOf(GatewaySenderException.class)
.hasMessageContaining(
"GatewaySender NYSender cannot be created with group transaction events set to true and batch conflation enabled");
}

/**
* Test to validate that sender with same Id can not be added to cache.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ public GatewaySender create(String id, int remoteDSId) {
}
}
}
if (this.attrs.mustGroupTransactionEvents() && this.attrs.isBatchConflationEnabled()) {
throw new GatewaySenderException(
String.format(
"GatewaySender %s cannot be created with group transaction events set to true and batch conflation enabled",
id));
}

if (this.attrs.isParallel()) {
if ((this.attrs.getOrderPolicy() != null)
Expand Down