Skip to content

Commit

Permalink
Add support for deltas using VCDiff codec.
Browse files Browse the repository at this point in the history
  • Loading branch information
tsviatko authored and Quintin committed Jun 8, 2020
1 parent 6985178 commit 2b5c3a9
Show file tree
Hide file tree
Showing 15 changed files with 477 additions and 23 deletions.
1 change: 1 addition & 0 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ dependencies {
implementation 'org.msgpack:msgpack-core:0.8.11'
implementation 'org.java-websocket:Java-WebSocket:1.4.0'
implementation 'com.google.code.gson:gson:2.5'
implementation 'com.davidehrmann.vcdiff:vcdiff-core:0.1.1'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation 'junit:junit:4.12'
testImplementation 'org.nanohttpd:nanohttpd:2.3.0'
Expand Down
77 changes: 69 additions & 8 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void attach(CompletionListener listener) throws AblyException {
this.attach(false, listener);
}

private void attach(boolean forceReattach, CompletionListener listener) throws AblyException {
private void attach(boolean forceReattach, CompletionListener listener) {
clearAttachTimers();
attachWithTimeout(forceReattach, listener);
}
Expand Down Expand Up @@ -150,6 +150,9 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
attachMessage.encodeModesToFlags(this.options.modes);
}
}
if(this.decodeFailureRecoveryInProgress) {
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
}
try {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
Expand Down Expand Up @@ -323,7 +326,7 @@ private void attachWithTimeout(final CompletionListener listener) throws AblyExc
* Attach channel, if not attached within timeout set state to suspended and
* set up timer to reattach it later
*/
synchronized private void attachWithTimeout(final boolean forceReattach, final CompletionListener listener) throws AblyException {
synchronized private void attachWithTimeout(final boolean forceReattach, final CompletionListener listener) {
Timer currentAttachTimer;
try {
currentAttachTimer = new Timer();
Expand Down Expand Up @@ -646,12 +649,38 @@ public synchronized void unsubscribe(String[] names, MessageListener listener) {
private void onMessage(ProtocolMessage message) {
Log.v(TAG, "onMessage(); channel = " + name);
Message[] messages = message.messages;
Message firstMessage = messages[0];
Message lastMessage = messages[messages.length - 1];

if (firstMessage.extras != null && firstMessage.extras.delta != null && !firstMessage.extras.delta.from.equals(this.lastPayloadMessageId)) {
Log.e(TAG, String.format("Delta message decode failure - previous message not available. Message id = %s, channel = %s", firstMessage.id, name));
this.startDecodeFailureRecovery();
return;
}

for(int i = 0; i < messages.length; i++) {
Message msg = messages[i];
try {
msg.decode(options);
msg.decode(options, decodingContext);
} catch (MessageDecodeException e) {
Log.e(TAG, String.format("%s on channel %s", e.errorInfo.message, name));

if(msg.id == null) msg.id = message.id + ':' + i;

if (e.errorInfo.code == 40018) {
Log.e(TAG, String.format("Delta message decode failure - %s. Message id = %s, channel = %s", e.errorInfo.message, msg.id, name));
this.startDecodeFailureRecovery();

//log messages skipped per RTL16
for (int j = i + 1; j < messages.length; j++) {
if(messages[j].id == null) messages[j].id = message.id + ':' + j;
Log.v(TAG, String.format("Delta recovery in progress - message skipped. Message id = %s, channel = %s", messages[j].id, name));
}

return;
}
else {
Log.e(TAG, String.format("Message decode failure - %s. Message id = %s, channel = %s", e.errorInfo.message, msg.id, name));
}
}
/* populate fields derived from protocol message */
if(msg.connectionId == null) msg.connectionId = message.connectionId;
Expand All @@ -663,11 +692,33 @@ private void onMessage(ProtocolMessage message) {
listeners.onMessage(msg);
}

this.lastPayloadMessageId = lastMessage.id;
this.lastPayloadProtocolMessageChannelSerial = message.channelSerial;

for (Message msg : message.messages) {
this.listeners.onMessage(msg);
}
}

private void startDecodeFailureRecovery() {
if (this.decodeFailureRecoveryInProgress) {
return;
}
Log.w(TAG, "Starting delta decode failure recovery process");
this.decodeFailureRecoveryInProgress = true;
this.attach(true, new CompletionListener() {
@Override
public void onSuccess() {
decodeFailureRecoveryInProgress = false;
}

@Override
public void onError(ErrorInfo reason) {
decodeFailureRecoveryInProgress = false;
}
});
}

private void onPresence(ProtocolMessage message, String syncChannelSerial) {
Log.v(TAG, "onPresence(); channel = " + name + "; syncChannelSerial = " + syncChannelSerial);
PresenceMessage[] messages = message.presence;
Expand Down Expand Up @@ -915,7 +966,7 @@ else if(!"false".equalsIgnoreCase(param.value)) {
private List<QueuedMessage> queuedMessages;

/************************************
* Channel history
* Channel history
************************************/

/**
Expand Down Expand Up @@ -947,7 +998,7 @@ private BasePaginatedQuery.ResultRequest<Message> historyImpl(Param[] params) {
}

/************************************
* Channel options
* Channel options
************************************/

public void setOptions(ChannelOptions options) throws AblyException {
Expand Down Expand Up @@ -979,7 +1030,7 @@ public ChannelModes getModes() {

/************************************
* internal general
* @throws AblyException
* @throws AblyException
************************************/

private class ChannelStateCompletionListener implements ChannelStateListener {
Expand Down Expand Up @@ -1015,6 +1066,7 @@ else if(stateChange.current.equals(failureState)) {
this.presence = new Presence((Channel) this);
state = ChannelState.initialized;
queuedMessages = new ArrayList<QueuedMessage>();
this.decodingContext = new DecodingContext();
}

void onChannelMessage(ProtocolMessage msg) {
Expand Down Expand Up @@ -1056,7 +1108,13 @@ void onChannelMessage(ProtocolMessage msg) {
}
break;
case message:
onMessage(msg);
if(!decodeFailureRecoveryInProgress)
onMessage(msg);
else {
//log messages skipped per RTL16
for (int j = 0; j < msg.messages.length; j++)
Log.v(TAG, String.format("Delta recovery in progress - message skipped. Message id = %s, channel = %s", msg.messages[j].id, name));
}
break;
case presence:
onPresence(msg, null);
Expand Down Expand Up @@ -1100,4 +1158,7 @@ public void once(ChannelState state, ChannelStateListener listener) {
String syncChannelSerial;
private ChannelParams params;
private ChannelModes modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public String getHost() {
return params.host;
}

protected void preProcessReceivedMessage(ProtocolMessage message)
{
//Gives the chance to child classes to do message pre-processing
}

/**************************
* WebSocketHandler methods
**************************/
Expand All @@ -146,6 +151,7 @@ public void onMessage(ByteBuffer blob) {
try {
ProtocolMessage msg = ProtocolSerializer.readMsgpack(blob.array());
Log.d(TAG, "onMessage(): msg (binary) = " + msg);
WebSocketTransport.this.preProcessReceivedMessage(msg);
connectionManager.onMessage(WebSocketTransport.this, msg);
} catch (AblyException e) {
String msg = "Unexpected exception processing received binary message";
Expand All @@ -159,6 +165,7 @@ public void onMessage(String string) {
try {
ProtocolMessage msg = ProtocolSerializer.fromJSON(string);
Log.d(TAG, "onMessage(): msg (text) = " + msg);
WebSocketTransport.this.preProcessReceivedMessage(msg);
connectionManager.onMessage(WebSocketTransport.this, msg);
} catch (AblyException e) {
String msg = "Unexpected exception processing received text message";
Expand Down
33 changes: 29 additions & 4 deletions lib/src/main/java/io/ably/lib/types/BaseMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -69,12 +70,20 @@ public void getDetails(StringBuilder builder) {
}

public void decode(ChannelOptions opts) throws MessageDecodeException {

this.decode(opts, new DecodingContext());
}

public void decode(ChannelOptions opts, DecodingContext context) throws MessageDecodeException {

Object lastPayload = data;

if(encoding != null) {
String[] xforms = encoding.split("\\/");
int i = 0, j = xforms.length;
int lastProcessedEncodingIndex = 0, encodingsToProcess = xforms.length;
try {
while((i = j) > 0) {
Matcher match = xformPattern.matcher(xforms[--j]);
while((lastProcessedEncodingIndex = encodingsToProcess ) > 0) {
Matcher match = xformPattern.matcher(xforms[--encodingsToProcess ]);
if(!match.matches()) break;
switch(match.group(1)) {
case "base64":
Expand All @@ -83,6 +92,9 @@ public void decode(ChannelOptions opts) throws MessageDecodeException {
} catch (IllegalArgumentException e) {
throw MessageDecodeException.fromDescription("Invalid base64 data received");
}
if(lastProcessedEncodingIndex == xforms.length) {
lastPayload = data;
}
continue;

case "utf-8":
Expand Down Expand Up @@ -110,13 +122,26 @@ public void decode(ChannelOptions opts) throws MessageDecodeException {
else {
throw MessageDecodeException.fromDescription("Encrypted message received but encryption is not set up");
}
case "vcdiff":
data = VCDiffDecoderHelper.decode((byte[]) data, context.getLastMessageData());
lastPayload = data;

continue;
}
break;
}
} finally {
encoding = (i <= 0) ? null : join(xforms, '/', 0, i);
encoding = (lastProcessedEncodingIndex <= 0) ? null : join(xforms, '/', 0, lastProcessedEncodingIndex );
}
}

//last message bookkeping
if(lastPayload instanceof String)
context.setLastMessageData((String)lastPayload);
else if (lastPayload instanceof byte[])
context.setLastMessageData((byte[])lastPayload);
else
throw MessageDecodeException.fromDescription("Message data neither String nor byte[]. Unsupported message data type.");
}

public void encode(ChannelOptions opts) throws AblyException {
Expand Down
1 change: 1 addition & 0 deletions lib/src/main/java/io/ably/lib/types/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.ably.lib.util.Log;
import io.ably.lib.util.Log.LogHandler;

import java.util.HashMap;
import java.util.Map;

/**
Expand Down
36 changes: 36 additions & 0 deletions lib/src/main/java/io/ably/lib/types/DecodingContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.ably.lib.types;

import java.util.Map;
import java.nio.charset.StandardCharsets;

public class DecodingContext {

private String lastMessageString;
private byte[] lastMessageBinary;

public DecodingContext()
{
lastMessageBinary = null;
lastMessageString = null;
}

public byte[] getLastMessageData() {
if(lastMessageBinary != null)
return lastMessageBinary;
else if(lastMessageString != null) {
return lastMessageString.getBytes(StandardCharsets.UTF_8);
}
else
return null;
}

public void setLastMessageData(String message) {
lastMessageString = message;
lastMessageBinary = null;
}

public void setLastMessageData(byte[] message) {
lastMessageBinary = message;
lastMessageString = null;
}
}
51 changes: 51 additions & 0 deletions lib/src/main/java/io/ably/lib/types/DeltaExtras.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.ably.lib.types;

import io.ably.lib.util.Log;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

import java.io.IOException;

public class DeltaExtras {
private static final String TAG = DeltaExtras.class.getName();

public String format;
public String from;

void writeMsgpack(MessagePacker packer) throws IOException {
packer.packMapHeader(2);

packer.packString("format");
packer.packString(this.format);

packer.packString("from");
packer.packString(this.from);
}

private DeltaExtras readMsgpack(MessageUnpacker unpacker) throws IOException {
int fieldCount = unpacker.unpackMapHeader();
for(int i = 0; i < fieldCount; i++) {
String fieldName = unpacker.unpackString();
MessageFormat fieldFormat = unpacker.getNextFormat();
if(fieldFormat.equals(MessageFormat.NIL)) {
unpacker.unpackNil();
continue;
}

if(fieldName.equals("format")) {
this.format = unpacker.unpackString();
} else if (fieldName.equals("from")) {
this.from = unpacker.unpackString();
} else {
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
}
}
return this;
}

static DeltaExtras fromMsgpack(MessageUnpacker unpacker) throws IOException {
return (new DeltaExtras()).readMsgpack(unpacker);
}
}
Loading

0 comments on commit 2b5c3a9

Please sign in to comment.