Skip to content

Commit

Permalink
Add template snippets support for KV ingest processor (#73758) (#74508)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jun 23, 2021
1 parent 74c4044 commit 2ece2cd
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 39 deletions.
4 changes: 2 additions & 2 deletions docs/reference/ingest/processors/kv.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ TIP: Using the KV Processor can result in field names that you cannot control. C
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to be parsed
| `field` | yes | - | The field to be parsed. Supports <<template-snippets,template snippets>>.
| `field_split` | yes | - | Regex pattern to use for splitting key-value pairs
| `value_split` | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair
| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document
| `target_field` | no | `null` | The field to insert the extracted keys into. Defaults to the root of the document. Supports <<template-snippets,template snippets>>.
| `include_keys` | no | `null` | List of keys to filter and insert into document. Defaults to including all keys
| `exclude_keys` | no | `null` | List of keys to exclude from document
| `ignore_missing` | no | `false` | If `true` and `field` does not exist or is `null`, the processor quietly exits without modifying the document
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
entry(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()),
entry(JsonProcessor.TYPE, new JsonProcessor.Factory()),
entry(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()),
entry(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService)),
entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()),
entry(BytesProcessor.TYPE, new BytesProcessor.Factory()),
entry(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;

import java.util.Collections;
import java.util.List;
Expand All @@ -32,17 +34,17 @@ public final class KeyValueProcessor extends AbstractProcessor {

private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");

private final String field;
private final TemplateScript.Factory field;
private final String fieldSplit;
private final String valueSplit;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final TemplateScript.Factory targetField;
private final boolean ignoreMissing;
private final Consumer<IngestDocument> execution;

KeyValueProcessor(String tag, String description, String field, String fieldSplit, String valueSplit, Set<String> includeKeys,
Set<String> excludeKeys, String targetField, boolean ignoreMissing,
KeyValueProcessor(String tag, String description, TemplateScript.Factory field, String fieldSplit, String valueSplit,
Set<String> includeKeys, Set<String> excludeKeys, TemplateScript.Factory targetField, boolean ignoreMissing,
String trimKey, String trimValue, boolean stripBrackets, String prefix) {
super(tag, description);
this.field = field;
Expand All @@ -58,9 +60,9 @@ public final class KeyValueProcessor extends AbstractProcessor {
);
}

private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, String field,
private static Consumer<IngestDocument> buildExecution(String fieldSplit, String valueSplit, TemplateScript.Factory field,
Set<String> includeKeys, Set<String> excludeKeys,
String targetField, boolean ignoreMissing,
TemplateScript.Factory targetField, boolean ignoreMissing,
String trimKey, String trimValue, boolean stripBrackets,
String prefix) {
final Predicate<String> keyFilter;
Expand All @@ -77,19 +79,7 @@ private static Consumer<IngestDocument> buildExecution(String fieldSplit, String
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
}
}
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField == null) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = targetField + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}

final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
Expand All @@ -101,17 +91,43 @@ private static Consumer<IngestDocument> buildExecution(String fieldSplit, String
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);
return document -> {
String value = document.getFieldValue(field, String.class, ignoreMissing);
String target = "";
if (targetField != null) {
target = document.renderTemplate(targetField);
}

final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (target.isEmpty()) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = target + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
String path = document.renderTemplate(field);
if (path.isEmpty() || document.hasField(path, true) == false) {
if (ignoreMissing) {
return;
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}
String value = document.getFieldValue(path, String.class, ignoreMissing);
if (value == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs.");
}
for (String part : fieldSplitter.apply(value)) {
String[] kv = valueSplitter.apply(part);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]");
}
String key = keyTrimmer.apply(kv[0]);
if (keyFilter.test(key)) {
Expand Down Expand Up @@ -140,7 +156,7 @@ private static Function<String, String[]> buildSplitter(String split, boolean fi
}
}

String getField() {
TemplateScript.Factory getField() {
return field;
}

Expand All @@ -160,7 +176,7 @@ Set<String> getExcludeKeys() {
return excludeKeys;
}

String getTargetField() {
TemplateScript.Factory getTargetField() {
return targetField;
}

Expand Down Expand Up @@ -188,11 +204,25 @@ public String getType() {
}

public static class Factory implements Processor.Factory {
private final ScriptService scriptService;

/**
 * Creates a factory for {@link KeyValueProcessor} instances.
 *
 * @param scriptService service passed to {@code ConfigurationUtils.compileTemplate} when the
 *                      {@code field} and {@code target_field} settings are compiled in {@code create}
 */
public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public KeyValueProcessor create(Map<String, Processor.Factory> registry, String processorTag,
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"field", field, scriptService);
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
TemplateScript.Factory targetFieldTemplate = null;
if (targetField != null) {
targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag,
"target_field", targetField, scriptService);
}

String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
Expand All @@ -212,8 +242,8 @@ public KeyValueProcessor create(Map<String, Processor.Factory> registry, String
}
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new KeyValueProcessor(
processorTag, description, field, fieldSplit, valueSplit, includeKeys, excludeKeys, targetField, ignoreMissing,
trimKey, trimValue, stripBrackets, prefix
processorTag, description, fieldTemplate, fieldSplit, valueSplit, includeKeys, excludeKeys, targetFieldTemplate,
ignoreMissing, trimKey, trimValue, stripBrackets, prefix
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -24,16 +26,22 @@

public class KeyValueProcessorFactoryTests extends ESTestCase {

private KeyValueProcessor.Factory factory;

/**
 * Builds the factory under test before each test method, backed by
 * {@code TestTemplateService.instance()} so that template settings can be compiled.
 */
@Before
public void init() {
factory = new KeyValueProcessor.Factory(TestTemplateService.instance());
}

public void testCreateWithDefaults() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
config.put("value_split", "=");
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), is(nullValue()));
Expand All @@ -42,7 +50,6 @@ public void testCreateWithDefaults() throws Exception {
}

public void testCreateWithAllFieldsSet() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand All @@ -54,17 +61,16 @@ public void testCreateWithAllFieldsSet() throws Exception {
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
assertThat(processor.getTargetField(), equalTo("target"));
assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}

public void testCreateWithMissingField() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String processorTag = randomAlphaOfLength(10);
ElasticsearchException exception = expectThrows(ElasticsearchParseException.class,
Expand All @@ -73,7 +79,6 @@ public void testCreateWithMissingField() {
}

public void testCreateWithMissingFieldSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAlphaOfLength(10);
Expand All @@ -83,7 +88,6 @@ public void testCreateWithMissingFieldSplit() {
}

public void testCreateWithMissingValueSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.test.ESTestCase;

import java.util.ArrayList;
Expand All @@ -27,7 +28,7 @@

public class KeyValueProcessorTests extends ESTestCase {

private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory();
private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance());

public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Expand Down Expand Up @@ -92,7 +93,7 @@ public void testMissingField() throws Exception {
Processor processor = createKvProcessor("unknown", "&",
"=", null, null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist"));
}

public void testNullValueWithIgnoreMissing() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,58 @@ teardown:
id: 1
- match: { _source.goodbye: "everybody" }
- match: { _source.hello: "world" }

---
"Test KV Processor with template snippets":
# Pipeline with three kv processors whose `field` setting is a template snippet rendered per document:
#   1. `field` and `target_field` are both templates, so the pairs are written under the rendered target;
#   2. only `field` is a template, so the pairs are written to the document root;
#   3. `field` refers to `origin1`, which the document indexed below does not define; `ignore_missing`
#      is enabled so this processor is expected to leave the document untouched.
- do:
ingest.put_pipeline:
id: "1"
body: >
{
"processors": [
{
"kv" : {
"field" : "{{origin}}",
"target_field" : "{{target}}",
"field_split": " ",
"value_split": "="
}
},
{
"kv" : {
"field" : "{{origin}}",
"field_split": " ",
"value_split": "="
}
},
{
"kv" : {
"field" : "{{origin1}}",
"field_split": " ",
"value_split": "=",
"ignore_missing": true
}
}
]
}
- match: { acknowledged: true }

# Index a document whose `origin` names the field to parse and whose `target` names the destination field.
- do:
index:
index: test
id: 1
pipeline: "1"
body: {
origin: "field1",
field1: "goodbye=everybody hello=world",
target: "bar"
}

# The parsed pairs must appear both under `bar` (first processor) and at the document root (second processor).
- do:
get:
index: test
id: 1
- match: { _source.bar.goodbye: "everybody" }
- match: { _source.bar.hello: "world" }
- match: { _source.goodbye: "everybody" }
- match: { _source.hello: "world" }

0 comments on commit 2ece2cd

Please sign in to comment.