Add reroute processor (#76511)
felixbarny committed Apr 18, 2023
1 parent f13f77b commit 11b598a
Showing 9 changed files with 905 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/76511.yaml
@@ -0,0 +1,5 @@
pr: 76511
summary: Add `reroute` processor
area: Ingest Node
type: enhancement
issues: []
1 change: 1 addition & 0 deletions docs/reference/ingest/processors.asciidoc
@@ -64,6 +64,7 @@ include::processors/redact.asciidoc[]
include::processors/registered-domain.asciidoc[]
include::processors/remove.asciidoc[]
include::processors/rename.asciidoc[]
include::processors/reroute.asciidoc[]
include::processors/script.asciidoc[]
include::processors/set.asciidoc[]
include::processors/set-security-user.asciidoc[]
94 changes: 94 additions & 0 deletions docs/reference/ingest/processors/reroute.asciidoc
@@ -0,0 +1,94 @@
[[reroute-processor]]
=== Reroute processor
++++
<titleabbrev>Reroute</titleabbrev>
++++

experimental::[]

The `reroute` processor routes a document to another target index or data stream.
It has two main modes:

When setting the `destination` option, the target is explicitly specified and the `dataset` and `namespace` options can't be set.

When the `destination` option is not set, this processor is in a data stream mode.
Note that in this mode, the `reroute` processor can only be used on data streams that follow the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme].
Trying to use this processor on a data stream with a non-compliant name will raise an exception.

The name of a data stream consists of three parts: `<type>-<dataset>-<namespace>`.
See the {fleet-guide}/data-streams.html#data-streams-naming-scheme[data stream naming scheme] documentation for more details.
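
The three-part naming scheme can be illustrated with a minimal Java sketch that splits a name at its first two dashes, in the same way the `execute` method in this commit parses `_index`. The class name `DataStreamNameSketch` and the method `parse` are hypothetical and exist only for this illustration.

```java
public final class DataStreamNameSketch {

    /**
     * Splits "<type>-<dataset>-<namespace>" at the first two dashes.
     * Everything after the second dash is treated as the namespace.
     */
    public static String[] parse(String name) {
        int firstDash = name.indexOf('-');
        int secondDash = firstDash < 0 ? -1 : name.indexOf('-', firstDash + 1);
        if (secondDash < 0) {
            // fewer than two dashes: the name does not follow the scheme
            throw new IllegalArgumentException("invalid data stream name: [" + name + "]");
        }
        return new String[] {
            name.substring(0, firstDash),              // type
            name.substring(firstDash + 1, secondDash), // dataset
            name.substring(secondDash + 1)             // namespace
        };
    }

    public static void main(String[] args) {
        // prints "logs | nginx.access | default"
        System.out.println(String.join(" | ", parse("logs-nginx.access-default")));
    }
}
```

Because only the first two dashes act as separators, a name such as `logs-generic-prod-eu` yields the namespace `prod-eu`, while a name with fewer than two dashes is rejected.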

This processor can use either static values or field references from the document to determine the `dataset` and `namespace` components of the new target.
See <<reroute-options>> for more details.

NOTE: It's not possible to change the `type` of the data stream with the `reroute` processor.

After a `reroute` processor has been executed, all the other processors of the current pipeline are skipped, including the final pipeline.
If the current pipeline is executed in the context of a <<pipeline-processor>>, the calling pipeline will be skipped, too.
This means that at most one `reroute` processor is ever executed within a pipeline,
which makes it possible to define mutually exclusive routing conditions,
similar to an if, else-if, else-if, … chain.

The reroute processor ensures that the `data_stream.<type|dataset|namespace>` fields are set according to the new target.
If the document contains an `event.dataset` value, it will be updated to reflect the same value as `data_stream.dataset`.

Note that the client needs to have permissions to the final target.
Otherwise, the document will be rejected with a security exception which looks like this:

[source,js]
--------------------------------------------------
{"type":"security_exception","reason":"action [indices:admin/auto_create] is unauthorized for API key id [8-dt9H8BqGblnY2uSI--] of user [elastic/fleet-server] on indices [logs-foo-default], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}
--------------------------------------------------
// NOTCONSOLE

[[reroute-options]]
.Reroute options
[options="header"]
|======
| Name | Required | Default | Description
| `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
| `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.

Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to `null` or are missing.
| `namespace` | no | `{{data_stream.namespace}}` a| Field references or a static value for the namespace part of the data stream name. See the criteria for <<indices-create-api-path-params, index names>> for allowed characters. Must be no longer than 100 characters.

Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<namespace>` part of the index name as a fallback if all field references resolve to `null` or are missing.
include::common-options.asciidoc[]
|======
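
The replacement of invalid characters described in the table can be sketched in Java as follows. The patterns, the 100-character limit, and the `_` replacement are copied from `RerouteProcessor.DataStreamValueSource` in this commit; the class name `SanitizeSketch` and the helper names `dataset` and `namespace` are hypothetical wrappers added for this illustration.

```java
import java.util.Locale;
import java.util.regex.Pattern;

public final class SanitizeSketch {

    private static final int MAX_LENGTH = 100;
    // the dataset pattern additionally disallows '-', since '-' separates the name parts
    private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
    private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");

    /** Lowercases, truncates to MAX_LENGTH, then replaces disallowed characters with '_'. */
    public static String sanitize(String s, Pattern disallowed) {
        if (s == null) {
            return null;
        }
        s = s.toLowerCase(Locale.ROOT);
        s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
        return disallowed.matcher(s).replaceAll("_");
    }

    public static String dataset(String s) {
        return sanitize(s, DISALLOWED_IN_DATASET);
    }

    public static String namespace(String s) {
        return sanitize(s, DISALLOWED_IN_NAMESPACE);
    }

    public static void main(String[] args) {
        System.out.println(dataset("Nginx-Access Log")); // prints "nginx_access_log"
        System.out.println(namespace("Prod-EU"));        // prints "prod-eu"
    }
}
```

In the processor source, this sanitization is applied to values resolved from field references. A static value is instead rejected at configuration time if sanitizing it would change it.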

The `if` option can be used to define the condition under which the document should be rerouted to a new target.

[source,js]
--------------------------------------------------
{
  "reroute": {
    "tag": "nginx",
    "if" : "ctx?.log?.file?.path?.contains('nginx')",
    "dataset": "nginx"
  }
}
--------------------------------------------------
// NOTCONSOLE

The `dataset` and `namespace` options can contain either a single value or a list of values that are used as a fallback.
If a field reference evaluates to `null` or is not present in the document, the next value or field reference is used.
If a field reference evaluates to a non-`String` value, the processor fails.

In the following example, the processor would first try to resolve the value for the `service.name` field to determine the value for `dataset`.
If that field resolves to `null` or is missing, it would try the next element in the list.
In this case, this is the static value `"generic"`.
The `namespace` option is configured with just a single static value.

[source,js]
--------------------------------------------------
{
  "reroute": {
    "dataset": [
      "{{service.name}}",
      "generic"
    ],
    "namespace": "default"
  }
}
--------------------------------------------------
// NOTCONSOLE
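
The fallback order used for a list of values can be sketched in Java roughly as follows. This is a simplified illustration, not the processor's implementation: the document is modelled as a flat `Map` keyed by dotted field path, sanitization of resolved values is omitted, and the names `FallbackSketch` and `resolve` are hypothetical.

```java
import java.util.List;
import java.util.Map;

public final class FallbackSketch {

    /**
     * Returns the first source that resolves to a value.
     * A "{{field}}" entry is looked up in the document; any other entry is a static value.
     * If nothing resolves, the part parsed from the current index name is used.
     */
    public static String resolve(List<String> sources, Map<String, Object> doc, String fallbackFromIndexName) {
        for (String source : sources) {
            if (source.startsWith("{{") && source.endsWith("}}")) {
                Object value = doc.get(source.substring(2, source.length() - 2));
                if (value == null) {
                    continue; // null or missing: try the next entry
                }
                if (!(value instanceof String)) {
                    // a non-String value makes the processor fail
                    throw new IllegalArgumentException("field [" + source + "] is not a String");
                }
                return (String) value;
            }
            return source; // a static value always resolves
        }
        return fallbackFromIndexName;
    }

    public static void main(String[] args) {
        List<String> dataset = List.of("{{service.name}}", "generic");
        System.out.println(resolve(dataset, Map.of("service.name", "checkout"), "nginx")); // prints "checkout"
        System.out.println(resolve(dataset, Map.of(), "nginx"));                            // prints "generic"
    }
}
```

Because a static value always resolves, any entries listed after it are never consulted, so a static value is best placed last in the list.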
@@ -79,6 +79,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()),
entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)),
entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)),
entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()),
entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)),
entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
entry(SortProcessor.TYPE, new SortProcessor.Factory()),
@@ -0,0 +1,262 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.core.Nullable;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Pattern;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;

public final class RerouteProcessor extends AbstractProcessor {

public static final String TYPE = "reroute";

private static final String NAMING_SCHEME_ERROR_MESSAGE =
"invalid data stream name: [%s]; must follow naming scheme <type>-<dataset>-<namespace>";

private static final String DATA_STREAM_PREFIX = "data_stream.";
private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
private static final String EVENT_DATASET = "event.dataset";
private final List<DataStreamValueSource> dataset;
private final List<DataStreamValueSource> namespace;
private final String destination;

RerouteProcessor(
String tag,
String description,
List<DataStreamValueSource> dataset,
List<DataStreamValueSource> namespace,
String destination
) {
super(tag, description);
if (dataset.isEmpty()) {
this.dataset = List.of(DATASET_VALUE_SOURCE);
} else {
this.dataset = dataset;
}
if (namespace.isEmpty()) {
this.namespace = List.of(NAMESPACE_VALUE_SOURCE);
} else {
this.namespace = namespace;
}
this.destination = destination;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
if (destination != null) {
ingestDocument.reroute(destination);
return ingestDocument;
}
final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
final String type;
final String currentDataset;
final String currentNamespace;

// parse out the <type>-<dataset>-<namespace> components from _index
int indexOfFirstDash = indexName.indexOf('-');
if (indexOfFirstDash < 0) {
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
if (indexOfSecondDash < 0) {
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
type = parseDataStreamType(indexName, indexOfFirstDash);
currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);

String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
String newTarget = type + "-" + dataset + "-" + namespace;
ingestDocument.reroute(newTarget);
ingestDocument.setFieldValue(DATA_STREAM_TYPE, type);
ingestDocument.setFieldValue(DATA_STREAM_DATASET, dataset);
ingestDocument.setFieldValue(DATA_STREAM_NAMESPACE, namespace);
if (ingestDocument.hasField(EVENT_DATASET)) {
// ECS specifies that "event.dataset should have the same value as data_stream.dataset"
// don't set event.dataset eagerly; only update it if the doc already contains it, to keep it consistent with data_stream.dataset
ingestDocument.setFieldValue(EVENT_DATASET, dataset);
}
return ingestDocument;
}

private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
return dataStreamName.substring(0, indexOfFirstDash);
}

private static String parseDataStreamDataset(String dataStreamName, int indexOfFirstDash, int indexOfSecondDash) {
return dataStreamName.substring(indexOfFirstDash + 1, indexOfSecondDash);
}

private static String parseDataStreamNamespace(String dataStreamName, int indexOfSecondDash) {
return dataStreamName.substring(indexOfSecondDash + 1);
}

private String determineDataStreamField(
IngestDocument ingestDocument,
List<DataStreamValueSource> valueSources,
String fallbackFromCurrentTarget
) {
// first try to get value from the configured dataset/namespace field references
// if this contains a static value rather than a field reference, this is guaranteed to return
for (DataStreamValueSource value : valueSources) {
String result = value.resolve(ingestDocument);
if (result != null) {
return result;
}
}
// use the dataset/namespace value we parsed out from the current target (_index) as a fallback
return fallbackFromCurrentTarget;
}

@Override
public String getType() {
return TYPE;
}

List<DataStreamValueSource> getDataStreamDataset() {
return dataset;
}

List<DataStreamValueSource> getDataStreamNamespace() {
return namespace;
}

String getDestination() {
return destination;
}

public static final class Factory implements Processor.Factory {

@Override
public RerouteProcessor create(
Map<String, Processor.Factory> processorFactories,
String tag,
String description,
Map<String, Object> config
) throws Exception {
List<DataStreamValueSource> dataset;
try {
dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
.stream()
.map(DataStreamValueSource::dataset)
.toList();
} catch (IllegalArgumentException e) {
throw newConfigurationException(TYPE, tag, "dataset", e.getMessage());
}
List<DataStreamValueSource> namespace;
try {
namespace = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "namespace")
.stream()
.map(DataStreamValueSource::namespace)
.toList();
} catch (IllegalArgumentException e) {
throw newConfigurationException(TYPE, tag, "namespace", e.getMessage());
}

String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
}

return new RerouteProcessor(tag, description, dataset, namespace, destination);
}
}

/**
* Contains either a {{field reference}} or a static value for a dataset or a namespace field
*/
static final class DataStreamValueSource {

private static final int MAX_LENGTH = 100;
private static final String REPLACEMENT = "_";
private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");

private final String value;
private final String fieldReference;
private final Function<String, String> sanitizer;

public static DataStreamValueSource dataset(String dataset) {
return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
}

public static DataStreamValueSource namespace(String namespace) {
return new DataStreamValueSource(namespace, nsp -> sanitizeDataStreamField(nsp, DISALLOWED_IN_NAMESPACE));
}

private static String sanitizeDataStreamField(String s, Pattern disallowedInDataset) {
if (s == null) {
return null;
}
s = s.toLowerCase(Locale.ROOT);
s = s.substring(0, Math.min(s.length(), MAX_LENGTH));
return disallowedInDataset.matcher(s).replaceAll(REPLACEMENT);
}

private DataStreamValueSource(String value, Function<String, String> sanitizer) {
this.sanitizer = sanitizer;
this.value = value;
if (value.contains("{{") || value.contains("}}")) {
if (value.startsWith("{{") == false || value.endsWith("}}") == false) {
throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
}
String fieldReference = value.substring(2, value.length() - 2);
// field references may have two or three curly braces
if (fieldReference.startsWith("{") && fieldReference.endsWith("}")) {
fieldReference = fieldReference.substring(1, fieldReference.length() - 1);
}
// only a single field reference is allowed
// so something like this is disallowed: {{foo}}-{{bar}}
if (fieldReference.contains("{") || fieldReference.contains("}")) {
throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
}
this.fieldReference = fieldReference;
} else {
this.fieldReference = null;
if (Objects.equals(sanitizer.apply(value), value) == false) {
throw new IllegalArgumentException("'" + value + "' contains disallowed characters");
}
}
}

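/**
* Resolves the field reference from the provided ingest document or returns the static value if this value source doesn't represent
* a field reference.
* @param ingestDocument the document against which a field reference is resolved
* @return the resolved (and sanitized) field reference or the static value; {@code null} if the referenced field is missing or null
*/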
@Nullable
public String resolve(IngestDocument ingestDocument) {
if (fieldReference != null) {
return sanitizer.apply(ingestDocument.getFieldValue(fieldReference, String.class, true));
} else {
return value;
}
}
}
}
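
The field-reference validation in the `DataStreamValueSource` constructor above can be exercised in isolation with a small sketch. The logic mirrors the constructor, but the class name `FieldReferenceSketch` and the method `parse` are hypothetical, and returning `null` for a static value is a choice made for this illustration.

```java
public final class FieldReferenceSketch {

    /**
     * Returns the referenced field name for "{{field}}" or "{{{field}}}",
     * or null if the value is a static value without curly braces.
     */
    public static String parse(String value) {
        if (!value.contains("{{") && !value.contains("}}")) {
            return null; // static value
        }
        if (!value.startsWith("{{") || !value.endsWith("}}")) {
            throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
        }
        String fieldReference = value.substring(2, value.length() - 2);
        // field references may have two or three curly braces
        if (fieldReference.startsWith("{") && fieldReference.endsWith("}")) {
            fieldReference = fieldReference.substring(1, fieldReference.length() - 1);
        }
        // only a single field reference is allowed, so "{{foo}}-{{bar}}" is rejected
        if (fieldReference.contains("{") || fieldReference.contains("}")) {
            throw new IllegalArgumentException("'" + value + "' is not a valid field reference");
        }
        return fieldReference;
    }

    public static void main(String[] args) {
        System.out.println(parse("{{service.name}}"));   // prints "service.name"
        System.out.println(parse("{{{service.name}}}")); // prints "service.name"
        System.out.println(parse("generic"));            // prints "null"
    }
}
```

A value that mixes static text with a reference, such as `foo-{{bar}}`, is rejected as well, because it does not both start with `{{` and end with `}}`.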
