Skip to content

Commit

Permalink
Introduce reroute method on IngestDocument (#94000)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny committed Mar 22, 2023
1 parent 8a8bc4f commit cdf2522
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 53 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/94000.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 94000
summary: Introduce redirect method on IngestDocument
area: Ingest Node
type: enhancement
issues:
- 83653
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,7 @@ teardown:
index: foo
id: "1"
- match: { _source.a: true }
# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances
# (See issue https://github.com/elastic/elasticsearch/issues/83653).
# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior
#- match: { _source.accumulator: [ "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value" ] }

# only the foo index
- do:
Expand Down Expand Up @@ -150,11 +146,7 @@ teardown:
index: foo
id: "1"
- match: { _source.a: true }
# The next is commented out because there's a bug where the final_pipeline is executed twice under certain circumstances
# (See issue https://github.com/elastic/elasticsearch/issues/83653).
# TODO: Uncomment after the issue is fixed, and remove the repeated value test of the current behavior
#- match: { _source.accumulator: [ "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value", "non-repeated-value" ] }
- match: { _source.accumulator: [ "non-repeated-value" ] }

# only the foo index
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.ingest.Processor;
Expand All @@ -48,6 +49,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -97,6 +99,26 @@ public void testFinalPipelineCantChangeDestination() {
);
}

public void testFinalPipelineCantRerouteDestination() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);

final BytesReference finalPipelineBody = new BytesArray("""
{"processors": [{"reroute": {}}]}""");
client().admin().cluster().putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)).actionGet();

final IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()
);
assertThat(
e,
hasToString(
endsWith("final pipeline [final_pipeline] can't change the target index (from [index] to [target]) for document [1]")
)
);
}

public void testFinalPipelineOfOldDestinationIsNotInvoked() {
Settings settings = Settings.builder()
.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline")
Expand Down Expand Up @@ -187,6 +209,73 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

public void testDefaultPipelineOfRerouteDestinationIsInvoked() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"reroute": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

BytesReference targetPipeline = new BytesArray("""
{"processors": [{"final": {}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

IndexResponse indexResponse = client().prepareIndex("index")
.setId("1")
.setSource(Map.of("field", "value"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertEquals(RestStatus.CREATED, indexResponse.status());
SearchResponse target = client().prepareSearch("target").get();
assertEquals(1, target.getHits().getTotalHits().value);
assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
}

public void testAvoidIndexingLoop() {
Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
createIndex("index", settings);

settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
createIndex("target", settings);

BytesReference defaultPipelineBody = new BytesArray("""
{"processors": [{"reroute": {"dest": "target"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
.actionGet();

BytesReference targetPipeline = new BytesArray("""
{"processors": [{"reroute": {"dest": "index"}}]}""");
client().admin()
.cluster()
.putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
.actionGet();

IllegalStateException exception = expectThrows(
IllegalStateException.class,
() -> client().prepareIndex("index")
.setId("1")
.setSource(Map.of("dest", "index"))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
);
assertThat(
exception.getMessage(),
equalTo("index cycle detected while processing pipeline [target_default_pipeline] for document [1]: [index, target, index]")
);
}

public void testFinalPipeline() {
final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
createIndex("index", settings);
Expand Down Expand Up @@ -393,6 +482,26 @@ public String getType() {
return "changing_dest";
}

},
"reroute",
(processorFactories, tag, description, config) -> {
final String dest = Objects.requireNonNullElse(
ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
"target"
);
return new AbstractProcessor(tag, description) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
ingestDocument.reroute(dest);
return ingestDocument;
}

@Override
public String getType() {
return "reroute";
}

};
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
handler.accept(ingestDocument, null);
return;
}
Expand All @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
Processor processor;
IngestMetric metric;
// iteratively execute any sync processors
while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) {
while (currentProcessor < processorsWithMetrics.size()
&& processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
&& ingestDocument.isReroute() == false) {
processorWithMetric = processorsWithMetrics.get(currentProcessor);
processor = processorWithMetric.v1();
metric = processorWithMetric.v2();
Expand All @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
}

assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
handler.accept(ingestDocument, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public final class IngestDocument {

// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();

private boolean doNoSelfReferencesCheck = false;
private boolean reroute = false;

public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
Expand All @@ -80,6 +80,7 @@ public IngestDocument(IngestDocument other) {
new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
this.reroute = other.reroute;
}

/**
Expand Down Expand Up @@ -903,6 +904,29 @@ public String toString() {
return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
}

public void reroute(String destIndex) {
getMetadata().setIndex(destIndex);
reroute = true;
}

/**
* The document is redirected to another target.
* This implies that we'll skip the current pipeline and invoke the default pipeline of the new target
*
* @return whether the document is redirected to another target
*/
boolean isReroute() {
return reroute;
}

/**
* Set the {@link #reroute} flag to false so that subsequent calls to {@link #isReroute()} will return false until/unless
* {@link #reroute(String)} is called.
*/
void resetReroute() {
reroute = false;
}

public enum Metadata {
INDEX(IndexFieldMapper.NAME),
TYPE("_type"),
Expand Down

0 comments on commit cdf2522

Please sign in to comment.