Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 93 additions & 93 deletions publish.gradle
Original file line number Diff line number Diff line change
@@ -1,93 +1,93 @@
apply plugin: 'maven-publish'
apply plugin: 'signing'

// Create variables with empty default values
ext["ossrhUsername"] = ''
ext["ossrhPassword"] = ''
ext["signing.keyId"] = ''
ext["signing.password"] = ''
ext["signing.secretKeyRingFile"] = ''
ext["sonatypeStagingProfileId"] = ''

File secretPropsFile = project.rootProject.file('local.properties')
if (secretPropsFile.exists()) {
// Read local.properties file first if it exists
Properties p = new Properties()
new FileInputStream(secretPropsFile).withCloseable { is -> p.load(is) }
p.each { name, value -> ext[name] = value }
} else {
// Use system environment variables
ext["ossrhUsername"] = System.getenv('OSSRH_USERNAME')
ext["ossrhPassword"] = System.getenv('OSSRH_PASSWORD')
ext["signing.keyId"] = System.getenv('SIGNING_KEY_ID')
ext["signing.password"] = System.getenv('SIGNING_PASSWORD')
ext["signing.secretKeyRingFile"] = System.getenv('SIGNING_SECRET_KEY_RING_FILE')
ext["sonatypeStagingProfileId"] = System.getenv('SONATYPE_STAGING_PROFILE_ID')
}

nexusPublishing {
repositories {
sonatype {
stagingProfileId = sonatypeStagingProfileId
username = ossrhUsername
password = ossrhPassword
}
}
}

task javadocJar(type: Jar) {
archiveClassifier = 'javadoc'
from javadoc
}

task sourcesJar(type: Jar) {
archiveClassifier = 'sources'
from sourceSets.main.allSource
}

artifacts {
archives javadocJar, sourcesJar
}

afterEvaluate {
publishing {
publications {
release(MavenPublication) {
from components.java
artifactId 'stream-java'

artifact sourcesJar
artifact javadocJar

pom {
name = "Stream Feeds official Java API Client"
description = "Stream Feeds Java Client for backend and android integrations"
url = 'https://github.com/getstream/stream-chat-java'
licenses {
license {
name = 'The 3-Clause BSD License'
url = 'https://opensource.org/licenses/BSD-3-Clause'
distribution = 'repo'
}
}
developers {
developer {
id = 'getstream-support'
name = 'Stream Support'
email = 'support@getstream.io'
}
}
scm {
connection = 'scm:git:github.com/getstream/stream-java.git'
developerConnection = 'scm:git:ssh://github.com/getstream/stream-java.git'
url = 'https://github.com/getstream/stream-java'
}
}
}
}
}
}

signing {
sign publishing.publications
}
//apply plugin: 'maven-publish'
//apply plugin: 'signing'
//
//// Create variables with empty default values
//ext["ossrhUsername"] = ''
//ext["ossrhPassword"] = ''
//ext["signing.keyId"] = ''
//ext["signing.password"] = ''
//ext["signing.secretKeyRingFile"] = ''
//ext["sonatypeStagingProfileId"] = ''
//
//File secretPropsFile = project.rootProject.file('local.properties')
//if (secretPropsFile.exists()) {
// // Read local.properties file first if it exists
// Properties p = new Properties()
// new FileInputStream(secretPropsFile).withCloseable { is -> p.load(is) }
// p.each { name, value -> ext[name] = value }
//} else {
// // Use system environment variables
// ext["ossrhUsername"] = System.getenv('OSSRH_USERNAME')
// ext["ossrhPassword"] = System.getenv('OSSRH_PASSWORD')
// ext["signing.keyId"] = System.getenv('SIGNING_KEY_ID')
// ext["signing.password"] = System.getenv('SIGNING_PASSWORD')
// ext["signing.secretKeyRingFile"] = System.getenv('SIGNING_SECRET_KEY_RING_FILE')
// ext["sonatypeStagingProfileId"] = System.getenv('SONATYPE_STAGING_PROFILE_ID')
//}
//
//nexusPublishing {
// repositories {
// sonatype {
// stagingProfileId = sonatypeStagingProfileId
// username = ossrhUsername
// password = ossrhPassword
// }
// }
//}
//
//task javadocJar(type: Jar) {
// archiveClassifier = 'javadoc'
// from javadoc
//}
//
//task sourcesJar(type: Jar) {
// archiveClassifier = 'sources'
// from sourceSets.main.allSource
//}
//
//artifacts {
// archives javadocJar, sourcesJar
//}
//
//afterEvaluate {
// publishing {
// publications {
// release(MavenPublication) {
// from components.java
// artifactId 'stream-java'
//
// artifact sourcesJar
// artifact javadocJar
//
// pom {
// name = "Stream Feeds official Java API Client"
// description = "Stream Feeds Java Client for backend and android integrations"
// url = 'https://github.com/getstream/stream-chat-java'
// licenses {
// license {
// name = 'The 3-Clause BSD License'
// url = 'https://opensource.org/licenses/BSD-3-Clause'
// distribution = 'repo'
// }
// }
// developers {
// developer {
// id = 'getstream-support'
// name = 'Stream Support'
// email = 'support@getstream.io'
// }
// }
// scm {
// connection = 'scm:git:github.com/getstream/stream-java.git'
// developerConnection = 'scm:git:ssh://github.com/getstream/stream-java.git'
// url = 'https://github.com/getstream/stream-java'
// }
// }
// }
// }
// }
//}
//
//signing {
// sign publishing.publications
//}
11 changes: 8 additions & 3 deletions src/main/java/io/getstream/cloud/CloudClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.getstream.core.faye.DefaultMessageTransformer;
import io.getstream.core.faye.Message;
import io.getstream.core.faye.client.FayeClient;
import io.getstream.core.faye.client.FayeErrorListener;
import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.http.HTTPClient;
import io.getstream.core.http.OKHTTPClientAdapter;
Expand Down Expand Up @@ -219,7 +220,7 @@ public CompletableFuture<OGData> openGraph(URL url) throws StreamException {
}

