Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ILM wait for active shards on rolled index in a separate step #50718

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f6290e4
ILM wait for active shards on rolled index in a separate step
andreidan Jan 7, 2020
fccfee5
Add license header
andreidan Jan 7, 2020
456ba48
Fix RolloverActionTests to reflect the new step
andreidan Jan 8, 2020
292a8e6
WaitForActiveShardsStep uses the alias index
andreidan Jan 15, 2020
b2673bd
Fix integration test
andreidan Jan 15, 2020
77364de
Merge branch 'master' into ilm-rollover-wait-for-active-shards
andreidan Jan 15, 2020
0a98bf0
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 15, 2020
4842f39
Don't wait for active shards when rolling over in ILM
andreidan Jan 15, 2020
eb8f95f
Fix TransportPutLifecycleActionTests
andreidan Jan 15, 2020
a4615f8
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 15, 2020
115a131
Comment to clarify why rollover doesn't wait for active shards
andreidan Jan 16, 2020
0924222
Guard against the index having been deleted while executing a policy
andreidan Jan 16, 2020
bf09bbc
Return a meaningful shards state message
andreidan Jan 16, 2020
c33a595
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 16, 2020
6225c04
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 16, 2020
a27e1cc
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 16, 2020
a838925
Drop unused getters
andreidan Jan 16, 2020
9f9b842
Find rolled index by finding the max counter in the name.
andreidan Jan 17, 2020
c3f506e
Escape < and > in javadoc
andreidan Jan 17, 2020
7679143
Skip WaitForActiveShardsStep when lifecycle complete is set
andreidan Jan 17, 2020
3a1cd87
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 17, 2020
b838f3a
Use lower case and drop . in error messages
andreidan Jan 17, 2020
4449853
Mark vars as final
andreidan Jan 20, 2020
64c9f06
Add explicit error handling to parseIndexNameCounter
andreidan Jan 20, 2020
9322665
Remove Parser
andreidan Jan 20, 2020
36e07fc
Add Info object to report various step progress messages
andreidan Jan 20, 2020
c0ecf31
Make constructors default visible
andreidan Jan 20, 2020
f4f2c84
Merge branch 'master' into ilm-rollover-wait-for-active-shards
elasticmachine Jan 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -159,6 +160,13 @@ public void dryRun(boolean dryRun) {
this.dryRun = dryRun;
}

