Skip to content

Commit

Permalink
ignite-db-x Made H2RowStore abstract to be reused from core module.
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Apr 19, 2016
1 parent 4282d80 commit 2f7a22c
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 228 deletions.
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.database;

import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

/**
*
*/
public interface CacheDataRow {
public CacheObject key();

public CacheObject value();

public GridCacheVersion version();

public int partition();

public long link();

public void link(long link);
}
@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.database;

import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

import static org.apache.ignite.internal.pagemem.PageIdUtils.dwordsOffset;
import static org.apache.ignite.internal.pagemem.PageIdUtils.linkFromDwordOffset;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.writePage;

/**
* Data store for H2 rows.
*/
public class RowStore<T extends CacheDataRow> {
/** */
private final FreeList freeList;

/** */
private final PageMemory pageMem;

/** */
private final GridCacheContext<?,?> cctx;

/** */
private final CacheObjectContext coctx;

/** */
private volatile long lastDataPageId;

/** */
private final RowFactory<T> rowFactory;

/** */
private final PageHandler<CacheDataRow> writeRow = new PageHandler<CacheDataRow>() {
@Override public int run(Page page, ByteBuffer buf, CacheDataRow row, int ignore) throws IgniteCheckedException {
int entrySize = DataPageIO.getEntrySize(coctx, row.key(), row.value());

DataPageIO io = DataPageIO.VERSIONS.forPage(buf);

int idx = io.addRow(coctx, buf, row.key(), row.value(), row.version(), entrySize);

if (idx != -1) {
row.link(linkFromDwordOffset(page.id(), idx));

assert row.link() != 0;
}

return idx;
}
};

/** */
private final PageHandler<Void> rmvRow = new PageHandler<Void>() {
@Override public int run(Page page, ByteBuffer buf, Void ignore, int itemId) throws IgniteCheckedException {
DataPageIO io = DataPageIO.VERSIONS.forPage(buf);

assert DataPageIO.check(itemId): itemId;

io.removeRow(buf, (byte)itemId);

return 0;
}
};

/**
* @param cctx Cache context.
*/
public RowStore(GridCacheContext<?,?> cctx, RowFactory<T> rowFactory, FreeList freeList) {
assert rowFactory != null;
assert cctx != null;

this.cctx = cctx;
this.freeList = freeList;
this.rowFactory = rowFactory;

coctx = cctx.cacheObjectContext();
pageMem = cctx.shared().database().pageMemory();
}

/**
* @param pageId Page ID.
* @return Page.
* @throws IgniteCheckedException If failed.
*/
private Page page(long pageId) throws IgniteCheckedException {
return pageMem.page(new FullPageId(pageId, cctx.cacheId()));
}

/**
* @param part Partition.
* @return Allocated page.
* @throws IgniteCheckedException if failed.
*/
private Page allocatePage(int part) throws IgniteCheckedException {
FullPageId fullPageId = pageMem.allocatePage(cctx.cacheId(), part, PageIdAllocator.FLAG_DATA);

return pageMem.page(fullPageId);
}

/**
* @param link Row link.
* @throws IgniteCheckedException If failed.
*/
public void removeRow(long link) throws IgniteCheckedException {
assert link != 0;

try (Page page = page(pageId(link))) {
writePage(page, rmvRow, null, dwordsOffset(link), 0);
}
}

/**
* !!! This method must be invoked in read or write lock of referring index page. It is needed to
* !!! make sure that row at this link will be invisible, when the link will be removed from
* !!! from all the index pages, so that row can be safely erased from the data page.
*
* @param link Link.
* @return Row.
* @throws IgniteCheckedException If failed.
*/
public T getRow(long link) throws IgniteCheckedException {
try (Page page = page(pageId(link))) {
ByteBuffer buf = page.getForRead();

try {
T existing = rowFactory.cachedRow(link);

if (existing != null)
return existing;

DataPageIO io = DataPageIO.VERSIONS.forPage(buf);

int dataOff = io.getDataOffset(buf, dwordsOffset(link));

buf.position(dataOff);

// Skip entry size.
buf.getShort();

CacheObject key = coctx.processor().toCacheObject(coctx, buf);
CacheObject val = coctx.processor().toCacheObject(coctx, buf);

int topVer = buf.getInt();
int nodeOrderDrId = buf.getInt();
long globalTime = buf.getLong();
long order = buf.getLong();

GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);

T row;

try {
row = rowFactory.createRow(key, val, ver, link, 0);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}

assert row.version() != null : row;

return row;
}
finally {
page.releaseRead();
}
}
}

