Skip to content
Permalink
Browse files
Fix a bug in the XMPP transport which lead to faluires when it is loaded
  • Loading branch information
Hemapani Srinath Perera committed Dec 15, 2010
1 parent ef2273e commit b245457b8229eae8929ee4015f445ff1e4d28c05
Showing 8 changed files with 169 additions and 113 deletions.
@@ -19,6 +19,14 @@

package org.apache.axis2.transport.xmpp;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
@@ -39,20 +47,18 @@
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.Roster.SubscriptionMode;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class XMPPListener implements TransportListener {

/**
Uncomment this enable XMPP logging, this is useful for testing.
static {
XMPPConnection.DEBUG_ENABLED = true;
}
**/
private static Log log = LogFactory.getLog(XMPPListener.class);
private ConfigurationContext configurationContext = null;
private String xmppServerUsername = "";
private String xmppServerUrl = "";
private XMPPServerCredentials serverCredentials;

/**
* A Map containing the connection factories managed by this,
@@ -108,21 +114,20 @@ private void initializeConnectionFactories(TransportInDescription transportIn) t
}

Iterator params = pi.getParameters().iterator();
XMPPServerCredentials serverCredentials =
new XMPPServerCredentials();
serverCredentials = new XMPPServerCredentials();

while (params.hasNext()) {
Parameter param = (Parameter) params.next();
if(XMPPConstants.XMPP_SERVER_URL.equals(param.getName())){
xmppServerUrl = (String)param.getValue();
serverCredentials.setServerUrl(xmppServerUrl);
serverCredentials.setServerUrl((String)param.getValue());
}else if(XMPPConstants.XMPP_SERVER_USERNAME.equals(param.getName())){
xmppServerUsername = (String) param.getValue();
serverCredentials.setAccountName(xmppServerUsername);
serverCredentials.setAccountName((String)param.getValue());
}else if(XMPPConstants.XMPP_SERVER_PASSWORD.equals(param.getName())){
serverCredentials.setPassword((String)param.getValue());
}else if(XMPPConstants.XMPP_SERVER_TYPE.equals(param.getName())){
serverCredentials.setServerType((String)param.getValue());
serverCredentials.setServerType((String)param.getValue());
}else if(XMPPConstants.XMPP_DOMAIN_NAME.equals(param.getName())){
serverCredentials.setDomainName((String)param.getValue());
}
}
XMPPConnectionFactory xmppConnectionFactory = new XMPPConnectionFactory();
@@ -158,8 +163,10 @@ public EndpointReference getEPRForService(String serviceName, String ip) throws
* @param ip
*/
public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
String domainName = serverCredentials.getDomainName() == null? serverCredentials.getDomainName()
: serverCredentials.getServerUrl();
return new EndpointReference[]{new EndpointReference(XMPPConstants.XMPP_PREFIX +
xmppServerUsername +"@"+ xmppServerUrl +"/" + serviceName)};
serverCredentials.getAccountName() +"@"+ domainName +"/services/" + serviceName)};
}


@@ -19,12 +19,15 @@

package org.apache.axis2.transport.xmpp;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import javax.xml.namespace.QName;

import org.apache.axiom.om.OMElement;
import org.apache.axiom.soap.SOAP12Version;
import org.apache.axiom.soap.SOAPVersion;
import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.client.Options;
@@ -39,7 +42,8 @@
import org.apache.axis2.handlers.AbstractHandler;
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axis2.transport.TransportSender;
import org.apache.axis2.transport.xmpp.util.XMPPClientSidePacketListener;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.axis2.transport.xmpp.util.XMPPClientResponseManager;
import org.apache.axis2.transport.xmpp.util.XMPPConnectionFactory;
import org.apache.axis2.transport.xmpp.util.XMPPConstants;
import org.apache.axis2.transport.xmpp.util.XMPPOutTransportInfo;
@@ -66,11 +70,14 @@

