Skip to content

Commit

Permalink
HSEARCH-3775 Expose a shared ScheduledExecutorService through ThreadP…
Browse files Browse the repository at this point in the history
…oolProvider
  • Loading branch information
yrodiere committed Feb 13, 2020
1 parent e47e4af commit 8659163
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 47 deletions.
Expand Up @@ -16,7 +16,6 @@
import java.time.Duration;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,7 +68,7 @@ public class ElasticsearchClientImpl implements ElasticsearchClientImplementor {
Gson gson, JsonLogHelper jsonLogHelper) {
this.restClient = restClient;
this.sniffer = sniffer;
this.timeoutExecutorService = threadPoolProvider.newScheduledThreadPool( "Elasticsearch request timeout executor" );
this.timeoutExecutorService = threadPoolProvider.getSharedScheduledThreadPool();
this.globalTimeoutValue = globalTimeoutValue;
this.globalTimeoutUnit = globalTimeoutUnit;
this.gson = gson;
Expand Down Expand Up @@ -227,11 +226,9 @@ private void log(ElasticsearchRequest request, long start, ElasticsearchResponse
public void close() throws IOException {
try ( Closer<IOException> closer = new Closer<>() ) {
/*
* There's no point waiting for timeouts: we'll just cancel
* all timeouts and expect the RestClient to cancel all
* There's no point waiting for timeouts: we'll just expect the RestClient to cancel all
* currently running requests when closing.
*/
closer.push( ExecutorService::shutdownNow, this.timeoutExecutorService );
closer.push( Sniffer::close, this.sniffer );
closer.push( RestClient::close, this.restClient );
}
Expand Down
Expand Up @@ -194,12 +194,12 @@ public SearchIntegrationPartialBuildState prepareBuild() {
FailureHandler failureHandler = failureHandlerHolder.get();

threadProviderHolder = THREAD_PROVIDER.getAndTransform( propertySource, beanResolver::resolve );
ThreadPoolProviderImpl executorProvider = new ThreadPoolProviderImpl( threadProviderHolder.get() );
ThreadPoolProviderImpl threadPoolProvider = new ThreadPoolProviderImpl( threadProviderHolder );

RootBuildContext rootBuildContext = new RootBuildContext(
propertySource,
classResolver, resourceResolver, beanResolver,
failureCollector, executorProvider, failureHandler
failureCollector, threadPoolProvider, failureHandler
);

indexManagerBuildingStateHolder = new IndexManagerBuildingStateHolder( beanResolver, propertySource, rootBuildContext );
Expand Down Expand Up @@ -263,7 +263,7 @@ public SearchIntegrationPartialBuildState prepareBuild() {
return new SearchIntegrationPartialBuildStateImpl(
beanProvider, beanResolver,
failureHandlerHolder,
threadProviderHolder,
threadPoolProvider,
partiallyBuiltMappings,
indexManagerBuildingStateHolder.getBackendPartialBuildStates(),
indexManagerBuildingStateHolder.getIndexManagersByName(),
Expand Down
Expand Up @@ -15,13 +15,13 @@
import org.hibernate.search.engine.backend.index.IndexManager;
import org.hibernate.search.engine.backend.index.spi.IndexManagerImplementor;
import org.hibernate.search.engine.backend.spi.BackendImplementor;
import org.hibernate.search.engine.environment.thread.spi.ThreadProvider;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.engine.common.spi.SearchIntegration;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.bean.spi.BeanProvider;
import org.hibernate.search.engine.environment.thread.impl.ThreadPoolProviderImpl;
import org.hibernate.search.engine.logging.impl.Log;
import org.hibernate.search.engine.mapper.mapping.spi.MappingImplementor;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.Futures;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
Expand All @@ -32,21 +32,21 @@ public class SearchIntegrationImpl implements SearchIntegration {

private final BeanProvider beanProvider;
private final BeanHolder<? extends FailureHandler> failureHandlerHolder;
private final BeanHolder<? extends ThreadProvider> threadProviderHolder;
private final ThreadPoolProviderImpl threadPoolProvider;

private final List<MappingImplementor<?>> mappings;
private final Map<String, BackendImplementor<?>> backends;
private final Map<String, IndexManagerImplementor<?>> indexManagers;

SearchIntegrationImpl(BeanProvider beanProvider,
BeanHolder<? extends FailureHandler> failureHandlerHolder,
BeanHolder<? extends ThreadProvider> threadProviderHolder,
ThreadPoolProviderImpl threadPoolProvider,
List<MappingImplementor<?>> mappings,
Map<String, BackendImplementor<?>> backends,
Map<String, IndexManagerImplementor<?>> indexManagers) {
this.beanProvider = beanProvider;
this.failureHandlerHolder = failureHandlerHolder;
this.threadProviderHolder = threadProviderHolder;
this.threadPoolProvider = threadPoolProvider;
this.mappings = mappings;
this.backends = backends;
this.indexManagers = indexManagers;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void close() {
closer.pushAll( IndexManagerImplementor::stop, indexManagers.values() );
closer.push( SearchIntegrationImpl::preStopBackends, this );
closer.pushAll( BackendImplementor::stop, backends.values() );
closer.pushAll( BeanHolder::close, threadProviderHolder );
closer.pushAll( ThreadPoolProviderImpl::close, threadPoolProvider );
closer.pushAll( BeanHolder::close, failureHandlerHolder );
closer.pushAll( BeanProvider::close, beanProvider );
}
Expand Down
Expand Up @@ -16,19 +16,19 @@
import org.hibernate.search.engine.backend.spi.BackendImplementor;
import org.hibernate.search.engine.cfg.spi.ConfigurationPropertyChecker;
import org.hibernate.search.engine.cfg.spi.ConfigurationPropertySource;
import org.hibernate.search.engine.environment.thread.spi.ThreadProvider;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.engine.common.spi.SearchIntegration;
import org.hibernate.search.engine.common.spi.SearchIntegrationFinalizer;
import org.hibernate.search.engine.common.spi.SearchIntegrationPartialBuildState;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.engine.environment.bean.spi.BeanProvider;
import org.hibernate.search.engine.environment.thread.impl.ThreadPoolProviderImpl;
import org.hibernate.search.engine.mapper.mapping.building.spi.MappingFinalizationContext;
import org.hibernate.search.engine.mapper.mapping.building.spi.MappingFinalizer;
import org.hibernate.search.engine.mapper.mapping.spi.MappingImplementor;
import org.hibernate.search.engine.mapper.mapping.building.spi.MappingKey;
import org.hibernate.search.engine.mapper.mapping.building.spi.MappingPartialBuildState;
import org.hibernate.search.engine.mapper.mapping.spi.MappingImplementor;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.engine.reporting.impl.RootFailureCollector;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;
Expand All @@ -41,7 +41,7 @@ class SearchIntegrationPartialBuildStateImpl implements SearchIntegrationPartial
private final BeanProvider beanProvider;
private final BeanResolver beanResolver;
private final BeanHolder<? extends FailureHandler> failureHandlerHolder;
private final BeanHolder<? extends ThreadProvider> threadProviderHolder;
private final ThreadPoolProviderImpl threadPoolProvider;

private final Map<MappingKey<?, ?>, MappingPartialBuildState> partiallyBuiltMappings;
private final List<MappingImplementor<?>> fullyBuiltMappings = new ArrayList<>();
Expand All @@ -55,15 +55,15 @@ class SearchIntegrationPartialBuildStateImpl implements SearchIntegrationPartial
SearchIntegrationPartialBuildStateImpl(
BeanProvider beanProvider, BeanResolver beanResolver,
BeanHolder<? extends FailureHandler> failureHandlerHolder,
BeanHolder<? extends ThreadProvider> threadProviderHolder,
ThreadPoolProviderImpl threadPoolProvider,
Map<MappingKey<?, ?>, MappingPartialBuildState> partiallyBuiltMappings,
Map<String, BackendPartialBuildState> partiallyBuiltBackends,
Map<String, IndexManagerPartialBuildState> partiallyBuiltIndexManagers,
ConfigurationPropertyChecker partialConfigurationPropertyChecker) {
this.beanProvider = beanProvider;
this.beanResolver = beanResolver;
this.failureHandlerHolder = failureHandlerHolder;
this.threadProviderHolder = threadProviderHolder;
this.threadPoolProvider = threadPoolProvider;
this.partiallyBuiltMappings = partiallyBuiltMappings;
this.partiallyBuiltBackends = partiallyBuiltBackends;
this.partiallyBuiltIndexManagers = partiallyBuiltIndexManagers;
Expand All @@ -79,7 +79,7 @@ public void closeOnFailure() {
closer.pushAll( IndexManagerImplementor::stop, fullyBuiltIndexManagers.values() );
closer.pushAll( BackendPartialBuildState::closeOnFailure, partiallyBuiltBackends.values() );
closer.pushAll( BackendImplementor::stop, fullyBuiltBackends.values() );
closer.pushAll( BeanHolder::close, threadProviderHolder );
closer.pushAll( ThreadPoolProviderImpl::close, threadPoolProvider );
closer.pushAll( BeanHolder::close, failureHandlerHolder );
closer.pushAll( BeanProvider::close, beanProvider );
}
Expand Down Expand Up @@ -173,7 +173,7 @@ public SearchIntegration finalizeIntegration() {
return new SearchIntegrationImpl(
beanProvider,
failureHandlerHolder,
threadProviderHolder,
threadPoolProvider,
fullyBuiltMappings,
fullyBuiltBackends,
fullyBuiltIndexManagers
Expand Down
Expand Up @@ -14,8 +14,10 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
import org.hibernate.search.engine.environment.thread.spi.ThreadProvider;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.logging.impl.Log;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

Expand All @@ -30,15 +32,25 @@ public final class ThreadPoolProviderImpl implements ThreadPoolProvider {

private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

private final ThreadProvider threadProvider;
private final BeanHolder<? extends ThreadProvider> threadProviderHolder;

public ThreadPoolProviderImpl(ThreadProvider threadProvider) {
this.threadProvider = threadProvider;
private volatile ScheduledExecutorService scheduledExecutorService;

public ThreadPoolProviderImpl(BeanHolder<? extends ThreadProvider> threadProviderHolder) {
this.threadProviderHolder = threadProviderHolder;
}

public void close() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
closer.push( ScheduledExecutorService::shutdownNow, scheduledExecutorService );
scheduledExecutorService = null;
closer.push( BeanHolder::close, threadProviderHolder );
}
}

@Override
public ThreadProvider getThreadProvider() {
return threadProvider;
return threadProviderHolder.get();
}

@Override
Expand All @@ -54,14 +66,23 @@ public ThreadPoolExecutor newFixedThreadPool(int threads, String threadNamePrefi
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>( queueSize ),
threadProvider.createThreadFactory( threadNamePrefix ),
threadProviderHolder.get().createThreadFactory( threadNamePrefix ),
new BlockPolicy()
);
}

@Override
public ScheduledExecutorService newScheduledThreadPool(String threadNamePrefix) {
return new ScheduledThreadPoolExecutor( 1, threadProvider.createThreadFactory( threadNamePrefix ) );
public ScheduledExecutorService getSharedScheduledThreadPool() {
if ( scheduledExecutorService == null ) {
synchronized ( this ) {
if ( scheduledExecutorService == null ) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(
1, threadProviderHolder.get().createThreadFactory( "Scheduled task executor" )
);
}
}
}
return scheduledExecutorService;
}

/**
Expand Down
Expand Up @@ -46,11 +46,10 @@ public interface ThreadPoolProvider {
ThreadPoolExecutor newFixedThreadPool(int threads, String threadNamePrefix, int queueSize);

/**
* Creates an executor for recurring tasks
* Get the executor for short, scheduled tasks.
*
* @param threadNamePrefix a label to identify the threads; useful for profiling.
* @return instance of {@link ScheduledExecutorService}
* @return A (shared) instance of {@link ScheduledExecutorService}.
*/
ScheduledExecutorService newScheduledThreadPool(String threadNamePrefix);
ScheduledExecutorService getSharedScheduledThreadPool();

}
Expand Up @@ -15,11 +15,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.thread.impl.DefaultThreadProvider;
import org.hibernate.search.engine.environment.thread.impl.ThreadPoolProviderImpl;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
import org.hibernate.search.engine.reporting.FailureContext;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.engine.environment.thread.impl.DefaultThreadProvider;
import org.hibernate.search.util.impl.test.FutureAssert;

import org.junit.After;
Expand All @@ -35,7 +35,8 @@ public class BatchingExecutorTest extends EasyMockSupport {

private final StubWorkProcessor processorMock = createMock( StubWorkProcessor.class );
private final FailureHandler failureHandlerMock = createMock( FailureHandler.class );
private final ThreadPoolProvider threadPoolProvider = new ThreadPoolProviderImpl( new DefaultThreadProvider() );
private final ThreadPoolProviderImpl threadPoolProvider =
new ThreadPoolProviderImpl( BeanHolder.of( new DefaultThreadProvider() ) );

// To execute code asynchronously. Just use more threads than we'll ever need, we don't care about performance.
private final ForkJoinPool asyncExecutor = new ForkJoinPool( 12 );
Expand All @@ -44,6 +45,7 @@ public class BatchingExecutorTest extends EasyMockSupport {

@After
public void cleanup() {
threadPoolProvider.close();
asyncExecutor.shutdownNow();
executor.stop();
}
Expand Down
Expand Up @@ -47,7 +47,6 @@
import org.hibernate.search.engine.environment.bean.spi.BeanConfigurer;
import org.hibernate.search.engine.environment.thread.impl.DefaultThreadProvider;
import org.hibernate.search.engine.environment.thread.impl.ThreadPoolProviderImpl;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
import org.hibernate.search.integrationtest.backend.elasticsearch.testsupport.categories.RequiresNoAutomaticAuthenticationHeader;
import org.hibernate.search.integrationtest.backend.elasticsearch.testsupport.util.ElasticsearchTckBackendHelper;
import org.hibernate.search.util.common.AssertionFailure;
Expand All @@ -59,6 +58,7 @@
import org.hibernate.search.util.impl.test.annotation.TestForIssue;
import org.hibernate.search.util.impl.test.rule.ExpectedLog4jLog;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -102,6 +102,15 @@ public class ElasticsearchClientFactoryImplIT {
@Rule
public TestConfigurationProvider testConfigurationProvider = new TestConfigurationProvider();

private ThreadPoolProviderImpl threadPoolProvider = new ThreadPoolProviderImpl(
BeanHolder.of( new DefaultThreadProvider( ElasticsearchClientFactoryImplIT.class.getName() + ": " ) )
);

@After
public void cleanup() {
threadPoolProvider.close();
}

@Test
@TestForIssue(jiraKey = "HSEARCH-2274")
public void simple_http() throws Exception {
Expand Down Expand Up @@ -622,9 +631,6 @@ private ElasticsearchClientImplementor createClient(Consumer<BiConsumer<String,
ConfigurationPropertySource defaultBackendProperties =
new ElasticsearchTckBackendHelper().createDefaultBackendSetupStrategy()
.createBackendConfigurationPropertySource( testConfigurationProvider );
ThreadPoolProvider threadPoolProvider = new ThreadPoolProviderImpl(
new DefaultThreadProvider( ElasticsearchClientFactoryImplIT.class.getName() + ": " )
);

Map<String, Object> configurationOverride = new HashMap<>();
// Redirect requests to Wiremock (rule 1 only by default)
Expand Down
Expand Up @@ -36,14 +36,14 @@
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.engine.environment.thread.impl.DefaultThreadProvider;
import org.hibernate.search.engine.environment.thread.impl.ThreadPoolProviderImpl;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
import org.hibernate.search.integrationtest.backend.elasticsearch.testsupport.categories.RequiresNoRequestPostProcessing;
import org.hibernate.search.integrationtest.backend.elasticsearch.testsupport.categories.RequiresRequestPostProcessing;
import org.hibernate.search.integrationtest.backend.elasticsearch.testsupport.util.ElasticsearchTckBackendHelper;
import org.hibernate.search.util.impl.integrationtest.common.TestConfigurationProvider;
import org.hibernate.search.util.impl.test.annotation.PortedFromSearch5;
import org.hibernate.search.util.impl.test.annotation.TestForIssue;

import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -93,6 +93,15 @@ public class ElasticsearchContentLengthIT {
@Rule
public TestConfigurationProvider testConfigurationProvider = new TestConfigurationProvider();

private final ThreadPoolProviderImpl threadPoolProvider = new ThreadPoolProviderImpl(
BeanHolder.of( new DefaultThreadProvider( ElasticsearchContentLengthIT.class.getName() + ": " ) )
);

@After
public void cleanup() {
threadPoolProvider.close();
}

@Test
public void tinyPayload() throws Exception {
wireMockRule.stubFor( post( urlPathLike( "/myIndex/myType" ) )
Expand Down Expand Up @@ -175,9 +184,6 @@ private ElasticsearchClientImplementor createClient() {
ConfigurationPropertySource defaultBackendProperties =
new ElasticsearchTckBackendHelper().createDefaultBackendSetupStrategy()
.createBackendConfigurationPropertySource( testConfigurationProvider );
ThreadPoolProvider threadPoolProvider = new ThreadPoolProviderImpl(
new DefaultThreadProvider( ElasticsearchContentLengthIT.class.getName() + ": " )
);

// Redirect requests to Wiremock
Map<String, Object> configurationOverride = new HashMap<>();
Expand Down

0 comments on commit 8659163

Please sign in to comment.