Skip to content

Commit

Permalink
[7.11] ILM: skip rollover if the data stream is rolled over already (#…
Browse files Browse the repository at this point in the history
…67778) (#67786)

The rollover action would perform a datastream rollover irrespective if the
managed index was the write index or not. This could lead to multiple rollovers
being executed eg. a manual call rolls over the datastream and later an ILM
managed index, the previous write index, will do so too. There are similar
scenarios possible if the `rollover` step failed (due to various reasons including
`Concurrent modification of alias`) and succeded when retried.

(cherry picked from commit f32ab15)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
andreidan committed Jan 20, 2021
1 parent cb8b631 commit 0c1ab1d
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,16 @@ public void performAction(IndexMetadata indexMetadata, ClusterState currentClust
IndexAbstraction indexAbstraction = currentClusterState.metadata().getIndicesLookup().get(indexName);
assert indexAbstraction != null : "expected the index " + indexName + " to exist in the lookup but it didn't";
final String rolloverTarget;
if (indexAbstraction.getParentDataStream() != null) {
rolloverTarget = indexAbstraction.getParentDataStream().getName();
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
if (dataStream != null) {
assert dataStream.getWriteIndex() != null : "datastream " + dataStream.getName() + " has no write index";
if (dataStream.getWriteIndex().getIndex().equals(indexMetadata.getIndex()) == false) {
logger.warn("index [{}] is not the write index for data stream [{}]. skipping rollover for policy [{}]",
indexName, dataStream.getName(), LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings()));
listener.onResponse(true);
return;
}
rolloverTarget = dataStream.getName();
} else {
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,17 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + index.getName() + "] was not found";
final String rolloverTarget;
if (indexAbstraction.getParentDataStream() != null) {
rolloverTarget = indexAbstraction.getParentDataStream().getName();
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
if (dataStream != null) {
assert dataStream.getWriteIndex() != null : "datastream " + dataStream.getName() + " has no write index";
if (dataStream.getWriteIndex().getIndex().equals(index) == false) {
logger.warn("index [{}] is not the write index for data stream [{}]. skipping rollover for policy [{}]",
index.getName(), dataStream.getName(),
LifecycleSettings.LIFECYCLE_NAME_SETTING.get(metadata.index(index).getSettings()));
listener.onResponse(true, new WaitForRolloverReadyStep.EmptyInfo());
return;
}
rolloverTarget = dataStream.getName();
} else {
IndexMetadata indexMetadata = metadata.index(index);
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetadata.getSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.mockito.Mockito.verifyZeroInteractions;

public class RolloverStepTests extends AbstractStepMasterTimeoutTestCase<RolloverStep> {

Expand Down Expand Up @@ -161,6 +162,46 @@ public void onFailure(Exception e) {
Mockito.verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
}

public void testSkipRolloverIfDataStreamIsAlreadyRolledOver() {
String dataStreamName = "test-datastream";
IndexMetadata firstGenerationIndex = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();

IndexMetadata writeIndex = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 2))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
RolloverStep step = createRandomInstance();

SetOnce<Boolean> actionCompleted = new SetOnce<>();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(
Metadata.builder().put(firstGenerationIndex, true)
.put(writeIndex, true)
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
org.elasticsearch.common.collect.List.of(firstGenerationIndex.getIndex(), writeIndex.getIndex())))
)
.build();
step.performAction(firstGenerationIndex, clusterState, null, new AsyncActionStep.Listener() {

@Override
public void onResponse(boolean complete) {
actionCompleted.set(complete);
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
});

assertEquals(true, actionCompleted.get());

verifyZeroInteractions(client);
verifyZeroInteractions(adminClient);
verifyZeroInteractions(indicesClient);
}

private void mockClientRolloverCall(String rolloverTarget) {
Mockito.doAnswer(invocation -> {
RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

public class WaitForRolloverReadyStepTests extends AbstractStepTestCase<WaitForRolloverReadyStep> {

Expand Down Expand Up @@ -173,6 +174,43 @@ public void onFailure(Exception e) {
verify(indicesClient, Mockito.only()).rolloverIndex(Mockito.any(), Mockito.any());
}

public void testSkipRolloverIfDataStreamIsAlreadyRolledOver() {
String dataStreamName = "test-datastream";
IndexMetadata firstGenerationIndex = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();

IndexMetadata writeIndex = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 2))
.settings(settings(Version.CURRENT))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
WaitForRolloverReadyStep step = createRandomInstance();

SetOnce<Boolean> conditionsMet = new SetOnce<>();
Metadata metadata = Metadata.builder().put(firstGenerationIndex, true)
.put(writeIndex, true)
.put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
org.elasticsearch.common.collect.List.of(firstGenerationIndex.getIndex(), writeIndex.getIndex())))
.build();
step.evaluateCondition(metadata, firstGenerationIndex.getIndex(), new AsyncWaitStep.Listener() {

@Override
public void onResponse(boolean complete, ToXContentObject infomationContext) {
conditionsMet.set(complete);
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("Unexpected method call", e);
}
}, MASTER_TIMEOUT);

assertEquals(true, conditionsMet.get());

verifyZeroInteractions(client);
verifyZeroInteractions(adminClient);
verifyZeroInteractions(indicesClient);
}

private void mockRolloverIndexCall(String rolloverTarget, WaitForRolloverReadyStep step) {
Mockito.doAnswer(invocation -> {
RolloverRequest request = (RolloverRequest) invocation.getArguments()[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -63,6 +64,28 @@ public void testRolloverAction() throws Exception {
equalTo(PhaseCompleteStep.finalStep("hot").getKey())));
}

public void testRolloverIsSkippedOnManualDataStreamRollover() throws Exception {
String policyName = "logs-policy";
createNewSingletonPolicy(client(), policyName, "hot", new RolloverAction(null, null, 2L));

createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));

String dataStream = "logs-foo";
indexDocument(client(), dataStream, true);

String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStream, 1);
assertBusy(() -> assertThat(getStepKeyForIndex(client(), firstGenerationIndex).getName(),
equalTo(WaitForRolloverReadyStep.NAME)), 30, TimeUnit.SECONDS);

rolloverMaxOneDocCondition(client(), dataStream);
assertBusy(() -> assertThat(indexExists(DataStream.getDefaultBackingIndexName(dataStream, 2)), is(true)), 30, TimeUnit.SECONDS);

// even though the first index doesn't have 2 documents to fulfill the rollover condition, it should complete the rollover action
// because it's not the write index anymore
assertBusy(() -> assertThat(getStepKeyForIndex(client(), firstGenerationIndex),
equalTo(PhaseCompleteStep.finalStep("hot").getKey())), 30, TimeUnit.SECONDS);
}

public void testShrinkActionInPolicyWithoutHotPhase() throws Exception {
String policyName = "logs-policy";
createNewSingletonPolicy(client(), policyName, "warm", new ShrinkAction(1));
Expand Down

0 comments on commit 0c1ab1d

Please sign in to comment.