Skip to content

Commit

Permalink
JCBC-692: fix lock on creating DefaultQueryResult
Browse files Browse the repository at this point in the history
Motivation
----------
Users reported that all they get when trying to do a n1ql query is a
timeout. This comes from a race condition/deadlock that happens when
instantiating the DefaultQueryResult (blocking on each component of the
AsyncQueryResult).

Modifications
-------------
Only ever block once, in CouchbaseBucket, instead of multiple times in
the DefaultQueryResult constructor, by using a zip.

Constructor now only deals with collections or scalar types, not with
Observables.

Results
-------
Acquiring a query result synchronously should now work as expected.

Change-Id: I535d51ba9d67077fba99d209f936efdd5458802d
Reviewed-on: http://review.couchbase.org/45868
Tested-by: Simon Baslé <simon@couchbase.com>
Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
  • Loading branch information
simonbasle committed Jan 26, 2015
1 parent 1ded28a commit 401f57a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 54 deletions.
51 changes: 28 additions & 23 deletions src/main/java/com/couchbase/client/java/CouchbaseBucket.java
Expand Up @@ -28,8 +28,10 @@
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.JsonLongDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.query.AsyncQueryResult;
import com.couchbase.client.java.query.AsyncQueryRow;
import com.couchbase.client.java.query.DefaultQueryResult;
import com.couchbase.client.java.query.Query;
import com.couchbase.client.java.query.QueryPlan;
Expand All @@ -45,7 +47,9 @@
import com.couchbase.client.java.view.SpatialViewResult;
import com.couchbase.client.java.view.ViewQuery;
import com.couchbase.client.java.view.ViewResult;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func4;

import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -526,34 +530,35 @@ public SpatialViewResult call(AsyncSpatialViewResult asyncSpatialViewResult) {

@Override
public QueryResult query(Statement statement, final long timeout, final TimeUnit timeUnit) {
return Blocking.blockForSingle(asyncBucket
.query(statement)
.map(new Func1<AsyncQueryResult, QueryResult>() {
@Override
public QueryResult call(AsyncQueryResult asyncQueryResult) {
return new DefaultQueryResult(asyncQueryResult.rows(), asyncQueryResult.info(),
asyncQueryResult.errors(), asyncQueryResult.finalSuccess(), asyncQueryResult.parseSuccess(),
asyncQueryResult.requestId(), asyncQueryResult.clientContextId(),
timeout, timeUnit);
}
})
.single(), timeout, timeUnit);
return query(Query.simple(statement), timeout, timeUnit);
}

@Override
public QueryResult query(Query query, final long timeout, final TimeUnit timeUnit) {
return Blocking.blockForSingle(asyncBucket
.query(query)
.map(new Func1<AsyncQueryResult, QueryResult>() {
@Override
public QueryResult call(AsyncQueryResult asyncQueryResult) {
return new DefaultQueryResult(asyncQueryResult.rows(), asyncQueryResult.info(),
asyncQueryResult.errors(), asyncQueryResult.finalSuccess(), asyncQueryResult.parseSuccess(),
asyncQueryResult.requestId(), asyncQueryResult.clientContextId(),
timeout, timeUnit);
}
})
.single(), timeout, timeUnit);
.query(query)
.flatMap(new Func1<AsyncQueryResult, Observable<QueryResult>>() {
@Override
public Observable<QueryResult> call(AsyncQueryResult aqr) {
final boolean parseSuccess = aqr.parseSuccess();
final String requestId = aqr.requestId();
final String clientContextId = aqr.clientContextId();

return Observable.zip(aqr.rows().toList(),
aqr.info().singleOrDefault(JsonObject.empty()),
aqr.errors().toList(),
aqr.finalSuccess().singleOrDefault(Boolean.FALSE),
new Func4<List<AsyncQueryRow>, JsonObject, List<JsonObject>, Boolean, QueryResult>() {
@Override
public QueryResult call(List<AsyncQueryRow> rows, JsonObject info,
List<JsonObject> errors, Boolean finalSuccess) {
return new DefaultQueryResult(rows, info, errors, finalSuccess, parseSuccess,
requestId, clientContextId);
}
});
}
})
.single(), timeout, timeUnit);
}

@Override
Expand Down
@@ -1,13 +1,10 @@
package com.couchbase.client.java.query;

import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.util.Blocking;
import rx.Observable;
import rx.functions.Func1;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.couchbase.client.java.document.json.JsonObject;

public class DefaultQueryResult implements QueryResult {

Expand All @@ -23,38 +20,27 @@ public class DefaultQueryResult implements QueryResult {
/**
* Create a default blocking representation of a query result.
*
* @param rows the async view of rows.
* @param info the async view of metrics.
* @param errors the async view of errors and warnings.
* @param rows the list of rows.
* @param info the metrics.
* @param errors the list of errors and warnings.
* @param finalSuccess the definitive (but potentially delayed) result of the query.
* @param parseSuccess the intermediate result of the query
* @param timeout the maximum time allowed for all components of the result to be retrieved (global timeout).
* @param timeUnit the unit for timeout.
*/
public DefaultQueryResult(Observable<AsyncQueryRow> rows,
Observable<JsonObject> info, Observable<JsonObject> errors,
Observable<Boolean> finalSuccess, boolean parseSuccess,
String requestId, String clientContextId,
long timeout, TimeUnit timeUnit) {
public DefaultQueryResult(List<AsyncQueryRow> rows,
JsonObject info, List<JsonObject> errors,
Boolean finalSuccess, boolean parseSuccess,
String requestId, String clientContextId) {

this.requestId = requestId;
this.clientContextId = clientContextId;
this.parseSuccess = parseSuccess;
//block on the finalSuccess item, ensuring streamed section of the result is finished
this.finalSuccess = Blocking.blockForSingle(finalSuccess, timeout, timeUnit);

//since we have the final status, other streams should be instantaneous
this.allRows = Blocking.blockForSingle(rows
.map(new Func1<AsyncQueryRow, QueryRow>() {
@Override
public QueryRow call(AsyncQueryRow asyncQueryRow) {
return new DefaultQueryRow(asyncQueryRow.value());
}
})
.toList(), 1, TimeUnit.SECONDS);

this.errors = Blocking.blockForSingle(errors.toList(), 1, TimeUnit.SECONDS);
this.info = Blocking.blockForSingle(info.singleOrDefault(JsonObject.empty()), 1, TimeUnit.SECONDS);
this.finalSuccess = finalSuccess != null && finalSuccess;
this.allRows = new ArrayList<QueryRow>(rows.size());
for (AsyncQueryRow row : rows) {
this.allRows.add(new DefaultQueryRow(row.value()));
}
this.errors = errors;
this.info = info;
}

@Override
Expand Down

0 comments on commit 401f57a

Please sign in to comment.