Skip to content

Commit

Permalink
Add custom metadata support to data streams. (#63991)
Browse files Browse the repository at this point in the history
A composable index template may hold custom metadata. This change adds behaviour so that,
when a data stream is created, the custom metadata of the matching composable index
template is copied to the new data stream. The get data stream API can then be used to
view the custom metadata.

Example:

```
PUT /_index_template/my-logs-template
{
  "index_patterns": [ "logs-*" ],
  "data_stream": { },
  "template": {
      "settings": {
          "index.number_of_replicas": 0
      }
  },
  "_meta": {
      "managed": true
  }
}

PUT /_data_stream/logs-myapp

GET /_data_stream
```

The get data stream api then yields the following response:

```
{
    "data_streams": [
        {
            "name": "logs-myapp",
            "timestamp_field": {
                "name": "@timestamp"
            },
            "indices": [
                {
                    "index_name": ".ds-logs-myapp-000001",
                    "index_uuid": "3UaBxM3mQXuHR6qx0IDVCw"
                }
            ],
            "generation": 1,
            "_meta": {
                "managed": true
            },
            "status": "GREEN",
            "template": "my-logs-template"
        }
    ]
}
```

Closes #59195
  • Loading branch information
martijnvg committed Oct 26, 2020
1 parent f1930ae commit 5dace55
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,19 @@ public final class DataStream {
String indexTemplate;
@Nullable
String ilmPolicyName;
@Nullable
private final Map<String, Object> metadata;

public DataStream(String name, String timeStampField, List<String> indices, long generation, ClusterHealthStatus dataStreamStatus,
@Nullable String indexTemplate, @Nullable String ilmPolicyName) {
@Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map<String, Object> metadata) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = indices;
this.generation = generation;
this.dataStreamStatus = dataStreamStatus;
this.indexTemplate = indexTemplate;
this.ilmPolicyName = ilmPolicyName;
this.metadata = metadata;
}

public String getName() {
Expand Down Expand Up @@ -81,13 +84,18 @@ public String getIlmPolicyName() {
return ilmPolicyName;
}

public Map<String, Object> getMetadata() {
return metadata;
}

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
public static final ParseField METADATA_FIELD = new ParseField("_meta");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
Expand All @@ -101,7 +109,8 @@ public String getIlmPolicyName() {
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
String indexTemplate = (String) args[5];
String ilmPolicy = (String) args[6];
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy);
Map<String, Object> metadata = (Map<String, Object>) args[7];
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata);
});

static {
Expand All @@ -112,6 +121,7 @@ public String getIlmPolicyName() {
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_TEMPLATE_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand All @@ -129,11 +139,12 @@ public boolean equals(Object o) {
indices.equals(that.indices) &&
dataStreamStatus == that.dataStreamStatus &&
Objects.equals(indexTemplate, that.indexTemplate) &&
Objects.equals(ilmPolicyName, that.ilmPolicyName);
Objects.equals(ilmPolicyName, that.ilmPolicyName) &&
Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName);
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,25 @@

package org.elasticsearch.client.indices;

import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.cluster.DataStreamTestHelper;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction.Response.DataStreamInfo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;

public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetDataStreamAction.Response, GetDataStreamResponse> {

private static List<Index> randomIndexInstances() {
int numIndices = randomIntBetween(0, 128);
List<Index> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(new Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), UUIDs.randomBase64UUID(random())));
}
return indices;
}

private static DataStreamInfo randomInstance() {
List<Index> indices = randomIndexInstances();
long generation = indices.size() + randomLongBetween(1, 128);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
DataStream dataStream = new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
DataStream dataStream = DataStreamTestHelper.randomInstance();
return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10),
randomAlphaOfLengthBetween(2, 10));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -35,6 +37,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
Expand All @@ -45,18 +48,20 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
private final TimestampField timeStampField;
private final List<Index> indices;
private final long generation;
private final Map<String, Object> metadata;

public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation) {
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = Collections.unmodifiableList(indices);
this.generation = generation;
this.metadata = metadata;
assert indices.size() > 0;
assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
}

public DataStream(String name, TimestampField timeStampField, List<Index> indices) {
this(name, timeStampField, indices, indices.size());
this(name, timeStampField, indices, indices.size(), null);
}

public String getName() {
Expand All @@ -75,6 +80,11 @@ public long getGeneration() {
return generation;
}

@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}

