Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cut over from SearchContext to ReaderContext #51282

Merged
merged 5 commits into from Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -158,12 +158,8 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Expand Up @@ -286,13 +286,13 @@ public void writeTo(StreamOutput out) throws IOException {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
boolean freed = searchService.freeReaderContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
boolean freed = searchService.freeReaderContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
Expand Down
Expand Up @@ -421,11 +421,7 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
} finally {
clearReleasables(Lifetime.COLLECTION);
}
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);

TopDocs td = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;

import java.util.HashMap;
Expand Down Expand Up @@ -146,25 +148,25 @@ private StatsHolder groupStats(String group) {
}

@Override
public void onNewContext(SearchContext context) {
public void onNewReaderContext(ReaderContext readerContext) {
openContexts.inc();
}

@Override
public void onFreeContext(SearchContext context) {
public void onFreeReaderContext(ReaderContext readerContext) {
openContexts.dec();
}

@Override
public void onNewScrollContext(SearchContext context) {
public void onNewScrollContext(ScrollContext scrollContext) {
totalStats.scrollCurrent.inc();
}

@Override
public void onFreeScrollContext(SearchContext context) {
public void onFreeScrollContext(ScrollContext scrollContext) {
totalStats.scrollCurrent.dec();
assert totalStats.scrollCurrent.count() >= 0;
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - context.getOriginNanoTime()));
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano()));
}

static final class StatsHolder {
Expand Down
Expand Up @@ -21,6 +21,8 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

Expand Down Expand Up @@ -76,34 +78,34 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed when a new search context was created
* @param context the created context
* Executed when a new reader context was created
* @param readerContext the created context
*/
default void onNewContext(SearchContext context) {}
default void onNewReaderContext(ReaderContext readerContext) {}

/**
* Executed when a previously created search context is freed.
* Executed when a previously created reader context is freed.
* This happens either when the search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param context the freed search context
* @param readerContext the freed reader context
*/
default void onFreeContext(SearchContext context) {}
default void onFreeReaderContext(ReaderContext readerContext) {}

/**
* Executed when a new scroll search {@link SearchContext} was created
* @param context the created search context
* @param scrollContext the created search context
*/
default void onNewScrollContext(SearchContext context) {}
default void onNewScrollContext(ScrollContext scrollContext) {}

/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param context the freed search context
* @param scrollContext the freed search context
*/
default void onFreeScrollContext(SearchContext context) {}
default void onFreeScrollContext(ScrollContext scrollContext) {}

/**
* Executed prior to using a {@link SearchContext} that has been retrieved
Expand All @@ -114,6 +116,14 @@ default void onFreeScrollContext(SearchContext context) {}
*/
default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {}

/**
* Executed when a search context was freed. The implementor can implement
* this method to release resources used by the search context.
*/
default void onFreeSearchContext(SearchContext context) {

}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
Expand Down Expand Up @@ -193,43 +203,43 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
}

@Override
public void onNewContext(SearchContext context) {
public void onNewReaderContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewContext(context);
listener.onNewReaderContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeContext(SearchContext context) {
public void onFreeReaderContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeContext(context);
listener.onFreeReaderContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onNewScrollContext(SearchContext context) {
public void onNewScrollContext(ScrollContext scrollContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(context);
listener.onNewScrollContext(scrollContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeScrollContext(SearchContext context) {
public void onFreeScrollContext(ScrollContext scrollContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(context);
listener.onFreeScrollContext(scrollContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e);
}
Expand All @@ -248,5 +258,16 @@ public void validateSearchContext(SearchContext context, TransportRequest reques
}
ExceptionsHelper.reThrowIfNotNull(exception);
}

@Override
public void onFreeSearchContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeSearchContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeSearchContext listener [{}] failed", listener), e);
}
}
}
}
}