Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -28,7 +27,7 @@
import org.elasticsearch.common.scheduler.TimeValueSchedule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.NotMultiProjectCapable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -183,9 +182,12 @@ public ProjectMetadata moveIndicesToPreviouslyFailedStep(ProjectMetadata current
void onMaster(ClusterState clusterState) {
maybeScheduleJob();

// TODO multi-project: this probably needs a per-project iteration
@FixForMultiProject
final ProjectState state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID);
for (var projectId : clusterState.metadata().projects().keySet()) {
onMaster(clusterState.projectState(projectId));
}
}

void onMaster(ProjectState state) {
final ProjectMetadata projectMetadata = state.metadata();
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata != null) {
Expand Down Expand Up @@ -409,27 +411,29 @@ public boolean isForceExecution() {
});
}

@NotMultiProjectCapable(description = "See comment inside the method")
@Override
public void applyClusterState(ClusterChangedEvent event) {
// only act if we are master, otherwise keep idle until elected
if (event.localNodeMaster() == false) {
return;
}

@FixForMultiProject
final IndexLifecycleMetadata ilmMetadata = event.state()
.metadata()
.getProject(Metadata.DEFAULT_PROJECT_ID)
.custom(IndexLifecycleMetadata.TYPE);
if (ilmMetadata == null) {
return;
}
final IndexLifecycleMetadata previousIlmMetadata = event.previousState()
.metadata()
.getProject(Metadata.DEFAULT_PROJECT_ID)
.custom(IndexLifecycleMetadata.TYPE);
if (event.previousState().nodes().isLocalNodeElectedMaster() == false || ilmMetadata != previousIlmMetadata) {
policyRegistry.update(ilmMetadata);
// We're updating the policy registry cache here, which doesn't actually work with multiple projects because the policies from one
project would overwrite the policies from another project. However, since we're not planning on running ILM in a multi-project
// cluster, we can ignore this.
Comment on lines +422 to +424
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to add an annotation and use it here before I merge this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we could also assert that there's only one project? I would feel more comfortable knowing that a test would probably fail if we ever used the ILM plugin in a multi-project cluster, as well as having the annotation to help us remember.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of an assertion too, but the IndexTemplateRegistry already installs multiple ILM policies in every project, so an assertion here would already trip.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I tried running the ILM Java REST tests and those do fail because of this (although it's not directly apparent from the failure that it's because of this registry cache).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I thought that this entire module was excluded in serverless, so this code simply doesn't exist there.

If the issue is just that we're tripping integration tests which have multiple projects but don't exclude this module like serverless does... we could fix that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't thought about this deeply and it may not be possible, but I'd quite like to understand why if not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the issue is just that we're tripping integration tests which have multiple projects but don't exclude this module like serverless does... we could fix that?

Yep, that's exactly what happens. All tests that make use of the DEFAULT distribution type will trip this assertion. We might not need to run all of these tests in MP mode, but probably a significant portion of them we do - more than we can easily convert for this PR at least.

for (var project : event.state().metadata().projects().values()) {
final IndexLifecycleMetadata ilmMetadata = project.custom(IndexLifecycleMetadata.TYPE);
if (ilmMetadata == null) {
continue;
}
final var previousProject = event.previousState().metadata().projects().get(project.id());
final IndexLifecycleMetadata previousIlmMetadata = previousProject == null
? null
: previousProject.custom(IndexLifecycleMetadata.TYPE);
if (event.previousState().nodes().isLocalNodeElectedMaster() == false || ilmMetadata != previousIlmMetadata) {
policyRegistry.update(ilmMetadata);
}
}
}

Expand Down Expand Up @@ -461,10 +465,13 @@ public boolean policyExists(String policyId) {
* @param clusterState the current cluster state
* @param fromClusterStateChange whether things are triggered from the cluster-state-listener or the scheduler
*/
@FixForMultiProject
void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) {
@FixForMultiProject
final var state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID);
for (var projectId : clusterState.metadata().projects().keySet()) {
triggerPolicies(clusterState.projectState(projectId), fromClusterStateChange);
}
}

void triggerPolicies(ProjectState state, boolean fromClusterStateChange) {
final var projectMetadata = state.metadata();
IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);

Expand Down Expand Up @@ -585,51 +592,54 @@ PolicyStepsRegistry getPolicyRegistry() {
return policyRegistry;
}

