diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java index 0927c41e3ba31..946bec1667ef4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java @@ -22,6 +22,8 @@ * such as slice index and future max timestamp, to allow downstream operators to optimize processing. */ public final class TimeSeriesSourceOperator extends LuceneSourceOperator { + private static final int MAX_TARGET_PAGE_SIZE = 2048; + private static final int CHUNK_SIZE = 128; public static final class Factory extends LuceneSourceOperator.Factory { public Factory( @@ -76,4 +78,17 @@ protected void buildMetadataBlocks(Block[] blocks, int offset, int currentPagePo blocks[offset] = blockFactory.newConstantIntVector(currentSlice.slicePosition(), currentPagePos).asBlock(); blocks[offset + 1] = blockFactory.newConstantLongVector(Long.MAX_VALUE, currentPagePos).asBlock(); } + + /** + * For time-series queries, try to use a page size that is a multiple of CHUNK_SIZE (see NUMERIC_BLOCK_SIZE in the tsdb codec) + * to enable bulk loading of numeric or tsid fields. Avoid pages that are too large, as this can disable bulk loading if there are + * holes in the doc IDs and disable constant block optimizations. Therefore, we cap the page size at 2048, which balances the + * overhead per page with the benefits of bulk loading and constant blocks. + */ + public static int pageSize(long estimateRowSizeInBytes, long maxPageSizeInBytes) { + long chunkSizeInBytes = CHUNK_SIZE * estimateRowSizeInBytes; + long numChunks = Math.ceilDiv(maxPageSizeInBytes, chunkSizeInBytes); + long pageSize = Math.clamp(numChunks * CHUNK_SIZE, CHUNK_SIZE, MAX_TARGET_PAGE_SIZE); + return Math.toIntExact(pageSize); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index 50a8deb23130b..13da553f3d1eb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -181,7 +181,7 @@ else if (aggregatorMode.isOutputPartial()) { groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(), aggregatorMode, aggregatorFactories, - context.pageSize(aggregateExec.estimatedRowSize()), + context.pageSize(aggregateExec, aggregateExec.estimatedRowSize()), analysisRegistry ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 29c98a073938e..cd5cfab6eb102 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -301,7 +301,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, querySupplier(esQueryExec.query()), context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), context.queryPragmas().taskConcurrency(), - context.pageSize(rowEstimatedSize), + context.pageSize(esQueryExec, rowEstimatedSize), limit, sortBuilders, estimatedPerRowSortSize, @@ -312,7 +312,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, shardContexts, querySupplier(esQueryExec.queryBuilderAndTags()), context.queryPragmas().taskConcurrency(), - context.pageSize(rowEstimatedSize), + context.pageSize(esQueryExec, rowEstimatedSize), limit ); } else { @@ -320,9 +320,9 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, shardContexts, querySupplier(esQueryExec.queryBuilderAndTags()), context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), - context.autoPartitioningStrategy().get(), + context.autoPartitioningStrategy(), context.queryPragmas().taskConcurrency(), - context.pageSize(rowEstimatedSize), + context.pageSize(esQueryExec, rowEstimatedSize), limit, scoring ); @@ -408,7 +408,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory( groupSpecs, aggregatorMode, aggregatorFactories, - context.pageSize(ts.estimatedRowSize()) + context.pageSize(ts, ts.estimatedRowSize()) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 7778ef601eeb3..0bcd2f3ec78ad 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; import org.elasticsearch.compute.operator.ChangePointOperator; import org.elasticsearch.compute.operator.ColumnExtractOperator; import org.elasticsearch.compute.operator.ColumnLoadOperator; @@ -203,8 +204,13 @@ public LocalExecutionPlanner( /** * turn the given plan into a list of drivers to execute */ - public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) { - + public LocalExecutionPlan plan( + String description, + FoldContext foldCtx, + PlannerSettings plannerSettings, + PhysicalPlan localPhysicalPlan + ) { + final boolean timeSeries = localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec); var context = new LocalExecutionPlannerContext( description, new ArrayList<>(), @@ -213,12 +219,8 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical bigArrays, blockFactory, foldCtx, - settings, - new Holder<>( - localPhysicalPlan.anyMatch(p -> p instanceof TimeSeriesAggregateExec) - ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES - : DataPartitioning.AutoStrategy.DEFAULT - ), + plannerSettings, + timeSeries, shardContexts ); @@ -503,7 +505,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte throw new EsqlIllegalArgumentException("limit only supported with literal values"); } return source.with( - new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(rowSize)), + new TopNOperatorFactory(limit, asList(elementTypes), asList(encoders), orders, context.pageSize(topNExec, rowSize)), source.layout ); } @@ -1024,8 +1026,8 @@ public record LocalExecutionPlannerContext( BigArrays bigArrays, BlockFactory blockFactory, FoldContext foldCtx, - Settings settings, - Holder autoPartitioningStrategy, + PlannerSettings plannerSettings, + boolean timeSeries, List shardContexts ) { void addDriverFactory(DriverFactory driverFactory) { @@ -1036,7 +1038,11 @@ void driverParallelism(DriverParallelism parallelism) { driverParallelism.set(parallelism); } - int pageSize(Integer estimatedRowSize) { + DataPartitioning.AutoStrategy autoPartitioningStrategy() { + return timeSeries ? DataPartitioning.AutoStrategy.DEFAULT_TIME_SERIES : DataPartitioning.AutoStrategy.DEFAULT; + } + + int pageSize(PhysicalPlan node, Integer estimatedRowSize) { if (estimatedRowSize == null) { throw new IllegalStateException("estimated row size hasn't been set"); } @@ -1046,7 +1052,11 @@ int pageSize(Integer estimatedRowSize) { if (queryPragmas.pageSize() != 0) { return queryPragmas.pageSize(); } - return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize); + if (timeSeries && node instanceof EsQueryExec) { + return TimeSeriesSourceOperator.pageSize(estimatedRowSize, plannerSettings.valuesLoadingJumboSize().getBytes()); + } else { + return Math.max(SourceOperator.MIN_TARGET_PAGE_SIZE, SourceOperator.TARGET_PAGE_SIZE / estimatedRowSize); + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 7d5f75f9201f9..6890c84a13f5c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -648,7 +648,7 @@ public SourceProvider createSourceProvider(SourceFilter sourceFilter) { // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below) // it's doing this in the planning of EsQueryExec (the source of the data) // see also EsPhysicalOperationProviders.sourcePhysicalOperation - LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan); + var localExecutionPlan = planner.plan(context.description(), context.foldCtx(), plannerSettings, localPlan); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe()); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 56b97759db83c..74bd485c66c3b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -724,6 +724,7 @@ void executeSubPlan( LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan( "final", foldCtx, + TEST_PLANNER_SETTINGS, new OutputExec(coordinatorPlan, collectedPages::add) ); drivers.addAll(coordinatorNodeExecutionPlan.createDrivers(getTestName())); @@ -745,7 +746,12 @@ void executeSubPlan( throw new AssertionError("expected no failure", e); }) ); - LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan); + LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan( + "data", + foldCtx, + EsqlTestUtils.TEST_PLANNER_SETTINGS, + csvDataNodePhysicalPlan + ); drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName())); Randomness.shuffle(drivers); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 281bd9dca9e35..e50a2b64a8620 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -136,6 +136,7 @@ import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -247,6 +248,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase { private TestDataSource countriesBboxWeb; // cartesian_shape field tests private final Configuration config; + private PlannerSettings plannerSettings; private record TestDataSource(Map mapping, EsIndex index, Analyzer analyzer, SearchStats stats) {} @@ -360,6 +362,7 @@ public void init() { functionRegistry, enrichResolution ); + this.plannerSettings = TEST_PLANNER_SETTINGS; } TestDataSource makeTestDataSource( @@ -7878,7 +7881,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP List.of() ); - return planner.plan("test", FoldContext.small(), plan); + return planner.plan("test", FoldContext.small(), plannerSettings, plan); } private List> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 804a74b0ff31f..bcb5595fad061 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -19,7 +19,10 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator; import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; @@ -42,6 +45,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ContextIndexSearcher; +import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -51,9 +55,11 @@ import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField; import org.elasticsearch.xpack.esql.core.util.StringUtils; import org.elasticsearch.xpack.esql.expression.Order; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.session.Configuration; @@ -125,6 +131,7 @@ public void testLuceneSourceOperatorHugeRowSize() throws IOException { LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", FoldContext.small(), + TEST_PLANNER_SETTINGS, new EsQueryExec( Source.EMPTY, index().name(), @@ -156,6 +163,7 @@ public void testLuceneTopNSourceOperator() throws IOException { LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", FoldContext.small(), + TEST_PLANNER_SETTINGS, new EsQueryExec( Source.EMPTY, index().name(), @@ -187,6 +195,7 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException { LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", FoldContext.small(), + TEST_PLANNER_SETTINGS, new EsQueryExec( Source.EMPTY, index().name(), @@ -211,6 +220,7 @@ public void testDriverClusterAndNodeName() throws IOException { LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", FoldContext.small(), + TEST_PLANNER_SETTINGS, new EsQueryExec( Source.EMPTY, index().name(), @@ -244,6 +254,41 @@ public void testPlanUnmappedFieldExtractSyntheticSource() throws Exception { assertThat(blockLoader, instanceOf(FallbackSyntheticSourceBlockLoader.class)); } + public void testTimeSeries() throws IOException { + int estimatedRowSize = estimatedRowSizeIsHuge ? randomIntBetween(20000, Integer.MAX_VALUE) : randomIntBetween(1, 50); + EsQueryExec queryExec = new EsQueryExec( + Source.EMPTY, + index().name(), + IndexMode.STANDARD, + index().indexNameWithModes(), + List.of(), + new Literal(Source.EMPTY, 10, DataType.INTEGER), + List.of(), + estimatedRowSize, + List.of(new EsQueryExec.QueryBuilderAndTags(null, List.of())) + ); + TimeSeriesAggregateExec aggExec = new TimeSeriesAggregateExec( + Source.EMPTY, + queryExec, + List.of(), + List.of(new Alias(Source.EMPTY, "count(*)", new Count(Source.EMPTY, Literal.keyword(Source.EMPTY, "*")))), + AggregatorMode.SINGLE, + List.of(), + 10, + null + ); + PlannerSettings plannerSettings = new PlannerSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000, ByteSizeValue.ofMb(1)); + LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), plannerSettings, aggExec); + assertThat(plan.driverFactories.size(), lessThanOrEqualTo(pragmas.taskConcurrency())); + LocalExecutionPlanner.DriverSupplier supplier = plan.driverFactories.get(0).driverSupplier(); + var factory = (LuceneSourceOperator.Factory) supplier.physicalOperation().sourceOperatorFactory; + if (estimatedRowSizeIsHuge) { + assertThat(factory.maxPageSize(), equalTo(128)); + } else { + assertThat(factory.maxPageSize(), equalTo(2048)); + } + } + private BlockLoader constructBlockLoader() throws IOException { EsQueryExec queryExec = new EsQueryExec( Source.EMPTY, @@ -264,7 +309,7 @@ private BlockLoader constructBlockLoader() throws IOException { ), MappedFieldType.FieldExtractPreference.NONE ); - LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), fieldExtractExec); + var plan = planner().plan("test", FoldContext.small(), TEST_PLANNER_SETTINGS, fieldExtractExec); var p = plan.driverFactories.get(0).driverSupplier().physicalOperation(); var fieldInfo = ((ValuesSourceReaderOperator.Factory) p.intermediateOperatorFactories.get(0)).fields().get(0); return fieldInfo.blockLoader().apply(0); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index c3167573fbec3..6a92886a1eb5e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -137,7 +137,7 @@ public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory( groupSpecs, aggregatorMode, aggregatorFactories, - context.pageSize(ts.estimatedRowSize()) + context.pageSize(ts, ts.estimatedRowSize()) ); }