Skip to content

Commit

Permalink
IGNITE-10604: MVCC: mvcc history can be missed during remove operatio…
Browse files Browse the repository at this point in the history
…n. This closes #5979.
  • Loading branch information
AMashenkov authored and gvvinblade committed Feb 7, 2019
1 parent 5b8431e commit b5b9ffa
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 103 deletions.
Expand Up @@ -5606,6 +5606,8 @@ else if (res.resultType() == ResultType.REMOVED_NOT_NULL) {
updRes.invokeResult(res.invokeResult());
}

updRes.newValue(res.newValue());

if (needOldVal && compareIgnoreOpCounter(res.resultVersion(), mvccVer) != 0 &&
(res.resultType() == ResultType.PREV_NOT_NULL || res.resultType() == ResultType.REMOVED_NOT_NULL))
updRes.oldValue(res.oldValue());
Expand Down
Expand Up @@ -65,7 +65,7 @@ public class GridCacheMvccEntryInfo extends GridCacheEntryInfo implements MvccVe

/** {@inheritDoc} */
@Override public int newMvccOperationCounter() {
return newMvccOpCntr & ~MVCC_OP_COUNTER_MASK;
return newMvccOpCntr & MVCC_OP_COUNTER_MASK;
}

/** {@inheritDoc} */
Expand All @@ -85,7 +85,7 @@ public class GridCacheMvccEntryInfo extends GridCacheEntryInfo implements MvccVe

/** {@inheritDoc} */
@Override public int mvccOperationCounter() {
return mvccOpCntr & ~MVCC_OP_COUNTER_MASK;
return mvccOpCntr & MVCC_OP_COUNTER_MASK;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -121,6 +121,7 @@
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_HINTS_BIT_OFF;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_KEY_ABSENT_BEFORE_OFF;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_MASK;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.compare;
Expand Down Expand Up @@ -1443,7 +1444,7 @@ protected class CacheDataStoreImpl implements CacheDataStore {
private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>();

/** Mvcc remove handler. */
private final PageHandler<MvccVersion, Boolean> mvccUpdateMarker = new MvccMarkUpdatedHandler();
private final PageHandler<MvccUpdateDataRow, Boolean> mvccUpdateMarker = new MvccMarkUpdatedHandler();

/** Mvcc update tx state hint handler. */
private final PageHandler<Void, Boolean> mvccUpdateTxStateHint = new MvccUpdateTxStateHintHandler();
Expand Down Expand Up @@ -1998,7 +1999,7 @@ else if (res == ResultType.VERSION_FOUND || // exceptional case

// Mark old version as removed.
if (res == ResultType.PREV_NOT_NULL) {
rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot, grp.statisticsHolderData());
rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, updateRow, grp.statisticsHolderData());

if (op == CacheInvokeEntry.Operation.REMOVE) {
updateRow.resultType(ResultType.REMOVED_NOT_NULL);
Expand All @@ -2014,7 +2015,7 @@ else if (res == ResultType.VERSION_FOUND || // exceptional case
assert op != CacheInvokeEntry.Operation.REMOVE;
}
else if (oldRow != null)
rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot, grp.statisticsHolderData());
rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, updateRow, grp.statisticsHolderData());

if (!grp.storeCacheIdInDataPage() && updateRow.cacheId() != CU.UNDEFINED_CACHE_ID) {
updateRow.cacheId(CU.UNDEFINED_CACHE_ID);
Expand Down Expand Up @@ -2174,7 +2175,7 @@ else if (res == ResultType.PREV_NOT_NULL) {

assert oldRow != null && oldRow.link() != 0 : oldRow;

rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot, grp.statisticsHolderData());
rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, updateRow, grp.statisticsHolderData());

clearPendingEntries(cctx, oldRow);
}
Expand Down Expand Up @@ -2451,7 +2452,7 @@ else if (res == ResultType.PREV_NOT_NULL) {

assert oldRow != null && oldRow.link() != 0 : oldRow;

rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, mvccSnapshot, grp.statisticsHolderData());
rowStore.updateDataRow(oldRow.link(), mvccUpdateMarker, updateRow, grp.statisticsHolderData());
}

