Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-9625: Only serialize transaction metadata when grouping enabled. #6892

Merged
merged 4 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ org/apache/geode/cache/query/internal/xml/ElementType$1
org/apache/geode/cache/query/internal/xml/ElementType$2
org/apache/geode/cache/query/internal/xml/ElementType$3
org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut
org/apache/geode/internal/cache/wan/GatewaySenderEventImpl$TransactionMetadataDisposition
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package org.apache.geode.internal.cache.wan;

import static java.lang.Boolean.TRUE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;

import java.io.IOException;
Expand Down Expand Up @@ -856,7 +859,7 @@ private List<GatewaySenderEventImpl> addPDXEvent() throws IOException {
event.setCallbackArgument(geCallbackArg);
// OFFHEAP: event for pdx type meta data so it should never be off-heap
GatewaySenderEventImpl pdxSenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, event, null, false);
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, event, null);

pdxEventsMap.put(typeEntry.getKey(), pdxSenderEvent);
pdxSenderEventsList.add(pdxSenderEvent);
Expand Down Expand Up @@ -1225,6 +1228,18 @@ public void closeProcessor() {
}
}

protected GatewaySenderEventImpl.TransactionMetadataDisposition getTransactionMetadataDisposition(
final boolean isLastEventInTransaction) {
if (getSender().mustGroupTransactionEvents()) {
if (isLastEventInTransaction) {
return INCLUDE_LAST_EVENT;
}
return INCLUDE;
} else {
return EXCLUDE;
}
}

public void removeCacheListener() {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package org.apache.geode.internal.cache.wan;

import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -178,7 +181,8 @@ public class GatewaySenderEventImpl

private short version;

private boolean isLastEventInTransaction = true;
private boolean isLastEventInTransaction = false;

private TransactionId transactionId = null;


Expand Down Expand Up @@ -214,59 +218,54 @@ public class GatewaySenderEventImpl

private volatile int serializedValueSize = DEFAULT_SERIALIZED_VALUE_SIZE;

// /**
// * Is this thread in the process of deserializing this event?
// */
// public static final ThreadLocal isDeserializingValue = new ThreadLocal() {
// @Override
// protected Object initialValue() {
// return Boolean.FALSE;
// }
// };


/**
* Constructor. No-arg constructor for data serialization.
* No-arg constructor for data serialization.
*
* @see DataSerializer
*/
public GatewaySenderEventImpl() {}

/**
* Constructor. Creates an initialized <code>GatewayEventImpl</code>
*
* @param operation The operation for this event (e.g. AFTER_CREATE)
* @param event The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based
* @param substituteValue The value to be enqueued instead of the value in the event.
* @param isLastEventInTransaction true if the event is the last in the transaction
* @param transactionMetadataDisposition indicating the inclusion of transaction metadata.
*
*/
@Retained
public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event,
Object substituteValue, boolean isLastEventInTransaction) throws IOException {
this(operation, event, substituteValue, true, isLastEventInTransaction);
Object substituteValue, final TransactionMetadataDisposition transactionMetadataDisposition)
throws IOException {
this(operation, event, substituteValue, true, transactionMetadataDisposition);
}

@Retained
public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event,
Object substituteValue) throws IOException {
this(operation, event, substituteValue, true, EXCLUDE);
}

@Retained
public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event,
Object substituteValue, boolean initialize, int bucketId,
boolean isLastEventInTransaction) throws IOException {
this(operation, event, substituteValue, initialize, isLastEventInTransaction);
final TransactionMetadataDisposition transactionMetadataDisposition) throws IOException {
this(operation, event, substituteValue, initialize, transactionMetadataDisposition);
this.bucketId = bucketId;
}

