Skip to content

Commit

Permalink
updating from buglabs/bugswarm-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
theterg committed Jun 18, 2012
1 parent b8caf55 commit aa4fd80
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 972 deletions.
204 changes: 0 additions & 204 deletions com.buglabs.bug.swarm.client/bin/Example.html

This file was deleted.

1 change: 0 additions & 1 deletion com.buglabs.bug.swarm.client/bin/README

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class SwarmClientFactory {
* Default port of swarm server participation API.
*/
private static final int DEFAULT_SWARM_SERVER_PORT = 80;
private static boolean keepalive;

private SwarmClientFactory() {
//Static utility class.
Expand Down Expand Up @@ -56,9 +57,9 @@ public static ISwarmConfiguration getSwarmConfigurationClient(String hostname, S
* @throws UnknownHostException if unable to resolve hostname
* @throws IOException on I/O error
*/
public static ISwarmSession createProductionSession(String hostname, String apiKey, String resourceId, String ... swarmIds) throws UnknownHostException, IOException {
public static ISwarmSession createProductionSession(String hostname, String apiKey, String resourceId, boolean keepalive, boolean autoreconnect, String ... swarmIds) throws UnknownHostException, IOException {

return createSession(hostname, SessionType.PRODUCTION, apiKey, resourceId, swarmIds);
return createSession(hostname, SessionType.PRODUCTION, apiKey, resourceId, keepalive, autoreconnect, swarmIds);
}

/**
Expand All @@ -72,12 +73,12 @@ public static ISwarmSession createProductionSession(String hostname, String apiK
* @throws UnknownHostException if unable to resolve hostname
* @throws IOException on I/O error
*/
public static ISwarmSession createConsumptionSession(String hostname, String apiKey, String resourceId, String ... swarmIds) throws UnknownHostException, IOException {
public static ISwarmSession createConsumptionSession(String hostname, String apiKey, String resourceId, boolean keepalive, boolean autoreconnect, String ... swarmIds) throws UnknownHostException, IOException {

return createSession(hostname, SessionType.CONSUMPTION, apiKey, resourceId, swarmIds);
return createSession(hostname, SessionType.CONSUMPTION, apiKey, resourceId, keepalive, autoreconnect, swarmIds);
}

private static ISwarmSession createSession(String hostname, SessionType type, String apiKey, String resourceId, String ... swarmIds) throws UnknownHostException, IOException {
private static ISwarmSession createSession(String hostname, SessionType type, String apiKey, String resourceId, boolean keepalive, boolean autoreconnect, String ... swarmIds) throws UnknownHostException, IOException {
if (hostname.toLowerCase().startsWith("http://"))
hostname = hostname.substring("http://".length());

Expand All @@ -87,7 +88,7 @@ private static ISwarmSession createSession(String hostname, SessionType type, St
port = Integer.parseInt(hostname.split(":")[1]);
hostname = hostname.split(":")[0];
}
return new SwarmSessionImp(hostname, type, port, apiKey, resourceId, swarmIds);
return new SwarmSessionImp(hostname, type, port, apiKey, resourceId, keepalive, autoreconnect, swarmIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.buglabs.bug.swarm.client.impl;

import java.net.Socket;

public class KeepAliveThread implements Runnable{
public KeepAliveThread(Socket connection){

}
@Override
public void run() {


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void run() {
listener.exceptionOccurred(ExceptionType.SERVER_MESSAGE_PARSE_ERROR, "Unparsable message: " + line);
continue;
}

debugOut(line, false);
//uncomment below to print all messages received by the reader
//debugOut(line, false);

//Parse the message string into a JsonNode.
JsonNode jmessage = mapper.readTree(line);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;

import org.codehaus.jackson.JsonGenerationException;
Expand All @@ -33,45 +36,80 @@ public class SwarmSessionImp implements ISwarmSession, ISwarmMessageListener {
private final List<ISwarmMessageListener> listeners;
private SwarmParticipationReader readerThread;
private final static ObjectMapper mapper = new ObjectMapper();
protected static final long MAX_INTERVAL = 60000; //timeout
private final String[] swarmIds;
private final String resourceId;
private final int port;
private final SessionType type;
private final SessionType type;
private boolean keepalive;
private boolean autoreconnect;
private long timestamp;

/**
* @param hostname host of server
* @param type type of session to create
* @param port port on server
* @param apiKey api key
* @param resourceId resource id
* @param autoreconnect
* @param keepalive
* @param swarmIds list of swarms to join
* @throws UnknownHostException on host resolution error
* @throws IOException on I/O error
*/
public SwarmSessionImp(String hostname, ISwarmSession.SessionType type, int port, String apiKey, String resourceId, String ... swarmIds) throws UnknownHostException, IOException {
public SwarmSessionImp(String hostname, ISwarmSession.SessionType type, int port, String apiKey, String resourceId, boolean keepalive, boolean autoreconnect, String ... swarmIds) throws UnknownHostException, IOException {
this.hostname = hostname;
this.type = type;
this.port = port;
this.apiKey = apiKey;
this.resourceId = resourceId;
this.keepalive = keepalive;
this.autoreconnect = autoreconnect;
this.swarmIds = swarmIds;
this.listeners = new CopyOnWriteArrayList<ISwarmMessageListener>();
this.listeners.add(this);
this.socket = createSocket(hostname, port);

sendHeader();
sendHeader();
if (keepalive)
createKeepAliveThread();
}

//every 60 seconds, see if a message has been sent by comparing
//the local timestamp to the global. if they're the same
//no message has been sent, send a \n, otherwise just keep truckin
private void createKeepAliveThread() {
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
private long localtimestamp = timestamp;

public void run() {
if (localtimestamp==timestamp){
try {
writeOut("\n");
} catch (IOException e) {
e.printStackTrace();
}
}
localtimestamp = timestamp;


}
}, 0, 60000);
}


private Socket createSocket(String hostname, int port) throws UnknownHostException, IOException {
Socket socket = new Socket(hostname, port);
socket.setSoTimeout(60000);
this.soutput = socket.getOutputStream();

if (readerThread != null)
readerThread.interrupt();

this.readerThread = new SwarmParticipationReader(socket.getInputStream(), apiKey, listeners);
this.readerThread.start();
//sendHeader();

return socket;
}
Expand Down Expand Up @@ -112,7 +150,11 @@ private void sendHeader() throws IOException {
//need to write one chunk into the stream before the platform will send data do us
//I chose \ because it's the keepalive as well
//see https://github.com/buglabs/bugswarm-connector/issues/30
writeOut("\n");
soutput.write(Integer.toHexString("\n".length()).getBytes());
soutput.write(CRLF.getBytes());
soutput.write("\n".getBytes());
soutput.write(CRLF.getBytes());
soutput.flush();
}

/**
Expand Down Expand Up @@ -209,8 +251,8 @@ public void join(String swarmId, String resourceId) throws IOException {

buffer.append(Integer.toHexString(ps.getBytes().length)).append(CRLF);
buffer.append(ps).append(CRLF);

debugOut(buffer.toString(), true);
//uncomment to get join messages
//debugOut(buffer.toString(), true);
soutput.write(buffer.toString().getBytes());
soutput.flush();
}
Expand Down Expand Up @@ -279,17 +321,20 @@ private Map<String, Object> createFeedRequestMap(String feedName, int interval,
* @throws IOException on socket I/O error
*/
private void writeOut(String message) throws IOException {
if (!isConnected()) {
if (!isConnected() && autoreconnect) {
this.socket = createSocket(hostname, port);
sendHeader();
}

debugOut(message, true);

soutput.write(Integer.toHexString(message.length()).getBytes());
//debugOut(message, true);
//new framing requires sending a \r\n after each message.
soutput.write(Integer.toHexString(message.length()+CRLF.length()).getBytes());
soutput.write(CRLF.getBytes());
message = message+CRLF;
soutput.write(message.getBytes());
soutput.write(CRLF.getBytes());
soutput.flush();
timestamp = (new Date()).getTime();
}

@Override
Expand All @@ -314,8 +359,8 @@ public void close() {

buffer.append(Integer.toHexString(ps.getBytes().length)).append(CRLF);
buffer.append(ps).append(CRLF);

debugOut(buffer.toString(), true);
//uncomment to get close messages
//debugOut(buffer.toString(), true);
soutput.write(buffer.toString().getBytes());
soutput.flush();
} catch (IOException e) {
Expand Down

0 comments on commit aa4fd80

Please sign in to comment.