Skip to content

Commit

Permalink
SMACK-412 Abstracted the keepalive implementation and set the thread …
Browse files Browse the repository at this point in the history
…to start and stop on demand.

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/branches/smack_3_3_0@13610 b35dd754-fafc-0310-a699-88a17e54d16e
  • Loading branch information
rcollier committed Apr 16, 2013
1 parent 260ebe6 commit b3c0d06
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 106 deletions.
4 changes: 2 additions & 2 deletions build/resources/META-INF/smack-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<startupClasses>
<className>org.jivesoftware.smackx.ServiceDiscoveryManager</className>
<className>org.jivesoftware.smack.PrivacyListManager</className>
<className>org.jivesoftware.smack.ping.ServerPingManager</className>
<className>org.jivesoftware.smack.keepalive.KeepAliveManager</className>
<className>org.jivesoftware.smackx.XHTMLManager</className>
<className>org.jivesoftware.smackx.muc.MultiUserChat</className>
<className>org.jivesoftware.smackx.bytestreams.ibb.InBandBytestreamManager</className>
Expand All @@ -34,7 +34,7 @@
<className>org.jivesoftware.smack.ReconnectionManager</className>
<className>org.jivesoftware.smackx.commands.AdHocCommandManager</className>
<className>org.jivesoftware.smack.util.dns.JavaxResolver</className>
<className>org.jivesoftware.smackx.ping.PingManager</className>
</startupClasses>


</smack>
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
* limitations under the License.
*/

package org.jivesoftware.smack.ping;
package org.jivesoftware.smack.keepalive;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -33,17 +34,11 @@
import org.jivesoftware.smack.PacketCollector;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.IQTypeFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketIDFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.IQ.Type;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.ping.PingFailedListener;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smackx.ServiceDiscoveryManager;

/**
* Using an implementation of <a href="http://www.xmpp.org/extensions/xep-0199.html">XMPP Ping (XEP-0199)</a>. This
Expand All @@ -56,25 +51,15 @@
*
* @author Florian Schmaus
*/
public class ServerPingManager {
private static Map<Connection, ServerPingManager> instances = Collections
.synchronizedMap(new WeakHashMap<Connection, ServerPingManager>());
private static long defaultPingInterval = SmackConfiguration.getKeepAliveInterval();
public class KeepAliveManager {
private static Map<Connection, KeepAliveManager> instances = new HashMap<Connection, KeepAliveManager>();
private static volatile ScheduledExecutorService periodicPingExecutorService;

private static ScheduledExecutorService periodicPingExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread pingThread = new Thread(runnable, "Smack Server Ping");
pingThread.setDaemon(true);
return pingThread;
}
});

static {
if (defaultPingInterval > 0) {
if (SmackConfiguration.getKeepAliveInterval() > 0) {
Connection.addConnectionCreationListener(new ConnectionCreationListener() {
public void connectionCreated(Connection connection) {
new ServerPingManager(connection);
new KeepAliveManager(connection);
}
});
}
Expand All @@ -87,56 +72,94 @@ public void connectionCreated(Connection connection) {
private volatile long lastSuccessfulContact = -1;

/**
* Retrieves a {@link ServerPingManager} for the specified {@link Connection}, creating one if it doesn't already
* Retrieves a {@link KeepAliveManager} for the specified {@link Connection}, creating one if it doesn't already
* exist.
*
* @param connection
* The connection the manager is attached to.
* @return The new or existing manager.
*/
public synchronized static ServerPingManager getInstanceFor(Connection connection) {
ServerPingManager pingManager = instances.get(connection);
public synchronized static KeepAliveManager getInstanceFor(Connection connection) {
KeepAliveManager pingManager = instances.get(connection);

if (pingManager == null) {
pingManager = new ServerPingManager(connection);
pingManager = new KeepAliveManager(connection);
instances.put(connection, pingManager);
}
return pingManager;
}

private ServerPingManager(Connection connection) {
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);
sdm.addFeature(Ping.NAMESPACE);
/*
* Start the executor service if it hasn't been started yet.
*/
private synchronized static void enableExecutorService() {
if (periodicPingExecutorService == null) {
periodicPingExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread pingThread = new Thread(runnable, "Smack Keepalive");
pingThread.setDaemon(true);
return pingThread;
}
});
}
}

/*
* Stop the executor service if all monitored connections are disconnected.
*/
private synchronized static void handleDisconnect(Connection con) {
if (periodicPingExecutorService != null) {
instances.remove(con);

if (instances.isEmpty()) {
periodicPingExecutorService.shutdownNow();
periodicPingExecutorService = null;
}
}
}

private KeepAliveManager(Connection connection) {
this.connection = connection;
init();
handleConnect();
}

private void init() {
PacketFilter pingPacketFilter = new AndFilter(new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET));

/*
* Call after every connection to add the packet listener.
*/
private void handleConnect() {
// Listen for all incoming packets and reset the scheduled ping whenever
// one arrives.
connection.addPacketListener(new PacketListener() {
/**
* Sends a Pong for every Ping
*/

@Override
public void processPacket(Packet packet) {
IQ pong = IQ.createResultIQ((Ping) packet);
connection.sendPacket(pong);
// reschedule the ping based on this last server contact
lastSuccessfulContact = System.currentTimeMillis();
schedulePingServerTask();
}
}, pingPacketFilter);
}, null);
}