/**
* Constructor.
*
* @param operation The operation for this event (e.g. AFTER_CREATE)
* @param ce The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based
* @param substituteValue The value to be enqueued instead of the value in the event.
* @param initialize Whether to initialize this instance
*
* @param transactionMetadataDisposition indicating the inclusion of transaction metadata.
*/
@Retained
public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> ce,
Object substituteValue,
boolean initialize, boolean isLastEventInTransaction) throws IOException {
Object substituteValue, boolean initialize,
final TransactionMetadataDisposition transactionMetadataDisposition) throws IOException {
// Set the operation and event
final EntryEventImpl event = (EntryEventImpl) ce;
this.operation = operation;
Expand Down Expand Up @@ -322,9 +321,11 @@ public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> ce,
}
isConcurrencyConflict = event.isConcurrencyConflict();

transactionId = event.getTransactionId();
this.isLastEventInTransaction = isLastEventInTransaction;

if (transactionMetadataDisposition != EXCLUDE) {
transactionId = event.getTransactionId();
isLastEventInTransaction =
transactionMetadataDisposition == INCLUDE_LAST_EVENT && null != transactionId;
}
}

/**
Expand Down Expand Up @@ -1354,4 +1355,20 @@ protected GatewaySenderEventImpl makeCopy() {
public void setAcked(boolean acked) {
isAcked = acked;
}

public enum TransactionMetadataDisposition {
jake-at-work marked this conversation as resolved.
Show resolved Hide resolved
/**
* Transaction metadata should be excluded from the event.
*/
EXCLUDE,
/**
* Transaction metadata should be included in the event.
*/
INCLUDE,
/**
* Transaction metadata should be included in the event and this is the last event in the
* transaction.
*/
INCLUDE_LAST_EVENT,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void enqueueEvent(EnumListenerEvent operation, EntryEvent<?, ?> event,

final GatewaySenderEventImpl gatewayQueueEvent =
new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID(),
isLastEventInTransaction);
getTransactionMetadataDisposition(isLastEventInTransaction));

enqueueEvent(gatewayQueueEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ public void enqueueEvent(EnumListenerEvent operation, EntryEvent<?, ?> event,
if (!(isUpdateVersionStamp || isCME_And_NotOriginRemote)) {
senderEvent =
new GatewaySenderEventImpl(operation, event, substituteValue, false,
isLastEventInTransaction);
getTransactionMetadataDisposition(isLastEventInTransaction));
handleSecondaryEvent(senderEvent);
}
}
Expand All @@ -432,9 +432,9 @@ public void enqueueEvent(EnumListenerEvent operation, EntryEvent<?, ?> event,
waitForFailoverCompletion();
}
// If it is, create and enqueue an initialized GatewayEventImpl
senderEvent =
new GatewaySenderEventImpl(operation, event, substituteValue, isLastEventInTransaction); // OFFHEAP
// ok
// OFFHEAP ok
senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue,
getTransactionMetadataDisposition(isLastEventInTransaction));

boolean queuedEvent = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Operatio
EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null);
eei.setEventId(new EventID(new byte[16], threadId, sequenceId));

return new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null, true, false);
return new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null);
}

