From f102f375941ad76c0bea2f5854c15e1db50a17e7 Mon Sep 17 00:00:00 2001
From: Julian Kiryakov
Date: Fri, 21 Nov 2025 11:38:47 -0500
Subject: [PATCH 1/6] Split HeapAttackIT into two tests

---
 muted-tests.yml                                    |   3 -
 .../esql/heap_attack/HeapAttackBaseIT.java         | 324 +++++++++++
 .../xpack/esql/heap_attack/HeapAttackIT.java       | 509 +-----------------
 .../heap_attack/HeapAttackLookupJoinIT.java        | 250 +++++++++
 4 files changed, 575 insertions(+), 511 deletions(-)
 create mode 100644 test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java
 create mode 100644 test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java

diff --git a/muted-tests.yml b/muted-tests.yml
index f754a00797d4f..0d298c858fed8 100644
--- a/muted-tests.yml
+++ b/muted-tests.yml
@@ -384,9 +384,6 @@ tests:
 - class: org.elasticsearch.gradle.TestClustersPluginFuncTest
   method: override jdk usage via ES_JAVA_HOME for known jdk os incompatibilities
   issue: https://github.com/elastic/elasticsearch/issues/135413
-- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT
-  method: testAggTooManyMvLongs
-  issue: https://github.com/elastic/elasticsearch/issues/135585
 - class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
   method: testOneRemoteClusterPartial
   issue: https://github.com/elastic/elasticsearch/issues/124055
diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java
new file mode 100644
index 0000000000000..88fd650e70c50
--- /dev/null
+++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java
@@ -0,0 +1,324 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.heap_attack;
+
+import org.apache.http.HttpHost;
+import org.apache.http.client.config.RequestConfig;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.WarningsHandler;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
+import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.test.MapMatcher;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.elasticsearch.threadpool.Scheduler;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.IntFunction;
+
+import static org.elasticsearch.common.Strings.hasText;
+import static org.elasticsearch.test.MapMatcher.assertMap;
+import static org.elasticsearch.test.MapMatcher.matchesMap;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+/**
+ * Base class for heap attack tests. Contains common infrastructure and helper methods.
+ */
+public abstract class HeapAttackBaseIT extends ESRestTestCase {
+    @ClassRule
+    public static ElasticsearchCluster cluster = Clusters.buildCluster();
+
+    static volatile boolean SUITE_ABORTED = false;
+
+    protected static final int MAX_ATTEMPTS = 5;
+
+    protected interface TryCircuitBreaking {
+        Map<?, ?> attempt(int attempt) throws IOException;
+    }
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    @Before
+    public void skipOnAborted() {
+        assumeFalse("skip on aborted", SUITE_ABORTED);
+    }
+
+    protected void assertCircuitBreaks(TryCircuitBreaking tryBreaking) throws IOException {
+        assertCircuitBreaks(
+            tryBreaking,
+            matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
+        );
+    }
+
+    protected void assertCircuitBreaks(TryCircuitBreaking tryBreaking, MapMatcher responseMatcher) throws IOException {
+        int attempt = 1;
+        while (attempt <= MAX_ATTEMPTS) {
+            try {
+                Map<?, ?> response = tryBreaking.attempt(attempt);
+                logger.warn("{}: should have circuit broken but got {}", attempt, response);
+                attempt++;
+            } catch (ResponseException e) {
+                Map<String, Object> map = responseAsMap(e.getResponse());
+                assertMap(map, responseMatcher);
+                return;
+            }
+        }
+        fail("giving up circuit breaking after " + MAX_ATTEMPTS + " attempts");
+    }
+
+    protected Response query(String query, String filterPath) throws IOException {
+        Request request = new Request("POST", "/_query");
+        request.addParameter("error_trace", "");
+        if (filterPath != null) {
+            request.addParameter("filter_path", filterPath);
+        }
+        request.setJsonEntity(query.replace("\n", "\\n"));
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder()
+                .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(6).millis())).build())
+                .setWarningsHandler(WarningsHandler.PERMISSIVE)
+        );
+        logger.info("Running query: " + query);
+        return runQuery(() -> client().performRequest(request));
+    }
+
+    protected Response runQuery(CheckedSupplier<Response, IOException> run) throws IOException {
+        logger.info("--> test {} started querying", getTestName());
+        final ThreadPool testThreadPool = new TestThreadPool(getTestName());
+        final long startedTimeInNanos = System.nanoTime();
+        Scheduler.Cancellable schedule = null;
+        try {
+            schedule = testThreadPool.schedule(new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    throw new AssertionError(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    SUITE_ABORTED = true;
+                    TimeValue elapsed = TimeValue.timeValueNanos(System.nanoTime() - startedTimeInNanos);
+                    logger.info("--> test {} triggering OOM after {}", getTestName(), elapsed);
+                    Request triggerOOM = new Request("POST", "/_trigger_out_of_memory");
+                    client().performRequest(triggerOOM);
+                }
+            }, TimeValue.timeValueMinutes(5), testThreadPool.executor(ThreadPool.Names.GENERIC));
+            Response resp = run.get();
+            logger.info("--> test {} completed querying", getTestName());
+            return resp;
+        } finally {
+            if (schedule != null) {
+                schedule.cancel();
+            }
+            terminate(testThreadPool);
+        }
+    }
+
+    @Override
+    protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
+        settings = Settings.builder().put(settings).put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "6m").build();
+        return super.buildClient(settings, hosts);
+    }
+
+    protected void initSensorData(int docCount, int sensorCount, int joinFieldCount, boolean expressionBasedJoin) throws IOException {
+        logger.info("loading sensor data");
+        // We cannot go over 1000 fields because index creation fails to parse the mappings:
+        // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
+        assertTrue("Too many columns, it will throw an exception later", joinFieldCount <= 990);
+        StringBuilder createIndexBuilder = new StringBuilder();
+        createIndexBuilder.append("""
+            {
+              "properties": {
+                "@timestamp": { "type": "date" },
+            """);
+        String suffix = expressionBasedJoin ? "_left" : "";
+        for (int i = 0; i < joinFieldCount; i++) {
+            createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },");
+        }
+        createIndexBuilder.append("""
+                "value": { "type": "double" }
+              }
+            }""");
+        CreateIndexResponse response = createIndex(
+            "sensor_data",
+            Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
+            createIndexBuilder.toString()
+        );
+        assertTrue(response.isAcknowledged());
+        int docsPerBulk = 1000;
+        long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z");
+
+        StringBuilder data = new StringBuilder();
+        for (int i = 0; i < docCount; i++) {
+            data.append(String.format(Locale.ROOT, """
+                {"create":{}}
+                {"@timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate)));
+            for (int j = 0; j < joinFieldCount; j++) {
+                data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, i % sensorCount));
+            }
+            data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1));
+            if (i % docsPerBulk == docsPerBulk - 1) {
+                bulk("sensor_data", data.toString());
+                data.setLength(0);
+            }
+        }
+        initIndex("sensor_data", data.toString());
+    }
+
+    protected void initSensorLookup(
+        int lookupEntries,
+        int sensorCount,
+        IntFunction<String> location,
+        int joinFieldsCount,
+        boolean expressionBasedJoin
+    ) throws IOException {
+        logger.info("loading sensor lookup");
+        // We cannot go over 1000 fields because index creation fails to parse the mappings:
+        // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded
+        assertTrue("Too many join fields, it will throw an exception later", joinFieldsCount <= 990);
+        StringBuilder createIndexBuilder = new StringBuilder();
+        createIndexBuilder.append("""
+            {
+              "properties": {
+            """);
+        String suffix = expressionBasedJoin ? "_right" : "";
+        for (int i = 0; i < joinFieldsCount; i++) {
+            createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },");
+        }
+        createIndexBuilder.append("""
+                "location": { "type": "geo_point" },
+                "filter_key": { "type": "integer" }
+              }
+            }""");
+        CreateIndexResponse response = createIndex(
+            "sensor_lookup",
+            Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(),
+            createIndexBuilder.toString()
+        );
+        assertTrue(response.isAcknowledged());
+        int docsPerBulk = 1000;
+        StringBuilder data = new StringBuilder();
+        for (int i = 0; i < lookupEntries; i++) {
+            int sensor = i % sensorCount;
+            data.append(String.format(Locale.ROOT, """
+                {"create":{}}
+                {"""));
+            for (int j = 0; j < joinFieldsCount; j++) {
+                data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, sensor));
+            }
+            data.append(String.format(Locale.ROOT, """
+                "location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i));
+            if (i % docsPerBulk == docsPerBulk - 1) {
+                bulk("sensor_lookup", data.toString());
+                data.setLength(0);
+            }
+        }
+        initIndex("sensor_lookup", data.toString());
+    }
+
+    protected void bulk(String name, String bulk) throws IOException {
+        Request request = new Request("POST", "/" + name + "/_bulk");
+        request.setJsonEntity(bulk);
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder()
+                .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build())
+        );
+        Response response = client().performRequest(request);
+        assertThat(entityAsMap(response), matchesMap().entry("errors", false).extraOk());
+
+        /*
+         * Flush after each bulk to clear the test-time seenSequenceNumbers Map in
+         * TranslogWriter. Without this the server will OOM from time to time keeping
+         * stuff around to run assertions on.
+         */
+        request = new Request("POST", "/" + name + "/_flush");
+        response = client().performRequest(request);
+        assertThat(entityAsMap(response), matchesMap().entry("_shards", matchesMap().extraOk().entry("failed", 0)).extraOk());
+    }
+
+    protected void initIndex(String name, String bulk) throws IOException {
+        if (indexExists(name) == false) {
+            // not strictly required, but this can help isolate failures from bulk indexing.
+            createIndex(name);
+            var settings = (Map<?, ?>) ((Map<?, ?>) getIndexSettings(name).get(name)).get("settings");
+            if (settings.containsKey(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey()) == false) {
+                updateIndexSettings(name, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
+            }
+        }
+        if (hasText(bulk)) {
+            bulk(name, bulk);
+        }
+        Request request = new Request("POST", "/" + name + "/_forcemerge");
+        request.addParameter("max_num_segments", "1");
+        RequestOptions.Builder requestOptions = RequestOptions.DEFAULT.toBuilder()
+            .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build());
+        request.setOptions(requestOptions);
+        Response response = client().performRequest(request);
+        assertWriteResponse(response);
+
+        request = new Request("POST", "/" + name + "/_refresh");
+        request.setOptions(requestOptions);
+        response = client().performRequest(request);
+        assertWriteResponse(response);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected static void assertWriteResponse(Response response) throws IOException {
+        Map<String, Object> shards = (Map<String, Object>) entityAsMap(response).get("_shards");
+        assertThat((int) shards.get("successful"), greaterThanOrEqualTo(1));
+        assertThat(shards.get("failed"), equalTo(0));
+    }
+
+    @Before
+    @After
+    public void assertRequestBreakerEmpty() throws Exception {
+        if (SUITE_ABORTED) {
+            return;
+        }
+        assertBusy(() -> {
+            Response response = adminClient().performRequest(new Request("GET", "/_nodes/stats"));
+            Map<?, ?> stats = responseAsMap(response);
+            Map<?, ?> nodes = (Map<?, ?>) stats.get("nodes");
+            for (Object n : nodes.values()) {
+                Map<?, ?> node = (Map<?, ?>) n;
+                Map<?, ?> breakers = (Map<?, ?>) node.get("breakers");
+                Map<?, ?> request = (Map<?, ?>) breakers.get("request");
+                assertMap(request, matchesMap().extraOk().entry("estimated_size_in_bytes", 0).entry("estimated_size", "0b"));
+            }
+        });
+    }
+
+    protected static StringBuilder startQuery() {
+        StringBuilder query = new StringBuilder();
+        query.append("{\"query\":\"");
+        return query;
+    }
+}
diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
index 50ada817970a6..446f7d6505c8b 100644
--- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
+++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
@@ -9,41 +9,22 @@ import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
 