/**
* Sets the wait for active shards configuration for the rolled index that gets created.
* The value is forwarded unchanged to this request's {@code createIndexRequest}.
*
* @param waitForActiveShards the active shard count to set on the create index request used for the rolled index
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
createIndexRequest.waitForActiveShards(waitForActiveShards);
}

/**
* Adds condition to check if the index is at least <code>age</code> old
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,19 @@ public List<Step> toSteps(Client client, String phase, Step.StepKey nextStepKey)

StepKey waitForRolloverReadyStepKey = new StepKey(phase, NAME, WaitForRolloverReadyStep.NAME);
StepKey rolloverStepKey = new StepKey(phase, NAME, RolloverStep.NAME);
StepKey waitForActiveShardsKey = new StepKey(phase, NAME, WaitForActiveShardsStep.NAME);
StepKey updateDateStepKey = new StepKey(phase, NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey setIndexingCompleteStepKey = new StepKey(phase, NAME, INDEXING_COMPLETE_STEP_NAME);

WaitForRolloverReadyStep waitForRolloverReadyStep = new WaitForRolloverReadyStep(waitForRolloverReadyStepKey, rolloverStepKey,
client, maxSize, maxAge, maxDocs);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, updateDateStepKey, client);
RolloverStep rolloverStep = new RolloverStep(rolloverStepKey, waitForActiveShardsKey, client);
WaitForActiveShardsStep waitForActiveShardsStep = new WaitForActiveShardsStep(waitForActiveShardsKey, updateDateStepKey);
UpdateRolloverLifecycleDateStep updateDateStep = new UpdateRolloverLifecycleDateStep(updateDateStepKey, setIndexingCompleteStepKey,
System::currentTimeMillis);
UpdateSettingsStep setIndexingCompleteStep = new UpdateSettingsStep(setIndexingCompleteStepKey, nextStepKey,
client, indexingComplete);
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, updateDateStep, setIndexingCompleteStep);
return Arrays.asList(waitForRolloverReadyStep, rolloverStep, waitForActiveShardsStep, updateDateStep, setIndexingCompleteStep);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -70,6 +71,9 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentClust

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
// We don't wait for active shards when we perform the rollover because the
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
dakrone marked this conversation as resolved.
Show resolved Hide resolved
getClient().admin().indices().rolloverIndex(rolloverRequest,
ActionListener.wrap(response -> {
assert response.isRolledOver() : "the only way this rollover call should fail is with an exception";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.AliasOrIndex.Alias;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
* After we performed the index rollover we wait for the the configured number of shards for the rolled over index (ie. newly created
* index) to become available.
*/
public class WaitForActiveShardsStep extends ClusterStateWaitStep {

public static final String NAME = "wait-for-active-shards";

private static final Logger logger = LogManager.getLogger(WaitForActiveShardsStep.class);

WaitForActiveShardsStep(StepKey key, StepKey nextStepKey) {
super(key, nextStepKey);
}

@Override
public boolean isRetryable() {
return true;
}

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetaData originalIndexMeta = clusterState.metaData().index(index);
dakrone marked this conversation as resolved.
Show resolved Hide resolved

if (originalIndexMeta == null) {
String errorMessage = String.format(Locale.ROOT, "[%s] lifecycle action for index [%s] executed but index no longer exists",
getKey().getAction(), index.getName());
// Index must have been since deleted
logger.debug(errorMessage);
return new Result(false, new Info(errorMessage));
}

boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(originalIndexMeta.getSettings());
if (indexingComplete) {
String message = String.format(Locale.ROOT, "index [%s] has lifecycle complete set, skipping [%s]",
originalIndexMeta.getIndex().getName(), WaitForActiveShardsStep.NAME);
logger.trace(message);
return new Result(true, new Info(message));
}

String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(originalIndexMeta.getSettings());
if (Strings.isNullOrEmpty(rolloverAlias)) {
throw new IllegalStateException("setting [" + RolloverAction.LIFECYCLE_ROLLOVER_ALIAS
+ "] is not set on index [" + originalIndexMeta.getIndex().getName() + "]");
}

AliasOrIndex aliasOrIndex = clusterState.metaData().getAliasAndIndexLookup().get(rolloverAlias);
assert aliasOrIndex.isAlias() : rolloverAlias + " must be an alias but it is an index";

Alias alias = (Alias) aliasOrIndex;
IndexMetaData aliasWriteIndex = alias.getWriteIndex();
final String rolledIndexName;
final String waitForActiveShardsSettingValue;
if (aliasWriteIndex != null) {
rolledIndexName = aliasWriteIndex.getIndex().getName();
waitForActiveShardsSettingValue = aliasWriteIndex.getSettings().get("index.write.wait_for_active_shards");
} else {
List<IndexMetaData> indices = alias.getIndices();
int maxIndexCounter = -1;
IndexMetaData rolledIndexMeta = null;
for (IndexMetaData indexMetaData : indices) {
int indexNameCounter = parseIndexNameCounter(indexMetaData.getIndex().getName());
if (maxIndexCounter < indexNameCounter) {
maxIndexCounter = indexNameCounter;
rolledIndexMeta = indexMetaData;
}
}
if (rolledIndexMeta == null) {
String errorMessage = String.format(Locale.ROOT,
"unable to find the index that was rolled over from [%s] as part of lifecycle action [%s]", index.getName(),
getKey().getAction());

// Index must have been since deleted
logger.debug(errorMessage);
return new Result(false, new Info(errorMessage));
}
rolledIndexName = rolledIndexMeta.getIndex().getName();
waitForActiveShardsSettingValue = rolledIndexMeta.getSettings().get("index.write.wait_for_active_shards");
}

ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
boolean enoughShardsActive = activeShardCount.enoughShardsActive(clusterState, rolledIndexName);

IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(rolledIndexName);
int currentActiveShards = 0;
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
currentActiveShards += shardRouting.value.activeShards().size();
}
return new Result(enoughShardsActive, new ActiveShardsInfo(currentActiveShards, activeShardCount.toString(), enoughShardsActive));
}

/**
* Parses the number from the rolled over index name. It also supports the date-math format (ie. index name is wrapped in &lt; and &gt;)
* <p>
* Eg.
* <p>
* - For "logs-000002" it'll return 2
* - For "&lt;logs-{now/d}-3&gt;" it'll return 3
*/
static int parseIndexNameCounter(String indexName) {
int numberIndex = indexName.lastIndexOf("-");
if (numberIndex == -1) {
throw new IllegalArgumentException("no - separator found in index name [" + indexName + "]");
}
try {
return Integer.parseInt(indexName.substring(numberIndex + 1, indexName.endsWith(">") ? indexName.length() - 1 :
indexName.length()));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("unable to parse the index name [" + indexName + "] to extract the counter", e);
}
}