private EnumListenerEvent getEnumListenerEvent(Operation operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
*/
package org.apache.geode.internal.cache.wan;

import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -27,7 +31,7 @@

public class AbstractGatewaySenderEventProcessorTest {

private RegionQueue queue = mock(RegionQueue.class);
private final RegionQueue queue = mock(RegionQueue.class);

@Test
public void eventQueueSizeReturnsQueueSize() {
Expand All @@ -49,4 +53,30 @@ public void eventQueueSizeReturnsZeroIfRegionQueueIsNull() {

verify(queue, never()).size();
}

@Test
public void getTransactionMetadataDispositionIncludedWhenSenderMustGroupTransactionEventsTrue() {
final AbstractGatewaySenderEventProcessor processor =
mock(AbstractGatewaySenderEventProcessor.class);
final AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
when(processor.getSender()).thenReturn(sender);
when(sender.mustGroupTransactionEvents()).thenReturn(true);
when(processor.getTransactionMetadataDisposition(anyBoolean())).thenCallRealMethod();

assertThat(processor.getTransactionMetadataDisposition(false)).isEqualTo(INCLUDE);
assertThat(processor.getTransactionMetadataDisposition(true)).isEqualTo(INCLUDE_LAST_EVENT);
}

@Test
public void getTransactionMetadataDispositionExcludedWhenSenderMustGroupTransactionEventsFalse() {
final AbstractGatewaySenderEventProcessor processor =
mock(AbstractGatewaySenderEventProcessor.class);
final AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
when(sender.mustGroupTransactionEvents()).thenReturn(false);
when(processor.getSender()).thenReturn(sender);
when(processor.getTransactionMetadataDisposition(anyBoolean())).thenCallRealMethod();

assertThat(processor.getTransactionMetadataDisposition(false)).isEqualTo(EXCLUDE);
assertThat(processor.getTransactionMetadataDisposition(true)).isEqualTo(EXCLUDE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
*/
package org.apache.geode.internal.cache.wan;

import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT;
import static org.apache.geode.internal.serialization.KnownVersion.GEODE_1_13_0;
import static org.apache.geode.internal.serialization.KnownVersion.GEODE_1_14_0;
import static org.apache.geode.internal.serialization.KnownVersion.GEODE_1_8_0;
Expand Down Expand Up @@ -42,6 +45,8 @@
import org.apache.geode.cache.TransactionId;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
Expand Down Expand Up @@ -263,6 +268,87 @@ public void testSerialization() throws Exception {
.isEqualTo(deserializedEvent.isLastEventInTransaction());
}

@Test
public void constructsWithTransactionMetadataWhenInclude() throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class));

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, INCLUDE);

assertThat(gatewaySenderEvent.getTransactionId()).isNotNull();
assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse();
}

@Test
public void constructsWithTransactionMetadataWhenIncludedLastEvent() throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class));

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null,
INCLUDE_LAST_EVENT);

assertThat(gatewaySenderEvent.getTransactionId()).isNotNull();
assertThat(gatewaySenderEvent.isLastEventInTransaction()).isTrue();
}

@Test
public void constructsWithoutTransactionMetadataWhenExcluded() throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class));

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, EXCLUDE);

assertThat(gatewaySenderEvent.getTransactionId()).isNull();
assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse();
}

@Test
public void constructsWithoutTransactionMetadataWhenIncludedButNotTransactionEvent()
throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(null);

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, INCLUDE);

assertThat(gatewaySenderEvent.getTransactionId()).isNull();
assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse();
}

@Test
public void constructsWithoutTransactionMetadataWhenIncludedLastEventButNotTransactionEvent()
throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(null);

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null,
INCLUDE_LAST_EVENT);

assertThat(gatewaySenderEvent.getTransactionId()).isNull();
assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse();
}

@Test
public void constructsWithoutTransactionMetadataWhenExcludedButNotTransactionEvent()
throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(null);

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, EXCLUDE);

assertThat(gatewaySenderEvent.getTransactionId()).isNull();
assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse();
}

private EntryEventImpl mockEntryEventImpl(final TransactionId transactionId) {
final EntryEventImpl cacheEvent = mock(EntryEventImpl.class);
when(cacheEvent.getEventId()).thenReturn(mock(EventID.class));
when(cacheEvent.getOperation()).thenReturn(Operation.CREATE);
when(cacheEvent.getTransactionId()).thenReturn(transactionId);
final LocalRegion region = mock(LocalRegion.class);
when(cacheEvent.getRegion()).thenReturn(region);
return cacheEvent;
}

public static class VersionAndExpectedInvocations {

private final KnownVersion version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.internal.cache.wan.parallel;

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE;
import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -94,7 +95,7 @@ public static GatewaySenderEventImpl createGatewaySenderEvent(LocalRegion lr, Op
eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId));
GatewaySenderEventImpl gsei =
new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null, true, bucketId,
false);
EXCLUDE);
gsei.setShadowKey(shadowKey);
return gsei;
}
Expand Down