static Set<String> indicesOnShuttingDownNodesInDangerousStep(ClusterState state, String nodeId) {
static boolean hasIndicesInDangerousStepForNodeShutdown(ClusterState state, String nodeId) {
final Set<String> shutdownNodes = PluginShutdownService.shutdownTypeNodes(
state,
SingleNodeShutdownMetadata.Type.REMOVE,
SingleNodeShutdownMetadata.Type.SIGTERM,
SingleNodeShutdownMetadata.Type.REPLACE
);
if (shutdownNodes.isEmpty()) {
return Set.of();
return true;
}

// Returning a set of strings will cause weird behavior with multiple projects
@FixForMultiProject
Set<String> indicesPreventingShutdown = state.metadata()
.projects()
.values()
.stream()
.flatMap(project -> project.indices().entrySet().stream())
// Filter out to only consider managed indices
.filter(indexToMetadata -> Strings.hasText(indexToMetadata.getValue().getLifecyclePolicyName()))
// Only look at indices in the shrink action
.filter(indexToMetadata -> ShrinkAction.NAME.equals(indexToMetadata.getValue().getLifecycleExecutionState().action()))
// Only look at indices on a step that may potentially be dangerous if we removed the node
.filter(indexToMetadata -> {
String step = indexToMetadata.getValue().getLifecycleExecutionState().step();
return SetSingleNodeAllocateStep.NAME.equals(step)
|| CheckShrinkReadyStep.NAME.equals(step)
|| ShrinkStep.NAME.equals(step)
|| ShrunkShardsAllocatedStep.NAME.equals(step);
})
// Only look at indices where the node picked for the shrink is the node marked as shutting down
.filter(indexToMetadata -> {
String nodePicked = indexToMetadata.getValue()
.getSettings()
.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id");
return nodeId.equals(nodePicked);
})
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
logger.trace(
"with nodes marked as shutdown for removal {}, indices {} are preventing shutdown",
shutdownNodes,
indicesPreventingShutdown
);
return indicesPreventingShutdown;
boolean result = true;
for (var project : state.metadata().projects().values()) {
Set<String> indicesPreventingShutdown = project.indices()
.entrySet()
.stream()
// Filter out to only consider managed indices
.filter(indexToMetadata -> Strings.hasText(indexToMetadata.getValue().getLifecyclePolicyName()))
// Only look at indices in the shrink action
.filter(indexToMetadata -> ShrinkAction.NAME.equals(indexToMetadata.getValue().getLifecycleExecutionState().action()))
// Only look at indices on a step that may potentially be dangerous if we removed the node
.filter(indexToMetadata -> {
String step = indexToMetadata.getValue().getLifecycleExecutionState().step();
return SetSingleNodeAllocateStep.NAME.equals(step)
|| CheckShrinkReadyStep.NAME.equals(step)
|| ShrinkStep.NAME.equals(step)
|| ShrunkShardsAllocatedStep.NAME.equals(step);
})
// Only look at indices where the node picked for the shrink is the node marked as shutting down
.filter(indexToMetadata -> {
String nodePicked = indexToMetadata.getValue()
.getSettings()
.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id");
return nodeId.equals(nodePicked);
})
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
logger.trace(
"with nodes marked as shutdown for removal {}, indices {} in project {} are preventing shutdown",
shutdownNodes,
indicesPreventingShutdown,
project.id()
);
if (indicesPreventingShutdown.isEmpty() == false) {
result = false;
}
}
return result;
}

@Override
Expand All @@ -641,8 +651,7 @@ public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shu
case REPLACE:
case REMOVE:
case SIGTERM:
Set<String> indices = indicesOnShuttingDownNodesInDangerousStep(clusterService.state(), nodeId);
return indices.isEmpty();
return hasIndicesInDangerousStepForNodeShutdown(clusterService.state(), nodeId);
default:
throw new IllegalArgumentException("unknown shutdown type: " + shutdownType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
Expand Down Expand Up @@ -169,13 +170,11 @@ public void testStoppedModeSkip() {
.numberOfReplicas(randomIntBetween(0, 5))
.build();
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
Metadata metadata = Metadata.builder()
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPED))
.indices(indices)
.persistentSettings(settings(IndexVersion.current()).build())
.build();
.indices(indices);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
ClusterChangedEvent event = new ClusterChangedEvent("_source", currentState, ClusterState.EMPTY_STATE);
Expand Down Expand Up @@ -208,13 +207,11 @@ public void testRequestedStopOnShrink() {
.numberOfReplicas(randomIntBetween(0, 5))
.build();
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
Metadata metadata = Metadata.builder()
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPING))
.indices(indices)
.persistentSettings(settings(IndexVersion.current()).build())
.build();
.indices(indices);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();

