Skip to content

Commit

Permalink
Ingest: Start separating Metadata from IngestSourceAndMetadata (#88401)
Browse files Browse the repository at this point in the history
Pull out the implementation of `Metadata` from `IngestSourceAndMetadata`.

`Metadata` will become a base class extended by the update contexts: ingest, update, update by query and reindex.

`Metadata` implements a map-like interface, making it easy for a class containing `Metadata` to implement the full `Map` interface.
  • Loading branch information
stu-elastic committed Jul 12, 2022
1 parent c271d39 commit 39de085
Show file tree
Hide file tree
Showing 17 changed files with 660 additions and 526 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testEscapeFields() throws Exception {
processor = new DotExpanderProcessor("_tag", null, null, "foo.bar");
processor.execute(document);
assertThat(document.getSource().size(), equalTo(1));
assertThat(document.getMetadataMap().size(), equalTo(1)); // the default version
assertThat(document.getMetadata().size(), equalTo(1)); // the default version
assertThat(document.getFieldValue("foo.bar", List.class).size(), equalTo(2));
assertThat(document.getFieldValue("foo.bar.0", String.class), equalTo("baz2"));
assertThat(document.getFieldValue("foo.bar.1", String.class), equalTo("baz1"));
Expand All @@ -60,7 +60,7 @@ public void testEscapeFields() throws Exception {
processor = new DotExpanderProcessor("_tag", null, null, "foo.bar");
processor.execute(document);
assertThat(document.getSource().size(), equalTo(1));
assertThat(document.getMetadataMap().size(), equalTo(1)); // the default version
assertThat(document.getMetadata().size(), equalTo(1)); // the default version
assertThat(document.getFieldValue("foo.bar", List.class).size(), equalTo(2));
assertThat(document.getFieldValue("foo.bar.0", Integer.class), equalTo(1));
assertThat(document.getFieldValue("foo.bar.1", String.class), equalTo("2"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ public void testRenameAtomicOperationSetFails() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("list", Collections.singletonList("item"));

IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("new_field", (k, v) -> {
IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("new_field", (o, k, v) -> {
if (v != null) {
throw new UnsupportedOperationException();
}
}, "list", (k, v) -> {}));
}, "list", (o, k, v) -> {}));
Processor processor = createRenameProcessor("list", "new_field", false);
try {
processor.execute(ingestDocument);
Expand All @@ -160,7 +160,7 @@ public void testRenameAtomicOperationRemoveFails() throws Exception {
Map<String, Object> metadata = new HashMap<>();
metadata.put("list", Collections.singletonList("item"));

IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("list", (k, v) -> {
IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("list", (o, k, v) -> {
if (v == null) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testSimulate() throws Exception {
source.put("processed", true);
IngestDocument ingestDocument = new IngestDocument("index", "id", Versions.MATCH_ANY, null, null, source);
assertThat(simulateDocumentBaseResult.getIngestDocument().getSource(), equalTo(ingestDocument.getSource()));
assertThat(simulateDocumentBaseResult.getIngestDocument().getMetadataMap(), equalTo(ingestDocument.getMetadataMap()));
assertThat(simulateDocumentBaseResult.getIngestDocument().getMetadata().getMap(), equalTo(ingestDocument.getMetadata().getMap()));
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());

// cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ IngestDocument getIngestDocument() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(DOC_FIELD);
Map<String, Object> metadataMap = ingestDocument.getMetadataMap();
for (Map.Entry<String, Object> metadata : metadataMap.entrySet()) {
if (metadata.getValue() != null) {
builder.field(metadata.getKey(), metadata.getValue().toString());
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();
for (String key : metadata.keySet()) {
Object value = metadata.get(key);
if (value != null) {
builder.field(key, value.toString());
}
}
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
Expand Down
24 changes: 5 additions & 19 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,15 @@ public IngestDocument(String index, String id, long version, String routing, Ver
source
);
this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, sourceAndMetadata.getTimestamp());
this.ingestMetadata.put(TIMESTAMP, sourceAndMetadata.getMetadata().getTimestamp());
}

/**
* Copy constructor that creates a new {@link IngestDocument} which has exactly the same properties as the one provided as argument
*/
public IngestDocument(IngestDocument other) {
this(
new IngestSourceAndMetadata(
deepCopyMap(other.sourceAndMetadata.getSource()),
deepCopyMap(other.sourceAndMetadata.getMetadata()),
other.getIngestSourceAndMetadata().timestamp,
other.getIngestSourceAndMetadata().validators
),
new IngestSourceAndMetadata(deepCopyMap(other.sourceAndMetadata.getSource()), other.sourceAndMetadata.getMetadata().clone()),
deepCopyMap(other.ingestMetadata)
);
}
Expand All @@ -93,14 +88,12 @@ public IngestDocument(Map<String, Object> sourceAndMetadata, Map<String, Object>
Tuple<Map<String, Object>, Map<String, Object>> sm = IngestSourceAndMetadata.splitSourceAndMetadata(sourceAndMetadata);
this.sourceAndMetadata = new IngestSourceAndMetadata(
sm.v1(),
sm.v2(),
IngestSourceAndMetadata.getTimestamp(ingestMetadata),
IngestSourceAndMetadata.VALIDATORS
new org.elasticsearch.script.Metadata(sm.v2(), IngestSourceAndMetadata.getTimestamp(ingestMetadata))
);
this.ingestMetadata = new HashMap<>(ingestMetadata);
this.ingestMetadata.computeIfPresent(TIMESTAMP, (k, v) -> {
if (v instanceof String) {
return this.sourceAndMetadata.getTimestamp();
return this.sourceAndMetadata.getMetadata().getTimestamp();
}
return v;
});
Expand Down Expand Up @@ -737,18 +730,11 @@ public IngestSourceAndMetadata getIngestSourceAndMetadata() {
return sourceAndMetadata;
}

/**
* Get all Metadata values in a Map
*/
public Map<String, Object> getMetadataMap() {
return sourceAndMetadata.getMetadata();
}

/**
* Get the strongly typed metadata
*/
public org.elasticsearch.script.Metadata getMetadata() {
return sourceAndMetadata;
return sourceAndMetadata.getMetadata();
}

/**
Expand Down
22 changes: 11 additions & 11 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -897,27 +897,27 @@ private void innerExecute(
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
IngestSourceAndMetadata sourceAndMetadata = ingestDocument.getIngestSourceAndMetadata();
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(sourceAndMetadata.getIndex());
indexRequest.id(sourceAndMetadata.getId());
indexRequest.routing(sourceAndMetadata.getRouting());
indexRequest.version(sourceAndMetadata.getVersion());
if (sourceAndMetadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(sourceAndMetadata.getVersionType()));
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = sourceAndMetadata.getIfSeqNo()) != null) {
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = sourceAndMetadata.getIfPrimaryTerm()) != null) {
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(sourceAndMetadata.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest
// processor creates a source map that is self-referencing.
Expand All @@ -933,7 +933,7 @@ private void innerExecute(
return;
}
Map<String, String> map;
if ((map = sourceAndMetadata.getDynamicTemplates()) != null) {
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
Expand Down

0 comments on commit 39de085

Please sign in to comment.