Skip to content

Commit

Permalink
Fail shard if search execution uncovers corruption
Browse files Browse the repository at this point in the history
If, as part of the search execution, a corruption is uncovered, we should fail the shard
relates to #11419
  • Loading branch information
kimchy committed Jun 2, 2015
1 parent 0b57f46 commit 740fe48
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
Expand Down Expand Up @@ -233,7 +234,7 @@ public DfsSearchResult executeDfsPhase(ShardSearchRequest request) {
return context.dfsResult();
} catch (Throwable e) {
logger.trace("Dfs phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -263,7 +264,7 @@ public QuerySearchResult executeScan(ShardSearchRequest request) {
return context.queryResult();
} catch (Throwable e) {
logger.trace("Scan phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
context.size(originalSize);
Expand Down Expand Up @@ -292,7 +293,7 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Scan phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -336,7 +337,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) {
}
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand All @@ -357,7 +358,7 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand All @@ -372,7 +373,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
context.searcher().dfSource(new CachedDfSource(context.searcher().getIndexReader(), request.dfs(), context.similarityService().similarity(),
indexCache.filter(), indexCache.filterPolicy()));
} catch (Throwable e) {
freeContext(context.id());
processFailure(context, e);
cleanContext(context);
throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
}
Expand All @@ -391,7 +392,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -429,7 +430,7 @@ public QueryFetchSearchResult executeFetchPhase(ShardSearchRequest request) {
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -476,7 +477,7 @@ public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) {
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -515,7 +516,7 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
freeContext(context.id());
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -543,7 +544,7 @@ public FetchSearchResult executeFetchPhase(ShardFetchRequest request) {
} catch (Throwable e) {
context.indexShard().searchService().onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e);
freeContext(context.id()); // we just try to make sure this is freed - rethrow orig exception.
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
} finally {
cleanContext(context);
Expand Down Expand Up @@ -668,6 +669,17 @@ private void cleanContext(SearchContext context) {
SearchContext.removeCurrent();
}

private void processFailure(SearchContext context, Throwable t) {
freeContext(context.id());
try {
if (Lucene.isCorruptionException(t)) {
context.indexShard().failShard("search execution corruption failure", t);
}
} catch (Throwable e) {
logger.warn("failed to process shard failure to (potentially) send back shard failure on corruption", e);
}
}

private void parseTemplate(ShardSearchRequest request) {

final ExecutableScript executable;
Expand Down

0 comments on commit 740fe48

Please sign in to comment.