Limiting the number of nested pipelines that can be executed (#105428)
Limiting the number of nested pipelines that can be executed within a single pipeline to 100
masseyke committed Feb 13, 2024
1 parent c884945 commit f0ec294
Showing 4 changed files with 132 additions and 16 deletions.
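For context before the diffs: ingest pipelines nest when one pipeline invokes another through the pipeline processor, and nothing previously bounded how deep such a chain could go. A minimal sketch of a two-level chain, in the same Java-text-block style the new test uses (the pipeline names and JSON here are invented for illustration, not taken from the commit):

// Hypothetical chain: "outer" hands each document to "inner" via the pipeline processor.
// Chains built this way can nest arbitrarily deep, which is what the new limit bounds.
String outerPipeline = """
    {
      "processors": [
        { "pipeline": { "name": "inner" } }
      ]
    }
    """;
String innerPipeline = """
    {
      "processors": [
        { "set": { "field": "foo", "value": "baz" } }
      ]
    }
    """;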
docs/changelog/105428.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
+pr: 105428
+summary: Limiting the number of nested pipelines that can be executed
+area: Ingest Node
+type: enhancement
+issues: []
ManyNestedPipelinesIT.java
@@ -18,6 +18,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Strings;
+import org.elasticsearch.ingest.GraphStructureException;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -40,6 +41,9 @@
*/
public class ManyNestedPipelinesIT extends ESIntegTestCase {
private final int manyPipelinesCount = randomIntBetween(2, 50);
+private final int tooManyPipelinesCount = IngestDocument.MAX_PIPELINES + 1;
+private static final String MANY_PIPELINES_PREFIX = "many_";
+private static final String TOO_MANY_PIPELINES_PREFIX = "too_many_";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -50,19 +54,21 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
public void loadManyPipelines() {
internalCluster().ensureAtLeastNumDataNodes(1);
internalCluster().startMasterOnlyNode();
-createChainedPipelines(manyPipelinesCount);
+createManyChainedPipelines();
}

public void testIngestManyPipelines() {
String index = "index";
-DocWriteResponse response = prepareIndex(index).setSource(Map.of("foo", "bar")).setPipeline("pipeline_0").get();
+DocWriteResponse response = prepareIndex(index).setSource(Map.of("foo", "bar"))
+    .setPipeline(MANY_PIPELINES_PREFIX + "pipeline_0")
+    .get();
assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
GetResponse getResponse = client().prepareGet(index, response.getId()).get();
assertThat(getResponse.getSource().get("foo"), equalTo("baz"));
}

public void testSimulateManyPipelines() throws IOException {
-List<SimulateDocumentResult> results = executeSimulate(false);
+List<SimulateDocumentResult> results = executeSimulateManyPipelines(false);
assertThat(results.size(), equalTo(1));
assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) results.get(0);
@@ -72,7 +78,7 @@ public void testSimulateManyPipelines() throws IOException {
}

public void testSimulateVerboseManyPipelines() throws IOException {
-List<SimulateDocumentResult> results = executeSimulate(true);
+List<SimulateDocumentResult> results = executeSimulateManyPipelines(true);
assertThat(results.size(), equalTo(1));
assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult result = (SimulateDocumentVerboseResult) results.get(0);
@@ -83,7 +89,45 @@ public void testSimulateVerboseManyPipelines() throws IOException {
assertThat(resultDoc.getFieldValue("foo", String.class), equalTo("baz"));
}

-private List<SimulateDocumentResult> executeSimulate(boolean verbose) throws IOException {
public void testTooManyPipelines() throws IOException {
/*
* Logically, this test method contains three tests (too many pipelines for ingest, simulate, and simulate verbose). But creating
* pipelines is so slow that they are lumped into this one method.
*/
createTooManyChainedPipelines();
expectThrows(
GraphStructureException.class,
() -> prepareIndex("foo").setSource(Map.of("foo", "bar")).setPipeline(TOO_MANY_PIPELINES_PREFIX + "pipeline_0").get()
);
{
List<SimulateDocumentResult> results = executeSimulateTooManyPipelines(false);
assertThat(results.size(), equalTo(1));
assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) results.get(0);
assertNotNull(result.getFailure());
assertNotNull(result.getFailure().getCause());
assertThat(result.getFailure().getCause(), instanceOf(GraphStructureException.class));
}
{
List<SimulateDocumentResult> results = executeSimulateTooManyPipelines(true);
assertThat(results.size(), equalTo(1));
assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult result = (SimulateDocumentVerboseResult) results.get(0);
assertNotNull(result);
assertNotNull(result.getProcessorResults().get(0).getFailure());
assertThat(result.getProcessorResults().get(0).getFailure().getCause(), instanceOf(GraphStructureException.class));
}
}

private List<SimulateDocumentResult> executeSimulateManyPipelines(boolean verbose) throws IOException {
return executeSimulatePipelines(MANY_PIPELINES_PREFIX, verbose);
}

private List<SimulateDocumentResult> executeSimulateTooManyPipelines(boolean verbose) throws IOException {
return executeSimulatePipelines(TOO_MANY_PIPELINES_PREFIX, verbose);
}

private List<SimulateDocumentResult> executeSimulatePipelines(String prefix, boolean verbose) throws IOException {
BytesReference simulateRequestBytes = BytesReference.bytes(
jsonBuilder().startObject()
.startArray("docs")
@@ -98,22 +142,30 @@ private List<SimulateDocumentResult> executeSimulate(boolean verbose) throws IOException {
.endObject()
);
SimulatePipelineResponse simulatePipelineResponse = clusterAdmin().prepareSimulatePipeline(simulateRequestBytes, XContentType.JSON)
.setId("pipeline_0")
.setId(prefix + "pipeline_0")
.setVerbose(verbose)
.get();
return simulatePipelineResponse.getResults();
}

-private void createChainedPipelines(int count) {
private void createManyChainedPipelines() {
createChainedPipelines(MANY_PIPELINES_PREFIX, manyPipelinesCount);
}

private void createTooManyChainedPipelines() {
createChainedPipelines(TOO_MANY_PIPELINES_PREFIX, tooManyPipelinesCount);
}

private void createChainedPipelines(String prefix, int count) {
for (int i = 0; i < count - 1; i++) {
-createChainedPipeline(i);
+createChainedPipeline(prefix, i);
}
-createLastPipeline(count - 1);
+createLastPipeline(prefix, count - 1);
}

-private void createChainedPipeline(int number) {
-String pipelineId = "pipeline_" + number;
-String nextPipelineId = "pipeline_" + (number + 1);
+private void createChainedPipeline(String prefix, int number) {
+String pipelineId = prefix + "pipeline_" + number;
+String nextPipelineId = prefix + "pipeline_" + (number + 1);
String pipelineTemplate = """
{
"processors": [
@@ -129,8 +181,8 @@ private void createChainedPipeline(int number) {
clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
}

-private void createLastPipeline(int number) {
-String pipelineId = "pipeline_" + number;
+private void createLastPipeline(String prefix, int number) {
+String pipelineId = prefix + "pipeline_" + number;
String pipeline = """
{
"processors": [
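The hunk above is collapsed mid-template, cutting off the JSON bodies. Judging from the surrounding code (each pipeline forwards the document to nextPipelineId, and the tests assert that field "foo" ends up as "baz"), a plausible reconstruction of the truncated parts follows; this is a sketch under those assumptions, not the committed code:

// Likely continuation of createChainedPipeline: forward to the next pipeline in the chain,
// filling the placeholder via Strings.format (the class is imported at the top of the file).
String pipelineTemplate = """
    {
      "processors": [
        { "pipeline": { "name": "%s" } }
      ]
    }
    """;
String pipeline = Strings.format(pipelineTemplate, nextPipelineId);

// Likely body for createLastPipeline: perform the mutation the tests assert on.
String pipeline = """
    {
      "processors": [
        { "set": { "field": "foo", "value": "baz" } }
      ]
    }
    """;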
server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -51,6 +51,8 @@ public final class IngestDocument {

private static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: ";
static final String TIMESTAMP = "timestamp";
+// This is the maximum number of nested pipelines that can be within a pipeline. If there are more, we bail out with an error
+public static final int MAX_PIPELINES = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));

private final IngestCtxMap ctxMap;
private final Map<String, Object> ingestMetadata;
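One detail in this hunk is easy to miss: the cap is not hard-coded. It is read once, at class load time, from the es.ingest.max_pipelines system property with a default of 100, so a node could in principle be started with a different limit. A minimal sketch (the JVM flag shown is an assumption inferred from the System.getProperty call, not something this commit documents):

// With the JVM started as, say, -Des.ingest.max_pipelines=200 (hypothetical override),
// the parse below yields 200; with no flag set it falls back to the default of 100.
int maxPipelines = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));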
@@ -836,7 +838,12 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
return;
}

-if (executedPipelines.add(pipeline.getId())) {
+if (executedPipelines.size() >= MAX_PIPELINES) {
+    handler.accept(
+        null,
+        new GraphStructureException("Too many nested pipelines. Cannot have more than " + MAX_PIPELINES + " nested pipelines")
+    );
+} else if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
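To see why a size check is needed on top of the existing cycle detection: executedPipelines already rejects a pipeline that re-enters itself (the PIPELINE_CYCLE_ERROR_MESSAGE path), but an acyclic chain of distinct pipelines could previously recurse without bound. Below is a self-contained sketch of the combined guard, with a plain IllegalStateException standing in for Elasticsearch's GraphStructureException and a synchronous Runnable in place of the real asynchronous handler:

import java.util.HashSet;
import java.util.Set;

class NestedPipelineGuard {
    static final int MAX_PIPELINES = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));
    private final Set<String> executedPipelines = new HashSet<>();

    void execute(String pipelineId, Runnable body) {
        if (executedPipelines.size() >= MAX_PIPELINES) {
            // Depth guard (the new check): rejects even acyclic chains once they nest too deeply.
            throw new IllegalStateException("Too many nested pipelines. Cannot have more than " + MAX_PIPELINES + " nested pipelines");
        } else if (executedPipelines.add(pipelineId)) {
            try {
                body.run(); // may re-enter execute(...) for a nested pipeline
            } finally {
                executedPipelines.remove(pipelineId); // the real code removes the id in its completion handler
            }
        } else {
            // Pre-existing cycle guard: this pipeline is already on the current execution path.
            throw new IllegalStateException("Cycle detected for pipeline: " + pipelineId);
        }
    }
}

Note that the committed code reports the failure through the completion handler rather than by throwing, which is why the simulate tests above inspect result.getFailure().getCause() instead of catching an exception.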
TrackingResultProcessorTests.java
@@ -682,7 +682,7 @@ public void testActualPipelineProcessorNested() throws Exception {
*/
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
-int pipelineCount = randomIntBetween(2, 150);
+int pipelineCount = randomIntBetween(2, IngestDocument.MAX_PIPELINES);
for (int i = 0; i < pipelineCount - 1; i++) {
String pipelineId = "pipeline" + i;
String nextPipelineId = "pipeline" + (i + 1);
@@ -728,6 +728,58 @@
assertThat(resultList.get(resultList.size() - 1).getType(), equalTo(countCallsProcessor.getType()));
}

public void testActualPipelineProcessorNestedTooManyPipelines() throws Exception {
/*
* This test creates a pipeline made up of many nested pipeline processors, ending in a processor that counts both how many times
* it is called for a given document (by updating a field on that document) and how many times it is called overall. Because the
* chain is longer than IngestDocument.MAX_PIPELINES, execution is expected to fail before that final processor is ever called.
*/
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
int pipelineCount = randomIntBetween(IngestDocument.MAX_PIPELINES + 1, 500);
for (int i = 0; i < pipelineCount - 1; i++) {
String pipelineId = "pipeline" + i;
String nextPipelineId = "pipeline" + (i + 1);
Map<String, Object> nextPipelineConfig = new HashMap<>();
nextPipelineConfig.put("name", nextPipelineId);
Pipeline pipeline = new Pipeline(
pipelineId,
null,
null,
null,
new CompoundProcessor(factory.create(Map.of(), null, null, nextPipelineConfig))
);
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
}

// The last pipeline calls the CountCallsProcessor rather than yet another pipeline processor:
String lastPipelineId = "pipeline" + (pipelineCount - 1);
CountCallsProcessor countCallsProcessor = new CountCallsProcessor();
Pipeline lastPipeline = new Pipeline(lastPipelineId, null, null, null, new CompoundProcessor(countCallsProcessor));
when(ingestService.getPipeline(lastPipelineId)).thenReturn(lastPipeline);

String firstPipelineId = "pipeline0";
Map<String, Object> firstPipelineConfig = new HashMap<>();
firstPipelineConfig.put("name", firstPipelineId);
PipelineProcessor pipelineProcessor = factory.create(Map.of(), null, null, firstPipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

IngestDocument[] documentHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> {
documentHolder[0] = result;
exceptionHolder[0] = e;
});
IngestDocument document = documentHolder[0];
Exception exception = exceptionHolder[0];
assertNull(document);
assertNotNull(exception);
assertThat(exception.getMessage(), containsString("Too many nested pipelines"));
// We expect that the last processor was never called:
assertThat(countCallsProcessor.getTotalCount(), equalTo(0));
}

public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = createIngestService();
