Skip to content

Commit

Permalink
Ingest: Add validation and strong typing to sourceAndMetadata map (#87673
Browse files Browse the repository at this point in the history
)

Adds `IngestSourceAndMetadata` to replace the sourceAndMetadata map. 

This validates metadata values when they are added to the map for use in
scripts and other processes, and also provides typed getters for use
inside of server.

This change lays the foundation for strongly typed Metadata access in scripting.

Related: #87309
  • Loading branch information
stu-elastic committed Jun 27, 2022
1 parent 0adf139 commit 5a2d91c
Show file tree
Hide file tree
Showing 19 changed files with 1,125 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void testTargetField() throws Exception {
String fieldName;
boolean ignoreMissing;
do {
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
fieldValue = RandomDocumentPicks.randomString(random());
fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, modifyInput(fieldValue));
ignoreMissing = randomBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.Metadata;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
Expand Down Expand Up @@ -121,39 +120,6 @@ public void testConvertScalarToList() throws Exception {
}
}

public void testAppendMetadataExceptVersion() throws Exception {
    // Appending to a metadata field turns its value into a list, which is rarely
    // meaningful in practice, but append shares set's streamlined semantics, so we
    // cover it here all the same.
    Metadata targetMetadata = randomFrom(Metadata.INDEX, Metadata.ID, Metadata.ROUTING);
    String targetField = targetMetadata.getFieldName();

    List<String> expectedValues = new ArrayList<>();
    Processor processor;
    if (randomBoolean()) {
        // Single scalar value.
        String single = randomAlphaOfLengthBetween(1, 10);
        expectedValues.add(single);
        processor = createAppendProcessor(targetField, single, true);
    } else {
        // A (possibly empty) list of values.
        int count = randomIntBetween(0, 10);
        while (expectedValues.size() < count) {
            expectedValues.add(randomAlphaOfLengthBetween(1, 10));
        }
        processor = createAppendProcessor(targetField, expectedValues, true);
    }

    IngestDocument document = RandomDocumentPicks.randomIngestDocument(random());
    Object priorValue = document.getSourceAndMetadata().get(targetField);
    processor.execute(document);

    List<?> result = document.getFieldValue(targetField, List.class);
    if (priorValue == null) {
        assertThat(result, equalTo(expectedValues));
    } else {
        // Any pre-existing value stays at the head; appended values follow in order.
        assertThat(result.size(), equalTo(expectedValues.size() + 1));
        assertThat(result.get(0), equalTo(priorValue));
        for (int i = 1; i < result.size(); i++) {
            assertThat(result.get(i), equalTo(expectedValues.get(i - 1)));
        }
    }
}