/**
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
Expand All @@ -87,7 +97,7 @@ public DataStream rollover(Index newWriteIndex) {
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(newWriteIndex);
return new DataStream(name, timeStampField, backingIndices, generation + 1);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata);
}

/**
Expand All @@ -101,7 +111,7 @@ public DataStream removeBackingIndex(Index index) {
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
return new DataStream(name, timeStampField, backingIndices, generation);
return new DataStream(name, timeStampField, backingIndices, generation, metadata);
}

/**
Expand All @@ -126,7 +136,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
"it is the write index", existingBackingIndex.getName(), name));
}
backingIndices.set(backingIndexPosition, newBackingIndex);
return new DataStream(name, timeStampField, backingIndices, generation);
return new DataStream(name, timeStampField, backingIndices, generation, metadata);
}

/**
Expand All @@ -142,7 +152,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene
}

public DataStream(StreamInput in) throws IOException {
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong());
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readMap(): null);
}

public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -155,22 +166,28 @@ public void writeTo(StreamOutput out) throws IOException {
timeStampField.writeTo(out);
out.writeList(indices);
out.writeVLong(generation);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeMap(metadata);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField METADATA_FIELD = new ParseField("_meta");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3]));
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
(Map<String, Object>) args[4]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), TimestampField.PARSER, TIMESTAMP_FIELD_FIELD);
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand All @@ -184,6 +201,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
builder.field(INDICES_FIELD.getPreferredName(), indices);
builder.field(GENERATION_FIELD.getPreferredName(), generation);
if (metadata != null) {
builder.field(METADATA_FIELD.getPreferredName(), metadata);
}
builder.endObject();
return builder;
}
Expand All @@ -196,12 +216,13 @@ public boolean equals(Object o) {
return name.equals(that.name) &&
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices) &&
generation == that.generation;
generation == that.generation &&
Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation);
return Objects.hash(name, timeStampField, indices, generation, metadata);
}

public static final class TimestampField implements Writeable, ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn

String fieldName = template.getDataStreamTemplate().getTimestampField();
DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName);
DataStream newDataStream = new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()));
DataStream newDataStream =
new DataStream(request.name, timestampField, List.of(firstBackingIndex.getIndex()), 1L,
template.metadata() != null ? Map.copyOf(template.metadata()) : null);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,8 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
List<Index> updatedIndices = dataStream.getIndices().stream()
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
.collect(Collectors.toList());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
dataStream.getMetadata());
}

public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1727,7 +1727,7 @@ public void testIndicesAliasesRequestTargetDataStreams() {

Metadata.Builder mdBuilder = Metadata.builder()
.put(backingIndex, false)
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex()), 1));
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(backingIndex.getIndex())));
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();

{
Expand Down Expand Up @@ -1916,7 +1916,7 @@ public void testDataStreams() {
Metadata.Builder mdBuilder = Metadata.builder()
.put(index1, false)
.put(index2, false)
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex()), 2));
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), List.of(index1.getIndex(), index2.getIndex())));
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
backingIndices.add(im.getIndex());
}

b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
Metadata metadata = b.build();
assertThat(metadata.dataStreams().size(), equalTo(1));
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
Expand All @@ -1034,7 +1034,7 @@ public void testBuildIndicesLookupForDataStreams() {
indices.add(idx.getIndex());
b.put(idx, true);
}
b.put(new DataStream(name, createTimestampField("@timestamp"), indices, indices.size()));
b.put(new DataStream(name, createTimestampField("@timestamp"), indices));
}

Metadata metadata = b.build();
Expand Down Expand Up @@ -1100,8 +1100,7 @@ public void testValidateDataStreamsThrowsExceptionOnConflict() {
DataStream dataStream = new DataStream(
dataStreamName,
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
backingIndices.size()
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
);

IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
Expand Down Expand Up @@ -1174,8 +1173,7 @@ public void testValidateDataStreamsAllowsPrefixedBackingIndices() {
DataStream dataStream = new DataStream(
dataStreamName,
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
backingIndices.size()
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())
);

IndexAbstraction.DataStream dataStreamAbstraction = new IndexAbstraction.DataStream(dataStream, backingIndices);
Expand Down Expand Up @@ -1275,7 +1273,7 @@ private static CreateIndexResult createIndices(int numIndices, int numBackingInd
b.put(im, false);
backingIndices.add(im.getIndex());
}
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum));
b.put(new DataStream(dataStreamName, createTimestampField("@timestamp"), backingIndices, lastBackingIndexNum, null));
return new CreateIndexResult(indices, backingIndices, b.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomBoolean;

public final class DataStreamTestHelper {

Expand Down Expand Up @@ -103,7 +105,11 @@ public static DataStream randomInstance() {
long generation = indices.size() + ESTestCase.randomLongBetween(1, 128);
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(LuceneTestCase.random())));
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation);
Map<String, Object> metadata = null;
if (randomBoolean()) {
metadata = Map.of("key", "value");
}
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata);
}

/**
Expand All @@ -127,7 +133,8 @@ public static ClusterState getClusterStateWithDataStreams(List<Tuple<String, Int
dsTuple.v1(),
createTimestampField("@timestamp"),
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()),
dsTuple.v2()
dsTuple.v2(),
null
);
builder.put(ds);
}
Expand Down

0 comments on commit 5dace55

Please sign in to comment.