Skip to content

Commit

Permalink
ignite-db - rotate page id after reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
S.Vladykin committed Apr 29, 2016
1 parent bbd4e9f commit 86b9fa8
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 316 deletions.
Expand Up @@ -131,8 +131,8 @@ public static long pageId(int fileId, long pageIdx) {
* @param pageId Page id. * @param pageId Page id.
* @return Page ID. * @return Page ID.
*/ */
public static long pageIdx(long pageId) { public static int pageIdx(long pageId) {
return pageId & PAGE_IDX_MASK; return (int)(pageId & PAGE_IDX_MASK); // 30 bytes
} }


/** /**
Expand Down Expand Up @@ -191,7 +191,6 @@ public static long pageId(int partId, byte flag, long pageIdx) {
int fileId = 0; int fileId = 0;


fileId = (fileId << FLAG_SIZE) | (flag & FLAG_MASK); fileId = (fileId << FLAG_SIZE) | (flag & FLAG_MASK);

fileId = (fileId << PART_ID_SIZE) | (partId & PART_ID_MASK); fileId = (fileId << PART_ID_SIZE) | (partId & PART_ID_MASK);


return pageId(fileId, pageIdx); return pageId(fileId, pageIdx);
Expand Down Expand Up @@ -225,4 +224,14 @@ public static long rotatePageId(long pageId) {


return pageId(partId + 1, PageIdAllocator.FLAG_IDX, pageIdx); return pageId(partId + 1, PageIdAllocator.FLAG_IDX, pageIdx);
} }

/**
* @param pageId Page ID.
* @return Page ID with masked partition ID.
*/
public static long maskPartId(long pageId) {
assert flag(pageId) == PageIdAllocator.FLAG_IDX; // Possible only for index pages.

return pageId & ~((long)PART_ID_MASK << PAGE_IDX_SIZE);
}
} }
Expand Up @@ -252,7 +252,7 @@ boolean isAcquired() {
boolean releaseReference() { boolean releaseReference() {
int refs = refCntUpd.decrementAndGet(this); int refs = refCntUpd.decrementAndGet(this);


assert refs >= 0: fullId.pageId(); assert refs >= 0: fullId;


return refs == 0; return refs == 0;
} }
Expand Down
Expand Up @@ -470,7 +470,7 @@ public PageMemoryImpl(
/** /**
* @return Total number of loaded pages in memory. * @return Total number of loaded pages in memory.
*/ */
public long totalPages() { public long loadedPages() {
long total = 0; long total = 0;


for (Segment seg : segments) { for (Segment seg : segments) {
Expand All @@ -487,6 +487,26 @@ public long totalPages() {
return total; return total;
} }


/**
* @return Total number of acquired pages.
*/
public long acquiredPages() {
long total = 0;

for (Segment seg : segments) {
seg.readLock().lock();

try {
total += seg.acquiredPages.size();
}
finally {
seg.readLock().unlock();
}
}

return total;
}

/** /**
* @param ptr Pointer to wrap. * @param ptr Pointer to wrap.
* @param len Memory location length. * @param len Memory location length.
Expand Down
Expand Up @@ -17,22 +17,13 @@


package org.apache.ignite.internal.processors.cache.database; package org.apache.ignite.internal.processors.cache.database;


import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.database.freelist.FreeList; import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;

import static org.apache.ignite.internal.pagemem.PageIdUtils.dwordsOffset;
import static org.apache.ignite.internal.pagemem.PageIdUtils.linkFromDwordOffset;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.writePage;


/** /**
* Data store for H2 rows. * Data store for H2 rows.
Expand All @@ -50,46 +41,6 @@ public class RowStore<T extends CacheDataRow> {
/** */ /** */
protected final CacheObjectContext coctx; protected final CacheObjectContext coctx;


/** */
private volatile long lastDataPageId;

/** */
@Deprecated
private final PageHandler<CacheDataRow> writeRow = new PageHandler<CacheDataRow>() {
@Override public int run(Page page, ByteBuffer buf, CacheDataRow row, int ignore) throws IgniteCheckedException {
int entrySize = DataPageIO.getEntrySize(coctx, row.key(), row.value());

DataPageIO io = DataPageIO.VERSIONS.forPage(buf);

if (io.isEnoughSpace(buf, entrySize))
return -1;

int idx = io.addRow(coctx, buf, row.key(), row.value(), row.version(), entrySize);

assert idx >= 0: idx;

row.link(linkFromDwordOffset(page.id(), idx));

assert row.link() != 0;

return idx;
}
};

/** */
@Deprecated
private final PageHandler<Void> rmvRow = new PageHandler<Void>() {
@Override public int run(Page page, ByteBuffer buf, Void ignore, int itemId) throws IgniteCheckedException {
DataPageIO io = DataPageIO.VERSIONS.forPage(buf);

assert DataPageIO.check(itemId): itemId;

io.removeRow(buf, (byte)itemId);

return 0;
}
};

/** /**
* @param cctx Cache context. * @param cctx Cache context.
*/ */
Expand All @@ -113,84 +64,20 @@ protected final Page page(long pageId) throws IgniteCheckedException {
return pageMem.page(new FullPageId(pageId, cctx.cacheId())); return pageMem.page(new FullPageId(pageId, cctx.cacheId()));
} }


/**
* @param part Partition.
* @return Allocated page.
* @throws IgniteCheckedException if failed.
*/
private Page allocatePage(int part) throws IgniteCheckedException {
FullPageId fullPageId = pageMem.allocatePage(cctx.cacheId(), part, PageIdAllocator.FLAG_DATA);

return pageMem.page(fullPageId);
}

/** /**
* @param link Row link. * @param link Row link.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void removeRow(long link) throws IgniteCheckedException { public void removeRow(long link) throws IgniteCheckedException {
assert link != 0; assert link != 0;


if (freeList == null) { freeList.removeRow(link);
try (Page page = page(pageId(link))) {
writePage(page, rmvRow, null, dwordsOffset(link));
}
}
else
freeList.removeRow(link);
}

/**
* @param expLastDataPageId Expected last data page ID.
* @return Next data page ID.
*/
private synchronized long nextDataPage(long expLastDataPageId, int partId) throws IgniteCheckedException {
if (expLastDataPageId != lastDataPageId)
return lastDataPageId;

long pageId;

try (Page page = allocatePage(partId)) {
pageId = page.id();

ByteBuffer buf = page.getForInitialWrite();

DataPageIO.VERSIONS.latest().initNewPage(buf, page.id());
}

return lastDataPageId = pageId;
} }


/** /**
* @param row Row. * @param row Row.
*/ */
public void addRow(CacheDataRow row) throws IgniteCheckedException { public void addRow(CacheDataRow row) throws IgniteCheckedException {
if (freeList == null) freeList.insertRow(row);
writeRowDataOld(row);
else
freeList.insertRow(row);
}

/**
* @param row Row.
* @throws IgniteCheckedException If failed.
*/
@Deprecated
private void writeRowDataOld(CacheDataRow row) throws IgniteCheckedException {
assert row.link() == 0;

while (row.link() == 0) {
long pageId = lastDataPageId;

if (pageId == 0)
pageId = nextDataPage(0, row.partition());

try (Page page = page(pageId)) {
if (writePage(page, writeRow, row, -1) >= 0)
return; // Successful write.
}

nextDataPage(pageId, row.partition());
}
} }
} }
Expand Up @@ -53,28 +53,28 @@ public class FreeList {


/** */ /** */
private final PageHandler<CacheDataRow> writeRow = new PageHandler<CacheDataRow>() { private final PageHandler<CacheDataRow> writeRow = new PageHandler<CacheDataRow>() {
@Override public int run(Page page, ByteBuffer buf, CacheDataRow row, int entrySize) @Override public int run(long pageId, Page page, ByteBuffer buf, CacheDataRow row, int entrySize)
throws IgniteCheckedException { throws IgniteCheckedException {
DataPageIO io = DataPageIO.VERSIONS.forPage(buf); DataPageIO io = DataPageIO.VERSIONS.forPage(buf);


int idx = io.addRow(cctx.cacheObjectContext(), buf, row.key(), row.value(), row.version(), entrySize); int idx = io.addRow(cctx.cacheObjectContext(), buf, row.key(), row.value(), row.version(), entrySize);


assert idx >= 0; assert idx >= 0;


row.link(PageIdUtils.linkFromDwordOffset(page.id(), idx)); row.link(PageIdUtils.linkFromDwordOffset(pageId, idx));


int freeSpace = io.getFreeSpace(buf); int freeSpace = io.getFreeSpace(buf);


// Put our free item. // Put our free item.
tree(row.partition()).put(new FreeItem(freeSpace, page.id(), cctx.cacheId())); tree(row.partition()).put(new FreeItem(freeSpace, pageId, cctx.cacheId()));


return 0; return 0;
} }
}; };


/** */ /** */
private final PageHandler<FreeTree> removeRow = new PageHandler<FreeTree>() { private final PageHandler<FreeTree> removeRow = new PageHandler<FreeTree>() {
@Override public int run(Page page, ByteBuffer buf, FreeTree tree, int itemId) throws IgniteCheckedException { @Override public int run(long pageId, Page page, ByteBuffer buf, FreeTree tree, int itemId) throws IgniteCheckedException {
assert tree != null; assert tree != null;


DataPageIO io = DataPageIO.VERSIONS.forPage(buf); DataPageIO io = DataPageIO.VERSIONS.forPage(buf);
Expand All @@ -88,13 +88,13 @@ public class FreeList {
int newFreeSpace = io.getFreeSpace(buf); int newFreeSpace = io.getFreeSpace(buf);


// Move page to the new position with respect to the new free space. // Move page to the new position with respect to the new free space.
FreeItem item = tree.remove(new FreeItem(oldFreeSpace, page.id(), cctx.cacheId())); FreeItem item = tree.remove(new FreeItem(oldFreeSpace, pageId, cctx.cacheId()));


// If item is null, then it was removed concurrently by insertRow, because // If item is null, then it was removed concurrently by insertRow, because
// in removeRow we own the write lock on this page. Thus we can be sure that // in removeRow we own the write lock on this page. Thus we can be sure that
// insertRow will update position correctly after us. // insertRow will update position correctly after us.
if (item != null) { if (item != null) {
FreeItem old = tree.put(new FreeItem(newFreeSpace, page.id(), cctx.cacheId())); FreeItem old = tree.put(new FreeItem(newFreeSpace, pageId, cctx.cacheId()));


assert old == null; assert old == null;
} }
Expand Down Expand Up @@ -176,7 +176,7 @@ public void removeRow(long link) throws IgniteCheckedException {
FreeTree tree = tree(partId); FreeTree tree = tree(partId);


try (Page page = pageMem.page(new FullPageId(pageId, cctx.cacheId()))) { try (Page page = pageMem.page(new FullPageId(pageId, cctx.cacheId()))) {
writePage(page, removeRow, tree, itemId); writePage(pageId, page, removeRow, tree, itemId);
} }
} }


Expand Down Expand Up @@ -207,10 +207,10 @@ public void insertRow(CacheDataRow row) throws IgniteCheckedException {


io.initNewPage(buf, page.id()); io.initNewPage(buf, page.id());


writeRow.run(page, buf, row, entrySize); writeRow.run(page.id(), page, buf, row, entrySize);
} }
else else
writePage(page, writeRow, row, entrySize); writePage(page.id(), page, writeRow, row, entrySize);
} }
} }


Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeIO; import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeIO;
import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeInnerIO; import org.apache.ignite.internal.processors.cache.database.freelist.io.FreeInnerIO;
Expand Down Expand Up @@ -68,7 +69,7 @@ public int getPartId() {
int res = Short.compare(((FreeIO)io).getFreeSpace(buf, idx), row.freeSpace()); int res = Short.compare(((FreeIO)io).getFreeSpace(buf, idx), row.freeSpace());


if (res == 0) if (res == 0)
res = Integer.compare(((FreeIO)io).getPageIndex(buf, idx), FreeInnerIO.pageIndex(row.pageId())); res = Integer.compare(((FreeIO)io).getPageIndex(buf, idx), PageIdUtils.pageIdx(row.pageId()));


return res; return res;
} }
Expand Down
Expand Up @@ -26,24 +26,12 @@ protected FreeInnerIO(int ver) {
super(T_FREE_INNER, ver, false, 6); // freeSpace(2) + pageIndex(4) super(T_FREE_INNER, ver, false, 6); // freeSpace(2) + pageIndex(4)
} }


/**
* @param pageId Page Id.
* @return Page index.
*/
public static int pageIndex(long pageId) {
long idx = PageIdUtils.pageIdx(pageId);

assert idx >= 0 && idx < Integer.MAX_VALUE: idx;

return (int)idx;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void store(ByteBuffer buf, int idx, FreeItem row) { @Override public void store(ByteBuffer buf, int idx, FreeItem row) {
int off = offset(idx); int off = offset(idx);


buf.putShort(off, row.freeSpace()); buf.putShort(off, row.freeSpace());
buf.putInt(off + 2, pageIndex(row.pageId())); buf.putInt(off + 2, PageIdUtils.pageIdx(row.pageId()));
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -49,7 +49,7 @@ protected FreeLeafIO(int ver) {
int off = offset(idx); int off = offset(idx);


buf.putShort(off, row.freeSpace()); buf.putShort(off, row.freeSpace());
buf.putInt(off + 2, FreeInnerIO.pageIndex(row.pageId())); buf.putInt(off + 2, PageIdUtils.pageIdx(row.pageId()));
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down

0 comments on commit 86b9fa8

Please sign in to comment.