Skip to content
Permalink
Browse files
Removing unused processing threadpool on broker (#12070)
* Thread pool for broker

* Updating two tests to improve coverage for new method added

* Updating druidProcessingConfigTest to cover coverage

* Adding missed spelling errors caused in doc

* Adding test to cover lines of new function added
  • Loading branch information
somu-imply committed Dec 21, 2021
1 parent f345759 commit c267b65f97998f7e6b43b102a0e16c331825fb14
Showing 8 changed files with 355 additions and 2 deletions.
@@ -1620,6 +1620,7 @@ Druid uses Jetty to serve HTTP requests.
|--------|-----------|-------|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size (less than 2GiB), for the storage of intermediate results. The computation engine in both the Historical and Realtime processes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed. [Human-readable format](human-readable-byte.md) is supported.|auto (max 1GiB)|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|`druid.processing.buffer.poolCacheInitialCount`|initializes the number of buffers allocated on the intermediate results pool. Note that pool can create more buffers if necessary.|`0`|
|`druid.processing.formatString`|Realtime and Historical processes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
@@ -38,6 +38,7 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
public static final HumanReadableBytes DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = HumanReadableBytes.valueOf(-1);
public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
public static final int DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL = 0;

private AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();

@@ -104,6 +105,15 @@ public int poolCacheMaxCount()
return Integer.MAX_VALUE;
}

@Config({
"druid.computation.buffer.poolCacheInitialCount",
"${base_path}.buffer.poolCacheInitialCount"
})
public int getNumInitalBuffersForIntermediatePool()
{
return DEFAULT_INITIAL_BUFFERS_FOR_INTERMEDIATE_POOL;
}

@Override
@Config(value = "${base_path}.numThreads")
public int getNumThreadsConfigured()
@@ -135,6 +135,7 @@ public void testReplacements()
props.setProperty("druid.processing.fifo", "true");
props.setProperty("druid.processing.tmpDir", "/test/path");


Injector injector = makeInjector(
NUM_PROCESSORS,
DIRECT_SIZE,
@@ -151,6 +152,7 @@ public void testReplacements()
Assert.assertEquals(1, config.columnCacheSizeBytes());
Assert.assertTrue(config.isFifo());
Assert.assertEquals("/test/path", config.getTmpDir());
Assert.assertEquals(0, config.getNumInitalBuffersForIntermediatePool());
}

@Test
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import org.apache.druid.client.cache.BackgroundCachePopulator;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.utils.JvmUtils;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

/**
* This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and
* thread pools on Broker. Broker does not need to be allocated an intermediate results pool.
* This is separated from DruidProcessingModule to separate the needs of the broker from the historicals
*/

public class BrokerProcessingModule implements Module
{
private static final Logger log = new Logger(BrokerProcessingModule.class);

@Override
public void configure(Binder binder)
{
binder.bind(ExecutorServiceConfig.class).to(DruidProcessingConfig.class);
MetricsModule.register(binder, ExecutorServiceMonitor.class);
}

@Provides
@LazySingleton
public CachePopulator getCachePopulator(
@Smile ObjectMapper smileMapper,
CachePopulatorStats cachePopulatorStats,
CacheConfig cacheConfig
)
{
if (cacheConfig.getNumBackgroundThreads() > 0) {
final ExecutorService exec = Executors.newFixedThreadPool(
cacheConfig.getNumBackgroundThreads(),
new ThreadFactoryBuilder()
.setNameFormat("background-cacher-%d")
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.build()
);

return new BackgroundCachePopulator(exec, smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
} else {
return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, cacheConfig.getMaxEntrySize());
}
}

@Provides
@ManageLifecycle
public QueryProcessingPool getProcessingExecutorPool(
DruidProcessingConfig config
)
{
return new ForwardingQueryProcessingPool(Execs.dummy());
}

@Provides
@LazySingleton
@Global
public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(DruidProcessingConfig config)
{
verifyDirectMemory(config);

return new StupidPool<>(
"intermediate processing pool",
new OffheapBufferGenerator("intermediate processing", config.intermediateComputeSizeBytes()),
config.getNumInitalBuffersForIntermediatePool(),
config.poolCacheMaxCount()
);
}

@Provides
@LazySingleton
@Merging
public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig config)
{
verifyDirectMemory(config);
return new DefaultBlockingPool<>(
new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()),
config.getNumMergeBuffers()
);
}

@Provides
@ManageLifecycle
public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config)
{
return new LifecycleForkJoinPoolProvider(
config.getMergePoolParallelism(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
true,
config.getMergePoolAwaitShutdownMillis()
);
}

@Provides
@Merging
public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider)
{
return poolProvider.getPool();
}

private void verifyDirectMemory(DruidProcessingConfig config)
{
try {
final long maxDirectMemory = JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
final long memoryNeeded = (long) config.intermediateComputeSizeBytes() *
(config.getNumMergeBuffers() + 1);

if (maxDirectMemory < memoryNeeded) {
throw new ProvisionException(
StringUtils.format(
"Not enough direct memory. Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, druid.processing.numThreads, or druid.processing.numMergeBuffers: "
+ "maxDirectMemory[%,d], memoryNeeded[%,d] = druid.processing.buffer.sizeBytes[%,d] * (druid.processing.numMergeBuffers[%,d] + 1)",
maxDirectMemory,
memoryNeeded,
config.intermediateComputeSizeBytes(),
config.getNumMergeBuffers()
)
);
}
}
catch (UnsupportedOperationException e) {
log.debug("Checking for direct memory size is not support on this platform: %s", e);
log.info(
"Unable to determine max direct memory size. If druid.processing.buffer.sizeBytes is explicitly configured, "
+ "then make sure to set -XX:MaxDirectMemorySize to at least \"druid.processing.buffer.sizeBytes * "
+ "(druid.processing.numMergeBuffers[%,d] + 1)\", "
+ "or else set -XX:MaxDirectMemorySize to at least 25%% of maximum jvm heap size.",
config.getNumMergeBuffers()
);
}
}
}
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.google.common.collect.ImmutableList;
import com.google.inject.Injector;
import com.google.inject.ProvisionException;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.utils.JvmUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;


@RunWith(MockitoJUnitRunner.class)
public class BrokerProcessingModuleTest
{
private static final boolean INJECT_SERVER_TYPE_CONFIG = true;
@Mock
private DruidProcessingConfig druidProcessingConfig;
private Injector injector;
private BrokerProcessingModule target;
@Mock
private CacheConfig cacheConfig;
@Mock
private CachePopulatorStats cachePopulatorStats;

@Before
public void setUp()
{
target = new BrokerProcessingModule();
injector = makeInjector(INJECT_SERVER_TYPE_CONFIG);
}

@Test
public void testIntermediateResultsPool()
{
target.getIntermediateResultsPool(druidProcessingConfig);
}


@Test
public void testMergeBufferPool()
{
target.getMergeBufferPool(druidProcessingConfig);
}

@Test
public void testMergeProcessingPool()
{
DruidProcessingConfig config = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return "processing-test-%s";
}
};
DruidProcessingModule module = new DruidProcessingModule();
module.getMergeProcessingPoolProvider(config);
config.getNumInitalBuffersForIntermediatePool();
}

@Test
public void testCachePopulatorAsSingleton()
{
CachePopulator cachePopulator = injector.getInstance(CachePopulator.class);
Assert.assertNotNull(cachePopulator);

}

@Test(expected = ProvisionException.class)
public void testMemoryCheckThrowsException()
{
// JDK 9 and above do not support checking for direct memory size
// so this test only validates functionality for Java 8.
try {
JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
}
catch (UnsupportedOperationException e) {
Assume.assumeNoException(e);
}

BrokerProcessingModule module = new BrokerProcessingModule();
module.getMergeBufferPool(new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return "test";
}

@Override
public int intermediateComputeSizeBytes()
{
return Integer.MAX_VALUE;
}
});
}

private Injector makeInjector(boolean withServerTypeConfig)
{
return Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), (ImmutableList.of(Modules.override(
(binder) -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
binder.bind(DruidProcessingConfig.class).toInstance(druidProcessingConfig);
},
target
).with(
(binder) -> {
binder.bind(CachePopulatorStats.class).toInstance(cachePopulatorStats);
binder.bind(CacheConfig.class).toInstance(cacheConfig);
}
)
)));
}

}

@@ -70,6 +70,7 @@ public String getFormatString()
};

DruidProcessingModule module = new DruidProcessingModule();
config.getNumInitalBuffersForIntermediatePool();
module.getIntermediateResultsPool(config);
}
}

0 comments on commit c267b65

Please sign in to comment.