Expand Down Expand Up @@ -264,13 +261,11 @@ private void verifyCanStopWithStep(String stoppableStep) {
.numberOfReplicas(randomIntBetween(0, 5))
.build();
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
Metadata metadata = Metadata.builder()
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPING))
.indices(indices)
.persistentSettings(settings(IndexVersion.current()).build())
.build();
.indices(indices);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();

Expand Down Expand Up @@ -312,13 +307,11 @@ public void testRequestedStopOnSafeAction() {
.numberOfReplicas(randomIntBetween(0, 5))
.build();
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
Metadata metadata = Metadata.builder()
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPING))
.indices(indices)
.persistentSettings(settings(IndexVersion.current()).build())
.build();
.indices(indices);
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();

Expand Down Expand Up @@ -429,11 +422,9 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
.build();
Map<String, IndexMetadata> indices = Map.of(index1.getName(), i1indexMetadata, index2.getName(), i2indexMetadata);

Metadata metadata = Metadata.builder()
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
.indices(indices)
.persistentSettings(settings(IndexVersion.current()).build())
.build();
.indices(indices);

Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
var clusterSettings = new ClusterSettings(
Expand All @@ -443,7 +434,7 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
DiscoveryNode node = clusterService.localNode();
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId()))
.build();
ClusterServiceUtils.setState(clusterService, currentState);
Expand Down Expand Up @@ -533,15 +524,16 @@ public void testParsingOriginationDateBeforeIndexCreation() {
}
}

public void testIndicesOnShuttingDownNodesInDangerousStep() {
public void testHasIndicesInDangerousStepForNodeShutdown() {
for (SingleNodeShutdownMetadata.Type type : List.of(
SingleNodeShutdownMetadata.Type.REMOVE,
SingleNodeShutdownMetadata.Type.SIGTERM,
SingleNodeShutdownMetadata.Type.REPLACE
)) {
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), equalTo(Set.of()));
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), equalTo(Set.of()));
final var project = ProjectMetadata.builder(randomProjectIdOrDefault()).build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "regular_node"), equalTo(true));
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"), equalTo(true));

IndexMetadata nonDangerousIndex = IndexMetadata.builder("no_danger")
.settings(settings(IndexVersion.current()).put(LifecycleSettings.LIFECYCLE_NAME, "mypolicy"))
Expand Down Expand Up @@ -583,14 +575,12 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
.build();
Map<String, IndexMetadata> indices = Map.of("no_danger", nonDangerousIndex, "danger", dangerousIndex);

Metadata metadata = Metadata.builder()
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING))
.indices(indices)
.persistentSettings(settings(IndexVersion.current()).build())
.build();

state = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.putProjectMetadata(
ProjectMetadata.builder(project)
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING))
.indices(indices)
)
.nodes(
DiscoveryNodes.builder()
.localNodeId(nodeId)
Expand All @@ -613,8 +603,8 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
.build();

// No danger yet, because no node is shutting down
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), equalTo(Set.of()));
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), equalTo(Set.of()));
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "regular_node"), equalTo(true));
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"), equalTo(true));

state = ClusterState.builder(state)
.metadata(
Expand All @@ -638,12 +628,12 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
)
.build();

assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), equalTo(Set.of()));
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "regular_node"), equalTo(true));
// No danger, because this is a "RESTART" type shutdown
assertThat(
"restart type shutdowns are not considered dangerous",
IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"),
equalTo(Set.of())
IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"),
equalTo(true)
);

final String targetNodeName = type == SingleNodeShutdownMetadata.Type.REPLACE ? randomAlphaOfLengthBetween(10, 20) : null;
Expand Down Expand Up @@ -673,7 +663,7 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
.build();

// The dangerous index should be calculated as being in danger now
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), equalTo(Set.of("danger")));
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"), equalTo(false));
}
}
}
Loading