private void init() {
connection.addConnectionListener(new ConnectionListener() {

@Override
public void connectionClosed() {
stopPingServerTask();
handleDisconnect(connection);
}

@Override
public void connectionClosedOnError(Exception arg0) {
stopPingServerTask();
handleDisconnect(connection);
}

@Override
public void reconnectionSuccessful() {
handleConnect();
schedulePingServerTask();
}

Expand All @@ -149,17 +172,6 @@ public void reconnectionFailed(Exception e) {
}
});

// Listen for all incoming packets and reset the scheduled ping whenever
// one arrives.
connection.addPacketListener(new PacketListener() {

@Override
public void processPacket(Packet packet) {
// reschedule the ping based on this last server contact
lastSuccessfulContact = System.currentTimeMillis();
schedulePingServerTask();
}
}, null);
instances.put(connection, this);
schedulePingServerTask();
}
Expand All @@ -171,15 +183,20 @@ public void processPacket(Packet packet) {
* The new ping time interval in milliseconds.
*/
public void setPingInterval(long newPingInterval) {
if (pingInterval != newPingInterval) {
pingInterval = newPingInterval;
if (pingInterval == newPingInterval)
return;

// Enable the executor service
if (newPingInterval > 0)
enableExecutorService();

pingInterval = newPingInterval;

if (pingInterval < 0) {
stopPinging();
}
else {
schedulePingServerTask();
}
if (pingInterval < 0) {
stopPinging();
}
else {
schedulePingServerTask();
}
}

Expand Down Expand Up @@ -227,12 +244,20 @@ public void removePingFailedListener(PingFailedListener listener) {
}

/**
* Returns the time of the last successful contact with the server. (i.e. the last time any message was received).
* Returns the elapsed time (in milliseconds) since the last successful contact with the server
* (i.e. the last time any message was received).
* <p>
* <b>Note</b>: Result is -1 if no message has been received since manager was created and
* 0 if the elapsed time is negative due to a clock reset.
*
* @return Time of last message or -1 if none has been received since manager was created.
* @return Elapsed time since last message was received.
*/
public long getLastSuccessfulContact() {
return lastSuccessfulContact;
public long getTimeSinceLastContact() {
if (lastSuccessfulContact == -1)
return lastSuccessfulContact;
long delta = System.currentTimeMillis() - lastSuccessfulContact;

return (delta < 0) ? 0 : delta;
}

/**
Expand All @@ -242,6 +267,7 @@ public long getLastSuccessfulContact() {
* This is designed so only one executor is used for scheduling all pings on all connections. This results in only 1 thread used for pinging.
*/
private synchronized void schedulePingServerTask() {
enableExecutorService();
stopPingServerTask();

if (pingInterval > 0) {
Expand Down
67 changes: 63 additions & 4 deletions source/org/jivesoftware/smackx/ping/PingManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,25 @@

package org.jivesoftware.smackx.ping;

import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledExecutorService;

import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.ConnectionCreationListener;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.SmackConfiguration;
import org.jivesoftware.smack.SmackError;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.ping.ServerPingManager;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.IQTypeFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.keepalive.KeepAliveManager;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.IQ.Type;
import org.jivesoftware.smack.ping.packet.Ping;
import org.jivesoftware.smack.util.SyncPacketSend;
import org.jivesoftware.smackx.ServiceDiscoveryManager;
Expand All @@ -31,7 +45,7 @@
* allows one entity to 'ping' any other entity by simply sending a ping to
* the appropriate JID.
* <p>
* NOTE: The {@link ServerPingManager} already provides a keepalive functionality
* NOTE: The {@link KeepAliveManager} already provides a keepalive functionality
* for regularly pinging the server to keep the underlying transport connection
* alive. This class is specifically intended to do manual pings of other
* entities.
Expand All @@ -41,12 +55,57 @@
* Ping</a>
*/
public class PingManager {
private static Map<Connection, PingManager> instances = Collections
.synchronizedMap(new WeakHashMap<Connection, PingManager>());

static {
Connection.addConnectionCreationListener(new ConnectionCreationListener() {
public void connectionCreated(Connection connection) {
new PingManager(connection);
}
});
}

private Connection connection;

public PingManager(Connection connection) {
/**
* Retrieves a {@link PingManager} for the specified {@link Connection}, creating one if it doesn't already
* exist.
*
* @param connection
* The connection the manager is attached to.
* @return The new or existing manager.
*/
public synchronized static PingManager getInstanceFor(Connection connection) {
PingManager pingManager = instances.get(connection);

if (pingManager == null) {
pingManager = new PingManager(connection);
}
return pingManager;
}

private PingManager(Connection con) {
this.connection = con;
ServiceDiscoveryManager sdm = ServiceDiscoveryManager.getInstanceFor(connection);

// The ServiceDiscoveryManager was not pre-initialized
if (sdm == null)
sdm = new ServiceDiscoveryManager(connection);

sdm.addFeature(Ping.NAMESPACE);
this.connection = connection;

PacketFilter pingPacketFilter = new AndFilter(new PacketTypeFilter(Ping.class), new IQTypeFilter(Type.GET));

connection.addPacketListener(new PacketListener() {
/**
* Sends a Pong for every Ping
*/
public void processPacket(Packet packet) {
IQ pong = IQ.createResultIQ((Ping) packet);
connection.sendPacket(pong);
}
}, pingPacketFilter);
}

/**
Expand Down
Loading

0 comments on commit b3c0d06

Please sign in to comment.