Skip to content

Commit

Permalink
Websocket implementation burst-team#2
Browse files Browse the repository at this point in the history
  • Loading branch information
dawallet committed Aug 1, 2017
1 parent 7d8c862 commit 04a21a7
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 35 deletions.
12 changes: 12 additions & 0 deletions src/java/nxt/peer/Errors.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package nxt.peer;

final class Errors {

final static String BLACKLISTED = "Your peer is blacklisted";
final static String END_OF_FILE = "Unexpected token END OF FILE at position 0.";
final static String UNKNOWN_PEER = "Your peer address cannot be resolved";
final static String UNSUPPORTED_REQUEST_TYPE = "Unsupported request type!";
final static String UNSUPPORTED_PROTOCOL = "Unsupported protocol!";

private Errors() {} // never
}
10 changes: 8 additions & 2 deletions src/java/nxt/peer/PeerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.net.URL;
import java.net.UnknownHostException;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -86,6 +87,8 @@ public State getState() {
}

void setState(State state) {
if (state != State.CONNECTED)
webSocket.close();
if (this.state == state) {
return;
}
Expand Down Expand Up @@ -421,8 +424,11 @@ public JSONObject send(final JSONStreamAware request, int maxResponseSize) {
}
}
} catch (RuntimeException|IOException e) {
if (! (e instanceof UnknownHostException || e instanceof SocketTimeoutException || e instanceof SocketException)) {
Logger.logDebugMessage("Error sending JSON request", e);
if (state == State.CONNECTED ||
!(e instanceof UnknownHostException || e instanceof SocketTimeoutException ||
e instanceof SocketException || Errors.END_OF_FILE.equals(e.getMessage()))) {
Logger.logDebugMessage(String.format("Error sending JSON request to %s: %s",
address, e.getMessage()!=null ? e.getMessage() : e.toString()));
}
if ((Peers.communicationLoggingMask & Peers.LOGGING_MASK_EXCEPTIONS) != 0) {
log += " >>> " + e.toString();
Expand Down
2 changes: 1 addition & 1 deletion src/java/nxt/peer/PeerServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void init(ServletConfig config) throws ServletException {
@Override
public void configure(WebSocketServletFactory factory) {
factory.getPolicy().setIdleTimeout(Peers.webSocketIdleTimeout);
factory.getPolicy().setMaxTextMessageSize(Math.max(Peers.MAX_REQUEST_SIZE, Peers.MAX_RESPONSE_SIZE));
factory.getPolicy().setMaxBinaryMessageSize(PeerWebSocket.MAX_MESSAGE_SIZE);
factory.setCreator(new PeerSocketCreator());
}

Expand Down
132 changes: 100 additions & 32 deletions src/java/nxt/peer/PeerWebSocket.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package nxt.peer;

import nxt.Nxt;
import nxt.util.Logger;

import org.eclipse.jetty.websocket.api.Session;
Expand All @@ -11,8 +12,14 @@
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -24,6 +31,8 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* PeerWebSocket represents an HTTP/HTTPS upgraded connection
Expand All @@ -34,14 +43,27 @@
* begins with '(' and ends with ')' and consists of comma-separated
* fields.
*
* The Version 1 message prefix has the following format: (version,id)
* The Version 1 message prefix has the following format: (version,id,flags,length)
* - 'version' is the message version (Integer)
* - 'id' is the request identifier (Long). The response message
* must have the same identifier.
* - 'id' is the request identifier (Long)
* - 'flags' is a bit field of flags (Integer)
* - 'length' is the uncompressed message length (Integer)
*/
@WebSocket
public class PeerWebSocket {

/** Message compression enabled */
private static final boolean isGzipEnabled = Nxt.getBooleanProperty("nxt.enablePeerServerGZIPFilter");

/** Maximum message size */
static final int MAX_MESSAGE_SIZE = 192*1024*1024;

/** Minimum compressed message size */
private static final int MIN_COMPRESS_SIZE = 256;

/** Compressed message flag */
private static final int FLAG_COMPRESSED = 1;

/** Our WebSocket message version */
private static final int VERSION = 1;

Expand Down Expand Up @@ -80,7 +102,8 @@ public class PeerWebSocket {
*/
public PeerWebSocket() {
peerClient = new WebSocketClient();
peerClient.getPolicy().setMaxTextMessageSize(Math.max(Peers.MAX_REQUEST_SIZE, Peers.MAX_RESPONSE_SIZE));
peerClient.getPolicy().setIdleTimeout(Peers.webSocketIdleTimeout);
peerClient.getPolicy().setMaxBinaryMessageSize(MAX_MESSAGE_SIZE);
}

/**
Expand Down Expand Up @@ -134,7 +157,7 @@ public boolean startClient(URI uri) throws IOException {
Logger.logDebugMessage(String.format("WebSocket connection to %s failed", address), exc);
}
} catch (TimeoutException exc) {
Logger.logDebugMessage(String.format("WebSocket connection to %s timed out", address));
throw new SocketTimeoutException(String.format("WebSocket connection to %s timed out", address));
} catch (Exception exc) {
Logger.logDebugMessage(String.format("WebSocket connection to %s failed", address), exc);
} finally {
Expand Down Expand Up @@ -185,8 +208,27 @@ public String doPost(String request) throws IOException {
if (session == null || !session.isOpen())
throw new IOException("WebSocket session is not open");
requestId = nextRequestId++;
String requestMsg = String.format("(%d,%d)%s", version, requestId, request);
session.getRemote().sendString(requestMsg);
byte[] requestBytes = request.getBytes("UTF-8");
int requestLength = requestBytes.length;
int flags = 0;
if (isGzipEnabled && requestLength>=MIN_COMPRESS_SIZE) {
flags |= FLAG_COMPRESSED;
ByteArrayOutputStream outStream = new ByteArrayOutputStream(requestLength);
try (GZIPOutputStream gzipStream = new GZIPOutputStream(outStream)) {
gzipStream.write(requestBytes);
}
requestBytes = outStream.toByteArray();
}
ByteBuffer buf = ByteBuffer.allocate(requestBytes.length+20);
buf.putInt(version)
.putLong(requestId)
.putInt(flags)
.putInt(requestLength)
.put(requestBytes)
.flip();
if (buf.limit() > MAX_MESSAGE_SIZE)
throw new ProtocolException("POST request length exceeds max message size");
session.getRemote().sendBytes(buf);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -217,8 +259,27 @@ public void sendResponse(long requestId, String response) throws IOException {
lock.lock();
try {
if (session != null && session.isOpen()) {
String responseMsg = String.format("(%d,%d)%s", version, requestId, response);
session.getRemote().sendString(responseMsg);
byte[] responseBytes = response.getBytes("UTF-8");
int responseLength = responseBytes.length;
int flags = 0;
if (isGzipEnabled && responseLength >= MIN_COMPRESS_SIZE) {
flags |= FLAG_COMPRESSED;
ByteArrayOutputStream outStream = new ByteArrayOutputStream(responseLength);
try (GZIPOutputStream gzipStream = new GZIPOutputStream(outStream)) {
gzipStream.write(responseBytes);
}
responseBytes = outStream.toByteArray();
}
ByteBuffer buf = ByteBuffer.allocate(responseBytes.length + 20);
buf.putInt(version)
.putLong(requestId)
.putInt(flags)
.putInt(responseLength)
.put(responseBytes)
.flip();
if (buf.limit() > MAX_MESSAGE_SIZE)
throw new ProtocolException("POST response length exceeds max message size");
session.getRemote().sendBytes(buf);
}
} finally {
lock.unlock();
Expand All @@ -228,43 +289,50 @@ public void sendResponse(long requestId, String response) throws IOException {
/**
* Process a socket message
*
* @param message Socket message
* @param inbuf Message buffer
* @param off Starting offset
* @param len Message length
*/
@OnWebSocketMessage
public void OnMessage(String message) {
boolean msgProcessed = false;
public void OnMessage(byte[] inbuf, int off, int len) {
lock.lock();
try {
int sep = message.indexOf(')');
if (message.charAt(0) == '(' && sep >= 2 && sep < message.length()-2) {
String msgPrefix = message.substring(1, sep);
String jsonString = message.substring(sep+1);
String[] prefixParts = msgPrefix.split(",");
if (prefixParts.length >= 2) {
version = Math.min(Integer.valueOf(prefixParts[0]), VERSION);
Long requestId = Long.valueOf(prefixParts[1]);
msgProcessed = true;
if (peerServlet != null) {
threadPool.execute(() -> peerServlet.doPost(this, requestId, jsonString));
} else {
PostRequest postRequest = requestMap.remove(requestId);
if (postRequest != null)
postRequest.complete(jsonString);
ByteBuffer buf = ByteBuffer.wrap(inbuf, off, len);
version = Math.min(buf.getInt(), VERSION);
Long requestId = buf.getLong();
int flags = buf.getInt();
int length = buf.getInt();
byte[] msgBytes = new byte[buf.remaining()];
buf.get(msgBytes);
if ((flags&FLAG_COMPRESSED) != 0) {
ByteArrayInputStream inStream = new ByteArrayInputStream(msgBytes);
try (GZIPInputStream gzipStream = new GZIPInputStream(inStream, 1024)) {
msgBytes = new byte[length];
int offset = 0;
while (offset < msgBytes.length) {
int count = gzipStream.read(msgBytes, offset, msgBytes.length-offset);
if (count < 0)
throw new EOFException("End-of-data reading compressed data");
offset += count;
}
}
}
} catch (NumberFormatException exc) {
String message = new String(msgBytes, "UTF-8");
if (peerServlet != null) {
threadPool.execute(() -> peerServlet.doPost(this, requestId, message));
} else {
PostRequest postRequest = requestMap.remove(requestId);
if (postRequest != null)
postRequest.complete(message);
}
} catch (Exception exc) {
Logger.logDebugMessage("Exception while processing WebSocket message", exc);
} finally {
lock.unlock();
}
if (!msgProcessed) {
Logger.logDebugMessage(String.format("Peer %s: Invvalid WebSocket message: %s",
session.getRemoteAddress().getHostString(), message));
}
}


/**
* WebSocket session has been closed
*
Expand Down

0 comments on commit 04a21a7

Please sign in to comment.