Skip to content

Commit

Permalink
Drop a utility method, add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Apr 12, 2023
1 parent 7cc0a99 commit 7647f9e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import java.util.Objects;
import java.util.function.Function;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;

public final class RerouteProcessor extends AbstractProcessor {

public static final String TYPE = "reroute";

private static final String NAMING_SCHEME_ERROR_MESSAGE =
"invalid data stream name: [%s]; must follow naming scheme <type>-<dataset>-<namespace>";

private static final String DATA_STREAM_PREFIX = "data_stream.";
private static final String DATA_STREAM_TYPE = DATA_STREAM_PREFIX + "type";
private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
Expand Down Expand Up @@ -72,11 +78,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
// parse out the <type>-<dataset>-<namespace> components from _index
int indexOfFirstDash = indexName.indexOf('-');
if (indexOfFirstDash < 0) {
throw createInvalidDataStreamNameException(indexName);
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
int indexOfSecondDash = indexName.indexOf('-', indexOfFirstDash + 1);
if (indexOfSecondDash < 0) {
throw createInvalidDataStreamNameException(indexName);
throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
}
type = parseDataStreamType(indexName, indexOfFirstDash);
currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
Expand All @@ -97,12 +103,6 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}

private static IllegalArgumentException createInvalidDataStreamNameException(String indexName) {
return new IllegalArgumentException(
"invalid data stream name: [" + indexName + "]; must follow naming scheme <type>-<dataset>-<namespace>"
);
}

private static String parseDataStreamType(String dataStreamName, int indexOfFirstDash) {
return dataStreamName.substring(0, indexOfFirstDash);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,23 @@ public void testFallbackToValuesFrom_index() throws Exception {
assertDataSetFields(ingestDocument, "logs", "generic", "default");
}

public void testInvalidDataStreamName() throws Exception {
{
IngestDocument ingestDocument = createIngestDocument("foo");
RerouteProcessor processor = createRerouteProcessor(List.of(), List.of());
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(e.getMessage(), equalTo("invalid data stream name: [foo]; must follow naming scheme <type>-<dataset>-<namespace>"));
}

{
// naturally, though, a plain destination doesn't have to match the data stream naming convention
IngestDocument ingestDocument = createIngestDocument("foo");
RerouteProcessor processor = createRerouteProcessor("bar");
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue("_index", String.class), equalTo("bar"));
}
}

private RerouteProcessor createRerouteProcessor(List<String> dataset, List<String> namespace) {
return new RerouteProcessor(
null,
Expand Down

0 comments on commit 7647f9e

Please sign in to comment.