Skip to content
Permalink
Browse files
CXF-5543 Replacing message listener container and removing spring jms…
… dep

git-svn-id: https://svn.apache.org/repos/asf/cxf/trunk@1567186 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
cschneider committed Feb 11, 2014
1 parent b40ff38 commit 4b48fab7277f46abdb3d84aed23d0f3163633960
Show file tree
Hide file tree
Showing 27 changed files with 531 additions and 623 deletions.
@@ -253,7 +253,6 @@ public void add(Interceptor<? extends Message> i, boolean force) {
}
}


public synchronized void pause() {
state = State.PAUSED;
pausedMessage = CURRENT_MESSAGE.get();
@@ -106,18 +106,10 @@
<version>3.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
@@ -19,29 +19,43 @@
package org.apache.cxf.transport.jms;

import java.io.IOException;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.transport.jms.util.JMSSender;
import org.apache.cxf.transport.jms.util.JMSUtil;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;

/**
* Conduit for sending the reply back to the client
*/
class BackChannelConduit extends AbstractConduit {
class BackChannelConduit extends AbstractConduit implements JMSExchangeSender {
private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
protected Message inMessage;
private JMSExchangeSender sender;

BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) {
super(ref);
inMessage = message;
this.sender = sender;
private JMSConfiguration jmsConfig;
private Message inMessage;
BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig) {
super(EndpointReferenceUtils.getAnonymousEndpointReference());
this.inMessage = inMessage;
this.jmsConfig = jmsConfig;
}
@Override
public void close(Message msg) throws IOException {
@@ -75,15 +89,122 @@ public void prepare(final Message message) throws IOException {
message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
}

final Exchange exchange = inMessage.getExchange();
exchange.setOutMessage(message);

Exchange exchange = inMessage.getExchange();
exchange.setOutMessage(message);

boolean isTextMessage = (jmsMessage instanceof TextMessage) && !JMSMessageUtils.isMtomEnabled(message);
MessageStreamUtil.prepareStream(message, isTextMessage, sender);
MessageStreamUtil.prepareStream(message, isTextMessage, this);
}

protected Logger getLogger() {
return LOG;
}

public void sendExchange(Exchange exchange, final Object replyObj) {
if (exchange.isOneWay()) {
//Don't need to send anything
return;
}

final Message outMessage = exchange.getOutMessage();

ResourceCloser closer = new ResourceCloser();
try {
ConnectionFactory cf = jmsConfig.getConnectionFactory();
Connection connection = closer.register(cf.createConnection());
Session session = closer
.register(connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE));

final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
JMSMessageHeadersType inMessageProperties = (JMSMessageHeadersType)inMessage
.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
initResponseMessageProperties(messageProperties, inMessageProperties);

// setup the reply message
final javax.jms.Message request = (javax.jms.Message)inMessage
.get(JMSConstants.JMS_REQUEST_MESSAGE);
final String msgType = JMSMessageUtils.isMtomEnabled(outMessage)
? JMSConstants.BINARY_MESSAGE_TYPE : JMSMessageUtils.getMessageType(request);
if (isTimedOut(request)) {
return;
}

Destination replyTo = getReplyToDestination(session, inMessage);
if (replyTo == null) {
throw new RuntimeException("No replyTo destination set");
}

getLogger().log(Level.FINE, "send out the message!");

String correlationId = determineCorrelationID(request);
javax.jms.Message reply = JMSMessageUtils.asJMSMessage(jmsConfig,
outMessage,
replyObj,
msgType,
session,
correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties);
LOG.log(Level.FINE, "server sending reply: ", reply);
sender.sendMessage(closer, session, replyTo, reply);
} catch (JMSException ex) {
throw JMSUtil.convertJmsException(ex);
} finally {
closer.close();
}
}

/**
* @param messageProperties
* @param inMessageProperties
*/
public static void initResponseMessageProperties(JMSMessageHeadersType messageProperties,
JMSMessageHeadersType inMessageProperties) {
messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode());
messageProperties.setJMSPriority(inMessageProperties.getJMSPriority());
messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI());
messageProperties.setSOAPJMSBindingVersion("1.0");
}

