From 72dd7ed054a92aa281b2925c3c9cc1f9e5d9c938 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Wed, 10 Sep 2025 16:12:46 +0200 Subject: [PATCH 1/5] Account for the inlinestats LocalRelations This adds memory tracking for the blocks used in the `LocalRelation`s generated at the intermediary phase of executing an INLINESTATS. --- .../xpack/esql/EsqlTestUtils.java | 6 +- .../esql/plugin/TransportActionServices.java | 5 +- .../esql/plugin/TransportEsqlQueryAction.java | 3 +- .../xpack/esql/session/EsqlSession.java | 8 +- .../xpack/esql/session/SessionUtils.java | 12 +- .../xpack/esql/session/SessionUtilsTests.java | 117 ++++++++++++++++++ 6 files changed, 134 insertions(+), 17 deletions(-) create mode 100644 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index ff47fc4121b0f..74d7e13c4df7e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.DoubleBlock; @@ -94,6 +95,7 @@ import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.plugin.TransportActionServices; @@ -430,8 +432,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { mock(ProjectResolver.class), mock(IndexNameExpressionResolver.class), null, - new InferenceService(mock(Client.class)) - ); + new InferenceService(mock(Client.class)), + new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY)); private static TransportService createMockTransportService() { var service = mock(TransportService.class); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java index 0255fad60811e..5818aad03512a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.search.SearchService; import org.elasticsearch.transport.TransportService; @@ -24,5 +25,5 @@ public record TransportActionServices( 
ProjectResolver projectResolver, IndexNameExpressionResolver indexNameExpressionResolver, UsageService usageService, - InferenceService inferenceService -) {} + InferenceService inferenceService, + BlockFactoryProvider blockFactoryProvider) {} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 43fd7e8dc8077..e976f416c2249 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -166,7 +166,8 @@ public TransportEsqlQueryAction( projectResolver, indexNameExpressionResolver, usageService, - new InferenceService(client) + new InferenceService(client), + blockFactoryProvider ); this.computeService = new ComputeService( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index e351820946135..31310f8866b2d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverCompletionInfo; @@ -124,6 +125,7 @@ public interface PlanRunner { private final IndicesExpressionGrouper indicesExpressionGrouper; private final InferenceService inferenceService; private final RemoteClusterService remoteClusterService; + private final BlockFactory blockFactory; private boolean explainMode; private String parsedPlanString; @@ -160,6 +162,7 @@ public EsqlSession( this.inferenceService = services.inferenceService(); this.preMapper = new PreMapper(services); this.remoteClusterService = services.transportService().getRemoteClusterService(); + this.blockFactory = services.blockFactoryProvider().blockFactory(); } public String sessionId() { @@ -305,11 +308,10 @@ private void executeSubPlan( })); } - private static LocalRelation resultToPlan(LogicalPlan plan, Result result) { + private LocalRelation resultToPlan(LogicalPlan plan, Result result) { List pages = result.pages(); List schema = result.schema(); - // if (pages.size() > 1) { - Block[] blocks = SessionUtils.fromPages(schema, pages); + Block[] blocks = SessionUtils.fromPages(schema, pages, blockFactory); return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java index 294d8110fd7b2..0c028f8bdb2fa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java @@ -7,8 +7,8 @@ package org.elasticsearch.xpack.esql.session; -import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; import 
org.elasticsearch.compute.data.BlockUtils; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; @@ -22,19 +22,13 @@ public class SessionUtils { private SessionUtils() {} - public static Block[] fromPages(List schema, List pages) { - // Limit ourselves to 1mb of results similar to LOOKUP for now. - long bytesUsed = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); - if (bytesUsed > ByteSizeValue.ofMb(1).getBytes()) { - throw new IllegalArgumentException("sub-plan execution results too large [" + ByteSizeValue.ofBytes(bytesUsed) + "] > 1mb"); - } + public static Block[] fromPages(List schema, List pages, BlockFactory blockFactory) { int positionCount = pages.stream().mapToInt(Page::getPositionCount).sum(); Block.Builder[] builders = new Block.Builder[schema.size()]; Block[] blocks; try { for (int b = 0; b < builders.length; b++) { - builders[b] = PlannerUtils.toElementType(schema.get(b).dataType()) - .newBlockBuilder(positionCount, PlannerUtils.NON_BREAKING_BLOCK_FACTORY); + builders[b] = PlannerUtils.toElementType(schema.get(b).dataType()).newBlockBuilder(positionCount, blockFactory); } for (Page p : pages) { for (int b = 0; b < builders.length; b++) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java new file mode 100644 index 0000000000000..fb2a8512265a5 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.session; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; +import org.elasticsearch.xpack.esql.core.type.DataType; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.unit.ByteSizeUnit.GB; +import static org.elasticsearch.xpack.esql.plan.AbstractNodeSerializationTests.randomSource; +import static org.elasticsearch.xpack.esql.session.SessionUtils.fromPages; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SessionUtilsTests extends ESTestCase { + + /* + * 1. Generate a list of Pages with one BytesRef block, each of different positions, filled with random bytes. + * 2. Convert the list of Pages into a single BytesRefBlock Page using `fromPages()`. + * 3. 
Verify that the resulting BytesRefBlock contains the same bytes from the input Pages. + * 4. Verify that a CircuitBreakingException is thrown when the memory limit is too low. + */ + public void testFromPages() { + final int minBytes = 500; + final int maxBytes = randomIntBetween(minBytes, minBytes * 1_000); + byte[] inBuffer = new byte[maxBytes]; + BlockFactory blockFactory = blockFactory((int) GB.toBytes(1)); + + BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(maxBytes); + List pages = new ArrayList<>(); + int producedBytes = 0; + int producedRows = 0; + int rowsPerPage = randomIntBetween(1, 100); + int rows = 0; + while (producedBytes < maxBytes) { + int rowBytes = Math.min(randomIntBetween(1, maxBytes / minBytes), maxBytes - producedBytes); + byte[] rowValue = randomByteArrayOfLength(rowBytes); + + builder.appendBytesRef(new BytesRef(rowValue)); + System.arraycopy(rowValue, 0, inBuffer, producedBytes, rowBytes); + + producedBytes += rowBytes; + rows++; + + if (rows > rowsPerPage) { + producedRows += rows; + rows = 0; + enqueueBlock(builder, pages); + builder = blockFactory.newBytesRefBlockBuilder(maxBytes); + rowsPerPage = randomIntBetween(1, 100); + } + } + if (rows > 0) { + producedRows += rows; + enqueueBlock(builder, pages); + } + + Attribute attr = new ReferenceAttribute(randomSource(), randomAlphaOfLengthOrNull(10), randomAlphaOfLength(10), DataType.KEYWORD); + + Block[] outBlocks = fromPages(List.of(attr), pages, blockFactory); + assertThat(outBlocks.length, is(1)); + BytesRefBlock bytesBlock = (BytesRefBlock) outBlocks[0]; + assertThat(bytesBlock.getPositionCount(), is(producedRows)); + + byte[] outBuffer = new byte[producedBytes]; + for (int i = 0, posCount = bytesBlock.getPositionCount(), outOffset = 0; i < posCount; i++) { + BytesRef ref = bytesBlock.getBytesRef(i, new BytesRef()); + System.arraycopy(ref.bytes, ref.offset, outBuffer, outOffset, ref.length); + outOffset += ref.length; + } + assertThat(outBuffer, is(inBuffer)); + + Releasables.close(outBlocks); + + BlockFactory convertBlockFactory = blockFactory(minBytes); + assertThrows(CircuitBreakingException.class, () -> fromPages(List.of(attr), pages, convertBlockFactory)); + + Releasables.close(pages); + } + + private BlockFactory blockFactory(long maxBytes) { + CircuitBreaker breaker = new MockBigArrays.LimitedBreaker(this.getClass().getName(), ByteSizeValue.ofBytes(maxBytes)); + CircuitBreakerService breakerService = mock(CircuitBreakerService.class); + when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker); + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, breakerService); + return new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays); + } + + private void enqueueBlock(BytesRefBlock.Builder builder, List pages) { + Block block = builder.build(); + pages.add(new Page(block)); + Releasables.close(builder); + } +} From dd69956c8027ca3fddf19d9af9b6f2e15a72eccb Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 10 Sep 2025 14:38:38 +0000 Subject: [PATCH 2/5] [CI] Auto commit changes from spotless --- .../main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java | 3 ++- .../xpack/esql/plugin/TransportActionServices.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 
74d7e13c4df7e..0b5f57616ef49 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -433,7 +433,8 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { mock(IndexNameExpressionResolver.class), null, new InferenceService(mock(Client.class)), - new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY)); + new BlockFactoryProvider(PlannerUtils.NON_BREAKING_BLOCK_FACTORY) + ); private static TransportService createMockTransportService() { var service = mock(TransportService.class); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java index 5818aad03512a..3158e3de4c08c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportActionServices.java @@ -26,4 +26,5 @@ public record TransportActionServices( IndexNameExpressionResolver indexNameExpressionResolver, UsageService usageService, InferenceService inferenceService, - BlockFactoryProvider blockFactoryProvider) {} + BlockFactoryProvider blockFactoryProvider +) {} From e61528c8f8e5cc5e22f39e2d15b7871642ce0f37 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Wed, 10 Sep 2025 18:17:01 +0200 Subject: [PATCH 3/5] Release intermediary blocks --- .../org/elasticsearch/xpack/esql/session/EsqlSession.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 31310f8866b2d..e45852035bd49 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -272,10 +272,12 @@ private void executeSubPlan( var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request); runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> { + Block[] localRelationBlocks = null; try { // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation completionInfoAccumulator.accumulate(result.completionInfo()); LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result); + localRelationBlocks = resultWrapper.supplier().get(); // replace the original logical plan with the backing result LogicalPlan newLogicalPlan = optimizedPlan.transformUp( @@ -303,6 +305,9 @@ private void executeSubPlan( executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener); } } finally { + if (localRelationBlocks != null) { + Releasables.closeExpectNoException(localRelationBlocks); + } Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks))); } })); From 1927f1d0dfc62df69b68a23750f304715be9e5b1 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Thu, 11 Sep 2025 14:51:07 +0200 Subject: [PATCH 4/5] safely unaccount the blocks when refs kept across threads --- .../xpack/esql/session/EsqlSession.java | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index e45852035bd49..d88f073a404cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -84,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -272,12 +273,13 @@ private void executeSubPlan( var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request); runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> { - Block[] localRelationBlocks = null; + AtomicReference localRelationBlocks = new AtomicReference<>(); try { // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation completionInfoAccumulator.accumulate(result.completionInfo()); LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result); - localRelationBlocks = resultWrapper.supplier().get(); + localRelationBlocks.set(resultWrapper.supplier().get()); + var releasingNext = ActionListener.runAfter(next, () -> releaseLocalRelationBlocks(localRelationBlocks)); // replace the original logical plan with the backing result LogicalPlan newLogicalPlan = optimizedPlan.transformUp( @@ -295,19 +297,21 @@ private void executeSubPlan( if (newSubPlan == null) {// run the final "main" plan LOGGER.debug("Executing final plan:\n{}", newLogicalPlan); var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request); - runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> { + runner.run(newPhysicalPlan, releasingNext.delegateFailureAndWrap((finalListener, finalResult) -> { completionInfoAccumulator.accumulate(finalResult.completionInfo()); finalListener.onResponse( new Result(finalResult.schema(), finalResult.pages(), completionInfoAccumulator.finish(), executionInfo) ); })); } else {// continue executing the subplans - executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener); + executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, releasingNext); } + } catch (Exception e) { + // safely release the blocks in case an exception occurs either before, but also after the "final" runner.run() forks off + // the current thread, but with the blocks still referenced + releaseLocalRelationBlocks(localRelationBlocks); + throw e; } finally { - if (localRelationBlocks != null) { - Releasables.closeExpectNoException(localRelationBlocks); - } Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks))); } })); @@ -320,6 +324,13 @@ private LocalRelation resultToPlan(LogicalPlan plan, Result result) { return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks)); } + private static void releaseLocalRelationBlocks(AtomicReference localRelationBlocks) { + Block[] relationBlocks = localRelationBlocks.getAndSet(null); + if (relationBlocks != null) { + Releasables.closeExpectNoException(relationBlocks); + } + } + private EsqlStatement parse(String query, QueryParams params) { var parsed = new EsqlParser().createQuery(query, params, planTelemetry, configuration); if 
(LOGGER.isDebugEnabled()) { From 55e6a14f5dd33e43717d4e7d74b8cd8dfaf812c9 Mon Sep 17 00:00:00 2001 From: Bogdan Pintea Date: Mon, 22 Sep 2025 14:26:06 +0200 Subject: [PATCH 5/5] Add back limits for the intermediate results --- .../xpack/esql/session/EsqlSession.java | 29 ++++- .../xpack/esql/session/SessionUtils.java | 9 ++ .../xpack/esql/session/SessionUtilsTests.java | 122 +++++++++++++----- 3 files changed, 126 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index d88f073a404cd..527e59fd65bb2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockUtils; @@ -93,6 +94,7 @@ import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin.firstSubPlan; +import static org.elasticsearch.xpack.esql.session.SessionUtils.checkPagesBelowSize; public class EsqlSession { @@ -107,6 +109,9 @@ public interface PlanRunner { } private static final TransportVersion LOOKUP_JOIN_CCS = TransportVersion.fromName("lookup_join_ccs"); + private static final double INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE = .1; + private static final long INTERMEDIATE_LOCAL_RELATION_MIN_SIZE = ByteSizeValue.ofMb(1).getBytes(); + private static final long INTERMEDIATE_LOCAL_RELATION_MAX_SIZE = ByteSizeValue.ofMb(30).getBytes(); private final String sessionId; private final Configuration configuration; @@ -127,6 +132,7 @@ public interface PlanRunner { private final InferenceService inferenceService; private final RemoteClusterService remoteClusterService; private final BlockFactory blockFactory; + private final long maxIntermediateLocalRelationSize; private boolean explainMode; private String parsedPlanString; @@ -164,6 +170,7 @@ public EsqlSession( this.preMapper = new PreMapper(services); this.remoteClusterService = services.transportService().getRemoteClusterService(); this.blockFactory = services.blockFactoryProvider().blockFactory(); + maxIntermediateLocalRelationSize = maxIntermediateLocalRelationSize(blockFactory); } public String sessionId() { @@ -277,7 +284,7 @@ private void executeSubPlan( try { // Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation completionInfoAccumulator.accumulate(result.completionInfo()); - LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan(), result); + LocalRelation resultWrapper = resultToPlan(subPlans.stubReplacedSubPlan().source(), result); localRelationBlocks.set(resultWrapper.supplier().get()); var releasingNext = ActionListener.runAfter(next, () -> releaseLocalRelationBlocks(localRelationBlocks)); @@ -317,11 +324,19 @@ private void executeSubPlan( })); } - private LocalRelation resultToPlan(LogicalPlan plan, Result result) { + private LocalRelation resultToPlan(Source planSource, Result 
result) { List pages = result.pages(); + checkPagesBelowSize( + pages, + maxIntermediateLocalRelationSize, + (actual) -> "sub-plan execution results too large [" + + ByteSizeValue.ofBytes(actual) + + "] > " + + ByteSizeValue.ofBytes(maxIntermediateLocalRelationSize) + ); List schema = result.schema(); Block[] blocks = SessionUtils.fromPages(schema, pages, blockFactory); - return new LocalRelation(plan.source(), schema, LocalSupplier.of(blocks)); + return new LocalRelation(planSource, schema, LocalSupplier.of(blocks)); } private static void releaseLocalRelationBlocks(AtomicReference localRelationBlocks) { @@ -331,6 +346,14 @@ private static void releaseLocalRelationBlocks(AtomicReference localRel } } + // returns INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE percent of the circuit breaker limit, but at least + // INTERMEDIATE_LOCAL_RELATION_MIN_SIZE and at most INTERMEDIATE_LOCAL_RELATION_MAX_SIZE + static long maxIntermediateLocalRelationSize(BlockFactory blockFactory) { + long breakerLimit = blockFactory.breaker().getLimit(); + long percentageLimit = (long) (breakerLimit * INTERMEDIATE_LOCAL_RELATION_CB_PERCETAGE / 100.d); + return Math.min(Math.max(percentageLimit, INTERMEDIATE_LOCAL_RELATION_MIN_SIZE), INTERMEDIATE_LOCAL_RELATION_MAX_SIZE); + } + private EsqlStatement parse(String query, QueryParams params) { var parsed = new EsqlParser().createQuery(query, params, planTelemetry, configuration); if (LOGGER.isDebugEnabled()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java index 0c028f8bdb2fa..fb8f817c5ba03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/SessionUtils.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.LongFunction; public class SessionUtils { @@ -42,6 +43,14 @@ public static Block[] fromPages(List schema, List pages, BlockF return blocks; } + public static long checkPagesBelowSize(List pages, long maxSize, LongFunction exceptionMessage) { + long currentSize = pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); + if (currentSize > maxSize) { + throw new IllegalArgumentException(exceptionMessage.apply(currentSize)); + } + return currentSize; + } + public static List fromPage(List schema, Page page) { if (page.getPositionCount() != 1) { throw new IllegalArgumentException("expected single row"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java index fb2a8512265a5..d599e924066d2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/SessionUtilsTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.test.ESTestCase; @@ -30,27 +31,106 @@ import static org.elasticsearch.common.unit.ByteSizeUnit.GB; import static org.elasticsearch.xpack.esql.plan.AbstractNodeSerializationTests.randomSource; 
+import static org.elasticsearch.xpack.esql.session.EsqlSession.maxIntermediateLocalRelationSize; +import static org.elasticsearch.xpack.esql.session.SessionUtils.checkPagesBelowSize; import static org.elasticsearch.xpack.esql.session.SessionUtils.fromPages; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class SessionUtilsTests extends ESTestCase { - /* - * 1. Generate a list of Pages with one BytesRef block, each of different positions, filled with random bytes. - * 2. Convert the list of Pages into a single BytesRefBlock Page using `fromPages()`. - * 3. Verify that the resulting BytesRefBlock contains the same bytes from the input Pages. - * 4. Verify that a CircuitBreakingException is thrown when the memory limit is too low. - */ + private final BlockFactory BLOCK_FACTORY_1GB = blockFactory((int) GB.toBytes(1)); + private final Attribute KEYWORD_ATTRIBUTE = new ReferenceAttribute( + randomSource(), + randomAlphaOfLengthOrNull(10), + randomAlphaOfLength(10), + DataType.KEYWORD + ); + + record PagesRec(List pages, byte[] data, int dataLen, int totalRows) implements Releasable { + @Override + public void close() { + Releasables.close(pages); + } + } + public void testFromPages() { + try (PagesRec pagesRec = generatePageSet(BLOCK_FACTORY_1GB)) { + Block[] outBlocks = fromPages(List.of(KEYWORD_ATTRIBUTE), pagesRec.pages, BLOCK_FACTORY_1GB); + + assertThat(outBlocks.length, is(1)); + // Verify that the resulted "compacted" block contains the same number of rows + BytesRefBlock bytesBlock = (BytesRefBlock) outBlocks[0]; + assertThat(bytesBlock.getPositionCount(), is(pagesRec.totalRows)); + + // Verify that the resulting BytesRefBlock contains the same bytes from the input Pages. 
+ byte[] outBuffer = new byte[pagesRec.dataLen]; + for (int i = 0, posCount = bytesBlock.getPositionCount(), outOffset = 0; i < posCount; i++) { + BytesRef ref = bytesBlock.getBytesRef(i, new BytesRef()); + System.arraycopy(ref.bytes, ref.offset, outBuffer, outOffset, ref.length); + outOffset += ref.length; + } + assertThat(outBuffer, is(pagesRec.data)); + + Releasables.close(outBlocks); + } + } + + public void testFromPagesCircuitBreaks() { + try (PagesRec pagesRec = generatePageSet(BLOCK_FACTORY_1GB)) { + BlockFactory convertBlockFactory = blockFactory(pagesRec.dataLen - 1); + assertThrows(CircuitBreakingException.class, () -> fromPages(List.of(KEYWORD_ATTRIBUTE), pagesRec.pages, convertBlockFactory)); + } + } + + public void testCheckPagesBelowSize() { + try (PagesRec pagesRec = generatePageSet(BLOCK_FACTORY_1GB)) { + var message = "data too large: "; + var ex = assertThrows( + IllegalArgumentException.class, + () -> checkPagesBelowSize(pagesRec.pages, pagesRec.dataLen - 1, l -> message + l) + ); + // pages are mocked, their size is considerably larger than dataLen + long pagesRamSize = pagesRec.pages.stream().mapToLong(Page::ramBytesUsedByBlocks).sum(); + assertThat(ex.getMessage(), containsString(message + pagesRamSize)); + } + } + + // EsqlSession's + public void testMaxIntermediateLocalRelationSize() { + var circuitBreaker = mock(CircuitBreaker.class); + var blockFactory = mock(BlockFactory.class); + when(blockFactory.breaker()).thenReturn(circuitBreaker); + + // enforcing upper limit + when(circuitBreaker.getLimit()).thenReturn(ByteSizeValue.ofGb(32).getBytes()); + assertThat(maxIntermediateLocalRelationSize(blockFactory), is(ByteSizeValue.ofMb(30).getBytes())); + + // enforcing lower limit + when(circuitBreaker.getLimit()).thenReturn(ByteSizeValue.ofMb(32).getBytes()); + assertThat(maxIntermediateLocalRelationSize(blockFactory), is(ByteSizeValue.ofMb(1).getBytes())); + + // in-between limits + var twentyGb = ByteSizeValue.ofGb(20).getBytes(); + when(circuitBreaker.getLimit()).thenReturn(twentyGb); + assertThat(maxIntermediateLocalRelationSize(blockFactory), is((long) (twentyGb / 1000.d))); + } + + private static PagesRec generatePageSet(BlockFactory blockFactory) { final int minBytes = 500; final int maxBytes = randomIntBetween(minBytes, minBytes * 1_000); - byte[] inBuffer = new byte[maxBytes]; - BlockFactory blockFactory = blockFactory((int) GB.toBytes(1)); + return generatePages(minBytes, maxBytes, blockFactory); + } + // Generates a list of Pages with one BytesRef block, each of different positions, filled with random bytes. 
+ private static PagesRec generatePages(int minBytes, int maxBytes, BlockFactory blockFactory) { BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(maxBytes); + + byte[] buffer = new byte[maxBytes]; List pages = new ArrayList<>(); + int producedBytes = 0; int producedRows = 0; int rowsPerPage = randomIntBetween(1, 100); @@ -60,7 +140,7 @@ public void testFromPages() { byte[] rowValue = randomByteArrayOfLength(rowBytes); builder.appendBytesRef(new BytesRef(rowValue)); - System.arraycopy(rowValue, 0, inBuffer, producedBytes, rowBytes); + System.arraycopy(rowValue, 0, buffer, producedBytes, rowBytes); producedBytes += rowBytes; rows++; @@ -78,27 +158,7 @@ public void testFromPages() { enqueueBlock(builder, pages); } - Attribute attr = new ReferenceAttribute(randomSource(), randomAlphaOfLengthOrNull(10), randomAlphaOfLength(10), DataType.KEYWORD); - - Block[] outBlocks = fromPages(List.of(attr), pages, blockFactory); - assertThat(outBlocks.length, is(1)); - BytesRefBlock bytesBlock = (BytesRefBlock) outBlocks[0]; - assertThat(bytesBlock.getPositionCount(), is(producedRows)); - - byte[] outBuffer = new byte[producedBytes]; - for (int i = 0, posCount = bytesBlock.getPositionCount(), outOffset = 0; i < posCount; i++) { - BytesRef ref = bytesBlock.getBytesRef(i, new BytesRef()); - System.arraycopy(ref.bytes, ref.offset, outBuffer, outOffset, ref.length); - outOffset += ref.length; - } - assertThat(outBuffer, is(inBuffer)); - - Releasables.close(outBlocks); - - BlockFactory convertBlockFactory = blockFactory(minBytes); - assertThrows(CircuitBreakingException.class, () -> fromPages(List.of(attr), pages, convertBlockFactory)); - - Releasables.close(pages); + return new PagesRec(pages, buffer, producedBytes, producedRows); } private BlockFactory blockFactory(long maxBytes) { @@ -109,7 +169,7 @@ private BlockFactory blockFactory(long maxBytes) { return new BlockFactory(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST), bigArrays); } - private void enqueueBlock(BytesRefBlock.Builder builder, List pages) { + private static void enqueueBlock(BytesRefBlock.Builder builder, List pages) { Block block = builder.build(); pages.add(new Page(block)); Releasables.close(builder);