diff --git a/.idea/modules/SocketclusterClientJava.iml b/.idea/modules/SocketclusterClientJava.iml index ef226b0..b97c015 100644 --- a/.idea/modules/SocketclusterClientJava.iml +++ b/.idea/modules/SocketclusterClientJava.iml @@ -1,10 +1,11 @@ - + + diff --git a/.idea/modules/SocketclusterClientJava_main.iml b/.idea/modules/SocketclusterClientJava_main.iml index 8d02020..72d8a36 100644 --- a/.idea/modules/SocketclusterClientJava_main.iml +++ b/.idea/modules/SocketclusterClientJava_main.iml @@ -1,7 +1,7 @@ - - - + + + @@ -9,7 +9,11 @@ - - + + + + + + \ No newline at end of file diff --git a/.idea/modules/SocketclusterClientJava_test.iml b/.idea/modules/SocketclusterClientJava_test.iml index c514b2d..29cdd0d 100644 --- a/.idea/modules/SocketclusterClientJava_test.iml +++ b/.idea/modules/SocketclusterClientJava_test.iml @@ -1,7 +1,7 @@ - - - + + + @@ -10,8 +10,12 @@ - - + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 8f7abd4..688859a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,14 @@ Java and Android Socketcluster Client ===================================== +This SocketCluster Java/Android Client fork has the following differences from upstream: +- [Jackson](https://github.com/FasterXML/jackson) JSON parser is used, instead of [JSON-java](https://github.com/stleary/JSON-java) +- [SocketCluster Minimal binary (sc-codec-min-bin) codec](https://github.com/SocketCluster/sc-codec-min-bin) support +- Underlying [nv-websocket-client](https://github.com/TakahikoKawasaki/nv-websocket-client) has been updated to the latest version +- Breaking: callback params now have Jackson `JsonNode` types, instead of upstream's `Object` + +**Important Notes:** Due to using `jackson-databing` this library is significantly bigger in size than upstream (~2 Mb), so if binary codec support is not needed, consider using upstream instead. + Overview -------- This client provides following functionality @@ -9,6 +17,7 @@ This client provides following functionality - Automatic reconnection - Pub/sub - Authentication (JWT) +- Binary codec support (`sc-codec-min-bin` included out-of-the-box) License ------- @@ -16,30 +25,18 @@ Apache License, Version 2.0 Gradle ------ -For java ```Gradle -dependencies { - compile 'io.github.sac:SocketclusterClientJava:1.7.4' +repositories { + jcenter() + maven { url "https://jitpack.io" } } -``` -for sample java examples visit [Java Demo](https://github.com/sacOO7/socketcluster-client-testing/tree/master/src/main/java) - -For android - -```Gradle -compile ('io.github.sac:SocketclusterClientJava:1.7.4'){ - exclude group :'org.json', module: 'json' +dependencies { + implementation 'com.github.XDex:socketcluster-client-java:2.0.0' } ``` -for sample android demo visit [Android Demo](https://github.com/sacOO7/socketcluster-android-demo) - -[ ![Download](https://api.bintray.com/packages/sacoo7/Maven/socketcluster-client/images/download.svg) ](https://bintray.com/sacoo7/Maven/socketcluster-client/_latestVersion) - - +[Download JAR](https://api.bintray.com/packages/sacoo7/Maven/socketcluster-client/images/download.svg) Description ----------- @@ -194,6 +191,20 @@ The object received can be String, Boolean, Long or JSONObject. ``` +Codecs +------ + +Custom binary [SocketCluster codecs](https://github.com/SocketCluster/socketcluster#custom-codecs) are supported. +Support for [sc-codec-min-bin](https://github.com/SocketCluster/sc-codec-min-bin) is included out-of-the-box. + +To enable a binary codec, just set it as follows on the SocketCluster `Socket`: + +```java +socket.setCodec(new MinBinCodec()); +``` + +Custom binary codecs must implement the `SocketClusterCodec` interface. + Implementing Pub-Sub via channels --------------------------------- diff --git a/build.gradle b/build.gradle index 8ff7025..9afdc59 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ allprojects { } group 'io.github.sac' -version '1.7.4' +version '2.0.0' allprojects { @@ -28,6 +28,10 @@ allprojects { apply plugin: 'maven-publish' } +task wrapper(type: Wrapper) { + gradleVersion = '4.6' +} + task sourceJar(type: Jar) { classifier = 'sources' from sourceSets.main.allJava @@ -38,13 +42,19 @@ task javadocJar(type: Jar, dependsOn: javadoc) { from javadoc.destinationDir } +jar { + from { + configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } + } +} + publishing { publications { MyPublication(MavenPublication) { from components.java groupId 'io.github.sac' artifactId 'SocketclusterClientJava' - version '1.7.4' + version '2.0.0' artifact sourceJar { classifier "sources" @@ -71,15 +81,15 @@ bintray{ publicDownloadNumbers = true version { - name = '1.7.4' - desc = 'Added method to Connect asynchronously with server' - vcsTag = '1.7.4' + name = '2.0.0' + desc = 'Switched to Jackson and implemented sc-min-bin codec support' + vcsTag = '2.0.0' } } } dependencies { - compile 'com.neovisionaries:nv-websocket-client:1.30' - compile group: 'org.json', name: 'json', version: '20090211' + implementation 'com.neovisionaries:nv-websocket-client:2.3' + implementation 'org.msgpack:jackson-dataformat-msgpack:0.8.15' } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 27ed120..6bfa03e 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Nov 08 13:06:24 IST 2016 +#Thu Mar 29 19:55:57 EEST 2018 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.9-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-all.zip diff --git a/src/main/java/META-INF/MANIFEST.MF b/src/main/java/META-INF/MANIFEST.MF deleted file mode 100644 index 5ee19cb..0000000 --- a/src/main/java/META-INF/MANIFEST.MF +++ /dev/null @@ -1,3 +0,0 @@ -Manifest-Version: 1.0 -Main-Class: Main - diff --git a/src/main/java/Main.java b/src/main/java/Main.java deleted file mode 100644 index fd83f4e..0000000 --- a/src/main/java/Main.java +++ /dev/null @@ -1,165 +0,0 @@ -import com.neovisionaries.ws.client.WebSocketException; -import com.neovisionaries.ws.client.WebSocketFrame; -import io.github.sac.*; - -import java.util.List; -import java.util.Map; - -/** - * Created by sachin on 8/11/16. - */ - -public class Main { - - public static String url="ws://localhost:8000/socketcluster/"; - - public static void main(String arg[]) { - - Socket socket = new Socket(url); - - socket.setListener(new BasicListener() { - - public void onConnected(Socket socket,Map> headers) { - System.out.println("Connected to endpoint"); - } - - public void onDisconnected(Socket socket,WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) { - System.out.println("Disconnected from end-point"); - } - - public void onConnectError(Socket socket,WebSocketException exception) { - System.out.println("Got connect error "+ exception); - } - - public void onSetAuthToken(String token, Socket socket) { - System.out.println("Set auth token got called"); - socket.setAuthToken(token); - } - - public void onAuthentication(Socket socket,Boolean status) { - if (status) { - System.out.println("socket is authenticated"); - } else { - System.out.println("Authentication is required (optional)"); - } - } - - }); - - socket.setReconnection(new ReconnectStrategy().setDelay(3000).setMaxAttempts(10)); //Connect after each 2 seconds for 30 times - - - socket.connectAsync(); - - - socket.disableLogging(); - - - socket.emit("chat","Hi"); - socket.emit("chat", "Hi", new Ack() { - @Override - public void call(String eventName, Object error, Object data) { - System.out.println("Got message for :"+eventName+" error is :"+error+" data is :"+data); - } - }); - - socket.on("yell", new Emitter.Listener() { - @Override - public void call(String eventName, Object data) { - System.out.println("Got message for :"+eventName+" data is :"+data); - } - }); - - socket.on("yell", new Emitter.AckListener() { - @Override - public void call(String eventName, Object data, Ack ack) { - System.out.println("Got message for :"+eventName+" data is :"+data); - //sending ack back - - ack.call(eventName,"This is error","This is data"); - } - }); -// -// - Socket.Channel channel = socket.createChannel("yell"); -// - channel.subscribe(new Ack() { - @Override - public void call(String channelName, Object error, Object data) { - if (error==null){ - System.out.println("Subscribed to channel "+channelName+" successfully"); - } - } - }); - - channel.publish("Hi sachin", new Ack() { - @Override - public void call(String channelName, Object error, Object data) { - if (error==null){ - System.out.println("Published message to channel "+channelName+" successfully"); - } - } - }); - - channel.onMessage(new Emitter.Listener() { - @Override - public void call(String channelName, Object data) { - - System.out.println("Got message for channel "+channelName+" data is "+data); - } - }); - - channel.unsubscribe(new Ack() { - @Override - public void call(String name, Object error, Object data) { - System.out.println("Unsubscribed successfully"); - } - }); - channel.unsubscribe(); - -// channel.subscribe(new Ack() { -// @Override -// public void call(String name, Object error, Object data) { -// -// } -// }); -//// -// channel.onMessage(new Emitter.Listener() { -// public void call(Object object) { -// System.out.println("got message " + object); -// } -// }); - -// -// -// -// socket.on("chat", new Emitter.Listener() { -// public void call(Object object) { -// System.out.println("Got echo event :: " + object); -// } -// }); -// -// -// -// socket.emit("chat", "hi", new Ack() { -// public void call(Object error, Object data) { -// -// } -// }); -// -// -// -// while (true) { -// Scanner scanner = new Scanner(System.in); -// -// channel.publish(scanner.nextLine(), new Ack() { -// public void call(Object error, Object data) { -// if (error == null) { -// System.out.println("Publish sent successfully"); -// } -// } -// }); -// } - - } -} diff --git a/src/main/java/io/github/sac/Ack.java b/src/main/java/io/github/sac/Ack.java index 785f337..66b5b46 100644 --- a/src/main/java/io/github/sac/Ack.java +++ b/src/main/java/io/github/sac/Ack.java @@ -2,10 +2,12 @@ * Created by sachin on 16/11/16. */ +import com.fasterxml.jackson.databind.JsonNode; + /** * Interface for handling errors */ public interface Ack { - void call(String name, Object error, Object data); + void call(String name, JsonNode error, JsonNode data); } diff --git a/src/main/java/io/github/sac/Emitter.java b/src/main/java/io/github/sac/Emitter.java index cdf49b1..73ea2f1 100644 --- a/src/main/java/io/github/sac/Emitter.java +++ b/src/main/java/io/github/sac/Emitter.java @@ -4,6 +4,8 @@ * Created by sachin on 13/11/16. */ +import com.fasterxml.jackson.databind.JsonNode; + import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -47,7 +49,7 @@ public Emitter on(String event, AckListener fn) { } - public Emitter handleEmit(String event, Object object) { + public Emitter handleEmit(String event, JsonNode object) { Listener listener = singlecallbacks.get(event); if (listener != null) { @@ -56,7 +58,7 @@ public Emitter handleEmit(String event, Object object) { return this; } - public Emitter handlePublish(String event, Object object) { + public Emitter handlePublish(String event, JsonNode object) { Listener listener = publishcallbacks.get(event); @@ -70,7 +72,7 @@ public boolean hasEventAck(String event) { return this.singleackcallbacks.get(event) != null; } - public Emitter handleEmitAck(String event, Object object, Ack ack) { + public Emitter handleEmitAck(String event, JsonNode object, Ack ack) { AckListener listener = singleackcallbacks.get(event); if (listener != null) { @@ -81,11 +83,11 @@ public Emitter handleEmitAck(String event, Object object, Ack ack) { public interface Listener { - void call(String name, Object data); + void call(String name, JsonNode data); } public interface AckListener { - void call(String name, Object data, Ack ack); + void call(String name, JsonNode data, Ack ack); } /** diff --git a/src/main/java/io/github/sac/Parser.java b/src/main/java/io/github/sac/Parser.java index b0f8005..f4b9c70 100644 --- a/src/main/java/io/github/sac/Parser.java +++ b/src/main/java/io/github/sac/Parser.java @@ -1,8 +1,7 @@ package io.github.sac; -import org.json.JSONException; -import org.json.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; /** * Created by sachin on 15/11/16. @@ -19,23 +18,25 @@ public enum ParseResult { } - public static ParseResult parse(Object dataobject, String event) throws JSONException { - - if (dataobject instanceof JSONObject && ((JSONObject) dataobject).opt("isAuthenticated") != null) { + public static ParseResult parse(JsonNode dataobject, String event) { + if (dataobject.has("isAuthenticated")) { return ParseResult.ISAUTHENTICATED; - } else if (event != null) { - if (event.equals("#publish")) { - return ParseResult.PUBLISH; - } else if (event.equals("#removeAuthToken")) { - return ParseResult.REMOVETOKEN; - } else if (event.equals("#setAuthToken")) { - return ParseResult.SETTOKEN; - } else { - return ParseResult.EVENT; + } + + if (event != null) { + switch (event) { + case "#publish": + return ParseResult.PUBLISH; + case "#removeAuthToken": + return ParseResult.REMOVETOKEN; + case "#setAuthToken": + return ParseResult.SETTOKEN; + default: + return ParseResult.EVENT; } - } else { - return ParseResult.ACKRECEIVE; } + + return ParseResult.ACKRECEIVE; } } diff --git a/src/main/java/io/github/sac/Socket.java b/src/main/java/io/github/sac/Socket.java index 50c4b9e..a794846 100644 --- a/src/main/java/io/github/sac/Socket.java +++ b/src/main/java/io/github/sac/Socket.java @@ -1,8 +1,12 @@ package io.github.sac; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; import com.neovisionaries.ws.client.*; -import org.json.JSONException; -import org.json.JSONObject; +import io.github.sac.codec.SocketClusterCodec; import java.io.IOException; import java.util.*; @@ -16,7 +20,6 @@ public class Socket extends Emitter { - private final static Logger LOGGER = Logger.getLogger(Socket.class.getName()); private AtomicInteger counter; @@ -30,6 +33,9 @@ public class Socket extends Emitter { private List channels; private WebSocketAdapter adapter; private Map headers; + private SocketClusterCodec codec; + + private static final ObjectMapper mapper = new ObjectMapper(); public Socket(String URL) { this.URL = URL; @@ -79,6 +85,10 @@ public void setListener(BasicListener listener) { this.listener = listener; } + public void setCodec(SocketClusterCodec codec) { + this.codec = codec; + } + /** * used to set up TLS/SSL connection to server for more details visit neovisionaries websocket client */ @@ -91,11 +101,27 @@ public void setAuthToken(String token) { AuthToken = token; } + private void send(WebSocket webSocket, String data) { + send(webSocket, new TextNode(data)); + } + + private void send(JsonNode data) { + send(ws, data); + } + + private void send(WebSocket webSocket, JsonNode data) { + if (codec == null) { + webSocket.sendText(data.toString()); + } else { + webSocket.sendBinary(codec.encode(data)); + } + } + public WebSocketAdapter getAdapter() { return new WebSocketAdapter() { @Override - public void onConnected(WebSocket websocket, Map> headers) throws Exception { + public void onConnected(WebSocket websocket, Map> headers) { /** * Code for sending handshake @@ -106,117 +132,110 @@ public void onConnected(WebSocket websocket, Map> headers) strategy.setAttemptsMade(0); } - JSONObject handshakeObject = new JSONObject(); + ObjectNode handshakeObject = mapper.createObjectNode(); handshakeObject.put("event", "#handshake"); - JSONObject object = new JSONObject(); + + ObjectNode object = mapper.createObjectNode(); object.put("authToken", AuthToken); - handshakeObject.put("data", object); + + handshakeObject.set("data", object); handshakeObject.put("cid", counter.getAndIncrement()); - websocket.sendText(handshakeObject.toString()); - listener.onConnected(Socket.this, headers); + send(websocket, handshakeObject); - super.onConnected(websocket, headers); + listener.onConnected(Socket.this, headers); } @Override public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception { listener.onDisconnected(Socket.this, serverCloseFrame, clientCloseFrame, closedByServer); reconnect(); - super.onDisconnected(websocket, serverCloseFrame, clientCloseFrame, closedByServer); } @Override public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception { listener.onConnectError(Socket.this, exception); reconnect(); - super.onConnectError(websocket, exception); } @Override public void onFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { + JsonNode payload; - if (frame.getPayloadText().equalsIgnoreCase("#1")) { - /** - * PING-PONG logic goes here - */ - websocket.sendText("#2"); + if (codec == null) { + payload = getTextPayload(frame.getPayloadText()); } else { + payload = codec.decode(frame.getPayload()); + } - JSONObject object = new JSONObject(frame.getPayloadText()); - - /** - * Message retrieval mechanism goes here - */ - LOGGER.info("Message :" + object.toString()); - - - try { - Object dataobject = object.opt("data"); - Integer rid = (Integer) object.opt("rid"); - Integer cid = (Integer) object.opt("cid"); - String event = (String) object.opt("event"); - - switch (Parser.parse(dataobject, event)) { - - case ISAUTHENTICATED: - listener.onAuthentication(Socket.this, ((JSONObject) dataobject).getBoolean("isAuthenticated")); - subscribeChannels(); - break; - case PUBLISH: - Socket.this.handlePublish(((JSONObject) dataobject).getString("channel"), ((JSONObject) dataobject).opt("data")); - break; - case REMOVETOKEN: - setAuthToken(null); - break; - case SETTOKEN: - String token = ((JSONObject) dataobject).getString("token"); - setAuthToken(token); - listener.onSetAuthToken(token, Socket.this); - break; - case EVENT: - if (hasEventAck(event)) { - handleEmitAck(event, dataobject, ack(Long.valueOf(cid))); - } else { - Socket.this.handleEmit(event, dataobject); + if (payload.isTextual() && payload.asText().equalsIgnoreCase("#1")) { + send(websocket, "#2"); // PONG + return; + } + LOGGER.info("Message: " + payload.toString()); + + JsonNode dataobject = payload.get("data"); + Integer rid = payload.get("rid").asInt(); + Integer cid = payload.get("cid").asInt(); + String event = payload.get("event").asText(); + + switch (Parser.parse(dataobject, event)) { + case ISAUTHENTICATED: + listener.onAuthentication(Socket.this, dataobject.get("isAuthenticated").asBoolean()); + subscribeChannels(); + break; + case PUBLISH: + Socket.this.handlePublish(dataobject.get("channel").asText(), dataobject.get("data")); + break; + case REMOVETOKEN: + setAuthToken(null); + break; + case SETTOKEN: + String token = dataobject.get("token").asText(); + setAuthToken(token); + listener.onSetAuthToken(token, Socket.this); + break; + case EVENT: + if (hasEventAck(event)) { + handleEmitAck(event, dataobject, ack(Long.valueOf(cid))); + } else { + Socket.this.handleEmit(event, dataobject); + } + break; + case ACKRECEIVE: + if (acks.containsKey((long) rid)) { + Object[] objects = acks.remove((long) rid); + if (objects != null) { + Ack fn = (Ack) objects[1]; + if (fn != null) { + fn.call((String) objects[0], payload.get("error"), dataobject); + } else { + LOGGER.info("ack function is null with rid " + rid); } - break; - case ACKRECEIVE: - if (acks.containsKey((long) rid)) { - Object[] objects = acks.remove((long) rid); - if (objects != null) { - Ack fn = (Ack) objects[1]; - if (fn != null) { - fn.call((String) objects[0], object.opt("error"), object.opt("data")); - } else { - LOGGER.info("ack function is null with rid " + rid); - } - } - } - break; + } } - } catch (Exception e) { - e.printStackTrace(); - } - + break; } - super.onFrame(websocket, frame); } + private JsonNode getTextPayload(String payloadText) throws IOException { + try { + return mapper.readTree(payloadText); + } catch (JsonParseException e) { + return mapper.valueToTree(payloadText); + } + } @Override public void onCloseFrame(WebSocket websocket, WebSocketFrame frame) throws Exception { LOGGER.info("On close frame got called"); - super.onCloseFrame(websocket, frame); } @Override public void onSendError(WebSocket websocket, WebSocketException cause, WebSocketFrame frame) throws Exception { LOGGER.info("Got send error"); - - super.onSendError(websocket, cause, frame); } }; @@ -226,34 +245,24 @@ public void onSendError(WebSocket websocket, WebSocketException cause, WebSocket public Socket emit(final String event, final Object object) { EventThread.exec(new Runnable() { public void run() { - JSONObject eventObject = new JSONObject(); - try { - eventObject.put("event", event); - eventObject.put("data", object); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(eventObject.toString()); + ObjectNode eventObject = mapper.createObjectNode(); + eventObject.put("event", event); + eventObject.putPOJO("data", object); + send(eventObject); } }); return this; } - public Socket emit(final String event, final Object object, final Ack ack) { - EventThread.exec(new Runnable() { public void run() { - JSONObject eventObject = new JSONObject(); + ObjectNode eventObject = mapper.createObjectNode(); acks.put(counter.longValue(), getAckObject(event, ack)); - try { - eventObject.put("event", event); - eventObject.put("data", object); - eventObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(eventObject.toString()); + eventObject.put("event", event); + eventObject.putPOJO("data", object); + eventObject.put("cid", counter.getAndIncrement()); + send(eventObject); } }); return this; @@ -262,18 +271,12 @@ public void run() { private Socket subscribe(final String channel) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject = new JSONObject(); - try { - subscribeObject.put("event", "#subscribe"); - JSONObject object = new JSONObject(); - object.put("channel", channel); - subscribeObject.put("data", object); - - subscribeObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(subscribeObject.toString()); + ObjectNode subscribeObject = mapper.createObjectNode(); + subscribeObject.put("event", "#subscribe"); + subscribeObject.set("data", mapper.createObjectNode().put("channel", channel)); + + subscribeObject.put("cid", counter.getAndIncrement()); + send(subscribeObject); } }); return this; @@ -287,18 +290,12 @@ private Object[] getAckObject(String event, Ack ack) { private Socket subscribe(final String channel, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject = new JSONObject(); - try { - subscribeObject.put("event", "#subscribe"); - JSONObject object = new JSONObject(); - acks.put(counter.longValue(), getAckObject(channel, ack)); - object.put("channel", channel); - subscribeObject.put("data", object); - subscribeObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(subscribeObject.toString()); + ObjectNode subscribeObject = mapper.createObjectNode(); + subscribeObject.put("event", "#subscribe"); + acks.put(counter.longValue(), getAckObject(channel, ack)); + subscribeObject.set("data", mapper.createObjectNode().put("channel", channel)); + subscribeObject.put("cid", counter.getAndIncrement()); + send(subscribeObject); } }); return this; @@ -307,15 +304,11 @@ public void run() { private Socket unsubscribe(final String channel) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject = new JSONObject(); - try { - subscribeObject.put("event", "#unsubscribe"); - subscribeObject.put("data", channel); - subscribeObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(subscribeObject.toString()); + ObjectNode subscribeObject = mapper.createObjectNode(); + subscribeObject.put("event", "#unsubscribe"); + subscribeObject.put("data", channel); + subscribeObject.put("cid", counter.getAndIncrement()); + send(subscribeObject); } }); return this; @@ -324,17 +317,13 @@ public void run() { private Socket unsubscribe(final String channel, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject = new JSONObject(); - try { - subscribeObject.put("event", "#unsubscribe"); - subscribeObject.put("data", channel); + ObjectNode subscribeObject = mapper.createObjectNode(); + subscribeObject.put("event", "#unsubscribe"); + subscribeObject.put("data", channel); - acks.put(counter.longValue(), getAckObject(channel, ack)); - subscribeObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(subscribeObject.toString()); + acks.put(counter.longValue(), getAckObject(channel, ack)); + subscribeObject.put("cid", counter.getAndIncrement()); + send(subscribeObject); } }); return this; @@ -343,18 +332,16 @@ public void run() { public Socket publish(final String channel, final Object data) { EventThread.exec(new Runnable() { public void run() { - JSONObject publishObject = new JSONObject(); - try { - publishObject.put("event", "#publish"); - JSONObject object = new JSONObject(); - object.put("channel", channel); - object.put("data", data); - publishObject.put("data", object); - publishObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(publishObject.toString()); + ObjectNode publishObject = mapper.createObjectNode(); + publishObject.put("event", "#publish"); + + ObjectNode dataObject = mapper.createObjectNode(); + dataObject.put("channel", channel); + dataObject.putPOJO("data", data); + publishObject.set("data", dataObject); + + publishObject.put("cid", counter.getAndIncrement()); + send(publishObject); } }); @@ -364,19 +351,17 @@ public void run() { public Socket publish(final String channel, final Object data, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject publishObject = new JSONObject(); - try { - publishObject.put("event", "#publish"); - JSONObject object = new JSONObject(); - acks.put(counter.longValue(), getAckObject(channel, ack)); - object.put("channel", channel); - object.put("data", data); - publishObject.put("data", object); - publishObject.put("cid", counter.getAndIncrement()); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(publishObject.toString()); + ObjectNode publishObject = mapper.createObjectNode(); + publishObject.put("event", "#publish"); + + ObjectNode dataObject = mapper.createObjectNode(); + acks.put(counter.longValue(), getAckObject(channel, ack)); + dataObject.put("channel", channel); + dataObject.putPOJO("data", data); + publishObject.set("data", dataObject); + + publishObject.put("cid", counter.getAndIncrement()); + send(publishObject); } }); @@ -385,18 +370,14 @@ public void run() { private Ack ack(final Long cid) { return new Ack() { - public void call(final String channel, final Object error, final Object data) { + public void call(final String channel, final JsonNode error, final JsonNode data) { EventThread.exec(new Runnable() { public void run() { - JSONObject object = new JSONObject(); - try { - object.put("error", error); - object.put("data", data); - object.put("rid", cid); - } catch (JSONException e) { - e.printStackTrace(); - } - ws.sendText(object.toString()); + ObjectNode object = mapper.createObjectNode(); + object.set("error", error); + object.set("data", data); + object.put("rid", cid); + send(object); } }); } diff --git a/src/main/java/io/github/sac/codec/MinBinCodec.java b/src/main/java/io/github/sac/codec/MinBinCodec.java new file mode 100644 index 0000000..82018da --- /dev/null +++ b/src/main/java/io/github/sac/codec/MinBinCodec.java @@ -0,0 +1,177 @@ +package io.github.sac.codec; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.msgpack.jackson.dataformat.MessagePackFactory; + +import java.io.IOException; +import java.util.logging.Logger; + +public class MinBinCodec implements SocketClusterCodec { + private final static Logger LOGGER = Logger.getLogger(SocketClusterCodec.class.getName()); + private final static ObjectMapper mapper = new ObjectMapper(new MessagePackFactory()); + + @Override + public byte[] encode(JsonNode data) { + try { + if (data.isValueNode()) { + return mapper.writeValueAsBytes(data); + } + + if (data.isObject()) { + ObjectNode encodeObject = (ObjectNode) data; + ObjectNode compressed = mapper.createObjectNode(); + + compressPublish(encodeObject, compressed); + compressEmit(encodeObject, compressed); + compressResponse(encodeObject, compressed); + + return mapper.writeValueAsBytes(compressed); + } + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + LOGGER.info("Unable to encode data"); + return null; + } + + private void compressResponse(ObjectNode object, ObjectNode compressed) { + if (!object.has("rid") || object.get("rid").isNull()) { + return; + } + + ArrayNode array = mapper.createArrayNode() + .add(object.get("rid")) + .add(object.get("error")) + .add(object.get("data")); + + compressed.set("r", array); + + object.remove("rid"); + object.remove("error"); + object.remove("data"); + } + + private void compressPublish(ObjectNode object, ObjectNode compressed) { + if (!object.has("event") || !object.get("event").asText().equals("#publish") + || !object.has("data") || object.get("data").isNull()) { + return; + } + + ObjectNode dataObject = (ObjectNode) object.get("data"); + + ArrayNode array = mapper.createArrayNode(); + array.add(dataObject.get("channel")); + array.add(dataObject.get("data")); + + if (object.has("cid")) { + array.add(object.get("cid")); + object.remove("cid"); + } + + compressed.set("p", array); + + object.remove("event"); + object.remove("data"); + } + + private void compressEmit(ObjectNode object, ObjectNode compressed) { + if (!object.has("event") || object.get("event").isNull()) { + return; + } + + ArrayNode array = mapper.createArrayNode(); + array.add(object.get("event")); + array.add(object.get("data")); + + if (object.has("cid")) { + array.add(object.get("cid")); + object.remove("cid"); + } + + compressed.set("e", array); + + object.remove("event"); + object.remove("data"); + } + + + @Override + public JsonNode decode(byte[] data) { + try { + JsonNode decoded = mapper.readTree(data); + + if (decoded.isValueNode()) { + return decoded; + } + + if (decoded.isObject()) { + ObjectNode decodeObject = (ObjectNode) decoded; + decompressEmit(decodeObject); + decompressPublish(decodeObject); + decompressResponse(decodeObject); + + return decodeObject; + } + } catch (IOException e) { + e.printStackTrace(); + } + LOGGER.info("Unable to decode data"); + return null; + } + + private void decompressResponse(ObjectNode object) { + if (!object.has("r") || object.get("r").isNull()) { + return; + } + + ArrayNode array = (ArrayNode) object.get("r"); + + object.set("rid", array.get(0)); + object.set("error", array.get(1)); + object.set("data", array.get(2)); + + object.remove("r"); + } + + private void decompressPublish(ObjectNode object) { + if (!object.has("p") || object.get("p").isNull()) { + return; + } + + ArrayNode array = (ArrayNode) object.get("p"); + + ObjectNode dataObject = mapper.createObjectNode(); + dataObject.set("channel", array.get(0)); + dataObject.set("data", array.get(1)); + + object.put("event", "#publish"); + object.set("data", dataObject); + + if (array.has(2)) { + object.set("cid", array.get(2)); + } + + object.remove("p"); + } + + private void decompressEmit(ObjectNode object) { + if (!object.has("e") || object.get("e").isNull()) { + return; + } + + ArrayNode array = (ArrayNode) object.get("e"); + + object.set("event", array.get(0)); + object.set("data", array.get(1)); + + if (array.has(2)) { + object.set("cid", array.get(2)); + } + + object.remove("e"); + } +} diff --git a/src/main/java/io/github/sac/codec/SocketClusterCodec.java b/src/main/java/io/github/sac/codec/SocketClusterCodec.java new file mode 100644 index 0000000..3d30581 --- /dev/null +++ b/src/main/java/io/github/sac/codec/SocketClusterCodec.java @@ -0,0 +1,9 @@ +package io.github.sac.codec; + +import com.fasterxml.jackson.databind.JsonNode; + +public interface SocketClusterCodec { + byte[] encode(JsonNode data); + + JsonNode decode(byte[] data); +}