Skip to content

Commit

Permalink
Time series based workload desc order optimization through reverse se…
Browse files Browse the repository at this point in the history
…gment read (opensearch-project#7244)

This commit changes the EngineConfig for timeseries indexes only (e.g., indexes that use 
the @timestamp metadata field) so that a descending LeafSorter comparator is used to 
visit segments in order of most newest to oldest. For the more infrequent case that a user 
chooses to sort query results by ASC time, this would cause a search regression so the 
ContextIndexSearcher is updated to inspect the sort order from the search request and 
reverse the comparator so segments are visited in ascending order. LeafSorter behavior 
for non-timeseries indexes is left the same. 

Signed-off-by: gashutos <gashutos@amazon.com>
Signed-off-by: Chaitanya Gohel <104654647+gashutos@users.noreply.github.com>
  • Loading branch information
gashutos authored and austintlee committed Apr 28, 2023
1 parent 6f2eef6 commit 056429e
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- [Extensions] Moving Extensions APIs to protobuf serialization. ([#6960](https://github.com/opensearch-project/OpenSearch/pull/6960))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))

### Dependencies
- Bump `jackson` from 2.14.2 to 2.15.0 ([#7286](https://github.com/opensearch-project/OpenSearch/pull/7286)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@

package org.opensearch.cluster.metadata;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.opensearch.OpenSearchException;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.core.ParseField;
Expand All @@ -46,6 +50,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -59,6 +64,24 @@
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final String TIMESERIES_FIELDNAME = "@timestamp";
public static final Comparator<LeafReader> TIMESERIES_LEAF_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
PointValues points = r.getPointValues(TIMESERIES_FIELDNAME);
if (points != null) {
// could be a multipoint (probably not) but get the maximum time value anyway
byte[] sortValue = points.getMaxPackedValue();
// decode the first dimension because this should not be a multi dimension field
// it's a bug in the date field if it is
return LongPoint.decodeDimension(sortValue, 0);
} else {
// segment does not have a timestamp field, just return the minimum value
return Long.MIN_VALUE;
}
} catch (IOException e) {
throw new OpenSearchException("Not a timeseries Index! Field [{}] not found!", TIMESERIES_FIELDNAME);
}
}).reversed();

private final String name;
private final TimestampField timeStampField;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile long mappingTotalFieldsLimit;
private volatile long mappingDepthLimit;
private volatile long mappingFieldNameLengthLimit;
private volatile boolean searchSegmentOrderReversed;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -897,6 +898,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
}

private void setSearchSegmentOrderReversed(boolean reversed) {
this.searchSegmentOrderReversed = reversed;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
this.searchIdleAfter = searchIdleAfter;
}
Expand Down Expand Up @@ -1068,6 +1073,13 @@ public Settings getNodeSettings() {
return nodeSettings;
}

/**
* Returns true if index level setting for leaf reverse order search optimization is enabled
*/
public boolean getSearchSegmentOrderReversed() {
return this.searchSegmentOrderReversed;
}

/**
* Updates the settings and index metadata and notifies all registered settings consumers with the new settings iff at least one
* setting has changed.
Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -59,6 +60,7 @@
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -102,6 +104,7 @@ public final class EngineConfig {
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
private final BooleanSupplier primaryModeSupplier;
private final Comparator<LeafReader> leafSorter;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -204,6 +207,7 @@ private EngineConfig(Builder builder) {
this.isReadOnlyReplica = builder.isReadOnlyReplica;
this.primaryModeSupplier = builder.primaryModeSupplier;
this.translogFactory = builder.translogFactory;
this.leafSorter = builder.leafSorter;
}

/**
Expand Down Expand Up @@ -451,6 +455,15 @@ public TranslogDeletionPolicyFactory getCustomTranslogDeletionPolicyFactory() {
return translogDeletionPolicyFactory;
}

/**
* Returns subReaderSorter for org.apache.lucene.index.BaseCompositeReader.
* This gets used in lucene IndexReader and decides order of segment read.
* @return comparator
*/
public Comparator<LeafReader> getLeafSorter() {
return this.leafSorter;
}

/**
* Builder for EngineConfig class
*
Expand Down Expand Up @@ -483,6 +496,7 @@ public static class Builder {
private boolean isReadOnlyReplica;
private BooleanSupplier primaryModeSupplier;
private TranslogFactory translogFactory = new InternalTranslogFactory();
Comparator<LeafReader> leafSorter;

public Builder shardId(ShardId shardId) {
this.shardId = shardId;
Expand Down Expand Up @@ -614,6 +628,11 @@ public Builder translogFactory(TranslogFactory translogFactory) {
return this;
}

public Builder leafSorter(Comparator<LeafReader> leafSorter) {
this.leafSorter = leafSorter;
return this;
}

public EngineConfig build() {
return new EngineConfig(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
Expand All @@ -36,6 +37,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BooleanSupplier;
Expand Down Expand Up @@ -151,7 +153,8 @@ public EngineConfig newEngineConfig(
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica,
BooleanSupplier primaryModeSupplier,
TranslogFactory translogFactory
TranslogFactory translogFactory,
Comparator<LeafReader> leafSorter
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -184,6 +187,7 @@ public EngineConfig newEngineConfig(
.readOnlyReplica(isReadOnlyReplica)
.primaryModeSupplier(primaryModeSupplier)
.translogFactory(translogFactory)
.leafSorter(leafSorter)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,6 +2322,9 @@ private IndexWriterConfig getIndexWriterConfig() {
if (config().getIndexSort() != null) {
iwc.setIndexSort(config().getIndexSort());
}
if (config().getLeafSorter() != null) {
iwc.setLeafSorter(config().getLeafSorter()); // The default segment search order
}
return iwc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index.mapper;

import org.apache.lucene.analysis.Analyzer;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.analysis.FieldNameAnalyzer;

Expand Down Expand Up @@ -261,6 +262,15 @@ public String getNestedScope(String path) {
return null;
}

/**
* If this index contains @timestamp field with Date type, it will return true
* @return true or false based on above condition
*/
public boolean containsTimeStampField() {
MappedFieldType timeSeriesFieldType = this.fieldTypeLookup.get(DataStream.TIMESERIES_FIELDNAME);
return timeSeriesFieldType != null && timeSeriesFieldType instanceof DateFieldMapper.DateFieldType; // has to be Date field type
}

private static String parentObject(String field) {
int lastDot = field.lastIndexOf('.');
if (lastDot == -1) {
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.core.Assertions;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -326,6 +327,8 @@ Runnable getGlobalCheckpointSyncer() {
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

private final boolean isTimeSeriesIndex;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -441,6 +444,9 @@ public boolean shouldCache(Query query) {
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactorySupplier = translogFactorySupplier;
this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null)
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3580,7 +3586,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
tombstoneDocSupplier(),
isReadOnlyReplica,
replicationTracker::isPrimaryMode,
translogFactorySupplier.apply(indexSettings, shardRouting)
translogFactorySupplier.apply(indexSettings, shardRouting),
isTimeSeriesIndex ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for timeseries
);
}

Expand Down Expand Up @@ -4594,4 +4601,12 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

/**
* If index is time series (if it contains @timestamp field)
* @return true or false based on above condition
*/
public boolean isTimeSeriesIndex() {
return this.isTimeSeriesIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.search.suggest.SuggestionSearchContext;

import java.io.IOException;
Expand Down Expand Up @@ -210,7 +211,8 @@ final class DefaultSearchContext extends SearchContext {
engineSearcher.getQueryCache(),
engineSearcher.getQueryCachingPolicy(),
lowLevelCancellation,
executor
executor,
shouldReverseLeafReaderContexts()
);
this.relativeTimeSupplier = relativeTimeSupplier;
this.timeout = timeout;
Expand Down Expand Up @@ -885,4 +887,22 @@ public boolean isCancelled() {
public ReaderContext readerContext() {
return readerContext;
}

private boolean shouldReverseLeafReaderContexts() {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (this.indexShard.isTimeSeriesIndex()) {
// Only reverse order for asc order sort queries
if (request != null
&& request.source() != null
&& request.source().sorts() != null
&& request.source().sorts().size() > 0
&& request.source().sorts().get(0).order() == SortOrder.ASC) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;

/**
* Certain queries can benefit if we reverse the segment read order,
* for example time series based queries if searched for desc sort order
*/
private final boolean reverseLeafReaderContexts;

public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
Expand All @@ -106,7 +112,37 @@ public ContextIndexSearcher(
boolean wrapWithExitableDirectoryReader,
Executor executor
) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader, executor);
this(
reader,
similarity,
queryCache,
queryCachingPolicy,
new MutableQueryTimeout(),
wrapWithExitableDirectoryReader,
executor,
false
);
}

public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
boolean wrapWithExitableDirectoryReader,
Executor executor,
boolean reverseLeafReaderContexts
) throws IOException {
this(
reader,
similarity,
queryCache,
queryCachingPolicy,
new MutableQueryTimeout(),
wrapWithExitableDirectoryReader,
executor,
reverseLeafReaderContexts
);
}

private ContextIndexSearcher(
Expand All @@ -116,13 +152,15 @@ private ContextIndexSearcher(
QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable,
boolean wrapWithExitableDirectoryReader,
Executor executor
Executor executor,
boolean reverseLeafReaderContexts
) throws IOException {
super(wrapWithExitableDirectoryReader ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader, executor);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
this.cancellable = cancellable;
this.reverseLeafReaderContexts = reverseLeafReaderContexts;
}

public void setProfiler(QueryProfiler profiler) {
Expand Down Expand Up @@ -246,8 +284,15 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
if (reverseLeafReaderContexts) {
// reverse the segment search order if this flag is true.
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
} else {
for (int i = 0; i < leaves.size(); i++) {
searchLeaf(leaves.get(i), weight, collector);
}
}
}

Expand Down

0 comments on commit 056429e

Please sign in to comment.