static final class ActiveShardsInfo implements ToXContentObject {

private final long currentActiveShardsCount;
private final String targetActiveShardsCount;
private final boolean enoughShardsActive;
private final String message;

static final ParseField CURRENT_ACTIVE_SHARDS_COUNT = new ParseField("current_active_shards_count");
static final ParseField TARGET_ACTIVE_SHARDS_COUNT = new ParseField("target_active_shards_count");
static final ParseField ENOUGH_SHARDS_ACTIVE = new ParseField("enough_shards_active");
static final ParseField MESSAGE = new ParseField("message");

ActiveShardsInfo(long currentActiveShardsCount, String targetActiveShardsCount, boolean enoughShardsActive) {
this.currentActiveShardsCount = currentActiveShardsCount;
this.targetActiveShardsCount = targetActiveShardsCount;
this.enoughShardsActive = enoughShardsActive;

if (enoughShardsActive) {
message = "the target of [" + targetActiveShardsCount + "] are active. Don't need to wait anymore";
} else {
message = "waiting for [" + targetActiveShardsCount + "] shards to become active, but only [" + currentActiveShardsCount +
"] are active";
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.field(CURRENT_ACTIVE_SHARDS_COUNT.getPreferredName(), currentActiveShardsCount);
builder.field(TARGET_ACTIVE_SHARDS_COUNT.getPreferredName(), targetActiveShardsCount);
builder.field(ENOUGH_SHARDS_ACTIVE.getPreferredName(), enoughShardsActive);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ActiveShardsInfo info = (ActiveShardsInfo) o;
return currentActiveShardsCount == info.currentActiveShardsCount &&
enoughShardsActive == info.enoughShardsActive &&
Objects.equals(targetActiveShardsCount, info.targetActiveShardsCount) &&
Objects.equals(message, info.message);
}

@Override
public int hashCode() {
return Objects.hash(currentActiveShardsCount, targetActiveShardsCount, enoughShardsActive, message);
}
}

static final class Info implements ToXContentObject {

private final String message;

static final ParseField MESSAGE = new ParseField("message");

Info(String message) {
this.message = message;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MESSAGE.getPreferredName(), message);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Info info = (Info) o;
return Objects.equals(message, info.message);
}

@Override
public int hashCode() {
return Objects.hash(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,32 @@ public void testToSteps() {
RolloverAction action = createTestInstance();
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
randomAlphaOfLengthBetween(1, 10));
List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertNotNull(steps);
assertEquals(4, steps.size());
assertEquals(5, steps.size());
StepKey expectedFirstStepKey = new StepKey(phase, RolloverAction.NAME, WaitForRolloverReadyStep.NAME);
StepKey expectedSecondStepKey = new StepKey(phase, RolloverAction.NAME, RolloverStep.NAME);
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
StepKey expectedThirdStepKey = new StepKey(phase, RolloverAction.NAME, WaitForActiveShardsStep.NAME);
StepKey expectedFourthStepKey = new StepKey(phase, RolloverAction.NAME, UpdateRolloverLifecycleDateStep.NAME);
StepKey expectedFifthStepKey = new StepKey(phase, RolloverAction.NAME, RolloverAction.INDEXING_COMPLETE_STEP_NAME);
WaitForRolloverReadyStep firstStep = (WaitForRolloverReadyStep) steps.get(0);
RolloverStep secondStep = (RolloverStep) steps.get(1);
UpdateRolloverLifecycleDateStep thirdStep = (UpdateRolloverLifecycleDateStep) steps.get(2);
UpdateSettingsStep fourthStep = (UpdateSettingsStep) steps.get(3);
WaitForActiveShardsStep thirdStep = (WaitForActiveShardsStep) steps.get(2);
UpdateRolloverLifecycleDateStep fourthStep = (UpdateRolloverLifecycleDateStep) steps.get(3);
UpdateSettingsStep fifthStep = (UpdateSettingsStep) steps.get(4);
assertEquals(expectedFirstStepKey, firstStep.getKey());
assertEquals(expectedSecondStepKey, secondStep.getKey());
assertEquals(expectedThirdStepKey, thirdStep.getKey());
assertEquals(expectedFourthStepKey, fourthStep.getKey());
assertEquals(expectedFifthStepKey, fifthStep.getKey());
assertEquals(secondStep.getKey(), firstStep.getNextStepKey());
assertEquals(thirdStep.getKey(), secondStep.getNextStepKey());
assertEquals(fourthStep.getKey(), thirdStep.getNextStepKey());
assertEquals(fifthStep.getKey(), fourthStep.getNextStepKey());
assertEquals(action.getMaxSize(), firstStep.getMaxSize());
assertEquals(action.getMaxAge(), firstStep.getMaxAge());
assertEquals(action.getMaxDocs(), firstStep.getMaxDocs());
assertEquals(nextStepKey, fourthStep.getNextStepKey());
assertEquals(nextStepKey, fifthStep.getNextStepKey());
}
}
Loading