diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java new file mode 100644 index 0000000000000..a4cbd32e5b747 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database; + +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** + * + */ +public interface CacheDataRow { + public CacheObject key(); + + public CacheObject value(); + + public GridCacheVersion version(); + + public int partition(); + + public long link(); + + public void link(long link); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java new file mode 100644 index 0000000000000..50bf490b73ae6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.Page; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +import static org.apache.ignite.internal.pagemem.PageIdUtils.dwordsOffset; +import static org.apache.ignite.internal.pagemem.PageIdUtils.linkFromDwordOffset; +import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; +import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.writePage; + +/** + * Data store for H2 rows. + */ +public class RowStore { + /** */ + private final FreeList freeList; + + /** */ + private final PageMemory pageMem; + + /** */ + private final GridCacheContext cctx; + + /** */ + private final CacheObjectContext coctx; + + /** */ + private volatile long lastDataPageId; + + /** */ + private final RowFactory rowFactory; + + /** */ + private final PageHandler writeRow = new PageHandler() { + @Override public int run(Page page, ByteBuffer buf, CacheDataRow row, int ignore) throws IgniteCheckedException { + int entrySize = DataPageIO.getEntrySize(coctx, row.key(), row.value()); + + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + + int idx = io.addRow(coctx, buf, row.key(), row.value(), row.version(), entrySize); + + if (idx != -1) { + row.link(linkFromDwordOffset(page.id(), idx)); + + assert row.link() != 0; + } + + return idx; + } + }; + + /** */ + private final PageHandler rmvRow = new PageHandler() { + @Override public int run(Page page, ByteBuffer buf, Void ignore, int itemId) throws IgniteCheckedException { + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + + assert DataPageIO.check(itemId): itemId; + + io.removeRow(buf, (byte)itemId); + + return 0; + } + }; + + /** + * @param cctx Cache context. + */ + public RowStore(GridCacheContext cctx, RowFactory rowFactory, FreeList freeList) { + assert rowFactory != null; + assert cctx != null; + + this.cctx = cctx; + this.freeList = freeList; + this.rowFactory = rowFactory; + + coctx = cctx.cacheObjectContext(); + pageMem = cctx.shared().database().pageMemory(); + } + + /** + * @param pageId Page ID. + * @return Page. + * @throws IgniteCheckedException If failed. + */ + private Page page(long pageId) throws IgniteCheckedException { + return pageMem.page(new FullPageId(pageId, cctx.cacheId())); + } + + /** + * @param part Partition. + * @return Allocated page. + * @throws IgniteCheckedException if failed. + */ + private Page allocatePage(int part) throws IgniteCheckedException { + FullPageId fullPageId = pageMem.allocatePage(cctx.cacheId(), part, PageIdAllocator.FLAG_DATA); + + return pageMem.page(fullPageId); + } + + /** + * @param link Row link. + * @throws IgniteCheckedException If failed. + */ + public void removeRow(long link) throws IgniteCheckedException { + assert link != 0; + + try (Page page = page(pageId(link))) { + writePage(page, rmvRow, null, dwordsOffset(link), 0); + } + } + + /** + * !!! This method must be invoked in read or write lock of referring index page. It is needed to + * !!! make sure that row at this link will be invisible, when the link will be removed from + * !!! from all the index pages, so that row can be safely erased from the data page. + * + * @param link Link. + * @return Row. + * @throws IgniteCheckedException If failed. + */ + public T getRow(long link) throws IgniteCheckedException { + try (Page page = page(pageId(link))) { + ByteBuffer buf = page.getForRead(); + + try { + T existing = rowFactory.cachedRow(link); + + if (existing != null) + return existing; + + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + + int dataOff = io.getDataOffset(buf, dwordsOffset(link)); + + buf.position(dataOff); + + // Skip entry size. + buf.getShort(); + + CacheObject key = coctx.processor().toCacheObject(coctx, buf); + CacheObject val = coctx.processor().toCacheObject(coctx, buf); + + int topVer = buf.getInt(); + int nodeOrderDrId = buf.getInt(); + long globalTime = buf.getLong(); + long order = buf.getLong(); + + GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); + + T row; + + try { + row = rowFactory.createRow(key, val, ver, link, 0); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + assert row.version() != null : row; + + return row; + } + finally { + page.releaseRead(); + } + } + } + + /** + * @param expLastDataPageId Expected last data page ID. + * @return Next data page ID. + */ + private synchronized long nextDataPage(long expLastDataPageId, int partId) throws IgniteCheckedException { + if (expLastDataPageId != lastDataPageId) + return lastDataPageId; + + long pageId; + + try (Page page = allocatePage(partId)) { + pageId = page.id(); + + ByteBuffer buf = page.getForInitialWrite(); + + DataPageIO.VERSIONS.latest().initNewPage(buf, page.id()); + } + + return lastDataPageId = pageId; + } + + /** + * @param row Row. + */ + public void addRow(CacheDataRow row) throws IgniteCheckedException { + if (freeList == null) + writeRowData0(row); + else + freeList.writeRowData(row); + } + + /** + * @param row Row. + * @throws IgniteCheckedException If failed. + */ + private void writeRowData0(CacheDataRow row) throws IgniteCheckedException { + assert row.link() == 0; + + while (row.link() == 0) { + long pageId = lastDataPageId; + + if (pageId == 0) + pageId = nextDataPage(0, row.partition()); + + try (Page page = page(pageId)) { + if (writePage(page, writeRow, row, -1, -1) >= 0) + return; // Successful write. + } + + nextDataPage(pageId, row.partition()); + } + } + + /** + * + */ + protected interface RowFactory { + /** + * @param link Link. + * @return Row. + */ + T cachedRow(long link); + + /** + * @param key Key. + * @param val Value. + * @param ver Version. + * @param link Link. + * @param expirationTime Expiration time. + * @return Row. + * @throws IgniteCheckedException If failed. + */ + T createRow(CacheObject key, CacheObject val, GridCacheVersion ver, long link, long expirationTime) + throws IgniteCheckedException; + } +} diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeItem.java similarity index 96% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeItem.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeItem.java index 71bb32e7fc4ad..7de30c73007dc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeItem.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeItem.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.database.freelist; +package org.apache.ignite.internal.processors.cache.database.freelist; import org.apache.ignite.internal.pagemem.FullPageId; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java similarity index 90% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java index 43b0dcc3a8069..0f94bb13d99aa 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.database.freelist; +package org.apache.ignite.internal.processors.cache.database.freelist; import java.nio.ByteBuffer; import java.util.concurrent.ThreadLocalRandom; @@ -25,9 +25,9 @@ import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; -import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.lang.IgniteBiTuple; import org.jsr166.ConcurrentHashMap8; @@ -48,12 +48,12 @@ public class FreeList { private final ConcurrentHashMap8> trees = new ConcurrentHashMap8<>(); /** */ - private final PageHandler writeRow = new PageHandler() { - @Override public int run(Page page, ByteBuffer buf, GridH2Row row, int entrySize) + private final PageHandler writeRow = new PageHandler() { + @Override public int run(Page page, ByteBuffer buf, CacheDataRow row, int entrySize) throws IgniteCheckedException { DataPageIO io = DataPageIO.VERSIONS.forPage(buf); - int idx = io.addRow(cctx.cacheObjectContext(), buf, row.key, row.val, row.ver, entrySize); + int idx = io.addRow(cctx.cacheObjectContext(), buf, row.key(), row.value(), row.version(), entrySize); assert idx >= 0; @@ -130,14 +130,14 @@ private FreeTree tree(Integer part) throws IgniteCheckedException { * @param row Row. * @throws IgniteCheckedException If failed. */ - public void writeRowData(GridH2Row row) throws IgniteCheckedException { - assert row.link == 0; + public void writeRowData(CacheDataRow row) throws IgniteCheckedException { + // assert row.link == 0; - int entrySize = DataPageIO.getEntrySize(cctx.cacheObjectContext(), row.key, row.val); + int entrySize = DataPageIO.getEntrySize(cctx.cacheObjectContext(), row.key(), row.value()); assert entrySize > 0 && entrySize < Short.MAX_VALUE: entrySize; - FreeTree tree = tree(row.partId); + FreeTree tree = tree(row.partition()); FreeItem item = take(tree, (short)entrySize); Page page = null; @@ -147,7 +147,7 @@ public void writeRowData(GridH2Row row) throws IgniteCheckedException { if (item == null) { DataPageIO io = DataPageIO.VERSIONS.latest(); - page = allocatePage(row.partId); + page = allocatePage(row.partition()); ByteBuffer buf = page.getForInitialWrite(); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeTree.java similarity index 91% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeTree.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeTree.java index 6d61bd7f9c335..31f9a1dcd4dd4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeTree.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.database.freelist; +package org.apache.ignite.internal.processors.cache.database.freelist; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; @@ -26,9 +26,9 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; -import org.apache.ignite.internal.processors.query.h2.database.freelist.io.FreeIO; -import org.apache.ignite.internal.processors.query.h2.database.freelist.io.FreeInnerIO; -import org.apache.ignite.internal.processors.query.h2.database.freelist.io.FreeLeafIO; +import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeIO; +import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeInnerIO; +import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeLeafIO; /** * Data structure for data pages and their free spaces. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeIO.java similarity index 94% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeIO.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeIO.java index 5af351c40fa5f..de28b228410f5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeIO.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.database.freelist.io; +package org.apache.ignite.internal.processors.cache.database.freelist.io; import java.nio.ByteBuffer; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeInnerIO.java similarity index 92% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeInnerIO.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeInnerIO.java index b8a3872fe9df1..bfaaaa2b79cd7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeInnerIO.java @@ -1,7 +1,7 @@ -package org.apache.ignite.internal.processors.query.h2.database.freelist.io; +package org.apache.ignite.internal.processors.cache.database.freelist.io; import java.nio.ByteBuffer; -import org.apache.ignite.internal.processors.query.h2.database.freelist.FreeItem; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeItem; import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO; diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeLeafIO.java similarity index 93% rename from modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeLeafIO.java rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeLeafIO.java index ee71f18f1120b..549689768da97 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/io/FreeLeafIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/FreeLeafIO.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.ignite.internal.processors.query.h2.database.freelist.io; +package org.apache.ignite.internal.processors.cache.database.freelist.io; import java.nio.ByteBuffer; import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO; import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; -import org.apache.ignite.internal.processors.query.h2.database.freelist.FreeItem; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeItem; /** * Routines for free list leaf pages. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java index f545c247028f7..24952372693fd 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java @@ -17,242 +17,62 @@ package org.apache.ignite.internal.processors.query.h2.database; -import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.Page; -import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; -import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.query.h2.database.freelist.FreeList; -import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; -import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; +import org.apache.ignite.internal.processors.cache.database.RowStore; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; -import static org.apache.ignite.internal.pagemem.PageIdUtils.dwordsOffset; -import static org.apache.ignite.internal.pagemem.PageIdUtils.linkFromDwordOffset; -import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; -import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.writePage; - /** * Data store for H2 rows. */ -public class H2RowStore { - /** */ - private final FreeList freeList; - - /** */ - private final PageMemory pageMem; - - /** */ - private final GridH2RowDescriptor rowDesc; - - /** */ - private final GridCacheContext cctx; - - /** */ - private final CacheObjectContext coctx; - - /** */ - private volatile long lastDataPageId; - - /** */ - private final PageHandler writeRow = new PageHandler() { - @Override public int run(Page page, ByteBuffer buf, GridH2Row row, int ignore) throws IgniteCheckedException { - int entrySize = DataPageIO.getEntrySize(coctx, row.key, row.val); - - DataPageIO io = DataPageIO.VERSIONS.forPage(buf); - - int idx = io.addRow(coctx, buf, row.key, row.val, row.ver, entrySize); - - if (idx != -1) { - row.link = linkFromDwordOffset(page.id(), idx); - - assert row.link != 0; - } - - return idx; - } - }; - - /** */ - private final PageHandler removeRow = new PageHandler() { - @Override public int run(Page page, ByteBuffer buf, Void ignore, int itemId) throws IgniteCheckedException { - DataPageIO io = DataPageIO.VERSIONS.forPage(buf); - - assert DataPageIO.check(itemId): itemId; - - io.removeRow(buf, (byte)itemId); - - return 0; - } - }; - +public class H2RowStore extends RowStore { /** * @param rowDesc Row descriptor. * @param cctx Cache context. + * @param freeList Free list. */ public H2RowStore(GridH2RowDescriptor rowDesc, GridCacheContext cctx, FreeList freeList) { - assert rowDesc != null; - assert cctx != null; - - this.rowDesc = rowDesc; - this.cctx = cctx; - this.freeList = freeList; - - coctx = cctx.cacheObjectContext(); - pageMem = cctx.shared().database().pageMemory(); - } - - /** - * @param pageId Page ID. - * @return Page. - * @throws IgniteCheckedException If failed. - */ - private Page page(long pageId) throws IgniteCheckedException { - return pageMem.page(new FullPageId(pageId, cctx.cacheId())); - } - - /** - * @param part Partition. - * @return Allocated page. - * @throws IgniteCheckedException if failed. - */ - private Page allocatePage(int part) throws IgniteCheckedException { - FullPageId fullPageId = pageMem.allocatePage(cctx.cacheId(), part, PageIdAllocator.FLAG_DATA); - - return pageMem.page(fullPageId); - } - - /** - * @param link Row link. - * @throws IgniteCheckedException If failed. - */ - public void removeRow(long link) throws IgniteCheckedException { - assert link != 0; - - try (Page page = page(pageId(link))) { - writePage(page, removeRow, null, dwordsOffset(link), 0); - } + super(cctx, new H2RowFactory(rowDesc), freeList); } - /** - * !!! This method must be invoked in read or write lock of referring index page. It is needed to - * !!! make sure that row at this link will be invisible, when the link will be removed from - * !!! from all the index pages, so that row can be safely erased from the data page. * - * @param link Link. - * @return Row. - * @throws IgniteCheckedException If failed. */ - public GridH2Row getRow(long link) throws IgniteCheckedException { - try (Page page = page(pageId(link))) { - ByteBuffer buf = page.getForRead(); - - try { - GridH2Row existing = rowDesc.cachedRow(link); - - if (existing != null) - return existing; - - DataPageIO io = DataPageIO.VERSIONS.forPage(buf); - - int dataOff = io.getDataOffset(buf, dwordsOffset(link)); - - buf.position(dataOff); - - // Skip entry size. - buf.getShort(); - - CacheObject key = coctx.processor().toCacheObject(coctx, buf); - CacheObject val = coctx.processor().toCacheObject(coctx, buf); - - int topVer = buf.getInt(); - int nodeOrderDrId = buf.getInt(); - long globalTime = buf.getLong(); - long order = buf.getLong(); - - GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); - - GridH2Row row; - - try { - row = rowDesc.createRow(key, PageIdUtils.partId(link), val, ver, 0); - - row.link = link; - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - - assert row.ver != null; + static class H2RowFactory implements RowFactory { + /** */ + private final GridH2RowDescriptor rowDesc; - rowDesc.cache(row); + /** + * @param rowDesc Row descriptor. + */ + public H2RowFactory(GridH2RowDescriptor rowDesc) { + assert rowDesc != null; - return row; - } - finally { - page.releaseRead(); - } + this.rowDesc = rowDesc; } - } - - /** - * @param expLastDataPageId Expected last data page ID. - * @return Next data page ID. - */ - private synchronized long nextDataPage(long expLastDataPageId, int partId) throws IgniteCheckedException { - if (expLastDataPageId != lastDataPageId) - return lastDataPageId; - long pageId; - - try (Page page = allocatePage(partId)) { - pageId = page.id(); - - ByteBuffer buf = page.getForInitialWrite(); - - DataPageIO.VERSIONS.latest().initNewPage(buf, page.id()); + /** {@inheritDoc} */ + @Override public GridH2Row cachedRow(long link) { + return rowDesc.cachedRow(link); } - return lastDataPageId = pageId; - } - - /** - * @param row Row. - */ - public void addRow(GridH2Row row) throws IgniteCheckedException { - if (freeList == null) - writeRowData0(row); - else - freeList.writeRowData(row); - } - - /** - * @param row Row. - * @throws IgniteCheckedException If failed. - */ - private void writeRowData0(GridH2Row row) throws IgniteCheckedException { - assert row.link == 0; - - while (row.link == 0) { - long pageId = lastDataPageId; + /** {@inheritDoc} */ + @Override public GridH2Row createRow(CacheObject key, + CacheObject val, + GridCacheVersion ver, + long link, + long expirationTime) throws IgniteCheckedException { + GridH2Row row = rowDesc.createRow(key, PageIdUtils.partId(link), val, ver, 0); - if (pageId == 0) - pageId = nextDataPage(0, row.partId); + row.link = link; - try (Page page = page(pageId)) { - if (writePage(page, writeRow, row, -1, -1) >= 0) - return; // Successful write. - } + rowDesc.cache(row); - nextDataPage(pageId, row.partId); + return row; } } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java index 52b9b5fed4a73..a84ad20a55b10 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java @@ -19,13 +19,14 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.h2.result.Row; import org.h2.value.Value; /** * Row with locking support needed for unique key conflicts resolution. */ -public class GridH2Row extends Row implements GridSearchRowPointer { +public class GridH2Row extends Row implements GridSearchRowPointer, CacheDataRow { /** */ public long link; // TODO remove @@ -62,4 +63,34 @@ public GridH2Row(Value... data) { @Override public void decrementRefCount() { throw new IllegalStateException(); } + + /** {@inheritDoc} */ + @Override public CacheObject key() { + return key; + } + + /** {@inheritDoc} */ + @Override public CacheObject value() { + return val; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return ver; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + + /** {@inheritDoc} */ + @Override public long link() { + return link; + } + + /** {@inheritDoc} */ + @Override public void link(long link) { + this.link = link; + } } \ No newline at end of file