Skip to content

Commit

Permalink
[Refactor] Use Lists instead of Maps for SystemIndices features (#87004
Browse files Browse the repository at this point in the history
…) (#87049)

The SystemIndices constructor should take a list instead of a map as an
argument so that we can guarantee that the map we use for feature lookups is
keyed on the feature name.

We also provide some new getter methods so that calling code does not have to
handle the map directly.
  • Loading branch information
williamrandolph committed May 23, 2022
1 parent 9b79ae5 commit e18fae6
Show file tree
Hide file tree
Showing 25 changed files with 135 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ protected void masterOperation(
) throws Exception {

List<GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus> features = systemIndices.getFeatures()
.values()
.stream()
.sorted(Comparator.comparing(SystemIndices.Feature::getName))
.map(feature -> getFeatureUpgradeStatus(state, feature))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ protected void masterOperation(
GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR
);
List<PostFeatureUpgradeResponse.Feature> featuresToMigrate = systemIndices.getFeatures()
.values()
.stream()
.map(feature -> getFeatureUpgradeStatus(state, feature))
.filter(status -> upgradableStatuses.contains(status.getUpgradeStatus()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected void masterOperation(
systemIndices.getFeatures().size()
);

for (SystemIndices.Feature feature : systemIndices.getFeatures().values()) {
for (SystemIndices.Feature feature : systemIndices.getFeatures()) {
feature.getCleanUpFunction().apply(clusterService, client, groupedActionListener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,8 @@ protected void masterOperation(
listener.onResponse(
new GetSnapshottableFeaturesResponse(
systemIndices.getFeatures()
.entrySet()
.stream()
.map(
featureEntry -> new GetSnapshottableFeaturesResponse.SnapshottableFeature(
featureEntry.getKey(),
featureEntry.getValue().getDescription()
)
)
.map(feature -> new GetSnapshottableFeaturesResponse.SnapshottableFeature(feature.getName(), feature.getDescription()))
.toList()
)
);
Expand Down
108 changes: 70 additions & 38 deletions server/src/main/java/org/elasticsearch/indices/SystemIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -65,30 +66,39 @@ public class SystemIndices {

private static final Automaton EMPTY = Automata.makeEmpty();

private static final Map<String, Feature> SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of(
TASKS_FEATURE_NAME,
/**
* This is the source for non-plugin system features.
*/
private static final Map<String, Feature> SERVER_SYSTEM_FEATURE_DESCRIPTORS = Stream.of(
new Feature(TASKS_FEATURE_NAME, "Manages task results", List.of(TASKS_DESCRIPTOR))
);
).collect(Collectors.toUnmodifiableMap(Feature::getName, Function.identity()));

/**
* The node's full list of system features is stored here. The map is keyed
* on the value of {@link Feature#getName()}, and is used for fast lookup of
* feature objects via {@link #getFeature(String)}.
*/
private final Map<String, Feature> featureDescriptors;

private final Automaton systemNameAutomaton;
private final CharacterRunAutomaton netNewSystemIndexAutomaton;
private final CharacterRunAutomaton systemNameRunAutomaton;
private final CharacterRunAutomaton systemIndexRunAutomaton;
private final CharacterRunAutomaton systemDataStreamIndicesRunAutomaton;
private final Predicate<String> systemDataStreamPredicate;
private final Map<String, Feature> featureDescriptors;
private final SystemIndexDescriptor[] indexDescriptors;
private final Map<String, SystemDataStreamDescriptor> dataStreamDescriptors;
private final Map<String, CharacterRunAutomaton> productToSystemIndicesMatcher;
private final ExecutorSelector executorSelector;

/**
* Initialize the SystemIndices object
* @param pluginAndModulesDescriptors A map of this node's feature names to
* feature objects.
* @param pluginAndModuleFeatures A list of features from which we will load system indices.
* These features come from plugins and modules. Non-plugin system
* features such as Tasks will be added automatically.
*/
public SystemIndices(Map<String, Feature> pluginAndModulesDescriptors) {
featureDescriptors = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors);
public SystemIndices(List<Feature> pluginAndModuleFeatures) {
featureDescriptors = buildFeatureMap(pluginAndModuleFeatures);
indexDescriptors = featureDescriptors.values()
.stream()
.flatMap(f -> f.getIndexDescriptors().stream())
Expand All @@ -115,21 +125,20 @@ public SystemIndices(Map<String, Feature> pluginAndModulesDescriptors) {
this.systemNameRunAutomaton = new CharacterRunAutomaton(systemNameAutomaton);
}

static void ensurePatternsAllowSuffix(Map<String, Feature> features) {
static void ensurePatternsAllowSuffix(Map<String, Feature> featureDescriptors) {
String suffixPattern = "*" + UPGRADED_INDEX_SUFFIX;
final List<String> descriptorsWithNoRoomForSuffix = features.entrySet()
final List<String> descriptorsWithNoRoomForSuffix = featureDescriptors.values()
.stream()
.flatMap(
feature -> feature.getValue()
.getIndexDescriptors()
feature -> feature.getIndexDescriptors()
.stream()
// The below filter & map are inside the enclosing flapMap so we have access to both the feature and the descriptor
.filter(descriptor -> overlaps(descriptor.getIndexPattern(), suffixPattern) == false)
.map(
descriptor -> new ParameterizedMessage(
"pattern [{}] from feature [{}]",
descriptor.getIndexPattern(),
feature.getKey()
feature.getName()
).getFormattedMessage()
)
)
Expand Down Expand Up @@ -167,9 +176,9 @@ private static void checkForDuplicateAliases(Collection<SystemIndexDescriptor> d
}
}

private static Map<String, CharacterRunAutomaton> getProductToSystemIndicesMap(Map<String, Feature> descriptors) {
private static Map<String, CharacterRunAutomaton> getProductToSystemIndicesMap(Map<String, Feature> featureDescriptors) {
Map<String, Automaton> productToSystemIndicesMap = new HashMap<>();
for (Feature feature : descriptors.values()) {
for (Feature feature : featureDescriptors.values()) {
feature.getIndexDescriptors().forEach(systemIndexDescriptor -> {
if (systemIndexDescriptor.isExternal()) {
systemIndexDescriptor.getAllowedElasticProductOrigins()
Expand Down Expand Up @@ -334,12 +343,37 @@ public Predicate<String> getProductSystemIndexNamePredicate(ThreadContext thread
return automaton::run;
}

public Map<String, Feature> getFeatures() {
return featureDescriptors;
/**
* Get a set of feature names. This is useful for checking whether particular
* features are present on the node.
* @return A set of all feature names
*/
public Set<String> getFeatureNames() {
return Set.copyOf(featureDescriptors.keySet());
}

/**
* Get a feature by name.
* @param name Name of a feature.
* @return The corresponding feature if it exists on this node, null otherwise.
*/
public Feature getFeature(String name) {
return featureDescriptors.get(name);
}

/**
* Get a collection of the Features this SystemIndices object is managing.
* @return A collection of Features.
*/
public Collection<Feature> getFeatures() {
return List.copyOf(featureDescriptors.values());
}

private static Automaton buildIndexAutomaton(Map<String, Feature> descriptors) {
Optional<Automaton> automaton = descriptors.values().stream().map(SystemIndices::featureToIndexAutomaton).reduce(Operations::union);
private static Automaton buildIndexAutomaton(Map<String, Feature> featureDescriptors) {
Optional<Automaton> automaton = featureDescriptors.values()
.stream()
.map(SystemIndices::featureToIndexAutomaton)
.reduce(Operations::union);
return MinimizationOperations.minimize(automaton.orElse(EMPTY), Integer.MAX_VALUE);
}

Expand All @@ -362,8 +396,8 @@ private static Automaton featureToIndexAutomaton(Feature feature) {
return systemIndexAutomaton.orElse(EMPTY);
}

private static Automaton buildDataStreamAutomaton(Map<String, Feature> descriptors) {
Optional<Automaton> automaton = descriptors.values()
private static Automaton buildDataStreamAutomaton(Map<String, Feature> featureDescriptors) {
Optional<Automaton> automaton = featureDescriptors.values()
.stream()
.flatMap(feature -> feature.getDataStreamDescriptors().stream())
.map(SystemDataStreamDescriptor::getDataStreamName)
Expand All @@ -373,13 +407,13 @@ private static Automaton buildDataStreamAutomaton(Map<String, Feature> descripto
return automaton.isPresent() ? MinimizationOperations.minimize(automaton.get(), Integer.MAX_VALUE) : EMPTY;
}

private static Predicate<String> buildDataStreamNamePredicate(Map<String, Feature> descriptors) {
CharacterRunAutomaton characterRunAutomaton = new CharacterRunAutomaton(buildDataStreamAutomaton(descriptors));
private static Predicate<String> buildDataStreamNamePredicate(Map<String, Feature> featureDescriptors) {
CharacterRunAutomaton characterRunAutomaton = new CharacterRunAutomaton(buildDataStreamAutomaton(featureDescriptors));
return characterRunAutomaton::run;
}

private static Automaton buildDataStreamBackingIndicesAutomaton(Map<String, Feature> descriptors) {
Optional<Automaton> automaton = descriptors.values()
private static Automaton buildDataStreamBackingIndicesAutomaton(Map<String, Feature> featureDescriptors) {
Optional<Automaton> automaton = featureDescriptors.values()
.stream()
.map(SystemIndices::featureToDataStreamBackingIndicesAutomaton)
.reduce(Operations::union);
Expand Down Expand Up @@ -504,21 +538,19 @@ public enum SystemIndexAccessLevel {
* Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed
* descriptors overlap with any of the other patterns. If any do, throws an exception.
*
* @param sourceToFeature A map of source (plugin) names to the SystemIndexDescriptors they provide.
* @param featureDescriptors A map of feature names to the Features that will provide SystemIndexDescriptors
* @throws IllegalStateException Thrown if any of the index patterns overlaps with another.
*/
static void checkForOverlappingPatterns(Map<String, Feature> sourceToFeature) {
List<Tuple<String, SystemIndexDescriptor>> sourceDescriptorPair = sourceToFeature.entrySet()
static void checkForOverlappingPatterns(Map<String, Feature> featureDescriptors) {
List<Tuple<String, SystemIndexDescriptor>> sourceDescriptorPair = featureDescriptors.values()
.stream()
.flatMap(entry -> entry.getValue().getIndexDescriptors().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor)))
.flatMap(feature -> feature.getIndexDescriptors().stream().map(descriptor -> new Tuple<>(feature.getName(), descriptor)))
.sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message
.toList();
List<Tuple<String, SystemDataStreamDescriptor>> sourceDataStreamDescriptorPair = sourceToFeature.entrySet()
List<Tuple<String, SystemDataStreamDescriptor>> sourceDataStreamDescriptorPair = featureDescriptors.values()
.stream()
.filter(entry -> entry.getValue().getDataStreamDescriptors().isEmpty() == false)
.flatMap(
entry -> entry.getValue().getDataStreamDescriptors().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor))
)
.filter(feature -> feature.getDataStreamDescriptors().isEmpty() == false)
.flatMap(feature -> feature.getDataStreamDescriptors().stream().map(descriptor -> new Tuple<>(feature.getName(), descriptor)))
.sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getDataStreamName())) // Consistent ordering -> consistent error message
.toList();

Expand Down Expand Up @@ -577,11 +609,11 @@ private static boolean overlaps(String pattern1, String pattern2) {
return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false;
}

private static Map<String, Feature> buildSystemIndexDescriptorMap(Map<String, Feature> featuresMap) {
final Map<String, Feature> map = Maps.newMapWithExpectedSize(featuresMap.size() + SERVER_SYSTEM_INDEX_DESCRIPTORS.size());
map.putAll(featuresMap);
private static Map<String, Feature> buildFeatureMap(List<Feature> features) {
final Map<String, Feature> map = Maps.newMapWithExpectedSize(features.size() + SERVER_SYSTEM_FEATURE_DESCRIPTORS.size());
features.forEach(feature -> map.put(feature.getName(), feature));
// put the server items last since we expect less of them
SERVER_SYSTEM_INDEX_DESCRIPTORS.forEach((source, feature) -> {
SERVER_SYSTEM_FEATURE_DESCRIPTORS.forEach((source, feature) -> {
if (map.putIfAbsent(source, feature) != null) {
throw new IllegalArgumentException(
"plugin or module attempted to define the same source [" + source + "] as a built-in system index"
Expand Down
15 changes: 5 additions & 10 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -507,16 +507,11 @@ protected Node(
SystemIndexMigrationExecutor.getNamedXContentParsers().stream()
).flatMap(Function.identity()).collect(toList())
);
final Map<String, SystemIndices.Feature> featuresMap = pluginsService.filterPlugins(SystemIndexPlugin.class)
.stream()
.peek(plugin -> SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName()))
.collect(
Collectors.toUnmodifiableMap(
SystemIndexPlugin::getFeatureName,
plugin -> SystemIndices.Feature.fromSystemIndexPlugin(plugin, settings)
)
);
final SystemIndices systemIndices = new SystemIndices(featuresMap);
final List<SystemIndices.Feature> features = pluginsService.filterPlugins(SystemIndexPlugin.class).stream().map(plugin -> {
SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName());
return SystemIndices.Feature.fromSystemIndexPlugin(plugin, settings);
}).toList();
final SystemIndices systemIndices = new SystemIndices(features);
final ExecutorSelector executorSelector = systemIndices.getExecutorSelector();

ModulesBuilder modules = new ModulesBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,8 @@ private void startRestore(
.flatMap(Collection::stream)
.collect(Collectors.toSet());

final Map<String, SystemIndices.Feature> featureSet = systemIndices.getFeatures();
final Set<String> featureStateDataStreams = featureStatesToRestore.keySet().stream().filter(featureName -> {
if (featureSet.containsKey(featureName)) {
if (systemIndices.getFeatureNames().contains(featureName)) {
return true;
}
logger.warn(
Expand All @@ -344,7 +343,7 @@ private void startRestore(
);
return false;
})
.map(name -> systemIndices.getFeatures().get(name))
.map(systemIndices::getFeature)
.flatMap(feature -> feature.getDataStreamDescriptors().stream())
.map(SystemDataStreamDescriptor::getDataStreamName)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -649,7 +648,7 @@ private Map<String, List<String>> getFeatureStatesToRestore(

final List<String> featuresNotOnThisNode = featureStatesToRestore.keySet()
.stream()
.filter(featureName -> systemIndices.getFeatures().containsKey(featureName) == false)
.filter(s -> systemIndices.getFeatureNames().contains(s) == false)
.toList();
if (featuresNotOnThisNode.isEmpty() == false) {
throw new SnapshotRestoreException(
Expand Down Expand Up @@ -677,7 +676,7 @@ private Set<Index> resolveSystemIndicesToDelete(ClusterState currentState, Set<S
}

return featureStatesToRestore.stream()
.map(featureName -> systemIndices.getFeatures().get(featureName))
.map(systemIndices::getFeature)
.filter(Objects::nonNull) // Features that aren't present on this node will be warned about in `getFeatureStatesToRestore`
.flatMap(feature -> feature.getIndexDescriptors().stream())
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
if (request.includeGlobalState() || requestedStates.isEmpty() == false) {
if (request.includeGlobalState() && requestedStates.isEmpty()) {
// If we're including global state and feature states aren't specified, include all of them
featureStatesSet = systemIndices.getFeatures().keySet();
featureStatesSet = systemIndices.getFeatureNames();
} else if (requestedStates.size() == 1 && NO_FEATURE_STATES_VALUE.equalsIgnoreCase(requestedStates.get(0))) {
// If there's exactly one value and it's "none", include no states
featureStatesSet = Collections.emptySet();
Expand All @@ -282,7 +282,7 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
return;
}
featureStatesSet = new HashSet<>(requestedStates);
featureStatesSet.retainAll(systemIndices.getFeatures().keySet());
featureStatesSet.retainAll(systemIndices.getFeatureNames());
}
} else {
featureStatesSet = Collections.emptySet();
Expand Down Expand Up @@ -334,7 +334,7 @@ public ClusterState execute(ClusterState currentState) {
// been requested by the request directly
final Set<String> indexNames = new HashSet<>(indices);
for (String featureName : featureStatesSet) {
SystemIndices.Feature feature = systemIndices.getFeatures().get(featureName);
SystemIndices.Feature feature = systemIndices.getFeature(featureName);

Set<String> featureSystemIndices = feature.getIndexDescriptors()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ static SystemIndexMigrationInfo fromTaskState(
IndexScopedSettings indexScopedSettings
) {
SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(taskState.getCurrentIndex());
SystemIndices.Feature feature = systemIndices.getFeatures().get(taskState.getCurrentFeature());
SystemIndices.Feature feature = systemIndices.getFeature(taskState.getCurrentFeature());
IndexMetadata imd = metadata.index(taskState.getCurrentIndex());

// It's possible for one or both of these to happen if the executing node fails during execution and:
Expand Down

0 comments on commit e18fae6

Please sign in to comment.