Skip to content

Commit

Permalink
IGNITE-3477 - Fixing query parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharuk committed Mar 22, 2017
1 parent 6cc1403 commit aaf23fd
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 36 deletions.
Expand Up @@ -1761,7 +1761,7 @@ private static ClassProperty buildClassProperty(Class<?> keyCls, Class<?> valCls
* @param valCls value class * @param valCls value class
* @param pathStr property name * @param pathStr property name
* @param resType property type * @param resType property type
* @return * @return Property initialization exception.
*/ */
public static String propertyInitializationExceptionMessage(Class<?> keyCls, Class<?> valCls, String pathStr, public static String propertyInitializationExceptionMessage(Class<?> keyCls, Class<?> valCls, String pathStr,
Class<?> resType) { Class<?> resType) {
Expand Down
Expand Up @@ -1203,7 +1203,7 @@ private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws Except


final Map<Long,Long> map = new ConcurrentHashMap8<>(); final Map<Long,Long> map = new ConcurrentHashMap8<>();


final int loops = reuseList == null ? 200_000 : 400_000; final int loops = reuseList == null ? 100_000 : 200_000;


final GridStripedLock lock = new GridStripedLock(256); final GridStripedLock lock = new GridStripedLock(256);


Expand Down Expand Up @@ -1295,7 +1295,7 @@ else if (op == 3) {


return null; return null;
} }
}, 16, "put-remove"); }, Runtime.getRuntime().availableProcessors(), "put-remove");


final AtomicBoolean stop = new AtomicBoolean(); final AtomicBoolean stop = new AtomicBoolean();


