Skip to content

Commit

Permalink
Adding executedPipelines to the IngestDocument copy constructor (#105427
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Feb 14, 2024
1 parent 4d09f96 commit 7273e02
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 10 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/105427.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 105427
summary: Adding `executedPipelines` to the `IngestDocument` copy constructor
area: Ingest Node
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -714,18 +714,9 @@ teardown:
- do:
ingest.simulate:
verbose: true
id: "outer"
body: >
{
"pipeline": {
"processors" : [
{
"pipeline" : {
"name": "outer"
}
}
]
}
,
"docs": [
{
"_index": "index",
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,24 @@ public IngestDocument(
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
}

// note: these rest of these constructors deal with the data-centric view of the IngestDocument, not the execution-centric view.
// For example, the copy constructor doesn't populate the `indexHistory` (as well as some other fields),
// because those fields are execution-centric.

/**
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided.
*
* @throws IllegalArgumentException if the passed-in ingest document references itself
*/
public IngestDocument(IngestDocument other) {
this(deepCopyMap(ensureNoSelfReferences(other.sourceAndMetadata)), deepCopyMap(other.ingestMetadata));
/*
* The executedPipelines field is clearly execution-centric rather than data centric. Despite what the comment above says, we're
* copying it here anyway. THe reason is that this constructor is only called from two non-test locations, and both of those
* involve the simulate pipeline logic. The simulate pipeline logic needs this information. Rather than making the code more
* complicated, we're just copying this over here since it does no harm.
*/
this.executedPipelines.addAll(other.executedPipelines);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,27 @@ public void testCopyConstructor() {
}
}

public void testCopyConstructorWithExecutedPipelines() {
/*
* This is similar to the first part of testCopyConstructor, except that we're executing a pipeilne, and running the
* assertions inside the processor so that we can test that executedPipelines is correct.
*/
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
TestProcessor processor = new TestProcessor(ingestDocument1 -> {
assertThat(ingestDocument1.getPipelineStack().size(), equalTo(1));
IngestDocument copy = new IngestDocument(ingestDocument1);
assertThat(ingestDocument1.getSourceAndMetadata(), not(sameInstance(copy.getSourceAndMetadata())));
assertIngestDocument(ingestDocument1, copy);
assertThat(copy.getPipelineStack(), equalTo(ingestDocument1.getPipelineStack()));
});
Pipeline pipeline = new Pipeline("pipeline1", "test pipeline", 1, Collections.emptyMap(), new CompoundProcessor(processor));
ingestDocument.executePipeline(pipeline, (ingestDocument1, exception) -> {
assertNotNull(ingestDocument1);
assertNull(exception);
});
assertThat(processor.getInvokedCounter(), equalTo(1));
}

public void testCopyConstructorWithZonedDateTime() {
ZoneId timezone = ZoneId.of("Europe/London");

Expand Down

0 comments on commit 7273e02

Please sign in to comment.