Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foreach processor - fork recursive call #50514

Merged
merged 9 commits into from
Jan 8, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;
import java.util.function.Consumer;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -50,16 +50,19 @@
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {

public static final String TYPE = "foreach";
static final int MAX_RECURSE_PER_THREAD = 10;

private final String field;
private final Processor processor;
private final boolean ignoreMissing;
private final Consumer<Runnable> genericExecutor;

ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer<Runnable> genericExecutor) {
super(tag);
this.field = field;
this.processor = processor;
this.ignoreMissing = ignoreMissing;
this.genericExecutor = genericExecutor;
}

boolean isIgnoreMissing() {
Expand Down Expand Up @@ -91,6 +94,7 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum

Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
final Thread thread = Thread.currentThread();
processor.execute(document, (result, e) -> {
if (e != null) {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
Expand All @@ -99,7 +103,15 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum
handler.accept(null, null);
} else {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
innerExecute(index + 1, values, newValues, document, handler);
if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) {
// we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread
// only fork after 10 recursive calls, then fork every 10 to keep the number of threads down
genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
jakelandis marked this conversation as resolved.
Show resolved Hide resolved
// or we have recursed fewer than 10 times on the same thread since the last fork, so it's safe to recurse
innerExecute(index + 1, values, newValues, document, handler);
}
}
});
}
Expand All @@ -125,9 +137,11 @@ public Processor getInnerProcessor() {
public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;
private final Consumer<Runnable> genericExecutor;

Factory(ScriptService scriptService) {
Factory(ScriptService scriptService, Consumer<Runnable> genericExecutor) {
this.scriptService = scriptService;
this.genericExecutor = genericExecutor;
}

@Override
Expand All @@ -143,7 +157,7 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing);
return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()),
entry(GsubProcessor.TYPE, new GsubProcessor.Factory()),
entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)),
entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)),
entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)),
entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)),
entry(SortProcessor.TYPE, new SortProcessor.Factory()),
entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class ForEachProcessorFactoryTests extends ESTestCase {

private final ScriptService scriptService = mock(ScriptService.class);
private final Consumer<Runnable> genericExecutor = Runnable::run;

public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -57,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -75,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -88,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Expand All @@ -101,15 +103,15 @@ public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
Expand Down
Loading