Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: build

on: [pull_request]

concurrency:
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true

Expand Down Expand Up @@ -31,6 +31,7 @@ jobs:
env:
STREAM_KEY: ${{ secrets.STREAM_KEY }}
STREAM_SECRET: ${{ secrets.STREAM_SECRET }}
STREAM_APP_ID: ${{ secrets.STREAM_APP_ID }}
run: |
./gradlew spotlessCheck --no-daemon
./gradlew test --no-daemon
2 changes: 1 addition & 1 deletion .github/workflows/initiate_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
VERSION: ${{ github.event.inputs.version }}
run: |
npx --yes standard-version@9.3.2 --release-as "$VERSION" --skip.tag --skip.commit --tag-prefix=v
git config --global user.name 'github-actions'
git config --global user.name 'github-actions'
git config --global user.email 'release@getstream.io'
git checkout -q -b "release-$VERSION"
git commit -am "chore(release): $VERSION"
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- uses: actions/github-script@v5
with:
script: |
Expand All @@ -34,6 +34,7 @@ jobs:
env:
STREAM_KEY: ${{ secrets.STREAM_KEY }}
STREAM_SECRET: ${{ secrets.STREAM_SECRET }}
STREAM_APP_ID: ${{ secrets.STREAM_APP_ID }}
GPG_KEY_CONTENTS: ${{ secrets.GPG_KEY_CONTENTS }}
OSSRH_USERNAME: ${{ secrets.OSSRH_USERNAME }}
OSSRH_PASSWORD: ${{ secrets.OSSRH_PASSWORD }}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/getstream/cloud/CloudAggregatedFeed.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ public class CloudAggregatedFeed extends CloudFeed {
super(client, id);
}

CloudAggregatedFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
super(client, id, subscriber);
}

