Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeSmell authored and davsclaus committed Jan 10, 2018
1 parent 9cc0eed commit 4f65a94
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
*/
package org.apache.camel.impl;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
Expand All @@ -26,14 +34,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
* this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
Expand Down Expand Up @@ -74,17 +74,13 @@ public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implement
// stateful information
private final AtomicInteger failures = new AtomicInteger();
private final AtomicInteger state = new AtomicInteger(STATE_CLOSED);
private AtomicBoolean keepOpen = new AtomicBoolean(false);
private final AtomicBoolean keepOpen = new AtomicBoolean(false);
private volatile Timer halfOpenTimer;
private volatile long lastFailure;
private volatile long openedAt;

public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
this.throttledExceptions = handledExceptions;
this.failureWindow = failureWindow;
this.halfOpenAfter = halfOpenAfter;
this.failureThreshold = threshold;
this.keepOpen.set(false);
this(threshold, failureWindow, halfOpenAfter, handledExceptions, false);
}

public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions, boolean keepOpen) {
Expand Down Expand Up @@ -333,7 +329,7 @@ public boolean getKeepOpen() {
}

public void setKeepOpen(boolean keepOpen) {
log.debug("keep open:" + keepOpen);
log.debug("keep open: {}", keepOpen);
this.keepOpen.set(keepOpen);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
Expand Down Expand Up @@ -29,7 +45,7 @@ public void setUp() throws Exception {
protected void createPolicy() {
int threshold = 2;
long failureWindow = 30;
long halfOpenAfter = 1000;
long halfOpenAfter = 100;
boolean keepOpen = true;
policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen);
}
Expand All @@ -45,7 +61,7 @@ public void testThrottlingRoutePolicyStartWithAlwaysOpenOn() throws Exception {

// gives time for policy half open check to run every second
// and should not close b/c keepOpen is true
Thread.sleep(2000);
Thread.sleep(500);

// gives time for policy half open check to run every second
// but it should never close b/c keepOpen is true
Expand All @@ -64,7 +80,7 @@ public void testThrottlingRoutePolicyStartWithAlwaysOpenOnThenClose() throws Exc

// gives time for policy half open check to run every second
// and should not close b/c keepOpen is true
Thread.sleep(2000);
Thread.sleep(500);

result.expectedMessageCount(0);
result.setResultWaitTime(1500);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;

import org.apache.camel.ContextTestSupport;
Expand Down Expand Up @@ -29,7 +45,7 @@ public void setUp() throws Exception {
protected void createPolicy() {
int threshold = 2;
long failureWindow = 30;
long halfOpenAfter = 1000;
long halfOpenAfter = 100;
boolean keepOpen = false;
policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null, keepOpen);
}
Expand All @@ -44,7 +60,7 @@ public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws E
Thread.sleep(3);
}
result.expectedMessageCount(size);
result.setResultWaitTime(2000);
result.setResultWaitTime(1000);
assertMockEndpointsSatisfied();

// set keepOpen to true
Expand All @@ -55,7 +71,7 @@ public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws E
template.sendBody(url, "MessageTrigger");

// give time for circuit to open
Thread.sleep(1000);
Thread.sleep(500);

// send next set of messages
// should NOT go through b/c circuit is open
Expand All @@ -66,10 +82,10 @@ public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws E

// gives time for policy half open check to run every second
// and should not close b/c keepOpen is true
Thread.sleep(2000);
Thread.sleep(500);

result.expectedMessageCount(size + 1);
result.setResultWaitTime(2000);
result.setResultWaitTime(1000);
assertMockEndpointsSatisfied();

// set keepOpen to false
Expand All @@ -78,7 +94,7 @@ public void testThrottlingRoutePolicyStartWithAlwaysOpenOffThenToggle() throws E
// gives time for policy half open check to run every second
// and it should close b/c keepOpen is false
result.expectedMessageCount(size * 2 + 1);
result.setResultWaitTime(2000);
result.setResultWaitTime(1000);
assertMockEndpointsSatisfied();
}

Expand All @@ -87,11 +103,6 @@ protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
int threshold = 2;
long failureWindow = 30;
long halfOpenAfter = 1000;
policy = new ThrottlingExceptionRoutePolicy(threshold, failureWindow, halfOpenAfter, null);

from(url)
.routePolicy(policy)
.log("${body}")
Expand Down

4 comments on commit 4f65a94

@codecracker2014
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @CodeSmell i'm using this policy to detect availability of a web service and stop route for some time, but before onExchangeDone called error handler moves the current message to dead letter queue. I want to avoid putting messages to DLQ for specific situation which can be handled using ThrottlingExceptionRoutePolicy.
Is it possible with ThrottlingExceptionRoutePolicy not to put current message to DLQ where ThrottlingExceptionRoutePolicy needs to take action.

@CodeSmell
Copy link
Contributor Author

@CodeSmell CodeSmell commented on 4f65a94 Jan 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codecracker2014
Based on the question I assume you are consuming from a JMS queue/topic and then calling a web service. If that is the case, this policy/circuit breaker would detect failures calling the web service and then stop consuming from the endpoint once the configurable threshold was met.

Any messages that are consumed from the queue/topic will be rolled back (assuming that there is a transaction) when the exception is thrown calling the failing web service. The exception will be counted by the policy. This policy/circuit breaker, once opened, will minimize the number of messages that go on the DLQ. However, the handling of the message that is rolled back will be determined by the JMS broker. If it is configured to move messages to a DLQ on rollback there is nothing the policy can do to stop that.

Given this scenario, I recommend implementing the ThrottingExceptionHalfOpenHandler to check on the web service rather than rely on the default behavior of resuming the route during the half open state. Any messages consumed during this check will also go to the DLQ.

PS: probably best to ask the question in one place :)

