Skip to content

Commit

Permalink
Cleanup+Speed-up DataStream Metadata handling (#86470)
Browse files Browse the repository at this point in the history
No need to copy maps so often, this was quite slow in many-shards
benchmarking which uses lots of datastreams.
  • Loading branch information
original-brownbear committed May 9, 2022
1 parent 8107176 commit 699da84
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -32,6 +33,8 @@
public class DataStreamMetadata implements Metadata.Custom {

public static final String TYPE = "data_stream";

public static final DataStreamMetadata EMPTY = new DataStreamMetadata(Map.of(), Map.of());
private static final ParseField DATA_STREAM = new ParseField("data_stream");
private static final ParseField DATA_STREAM_ALIASES = new ParseField("data_stream_aliases");
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -72,7 +75,21 @@ public DataStreamMetadata(Map<String, DataStream> dataStreams, Map<String, DataS
}

public DataStreamMetadata(StreamInput in) throws IOException {
this(in.readMap(StreamInput::readString, DataStream::new), in.readMap(StreamInput::readString, DataStreamAlias::new));
this(
in.readImmutableMap(StreamInput::readString, DataStream::new),
in.readImmutableMap(StreamInput::readString, DataStreamAlias::new)
);
}

public DataStreamMetadata withAddedDatastream(DataStream datastream) {
final String name = datastream.getName();
final DataStream existing = dataStreams.get(name);
if (existing == null) {
return new DataStreamMetadata(Maps.copyMapWithAddedEntry(dataStreams, name, datastream), dataStreamAliases);
} else if (existing.equals(datastream)) {
return this;
}
return new DataStreamMetadata(Maps.copyMapWithAddedOrReplacedEntry(dataStreams, name, datastream), dataStreamAliases);
}

public Map<String, DataStream> dataStreams() {
Expand Down
107 changes: 39 additions & 68 deletions server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ public boolean equalsAliases(Metadata other) {

public SortedMap<String, IndexAbstraction> getIndicesLookup() {
if (indicesLookup == null) {
DataStreamMetadata dataStreamMetadata = custom(DataStreamMetadata.TYPE);
indicesLookup = Builder.buildIndicesLookup(dataStreamMetadata, indices);
indicesLookup = Builder.buildIndicesLookup(custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY), indices);
}
return indicesLookup;
}
Expand Down Expand Up @@ -921,15 +920,11 @@ public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) {
}

public Map<String, DataStream> dataStreams() {
return Optional.ofNullable((DataStreamMetadata) this.custom(DataStreamMetadata.TYPE))
.map(DataStreamMetadata::dataStreams)
.orElse(Collections.emptyMap());
return this.custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY).dataStreams();
}

public Map<String, DataStreamAlias> dataStreamAliases() {
return Optional.ofNullable((DataStreamMetadata) this.custom(DataStreamMetadata.TYPE))
.map(DataStreamMetadata::getDataStreamAliases)
.orElse(Collections.emptyMap());
return this.custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY).getDataStreamAliases();
}

public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
Expand Down Expand Up @@ -1520,14 +1515,7 @@ public Builder removeIndexTemplate(String name) {
}

public DataStream dataStream(String dataStreamName) {
previousIndicesLookup = null;

DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
if (dataStreamMetadata != null) {
return dataStreamMetadata.dataStreams().get(dataStreamName);
} else {
return null;
}
return dataStreamMetadata().dataStreams().get(dataStreamName);
}

public Builder dataStreams(Map<String, DataStream> dataStreams, Map<String, DataStreamAlias> dataStreamAliases) {
Expand All @@ -1552,29 +1540,21 @@ public Builder put(DataStream dataStream) {
// trigger this validation on each new Metadata creation, even if there are no changes to data streams.
dataStream.validate(indices::get);

Map<String, DataStream> existingDataStreams = Optional.ofNullable(
(DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)
).map(dsmd -> new HashMap<>(dsmd.dataStreams())).orElse(new HashMap<>());
existingDataStreams.put(dataStream.getName(), dataStream);
Map<String, DataStreamAlias> existingDataStreamAliases = Optional.ofNullable(
(DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)
).map(dsmd -> new HashMap<>(dsmd.getDataStreamAliases())).orElse(new HashMap<>());

this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams, existingDataStreamAliases));
this.customs.put(DataStreamMetadata.TYPE, dataStreamMetadata().withAddedDatastream(dataStream));
return this;
}

