Skip to content

Commit

Permalink
Fix async engine / tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gnodet committed Nov 22, 2018
1 parent 2a79d6c commit 6094865
Show file tree
Hide file tree
Showing 20 changed files with 126 additions and 98 deletions.
Expand Up @@ -234,19 +234,17 @@ public enum Initialization {
private Boolean allowUseOriginalMessage = Boolean.FALSE; private Boolean allowUseOriginalMessage = Boolean.FALSE;
private Long delay; private Long delay;
private ErrorHandlerFactory errorHandlerBuilder; private ErrorHandlerFactory errorHandlerBuilder;
private final Object errorHandlerExecutorServiceLock = new Object();
private ScheduledExecutorService errorHandlerExecutorService;
private Map<String, DataFormatDefinition> dataFormats = new HashMap<>(); private Map<String, DataFormatDefinition> dataFormats = new HashMap<>();
private Map<String, String> globalOptions = new HashMap<>(); private Map<String, String> globalOptions = new HashMap<>();
private PropertiesComponent propertiesComponent; private PropertiesComponent propertiesComponent;
private final Map<String, FactoryFinder> factories = new ConcurrentHashMap<>(); private final Map<String, FactoryFinder> factories = new ConcurrentHashMap<>();
private final Map<String, RouteService> routeServices = new LinkedHashMap<>(); private final Map<String, RouteService> routeServices = new LinkedHashMap<>();
private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>(); private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<>();


private final Object lock = new Object();
private volatile CamelContextNameStrategy nameStrategy; private volatile CamelContextNameStrategy nameStrategy;
private volatile ManagementNameStrategy managementNameStrategy; private volatile ManagementNameStrategy managementNameStrategy;
private Registry registry; private volatile Registry registry;

private volatile TypeConverter typeConverter; private volatile TypeConverter typeConverter;
private volatile TypeConverterRegistry typeConverterRegistry; private volatile TypeConverterRegistry typeConverterRegistry;
private volatile Injector injector; private volatile Injector injector;
Expand Down Expand Up @@ -276,6 +274,7 @@ public enum Initialization {
private volatile UnitOfWorkFactory unitOfWorkFactory; private volatile UnitOfWorkFactory unitOfWorkFactory;
private volatile ReloadStrategy reloadStrategy; private volatile ReloadStrategy reloadStrategy;
private volatile RouteController routeController; private volatile RouteController routeController;
private volatile ScheduledExecutorService errorHandlerExecutorService;


private TransformerRegistry<TransformerKey> transformerRegistry; private TransformerRegistry<TransformerKey> transformerRegistry;
private ValidatorRegistry<ValidatorKey> validatorRegistry; private ValidatorRegistry<ValidatorKey> validatorRegistry;
Expand Down Expand Up @@ -413,7 +412,7 @@ public void setName(String name) {


public CamelContextNameStrategy getNameStrategy() { public CamelContextNameStrategy getNameStrategy() {
if (nameStrategy == null) { if (nameStrategy == null) {
synchronized (this) { synchronized (lock) {
if (nameStrategy == null) { if (nameStrategy == null) {
setNameStrategy(createCamelContextNameStrategy()); setNameStrategy(createCamelContextNameStrategy());
} }
Expand All @@ -428,7 +427,7 @@ public void setNameStrategy(CamelContextNameStrategy nameStrategy) {


public ManagementNameStrategy getManagementNameStrategy() { public ManagementNameStrategy getManagementNameStrategy() {
if (managementNameStrategy == null) { if (managementNameStrategy == null) {
synchronized (this) { synchronized (lock) {
if (managementNameStrategy == null) { if (managementNameStrategy == null) {
setManagementNameStrategy(createManagementNameStrategy()); setManagementNameStrategy(createManagementNameStrategy());
} }
Expand Down Expand Up @@ -913,7 +912,7 @@ public void setRouteController(RouteController routeController) {
@Override @Override
public RouteController getRouteController() { public RouteController getRouteController() {
if (routeController == null) { if (routeController == null) {
synchronized (this) { synchronized (lock) {
if (routeController == null) { if (routeController == null) {
setRouteController(createRouteController()); setRouteController(createRouteController());
} }
Expand Down Expand Up @@ -2456,7 +2455,7 @@ public String resolvePropertyPlaceholders(String text) throws Exception {


public TypeConverter getTypeConverter() { public TypeConverter getTypeConverter() {
if (typeConverter == null) { if (typeConverter == null) {
synchronized (this) { synchronized (lock) {
if (typeConverter == null) { if (typeConverter == null) {
setTypeConverter(createTypeConverter()); setTypeConverter(createTypeConverter());
} }
Expand All @@ -2471,7 +2470,7 @@ public void setTypeConverter(TypeConverter typeConverter) {


public TypeConverterRegistry getTypeConverterRegistry() { public TypeConverterRegistry getTypeConverterRegistry() {
if (typeConverterRegistry == null) { if (typeConverterRegistry == null) {
synchronized (this) { synchronized (lock) {
if (typeConverterRegistry == null) { if (typeConverterRegistry == null) {
setTypeConverterRegistry(createTypeConverterRegistry()); setTypeConverterRegistry(createTypeConverterRegistry());
} }
Expand All @@ -2486,7 +2485,7 @@ public void setTypeConverterRegistry(TypeConverterRegistry typeConverterRegistry


public Injector getInjector() { public Injector getInjector() {
if (injector == null) { if (injector == null) {
synchronized (this) { synchronized (lock) {
if (injector == null) { if (injector == null) {
setInjector(createInjector()); setInjector(createInjector());
} }
Expand All @@ -2509,7 +2508,7 @@ public void setManagementMBeanAssembler(ManagementMBeanAssembler managementMBean


public ComponentResolver getComponentResolver() { public ComponentResolver getComponentResolver() {
if (componentResolver == null) { if (componentResolver == null) {
synchronized (this) { synchronized (lock) {
if (componentResolver == null) { if (componentResolver == null) {
setComponentResolver(createComponentResolver()); setComponentResolver(createComponentResolver());
} }
Expand All @@ -2524,7 +2523,7 @@ public void setComponentResolver(ComponentResolver componentResolver) {


public LanguageResolver getLanguageResolver() { public LanguageResolver getLanguageResolver() {
if (languageResolver == null) { if (languageResolver == null) {
synchronized (this) { synchronized (lock) {
if (languageResolver == null) { if (languageResolver == null) {
setLanguageResolver(createLanguageResolver()); setLanguageResolver(createLanguageResolver());
} }
Expand All @@ -2547,7 +2546,7 @@ public void setAutoCreateComponents(boolean autoCreateComponents) {


public Registry getRegistry() { public Registry getRegistry() {
if (registry == null) { if (registry == null) {
synchronized (this) { synchronized (lock) {
if (registry == null) { if (registry == null) {
setRegistry(createRegistry()); setRegistry(createRegistry());
} }
Expand Down Expand Up @@ -2875,22 +2874,32 @@ public void setErrorHandlerFactory(ErrorHandlerFactory errorHandlerFactory) {
} }


public ScheduledExecutorService getErrorHandlerExecutorService() { public ScheduledExecutorService getErrorHandlerExecutorService() {
synchronized (errorHandlerExecutorServiceLock) { if (errorHandlerExecutorService == null) {
if (errorHandlerExecutorService == null) { synchronized (lock) {
// setup default thread pool for error handler if (errorHandlerExecutorService == null) {
errorHandlerExecutorService = getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask"); // setup default thread pool for error handler
errorHandlerExecutorService = createErrorHandlerExecutorService();
}
} }
} }
return errorHandlerExecutorService; return errorHandlerExecutorService;
} }


protected ScheduledExecutorService createErrorHandlerExecutorService() {
return getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask");
}

public void setErrorHandlerExecutorService(ScheduledExecutorService errorHandlerExecutorService) {
this.errorHandlerExecutorService = errorHandlerExecutorService;
}

public void setProducerServicePool(ServicePool<Producer> producerServicePool) { public void setProducerServicePool(ServicePool<Producer> producerServicePool) {
this.producerServicePool = doAddService(producerServicePool); this.producerServicePool = doAddService(producerServicePool);
} }


public ServicePool<Producer> getProducerServicePool() { public ServicePool<Producer> getProducerServicePool() {
if (producerServicePool == null) { if (producerServicePool == null) {
synchronized (this) { synchronized (lock) {
if (producerServicePool == null) { if (producerServicePool == null) {
setProducerServicePool(createProducerServicePool()); setProducerServicePool(createProducerServicePool());
} }
Expand All @@ -2901,7 +2910,7 @@ public ServicePool<Producer> getProducerServicePool() {


public ServicePool<PollingConsumer> getPollingConsumerServicePool() { public ServicePool<PollingConsumer> getPollingConsumerServicePool() {
if (pollingConsumerServicePool == null) { if (pollingConsumerServicePool == null) {
synchronized (this) { synchronized (lock) {
if (pollingConsumerServicePool == null) { if (pollingConsumerServicePool == null) {
setPollingConsumerServicePool(createPollingConsumerServicePool()); setPollingConsumerServicePool(createPollingConsumerServicePool());
} }
Expand All @@ -2916,7 +2925,7 @@ public void setPollingConsumerServicePool(ServicePool<PollingConsumer> pollingCo


public UnitOfWorkFactory getUnitOfWorkFactory() { public UnitOfWorkFactory getUnitOfWorkFactory() {
if (unitOfWorkFactory == null) { if (unitOfWorkFactory == null) {
synchronized (this) { synchronized (lock) {
if (unitOfWorkFactory == null) { if (unitOfWorkFactory == null) {
setUnitOfWorkFactory(createUnitOfWorkFactory()); setUnitOfWorkFactory(createUnitOfWorkFactory());
} }
Expand Down Expand Up @@ -2954,7 +2963,7 @@ public long getUptimeMillis() {


public String getVersion() { public String getVersion() {
if (version == null) { if (version == null) {
synchronized (this) { synchronized (lock) {
if (version == null) { if (version == null) {
version = doGetVersion(); version = doGetVersion();
} }
Expand Down Expand Up @@ -4121,7 +4130,7 @@ public void setGlobalOptions(Map<String, String> globalOptions) {


public FactoryFinder getDefaultFactoryFinder() { public FactoryFinder getDefaultFactoryFinder() {
if (defaultFactoryFinder == null) { if (defaultFactoryFinder == null) {
synchronized (this) { synchronized (lock) {
if (defaultFactoryFinder == null) { if (defaultFactoryFinder == null) {
defaultFactoryFinder = getFactoryFinderResolver().resolveDefaultFactoryFinder(getClassResolver()); defaultFactoryFinder = getFactoryFinderResolver().resolveDefaultFactoryFinder(getClassResolver());
} }
Expand All @@ -4132,7 +4141,7 @@ public FactoryFinder getDefaultFactoryFinder() {


public FactoryFinderResolver getFactoryFinderResolver() { public FactoryFinderResolver getFactoryFinderResolver() {
if (factoryFinderResolver == null) { if (factoryFinderResolver == null) {
synchronized (this) { synchronized (lock) {
if (factoryFinderResolver == null) { if (factoryFinderResolver == null) {
factoryFinderResolver = createFactoryFinderResolver(); factoryFinderResolver = createFactoryFinderResolver();
} }
Expand All @@ -4155,7 +4164,7 @@ protected FactoryFinder createFactoryFinder(String path) {


public ClassResolver getClassResolver() { public ClassResolver getClassResolver() {
if (classResolver == null) { if (classResolver == null) {
synchronized (this) { synchronized (lock) {
if (classResolver == null) { if (classResolver == null) {
setClassResolver(createClassResolver()); setClassResolver(createClassResolver());
} }
Expand All @@ -4170,7 +4179,7 @@ public void setClassResolver(ClassResolver classResolver) {


public PackageScanClassResolver getPackageScanClassResolver() { public PackageScanClassResolver getPackageScanClassResolver() {
if (packageScanClassResolver == null) { if (packageScanClassResolver == null) {
synchronized (this) { synchronized (lock) {
if (packageScanClassResolver == null) { if (packageScanClassResolver == null) {
setPackageScanClassResolver(createPackageScanClassResolver()); setPackageScanClassResolver(createPackageScanClassResolver());
} }
Expand All @@ -4193,7 +4202,7 @@ public List<String> getLanguageNames() {


public ModelJAXBContextFactory getModelJAXBContextFactory() { public ModelJAXBContextFactory getModelJAXBContextFactory() {
if (modelJAXBContextFactory == null) { if (modelJAXBContextFactory == null) {
synchronized (this) { synchronized (lock) {
if (modelJAXBContextFactory == null) { if (modelJAXBContextFactory == null) {
setModelJAXBContextFactory(createModelJAXBContextFactory()); setModelJAXBContextFactory(createModelJAXBContextFactory());
} }
Expand All @@ -4208,7 +4217,7 @@ public void setModelJAXBContextFactory(final ModelJAXBContextFactory modelJAXBCo


public NodeIdFactory getNodeIdFactory() { public NodeIdFactory getNodeIdFactory() {
if (nodeIdFactory == null) { if (nodeIdFactory == null) {
synchronized (this) { synchronized (lock) {
if (nodeIdFactory == null) { if (nodeIdFactory == null) {
setNodeIdFactory(createNodeIdFactory()); setNodeIdFactory(createNodeIdFactory());
} }
Expand All @@ -4223,7 +4232,7 @@ public void setNodeIdFactory(NodeIdFactory idFactory) {


public ManagementStrategy getManagementStrategy() { public ManagementStrategy getManagementStrategy() {
if (managementStrategy == null) { if (managementStrategy == null) {
synchronized (this) { synchronized (lock) {
if (managementStrategy == null) { if (managementStrategy == null) {
setManagementStrategy(createManagementStrategy()); setManagementStrategy(createManagementStrategy());
} }
Expand Down Expand Up @@ -4254,7 +4263,7 @@ public boolean isJMXDisabled() {


public InflightRepository getInflightRepository() { public InflightRepository getInflightRepository() {
if (inflightRepository == null) { if (inflightRepository == null) {
synchronized (this) { synchronized (lock) {
if (inflightRepository == null) { if (inflightRepository == null) {
setInflightRepository(createInflightRepository()); setInflightRepository(createInflightRepository());
} }
Expand All @@ -4269,7 +4278,7 @@ public void setInflightRepository(InflightRepository repository) {


public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() { public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
if (asyncProcessorAwaitManager == null) { if (asyncProcessorAwaitManager == null) {
synchronized (this) { synchronized (lock) {
if (asyncProcessorAwaitManager == null) { if (asyncProcessorAwaitManager == null) {
setAsyncProcessorAwaitManager(createAsyncProcessorAwaitManager()); setAsyncProcessorAwaitManager(createAsyncProcessorAwaitManager());
} }
Expand Down Expand Up @@ -4341,7 +4350,7 @@ public void setApplicationContextClassLoader(ClassLoader classLoader) {


public DataFormatResolver getDataFormatResolver() { public DataFormatResolver getDataFormatResolver() {
if (dataFormatResolver == null) { if (dataFormatResolver == null) {
synchronized (this) { synchronized (lock) {
if (dataFormatResolver == null) { if (dataFormatResolver == null) {
setDataFormatResolver(createDataFormatResolver()); setDataFormatResolver(createDataFormatResolver());
} }
Expand Down Expand Up @@ -4396,7 +4405,7 @@ private static <T> T lookup(CamelContext context, String ref, Class<T> type) {


public ShutdownStrategy getShutdownStrategy() { public ShutdownStrategy getShutdownStrategy() {
if (shutdownStrategy == null) { if (shutdownStrategy == null) {
synchronized (this) { synchronized (lock) {
if (shutdownStrategy == null) { if (shutdownStrategy == null) {
setShutdownStrategy(createShutdownStrategy()); setShutdownStrategy(createShutdownStrategy());
} }
Expand Down Expand Up @@ -4435,7 +4444,7 @@ public Boolean isAllowUseOriginalMessage() {


public ExecutorServiceManager getExecutorServiceManager() { public ExecutorServiceManager getExecutorServiceManager() {
if (executorServiceManager == null) { if (executorServiceManager == null) {
synchronized (this) { synchronized (lock) {
if (executorServiceManager == null) { if (executorServiceManager == null) {
setExecutorServiceManager(createExecutorServiceManager()); setExecutorServiceManager(createExecutorServiceManager());
} }
Expand All @@ -4451,7 +4460,7 @@ public void setExecutorServiceManager(ExecutorServiceManager executorServiceMana


public ProcessorFactory getProcessorFactory() { public ProcessorFactory getProcessorFactory() {
if (processorFactory == null) { if (processorFactory == null) {
synchronized (this) { synchronized (lock) {
if (processorFactory == null) { if (processorFactory == null) {
setProcessorFactory(createProcessorFactory()); setProcessorFactory(createProcessorFactory());
} }
Expand All @@ -4466,7 +4475,7 @@ public void setProcessorFactory(ProcessorFactory processorFactory) {


public MessageHistoryFactory getMessageHistoryFactory() { public MessageHistoryFactory getMessageHistoryFactory() {
if (messageHistoryFactory == null) { if (messageHistoryFactory == null) {
synchronized (this) { synchronized (lock) {
if (messageHistoryFactory == null) { if (messageHistoryFactory == null) {
setMessageHistoryFactory(createMessageHistoryFactory()); setMessageHistoryFactory(createMessageHistoryFactory());
} }
Expand All @@ -4492,7 +4501,7 @@ public void setDebugger(Debugger debugger) {


public UuidGenerator getUuidGenerator() { public UuidGenerator getUuidGenerator() {
if (uuidGenerator == null) { if (uuidGenerator == null) {
synchronized (this) { synchronized (lock) {
if (uuidGenerator == null) { if (uuidGenerator == null) {
setUuidGenerator(createUuidGenerator()); setUuidGenerator(createUuidGenerator());
} }
Expand All @@ -4507,7 +4516,7 @@ public void setUuidGenerator(UuidGenerator uuidGenerator) {


public StreamCachingStrategy getStreamCachingStrategy() { public StreamCachingStrategy getStreamCachingStrategy() {
if (streamCachingStrategy == null) { if (streamCachingStrategy == null) {
synchronized (this) { synchronized (lock) {
if (streamCachingStrategy == null) { if (streamCachingStrategy == null) {
setStreamCachingStrategy(createStreamCachingStrategy()); setStreamCachingStrategy(createStreamCachingStrategy());
} }
Expand All @@ -4522,7 +4531,7 @@ public void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy


public RestRegistry getRestRegistry() { public RestRegistry getRestRegistry() {
if (restRegistry == null) { if (restRegistry == null) {
synchronized (this) { synchronized (lock) {
if (restRegistry == null) { if (restRegistry == null) {
setRestRegistry(createRestRegistry()); setRestRegistry(createRestRegistry());
} }
Expand Down Expand Up @@ -4616,7 +4625,7 @@ public SSLContextParameters getSSLContextParameters() {
@Override @Override
public HeadersMapFactory getHeadersMapFactory() { public HeadersMapFactory getHeadersMapFactory() {
if (headersMapFactory == null) { if (headersMapFactory == null) {
synchronized (this) { synchronized (lock) {
if (headersMapFactory == null) { if (headersMapFactory == null) {
setHeadersMapFactory(createHeadersMapFactory()); setHeadersMapFactory(createHeadersMapFactory());
} }
Expand Down
Expand Up @@ -52,7 +52,11 @@ public boolean process(Exchange exchange, AsyncCallback callback) {


LoopState state = new LoopState(exchange, callback); LoopState state = new LoopState(exchange, callback);


ReactiveHelper.scheduleMain(state); if (exchange.isTransacted()) {
ReactiveHelper.scheduleSync(state);
} else {
ReactiveHelper.scheduleMain(state);
}
return false; return false;


} catch (Exception e) { } catch (Exception e) {
Expand Down
Expand Up @@ -223,7 +223,11 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
if (isParallelProcessing()) { if (isParallelProcessing()) {
executorService.submit(() -> ReactiveHelper.schedule(state)); executorService.submit(() -> ReactiveHelper.schedule(state));
} else { } else {
ReactiveHelper.scheduleMain(state); if (exchange.isTransacted()) {
ReactiveHelper.scheduleSync(state);
} else {
ReactiveHelper.scheduleMain(state);
}
} }


// the remainder of the multicast will be completed async // the remainder of the multicast will be completed async
Expand Down
Expand Up @@ -81,8 +81,13 @@ public static Processor newInstance(final CamelContext camelContext, final Proce


@Override @Override
public boolean process(Exchange exchange, AsyncCallback callback) { public boolean process(Exchange exchange, AsyncCallback callback) {
ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true), if (exchange.isTransacted()) {
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]"); ReactiveHelper.scheduleSync(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
} else {
ReactiveHelper.scheduleMain(() -> Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
"Step[" + exchange.getExchangeId() + "," + Pipeline.this + "]");
}
return false; return false;
} }


Expand Down
Expand Up @@ -152,7 +152,11 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
// Create the redelivery state object for this exchange // Create the redelivery state object for this exchange
RedeliveryState state = new RedeliveryState(exchange, callback); RedeliveryState state = new RedeliveryState(exchange, callback);
// Run it // Run it
ReactiveHelper.scheduleMain(state); if (exchange.isTransacted()) {
ReactiveHelper.scheduleSync(state);
} else {
ReactiveHelper.scheduleMain(state);
}
return false; return false;
} }


Expand Down

0 comments on commit 6094865

Please sign in to comment.