Skip to content
Permalink
Browse files
CXF-5543 Use interface for MessageListenerContainer. User our own Des…
…tinationResolver impl

git-svn-id: https://svn.apache.org/repos/asf/cxf/trunk@1566716 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cschneider committed Feb 10, 2014
1 parent d37bdf0 commit 707c179ecaef3145d0ceb13b6368bfb6b3f704ae
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 58 deletions.
@@ -44,13 +44,12 @@
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.security.SecurityContext;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.transport.jms.util.JMSListenerContainer;
import org.apache.cxf.transport.jms.util.JMSSender;
import org.apache.cxf.transport.jms.util.JMSUtil;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;

/**
* JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport
@@ -66,7 +65,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me

private JMSConfiguration jmsConfig;
private Map<String, Exchange> correlationMap = new ConcurrentHashMap<String, Exchange>();
private DefaultMessageListenerContainer jmsListener;
private JMSListenerContainer jmsListener;
private String conduitId;
private AtomicLong messageCount;
private JMSBusLifeCycleListener listener;
@@ -134,10 +133,7 @@ public void sendExchange(final Exchange exchange, final Object request) {
ResourceCloser closer = new ResourceCloser();
try {
Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
DestinationResolver resolver = jmsConfig.getDestinationResolver();
Destination targetDest = resolver.resolveDestinationName(session,
jmsConfig.getTargetDestination(),
jmsConfig.isPubSubDomain());
Destination targetDest = jmsConfig.getTargetDestination(session);

Destination replyToDestination = null;
if (!exchange.isOneWay()) {
@@ -27,13 +27,12 @@
import javax.jms.Session;

import org.apache.cxf.common.injection.NoJSR250Annotations;
import org.apache.cxf.transport.jms.util.JMSDestinationResolver;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.jndi.JndiTemplate;
import org.springframework.transaction.PlatformTransactionManager;

@@ -47,11 +46,9 @@ public class JMSConfiguration implements InitializingBean {

private boolean usingEndpointInfo = true;

private AbstractMessageListenerContainer messageListenerContainer;

private JndiTemplate jndiTemplate;
private ConnectionFactory connectionFactory;
private DestinationResolver destinationResolver = new DynamicDestinationResolver();
private DestinationResolver destinationResolver = new JMSDestinationResolver();
private PlatformTransactionManager transactionManager;
private TaskExecutor taskExecutor;
private boolean reconnectOnException = true;
@@ -487,14 +484,6 @@ public boolean isSetEnforceSpec() {
return this.enforceSpec != null;
}

public AbstractMessageListenerContainer getMessageListenerContainer() {
return messageListenerContainer;
}

public void setMessageListenerContainer(AbstractMessageListenerContainer messageListenerContainer) {
this.messageListenerContainer = messageListenerContainer;
}

/** * @return Returns the jmsProviderTibcoEms.
*/
public boolean isJmsProviderTibcoEms() {
@@ -523,19 +512,23 @@ public Destination getReplyToDestination(Session session, String userDestination
if (replyTo == null) {
return getReplyDestination(session);
}
return getDestinationResolver().resolveDestinationName(session, replyTo, replyPubSubDomain);
return destinationResolver.resolveDestinationName(session, replyTo, replyPubSubDomain);
}

public Destination getReplyDestination(Session session) throws JMSException {
if (replyDestinationDest == null) {
replyDestinationDest = replyDestination == null
? session.createTemporaryQueue()
: getDestinationResolver().resolveDestinationName(session, replyDestination, replyPubSubDomain);
: destinationResolver.resolveDestinationName(session, replyDestination, replyPubSubDomain);
}
return replyDestinationDest;
}

public Destination getTargetDestination(Session session) throws JMSException {
return destinationResolver.resolveDestinationName(session, targetDestination, pubSubDomain);
}

public Destination getReplyDestination(Session session, String replyToName) throws JMSException {
return destinationResolver.resolveDestinationName(session, replyToName, replyPubSubDomain);
}
}
@@ -50,15 +50,12 @@
import org.apache.cxf.transport.AbstractMultiplexDestination;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
import org.apache.cxf.transport.jms.util.JMSListenerContainer;
import org.apache.cxf.transport.jms.util.JMSSender;
import org.apache.cxf.transport.jms.util.JMSUtil;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.apache.cxf.transport.jms.util.SpringJMSListenerAdapter;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.DestinationResolver;

