Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-8486: record TransactionDataRebalancedException if tx put failed #5500

Merged
merged 2 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ public class TXState implements TXStateInterface {
/** keeps track of events, so as not to re-apply events */
protected Set<EventID> seenEvents = new HashSet<EventID>();
/** keeps track of results of txPutEntry */
private Map<EventID, Boolean> seenResults = new HashMap<EventID, Boolean>();
private Map<EventID, Boolean> seenResults = new HashMap<>();
/** keeps track of TransactionDataRebalancedException during txPutEntry */
private Map<EventID, TransactionDataRebalancedException> eventExceptions = new HashMap<>();

@Immutable
static final TXEntryState ENTRY_EXISTS = new TXEntryState();
Expand Down Expand Up @@ -193,19 +195,34 @@ private void recordEvent(EntryEventImpl event) {
}
}

private void recordEventAndResult(EntryEventImpl event, boolean result) {
void recordEventAndResult(EntryEventImpl event, boolean result) {
recordEvent(event);
if (event.getEventId() != null) {
this.seenResults.put(event.getEventId(), result);
}
}

private boolean getRecordedResult(EntryEventImpl event) {
boolean getRecordedResult(EntryEventImpl event) {
assert event != null;
assert this.seenResults.containsKey(event.getEventId());
return this.seenResults.get(event.getEventId());
}

void recordEventException(EntryEventImpl event,
TransactionDataRebalancedException exception) {
if (event.getEventId() != null) {
eventExceptions.put(event.getEventId(), exception);
}
}

boolean getRecordedResultOrException(EntryEventImpl event) {
boolean result = getRecordedResult(event);
if (!result && eventExceptions.containsKey(event.getEventId())) {
throw eventExceptions.get(event.getEventId());
}
return result;
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down Expand Up @@ -1374,14 +1391,14 @@ public boolean txPutEntry(final EntryEventImpl event, boolean ifNew, boolean req
}

if (hasSeenEvent(event)) {
return getRecordedResult(event);
return getRecordedResultOrException(event);
}

// if requireOldValue then oldValue gets set in event
// (even if ifNew and entry exists)
// !!!:ezoerner:20080813 need to handle ifOld for transactional on
// PRs when PRs become transactional
TXEntryState tx = null;
TXEntryState tx;
boolean result = false;
try {
tx = txWriteEntry(region, event, ifNew, requireOldValue, expectedOldValue);
Expand All @@ -1392,6 +1409,10 @@ public boolean txPutEntry(final EntryEventImpl event, boolean ifNew, boolean req
}
} catch (EntryNotFoundException e) {
result = false;
} catch (TransactionDataRebalancedException e) {
result = false;
recordEventException(event, e);
throw e;
} finally {
recordEventAndResult(event, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,32 @@
import javax.transaction.Status;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.FailedSynchronizationException;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.SynchronizationCommitConflictException;
import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.cache.TransactionDataRebalancedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.distributed.internal.InternalDistributedSystem;

public class TXStateTest {
@Rule
public ExpectedException expectedException = ExpectedException.none();

private TXStateProxyImpl txStateProxy;
private CommitConflictException exception;
private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException;
private SingleThreadJTAExecutor executor;
private InternalCache cache;
private InternalDistributedSystem internalDistributedSystem;
private final EntryEventImpl event = mock(EntryEventImpl.class);
private final EventID eventID = mock(EventID.class);

@Before
public void setup() {
Expand All @@ -62,6 +70,7 @@ public void setup() {

when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
when(event.getEventId()).thenReturn(eventID);
}

@Test
Expand Down Expand Up @@ -298,4 +307,63 @@ public void doCleanupThrowsWhenReleasingLockGotIllegalMonitorStateExceptionIfCac
verify(regionState1).cleanup(region1);
}

@Test
public void getRecordedResultsReturnsFalseIfRecordedFalse() {
TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
txState.recordEventAndResult(event, false);

assertThat(txState.getRecordedResult(event)).isFalse();
}

@Test
public void getRecordedResultsReturnsTrueIfRecordedTrue() {
TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
txState.recordEventAndResult(event, true);

assertThat(txState.getRecordedResult(event)).isTrue();
}

@Test
public void getRecordedResultOrExceptionThrowsIfRecordedException() {
expectedException.expect(TransactionDataRebalancedException.class);
TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
txState.recordEventAndResult(event, false);
txState.recordEventException(event, new TransactionDataRebalancedException(""));

txState.getRecordedResultOrException(event);
}

@Test
public void getRecordedResultOrExceptionReturnFalseIfRecordedFalse() {
TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
txState.recordEventAndResult(event, false);

assertThat(txState.getRecordedResultOrException(event)).isFalse();
}

@Test
public void getRecordedResultOrExceptionReturnTrueIfRecordedTrue() {
TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
txState.recordEventAndResult(event, true);

assertThat(txState.getRecordedResultOrException(event)).isTrue();
}

@Test
public void txPutEntryRecordExceptionIfFailedWithTransactionDataRebalancedException() {
expectedException.expect(TransactionDataRebalancedException.class);
TXState txState = spy(new TXState(txStateProxy, true, disabledClock()));
boolean ifNew = true;
boolean requireOldValue = false;
boolean checkResources = false;
TransactionDataRebalancedException exception = new TransactionDataRebalancedException("");
InternalRegion region = mock(InternalRegion.class);
when(event.getRegion()).thenReturn(region);
doThrow(exception).when(txState).txWriteEntry(region, event, ifNew, requireOldValue, null);

assertThat(txState.txPutEntry(event, ifNew, requireOldValue, checkResources, null)).isFalse();

verify(txState, never()).getRecordedResultOrException(event);
verify(txState).recordEventException(event, exception);
}
}