private CompletableFuture<ChannelSubscription> feedSubscriber(
FeedID feedId, RealtimeMessageCallback messageCallback) {
FeedID feedId, RealtimeMessageCallback messageCallback, FayeErrorListener errorListener) {
final CompletableFuture<ChannelSubscription> subscriberCompletion = new CompletableFuture<>();
try {
checkNotNull(appID, "Missing app id, which is needed in order to subscribe feed");
Expand All @@ -239,10 +240,14 @@ private CompletableFuture<ChannelSubscription> feedSubscriber(
Serialization.fromJSON(new String(payload), RealtimeMessage.class);
messageCallback.onMessage(message);
} catch (Exception e) {
e.printStackTrace();
if (errorListener != null){
errorListener.onError(e, null);
}
}
},
() -> feedSubscriptions.remove("/" + notificationChannel))
() -> feedSubscriptions.remove("/" + notificationChannel),
errorListener
)
.get();

subscription.channelSubscription = channelSubscription;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/getstream/cloud/CloudFeed.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.collect.Iterables;
import io.getstream.core.exceptions.StreamException;
import io.getstream.core.faye.client.FayeErrorListener;
import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.http.Response;
import io.getstream.core.models.Activity;
Expand Down Expand Up @@ -52,9 +53,9 @@ protected final CloudClient getClient() {
}

public final CompletableFuture<ChannelSubscription> subscribe(
RealtimeMessageCallback messageCallback) {
RealtimeMessageCallback messageCallback, FayeErrorListener errorListener) {
checkNotNull(subscriber, "A subscriber must be provided in order to start listening to a feed");
return subscriber.subscribe(id, messageCallback);
return subscriber.subscribe(id, messageCallback, errorListener);
}

public final FeedID getID() {
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/getstream/cloud/FeedSubscriber.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.getstream.cloud;

import io.getstream.core.faye.client.FayeErrorListener;
import io.getstream.core.faye.subscription.ChannelSubscription;
import io.getstream.core.models.FeedID;
import java8.util.concurrent.CompletableFuture;

public interface FeedSubscriber {
CompletableFuture<ChannelSubscription> subscribe(
FeedID feedID, RealtimeMessageCallback messageCallback);
CompletableFuture<ChannelSubscription> subscribe(
FeedID feedID, RealtimeMessageCallback messageCallback, FayeErrorListener errorListener);
}
40 changes: 27 additions & 13 deletions src/main/java/io/getstream/core/faye/client/FayeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
Expand Down Expand Up @@ -58,6 +60,8 @@ public FayeClient(URL baseURL) {

private MessageTransformer messageTransformer = new DefaultMessageTransformer();

private FayeErrorListener errorListener;

public void setMessageTransformer(MessageTransformer messageTransformer) {
this.messageTransformer = messageTransformer;
}
Expand Down Expand Up @@ -123,6 +127,10 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) {
// 'Error occurred', error, stacktrace);
closeWebSocket();
initWebSocket();

if (errorListener != null) {
errorListener.onError(t, response);
}
}

private boolean manuallyClosed = false;
Expand All @@ -137,15 +145,19 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
}

private void scheduleTimerTask(Callback callback, long duration) {
if (timer == null) timer = new Timer();
timer.schedule(
new TimerTask() {
@Override
public void run() {
callback.call();
}
},
duration);
try {
if (timer == null) timer = new Timer();
timer.schedule(
new TimerTask() {
@Override
public void run() {
callback.call();
}
},
duration);
} catch (Exception ignored) {
// We don't really care if the timer is cancelled, we create a new client anyway.
}
}

public void handshake() {
Expand Down Expand Up @@ -251,23 +263,25 @@ private void subscribeChannels(String[] channels) {

public CompletableFuture<ChannelSubscription> subscribe(
String channel, ChannelDataCallback callback) {
return subscribe(channel, callback, null, null);
return subscribe(channel, callback, null,null, null);
}

private CompletableFuture<ChannelSubscription> subscribe(String channel, Boolean force) {
return subscribe(channel, null, null, force);
return subscribe(channel, null, null, null, force);
}

public CompletableFuture<ChannelSubscription> subscribe(
String channel, ChannelDataCallback callback, SubscriptionCancelledCallback onCancelled) {
return subscribe(channel, callback, onCancelled, null);
String channel, ChannelDataCallback callback, SubscriptionCancelledCallback onCancelled, FayeErrorListener errorListener) {
return subscribe(channel, callback, onCancelled, errorListener, null);
}

private CompletableFuture<ChannelSubscription> subscribe(
String channel,
ChannelDataCallback onData,
SubscriptionCancelledCallback onCancelled,
FayeErrorListener errorListener,
Boolean force) {
this.errorListener = errorListener;
// default value
if (force == null) force = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.getstream.core.faye.client;

import okhttp3.Response;

public interface FayeErrorListener {
void onError(Throwable throwable, Response response);
}
2 changes: 1 addition & 1 deletion src/test/java/io/getstream/cloud/CloudFeedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void TestFaye() throws Exception {
CloudClient client = CloudClient.builder(apiKey, token, userID, appId).build();
CloudFlatFeed feed = client.flatFeed("user", userID);
CompletableFuture<ChannelSubscription> subscription =
feed.subscribe(message -> msg.set(message));
feed.subscribe(message -> msg.set(message), null);

feed.addActivity(
Activity.builder()
Expand Down