if (val != null) {
Expand Down Expand Up @@ -3154,10 +3155,10 @@ private class MvccUpdateRowWithPreloadInfoClosure extends MvccDataRow implements
/**
* Mvcc remove handler.
*/
private final class MvccMarkUpdatedHandler extends PageHandler<MvccVersion, Boolean> {
private final class MvccMarkUpdatedHandler extends PageHandler<MvccUpdateDataRow, Boolean> {
/** {@inheritDoc} */
@Override public Boolean run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc,
MvccVersion newVer, int itemId, IoStatisticsHolder statHolder) throws IgniteCheckedException {
MvccUpdateDataRow updateDataRow, int itemId, IoStatisticsHolder statHolder) throws IgniteCheckedException {
assert grp.mvccEnabled();

DataPageIO iox = (DataPageIO)io;
Expand All @@ -3167,15 +3168,18 @@ private final class MvccMarkUpdatedHandler extends PageHandler<MvccVersion, Bool

long newCrd = iox.newMvccCoordinator(pageAddr, off);
long newCntr = iox.newMvccCounter(pageAddr, off);
int newOpCntr = iox.newMvccOperationCounter(pageAddr, off);
int newOpCntr = iox.rawNewMvccOperationCounter(pageAddr, off);

assert newCrd == MVCC_CRD_COUNTER_NA || state(grp, newCrd, newCntr, newOpCntr) == TxState.ABORTED;

iox.updateNewVersion(pageAddr, off, newVer, TxState.NA);
int keyAbsentBeforeFlag = updateDataRow.isKeyAbsentBefore() ? (1 << MVCC_KEY_ABSENT_BEFORE_OFF) : 0;

iox.updateNewVersion(pageAddr, off, updateDataRow.mvccCoordinatorVersion(), updateDataRow.mvccCounter(),
updateDataRow.mvccOperationCounter() | keyAbsentBeforeFlag, TxState.NA);

if (isWalDeltaRecordNeeded(grp.dataRegion().pageMemory(), cacheId, pageId, page, ctx.wal(), walPlc))
ctx.wal().log(new DataPageMvccMarkUpdatedRecord(cacheId, pageId, itemId,
newVer.coordinatorVersion(), newVer.counter(), newVer.operationCounter()));
updateDataRow.mvccCoordinatorVersion(), updateDataRow.mvccCounter(), updateDataRow.mvccOperationCounter()));

return TRUE;
}
Expand All @@ -3196,14 +3200,14 @@ private final class MvccUpdateTxStateHintHandler extends PageHandler<Void, Boole

long crd = iox.mvccCoordinator(pageAddr, off);
long cntr = iox.mvccCounter(pageAddr, off);
int opCntr = iox.mvccOperationCounter(pageAddr, off);
int opCntr = iox.rawMvccOperationCounter(pageAddr, off);
byte txState = (byte)(opCntr >>> MVCC_HINTS_BIT_OFF);

