Permalink
Browse files

Merge pull request #17 from nickmbailey/master

Make reconnecting feature optional.
  • Loading branch information...
2 parents 804ded0 + df2bcc2 commit dad87474e7d2200506d9e01f9b3640f06566d87f Bela Ban committed Jun 1, 2011
Showing with 45 additions and 39 deletions.
  1. +45 −39 src/org/jgroups/client/StompConnection.java
@@ -54,23 +54,26 @@
protected String password;
+ protected boolean reconnect;
+
protected final Log log=LogFactory.getLog(getClass());
/**
* @param dest IP address + ':' + port, e.g. "192.168.1.5:8787"
*/
public StompConnection(String dest) {
- this(dest, null, null, false);
+ this(dest, null, null, false, false);
}
- public StompConnection(String dest, boolean ssl) {
- this(dest, null, null, ssl);
+ public StompConnection(String dest, boolean reconnect, boolean ssl) {
+ this(dest, null, null, reconnect, ssl);
}
- public StompConnection(String dest, String userid, String password, boolean ssl) {
+ public StompConnection(String dest, String userid, String password, boolean reconnect, boolean ssl) {;
server_destinations.add(dest);
this.userid = userid;
this.password = password;
+ this.reconnect = reconnect;
if (ssl)
socket_factory = SSLSocketFactory.getDefault();
else
@@ -89,10 +92,6 @@ public void removeListener(Listener listener) {
listeners.remove(listener);
}
- public void connect() {
- startRunner();
- }
-
protected synchronized void startRunner() {
if(runner == null || !runner.isAlive()) {
running = true;
@@ -110,21 +109,16 @@ protected void sendConnect() {
sb.append("passcode: ").append(password).append("\n");
sb.append("\n");
- try{
+ try {
out.write(sb.toString().getBytes());
out.write(STOMP.NULL_BYTE);
out.flush();
}
catch(IOException ex) {
- log.error("failed to send connect message: " + ex);
+ log.error("failed to send connect message:", ex);
}
}
- public void disconnect() {
- running=false;
- close();
- }
-
public void subscribe(String destination) {
if(destination == null)
return;
@@ -147,7 +141,7 @@ protected void sendSubscribe(String destination) {
out.flush();
}
catch(IOException ex) {
- log.error("failed subscribing to " + destination + ": " + ex);
+ log.error("failed subscribing to " + destination + ": ", ex);
}
}
@@ -173,14 +167,11 @@ protected void sendUnsubscribe(String destination) {
out.flush();
}
catch(IOException ex) {
- log.error("failed unsubscribing from " + destination + ": " + ex);
+ log.error("failed unsubscribing from " + destination + ": ", ex);
}
}
public void send(String destination, byte[] buf, int offset, int length, String ... headers) {
- if(!isConnected())
- return;
-
StringBuilder sb=new StringBuilder();
sb.append(STOMP.ClientVerb.SEND.name()).append("\n");
if(destination != null)
@@ -200,8 +191,8 @@ public void send(String destination, byte[] buf, int offset, int length, String
out.write(STOMP.NULL_BYTE);
out.flush();
}
- catch(IOException ex) {
- log.error("failed sending message to server: " + ex);
+ catch (IOException e) {
+ log.error("failed sending message to " + destination + ": ", e);
}
}
@@ -224,8 +215,17 @@ public void run() {
int timeout = 1;
while(running) {
try {
- if(!isConnected()) {
- setupConnection();
+ if (!isConnected() && reconnect) {
+ log.error("Reconnecting in "+timeout+"s.");
+ try {
+ Thread.sleep(timeout * 1000);
+ }
+ catch (InterruptedException e1) {
+ // pass
+ }
+ timeout = timeout*2 > 60 ? 60 : timeout*2;
+
+ connect();
}
// reset the connection backoff when we successfully connect.
@@ -269,15 +269,13 @@ public void run() {
}
}
catch(IOException e) {
- log.error("Connection closed unexpectedly, will attempt reconnect in "+timeout+"s.", e);
- close();
- try {
- Thread.sleep(timeout * 1000);
+ log.error("Connection closed unexpectedly:", e);
+ if (reconnect) {
+ closeConnections();
}
- catch (InterruptedException e1) {
- // pass
+ else {
+ disconnect();
}
- timeout = timeout*2 > 60 ? 60 : timeout*2;
}
catch(Throwable t) {
log.error("failure reading frame", t);
@@ -307,7 +305,7 @@ protected void notifyListeners(Map<String,String> info) {
}
}
- protected void setupConnection() throws IOException{
+ public void connect() throws IOException{
for (String dest : server_destinations) {
try {
connectToDestination(dest);
@@ -320,12 +318,19 @@ protected void setupConnection() throws IOException{
}
catch(IOException ex) {
if(log.isErrorEnabled())
- log.error("failed connecting to " + dest);
- close();
+ log.error("failed connecting to " + dest, ex);
+ closeConnections();
}
}
+
if(!isConnected())
throw new IOException("no target server available");
+
+ startRunner();
+ }
+
+ public void startReconnectingClient() {
+ startRunner();
}
protected void connectToDestination(String dest) throws IOException {
@@ -340,7 +345,12 @@ protected void connectToDestination(String dest) throws IOException {
out=new DataOutputStream(sock.getOutputStream());
}
- protected void close() {
+ public void disconnect() {
+ running = false;
+ closeConnections();
+ }
+
+ protected void closeConnections() {
Util.close(in);
Util.close(out);
Util.close(sock);
@@ -383,10 +393,6 @@ public void onInfo(Map<String, String> information) {
}
});
- if(!conn.isConnected()) {
- conn.setupConnection();
- }
-
conn.connect();
while(conn.isConnected()) {

0 comments on commit dad8747

Please sign in to comment.