Skip to content
Permalink
Browse files
Fix a bug in the XMPP transport which lead to faluires when it is loa…
…ded, adding missing files
  • Loading branch information
Hemapani Srinath Perera committed Dec 15, 2010
1 parent b245457 commit 7313f07eeeef7fecefe4712ecae9253b6954eb75
Showing 2 changed files with 304 additions and 0 deletions.
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.axis2.transport.xmpp.util;

import org.apache.axis2.AxisFault;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.ConnectionListener;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.FromContainsFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.ToContainsFilter;

import java.util.HashMap;
import java.util.Map;

public class XMPPClientConnectionFactory {
private static Log log = LogFactory.getLog(XMPPClientConnectionFactory.class);
private XMPPConnection xmppConnection = null;
private PacketFilter packetFilter = null;
private Map<String,XMPPConnectionDetails> xmppConnections = new HashMap<String,XMPPConnectionDetails>();

public XMPPClientConnectionFactory(){}

/**
* Connects to a XMPP server based on the details available in serverCredentials object
* @param serverCredentials
* @throws XMPPException
*/
public XMPPConnection connect(final XMPPServerCredentials serverCredentials) throws AxisFault {
//XMPPConnection.DEBUG_ENABLED = true;
if(XMPPConstants.XMPP_SERVER_TYPE_JABBER.equals(serverCredentials.getServerType())){
xmppConnection = new XMPPConnection(serverCredentials.getServerUrl());
try {
xmppConnection.connect();
} catch (XMPPException e) {
log.error("Failed to connect to server :"+serverCredentials.getServerUrl(), e);
throw new AxisFault("Failed to connect to server :"+serverCredentials.getServerUrl());
}
//Pause for a small time before trying to login.
//This prevents random ssl exception from Smack API
try {
Thread.sleep(100);
} catch (InterruptedException e5) {
log.debug("Sleep interrupted ",e5);
}

if(xmppConnection.isConnected()){
String resource = serverCredentials.getResource()+ new Object().hashCode();
if(! xmppConnection.isAuthenticated()){
try {
//xmppConnection.login(serverCredentials.getAccountName()+"@"+
// serverCredentials.getServerUrl(),
xmppConnection.login(serverCredentials.getAccountName(),
serverCredentials.getPassword(),
resource,
true);
} catch (XMPPException e) {
try {
log.error("Login failed for "
+serverCredentials.getAccountName()
+"@"+serverCredentials.getServerUrl()
+".Retrying in 2 secs",e);
Thread.sleep(2000);
//xmppConnection.login(serverCredentials.getAccountName()+"@"+
// serverCredentials.getServerUrl(),
xmppConnection.login(serverCredentials.getAccountName(),
serverCredentials.getPassword(),
resource,
true);

} catch (InterruptedException e1) {
log.error("Sleep interrupted.",e1);
} catch (XMPPException e2) {
log.error("Login failed for : "+serverCredentials.getAccountName()
+"@"+serverCredentials.getServerUrl(),e2);
throw new AxisFault("Login failed for : "+serverCredentials.getAccountName()
+"@"+serverCredentials.getServerUrl());
}
}
//Listen for Message type packets from specified server url
//packetFilter = new AndFilter(new PacketTypeFilter(Message.class),
// new FromContainsFilter(serverCredentials.getServerUrl()));
packetFilter = new FromContainsFilter(serverCredentials.getServerUrl());
}
}
}else if(XMPPConstants.XMPP_SERVER_TYPE_GOOGLETALK.equals(serverCredentials.getServerType())){
ConnectionConfiguration connectionConfiguration =
new ConnectionConfiguration(XMPPConstants.GOOGLETALK_URL
,XMPPConstants.GOOGLETALK_PORT
,XMPPConstants.GOOGLETALK_SERVICE_NAME);
xmppConnection = new XMPPConnection(connectionConfiguration);
try {
xmppConnection.connect();
xmppConnection.login(serverCredentials.getAccountName()
, serverCredentials.getPassword()
,serverCredentials.getResource(),
true);
//packetFilter = new AndFilter(new PacketTypeFilter(Message.class),
// new FromContainsFilter(XMPPConstants.GOOGLETALK_FROM));
//packetFilter = new FromContainsFilter(XMPPConstants.GOOGLETALK_FROM);
packetFilter = new ToContainsFilter("@gmail.com");

} catch (XMPPException e1) {
log.error("Error occured while connecting to Googletalk server.",e1);
throw new AxisFault("Error occured while connecting to Googletalk server.");
}
}

ConnectionListener connectionListener = null;
connectionListener = new ConnectionListener(){
public void connectionClosed() {
log.debug("Connection closed normally");
}
public void connectionClosedOnError(
Exception e1) {
log.error("Connection to "+serverCredentials.getServerUrl()
+ " closed with error.",e1);
}
public void reconnectingIn(int seconds) {
log.error("Connection to "+serverCredentials.getServerUrl()
+" failed. Reconnecting in "+seconds+"s");
}
public void reconnectionFailed(Exception e) {
log.error("Reconnection to "+serverCredentials.getServerUrl()+" failed.",e);
}
public void reconnectionSuccessful() {
log.debug("Reconnection to "+serverCredentials.getServerUrl()+" successful.");
}
};
if(xmppConnection != null && xmppConnection.isConnected()){
xmppConnection.addConnectionListener(connectionListener);
log.info("Connected to " +serverCredentials.getAccountName()+ "@"
+ serverCredentials.getServerUrl()+ "/"+ serverCredentials.getResource());
}else{
log.warn(" Not Connected to " +serverCredentials.getAccountName()+ "@"
+ serverCredentials.getServerUrl()+ "/"+ serverCredentials.getResource());
}
return xmppConnection;
}

public XMPPConnection getXmppConnection(){
return xmppConnection;
}

// public XMPPConnection getConnection(XMPPServerCredentials credentials) throws AxisFault{
// StringBuffer buf = new StringBuffer();
// String key = buf.append(credentials.getServerUrl()).append(credentials.getAccountName()).toString();
//
// XMPPConnectionDetails connDetails = xmppConnections.get(key);
// if(connDetails == null){
// connDetails = new XMPPConnectionDetails();
// connDetails.connection = connect(credentials);
// connDetails.userCount = 1;
// xmppConnections.put(key, connDetails);
// }else{
// connDetails.userCount = 1;
// }
// return connDetails.connection;
// }
//
// public void disconnect(XMPPServerCredentials credentials){
// StringBuffer buf = new StringBuffer();
// String key = buf.append(credentials.getServerUrl()).append(credentials.getAccountName()).toString();
//
// XMPPConnectionDetails connDetails = xmppConnections.get(key);
// if(connDetails != null){
// connDetails.userCount--;
// }
// if(connDetails.userCount == 0){
// xmppConnections.remove(key);
// connDetails.connection.disconnect();
// }
// }


// public void listen(XMPPPacketListener packetListener){
// xmppConnection.addPacketListener(packetListener,packetFilter);
// }

public void stop() {}

public class XMPPConnectionDetails{
XMPPConnection connection;
int userCount;
}
}
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.axis2.transport.xmpp.util;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import org.apache.axis2.context.MessageContext;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;