Expand Down
Expand Up @@ -2978,7 +2978,9 @@ private Index createSortedIndex(
if (log.isInfoEnabled()) if (log.isInfoEnabled())
log.info("Creating cache index [cacheId=" + cctx.cacheId() + ", idxName=" + name + ']'); log.info("Creating cache index [cacheId=" + cctx.cacheId() + ", idxName=" + name + ']');


return new H2TreeIndex(cctx, tbl, name, pk, cols, inlineSize); final int segments = tbl.rowDescriptor().configuration().getQueryParallelism();

return new H2TreeIndex(cctx, tbl, name, pk, cols, inlineSize, segments);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
throw new IgniteException(e); throw new IgniteException(e);
Expand Down
Expand Up @@ -50,12 +50,13 @@
/** /**
* H2 Index over {@link BPlusTree}. * H2 Index over {@link BPlusTree}.
*/ */
@SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
public class H2TreeIndex extends GridH2IndexBase { public class H2TreeIndex extends GridH2IndexBase {
/** Default value for {@code IGNITE_MAX_INDEX_PAYLOAD_SIZE} */ /** Default value for {@code IGNITE_MAX_INDEX_PAYLOAD_SIZE} */
public static final int IGNITE_MAX_INDEX_PAYLOAD_SIZE_DEFAULT = 10; public static final int IGNITE_MAX_INDEX_PAYLOAD_SIZE_DEFAULT = 10;


/** */ /** */
private final H2Tree tree; private final H2Tree[] segments;


/** */ /** */
private final List<InlineIndexHelper> inlineIdxs; private final List<InlineIndexHelper> inlineIdxs;
Expand All @@ -78,8 +79,11 @@ public H2TreeIndex(
String name, String name,
boolean pk, boolean pk,
List<IndexColumn> colsList, List<IndexColumn> colsList,
int inlineSize int inlineSize,
int segmentsCnt
) throws IgniteCheckedException { ) throws IgniteCheckedException {
assert segmentsCnt > 0 : segmentsCnt;

this.cctx = cctx; this.cctx = cctx;
IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]); IndexColumn[] cols = colsList.toArray(new IndexColumn[colsList.size()]);


Expand All @@ -95,21 +99,35 @@ public H2TreeIndex(
if (cctx.affinityNode()) { if (cctx.affinityNode()) {
IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database(); IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database();


RootPage page = getMetaPage(name);

inlineIdxs = getAvailableInlineColumns(cols); inlineIdxs = getAvailableInlineColumns(cols);


tree = new H2Tree(name, cctx.offheap().reuseListForIndex(name), cctx.cacheId(), segments = new H2Tree[segmentsCnt];
dbMgr.pageMemory(), cctx.shared().wal(), cctx.offheap().globalRemoveId(),
tbl.rowFactory(), page.pageId().pageId(), page.isAllocated(), cols, inlineIdxs, computeInlineSize(inlineIdxs, inlineSize)) { for (int i = 0; i < segments.length; i++) {
@Override public int compareValues(Value v1, Value v2) { RootPage page = getMetaPage(name, i);
return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
} segments[i] = new H2Tree(
}; name,
cctx.offheap().reuseListForIndex(name),
cctx.cacheId(),
dbMgr.pageMemory(),
cctx.shared().wal(),
cctx.offheap().globalRemoveId(),
tbl.rowFactory(),
page.pageId().pageId(),
page.isAllocated(),
cols,
inlineIdxs,
computeInlineSize(inlineIdxs, inlineSize)) {
@Override public int compareValues(Value v1, Value v2) {
return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
}
};
}
} }
else { else {
// We need indexes on the client node, but index will not contain any data. // We need indexes on the client node, but index will not contain any data.
tree = null; segments = null;
inlineIdxs = null; inlineIdxs = null;
} }


Expand All @@ -123,9 +141,7 @@ public H2TreeIndex(
private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) { private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
List<InlineIndexHelper> res = new ArrayList<>(); List<InlineIndexHelper> res = new ArrayList<>();


for (int i = 0; i < cols.length; i++) { for (IndexColumn col : cols) {
IndexColumn col = cols[i];

if (!InlineIndexHelper.AVAILABLE_TYPES.contains(col.column.getType())) if (!InlineIndexHelper.AVAILABLE_TYPES.contains(col.column.getType()))
break; break;


Expand Down Expand Up @@ -154,6 +170,10 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
p = f.forSpace(spaceName); p = f.forSpace(spaceName);
} }


int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

return new H2Cursor(tree.find(lower, upper), p); return new H2Cursor(tree.find(lower, upper), p);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand All @@ -164,6 +184,10 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public GridH2Row findOne(GridH2Row row) { @Override public GridH2Row findOne(GridH2Row row) {
try { try {
int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

return tree.findOne(row); return tree.findOne(row);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand All @@ -176,6 +200,10 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
try { try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);


int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

return tree.put(row); return tree.put(row);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand All @@ -191,6 +219,10 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
try { try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);


int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

return tree.putx(row); return tree.putx(row);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand All @@ -205,6 +237,11 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
@Override public GridH2Row remove(SearchRow row) { @Override public GridH2Row remove(SearchRow row) {
try { try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);

int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

return tree.remove(row); return tree.remove(row);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand All @@ -219,6 +256,11 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
@Override public void removex(SearchRow row) { @Override public void removex(SearchRow row) {
try { try {
InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs); InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);

int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

tree.removex(row); tree.removex(row);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand Down Expand Up @@ -271,9 +313,11 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {
@Override public void destroy() { @Override public void destroy() {
try { try {
if (cctx.affinityNode()) { if (cctx.affinityNode()) {
tree.destroy(); for (H2Tree tree : segments) {
tree.destroy();


cctx.offheap().dropRootPageForIndex(tree.getName()); cctx.offheap().dropRootPageForIndex(tree.getName());
}
} }
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
Expand All @@ -286,17 +330,14 @@ private List<InlineIndexHelper> getAvailableInlineColumns(IndexColumn[] cols) {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override protected IgniteTree<SearchRow, GridH2Row> doTakeSnapshot() { @Nullable @Override protected IgniteTree<SearchRow, GridH2Row> doTakeSnapshot() {
return tree; int seg = threadLocalSegment();
}


/** {@inheritDoc} */ return treeForRead(seg);
protected IgniteTree<SearchRow, GridH2Row> treeForRead() {
return tree;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override protected <K, V> IgniteTree<K, V> treeForRead(int segment) { @Override protected H2Tree treeForRead(int segment) {
return (IgniteTree<K, V>)tree; return segments[segment];
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand All @@ -306,9 +347,11 @@ protected IgniteTree<SearchRow, GridH2Row> treeForRead() {
boolean includeFirst, boolean includeFirst,
@Nullable SearchRow last, @Nullable SearchRow last,
IndexingQueryFilter filter) { IndexingQueryFilter filter) {
includeFirst &= first != null;

try { try {
int seg = threadLocalSegment();

H2Tree tree = treeForRead(seg);

GridCursor<GridH2Row> range = tree.find(first, last); GridCursor<GridH2Row> range = tree.find(first, last);


if (range == null) if (range == null)
Expand Down Expand Up @@ -344,8 +387,7 @@ private int computeInlineSize(List<InlineIndexHelper> inlineIdxs, int cfgInlineS


int size = 0; int size = 0;


for (int i = 0; i < inlineIdxs.size(); i++) { for (InlineIndexHelper idxHelper : inlineIdxs) {
InlineIndexHelper idxHelper = inlineIdxs.get(i);
if (idxHelper.size() <= 0) { if (idxHelper.size() <= 0) {
size = propSize; size = propSize;
break; break;
Expand All @@ -363,9 +405,9 @@ private int computeInlineSize(List<InlineIndexHelper> inlineIdxs, int cfgInlineS
/** /**
* @param name Name. * @param name Name.
* @return RootPage for meta page. * @return RootPage for meta page.
* @throws IgniteCheckedException * @throws IgniteCheckedException If failed.
*/ */
private RootPage getMetaPage(String name) throws IgniteCheckedException { private RootPage getMetaPage(String name, int segIdx) throws IgniteCheckedException {
return cctx.offheap().rootPageForIndex(name); return cctx.offheap().rootPageForIndex(name + "%" + segIdx);
} }
} }
Expand Up @@ -517,7 +517,7 @@ private ComparableRow(SearchRow row, int bias) {
while(cursor.next()) { while(cursor.next()) {
GridH2Row row = cursor.get(); GridH2Row row = cursor.get();


// Check for interruptions every 1000 iterations. // Check for interruptions every 1024 iterations.
if ((++j & 1024) == 0 && thread.isInterrupted()) if ((++j & 1024) == 0 && thread.isInterrupted())
throw new InterruptedException(); throw new InterruptedException();


Expand Down

0 comments on commit aaf23fd

Please sign in to comment.