Skip to content

Commit

Permalink
Adding a custom exception for problems with the graph of pipelines to…
Browse files Browse the repository at this point in the history
… be applied to a document (#105196) (#105473)

This PR removes the need to parse the exception message to detect if a cycle has been detected
in the ingest pipelines to be run on a document.
  • Loading branch information
masseyke committed Feb 14, 2024
1 parent d57de7d commit 9b6cf27
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 11 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/105196.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 105196
summary: Adding a custom exception for problems with the graph of pipelines to be
applied to a document
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ teardown:

---
"Test Pipeline Processor with Circular Pipelines":
- skip:
version: " - 8.12.99"
reason: exception class changed in 8.13.0
- do:
ingest.put_pipeline:
id: "outer"
Expand Down Expand Up @@ -100,13 +103,13 @@ teardown:
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
catch: /graph_structure_exception/
index:
index: test
id: "1"
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.type: "graph_structure_exception" }
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: outer" }

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ingest.GraphStructureException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
Expand Down Expand Up @@ -1819,7 +1820,8 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.action.search.VersionMismatchException::new,
161,
Version.V_7_12_0
);
),
INGEST_GRAPH_STRUCTURE_EXCEPTION(GraphStructureException.class, GraphStructureException::new, 177, Version.V_7_17_19);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* This exception is thrown when there is something wrong with the structure of the graph (such as the graph of pipelines) to be applied
* to a document. For example, this is thrown when there are cycles in the graph when cycles are not allowed.
*/
public class GraphStructureException extends ElasticsearchException {

public GraphStructureException(String message) {
super(message);
}

public GraphStructureException(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public final class IngestDocument {

public static final String INGEST_KEY = "_ingest";
public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: ";
private static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: ";
private static final String INGEST_KEY_PREFIX = INGEST_KEY + ".";
private static final String SOURCE_PREFIX = SourceFieldMapper.NAME + ".";

Expand Down Expand Up @@ -841,7 +841,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId()));
handler.accept(null, new GraphStructureException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import java.util.List;
import java.util.function.BiConsumer;

import static org.elasticsearch.ingest.IngestDocument.PIPELINE_CYCLE_ERROR_MESSAGE;

/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
*/
Expand Down Expand Up @@ -90,9 +88,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
}
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// special handling for pipeline cycle errors
if (e instanceof ElasticsearchException
&& e.getCause() instanceof IllegalStateException
&& e.getCause().getMessage().startsWith(PIPELINE_CYCLE_ERROR_MESSAGE)) {
if (e instanceof ElasticsearchException && e.getCause() instanceof GraphStructureException) {
if (ignoreFailure) {
processorResultList.add(
new SimulateProcessorResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.recovery.PeerRecoveryNotFound;
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
import org.elasticsearch.ingest.GraphStructureException;
import org.elasticsearch.ingest.IngestProcessorException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -826,6 +827,7 @@ public void testIds() {
ids.put(159, NodeHealthCheckFailureException.class);
ids.put(160, NoSeedNodeLeftException.class);
ids.put(161, VersionMismatchException.class);
ids.put(177, GraphStructureException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ public void testActualPipelineProcessorWithCycle() throws Exception {
Exception[] holder = new Exception[1];
trackingProcessor.execute(ingestDocument, (result, e) -> holder[0] = e);
IngestProcessorException exception = (IngestProcessorException) holder[0];
assertThat(exception.getCause(), instanceOf(IllegalStateException.class));
assertThat(exception.getCause(), instanceOf(GraphStructureException.class));
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
}

Expand Down

0 comments on commit 9b6cf27

Please sign in to comment.