Skip to content

Commit

Permalink
ILM wait for active shards on rolled index in a separate step (#50718) (
Browse files Browse the repository at this point in the history
#51296)

After we rollover the index we wait for the configured number of shards for the
rolled index to become active (based on the index.write.wait_for_active_shards
setting which might be present in a template, or otherwise in the default case,
for the primaries to become active).
This wait might be long due to disk watermarks being tripped, replicas not
being able to spring to life due to cluster nodes reconfiguration and others
and, the RolloverStep might not complete successfully due to this inherent
transient situation, albeit the rolled index having been created.

(cherry picked from commit 457a92f)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
andreidan committed Jan 22, 2020
1 parent 7b4c2bf commit 1232667
Show file tree
Hide file tree
Showing 8 changed files with 538 additions and 9 deletions.
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -170,6 +171,13 @@ public void dryRun(boolean dryRun) {
this.dryRun = dryRun;
}

/**
* Sets the wait for active shards configuration for the rolled index that gets created.
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
createIndexRequest.waitForActiveShards(waitForActiveShards);
}

/**
* Adds condition to check if the index is at least <code>age</code> old
*/
Expand Down
Expand Up @@ -139,17 +139,19 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)

StepKey waitForRolloverReadyStepKey = new StepKey(phase, NAME, WaitForRolloverReadyStep.NAME);
StepKey rolloverStepKey = new StepKey(phase, NAME, RolloverStep.NAME);
StepKey waitForActiveShardsKey = new StepKey(phase, NAME, WaitForActiveShardsStep.NAME);
StepKey updateDateStepKey = new StepKey(phase, NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey setIndexingCompleteStepKey = new StepKey(phase, NAME, INDEXING_COMPLETE_STEP_NAME);

WaitForRolloverReadyStep waitForRolloverReadyStep = new WaitForRolloverReadyStep(waitForRolloverReadyStepKey, rolloverStepKey,
client, maxSize, maxAge, maxDocs);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, updateDateStepKey, client);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client);
WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey);
UpdateRolloverLifecycleDateStep updateDateStep = new UpdateRolloverLifecycleDateStep(updateDateStepKey, setIndexingCompleteStepKey,
System::currentTimeMillis);
UpdateSettingsStep setIndexingCompleteStep = new UpdateSettingsStep(setIndexingCompleteStepKey, nextStepKey,
client, indexingComplete);
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, updateDateStep, setIndexingCompleteStep);
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, waitForActiveShardsStep, updateDateStep, setIndexingCompleteStep);
}

@Override
Expand Down
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -70,6 +71,9 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentClust

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
// We don't wait for active shards when we perform the rollover because the
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> {
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
Expand Down
@@ -0,0 +1,232 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.AliasOrIndex.Alias;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
* After we performed the index rollover we wait for the the configured number of shards for the rolled over index (ie. newly created
* index) to become available.
*/
public class WaitForActiveShardsStep extends ClusterStateWaitStep {

public static final String NAME = "wait-for-active-shards";

private static final Logger logger = LogManager.getLogger(WaitForActiveShardsStep.class);

WaitForActiveShardsStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetaData originalIndexMeta = clusterState.metaData().index(index);

if (originalIndexMeta == null) {
String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists",
getKey().getAction(), index.getName());
// Index must have been since deleted
logger.debug(errorMessage);
return new Result(false, new Info(errorMessage));
}

boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(originalIndexMeta.getSettings());
if (indexingComplete) {
String message = String.format(Locale.ROOT, "index [%s] has lifecycle complete set, skipping [%s]",
originalIndexMeta.getIndex().getName(), WaitForActiveShardsStep.NAME);
logger.trace(message);
return new Result(true, new Info(message));
}

String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
}

AliasOrIndex aliasOrIndex = clusterState.metaData().getAliasAndIndexLookup().get(rolloverAlias);
assert aliasOrIndex.isAlias() : rolloverAlias + " must be an alias but it is an index";

Alias alias = (Alias) aliasOrIndex;
IndexMetaData aliasWriteIndex = alias.getWriteIndex();
final String rolledIndexName;
final String waitForActiveShardsSettingValue;
if (aliasWriteIndex != null) {
rolledIndexName = aliasWriteIndex.getIndex().getName();
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
} else {
List<IndexMetaData> indices = alias.getIndices();
int maxIndexCounter = -1;
IndexMetaData rolledIndexMeta = null;
for (IndexMetaData indexMetaData : indices) {
int indexNameCounter = parseIndexNameCounter(indexMetaData.getIndex().getName());
if (maxIndexCounter < indexNameCounter) {
maxIndexCounter = indexNameCounter;
rolledIndexMeta = indexMetaData;
}
}
if (rolledIndexMeta == null) {
String errorMessage = String.format(Locale.ROOT,
"unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(),
getKey().getAction());

// Index must have been since deleted
logger.debug(errorMessage);
return new Result(false, new Info(errorMessage));
}
rolledIndexName = rolledIndexMeta.getIndex().getName();
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
}

ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
boolean enoughShardsActive = activeShardCount.enoughShardsActive(clusterState, rolledIndexName);

IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(rolledIndexName);
int currentActiveShards = 0;
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
currentActiveShards += shardRouting.value.activeShards().size();
}
return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive));
}

/**
* Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in &lt; and &gt;)
* <p>
* Eg.
* <p>
* - For "logs-000002" it'll return 2
* - For "&lt;logs-{now/d}-3&gt;" it'll return 3
*/
static int parseIndexNameCounter(String indexName) {
int numberIndex = indexName.lastIndexOf("-");
if (numberIndex == -1) {
throw new IllegalArgumentException("no - separator found in index name [" + indexName + "]");
}
try {
return Integer.parseInt(indexName.substring(numberIndex + 1, indexName.endsWith(">") ? indexName.length() - 1 :
indexName.length()));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("unable to parse the index name [" + indexName + "] to extract the counter", e);
}
}

static final class ActiveShardsInfo implements ToXContentObject {

private final long currentActiveShardsCount;
private final String targetActiveShardsCount;
private final boolean enoughShardsActive;
private final String message;

static final ParseField CURRENT_ACTIVE_SHARDS_COUNT = new ParseField("current_active_shards_count");
static final ParseField TARGET_ACTIVE_SHARDS_COUNT = new ParseField("target_active_shards_count");
static final ParseField ENOUGH_SHARDS_ACTIVE = new ParseField("enough_shards_active");
static final ParseField MESSAGE = new ParseField("message");

ActiveShardsInfo(long currentActiveShardsCount, String targetActiveShardsCount, boolean enoughShardsActive) {
this.currentActiveShardsCount = currentActiveShardsCount;
this.targetActiveShardsCount = targetActiveShardsCount;
this.enoughShardsActive = enoughShardsActive;

if (enoughShardsActive) {
message = "the target of [" + targetActiveShardsCount + "] are active. Don't need to wait anymore";
} else {
message = "waiting for [" + targetActiveShardsCount + "] shards to become active, but only [" + currentActiveShardsCount +
"] are active";
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.field(CURRENT_ACTIVE_SHARDS_COUNT.getPreferredName(), currentActiveShardsCount);
builder.field(TARGET_ACTIVE_SHARDS_COUNT.getPreferredName(), targetActiveShardsCount);
builder.field(ENOUGH_SHARDS_ACTIVE.getPreferredName(), enoughShardsActive);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ActiveShardsInfo info = (ActiveShardsInfo) o;
return currentActiveShardsCount == info.currentActiveShardsCount &&
enoughShardsActive == info.enoughShardsActive &&
Objects.equals(targetActiveShardsCount, info.targetActiveShardsCount) &&
Objects.equals(message, info.message);
}

@Override
public int hashCode() {
return Objects.hash(currentActiveShardsCount, targetActiveShardsCount, enoughShardsActive, message);
}
}

static final class Info implements ToXContentObject {

private final String message;

static final ParseField MESSAGE = new ParseField("message");

Info(String message) {
this.message = message;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Info info = (Info) o;
return Objects.equals(message, info.message);
}

@Override
public int hashCode() {
return Objects.hash(message);
}
}
}
Expand Up @@ -77,28 +77,32 @@ public void testToSteps() {
RolloverAction action = createTestInstance();
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
randomAlphaOfLengthBetween(1, 10));
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertNotNull(steps);
assertEquals(4, steps.size());
assertEquals(5, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, RolloverAction.NAME, RolloverStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, WaitForActiveShardsStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
WaitForRolloverReadyStep firstStep = (WaitForRolloverReadyStep) steps.get(0);
RolloverStep secondStep = (RolloverStep) steps.get(1);
UpdateRolloverLifecycleDateStep thirdStep = (UpdateRolloverLifecycleDateStep) steps.get(2);
UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3);
WaitForActiveShardsStep thirdStep = (WaitForActiveShardsStep) steps.get(2);
UpdateRolloverLifecycleDateStep fourthStep = (UpdateRolloverLifecycleDateStep) steps.get(3);
UpdateSettingsStep fifthStep = (UpdateSettingsStep) steps.get(4);
assertEquals(expectedFirstStepKey, firstStep.getKey());
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(expectedThirdStepKey, thirdStep.getKey());
assertEquals(expectedFourthStepKey, fourthStep.getKey());
assertEquals(expectedFifthStepKey, fifthStep.getKey());
assertEquals(secondStep.getKey(), firstStep.getNextStepKey());
assertEquals(thirdStep.getKey(), secondStep.getNextStepKey());
assertEquals(fourthStep.getKey(), thirdStep.getNextStepKey());
assertEquals(fifthStep.getKey(), fourthStep.getNextStepKey());
assertEquals(action.getMaxSize(), firstStep.getMaxSize());
assertEquals(action.getMaxAge(), firstStep.getMaxAge());
assertEquals(action.getMaxDocs(), firstStep.getMaxDocs());
assertEquals(nextStepKey, fourthStep.getNextStepKey());
assertEquals(nextStepKey, fifthStep.getNextStepKey());
}
}

0 comments on commit 1232667

Please sign in to comment.