Skip to content

Commit

Permalink
[7.x] Foreach processor - fork recursive call (#50514) (#50773)
Browse files Browse the repository at this point in the history
A very large number of recursive calls can cause a stack overflow
exception. This commit forks the recursive calls for non-async
processors. Once forked, each thread will handle at most 10
recursive calls to help keep the stack size and thread count
down to a reasonable size.
  • Loading branch information
jakelandis committed Jan 9, 2020
1 parent 1d8e51f commit de6f132
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

import org.elasticsearch.ingest.WrappingProcessor;
import org.elasticsearch.script.ScriptService;
import java.util.function.Consumer;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -50,16 +50,19 @@
public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor {

public static final String TYPE = "foreach";
static final int MAX_RECURSE_PER_THREAD = 10;

private final String field;
private final Processor processor;
private final boolean ignoreMissing;
private final Consumer<Runnable> genericExecutor;

ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) {
ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer<Runnable> genericExecutor) {
super(tag);
this.field = field;
this.processor = processor;
this.ignoreMissing = ignoreMissing;
this.genericExecutor = genericExecutor;
}

boolean isIgnoreMissing() {
Expand Down Expand Up @@ -91,6 +94,7 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum

Object value = values.get(index);
Object previousValue = document.getIngestMetadata().put("_value", value);
final Thread thread = Thread.currentThread();
processor.execute(document, (result, e) -> {
if (e != null) {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
Expand All @@ -99,7 +103,15 @@ void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocum
handler.accept(null, null);
} else {
newValues.add(document.getIngestMetadata().put("_value", previousValue));
innerExecute(index + 1, values, newValues, document, handler);
if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) {
// we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread
// only fork after 10 recursive calls, then fork every 10 to keep the number of threads down
genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler));
} else {
// we are on a different thread (we went asynchronous), it's safe to recurse
// or we have recursed less then 10 times with the same thread, it's safe to recurse
innerExecute(index + 1, values, newValues, document, handler);
}
}
});
}
Expand All @@ -125,9 +137,11 @@ public Processor getInnerProcessor() {
public static final class Factory implements Processor.Factory {

private final ScriptService scriptService;
private final Consumer<Runnable> genericExecutor;

Factory(ScriptService scriptService) {
Factory(ScriptService scriptService, Consumer<Runnable> genericExecutor) {
this.scriptService = scriptService;
this.genericExecutor = genericExecutor;
}

@Override
Expand All @@ -143,7 +157,7 @@ public ForEachProcessor create(Map<String, Processor.Factory> factories, String
Map.Entry<String, Map<String, Object>> entry = entries.iterator().next();
Processor processor =
ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue());
return new ForEachProcessor(tag, field, processor, ignoreMissing);
return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ConvertProcessor.TYPE, new ConvertProcessor.Factory());
processors.put(GsubProcessor.TYPE, new GsubProcessor.Factory());
processors.put(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService));
processors.put(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor));
processors.put(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService));
processors.put(SortProcessor.TYPE, new SortProcessor.Factory());
processors.put(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,21 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class ForEachProcessorFactoryTests extends ESTestCase {

private final ScriptService scriptService = mock(ScriptService.class);
private final Consumer<Runnable> genericExecutor = Runnable::run;

public void testCreate() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -57,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -75,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_first", (r, t, c) -> processor);
registry.put("_second", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Expand All @@ -88,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception {
}

public void testCreateWithNonExistingProcessorType() throws Exception {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
config.put("processor", Collections.singletonMap("_name", Collections.emptyMap()));
Expand All @@ -101,15 +103,15 @@ public void testCreateWithMissingField() throws Exception {
Processor processor = new TestProcessor(ingestDocument -> { });
Map<String, Processor.Factory> registry = new HashMap<>();
registry.put("_name", (r, t, c) -> processor);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap())));
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config));
assertThat(exception.getMessage(), equalTo("[field] required property is missing"));
}

public void testCreateWithMissingProcessor() {
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService);
ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor);
Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config));
Expand Down

0 comments on commit de6f132

Please sign in to comment.