Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -526,21 +526,36 @@ public boolean hasSeenEvent(EntryEventImpl event) {
isDuplicate = getEventTracker().hasSeenEvent(event);
if (isDuplicate) {
markEventAsDuplicate(event);
if (event.getVersionTag() == null && event.getRegion().getConcurrencyChecksEnabled()
&& event.getEventId() != null) {
// For a retry, versionTag can be missing possibly due to no versionTag recorded during gii.
findAndSetVersionTag(event);
if (event.getVersionTag() == null) {
logger.info("No version tag can be found in cluster when retrying the following event {}",
event);
}
}
} else {
// bug #48205 - a retried PR operation may already have a version assigned to it
// in another VM
if (event.isPossibleDuplicate() && event.getRegion().getConcurrencyChecksEnabled()
&& event.getVersionTag() == null && event.getEventId() != null) {
boolean isBulkOp = event.getOperation().isPutAll() || event.getOperation().isRemoveAll();
VersionTag tag =
FindVersionTagOperation.findVersionTag(event.getRegion(), event.getEventId(), isBulkOp);
event.setVersionTag(tag);
findAndSetVersionTag(event);
}
}
return isDuplicate;
}

private void markEventAsDuplicate(EntryEventImpl event) {
@VisibleForTesting
void findAndSetVersionTag(EntryEventImpl event) {
boolean isBulkOp = event.getOperation().isPutAll() || event.getOperation().isRemoveAll();
VersionTag tag =
FindVersionTagOperation.findVersionTag(event.getRegion(), event.getEventId(), isBulkOp);
event.setVersionTag(tag);
}

@VisibleForTesting
void markEventAsDuplicate(EntryEventImpl event) {
event.setPossibleDuplicate(true);
if (getConcurrencyChecksEnabled() && event.getVersionTag() == null) {
if (event.isBulkOpInProgress()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.event.EventTracker;
import org.apache.geode.internal.cache.versions.RegionVersionHolder;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;

Expand All @@ -49,6 +51,8 @@ public class DistributedRegionTest {
private RegionVersionHolder<VersionSource<Object>> holder;
private VersionSource<Object> lostMemberVersionID;
private InternalDistributedMember member;
private EntryEventImpl event;
private EventTracker eventTracker;

@Before
@SuppressWarnings("unchecked")
Expand All @@ -57,6 +61,8 @@ public void setup() {
holder = mock(RegionVersionHolder.class);
lostMemberVersionID = mock(VersionSource.class);
member = mock(InternalDistributedMember.class);
event = mock(EntryEventImpl.class);
eventTracker = mock(EventTracker.class);
}

@Test
Expand Down Expand Up @@ -260,4 +266,132 @@ public void validateAsynchronousEventDispatcherShouldThrowExceptionWhenDispatche
.hasMessage("Parallel Gateway Sender " + senderId
+ " can not be used with replicated region " + regionPath);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfFoundInEventTrackerAndVersionTagIsSet() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(true);
when(event.getVersionTag()).thenReturn(mock(VersionTag.class));

assertThat(distributedRegion.hasSeenEvent(event)).isTrue();
verify(distributedRegion).markEventAsDuplicate(event);
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfFoundInEventTrackerAndConcurrencyChecksNotEnabled() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(false);

assertThat(distributedRegion.hasSeenEvent(event)).isTrue();
verify(distributedRegion).markEventAsDuplicate(event);
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfFoundInEventTrackerAndEventIdIsNotSet() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(false);
when(event.getEventId()).thenReturn(null);

assertThat(distributedRegion.hasSeenEvent(event)).isTrue();
verify(distributedRegion).markEventAsDuplicate(event);
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventWillFindAndSetVersionTagIfFoundInEventTrackerButValidTagNotSet() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
when(event.getEventId()).thenReturn(mock(EventID.class));

assertThat(distributedRegion.hasSeenEvent(event)).isTrue();
verify(distributedRegion).markEventAsDuplicate(event);
verify(distributedRegion).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfNotFoundEventInEventTrackerAndNotADuplicateEvent() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(false);
when(event.isPossibleDuplicate()).thenReturn(false);

assertThat(distributedRegion.hasSeenEvent(event)).isFalse();
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfNotFoundInEventTrackerAndConcurrencyChecksNotEnabled() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(false);
when(event.isPossibleDuplicate()).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(false);

assertThat(distributedRegion.hasSeenEvent(event)).isFalse();
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfNotFoundInEventTrackerAndVersionTagIsSet() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(false);
when(event.isPossibleDuplicate()).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
when(event.getVersionTag()).thenReturn(mock(VersionTag.class));

assertThat(distributedRegion.hasSeenEvent(event)).isFalse();
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventDoesNotFindAndSetVersionTagIfNotFoundInEventTrackerAndNoEventId() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(false);
when(event.isPossibleDuplicate()).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
when(event.getEventId()).thenReturn(null);

assertThat(distributedRegion.hasSeenEvent(event)).isFalse();
verify(distributedRegion, never()).findAndSetVersionTag(event);
}

@Test
public void hasSeenEventWillFindAndSetVersionTagIfNotFoundInEventTrackerAndIsPossibleDuplicateWithConcurrencyChecksEnabled() {
DistributedRegion distributedRegion = mock(DistributedRegion.class);
doCallRealMethod().when(distributedRegion).hasSeenEvent(event);
when(distributedRegion.getEventTracker()).thenReturn(eventTracker);
when(eventTracker.hasSeenEvent(event)).thenReturn(false);
when(event.isPossibleDuplicate()).thenReturn(true);
when(event.getRegion()).thenReturn(distributedRegion);
when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
when(event.getEventId()).thenReturn(mock(EventID.class));

assertThat(distributedRegion.hasSeenEvent(event)).isFalse();
verify(distributedRegion).findAndSetVersionTag(event);
}
}