Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,383 changes: 695 additions & 688 deletions src/main/java/io/getstream/cloud/CloudAggregatedFeed.java

Large diffs are not rendered by default.

566 changes: 341 additions & 225 deletions src/main/java/io/getstream/cloud/CloudClient.java

Large diffs are not rendered by default.

633 changes: 327 additions & 306 deletions src/main/java/io/getstream/cloud/CloudFeed.java

Large diffs are not rendered by default.

1,292 changes: 649 additions & 643 deletions src/main/java/io/getstream/cloud/CloudFlatFeed.java

Large diffs are not rendered by default.

1,536 changes: 771 additions & 765 deletions src/main/java/io/getstream/cloud/CloudNotificationFeed.java

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/main/java/io/getstream/cloud/FeedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.getstream.cloud;

import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.models.FeedID;
import java8.util.concurrent.CompletableFuture;


public interface FeedSubscriber {
CompletableFuture<ChannelSubscription> subscribe(FeedID feedID, RealtimeMessageCallback messageCallback);
}
7 changes: 7 additions & 0 deletions src/main/java/io/getstream/cloud/RealtimeMessageCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.getstream.cloud;

import io.getstream.core.models.RealtimeMessage;

public interface RealtimeMessageCallback {
void onMessage(RealtimeMessage message);
}
69 changes: 69 additions & 0 deletions src/main/java/io/getstream/core/faye/Advice.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.getstream.core.faye;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.google.common.base.MoreObjects;

import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Advice {
private final String reconnect;
private final Integer interval;
private final Integer timeout;

public static final String NONE = "none";
public static final String HANDSHAKE = "handshake";
public static final String RETRY = "retry";

// for deserialization
public Advice() {
reconnect = null;
interval = null;
timeout = null;
}

public Advice(String reconnect, Integer interval, Integer timeout) {
this.reconnect = reconnect;
this.interval = interval;
this.timeout = timeout;
}

public String getReconnect() {
return reconnect;
}

public Integer getInterval() {
return interval;
}

public Integer getTimeout() {
return timeout;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Advice that = (Advice) o;
return Objects.equals(reconnect, that.reconnect)
&& Objects.equals(interval, that.interval)
&& Objects.equals(timeout, that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(reconnect, interval, timeout);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("reconnect", this.reconnect)
.add("interval", this.interval)
.add("timeout", this.timeout)
.toString();
}
}
124 changes: 124 additions & 0 deletions src/main/java/io/getstream/core/faye/Channel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package io.getstream.core.faye;

import com.google.common.base.MoreObjects;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import io.getstream.core.faye.emitter.EventEmitter;
import io.getstream.core.faye.emitter.EventListener;

public class Channel {

public static final String HANDSHAKE = "/meta/handshake";
public static final String CONNECT = "/meta/connect";
public static final String DISCONNECT = "/meta/disconnect";
public static final String SUBSCRIBE = "/meta/subscribe";
public static final String UNSUBSCRIBE = "/meta/unsubscribe";

private final String name;

public Channel(String name) {
this.name = name;
}

public String getName() {
return name;
}

final EventEmitter<Message> eventEmitter = new EventEmitter<>();

public void bind(String event, EventListener<Message> listener) {
eventEmitter.on(event, listener);
}

public void unbind(String event, EventListener<Message> listener) {
eventEmitter.removeListener(event, listener);
}

public void trigger(String event, Message message) {
eventEmitter.emit(event, message);
}

public boolean hasListeners(String event) throws Exception {
return eventEmitter.hasListeners(event);
}

public static List<String> expand(String name) {
final List<String> channels = new ArrayList<>(Arrays.asList("/**", name));
final List<String> segments = parse(name);

if (segments == null) return null;

List<String> copy = new ArrayList<>(segments);
copy.add(copy.size() - 1, "*");
channels.add(unparse(copy));

for (int i = 1; i < segments.size(); i++) {
copy = segments.subList(0, i);
copy.add("**");
channels.add(unparse(copy));
}

return channels;
}

public static boolean isValid(String name) {
return Grammar.CHANNEL_NAME.matcher(name).matches() || Grammar.CHANNEL_PATTERN.matcher(name).matches();
}

public static List<String> parse(String name) {
if (!isValid(name)) return null;
final String[] splits = name.split("/");
return Arrays.asList(splits).subList(1, splits.length);
}

public static String unparse(List<String> segments) {
final String joinedSegments = String.join("/", segments);
return "/" + joinedSegments;
}

public static Boolean isMeta(String name) {
final List<String> segments = parse(name);
if (segments == null) return null;
return segments.get(0).equals("meta");
}

public static Boolean isService(String name) {
final List<String> segments = parse(name);
if (segments == null) return null;
return segments.get(0).equals("service");
}

public static Boolean isSubscribable(String name) {
if (!isValid(name)) return null;
final Boolean isMeta = isMeta(name);
final Boolean isService = isService(name);
if (isMeta == null || isService == null) return null;
return !isMeta && !isService;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Channel that = (Channel) o;
return Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(name);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("name", this.name)
.toString();

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.getstream.core.faye;

public class DefaultMessageTransformer extends MessageTransformer {
@Override
public Message transformRequest(Message message) {
return message;
}

@Override
public Message transformResponse(Message message) {
return message;
}
}
58 changes: 58 additions & 0 deletions src/main/java/io/getstream/core/faye/FayeClientError.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.getstream.core.faye;

import com.google.common.base.MoreObjects;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class FayeClientError extends Throwable {
private final Integer code;
private final List<String> params;
private final String errorMessage;

public FayeClientError(Integer code, List<String> params, String errorMessage) {
this.code = code;
this.params = params;
this.errorMessage = errorMessage;
}

public static FayeClientError parse(String errorMessage) {
if (errorMessage == null) errorMessage = "";
if (!Grammar.ERROR.matcher(errorMessage).matches()) {
return new FayeClientError(null, null, errorMessage);
}

final List<String> parts = Arrays.asList(errorMessage.split(":"));
final Integer code = Integer.parseInt(parts.get(0));
final List<String> params = Arrays.asList(parts.get(1).split(","));
final String message = parts.get(2);

return new FayeClientError(code, params, message);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FayeClientError that = (FayeClientError) o;
return Objects.equals(code, that.code)
&& Objects.equals(params, that.params)
&& Objects.equals(errorMessage, that.errorMessage);
}

@Override
public int hashCode() {
return Objects.hash(code, params, errorMessage);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("code", this.code)
.add("params", this.params)
.add("errorMessage", this.errorMessage)
.toString();
}
}
10 changes: 10 additions & 0 deletions src/main/java/io/getstream/core/faye/Grammar.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.getstream.core.faye;

import java.util.regex.Pattern;

public class Grammar {
static final Pattern CHANNEL_NAME = Pattern.compile("^\\/(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)))+(\\/(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)))+)*$", Pattern.MULTILINE);
static final Pattern CHANNEL_PATTERN = Pattern.compile("^(\\/(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)))+)*\\/\\*{1,2}$");
static final Pattern ERROR = Pattern.compile("^([0-9][0-9][0-9]:(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*(,(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*)*:(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*|[0-9][0-9][0-9]::(((([a-z]|[A-Z])|[0-9])|(\\-|\\_|\\!|\\~|\\(|\\)|\\$|\\@)| |\\/|\\*|\\.))*)$");
static final Pattern VERSION = Pattern.compile("^([0-9])+(\\.(([a-z]|[A-Z])|[0-9])(((([a-z]|[A-Z])|[0-9])|\\-|\\_))*)*$");
}
Loading