private boolean isTimedOut(final javax.jms.Message request) throws JMSException {
if (request.getJMSExpiration() > 0) {
TimeZone tz = new SimpleTimeZone(0, "GMT");
Calendar cal = new GregorianCalendar(tz);
long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
if (timeToLive < 0) {
getLogger()
.log(Level.INFO, "Message time to live is already expired skipping response.");
return true;
}
}
return false;
}

private Destination getReplyToDestination(Session session, Message inMessage2) throws JMSException {
javax.jms.Message message = (javax.jms.Message)inMessage2.get(JMSConstants.JMS_REQUEST_MESSAGE);
// If WS-Addressing had set the replyTo header.
final String replyToName = (String)inMessage2.get(JMSConstants.JMS_REBASED_REPLY_TO);
if (replyToName != null) {
return jmsConfig.getReplyDestination(session, replyToName);
} else if (message.getJMSReplyTo() != null) {
return message.getJMSReplyTo();
} else {
return jmsConfig.getReplyDestination(session);
}
}

/**
* Decides what correlationId to use for the reply by looking at the request headers
*
* @param request jms request message
* @return correlation id of request if set else message id from request
* @throws JMSException
*/
public String determineCorrelationID(javax.jms.Message request) throws JMSException {
return StringUtils.isEmpty(request.getJMSCorrelationID())
? request.getJMSMessageID()
: request.getJMSCorrelationID();
}

}

This file was deleted.

@@ -29,6 +29,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
@@ -49,7 +50,6 @@
import org.apache.cxf.transport.jms.util.JMSUtil;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.springframework.jms.connection.SingleConnectionFactory;

/**
* JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport
@@ -70,6 +70,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
private AtomicLong messageCount;
private JMSBusLifeCycleListener listener;
private Bus bus;
private Connection connection;
private Destination staticReplyDestination;

public JMSConduit(EndpointReferenceType target,
@@ -97,12 +98,11 @@ public void close(Message msg) throws IOException {
MessageStreamUtil.closeStreams(msg);
super.close(msg);
}

private synchronized void getJMSListener(Destination replyTo) {
if (jmsListener == null) {
jmsListener = JMSFactory.createJmsListener(jmsConfig,
this,
replyTo,
conduitId);
jmsListener = JMSFactory
.createSimpleJmsListener(jmsConfig, connection, this, replyTo, conduitId);
addBusListener();
}
}
@@ -132,7 +132,11 @@ public void sendExchange(final Exchange exchange, final Object request) {

ResourceCloser closer = new ResourceCloser();
try {
Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
if (connection == null) {
connection = JMSFactory.createConnection(jmsConfig);
}
Session session = closer.register(connection.createSession(jmsConfig.isSessionTransacted(),
Session.AUTO_ACKNOWLEDGE));
Destination targetDest = jmsConfig.getTargetDestination(session);

Destination replyToDestination = null;
@@ -143,6 +147,7 @@ public void sendExchange(final Exchange exchange, final Object request) {
}
replyToDestination = jmsConfig.getReplyToDestination(session, headers.getJMSReplyTo());
}
connection.start();

String messageType = jmsConfig.getMessageType();
String correlationId = createCorrelationId(exchange, userCID);
@@ -301,6 +306,8 @@ public void onMessage(javax.jms.Message jmsMessage) {
String correlationId = jmsMessage.getJMSCorrelationID();
LOG.log(Level.INFO, "Received reply message with correlation id " + correlationId);

// Try to correlate the incoming message with some timeout as it may have been
// added to the map after the message was sent
int count = 0;
Exchange exchange = null;
while (exchange == null && count < 100) {
@@ -364,11 +371,7 @@ public synchronized void shutdownListeners() {
}
}
public synchronized void close() {
try {
((SingleConnectionFactory)jmsConfig.getConnectionFactory()).resetConnection();
} catch (Exception e) {
// Ignore
}
ResourceCloser.close(connection);
shutdownListeners();
LOG.log(Level.FINE, "JMSConduit closed ");
}

0 comments on commit 4b48fab

Please sign in to comment.