From bfd044e85f2cd52ca17c973c0504143dd802e5b1 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Fri, 27 Dec 2019 15:59:59 -0600 Subject: [PATCH 1/7] use iterative with semaphore for the for each processor --- .../ingest/common/ForEachProcessor.java | 59 +++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 681b167c828e1..1eb5cd4f04e22 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import org.elasticsearch.ingest.WrappingProcessor; @@ -76,32 +78,41 @@ public void execute(IngestDocument ingestDocument, BiConsumer newValues = new CopyOnWriteArrayList<>(); - innerExecute(0, values, newValues, ingestDocument, handler); - } - } - - void innerExecute(int index, List values, List newValues, IngestDocument document, - BiConsumer handler) { - if (index == values.size()) { + final List newValues = new CopyOnWriteArrayList<>(); + final IngestDocument document = ingestDocument; + final Semaphore oneByOneGate = new Semaphore(1); + final AtomicBoolean errorOrDrop = new AtomicBoolean(false); + for (Object value : values) { + if (errorOrDrop.get() == false) { + try { + oneByOneGate.acquire(); + Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); + processor.execute(document, (result, e) -> { + try { + if (e != null) { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + errorOrDrop.set(true); + handler.accept(null, e); + } else if (result == null) { + errorOrDrop.set(true); + handler.accept(null, null); + } else { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + } + } finally { + oneByOneGate.release(); + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } document.setFieldValue(field, new ArrayList<>(newValues)); - handler.accept(document, null); - return; - } - - Object value = values.get(index); - Object previousValue = document.getIngestMetadata().put("_value", value); - processor.execute(document, (result, e) -> { - if (e != null) { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - handler.accept(null, e); - } else if (result == null) { - handler.accept(null, null); - } else { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - innerExecute(index + 1, values, newValues, document, handler); + if (errorOrDrop.get() == false) { + handler.accept(document, null); } - }); + } } @Override From 433706c3eecd2d1658676d604ecdf0ac03eed114 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 2 Jan 2020 08:15:42 -0600 Subject: [PATCH 2/7] Revert "use iterative with semaphore for the for each processor" This reverts commit 56994bc75ba5ccb5b5a6b4b5972a4d1c22807eb5. --- .../ingest/common/ForEachProcessor.java | 59 ++++++++----------- 1 file changed, 24 insertions(+), 35 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 1eb5cd4f04e22..681b167c828e1 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -29,8 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import org.elasticsearch.ingest.WrappingProcessor; @@ -78,41 +76,32 @@ public void execute(IngestDocument ingestDocument, BiConsumer newValues = new CopyOnWriteArrayList<>(); - final IngestDocument document = ingestDocument; - final Semaphore oneByOneGate = new Semaphore(1); - final AtomicBoolean errorOrDrop = new AtomicBoolean(false); - for (Object value : values) { - if (errorOrDrop.get() == false) { - try { - oneByOneGate.acquire(); - Object previousValue = ingestDocument.getIngestMetadata().put("_value", value); - processor.execute(document, (result, e) -> { - try { - if (e != null) { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - errorOrDrop.set(true); - handler.accept(null, e); - } else if (result == null) { - errorOrDrop.set(true); - handler.accept(null, null); - } else { - newValues.add(document.getIngestMetadata().put("_value", previousValue)); - } - } finally { - oneByOneGate.release(); - } - }); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } + List newValues = new CopyOnWriteArrayList<>(); + innerExecute(0, values, newValues, ingestDocument, handler); + } + } + + void innerExecute(int index, List values, List newValues, IngestDocument document, + BiConsumer handler) { + if (index == values.size()) { document.setFieldValue(field, new ArrayList<>(newValues)); - if (errorOrDrop.get() == false) { - handler.accept(document, null); - } + handler.accept(document, null); + return; } + + Object value = values.get(index); + Object previousValue = document.getIngestMetadata().put("_value", value); + processor.execute(document, (result, e) -> { + if (e != null) { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + handler.accept(null, e); + } else if (result == null) { + handler.accept(null, null); + } else { + newValues.add(document.getIngestMetadata().put("_value", previousValue)); + innerExecute(index + 1, values, newValues, document, handler); + } + }); } @Override From fa44262059fead7413d89451509513a37bae5943 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 2 Jan 2020 10:19:31 -0600 Subject: [PATCH 3/7] Foreach processor - fork recursive call A very large number of recursive calls can cause a stack overflow exception. This commit forks the recursive calls for non-async processors. --- .../ingest/common/ForEachProcessor.java | 20 ++- .../ingest/common/IngestCommonPlugin.java | 2 +- .../common/ForEachProcessorFactoryTests.java | 14 +- .../ingest/common/ForEachProcessorTests.java | 128 ++++++++++++++++-- .../elasticsearch/ingest/IngestService.java | 2 +- .../org/elasticsearch/ingest/Processor.java | 6 +- 6 files changed, 148 insertions(+), 24 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 681b167c828e1..d837909b4d9c5 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -33,6 +33,7 @@ import org.elasticsearch.ingest.WrappingProcessor; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -54,12 +55,14 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin private final String field; private final Processor processor; private final boolean ignoreMissing; + private final ThreadPool threadPool; - ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, ThreadPool threadPool) { super(tag); this.field = field; this.processor = processor; this.ignoreMissing = ignoreMissing; + this.threadPool = threadPool; } boolean isIgnoreMissing() { @@ -91,6 +94,7 @@ void innerExecute(int index, List values, List newValues, IngestDocum Object value = values.get(index); Object previousValue = document.getIngestMetadata().put("_value", value); + final Thread thread = Thread.currentThread(); processor.execute(document, (result, e) -> { if (e != null) { newValues.add(document.getIngestMetadata().put("_value", previousValue)); @@ -99,7 +103,13 @@ void innerExecute(int index, List values, List newValues, IngestDocum handler.accept(null, null); } else { newValues.add(document.getIngestMetadata().put("_value", previousValue)); - innerExecute(index + 1, values, newValues, document, handler); + if (thread == Thread.currentThread()) { + // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread + threadPool.generic().execute(() -> innerExecute(index + 1, values, newValues, document, handler)); + } else { + // we are on a different thread (we went asynchronous), it's safe to recurse + innerExecute(index + 1, values, newValues, document, handler); + } } }); } @@ -125,9 +135,11 @@ public Processor getInnerProcessor() { public static final class Factory implements Processor.Factory { private final ScriptService scriptService; + private final ThreadPool threadPool; - Factory(ScriptService scriptService) { + Factory(ScriptService scriptService, ThreadPool threadPool) { this.scriptService = scriptService; + this.threadPool = threadPool; } @Override @@ -143,7 +155,7 @@ public ForEachProcessor create(Map factories, String Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor, ignoreMissing); + return new ForEachProcessor(tag, field, processor, ignoreMissing, threadPool); } } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index b37e5d13e4602..28157b5cadc9e 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -75,7 +75,7 @@ public Map getProcessors(Processor.Parameters paramet entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()), entry(GsubProcessor.TYPE, new GsubProcessor.Factory()), entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)), - entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService)), + entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.threadPool)), entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))), diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index 65fee8ce19a7d..fd37231ca5199 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Collections; @@ -36,12 +37,13 @@ public class ForEachProcessorFactoryTests extends ESTestCase { private final ScriptService scriptService = mock(ScriptService.class); + private final ThreadPool threadPool = mock(ThreadPool.class); public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); Map config = new HashMap<>(); config.put("field", "_field"); @@ -57,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); Map config = new HashMap<>(); config.put("field", "_field"); @@ -75,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); Map config = new HashMap<>(); config.put("field", "_field"); @@ -88,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -101,7 +103,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -109,7 +111,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index cda203f660093..9b683328db744 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.ingest.common; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -26,6 +29,8 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -34,13 +39,36 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; + import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; public class ForEachProcessorTests extends ESTestCase { + private ThreadPool threadPool = mock(ThreadPool.class); + private final ExecutorService direct = EsExecutors.newDirectExecutorService(); + + @Before + public void setup(){ + when(threadPool.generic()).thenReturn(direct); + } + public void testExecute() throws Exception { + ThreadPoolExecutor threadPoolExecutor = + EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); + when(threadPool.generic()).thenReturn(threadPoolExecutor); + List values = new ArrayList<>(); values.add("foo"); values.add("bar"); @@ -51,15 +79,46 @@ public void testExecute() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), - false + false, threadPool ); processor.execute(ingestDocument, (result, e) -> {}); + assertBusy(() -> assertEquals(values.size(), threadPoolExecutor.getCompletedTaskCount())); + threadPoolExecutor.shutdown(); + threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS); + @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.get(0), equalTo("FOO")); assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); + verify(threadPool, times(values.size())).generic(); + } + + public void testExecuteWithAsyncProcessor() throws Exception { + List values = new ArrayList<>(); + values.add("foo"); + values.add("bar"); + values.add("baz"); + IngestDocument ingestDocument = new IngestDocument( + "_index", "_id", null, null, null, Collections.singletonMap("values", values) + ); + + ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"), + false, threadPool); + processor.execute(ingestDocument, (result, e) -> { + }); + + assertBusy(() -> { + @SuppressWarnings("unchecked") + List result = ingestDocument.getFieldValue("values", List.class); + assertEquals(values.size(), result.size()); + assertThat(result.get(0), equalTo("FOO")); + assertThat(result.get(1), equalTo("BAR")); + assertThat(result.get(2), equalTo("BAZ")); + }); + + verifyZeroInteractions(threadPool); } public void testExecuteWithFailure() throws Exception { @@ -72,7 +131,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, threadPool); Exception[] exceptions = new Exception[1]; processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); assertThat(exceptions[0].getMessage(), equalTo("failure")); @@ -90,7 +149,7 @@ public void testExecuteWithFailure() throws Exception { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), - false + false, threadPool ); processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -109,7 +168,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, threadPool); processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -135,7 +194,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other")), false); + (model) -> model.get("other")), false, threadPool); processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -146,6 +205,10 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { } public void testRandom() throws Exception { + ThreadPoolExecutor threadPoolExecutor = + EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, + EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); + when(threadPool.generic()).thenReturn(threadPoolExecutor); Processor innerProcessor = new Processor() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { @@ -164,7 +227,7 @@ public String getTag() { return null; } }; - int numValues = randomIntBetween(1, 32); + int numValues = randomIntBetween(100, 10000); List values = new ArrayList<>(numValues); for (int i = 0; i < numValues; i++) { values.add(""); @@ -173,8 +236,13 @@ public String getTag() { "_index", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, threadPool); processor.execute(ingestDocument, (result, e) -> {}); + + assertBusy(() -> assertEquals(values.size(), threadPoolExecutor.getCompletedTaskCount())); + threadPoolExecutor.shutdown(); + threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS); + @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); assertThat(result.size(), equalTo(numValues)); @@ -198,7 +266,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - ), false); + ), false, threadPool); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -224,7 +292,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, threadPool); forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -257,7 +325,8 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false), false); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, threadPool), + false, threadPool); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -275,10 +344,47 @@ public void testIgnoreMissing() throws Exception { ); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, threadPool); processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); } + private class AsyncUpperCaseProcessor implements Processor { + + private final String field; + + private AsyncUpperCaseProcessor(String field) { + this.field = field; + } + + @Override + public void execute(IngestDocument document, BiConsumer handler) { + new Thread(() -> { + try { + String value = document.getFieldValue(field, String.class, false); + document.setFieldValue(field, value.toUpperCase(Locale.ROOT)); + handler.accept(document, null); + } catch (Exception e) { + handler.accept(null, e); + } + }).start(); + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this is an async processor, don't call this"); + } + + @Override + public String getType() { + return "uppercase-async"; + } + + @Override + public String getTag() { + return getType(); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 56b899f068b1c..05dfccd97630d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -101,7 +101,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this, client + ), this, client, threadPool ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 029e80234e9e8..77f5f4769b3c5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -25,6 +25,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; import java.util.function.BiConsumer; @@ -122,6 +123,8 @@ class Parameters { public final IngestService ingestService; + public final ThreadPool threadPool; + /** * Provides scheduler support */ @@ -134,7 +137,7 @@ class Parameters { public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService, Client client) { + IngestService ingestService, Client client, ThreadPool threadPool) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -143,6 +146,7 @@ public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry this.scheduler = scheduler; this.ingestService = ingestService; this.client = client; + this.threadPool = threadPool; } } From c040362bb48b553ce254d951e25e94278ef99252 Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Thu, 2 Jan 2020 11:20:15 -0600 Subject: [PATCH 4/7] optimize such that we only fork every 10 calls --- .../ingest/common/ForEachProcessor.java | 6 ++++-- .../ingest/common/ForEachProcessorTests.java | 12 ++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index d837909b4d9c5..980bac8ce6bad 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -51,6 +51,7 @@ public final class ForEachProcessor extends AbstractProcessor implements WrappingProcessor { public static final String TYPE = "foreach"; + static final int MAX_RECURSE_PER_THREAD = 10; private final String field; private final Processor processor; @@ -103,8 +104,9 @@ void innerExecute(int index, List values, List newValues, IngestDocum handler.accept(null, null); } else { newValues.add(document.getIngestMetadata().put("_value", previousValue)); - if (thread == Thread.currentThread()) { - // we are on the same thread, we need to fork to another thread to avoid recursive stack overflow on a single thread + if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) { + // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread + // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down threadPool.generic().execute(() -> innerExecute(index + 1, values, newValues, document, handler)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index 9b683328db744..f56b98a924020 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -43,6 +43,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; @@ -73,6 +74,7 @@ public void testExecute() throws Exception { values.add("foo"); values.add("bar"); values.add("baz"); + IntStream.range(0, ForEachProcessor.MAX_RECURSE_PER_THREAD).forEach(value -> values.add("a")); IngestDocument ingestDocument = new IngestDocument( "_index", "_id", null, null, null, Collections.singletonMap("values", values) ); @@ -83,7 +85,7 @@ public void testExecute() throws Exception { ); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size(), threadPoolExecutor.getCompletedTaskCount())); + assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, threadPoolExecutor.getCompletedTaskCount())); threadPoolExecutor.shutdown(); threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS); @@ -92,7 +94,8 @@ public void testExecute() throws Exception { assertThat(result.get(0), equalTo("FOO")); assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); - verify(threadPool, times(values.size())).generic(); + IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A"))); + verify(threadPool, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).generic(); } public void testExecuteWithAsyncProcessor() throws Exception { @@ -227,7 +230,7 @@ public String getTag() { return null; } }; - int numValues = randomIntBetween(100, 10000); + int numValues = randomIntBetween(1, 10000); List values = new ArrayList<>(numValues); for (int i = 0; i < numValues; i++) { values.add(""); @@ -239,7 +242,7 @@ public String getTag() { ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, threadPool); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size(), threadPoolExecutor.getCompletedTaskCount())); + assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, threadPoolExecutor.getCompletedTaskCount())); threadPoolExecutor.shutdown(); threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS); @@ -249,6 +252,7 @@ public String getTag() { for (String r : result) { assertThat(r, equalTo(".")); } + verify(threadPool, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).generic(); } public void testModifyFieldsOutsideArray() throws Exception { From 3e87f8365bb605c7a640ebcf2db6a524cbe76dae Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Mon, 6 Jan 2020 11:37:52 -0600 Subject: [PATCH 5/7] dont expose threadPool, add REST test --- .../ingest/common/ForEachProcessor.java | 24 +++--- .../ingest/common/IngestCommonPlugin.java | 2 +- .../common/ForEachProcessorFactoryTests.java | 16 ++-- .../ingest/common/ForEachProcessorTests.java | 73 +++++++++++-------- .../rest-api-spec/test/ingest/80_foreach.yml | 19 +++++ .../elasticsearch/ingest/IngestService.java | 3 +- .../org/elasticsearch/ingest/Processor.java | 8 +- 7 files changed, 88 insertions(+), 57 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java index 980bac8ce6bad..11c1d3e2ea764 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/ForEachProcessor.java @@ -23,6 +23,8 @@ import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.WrappingProcessor; +import org.elasticsearch.script.ScriptService; import java.util.ArrayList; import java.util.List; @@ -30,10 +32,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; - -import org.elasticsearch.ingest.WrappingProcessor; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.threadpool.ThreadPool; +import java.util.function.Consumer; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -56,14 +55,14 @@ public final class ForEachProcessor extends AbstractProcessor implements Wrappin private final String field; private final Processor processor; private final boolean ignoreMissing; - private final ThreadPool threadPool; + private final Consumer genericExecutor; - ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, ThreadPool threadPool) { + ForEachProcessor(String tag, String field, Processor processor, boolean ignoreMissing, Consumer genericExecutor) { super(tag); this.field = field; this.processor = processor; this.ignoreMissing = ignoreMissing; - this.threadPool = threadPool; + this.genericExecutor = genericExecutor; } boolean isIgnoreMissing() { @@ -107,9 +106,10 @@ void innerExecute(int index, List values, List newValues, IngestDocum if (thread == Thread.currentThread() && (index + 1) % MAX_RECURSE_PER_THREAD == 0) { // we are on the same thread and we need to fork to another thread to avoid recursive stack overflow on a single thread // only fork after 10 recursive calls, then fork every 10 to keep the number of threads down - threadPool.generic().execute(() -> innerExecute(index + 1, values, newValues, document, handler)); + genericExecutor.accept(() -> innerExecute(index + 1, values, newValues, document, handler)); } else { // we are on a different thread (we went asynchronous), it's safe to recurse + // or we have recursed less then 10 times with the same thread, it's safe to recurse innerExecute(index + 1, values, newValues, document, handler); } } @@ -137,11 +137,11 @@ public Processor getInnerProcessor() { public static final class Factory implements Processor.Factory { private final ScriptService scriptService; - private final ThreadPool threadPool; + private final Consumer genericExecutor; - Factory(ScriptService scriptService, ThreadPool threadPool) { + Factory(ScriptService scriptService, Consumer genericExecutor) { this.scriptService = scriptService; - this.threadPool = threadPool; + this.genericExecutor = genericExecutor; } @Override @@ -157,7 +157,7 @@ public ForEachProcessor create(Map factories, String Map.Entry> entry = entries.iterator().next(); Processor processor = ConfigurationUtils.readProcessor(factories, scriptService, entry.getKey(), entry.getValue()); - return new ForEachProcessor(tag, field, processor, ignoreMissing, threadPool); + return new ForEachProcessor(tag, field, processor, ignoreMissing, genericExecutor); } } } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 28157b5cadc9e..22c76e6b01d73 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -75,7 +75,7 @@ public Map getProcessors(Processor.Parameters paramet entry(ConvertProcessor.TYPE, new ConvertProcessor.Factory()), entry(GsubProcessor.TYPE, new GsubProcessor.Factory()), entry(FailProcessor.TYPE, new FailProcessor.Factory(parameters.scriptService)), - entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.threadPool)), + entry(ForEachProcessor.TYPE, new ForEachProcessor.Factory(parameters.scriptService, parameters.genericExecutor)), entry(DateIndexNameProcessor.TYPE, new DateIndexNameProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), entry(GrokProcessor.TYPE, new GrokProcessor.Factory(GROK_PATTERNS, createGrokThreadWatchdog(parameters))), diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java index fd37231ca5199..19b3966573f7f 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorFactoryTests.java @@ -24,12 +24,12 @@ import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; @@ -37,13 +37,13 @@ public class ForEachProcessorFactoryTests extends ESTestCase { private final ScriptService scriptService = mock(ScriptService.class); - private final ThreadPool threadPool = mock(ThreadPool.class); + private final Consumer genericExecutor = Runnable::run; public void testCreate() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); @@ -59,7 +59,7 @@ public void testSetIgnoreMissing() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); @@ -77,7 +77,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { Map registry = new HashMap<>(); registry.put("_first", (r, t, c) -> processor); registry.put("_second", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); @@ -90,7 +90,7 @@ public void testCreateWithTooManyProcessorTypes() throws Exception { } public void testCreateWithNonExistingProcessorType() throws Exception { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); config.put("processor", Collections.singletonMap("_name", Collections.emptyMap())); @@ -103,7 +103,7 @@ public void testCreateWithMissingField() throws Exception { Processor processor = new TestProcessor(ingestDocument -> { }); Map registry = new HashMap<>(); registry.put("_name", (r, t, c) -> processor); - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("processor", Collections.singletonList(Collections.singletonMap("_name", Collections.emptyMap()))); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(registry, null, config)); @@ -111,7 +111,7 @@ public void testCreateWithMissingField() throws Exception { } public void testCreateWithMissingProcessor() { - ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, threadPool); + ForEachProcessor.Factory forEachFactory = new ForEachProcessor.Factory(scriptService, genericExecutor); Map config = new HashMap<>(); config.put("field", "_field"); Exception exception = expectThrows(Exception.class, () -> forEachFactory.create(Collections.emptyMap(), null, config)); diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java index f56b98a924020..713b4a355bdab 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.ingest.TestTemplateService; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.util.ArrayList; @@ -43,32 +42,41 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.IntStream; import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument; - import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; public class ForEachProcessorTests extends ESTestCase { - private ThreadPool threadPool = mock(ThreadPool.class); + @SuppressWarnings("unchecked") + private Consumer genericExecutor = (Consumer) mock(Consumer.class); private final ExecutorService direct = EsExecutors.newDirectExecutorService(); @Before - public void setup(){ - when(threadPool.generic()).thenReturn(direct); + public void setup() { + //execute runnable on same thread for simplicity. some tests will override this and actually run async + doAnswer(invocationOnMock -> { + direct.execute((Runnable) invocationOnMock.getArguments()[0]); + return null; + }).when(genericExecutor).accept(any(Runnable.class)); } public void testExecute() throws Exception { - ThreadPoolExecutor threadPoolExecutor = + ThreadPoolExecutor asyncExecutor = EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - when(threadPool.generic()).thenReturn(threadPoolExecutor); + doAnswer(invocationOnMock -> { + asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); + return null; + }).when(genericExecutor).accept(any(Runnable.class)); List values = new ArrayList<>(); values.add("foo"); @@ -81,13 +89,13 @@ public void testExecute() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new UppercaseProcessor("_tag", "_ingest._value", false, "_ingest._value"), - false, threadPool + false, genericExecutor ); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, threadPoolExecutor.getCompletedTaskCount())); - threadPoolExecutor.shutdown(); - threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS); + assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); + asyncExecutor.shutdown(); + asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -95,7 +103,7 @@ public void testExecute() throws Exception { assertThat(result.get(1), equalTo("BAR")); assertThat(result.get(2), equalTo("BAZ")); IntStream.range(3, ForEachProcessor.MAX_RECURSE_PER_THREAD + 3).forEach(i -> assertThat(result.get(i), equalTo("A"))); - verify(threadPool, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).generic(); + verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); } public void testExecuteWithAsyncProcessor() throws Exception { @@ -108,7 +116,7 @@ public void testExecuteWithAsyncProcessor() throws Exception { ); ForEachProcessor processor = new ForEachProcessor("_tag", "values", new AsyncUpperCaseProcessor("_ingest._value"), - false, threadPool); + false, genericExecutor); processor.execute(ingestDocument, (result, e) -> { }); @@ -121,7 +129,7 @@ public void testExecuteWithAsyncProcessor() throws Exception { assertThat(result.get(2), equalTo("BAZ")); }); - verifyZeroInteractions(threadPool); + verifyZeroInteractions(genericExecutor); } public void testExecuteWithFailure() throws Exception { @@ -134,7 +142,7 @@ public void testExecuteWithFailure() throws Exception { throw new RuntimeException("failure"); } }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, threadPool); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", testProcessor, false, genericExecutor); Exception[] exceptions = new Exception[1]; processor.execute(ingestDocument, (result, e) -> {exceptions[0] = e;}); assertThat(exceptions[0].getMessage(), equalTo("failure")); @@ -152,7 +160,7 @@ public void testExecuteWithFailure() throws Exception { Processor onFailureProcessor = new TestProcessor(ingestDocument1 -> {}); processor = new ForEachProcessor( "_tag", "values", new CompoundProcessor(false, Arrays.asList(testProcessor), Arrays.asList(onFailureProcessor)), - false, threadPool + false, genericExecutor ); processor.execute(ingestDocument, (result, e) -> {}); assertThat(testProcessor.getInvokedCounter(), equalTo(3)); @@ -171,7 +179,7 @@ public void testMetaDataAvailable() throws Exception { id.setFieldValue("_ingest._value.index", id.getSourceAndMetadata().get("_index")); id.setFieldValue("_ingest._value.id", id.getSourceAndMetadata().get("_id")); }); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, threadPool); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); assertThat(innerProcessor.getInvokedCounter(), equalTo(2)); @@ -197,7 +205,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { ForEachProcessor processor = new ForEachProcessor( "_tag", "values", new SetProcessor("_tag", new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other")), false, threadPool); + (model) -> model.get("other")), false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); assertThat(ingestDocument.getFieldValue("values.0.new_field", String.class), equalTo("value")); @@ -208,10 +216,13 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { } public void testRandom() throws Exception { - ThreadPoolExecutor threadPoolExecutor = + ThreadPoolExecutor asyncExecutor = EsExecutors.newScaling(getClass().getName() + "/" + getTestName(), between(1, 2), between(3, 4), 10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY)); - when(threadPool.generic()).thenReturn(threadPoolExecutor); + doAnswer(invocationOnMock -> { + asyncExecutor.execute((Runnable) invocationOnMock.getArguments()[0]); + return null; + }).when(genericExecutor).accept(any(Runnable.class)); Processor innerProcessor = new Processor() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { @@ -239,12 +250,12 @@ public String getTag() { "_index", "_id", null, null, null, Collections.singletonMap("values", values) ); - ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, threadPool); + ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor, false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); - assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, threadPoolExecutor.getCompletedTaskCount())); - threadPoolExecutor.shutdown(); - threadPoolExecutor.awaitTermination(5, TimeUnit.SECONDS); + assertBusy(() -> assertEquals(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD, asyncExecutor.getCompletedTaskCount())); + asyncExecutor.shutdown(); + asyncExecutor.awaitTermination(5, TimeUnit.SECONDS); @SuppressWarnings("unchecked") List result = ingestDocument.getFieldValue("values", List.class); @@ -252,7 +263,7 @@ public String getTag() { for (String r : result) { assertThat(r, equalTo(".")); } - verify(threadPool, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).generic(); + verify(genericExecutor, times(values.size() / ForEachProcessor.MAX_RECURSE_PER_THREAD)).accept(any(Runnable.class)); } public void testModifyFieldsOutsideArray() throws Exception { @@ -270,7 +281,7 @@ public void testModifyFieldsOutsideArray() throws Exception { "_tag", "values", new CompoundProcessor(false, Collections.singletonList(new UppercaseProcessor("_tag_upper", "_ingest._value", false, "_ingest._value")), Collections.singletonList(new AppendProcessor("_tag", template, (model) -> (Collections.singletonList("added")))) - ), false, threadPool); + ), false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -296,7 +307,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_source._value", String.class))); - ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, threadPool); + ForEachProcessor forEachProcessor = new ForEachProcessor("_tag", "values", processor, false, genericExecutor); forEachProcessor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values", List.class); @@ -329,8 +340,8 @@ public void testNestedForEach() throws Exception { doc -> doc.setFieldValue("_ingest._value", doc.getFieldValue("_ingest._value", String.class).toUpperCase(Locale.ENGLISH)) ); ForEachProcessor processor = new ForEachProcessor( - "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, threadPool), - false, threadPool); + "_tag", "values1", new ForEachProcessor("_tag", "_ingest._value.values2", testProcessor, false, genericExecutor), + false, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); List result = ingestDocument.getFieldValue("values1.0.values2", List.class); @@ -348,7 +359,7 @@ public void testIgnoreMissing() throws Exception { ); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); TestProcessor testProcessor = new TestProcessor(doc -> {}); - ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, threadPool); + ForEachProcessor processor = new ForEachProcessor("_tag", "_ingest._value", testProcessor, true, genericExecutor); processor.execute(ingestDocument, (result, e) -> {}); assertIngestDocument(originalIngestDocument, ingestDocument); assertThat(testProcessor.getInvokedCounter(), equalTo(0)); diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml index df27e4279c8d4..9142317ce1507 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/80_foreach.yml @@ -43,3 +43,22 @@ teardown: index: test id: 1 - match: { _source.values: ["FOO", "BAR", "BAZ"] } + + #exceeds the recurse max per thread and will runs some of these on a different thread + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: > + { + "values": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", + "v", "w", "x", "y", "z"] + } + + - do: + get: + index: test + id: 1 + - match: { _source.values: ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", + "V", "W", "X", "Y", "Z"] } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 05dfccd97630d..939443d05a856 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -101,9 +101,10 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC - ), this, client, threadPool + ), this, client, threadPool.generic()::execute ) ); + this.threadPool = threadPool; } diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index 77f5f4769b3c5..671670212ab0a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -25,11 +25,11 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.LongSupplier; /** @@ -123,7 +123,7 @@ class Parameters { public final IngestService ingestService; - public final ThreadPool threadPool; + public final Consumer genericExecutor; /** * Provides scheduler support @@ -137,7 +137,7 @@ class Parameters { public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, LongSupplier relativeTimeSupplier, BiFunction scheduler, - IngestService ingestService, Client client, ThreadPool threadPool) { + IngestService ingestService, Client client, Consumer genericExecutor ) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; @@ -146,7 +146,7 @@ public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry this.scheduler = scheduler; this.ingestService = ingestService; this.client = client; - this.threadPool = threadPool; + this.genericExecutor = genericExecutor; } } From 46d283dd2cbf7d8373cbfb9361b8654d03dbbb0a Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 8 Jan 2020 10:52:44 -0600 Subject: [PATCH 6/7] fix unit tests --- .../ingest/IngestServiceTests.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 7890d30f6c4e3..0a16d5001638a 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -60,6 +60,7 @@ import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.CustomTypeSafeMatcher; +import org.junit.Before; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; @@ -111,10 +112,18 @@ public Map getProcessors(Processor.Parameters paramet } }; + private ThreadPool threadPool; + + @Before + public void setup(){ + threadPool = mock(ThreadPool.class); + ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.generic()).thenReturn(executorService); + when(threadPool.executor(anyString())).thenReturn(executorService); + } public void testIngestPlugin() { - ThreadPool tp = mock(ThreadPool.class); Client client = mock(Client.class); - IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, + IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(DUMMY_PLUGIN), client); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); @@ -122,19 +131,15 @@ public void testIngestPlugin() { } public void testIngestPluginDuplicate() { - ThreadPool tp = mock(ThreadPool.class); Client client = mock(Client.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> - new IngestService(mock(ClusterService.class), tp, null, null, + new IngestService(mock(ClusterService.class), threadPool, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client)); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } public void testExecuteIndexPipelineDoesNotExist() { - ThreadPool threadPool = mock(ThreadPool.class); Client client = mock(Client.class); - final ExecutorService executorService = EsExecutors.newDirectExecutorService(); - when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(DUMMY_PLUGIN), client); final IndexRequest indexRequest = @@ -1188,10 +1193,9 @@ public Map getProcessors(Processor.Parameters paramet }; // Create ingest service: - ThreadPool tp = mock(ThreadPool.class); Client client = mock(Client.class); IngestService ingestService = - new IngestService(mock(ClusterService.class), tp, null, null, null, List.of(testPlugin), client); + new IngestService(mock(ClusterService.class), threadPool, null, null, null, List.of(testPlugin), client); ingestService.addIngestClusterStateListener(ingestClusterStateListener); // Create pipeline and apply the resulting cluster state, which should update the counter in the right order: @@ -1259,9 +1263,11 @@ private static IngestService createWithProcessors() { } private static IngestService createWithProcessors(Map processors) { - ThreadPool threadPool = mock(ThreadPool.class); + Client client = mock(Client.class); - final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); return new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(new IngestPlugin() { From 98d75400a67e9d49259f622ac7e684d2e494f95b Mon Sep 17 00:00:00 2001 From: Jake Landis Date: Wed, 8 Jan 2020 12:00:32 -0600 Subject: [PATCH 7/7] fix unit tests --- .../ml/action/TransportGetTrainedModelsStatsActionTests.java | 4 ++++ .../ml/inference/ingest/InferenceProcessorFactoryTests.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 1a359f962b325..03a011c717292 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -105,6 +107,8 @@ public Map getProcessors(Processor.Parameters paramet @Before public void setUpVariables() { ThreadPool tp = mock(ThreadPool.class); + ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(tp.generic()).thenReturn(executorService); client = mock(Client.class); clusterService = mock(ClusterService.class); Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java index f7216ab206e85..38c4f3f002b2d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/ingest/InferenceProcessorFactoryTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; @@ -44,6 +45,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ExecutorService; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; @@ -71,6 +73,8 @@ public Map getProcessors(Processor.Parameters paramet @Before public void setUpVariables() { ThreadPool tp = mock(ThreadPool.class); + ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(tp.generic()).thenReturn(executorService); client = mock(Client.class); Settings settings = Settings.builder().put("node.name", "InferenceProcessorFactoryTests_node").build(); ClusterSettings clusterSettings = new ClusterSettings(settings,