Skip to content

Commit

Permalink
RemoveProcessor updated to support fieldsToKeep (#83665)
Browse files Browse the repository at this point in the history
  • Loading branch information
zembrzuski committed Mar 7, 2022
1 parent a388e4a commit eadb566
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 24 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/83665.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83665
summary: "RemoveProcessor updated to support fieldsToKeep"
area: Ingest
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,72 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;

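/**
* Processor that removes fields from a document. Either the configured fields are removed, or, when fields to keep are
* configured, every non-metadata field that is neither kept nor an ancestor or descendant of a kept field is removed.
* When removing configured fields with ignore_missing set, fields that are not present are skipped.
*/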
public final class RemoveProcessor extends AbstractProcessor {

public static final String TYPE = "remove";

private final List<TemplateScript.Factory> fields;
private final List<TemplateScript.Factory> fieldsToRemove;
private final List<TemplateScript.Factory> fieldsToKeep;
private final boolean ignoreMissing;

RemoveProcessor(String tag, String description, List<TemplateScript.Factory> fields, boolean ignoreMissing) {
RemoveProcessor(
String tag,
String description,
List<TemplateScript.Factory> fieldsToRemove,
List<TemplateScript.Factory> fieldsToKeep,
boolean ignoreMissing
) {
super(tag, description);
this.fields = new ArrayList<>(fields);
this.fieldsToRemove = new ArrayList<>(fieldsToRemove);
this.fieldsToKeep = new ArrayList<>(fieldsToKeep);
this.ignoreMissing = ignoreMissing;
}

public List<TemplateScript.Factory> getFields() {
return fields;
}

@Override
public IngestDocument execute(IngestDocument document) {
if (ignoreMissing) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
if (document.hasField(path)) {
document.removeField(path);
}
});
if (fieldsToKeep.isEmpty() == false) {
fieldsToKeepProcessor(document);
} else {
fields.forEach(document::removeField);
fieldsToRemoveProcessor(document);
}

return document;
}

/**
 * Removes every configured field from the document.
 * When ignoreMissing is set, each template is rendered to a path and the path is removed only if the document has it;
 * otherwise the template is passed directly to the document's removeField.
 */
private void fieldsToRemoveProcessor(IngestDocument document) {
    for (TemplateScript.Factory template : fieldsToRemove) {
        if (ignoreMissing) {
            removeWhenPresent(document, document.renderTemplate(template));
        } else {
            document.removeField(template);
        }
    }
}

/**
 * Keep mode: walks all field names of the document and removes each one that is neither a metadata field
 * nor matched by shouldKeep against the configured fieldsToKeep.
 */
private void fieldsToKeepProcessor(IngestDocument document) {
    IngestDocument.getAllFields(document.getSourceAndMetadata())
        .stream()
        // a field is a removal candidate only if it is not metadata AND not protected by the keep list
        .filter(name -> (IngestDocument.Metadata.isMetadata(name) || shouldKeep(name, fieldsToKeep, document)) == false)
        .forEach(name -> removeWhenPresent(document, name));
}

/**
 * Removes the given field path from the document, doing nothing when the document does not have that field.
 */
private void removeWhenPresent(IngestDocument document, String documentField) {
    if (document.hasField(documentField) == false) {
        return;
    }
    document.removeField(documentField);
}

/**
 * Decides whether a document field is protected by the keep list.
 * Each keep template is rendered against the document; the field is kept when it equals a rendered path,
 * is an ancestor of it (the path starts with "field."), or is a descendant of it (the field starts with "path.").
 *
 * @param documentField name of the document field being examined
 * @param fieldsToKeep  templates of the paths that must survive
 * @param document      document used to render the templates
 * @return true if any keep path matches the field by one of the three rules above
 */
static boolean shouldKeep(String documentField, List<TemplateScript.Factory> fieldsToKeep, IngestDocument document) {
    final String fieldAsParentPrefix = documentField + ".";
    for (TemplateScript.Factory keepTemplate : fieldsToKeep) {
        final String keepPath = document.renderTemplate(keepTemplate);
        if (documentField.equals(keepPath)) {
            return true;
        }
        if (keepPath.startsWith(fieldAsParentPrefix)) {
            return true;
        }
        if (documentField.startsWith(keepPath + ".")) {
            return true;
        }
    }
    return false;
}

@Override
public String getType() {
return TYPE;
Expand All @@ -75,8 +106,35 @@ public RemoveProcessor create(
String description,
Map<String, Object> config
) throws Exception {
final List<TemplateScript.Factory> compiledTemplatesToRemove = getTemplates(processorTag, config, "field");
final List<TemplateScript.Factory> compiledTemplatesToKeep = getTemplates(processorTag, config, "keep");

if (compiledTemplatesToRemove.isEmpty() && compiledTemplatesToKeep.isEmpty()) {
throw newConfigurationException(TYPE, processorTag, "keep", "or [field] must be specified");
}

if (compiledTemplatesToRemove.isEmpty() == false && compiledTemplatesToKeep.isEmpty() == false) {
throw newConfigurationException(TYPE, processorTag, "keep", "and [field] cannot both be used in the same processor");
}

boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new RemoveProcessor(processorTag, description, compiledTemplatesToRemove, compiledTemplatesToKeep, ignoreMissing);
}

/**
 * Reads the string values configured under propertyName and compiles each one into a template,
 * preserving their order.
 */
private List<TemplateScript.Factory> getTemplates(String processorTag, Map<String, Object> config, String propertyName) {
    final List<String> rawPaths = getFields(processorTag, config, propertyName);
    final List<TemplateScript.Factory> compiled = new ArrayList<>(rawPaths.size());
    for (String rawPath : rawPaths) {
        compiled.add(ConfigurationUtils.compileTemplate(TYPE, processorTag, propertyName, rawPath, scriptService));
    }
    return compiled;
}

private List<String> getFields(String processorTag, Map<String, Object> config, String propertyName) {
final List<String> fields = new ArrayList<>();
final Object field = ConfigurationUtils.readObject(TYPE, processorTag, config, "field");

if (config.containsKey(propertyName) == false) {
return fields;
}

final Object field = ConfigurationUtils.readObject(TYPE, processorTag, config, propertyName);
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
Expand All @@ -85,11 +143,16 @@ public RemoveProcessor create(
fields.add((String) field);
}

final List<TemplateScript.Factory> compiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new RemoveProcessor(processorTag, description, compiledTemplates, ignoreMissing);
return fields;
}
}

/**
 * @return the templates of the fields this processor removes; this is the processor's own list, not a copy
 */
public List<TemplateScript.Factory> getFieldsToRemove() {
    return this.fieldsToRemove;
}

/**
 * @return the templates of the fields this processor keeps; this is the processor's own list, not a copy
 */
public List<TemplateScript.Factory> getFieldsToKeep() {
    return this.fieldsToKeep;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,17 @@ public void testCreate() throws Exception {
String processorTag = randomAlphaOfLength(10);
RemoveProcessor removeProcessor = factory.create(null, processorTag, null, config);
assertThat(removeProcessor.getTag(), equalTo(processorTag));
assertThat(removeProcessor.getFields().get(0).newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(removeProcessor.getFieldsToRemove().get(0).newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
}

/**
 * A config that only sets "keep" must produce a processor whose keep templates render to the configured names, in order.
 */
public void testCreateKeepField() throws Exception {
    String processorTag = randomAlphaOfLength(10);
    Map<String, Object> config = new HashMap<>();
    config.put("keep", Arrays.asList("field1", "field2"));

    RemoveProcessor removeProcessor = factory.create(null, processorTag, null, config);

    assertThat(removeProcessor.getTag(), equalTo(processorTag));
    String[] expectedNames = { "field1", "field2" };
    for (int i = 0; i < expectedNames.length; i++) {
        String rendered = removeProcessor.getFieldsToKeep().get(i).newInstance(Collections.emptyMap()).execute();
        assertThat(rendered, equalTo(expectedNames[i]));
    }
}

public void testCreateMultipleFields() throws Exception {
Expand All @@ -47,7 +57,7 @@ public void testCreateMultipleFields() throws Exception {
RemoveProcessor removeProcessor = factory.create(null, processorTag, null, config);
assertThat(removeProcessor.getTag(), equalTo(processorTag));
assertThat(
removeProcessor.getFields()
removeProcessor.getFieldsToRemove()
.stream()
.map(template -> template.newInstance(Collections.emptyMap()).execute())
.collect(Collectors.toList()),
Expand All @@ -61,7 +71,19 @@ public void testCreateMissingField() throws Exception {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
assertThat(e.getMessage(), equalTo("[keep] or [field] must be specified"));
}
}

/**
 * Setting both "field" and "keep" in the same config must be rejected with a parse exception.
 */
public void testCreateTooManyFields() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put("field", "field1");
    config.put("keep", "field2");

    ElasticsearchParseException thrown = null;
    try {
        factory.create(null, null, null, config);
    } catch (ElasticsearchParseException e) {
        thrown = e;
    }

    if (thrown == null) {
        fail("factory create should have failed");
    }
    assertThat(thrown.getMessage(), equalTo("[keep] and [field] cannot both be used in the same processor"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
Expand All @@ -30,6 +33,7 @@ public void testRemoveFields() throws Exception {
randomAlphaOfLength(10),
null,
Collections.singletonList(new TestTemplateService.MockTemplateScript.Factory(field)),
Collections.emptyList(),
false
);
processor.execute(ingestDocument);
Expand Down Expand Up @@ -61,4 +65,119 @@ public void testIgnoreMissing() throws Exception {
Processor processor = new RemoveProcessor.Factory(TestTemplateService.instance()).create(null, processorTag, null, config);
processor.execute(ingestDocument);
}

/**
 * Runs the processor in keep mode with "name" and "address.street" and checks that those fields, the parent
 * object "address" and the metadata fields survive, while "age" and "address.number" are removed.
 */
public void testKeepFields() throws Exception {
    Map<String, Object> addressObject = new HashMap<>();
    addressObject.put("street", "Ipiranga Street");
    addressObject.put("number", 123);

    Map<String, Object> sourceMap = new HashMap<>();
    sourceMap.put("name", "eric clapton");
    sourceMap.put("age", 55);
    sourceMap.put("address", addressObject);

    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), sourceMap);

    List<TemplateScript.Factory> keepTemplates = new ArrayList<>();
    keepTemplates.add(new TestTemplateService.MockTemplateScript.Factory("name"));
    keepTemplates.add(new TestTemplateService.MockTemplateScript.Factory("address.street"));

    Processor processor = new RemoveProcessor(randomAlphaOfLength(10), null, Collections.emptyList(), keepTemplates, false);
    processor.execute(ingestDocument);

    // kept explicitly, or kept because it is the parent of a kept path
    for (String survivor : new String[] { "name", "address", "address.street" }) {
        assertTrue(ingestDocument.hasField(survivor));
    }
    // not covered by the keep list
    for (String removed : new String[] { "age", "address.number" }) {
        assertFalse(ingestDocument.hasField(removed));
    }
    // metadata is never removed in keep mode
    for (String metadataField : new String[] { "_index", "_version", "_id", "_version_type" }) {
        assertTrue(ingestDocument.hasField(metadataField));
    }
}

/**
 * Checks the name-matching rule of RemoveProcessor.shouldKeep: a document field is kept when it equals a kept
 * path, is an ancestor of a kept path, or is a descendant of a kept path; unrelated and sibling paths are not kept.
 *
 * The method takes no arguments: the previous (String a, String b) parameters were unused and made this
 * something other than a no-arg test method.
 */
public void testShouldKeep() {
    Map<String, Object> address = new HashMap<>();
    address.put("street", "Ipiranga Street");
    address.put("number", 123);

    Map<String, Object> source = new HashMap<>();
    source.put("name", "eric clapton");
    source.put("address", address);

    IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), source);

    // exact matches ("age" is not part of the source map built above)
    assertTrue(RemoveProcessor.shouldKeep("name", keepOnly("name"), ingestDocument));
    assertTrue(RemoveProcessor.shouldKeep("age", keepOnly("age"), ingestDocument));

    // unrelated top-level names
    assertFalse(RemoveProcessor.shouldKeep("name", keepOnly("age"), ingestDocument));

    // the field is an ancestor of the kept path
    assertTrue(RemoveProcessor.shouldKeep("address", keepOnly("address.street"), ingestDocument));
    assertTrue(RemoveProcessor.shouldKeep("address", keepOnly("address.number"), ingestDocument));

    // the field is a descendant of the kept path
    assertTrue(RemoveProcessor.shouldKeep("address.street", keepOnly("address"), ingestDocument));
    assertTrue(RemoveProcessor.shouldKeep("address.number", keepOnly("address"), ingestDocument));

    // exact match on an object field
    assertTrue(RemoveProcessor.shouldKeep("address", keepOnly("address"), ingestDocument));

    // siblings under the same parent do not protect each other
    assertFalse(RemoveProcessor.shouldKeep("address.street", keepOnly("address.number"), ingestDocument));
}

/** Wraps a single literal path in the one-element keep list passed to shouldKeep. */
private static List<TemplateScript.Factory> keepOnly(String path) {
    return Collections.singletonList(new TestTemplateService.MockTemplateScript.Factory(path));
}

}

0 comments on commit eadb566

Please sign in to comment.