Skip to content
Permalink
Browse files
Make jms continuations independent of spring jms
git-svn-id: https://svn.apache.org/repos/asf/cxf/trunk@1566648 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cschneider committed Feb 10, 2014
1 parent 7c9df9d commit 9ffaa4d40cc724feeb49356d7337d9057d69364a
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 198 deletions.
@@ -21,12 +21,10 @@

import java.io.UnsupportedEncodingException;
import java.util.Calendar;
import java.util.Collection;
import java.util.GregorianCalendar;
import java.util.Map;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -51,11 +49,11 @@
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractMultiplexDestination;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.jms.continuations.JMSContinuation;
import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
import org.apache.cxf.transport.jms.util.JMSSender;
import org.apache.cxf.transport.jms.util.JMSUtil;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.apache.cxf.transport.jms.util.SpringJMSListenerAdapter;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -71,8 +69,7 @@ public class JMSDestination extends AbstractMultiplexDestination
private Bus bus;
private EndpointInfo ei;
private AbstractMessageListenerContainer jmsListener;
private Collection<JMSContinuation> continuations =
new ConcurrentLinkedQueue<JMSContinuation>();
private ThrottlingCounter suspendedContinuations;
private ClassLoader loader;

public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
@@ -108,6 +105,10 @@ public void activate() {
Destination targetDestination = resolveTargetDestination();
jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this,
targetDestination);
int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100;
this.suspendedContinuations = new ThrottlingCounter(new SpringJMSListenerAdapter(this.jmsListener),
restartLimit,
jmsConfig.getMaxSuspendedContinuations());
}