if (txState == TxState.NA) {
byte state = state(grp, crd, cntr, opCntr);

if (state == TxState.COMMITTED || state == TxState.ABORTED) {
iox.mvccOperationCounter(pageAddr, off, opCntr | (state << MVCC_HINTS_BIT_OFF));
iox.rawMvccOperationCounter(pageAddr, off, opCntr | (state << MVCC_HINTS_BIT_OFF));

if (isWalDeltaRecordNeeded(grp.dataRegion().pageMemory(), cacheId, pageId, page, ctx.wal(), walPlc))
ctx.wal().log(new DataPageMvccUpdateTxStateHintRecord(cacheId, pageId, itemId, state));
Expand All @@ -3214,14 +3218,14 @@ private final class MvccUpdateTxStateHintHandler extends PageHandler<Void, Boole

long newCrd = iox.newMvccCoordinator(pageAddr, off);
long newCntr = iox.newMvccCounter(pageAddr, off);
int newOpCntr = iox.newMvccOperationCounter(pageAddr, off);
int newOpCntr = iox.rawNewMvccOperationCounter(pageAddr, off);
byte newTxState = (byte)(newOpCntr >>> MVCC_HINTS_BIT_OFF);

if (newCrd != MVCC_CRD_COUNTER_NA && newTxState == TxState.NA) {
byte state = state(grp, newCrd, newCntr, newOpCntr);

if (state == TxState.COMMITTED || state == TxState.ABORTED) {
iox.newMvccOperationCounter(pageAddr, off, newOpCntr | (state << MVCC_HINTS_BIT_OFF));
iox.rawNewMvccOperationCounter(pageAddr, off, newOpCntr | (state << MVCC_HINTS_BIT_OFF));

if (isWalDeltaRecordNeeded(grp.dataRegion().pageMemory(), cacheId, pageId, page, ctx.wal(), walPlc))
ctx.wal().log(new DataPageMvccUpdateNewTxStateHintRecord(cacheId, pageId, itemId, state));
Expand Down Expand Up @@ -3250,14 +3254,14 @@ private final class MvccApplyChangesHandler extends PageHandler<MvccDataRow, Boo

long crd = iox.mvccCoordinator(pageAddr, off);
long cntr = iox.mvccCounter(pageAddr, off);
int opCntrAndHint = iox.mvccOperationCounter(pageAddr, off);
int opCntr = opCntrAndHint & ~MVCC_OP_COUNTER_MASK;
int opCntrAndHint = iox.rawMvccOperationCounter(pageAddr, off);
int opCntr = opCntrAndHint & MVCC_OP_COUNTER_MASK;
byte txState = (byte)(opCntrAndHint >>> MVCC_HINTS_BIT_OFF);

long newCrd = iox.newMvccCoordinator(pageAddr, off);
long newCntr = iox.newMvccCounter(pageAddr, off);
int newOpCntrAndHint = iox.newMvccOperationCounter(pageAddr, off);
int newOpCntr = newOpCntrAndHint & ~MVCC_OP_COUNTER_MASK;
int newOpCntrAndHint = iox.rawNewMvccOperationCounter(pageAddr, off);
int newOpCntr = newOpCntrAndHint & MVCC_OP_COUNTER_MASK;
byte newTxState = (byte)(newOpCntrAndHint >>> MVCC_HINTS_BIT_OFF);

assert crd == newRow.mvccCoordinatorVersion();
Expand All @@ -3267,7 +3271,7 @@ private final class MvccApplyChangesHandler extends PageHandler<MvccDataRow, Boo
if (txState != newRow.mvccTxState() && newRow.mvccTxState() != TxState.NA) {
assert txState == TxState.NA;

iox.mvccOperationCounter(pageAddr, off, opCntr | (newRow.mvccTxState() << MVCC_HINTS_BIT_OFF));
iox.rawMvccOperationCounter(pageAddr, off, opCntr | (newRow.mvccTxState() << MVCC_HINTS_BIT_OFF));

if (isWalDeltaRecordNeeded(grp.dataRegion().pageMemory(), cacheId, pageId, page, ctx.wal(), walPlc))
ctx.wal().log(new DataPageMvccUpdateTxStateHintRecord(cacheId, pageId, itemId, newRow.mvccTxState()));
Expand All @@ -3280,7 +3284,8 @@ private final class MvccApplyChangesHandler extends PageHandler<MvccDataRow, Boo
newRow.newMvccCounter(),
newRow.newMvccOperationCounter()) != 0) {

iox.updateNewVersion(pageAddr, off, newRow.newMvccVersion(), newRow.newMvccTxState());
iox.updateNewVersion(pageAddr, off, newRow.newMvccCoordinatorVersion(), newRow.newMvccCounter(),
newRow.newMvccOperationCounter(), newRow.newMvccTxState());

if (isWalDeltaRecordNeeded(grp.dataRegion().pageMemory(), cacheId, pageId, page, ctx.wal(), walPlc))
ctx.wal().log(new DataPageMvccMarkUpdatedRecord(cacheId, pageId, itemId,
Expand All @@ -3289,7 +3294,7 @@ private final class MvccApplyChangesHandler extends PageHandler<MvccDataRow, Boo
else if (newTxState != newRow.newMvccTxState() && newRow.newMvccTxState() != TxState.NA) {
assert newTxState == TxState.NA;

iox.newMvccOperationCounter(pageAddr, off, newOpCntr | (newRow.newMvccTxState() << MVCC_HINTS_BIT_OFF));
iox.rawNewMvccOperationCounter(pageAddr, off, newOpCntr | (newRow.newMvccTxState() << MVCC_HINTS_BIT_OFF));

if (isWalDeltaRecordNeeded(grp.dataRegion().pageMemory(), cacheId, pageId, page, ctx.wal(), walPlc))
ctx.wal().log(new DataPageMvccUpdateNewTxStateHintRecord(cacheId, pageId, itemId, newRow.newMvccTxState()));
Expand Down
Expand Up @@ -701,7 +701,7 @@ private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearc
batches.put(node.id(), batch = new Batch(node));

if (moving && hist0 == null) {
assert !F.isEmpty(hist);
assert !F.isEmpty(hist) || val == null;

hist0 = fetchHistoryInfo(key, hist);
}
Expand Down
Expand Up @@ -2113,7 +2113,7 @@ private IgniteInternalFuture<Long> updateAsync(GridNearTxQueryAbstractEnlistFutu
assert res != null;

if (res > 0) {
if (mvccSnapshot.operationCounter() == ~MvccUtils.MVCC_OP_COUNTER_MASK) {
if (mvccSnapshot.operationCounter() == MvccUtils.MVCC_READ_OP_CNTR) {
throw new IgniteCheckedException("The maximum limit of the number of statements allowed in" +
" one transaction is reached. [max=" + mvccSnapshot.operationCounter() + ']');
}
Expand Down

0 comments on commit b5b9ffa

Please sign in to comment.