Skip to content

Commit

Permalink
Remove lifetime
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Jan 27, 2020
1 parent b6e1f6f commit 34ed39b
Show file tree
Hide file tree
Showing 12 changed files with 19 additions and 80 deletions.
Expand Up @@ -158,12 +158,8 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Expand Up @@ -421,11 +421,7 @@ public TopDocsAndMaxScore[] topDocs(SearchHit[] hits) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
} finally {
clearReleasables(Lifetime.COLLECTION);
}
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);

TopDocs td = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Expand Up @@ -86,7 +86,6 @@
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.query.QueryPhase;
Expand Down Expand Up @@ -564,9 +563,9 @@ final SearchContext createContext(ReaderContext reader, ShardSearchRequest reque
contextScrollKeepAlive(reader, keepAlive);
context.lowLevelCancellation(lowLevelCancellation);
// Disable timeout while searching and mark the reader as accessed when releasing the context.
context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis()), Lifetime.CONTEXT);
context.addReleasable(() -> reader.accessed(threadPool.relativeTimeInMillis()));
reader.accessed(-1L);
context.addReleasable(() -> reader.indexShard().getSearchOperationListener().onFreeSearchContext(context), Lifetime.CONTEXT);
context.addReleasable(() -> reader.indexShard().getSearchOperationListener().onFreeSearchContext(context));
} catch (Exception e) {
context.close();
throw e;
Expand Down
Expand Up @@ -117,8 +117,6 @@ public void execute(SearchContext context) {
context.searcher().search(query, collector);
} catch (Exception e) {
throw new QueryPhaseExecutionException(context.shardTarget(), "Failed to execute global aggregators", e);
} finally {
context.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
}

Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;
import org.elasticsearch.search.query.QueryPhaseExecutionException;

import java.io.IOException;
Expand Down Expand Up @@ -76,7 +75,7 @@ protected AggregatorBase(String name, AggregatorFactories factories, SearchConte
this.breakerService = context.bigArrays().breakerService();
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
this.subAggregators = factories.createSubAggregators(context, this);
context.addReleasable(this, Lifetime.CONTEXT);
context.addReleasable(this);
final SearchShardTarget shardTarget = context.shardTarget();
// Register a safeguard to highlight any invalid construction logic (call to this constructor without subsequent preCollection call)
collectableSubAggregators = new BucketCollector() {
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;

import java.io.IOException;
import java.util.List;
Expand All @@ -50,7 +49,7 @@ public static final class MultiBucketAggregatorWrapper extends Aggregator {
this.parent = parent;
this.factory = factory;
this.first = first;
context.addReleasable(this, Lifetime.CONTEXT);
context.addReleasable(this);
aggregators = bigArrays.newObjectArray(1);
aggregators.set(0, first);
collectors = bigArrays.newObjectArray(1);
Expand Down
Expand Up @@ -48,8 +48,6 @@ public void hitExecute(SearchContext context, HitContext hitContext) {
} catch (IOException e) {
throw new FetchPhaseExecutionException(context.shardTarget(),
"Failed to explain doc [" + hitContext.hit().getId() + "]", e);
} finally {
context.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
}
}
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SearchContext.Lifetime;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,8 +91,6 @@ public void hitsExecute(SearchContext context, SearchHit[] hits) {
}
} catch (IOException e) {
throw ExceptionsHelper.convertToElastic(e);
} finally {
context.clearReleasables(Lifetime.COLLECTION);
}
}
}
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -59,10 +58,9 @@
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.suggest.SuggestionSearchContext;

import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -77,7 +75,7 @@ public abstract class SearchContext implements Releasable {
public static final int TRACK_TOTAL_HITS_DISABLED = -1;
public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000;

private Map<Lifetime, List<Releasable>> clearables = null;
private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;

Expand All @@ -93,10 +91,12 @@ protected SearchContext() {

@Override
public final void close() {
try {
clearReleasables(Lifetime.CONTEXT);
} finally {
doClose();
if (closed.compareAndSet(false, true)) {
try {
Releasables.close(releasables);
} finally {
doClose();
}
}
}

Expand Down Expand Up @@ -313,38 +313,14 @@ public SearchLookup lookup() {
*/
public abstract Profilers getProfilers();


/**
* Schedule the release of a resource. The time when {@link Releasable#close()} will be called on this object
* is function of the provided {@link Lifetime}.
* Adds a releasable that will be freed when this context is closed.
*/
public void addReleasable(Releasable releasable, Lifetime lifetime) {
if (clearables == null) {
clearables = new EnumMap<>(Lifetime.class);
}
List<Releasable> releasables = clearables.get(lifetime);
if (releasables == null) {
releasables = new ArrayList<>();
clearables.put(lifetime, releasables);
}
public void addReleasable(Releasable releasable) {
releasables.add(releasable);
}

public void clearReleasables(Lifetime lifetime) {
if (clearables != null) {
List<List<Releasable>>releasables = new ArrayList<>();
for (Lifetime lc : Lifetime.values()) {
if (lc.compareTo(lifetime) > 0) {
break;
}
List<Releasable> remove = clearables.remove(lc);
if (remove != null) {
releasables.add(remove);
}
}
Releasables.close(Iterables.flatten(releasables));
}
}

/**
* @return true if the request contains only suggest
*/
Expand All @@ -369,20 +345,6 @@ public final boolean hasOnlySuggest() {
/** Return a view of the additional query collectors that should be run for this context. */
public abstract Map<Class<?>, Collector> queryCollectors();

/**
* The life time of an object that is used during search execution.
*/
public enum Lifetime {
/**
* This life time is for objects that only live during collection time.
*/
COLLECTION,
/**
* This life time is for objects that need to live until the search context they are attached to is destroyed.
*/
CONTEXT
}

public abstract QueryShardContext getQueryShardContext();

@Override
Expand Down
Expand Up @@ -343,8 +343,6 @@ private static boolean searchWithCollector(SearchContext searchContext, ContextI
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
Expand Down Expand Up @@ -399,8 +397,6 @@ private static boolean searchWithCollectorManager(SearchContext searchContext, C
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
searchContext.queryResult().searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
return false; // no rescoring when sorting by field
}
Expand Down
Expand Up @@ -63,7 +63,6 @@
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Expand Up @@ -248,7 +248,7 @@ public boolean shouldCache(Query query) {
* close their sub-aggregations. This is fairly similar to what the production code does. */
releasables.add((Releasable) invocation.getArguments()[0]);
return null;
}).when(searchContext).addReleasable(anyObject(), anyObject());
}).when(searchContext).addReleasable(anyObject());
return searchContext;
}

Expand Down

0 comments on commit 34ed39b

Please sign in to comment.