Skip to content

Commit

Permalink
[7.9] Make for each processor resistant to field modification (#62791) (
Browse files Browse the repository at this point in the history
#62808)

* Make for each processor resistant to field modification  (#62791)

This change provides consistent view of field that foreach processor is iterating over. That prevents it to go into infinite loop and put great pressure on the cluster.

Closes #62790

* fix compilation
  • Loading branch information
probakowski committed Sep 23, 2020
1 parent d34da0e commit 2498e8e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements."));
}
} else {
innerExecute(0, values, new ArrayList<>(values.size()), ingestDocument, handler);
innerExecute(0, new ArrayList<>(values), new ArrayList<>(values.size()), ingestDocument, handler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,30 @@ public void testIgnoreMissing() throws Exception {
assertThat(testProcessor.getInvokedCounter(), equalTo(0));
}

public void testAppendingToTheSameField() {
Map<String, Object> source = Collections.singletonMap("field", Arrays.asList("a", "b"));
IngestDocument originalIngestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, source);
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
TestProcessor testProcessor = new TestProcessor(id->id.appendFieldValue("field", "a"));
ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
ingestDocument.removeField("_ingest._value");
assertThat(ingestDocument, equalTo(originalIngestDocument));
}

public void testRemovingFromTheSameField() {
Map<String, Object> source = Collections.singletonMap("field", Arrays.asList("a", "b"));
IngestDocument originalIngestDocument = new IngestDocument("_index", "_id", "_type", null, null, null, source);
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
TestProcessor testProcessor = new TestProcessor(id -> id.removeField("field.0"));
ForEachProcessor processor = new ForEachProcessor("_tag", null, "field", testProcessor, true);
processor.execute(ingestDocument, (result, e) -> {});
assertThat(testProcessor.getInvokedCounter(), equalTo(2));
ingestDocument.removeField("_ingest._value");
assertThat(ingestDocument, equalTo(originalIngestDocument));
}

private class AsyncUpperCaseProcessor implements Processor {

private final String field;
Expand Down

0 comments on commit 2498e8e

Please sign in to comment.