public class JMSDestination extends AbstractMultiplexDestination
implements MessageListener, JMSExchangeSender {
@@ -68,7 +65,7 @@ public class JMSDestination extends AbstractMultiplexDestination
private JMSConfiguration jmsConfig;
private Bus bus;
private EndpointInfo ei;
private AbstractMessageListenerContainer jmsListener;
private JMSListenerContainer jmsListener;
private ThrottlingCounter suspendedContinuations;
private ClassLoader loader;

@@ -96,17 +93,12 @@ protected Conduit getInbuiltBackChannel(Message inMessage) {
public void activate() {
getLogger().log(Level.FINE, "JMSDestination activate().... ");
jmsConfig.ensureProperlyConfigured();
Object o = ei.getProperty(AbstractMessageListenerContainer.class.getName());
if (o instanceof AbstractMessageListenerContainer
&& jmsConfig.getMessageListenerContainer() == null) {
jmsConfig.setMessageListenerContainer((AbstractMessageListenerContainer)o);
}

Destination targetDestination = resolveTargetDestination();
jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this,
targetDestination);
int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100;
this.suspendedContinuations = new ThrottlingCounter(new SpringJMSListenerAdapter(this.jmsListener),
this.suspendedContinuations = new ThrottlingCounter(this.jmsListener,
restartLimit,
jmsConfig.getMaxSuspendedContinuations());
}
@@ -140,8 +132,7 @@ public Destination getReplyToDestination(Session session,
// If WS-Addressing had set the replyTo header.
final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
if (replyToName != null) {
DestinationResolver resolver = jmsConfig.getDestinationResolver();
return resolver.resolveDestinationName(session, replyToName, jmsConfig.isReplyPubSubDomain());
return jmsConfig.getReplyDestination(session, replyToName);
} else if (message.getJMSReplyTo() != null) {
return message.getJMSReplyTo();
} else {
@@ -283,7 +274,7 @@ public void sendExchange(Exchange exchange, final Object replyObj) {
LOG.log(Level.FINE, "server sending reply: ", reply);
sender.sendMessage(closer, session, replyTo, reply);
} catch (JMSException ex) {
throw JmsUtils.convertJmsAccessException(ex);
throw JMSUtil.convertJmsException(ex);
} finally {
closer.close();
}
@@ -31,12 +31,13 @@

import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.jms.util.JMSListenerContainer;
import org.apache.cxf.transport.jms.util.JMSSender;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.apache.cxf.transport.jms.util.SessionFactory;
import org.apache.cxf.transport.jms.util.SpringJMSListenerAdapter;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/**
@@ -121,21 +122,11 @@ public static JMSSender createJmsSender(JMSConfiguration jmsConfig,
* @param destination to listen on
* @return
*/
public static AbstractMessageListenerContainer createJmsListener(EndpointInfo ei,
public static JMSListenerContainer createJmsListener(EndpointInfo ei,
JMSConfiguration jmsConfig,
MessageListener listenerHandler,
Destination destination) {

if (jmsConfig.getMessageListenerContainer() != null) {
AbstractMessageListenerContainer jmsListener = jmsConfig.getMessageListenerContainer();
if (jmsListener.getMessageListener() == null) {
jmsListener.setMessageListener(listenerHandler);
jmsListener.initialize();
jmsListener.start();
}
return jmsListener;
}

DefaultMessageListenerContainer jmsListener = null;

//Check to see if transport is being used in JCA RA with XA
@@ -150,11 +141,12 @@ public static AbstractMessageListenerContainer createJmsListener(EndpointInfo ei
jmsListener = new DefaultMessageListenerContainer();
}

return createJmsListener(jmsListener,
createJmsListener(jmsListener,
jmsConfig,
listenerHandler,
destination,
null);
null);
return new SpringJMSListenerAdapter(jmsListener);
}

/**
@@ -167,19 +159,20 @@ public static AbstractMessageListenerContainer createJmsListener(EndpointInfo ei
* @param conduitId id for message selector
* @return
*/
public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
public static JMSListenerContainer createJmsListener(JMSConfiguration jmsConfig,
MessageListener listenerHandler,
Destination destination,
String conduitId) {
DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer();
return createJmsListener(jmsListener,
createJmsListener(jmsListener,
jmsConfig,
listenerHandler,
destination,
conduitId);
conduitId);
return new SpringJMSListenerAdapter(jmsListener);
}

private static DefaultMessageListenerContainer createJmsListener(
private static void createJmsListener(
DefaultMessageListenerContainer jmsListener,
JMSConfiguration jmsConfig,
MessageListener listenerHandler,
@@ -241,7 +234,6 @@ private static DefaultMessageListenerContainer createJmsListener(
jmsListener.setDestination(destination);
jmsListener.initialize();
jmsListener.start();
return jmsListener;
}

public static SessionFactory createJmsSessionFactory(JMSConfiguration jmsConfig, ResourceCloser closer) {
@@ -39,6 +39,7 @@
import org.apache.cxf.transport.jms.uri.JMSEndpoint;
import org.apache.cxf.transport.jms.uri.JMSEndpointParser;
import org.apache.cxf.transport.jms.uri.JMSURIConstants;
import org.apache.cxf.transport.jms.util.JMSDestinationResolver;
import org.apache.cxf.transport.jms.wsdl.DeliveryModeType;
import org.apache.cxf.transport.jms.wsdl.JndiConnectionFactoryNameType;
import org.apache.cxf.transport.jms.wsdl.JndiContextParameterType;
@@ -49,7 +50,6 @@
import org.apache.cxf.transport.jms.wsdl.TimeToLiveType;
import org.apache.cxf.transport.jms.wsdl.TopicReplyToNameType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.springframework.jms.support.destination.JndiDestinationResolver;
import org.springframework.jndi.JndiTemplate;

public class JMSOldConfigHolder {
@@ -276,7 +276,7 @@ protected JMSConfiguration configureEndpoint(boolean isConduit, JMSEndpoint endp
boolean useJndi = endpoint.getJmsVariant().contains(JMSURIConstants.JNDI);
if (useJndi) {
// Setup Destination jndi destination resolver
final JndiDestinationResolver jndiDestinationResolver = new JndiDestinationResolver();
final JMSDestinationResolver jndiDestinationResolver = new JMSDestinationResolver();
jndiDestinationResolver.setJndiTemplate(jt);
jmsConfig.setDestinationResolver(jndiDestinationResolver);
jmsConfig.setTargetDestination(endpoint.getDestinationName());
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.transport.jms.util;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.naming.NamingException;

import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jndi.JndiTemplate;

public class JMSDestinationResolver implements DestinationResolver {
JndiTemplate jndiTemplate;

public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
throws JMSException {
if (jndiTemplate != null) {
try {
return jndiTemplate.lookup(destinationName, Destination.class);
} catch (NamingException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
if (pubSubDomain) {
return session.createTopic(destinationName);
} else {
return session.createQueue(destinationName);
}
}

public void setJndiTemplate(JndiTemplate jt) {
this.jndiTemplate = jt;
}

}
@@ -23,4 +23,5 @@ public interface JMSListenerContainer {
boolean isRunning();
void stop();
void start();
void shutdown();
}
@@ -43,4 +43,9 @@ public void start() {
container.start();
}

@Override
public void shutdown() {
container.shutdown();
}

}
@@ -71,7 +71,10 @@ private void receiveAndRespondWithMessageIdAsCorrelationId() {
Session session = new SessionFactory(connectionFactory, closer).createSession();
MessageConsumer consumer = closer.register(session.createConsumer(session
.createQueue(receiveQueueName)));
final javax.jms.Message inMessage = consumer.receive();
final javax.jms.Message inMessage = consumer.receive(2000);
if (inMessage == null) {
throw new RuntimeException("No message received on destination " + receiveQueueName);
}
requestMessageId = inMessage.getJMSMessageID();
System.out.println("Received message " + requestMessageId);
final TextMessage replyMessage = session.createTextMessage("Result");
@@ -60,6 +60,10 @@ public void start() {
running = true;
}

@Override
public void shutdown() {
}

}

}

0 comments on commit 707c179

Please sign in to comment.