Skip to content

Commit

Permalink
JCBC-910: Add includeDocs variant to force order of get items
Browse files Browse the repository at this point in the history
Motivation
----------
View query result order can be randomly mixed when includeDocs is true,
if one of the documents takes longer to be retrieved (because of the
efficient but unordered flatMap).

Sometimes it should be consistent, even if that means a slight overhead
on performance or memory.

Modifications
-------------
If includeDocs isn't used, concatMap can be used instead of flatMap (the
ordering is correct and the impact is negligible).

Added includeDocsOrdered variants that will enforce the row order. This
uses the experimental concatMapEager operator that fires all get request
immediately (instead of serially with a concatMap) and then buffers out
of order responses to drain them in correct order as soon as it becomes
possible.

This usage is flagged through the 'isOrderRetained()' method.

Added unit tests that validate this behavior.

Result
------
A user can better tune view query result order consistency.

Change-Id: I3ca3998f13d40051297c228aa64661c785e2a5d1
Reviewed-on: http://review.couchbase.org/59250
Reviewed-by: Simon Baslé <simon@couchbase.com>
Tested-by: Simon Baslé <simon@couchbase.com>
Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
  • Loading branch information
great authored and simonbasle committed Feb 5, 2016
1 parent d556ad8 commit cb6097d
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 20 deletions.
73 changes: 73 additions & 0 deletions src/main/java/com/couchbase/client/java/view/ViewQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ViewQuery implements Serializable {

private boolean development;
private boolean includeDocs;
private boolean retainOrder;
private Class<? extends Document<?>> includeDocsTarget;
private String keysJson;

Expand All @@ -79,6 +80,7 @@ private ViewQuery(String design, String view) {
this.view = view;
params = new String[NUM_PARAMS * 2];
includeDocs = false;
retainOrder = false;
includeDocsTarget = null;
}

Expand All @@ -102,10 +104,61 @@ public ViewQuery development(boolean development) {
return this;
}

/**
* Proactively load the full document for the row returned, while strictly retaining view row order.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
* @return the {@link ViewQuery} DSL.
*/
public ViewQuery includeDocsOrdered() {
return includeDocsOrdered(true, JsonDocument.class);
}

/**
* Proactively load the full document for the row returned, while strictly retaining view row order.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
* @param target the custom document type target.
* @return the {@link ViewQuery} DSL.
*/
public ViewQuery includeDocsOrdered(Class<? extends Document<?>> target) {
return includeDocsOrdered(true, target);
}

/**
* Proactively load the full document for the row returned, while strictly retaining view row order.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
* @param includeDocs if it should be enabled or not.
* @return the {@link ViewQuery} DSL.
*/
public ViewQuery includeDocsOrdered(boolean includeDocs) {
return includeDocsOrdered(includeDocs, JsonDocument.class);
}

/**
* Proactively load the full document for the row returned, while strictly retaining view row order.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
* @param includeDocs if it should be enabled or not.
* @param target the custom document type target.
* @return the {@link ViewQuery} DSL.
*/
public ViewQuery includeDocsOrdered(boolean includeDocs, Class<? extends Document<?>> target) {
this.includeDocs = includeDocs;
this.retainOrder = includeDocs; //deactivate if includeDocs is deactivated
this.includeDocsTarget = target;
return this;
}

/**
* Proactively load the full document for the row returned.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
*
* The order could be changed if one of the document takes longer to load than the others
* (see {@link #includeDocsOrdered()} )} if you want to enforce the row order).
*
* @return the {@link ViewQuery} DSL.
*/
public ViewQuery includeDocs() {
Expand All @@ -116,6 +169,10 @@ public ViewQuery includeDocs() {
* Proactively load the full document for the row returned.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
*
* The order could be changed if one of the document takes longer to load than the others
* (see {@link #includeDocsOrdered(Class)} )} if you want to enforce the row order).
*
* @param target the custom document type target.
* @return the {@link ViewQuery} DSL.
*/
Expand All @@ -127,6 +184,10 @@ public ViewQuery includeDocs(Class<? extends Document<?>> target) {
* Proactively load the full document for the row returned.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
*
* The order could be changed if one of the document takes longer to load than the others
* (see {@link #includeDocsOrdered(boolean)} if you want to enforce the row order).
*
* @param includeDocs if it should be enabled or not.
* @return the {@link ViewQuery} DSL.
*/
Expand All @@ -138,12 +199,17 @@ public ViewQuery includeDocs(boolean includeDocs) {
* Proactively load the full document for the row returned.
*
* This only works if reduce is false, since with reduce the original document ID is not included anymore.
*
* The order could be changed if one of the document takes longer to load than the others
* (see {@link #includeDocsOrdered(boolean, Class)} if you want to enforce the row order).
*
* @param includeDocs if it should be enabled or not.
* @param target the custom document type target.
* @return the {@link ViewQuery} DSL.
*/
public ViewQuery includeDocs(boolean includeDocs, Class<? extends Document<?>> target) {
this.includeDocs = includeDocs;
this.retainOrder = false;
this.includeDocsTarget = target;
return this;
}
Expand Down Expand Up @@ -513,6 +579,13 @@ public String getKeys() {
return this.keysJson;
}

/**
* @return true if row order, as returned by the view, should be kept while also {@link #includeDocs() including docs}.
*/
boolean isOrderRetained() {
return this.retainOrder;
}

public boolean isDevelopment() {
return development;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,15 @@ public AsyncViewResult call(final JsonObject jsonInfo) {
Observable<AsyncViewRow> rows = response
.rows()
.map(new ByteBufToJsonObject())
.flatMap(new Func1<JsonObject, Observable<AsyncViewRow>>() {
.compose(new Observable.Transformer<JsonObject, AsyncViewRow>() {
@Override
public Observable<AsyncViewRow> call(final JsonObject row) {
final String id = row.getString("id");

if (query.isIncludeDocs()) {
return bucket.get(id, query.includeDocsTarget()).map(new Func1<Document<?>, AsyncViewRow>() {
@Override
public AsyncViewRow call(Document<?> document) {
return new DefaultAsyncViewRow(bucket, id, row.get("key"), row.get("value"), document);
}
});
public Observable<AsyncViewRow> call(Observable<JsonObject> observable) {
if (!query.isIncludeDocs()) {
return observable.concatMap(buildAsyncViewRow());
} else if (query.isOrderRetained()) {
return observable.concatMapEager(buildAsyncViewRow());
} else {
return Observable.just((AsyncViewRow)
new DefaultAsyncViewRow(bucket, id, row.get("key"), row.get("value"), null)
);
return observable.flatMap(buildAsyncViewRow());
}
}
});
Expand All @@ -254,6 +247,28 @@ public JsonObject call(String input) {
return new DefaultAsyncViewResult(rows, totalRows, success, error, debug);
}

private Func1<JsonObject, Observable<AsyncViewRow>> buildAsyncViewRow() {
return new Func1<JsonObject, Observable<AsyncViewRow>>() {
@Override
public Observable<AsyncViewRow> call(final JsonObject row) {
final String id = row.getString("id");

if (query.isIncludeDocs()) {
return bucket.get(id, query.includeDocsTarget()).map(new Func1<Document<?>, AsyncViewRow>() {
@Override
public AsyncViewRow call(Document<?> document) {
return new DefaultAsyncViewRow(bucket, id, row.get("key"), row.get("value"), document);
}
});
} else {
return Observable.just((AsyncViewRow)
new DefaultAsyncViewRow(bucket, id, row.get("key"), row.get("value"), null)
);
}
}
};
}

}

}
177 changes: 171 additions & 6 deletions src/test/java/com/couchbase/client/java/view/ViewQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,44 @@
*/
package com.couchbase.client.java.view;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.view.ViewQueryResponse;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseBucket;
import com.couchbase.client.java.SerializationHelper;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import org.junit.Test;

import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

/**
* Verifies the correct functionality of the {@link ViewQuery} DSL.
Expand Down Expand Up @@ -311,4 +337,143 @@ public void shouldStoreKeysAsJsonOutsideParams() {
assertFalse(query.toString().contains("keys="));
assertFalse(query.toString().contains("3"));
}

@Test
public void shouldFlagOrderRetainedWhenUsingIncludeDocsOrdered() {
ViewQuery query1 = ViewQuery.from("a", "b").includeDocsOrdered();
ViewQuery query2 = ViewQuery.from("a", "b").includeDocsOrdered(true);
ViewQuery query3 = ViewQuery.from("a", "b").includeDocsOrdered(JsonDocument.class);
ViewQuery query4 = ViewQuery.from("a", "b").includeDocsOrdered(true, JsonDocument.class);

assertEquals(true, query1.isOrderRetained());
assertEquals(true, query2.isOrderRetained());
assertEquals(true, query3.isOrderRetained());
assertEquals(true, query4.isOrderRetained());
}


@Test
public void shouldDeactivateOrderRetainedWhenSettingIncludeDocsOrderedToFalse() {
ViewQuery query1 = ViewQuery.from("a", "b").includeDocsOrdered();
assertEquals(true, query1.isOrderRetained());
query1.includeDocsOrdered(false);
assertEquals(false, query1.isOrderRetained());

ViewQuery query2 = ViewQuery.from("a", "b").includeDocsOrdered(true, JsonDocument.class);
assertEquals(true, query2.isOrderRetained());
query2.includeDocsOrdered(false, JsonDocument.class);
assertEquals(false, query2.isOrderRetained());
}


@Test
public void shouldLoadDocumentsOutOfOrderWithIncludeDocs() {
StringBuilder trace = new StringBuilder();
Bucket bucket = mockDelayedBucket(2, trace, "A", "B", "C", "D");
ViewResult result = bucket.query(ViewQuery.from("any", "view")
.includeDocs());

//to assert reception is out of order
String[] expected = new String[]{"C", "D", "A", "B"};
//to assert requests are in order, emissions are out of order (A and B delayed)
String expectedTrace = "\nGET A\nGET B\nGET C\nGot C\nGET D\nGot D\nDelayed A by 100ms\nGot A\nDelayed B by 200ms\nGot B";

assertOrder(expected, expectedTrace, result.allRows(), trace.toString());
}

@Test
public void shouldLoadDocumentsInOrderWithIncludeDocsOrdered() {
StringBuilder trace = new StringBuilder();
Bucket bucket = mockDelayedBucket(2, trace, "A", "B", "C", "D");
ViewResult result = bucket.query(ViewQuery.from("any", "view")
.includeDocsOrdered());

//to assert reception is in order
String[] expectedIds = new String[]{"A", "B", "C", "D"};
//to assert requests are in order and emissions are out of order (A and B delayed)
String expectedTrace = "\nGET A\nGET B\nGET C\nGot C\nGET D\nGot D\nDelayed A by 100ms\nGot A\nDelayed B by 200ms\nGot B";
assertOrder(expectedIds, expectedTrace, result.allRows(), trace.toString());
}

private void assertOrder(String[] expectedIds, String expectedTrace, List<ViewRow> rows, String trace) {
for (int i = 0; i < rows.size(); i++) {
ViewRow row = rows.get(i);
assertNotNull(row);
JsonDocument doc = row.document();
assertEquals(row.id(), doc.id());
assertEquals(expectedIds[i], row.id());
}

assertEquals(expectedTrace, trace);
}

private Bucket mockDelayedBucket(final int numberDelayed, final StringBuilder trace, final String... keys) {
final Set<String> delayed = new HashSet<String>(numberDelayed);
delayed.addAll(Arrays.asList(keys).subList(0, numberDelayed));

List<ByteBuf> fakeRows = new ArrayList<ByteBuf>(keys.length);
for (String key : keys) {
String fakeRowJson = JsonObject.create()
.put("id", key)
.toString();
ByteBuf fakeBuffer = Unpooled.copiedBuffer(fakeRowJson, CharsetUtil.UTF_8);
fakeRows.add(fakeBuffer);
}
final Observable fakeRowObs = Observable.from(fakeRows);
final AtomicInteger delay = new AtomicInteger(100);

final AsyncBucket spyBucket = Mockito.mock(AsyncBucket.class);

//this will induce a delay on the first n keys when includeDocs' get is triggered, and trace the invocations
when(spyBucket.get(Matchers.anyString(), any(Class.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
String key = (String) invocation.getArguments()[0];
Observable<JsonDocument> obs = Observable.just(JsonDocument.create(key))
.doOnNext(new Action1<JsonDocument>() {
@Override
public void call(JsonDocument jsonDocument) {
trace.append("\nGET ").append(jsonDocument.id());
}
});
if (delayed.contains(key)) {
final int d = delay.getAndAdd(100);
obs = obs.delay(d, TimeUnit.MILLISECONDS)
.doOnNext(new Action1<JsonDocument>() {
@Override
public void call(JsonDocument jsonDocument) {
trace.append("\nDelayed ").append(jsonDocument.id()).append(" by ").append(d).append("ms");
}
});
}
return obs.doOnNext(new Action1<JsonDocument>() {
@Override
public void call(JsonDocument jsonDocument) {
trace.append("\nGot ").append(jsonDocument.id());
}
});
}
});

//this simulates a view response with the preconstructed buffers above, and calls the view result
//mapper so that it uses the mock get for its includeDocs calls.
when(spyBucket.query(any(ViewQuery.class))).thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final ViewQuery query = (ViewQuery) invocation.getArguments()[0];

ViewQueryResponse response = new ViewQueryResponse(fakeRowObs, Observable.<ByteBuf>empty(),
Observable.<String>empty(), 0, "", ResponseStatus.SUCCESS, null);

return Observable.just(response)
.flatMap(new Func1<ViewQueryResponse, Observable<AsyncViewResult>>() {
@Override
public Observable<AsyncViewResult> call(final ViewQueryResponse response) {
return ViewQueryResponseMapper.mapToViewResult(spyBucket, query, response);
}
});
}
});
return new CouchbaseBucket(spyBucket, DefaultCouchbaseEnvironment.create(), null, "", "");
}
}

0 comments on commit cb6097d

Please sign in to comment.