From 3aeaafcd36b48821b919ba301b5af0d948174190 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Fri, 14 Dec 2018 16:02:23 +0300 Subject: [PATCH 1/5] IGNITE-10448: MVCC: NPE on data page eviction. --- .../pagemem/wal/record/WALRecord.java | 8 ++- .../DataPageInsertFragmentMvccRecord.java | 56 +++++++++++++++ .../delta/DataPageInsertFragmentRecord.java | 2 +- .../delta/DataPageInsertMvccRecord.java | 56 +++++++++++++++ .../record/delta/DataPageInsertRecord.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 8 ++- .../persistence/CacheDataRowAdapter.java | 5 ++ .../persistence/GridCacheOffheapManager.java | 5 ++ .../cache/persistence/Storable.java | 5 ++ .../evict/PageAbstractEvictionTracker.java | 5 +- .../freelist/AbstractFreeList.java | 23 +++++-- .../metastorage/MetastorageDataRow.java | 5 ++ .../tree/io/AbstractDataPageIO.java | 69 +++++++++++++++---- .../cache/persistence/tree/io/DataPageIO.java | 9 ++- .../persistence/wal/record/RecordTypes.java | 2 + .../serializer/RecordDataV1Serializer.java | 69 +++++++++++++++++++ .../cache/tree/mvcc/data/MvccDataRow.java | 5 ++ .../PageEvictionMultinodeAbstractTest.java | 2 - .../database/CacheFreeListImplSelfTest.java | 5 ++ 19 files changed, 309 insertions(+), 32 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentMvccRecord.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertMvccRecord.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 5d7276838ed3d..d06b3a286fc19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -208,7 +208,13 @@ public enum RecordType { MVCC_DATA_RECORD (LOGICAL), /** Mvcc Tx state change record. */ - MVCC_TX_RECORD (LOGICAL); + MVCC_TX_RECORD (LOGICAL), + + /** Insert mvcc data record. */ + MVCC_DATA_PAGE_INSERT_RECORD (PHYSICAL), + + /** Insert fragmented mvcc data record. */ + MVCC_DATA_PAGE_INSERT_FRAGMENT_RECORD (PHYSICAL); /** * When you're adding a new record don't forget to choose record purpose explicitly diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentMvccRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentMvccRecord.java new file mode 100644 index 0000000000000..54c6c1642b613 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentMvccRecord.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.pagemem.wal.record.delta; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Insert mvcc fragment to data page record. + */ +public class DataPageInsertFragmentMvccRecord extends DataPageInsertFragmentRecord { + /** + * @param grpId Cache group ID. + * @param pageId Page ID. + * @param payload Fragment payload. + * @param lastLink Link to the last entry fragment. + */ + public DataPageInsertFragmentMvccRecord(int grpId, long pageId, byte[] payload, long lastLink) { + super(grpId, pageId, payload, lastLink); + } + + /** {@inheritDoc} */ + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { + AbstractDataPageIO io = PageIO.getPageIO(pageAddr); + + io.addRowFragment(PageIO.getPageId(pageAddr), pageAddr, payload(), lastLink(), pageMem.realPageSize(groupId()), true); + } + + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.MVCC_DATA_PAGE_INSERT_FRAGMENT_RECORD; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataPageInsertFragmentMvccRecord.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java index 650ae1e8014c3..dfdf35c4fb547 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java @@ -57,7 +57,7 @@ public DataPageInsertFragmentRecord( @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { AbstractDataPageIO io = PageIO.getPageIO(pageAddr); - io.addRowFragment(PageIO.getPageId(pageAddr), pageAddr, payload, lastLink, pageMem.realPageSize(groupId())); + io.addRowFragment(PageIO.getPageId(pageAddr), pageAddr, payload, lastLink, pageMem.realPageSize(groupId()), false); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertMvccRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertMvccRecord.java new file mode 100644 index 0000000000000..a8425fce842d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertMvccRecord.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.pagemem.wal.record.delta; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Insert mvcc record into data page. + */ +public class DataPageInsertMvccRecord extends DataPageInsertRecord { + /** + * @param grpId Cache group ID. + * @param pageId Page ID. + * @param payload Remainder of the record. + */ + public DataPageInsertMvccRecord(int grpId, long pageId, byte[] payload) { + super(grpId, pageId, payload); + } + + /** {@inheritDoc} */ + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { + assert payload() != null; + + AbstractDataPageIO io = PageIO.getPageIO(pageAddr); + + io.addRow(pageAddr, payload(), pageMem.realPageSize(groupId()), true); + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.MVCC_DATA_PAGE_INSERT_RECORD; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DataPageInsertMvccRecord.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java index 9b0637dae89da..79d70354e26d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java @@ -58,7 +58,7 @@ public byte[] payload() { AbstractDataPageIO io = PageIO.getPageIO(pageAddr); - io.addRow(pageAddr, payload, pageMem.realPageSize(groupId())); + io.addRow(pageAddr, payload, pageMem.realPageSize(groupId()), false); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index b5f9dfc49faaa..324227f5319b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -4579,8 +4579,12 @@ private CacheEntryImplEx wrapVersionedWithValue() { // Nullify value after swap. value(null); - if (evictOffheap) - removeValue(); + if (evictOffheap) { + if (cctx.mvccEnabled()) + cctx.offheap().mvccRemoveAll(this); + else + removeValue(); + } marked = true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index 85be9d259df3d..15a27ea7a9017 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -626,6 +626,11 @@ public boolean isReady() { return 0; } + /** {@inheritDoc} */ + @Override public boolean mvcc() { + return false; + } + /** {@inheritDoc} */ @Override public long mvccCoordinatorVersion() { return MVCC_CRD_COUNTER_NA; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index f78428d055e52..ad1f97e9a6643 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -1327,6 +1327,11 @@ private DataEntryRow(DataEntry entry) { throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public boolean mvcc() { + return false; + } + /** {@inheritDoc} */ @Override public int hash() { return entry.key().hashCode(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java index 133f0a10afd06..a317b2d6981c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java @@ -49,4 +49,9 @@ public interface Storable { * which is entirely available on the very first page followed by the row link. */ public int headerSize(); + + /** + * @return {@code True} if this is MVCC data. + */ + public boolean mvcc(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java index 5142c59136de5..302bcdbc57777 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.util.typedef.internal.U; @@ -100,7 +101,9 @@ final boolean evictDataPage(int pageIdx) throws IgniteCheckedException { rowsToEvict = io.forAllItems(pageAddr, new DataPageIO.CC() { @Override public CacheDataRowAdapter apply(long link) throws IgniteCheckedException { - CacheDataRowAdapter row = new CacheDataRowAdapter(link); + boolean mvccRow = io.isMvccItem(pageAddr, PageIdUtils.itemId(link), pageMem.pageSize()); + + CacheDataRowAdapter row = mvccRow ? new MvccDataRow(link) : new CacheDataRowAdapter(link); row.initFromLink(null, sharedCtx, pageMem, CacheDataRowAdapter.RowData.KEY_ONLY); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index 60aefb927ce6f..6cf93f0173af9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -24,7 +24,9 @@ import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentMvccRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertMvccRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord; @@ -101,6 +103,8 @@ private final class UpdateRowHandler extends PageHandler { int itemId, IoStatisticsHolder statHolder) throws IgniteCheckedException { + assert !row.mvcc(); // Mvcc inserts new row instead of updating an old one. + AbstractDataPageIO io = (AbstractDataPageIO)iox; int rowSize = row.size(); @@ -193,7 +197,7 @@ private int addRow( T row, int rowSize ) throws IgniteCheckedException { - io.addRow(pageId, pageAddr, row, rowSize, pageSize()); + io.addRow(pageId, pageAddr, row, rowSize, pageSize(), row.mvcc()); if (needWalDeltaRecord(pageId, page, null)) { // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data. @@ -205,10 +209,11 @@ private int addRow( PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize); - wal.log(new DataPageInsertRecord( - grpId, - pageId, - payload)); + DataPageInsertRecord rec = row.mvcc() ? + new DataPageInsertMvccRecord(grpId, pageId, payload) : + new DataPageInsertRecord(grpId, pageId, payload); + + wal.log(rec); } return rowSize; @@ -237,7 +242,7 @@ private int addRowFragment( // Read last link before the fragment write, because it will be updated there. long lastLink = row.link(); - int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize()); + int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize(), row.mvcc()); assert payloadSize > 0 : payloadSize; @@ -249,7 +254,11 @@ private int addRowFragment( PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize); - wal.log(new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink)); + DataPageInsertFragmentRecord rec = row.mvcc() ? + new DataPageInsertFragmentMvccRecord(grpId, pageId, payload, lastLink) : + new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink); + + wal.log(rec); } return written + payloadSize; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java index 2d7b0a66e47a4..8342e30668aed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java @@ -88,6 +88,11 @@ public long link() { return link; } + /** {@inheritDoc} */ + @Override public boolean mvcc() { + return false; + } + /** * @return Value. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java index 6176eeb0bd281..e6e44e13ba84e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java @@ -79,6 +79,9 @@ public abstract class AbstractDataPageIO extends PageIO impl /** */ private static final int FRAGMENTED_FLAG = 0b10000000_00000000; + /** */ + protected static final int MVCC_FLAG = 0b01000000_00000000; + /** */ public static final int MIN_DATA_PAGE_OVERHEAD = ITEMS_OFF + ITEM_SIZE + PAYLOAD_LEN_SIZE + LINK_SIZE; @@ -125,6 +128,18 @@ public long getFreeListPageId(long pageAddr) { return PageUtils.getLong(pageAddr, FREE_LIST_PAGE_ID_OFF); } + /** + * @param pageAddr Page address. + * @param itemId Item id. + * @param pageSize Page size. + * @return {@code True} if entry with the given index is an MVCC entry. + */ + public boolean isMvccItem(long pageAddr, int itemId, int pageSize) { + int dataOff = getDataOffset(pageAddr, itemId, pageSize); + + return (PageUtils.getShort(pageAddr, dataOff) & MVCC_FLAG) != 0; + } + /** * @param pageAddr Page address. * @param dataOff Data offset. @@ -134,6 +149,8 @@ public long getFreeListPageId(long pageAddr) { private int getPageEntrySize(long pageAddr, int dataOff, int show) { int payloadLen = PageUtils.getShort(pageAddr, dataOff) & 0xFFFF; + payloadLen &= ~MVCC_FLAG; // Clear MVCC flag. + if ((payloadLen & FRAGMENTED_FLAG) != 0) payloadLen &= ~FRAGMENTED_FLAG; // We are fragmented and have a link. else @@ -675,7 +692,7 @@ public boolean updateRow( if (row != null) writeRowData(pageAddr, dataOff, rowSize, row, false); else - writeRowData(pageAddr, dataOff, payload); + writeRowData(pageAddr, dataOff, payload, false); return true; } @@ -789,10 +806,12 @@ private boolean isEnoughSpace(int newEntryFullSize, int firstEntryOff, int direc /** * Adds row to this data page and sets respective link to the given row object. * + * @param pageId Page id. * @param pageAddr Page address. * @param row Data row. * @param rowSize Row size. * @param pageSize Page size. + * @param mvcc {@code True} if this is a MVCC record. * @throws IgniteCheckedException If failed. */ public void addRow( @@ -800,7 +819,8 @@ public void addRow( final long pageAddr, T row, final int rowSize, - final int pageSize + final int pageSize, + boolean mvcc ) throws IgniteCheckedException { assert rowSize <= getFreeSpace(pageAddr) : "can't call addRow if not enough space for the whole row"; @@ -824,13 +844,15 @@ public void addRow( * @param pageAddr Page address. * @param payload Payload. * @param pageSize Page size. + * @param mvcc {@code True} if this is a MVCC record. * @return Item ID. * @throws IgniteCheckedException If failed. */ public int addRow( long pageAddr, byte[] payload, - int pageSize + int pageSize, + boolean mvcc ) throws IgniteCheckedException { assert payload.length <= getFreeSpace(pageAddr) : "can't call addRow if not enough space for the whole row"; @@ -841,7 +863,7 @@ public int addRow( int dataOff = getDataOffsetForWrite(pageAddr, fullEntrySize, directCnt, indirectCnt, pageSize); - writeRowData(pageAddr, dataOff, payload); + writeRowData(pageAddr, dataOff, payload, mvcc); return addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); } @@ -934,6 +956,7 @@ private int getDataOffsetForWrite(long pageAddr, int fullEntrySize, int directCn * @param written Number of bytes of row size that was already written. * @param rowSize Row size. * @param pageSize Page size. + * @param mvcc {@code True} if this is a MVCC record. * @return Written payload size. * @throws IgniteCheckedException If failed. */ @@ -944,9 +967,10 @@ public int addRowFragment( T row, int written, int rowSize, - int pageSize + int pageSize, + boolean mvcc ) throws IgniteCheckedException { - return addRowFragment(pageMem, pageId, pageAddr, written, rowSize, row.link(), row, null, pageSize); + return addRowFragment(pageMem, pageId, pageAddr, written, rowSize, row.link(), row, null, pageSize, mvcc); } /** @@ -957,6 +981,7 @@ public int addRowFragment( * @param payload Payload bytes. * @param lastLink Link to the previous written fragment (link to the tail). * @param pageSize Page size. + * @param mvcc {@code True} if this is a MVCC record. * @throws IgniteCheckedException If failed. */ public void addRowFragment( @@ -964,9 +989,10 @@ public void addRowFragment( long pageAddr, byte[] payload, long lastLink, - int pageSize + int pageSize, + boolean mvcc ) throws IgniteCheckedException { - addRowFragment(null, pageId, pageAddr, 0, 0, lastLink, null, payload, pageSize); + addRowFragment(null, pageId, pageAddr, 0, 0, lastLink, null, payload, pageSize, mvcc); } /** @@ -981,6 +1007,7 @@ public void addRowFragment( * @param row Row. * @param payload Payload bytes. * @param pageSize Page size. + * @param mvcc {@code True} if this is a MVCC record. * @return Written payload size. * @throws IgniteCheckedException If failed. */ @@ -993,7 +1020,8 @@ private int addRowFragment( long lastLink, T row, byte[] payload, - int pageSize + int pageSize, + boolean mvcc ) throws IgniteCheckedException { assert payload == null ^ row == null; @@ -1013,6 +1041,14 @@ private int addRowFragment( payloadSize -= hdrSize - remain; } + short p = (short)(payloadSize | FRAGMENTED_FLAG); + + if (mvcc) { + assert row == null || row.mvcc(); + + p |= MVCC_FLAG; + } + int fullEntrySize = getPageEntrySize(payloadSize, SHOW_PAYLOAD_LEN | SHOW_LINK | SHOW_ITEM); int dataOff = getDataOffsetForWrite(pageAddr, fullEntrySize, directCnt, indirectCnt, pageSize); @@ -1020,9 +1056,6 @@ private int addRowFragment( ByteBuffer buf = pageMem.pageBuffer(pageAddr); buf.position(dataOff); - - short p = (short)(payloadSize | FRAGMENTED_FLAG); - buf.putShort(p); buf.putLong(lastLink); @@ -1031,7 +1064,7 @@ private int addRowFragment( writeFragmentData(row, buf, rowOff, payloadSize); } else { - PageUtils.putShort(pageAddr, dataOff, (short)(payloadSize | FRAGMENTED_FLAG)); + PageUtils.putShort(pageAddr, dataOff, p); PageUtils.putLong(pageAddr, dataOff + 2, lastLink); @@ -1315,9 +1348,15 @@ protected abstract void writeRowData( protected void writeRowData( long pageAddr, int dataOff, - byte[] payload + byte[] payload, + boolean mvcc ) { - PageUtils.putShort(pageAddr, dataOff, (short)payload.length); + short p = (short)payload.length; + + if (mvcc) + p |= MVCC_FLAG; + + PageUtils.putShort(pageAddr, dataOff, p); dataOff += 2; PageUtils.putBytes(pageAddr, dataOff, payload); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java index f9e6bcc0152b1..bf81174a75037 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java @@ -63,10 +63,15 @@ protected DataPageIO(int ver) { long addr = pageAddr + dataOff; int cacheIdSize = row.cacheId() != 0 ? 4 : 0; - int mvccInfoSize = row.mvccCoordinatorVersion() > 0 ? MVCC_INFO_SIZE : 0; + int mvccInfoSize = row.mvcc() ? MVCC_INFO_SIZE : 0; if (newRow) { - PageUtils.putShort(addr, 0, (short)payloadSize); + short p = (short)payloadSize; + + if (row.mvcc()) + p |= MVCC_FLAG; + + PageUtils.putShort(addr, 0, p); addr += 2; if (mvccInfoSize > 0) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java index 65f0aae83b2a1..84a3b4314d1ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/record/RecordTypes.java @@ -34,6 +34,8 @@ public final class RecordTypes { DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_INSERT_FRAGMENT_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_REMOVE_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_INSERT_RECORD); + DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_INSERT_FRAGMENT_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE); DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_MARK_UPDATED_RECORD); DELTA_TYPE_SET.add(WALRecord.RecordType.MVCC_DATA_PAGE_TX_STATE_HINT_UPDATED_RECORD); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java index 84c4074168f61..ca2d16da9a281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java @@ -44,7 +44,9 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType; import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentMvccRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertMvccRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccMarkUpdatedRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageMvccUpdateNewTxStateHintRecord; @@ -407,6 +409,16 @@ assert record instanceof PageSnapshot; case DATA_PAGE_REMOVE_RECORD: return 4 + 8 + 1; + case MVCC_DATA_PAGE_INSERT_RECORD: + DataPageInsertMvccRecord mvccDiRec = (DataPageInsertMvccRecord)record; + + return 4 + 8 + 2 + mvccDiRec.payload().length; + + case MVCC_DATA_PAGE_INSERT_FRAGMENT_RECORD: + final DataPageInsertFragmentMvccRecord mvccDifRec = (DataPageInsertFragmentMvccRecord)record; + + return 4 + 8 + 8 + 4 + mvccDifRec.payloadSize(); + case DATA_PAGE_SET_FREE_LIST_PAGE: return 4 + 8 + 8; @@ -742,6 +754,39 @@ WALRecord readPlainRecord(RecordType type, ByteBufferBackedDataInput in, break; + case MVCC_DATA_PAGE_INSERT_RECORD: { + cacheId = in.readInt(); + pageId = in.readLong(); + + int size = in.readUnsignedShort(); + + in.ensure(size); + + byte[] payload = new byte[size]; + + in.readFully(payload); + + res = new DataPageInsertMvccRecord(cacheId, pageId, payload); + + break; + } + + case MVCC_DATA_PAGE_INSERT_FRAGMENT_RECORD: { + cacheId = in.readInt(); + pageId = in.readLong(); + + final long lastLink = in.readLong(); + final int payloadSize = in.readInt(); + + final byte[] payload = new byte[payloadSize]; + + in.readFully(payload); + + res = new DataPageInsertFragmentMvccRecord(cacheId, pageId, payload, lastLink); + + break; + } + case DATA_PAGE_SET_FREE_LIST_PAGE: cacheId = in.readInt(); pageId = in.readLong(); @@ -1320,6 +1365,30 @@ void writePlainRecord(WALRecord rec, ByteBuffer buf) throws IgniteCheckedExcepti break; + case MVCC_DATA_PAGE_INSERT_RECORD: + DataPageInsertMvccRecord mvccDiRec = (DataPageInsertMvccRecord)rec; + + buf.putInt(mvccDiRec.groupId()); + buf.putLong(mvccDiRec.pageId()); + + buf.putShort((short)mvccDiRec.payload().length); + + buf.put(mvccDiRec.payload()); + + break; + + case MVCC_DATA_PAGE_INSERT_FRAGMENT_RECORD: + final DataPageInsertFragmentMvccRecord mvccDifRec = (DataPageInsertFragmentMvccRecord)rec; + + buf.putInt(mvccDifRec.groupId()); + buf.putLong(mvccDifRec.pageId()); + + buf.putLong(mvccDifRec.lastLink()); + buf.putInt(mvccDifRec.payloadSize()); + buf.put(mvccDifRec.payload()); + + break; + case DATA_PAGE_SET_FREE_LIST_PAGE: DataPageSetFreeListPageRecord freeListRec = (DataPageSetFreeListPageRecord)rec; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java index bdd3166841876..7093d6e172d80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/mvcc/data/MvccDataRow.java @@ -284,6 +284,11 @@ protected void keyAbsentBeforeFlag(boolean flag) { return MVCC_INFO_SIZE; } + /** {@inheritDoc} */ + @Override public boolean mvcc() { + return true; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MvccDataRow.class, this, "super", super.toString()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMultinodeAbstractTest.java index 49b54dd66fdcd..bd94d1e031456 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMultinodeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMultinodeAbstractTest.java @@ -98,8 +98,6 @@ public void testPageEviction() throws Exception { */ @Test public void testPageEvictionMvcc() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); - for (int i = 0; i < CACHE_MODES.length; i++) { CacheConfiguration cfg = cacheConfig( "evict" + i, null, CACHE_MODES[i], CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java index 4c0d898427422..0225de93fc8f9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java @@ -446,6 +446,11 @@ private TestDataRow(int keySize, int valSize) { this.link = link; } + /** {@inheritDoc} */ + @Override public boolean mvcc() { + return false; + } + /** {@inheritDoc} */ @Override public int hash() { throw new UnsupportedOperationException(); From 394855bc1f073219cc18fe8b7a61a0f910abb6cc Mon Sep 17 00:00:00 2001 From: rkondakov Date: Fri, 14 Dec 2018 17:36:42 +0300 Subject: [PATCH 2/5] Mute/unmute tests. --- .../cache/eviction/paged/PageEvictionMetricTest.java | 2 -- .../PageEvictionPagesRecyclingAndReusingTest.java | 10 +++++----- .../eviction/paged/PageEvictionTouchOrderTest.java | 2 -- .../paged/PageEvictionWithRebalanceAbstractTest.java | 2 -- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java index 9af960fbc4493..0c340f3d681a4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionMetricTest.java @@ -57,8 +57,6 @@ public void testPageEvictionMetric() throws Exception { */ @Test public void testPageEvictionMetricMvcc() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); - checkPageEvictionMetric(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java index 51c6405f40bf7..279ff53c5d5cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionPagesRecyclingAndReusingTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -94,9 +95,8 @@ public void testPagesRecyclingAndReusingTxLocal() throws Exception { * @throws Exception If failed. */ @Test + @Ignore("https://issues.apache.org/jira/browse/IGNITE-10694") public void testPagesRecyclingAndReusingMvccTxPartitioned() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); - testPagesRecyclingAndReusing(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.PARTITIONED); } @@ -105,9 +105,8 @@ public void testPagesRecyclingAndReusingMvccTxPartitioned() throws Exception { * @throws Exception If failed. */ @Test + @Ignore("https://issues.apache.org/jira/browse/IGNITE-10694") public void testPagesRecyclingAndReusingMvccTxReplicated() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); - testPagesRecyclingAndReusing(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.REPLICATED); } @@ -144,7 +143,8 @@ private void testPagesRecyclingAndReusing(CacheAtomicityMode atomicityMode, Cach long recycledPagesCnt2 = reuseList.recycledPagesCount(); - assert recycledPagesCnt1 == recycledPagesCnt2 : "Possible recycled pages leak!"; + assert recycledPagesCnt1 == recycledPagesCnt2 : "Possible recycled pages leak! cnt1=" + recycledPagesCnt1 + + ", cnt2=" + recycledPagesCnt2; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java index 59561b89056a1..5530786f7e6ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionTouchOrderTest.java @@ -83,7 +83,6 @@ public void testTouchOrderWithFairFifoEvictionTxLocal() throws Exception { */ @Test public void testTouchOrderWithFairFifoEvictionMvccTxReplicated() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); fail("https://issues.apache.org/jira/browse/IGNITE-7956"); testTouchOrderWithFairFifoEviction(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.REPLICATED); @@ -94,7 +93,6 @@ public void testTouchOrderWithFairFifoEvictionMvccTxReplicated() throws Exceptio */ @Test public void testTouchOrderWithFairFifoEvictionMvccTxPartitioned() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); fail("https://issues.apache.org/jira/browse/IGNITE-7956"); testTouchOrderWithFairFifoEviction(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, CacheMode.PARTITIONED); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java index f52af6f1cf187..66f5af4737df3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/paged/PageEvictionWithRebalanceAbstractTest.java @@ -45,8 +45,6 @@ public void testEvictionWithRebalance() throws Exception { */ @Test public void testEvictionWithRebalanceMvcc() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-10448"); - checkEvictionWithRebalance(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); } From e4c5741baf6b1eed1c7f25d2522e23805929ab4c Mon Sep 17 00:00:00 2001 From: rkondakov Date: Sun, 16 Dec 2018 17:18:58 +0300 Subject: [PATCH 3/5] Compilation fix. --- .../processors/compress/CompressionProcessorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java index f660426540bc9..058c3b0f420d2 100644 --- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/CompressionProcessorTest.java @@ -844,7 +844,7 @@ private void doTestDataPage() throws IgniteCheckedException { if (io.getFreeSpace(pageAddr) < row.length) break; - itemIds.add(io.addRow(pageAddr, row, pageSize)); + itemIds.add(io.addRow(pageAddr, row, pageSize, false)); } int freeSpace = io.getFreeSpace(pageAddr); @@ -853,7 +853,7 @@ private void doTestDataPage() throws IgniteCheckedException { byte[] lastRow = new byte[freeSpace]; rnd.nextBytes(lastRow); - io.addRowFragment(pageId, pageAddr, lastRow, 777L, pageSize); + io.addRowFragment(pageId, pageAddr, lastRow, 777L, pageSize, false); assertEquals(0, io.getRealFreeSpace(pageAddr)); } From 3e91384c105e17d8e935e5321f898732d9362b14 Mon Sep 17 00:00:00 2001 From: rkondakov Date: Sun, 16 Dec 2018 18:28:18 +0300 Subject: [PATCH 4/5] Compilation fix. --- .../ignite/internal/processors/query/h2/opt/GridH2Row.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java index 7fb896f0ee92e..181f33426032c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java @@ -76,6 +76,11 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD row.link(link); } + /** {@inheritDoc} */ + @Override public boolean mvcc() { + return row.mvcc(); + } + /** {@inheritDoc} */ @Override public int hash() { return row.hash(); From c6b20867fadf4d0b31db3df8ea7ab5bd8be25b7f Mon Sep 17 00:00:00 2001 From: rkondakov Date: Wed, 19 Dec 2018 12:12:44 +0300 Subject: [PATCH 5/5] Minors. --- .../cache/persistence/freelist/AbstractFreeList.java | 2 +- .../cache/persistence/tree/io/AbstractDataPageIO.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index 6cf93f0173af9..3618359bfa9b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -197,7 +197,7 @@ private int addRow( T row, int rowSize ) throws IgniteCheckedException { - io.addRow(pageId, pageAddr, row, rowSize, pageSize(), row.mvcc()); + io.addRow(pageId, pageAddr, row, rowSize, pageSize()); if (needWalDeltaRecord(pageId, page, null)) { // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java index e6e44e13ba84e..25774d684bf08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/AbstractDataPageIO.java @@ -811,7 +811,6 @@ private boolean isEnoughSpace(int newEntryFullSize, int firstEntryOff, int direc * @param row Data row. * @param rowSize Row size. * @param pageSize Page size. - * @param mvcc {@code True} if this is a MVCC record. * @throws IgniteCheckedException If failed. */ public void addRow( @@ -819,8 +818,7 @@ public void addRow( final long pageAddr, T row, final int rowSize, - final int pageSize, - boolean mvcc + final int pageSize ) throws IgniteCheckedException { assert rowSize <= getFreeSpace(pageAddr) : "can't call addRow if not enough space for the whole row"; @@ -840,6 +838,7 @@ public void addRow( /** * Adds row to this data page and sets respective link to the given row object. + * This method is used for the recovery from WAL delta records. * * @param pageAddr Page address. * @param payload Payload.