public class XMPPClientResponseManager implements PacketListener {
private static Log log = LogFactory.getLog(XMPPClientResponseManager.class);

private ConcurrentHashMap<String, WaitingDetails> prespectiveResponseMap = new ConcurrentHashMap<String, WaitingDetails>();


public XMPPClientResponseManager(){
}


public void listenForResponse(String key, MessageContext messageContext){
prespectiveResponseMap.put(key, new WaitingDetails(messageContext));
}


/**
* This method will be triggered, when a message is arrived at client side
*/
public void processPacket(Packet packet) {
Message message = (Message)packet;
String xml = StringEscapeUtils.unescapeXml(message.getBody());
log.debug("Client received message : "+message.toXML());
InputStream inputStream = new ByteArrayInputStream(xml.getBytes());

String sequenceNumber = (String)message.getProperty(XMPPConstants.SEQUENCE_ID);
if(sequenceNumber != null){
WaitingDetails waitingDetails = prespectiveResponseMap.remove(sequenceNumber);
if(waitingDetails != null){
waitingDetails.messageContext.setProperty(MessageContext.TRANSPORT_IN, inputStream);
waitingDetails.wait.release();
}else{
log.error("No one waiting for message "+ xml);
}
}else{
log.error(XMPPConstants.SEQUENCE_ID + " not found in the message");
}
}

/**
* Indicates response message is received at client side.
* @see processPacket(Packet packet)
* @return
* @throws InterruptedException
*/
public void waitFor(String key) throws InterruptedException{
WaitingDetails waitingDetails = prespectiveResponseMap.get(key);
if(waitingDetails == null){
//this mean response has arrvied before wait
return;
}
waitingDetails.wait.acquire();
}

public class WaitingDetails{
Semaphore wait = new Semaphore(0);
MessageContext messageContext;
public WaitingDetails(MessageContext messageContext) {
super();
this.messageContext = messageContext;
}

}

}

0 comments on commit 7313f07

Please sign in to comment.