Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for overwrite parameter in the enrich processor. #45029

Merged
merged 2 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin

String enrichKey = ConfigurationUtils.readStringProperty(TYPE, tag, config, "enrich_key", policy.getEnrichKey());
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);

final List<EnrichSpecification> specifications;
final List<Map<?, ?>> specificationConfig = ConfigurationUtils.readList(TYPE, tag, config, "enrich_values");
Expand All @@ -53,7 +54,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin

switch (policy.getType()) {
case EnrichPolicy.EXACT_MATCH_TYPE:
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, specifications);
return new ExactMatchProcessor(tag, client, policyName, enrichKey, ignoreMissing, overrideEnabled, specifications);
default:
throw new IllegalArgumentException("unsupported policy type [" + policy.getType() + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,23 @@ final class ExactMatchProcessor extends AbstractProcessor {
private final String policyName;
private final String enrichKey;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
private final List<EnrichSpecification> specifications;

ExactMatchProcessor(String tag,
Client client,
String policyName,
String enrichKey,
boolean ignoreMissing,
boolean overrideEnabled,
List<EnrichSpecification> specifications) {
this(
tag,
createSearchRunner(client),
policyName,
enrichKey,
ignoreMissing,
overrideEnabled,
specifications
);
}
Expand All @@ -55,12 +58,14 @@ final class ExactMatchProcessor extends AbstractProcessor {
String policyName,
String enrichKey,
boolean ignoreMissing,
boolean overrideEnabled,
List<EnrichSpecification> specifications) {
super(tag);
this.searchRunner = searchRunner;
this.policyName = policyName;
this.enrichKey = enrichKey;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.specifications = specifications;
}

Expand Down Expand Up @@ -111,7 +116,9 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
assert enrichDocument != null : "enrich document for id [" + enrichKey + "] was empty despite non-zero search hits length";
for (EnrichSpecification specification : specifications) {
Object enrichFieldValue = enrichDocument.get(specification.sourceField);
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
if (overrideEnabled || ingestDocument.hasField(specification.targetField) == false) {
ingestDocument.setFieldValue(specification.targetField, enrichFieldValue);
}
}
handler.accept(ingestDocument, null);
});
Expand Down Expand Up @@ -142,6 +149,10 @@ boolean isIgnoreMissing() {
return ignoreMissing;
}

boolean isOverrideEnabled() {
return overrideEnabled;
}

List<EnrichSpecification> getSpecifications() {
return specifications;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public void testCreateProcessorInstance() throws Exception {
config.put("ignore_missing", keyIgnoreMissing);
}

Boolean overrideEnabled = randomBoolean() ? null : randomBoolean();
if (overrideEnabled != null) {
config.put("override", overrideEnabled);
}

int numRandomValues = randomIntBetween(1, 8);
List<Tuple<String, String>> randomValues = new ArrayList<>(numRandomValues);
for (int i = 0; i < numRandomValues; i++) {
Expand All @@ -55,6 +60,11 @@ public void testCreateProcessorInstance() throws Exception {
assertThat(result.getPolicyName(), equalTo("majestic"));
assertThat(result.getEnrichKey(), equalTo("host"));
assertThat(result.isIgnoreMissing(), is(keyIgnoreMissing));
if (overrideEnabled != null) {
assertThat(result.isOverrideEnabled(), is(overrideEnabled));
} else {
assertThat(result.isOverrideEnabled(), is(true));
}
assertThat(result.getSpecifications().size(), equalTo(numRandomValues));
for (int i = 0; i < numRandomValues; i++) {
EnrichSpecification actual = result.getSpecifications().get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -47,7 +48,7 @@ public class ExactMatchProcessorTests extends ESTestCase {

public void testBasics() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Map.of("domain", "elastic.co"));
Expand Down Expand Up @@ -76,7 +77,7 @@ public void testBasics() throws Exception {

public void testNoMatch() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction();
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Map.of("domain", "elastic.com"));
Expand Down Expand Up @@ -106,7 +107,7 @@ public void testNoMatch() throws Exception {
public void testSearchFailure() throws Exception {
String indexName = ".enrich-_name";
MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false,
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, true,
List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
Map.of("domain", "elastic.com"));
Expand Down Expand Up @@ -141,7 +142,7 @@ public void testSearchFailure() throws Exception {
public void testIgnoreKeyMissing() throws Exception {
{
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
true, true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of());

assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6));
Expand All @@ -152,7 +153,7 @@ public void testIgnoreKeyMissing() throws Exception {
}
{
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockedSearchFunction(), "_name", "domain",
false, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
false, true, List.of(new EnrichSpecification("tldRank", "tld_rank"), new EnrichSpecification("tld", "tld")));
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of());
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
Expand All @@ -166,6 +167,43 @@ public void testIgnoreKeyMissing() throws Exception {
}
}

public void testExistingFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
List.of(new EnrichSpecification("tld", "tld")));

IngestDocument ingestDocument = new IngestDocument(new HashMap<>(Map.of("domain", "elastic.co", "tld", "tld")), Map.of());
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
processor.execute(ingestDocument, (result, e) -> {
resultHolder[0] = result;
exceptionHolder[0] = e;
});
assertThat(exceptionHolder[0], nullValue());
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo("tld"));
}

public void testExistingNullFieldWithOverrideDisabled() throws Exception {
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co")));
ExactMatchProcessor processor = new ExactMatchProcessor("_tag", mockSearch, "_name", "domain", false, false,
List.of(new EnrichSpecification("tld", "tld")));

Map<String, Object> source = new HashMap<>();
source.put("domain", "elastic.co");
source.put("tld", null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would the behavior be if "tld" was not set to anything? same as null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, in that case tld would be set by the enrich processor, because that field doesn't exist, so nothing will be overwritten.

IngestDocument ingestDocument = new IngestDocument(source, Map.of());
IngestDocument[] resultHolder = new IngestDocument[1];
Exception[] exceptionHolder = new Exception[1];
processor.execute(ingestDocument, (result, e) -> {
resultHolder[0] = result;
exceptionHolder[0] = e;
});
assertThat(exceptionHolder[0], nullValue());
assertThat(resultHolder[0].hasField("tld"), equalTo(true));
assertThat(resultHolder[0].getFieldValue("tld", Object.class), equalTo(null));
}

private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
private final SearchResponse mockResponse;
private final SetOnce<SearchRequest> capturedRequest;
Expand Down