public CompletableFuture<? extends List<? extends Group<Activity>>> getActivities()
throws StreamException {
return getActivities(
Expand Down
143 changes: 135 additions & 8 deletions src/main/java/io/getstream/cloud/CloudClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import io.getstream.core.Region;
import io.getstream.core.Stream;
import io.getstream.core.exceptions.StreamException;
import io.getstream.core.faye.DefaultMessageTransformer;
import io.getstream.core.faye.Message;
import io.getstream.core.faye.client.FayeClient;
import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.http.HTTPClient;
import io.getstream.core.http.OKHTTPClientAdapter;
import io.getstream.core.http.Response;
Expand All @@ -14,20 +18,38 @@
import io.getstream.core.models.Data;
import io.getstream.core.models.FeedID;
import io.getstream.core.models.OGData;
import io.getstream.core.models.RealtimeMessage;
import io.getstream.core.options.RequestOption;
import io.getstream.core.utils.Serialization;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java8.util.concurrent.CompletableFuture;

public final class CloudClient {
private final String apiKey;
private final Token token;
private final String appID;
private final String userID;
private final Stream stream;

private CloudClient(String key, Token token, String userID, URL baseURL, HTTPClient httpClient) {
private final FayeClient faye;

private CloudClient(
String key,
Token token,
String userID,
String appID,
URL baseURL,
HTTPClient httpClient,
URL fayeURL) {
this.apiKey = key;
this.token = token;
this.appID = appID;
this.userID = userID;
this.stream = new Stream(key, baseURL, httpClient);
this.faye = new FayeClient(fayeURL);
this.faye.setMessageTransformer(new FayeMessageTransformer());
}

public static Builder builder(String apiKey, String token, String userID) {
Expand All @@ -38,22 +60,25 @@ public static Builder builder(String apiKey, Token token, String userID) {
return new Builder(apiKey, token, userID);
}

public CompletableFuture<OGData> openGraph(URL url) throws StreamException {
return stream.openGraph(token, url);
public static Builder builder(String apiKey, Token token, String userID, String appID) {
return new Builder(apiKey, token, userID, appID);
}

public static final class Builder {
private static final String DEFAULT_HOST = "stream-io-api.com";
private static final String DEFAULT_FAYE_URL = "https://faye-us-east.stream-io-api.com/faye";

private final String apiKey;
private final Token token;
private final String userID;
private final String appID;
private HTTPClient httpClient;

private String scheme = "https";
private String region = Region.US_EAST.toString();
private String host = DEFAULT_HOST;
private int port = 443;
private String fayeURL = DEFAULT_FAYE_URL;

public Builder(String apiKey, Token token, String userID) {
checkNotNull(apiKey, "API key can't be null");
Expand All @@ -64,6 +89,19 @@ public Builder(String apiKey, Token token, String userID) {
this.apiKey = apiKey;
this.token = token;
this.userID = userID;
this.appID = null;
}

public Builder(String apiKey, Token token, String userID, String appID) {
checkNotNull(apiKey, "API key can't be null");
checkNotNull(token, "Token can't be null");
checkNotNull(userID, "User ID can't be null");
checkArgument(!apiKey.isEmpty(), "API key can't be empty");
checkArgument(!userID.isEmpty(), "User ID can't be empty");
this.apiKey = apiKey;
this.token = token;
this.userID = userID;
this.appID = appID;
}

public Builder httpClient(HTTPClient httpClient) {
Expand Down Expand Up @@ -105,6 +143,13 @@ public Builder region(String region) {
return this;
}

public Builder fayeURL(String fayeURL) {
checkNotNull(fayeURL, "FayeUrl can't be null");
checkArgument(!fayeURL.isEmpty(), "FayeUrl can't be empty");
this.fayeURL = fayeURL;
return this;
}

private String buildHost() {
final StringBuilder sb = new StringBuilder();
if (host.equals(DEFAULT_HOST)) {
Expand All @@ -118,15 +163,97 @@ public CloudClient build() throws MalformedURLException {
if (httpClient == null) {
httpClient = new OKHTTPClientAdapter();
}

return new CloudClient(
apiKey, token, userID, new URL(scheme, buildHost(), port, ""), httpClient);
apiKey,
token,
userID,
appID,
new URL(scheme, buildHost(), port, ""),
httpClient,
new URL(DEFAULT_FAYE_URL));
}
}

private static class FeedSubscription {
private String token;
private String userId;
private ChannelSubscription channelSubscription;

private FeedSubscription(String token, String userId) {
this.token = token;
this.userId = userId;
}

private FeedSubscription(String token, String userId, ChannelSubscription subscription) {
this.token = token;
this.userId = userId;
this.channelSubscription = subscription;
}
}

private final Map<String, FeedSubscription> feedSubscriptions = new HashMap<>();

private class FayeMessageTransformer extends DefaultMessageTransformer {
@Override
public Message transformRequest(Message message) {
final String subscription = message.getSubscription();
if (feedSubscriptions.containsKey(subscription)) {
final FeedSubscription feedSubscription = feedSubscriptions.get(subscription);
final Map<String, Object> ext = new HashMap<>();
ext.put("user_id", feedSubscription.userId);
ext.put("api_key", apiKey);
ext.put("signature", feedSubscription.token);
message.setExt(ext);
}
return message;
}
}

public <T> T getHTTPClientImplementation() {
return stream.getHTTPClientImplementation();
}

public CompletableFuture<OGData> openGraph(URL url) throws StreamException {
return stream.openGraph(token, url);
}

private CompletableFuture<ChannelSubscription> feedSubscriber(
FeedID feedId, RealtimeMessageCallback messageCallback) {
final CompletableFuture<ChannelSubscription> subscriberCompletion = new CompletableFuture<>();
try {
checkNotNull(appID, "Missing app id, which is needed in order to subscribe feed");
final String claim = feedId.getClaim();
final String notificationChannel = "site" + "-" + appID + "-" + "feed" + "-" + claim;
final FeedSubscription subscription =
new FeedSubscription(token.toString(), notificationChannel);
feedSubscriptions.put("/" + notificationChannel, subscription);

final ChannelSubscription channelSubscription =
faye.subscribe(
"/" + notificationChannel,
data -> {
try {
final byte[] payload = Serialization.toJSON(data);
final RealtimeMessage message =
Serialization.fromJSON(new String(payload), RealtimeMessage.class);
messageCallback.onMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
},
() -> feedSubscriptions.remove("/" + notificationChannel))
.get();

subscription.channelSubscription = channelSubscription;
feedSubscriptions.put("/" + notificationChannel, subscription);
subscriberCompletion.complete(channelSubscription);
} catch (Exception e) {
subscriberCompletion.completeExceptionally(e);
}
return subscriberCompletion;
}

// TODO: add personalized feed versions
public CloudFlatFeed flatFeed(String slug) {
return flatFeed(slug, userID);
Expand All @@ -141,7 +268,7 @@ public CloudFlatFeed flatFeed(String slug, String userID) {
}

public CloudFlatFeed flatFeed(FeedID id) {
return new CloudFlatFeed(this, id);
return new CloudFlatFeed(this, id, this::feedSubscriber);
}

public CloudAggregatedFeed aggregatedFeed(String slug) {
Expand All @@ -157,7 +284,7 @@ public CloudAggregatedFeed aggregatedFeed(String slug, String userID) {
}

public CloudAggregatedFeed aggregatedFeed(FeedID id) {
return new CloudAggregatedFeed(this, id);
return new CloudAggregatedFeed(this, id, this::feedSubscriber);
}

public CloudNotificationFeed notificationFeed(String slug) {
Expand All @@ -173,7 +300,7 @@ public CloudNotificationFeed notificationFeed(String slug, String userID) {
}

public CloudNotificationFeed notificationFeed(FeedID id) {
return new CloudNotificationFeed(this, id);
return new CloudNotificationFeed(this, id, this::feedSubscriber);
}

public CloudUser user(String userID) {
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/getstream/cloud/CloudFeed.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.collect.Iterables;
import io.getstream.core.exceptions.StreamException;
import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.http.Response;
import io.getstream.core.models.Activity;
import io.getstream.core.models.FeedID;
Expand All @@ -26,19 +27,36 @@
public class CloudFeed {
private final CloudClient client;
private final FeedID id;
private final FeedSubscriber subscriber;

CloudFeed(CloudClient client, FeedID id) {
checkNotNull(client, "Can't create feed w/o a client");
checkNotNull(id, "Can't create feed w/o an ID");

this.client = client;
this.id = id;
this.subscriber = null;
}

CloudFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
checkNotNull(client, "Can't create feed w/o a client");
checkNotNull(id, "Can't create feed w/o an ID");

this.client = client;
this.id = id;
this.subscriber = subscriber;
}

protected final CloudClient getClient() {
return client;
}

public final CompletableFuture<ChannelSubscription> subscribe(
RealtimeMessageCallback messageCallback) {
checkNotNull(subscriber, "A subscriber must be provided in order to start listening to a feed");
return subscriber.subscribe(id, messageCallback);
}

public final FeedID getID() {
return id;
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/getstream/cloud/CloudFlatFeed.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public final class CloudFlatFeed extends CloudFeed {
super(client, id);
}

CloudFlatFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
super(client, id, subscriber);
}

public CompletableFuture<List<Activity>> getActivities() throws StreamException {
return getActivities(
DefaultOptions.DEFAULT_LIMIT,
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/getstream/cloud/CloudNotificationFeed.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public final class CloudNotificationFeed extends CloudFeed {
super(client, id);
}

CloudNotificationFeed(CloudClient client, FeedID id, FeedSubscriber subscriber) {
super(client, id, subscriber);
}

public CompletableFuture<PaginatedNotificationGroup<Activity>> getActivities()
throws StreamException {
return getActivities(
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/getstream/cloud/FeedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.getstream.cloud;

import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.models.FeedID;
import java8.util.concurrent.CompletableFuture;

public interface FeedSubscriber {
CompletableFuture<ChannelSubscription> subscribe(
FeedID feedID, RealtimeMessageCallback messageCallback);
}
7 changes: 7 additions & 0 deletions src/main/java/io/getstream/cloud/RealtimeMessageCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.getstream.cloud;

import io.getstream.core.models.RealtimeMessage;

public interface RealtimeMessageCallback {
void onMessage(RealtimeMessage message);
}
Loading