private DataStreamMetadata dataStreamMetadata() {
return (DataStreamMetadata) this.customs.getOrDefault(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY);
}

public boolean put(String aliasName, String dataStream, Boolean isWriteDataStream, String filter) {
previousIndicesLookup = null;

Map<String, DataStream> existingDataStream = Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
.map(dsmd -> new HashMap<>(dsmd.dataStreams()))
.orElse(new HashMap<>());
Map<String, DataStreamAlias> dataStreamAliases = Optional.ofNullable(
(DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)
).map(dsmd -> new HashMap<>(dsmd.getDataStreamAliases())).orElse(new HashMap<>());

if (existingDataStream.containsKey(dataStream) == false) {
final DataStreamMetadata dataStreamMetadata = dataStreamMetadata();
Map<String, DataStream> existingDataStreams = dataStreamMetadata.dataStreams();
Map<String, DataStreamAlias> dataStreamAliases = new HashMap<>(dataStreamMetadata.getDataStreamAliases());
if (existingDataStreams.containsKey(dataStream) == false) {
throw new IllegalArgumentException("alias [" + aliasName + "] refers to a non existing data stream [" + dataStream + "]");
}

Expand All @@ -1598,22 +1578,18 @@ public boolean put(String aliasName, String dataStream, Boolean isWriteDataStrea
}
dataStreamAliases.put(aliasName, alias);

this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStream, dataStreamAliases));
this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams, dataStreamAliases));
return true;
}

