From cce96838824e9a544fdb8cd9b870c57f9d30dd1d Mon Sep 17 00:00:00 2001 From: peterdeme Date: Fri, 6 May 2022 16:49:18 +0200 Subject: [PATCH 1/2] feat(faye): add websocket client --- .../getstream/cloud/CloudAggregatedFeed.java | 4 + .../java/io/getstream/cloud/CloudClient.java | 143 +++++- .../java/io/getstream/cloud/CloudFeed.java | 18 + .../io/getstream/cloud/CloudFlatFeed.java | 4 + .../cloud/CloudNotificationFeed.java | 4 + .../io/getstream/cloud/FeedSubscriber.java | 10 + .../cloud/RealtimeMessageCallback.java | 7 + .../java/io/getstream/core/faye/Advice.java | 68 +++ .../java/io/getstream/core/faye/Channel.java | 119 +++++ .../core/faye/DefaultMessageTransformer.java | 13 + .../getstream/core/faye/FayeClientError.java | 57 +++ .../java/io/getstream/core/faye/Grammar.java | 18 + .../java/io/getstream/core/faye/Message.java | 168 +++++++ .../core/faye/MessageTransformer.java | 7 + .../getstream/core/faye/client/Callback.java | 5 + .../core/faye/client/FayeClient.java | 455 ++++++++++++++++++ .../core/faye/client/FayeClientState.java | 8 + .../core/faye/client/MessageCallback.java | 7 + .../core/faye/client/StateChangeListener.java | 5 + .../core/faye/emitter/ErrorListener.java | 5 + .../core/faye/emitter/EventEmitter.java | 120 +++++ .../core/faye/emitter/EventListener.java | 5 + .../core/faye/emitter/ListenerEntry.java | 24 + .../subscription/ChannelDataCallback.java | 7 + .../subscription/ChannelSubscription.java | 49 ++ .../SubscriptionCancelledCallback.java | 5 + .../subscription/WithChannelDataCallback.java | 7 + .../java/io/getstream/core/models/FeedID.java | 4 + .../core/models/RealtimeMessage.java | 92 ++++ .../getstream/core/utils/Serialization.java | 10 + .../io/getstream/cloud/CloudFeedTest.java | 39 ++ 31 files changed, 1479 insertions(+), 8 deletions(-) create mode 100644 src/main/java/io/getstream/cloud/FeedSubscriber.java create mode 100644 src/main/java/io/getstream/cloud/RealtimeMessageCallback.java create mode 100644 src/main/java/io/getstream/core/faye/Advice.java create mode 100644 src/main/java/io/getstream/core/faye/Channel.java create mode 100644 src/main/java/io/getstream/core/faye/DefaultMessageTransformer.java create mode 100644 src/main/java/io/getstream/core/faye/FayeClientError.java create mode 100644 src/main/java/io/getstream/core/faye/Grammar.java create mode 100644 src/main/java/io/getstream/core/faye/Message.java create mode 100644 src/main/java/io/getstream/core/faye/MessageTransformer.java create mode 100644 src/main/java/io/getstream/core/faye/client/Callback.java create mode 100644 src/main/java/io/getstream/core/faye/client/FayeClient.java create mode 100644 src/main/java/io/getstream/core/faye/client/FayeClientState.java create mode 100644 src/main/java/io/getstream/core/faye/client/MessageCallback.java create mode 100644 src/main/java/io/getstream/core/faye/client/StateChangeListener.java create mode 100644 src/main/java/io/getstream/core/faye/emitter/ErrorListener.java create mode 100644 src/main/java/io/getstream/core/faye/emitter/EventEmitter.java create mode 100644 src/main/java/io/getstream/core/faye/emitter/EventListener.java create mode 100644 src/main/java/io/getstream/core/faye/emitter/ListenerEntry.java create mode 100644 src/main/java/io/getstream/core/faye/subscription/ChannelDataCallback.java create mode 100644 src/main/java/io/getstream/core/faye/subscription/ChannelSubscription.java create mode 100644 src/main/java/io/getstream/core/faye/subscription/SubscriptionCancelledCallback.java create mode 100644 src/main/java/io/getstream/core/faye/subscription/WithChannelDataCallback.java create mode 100644 src/main/java/io/getstream/core/models/RealtimeMessage.java diff --git a/src/main/java/io/getstream/cloud/CloudAggregatedFeed.java b/src/main/java/io/getstream/cloud/CloudAggregatedFeed.java index 66044da4..7c4f8030 100644 --- a/src/main/java/io/getstream/cloud/CloudAggregatedFeed.java +++ b/src/main/java/io/getstream/cloud/CloudAggregatedFeed.java @@ -20,6 +20,10 @@ public class CloudAggregatedFeed extends CloudFeed { super(client, id); } + CloudAggregatedFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) { + super(client, id, subscriber); + } + public CompletableFuture>> getActivities() throws StreamException { return getActivities( diff --git a/src/main/java/io/getstream/cloud/CloudClient.java b/src/main/java/io/getstream/cloud/CloudClient.java index a157869a..e2417ec4 100644 --- a/src/main/java/io/getstream/cloud/CloudClient.java +++ b/src/main/java/io/getstream/cloud/CloudClient.java @@ -6,6 +6,10 @@ import io.getstream.core.Region; import io.getstream.core.Stream; import io.getstream.core.exceptions.StreamException; +import io.getstream.core.faye.DefaultMessageTransformer; +import io.getstream.core.faye.Message; +import io.getstream.core.faye.client.FayeClient; +import io.getstream.core.faye.subscription.ChannelSubscription; import io.getstream.core.http.HTTPClient; import io.getstream.core.http.OKHTTPClientAdapter; import io.getstream.core.http.Response; @@ -14,20 +18,38 @@ import io.getstream.core.models.Data; import io.getstream.core.models.FeedID; import io.getstream.core.models.OGData; +import io.getstream.core.models.RealtimeMessage; import io.getstream.core.options.RequestOption; +import io.getstream.core.utils.Serialization; import java.net.MalformedURLException; import java.net.URL; +import java.util.HashMap; +import java.util.Map; import java8.util.concurrent.CompletableFuture; public final class CloudClient { + private final String apiKey; private final Token token; + private final String appID; private final String userID; private final Stream stream; - - private CloudClient(String key, Token token, String userID, URL baseURL, HTTPClient httpClient) { + private final FayeClient faye; + + private CloudClient( + String key, + Token token, + String userID, + String appID, + URL baseURL, + HTTPClient httpClient, + URL fayeURL) { + this.apiKey = key; this.token = token; + this.appID = appID; this.userID = userID; this.stream = new Stream(key, baseURL, httpClient); + this.faye = new FayeClient(fayeURL); + this.faye.setMessageTransformer(new FayeMessageTransformer()); } public static Builder builder(String apiKey, String token, String userID) { @@ -38,22 +60,25 @@ public static Builder builder(String apiKey, Token token, String userID) { return new Builder(apiKey, token, userID); } - public CompletableFuture openGraph(URL url) throws StreamException { - return stream.openGraph(token, url); + public static Builder builder(String apiKey, Token token, String userID, String appID) { + return new Builder(apiKey, token, userID, appID); } public static final class Builder { private static final String DEFAULT_HOST = "stream-io-api.com"; + private static final String DEFAULT_FAYE_URL = "https://faye-us-east.stream-io-api.com/faye"; private final String apiKey; private final Token token; private final String userID; + private final String appID; private HTTPClient httpClient; private String scheme = "https"; private String region = Region.US_EAST.toString(); private String host = DEFAULT_HOST; private int port = 443; + private String fayeURL = DEFAULT_FAYE_URL; public Builder(String apiKey, Token token, String userID) { checkNotNull(apiKey, "API key can't be null"); @@ -64,6 +89,19 @@ public Builder(String apiKey, Token token, String userID) { this.apiKey = apiKey; this.token = token; this.userID = userID; + this.appID = null; + } + + public Builder(String apiKey, Token token, String userID, String appID) { + checkNotNull(apiKey, "API key can't be null"); + checkNotNull(token, "Token can't be null"); + checkNotNull(userID, "User ID can't be null"); + checkArgument(!apiKey.isEmpty(), "API key can't be empty"); + checkArgument(!userID.isEmpty(), "User ID can't be empty"); + this.apiKey = apiKey; + this.token = token; + this.userID = userID; + this.appID = appID; } public Builder httpClient(HTTPClient httpClient) { @@ -105,6 +143,13 @@ public Builder region(String region) { return this; } + public Builder fayeURL(String fayeURL) { + checkNotNull(fayeURL, "FayeUrl can't be null"); + checkArgument(!fayeURL.isEmpty(), "FayeUrl can't be empty"); + this.fayeURL = fayeURL; + return this; + } + private String buildHost() { final StringBuilder sb = new StringBuilder(); if (host.equals(DEFAULT_HOST)) { @@ -118,8 +163,50 @@ public CloudClient build() throws MalformedURLException { if (httpClient == null) { httpClient = new OKHTTPClientAdapter(); } + return new CloudClient( - apiKey, token, userID, new URL(scheme, buildHost(), port, ""), httpClient); + apiKey, + token, + userID, + appID, + new URL(scheme, buildHost(), port, ""), + httpClient, + new URL(DEFAULT_FAYE_URL)); + } + } + + private static class FeedSubscription { + private String token; + private String userId; + private ChannelSubscription channelSubscription; + + private FeedSubscription(String token, String userId) { + this.token = token; + this.userId = userId; + } + + private FeedSubscription(String token, String userId, ChannelSubscription subscription) { + this.token = token; + this.userId = userId; + this.channelSubscription = subscription; + } + } + + private final Map feedSubscriptions = new HashMap<>(); + + private class FayeMessageTransformer extends DefaultMessageTransformer { + @Override + public Message transformRequest(Message message) { + final String subscription = message.getSubscription(); + if (feedSubscriptions.containsKey(subscription)) { + final FeedSubscription feedSubscription = feedSubscriptions.get(subscription); + final Map ext = new HashMap<>(); + ext.put("user_id", feedSubscription.userId); + ext.put("api_key", apiKey); + ext.put("signature", feedSubscription.token); + message.setExt(ext); + } + return message; } } @@ -127,6 +214,46 @@ public T getHTTPClientImplementation() { return stream.getHTTPClientImplementation(); } + public CompletableFuture openGraph(URL url) throws StreamException { + return stream.openGraph(token, url); + } + + private CompletableFuture feedSubscriber( + FeedID feedId, RealtimeMessageCallback messageCallback) { + final CompletableFuture subscriberCompletion = new CompletableFuture<>(); + try { + checkNotNull(appID, "Missing app id, which is needed in order to subscribe feed"); + final String claim = feedId.getClaim(); + final String notificationChannel = "site" + "-" + appID + "-" + "feed" + "-" + claim; + final FeedSubscription subscription = + new FeedSubscription(token.toString(), notificationChannel); + feedSubscriptions.put("/" + notificationChannel, subscription); + + final ChannelSubscription channelSubscription = + faye.subscribe( + "/" + notificationChannel, + data -> { + try { + final byte[] payload = Serialization.toJSON(data); + final RealtimeMessage message = + Serialization.fromJSON(new String(payload), RealtimeMessage.class); + messageCallback.onMessage(message); + } catch (Exception e) { + e.printStackTrace(); + } + }, + () -> feedSubscriptions.remove("/" + notificationChannel)) + .get(); + + subscription.channelSubscription = channelSubscription; + feedSubscriptions.put("/" + notificationChannel, subscription); + subscriberCompletion.complete(channelSubscription); + } catch (Exception e) { + subscriberCompletion.completeExceptionally(e); + } + return subscriberCompletion; + } + // TODO: add personalized feed versions public CloudFlatFeed flatFeed(String slug) { return flatFeed(slug, userID); @@ -141,7 +268,7 @@ public CloudFlatFeed flatFeed(String slug, String userID) { } public CloudFlatFeed flatFeed(FeedID id) { - return new CloudFlatFeed(this, id); + return new CloudFlatFeed(this, id, this::feedSubscriber); } public CloudAggregatedFeed aggregatedFeed(String slug) { @@ -157,7 +284,7 @@ public CloudAggregatedFeed aggregatedFeed(String slug, String userID) { } public CloudAggregatedFeed aggregatedFeed(FeedID id) { - return new CloudAggregatedFeed(this, id); + return new CloudAggregatedFeed(this, id, this::feedSubscriber); } public CloudNotificationFeed notificationFeed(String slug) { @@ -173,7 +300,7 @@ public CloudNotificationFeed notificationFeed(String slug, String userID) { } public CloudNotificationFeed notificationFeed(FeedID id) { - return new CloudNotificationFeed(this, id); + return new CloudNotificationFeed(this, id, this::feedSubscriber); } public CloudUser user(String userID) { diff --git a/src/main/java/io/getstream/cloud/CloudFeed.java b/src/main/java/io/getstream/cloud/CloudFeed.java index d45537d0..07673c01 100644 --- a/src/main/java/io/getstream/cloud/CloudFeed.java +++ b/src/main/java/io/getstream/cloud/CloudFeed.java @@ -6,6 +6,7 @@ import com.google.common.collect.Iterables; import io.getstream.core.exceptions.StreamException; +import io.getstream.core.faye.subscription.ChannelSubscription; import io.getstream.core.http.Response; import io.getstream.core.models.Activity; import io.getstream.core.models.FeedID; @@ -26,6 +27,7 @@ public class CloudFeed { private final CloudClient client; private final FeedID id; + private final FeedSubscriber subscriber; CloudFeed(CloudClient client, FeedID id) { checkNotNull(client, "Can't create feed w/o a client"); @@ -33,12 +35,28 @@ public class CloudFeed { this.client = client; this.id = id; + this.subscriber = null; + } + + CloudFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) { + checkNotNull(client, "Can't create feed w/o a client"); + checkNotNull(id, "Can't create feed w/o an ID"); + + this.client = client; + this.id = id; + this.subscriber = subscriber; } protected final CloudClient getClient() { return client; } + public final CompletableFuture subscribe( + RealtimeMessageCallback messageCallback) { + checkNotNull(subscriber, "A subscriber must be provided in order to start listening to a feed"); + return subscriber.subscribe(id, messageCallback); + } + public final FeedID getID() { return id; } diff --git a/src/main/java/io/getstream/cloud/CloudFlatFeed.java b/src/main/java/io/getstream/cloud/CloudFlatFeed.java index 2b6987f8..af776a2e 100644 --- a/src/main/java/io/getstream/cloud/CloudFlatFeed.java +++ b/src/main/java/io/getstream/cloud/CloudFlatFeed.java @@ -19,6 +19,10 @@ public final class CloudFlatFeed extends CloudFeed { super(client, id); } + CloudFlatFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) { + super(client, id, subscriber); + } + public CompletableFuture> getActivities() throws StreamException { return getActivities( DefaultOptions.DEFAULT_LIMIT, diff --git a/src/main/java/io/getstream/cloud/CloudNotificationFeed.java b/src/main/java/io/getstream/cloud/CloudNotificationFeed.java index edbb9e35..f6e17a16 100644 --- a/src/main/java/io/getstream/cloud/CloudNotificationFeed.java +++ b/src/main/java/io/getstream/cloud/CloudNotificationFeed.java @@ -17,6 +17,10 @@ public final class CloudNotificationFeed extends CloudFeed { super(client, id); } + CloudNotificationFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) { + super(client, id, subscriber); + } + public CompletableFuture> getActivities() throws StreamException { return getActivities( diff --git a/src/main/java/io/getstream/cloud/FeedSubscriber.java b/src/main/java/io/getstream/cloud/FeedSubscriber.java new file mode 100644 index 00000000..38872777 --- /dev/null +++ b/src/main/java/io/getstream/cloud/FeedSubscriber.java @@ -0,0 +1,10 @@ +package io.getstream.cloud; + +import io.getstream.core.faye.subscription.ChannelSubscription; +import io.getstream.core.models.FeedID; +import java8.util.concurrent.CompletableFuture; + +public interface FeedSubscriber { + CompletableFuture subscribe( + FeedID feedID, RealtimeMessageCallback messageCallback); +} diff --git a/src/main/java/io/getstream/cloud/RealtimeMessageCallback.java b/src/main/java/io/getstream/cloud/RealtimeMessageCallback.java new file mode 100644 index 00000000..cce28932 --- /dev/null +++ b/src/main/java/io/getstream/cloud/RealtimeMessageCallback.java @@ -0,0 +1,7 @@ +package io.getstream.cloud; + +import io.getstream.core.models.RealtimeMessage; + +public interface RealtimeMessageCallback { + void onMessage(RealtimeMessage message); +} diff --git a/src/main/java/io/getstream/core/faye/Advice.java b/src/main/java/io/getstream/core/faye/Advice.java new file mode 100644 index 00000000..5d3a5779 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/Advice.java @@ -0,0 +1,68 @@ +package io.getstream.core.faye; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.google.common.base.MoreObjects; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Advice { + private final String reconnect; + private final Integer interval; + private final Integer timeout; + + public static final String NONE = "none"; + public static final String HANDSHAKE = "handshake"; + public static final String RETRY = "retry"; + + // for deserialization + public Advice() { + reconnect = null; + interval = null; + timeout = null; + } + + public Advice(String reconnect, Integer interval, Integer timeout) { + this.reconnect = reconnect; + this.interval = interval; + this.timeout = timeout; + } + + public String getReconnect() { + return reconnect; + } + + public Integer getInterval() { + return interval; + } + + public Integer getTimeout() { + return timeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Advice that = (Advice) o; + return Objects.equals(reconnect, that.reconnect) + && Objects.equals(interval, that.interval) + && Objects.equals(timeout, that.timeout); + } + + @Override + public int hashCode() { + return Objects.hash(reconnect, interval, timeout); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("reconnect", this.reconnect) + .add("interval", this.interval) + .add("timeout", this.timeout) + .toString(); + } +} diff --git a/src/main/java/io/getstream/core/faye/Channel.java b/src/main/java/io/getstream/core/faye/Channel.java new file mode 100644 index 00000000..988ee789 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/Channel.java @@ -0,0 +1,119 @@ +package io.getstream.core.faye; + +import com.google.common.base.MoreObjects; +import io.getstream.core.faye.emitter.EventEmitter; +import io.getstream.core.faye.emitter.EventListener; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class Channel { + + public static final String HANDSHAKE = "/meta/handshake"; + public static final String CONNECT = "/meta/connect"; + public static final String DISCONNECT = "/meta/disconnect"; + public static final String SUBSCRIBE = "/meta/subscribe"; + public static final String UNSUBSCRIBE = "/meta/unsubscribe"; + + private final String name; + + public Channel(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + final EventEmitter eventEmitter = new EventEmitter<>(); + + public void bind(String event, EventListener listener) { + eventEmitter.on(event, listener); + } + + public void unbind(String event, EventListener listener) { + eventEmitter.removeListener(event, listener); + } + + public void trigger(String event, Message message) { + eventEmitter.emit(event, message); + } + + public boolean hasListeners(String event) throws Exception { + return eventEmitter.hasListeners(event); + } + + public static List expand(String name) { + final List channels = new ArrayList<>(Arrays.asList("/**", name)); + final List segments = parse(name); + + if (segments == null) return null; + + List copy = new ArrayList<>(segments); + copy.add(copy.size() - 1, "*"); + channels.add(unparse(copy)); + + for (int i = 1; i < segments.size(); i++) { + copy = segments.subList(0, i); + copy.add("**"); + channels.add(unparse(copy)); + } + + return channels; + } + + public static boolean isValid(String name) { + return Grammar.CHANNEL_NAME.matcher(name).matches() + || Grammar.CHANNEL_PATTERN.matcher(name).matches(); + } + + public static List parse(String name) { + if (!isValid(name)) return null; + final String[] splits = name.split("/"); + return Arrays.asList(splits).subList(1, splits.length); + } + + public static String unparse(List segments) { + final String joinedSegments = String.join("/", segments); + return "/" + joinedSegments; + } + + public static Boolean isMeta(String name) { + final List segments = parse(name); + if (segments == null) return null; + return segments.get(0).equals("meta"); + } + + public static Boolean isService(String name) { + final List segments = parse(name); + if (segments == null) return null; + return segments.get(0).equals("service"); + } + + public static Boolean isSubscribable(String name) { + if (!isValid(name)) return null; + final Boolean isMeta = isMeta(name); + final Boolean isService = isService(name); + if (isMeta == null || isService == null) return null; + return !isMeta && !isService; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Channel that = (Channel) o; + return Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).omitNullValues().add("name", this.name).toString(); + } +} diff --git a/src/main/java/io/getstream/core/faye/DefaultMessageTransformer.java b/src/main/java/io/getstream/core/faye/DefaultMessageTransformer.java new file mode 100644 index 00000000..bd462377 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/DefaultMessageTransformer.java @@ -0,0 +1,13 @@ +package io.getstream.core.faye; + +public class DefaultMessageTransformer extends MessageTransformer { + @Override + public Message transformRequest(Message message) { + return message; + } + + @Override + public Message transformResponse(Message message) { + return message; + } +} diff --git a/src/main/java/io/getstream/core/faye/FayeClientError.java b/src/main/java/io/getstream/core/faye/FayeClientError.java new file mode 100644 index 00000000..c8c643d8 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/FayeClientError.java @@ -0,0 +1,57 @@ +package io.getstream.core.faye; + +import com.google.common.base.MoreObjects; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class FayeClientError extends Throwable { + private final Integer code; + private final List params; + private final String errorMessage; + + public FayeClientError(Integer code, List params, String errorMessage) { + this.code = code; + this.params = params; + this.errorMessage = errorMessage; + } + + public static FayeClientError parse(String errorMessage) { + if (errorMessage == null) errorMessage = ""; + if (!Grammar.ERROR.matcher(errorMessage).matches()) { + return new FayeClientError(null, null, errorMessage); + } + + final List parts = Arrays.asList(errorMessage.split(":")); + final Integer code = Integer.parseInt(parts.get(0)); + final List params = Arrays.asList(parts.get(1).split(",")); + final String message = parts.get(2); + + return new FayeClientError(code, params, message); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FayeClientError that = (FayeClientError) o; + return Objects.equals(code, that.code) + && Objects.equals(params, that.params) + && Objects.equals(errorMessage, that.errorMessage); + } + + @Override + public int hashCode() { + return Objects.hash(code, params, errorMessage); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("code", this.code) + .add("params", this.params) + .add("errorMessage", this.errorMessage) + .toString(); + } +} diff --git a/src/main/java/io/getstream/core/faye/Grammar.java b/src/main/java/io/getstream/core/faye/Grammar.java new file mode 100644 index 00000000..1c850ca0 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/Grammar.java @@ -0,0 +1,18 @@ +package io.getstream.core.faye; + +import java.util.regex.Pattern; + +public class Grammar { + static final Pattern CHANNEL_NAME = + Pattern.compile( + "^\\/(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)))+(\\/(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)))+)*$", + Pattern.MULTILINE); + static final Pattern CHANNEL_PATTERN = + Pattern.compile( + "^(\\/(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)))+)*\\/\\*{1,2}$"); + static final Pattern ERROR = + Pattern.compile( + "^([0-9][0-9][0-9]:(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*(,(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*)*:(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*|[0-9][0-9][0-9]::(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*)$"); + static final Pattern VERSION = + Pattern.compile("^([0-9])+(\\.(([a-z]|[A-Z])|[0-9])(((([a-z]|[A-Z])|[0-9])|\\-|\\_))*)*$"); +} diff --git a/src/main/java/io/getstream/core/faye/Message.java b/src/main/java/io/getstream/core/faye/Message.java new file mode 100644 index 00000000..085df34c --- /dev/null +++ b/src/main/java/io/getstream/core/faye/Message.java @@ -0,0 +1,168 @@ +package io.getstream.core.faye; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.google.common.base.MoreObjects; +import java.util.Map; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Message { + private String id; + private final String channel; + private String clientId; + private String connectionType; + private String version; + private String minimumVersion; + private String[] supportedConnectionTypes; + private Advice advice; + private Boolean successful; + private String subscription; + private Map data; + private Map ext; + private String error; + + // for deserialization + public Message() { + this.channel = null; + } + + public Message(String channel) { + this.channel = channel; + } + + public void setId(String id) { + this.id = id; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public void setConnectionType(String connectionType) { + this.connectionType = connectionType; + } + + public void setVersion(String version) { + this.version = version; + } + + public void setMinimumVersion(String minimumVersion) { + this.minimumVersion = minimumVersion; + } + + public void setSupportedConnectionTypes(String[] supportedConnectionTypes) { + this.supportedConnectionTypes = supportedConnectionTypes; + } + + public void setAdvice(Advice advice) { + this.advice = advice; + } + + public void setSuccessful(Boolean successful) { + this.successful = successful; + } + + public void setSubscription(String subscription) { + this.subscription = subscription; + } + + public void setData(Map data) { + this.data = data; + } + + public void setExt(Map ext) { + this.ext = ext; + } + + public void setError(String error) { + this.error = error; + } + + public String getId() { + return id; + } + + public String getChannel() { + return channel; + } + + public String getClientId() { + return clientId; + } + + public String getConnectionType() { + return connectionType; + } + + public String getVersion() { + return version; + } + + public String getMinimumVersion() { + return minimumVersion; + } + + public String[] getSupportedConnectionTypes() { + return supportedConnectionTypes; + } + + public Advice getAdvice() { + return advice; + } + + public Boolean isSuccessful() { + return successful; + } + + public String getSubscription() { + return subscription; + } + + public Map getData() { + return data; + } + + public Map getExt() { + return ext; + } + + public String getError() { + return error; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Message that = (Message) o; + return Objects.equals(id, that.id) + && Objects.equals(channel, that.channel) + && Objects.equals(clientId, that.clientId); + } + + @Override + public int hashCode() { + return Objects.hash(id, channel, clientId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("id", this.id) + .add("channel", this.channel) + .add("clientId", this.clientId) + .add("connectionType", this.connectionType) + .add("version", this.version) + .add("minimumVersion", this.minimumVersion) + .add("supportedConnectionTypes", this.supportedConnectionTypes) + .add("advice", this.advice) + .add("successful", this.successful) + .add("data", this.data) + .add("ext", this.ext) + .add("error", this.error) + .toString(); + } +} diff --git a/src/main/java/io/getstream/core/faye/MessageTransformer.java b/src/main/java/io/getstream/core/faye/MessageTransformer.java new file mode 100644 index 00000000..c57c17b4 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/MessageTransformer.java @@ -0,0 +1,7 @@ +package io.getstream.core.faye; + +public abstract class MessageTransformer { + public abstract Message transformRequest(Message message); + + public abstract Message transformResponse(Message message); +} diff --git a/src/main/java/io/getstream/core/faye/client/Callback.java b/src/main/java/io/getstream/core/faye/client/Callback.java new file mode 100644 index 00000000..51d9fb06 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/client/Callback.java @@ -0,0 +1,5 @@ +package io.getstream.core.faye.client; + +interface Callback { + void call(); +} diff --git a/src/main/java/io/getstream/core/faye/client/FayeClient.java b/src/main/java/io/getstream/core/faye/client/FayeClient.java new file mode 100644 index 00000000..caf58572 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/client/FayeClient.java @@ -0,0 +1,455 @@ +package io.getstream.core.faye.client; + +import io.getstream.core.faye.Advice; +import io.getstream.core.faye.Channel; +import io.getstream.core.faye.DefaultMessageTransformer; +import io.getstream.core.faye.FayeClientError; +import io.getstream.core.faye.Message; +import io.getstream.core.faye.MessageTransformer; +import io.getstream.core.faye.subscription.ChannelDataCallback; +import io.getstream.core.faye.subscription.ChannelSubscription; +import io.getstream.core.faye.subscription.SubscriptionCancelledCallback; +import io.getstream.core.utils.Serialization; +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.WebSocket; +import okhttp3.WebSocketListener; + +public class FayeClient extends WebSocketListener { + + private static final String BAYEUX_VERSION = "1.0"; + private static final int DEFAULT_TIMEOUT = 60; // seconds + private static final int DEFAULT_INTERVAL = 0; // seconds + + private final String baseURL; + private final int timeout; + private final int interval; + + private Advice advice; + + public FayeClient(URL baseURL) { + String url = baseURL.toString(); + if (url.startsWith("https")) { + url = url.replace("https", "wss"); + } else if (url.startsWith("http")) { + url = url.replace("http", "ws"); + } + + this.baseURL = url; + this.timeout = DEFAULT_TIMEOUT; + this.interval = DEFAULT_INTERVAL; + this.advice = new Advice(Advice.RETRY, 1000 * interval, 1000 * timeout); + } + + private String clientId; + private final Map channels = new HashMap<>(); + private final Map responseCallbacks = new HashMap<>(); + + private MessageTransformer messageTransformer = new DefaultMessageTransformer(); + + public void setMessageTransformer(MessageTransformer messageTransformer) { + this.messageTransformer = messageTransformer; + } + + private FayeClientState state = FayeClientState.UNCONNECTED; + + private void setState(FayeClientState state) { + this.state = state; + if (stateChangeListener != null) stateChangeListener.onStateChanged(state); + } + + private StateChangeListener stateChangeListener; + + public void setStateChangeListener(StateChangeListener stateChangeListener) { + this.stateChangeListener = stateChangeListener; + } + + private WebSocket webSocket; + private final OkHttpClient httpClient = new OkHttpClient(); + + private Timer timer = new Timer(); + + private void initWebSocket() { + // Initiating connection with $baseUrl + if (webSocket != null) { + closeWebSocket(); + } + final Request request = new Request.Builder().url(baseURL).build(); + webSocket = httpClient.newWebSocket(request, this); + } + + private void closeWebSocket() { + // Cancelling all timer tasks + if (timer != null) { + timer.cancel(); + timer = null; + } + + // Closing connection for $baseUrl + if (webSocket != null) { + webSocket.close(1000, "Connection closed by client"); + webSocket = null; + } + } + + @Override + public void onMessage(WebSocket webSocket, String text) { + List messages = null; + try { + messages = Serialization.fromJSONList(text, Message.class); + } catch (IOException ignored) { + } + + if (messages == null) return; + + for (Message message : messages) { + receiveMessage(message); + } + } + + @Override + public void onFailure(WebSocket webSocket, Throwable t, Response response) { + // 'Error occurred', error, stacktrace); + closeWebSocket(); + initWebSocket(); + } + + private boolean manuallyClosed = false; + + @Override + public void onClosed(WebSocket webSocket, int code, String reason) { + closeWebSocket(); + + // Checking if we manually closed the connection + if (manuallyClosed) return; + initWebSocket(); + } + + private void scheduleTimerTask(Callback callback, long duration) { + if (timer == null) timer = new Timer(); + timer.schedule( + new TimerTask() { + @Override + public void run() { + callback.call(); + } + }, + duration); + } + + public void handshake() { + handshake(null); + } + + private void handshake(Callback callback) { + if (Objects.equals(advice.getReconnect(), Advice.NONE)) return; + if (state != FayeClientState.UNCONNECTED) return; + + setState(FayeClientState.CONNECTING); + + initWebSocket(); + + // Initiating handshake with $baseUrl + + final String[] connectionTypes = {"websocket"}; + final Message message = new Message(Channel.HANDSHAKE); + message.setVersion(BAYEUX_VERSION); + message.setSupportedConnectionTypes(connectionTypes); + sendMessage( + message, + response -> { + if (response.isSuccessful() != null && response.isSuccessful()) { + setState(FayeClientState.CONNECTED); + clientId = response.getClientId(); + + // Handshake successful: $clientId + final Set keys = channels.keySet(); + subscribeChannels(keys.toArray(new String[0])); + if (callback != null) callback.call(); + } else { + // Handshake unsuccessful + scheduleTimerTask(() -> handshake(callback), 1000); + setState(FayeClientState.UNCONNECTED); + } + }); + } + + private boolean connectRequestInProgress = false; + + public void connect() { + connect(null); + } + + private void connect(Callback callback) { + if (Objects.equals(advice.getReconnect(), Advice.NONE)) return; + if (state == FayeClientState.DISCONNECTED) return; + + if (state == FayeClientState.UNCONNECTED) { + handshake(() -> connect(callback)); + return; + } + + if (callback != null) callback.call(); + if (state != FayeClientState.CONNECTED) return; + + if (connectRequestInProgress) return; + connectRequestInProgress = true; + + // Initiating connection for $clientId + + final Message message = new Message(Channel.CONNECT); + message.setClientId(clientId); + message.setConnectionType("websocket"); + sendMessage(message, response -> cycleConnection()); + } + + public CompletableFuture disconnect() { + final CompletableFuture disconnectionCompleter = new CompletableFuture<>(); + + if (state != FayeClientState.CONNECTED) disconnectionCompleter.complete(null); + setState(FayeClientState.DISCONNECTED); + + // Disconnecting $clientId + + final Message message = new Message(Channel.DISCONNECT); + message.setClientId(clientId); + sendMessage( + message, + response -> { + if (response.isSuccessful() != null && response.isSuccessful()) { + manuallyClosed = true; + closeWebSocket(); + disconnectionCompleter.complete(null); + } else { + final FayeClientError error = FayeClientError.parse(response.getError()); + disconnectionCompleter.completeExceptionally(error); + } + }); + + // Clearing channel listeners for $clientId + channels.clear(); + + return disconnectionCompleter; + } + + private void subscribeChannels(String[] channels) { + for (String channel : channels) { + subscribe(channel, true); + } + } + + public CompletableFuture subscribe( + String channel, ChannelDataCallback callback) { + return subscribe(channel, callback, null, null); + } + + private CompletableFuture subscribe(String channel, Boolean force) { + return subscribe(channel, null, null, force); + } + + public CompletableFuture subscribe( + String channel, ChannelDataCallback callback, SubscriptionCancelledCallback onCancelled) { + return subscribe(channel, callback, onCancelled, null); + } + + private CompletableFuture subscribe( + String channel, + ChannelDataCallback onData, + SubscriptionCancelledCallback onCancelled, + Boolean force) { + // default value + if (force == null) force = false; + + final CompletableFuture subscriptionCompleter = new CompletableFuture<>(); + + final ChannelSubscription channelSubscription = + new ChannelSubscription(this, channel, onData, onCancelled); + final boolean hasSubscribe = channels.containsKey(channel); + + if (hasSubscribe && !force) { + subscribeChannel(channel, channelSubscription); + subscriptionCompleter.complete(channelSubscription); + } else { + Boolean finalForce = force; + connect( + () -> { + // Client $clientId attempting to subscribe to $channel + if (!finalForce) subscribeChannel(channel, channelSubscription); + final Message message = new Message(Channel.SUBSCRIBE); + message.setClientId(clientId); + message.setSubscription(channel); + sendMessage( + message, + response -> { + if (response.isSuccessful() != null && response.isSuccessful()) { + final String subscribedChannel = response.getSubscription(); + // Subscription acknowledged for $channel to $clientId + subscriptionCompleter.complete(channelSubscription); + } else { + unsubscribeChannel(channel, channelSubscription); + final FayeClientError error = FayeClientError.parse(response.getError()); + subscriptionCompleter.completeExceptionally(error); + } + }); + }); + } + + return subscriptionCompleter; + } + + public void unsubscribe(String channel, ChannelSubscription channelSubscription) { + final boolean dead = unsubscribeChannel(channel, channelSubscription); + if (!dead) return; + + connect( + () -> { + // Client $clientId attempting to unsubscribe from $channel + final Message message = new Message(Channel.UNSUBSCRIBE); + message.setClientId(clientId); + message.setSubscription(channel); + sendMessage( + message, + response -> { + if (response.isSuccessful() != null && response.isSuccessful()) { + final String unsubscribedChannel = response.getSubscription(); + // Un-subscription acknowledged for $clientId from $channel + } + }); + }); + } + + public CompletableFuture publish(String channel, Map data) { + final CompletableFuture publishCompleter = new CompletableFuture<>(); + + connect( + () -> { + // Client $clientId queuing published message to $channel: $data + final Message message = new Message(channel); + message.setData(data); + message.setClientId(clientId); + sendMessage( + message, + response -> { + if (response.isSuccessful() != null && response.isSuccessful()) { + publishCompleter.complete(null); + } else { + final FayeClientError error = FayeClientError.parse(response.getError()); + publishCompleter.completeExceptionally(error); + } + }); + }); + + return publishCompleter; + } + + private final String EVENT_MESSAGE = "message"; + + private void subscribeChannel(String name, ChannelSubscription channelSubscription) { + Channel channel; + if (channels.containsKey(name)) { + channel = channels.get(name); + } else { + channel = new Channel(name); + channels.put(name, channel); + } + channel.bind(EVENT_MESSAGE, channelSubscription::call); + } + + private boolean unsubscribeChannel(String name, ChannelSubscription channelSubscription) { + final Channel channel = channels.get(name); + if (channel == null) return false; + channel.unbind(EVENT_MESSAGE, channelSubscription::call); + try { + if (channel.hasListeners(EVENT_MESSAGE)) { + channels.remove(name); + return true; + } + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + private void distributeChannelMessage(Message message) { + final List expandedChannels = Channel.expand(message.getChannel()); + if (expandedChannels == null) return; + for (String c : expandedChannels) { + final Channel channel = this.channels.get(c); + if (channel != null) channel.trigger(EVENT_MESSAGE, message); + } + } + + private int messageId = 0; + + private String generateMessageId() { + messageId += 1; + if (messageId >= Math.pow(2, 32)) messageId = 0; + return Integer.toString(messageId, 36); + } + + private void sendMessage(Message message) { + sendMessage(message, null); + } + + private void sendMessage(Message message, MessageCallback onResponse) { + final String id = generateMessageId(); + message.setId(id); + message = messageTransformer.transformRequest(message); + // Sending Message: $message + if (onResponse != null) responseCallbacks.put(id, onResponse); + try { + final byte[] payload = Serialization.toJSON(message); + webSocket.send(new String(payload)); + } catch (Exception ignored) { + + } + } + + private void receiveMessage(Message message) { + final String id = message.getId(); + MessageCallback callback = null; + if (message.isSuccessful() != null) { + callback = responseCallbacks.remove(id); + } + message = messageTransformer.transformResponse(message); + // Received message: $message + if (message.getAdvice() != null) handleAdvice(message.getAdvice()); + deliverMessage(message); + if (callback != null) callback.onMessage(message); + } + + private void handleAdvice(Advice advice) { + this.advice = advice; + if (Objects.equals(advice.getReconnect(), Advice.HANDSHAKE) + && state != FayeClientState.DISCONNECTED) { + setState(FayeClientState.UNCONNECTED); + clientId = null; + cycleConnection(); + } + } + + private void deliverMessage(Message message) { + if (message.getChannel() == null || message.getData() == null) return; + // Client $clientId calling listeners for ${message.channel} with ${message.data} + distributeChannelMessage(message); + } + + private void cycleConnection() { + if (connectRequestInProgress) { + connectRequestInProgress = false; + // Closed connection for $clientId + } + scheduleTimerTask(this::connect, advice.getInterval()); + } +} diff --git a/src/main/java/io/getstream/core/faye/client/FayeClientState.java b/src/main/java/io/getstream/core/faye/client/FayeClientState.java new file mode 100644 index 00000000..05bb98be --- /dev/null +++ b/src/main/java/io/getstream/core/faye/client/FayeClientState.java @@ -0,0 +1,8 @@ +package io.getstream.core.faye.client; + +public enum FayeClientState { + UNCONNECTED, + CONNECTING, + CONNECTED, + DISCONNECTED, +} diff --git a/src/main/java/io/getstream/core/faye/client/MessageCallback.java b/src/main/java/io/getstream/core/faye/client/MessageCallback.java new file mode 100644 index 00000000..f6c8714d --- /dev/null +++ b/src/main/java/io/getstream/core/faye/client/MessageCallback.java @@ -0,0 +1,7 @@ +package io.getstream.core.faye.client; + +import io.getstream.core.faye.Message; + +interface MessageCallback { + void onMessage(Message message); +} diff --git a/src/main/java/io/getstream/core/faye/client/StateChangeListener.java b/src/main/java/io/getstream/core/faye/client/StateChangeListener.java new file mode 100644 index 00000000..127e3f3f --- /dev/null +++ b/src/main/java/io/getstream/core/faye/client/StateChangeListener.java @@ -0,0 +1,5 @@ +package io.getstream.core.faye.client; + +public interface StateChangeListener { + void onStateChanged(FayeClientState clientState); +} diff --git a/src/main/java/io/getstream/core/faye/emitter/ErrorListener.java b/src/main/java/io/getstream/core/faye/emitter/ErrorListener.java new file mode 100644 index 00000000..f7d74b1e --- /dev/null +++ b/src/main/java/io/getstream/core/faye/emitter/ErrorListener.java @@ -0,0 +1,5 @@ +package io.getstream.core.faye.emitter; + +public interface ErrorListener { + void onError(Exception error); +} diff --git a/src/main/java/io/getstream/core/faye/emitter/EventEmitter.java b/src/main/java/io/getstream/core/faye/emitter/EventEmitter.java new file mode 100644 index 00000000..3ca17328 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/emitter/EventEmitter.java @@ -0,0 +1,120 @@ +package io.getstream.core.faye.emitter; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class EventEmitter { + + private final Map>> events = new HashMap<>(); + + private ErrorListener errorListener; + + public void setErrorListener(ErrorListener errorListener) { + this.errorListener = errorListener; + } + + private boolean mounted = true; + + public boolean isMounted() { + return mounted; + } + + private void assertMounted() { + assert isMounted() + : "Tried to use " + + this.getClass().getSimpleName() + + " after `dispose` was called. Consider checking `isMounted`"; + } + + public void emit(String event, T data) { + assertMounted(); + final LinkedList> listeners = events.get(event); + if (listeners == null) return; + boolean didThrow = false; + final List> removables = new ArrayList<>(); + for (ListenerEntry entry : listeners) { + try { + entry.getListener().onData(data); + Integer limit = entry.getLimit(); + if (limit != null) { + if (limit > 0) { + limit -= 1; + entry.setLimit(limit); + } + if (limit == 0) { + removables.add(entry); + } + } + } catch (Exception e) { + didThrow = true; + if (errorListener != null) { + errorListener.onError(e); + } + } + } + for (ListenerEntry entry : removables) { + listeners.remove(entry); + } + if (didThrow) throw new Error(); + } + + public void on(String event, EventListener listener) { + addListener(event, listener); + } + + public void on(String event, EventListener listener, int limit) { + addListener(event, listener, limit); + } + + public void addListener(String event, EventListener listener) { + _addListener(event, listener, null); + } + + public void addListener(String event, EventListener listener, int limit) { + _addListener(event, listener, limit); + } + + void _addListener(String event, EventListener listener, Integer limit) { + assertMounted(); + final ListenerEntry entry = new ListenerEntry(listener, limit); + LinkedList> listeners = events.get(event); + if (listeners == null) listeners = new LinkedList<>(); + listeners.add(entry); + events.put(event, listeners); + } + + public void off(String event) { + assertMounted(); + events.put(event, new LinkedList<>()); + } + + public void removeListener(String event, EventListener listener) { + assertMounted(); + final LinkedList> listeners = events.get(event); + if (listeners == null) return; + listeners.removeIf(curr -> curr.getListener() == listener); + } + + public void removeAllListeners() { + assertMounted(); + events.clear(); + } + + public boolean hasListeners(String event) throws Exception { + assertMounted(); + final LinkedList> listeners = events.get(event); + if (listeners == null) { + throw new Exception("Event not available"); + } + return !listeners.isEmpty(); + } + + public void dispose() { + assertMounted(); + events.values().forEach(LinkedList::clear); + mounted = false; + } +} diff --git a/src/main/java/io/getstream/core/faye/emitter/EventListener.java b/src/main/java/io/getstream/core/faye/emitter/EventListener.java new file mode 100644 index 00000000..c9763660 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/emitter/EventListener.java @@ -0,0 +1,5 @@ +package io.getstream.core.faye.emitter; + +public interface EventListener { + void onData(T data); +} diff --git a/src/main/java/io/getstream/core/faye/emitter/ListenerEntry.java b/src/main/java/io/getstream/core/faye/emitter/ListenerEntry.java new file mode 100644 index 00000000..90d28d6a --- /dev/null +++ b/src/main/java/io/getstream/core/faye/emitter/ListenerEntry.java @@ -0,0 +1,24 @@ +package io.getstream.core.faye.emitter; + +class ListenerEntry { + + public ListenerEntry(EventListener listener, Integer limit) { + this.listener = listener; + this.limit = limit; + } + + private Integer limit; + private final EventListener listener; + + public Integer getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public EventListener getListener() { + return listener; + } +} diff --git a/src/main/java/io/getstream/core/faye/subscription/ChannelDataCallback.java b/src/main/java/io/getstream/core/faye/subscription/ChannelDataCallback.java new file mode 100644 index 00000000..80b415d6 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/subscription/ChannelDataCallback.java @@ -0,0 +1,7 @@ +package io.getstream.core.faye.subscription; + +import java.util.Map; + +public interface ChannelDataCallback { + void onData(Map data); +} diff --git a/src/main/java/io/getstream/core/faye/subscription/ChannelSubscription.java b/src/main/java/io/getstream/core/faye/subscription/ChannelSubscription.java new file mode 100644 index 00000000..a58ec8ba --- /dev/null +++ b/src/main/java/io/getstream/core/faye/subscription/ChannelSubscription.java @@ -0,0 +1,49 @@ +package io.getstream.core.faye.subscription; + +import io.getstream.core.faye.Message; +import io.getstream.core.faye.client.FayeClient; + +public class ChannelSubscription { + private final FayeClient client; + private final String channel; + private final ChannelDataCallback channelDataCallback; + private final SubscriptionCancelledCallback onCancelledCallback; + private WithChannelDataCallback withChannel; + + private boolean cancelled = false; + + public ChannelSubscription(FayeClient client, String channel) { + this.client = client; + this.channel = channel; + this.channelDataCallback = null; + this.onCancelledCallback = null; + } + + public ChannelSubscription( + FayeClient client, + String channel, + ChannelDataCallback channelDataCallback, + SubscriptionCancelledCallback onCancelledCallback) { + this.client = client; + this.channel = channel; + this.channelDataCallback = channelDataCallback; + this.onCancelledCallback = onCancelledCallback; + } + + public ChannelSubscription setWithChannel(WithChannelDataCallback withChannel) { + this.withChannel = withChannel; + return this; + } + + public void call(Message message) { + if (channelDataCallback != null) channelDataCallback.onData(message.getData()); + if (withChannel != null) withChannel.onData(message.getChannel(), message.getData()); + } + + public void cancel() { + if (cancelled) return; + client.unsubscribe(channel, this); + if (onCancelledCallback != null) onCancelledCallback.onCancelled(); + cancelled = true; + } +} diff --git a/src/main/java/io/getstream/core/faye/subscription/SubscriptionCancelledCallback.java b/src/main/java/io/getstream/core/faye/subscription/SubscriptionCancelledCallback.java new file mode 100644 index 00000000..c8e60924 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/subscription/SubscriptionCancelledCallback.java @@ -0,0 +1,5 @@ +package io.getstream.core.faye.subscription; + +public interface SubscriptionCancelledCallback { + void onCancelled(); +} diff --git a/src/main/java/io/getstream/core/faye/subscription/WithChannelDataCallback.java b/src/main/java/io/getstream/core/faye/subscription/WithChannelDataCallback.java new file mode 100644 index 00000000..ec140c15 --- /dev/null +++ b/src/main/java/io/getstream/core/faye/subscription/WithChannelDataCallback.java @@ -0,0 +1,7 @@ +package io.getstream.core.faye.subscription; + +import java.util.Map; + +public interface WithChannelDataCallback { + void onData(String channel, Map data); +} diff --git a/src/main/java/io/getstream/core/models/FeedID.java b/src/main/java/io/getstream/core/models/FeedID.java index 0af7a5ae..d7145492 100644 --- a/src/main/java/io/getstream/core/models/FeedID.java +++ b/src/main/java/io/getstream/core/models/FeedID.java @@ -40,6 +40,10 @@ public String getUserID() { return userID; } + public String getClaim() { + return slug + userID; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/io/getstream/core/models/RealtimeMessage.java b/src/main/java/io/getstream/core/models/RealtimeMessage.java new file mode 100644 index 00000000..54def730 --- /dev/null +++ b/src/main/java/io/getstream/core/models/RealtimeMessage.java @@ -0,0 +1,92 @@ +package io.getstream.core.models; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.OptBoolean; +import com.google.common.base.MoreObjects; +import java.util.Date; +import java.util.List; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class RealtimeMessage { + private final FeedID feed; + private final String appID; + private final List deleted; + private final List newActivities; + private final Date publishedAt; + + @JsonCreator + public RealtimeMessage( + @JsonProperty("feed") FeedID feed, + @JsonProperty("app_id") String appID, + @JsonProperty("deleted") List deleted, + @JsonProperty("new") List newActivities, + @JsonProperty("published_at") Date publishedAt) { + this.feed = feed; + this.appID = appID; + this.deleted = deleted; + this.newActivities = newActivities; + this.publishedAt = publishedAt; + } + + public FeedID getFeed() { + return feed; + } + + @JsonProperty("app_id") + public String getAppID() { + return appID; + } + + @JsonFormat( + shape = JsonFormat.Shape.STRING, + pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS", + lenient = OptBoolean.FALSE, + timezone = "UTC") + public List getDeleted() { + return deleted; + } + + @JsonProperty("new") + public List getNewActivities() { + return newActivities; + } + + @JsonProperty("published_at") + public Date getPublishedAt() { + return publishedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RealtimeMessage message = (RealtimeMessage) o; + return Objects.equals(feed, message.feed) + && Objects.equals(appID, message.appID) + && Objects.equals(deleted, message.deleted) + && Objects.equals(newActivities, message.newActivities) + && Objects.equals(publishedAt, message.publishedAt); + } + + @Override + public int hashCode() { + return Objects.hash(feed, appID, deleted, newActivities, publishedAt); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("feed", this.feed) + .add("appID", this.appID) + .add("deleted", this.deleted) + .add("new", this.newActivities) + .add("publishedAt", this.publishedAt) + .toString(); + } +} diff --git a/src/main/java/io/getstream/core/utils/Serialization.java b/src/main/java/io/getstream/core/utils/Serialization.java index 2d7b473e..5f6e74e5 100644 --- a/src/main/java/io/getstream/core/utils/Serialization.java +++ b/src/main/java/io/getstream/core/utils/Serialization.java @@ -53,6 +53,16 @@ public static T fromJSON(InputStream json, String wrapper, JavaType type) th return mapper.readValue(mapper.treeAsTokens(tree), type); } + public static T fromJSON(String json, Class type) throws IOException { + return mapper.readValue(json, type); + } + + public static List fromJSONList(String json, Class type) throws IOException { + final CollectionType collection = + mapper.getTypeFactory().constructCollectionType(List.class, type); + return mapper.readValue(json, collection); + } + public static T convert(U obj, Class type) { return mapper.convertValue(obj, type); } diff --git a/src/test/java/io/getstream/cloud/CloudFeedTest.java b/src/test/java/io/getstream/cloud/CloudFeedTest.java index 9c0a2b96..c6a00d7d 100644 --- a/src/test/java/io/getstream/cloud/CloudFeedTest.java +++ b/src/test/java/io/getstream/cloud/CloudFeedTest.java @@ -1,15 +1,21 @@ package io.getstream.cloud; +import static org.junit.Assert.fail; + import io.getstream.client.Client; import io.getstream.client.entities.FootballMatch; import io.getstream.client.entities.Match; import io.getstream.client.entities.VolleyballMatch; +import io.getstream.core.faye.subscription.ChannelSubscription; import io.getstream.core.http.Token; import io.getstream.core.models.Activity; +import io.getstream.core.models.RealtimeMessage; import io.getstream.core.utils.Enrichment; import java.net.MalformedURLException; import java.util.Date; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java8.util.concurrent.CompletableFuture; import org.junit.BeforeClass; import org.junit.Test; @@ -22,6 +28,10 @@ public class CloudFeedTest { System.getenv("STREAM_SECRET") != null ? System.getenv("STREAM_SECRET") : System.getProperty("STREAM_SECRET"); + private static final String appId = + System.getenv("STREAM_APP_ID") != null + ? System.getenv("STREAM_APP_ID") + : System.getProperty("STREAM_APP_ID"); private static final String userID = "db07b4a3-8f48-41f7-950c-b228364496e1"; private static final Token token = buildToken(); private static String actorID; @@ -147,4 +157,33 @@ public void unfollow() throws Exception { CloudFlatFeed feed2 = client.flatFeed("flat", "2"); feed1.unfollow(feed2).join(); } + + @Test + public void TestFaye() throws Exception { + AtomicReference msg = new AtomicReference<>(); + CloudClient client = CloudClient.builder(apiKey, token, userID, appId).build(); + CloudFlatFeed feed = client.flatFeed("user", userID); + CompletableFuture subscription = + feed.subscribe(message -> msg.set(message)); + + feed.addActivity( + Activity.builder() + .actor("SU:" + userID) + .verb("tweet") + .object("tweet:10") + .extraField("message", "Beautiful bird!") + .build()) + .get(); + + for (int i = 0; i < 15; i++) { + if (msg.get() != null) { + subscription.cancel(true); + return; + } + Thread.sleep(250); + } + + subscription.cancel(true); + fail("Timeout. Did not receive message through websocket in time."); + } } From 4934357ad15b8142be680e8d35d6343bd69e857d Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Thu, 26 May 2022 16:37:53 +0200 Subject: [PATCH 2/2] fix: ci and increase wait to prevent flaky --- .github/workflows/ci.yml | 3 ++- .github/workflows/initiate_release.yml | 2 +- .github/workflows/release.yml | 3 ++- src/test/java/io/getstream/cloud/CloudFeedTest.java | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 67e7253e..8f478592 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,7 +2,7 @@ name: build on: [pull_request] -concurrency: +concurrency: group: ${{ github.workflow }}-${{ github.head_ref }} cancel-in-progress: true @@ -31,6 +31,7 @@ jobs: env: STREAM_KEY: ${{ secrets.STREAM_KEY }} STREAM_SECRET: ${{ secrets.STREAM_SECRET }} + STREAM_APP_ID: ${{ secrets.STREAM_APP_ID }} run: | ./gradlew spotlessCheck --no-daemon ./gradlew test --no-daemon diff --git a/.github/workflows/initiate_release.yml b/.github/workflows/initiate_release.yml index 2d142cf3..1de32aa6 100644 --- a/.github/workflows/initiate_release.yml +++ b/.github/workflows/initiate_release.yml @@ -21,7 +21,7 @@ jobs: VERSION: ${{ github.event.inputs.version }} run: | npx --yes standard-version@9.3.2 --release-as "$VERSION" --skip.tag --skip.commit --tag-prefix=v - git config --global user.name 'github-actions' + git config --global user.name 'github-actions' git config --global user.email 'release@getstream.io' git checkout -q -b "release-$VERSION" git commit -am "chore(release): $VERSION" diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b5bb0739..30dbb0b1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -15,7 +15,7 @@ jobs: - uses: actions/checkout@v3 with: fetch-depth: 0 - + - uses: actions/github-script@v5 with: script: | @@ -34,6 +34,7 @@ jobs: env: STREAM_KEY: ${{ secrets.STREAM_KEY }} STREAM_SECRET: ${{ secrets.STREAM_SECRET }} + STREAM_APP_ID: ${{ secrets.STREAM_APP_ID }} GPG_KEY_CONTENTS: ${{ secrets.GPG_KEY_CONTENTS }} OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }} OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} diff --git a/src/test/java/io/getstream/cloud/CloudFeedTest.java b/src/test/java/io/getstream/cloud/CloudFeedTest.java index c6a00d7d..85512dfb 100644 --- a/src/test/java/io/getstream/cloud/CloudFeedTest.java +++ b/src/test/java/io/getstream/cloud/CloudFeedTest.java @@ -175,7 +175,7 @@ public void TestFaye() throws Exception { .build()) .get(); - for (int i = 0; i < 15; i++) { + for (int i = 0; i < 25; i++) { if (msg.get() != null) { subscription.cancel(true); return;