Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message recovery, stage #2 #19

Merged
merged 6 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ To use with Android don't forget to set INTERNET permission to `AndroidManifest.
- [ ] handle subscription expired error
- [x] ping/pong to find broken connection
- [ ] server-side subscriptions
- [ ] message recovery mechanism
- [x] message recovery mechanism (works with Centrifugo >= 2.5.0 with `v3_use_offset` option set to `true`)

## Generate proto

Expand Down
136 changes: 94 additions & 42 deletions centrifuge/src/main/java/io/github/centrifugal/centrifuge/Client.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
package io.github.centrifugal.centrifuge;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.WebSocket;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocketListener;
import okio.ByteString;

import io.github.centrifugal.centrifuge.internal.backoff.Backoff;
import io.github.centrifugal.centrifuge.internal.protocol.Protocol;

import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.JsonObject;
import com.google.protobuf.InvalidProtocolBufferException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -32,7 +20,17 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import io.github.centrifugal.centrifuge.internal.backoff.Backoff;
import io.github.centrifugal.centrifuge.internal.protocol.Protocol;

import java8.util.concurrent.CompletableFuture;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class Client {
private WebSocket ws;
Expand All @@ -44,6 +42,7 @@ Options getOpts() {

private Options opts;
private String token = "";
private com.google.protobuf.ByteString connectData;
private EventListener listener;
private String client;
private Map<Integer, CompletableFuture<Protocol.Reply>> futures = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -76,6 +75,7 @@ ExecutorService getExecutor() {

/**
* Set connection JWT. This is a token you have to receive from your application backend.
*
* @param token
*/
public void setToken(String token) {
Expand All @@ -84,9 +84,21 @@ public void setToken(String token) {
});
}

/**
* Set connection data This is a data you have to receive from your application backend.
*
* @param data
*/
public void setConnectData(byte[] data) {
this.executor.submit(() -> {
Client.this.connectData = com.google.protobuf.ByteString.copyFrom(data);
});
}

/**
* Creates a new instance of Client. Client allows to allocate new Subscriptions to channels,
* automatically manages reconnects.
*
* @param url
* @param opts
* @param listener
Expand Down Expand Up @@ -118,7 +130,7 @@ private void _connect() {

Headers.Builder headers = new Headers.Builder();
if (this.opts.getHeaders() != null) {
for (Map.Entry<String,String> entry : this.opts.getHeaders().entrySet()) {
for (Map.Entry<String, String> entry : this.opts.getHeaders().entrySet()) {
headers.add(entry.getKey(), entry.getValue());
}
}
Expand Down Expand Up @@ -242,6 +254,9 @@ private void handleConnectionClose(String reason, Boolean shouldReconnect) {
Subscription sub = entry.getValue();
SubscriptionState previousSubState = sub.getState();
sub.moveToUnsubscribed();
if (!shouldReconnect) {
sub.setNeedRecover(false);
}
if (previousSubState == SubscriptionState.SUBSCRIBED) {
sub.getListener().onUnsubscribe(sub, new UnsubscribeEvent());
}
Expand All @@ -252,7 +267,7 @@ private void handleConnectionClose(String reason, Boolean shouldReconnect) {
DisconnectEvent event = new DisconnectEvent();
event.setReason(reason);
event.setReconnect(shouldReconnect);
for(Map.Entry<Integer, CompletableFuture<Protocol.Reply>> entry: this.futures.entrySet()) {
for (Map.Entry<Integer, CompletableFuture<Protocol.Reply>> entry : this.futures.entrySet()) {
CompletableFuture f = entry.getValue();
f.completeExceptionally(new IOException());
}
Expand All @@ -272,7 +287,7 @@ private void scheduleReconnect() {
this.reconnectExecutor.submit(() -> {
try {
Thread.sleep(Client.this.backoff.duration());
} catch(InterruptedException ex) {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
Client.this.executor.submit(() -> {
Expand All @@ -284,11 +299,24 @@ private void scheduleReconnect() {
});
}

private void sendSubscribeSynchronized(String channel, String token) {
Protocol.SubscribeRequest req = Protocol.SubscribeRequest.newBuilder()
.setChannel(channel)
.setToken(token)
.build();
private void sendSubscribeSynchronized(String channel, boolean recover, StreamPosition streamPosition, String token) {

Protocol.SubscribeRequest req = null;

if (recover) {
req = Protocol.SubscribeRequest.newBuilder()
.setEpoch(streamPosition.getEpoch())
.setOffset(streamPosition.getOffset())
.setChannel(channel)
.setRecover(true)
.setToken(token)
.build();
} else {
req = Protocol.SubscribeRequest.newBuilder()
.setChannel(channel)
.setToken(token)
.build();
}

Protocol.Command cmd = Protocol.Command.newBuilder()
.setId(this.getNextId())
Expand All @@ -299,7 +327,7 @@ private void sendSubscribeSynchronized(String channel, String token) {
CompletableFuture<Protocol.Reply> f = new CompletableFuture<>();
this.futures.put(cmd.getId(), f);
f.thenAccept(reply -> {
this.handleSubscribeReply(channel, reply);
this.handleSubscribeReply(channel, reply, recover);
this.futures.remove(cmd.getId());
}).orTimeout(this.opts.getTimeout(), TimeUnit.MILLISECONDS).exceptionally(e -> {
this.executor.submit(() -> {
Expand All @@ -315,10 +343,23 @@ private void sendSubscribeSynchronized(String channel, String token) {

private void sendSubscribe(Subscription sub) {
String channel = sub.getChannel();

boolean isRecover = false;
StreamPosition streamPosition = new StreamPosition();

if (sub.getNeedRecover() && sub.isRecoverable()) {
isRecover = true;
if (sub.getLastOffset() > 0) {
streamPosition.setOffset(sub.getLastOffset());
}
streamPosition.setEpoch(sub.getLastEpoch());
}

if (sub.getChannel().startsWith(this.opts.getPrivateChannelPrefix())) {
PrivateSubEvent privateSubEvent = new PrivateSubEvent();
privateSubEvent.setChannel(sub.getChannel());
privateSubEvent.setClient(this.client);
boolean finalIsRecover = isRecover;
this.listener.onPrivateSub(this, privateSubEvent, new TokenCallback() {
@Override
public void Fail(Throwable e) {
Expand All @@ -336,11 +377,11 @@ public void Done(String token) {
if (Client.this.state != ConnectionState.CONNECTED) {
return;
}
Client.this.sendSubscribeSynchronized(channel, token);
Client.this.sendSubscribeSynchronized(channel, finalIsRecover, streamPosition, token);
}
});
} else {
this.sendSubscribeSynchronized(channel, "");
this.sendSubscribeSynchronized(channel, isRecover, streamPosition,"");
}
}

Expand Down Expand Up @@ -391,6 +432,7 @@ private Subscription getSub(String channel) {

/**
* Create new subscription to channel with certain SubscriptionEventListener
*
* @param channel
* @param listener
* @return
Expand All @@ -411,6 +453,7 @@ public Subscription newSubscription(String channel, SubscriptionEventListener li
/**
* Try to get Subscription from internal client registry. Can return null if Subscription
* does not exist yet.
*
* @param channel
* @return
*/
Expand All @@ -425,6 +468,7 @@ public Subscription getSubscription(String channel) {
/**
* Say Client that Subscription should be removed from internal registry. Subscription will be
* automatically unsubscribed before removing.
*
* @param sub
*/
public void removeSubscription(Subscription sub) {
Expand All @@ -447,7 +491,7 @@ void subscribe(Subscription sub) {
});
}

private void handleSubscribeReply(String channel, Protocol.Reply reply) {
private void handleSubscribeReply(String channel, Protocol.Reply reply, boolean recover) {
Subscription sub = this.getSub(channel);
if (reply.getError().getCode() != 0) {
if (sub != null) {
Expand All @@ -461,7 +505,7 @@ private void handleSubscribeReply(String channel, Protocol.Reply reply) {
try {
if (sub != null) {
Protocol.SubscribeResult result = Protocol.SubscribeResult.parseFrom(reply.getResult().toByteArray());
sub.moveToSubscribeSuccess(result);
sub.moveToSubscribeSuccess(result, recover);
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
Expand Down Expand Up @@ -531,7 +575,7 @@ private void handleConnectReply(Protocol.Reply reply) {
}
this.backoff.reset();

for(Map.Entry<Integer, Protocol.Command> entry: this.connectCommands.entrySet()) {
for (Map.Entry<Integer, Protocol.Command> entry : this.connectCommands.entrySet()) {
// TODO: send in one frame?
Protocol.Command cmd = entry.getValue();
boolean sent = this.ws.send(ByteString.of(this.serializeCommand(cmd)));
Expand All @@ -544,7 +588,7 @@ private void handleConnectReply(Protocol.Reply reply) {
}
this.connectCommands.clear();

for(Map.Entry<Integer, Protocol.Command> entry: this.connectAsyncCommands.entrySet()) {
for (Map.Entry<Integer, Protocol.Command> entry : this.connectAsyncCommands.entrySet()) {
// TODO: send in one frame?
Protocol.Command cmd = entry.getValue();
CompletableFuture<Protocol.Reply> f = this.futures.get(cmd.getId());
Expand Down Expand Up @@ -614,9 +658,10 @@ public void onDone(ReplyError error, Protocol.RefreshResult result) {
}

private void sendConnect() {
Protocol.ConnectRequest req = Protocol.ConnectRequest.newBuilder()
.setToken(this.token)
.build();
Protocol.ConnectRequest.Builder build = Protocol.ConnectRequest.newBuilder();
if (this.token.length() > 0) build.setToken(this.token);
if (this.connectData != null) build.setData(this.connectData);
Protocol.ConnectRequest req = build.build();

Protocol.Command cmd = Protocol.Command.newBuilder()
.setId(this.getNextId())
Expand Down Expand Up @@ -658,7 +703,9 @@ private void handleAsyncReply(Protocol.Reply reply) {
if (sub != null) {
PublishEvent event = new PublishEvent();
event.setData(pub.getData().toByteArray());
event.setOffset(pub.getOffset());
sub.getListener().onPublish(sub, event);
sub.setLastOffset(pub.getOffset());
}
} else if (push.getType() == Protocol.PushType.JOIN) {
Protocol.Join join = Protocol.Join.parseFrom(push.getData());
Expand All @@ -673,7 +720,7 @@ private void handleAsyncReply(Protocol.Reply reply) {
event.setInfo(info);
sub.getListener().onJoin(sub, event);
}
} else if (push.getType() == Protocol.PushType.LEAVE) {
} else if (push.getType() == Protocol.PushType.LEAVE) {
Protocol.Leave leave = Protocol.Leave.parseFrom(push.getData());
Subscription sub = this.getSub(channel);
if (sub != null) {
Expand Down Expand Up @@ -706,6 +753,7 @@ private void handleAsyncReply(Protocol.Reply reply) {
/**
* Send asynchronous message with data to server. Callback successfully completes if data
* written to connection. No reply from server expected in this case.
*
* @param data
* @param cb
*/
Expand Down Expand Up @@ -749,7 +797,7 @@ private void sendSynchronized(byte[] data, CompletionCallback cb) {
}
}

private void enqueueCommandFuture(Protocol.Command cmd, CompletableFuture<Protocol.Reply> f) {
private void enqueueCommandFuture(Protocol.Command cmd, CompletableFuture<Protocol.Reply> f) {
this.futures.put(cmd.getId(), f);
if (this.state != ConnectionState.CONNECTED) {
this.connectCommands.put(cmd.getId(), cmd);
Expand Down Expand Up @@ -780,6 +828,7 @@ private void cleanCommandFuture(Protocol.Command cmd) {

/**
* Send RPC to server, process result in callback.
*
* @param data
* @param cb
*/
Expand All @@ -789,6 +838,7 @@ public void rpc(byte[] data, ReplyCallback<RPCResult> cb) {

/**
* Send RPC with method to server, process result in callback.
*
* @param method
* @param data
* @param cb
Expand All @@ -801,9 +851,9 @@ private void rpcSynchronized(String method, byte[] data, ReplyCallback<RPCResult
Protocol.RPCRequest.Builder builder = Protocol.RPCRequest.newBuilder()
.setData(com.google.protobuf.ByteString.copyFrom(data));

if(method != null){
builder.setMethod(method);
}
if (method != null) {
builder.setMethod(method);
}

Protocol.RPCRequest req = builder.build();

Expand Down Expand Up @@ -842,6 +892,7 @@ private void rpcSynchronized(String method, byte[] data, ReplyCallback<RPCResult
/**
* Publish data to channel without being subscribed to it. Publish option should be
* enabled in Centrifuge/Centrifugo server configuration.
*
* @param channel
* @param data
* @param cb
Expand Down Expand Up @@ -885,7 +936,7 @@ private void publishSynchronized(String channel, byte[] data, ReplyCallback<Publ
void history(String channel, ReplyCallback<HistoryResult> cb) {
this.executor.submit(() -> Client.this.historySynchronized(channel, cb));
}

private void historySynchronized(String channel, ReplyCallback<HistoryResult> cb) {
Protocol.HistoryRequest req = Protocol.HistoryRequest.newBuilder()
.setChannel(channel)
Expand All @@ -908,10 +959,11 @@ private void historySynchronized(String channel, ReplyCallback<HistoryResult> cb
HistoryResult result = new HistoryResult();
List<Protocol.Publication> protoPubs = replyResult.getPublicationsList();
List<Publication> pubs = new ArrayList<>();
for (int i=0; i<protoPubs.size(); i++) {
for (int i = 0; i < protoPubs.size(); i++) {
Protocol.Publication protoPub = protoPubs.get(i);
Publication pub = new Publication();
pub.setData(protoPub.getData().toByteArray());
pub.setOffset(protoPub.getOffset());
pubs.add(pub);
}
result.setPublications(pubs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ public String getClient() {
return client;
}

public void setClient(String client) {
void setClient(String client) {
this.client = client;
}

public byte[] getData() {
return data;
}

public void setData(byte[] data) {
void setData(byte[] data) {
this.data = data;
}

Expand Down
Loading