public Builder removeDataStream(String name) {
previousIndicesLookup = null;

Map<String, DataStream> existingDataStreams = Optional.ofNullable(
(DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)
).map(dsmd -> new HashMap<>(dsmd.dataStreams())).orElse(new HashMap<>());
final DataStreamMetadata dataStreamMetadata = dataStreamMetadata();
Map<String, DataStream> existingDataStreams = new HashMap<>(dataStreamMetadata.dataStreams());
Map<String, DataStreamAlias> existingDataStreamAliases = new HashMap<>(dataStreamMetadata.getDataStreamAliases());
existingDataStreams.remove(name);

Map<String, DataStreamAlias> existingDataStreamAliases = Optional.ofNullable(
(DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)
).map(dsmd -> new HashMap<>(dsmd.getDataStreamAliases())).orElse(new HashMap<>());

Set<String> aliasesToDelete = new HashSet<>();
List<DataStreamAlias> aliasesToUpdate = new ArrayList<>();
for (var alias : existingDataStreamAliases.values()) {
Expand Down Expand Up @@ -1641,9 +1617,8 @@ public Builder removeDataStream(String name) {
public boolean removeDataStreamAlias(String aliasName, String dataStreamName, boolean mustExist) {
previousIndicesLookup = null;

Map<String, DataStreamAlias> dataStreamAliases = Optional.ofNullable(
(DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE)
).map(dsmd -> new HashMap<>(dsmd.getDataStreamAliases())).orElse(new HashMap<>());
final DataStreamMetadata dataStreamMetadata = dataStreamMetadata();
Map<String, DataStreamAlias> dataStreamAliases = new HashMap<>(dataStreamMetadata.getDataStreamAliases());

DataStreamAlias existing = dataStreamAliases.get(aliasName);
if (mustExist && existing == null) {
Expand All @@ -1660,10 +1635,7 @@ public boolean removeDataStreamAlias(String aliasName, String dataStreamName, bo
} else {
dataStreamAliases.remove(aliasName);
}
Map<String, DataStream> existingDataStream = Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
.map(dsmd -> new HashMap<>(dsmd.dataStreams()))
.orElse(new HashMap<>());
this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStream, dataStreamAliases));
this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(dataStreamMetadata.dataStreams(), dataStreamAliases));
return true;
}

Expand Down Expand Up @@ -1838,7 +1810,7 @@ public Metadata build() {
List<IndexMetadata> aliasIndices = entry.getValue().stream().map(idx -> indicesMap.get(idx.getName())).toList();
validateAlias(entry.getKey(), aliasIndices);
}
final DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
final DataStreamMetadata dataStreamMetadata = dataStreamMetadata();
ensureNoNameCollisions(aliasedIndices.keySet(), indicesMap, allIndices, dataStreamMetadata);
assert assertDataStreams(indicesMap, dataStreamMetadata);

Expand Down Expand Up @@ -1901,7 +1873,7 @@ private static void ensureNoNameCollisions(
Set<String> indexAliases,
ImmutableOpenMap<String, IndexMetadata> indicesMap,
Set<String> allIndices,
@Nullable DataStreamMetadata dataStreamMetadata
DataStreamMetadata dataStreamMetadata
) {
final ArrayList<String> duplicates = new ArrayList<>();
final Set<String> aliasDuplicatesWithIndices = new HashSet<>();
Expand Down Expand Up @@ -2007,13 +1979,13 @@ private static void collectAliasDuplicates(
}

static SortedMap<String, IndexAbstraction> buildIndicesLookup(
@Nullable DataStreamMetadata dataStreamMetadata,
DataStreamMetadata dataStreamMetadata,
ImmutableOpenMap<String, IndexMetadata> indices
) {
SortedMap<String, IndexAbstraction> indicesLookup = new TreeMap<>();
Map<String, IndexAbstraction.DataStream> indexToDataStreamLookup = new HashMap<>();
// If there are no indices, then skip data streams. This happens only when metadata is read from disk
if (dataStreamMetadata != null && indices.size() > 0) {
if (indices.size() > 0) {
Map<String, List<String>> dataStreamToAliasLookup = new HashMap<>();
for (DataStreamAlias alias : dataStreamMetadata.getDataStreamAliases().values()) {
List<Index> allIndicesOfAllDataStreams = alias.getDataStreams().stream().map(name -> {
Expand Down Expand Up @@ -2144,28 +2116,27 @@ private static void validateAlias(String aliasName, List<IndexMetadata> indexMet
}
}

static boolean assertDataStreams(ImmutableOpenMap<String, IndexMetadata> indices, @Nullable DataStreamMetadata dsMetadata) {
if (dsMetadata != null) {
// Sanity check, because elsewhere a more user friendly error should have occurred:
List<String> conflictingAliases = null;
static boolean assertDataStreams(ImmutableOpenMap<String, IndexMetadata> indices, DataStreamMetadata dsMetadata) {
// Sanity check, because elsewhere a more user friendly error should have occurred:
List<String> conflictingAliases = null;

for (var dataStream : dsMetadata.dataStreams().values()) {
for (var index : dataStream.getIndices()) {
IndexMetadata im = indices.get(index.getName());
if (im != null && im.getAliases().isEmpty() == false) {
for (var alias : im.getAliases().values()) {
if (conflictingAliases == null) {
conflictingAliases = new LinkedList<>();
}
conflictingAliases.add(alias.alias());
for (var dataStream : dsMetadata.dataStreams().values()) {
for (var index : dataStream.getIndices()) {
IndexMetadata im = indices.get(index.getName());
if (im != null && im.getAliases().isEmpty() == false) {
for (var alias : im.getAliases().values()) {
if (conflictingAliases == null) {
conflictingAliases = new LinkedList<>();
}
conflictingAliases.add(alias.alias());
}
}
}
if (conflictingAliases != null) {
throw new AssertionError("aliases " + conflictingAliases + " cannot refer to backing indices of data streams");
}
}
if (conflictingAliases != null) {
throw new AssertionError("aliases " + conflictingAliases + " cannot refer to backing indices of data streams");
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,7 @@ public void testValidateDataStreamsForNullDataStreamMetadata() {
.build();

try {
assertDataStreams(metadata.getIndices(), null);
assertDataStreams(metadata.getIndices(), DataStreamMetadata.EMPTY);
} catch (Exception e) {
fail("did not expect exception when validating a system without any data streams but got " + e.getMessage());
}
Expand Down

0 comments on commit 699da84

Please sign in to comment.