Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure successful operations are correct if second search phase is fast #5713

Merged
merged 2 commits into from Apr 7, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -78,7 +78,7 @@ protected void moveToSecondPhase() throws Exception {
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -158,7 +158,7 @@ void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, in
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
Expand All @@ -185,7 +185,7 @@ void innerFinishHim() throws Exception {
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -168,7 +168,7 @@ void onQueryFailure(Throwable t, QuerySearchRequest querySearchRequest, int shar
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successulOps.decrementAndGet();
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
executeFetchPhase();
}
Expand Down Expand Up @@ -272,7 +272,7 @@ void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shar
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
Expand All @@ -298,7 +298,7 @@ void innerFinishHim() throws Exception {
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -91,7 +91,7 @@ private void innerFinishHim() throws IOException {
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -172,7 +172,7 @@ void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shar
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
this.addShardFailure(shardIndex, shardTarget, t);
successulOps.decrementAndGet();
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
Expand All @@ -198,7 +198,7 @@ void innerFinishHim() throws Exception {
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -75,7 +75,7 @@ protected void moveToSecondPhase() throws Exception {
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildTookInMillis(), buildShardFailures()));
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
}
Expand Up @@ -89,7 +89,7 @@ protected abstract class BaseAsyncAction<FirstResult extends SearchPhaseResult>
protected final int expectedSuccessfulOps;
private final int expectedTotalOps;

protected final AtomicInteger successulOps = new AtomicInteger();
protected final AtomicInteger successfulOps = new AtomicInteger();
private final AtomicInteger totalOps = new AtomicInteger();

protected final AtomicArray<FirstResult> firstResults;
Expand Down Expand Up @@ -246,11 +246,13 @@ public void onFailure(Throwable t) {
void onFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result, ShardIterator shardIt) {
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
processFirstPhaseResult(shardIndex, shard, result);

// we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
successfulOps.incrementAndGet();
// increment all the "future" shards to update the total ops since we some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
successulOps.incrementAndGet();
final int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
if (xTotalOps == expectedTotalOps) {
try {
innerMoveToSecondPhase();
Expand Down Expand Up @@ -281,7 +283,7 @@ void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nul
logger.trace("{}: Failed to execute [{}]", t, shard, request);
}
}
if (successulOps.get() == 0) {
if (successfulOps.get() == 0) {
if (logger.isDebugEnabled()) {
logger.debug("All shards failed for phase: [{}]", firstPhaseName(), t);
}
Expand Down