public class XMPPSender extends AbstractHandler implements TransportSender {
static Log log = null;
XMPPConnectionFactory connectionFactory;
XMPPServerCredentials serverCredentials;
private XMPPClientResponseManager xmppClientSidePacketListener;
private XMPPConnectionFactory defaultConnectionFactory;

public XMPPSender() {
log = LogFactory.getLog(XMPPSender.class);
xmppClientSidePacketListener = new XMPPClientResponseManager();
}

public void cleanup(MessageContext msgContext) throws AxisFault {
@@ -89,20 +96,11 @@ public void init(ConfigurationContext confContext,
//if connection details are available from axis configuration
//use those & connect to jabber server(s)
serverCredentials = new XMPPServerCredentials();
getConnectionDetailsFromAxisConfiguration(transportOut);
connectionFactory = new XMPPConnectionFactory();
connectionFactory.connect(serverCredentials);
getConnectionDetailsFromAxisConfiguration(transportOut);

defaultConnectionFactory = new XMPPConnectionFactory();
}

/**
* Extract connection details from Client options
* @param msgCtx
*/
private void connectUsingClientOptions(MessageContext msgCtx) throws AxisFault{
getConnectionDetailsFromClientOptions(msgCtx);
connectionFactory = new XMPPConnectionFactory();
connectionFactory.connect(serverCredentials);
}

public void stop() {}

@@ -136,35 +134,56 @@ public InvocationResponse invoke(MessageContext msgContext)
* @throws AxisFault on error
*/
public void sendMessage(MessageContext msgCtx, String targetAddress,

OutTransportInfo outTransportInfo) throws AxisFault {
XMPPConnection xmppConnection = null;
XMPPOutTransportInfo xmppOutTransportInfo = null;
XMPPConnectionFactory connectionFactory;

//if on client side,create connection to xmpp server
if(!msgCtx.isServerSide()){
connectUsingClientOptions(msgCtx);
if(msgCtx.isServerSide()){
xmppOutTransportInfo = (XMPPOutTransportInfo)msgCtx.getProperty(org.apache.axis2.Constants.OUT_TRANSPORT_INFO);
connectionFactory = xmppOutTransportInfo.getConnectionFactory();
}else{
getConnectionDetailsFromClientOptions(msgCtx);
connectionFactory = defaultConnectionFactory;
}

synchronized (this) {
xmppConnection = connectionFactory.getXmppConnection();
if(xmppConnection == null){
connectionFactory.connect(serverCredentials);
xmppConnection = connectionFactory.getXmppConnection();
}
}

Message message = new Message();
Options options = msgCtx.getOptions();
String serviceName = XMPPUtils.getServiceName(targetAddress);
String serviceName = XMPPUtils.getServiceName(targetAddress);

SOAPVersion version = msgCtx.getEnvelope().getVersion();
if(version instanceof SOAP12Version){
message.setProperty(XMPPConstants.CONTENT_TYPE, HTTPConstants.MEDIA_TYPE_APPLICATION_SOAP_XML+ "; action="+ msgCtx.getSoapAction());
}else{
message.setProperty(XMPPConstants.CONTENT_TYPE, HTTPConstants.MEDIA_TYPE_TEXT_XML);
}


if (targetAddress != null) {
xmppOutTransportInfo = new XMPPOutTransportInfo(targetAddress);
xmppOutTransportInfo.setConnectionFactory(connectionFactory);
xmppOutTransportInfo.setConnectionFactory(defaultConnectionFactory);
} else if (msgCtx.getTo() != null &&
!msgCtx.getTo().hasAnonymousAddress()) {
//TODO
} else if (msgCtx.isServerSide()) {
xmppOutTransportInfo = (XMPPOutTransportInfo)
msgCtx.getProperty(Constants.OUT_TRANSPORT_INFO);
}

xmppConnection = xmppOutTransportInfo.getConnectionFactory().getXmppConnection();

try{
if(msgCtx.isServerSide()){
message.setProperty(XMPPConstants.IS_SERVER_SIDE, new Boolean(false));
message.setProperty(XMPPConstants.IN_REPLY_TO, xmppOutTransportInfo.getInReplyTo());
message.setProperty(XMPPConstants.SEQUENCE_ID, xmppOutTransportInfo.getSequenceID());
}else{
//message is going to be processed on server side
message.setProperty(XMPPConstants.IS_SERVER_SIDE,new Boolean(true));
@@ -186,25 +205,21 @@ public void sendMessage(MessageContext msgCtx, String targetAddress,
handleException("Connection to XMPP Server is not established.");
}



//initialize the chat manager using connection
ChatManager chatManager = xmppConnection.getChatManager();
Chat chat = chatManager.createChat(xmppOutTransportInfo.getDestinationAccount(), null);
Chat chat = chatManager.createChat(xmppOutTransportInfo.getDestinationAccount(), null);

try
{
boolean waitForResponse =
msgCtx.getOperationContext() != null &&
WSDL2Constants.MEP_URI_OUT_IN.equals(
msgCtx.getOperationContext().getAxisOperation().getMessageExchangePattern());

//int endOfXMLDeclaration = soapMessage.indexOf("?>");
//String modifiedSOAPMessage = soapMessage.substring(endOfXMLDeclaration+2);

OMElement msgElement;
String messageToBeSent = "";

//TODO : need to read from a constant
if("xmpp/text".equals(xmppOutTransportInfo.getContentType())){
if(XMPPConstants.XMPP_CONTENT_TYPE_STRING.equals(xmppOutTransportInfo.getContentType())){
//if request is received from a chat client, whole soap envelope
//should not be sent.
OMElement soapBodyEle = msgCtx.getEnvelope().getBody();
@@ -222,12 +237,13 @@ public void sendMessage(MessageContext msgCtx, String targetAddress,
messageToBeSent = msgElement.toString();
message.setBody(messageToBeSent);


XMPPClientSidePacketListener xmppClientSidePacketListener = null;
String key = null;
if(waitForResponse && !msgCtx.isServerSide()){
PacketFilter filter = new PacketTypeFilter(message.getClass());
xmppClientSidePacketListener = new XMPPClientSidePacketListener(msgCtx);
xmppConnection.addPacketListener(xmppClientSidePacketListener,filter);
key = UUID.randomUUID().toString();
xmppClientSidePacketListener.listenForResponse(key, msgCtx);
message.setProperty(XMPPConstants.SEQUENCE_ID, key);
}

chat.sendMessage(message);
@@ -236,24 +252,22 @@ public void sendMessage(MessageContext msgCtx, String targetAddress,
//If this is on client side, wait for the response from server.
//Is this the best way to do this?
if(waitForResponse && !msgCtx.isServerSide()){
//TODO : need to add a timeout
while(! xmppClientSidePacketListener.isResponseReceived()){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.debug("Sleep interrupted",e);
}
}
xmppConnection.disconnect();
xmppClientSidePacketListener.waitFor(key);
//xmppConnection.disconnect();
log.debug("Received response sucessfully");
}


} catch (XMPPException e) {
log.error("Error occurred while sending the message : "+message.toXML(),e);
handleException("Error occurred while sending the message : "+message.toXML(),e);
}finally{
if(!msgCtx.isServerSide()){
xmppConnection.disconnect();
}
} catch (InterruptedException e) {
log.error("Error occurred while sending the message : "+message.toXML(),e);
handleException("Error occurred while sending the message : "+message.toXML(),e);
}finally{
// if(xmppConnection != null && !msgCtx.isServerSide()){
// xmppConnection.disconnect();
// }
}
}

@@ -360,7 +374,7 @@ private static String getParameterListForOperation(AxisOperation operation) {
* @return
*/
private static String prepareServicesList(MessageContext msgCtx) {
HashMap services = msgCtx.getConfigurationContext().getAxisConfiguration().getServices();
Map services = msgCtx.getConfigurationContext().getAxisConfiguration().getServices();
StringBuffer sb = new StringBuffer();
if(services != null && services.size() > 0){
Iterator itrServiceNames = services.keySet().iterator();
@@ -415,6 +429,8 @@ private static void sendChatMessage(MessageContext msgCtx,String responseMsg) th
ChatManager chatManager = xmppConnection.getChatManager();
Chat chat = chatManager.createChat(xmppOutTransportInfo.getDestinationAccount(), null);
try{
message.setProperty(XMPPConstants.SEQUENCE_ID,
xmppOutTransportInfo.getSequenceID());
message.setBody(responseMsg);
chat.sendMessage(message);
log.debug("Sent message :"+message.toXML());
@@ -448,7 +464,12 @@ private void getConnectionDetailsFromAxisConfiguration(TransportOutDescription t
Parameter serverType = transportOut.getParameter(XMPPConstants.XMPP_SERVER_TYPE);
if (serverType != null) {
serverCredentials.setServerType(Utils.getParameterValue(serverType));
}
}

Parameter domainName = transportOut.getParameter(XMPPConstants.XMPP_DOMAIN_NAME);
if (serverUrl != null) {
serverCredentials.setDomainName(Utils.getParameterValue(domainName));
}
}
}

@@ -482,4 +503,4 @@ private static void handleException(String msg) throws AxisFault {
log.error(msg);
throw new AxisFault(msg);
}
}
}
@@ -19,6 +19,9 @@

package org.apache.axis2.transport.xmpp.util;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.axis2.context.MessageContext;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
@@ -27,9 +30,6 @@
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class XMPPClientSidePacketListener implements PacketListener {
private static Log log = LogFactory.getLog(XMPPClientSidePacketListener.class);
private MessageContext messageContext = null;
@@ -39,7 +39,9 @@ public XMPPClientSidePacketListener(MessageContext messageContext){
this.messageContext = messageContext;
}

/**


/**
* This method will be triggered, when a message is arrived at client side
*/
public void processPacket(Packet packet) {

0 comments on commit b245457

Please sign in to comment.