Skip to content

Commit

Permalink
Revert "Cut over from SearchContext to ReaderContext (#51282)"
Browse files Browse the repository at this point in the history
This reverts commit 3ce9a3b.
  • Loading branch information
dnhatn committed Feb 16, 2020
1 parent d2c651e commit e6a5bec
Show file tree
Hide file tree
Showing 30 changed files with 682 additions and 541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,12 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,13 @@ public void writeTo(StreamOutput out) throws IOException {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeReaderContext(request.id());
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
(request, channel, task) -> {
boolean freed = searchService.freeReaderContext(request.id());
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,11 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
try {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
} finally {
clearReleasables(Lifetime.COLLECTION);
}

TopDocs td = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;

import java.util.HashMap;
Expand Down Expand Up @@ -148,25 +146,25 @@ private StatsHolder groupStats(String group) {
}

@Override
public void onNewReaderContext(ReaderContext readerContext) {
public void onNewContext(SearchContext context) {
openContexts.inc();
}

@Override
public void onFreeReaderContext(ReaderContext readerContext) {
public void onFreeContext(SearchContext context) {
openContexts.dec();
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(SearchContext context) {
totalStats.scrollCurrent.inc();
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(SearchContext context) {
totalStats.scrollCurrent.dec();
assert totalStats.scrollCurrent.count() >= 0;
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano()));
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - context.getOriginNanoTime()));
}

static final class StatsHolder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

Expand Down Expand Up @@ -78,34 +76,34 @@ default void onFailedFetchPhase(SearchContext searchContext) {}
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {}

/**
* Executed when a new reader context was created
* @param readerContext the created context
* Executed when a new search context was created
* @param context the created context
*/
default void onNewReaderContext(ReaderContext readerContext) {}
default void onNewContext(SearchContext context) {}

/**
* Executed when a previously created reader context is freed.
* Executed when a previously created search context is freed.
* This happens either when the search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param readerContext the freed reader context
* @param context the freed search context
*/
default void onFreeReaderContext(ReaderContext readerContext) {}
default void onFreeContext(SearchContext context) {}

/**
* Executed when a new scroll search {@link SearchContext} was created
* @param scrollContext the created search context
* @param context the created search context
*/
default void onNewScrollContext(ScrollContext scrollContext) {}
default void onNewScrollContext(SearchContext context) {}

/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param scrollContext the freed search context
* @param context the freed search context
*/
default void onFreeScrollContext(ScrollContext scrollContext) {}
default void onFreeScrollContext(SearchContext context) {}

/**
* Executed prior to using a {@link SearchContext} that has been retrieved
Expand All @@ -116,14 +114,6 @@ default void onFreeScrollContext(ScrollContext scrollContext) {}
*/
default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {}

/**
* Executed when a search context was freed. The implementor can implement
* this method to release resources used by the search context.
*/
default void onFreeSearchContext(SearchContext context) {

}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
Expand Down Expand Up @@ -203,43 +193,43 @@ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
}

@Override
public void onNewReaderContext(ReaderContext readerContext) {
public void onNewContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewReaderContext(readerContext);
listener.onNewContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeReaderContext(ReaderContext readerContext) {
public void onFreeContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeReaderContext(readerContext);
listener.onFreeContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(scrollContext);
listener.onNewScrollContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(scrollContext);
listener.onFreeScrollContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e);
}
Expand All @@ -258,16 +248,5 @@ public void validateSearchContext(SearchContext context, TransportRequest reques
}
ExceptionsHelper.reThrowIfNotNull(exception);
}

@Override
public void onFreeSearchContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeSearchContext(context);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeSearchContext listener [{}] failed", listener), e);
}
}
}
}
}
Loading

0 comments on commit e6a5bec

Please sign in to comment.