Skip to content

Commit

Permalink
Track pages in ESQL enrich request/response (#102190)
Browse files Browse the repository at this point in the history
This pull request tracks pages in ESQL enrich lookup requests and
responses. Because the input page of a lookup request can still be used
after enrichment, it cannot be released unless it has been serialized to
a different node. This serves as a workaround until Blocks support
reference counting.
  • Loading branch information
dnhatn committed Nov 15, 2023
1 parent 842e563 commit d6bbc29
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 55 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102190.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102190
summary: Track pages in ESQL enrich request/response
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -29,6 +31,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

@TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning")
Expand Down Expand Up @@ -76,6 +79,24 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), EsqlPlugin.class);
}

/**
 * Sets (or clears) the persistent request circuit breaker limit on the cluster.
 *
 * @param limit the new limit, or {@code null} to remove the override and fall
 *              back to the breaker's default limit
 */
protected void setRequestCircuitBreakerLimit(ByteSizeValue limit) {
    // Both branches differ only in put vs putNull; build the settings once and
    // issue a single cluster update instead of duplicating the whole call chain.
    final String key = HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey();
    final Settings.Builder settings = Settings.builder();
    if (limit != null) {
        settings.put(key, limit);
    } else {
        settings.putNull(key);
    }
    assertAcked(clusterAdmin().prepareUpdateSettings().setPersistentSettings(settings.build()));
}

// Runs the given ESQL commands with randomized query pragmas; delegates to the
// two-argument overload.
protected EsqlQueryResponse run(String esqlCommands) {
    return run(esqlCommands, randomPragmas());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@

import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState;
Expand Down Expand Up @@ -43,9 +49,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -61,17 +69,64 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
plugins.add(LocalStateEnrich.class);
plugins.add(IngestCommonPlugin.class);
plugins.add(ReindexPlugin.class);
plugins.add(InternalTransportSettingPlugin.class);
return plugins;
}

/**
 * Test plugin that registers {@code IGNORE_DESERIALIZATION_ERRORS_SETTING} so
 * that internal transport setting can be applied via {@code nodeSettings};
 * without a plugin declaring it, the node would reject the unknown setting.
 */
public static class InternalTransportSettingPlugin extends Plugin {
    @Override
    public List<Setting<?>> getSettings() {
        return List.of(IGNORE_DESERIALIZATION_ERRORS_SETTING);
    }
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    return Settings.builder()
        .put(super.nodeSettings(nodeOrdinal, otherSettings))
        // Security is disabled for these tests.
        .put(XPackSettings.SECURITY_ENABLED.getKey(), false)
        // Cap the request breaker so the tests can deliberately trip it.
        .put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "128mb")
        /*
         * Force standard settings for the request breaker or we may not break at all.
         * Without this the test framework can randomly pick the `noop` breaker for
         * requests, and then nothing ever trips.
         */
        .put(
            HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(),
            HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getDefault(Settings.EMPTY)
        )
        .put(
            HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(),
            HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getDefault(Settings.EMPTY)
        )
        // Randomize how often inactive exchange sinks are checked.
        .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(500, 2000)))
        // Reading pages off the network can trip the circuit breaker mid-deserialization;
        // tell the transport layer not to treat those deserialization errors as fatal.
        .put(IGNORE_DESERIALIZATION_ERRORS_SETTING.getKey(), true)
        .build();
}

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
    // Randomly route the query through a coordinating-only node so request/response
    // serialization between nodes is exercised.
    final Client client;
    if (randomBoolean()) {
        client = client(randomFrom(clusterService().state().nodes().getCoordinatingOnlyNodes().values()).getName());
    } else {
        client = client();
    }
    if (randomBoolean()) {
        // Occasionally run with a tiny request breaker limit so the query is likely
        // to trip the breaker mid-flight. A failure is acceptable here, but any
        // blocks acquired along the way must still have been released.
        setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 4096)));
        try {
            return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
        } catch (Exception e) {
            // Expected when the breaker trips: log it, verify nothing leaked, then
            // fall through and retry below once the limit is restored.
            logger.info("request failed", e);
            ensureBlocksReleased();
        } finally {
            // Always restore the default breaker limit, success or failure.
            setRequestCircuitBreakerLimit(null);
        }
    }
    // Normal (or retry-after-trip) execution with the default breaker limit.
    return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
}

@Before
public void setupEnrichPolicies() {
client().admin()
Expand Down Expand Up @@ -129,6 +184,13 @@ public void setupMainIndex() {
client().admin().indices().prepareRefresh("listens").get();
}

/**
 * Guarantees the cluster contains at least one coordinating-only node, since
 * {@code run} may randomly route queries through one.
 */
@Before
public void ensureAtLeastOneCoordinatingNodeOnly() {
    final var coordinatingOnlyNodes = clusterService().state().nodes().getCoordinatingOnlyNodes();
    if (coordinatingOnlyNodes.isEmpty() == false) {
        return; // one already exists; nothing to do
    }
    internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
}

// Test fixture: one listen event (epoch timestamp, song id, and listen duration)
// indexed into the "listens" index by the setup methods above.
record Listen(long timestamp, String songId, double duration) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -74,24 +73,6 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.build();
}

/**
 * Sets (or clears) the persistent request circuit breaker limit on the cluster.
 *
 * @param limit the new limit, or {@code null} to remove the override and fall
 *              back to the breaker's default limit
 */
private void setRequestCircuitBreakerLimit(ByteSizeValue limit) {
    // Both branches differ only in put vs putNull; build the settings once and
    // issue a single cluster update instead of duplicating the whole call chain.
    final String key = HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey();
    final Settings.Builder settings = Settings.builder();
    if (limit != null) {
        settings.put(key, limit);
    } else {
        settings.putNull(key);
    }
    assertAcked(clusterAdmin().prepareUpdateSettings().setPersistentSettings(settings.build()));
}

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048)));
Expand Down

0 comments on commit d6bbc29

Please sign in to comment.