Allow closing a write index of a data stream. (#71031)
Backport of #70908 to the 7.12 branch.

Prior to this commit, attempting to close the write index of a data stream resulted in a validation error, because closing a data stream's write index was forbidden. The idea behind that restriction is to ensure that a data stream can always accept writes. For the same reason, deleting a write index is not allowed (a write index can only be deleted by deleting the entire data stream).

However, closing an index isn't as destructive as deleting it (an open index request makes the write index available again), and there are other situations in which a data stream can't accept writes anyway, for example when the primary shards of the write index are unavailable. So the original reasoning for disallowing the closing of a write index isn't that strong.
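For illustration, a minimal console sketch of the new behaviour (the data stream and backing index names are hypothetical):

```
# Close the write index of a data stream (rejected with a validation error
# such as "cannot close the following data stream write indices [...]"
# before this change):
POST /.ds-my-data-stream-000002/_close

# An open request makes the write index available for writes again:
POST /.ds-my-data-stream-000002/_open
```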

On top of that, the restriction prevented certain administrative operations from being performed, for example restoring a snapshot containing data streams that already exist in the cluster (an in-place restore).
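For example, an in-place restore of an existing data stream might now look roughly like this (repository and snapshot names follow the docs below; the data stream name is hypothetical):

```
# Close all backing indices of the data stream, including the write index:
POST /.ds-my-data-stream-*/_close?expand_wildcards=all

# Restore the data stream over the existing, now closed, backing indices:
POST /_snapshot/my_repository/my_snapshot/_restore
{
  "indices": "my-data-stream",
  "include_global_state": false
}
```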

Closes #70903, closes #70861
martijnvg committed Mar 31, 2021
1 parent 95474d2 commit fab1fec
Showing 8 changed files with 145 additions and 102 deletions.
7 changes: 4 additions & 3 deletions docs/reference/indices/open-close.asciidoc
@@ -64,10 +64,11 @@ Closed indices consume a significant amount of disk-space which can cause
 problems in managed environments. Closing indices can be disabled via the cluster settings
 API by setting `cluster.indices.close.enable` to `false`. The default is `true`.
 
-The current write index on a data stream cannot be closed. In order to close
-the current write index, the data stream must first be
+On 7.12.0 and earlier the current write index on a data stream cannot be closed.
+In order to close the current write index, the data stream must first be
 <<data-streams-rollover,rolled over>> so that a new write index is created
-and then the previous write index can be closed.
+and then the previous write index can be closed. This restriction doesn't apply
+from version 7.12.1.
 
 // end::closed-index[]
 
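On 7.12.0 and earlier, the rollover-then-close workflow described above might look like this (data stream name and generation numbers are hypothetical):

```
# Roll over so a new write index is created...
POST /my-data-stream/_rollover

# ...then the previous write index can be closed:
POST /.ds-my-data-stream-000001/_close
```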
restore snapshot API documentation
@@ -72,8 +72,9 @@ POST /_snapshot/my_repository/my_snapshot/_restore
 
 Use the restore snapshot API to restore a snapshot of a cluster, including all data streams and indices in the snapshot. If you do not want to restore the entire snapshot, you can select specific data streams or indices to restore.
 
-NOTE: You cannot restore a data stream if a stream with the same name already
-exists.
+NOTE: On 7.12.0 and earlier, you cannot restore a data stream if a stream with
+the same name already exists. This restriction doesn't apply from version
+7.12.1.
 
 You can run the restore operation on a cluster that contains an elected
 <<master-node,master node>> and has data nodes with enough capacity to accommodate the snapshot
5 changes: 3 additions & 2 deletions docs/reference/snapshot-restore/restore-snapshot.asciidoc
@@ -107,8 +107,9 @@ new indices if they didn't exist in the cluster.
 If a data stream is restored, its backing indices are also restored. The restore
 operation automatically opens restored backing indices if they were closed.
 
-NOTE: You cannot restore a data stream if a stream with the same name already
-exists.
+NOTE: On 7.12.0 and earlier, you cannot restore a data stream if a stream with
+the same name already exists. This restriction doesn't apply from version
+7.12.1.
 
 In addition to entire data streams, you can restore only specific backing
 indices from a snapshot. However, restored backing indices are not automatically
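As a sketch of the backing-index case mentioned above (names hypothetical; a restored backing index is not automatically added back to its data stream):

```
POST /_snapshot/my_repository/my_snapshot/_restore
{
  "indices": ".ds-my-data-stream-000001"
}
```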
MetadataIndexStateService.java
@@ -135,18 +135,6 @@ public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener<CloseIndexResponse> listener) {
         if (concreteIndices == null || concreteIndices.length == 0) {
             throw new IllegalArgumentException("Index name is required");
         }
-        List<String> writeIndices = new ArrayList<>();
-        SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
-        for (Index index : concreteIndices) {
-            IndexAbstraction ia = lookup.get(index.getName());
-            if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
-                writeIndices.add(index.getName());
-            }
-        }
-        if (writeIndices.size() > 0) {
-            throw new IllegalArgumentException("cannot close the following data stream write indices [" +
-                Strings.collectionToCommaDelimitedString(writeIndices) + "]");
-        }
 
         clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
             new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
MetadataIndexStateServiceTests.java
@@ -9,7 +9,6 @@
 package org.elasticsearch.cluster.metadata;
 
 import org.elasticsearch.Version;
-import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
 import org.elasticsearch.cluster.ClusterName;
@@ -27,11 +26,8 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
-import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
@@ -43,15 +39,11 @@
 import org.elasticsearch.snapshots.SnapshotInfoTests;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
-import org.hamcrest.CoreMatchers;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -72,8 +64,6 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class MetadataIndexStateServiceTests extends ESTestCase {
 
@@ -379,35 +369,6 @@ public void testCloseFailedIfBlockDisappeared() {
         assertThat(failedIndices, equalTo(disappearedIndices));
     }
 
-    public void testCloseCurrentWriteIndexForDataStream() {
-        int numDataStreams = randomIntBetween(1, 3);
-        List<Tuple<String, Integer>> dataStreamsToCreate = new ArrayList<>();
-        List<String> writeIndices = new ArrayList<>();
-        for (int k = 0; k < numDataStreams; k++) {
-            String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
-            int numBackingIndices = randomIntBetween(1, 5);
-            dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
-            writeIndices.add(DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices));
-        }
-        ClusterState cs = DataStreamTestHelper.getClusterStateWithDataStreams(dataStreamsToCreate,
-            org.elasticsearch.common.collect.List.of());
-
-        ClusterService clusterService = mock(ClusterService.class);
-        when(clusterService.state()).thenReturn(cs);
-
-        List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices);
-        Index[] indicesToDeleteArray = new Index[indicesToDelete.size()];
-        for (int k = 0; k < indicesToDelete.size(); k++) {
-            Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
-            indicesToDeleteArray[k] = indexToDelete;
-        }
-        MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null, null);
-        CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
-        Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
-        assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +
-            Strings.collectionToCommaDelimitedString(indicesToDelete) + "]"));
-    }
-
     public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
         return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
     }
x-pack data streams snapshot/restore integration test
@@ -12,30 +12,32 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
import org.elasticsearch.xpack.core.action.GetDataStreamAction;
import org.elasticsearch.xpack.datastreams.DataStreamsPlugin;
import org.hamcrest.Matchers;
import org.junit.After;
@@ -46,13 +48,15 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
@@ -150,6 +154,109 @@ public void testSnapshotAndRestore() throws Exception {
         assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
     }
 
+    public void testSnapshotAndRestoreAllDataStreamsInPlace() throws Exception {
+        CreateSnapshotResponse createSnapshotResponse = client.admin()
+            .cluster()
+            .prepareCreateSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .setIncludeGlobalState(false)
+            .get();
+
+        RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
+        assertEquals(RestStatus.OK, status);
+
+        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
+
+        // Close all indices:
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest("*");
+        closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
+        assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());
+
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
+            .cluster()
+            .prepareRestoreSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .get();
+        assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
+        SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
+        assertEquals(1, hits.length);
+        assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
+
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "*" });
+        GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).get();
+        assertThat(
+            ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
+            contains(equalTo("ds"), equalTo("other-ds"))
+        );
+        java.util.List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(DS_BACKING_INDEX_NAME));
+        backingIndices = ds.getDataStreams().get(1).getDataStream().getIndices();
+        String expectedBackingIndexName = DataStream.getDefaultBackingIndexName("other-ds", 1);
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(expectedBackingIndexName));
+    }
+
+    public void testSnapshotAndRestoreInPlace() throws Exception {
+        CreateSnapshotResponse createSnapshotResponse = client.admin()
+            .cluster()
+            .prepareCreateSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .setIncludeGlobalState(false)
+            .get();
+
+        RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
+        assertEquals(RestStatus.OK, status);
+
+        assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
+
+        // Roll over after taking the snapshot. The new backing index should be a standalone index after restoring
+        // and not part of the data stream:
+        RolloverRequest rolloverRequest = new RolloverRequest("ds", null);
+        RolloverResponse rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
+        assertThat(rolloverResponse.isRolledOver(), is(true));
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 2)));
+
+        // Close all backing indices of the ds data stream:
+        CloseIndexRequest closeIndexRequest = new CloseIndexRequest(".ds-ds-*");
+        closeIndexRequest.indicesOptions(IndicesOptions.strictExpandHidden());
+        assertAcked(client.admin().indices().close(closeIndexRequest).actionGet());
+
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin()
+            .cluster()
+            .prepareRestoreSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setIndices("ds")
+            .get();
+        assertEquals(1, restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
+        SearchHit[] hits = client.prepareSearch("ds").get().getHits().getHits();
+        assertEquals(1, hits.length);
+        assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());
+
+        GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { "ds" });
+        GetDataStreamAction.Response ds = client.execute(GetDataStreamAction.INSTANCE, getDataStreamRequest).actionGet();
+        assertThat(
+            ds.getDataStreams().stream().map(e -> e.getDataStream().getName()).collect(Collectors.toList()),
+            contains(equalTo("ds"))
+        );
+        java.util.List<Index> backingIndices = ds.getDataStreams().get(0).getDataStream().getIndices();
+        assertThat(backingIndices, hasSize(1));
+        assertThat(backingIndices.stream().map(Index::getName).collect(Collectors.toList()), contains(equalTo(DS_BACKING_INDEX_NAME)));
+
+        // The backing index created as part of the rollover should still exist (but no longer be part of the data stream):
+        assertThat(indexExists(DataStream.getDefaultBackingIndexName("ds", 2)), is(true));
+        // An additional rollover should create a new backing index (3rd generation) and leave the .ds-ds-...-2 index as is:
+        rolloverRequest = new RolloverRequest("ds", null);
+        rolloverResponse = client.admin().indices().rolloverIndex(rolloverRequest).actionGet();
+        assertThat(rolloverResponse.isRolledOver(), is(true));
+        assertThat(rolloverResponse.getNewIndex(), equalTo(DataStream.getDefaultBackingIndexName("ds", 3)));
+    }
+
     public void testSnapshotAndRestoreAll() throws Exception {
         CreateSnapshotResponse createSnapshotResponse = client.admin()
             .cluster()
TransportFreezeIndexAction.java
@@ -24,12 +24,14 @@
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
@@ -44,6 +46,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.SortedMap;
 
 public final class TransportFreezeIndexAction extends
     TransportMasterNodeAction<FreezeRequest, FreezeResponse> {
@@ -158,6 +161,20 @@ public void onFailure(Exception e) {
         }) {
             @Override
             public ClusterState execute(ClusterState currentState) {
+                List<String> writeIndices = new ArrayList<>();
+                SortedMap<String, IndexAbstraction> lookup = currentState.metadata().getIndicesLookup();
+                for (Index index : concreteIndices) {
+                    IndexAbstraction ia = lookup.get(index.getName());
+                    if (ia != null && ia.getParentDataStream() != null &&
+                        ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
+                        writeIndices.add(index.getName());
+                    }
+                }
+                if (writeIndices.size() > 0) {
+                    throw new IllegalArgumentException("cannot freeze the following data stream write indices [" +
+                        Strings.collectionToCommaDelimitedString(writeIndices) + "]");
+                }
+
                 final Metadata.Builder builder = Metadata.builder(currentState.metadata());
                 ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
                 for (Index index : concreteIndices) {
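Given the validation added above, freezing a data stream's write index is still rejected, much as closing one used to be. A hypothetical exchange (index name illustrative):

```
POST /.ds-my-data-stream-000002/_freeze

# Expected: 400 Bad Request with
# "cannot freeze the following data stream write indices [.ds-my-data-stream-000002]"
```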
