Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/120168.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120168
summary: Reduce Data Loss in System Indices Migration
area: Infra/Core
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@

package org.elasticsearch.migration;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -28,6 +31,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.SystemIndexPlugin;
Expand All @@ -50,6 +54,10 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -255,12 +263,18 @@ protected void assertIndexHasCorrectProperties(
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
}

public static class TestPlugin extends Plugin implements SystemIndexPlugin {
public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
private final BlockingActionFilter blockingActionFilter;

public TestPlugin() {
blockingActionFilter = new BlockingActionFilter();
}

@Override
public List<ActionFilter> getActionFilters() {
return singletonList(blockingActionFilter);
}

@Override
Expand Down Expand Up @@ -299,5 +313,26 @@ public void indicesMigrationComplete(
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
listener.onResponse(true);
}

public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
private Set<String> blockedActions = emptySet();

@Override
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
if (blockedActions.contains(action)) {
throw new ElasticsearchException("force exception on [" + action + "]");
}
return true;
}

@Override
public int order() {
return 0;
}

public void blockActions(String... actions) {
blockedActions = unmodifiableSet(newHashSet(actions));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand All @@ -36,10 +39,12 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.upgrades.FeatureMigrationResults;
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;

Expand Down Expand Up @@ -272,6 +277,60 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
});
}

@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
ensureGreen();

// Block the alias request to simulate a failure
InternalTestCluster internalTestCluster = internalCluster();
ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
BlockingActionFilter blockingActionFilter = null;
for (ActionFilter filter : actionFilters.filters()) {
if (filter instanceof BlockingActionFilter) {
blockingActionFilter = (BlockingActionFilter) filter;
break;
}
}
assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);

// Start the migration
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();

// Wait till the migration fails
assertBusy(() -> {
GetFeatureUpgradeStatusResponse statusResp = client().execute(
GetFeatureUpgradeStatusAction.INSTANCE,
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
});

// Get the settings to see if the write block was removed
var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings();
var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));

// Unblock the alias request
blockingActionFilter.blockActions();

// Retry the migration
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();

// Ensure that the migration is successful after the alias request is unblocked
assertBusy(() -> {
GetFeatureUpgradeStatusResponse statusResp = client().execute(
GetFeatureUpgradeStatusAction.INSTANCE,
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
});
}

public void testMigrationWillRunAfterError() throws Exception {
createSystemIndexForDescriptor(INTERNAL_MANAGED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public boolean hasErrors() {
return errors;
}

/**
* Get a list of all errors from the response. If there are no errors, an empty list is returned.
*/
public List<ElasticsearchException> getErrors() {
if (errors == false) {
return List.of();
} else {
return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList();
}
}

/**
* Build a response from a list of action results. Sets the errors boolean based
* on whether an of the individual results contain an error.
Expand Down Expand Up @@ -165,6 +176,13 @@ public static AliasActionResult buildSuccess(List<String> indices, AliasActions
return new AliasActionResult(indices, action, null);
}

/**
* The error result if the action failed, null if the action succeeded.
*/
public ElasticsearchException getError() {
return error;
}

private int getStatus() {
return error == null ? 200 : error.status().getStatus();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -32,7 +34,6 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -59,6 +60,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -448,12 +450,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
logAndThrowExceptionForFailures(bulkByScrollResponse)
);
} else {
// Successful completion of reindexing - remove read only and delete old index
setWriteBlock(
oldIndex,
false,
delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse))
);
// Successful completion of reindexing. Now we need to set the alias and remove the old index.
setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> {
if (aliasesResponse.hasErrors()) {
var e = new ElasticsearchException("Aliases request had errors");
for (var error : aliasesResponse.getErrors()) {
e.addSuppressed(error);
}
throw e;
}
logger.info(
"Successfully migrated old index [{}] to new index [{}] from feature [{}]",
oldIndexName,
migrationInfo.getNextIndexName(),
migrationInfo.getFeatureName()
);
delegate2.onResponse(bulkByScrollResponse);
}, e -> {
logger.error(
() -> format(
"An error occurred while changing aliases and removing the old index [%s] from feature [%s]",
oldIndexName,
migrationInfo.getFeatureName()
),
e
);
removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e);
}));
}
}, e -> {
logger.error(
Expand Down Expand Up @@ -511,10 +534,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
);
}

private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
SystemIndexMigrationInfo migrationInfo,
BulkByScrollResponse bulkByScrollResponse
) {
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName());
Expand All @@ -533,30 +553,42 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
);
});

// Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
// while we're trying to migrate them.
return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute(
listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse))
);
aliasesRequest.execute(listener);
}

/**
* Makes the index readonly if it's not set as a readonly yet
* Sets the write block on the index to the given value.
*/
private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();

metadataUpdateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
TimeValue.ZERO,
readOnlySettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
index
),
listener
);
if (readOnlyValue) {
// Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
// in-flight writes are completed before returning.
baseClient.admin()
.indices()
.addBlock(
new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT),
listener.delegateFailureAndWrap((l, response) -> {
if (response.isAcknowledged() == false) {
throw new ElasticsearchException("Failed to acknowledge read-only block index request");
}
l.onResponse(response);
})
);
} else {
// The only way to remove a Block is via a settings update.
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build();
metadataUpdateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
TimeValue.ZERO,
readOnlySettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
index
),
listener
);
}
}

private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {
Expand Down