@codecracker2014
Copy link

@codecracker2014 codecracker2014 commented on 4f65a94 Jan 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CodeSmell
Ok i can configure camel to handle on going message, but in case of concurrent consumers of source queue all the on going exchanges are getting failed on stop of route as below. Can this be handled in ThrottlingExceptionRoutePolicy

Log:
EventDrivenConsumerRoute[activemq://smsqueue -> Channel[TransactionErrorHandler:PROPAGATION_REQUIRES_NEW[Pipeline[[Channel[BeanProcessor[com.nucleus.integration.ws.server.sms.route.AfterQueueProcessor(0x50c7d3e5)]], Channel[SetHeader(CamelHttpMethod, POST)], Channel[SetHeader(Content-Type, application/json)], Channel[sendTo(http://localhost:8080/camel-hello-1.0.0/newOrder?quantity=1&orderId=dd&productName=bb)]]]]]] 15:54:02.088 [Camel (neutrinoCoreIntegrationCamelContext) thread #1 - JmsConsumer[smsqueue]] WARN o.a.c.c.j.DefaultJmsMessageListenerContainer - Rejecting received message because of the listener container having been stopped in the meantime: ActiveMQBytesMessage {commandId = 53892, responseRequired = false, messageId = ID:NB1053-30288-1516615103332-1:21:17:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:NB1053-30288-1516615103332-1:21:17:1, destination = queue://smsqueue, transactionId = TX:ID:NB1053-30288-1516615103332-1:21:62, expiration = 0, timestamp = 1516616473191, arrival = 0, brokerInTime = 1516616473191, brokerOutTime = 1516616642088, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2b248ddd, marshalledProperties = org.apache.activemq.util.ByteSequence@6ba45c21, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID-NB1053-30286-1516615089364-0-95}, readOnlyProperties = true, readOnlyBody = true, droppable = false} ActiveMQBytesMessage{ bytesOut = null, dataOut = null, dataIn = null }

@CodeSmell
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that if the circuit is being opened that the web service is failing and that all in-flight messages would fail. If there is a way to handle these consumed messages in a way that does not involve a rollback you may want to consider the Camel/Hystrix circuit breaker. Here each message is processed via fallback option instead of rolled back.

Please sign in to comment.