private Destination resolveTargetDestination() {
@@ -185,13 +186,11 @@ public void onMessage(javax.jms.Message message) {
inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
((MessageImpl)inMessage).setDestination(this);
if (jmsConfig.getMaxSuspendedContinuations() != 0) {
inMessage.put(ContinuationProvider.class.getName(),
new JMSContinuationProvider(bus,
inMessage,
incomingObserver,
continuations,
jmsListener,
jmsConfig));
JMSContinuationProvider cp = new JMSContinuationProvider(bus,
inMessage,
incomingObserver,
suspendedContinuations);
inMessage.put(ContinuationProvider.class.getName(), cp);
}

origBus = BusFactory.getAndSetThreadDefaultBus(bus);
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.transport.jms;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.cxf.transport.jms.continuations.Counter;
import org.apache.cxf.transport.jms.util.JMSListenerContainer;

/**
* Counter that throttles a jms listener on a high and low water mark.
*
* When the counter reaches the high watermark the listener will be stopped.
* When the counter reaches the low watermark the listener will be started.
*/
public class ThrottlingCounter implements Counter {

private AtomicInteger counter;
private int lowWatermark;
private int highWatermark;
private JMSListenerContainer listenerContainer;

public ThrottlingCounter(JMSListenerContainer listenerContainer, int lowWatermark, int highWatermark) {
this.counter = new AtomicInteger();
this.lowWatermark = lowWatermark;
this.highWatermark = highWatermark;
this.listenerContainer = listenerContainer;
}

public final int incrementAndGet() {
int curCounter = counter.incrementAndGet();
if (curCounter >= highWatermark && listenerContainer.isRunning()) {
listenerContainer.stop();
}
return curCounter;
}

public final int decrementAndGet() {
int curCounter = counter.decrementAndGet();
if (curCounter <= lowWatermark && !listenerContainer.isRunning()) {
listenerContainer.start();
}
return curCounter;
}

}
@@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.transport.jms.continuations;

public interface Counter {
int incrementAndGet();
int decrementAndGet();
}
@@ -19,7 +19,6 @@

package org.apache.cxf.transport.jms.continuations;

import java.util.Collection;
import java.util.logging.Logger;

import org.apache.cxf.Bus;
@@ -30,43 +29,33 @@
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.jms.JMSConfiguration;
import org.apache.cxf.workqueue.WorkQueue;
import org.apache.cxf.workqueue.WorkQueueManager;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class JMSContinuation implements Continuation {
private static final Logger LOG = LogUtils.getL7dLogger(JMSContinuation.class);
private Bus bus;
private Message inMessage;
private MessageObserver incomingObserver;
private Collection<JMSContinuation> continuations;
private AbstractMessageListenerContainer jmsListener;
private JMSConfiguration jmsConfig;

private Counter suspendendContinuations;

private volatile Object userObject;

private volatile boolean isNew = true;
private volatile boolean isPending;
private volatile boolean isResumed;
private volatile boolean isCanceled;
private WorkQueue workQueue;
private ClassLoader loader;

public JMSContinuation(Bus b, Message m, MessageObserver observer,
Collection<JMSContinuation> cList,
AbstractMessageListenerContainer jmsListener,
JMSConfiguration jmsConfig) {

public JMSContinuation(Bus b, Message m, MessageObserver observer, Counter suspendendContinuations) {
bus = b;
inMessage = m;
inMessage = m;
incomingObserver = observer;
continuations = cList;
this.jmsListener = jmsListener;
this.jmsConfig = jmsConfig;
this.suspendendContinuations = suspendendContinuations;
WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
if (manager != null) {
workQueue = manager.getNamedWorkQueue("jms-continuation");
workQueue = manager.getNamedWorkQueue("jms-continuation");
if (workQueue == null) {
workQueue = manager.getAutomaticWorkQueue();
}
@@ -110,7 +99,7 @@ public synchronized void resume() {
}

protected void doResume() {
updateContinuations(true);
suspendendContinuations.decrementAndGet();
ClassLoaderHolder origLoader = null;
Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
try {
@@ -139,9 +128,9 @@ public synchronized boolean suspend(long timeout) {
return false;
}
inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
updateContinuations(false);

suspendendContinuations.incrementAndGet();

isNew = false;
isResumed = false;
isPending = true;
@@ -167,45 +156,6 @@ public void run() {
protected synchronized void cancelTimerTask() {
isCanceled = true;
}

protected void updateContinuations(boolean remove) {

if (jmsConfig.getMaxSuspendedContinuations() < 0
|| (jmsListener instanceof DefaultMessageListenerContainer
&& ((DefaultMessageListenerContainer)jmsListener).getCacheLevel()
>= DefaultMessageListenerContainer.CACHE_CONSUMER)) {
modifyList(remove);
return;
}

// throttle the flow if there're too many continuation instances in memory
synchronized (continuations) {
modifyList(remove);
if (continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
jmsListener.stop();
} else if (!jmsListener.isRunning()) {
int limit = jmsConfig.getReconnectPercentOfMax();
if (limit < 0 || limit > 100) {
limit = 70;
}
limit = (limit * jmsConfig.getMaxSuspendedContinuations()) / 100;

if (continuations.size() <= limit) {
jmsListener.start();
}
}
}

}

protected void modifyList(boolean remove) {
if (remove) {
continuations.remove(this);
} else {
continuations.add(this);
}
}



}
@@ -19,38 +19,29 @@

package org.apache.cxf.transport.jms.continuations;

import java.util.Collection;

import org.apache.cxf.Bus;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.jms.JMSConfiguration;
import org.springframework.jms.listener.AbstractMessageListenerContainer;

public class JMSContinuationProvider implements ContinuationProvider {

private Bus bus;
private Message inMessage;
private MessageObserver incomingObserver;
private Collection<JMSContinuation> continuations;
private AbstractMessageListenerContainer jmsListener;
private JMSConfiguration jmsConfig;
private Counter suspendendContinuations;

public JMSContinuationProvider(Bus b,
Message m,
MessageObserver observer,
Collection<JMSContinuation> cList,
AbstractMessageListenerContainer jmsListener,
JMSConfiguration jmsConfig) {
Counter suspendendContinuations) {
bus = b;
inMessage = m;
incomingObserver = observer;
continuations = cList;
this.jmsListener = jmsListener;
this.jmsConfig = jmsConfig;
this.suspendendContinuations = suspendendContinuations;
}

public void complete() {
JMSContinuation cw = inMessage.get(JMSContinuation.class);
if (cw != null) {
@@ -69,8 +60,7 @@ public Continuation getContinuation() {
}
JMSContinuation cw = m.get(JMSContinuation.class);
if (cw == null) {
cw = new JMSContinuation(bus, m, incomingObserver, continuations,
jmsListener, jmsConfig);
cw = new JMSContinuation(bus, m, incomingObserver, suspendendContinuations);
m.put(JMSContinuation.class, cw);
}
return cw;
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.transport.jms.util;

public interface JMSListenerContainer {
boolean isRunning();
void stop();
void start();
}

0 comments on commit 9ffaa4d

Please sign in to comment.