Skip to content

Commit

Permalink
GG-27855 Correct the setting of the pool to build the index
Browse files Browse the repository at this point in the history
  • Loading branch information
tkalkirill committed Mar 6, 2020
1 parent 2a93de0 commit 8c9bd26
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1939,13 +1939,15 @@ private void start0(GridStartContext startCtx, IgniteConfiguration cfg, TimeBag
buildIdxExecSvc = new IgniteThreadPoolExecutor(
"build-idx-runner",
cfg.getIgniteInstanceName(),
0,
buildIdxThreadPoolSize,
0,
buildIdxThreadPoolSize,
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
oomeHnd
);

buildIdxExecSvc.allowCoreThreadTimeOut(true);
}

validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Creates a new service with the given initial parameters.
* NOTE: There is known bug. If 'corePoolSize' equals {@code 0},
* then pool will degraded to single-threaded pool.
*
* @param threadNamePrefix Will be added at the beginning of all created threads.
* @param igniteInstanceName Must be the name of the grid.
Expand Down Expand Up @@ -59,6 +61,8 @@ public IgniteThreadPoolExecutor(

/**
* Creates a new service with the given initial parameters.
* NOTE: There is known bug. If 'corePoolSize' equals {@code 0},
* then pool will degraded to single-threaded pool.
*
* @param threadNamePrefix Will be added at the beginning of all created threads.
* @param igniteInstanceName Must be the name of the grid.
Expand Down Expand Up @@ -92,6 +96,8 @@ public IgniteThreadPoolExecutor(

/**
* Creates a new service with the given initial parameters.
* NOTE: There is known bug. If 'corePoolSize' equals {@code 0},
* then pool will degraded to single-threaded pool.
*
* @param corePoolSize The number of threads to keep in the pool, even if they are idle.
* @param maxPoolSize The maximum number of threads to allow in the pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.apache.ignite.internal.processors.query.h2;

import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.stream.LongStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
Expand All @@ -31,13 +34,17 @@
import org.apache.ignite.internal.processors.cache.index.DynamicIndexAbstractSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;

import static java.util.Objects.nonNull;
import static java.util.Objects.requireNonNull;

/**
* Index rebuild after node restart test.
*/
Expand All @@ -54,6 +61,12 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
/** Latch to signal that rebuild may start. */
private final CountDownLatch rebuildLatch = new CountDownLatch(1);

/** Thread pool size for build index. */
private Integer buildIdxThreadPoolSize;

/** GridQueryIndexing class. */
private Class<? extends GridQueryIndexing> qryIndexingCls = BlockingIndexing.class;

/** {@inheritDoc} */
@Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
IgniteConfiguration cfg = super.commonConfiguration(idx);
Expand All @@ -62,6 +75,19 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
.setMaxSize(300*1024L*1024L)
.setPersistenceEnabled(true);

if (nonNull(buildIdxThreadPoolSize))
cfg.setBuildIndexThreadPoolSize(buildIdxThreadPoolSize);

return cfg;
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration serverConfiguration(int idx) throws Exception {
IgniteConfiguration cfg = super.serverConfiguration(idx);

if (nonNull(qryIndexingCls))
GridQueryProcessor.idxCls = qryIndexingCls;

return cfg;
}

Expand All @@ -82,6 +108,7 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
stopAllGrids();

cleanPersistenceDir();
GridQueryProcessor.idxCls = null;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -118,32 +145,121 @@ public class GridIndexRebuildSelfTest extends DynamicIndexAbstractSelfTest {
public void testIndexRebuild() throws Exception {
IgniteEx srv = startServer();

execute(srv, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=T,wrap_value=false," +
"atomicity=transactional\"");
IgniteInternalCache cc = createAndFillTableWithIndex(srv);

execute(srv, "CREATE INDEX IDX ON T(v)");
checkDataState(srv, false);

IgniteInternalCache cc = srv.cachex(CACHE_NAME);
File idxPath = indexFile(cc);

assertNotNull(cc);
stopAllGrids();

putData(srv, false);
assertTrue(U.delete(idxPath));

checkDataState(srv, false);
srv = startServer();

putData(srv, true);

checkDataState(srv, true);
}

/**
* Test checks that index rebuild will be with default pool size.
*
* @throws Exception if failed.
*/
@Test
public void testDefaultCntThreadForRebuildIdx() throws Exception {
checkCntThreadForRebuildIdx(IgniteConfiguration.DFLT_BUILD_IDX_THREAD_POOL_SIZE);
}

/**
* Test checks that index rebuild uses the number of threads that specified
* in configuration.
*
* @throws Exception if failed.
*/
@Test
public void testCustomCntThreadForRebuildIdx() throws Exception {
checkCntThreadForRebuildIdx(6);
}

/**
* Check that index rebuild uses the number of threads
* that specified in configuration.
*
* @param buildIdxThreadCnt Thread pool size for build index,
* after restart node.
* @throws Exception if failed.
*/
private void checkCntThreadForRebuildIdx(int buildIdxThreadCnt) throws Exception {
qryIndexingCls = null;

IgniteEx srv = startServer();

File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration());
IgniteInternalCache internalCache = createAndFillTableWithIndex(srv);

File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile();
int partCnt = internalCache.configuration().getAffinity().partitions();

assertTrue(partCnt > buildIdxThreadCnt);

File idxPath = indexFile(internalCache);

stopAllGrids();

assertTrue(U.delete(idxPath));

buildIdxThreadPoolSize = buildIdxThreadCnt;

srv = startServer();
srv.cache(CACHE_NAME).indexReadyFuture().get();

putData(srv, true);
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

checkDataState(srv, true);
long buildIdxRunnerCnt = LongStream.of(threadMXBean.getAllThreadIds()).mapToObj(threadMXBean::getThreadInfo)
.filter(threadInfo -> threadInfo.getThreadName().startsWith("build-idx-runner")).count();

assertEquals(buildIdxThreadCnt, buildIdxRunnerCnt);
}

/**
* Creating a cache, table, index and populating data.
*
* @param node Node.
* @return Cache.
* @throws Exception if failed.
*/
private IgniteInternalCache createAndFillTableWithIndex(IgniteEx node) throws Exception {
requireNonNull(node);

String cacheName = CACHE_NAME;

execute(node, "CREATE TABLE T(k int primary key, v int) WITH \"cache_name=" + cacheName +
",wrap_value=false,atomicity=transactional\"");

execute(node, "CREATE INDEX IDX ON T(v)");

IgniteInternalCache cc = node.cachex(cacheName);

assertNotNull(cc);

putData(node, false);

return cc;
}

/**
* Get index file.
*
* @param internalCache Cache.
* @return Index file.
*/
protected File indexFile(IgniteInternalCache internalCache) {
requireNonNull(internalCache);

File cacheWorkDir = ((FilePageStoreManager)internalCache.context().shared().pageStore())
.cacheWorkDir(internalCache.configuration());

return cacheWorkDir.toPath().resolve("index.bin").toFile();
}

/**
Expand Down Expand Up @@ -213,17 +329,9 @@ protected void putData(Ignite node, final boolean forConcurrentPut) throws Excep
* @throws Exception if failed.
*/
protected IgniteEx startServer() throws Exception {
// Have to do this for each starting node - see GridQueryProcessor ctor, it nulls
// idxCls static field on each call.
GridQueryProcessor.idxCls = BlockingIndexing.class;

IgniteConfiguration cfg = serverConfiguration(0);

IgniteEx res = startGrid(cfg);

res.active(true);

return res;
IgniteEx srvNode = startGrid(serverConfiguration(0));
srvNode.active(true);
return srvNode;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
Expand Down Expand Up @@ -63,9 +62,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel

checkDataState(srv, false);

File cacheWorkDir = ((FilePageStoreManager)cc.context().shared().pageStore()).cacheWorkDir(cc.configuration());

File idxPath = cacheWorkDir.toPath().resolve("index.bin").toFile();
File idxPath = indexFile(cc);

stopAllGrids();

Expand Down

0 comments on commit 8c9bd26

Please sign in to comment.