Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.smallrye.faulttolerance.core.fallback.Fallback;
import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
import io.smallrye.faulttolerance.core.timeout.Timeout;
import io.smallrye.faulttolerance.core.timer.ThreadTimer;
import io.smallrye.faulttolerance.core.timer.Timer;
import io.smallrye.faulttolerance.core.util.ExceptionDecision;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
Expand Down Expand Up @@ -83,12 +83,10 @@ public class FaultToleranceProcessor extends AsyncProcessorSupport
private boolean shutdownScheduledExecutorService;
private ExecutorService executorService;
private boolean shutdownExecutorService;
private ExecutorService threadTimerExecutorService;
private boolean shutdownThreadTimerExecutorService;
private ProcessorExchangeFactory processorExchangeFactory;
private PooledExchangeTaskFactory taskFactory;
private PooledExchangeTaskFactory fallbackTaskFactory;
private ThreadTimer timer;
private Timer timer;

public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor,
Processor fallbackProcessor) {
Expand Down Expand Up @@ -151,6 +149,14 @@ public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}

public Timer getTimer() {
return timer;
}

public void setTimer(Timer timer) {
this.timer = timer;
}

@Override
public String getTraceLabel() {
return "faultTolerance";
Expand Down Expand Up @@ -262,7 +268,7 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
}
// 2. timeout
if (config.isTimeoutEnabled()) {
target = new Timeout<>(target, "timeout", config.getTimeoutDuration(), timer);
target = new Timeout<>(target, "timeout", config.getTimeoutDuration(), getTimer());
}
// 3. fallback
if (fallbackProcessor != null) {
Expand Down Expand Up @@ -349,15 +355,10 @@ public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
protected void doInit() throws Exception {
ObjectHelper.notNull(camelContext, "CamelContext", this);
if (circuitBreaker == null) {
threadTimerExecutorService
= getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "CircuitBreakerThreadTimer");
shutdownThreadTimerExecutorService = true;

timer = ThreadTimer.create(threadTimerExecutorService);
circuitBreaker = new CircuitBreaker<>(
invocation(), id, ExceptionDecision.ALWAYS_FAILURE, config.getDelay(), config.getRequestVolumeThreshold(),
config.getFailureRatio(),
config.getSuccessThreshold(), SystemStopwatch.INSTANCE, timer);
config.getSuccessThreshold(), SystemStopwatch.INSTANCE, getTimer());
}

ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
Expand Down Expand Up @@ -389,14 +390,6 @@ protected void doStop() throws Exception {
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
executorService = null;
}
if (timer != null) {
timer.shutdown();
timer = null;
}
if (shutdownThreadTimerExecutorService && threadTimerExecutorService != null) {
getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService);
threadTimerExecutorService = null;
}

ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import io.smallrye.faulttolerance.ExecutorHolder;
import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
import io.smallrye.faulttolerance.core.timer.Timer;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.CircuitBreakerDefinition;
Expand Down Expand Up @@ -69,6 +71,7 @@ public Processor createProcessor() throws Exception {
answer.setCircuitBreaker(cb);
}
configureBulkheadExecutorService(answer, config);
configureTimer(answer);
return answer;
}

Expand Down Expand Up @@ -126,6 +129,25 @@ private void configureBulkheadExecutorService(FaultToleranceProcessor processor,
}
}

private void configureTimer(FaultToleranceProcessor answer) throws Exception {
Timer timer;

// If running in a CDI container, try to find the singleton scoped ExecutorHolder. Else we have to manage the Timer ourselves
ExecutorHolder executorHolder = findSingleByType(ExecutorHolder.class);
if (executorHolder != null) {
timer = executorHolder.getTimer();
} else {
FaultToleranceTimerService threadTimerService = camelContext.hasService(FaultToleranceTimerService.class);
if (threadTimerService == null) {
threadTimerService = new FaultToleranceTimerService();
camelContext.addService(threadTimerService);
}
timer = threadTimerService.getTimer();
}

answer.setTimer(timer);
}

// *******************************
// Helpers
// *******************************
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.microprofile.faulttolerance;

import java.util.concurrent.ExecutorService;

import io.smallrye.faulttolerance.core.timer.ThreadTimer;
import io.smallrye.faulttolerance.core.timer.Timer;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.support.service.ServiceSupport;

/**
* Service to manage the lifecycle of the SmallRye Fault Tolerance Timer. Primarily used when running without CDI
* container support.
*/
public class FaultToleranceTimerService extends ServiceSupport implements CamelContextAware {
private ExecutorService threadTimerExecutorService;
private Timer timer;
private CamelContext camelContext;

@Override
protected void doInit() throws Exception {
threadTimerExecutorService
= getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "CircuitBreakerThreadTimer");
timer = ThreadTimer.create(threadTimerExecutorService);
}

@Override
protected void doStop() throws Exception {
if (timer != null) {
try {
timer.shutdown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
timer = null;
}
}

if (threadTimerExecutorService != null) {
getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService);
threadTimerExecutorService = null;
}
}

@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}

@Override
public CamelContext getCamelContext() {
return this.camelContext;
}

public Timer getTimer() {
return timer;
}
}