Skip to content

Commit

Permalink
Make Settings Diffable (#88815)
Browse files Browse the repository at this point in the history
Making settings diffable so that index metadata diffs
are smaller whenever the metadata changes without a setting change
as well as to make index setting updates over a wider number of
indices faster.

This saves about 3% of CPU time on master and about half that on data nodes
that is just burnt for writing setting strings
when bootstrapping many shards benchmarks benchmarks to 50k indices.
  • Loading branch information
original-brownbear committed Jul 29, 2022
1 parent f32c822 commit a3a5332
Show file tree
Hide file tree
Showing 31 changed files with 188 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Settings.writeSettingsToStream(settings, out);
settings.writeTo(out);
}
out.writeOptionalWriteable(getInfo(OsInfo.class));
out.writeOptionalWriteable(getInfo(ProcessInfo.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;

/**
* Register repository request.
Expand Down Expand Up @@ -207,7 +206,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(type);
writeSettingsToStream(settings, out);
settings.writeTo(out);
out.writeBoolean(verify);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public Response(Settings persistentSettings, Settings transientSettings, Setting
@Override
public void writeTo(StreamOutput out) throws IOException {
assert out.getVersion().onOrAfter(Version.V_8_3_0);
Settings.writeSettingsToStream(persistentSettings, out);
Settings.writeSettingsToStream(transientSettings, out);
Settings.writeSettingsToStream(settings, out);
persistentSettings.writeTo(out);
transientSettings.writeTo(out);
settings.writeTo(out);
}

public Settings persistentSettings() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;

/**
* Request for an update cluster settings action
Expand Down Expand Up @@ -162,8 +161,8 @@ public ClusterUpdateSettingsRequest persistentSettings(Map<String, ?> source) {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeSettingsToStream(transientSettings, out);
writeSettingsToStream(persistentSettings, out);
transientSettings.writeTo(out);
persistentSettings.writeTo(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public Settings getPersistentSettings() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Settings.writeSettingsToStream(transientSettings, out);
Settings.writeSettingsToStream(persistentSettings, out);
transientSettings.writeTo(out);
persistentSettings.writeTo(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.Strings.EMPTY_ARRAY;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;

/**
Expand Down Expand Up @@ -115,7 +114,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getVersion().before(SETTINGS_IN_REQUEST_VERSION)) {
writeSettingsToStream(Settings.EMPTY, out);
Settings.EMPTY.writeTo(out);
}
out.writeStringArray(featureStates);
out.writeBoolean(includeGlobalState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;

/**
Expand Down Expand Up @@ -113,7 +112,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(VERSION_SUPPORTING_QUIET_PARAMETER)) {
out.writeBoolean(quiet);
}
writeSettingsToStream(indexSettings, out);
indexSettings.writeTo(out);
out.writeStringArray(ignoreIndexSettings);
out.writeOptionalString(snapshotUuid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;

/**
* A request to create an index. Best created with {@link org.elasticsearch.client.internal.Requests#createIndexRequest(String)}.
Expand Down Expand Up @@ -453,7 +452,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(cause);
out.writeString(index);
writeSettingsToStream(settings, out);
settings.writeTo(out);
if (out.getVersion().before(Version.V_8_0_0)) {
if ("{}".equals(mappings)) {
out.writeVInt(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
MappingMetadata.writeMappingMetadata(out, mappings);
out.writeMap(aliases, StreamOutput::writeString, StreamOutput::writeList);
out.writeMap(settings, StreamOutput::writeString, (o, v) -> Settings.writeSettingsToStream(v, o));
out.writeMap(defaultSettings, StreamOutput::writeString, (o, v) -> Settings.writeSettingsToStream(v, o));
out.writeMap(settings, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeMap(defaultSettings, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeMap(dataStreams, StreamOutput::writeString, StreamOutput::writeOptionalString);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public String getSetting(String index, String setting) {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(indexToSettings, StreamOutput::writeString, (o, s) -> Settings.writeSettingsToStream(s, o));
out.writeMap(indexToDefaultSettings, StreamOutput::writeString, (o, s) -> Settings.writeSettingsToStream(s, o));
out.writeMap(indexToSettings, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeMap(indexToDefaultSettings, StreamOutput::writeString, (o, v) -> v.writeTo(o));
}

private static void parseSettingsField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;

/**
* Request for an update index settings action
Expand Down Expand Up @@ -182,7 +181,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArrayNullable(indices);
indicesOptions.writeIndicesOptions(out);
writeSettingsToStream(settings, out);
settings.writeTo(out);
out.writeBoolean(preserveExisting);
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
out.writeString(origin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;

/**
* A request to create an index template.
Expand Down Expand Up @@ -446,7 +445,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(indexPatterns);
out.writeInt(order);
out.writeBoolean(create);
writeSettingsToStream(settings, out);
settings.writeTo(out);
if (out.getVersion().before(Version.V_8_0_0)) {
out.writeVInt(mappings == null ? 0 : 1);
if (mappings != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(quiet);
out.writeOptionalString(pidFile == null ? null : pidFile.toString());
out.writeSecureString(keystorePassword);
Settings.writeSettingsToStream(nodeSettings, out);
nodeSettings.writeTo(out);
out.writeString(configDir.toString());
}
}
25 changes: 22 additions & 3 deletions server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static <K, T, M extends Map<K, T>> MapDiff<K, T, M> createDiff(
inserts++;
} else if (entry.getValue().equals(previousValue) == false) {
if (valueSerializer.supportsDiffableValues()) {
diffs.add(Map.entry(entry.getKey(), valueSerializer.diff(entry.getValue(), previousValue)));
diffs.add(mapEntry(entry.getKey(), valueSerializer.diff(entry.getValue(), previousValue)));
} else {
upserts.add(entry);
}
Expand Down Expand Up @@ -307,14 +307,14 @@ private MapDiff(
for (int i = 0; i < diffsCount; i++) {
K key = keySerializer.readKey(in);
Diff<T> diff = valueSerializer.readDiff(in, key);
diffs.add(Map.entry(key, diff));
diffs.add(mapEntry(key, diff));
}
int upsertsCount = in.readVInt();
upserts = upsertsCount == 0 ? List.of() : new ArrayList<>(upsertsCount);
for (int i = 0; i < upsertsCount; i++) {
K key = keySerializer.readKey(in);
T newValue = valueSerializer.read(in, key);
upserts.add(Map.entry(key, newValue));
upserts.add(mapEntry(key, newValue));
}
this.builderCtor = builderCtor;
}
Expand Down Expand Up @@ -402,6 +402,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

private static <K, T> Map.Entry<K, T> mapEntry(K key, T newValue) {
return new Map.Entry<>() {
@Override
public K getKey() {
return key;
}

@Override
public T getValue() {
return newValue;
}

@Override
public T setValue(T value) {
throw new UnsupportedOperationException();
}
};
}

/**
* Provides read and write operations to serialize keys of map
* @param <K> type of key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public static DesiredNode readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
Settings.writeSettingsToStream(settings, out);
settings.writeTo(out);
if (out.getVersion().onOrAfter(RANGE_FLOAT_PROCESSORS_SUPPORT_VERSION)) {
out.writeOptionalFloat(processors);
out.writeOptionalWriteable(processorsRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.validateIpValue;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY;

public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragment {
Expand Down Expand Up @@ -1314,6 +1313,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private static final Version SETTING_DIFF_VERSION = Version.V_8_5_0;

private static class IndexMetadataDiff implements Diff<IndexMetadata> {

private final String index;
Expand All @@ -1324,7 +1325,12 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final long aliasesVersion;
private final long[] primaryTerms;
private final State state;

// used for BwC when this instance was written by an older version node that does not diff settings yet
@Nullable
private final Settings settings;
@Nullable
private final Diff<Settings> settingsDiff;
private final Diff<ImmutableOpenMap<String, MappingMetadata>> mappings;
private final Diff<ImmutableOpenMap<String, AliasMetadata>> aliases;
private final Diff<ImmutableOpenMap<String, DiffableStringMap>> customData;
Expand All @@ -1342,6 +1348,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
routingNumShards = after.routingNumShards;
state = after.state;
settings = after.settings;
settingsDiff = after.settings.diff(before.settings);
primaryTerms = after.primaryTerms;
// TODO: find a nicer way to do BwC here and just work with Diff<MappingMetadata> here and in networking
mappings = DiffableUtils.diff(
Expand Down Expand Up @@ -1387,7 +1394,13 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
aliasesVersion = 1;
}
state = State.fromId(in.readByte());
settings = Settings.readSettingsFromStream(in);
if (in.getVersion().onOrAfter(SETTING_DIFF_VERSION)) {
settings = null;
settingsDiff = Settings.readSettingsDiffFromStream(in);
} else {
settings = Settings.readSettingsFromStream(in);
settingsDiff = null;
}
primaryTerms = in.readVLongArray();
mappings = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), MAPPING_DIFF_VALUE_READER);
aliases = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), ALIAS_METADATA_DIFF_VALUE_READER);
Expand Down Expand Up @@ -1421,7 +1434,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(aliasesVersion);
}
out.writeByte(state.id);
Settings.writeSettingsToStream(settings, out);
assert settings != null
: "settings should always be non-null since this instance is not expected to have been read from another node";
if (out.getVersion().onOrAfter(SETTING_DIFF_VERSION)) {
settingsDiff.writeTo(out);
} else {
settings.writeTo(out);
}
out.writeVLongArray(primaryTerms);
mappings.writeTo(out);
aliases.writeTo(out);
Expand All @@ -1443,7 +1462,11 @@ public IndexMetadata apply(IndexMetadata part) {
builder.aliasesVersion(aliasesVersion);
builder.setRoutingNumShards(routingNumShards);
builder.state(state);
builder.settings(settings);
if (settingsDiff == null) {
builder.settings(settings);
} else {
builder.settings(settingsDiff.apply(part.settings));
}
builder.primaryTerms(primaryTerms);
builder.mapping = mappings.apply(
ImmutableOpenMap.<String, MappingMetadata>builder(1).fPut(MapperService.SINGLE_MAPPING_NAME, part.mapping).build()
Expand Down Expand Up @@ -1531,7 +1554,7 @@ public void writeTo(StreamOutput out, boolean mappingsAsHash) throws IOException
}
out.writeInt(routingNumShards);
out.writeByte(state.id());
writeSettingsToStream(settings, out);
settings.writeTo(out);
out.writeVLongArray(primaryTerms);
// TODO: adjust serialization format to using an optional writable
if (mapping == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeInt(order);
out.writeStringCollection(patterns);
Settings.writeSettingsToStream(settings, out);
settings.writeTo(out);
out.writeMap(mappings, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeCollection(aliases.values());
out.writeOptionalVInt(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@

import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
import static org.elasticsearch.common.settings.Settings.writeSettingsToStream;

/**
* {@link Metadata} is the part of the {@link ClusterState} which persists across restarts. This persistence is XContent-based, so a
Expand Down Expand Up @@ -1228,8 +1227,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(clusterUUIDCommitted);
out.writeLong(version);
coordinationMetadata.writeTo(out);
Settings.writeSettingsToStream(transientSettings, out);
Settings.writeSettingsToStream(persistentSettings, out);
transientSettings.writeTo(out);
persistentSettings.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
hashesOfConsistentSettings.writeTo(out);
}
Expand Down Expand Up @@ -1314,8 +1313,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(clusterUUID);
out.writeBoolean(clusterUUIDCommitted);
coordinationMetadata.writeTo(out);
writeSettingsToStream(transientSettings, out);
writeSettingsToStream(persistentSettings, out);
transientSettings.writeTo(out);
persistentSettings.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
hashesOfConsistentSettings.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(uuid);
}
out.writeString(type);
Settings.writeSettingsToStream(settings, out);
settings.writeTo(out);
out.writeLong(generation);
out.writeLong(pendingGeneration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Settings.writeSettingsToStream(this.settings, out);
this.settings.writeTo(out);
}
if (this.mappings == null) {
out.writeBoolean(false);
Expand Down

0 comments on commit a3a5332

Please sign in to comment.