Permalink
Browse files

added network dropout detection to reader, force socket close

-On network dropout, normally java doesn't notice until the tcp
socket timeout, which is quite long.
-now if no data is received by the reader thread within 1 min,
the reader will force the socket closed so that autoreconnect can
work it's magic (if enabled).
-Client notices network dropout within 1 minute
-Client will reconnect when network comes back up within 1 minute
  • Loading branch information...
theterg committed Jul 3, 2012
1 parent a840f4f commit a460fb041a58b2788dc74a60150490eb96a126e1
@@ -73,7 +73,7 @@ public void messageRecieved(Map<String, ?> payload, String fromSwarm, String fro
try {
while(true){
if (producersession != null && producersession.isConnected()){
- producersession.send(map);
+ //producersession.send(map);
}
Thread.sleep(1000);
}
@@ -5,7 +5,9 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
+import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -41,16 +43,19 @@
private final String apiKey;
private final List<ISwarmMessageListener> listeners;
private final static ObjectMapper mapper = new ObjectMapper();
+ private final Socket socket;
+ private long last_message;
/**
* @param is inputstream, must not be null.
* @param apiKey api key, must not be null.
* @param listeners List of ISwarmMessageListener. Must not be null.
* @throws UnsupportedEncodingException
*/
- protected SwarmParticipationReader(InputStream is, String apiKey, List<ISwarmMessageListener> listeners) throws UnsupportedEncodingException {
- AbstractSwarmWSClient.validateParams(is, apiKey, listeners);
-
+ protected SwarmParticipationReader(Socket sock, String apiKey, List<ISwarmMessageListener> listeners) throws UnsupportedEncodingException, IOException{
+ AbstractSwarmWSClient.validateParams(sock, apiKey, listeners);
+ InputStream is = sock.getInputStream();
+ this.socket = sock;
this.apiKey = apiKey;
this.listeners = listeners;
this.reader = new BufferedReader(new InputStreamReader(is, ISwarmClient.SWARM_CHARACTER_ENCODING));
@@ -61,6 +66,7 @@ public void run() {
running = true;
String line = null;
String disconnectMessage = "Server disconnect";
+ last_message = new Date().getTime();
readinput:
@@ -69,12 +75,17 @@ public void run() {
try {
line = reader.readLine();
} catch (SocketTimeoutException e) {
+ if (last_message + SwarmSessionImp.MAX_INTERVAL + 100 < new Date().getTime()){
+ System.out.println("["+this.getClass().getSimpleName()+"]: "+"Connection stale, forcing the socket closed to trigger reconnect");
+ socket.close();
+ }
continue;
}
if (line == null){
System.out.println("["+this.getClass().getSimpleName()+"]: "+"read a null line, quitting");
break;
}
+ last_message = new Date().getTime();
line = line.trim();
//Filter empty lines and line length lines.
if (line.length() == 0 || isNumeric(line))
@@ -28,6 +28,7 @@
*
*/
public class SwarmSessionImp implements ISwarmSession, ISwarmMessageListener {
+
private Socket socket;
private final String apiKey;
private static final String CRLF = "\r\n";
@@ -96,7 +97,7 @@ public void run() {
}
- }, 0, 60000);
+ }, 0, MAX_INTERVAL);
}
@@ -108,7 +109,7 @@ private Socket createSocket(String hostname, int port) throws UnknownHostExcepti
if (readerThread != null)
readerThread.interrupt();
- this.readerThread = new SwarmParticipationReader(socket.getInputStream(), apiKey, listeners);
+ this.readerThread = new SwarmParticipationReader(socket, apiKey, listeners);
this.readerThread.start();
//sendHeader();
@@ -321,7 +322,15 @@ private void debugOut(String message, boolean out) {
private void writeOut(String message) throws IOException {
if (!isConnected() && autoreconnect) {
System.out.println("["+this.getClass().getSimpleName()+"]: "+"Connection closed when trying to write, reconnecting to swarm");
- this.socket = createSocket(hostname, port);
+ try {
+ this.socket = createSocket(hostname, port);
+ } catch (UnknownHostException e){
+ if (keepalive)
+ System.out.println("["+this.getClass().getSimpleName()+"]: "+"UnknownHostException during reconnect, will retry on next keepalive tick");
+ else
+ System.out.println("["+this.getClass().getSimpleName()+"]: "+"UnknownHostException during reconnect: Network is unavailable, swarm session is stopped.");
+ return;
+ }
sendHeader();
}
@@ -398,8 +407,10 @@ public void exceptionOccurred(ExceptionType type, String message) {
try {
this.socket = createSocket(hostname, port);
} catch (UnknownHostException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ if (keepalive)
+ System.out.println("["+this.getClass().getSimpleName()+"]: "+"UnknownHostException during reconnect, will retry on next keepalive tick");
+ else
+ System.out.println("["+this.getClass().getSimpleName()+"]: "+"UnknownHostException during reconnect: Network is unavailable, swarm session is stopped.");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();

0 comments on commit a460fb0

Please sign in to comment.