Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: belaban/JGroups
base: ef0002b
...
head fork: belaban/JGroups
compare: f8a743b
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 1 file changed
  • 0 commit comments
  • 2 contributors
Commits on May 22, 2011
Nick Bailey Improvements to StompConnection.
* SSL communication is now an option.
* Fix a bug with removeListener().
* Modify connection thread to attempt recconnect with backoff.
* Fix a bug with auth not being sent on reconnect.
786c897
Commits on May 23, 2011
@belaban Merge pull request #16 from nickmbailey/master
Improvements to StompConnection
f8a743b
Showing with 95 additions and 74 deletions.
  1. +95 −74 src/org/jgroups/client/StompConnection.java
View
169 src/org/jgroups/client/StompConnection.java
@@ -11,6 +11,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.*;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
import java.util.*;
/**
@@ -30,6 +32,7 @@
*/
@Experimental @Unsupported
public class StompConnection implements Runnable {
+ protected SocketFactory socket_factory;
protected Socket sock;
protected DataInputStream in;
protected DataOutputStream out;
@@ -43,23 +46,39 @@
protected Thread runner;
- protected volatile boolean running=true;
+ protected volatile boolean running=false;
protected String session_id;
- protected final Log log=LogFactory.getLog(getClass());
+ protected String userid;
+
+ protected String password;
+ protected final Log log=LogFactory.getLog(getClass());
/**
* @param dest IP address + ':' + port, e.g. "192.168.1.5:8787"
*/
public StompConnection(String dest) {
+ this(dest, null, null, false);
+ }
+
+ public StompConnection(String dest, boolean ssl) {
+ this(dest, null, null, ssl);
+ }
+
+ public StompConnection(String dest, String userid, String password, boolean ssl) {
server_destinations.add(dest);
+ this.userid = userid;
+ this.password = password;
+ if (ssl)
+ socket_factory = SSLSocketFactory.getDefault();
+ else
+ socket_factory = SocketFactory.getDefault();
}
public String getSessionId() {return session_id;}
-
public void addListener(Listener listener) {
if(listener != null)
listeners.add(listener);
@@ -67,32 +86,22 @@ public void addListener(Listener listener) {
public void removeListener(Listener listener) {
if(listener != null)
- listeners.add(listener);
+ listeners.remove(listener);
}
+ public void connect() {
+ startRunner();
+ }
- public void connect(String userid, String password) throws IOException {
- String dest;
-
- if(isConnected())
- return;
- while((dest=pickRandomDestination()) != null) {
- try {
- connect(dest);
- if(log.isDebugEnabled())
- log.debug("connected to " + dest);
- break;
- }
- catch(IOException ex) {
- if(log.isErrorEnabled())
- log.error("failed connecting to " + dest);
- close();
- server_destinations.remove(dest);
- }
+ protected synchronized void startRunner() {
+ if(runner == null || !runner.isAlive()) {
+ running = true;
+ runner=new Thread(this, "StompConnection receiver");
+ runner.start();
}
- if(!isConnected())
- throw new IOException("no target server available");
+ }
+ protected void sendConnect() {
StringBuilder sb=new StringBuilder();
sb.append(STOMP.ClientVerb.CONNECT.name()).append("\n");
if(userid != null)
@@ -101,30 +110,14 @@ public void connect(String userid, String password) throws IOException {
sb.append("passcode: ").append(password).append("\n");
sb.append("\n");
- out.write(sb.toString().getBytes());
- out.write(STOMP.NULL_BYTE);
- out.flush();
- }
-
-
- public void reconnect() throws IOException {
- if(!running)
- return;
- connect();
- for(String subscription: subscriptions)
- subscribe(subscription);
- if(log.isDebugEnabled()) {
- log.debug("reconnected to " + sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
- if(!subscriptions.isEmpty())
- log.debug("re-subscribed to " + subscriptions);
+ try{
+ out.write(sb.toString().getBytes());
+ out.write(STOMP.NULL_BYTE);
+ out.flush();
+ }
+ catch(IOException ex) {
+ log.error("failed to send connect message: " + ex);
}
-
- }
-
-
-
- public void connect() throws IOException {
- connect(null, null);
}
public void disconnect() {
@@ -137,6 +130,12 @@ public void subscribe(String destination) {
return;
subscriptions.add(destination);
+ if(isConnected()) {
+ sendSubscribe(destination);
+ }
+ }
+
+ protected void sendSubscribe(String destination) {
StringBuilder sb=new StringBuilder();
sb.append(STOMP.ClientVerb.SUBSCRIBE.name()).append("\n");
sb.append("destination: ").append(destination).append("\n");
@@ -157,6 +156,12 @@ public void unsubscribe(String destination) {
return;
subscriptions.remove(destination);
+ if(isConnected()) {
+ sendUnsubscribe(destination);
+ }
+ }
+
+ protected void sendUnsubscribe(String destination) {
StringBuilder sb=new StringBuilder();
sb.append(STOMP.ClientVerb.UNSUBSCRIBE.name()).append("\n");
sb.append("destination: ").append(destination).append("\n");
@@ -173,6 +178,9 @@ public void unsubscribe(String destination) {
}
public void send(String destination, byte[] buf, int offset, int length, String ... headers) {
+ if(!isConnected())
+ return;
+
StringBuilder sb=new StringBuilder();
sb.append(STOMP.ClientVerb.SEND.name()).append("\n");
if(destination != null)
@@ -213,8 +221,16 @@ public void send(String destination, byte[] buf) {
}
public void run() {
- while(isConnected() && running) {
+ int timeout = 1;
+ while(running) {
try {
+ if(!isConnected()) {
+ setupConnection();
+ }
+
+ // reset the connection backoff when we successfully connect.
+ timeout = 1;
+
STOMP.Frame frame=STOMP.readFrame(in);
if(frame != null) {
STOMP.ServerVerb verb=STOMP.ServerVerb.valueOf(frame.getVerb());
@@ -253,13 +269,15 @@ public void run() {
}
}
catch(IOException e) {
+ log.error("Connection closed unexpectedly, will attempt reconnect in "+timeout+"s.", e);
close();
try {
- reconnect();
+ Thread.sleep(timeout * 1000);
}
- catch(IOException e1) {
- log.warn("failed to reconnect; runner thread terminated, cause: " + e1);
+ catch (InterruptedException e1) {
+ // pass
}
+ timeout = timeout*2 > 60 ? 60 : timeout*2;
}
catch(Throwable t) {
log.error("failure reading frame", t);
@@ -289,24 +307,37 @@ protected void notifyListeners(Map<String,String> info) {
}
}
- protected String pickRandomDestination() {
- return server_destinations.isEmpty()? null : server_destinations.iterator().next();
- }
-
- protected void connect(String dest) throws IOException {
- SocketAddress saddr=parse(dest);
- sock=new Socket();
- sock.connect(saddr);
- in=new DataInputStream(sock.getInputStream());
- out=new DataOutputStream(sock.getOutputStream());
- startRunner();
+ protected void setupConnection() throws IOException{
+ for (String dest : server_destinations) {
+ try {
+ connectToDestination(dest);
+ sendConnect();
+ for(String subscription: subscriptions)
+ sendSubscribe(subscription);
+ if(log.isDebugEnabled())
+ log.debug("connected to " + dest);
+ break;
+ }
+ catch(IOException ex) {
+ if(log.isErrorEnabled())
+ log.error("failed connecting to " + dest);
+ close();
+ }
+ }
+ if(!isConnected())
+ throw new IOException("no target server available");
}
- protected static SocketAddress parse(String dest) throws UnknownHostException {
+ protected void connectToDestination(String dest) throws IOException {
+ // parse destination
int index=dest.lastIndexOf(":");
String host=dest.substring(0, index);
int port=Integer.parseInt(dest.substring(index+1));
- return new InetSocketAddress(host, port);
+
+ sock=socket_factory.createSocket(host, port);
+
+ in=new DataInputStream(sock.getInputStream());
+ out=new DataOutputStream(sock.getOutputStream());
}
protected void close() {
@@ -319,21 +350,11 @@ public boolean isConnected() {
return sock != null && sock.isConnected() && !sock.isClosed();
}
- protected synchronized void startRunner() {
- if(runner == null || !runner.isAlive()) {
- runner=new Thread(this, "StompConnection receiver");
- runner.start();
- }
- }
-
-
-
public static interface Listener {
void onMessage(Map<String,String> headers, byte[] buf, int offset, int length);
void onInfo(Map<String,String> information);
}
-
public static void main(String[] args) throws IOException {
String host="localhost";
String port="8787";

No commit comments for this range

Something went wrong with that request. Please try again.