-import org.apache.http.HttpHost;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.util.EntityUtils;
 import org.apache.lucene.tests.util.TimeUnits;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.WarningsHandler;
-import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
-import org.elasticsearch.common.CheckedSupplier;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
-import org.elasticsearch.common.settings.Settings;
 import
org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.test.ListMatcher; import org.elasticsearch.test.MapMatcher; -import org.elasticsearch.test.cluster.ElasticsearchCluster; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -55,14 +36,12 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.elasticsearch.common.Strings.hasText; import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.matchesRegex; @@ -71,21 +50,7 @@ * sure they don't consume the entire heap and crash Elasticsearch. */ @TimeoutSuite(millis = 40 * TimeUnits.MINUTE) -public class HeapAttackIT extends ESRestTestCase { - @ClassRule - public static ElasticsearchCluster cluster = Clusters.buildCluster(); - - static volatile boolean SUITE_ABORTED = false; - - @Override - protected String getTestRestCluster() { - return cluster.getHttpAddresses(); - } - - @Before - public void skipOnAborted() { - assumeFalse("skip on aborted", SUITE_ABORTED); - } +public class HeapAttackIT extends HeapAttackBaseIT { /** * This used to fail, but we've since compacted top n so it actually succeeds now. 
@@ -218,19 +183,6 @@ public void testStupidTopN() throws IOException { assertCircuitBreaks(attempt -> sortBySomeLongsLimit(2147483630)); } - private static final int MAX_ATTEMPTS = 5; - - interface TryCircuitBreaking { - Map attempt(int attempt) throws IOException; - } - - private void assertCircuitBreaks(TryCircuitBreaking tryBreaking) throws IOException { - assertCircuitBreaks( - tryBreaking, - matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception")) - ); - } - private void assertFoldCircuitBreaks(TryCircuitBreaking tryBreaking) throws IOException { assertCircuitBreaks( tryBreaking, @@ -238,22 +190,6 @@ private void assertFoldCircuitBreaks(TryCircuitBreaking tryBreaking) throws IOEx ); } - private void assertCircuitBreaks(TryCircuitBreaking tryBreaking, MapMatcher responseMatcher) throws IOException { - int attempt = 1; - while (attempt <= MAX_ATTEMPTS) { - try { - Map response = tryBreaking.attempt(attempt); - logger.warn("{}: should circuit broken but got {}", attempt, response); - attempt++; - } catch (ResponseException e) { - Map map = responseAsMap(e.getResponse()); - assertMap(map, responseMatcher); - return; - } - } - fail("giving up circuit breaking after " + attempt + " attempts"); - } - private void assertParseFailure(ThrowingRunnable r) throws IOException { ResponseException e = expectThrows(ResponseException.class, r); Map map = responseAsMap(e.getResponse()); @@ -555,60 +491,6 @@ private Map manyEval(int evalLines) throws IOException { return responseAsMap(query(query.toString(), null)); } - private Response query(String query, String filterPath) throws IOException { - Request request = new Request("POST", "/_query"); - request.addParameter("error_trace", ""); - if (filterPath != null) { - request.addParameter("filter_path", filterPath); - } - request.setJsonEntity(query.replace("\n", "\\n")); - request.setOptions( - RequestOptions.DEFAULT.toBuilder() - .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(6).millis())).build()) - .setWarningsHandler(WarningsHandler.PERMISSIVE) - ); - logger.info("Running query:" + query); - return runQuery(() -> client().performRequest(request)); - } - - private Response runQuery(CheckedSupplier run) throws IOException { - logger.info("--> test {} started querying", getTestName()); - final ThreadPool testThreadPool = new TestThreadPool(getTestName()); - final long startedTimeInNanos = System.nanoTime(); - Scheduler.Cancellable schedule = null; - try { - schedule = testThreadPool.schedule(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - throw new AssertionError(e); - } - - @Override - protected void doRun() throws Exception { - SUITE_ABORTED = true; - TimeValue elapsed = TimeValue.timeValueNanos(System.nanoTime() - startedTimeInNanos); - logger.info("--> test {} triggering OOM after {}", getTestName(), elapsed); - Request triggerOOM = new Request("POST", "/_trigger_out_of_memory"); - client().performRequest(triggerOOM); - } - }, TimeValue.timeValueMinutes(5), testThreadPool.executor(ThreadPool.Names.GENERIC)); - Response resp = run.get(); - logger.info("--> test {} completed querying", getTestName()); - return resp; - } finally { - if (schedule != null) { - schedule.cancel(); - } - terminate(testThreadPool); - } - } - - @Override - protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { - settings = Settings.builder().put(settings).put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, 
"6m").build(); - return super.buildClient(settings, hosts); - } - public void testFetchManyBigFields() throws IOException { initManyBigFieldsIndex(100, "keyword"); Map response = fetchManyBigFields(100); @@ -734,197 +616,6 @@ private Map fetchMvLongs() throws IOException { return responseAsMap(query(query.toString(), "columns")); } - public void testLookupExplosion() throws IOException { - int sensorDataCount = 400; - int lookupEntries = 10000; - Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, false); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); - } - - public void testLookupExplosionManyFields() throws IOException { - int sensorDataCount = 400; - int lookupEntries = 1000; - int joinFieldsCount = 990; - Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, false); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); - } - - public void testLookupExplosionExpression() throws IOException { - int sensorDataCount = 400; - int lookupEntries = 10000; - Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, true); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); - } - - public void testLookupExplosionManyFieldsExpression() throws IOException { - int sensorDataCount = 400; - int lookupEntries = 1000; - int joinFieldsCount = 399;// only join on 399 columns due to max expression size of 400 - Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, true); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); - } - - public void testLookupExplosionManyMatchesManyFields() throws IOException { - // 1500, 10000 is enough locally, but some CI machines need more. - int lookupEntries = 10000; - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries, false)); - } - - public void testLookupExplosionManyMatches() throws IOException { - // 1500, 10000 is enough locally, but some CI machines need more. - int lookupEntries = 10000; - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, false)); - } - - public void testLookupExplosionManyMatchesExpression() throws IOException { - int lookupEntries = 10000; - assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, true)); - } - - public void testLookupExplosionManyMatchesFiltered() throws IOException { - // This test will only work with the expanding join optimization - // that pushes the filter to the right side of the lookup. 
- // Without the optimization, it will fail with circuit_breaking_exception - int sensorDataCount = 10000; - int lookupEntries = 10000; - int reductionFactor = 1000; // reduce the number of matches by this factor - // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value - assertTrue(0 == lookupEntries % reductionFactor); - Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, false); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor)))); - } - - public void testLookupExplosionManyMatchesFilteredExpression() throws IOException { - // This test will only work with the expanding join optimization - // that pushes the filter to the right side of the lookup. - // Without the optimization, it will fail with circuit_breaking_exception - int sensorDataCount = 10000; - int lookupEntries = 10000; - int reductionFactor = 1000; // reduce the number of matches by this factor - // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value - assertTrue(0 == lookupEntries % reductionFactor); - Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, true); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor)))); - } - - public void testLookupExplosionNoFetch() throws IOException { - int sensorDataCount = 6000; - int lookupEntries = 10000; - Map map = lookupExplosionNoFetch(sensorDataCount, lookupEntries); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); - } - - public void testLookupExplosionNoFetchManyMatches() throws IOException { - // 8500 is plenty on most nodes - assertCircuitBreaks(attempt -> lookupExplosionNoFetch(attempt * 8500, 10000)); - } - - public void testLookupExplosionBigString() throws IOException { - int sensorDataCount = 500; - int lookupEntries = 1; - Map map = lookupExplosionBigString(sensorDataCount, lookupEntries); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); - } - - public void testLookupExplosionBigStringManyMatches() throws IOException { - // 500, 1 is enough with a single node, but the serverless copy of this test uses many nodes. - // So something like 5000, 10 is much more of a sure thing there. 
- assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 5000, 10)); - } - - private Map lookupExplosion( - int sensorDataCount, - int lookupEntries, - int joinFieldsCount, - int lookupEntriesToKeep, - boolean expressionBasedJoin - ) throws IOException { - try { - lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount, expressionBasedJoin); - StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON "); - if (expressionBasedJoin) { - for (int i = 0; i < joinFieldsCount; i++) { - if (i != 0) { - query.append(" AND "); - } - query.append("id_left").append(i); - query.append("=="); - query.append("id_right").append(i); - } - } else { - for (int i = 0; i < joinFieldsCount; i++) { - if (i != 0) { - query.append(","); - } - query.append("id").append(i); - } - } - if (lookupEntries != lookupEntriesToKeep) { - boolean applyAsExpressionJoinFilter = expressionBasedJoin && randomBoolean(); - // we randomly add the filter after the join or as part of the join - // in both cases we should have the same amount of results - if (applyAsExpressionJoinFilter == false) { - // add a filter after the join to reduce the number of matches - // we add both a Lucene pushable filter and a non-pushable filter - // this is to make sure that even if there are non-pushable filters the pushable filters is still applied - query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep); - } else { - // apply the filter as part of the join - // then we filter out the rows that do not match the filter after - // so the number of rows is the same as in the field based join case - // and can get the same number of rows for verification purposes - query.append(" AND filter_key < ").append(lookupEntriesToKeep); - query.append(" | WHERE filter_key IS NOT NULL "); - } - } - query.append(" | STATS COUNT(location) | LIMIT 100\"}"); - return responseAsMap(query(query.toString(), null)); - } finally { - deleteIndex("sensor_data"); - deleteIndex("sensor_lookup"); - } - } - - private Map lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException { - try { - lookupExplosionData(sensorDataCount, lookupEntries, 1, false); - StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}"); - return responseAsMap(query(query.toString(), null)); - } finally { - deleteIndex("sensor_data"); - deleteIndex("sensor_lookup"); - } - } - - private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount, boolean expressionBasedJoin) - throws IOException { - initSensorData(sensorDataCount, 1, joinFieldCount, expressionBasedJoin); - initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount, expressionBasedJoin); - } - - private Map lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException { - try { - initSensorData(sensorDataCount, 1, 1, false); - initSensorLookupString(lookupEntries, 1, i -> { - int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); - StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes())); - while (str.length() < target) { - str.append("Lorem ipsum dolor sit amet, consectetur adipiscing elit."); - } - logger.info("big string is {} characters", str.length()); - return str.toString(); - }); - StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(string)\"}"); - return 
responseAsMap(query(query.toString(), null)); - } finally { - deleteIndex("sensor_data"); - deleteIndex("sensor_lookup"); - } - } - public void testEnrichExplosion() throws IOException { int sensorDataCount = 1000; int lookupEntries = 100; @@ -1113,127 +804,6 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx initIndex("mv_longs", bulk.toString()); } - private void initSensorData(int docCount, int sensorCount, int joinFieldCount, boolean expressionBasedJoin) throws IOException { - logger.info("loading sensor data"); - // We cannot go over 1000 fields, due to failed on parsing mappings on index creation - // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded - assertTrue("Too many columns, it will throw an exception later", joinFieldCount <= 990); - StringBuilder createIndexBuilder = new StringBuilder(); - createIndexBuilder.append(""" - { - "properties": { - "@timestamp": { "type": "date" }, - """); - String suffix = expressionBasedJoin ? "_left" : ""; - for (int i = 0; i < joinFieldCount; i++) { - createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },"); - } - createIndexBuilder.append(""" - "value": { "type": "double" } - } - }"""); - CreateIndexResponse response = createIndex( - "sensor_data", - Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), - createIndexBuilder.toString() - ); - assertTrue(response.isAcknowledged()); - int docsPerBulk = 1000; - long firstDate = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2025-01-01T00:00:00Z"); - - StringBuilder data = new StringBuilder(); - for (int i = 0; i < docCount; i++) { - data.append(String.format(Locale.ROOT, """ - {"create":{}} - {"timestamp":"%s",""", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(i * 10L + firstDate))); - for (int j = 0; j < joinFieldCount; j++) { - data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, i % sensorCount)); - } - data.append(String.format(Locale.ROOT, "\"value\": %f}\n", i * 1.1)); - if (i % docsPerBulk == docsPerBulk - 1) { - bulk("sensor_data", data.toString()); - data.setLength(0); - } - } - initIndex("sensor_data", data.toString()); - } - - private void initSensorLookup( - int lookupEntries, - int sensorCount, - IntFunction location, - int joinFieldsCount, - boolean expressionBasedJoin - ) throws IOException { - logger.info("loading sensor lookup"); - // cannot go over 1000 fields, due to failed on parsing mappings on index creation - // [sensor_data] java.lang.IllegalArgumentException: Limit of total fields [1000] has been exceeded - assertTrue("Too many join on fields, it will throw an exception later", joinFieldsCount <= 990); - StringBuilder createIndexBuilder = new StringBuilder(); - createIndexBuilder.append(""" - { - "properties": { - """); - String suffix = expressionBasedJoin ? 
"_right" : ""; - for (int i = 0; i < joinFieldsCount; i++) { - createIndexBuilder.append("\"id").append(suffix).append(i).append("\": { \"type\": \"long\" },"); - } - createIndexBuilder.append(""" - "location": { "type": "geo_point" }, - "filter_key": { "type": "integer" } - } - }"""); - CreateIndexResponse response = createIndex( - "sensor_lookup", - Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), - createIndexBuilder.toString() - ); - assertTrue(response.isAcknowledged()); - int docsPerBulk = 1000; - StringBuilder data = new StringBuilder(); - for (int i = 0; i < lookupEntries; i++) { - int sensor = i % sensorCount; - data.append(String.format(Locale.ROOT, """ - {"create":{}} - {""")); - for (int j = 0; j < joinFieldsCount; j++) { - data.append(String.format(Locale.ROOT, "\"id%s%d\":%d, ", suffix, j, sensor)); - } - data.append(String.format(Locale.ROOT, """ - "location": "POINT(%s)", "filter_key": %d}\n""", location.apply(sensor), i)); - if (i % docsPerBulk == docsPerBulk - 1) { - bulk("sensor_lookup", data.toString()); - data.setLength(0); - } - } - initIndex("sensor_lookup", data.toString()); - } - - private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunction string) throws IOException { - logger.info("loading sensor lookup with huge strings"); - createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """ - { - "properties": { - "id0": { "type": "long" }, - "string": { "type": "text" } - } - }"""); - int docsPerBulk = 10; - StringBuilder data = new StringBuilder(); - for (int i = 0; i < lookupEntries; i++) { - int sensor = i % sensorCount; - data.append(String.format(Locale.ROOT, """ - {"create":{}} - {"id0": %d, "string": "%s"} - """, sensor, string.apply(sensor))); - if (i % docsPerBulk == docsPerBulk - 1) { - bulk("sensor_lookup", data.toString()); - data.setLength(0); - } - } - initIndex("sensor_lookup", data.toString()); - } - private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction location) throws IOException { initSensorLookup(lookupEntries, sensorCount, location, 1, false); logger.info("loading sensor enrich"); @@ -1253,81 +823,4 @@ private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction) ((Map) getIndexSettings(name).get(name)).get("settings"); - if (settings.containsKey(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey()) == false) { - updateIndexSettings(name, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)); - } - } - if (hasText(bulk)) { - bulk(name, bulk); - } - Request request = new Request("POST", "/" + name + "/_forcemerge"); - request.addParameter("max_num_segments", "1"); - RequestOptions.Builder requestOptions = RequestOptions.DEFAULT.toBuilder() - .setRequestConfig(RequestConfig.custom().setSocketTimeout(Math.toIntExact(TimeValue.timeValueMinutes(5).millis())).build()); - request.setOptions(requestOptions); - Response response = client().performRequest(request); - assertWriteResponse(response); - - request = new Request("POST", "/" + name + "/_refresh"); - response = client().performRequest(request); - request.setOptions(requestOptions); - assertWriteResponse(response); - } - - @SuppressWarnings("unchecked") - private static void assertWriteResponse(Response response) throws IOException { - Map shards = (Map) entityAsMap(response).get("_shards"); - assertThat((int) shards.get("successful"), greaterThanOrEqualTo(1)); - assertThat(shards.get("failed"), 
equalTo(0)); - } - - @Before - @After - public void assertRequestBreakerEmpty() throws Exception { - if (SUITE_ABORTED) { - return; - } - assertBusy(() -> { - Response response = adminClient().performRequest(new Request("GET", "/_nodes/stats")); - Map stats = responseAsMap(response); - Map nodes = (Map) stats.get("nodes"); - for (Object n : nodes.values()) { - Map node = (Map) n; - Map breakers = (Map) node.get("breakers"); - Map request = (Map) breakers.get("request"); - assertMap(request, matchesMap().extraOk().entry("estimated_size_in_bytes", 0).entry("estimated_size", "0b")); - } - }); - } - - private static StringBuilder startQuery() { - StringBuilder query = new StringBuilder(); - query.append("{\"query\":\""); - return query; - } } diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java new file mode 100644 index 0000000000000..3ae734644ed45 --- /dev/null +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java @@ -0,0 +1,250 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.heap_attack; + +import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; + +import org.apache.lucene.tests.util.TimeUnits; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.function.IntFunction; + +import static org.elasticsearch.test.MapMatcher.assertMap; +import static org.elasticsearch.test.MapMatcher.matchesMap; + +/** + * Tests that run ESQL queries that use a ton of memory. We want to make + * sure they don't consume the entire heap and crash Elasticsearch. 
+ */ +@TimeoutSuite(millis = 20 * TimeUnits.MINUTE) +public class HeapAttackLookupJoinIT extends HeapAttackBaseIT { + + public void testLookupExplosion() throws IOException { + int sensorDataCount = 400; + int lookupEntries = 10000; + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, false); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionManyFields() throws IOException { + int sensorDataCount = 400; + int lookupEntries = 1000; + int joinFieldsCount = 990; + Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, false); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionExpression() throws IOException { + int sensorDataCount = 400; + int lookupEntries = 10000; + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries, true); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionManyFieldsExpression() throws IOException { + int sensorDataCount = 400; + int lookupEntries = 1000; + int joinFieldsCount = 399;// only join on 399 columns due to max expression size of 400 + Map map = lookupExplosion(sensorDataCount, lookupEntries, joinFieldsCount, lookupEntries, true); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionManyMatchesManyFields() throws IOException { + // 1500, 10000 is enough locally, but some CI machines need more. + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 30, lookupEntries, false)); + } + + public void testLookupExplosionManyMatches() throws IOException { + // 1500, 10000 is enough locally, but some CI machines need more. + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, false)); + } + + public void testLookupExplosionManyMatchesExpression() throws IOException { + int lookupEntries = 10000; + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, lookupEntries, 1, lookupEntries, true)); + } + + public void testLookupExplosionManyMatchesFiltered() throws IOException { + // This test will only work with the expanding join optimization + // that pushes the filter to the right side of the lookup. + // Without the optimization, it will fail with circuit_breaking_exception + int sensorDataCount = 10000; + int lookupEntries = 10000; + int reductionFactor = 1000; // reduce the number of matches by this factor + // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value + assertTrue(0 == lookupEntries % reductionFactor); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, false); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor)))); + } + + public void testLookupExplosionManyMatchesFilteredExpression() throws IOException { + // This test will only work with the expanding join optimization + // that pushes the filter to the right side of the lookup. 
+ // Without the optimization, it will fail with circuit_breaking_exception + int sensorDataCount = 10000; + int lookupEntries = 10000; + int reductionFactor = 1000; // reduce the number of matches by this factor + // lookupEntries % reductionFactor must be 0 to ensure the number of rows returned matches the expected value + assertTrue(0 == lookupEntries % reductionFactor); + Map map = lookupExplosion(sensorDataCount, lookupEntries, 1, lookupEntries / reductionFactor, true); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries / reductionFactor)))); + } + + public void testLookupExplosionNoFetch() throws IOException { + int sensorDataCount = 6000; + int lookupEntries = 10000; + Map map = lookupExplosionNoFetch(sensorDataCount, lookupEntries); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionNoFetchManyMatches() throws IOException { + // 8500 is plenty on most nodes + assertCircuitBreaks(attempt -> lookupExplosionNoFetch(attempt * 8500, 10000)); + } + + public void testLookupExplosionBigString() throws IOException { + int sensorDataCount = 500; + int lookupEntries = 1; + Map map = lookupExplosionBigString(sensorDataCount, lookupEntries); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); + } + + public void testLookupExplosionBigStringManyMatches() throws IOException { + // 500, 1 is enough with a single node, but the serverless copy of this test uses many nodes. + // So something like 5000, 10 is much more of a sure thing there. + assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 5000, 10)); + } + + private Map lookupExplosion( + int sensorDataCount, + int lookupEntries, + int joinFieldsCount, + int lookupEntriesToKeep, + boolean expressionBasedJoin + ) throws IOException { + try { + lookupExplosionData(sensorDataCount, lookupEntries, joinFieldsCount, expressionBasedJoin); + StringBuilder query = startQuery(); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON "); + if (expressionBasedJoin) { + for (int i = 0; i < joinFieldsCount; i++) { + if (i != 0) { + query.append(" AND "); + } + query.append("id_left").append(i); + query.append("=="); + query.append("id_right").append(i); + } + } else { + for (int i = 0; i < joinFieldsCount; i++) { + if (i != 0) { + query.append(","); + } + query.append("id").append(i); + } + } + if (lookupEntries != lookupEntriesToKeep) { + boolean applyAsExpressionJoinFilter = expressionBasedJoin && randomBoolean(); + // we randomly add the filter after the join or as part of the join + // in both cases we should have the same amount of results + if (applyAsExpressionJoinFilter == false) { + // add a filter after the join to reduce the number of matches + // we add both a Lucene pushable filter and a non-pushable filter + // this is to make sure that even if there are non-pushable filters the pushable filters is still applied + query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep); + } else { + // apply the filter as part of the join + // then we filter out the rows that do not match the filter after + // so the number of rows is the same as in the field based join case + // and can get the same number of rows for verification purposes + query.append(" AND filter_key < ").append(lookupEntriesToKeep); + query.append(" | WHERE filter_key IS NOT NULL "); + } + } + query.append(" | STATS 
COUNT(location) | LIMIT 100\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); + } + } + + private Map lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException { + try { + lookupExplosionData(sensorDataCount, lookupEntries, 1, false); + StringBuilder query = startQuery(); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(*)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); + } + } + + private void lookupExplosionData(int sensorDataCount, int lookupEntries, int joinFieldCount, boolean expressionBasedJoin) + throws IOException { + initSensorData(sensorDataCount, 1, joinFieldCount, expressionBasedJoin); + initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484", joinFieldCount, expressionBasedJoin); + } + + private Map lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException { + try { + initSensorData(sensorDataCount, 1, 1, false); + initSensorLookupString(lookupEntries, 1, i -> { + int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); + StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes())); + while (str.length() < target) { + str.append("Lorem ipsum dolor sit amet, consectetur adipiscing elit."); + } + logger.info("big string is {} characters", str.length()); + return str.toString(); + }); + StringBuilder query = startQuery(); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id0 | STATS COUNT(string)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); + } + } + + private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunction string) throws IOException { + logger.info("loading sensor lookup with huge strings"); + createIndex("sensor_lookup", Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP.getName()).build(), """ + { + "properties": { + "id0": { "type": "long" }, + "string": { "type": "text" } + } + }"""); + int docsPerBulk = 10; + StringBuilder data = new StringBuilder(); + for (int i = 0; i < lookupEntries; i++) { + int sensor = i % sensorCount; + data.append(String.format(Locale.ROOT, """ + {"create":{}} + {"id0": %d, "string": "%s"} + """, sensor, string.apply(sensor))); + if (i % docsPerBulk == docsPerBulk - 1) { + bulk("sensor_lookup", data.toString()); + data.setLength(0); + } + } + initIndex("sensor_lookup", data.toString()); + } + +} From 8f71300110e45c25bebe2a523aaf1d13fbcf1257 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 21 Nov 2025 11:47:05 -0500 Subject: [PATCH 2/6] Change comment --- .../xpack/esql/heap_attack/HeapAttackLookupJoinIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java index 3ae734644ed45..9a379126e11c0 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java @@ -25,7 +25,7 @@ import static 
org.elasticsearch.test.MapMatcher.matchesMap; /** - * Tests that run ESQL queries that use a ton of memory. We want to make + * Tests that run ESQL lookup join queries that use a ton of memory. We want to make * sure they don't consume the entire heap and crash Elasticsearch. */ @TimeoutSuite(millis = 20 * TimeUnits.MINUTE) From 971ba11bdce0931802c323e8633c91d66e6ac8df Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 21 Nov 2025 14:48:49 -0500 Subject: [PATCH 3/6] Code review comments and bugfix --- .../elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java | 2 +- .../xpack/esql/heap_attack/HeapAttackLookupJoinIT.java | 4 ++-- .../{HeapAttackBaseIT.java => HeapAttackTestCase.java} | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/{HeapAttackBaseIT.java => HeapAttackTestCase.java} (99%) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 446f7d6505c8b..32f02ef92798b 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -50,7 +50,7 @@ * sure they don't consume the entire heap and crash Elasticsearch. */ @TimeoutSuite(millis = 40 * TimeUnits.MINUTE) -public class HeapAttackIT extends HeapAttackBaseIT { +public class HeapAttackIT extends HeapAttackTestCase { /** * This used to fail, but we've since compacted top n so it actually succeeds now. diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java index 9a379126e11c0..f8cff809e1f74 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java @@ -29,7 +29,7 @@ * sure they don't consume the entire heap and crash Elasticsearch. 
*/ @TimeoutSuite(millis = 20 * TimeUnits.MINUTE) -public class HeapAttackLookupJoinIT extends HeapAttackBaseIT { +public class HeapAttackLookupJoinIT extends HeapAttackTestCase { public void testLookupExplosion() throws IOException { int sensorDataCount = 400; @@ -117,7 +117,7 @@ public void testLookupExplosionNoFetchManyMatches() throws IOException { } public void testLookupExplosionBigString() throws IOException { - int sensorDataCount = 500; + int sensorDataCount = 300; int lookupEntries = 1; Map map = lookupExplosionBigString(sensorDataCount, lookupEntries); assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries)))); diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackTestCase.java similarity index 99% rename from test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java rename to test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackTestCase.java index 88fd650e70c50..821042b672fa6 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackBaseIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackTestCase.java @@ -49,7 +49,7 @@ /** * Base class for heap attack tests. Contains common infrastructure and helper methods. */ -public abstract class HeapAttackBaseIT extends ESRestTestCase { +public abstract class HeapAttackTestCase extends ESRestTestCase { @ClassRule public static ElasticsearchCluster cluster = Clusters.buildCluster(); From e5e7d3b4acc331823039acae13c71c2815647983 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 21 Nov 2025 14:54:28 -0500 Subject: [PATCH 4/6] Move enrich related test cases to HeapAttackLookupJoinIT too --- .../xpack/esql/heap_attack/HeapAttackIT.java | 50 ----------------- .../heap_attack/HeapAttackLookupJoinIT.java | 56 ++++++++++++++++++- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index 32f02ef92798b..59c803c29e535 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.function.IntFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -616,36 +615,6 @@ private Map fetchMvLongs() throws IOException { return responseAsMap(query(query.toString(), "columns")); } - public void testEnrichExplosion() throws IOException { - int sensorDataCount = 1000; - int lookupEntries = 100; - Map map = enrichExplosion(sensorDataCount, lookupEntries); - assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount)))); - } - - public void testEnrichExplosionManyMatches() throws IOException { - // 1000, 10000 is enough on most nodes - 
assertCircuitBreaks(attempt -> enrichExplosion(1000, attempt * 5000)); - } - - private Map enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException { - try { - initSensorData(sensorDataCount, 1, 1, false); - initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484"); - try { - StringBuilder query = startQuery(); - query.append("FROM sensor_data | ENRICH sensor ON id0 | STATS COUNT(*)\"}"); - return responseAsMap(query(query.toString(), null)); - } finally { - Request delete = new Request("DELETE", "/_enrich/policy/sensor"); - assertMap(responseAsMap(client().performRequest(delete)), matchesMap().entry("acknowledged", true)); - } - } finally { - deleteIndex("sensor_data"); - deleteIndex("sensor_lookup"); - } - } - private void initManyLongs(int countPerLong) throws IOException { logger.info("loading many documents with longs"); StringBuilder bulk = new StringBuilder(); @@ -804,23 +773,4 @@ private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOEx initIndex("mv_longs", bulk.toString()); } - private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction location) throws IOException { - initSensorLookup(lookupEntries, sensorCount, location, 1, false); - logger.info("loading sensor enrich"); - - Request create = new Request("PUT", "/_enrich/policy/sensor"); - create.setJsonEntity(""" - { - "match": { - "indices": "sensor_lookup", - "match_field": "id0", - "enrich_fields": ["location"] - } - } - """); - assertMap(responseAsMap(client().performRequest(create)), matchesMap().entry("acknowledged", true)); - Request execute = new Request("POST", "/_enrich/policy/sensor/_execute"); - assertMap(responseAsMap(client().performRequest(execute)), matchesMap().entry("status", Map.of("phase", "COMPLETE"))); - } - } diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java index f8cff809e1f74..b20b5aa3a8a90 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; import org.apache.lucene.tests.util.TimeUnits; +import org.elasticsearch.client.Request; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.IndexMode; @@ -25,10 +26,10 @@ import static org.elasticsearch.test.MapMatcher.matchesMap; /** - * Tests that run ESQL lookup join queries that use a ton of memory. We want to make - * sure they don't consume the entire heap and crash Elasticsearch. + * Tests that run ESQL lookup join and enrich queries that use a ton of memory. + * We want to make sure they don't consume the entire heap and crash Elasticsearch. 
*/ -@TimeoutSuite(millis = 20 * TimeUnits.MINUTE) +@TimeoutSuite(millis = 40 * TimeUnits.MINUTE) public class HeapAttackLookupJoinIT extends HeapAttackTestCase { public void testLookupExplosion() throws IOException { @@ -247,4 +248,53 @@ private void initSensorLookupString(int lookupEntries, int sensorCount, IntFunct initIndex("sensor_lookup", data.toString()); } + public void testEnrichExplosion() throws IOException { + int sensorDataCount = 1000; + int lookupEntries = 100; + Map map = enrichExplosion(sensorDataCount, lookupEntries); + assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount)))); + } + + public void testEnrichExplosionManyMatches() throws IOException { + // 1000, 10000 is enough on most nodes + assertCircuitBreaks(attempt -> enrichExplosion(1000, attempt * 5000)); + } + + private Map enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException { + try { + initSensorData(sensorDataCount, 1, 1, false); + initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484"); + try { + StringBuilder query = startQuery(); + query.append("FROM sensor_data | ENRICH sensor ON id0 | STATS COUNT(*)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + Request delete = new Request("DELETE", "/_enrich/policy/sensor"); + assertMap(responseAsMap(client().performRequest(delete)), matchesMap().entry("acknowledged", true)); + } + } finally { + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); + } + } + + private void initSensorEnrich(int lookupEntries, int sensorCount, IntFunction location) throws IOException { + initSensorLookup(lookupEntries, sensorCount, location, 1, false); + logger.info("loading sensor enrich"); + + Request create = new Request("PUT", "/_enrich/policy/sensor"); + create.setJsonEntity(""" + { + "match": { + "indices": "sensor_lookup", + "match_field": "id0", + "enrich_fields": ["location"] + } + } + """); + assertMap(responseAsMap(client().performRequest(create)), matchesMap().entry("acknowledged", true)); + Request execute = new Request("POST", "/_enrich/policy/sensor/_execute"); + assertMap(responseAsMap(client().performRequest(execute)), matchesMap().entry("status", Map.of("phase", "COMPLETE"))); + } + } From 84219cfb24cfbba7e2e951ea79b7746916ee35a9 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 21 Nov 2025 15:47:08 -0500 Subject: [PATCH 5/6] Unmute another test --- muted-tests.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index fc1dd0d09046b..3822cade700dd 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -300,9 +300,6 @@ tests: - class: org.elasticsearch.reservedstate.service.RepositoriesFileSettingsIT method: testSettingsApplied issue: https://github.com/elastic/elasticsearch/issues/126748 -- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackIT - method: testLookupExplosionBigString - issue: https://github.com/elastic/elasticsearch/issues/135122 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test stop transform with force and wait_for_checkpoint true} issue: https://github.com/elastic/elasticsearch/issues/135135 From 4669961ca8d33e2e5f4710d782746406656de149 Mon Sep 17 00:00:00 2001 From: Julian Kiryakov Date: Fri, 21 Nov 2025 20:06:01 -0500 Subject: [PATCH 6/6] Adjust size to prevent test flaky test failures --- .../xpack/esql/heap_attack/HeapAttackLookupJoinIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java
index b20b5aa3a8a90..debbbbdc99c53 100644
--- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java
+++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackLookupJoinIT.java
@@ -118,7 +118,7 @@ public void testLookupExplosionNoFetchManyMatches() throws IOException {
     }
 
     public void testLookupExplosionBigString() throws IOException {
-        int sensorDataCount = 300;
+        int sensorDataCount = 200;
         int lookupEntries = 1;
         Map<String, Object> map = lookupExplosionBigString(sensorDataCount, lookupEntries);
         assertMap(map, matchesMap().extraOk().entry("values", List.of(List.of(sensorDataCount * lookupEntries))));