Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,15 @@
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.Before;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

Expand All @@ -40,8 +29,6 @@
*/
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {

private static final Logger LOGGER = LogManager.getLogger(AbstractPausableIntegTestCase.class);

protected static final Semaphore scriptPermits = new Semaphore(0);

protected int pageSize = -1;
Expand Down Expand Up @@ -108,53 +95,10 @@ public void setupIndex() throws IOException {
}
}

public static class PausableFieldPlugin extends Plugin implements ScriptPlugin {

public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "pause";
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
try {
assertTrue(scriptPermits.tryAcquire(1, TimeUnit.MINUTES));
} catch (Exception e) {
throw new AssertionError(e);
}
LOGGER.debug("--> emitting value");
emit(1);
}
};
}
};
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
protected boolean onWait() throws InterruptedException {
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertTrue;

/**
* A plugin that provides a script language "pause" that can be used to simulate slow running queries.
* See also {@link AbstractPausableIntegTestCase}.
*/
public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin {

// Called when the engine enters the execute() method.
protected void onStartExecute() {}

// Called when the engine needs to wait for further execution to be allowed.
protected abstract boolean onWait() throws InterruptedException;

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "pause";
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
if (context == LongFieldScript.CONTEXT) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
onStartExecute();
try {
assertTrue(onWait());
} catch (InterruptedException e) {
throw new AssertionError(e);
}
emit(1);
}
};
}
};
}
throw new IllegalStateException("unsupported type " + context);
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,8 @@
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.OnScriptError;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.transport.RemoteClusterAware;
Expand All @@ -44,7 +38,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -80,7 +73,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
plugins.add(InternalExchangePlugin.class);
plugins.add(PauseFieldPlugin.class);
plugins.add(SimplePauseFieldPlugin.class);
return plugins;
}

Expand All @@ -99,64 +92,7 @@ public List<Setting<?>> getSettings() {

@Before
public void resetPlugin() {
PauseFieldPlugin.allowEmitting = new CountDownLatch(1);
PauseFieldPlugin.startEmitting = new CountDownLatch(1);
}

public static class PauseFieldPlugin extends Plugin implements ScriptPlugin {
public static CountDownLatch startEmitting = new CountDownLatch(1);
public static CountDownLatch allowEmitting = new CountDownLatch(1);

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override

public String getType() {
return "pause";
}

@Override
@SuppressWarnings("unchecked")
public <FactoryType> FactoryType compile(
String name,
String code,
ScriptContext<FactoryType> context,
Map<String, String> params
) {
if (context == LongFieldScript.CONTEXT) {
return (FactoryType) new LongFieldScript.Factory() {
@Override
public LongFieldScript.LeafFactory newFactory(
String fieldName,
Map<String, Object> params,
SearchLookup searchLookup,
OnScriptError onScriptError
) {
return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
@Override
public void execute() {
startEmitting.countDown();
try {
assertTrue(allowEmitting.await(30, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new AssertionError(e);
}
emit(1);
}
};
}
};
}
throw new IllegalStateException("unsupported type " + context);
}

@Override
public Set<ScriptContext<?>> getSupportedContexts() {
return Set.of(LongFieldScript.CONTEXT);
}
};
}
SimplePauseFieldPlugin.resetPlugin();
}

/**
Expand Down Expand Up @@ -184,7 +120,7 @@ public void testSuccessfulPathways() throws Exception {
}

// wait until we know that the query against 'remote-b:blocking' has started
PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);
SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS);

// wait until the query of 'cluster-a:logs-*' has finished (it is not blocked since we are not searching the 'blocking' index on it)
assertBusy(() -> {
Expand Down Expand Up @@ -234,7 +170,7 @@ public void testSuccessfulPathways() throws Exception {
}

// allow remoteB query to proceed
PauseFieldPlugin.allowEmitting.countDown();
SimplePauseFieldPlugin.allowEmitting.countDown();

// wait until both remoteB and local queries have finished
assertBusy(() -> {
Expand Down
Loading