/**
* @param expLastDataPageId Expected last data page ID.
* @return Next data page ID.
*/
private synchronized long nextDataPage(long expLastDataPageId, int partId) throws IgniteCheckedException {
if (expLastDataPageId != lastDataPageId)
return lastDataPageId;

long pageId;

try (Page page = allocatePage(partId)) {
pageId = page.id();

ByteBuffer buf = page.getForInitialWrite();

DataPageIO.VERSIONS.latest().initNewPage(buf, page.id());
}

return lastDataPageId = pageId;
}

/**
* @param row Row.
*/
public void addRow(CacheDataRow row) throws IgniteCheckedException {
if (freeList == null)
writeRowData0(row);
else
freeList.writeRowData(row);
}

/**
* @param row Row.
* @throws IgniteCheckedException If failed.
*/
private void writeRowData0(CacheDataRow row) throws IgniteCheckedException {
assert row.link() == 0;

while (row.link() == 0) {
long pageId = lastDataPageId;

if (pageId == 0)
pageId = nextDataPage(0, row.partition());

try (Page page = page(pageId)) {
if (writePage(page, writeRow, row, -1, -1) >= 0)
return; // Successful write.
}

nextDataPage(pageId, row.partition());
}
}

/**
*
*/
protected interface RowFactory<T> {
/**
* @param link Link.
* @return Row.
*/
T cachedRow(long link);

/**
* @param key Key.
* @param val Value.
* @param ver Version.
* @param link Link.
* @param expirationTime Expiration time.
* @return Row.
* @throws IgniteCheckedException If failed.
*/
T createRow(CacheObject key, CacheObject val, GridCacheVersion ver, long link, long expirationTime)
throws IgniteCheckedException;
}
}
Expand Up @@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.ignite.internal.processors.query.h2.database.freelist; package org.apache.ignite.internal.processors.cache.database.freelist;


import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.FullPageId;


Expand Down
Expand Up @@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.ignite.internal.processors.query.h2.database.freelist; package org.apache.ignite.internal.processors.cache.database.freelist;


import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -25,9 +25,9 @@
import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteBiTuple;
import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentHashMap8;
Expand All @@ -48,12 +48,12 @@ public class FreeList {
private final ConcurrentHashMap8<Integer,GridFutureAdapter<FreeTree>> trees = new ConcurrentHashMap8<>(); private final ConcurrentHashMap8<Integer,GridFutureAdapter<FreeTree>> trees = new ConcurrentHashMap8<>();


/** */ /** */
private final PageHandler<GridH2Row> writeRow = new PageHandler<GridH2Row>() { private final PageHandler<CacheDataRow> writeRow = new PageHandler<CacheDataRow>() {
@Override public int run(Page page, ByteBuffer buf, GridH2Row row, int entrySize) @Override public int run(Page page, ByteBuffer buf, CacheDataRow row, int entrySize)
throws IgniteCheckedException { throws IgniteCheckedException {
DataPageIO io = DataPageIO.VERSIONS.forPage(buf); DataPageIO io = DataPageIO.VERSIONS.forPage(buf);


int idx = io.addRow(cctx.cacheObjectContext(), buf, row.key, row.val, row.ver, entrySize); int idx = io.addRow(cctx.cacheObjectContext(), buf, row.key(), row.value(), row.version(), entrySize);


assert idx >= 0; assert idx >= 0;


Expand Down Expand Up @@ -130,14 +130,14 @@ private FreeTree tree(Integer part) throws IgniteCheckedException {
* @param row Row. * @param row Row.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void writeRowData(GridH2Row row) throws IgniteCheckedException { public void writeRowData(CacheDataRow row) throws IgniteCheckedException {
assert row.link == 0; // assert row.link == 0;


int entrySize = DataPageIO.getEntrySize(cctx.cacheObjectContext(), row.key, row.val); int entrySize = DataPageIO.getEntrySize(cctx.cacheObjectContext(), row.key(), row.value());


assert entrySize > 0 && entrySize < Short.MAX_VALUE: entrySize; assert entrySize > 0 && entrySize < Short.MAX_VALUE: entrySize;


FreeTree tree = tree(row.partId); FreeTree tree = tree(row.partition());
FreeItem item = take(tree, (short)entrySize); FreeItem item = take(tree, (short)entrySize);


Page page = null; Page page = null;
Expand All @@ -147,7 +147,7 @@ public void writeRowData(GridH2Row row) throws IgniteCheckedException {
if (item == null) { if (item == null) {
DataPageIO io = DataPageIO.VERSIONS.latest(); DataPageIO io = DataPageIO.VERSIONS.latest();


page = allocatePage(row.partId); page = allocatePage(row.partition());


ByteBuffer buf = page.getForInitialWrite(); ByteBuffer buf = page.getForInitialWrite();


Expand Down

0 comments on commit 2f7a22c

Please sign in to comment.