diff --git a/stream-core/src/main/java/io/getstream/client/model/beans/UnfollowMany.java b/stream-core/src/main/java/io/getstream/client/model/beans/UnfollowMany.java new file mode 100644 index 00000000..e5a12550 --- /dev/null +++ b/stream-core/src/main/java/io/getstream/client/model/beans/UnfollowMany.java @@ -0,0 +1,111 @@ +package io.getstream.client.model.beans; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * Helper bean used to perform bulk unfollow. + */ +public class UnfollowMany { + + @JsonSerialize(contentAs = UnfollowMany.Entry.class) + private final List entries; + + private UnfollowMany(final List entries) { + this.entries = entries; + } + + @JsonValue + public List getEntries() { + return entries; + } + + /** + * Provide an easy way to build an immutable list of unfollow. + */ + public static class Builder { + private ImmutableList.Builder followEntries = new ImmutableList.Builder<>(); + + /** + * Add a new unfollow source/target pair. Keep history is set to false. + * @param source Source feed + * @param target Target feed + * @return This builder. + */ + public Builder add(final String source, final String target) { + this.followEntries.add(new Entry(source, target, false)); + return this; + } + + /** + * Add a new unfollow source/target pair. + * @param source Source feed + * @param target Target feed + * @param keepHistory Whether the history must be preserved. + * @return This builder. + */ + public Builder add(final String source, final String target, final boolean keepHistory) { + this.followEntries.add(new Entry(source, target, keepHistory)); + return this; + } + + public Builder addMany(final List entries) { + this.followEntries.addAll(entries); + return this; + } + + /** + * Build an immutable list of unfollow. + * + * @return A marked activity + */ + public UnfollowMany build() { + return new UnfollowMany(followEntries.build()); + } + } + + public static class Entry { + + private String source; + private String target; + private boolean keepHistory; + + @JsonCreator + public Entry(@JsonProperty("source") final String source, + @JsonProperty("target") final String target, + @JsonProperty("keep_history") final boolean keepHistory) { + this.source = source; + this.target = target; + this.keepHistory = keepHistory; + } + + public String getSource() { + return source; + } + + public void setSource(String source) { + this.source = source; + } + + public String getTarget() { + return target; + } + + public void setTarget(String target) { + this.target = target; + } + + public boolean getKeepHistory() { + return keepHistory; + } + + public void setKeepHistory(boolean keepHistory) { + this.keepHistory = keepHistory; + } + } +} diff --git a/stream-core/src/main/java/io/getstream/client/model/feeds/BaseFeed.java b/stream-core/src/main/java/io/getstream/client/model/feeds/BaseFeed.java index 0c066b9d..b6f14ae2 100644 --- a/stream-core/src/main/java/io/getstream/client/model/feeds/BaseFeed.java +++ b/stream-core/src/main/java/io/getstream/client/model/feeds/BaseFeed.java @@ -4,6 +4,7 @@ import io.getstream.client.model.activities.BaseActivity; import io.getstream.client.model.beans.FeedFollow; import io.getstream.client.model.beans.FollowMany; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.filters.FeedFilter; import io.getstream.client.repo.StreamRepository; import io.getstream.client.service.AggregatedActivityServiceImpl; @@ -69,6 +70,11 @@ public void followMany(FollowMany follows) throws IOException, StreamClientExcep streamRepository.followMany(this, follows, DEFAULT_ACTIVITY_COPY_LIMIT); } + @Override + public void unfollowMany(UnfollowMany unfollowMany) throws IOException, StreamClientException { + streamRepository.unfollowMany(this, unfollowMany); + } + @Override public void unfollow(String feedSlug, String userId) throws IOException, StreamClientException { String feedId = String.format("%s:%s", feedSlug, userId); diff --git a/stream-core/src/main/java/io/getstream/client/model/feeds/Feed.java b/stream-core/src/main/java/io/getstream/client/model/feeds/Feed.java index 740a9425..fe429d92 100644 --- a/stream-core/src/main/java/io/getstream/client/model/feeds/Feed.java +++ b/stream-core/src/main/java/io/getstream/client/model/feeds/Feed.java @@ -4,6 +4,7 @@ import io.getstream.client.model.activities.BaseActivity; import io.getstream.client.model.beans.FeedFollow; import io.getstream.client.model.beans.FollowMany; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.filters.FeedFilter; import io.getstream.client.service.AggregatedActivityServiceImpl; import io.getstream.client.service.FlatActivityServiceImpl; @@ -60,7 +61,7 @@ public interface Feed { void follow(String feedSlug, String userId, int activityCopyLimit) throws IOException, StreamClientException; /** - * Follow many feed in one shot. + * Follow many feeds in one shot. * * @param follows A {@link FollowMany} object which contains a list of sources and targets * @param activityCopyLimit the maximum number of activities from a @@ -71,7 +72,7 @@ public interface Feed { void followMany(FollowMany follows, int activityCopyLimit) throws IOException, StreamClientException; /** - * Follow many feed in one shot. + * Follow many feeds in one shot. * Default activity copy limit is set to 300. Maximum 300 activities from a given source feed * will be copied to the target feed. * @@ -81,6 +82,17 @@ public interface Feed { */ void followMany(FollowMany follows) throws IOException, StreamClientException; + /** + * Unfollow many feeds in one shot. + * + * @param unfollowMany A {@link UnfollowMany} object which contains a list of sources and targets. + * Any arbitrary feed can be specified as {@link io.getstream.client.model.beans.UnfollowMany.Entry#setSource(String)}, + * regardless the one used to trigger this operation. + * @throws StreamClientException in case of functional or server-side exception + * @throws IOException in case of network/socket exceptions + */ + void unfollowMany(UnfollowMany unfollowMany) throws IOException, StreamClientException; + /** * Unfollow the given target feed. * diff --git a/stream-core/src/main/java/io/getstream/client/repo/StreamRepository.java b/stream-core/src/main/java/io/getstream/client/repo/StreamRepository.java index 28a933a0..33c3f687 100644 --- a/stream-core/src/main/java/io/getstream/client/repo/StreamRepository.java +++ b/stream-core/src/main/java/io/getstream/client/repo/StreamRepository.java @@ -9,6 +9,7 @@ import io.getstream.client.model.beans.MarkedActivity; import io.getstream.client.model.beans.StreamActivitiesResponse; import io.getstream.client.model.beans.StreamResponse; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.feeds.BaseFeed; import io.getstream.client.model.filters.FeedFilter; @@ -59,7 +60,7 @@ public interface StreamRepository { void follow(BaseFeed feed, String targetFeedId, int activityCopyLimit) throws StreamClientException, IOException; /** - * Follow many feed in one shot. + * Follow many feeds in one shot. * * @param feed Feed that wants to follow a target feed. * @param followManyInput A {@link FollowMany} object which contains a list of sources and targets @@ -69,6 +70,16 @@ public interface StreamRepository { */ void followMany(BaseFeed feed, FollowMany followManyInput, int activityCopyLimit) throws StreamClientException, IOException; + /** + * Unfollow many feeds in one shot. + * + * @param feed Feed that wants to follow a target feed. + * @param unfollowManyInput A {@link UnfollowMany} object which contains a list of sources and targets. + * @throws StreamClientException in case of functional or server-side exception + * @throws IOException in case of network/socket exceptions + */ + void unfollowMany(BaseFeed feed, UnfollowMany unfollowManyInput) throws StreamClientException, IOException; + /** * Unfollow a feed. * diff --git a/stream-repo-apache/src/main/java/io/getstream/client/apache/repo/StreamRepositoryImpl.java b/stream-repo-apache/src/main/java/io/getstream/client/apache/repo/StreamRepositoryImpl.java index b8f6c851..ed03e069 100644 --- a/stream-repo-apache/src/main/java/io/getstream/client/apache/repo/StreamRepositoryImpl.java +++ b/stream-repo-apache/src/main/java/io/getstream/client/apache/repo/StreamRepositoryImpl.java @@ -17,6 +17,7 @@ import io.getstream.client.model.beans.MarkedActivity; import io.getstream.client.model.beans.StreamActivitiesResponse; import io.getstream.client.model.beans.StreamResponse; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.feeds.BaseFeed; import io.getstream.client.model.filters.FeedFilter; import io.getstream.client.repo.StreamRepository; @@ -104,6 +105,16 @@ public void followMany(BaseFeed feed, FollowMany followManyInput, int activityCo fireAndForget(request); } + @Override + public void unfollowMany(BaseFeed feed, UnfollowMany unfollowManyInput) throws StreamClientException, IOException { + HttpPost request = new HttpPost(UriBuilder.fromEndpoint(baseEndpoint) + .path("unfollow_many/") + .build()); + request.addHeader(HttpSignatureInterceptor.X_API_KEY_HEADER, apiKey); + request.setEntity(new StringEntity(objectMapper.writeValueAsString(unfollowManyInput), APPLICATION_JSON)); + fireAndForget(request); + } + @Override public void unfollow(BaseFeed feed, String targetFeedId, boolean keepHistory) throws StreamClientException, IOException { HttpDelete request = new HttpDelete(UriBuilder.fromEndpoint(baseEndpoint) diff --git a/stream-repo-apache/src/test/java/io/getstream/client/apache/IntegrationTest.java b/stream-repo-apache/src/test/java/io/getstream/client/apache/IntegrationTest.java index 5e2ace04..2e1938ca 100644 --- a/stream-repo-apache/src/test/java/io/getstream/client/apache/IntegrationTest.java +++ b/stream-repo-apache/src/test/java/io/getstream/client/apache/IntegrationTest.java @@ -20,6 +20,7 @@ import io.getstream.client.model.beans.MarkedActivity; import io.getstream.client.model.beans.StreamActivitiesResponse; import io.getstream.client.model.beans.StreamResponse; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.feeds.Feed; import io.getstream.client.model.filters.FeedFilter; import io.getstream.client.service.AggregatedActivityServiceImpl; @@ -178,6 +179,40 @@ public void shouldFollowMany() throws IOException, StreamClientException { streamClient.shutdown(); } + @Test + public void shouldUnfollowMany() throws IOException, StreamClientException { + StreamClient streamClient = new StreamClientImpl(CLIENT_CONFIGURATION, API_KEY, + API_SECRET); + + String followerId = this.getTestUserId("shouldunfollowMany"); + Feed feed = streamClient.newFeed("user", followerId); + + List following = feed.getFollowing(); + assertThat(following.size(), is(0)); + + FollowMany followMany = new FollowMany.Builder() + .add("user:" + followerId, "user:1") + .add("user:" + followerId, "user:2") + .add("user:" + followerId, "user:3") + .build(); + feed.followMany(followMany); + + List followingAfter = feed.getFollowing(); + assertThat(followingAfter.size(), is(3)); + + UnfollowMany unfollowMany = new UnfollowMany.Builder() + .add("user:" + followerId, "user:1") + .add("user:" + followerId, "user:2", true) + .add("user:" + followerId, "user:3", false) + .build(); + feed.unfollowMany(unfollowMany); + + List unfollowingAfter = feed.getFollowing(); + assertThat(unfollowingAfter.size(), is(0)); + + streamClient.shutdown(); + } + @Test public void shouldHaveOriginField() throws IOException, StreamClientException, InterruptedException { StreamClient streamClient = new StreamClientImpl(CLIENT_CONFIGURATION, API_KEY, diff --git a/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamActivityRepository.java b/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamActivityRepository.java index a0c040b6..78f1a007 100644 --- a/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamActivityRepository.java +++ b/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamActivityRepository.java @@ -1,16 +1,5 @@ package io.getstream.client.okhttp.repo; -import static io.getstream.client.util.JwtAuthenticationUtil.ALL; -import static io.getstream.client.util.JwtAuthenticationUtil.generateToken; - -import java.io.IOException; -import java.net.URI; -import java.util.Collections; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.ObjectMapper; import io.getstream.client.exception.StreamClientException; import io.getstream.client.model.activities.AggregatedActivity; @@ -33,6 +22,16 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; + +import static io.getstream.client.util.JwtAuthenticationUtil.ALL; +import static io.getstream.client.util.JwtAuthenticationUtil.generateToken; public class StreamActivityRepository { @@ -68,10 +67,11 @@ public T addActivity(BaseFeed feed, T activity) throws Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructType(activity.getClass())); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructType(activity.getClass())); + } } public StreamActivitiesResponse addActivities(BaseFeed feed, Class type, List activities) throws StreamClientException, IOException { @@ -89,10 +89,11 @@ public StreamActivitiesResponse addActivities(BaseFe Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamActivitiesResponse.class, type)); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamActivitiesResponse.class, type)); + } } public T addToMany(List targetIds, T activity) throws StreamClientException, IOException { @@ -108,10 +109,11 @@ public T addToMany(List targetIds, T activity) final Request request = requestBuilder.build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructType(activity.getClass())); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructType(activity.getClass())); + } } public StreamResponse getActivities(BaseFeed feed, Class type, FeedFilter filter) throws IOException, StreamClientException { @@ -122,10 +124,11 @@ public StreamResponse getActivities(BaseFeed feed, C Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, type)); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, type)); + } } public StreamResponse> getAggregatedActivities(BaseFeed feed, Class type, FeedFilter filter) throws IOException, StreamClientException { @@ -136,10 +139,11 @@ public StreamResponse> getAggrega Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, objectMapper.getTypeFactory().constructParametricType(AggregatedActivity.class, type))); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, objectMapper.getTypeFactory().constructParametricType(AggregatedActivity.class, type))); + } } public StreamResponse> getNotificationActivities(BaseFeed feed, Class type, FeedFilter filter) throws IOException, StreamClientException { @@ -150,11 +154,12 @@ public StreamResponse> getNotif Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, - objectMapper.getTypeFactory().constructParametricType(NotificationActivity.class, type))); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, + objectMapper.getTypeFactory().constructParametricType(NotificationActivity.class, type))); + } } public StreamResponse> getNotificationActivities(BaseFeed feed, Class type, FeedFilter filter, boolean markAsRead, boolean markAsSeen) throws IOException, StreamClientException { @@ -167,10 +172,11 @@ public StreamResponse> getNotif Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, objectMapper.getTypeFactory().constructParametricType(NotificationActivity.class, type))); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, objectMapper.getTypeFactory().constructParametricType(NotificationActivity.class, type))); + } } public StreamResponse> getNotificationActivities(BaseFeed feed, Class type, FeedFilter filter, MarkedActivity markAsRead, MarkedActivity markAsSeen) throws IOException, StreamClientException { @@ -189,10 +195,11 @@ public StreamResponse> getNotif Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, objectMapper.getTypeFactory().constructParametricType(NotificationActivity.class, type))); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, objectMapper.getTypeFactory().constructParametricType(NotificationActivity.class, type))); + } } public void deleteActivityById(BaseFeed feed, String activityId) throws IOException, StreamClientException { @@ -203,8 +210,9 @@ public void deleteActivityById(BaseFeed feed, String activityId) throws IOExcept Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + } } public void deleteActivityByForeignId(BaseFeed feed, String activityId) throws IOException, StreamClientException { @@ -216,8 +224,9 @@ public void deleteActivityByForeignId(BaseFeed feed, String activityId) throws I Request request = addAuthentication(feed, requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + } } public StreamActivitiesResponse updateActivities(BaseFeed feed, Class type, List activities) throws IOException, StreamClientException { @@ -234,10 +243,11 @@ public StreamActivitiesResponse updateActivities(Bas LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - return objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamActivitiesResponse.class, type)); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + return objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamActivitiesResponse.class, type)); + } } private void handleResponseCode(Response response) throws StreamClientException, IOException { diff --git a/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamPersonalizedRepositoryImpl.java b/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamPersonalizedRepositoryImpl.java index 1820783c..32d36f7a 100644 --- a/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamPersonalizedRepositoryImpl.java +++ b/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamPersonalizedRepositoryImpl.java @@ -1,18 +1,5 @@ package io.getstream.client.okhttp.repo; -import static io.getstream.client.okhttp.repo.utils.FeedFilterUtils.apply; -import static io.getstream.client.util.JwtAuthenticationUtil.ALL; -import static io.getstream.client.util.JwtAuthenticationUtil.generateToken; - -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.util.Collections; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.ObjectMapper; import io.getstream.client.config.ClientConfiguration; import io.getstream.client.exception.StreamClientException; @@ -31,6 +18,18 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.Collections; +import java.util.List; + +import static io.getstream.client.okhttp.repo.utils.FeedFilterUtils.apply; +import static io.getstream.client.util.JwtAuthenticationUtil.ALL; +import static io.getstream.client.util.JwtAuthenticationUtil.generateToken; public class StreamPersonalizedRepositoryImpl implements StreamPersonalizedRepository { @@ -71,11 +70,12 @@ public List get(PersonalizedFeed feed, Class requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - StreamResponse streamResponse = objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, type)); - return streamResponse.getResults(); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + StreamResponse streamResponse = objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, type)); + return streamResponse.getResults(); + } } @Override @@ -95,14 +95,15 @@ public MetaResponse addMeta(PersonalizedFeed feed, Serializable metaPayload) thr LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); - StreamResponse responseValue = objectMapper.readValue(response.body().byteStream(), StreamResponse.class); - if (responseValue != null) { - duration = Double.parseDouble(responseValue.getDuration()); + StreamResponse responseValue = objectMapper.readValue(response.body().byteStream(), StreamResponse.class); + if (responseValue != null) { + duration = Double.parseDouble(responseValue.getDuration()); + } + return new MetaResponse(duration, response.code()); } - return new MetaResponse(duration, response.code()); } @Override @@ -118,11 +119,12 @@ public List getInterest(PersonalizedFeed feed, Class requestBuilder).build(); LOG.debug("Invoking url: '{}", request.url().toString()); - Response response = httpClient.newCall(request).execute(); - handleResponseCode(response); - StreamResponse streamResponse = objectMapper.readValue(response.body().byteStream(), - objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, type)); - return streamResponse.getResults(); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + StreamResponse streamResponse = objectMapper.readValue(response.body().byteStream(), + objectMapper.getTypeFactory().constructParametricType(StreamResponse.class, type)); + return streamResponse.getResults(); + } } private void handleResponseCode(Response response) throws StreamClientException, IOException { diff --git a/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamRepositoryImpl.java b/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamRepositoryImpl.java index 23dfb346..a1d4f65f 100644 --- a/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamRepositoryImpl.java +++ b/stream-repo-okhttp/src/main/java/io/getstream/client/okhttp/repo/StreamRepositoryImpl.java @@ -1,12 +1,5 @@ package io.getstream.client.okhttp.repo; -import java.io.IOException; -import java.net.URI; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.getstream.client.config.ClientConfiguration; @@ -20,6 +13,7 @@ import io.getstream.client.model.beans.MarkedActivity; import io.getstream.client.model.beans.StreamActivitiesResponse; import io.getstream.client.model.beans.StreamResponse; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.feeds.BaseFeed; import io.getstream.client.model.filters.FeedFilter; import io.getstream.client.okhttp.StreamClientImpl; @@ -36,6 +30,12 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.List; /** * Actual implementation of the Stream's REST API calls. @@ -111,6 +111,16 @@ public void followMany(BaseFeed feed, FollowMany followManyInput, int activityCo fireAndForget(requestBuilder.build()); } + @Override + public void unfollowMany(BaseFeed feed, UnfollowMany unfollowManyInput) throws StreamClientException, IOException { + Request.Builder requestBuilder = new Request.Builder().url(UriBuilder.fromEndpoint(baseEndpoint) + .path("unfollow_many/") + .build().toURL()); + requestBuilder.addHeader(HttpSignatureHandler.X_API_KEY_HEADER, apiKey); + requestBuilder.post(RequestBody.create(MediaType.parse(APPLICATION_JSON), objectMapper.writeValueAsString(unfollowManyInput))); + fireAndForget(requestBuilder.build()); + } + @Override public void unfollow(BaseFeed feed, String targetFeedId, boolean keepHistory) throws StreamClientException, IOException { Request.Builder requestBuilder = new Request.Builder().url(UriBuilder.fromEndpoint(baseEndpoint) @@ -214,7 +224,9 @@ public void shutdown() throws IOException { private void fireAndForget(final Request request) throws IOException, StreamClientException { LOG.debug("Invoking url: '{}", request.url().toString()); - handleResponseCode(httpClient.newCall(request).execute()); + try (Response response = httpClient.newCall(request).execute()) { + handleResponseCode(response); + } } private void handleResponseCode(Response response) throws StreamClientException, IOException { diff --git a/stream-repo-okhttp/src/test/java/io/getstream/client/okhttp/IntegrationTest.java b/stream-repo-okhttp/src/test/java/io/getstream/client/okhttp/IntegrationTest.java index ba88dec8..afbf5944 100644 --- a/stream-repo-okhttp/src/test/java/io/getstream/client/okhttp/IntegrationTest.java +++ b/stream-repo-okhttp/src/test/java/io/getstream/client/okhttp/IntegrationTest.java @@ -20,6 +20,7 @@ import io.getstream.client.model.beans.MarkedActivity; import io.getstream.client.model.beans.StreamActivitiesResponse; import io.getstream.client.model.beans.StreamResponse; +import io.getstream.client.model.beans.UnfollowMany; import io.getstream.client.model.feeds.Feed; import io.getstream.client.model.filters.FeedFilter; import io.getstream.client.service.AggregatedActivityServiceImpl; @@ -29,9 +30,6 @@ import org.junit.Test; import java.io.IOException; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.security.SignatureException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -172,6 +170,40 @@ public void shouldFollowMany() throws IOException, StreamClientException { streamClient.shutdown(); } + @Test + public void shouldUnfollowMany() throws IOException, StreamClientException { + StreamClient streamClient = new StreamClientImpl(CLIENT_CONFIGURATION, API_KEY, + API_SECRET); + + String followerId = this.getTestUserId("shouldunfollowMany"); + Feed feed = streamClient.newFeed("user", followerId); + + List following = feed.getFollowing(); + assertThat(following.size(), is(0)); + + FollowMany followMany = new FollowMany.Builder() + .add("user:" + followerId, "user:1") + .add("user:" + followerId, "user:2") + .add("user:" + followerId, "user:3") + .build(); + feed.followMany(followMany); + + List followingAfter = feed.getFollowing(); + assertThat(followingAfter.size(), is(3)); + + UnfollowMany unfollowMany = new UnfollowMany.Builder() + .add("user:" + followerId, "user:1") + .add("user:" + followerId, "user:2", true) + .add("user:" + followerId, "user:3", false) + .build(); + feed.unfollowMany(unfollowMany); + + List unfollowingAfter = feed.getFollowing(); + assertThat(unfollowingAfter.size(), is(0)); + + streamClient.shutdown(); + } + @Test public void shouldFollowManyWithActivityCopyLimit() throws IOException, StreamClientException { StreamClient streamClient = new StreamClientImpl(CLIENT_CONFIGURATION, API_KEY,