Skip to content

Commit

Permalink
Merge pull request #38 from contentful/iss37
Browse files Browse the repository at this point in the history
Client sync methods iterate through paginated responses
  • Loading branch information
tomxor committed Nov 21, 2014
2 parents bce6f34 + cde642f commit 20f0976
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 42 deletions.
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

<!-- Dependencies -->
<retrofit.version>1.7.0</retrofit.version>
<rxjava.version>1.0.0</rxjava.version>
<httpcomp.version>4.3.6</httpcomp.version>

<!-- Test Dependencies -->
<commonsio.version>2.4</commonsio.version>
Expand Down Expand Up @@ -66,6 +68,18 @@
<version>${retrofit.version}</version>
</dependency>

<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpcomp.version}</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand Down
58 changes: 34 additions & 24 deletions src/main/java/com/contentful/java/api/CDAClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,11 @@ public CDASpace fetchSpaceBlocking() throws RetrofitError {
* @param callback callback to attach to the request
*/
public void performInitialSynchronization(final CDACallback<CDASyncedSpace> callback) {
ensureSpace(true, new EnsureSpaceCallback(this, callback) {
@Override void onSpaceReady() {
service.performSynchronization(spaceKey, true, null,
new SyncSpaceCallback(null, CDAClient.this, callback));
RxExtensions.defer(new RxExtensions.DefFunc<CDASyncedSpace>() {
@Override CDASyncedSpace method() {
return performInitialSynchronizationBlocking();
}
});
}, callback);
}

/**
Expand All @@ -594,16 +593,32 @@ public CDASyncedSpace performInitialSynchronizationBlocking() throws RetrofitErr
CDASyncedSpace result;

try {
result = gson.fromJson(new InputStreamReader(response.getBody().in()), CDASyncedSpace.class);
CDASyncedSpace tmp = gson.fromJson(new InputStreamReader(
response.getBody().in()), CDASyncedSpace.class);

result = new SpaceMerger(null, result, null, null, getSpace()).call();
result = iterateSpace(new SpaceMerger(null, tmp, null, null, getSpace()).call());
} catch (Exception e) {
throw RetrofitError.unexpectedError(response.getUrl(), e);
}

return result;
}

private CDASyncedSpace iterateSpace(CDASyncedSpace space) throws Exception {
String nextPageUrl = space.getNextPageUrl();
while (nextPageUrl != null) {
String syncToken = Utils.getQueryParamFromUrl(nextPageUrl, "sync_token");
if (syncToken == null) {
break;
}

CDASyncedSpace nextPage = performSynchronization(syncToken);
space = new SpaceMerger(space, nextPage, null, null, getSpace()).call();
nextPageUrl = space.getNextPageUrl();
}
return space;
}

/**
* Sync an existing Space.
*
Expand All @@ -616,12 +631,11 @@ public void performSynchronization(final CDASyncedSpace existingSpace,
throw new IllegalArgumentException("Existing space may not be null.");
}

ensureSpace(true, new EnsureSpaceCallback(this, callback) {
@Override void onSpaceReady() {
service.performSynchronization(spaceKey, null, existingSpace.getSyncToken(),
new SyncSpaceCallback(existingSpace, CDAClient.this, callback));
RxExtensions.defer(new RxExtensions.DefFunc<CDASyncedSpace>() {
@Override CDASyncedSpace method() {
return performSynchronizationBlocking(existingSpace);
}
});
}, callback);
}

/**
Expand All @@ -644,7 +658,9 @@ public CDASyncedSpace performSynchronizationBlocking(CDASyncedSpace existingSpac
try {
CDASyncedSpace updatedSpace =
gson.fromJson(new InputStreamReader(response.getBody().in()), CDASyncedSpace.class);
result = new SpaceMerger(existingSpace, updatedSpace, null, response, getSpace()).call();

result = iterateSpace(
new SpaceMerger(existingSpace, updatedSpace, null, response, getSpace()).call());
} catch (Exception e) {
throw RetrofitError.unexpectedError(response.getUrl(), e);
}
Expand All @@ -660,16 +676,11 @@ public CDASyncedSpace performSynchronizationBlocking(CDASyncedSpace existingSpac
*/
public void performSynchronization(final String syncToken,
final CDACallback<CDASyncedSpace> callback) {
if (syncToken == null) {
throw new IllegalArgumentException("Sync token may not be null.");
}

ensureSpace(true, new EnsureSpaceCallback(this, callback) {
@Override void onSpaceReady() {
service.performSynchronization(spaceKey, null, syncToken,
new SyncSpaceCallback(null, CDAClient.this, callback));
RxExtensions.defer(new RxExtensions.DefFunc<CDASyncedSpace>() {
@Override CDASyncedSpace method() {
return performSynchronization(syncToken);
}
});
}, callback);
}

/**
Expand All @@ -689,8 +700,7 @@ public CDASyncedSpace performSynchronization(String syncToken) throws RetrofitEr

try {
result = gson.fromJson(new InputStreamReader(response.getBody().in()), CDASyncedSpace.class);

result = new SpaceMerger(null, result, null, null, getSpace()).call();
result = iterateSpace(new SpaceMerger(null, result, null, null, getSpace()).call());
} catch (Exception e) {
throw RetrofitError.unexpectedError(response.getUrl(), e);
}
Expand Down
107 changes: 107 additions & 0 deletions src/main/java/com/contentful/java/api/RxExtensions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (C) 2014 Contentful GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.contentful.java.api;

import retrofit.RetrofitError;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.schedulers.Schedulers;

/**
* RxJava Extensions.
*/
final class RxExtensions {
private RxExtensions() {
throw new UnsupportedOperationException();
}

/**
* Base Action.
*/
abstract static class AbsAction<T> implements Action1<T> {
final CDACallback<T> callback;

public AbsAction(CDACallback<T> callback) {
this.callback = callback;
}
}

/**
* Success Action.
*/
static class ActionSuccess<T> extends AbsAction<T> {
public ActionSuccess(CDACallback<T> callback) {
super(callback);
}

@Override public void call(T t) {
if (!callback.isCancelled()) {
callback.onSuccess(t, null);
}
}
}

/**
* Error Action.
*/
static class ActionError extends AbsAction<Throwable> {
@SuppressWarnings("unchecked")
public ActionError(CDACallback callback) {
super(callback);
}

@Override public void call(Throwable t) {
if (!callback.isCancelled()) {
if (t instanceof RetrofitError) {
callback.onFailure((RetrofitError) t);
} else {
callback.onFailure(RetrofitError.unexpectedError(null, t));
}
}
}
}

/**
* DefFunc.
*/
abstract static class DefFunc<T> implements Func0<Observable<T>> {
@Override public final Observable<T> call() {
return Observable.just(method());
}

abstract T method();
}

/**
* Creates an Observable with the given {@code func} function and subscribes to it
* with a set of pre-defined actions. The provided {@code callback} will be passed to these
* actions in order to populate the events.
*/
static <R> CDACallback<R> defer(DefFunc<R> func, CDACallback<R> callback) {
if (callback == null) {
throw new IllegalArgumentException("callback may not be null.");
}

Observable.defer(func)
.observeOn(Schedulers.io())
.subscribe(
new ActionSuccess<R>(callback),
new ActionError(callback));
return callback;
}
}
20 changes: 20 additions & 0 deletions src/main/java/com/contentful/java/api/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Properties;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;

/**
* SDK utilities
Expand Down Expand Up @@ -131,5 +133,23 @@ static String getFromProperties(String field) throws IOException {
properties.load(Utils.class.getClassLoader().getResourceAsStream(SDK_PROPERTIES));
return properties.getProperty(field);
}

static String getQueryParamFromUrl(String url, String param) {
String result = null;

try {
URIBuilder builder = new URIBuilder(url);
for (NameValuePair pair : builder.getQueryParams()) {
if (pair.getName().equalsIgnoreCase(param)) {
result = pair.getValue();
break;
}
}
} catch (URISyntaxException e) {
e.printStackTrace();
}

return result;
}
}

16 changes: 10 additions & 6 deletions src/main/java/com/contentful/java/model/CDASyncedSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,28 @@
public class CDASyncedSpace extends ArrayResource {
private ArrayList<CDAResource> items;
private String nextSyncUrl;
private String nextPageUrl;

public ArrayList<CDAResource> getItems() {
return items;
}

/**
* Gets the next sync URL.
*
* @return String representing the next sync URL for this Space.
* Returns the next sync URL.
*/
public String getNextSyncUrl() {
return nextSyncUrl;
}

/**
* Gets the sync token from this Space's {@code nextSyncUrl} value.
*
* @return String representing the next token to be used for the next sync request.
* Returns the next page URL.
*/
public String getNextPageUrl() {
return nextPageUrl;
}

/**
* Returns the sync token from this Space's {@code nextSyncUrl} value.
*/
public String getSyncToken() {
if (nextSyncUrl == null) {
Expand Down
18 changes: 6 additions & 12 deletions src/test/java/com/contentful/java/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,30 @@ public class ClientTest extends AbsTestCase {
assertTrue(cat.getBestFriend().getBestFriend() == cat);
}

@Test(expected = TestException.class)
public void testCustomErrorHandler() throws Exception {
@Test(expected = TestException.class) public void testCustomErrorHandler() throws Exception {
TestClientFactory.newInstance().setClient(new Client() {
@Override
public Response execute(Request request) throws IOException {
@Override public Response execute(Request request) throws IOException {
throw new RuntimeException();
}
}).setErrorHandler(new ErrorHandler() {
@Override
public Throwable handleError(RetrofitError retrofitError) {
@Override public Throwable handleError(RetrofitError retrofitError) {
return new TestException();
}
}).build().fetchSpaceBlocking();
}

@Test(expected = RetrofitError.class)
public void testSynchronousException() throws Exception {
@Test(expected = RetrofitError.class) public void testSynchronousException() throws Exception {
CDAClient client =
TestClientFactory.newInstance().setAccessToken("error").setSpaceKey("error").build();

client.fetchEntriesBlocking();
}

@SuppressWarnings("unchecked")
@Test public void testNoSSL() throws Exception {
@SuppressWarnings("unchecked") @Test public void testNoSSL() throws Exception {
final Boolean[] res = new Boolean[] { null };

CDAClient client = TestClientFactory.newInstance().noSSL().setClient(new Client() {
@Override
public Response execute(Request request) throws IOException {
@Override public Response execute(Request request) throws IOException {
URI uri = URI.create(request.getUrl());
res[0] = Constants.SCHEME_HTTP.equalsIgnoreCase(uri.getScheme());

Expand Down

0 comments on commit 20f0976

Please sign in to comment.