diff --git a/pom.xml b/pom.xml
index e7b3afb0..0a47aa9d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,8 @@
1.7.0
+ 1.0.0
+ 4.3.6
2.4
@@ -66,6 +68,18 @@
${retrofit.version}
+
+ io.reactivex
+ rxjava
+ ${rxjava.version}
+
+
+
+ org.apache.httpcomponents
+ httpclient
+ ${httpcomp.version}
+
+
commons-io
commons-io
diff --git a/src/main/java/com/contentful/java/api/CDAClient.java b/src/main/java/com/contentful/java/api/CDAClient.java
index 9c537eff..1da49ca8 100644
--- a/src/main/java/com/contentful/java/api/CDAClient.java
+++ b/src/main/java/com/contentful/java/api/CDAClient.java
@@ -575,12 +575,11 @@ public CDASpace fetchSpaceBlocking() throws RetrofitError {
* @param callback callback to attach to the request
*/
public void performInitialSynchronization(final CDACallback callback) {
- ensureSpace(true, new EnsureSpaceCallback(this, callback) {
- @Override void onSpaceReady() {
- service.performSynchronization(spaceKey, true, null,
- new SyncSpaceCallback(null, CDAClient.this, callback));
+ RxExtensions.defer(new RxExtensions.DefFunc() {
+ @Override CDASyncedSpace method() {
+ return performInitialSynchronizationBlocking();
}
- });
+ }, callback);
}
/**
@@ -594,9 +593,10 @@ public CDASyncedSpace performInitialSynchronizationBlocking() throws RetrofitErr
CDASyncedSpace result;
try {
- result = gson.fromJson(new InputStreamReader(response.getBody().in()), CDASyncedSpace.class);
+ CDASyncedSpace tmp = gson.fromJson(new InputStreamReader(
+ response.getBody().in()), CDASyncedSpace.class);
- result = new SpaceMerger(null, result, null, null, getSpace()).call();
+ result = iterateSpace(new SpaceMerger(null, tmp, null, null, getSpace()).call());
} catch (Exception e) {
throw RetrofitError.unexpectedError(response.getUrl(), e);
}
@@ -604,6 +604,21 @@ public CDASyncedSpace performInitialSynchronizationBlocking() throws RetrofitErr
return result;
}
+ private CDASyncedSpace iterateSpace(CDASyncedSpace space) throws Exception {
+ String nextPageUrl = space.getNextPageUrl();
+ while (nextPageUrl != null) {
+ String syncToken = Utils.getQueryParamFromUrl(nextPageUrl, "sync_token");
+ if (syncToken == null) {
+ break;
+ }
+
+ CDASyncedSpace nextPage = performSynchronization(syncToken);
+ space = new SpaceMerger(space, nextPage, null, null, getSpace()).call();
+ nextPageUrl = space.getNextPageUrl();
+ }
+ return space;
+ }
+
/**
* Sync an existing Space.
*
@@ -616,12 +631,11 @@ public void performSynchronization(final CDASyncedSpace existingSpace,
throw new IllegalArgumentException("Existing space may not be null.");
}
- ensureSpace(true, new EnsureSpaceCallback(this, callback) {
- @Override void onSpaceReady() {
- service.performSynchronization(spaceKey, null, existingSpace.getSyncToken(),
- new SyncSpaceCallback(existingSpace, CDAClient.this, callback));
+ RxExtensions.defer(new RxExtensions.DefFunc() {
+ @Override CDASyncedSpace method() {
+ return performSynchronizationBlocking(existingSpace);
}
- });
+ }, callback);
}
/**
@@ -644,7 +658,9 @@ public CDASyncedSpace performSynchronizationBlocking(CDASyncedSpace existingSpac
try {
CDASyncedSpace updatedSpace =
gson.fromJson(new InputStreamReader(response.getBody().in()), CDASyncedSpace.class);
- result = new SpaceMerger(existingSpace, updatedSpace, null, response, getSpace()).call();
+
+ result = iterateSpace(
+ new SpaceMerger(existingSpace, updatedSpace, null, response, getSpace()).call());
} catch (Exception e) {
throw RetrofitError.unexpectedError(response.getUrl(), e);
}
@@ -660,16 +676,11 @@ public CDASyncedSpace performSynchronizationBlocking(CDASyncedSpace existingSpac
*/
public void performSynchronization(final String syncToken,
final CDACallback callback) {
- if (syncToken == null) {
- throw new IllegalArgumentException("Sync token may not be null.");
- }
-
- ensureSpace(true, new EnsureSpaceCallback(this, callback) {
- @Override void onSpaceReady() {
- service.performSynchronization(spaceKey, null, syncToken,
- new SyncSpaceCallback(null, CDAClient.this, callback));
+ RxExtensions.defer(new RxExtensions.DefFunc() {
+ @Override CDASyncedSpace method() {
+ return performSynchronization(syncToken);
}
- });
+ }, callback);
}
/**
@@ -689,8 +700,7 @@ public CDASyncedSpace performSynchronization(String syncToken) throws RetrofitEr
try {
result = gson.fromJson(new InputStreamReader(response.getBody().in()), CDASyncedSpace.class);
-
- result = new SpaceMerger(null, result, null, null, getSpace()).call();
+ result = iterateSpace(new SpaceMerger(null, result, null, null, getSpace()).call());
} catch (Exception e) {
throw RetrofitError.unexpectedError(response.getUrl(), e);
}
diff --git a/src/main/java/com/contentful/java/api/RxExtensions.java b/src/main/java/com/contentful/java/api/RxExtensions.java
new file mode 100644
index 00000000..12fcd908
--- /dev/null
+++ b/src/main/java/com/contentful/java/api/RxExtensions.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 2014 Contentful GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.contentful.java.api;
+
+import retrofit.RetrofitError;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func0;
+import rx.schedulers.Schedulers;
+
+/**
+ * RxJava Extensions.
+ */
+final class RxExtensions {
+ private RxExtensions() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Base Action.
+ */
+ abstract static class AbsAction implements Action1 {
+ final CDACallback callback;
+
+ public AbsAction(CDACallback callback) {
+ this.callback = callback;
+ }
+ }
+
+ /**
+ * Success Action.
+ */
+ static class ActionSuccess extends AbsAction {
+ public ActionSuccess(CDACallback callback) {
+ super(callback);
+ }
+
+ @Override public void call(T t) {
+ if (!callback.isCancelled()) {
+ callback.onSuccess(t, null);
+ }
+ }
+ }
+
+ /**
+ * Error Action.
+ */
+ static class ActionError extends AbsAction {
+ @SuppressWarnings("unchecked")
+ public ActionError(CDACallback callback) {
+ super(callback);
+ }
+
+ @Override public void call(Throwable t) {
+ if (!callback.isCancelled()) {
+ if (t instanceof RetrofitError) {
+ callback.onFailure((RetrofitError) t);
+ } else {
+ callback.onFailure(RetrofitError.unexpectedError(null, t));
+ }
+ }
+ }
+ }
+
+ /**
+ * DefFunc.
+ */
+ abstract static class DefFunc implements Func0> {
+ @Override public final Observable call() {
+ return Observable.just(method());
+ }
+
+ abstract T method();
+ }
+
+ /**
+ * Creates an Observable with the given {@code func} function and subscribes to it
+ * with a set of pre-defined actions. The provided {@code callback} will be passed to these
+ * actions in order to populate the events.
+ */
+ static CDACallback defer(DefFunc func, CDACallback callback) {
+ if (callback == null) {
+ throw new IllegalArgumentException("callback may not be null.");
+ }
+
+ Observable.defer(func)
+ .observeOn(Schedulers.io())
+ .subscribe(
+ new ActionSuccess(callback),
+ new ActionError(callback));
+ return callback;
+ }
+}
diff --git a/src/main/java/com/contentful/java/api/Utils.java b/src/main/java/com/contentful/java/api/Utils.java
index a34dea19..a8467503 100644
--- a/src/main/java/com/contentful/java/api/Utils.java
+++ b/src/main/java/com/contentful/java/api/Utils.java
@@ -22,6 +22,8 @@
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Properties;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URIBuilder;
/**
* SDK utilities
@@ -131,5 +133,23 @@ static String getFromProperties(String field) throws IOException {
properties.load(Utils.class.getClassLoader().getResourceAsStream(SDK_PROPERTIES));
return properties.getProperty(field);
}
+
+ static String getQueryParamFromUrl(String url, String param) {
+ String result = null;
+
+ try {
+ URIBuilder builder = new URIBuilder(url);
+ for (NameValuePair pair : builder.getQueryParams()) {
+ if (pair.getName().equalsIgnoreCase(param)) {
+ result = pair.getValue();
+ break;
+ }
+ }
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ }
+
+ return result;
+ }
}
diff --git a/src/main/java/com/contentful/java/model/CDASyncedSpace.java b/src/main/java/com/contentful/java/model/CDASyncedSpace.java
index b557b8ca..0ccfb939 100644
--- a/src/main/java/com/contentful/java/model/CDASyncedSpace.java
+++ b/src/main/java/com/contentful/java/model/CDASyncedSpace.java
@@ -26,24 +26,28 @@
public class CDASyncedSpace extends ArrayResource {
private ArrayList items;
private String nextSyncUrl;
+ private String nextPageUrl;
public ArrayList getItems() {
return items;
}
/**
- * Gets the next sync URL.
- *
- * @return String representing the next sync URL for this Space.
+ * Returns the next sync URL.
*/
public String getNextSyncUrl() {
return nextSyncUrl;
}
/**
- * Gets the sync token from this Space's {@code nextSyncUrl} value.
- *
- * @return String representing the next token to be used for the next sync request.
+ * Returns the next page URL.
+ */
+ public String getNextPageUrl() {
+ return nextPageUrl;
+ }
+
+ /**
+ * Returns the sync token from this Space's {@code nextSyncUrl} value.
*/
public String getSyncToken() {
if (nextSyncUrl == null) {
diff --git a/src/test/java/com/contentful/java/ClientTest.java b/src/test/java/com/contentful/java/ClientTest.java
index 94554bfe..3fe25daf 100644
--- a/src/test/java/com/contentful/java/ClientTest.java
+++ b/src/test/java/com/contentful/java/ClientTest.java
@@ -65,36 +65,30 @@ public class ClientTest extends AbsTestCase {
assertTrue(cat.getBestFriend().getBestFriend() == cat);
}
- @Test(expected = TestException.class)
- public void testCustomErrorHandler() throws Exception {
+ @Test(expected = TestException.class) public void testCustomErrorHandler() throws Exception {
TestClientFactory.newInstance().setClient(new Client() {
- @Override
- public Response execute(Request request) throws IOException {
+ @Override public Response execute(Request request) throws IOException {
throw new RuntimeException();
}
}).setErrorHandler(new ErrorHandler() {
- @Override
- public Throwable handleError(RetrofitError retrofitError) {
+ @Override public Throwable handleError(RetrofitError retrofitError) {
return new TestException();
}
}).build().fetchSpaceBlocking();
}
- @Test(expected = RetrofitError.class)
- public void testSynchronousException() throws Exception {
+ @Test(expected = RetrofitError.class) public void testSynchronousException() throws Exception {
CDAClient client =
TestClientFactory.newInstance().setAccessToken("error").setSpaceKey("error").build();
client.fetchEntriesBlocking();
}
- @SuppressWarnings("unchecked")
- @Test public void testNoSSL() throws Exception {
+ @SuppressWarnings("unchecked") @Test public void testNoSSL() throws Exception {
final Boolean[] res = new Boolean[] { null };
CDAClient client = TestClientFactory.newInstance().noSSL().setClient(new Client() {
- @Override
- public Response execute(Request request) throws IOException {
+ @Override public Response execute(Request request) throws IOException {
URI uri = URI.create(request.getUrl());
res[0] = Constants.SCHEME_HTTP.equalsIgnoreCase(uri.getScheme());