Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testRepositoryCreation() throws Exception {
.setMetadata(true)
.get();
Metadata metadata = clusterStateResponse.getState().getMetadata();
RepositoriesMetadata repositoriesMetadata = metadata.custom(RepositoriesMetadata.TYPE);
RepositoriesMetadata repositoriesMetadata = metadata.getProject().custom(RepositoriesMetadata.TYPE);
assertThat(repositoriesMetadata, notNullValue());
assertThat(repositoriesMetadata.repository("test-repo-1"), notNullValue());
assertThat(repositoriesMetadata.repository("test-repo-1").type(), equalTo("fs"));
Expand All @@ -94,7 +94,7 @@ public void testRepositoryCreation() throws Exception {
logger.info("--> check that both repositories are in cluster state");
clusterStateResponse = client.admin().cluster().prepareState(TEST_REQUEST_TIMEOUT).clear().setMetadata(true).get();
metadata = clusterStateResponse.getState().getMetadata();
repositoriesMetadata = metadata.custom(RepositoriesMetadata.TYPE);
repositoriesMetadata = metadata.getProject().custom(RepositoriesMetadata.TYPE);
assertThat(repositoriesMetadata, notNullValue());
assertThat(repositoriesMetadata.repositories().size(), equalTo(2));
assertThat(repositoriesMetadata.repository("test-repo-1"), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public static List<Entry> getNamedWriteables() {
RepositoryCleanupInProgress::readDiffFrom
);
// Metadata
registerMetadataCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom);
registerProjectCustom(entries, RepositoriesMetadata.TYPE, RepositoriesMetadata::new, RepositoriesMetadata::readDiffFrom);
registerProjectCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom);
registerProjectCustom(entries, ScriptMetadata.TYPE, ScriptMetadata::new, ScriptMetadata::readDiffFrom);
registerProjectCustom(entries, IndexGraveyard.TYPE, IndexGraveyard::new, IndexGraveyard::readDiffFrom);
Expand Down Expand Up @@ -283,7 +283,7 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
// Metadata
entries.add(
new NamedXContentRegistry.Entry(
Metadata.ClusterCustom.class,
Metadata.ProjectCustom.class,
new ParseField(RepositoriesMetadata.TYPE),
RepositoriesMetadata::fromXContent
)
Expand Down
46 changes: 46 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -98,6 +99,18 @@ public static <K, T, M extends Map<K, T>> MapDiff<K, T, M> emptyDiff() {
return (MapDiff<K, T, M>) EMPTY;
}

/**
* Merges two map diffs into one unified diff with write-only value serializer.
*/
@SuppressWarnings("unchecked")
public static <K, T extends Diffable<T>, T1 extends T, T2 extends T, M extends Map<K, T>> MapDiff<K, T, M> merge(
MapDiff<K, T1, ? extends ImmutableOpenMap<K, T1>> diff1,
MapDiff<K, T2, ? extends ImmutableOpenMap<K, T2>> diff2,
KeySerializer<K> keySerializer
) {
return merge(diff1, diff2, keySerializer, DiffableValueSerializer.getWriteOnlyInstance());
}

/**
* Merges two map diffs into one unified diff.
*/
Expand Down Expand Up @@ -146,6 +159,39 @@ public static <K, T, M extends Map<K, T>> boolean hasKey(MapDiff<K, T, M> diff,
return false;
}

/**
* Create a new MapDiff from the specified MapDiff by transforming its diffs with the provided diffUpdateFunction as well as
* transforming its upserts with the provided upsertUpdateFunction. Whether an entry should be transformed is determined by
* the specified keyPredicate.
* @param diff The original MapDiff
* @param keyPredicate Determines whether an entry should be transformed
* @param diffUpdateFunction A function to transform a Diff entry
* @param upsertUpdateFunction A function to transform an upsert entry
* @return A new MapDiff as a result of the transformation
*/
public static <K, T, M extends Map<K, T>> MapDiff<K, T, M> updateDiffsAndUpserts(
MapDiff<K, T, M> diff,
Predicate<K> keyPredicate,
BiFunction<K, Diff<T>, Diff<T>> diffUpdateFunction,
BiFunction<K, T, T> upsertUpdateFunction
) {
final var newDiffs = diff.getDiffs().stream().map(entry -> {
if (keyPredicate.test(entry.getKey()) == false) {
return entry;
}
return Map.entry(entry.getKey(), diffUpdateFunction.apply(entry.getKey(), entry.getValue()));
}).toList();

final var newUpserts = diff.getUpserts().stream().map(entry -> {
if (keyPredicate.test(entry.getKey()) == false) {
return entry;
}
return Map.entry(entry.getKey(), upsertUpdateFunction.apply(entry.getKey(), entry.getValue()));
}).toList();

return new MapDiff<>(diff.keySerializer, diff.valueSerializer, diff.deletes, newDiffs, newUpserts, diff.builderCtor);
}

/**
* Create a new JDK map backed MapDiff by transforming the keys with the provided keyFunction.
* @param diff Original MapDiff to transform
Expand Down
Loading