public void testAppendingDuplicateValueToScalarDoesNotModifyDocument() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String originalValue = randomAlphaOfLengthBetween(1, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,14 @@ public void testRenameExistingFieldNullValue() throws Exception {
}

public void testRenameAtomicOperationSetFails() throws Exception {
Map<String, Object> source = new HashMap<String, Object>() {
@Override
public Object put(String key, Object value) {
if (key.equals("new_field")) {
throw new UnsupportedOperationException();
}
return super.put(key, value);
}
};
source.put("list", Collections.singletonList("item"));
Map<String, Object> metadata = new HashMap<>();
metadata.put("list", Collections.singletonList("item"));

IngestDocument ingestDocument = TestIngestDocument.ofSourceAndMetadata(source);
IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("new_field", (k, v) -> {
if (v != null) {
throw new UnsupportedOperationException();
}
}, "list", (k, v) -> {}));
Processor processor = createRenameProcessor("list", "new_field", false);
try {
processor.execute(ingestDocument);
Expand All @@ -161,18 +157,14 @@ public Object put(String key, Object value) {
}

public void testRenameAtomicOperationRemoveFails() throws Exception {
Map<String, Object> source = new HashMap<String, Object>() {
@Override
public Object remove(Object key) {
if (key.equals("list")) {
throw new UnsupportedOperationException();
}
return super.remove(key);
}
};
source.put("list", Collections.singletonList("item"));
Map<String, Object> metadata = new HashMap<>();
metadata.put("list", Collections.singletonList("item"));

IngestDocument ingestDocument = TestIngestDocument.ofSourceAndMetadata(source);
IngestDocument ingestDocument = TestIngestDocument.ofMetadataWithValidator(metadata, Map.of("list", (k, v) -> {
if (v == null) {
throw new UnsupportedOperationException();
}
}));
Processor processor = createRenameProcessor("list", "new_field", false);
try {
processor.execute(ingestDocument);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.Metadata;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestIngestDocument;
Expand All @@ -28,6 +27,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.ingest.IngestDocument.Metadata;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -36,6 +36,9 @@ public class SetProcessorTests extends ESTestCase {
public void testSetExistingFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
while (Metadata.isMetadata(fieldName)) {
fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
}
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, null, true, false);
processor.execute(ingestDocument);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;

public class UriPartsProcessorTests extends ESTestCase {

Expand Down Expand Up @@ -191,7 +191,9 @@ public void testRemoveIfSuccessfulDoesNotRemoveTargetField() throws Exception {

Map<String, Object> expectedSourceAndMetadata = new HashMap<>();
expectedSourceAndMetadata.put(field, Map.of("scheme", "http", "domain", "www.google.com", "path", ""));
assertThat(output.getSourceAndMetadata().entrySet(), containsInAnyOrder(expectedSourceAndMetadata.entrySet().toArray()));
for (Map.Entry<String, Object> entry : expectedSourceAndMetadata.entrySet()) {
assertThat(output.getSourceAndMetadata(), hasEntry(entry.getKey(), entry.getValue()));
}
}

public void testInvalidUri() {
Expand Down Expand Up @@ -234,7 +236,9 @@ private void testUriParsing(boolean keepOriginal, boolean removeIfSuccessful, St
}
expectedSourceAndMetadata.put("url", values);

assertThat(output.getSourceAndMetadata().entrySet(), containsInAnyOrder(expectedSourceAndMetadata.entrySet().toArray()));
for (Map.Entry<String, Object> entry : expectedSourceAndMetadata.entrySet()) {
assertThat(output.getSourceAndMetadata(), hasEntry(entry.getKey(), entry.getValue()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public void testSimulate() throws Exception {
source.put("fail", false);
source.put("processed", true);
IngestDocument ingestDocument = new IngestDocument("index", "id", Versions.MATCH_ANY, null, null, source);
assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
assertThat(simulateDocumentBaseResult.getIngestDocument().getSource(), equalTo(ingestDocument.getSource()));
assertThat(simulateDocumentBaseResult.getIngestDocument().getMetadata(), equalTo(ingestDocument.getMetadata()));
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());

// cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -23,7 +24,6 @@

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -42,7 +42,7 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
"ingest_document",
true,
a -> {
HashMap<String, Object> sourceAndMetadata = new HashMap<>();
Map<String, Object> sourceAndMetadata = Maps.newHashMapWithExpectedSize(5);
sourceAndMetadata.put(Metadata.INDEX.getFieldName(), a[0]);
sourceAndMetadata.put(Metadata.ID.getFieldName(), a[1]);
if (a[2] != null) {
Expand All @@ -55,7 +55,8 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
sourceAndMetadata.put(Metadata.VERSION_TYPE.getFieldName(), a[4]);
}
sourceAndMetadata.putAll((Map<String, Object>) a[5]);
return new WriteableIngestDocument(IngestDocument.of(sourceAndMetadata, (Map<String, Object>) a[6]));
Map<String, Object> ingestMetadata = (Map<String, Object>) a[6];
return new WriteableIngestDocument(new IngestDocument(sourceAndMetadata, ingestMetadata));
}
);
static {
Expand Down Expand Up @@ -89,7 +90,7 @@ final class WriteableIngestDocument implements Writeable, ToXContentFragment {
WriteableIngestDocument(StreamInput in) throws IOException {
    // Read order must mirror the write side: source+metadata map first,
    // then the ingest metadata map.
    Map<String, Object> source = in.readMap();
    Map<String, Object> ingest = in.readMap();
    this.ingestDocument = new IngestDocument(source, ingest);
}

@Override
Expand All @@ -105,18 +106,16 @@ IngestDocument getIngestDocument() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(DOC_FIELD);
Map<IngestDocument.Metadata, Object> metadataMap = ingestDocument.getMetadata();
for (Map.Entry<IngestDocument.Metadata, Object> metadata : metadataMap.entrySet()) {
Map<String, Object> metadataMap = ingestDocument.getMetadata();
for (Map.Entry<String, Object> metadata : metadataMap.entrySet()) {
if (metadata.getValue() != null) {
builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
builder.field(metadata.getKey(), metadata.getValue().toString());
}
}
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}
Map<String, Object> source = IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata());
metadataMap.keySet().forEach(mD -> source.remove(mD.getFieldName()));
builder.field(SOURCE_FIELD, source);
builder.field(SOURCE_FIELD, ingestDocument.getSource());
builder.field(INGEST_FIELD, ingestDocument.getIngestMetadata());
builder.endObject();
return builder;
Expand Down

0 comments on commit 5a2d91c

Please sign in to comment.