Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: support simulate with verbose for pipeline processor #33839

Merged

Conversation

jakelandis
Copy link
Contributor

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show the full steps
that include the intermediate processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.


Prior to this change:

PUT _ingest/pipeline/mypipeline1
{
  "processors": [
    {
      "set": {
        "if" : "true",
        "tag": "set_from_pipeline1",
        "field": "a",
        "value": "foo"
      }
    },
    {
      "pipeline": {
        "pipeline": "mypipeline2"
      }
    }
  ]
}

PUT _ingest/pipeline/mypipeline2
{
  "processors": [
    {
      "rename": {
        "tag": "rename_from_pipeline2",
        "field": "a",
        "target_field": "a1"
      }
    },
    {
      "pipeline": {
        "pipeline": "mypipeline3"
      }
    }
  ]
}

PUT _ingest/pipeline/mypipeline3
{
  "processors": [
    {
      "rename": {
        "tag": "rename_from_pipeline3",
        "field": "a1",
        "target_field": "a2"
      }
    }
  ]
}


POST _ingest/pipeline/mypipeline1/_simulate?verbose
{
  "docs": [
    {
      "_source": {
        "a" : "b"
      }
    }
  ]
}

Results in

{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "set_from_pipeline1",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "a": "foo"
            },
            "_ingest": {
              "timestamp": "2018-09-18T21:49:01.881075Z"
            }
          }
        },
        {
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "a2": "foo"
            },
            "_ingest": {
              "timestamp": "2018-09-18T21:49:01.881075Z"
            }
          }
        }
      ]
    }
  ]
}

^^ Note pipeline2 is missing from the result steps.

After this change:

{
  "docs": [
    {
      "processor_results": [
        {
          "tag": "set_from_pipeline1",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "a": "foo"
            },
            "_ingest": {
              "timestamp": "2018-09-18T21:49:32.52323Z"
            }
          }
        },
        {
          "tag": "rename_from_pipeline2",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "a1": "foo"
            },
            "_ingest": {
              "timestamp": "2018-09-18T21:49:32.52323Z"
            }
          }
        },
        {
          "tag": "rename_from_pipeline3",
          "doc": {
            "_index": "_index",
            "_type": "_type",
            "_id": "_id",
            "_source": {
              "a2": "foo"
            },
            "_ingest": {
              "timestamp": "2018-09-18T21:49:32.52323Z"
            }
          }
        }
      ]
    }
  ]
}

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show the full steps
that include the intermediate processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.
@jakelandis jakelandis added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.0.0 v6.5.0 labels Sep 18, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-infra

Copy link
Member

@original-brownbear original-brownbear left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The recursion check is wrong I think + a few nits on formatting noise.

Also, maybe it would be nice to provide some visual indication for what processor is part of what pipeline automatically instead of having to manually set tags for that? (e.g. we would just automatically prefix the tags for nested pipeline calls?)

if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Recursive invocation of pipeline ["
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not correct because it would also throw for repeated invocation of the same pipeline right? (Since PipelineProcessor doesn't implement equals you will probably only see this in some crazier scenarios (A calls B calls C and then another step of just B which would call C again making it throw on seeing C twice even though it's not a recursive invocation)) See #33419 for more here. I think the easiest fix to get out of that problem is to do the same thing that the pipeline processor does and simply track the current stack of pipelines in pipelinesSeen instead of all the pipelines ever seen (in this case this means just removing a pipeline proc from the pipelinesSeen after it has been unwrapped)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. fixed and updated in latest commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks looks good :)

}

public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: noisy reformat :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. sorry for the noise.
Note to self:
Intellij - Simple lambdas in one line to avoid this kind of noise.
image

@@ -69,9 +79,11 @@ public void testActualProcessor() throws Exception {

public void testActualCompoundProcessorWithoutOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; });
TestProcessor testProcessor = new TestProcessor(ingestDocument -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: noisy reformat :)

@@ -90,14 +102,17 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {

public void testActualCompoundProcessorWithOnFailure() throws Exception {
RuntimeException exception = new RuntimeException("fail");
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; });
TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {});
TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Noisy reformatting

@jakelandis
Copy link
Contributor Author

@original-brownbear - thanks for the review. changes implemented.

Also, maybe it would be nice to provide some visual indication for what processor is part of what pipeline automatically instead of having to manually set tags for that? (e.g. we would just automatically prefix the tags for nested pipeline calls?)

Agreed it would be nice, and will consider for future enhancement. (but not part of this PR)

assertThat(resultList.get(0).getFailure(), sameInstance(exception));
assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag()));
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... not sure why github didn't preserve the location change in the diff here. Everything below this comment is new, nothing above (except imports) have changed.

Copy link
Member

@original-brownbear original-brownbear left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM but probably best to wait for @rjernst :)

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM too. One nit on naming: it's not necessarily recursion (self invocation), but just any form of a cycle across pipeline processors right? I would tweak the error message to something like "Cycle found between processors A and B"

@jakelandis
Copy link
Contributor Author

jakelandis commented Sep 19, 2018

@rjernst thanks! updated the wording to "Cycle detected for pipeline: <pipeline>" . I also updated the non-simulate error to match.

will merge when green.

@jakelandis jakelandis merged commit e37e5df into elastic:master Sep 20, 2018
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Sep 20, 2018
…#33839)

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show all intermediate
processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.
As well this updates the error message for when cycles are detected
in pipelines calling other pipelines.
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Oct 21, 2018
…#33839)

* ingest: support simulate with verbose for pipeline processor

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show all intermediate 
processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.
As well this updates the error message for when cycles are detected
in pipelines calling other pipelines.
jakelandis added a commit that referenced this pull request Oct 22, 2018
* ingest: support simulate with verbose for pipeline processor

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show all intermediate 
processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.
As well this updates the error message for when cycles are detected
in pipelines calling other pipelines.
@jakelandis
Copy link
Contributor Author

6.5 backport: 2d24192

kcm pushed a commit that referenced this pull request Oct 30, 2018
* ingest: support simulate with verbose for pipeline processor

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show all intermediate 
processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.
As well this updates the error message for when cycles are detected
in pipelines calling other pipelines.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement v6.5.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants