0.18.7 Failure exception while executing a valid query after an invalid query, closes elastic#1617.
kimchy committed Jan 18, 2012
1 parent f3759e9 commit 2e0c7a5
Showing 9 changed files with 108 additions and 64 deletions.
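The change is the same across the async search actions: shard failures used to be accumulated in a Collection borrowed from the shared TransportSearchCache pool and returned to it by TransportSearchHelper.buildShardFailures, whose javadoc warned that it "should only be called once". A double release hands the same live collection to two concurrent requests, which is how failures from an earlier invalid query could surface in a later valid one (elastic#1617). The fix makes the failure list per-request: a volatile LinkedTransferQueue that each AsyncAction creates lazily on its first failure. Below is a minimal, self-contained sketch of that pattern; Failure is a stand-in for ShardSearchFailure, and ConcurrentLinkedQueue stands in for the jsr166y LinkedTransferQueue used in the diff.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Sketch of the per-request failure accumulation this commit adopts.
    // Failure stands in for ShardSearchFailure; any concurrent queue works here.
    class PerRequestFailures {

        private static final Failure[] EMPTY_ARRAY = new Failure[0];

        // Request-private and lazily created; never pooled or shared across requests.
        private volatile Queue<Failure> shardFailures;

        // Mirrors the diff's comment: two threads can both observe null and both
        // allocate, dropping one failure. The commit accepts that benign race to
        // keep the path lock-free and allocation-free for fully successful requests.
        void addShardFailure(Failure failure) {
            if (shardFailures == null) {
                shardFailures = new ConcurrentLinkedQueue<Failure>();
            }
            shardFailures.add(failure);
        }

        // Read the volatile field once into a local so the null check and toArray
        // operate on the same instance even if another thread publishes a queue.
        Failure[] buildShardFailures() {
            Queue<Failure> local = shardFailures;
            return local == null ? EMPTY_ARRAY : local.toArray(EMPTY_ARRAY);
        }

        static class Failure {
            final String reason;
            Failure(String reason) {
                this.reason = reason;
            }
        }
    }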
@@ -19,7 +19,6 @@
 
 package org.elasticsearch.action.search.type;
 
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
 import org.elasticsearch.search.SearchShardTarget;
@@ -37,8 +36,6 @@
  */
 public class TransportSearchCache {
 
-    private final Queue<Collection<ShardSearchFailure>> cacheShardFailures = new LinkedTransferQueue<Collection<ShardSearchFailure>>();
-
     private final Queue<Collection<DfsSearchResult>> cacheDfsResults = new LinkedTransferQueue<Collection<DfsSearchResult>>();
 
     private final Queue<Map<SearchShardTarget, QuerySearchResultProvider>> cacheQueryResults = new LinkedTransferQueue<Map<SearchShardTarget, QuerySearchResultProvider>>();
@@ -48,19 +45,6 @@ public class TransportSearchCache {
     private final Queue<Map<SearchShardTarget, QueryFetchSearchResult>> cacheQueryFetchResults = new LinkedTransferQueue<Map<SearchShardTarget, QueryFetchSearchResult>>();
 
 
-    public Collection<ShardSearchFailure> obtainShardFailures() {
-        Collection<ShardSearchFailure> shardFailures;
-        while ((shardFailures = cacheShardFailures.poll()) == null) {
-            cacheShardFailures.offer(new LinkedTransferQueue<ShardSearchFailure>());
-        }
-        return shardFailures;
-    }
-
-    public void releaseShardFailures(Collection<ShardSearchFailure> shardFailures) {
-        shardFailures.clear();
-        cacheShardFailures.offer(shardFailures);
-    }
-
     public Collection<DfsSearchResult> obtainDfsResults() {
         Collection<DfsSearchResult> dfsSearchResults;
         while ((dfsSearchResults = cacheDfsResults.poll()) == null) {
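For contrast, the pooling the commit keeps for result collections follows the poll-or-seed loop visible in obtainDfsResults above: poll the pool, and if it is empty, offer a fresh instance and poll again. A generic sketch of that pattern, with hypothetical names (the real cache keeps one pool per result type):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Generic form of the obtain/release pool still used for result collections.
    // obtain() polls an instance; if the pool is empty it seeds a fresh one and
    // retries. release() clears the instance before offering it back, so stale
    // entries never leak into the next request that borrows it.
    class CollectionPool<E> {
        private final Queue<Queue<E>> pool = new ConcurrentLinkedQueue<Queue<E>>();

        Queue<E> obtain() {
            Queue<E> q;
            while ((q = pool.poll()) == null) {
                pool.offer(new ConcurrentLinkedQueue<E>());
            }
            return q;
        }

        void release(Queue<E> q) {
            q.clear(); // stale entries from the previous request must not leak
            pool.offer(q);
        }
    }

The bug fixed here lived one level up: nothing prevented the same collection from being released twice.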
@@ -147,7 +147,7 @@ void executeSecondPhase(final DfsSearchResult dfsResult, final AtomicInteger cou
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
 }
-AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
+AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
 successulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     finishHim();
@@ -154,7 +154,7 @@ void executeQuery(final DfsSearchResult dfsResult, final AtomicInteger counter,
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
 }
-AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
+AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
 successulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     executeFetchPhase();
@@ -241,7 +241,7 @@ void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger count
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
 }
-AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
+AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
 successulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     finishHim();
@@ -24,7 +24,6 @@
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Base64;
 import org.elasticsearch.common.Nullable;
@@ -47,20 +46,6 @@
 public abstract class TransportSearchHelper {
 
 
-    /**
-     * Builds the shard failures, and releases the cache (meaning this should only be called once!).
-     */
-    public static ShardSearchFailure[] buildShardFailures(Collection<ShardSearchFailure> shardFailures, TransportSearchCache searchCache) {
-        ShardSearchFailure[] ret;
-        if (shardFailures.isEmpty()) {
-            ret = ShardSearchFailure.EMPTY_ARRAY;
-        } else {
-            ret = shardFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
-        }
-        searchCache.releaseShardFailures(shardFailures);
-        return ret;
-    }
-
     public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
         InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards, request.searchType());
         internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength());
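The deleted helper coupled two concerns, converting the failures to an array and releasing the pooled collection, which is why its javadoc insisted it only be called once. In the old scroll actions it could in fact run twice: innerFinishHim called it while building the response, and if that threw, finishHim's catch block called it again for the ReduceSearchPhaseException. The second call offered the same collection to the pool twice, and two later requests then shared one instance. A small, hypothetical demonstration of that hazard using the CollectionPool sketch above:

    import java.util.Queue;

    // Demonstrates the double-release hazard behind elastic#1617, using the
    // hypothetical CollectionPool sketched earlier. Releasing the same
    // collection twice enqueues it twice, so two later requests share it.
    public class DoubleReleaseDemo {
        public static void main(String[] args) {
            CollectionPool<String> pool = new CollectionPool<String>();

            Queue<String> failures = pool.obtain();
            failures.add("parse failure from the invalid query");

            pool.release(failures); // first release: cleared, returned to pool
            pool.release(failures); // second release: same instance pooled twice

            Queue<String> requestA = pool.obtain(); // later, a valid query...
            Queue<String> requestB = pool.obtain(); // ...and another request
            requestA.add("shard failure seen by request A");

            System.out.println(requestA == requestB); // true: one shared instance
            System.out.println(requestB.peek());      // request B sees A's failure
        }
    }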
@@ -155,7 +155,7 @@ void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger count
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
 }
-AsyncAction.this.shardFailures.add(new ShardSearchFailure(t));
+AsyncAction.this.addShardFailure(new ShardSearchFailure(t));
 successulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     finishHim();
@@ -20,14 +20,20 @@
 package org.elasticsearch.action.search.type;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.search.ReduceSearchPhaseException;
+import org.elasticsearch.action.search.SearchOperationThreading;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -37,7 +43,6 @@
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -83,7 +88,7 @@ private class AsyncAction {
 
     private final DiscoveryNodes nodes;
 
-    protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
+    private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
 
     private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
 
@@ -102,6 +107,23 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action
     this.counter = new AtomicInteger(scrollId.context().length);
 }
 
+protected final ShardSearchFailure[] buildShardFailures() {
+    LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
+    if (localFailures == null) {
+        return ShardSearchFailure.EMPTY_ARRAY;
+    }
+    return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
+}
+
+// we do our best to return the shard failures, but its ok if its not fully concurrently safe
+// we simply try and return as much as possible
+protected final void addShardFailure(ShardSearchFailure failure) {
+    if (shardFailures == null) {
+        shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
+    }
+    shardFailures.add(failure);
+}
+
 public void start() {
     if (scrollId.context().length == 0) {
         listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
@@ -187,7 +209,7 @@ private void executePhase(DiscoveryNode node, final long searchId) {
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute query phase", t, searchId);
 }
-shardFailures.add(new ShardSearchFailure(t));
+addShardFailure(new ShardSearchFailure(t));
 successfulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     finishHim();
@@ -200,7 +222,7 @@ private void finishHim() {
 try {
     innerFinishHim();
 } catch (Exception e) {
-    listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
+    listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
 }
 }
 
@@ -213,7 +235,7 @@ private void innerFinishHim() {
 }
 searchCache.releaseQueryFetchResults(queryFetchResults);
 listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
-        System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
+        System.currentTimeMillis() - startTime, buildShardFailures()));
 }
 }
 }
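The comment added alongside addShardFailure concedes a small race: two concurrent first failures can each observe a null field, and the failure added through the losing allocation is dropped. The commit judges that acceptable, since shard failures here are best-effort diagnostics. If it were not, a compare-and-set publication would close the window; a sketch of that stricter variant (my construction, not the commit's):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicReference;

    // Stricter variant: publish the failure queue with compareAndSet so that
    // concurrent first failures agree on a single instance and none is dropped.
    class RaceFreeFailures<F> {
        private final AtomicReference<Queue<F>> ref = new AtomicReference<Queue<F>>();

        void add(F failure) {
            Queue<F> q = ref.get();
            if (q == null) {
                ref.compareAndSet(null, new ConcurrentLinkedQueue<F>());
                q = ref.get(); // winner or loser, both see the one published queue
            }
            q.add(failure);
        }

        F[] toArray(F[] empty) {
            Queue<F> q = ref.get();
            return q == null ? empty : q.toArray(empty);
        }
    }

The cost is an extra volatile read and a CAS on the first failure; the benefit is that no failure can be lost during publication.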
@@ -20,7 +20,12 @@
 package org.elasticsearch.action.search.type;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.search.ReduceSearchPhaseException;
+import org.elasticsearch.action.search.SearchOperationThreading;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -29,6 +34,7 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.trove.ExtTIntArrayList;
+import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -41,7 +47,6 @@
 import org.elasticsearch.search.query.QuerySearchResultProvider;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -87,7 +92,7 @@ private class AsyncAction {
 
     private final DiscoveryNodes nodes;
 
-    protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
+    private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
 
     private final Map<SearchShardTarget, QuerySearchResultProvider> queryResults = searchCache.obtainQueryResults();
 
@@ -107,6 +112,23 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action
     this.successfulOps = new AtomicInteger(scrollId.context().length);
 }
 
+protected final ShardSearchFailure[] buildShardFailures() {
+    LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
+    if (localFailures == null) {
+        return ShardSearchFailure.EMPTY_ARRAY;
+    }
+    return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
+}
+
+// we do our best to return the shard failures, but its ok if its not fully concurrently safe
+// we simply try and return as much as possible
+protected final void addShardFailure(ShardSearchFailure failure) {
+    if (shardFailures == null) {
+        shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
+    }
+    shardFailures.add(failure);
+}
+
 public void start() {
     if (scrollId.context().length == 0) {
         listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null));
@@ -179,7 +201,7 @@ private void executeQueryPhase(final AtomicInteger counter, DiscoveryNode node,
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute query phase", t, searchId);
 }
-shardFailures.add(new ShardSearchFailure(t));
+addShardFailure(new ShardSearchFailure(t));
 successfulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     executeFetchPhase();
@@ -229,7 +251,7 @@ private void finishHim() {
 try {
     innerFinishHim();
 } catch (Exception e) {
-    listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)));
+    listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
 }
 }
 
@@ -240,7 +262,7 @@ private void innerFinishHim() {
     scrollId = request.scrollId();
 }
 listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
-        System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
+        System.currentTimeMillis() - startTime, buildShardFailures()));
 searchCache.releaseQueryResults(queryResults);
 searchCache.releaseFetchResults(fetchResults);
 }
@@ -21,14 +21,19 @@
 
 import org.apache.lucene.search.ScoreDoc;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.search.ReduceSearchPhaseException;
+import org.elasticsearch.action.search.SearchOperationThreading;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -41,7 +46,6 @@
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -87,7 +91,7 @@ private class AsyncAction {
 
     private final DiscoveryNodes nodes;
 
-    protected final Collection<ShardSearchFailure> shardFailures = searchCache.obtainShardFailures();
+    private volatile LinkedTransferQueue<ShardSearchFailure> shardFailures;
 
     private final Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = searchCache.obtainQueryFetchResults();
 
@@ -106,11 +110,28 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action
     this.counter = new AtomicInteger(scrollId.context().length);
 }
 
+protected final ShardSearchFailure[] buildShardFailures() {
+    LinkedTransferQueue<ShardSearchFailure> localFailures = shardFailures;
+    if (localFailures == null) {
+        return ShardSearchFailure.EMPTY_ARRAY;
+    }
+    return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY);
+}
+
+// we do our best to return the shard failures, but its ok if its not fully concurrently safe
+// we simply try and return as much as possible
+protected final void addShardFailure(ShardSearchFailure failure) {
+    if (shardFailures == null) {
+        shardFailures = new LinkedTransferQueue<ShardSearchFailure>();
+    }
+    shardFailures.add(failure);
+}
+
 public void start() {
     if (scrollId.context().length == 0) {
         final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false);
         searchCache.releaseQueryFetchResults(queryFetchResults);
-        listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache)));
+        listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures()));
         return;
     }
 
@@ -193,7 +214,7 @@ private void executePhase(DiscoveryNode node, final long searchId) {
 if (logger.isDebugEnabled()) {
     logger.debug("[{}] Failed to execute query phase", t, searchId);
 }
-shardFailures.add(new ShardSearchFailure(t));
+addShardFailure(new ShardSearchFailure(t));
 successfulOps.decrementAndGet();
 if (counter.decrementAndGet() == 0) {
     finishHim();
@@ -206,7 +227,7 @@ private void finishHim() {
 try {
     innerFinishHim();
 } catch (Exception e) {
-    ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache));
+    ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
     if (logger.isDebugEnabled()) {
         logger.debug("failed to reduce search", failure);
     }
@@ -246,7 +267,7 @@ private void innerFinishHim() throws IOException {
     scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits
 }
 listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(),
-        System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache)));
+        System.currentTimeMillis() - startTime, buildShardFailures()));
 }
 }
 }
